Airflowは、同じデータ内の異なる時間にタスクを実行しますか?

ヤマ勘

私は1つのタスクに30の個別のタスクを持っていますが、それらは相互に依存関係がありません。タスクは同じコードを実行します。唯一の違いはデータ量です。一部のタスクは数秒で終了し、一部のタスクは2時間以上かかります。

問題はキャッチアップ中にあり、数秒で終了するタスクは、次の実行日に移動する前に終了するのに数時間かかるタスクによってブロックされます。

私はそれらを個々のダグに分割することができますが、それはばかげているようで、30のタスクは将来さらに多くなるでしょう。

異なる実行時間に同じデータでタスクを実行する方法はありますか?タスクが終了したらすぐに、他のタスクの実行状況に関係なく、次の実行日になります。

説明のために写真を追加します。基本的に、3番目の行がまだ遅れている間に、最初の行にさらに2つの緑色のボックスが表示されるようにしたいと思います。

airflow_dag_ideal

編集:

y2k-shubhamの説明の、私はそれを実装しようとしました。しかし、それはまだ機能していません。速いタスクはで開始し2019-01-30 00、1秒で終了し2019-01-30 01、遅いタスクがまだ実行されているため開始しません可能ならば、実行するために理想的だろう2019-01-30 012019-01-30 022019-01-30 03可能であれば並行して...

コード例の追加

import time
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

default_args = {
    'owner': 'test',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 30, 0, 0, 0),
    'trigger_rule': TriggerRule.DUMMY
}

dag = DAG(dag_id='test_dag', default_args=default_args, schedule_interval='@hourly')


def fast(**kwargs):
    return 1


def slow(**kwargs):
    time.sleep(600)
    return 1


fast_task = PythonOperator(
    task_id='fast',
    python_callable=fast,
    provide_context=True,
    priority_weight=10000,
    pool='fast_pool',
    # weight_rule='upstream', # using 1.9, this param doesn't exist
    dag=dag
)

slow_task = PythonOperator(
    task_id='slow',
    python_callable=slow,
    provide_context=True,
    priority_weight=500,
    pool='slow_pool',
    # weight_rule='upstream', # using 1.9, this param doesn't exist
    dag=dag
)

fast_task >> slow_task # not working
ヤマ勘

私の問題を非常に簡単に解決するために設定できる2つの変数があることがわかりました。

concurrency そして max_active_runs

以下の例では、4つのダグを実行し、各ダグで4つのタスクを同時に実行できます。他の組み合わせも可能です。

dag = DAG(
    dag_id='sample_dag',
    default_args=default_args,
    schedule_interval='@daily',
    # this will allow up to 16 tasks to be run at the same time
    concurrency=16,
    # this will allow up to 4 dags to be run at the same time
    max_active_runs=4,
)

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

同じデータベースで同時に同じクエリを実行しますが、結果は異なりますか?[ビッグクエリ]

分類Dev

同じMySQLデータベースで同じクエリを実行すると、異なる時間がかかります

分類Dev

異なるタスクを同じスレッドで同時に実行できますか?

分類Dev

同じLinqクエリの異なるフィールドを異なる時間に使用した場合、Entity Frameworkはデータベースを複数回クエリしますか?

分類Dev

実行に異なる時間がかかる同じSQLServerクエリ(ステートメントは同じですが構文が異なります)

分類Dev

同じSQLクエリの場合、データベースが応答を返すまでに異なる時間がかかります

分類Dev

postgresの同じ列内に異なるデータ型を保存する正しい方法はありますか?

分類Dev

graphvizのfdpレイアウトを使用して、同じクラスター内のノード間と異なるクラスター内のノード間で異なるエッジスプラインを使用する方法はありますか?

分類Dev

同じデータフレーム内の異なる要因に対して同じ線形回帰を実行します

分類Dev

2つの異なるAccessデータベースマクロを同時に実行するにはどうすればよいですか?

分類Dev

同じデータベースを使用して、異なるアプリケーション間でdjangoルックアップをどのように実行しますか?

分類Dev

MySQL同じ行の異なるデータを同じクエリに選択します

分類Dev

Apache Airflow 1.10+スケジューラーは、特定の時間に異なるDST対応タイムゾーンで2つのDAGを実行することをサポートしていますか?

分類Dev

UITableViewCell内で時間のかかるタスクを実行し、スクロールを一時停止します

分類Dev

daliyデータと行データに対して同じクエリを実行すると、ドルイドの数が異なります

分類Dev

Oracleが同じデータファイル内の異なる行を同時に更新する方法

分類Dev

JDBCステートメントのSYSDATEは、同じOracleデータベースに対して照会すると異なる時間を返すようです。

分類Dev

