本文介绍了如何控制气流中的DAG并发的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用气流v1.7.1.3

I use airflow v1.7.1.3

我有两个DAG,dag_a和dag_b。
我一次设置了10个dag_a任务,理论上应该一个接一个地执行。实际上,这10个dag_a任务是并行执行的。
并发参数不起作用。谁能告诉我为什么?

I have two DAG, dag_a and dag_b.I set up 10 dag_a tasks at one time, which theoretically should be execution one by one. In reality, the 10 dag_a tasks are executed in parallel.The concurrency parameter doesn't work. Can anyone tell me why?

这是伪代码:

在dag_a.py

dag = DAG('dag_a',
            start_date=datetime.now(),
            default_args=default_args,
            schedule_interval=None,
            concurrency=1,
            max_active_runs=1)

dag_b.py

from fabric.api import local

dag = DAG('dag_b',
            start_date=datetime.now(),
            default_args=default_args,
            schedule_interval='0 22 */1 * *',
            concurrency=1,
            max_active_runs=1)


def trigger_dag_a(**context):

    dag_list = []
    for rec in rang(1,10):
        time.sleep(2)
        cmd = "airflow trigger_dag dag_a"

        log.info("cmd:%s"%cmd)
        msg = local(cmd)    #"local" is function in fabric
        log.info(msg)


trigger_dag_a_proc = PythonOperator(python_callable=trigger_dag_a,
                          provide_context=True,
                          task_id='trigger_dag_a_proc',
                          dag=dag)


推荐答案

您可以通过指定池来限制任务实例。

You can limit your task instances by specifying a pool.


  1. 在用户界面中创建池:

2。然后设置您的dag来使用该池:

2.Then setup your dags to use this pool:

        default_args = {
                    'email_on_failure': False,
                    'email_on_retry': False,
                    'start_date': datetime(2017, 12, 16),
                    'pool': 'my_pool'
                }

        dag = DAG(
            dag_id='foo',
            schedule_interval='@daily',
            default_args=default_args,
        )

这篇关于如何控制气流中的DAG并发的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

07-04 06:45