How do I control the parallelism or concurrency of an Airflow installation?

Question

In some of my Apache Airflow installations, DAGs or tasks that are scheduled to run do not run even when the scheduler doesn't appear to be fully loaded. How can I increase the number of DAGs or tasks that can run concurrently?

Similarly, if my installation is under high load and I want to limit how quickly my Airflow workers pull queued tasks (such as to reduce resource consumption), what can I adjust to reduce the average load?

Answer

Here's an expanded list of configuration options that are available since Airflow v1.10.2. Some can be set on a per-DAG or per-operator basis, but may also fall back to the setup-wide defaults when they are not specified.

Options that can be specified on a per-DAG basis:

  • concurrency: the number of task instances allowed to run concurrently across all active runs of the DAG this is set on. Defaults to core.dag_concurrency if not set
  • max_active_runs: maximum number of active runs for this DAG. The scheduler will not create new active DAG runs once this limit is hit. Defaults to core.max_active_runs_per_dag if not set

Examples:

# Only allow one run of this DAG to be running at any given time
dag = DAG('my_dag_id', max_active_runs=1)

# Allow a maximum of 10 tasks to be running across a max of 2 active DAG runs
dag = DAG('example2', concurrency=10, max_active_runs=2)


Options that can be specified on a per-operator basis:

  • pool: the pool to execute the task in. Pools can be used to limit parallelism for only a subset of tasks
  • task_concurrency: concurrency limit for the same task across multiple DAG runs

Example:

# Limit this task to 12 concurrent instances across DAG runs, drawing its
# slots from a custom pool. Shown on a DummyOperator for concreteness;
# both arguments are inherited from BaseOperator, so any operator accepts them.
t1 = DummyOperator(task_id='t1', pool='my_custom_pool', task_concurrency=12)
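These per-operator and per-DAG limits compose with the installation-wide caps listed below. As a rough mental model (an illustrative assumption, not Airflow's actual scheduler code), the number of instances of a task that can run at once is bounded by the smallest applicable limit:

```python
def effective_task_limit(parallelism, dag_concurrency,
                         pool_slots=None, task_concurrency=None):
    """Smallest applicable concurrency cap for a task (illustrative only)."""
    limits = [parallelism, dag_concurrency]
    if pool_slots is not None:
        limits.append(pool_slots)
    if task_concurrency is not None:
        limits.append(task_concurrency)
    return min(limits)

# With core.parallelism=32 and a DAG concurrency of 10,
# a pool with 5 slots caps tasks in that pool at 5:
print(effective_task_limit(32, 10, pool_slots=5))  # → 5
```

In practice the scheduler also accounts for slots already occupied by other tasks sharing the same pool or installation, so the real ceiling at any moment can be lower still.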


Options that are specified across an entire Airflow setup:

  • core.parallelism: maximum number of tasks running across an entire Airflow installation
  • core.dag_concurrency: max number of tasks that can be running per DAG (across multiple DAG runs)
  • core.non_pooled_task_slot_count: number of task slots allocated to tasks not running in a pool
  • core.max_active_runs_per_dag: maximum number of active DAG runs, per DAG
  • scheduler.max_threads: how many threads the scheduler process should use to schedule DAGs
  • celery.worker_concurrency: max number of task instances that a worker will process at a time if using CeleryExecutor
  • celery.sync_parallelism: number of processes CeleryExecutor should use to sync task state
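These options live in sections of airflow.cfg. A sketch of what the relevant entries might look like (the values shown here are arbitrary examples, not recommendations):

```ini
[core]
# Max task instances running across the whole installation
parallelism = 32
# Max tasks per DAG, across all of that DAG's active runs
dag_concurrency = 16
# Slots available to tasks not assigned to any pool
non_pooled_task_slot_count = 128
# Max simultaneously active runs per DAG
max_active_runs_per_dag = 16

[scheduler]
max_threads = 2

[celery]
# Only relevant when using the CeleryExecutor
worker_concurrency = 16
sync_parallelism = 0
```

Each entry can also be overridden with an environment variable of the form AIRFLOW__{SECTION}__{KEY}, e.g. AIRFLOW__CORE__PARALLELISM, which takes precedence over the file.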
