bugfix> google-cloud-platform > 投稿

5分ごとに実行されるGCP composerにAirflow DAGがあります。 DAGが実行を開始する時刻と、それが成功した実行か失敗した実行かを識別するフラグを持つBigQueryテーブルを作成したいと思います。たとえば、DAGが2020-03-23 02:30に実行され、実行が失敗した場合、BigQueryテーブルには2020-03-23 02:30の時間列と1のフラグ列があります。実行が成功した場合、テーブルには2020-03-23 02:30の時間列と0のフラグ列があります。テーブルは新しい行を追加します。

前もって感謝します

回答 4 件
  • list_dag_runs CLIを使用して、特定のdag_idのDAG実行を一覧表示できます。返される情報には、各実行の状態が含まれます。

    別のオプションは、いくつかの異なる方法でPythonコードを介して情報を取得することです。私が過去に使用したそのような1つの方法は、airflow.models.dagrun.DagRunの「find」メソッドです。

    dag_id = 'my_dag'
    dag_runs = DagRun.find(dag_id=dag_id)
    for dag_run in dag_runs:
          print(dag_run.state)
    
    

    最後に、BigQuery演算子を使用して、DAg情報をBigQueryテーブルに書き込みます。 BigQueryOperatorの使用例については、こちらをご覧ください。

  • @Enriqueによる解決策に基づいて、これが私の最終的な解決策です。

    def status_check(**kwargs):
            dag_id = 'dag_id'
            dag_runs = DagRun.find(dag_id=dag_id)
            import pandas as pd
            import pandas_gbq
            from google.cloud import bigquery
            arr = []
            arr1 = []
            for dag_run in dag_runs:
                arr.append(dag_run.state)
                arr1.append(dag_run.execution_date)
            data1 = {'dag_status': arr, 'time': arr1}
            df = pd.DataFrame(data1)
            project_name = "project_name"
            dataset = "Dataset"
            outputBQtableName = '{}'.format(dataset)+'.dag_status_tab'
            df.to_gbq(outputBQtableName, project_id=project_name, if_exists='replace', progress_bar=False, \
                table_schema= \
                    [{'name': 'dag_status', 'type': 'STRING'}, \
                     {'name': 'time', 'type': 'TIMESTAMP'}])
            return None
    
    Dag_status = PythonOperator(
            task_id='Dag_status',
            python_callable=status_check,
        )
    
    

あなたの答え