本文介绍了如何控制气流中的DAG并发的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我使用气流v1.7.1.3
I use airflow v1.7.1.3
我有两个DAG,dag_a和dag_b。
我一次设置了10个dag_a任务,理论上应该一个接一个地执行。实际上,这10个dag_a任务是并行执行的。
并发参数不起作用。谁能告诉我为什么?
I have two DAG, dag_a and dag_b.I set up 10 dag_a tasks at one time, which theoretically should be execution one by one. In reality, the 10 dag_a tasks are executed in parallel.The concurrency parameter doesn't work. Can anyone tell me why?
这是伪代码:
在dag_a.py
dag = DAG('dag_a',
start_date=datetime.now(),
default_args=default_args,
schedule_interval=None,
concurrency=1,
max_active_runs=1)
dag_b.py
from fabric.api import local
dag = DAG('dag_b',
start_date=datetime.now(),
default_args=default_args,
schedule_interval='0 22 */1 * *',
concurrency=1,
max_active_runs=1)
def trigger_dag_a(**context):
dag_list = []
for rec in rang(1,10):
time.sleep(2)
cmd = "airflow trigger_dag dag_a"
log.info("cmd:%s"%cmd)
msg = local(cmd) #"local" is function in fabric
log.info(msg)
trigger_dag_a_proc = PythonOperator(python_callable=trigger_dag_a,
provide_context=True,
task_id='trigger_dag_a_proc',
dag=dag)
推荐答案
您可以通过指定池来限制任务实例。
You can limit your task instances by specifying a pool.
- 在用户界面中创建池:
2。然后设置您的dag来使用该池:
2.Then setup your dags to use this pool:
default_args = {
'email_on_failure': False,
'email_on_retry': False,
'start_date': datetime(2017, 12, 16),
'pool': 'my_pool'
}
dag = DAG(
dag_id='foo',
schedule_interval='@daily',
default_args=default_args,
)
这篇关于如何控制气流中的DAG并发的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!