bugfix> airflow > 投稿

table_2からtable_1にデータを挿入することが予想されるAirflowスクリプトがあります。気流の初期化プロセスの一環として、挿入機能をトリガーまたはスケジュールしていなくても、挿入機能がバックグラウンドで実行され続けていることがわかります。自動的にトリガーするスクリプトの何が問題になっているのでしょうか。初期化プロセスの一部としてコマンドを実行しないようにするには、以下のスクリプトで何を変更する必要がありますか。

## Library Imports
import psycopg2
import airflow
from airflow import DAG
from airflow.operators import BashOperator
from sqlalchemy import create_engine
import io

# Following are defaults which can be overridden later on
default_args = {
'owner': 'admin',
'depends_on_past': False,
'start_date': datetime(2018, 5, 25),
'email': ['admin@mail.com'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=1),
}
dag = DAG('sample', default_args=default_args)

#######################
def db_login():
    global db_con
try:
    db_con = psycopg2.connect(" dbname = 'db' user = 'user' password = 'pass' host = 'hostname' port = '5439' sslmode = 'require' ")
except:
    print("I am unable to connect to the database.")
print('Connection Task Complete: Connected to DB')
return(db_con)

#######################
def insert_data():
    cur = db_con.cursor()
    cur.execute("""insert into table_1 select id,name,status from table_2 limit 2 ;""")
    db_con.commit()
    print('ETL Task Complete: Inserting data into table_1')

db_login()
insert_data()
db_con.close()
##########################################

t1 = BashOperator(
task_id='db_con',
python_callable=db_login(),
bash_command='python3 ~/airflow/dags/sample.py',
email_on_failure=True,
email=['admin@mail.com'],
dag=dag)
t2 = BashOperator(
task_id='insert',
python_callable=insert_data(),
bash_command='python3 ~/airflow/dags/sample.py',
email_on_failure=True,
email=['admin@mail.com'],
dag=dag)

t1.set_downstream(t2)

誰でもこれを支援できますか?ありがとう。

更新されたコード:

## Third party Library Imports
import psycopg2
import airflow
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import io

default_args = {
'owner': 'admin',
#'depends_on_past': False,
'start_date': datetime(2018, 5, 25),
 'email': ['admin@mail.com'],
 'email_on_failure': True,
 'email_on_retry': True,
 'retries': 1,
 'retry_delay': timedelta(minutes=1), }
dag = DAG('sample', default_args=default_args, catchup=False, schedule_interval="@once")

def db_login():
    global db_con
    try:
        db_con = psycopg2.connect(
        " dbname = 'db' user = 'user' password = 'password' host = 'host' port = '5439' sslmode = 'require' ")
    except:
        print("I am unable to connect to the database.")
    print('Connection success')
    return (db_con)
def insert_data():
    cur = db_con.cursor()
    cur.execute("""insert into table_1 select id,name,status from table_2 limit 2;""")
    db_con.commit()
    print('ETL Task Complete: Inserting data into table_1')
def load_etl():
    db_login()
    insert_data()
    dwh_connection.close()
#Function to execute the query
load_etl()
t1 = BashOperator(
    task_id='db_connection',
    python_callable=load_etl(),
    bash_command='python3 ~/airflow/dags/sample.py',
    email_on_failure=True,
    email=['admin@mail.com'],
    dag=dag)
#t2 = BashOperator(
#task_id='ops_load_del',
#python_callable=insert_data(),
#bash_command='python3 ~/airflow/dags/sample.py',
#email_on_failure=True,
#email=['admin@mail.com'],
#dag=dag)
t1
#t1.set_downstream(t2)

回答 1 件
  • Pythonのような観点からDAGを見ると、インデントはいくつかの考えを引き起こしています。

    まず、 python name-of-dag.py でDAGを実行してみてください 。はい、 airflow を使用しないでください  コマンド。これは、何をすべきかをチェックするためにAirflowの一部によっても行われています。

    現在、何らかのコードが実行されている場合、これは意図に関連している可能性があります。

    機能分析

    ここのインデントは壊れているようです:

    def db_login():     グローバルdb_con 試してください:     db_con = psycopg2.connect( "dbname = 'db' user = 'user' password = 'pass' host = 'hostname' port = '5439' sslmode = 'require'") を除く:     print( "データベースに接続できません。") print( 'Connection Task Complete:Connected to DB') return(db_con)

    そのはず:

    def db_login():
        global db_con
        try:
            db_con = psycopg2.connect(" dbname = 'db' user = 'user' password = 'pass' host = 'hostname' port = '5439' sslmode = 'require' ")
        except:
            print("I am unable to connect to the database.")
        print('Connection Task Complete: Connected to DB')
        return(db_con)
    
    

    それ以外の場合、一番左のコードが常に実行されます。

    また:グローバル Airflowの他のメソッド内で変数を使用できる必要はありません!接続を共有するには、例えば気流 XCOM :https://airflow.apache.org/concepts.html#xcoms

    DAG内で直接関数を呼び出す

    また、私には知られていない何らかの理由で、いくつかの機能を実行したい完全に気流の制御外 しかし、すべての実行で。

    db_login() 
    insert_data()
    db_con.close()
    
    

    このコードが実行されますDAGが呼び出されるたびに (多くの場合があります)、希望するスケジュールとは完全に異なる場合があります。

    このコードをテスト目的で使用したい場合は、メインコールに入れてください。

    if __name__ == '__main__':
        db_login() 
        insert_data()
        db_con.close()
    
    

    その場合でも、閉じる操作はこのワークフローでのみ使用でき、DAGでは使用できません。接続を閉じるタスクはありません。

    PythonOperator を使用しているため  また、これを行うために小さなdefを作成し、このdefを呼び出すタスクを1つだけにするのが賢明かもしれません。

    def load_etl():
        db_login() 
        insert_data()
        db_con.close()
    
    

    TL/DR: すべてのインデントエラーを排除して、純粋にPythonでファイルを呼び出した場合、コードが実行されないようにします。

    編集

    これは、タスクまたはdefの外部に関数呼び出しがないことも意味します。この行

    #Function to execute the query
    load_etl()
    
    

    タスクまたはdefの一部ではないため、実行されます。削除する必要があります。関数呼び出しはタスクの一部であるため、動作するはずです。

    この関数はPython関数であるため、 PythonOperator を使用する必要があります  およびそのパラメーター python_callable=load_etl  (注:行末に括弧はありません)

あなたの答え