私は1つのタスクに30の個別のタスクを持っていますが、それらは相互に依存関係がありません。タスクは同じコードを実行します。唯一の違いはデータ量です。一部のタスクは数秒で終了し、一部のタスクは2時間以上かかります。
問題はキャッチアップ中にあり、数秒で終了するタスクは、次の実行日に移動する前に終了するのに数時間かかるタスクによってブロックされます。
私はそれらを個々のダグに分割することができますが、それはばかげているようで、30のタスクは将来さらに多くなるでしょう。
異なる実行時間に同じデータでタスクを実行する方法はありますか?タスクが終了したらすぐに、他のタスクの実行状況に関係なく、次の実行日になります。
説明のために写真を追加します。基本的に、3番目の行がまだ遅れている間に、最初の行にさらに2つの緑色のボックスが表示されるようにしたいと思います。
編集:
y2k-shubhamの説明の後、私はそれを実装しようとしました。しかし、それはまだ機能していません。速いタスクはで開始し2019-01-30 00
、1秒で終了し2019-01-30 01
、遅いタスクがまだ実行されているため開始しません。可能ならば、実行するために理想的だろう2019-01-30 01
、2019-01-30 02
、2019-01-30 03
可能であれば並行して...
コード例の追加
import time
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
default_args = {
'owner': 'test',
'depends_on_past': False,
'start_date': datetime(2019, 1, 30, 0, 0, 0),
'trigger_rule': TriggerRule.DUMMY
}
dag = DAG(dag_id='test_dag', default_args=default_args, schedule_interval='@hourly')
def fast(**kwargs):
return 1
def slow(**kwargs):
time.sleep(600)
return 1
fast_task = PythonOperator(
task_id='fast',
python_callable=fast,
provide_context=True,
priority_weight=10000,
pool='fast_pool',
# weight_rule='upstream', # using 1.9, this param doesn't exist
dag=dag
)
slow_task = PythonOperator(
task_id='slow',
python_callable=slow,
provide_context=True,
priority_weight=500,
pool='slow_pool',
# weight_rule='upstream', # using 1.9, this param doesn't exist
dag=dag
)
fast_task >> slow_task # not working
私の問題を非常に簡単に解決するために設定できる2つの変数があることがわかりました。
concurrency
そして max_active_runs
以下の例では、4つのダグを実行し、各ダグで4つのタスクを同時に実行できます。他の組み合わせも可能です。
dag = DAG(
dag_id='sample_dag',
default_args=default_args,
schedule_interval='@daily',
# this will allow up to 16 tasks to be run at the same time
concurrency=16,
# this will allow up to 4 dags to be run at the same time
max_active_runs=4,
)
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加