相互に干渉しないように、Oracleデータベースで関連する手順または同じ手順を同時に実行します

分類Dev

分離したデータソースを持つ異なる永続性ユニットが同じデータソースにクエリを実行するのはなぜですか?

分類Dev

Wekaは常に異なるデータに対して同じクラスターを生成します

分類Dev

Wekaは常に異なるデータに対して同じクラスターを生成します

分類Dev

エイリアスSparkScalaを使用して、同じデータフレーム内の異なる列で複数の集計を実行します

分類Dev

データフレーム内の異なる長さの列間でクロスチェックを実行し、新しいデータフレームを作成するにはどうすればよいですか?

分類Dev

データベースから返されるLaravel日時は、同じデータベース内のレコードとは異なります

分類Dev

PHP PDOは、サポートする12の異なるデータベースに対して同じクエリ構文を使用しますか?

分類Dev

デフォルトの関数パラメーターとしてのPythondatetime.now()は、異なる時間に同じ値を返します

分類Dev

何千ものデータベース呼び出しを同時に実行しようとしています-それらは同時に実行されるのではなくスタックします

分類Dev

XSLT:最初は同じタグ内で、次に2つの異なるタグでグループ化を2回実行します

分類Dev

gensimが同じデータでの異なる実行に対して同じWord2Vecモデルを生成することを確認します

Related 関連記事

  1. 1

    同じデータベースで同時に同じクエリを実行しますが、結果は異なりますか?[ビッグクエリ]

  2. 2

    同じMySQLデータベースで同じクエリを実行すると、異なる時間がかかります

  3. 3

    異なるタスクを同じスレッドで同時に実行できますか?

  4. 4

    同じLinqクエリの異なるフィールドを異なる時間に使用した場合、Entity Frameworkはデータベースを複数回クエリしますか?

  5. 5

    実行に異なる時間がかかる同じSQLServerクエリ(ステートメントは同じですが構文が異なります)

  6. 6

    同じSQLクエリの場合、データベースが応答を返すまでに異なる時間がかかります

  7. 7

    postgresの同じ列内に異なるデータ型を保存する正しい方法はありますか?

  8. 8

    graphvizのfdpレイアウトを使用して、同じクラスター内のノード間と異なるクラスター内のノード間で異なるエッジスプラインを使用する方法はありますか?

  9. 9

    同じデータフレーム内の異なる要因に対して同じ線形回帰を実行します

  10. 10

    2つの異なるAccessデータベースマクロを同時に実行するにはどうすればよいですか?

  11. 11

    同じデータベースを使用して、異なるアプリケーション間でdjangoルックアップをどのように実行しますか?

  12. 12

    MySQL同じ行の異なるデータを同じクエリに選択します

  13. 13

    Apache Airflow 1.10+スケジューラーは、特定の時間に異なるDST対応タイムゾーンで2つのDAGを実行することをサポートしていますか?

  14. 14

    UITableViewCell内で時間のかかるタスクを実行し、スクロールを一時停止します

  15. 15

    daliyデータと行データに対して同じクエリを実行すると、ドルイドの数が異なります

  16. 16

    Oracleが同じデータファイル内の異なる行を同時に更新する方法

  17. 17

    JDBCステートメントのSYSDATEは、同じOracleデータベースに対して照会すると異なる時間を返すようです。

  18. 18

    相互に干渉しないように、Oracleデータベースで関連する手順または同じ手順を同時に実行します

  19. 19

    分離したデータソースを持つ異なる永続性ユニットが同じデータソースにクエリを実行するのはなぜですか?

  20. 20

    Wekaは常に異なるデータに対して同じクラスターを生成します

  21. 21

    Wekaは常に異なるデータに対して同じクラスターを生成します

  22. 22

    エイリアスSparkScalaを使用して、同じデータフレーム内の異なる列で複数の集計を実行します

  23. 23

    データフレーム内の異なる長さの列間でクロスチェックを実行し、新しいデータフレームを作成するにはどうすればよいですか?

  24. 24

    データベースから返されるLaravel日時は、同じデータベース内のレコードとは異なります

  25. 25

    PHP PDOは、サポートする12の異なるデータベースに対して同じクエリ構文を使用しますか?

  26. 26

    デフォルトの関数パラメーターとしてのPythondatetime.now()は、異なる時間に同じ値を返します

  27. 27

    何千ものデータベース呼び出しを同時に実行しようとしています-それらは同時に実行されるのではなくスタックします

  28. 28

    XSLT:最初は同じタグ内で、次に2つの異なるタグでグループ化を2回実行します

  29. 29

    gensimが同じデータでの異なる実行に対して同じWord2Vecモデルを生成することを確認します

ホットタグ

アーカイブ