bugfix> google-bigquery > 投稿

ユーザー定義関数(UDF)を使用するAirflow(GoogleのComposerを使用)タスクで基本的なbigquery演算子を実行しようとしています。

例はhttps://cloud.google.com/bigquery/docs/reference/standard-sql/user-defined-functions BigQueryで完全に動作します。

ただし、作曲家にアップロードすると、「関数が見つかりません:multiplyInputs ...」というメッセージが表示されます。以下のPythonスクリプトを参照してください。

BigQueryOperatorのudf_configフィールドにはリストが必要なので、UDFを1つの文字列を含むリストとして定義しました-UDFとして登録されていないことが明らかであるため、これが正しいかどうかはわかりません

どんな助けも大歓迎です。

import datetime
from airflow import models
from airflow.contrib.operators import bigquery_operator
yesterday = datetime.datetime.combine(datetime.datetime.today() -             
datetime.timedelta(1),
                                  datetime.datetime.min.time())
default_dag_args = {
                # Setting start date as yesterday starts the DAG 
immediately when it is
                # detected in the Cloud Storage bucket.
                'start_date': yesterday,
                # To email on failure or retry set 'email' arg to your 
email and enable
                # emailing here.
                'email_on_failure': False,
                'email_on_retry': False,
                # If a task fails, retry it once after waiting at least 
5 minutes
                'retries': 1,
                'retry_delay': datetime.timedelta(minutes=5),
                'project_id': 'vital-platform-791'
}
with models.DAG('udf_example',
                schedule_interval=datetime.timedelta(days=1),
                default_args=default_dag_args) as dag:
    table = 'udf_table'
    # flatten fe table
    task_id = table + '_fe'
    udf_config = ["""CREATE TEMPORARY FUNCTION multiplyInputs(x 
                  FLOAT64, y FLOAT64)
                  RETURNS FLOAT64
                  LANGUAGE js AS \"""
                  return x*y;
                  \""";
                  """]
    print udf_config
    query = """WITH numbers AS
              (SELECT 1 AS x, 5 as y
              UNION ALL
              SELECT 2 AS x, 10 as y
              UNION ALL
              SELECT 3 as x, 15 as y)
            SELECT x, y, multiplyInputs(x, y) as product
            FROM numbers"""
    print query
    query = query
    destination_table = 'vital-platform-791.alpha_factors. 
                          {table}_fe'.format(table=table)
    t_fe = bigquery_operator.BigQueryOperator(task_id=task_id,
                                          bql=query,                                         
                     destination_dataset_table=destination_table,
                                          use_legacy_sql=False,                                              
                     write_disposition='WRITE_TRUNCATE',
                                          udf_config=udf_config)

回答 2 件
  • この例に少し混乱しています。 udf_config をマージするだけでいいようです  および query

    query = ""CREATE TEMPORARY FUNCTION multiplyInputs(x 
                  FLOAT64, y FLOAT64)
                  RETURNS FLOAT64
                  LANGUAGE js AS \"""
                  return x*y;
                  \""";
                  WITH numbers AS
              (SELECT 1 AS x, 5 as y
              UNION ALL
              SELECT 2 AS x, 10 as y
              UNION ALL
              SELECT 3 as x, 15 as y)
            SELECT x, y, multiplyInputs(x, y) as product
            FROM numbers;"""
    
    

  • Google Cloud StorageにUDF関数をアップロードし、それを udf_config に渡します  パラメータ

    例えば:

    UDF関数は gs://test-bucket/testfolder/udf.js にあります

    次に、気流のダグを使用します:

    udf_gcs_path = "gs://test-bucket/testfolder/udf.js"
    bigquery_operator.BigQueryOperator(task_id=task_id,
        bql=query,                                         
        destination_dataset_table=destination_table,
        use_legacy_sql=False,
        write_disposition='WRITE_TRUNCATE',
        udf_config=[{"resourceUri": udf_gcs_path}])
    
    

    参照資料

    https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.userDefinedFunctionResources

    https://cloud.google.com/bigquery/user-defined-functions#referencing-code-from-google-cloud-storage

あなたの答え