在Celery在python中的应用除了实现异步任务(async task)外也可以执行定时任务(beat)

1.Celery定时任务是什么?

Celery默认任务单元由任务生产者触发,但有时可能需要其自动触发, 而beat进程正是负责此类任务,能够自动触发定时/周期性任务.

python celery beat实现定时任务-LMLPHP

只需要在配置中配置好周期任务,然后在运行一个周期任务触发器(beat)即可

2.直接上代码

目录结构如下:

python celery beat实现定时任务-LMLPHP

celery_app.py 文件代码如下:

import os
import sys
import time
import celery
from pathlib import Path
from datetime import timedelta

# 实例化celery对象
app = celery.Celery(
    "celery_worker",
    backend="redis://:@127.0.0.1:6379/4",
    broker="redis://:@127.0.0.1:6379/5",
    include=[
        "celery_worker.email.tasks"
    ],
)

# celery beat 定时任务
beat_schedule = {
    'periodic_task-every-minute': {
        # 'task': 'celery_worker.email.tasks.add',
        'task': 'chain.send_chains',
        'schedule': timedelta(seconds=10),
        'args': (11, 22)
    },
}

# 配置文件
app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_default_queue="normal",
    timezone="Asia/Shanghai",
    enable_utc=False,
    task_ignore_result=True,
    redis_max_connections=100,
    result_expires=3600,
    beat_schedule=beat_schedule
)

"""
celery -A celery_worker.celery_app worker -l info
celery -A celery_worker.celery_app beat
"""

email.tasks.py 代码如下:

from loguru import logger
# 模块化之后
from celery_worker.celery_app import app

@app.task(name='chain.send_chains')
def add(x, y):
    logger.info(f'number_add 进来了...x:{x}, y:{y}')
    return x + y

然后顺序启动 worker 和 beat 定时任务(记得两个都必须启动)

执行如下命令:

celery -A celery_worker.celery_app worker -l info   (启动干活的人)
celery -A celery_worker.celery_app beat  (启动定时任务 类似crontab)

效果如下:

python celery beat实现定时任务-LMLPHP

python celery beat实现定时任务-LMLPHP

其实简单的来说就是这点代码:

beat_schedule = {
    'periodic_task-every-minute': {
        # 'task': 'celery_worker.email.tasks.add',
        'task': 'chain.send_chains',
        'schedule': timedelta(seconds=10),
        'args': (11, 22)
    },
}

periodic_task-every-minute 这个就是定时任务的名字 ,随便起无所谓。

重点是这个 "task",经过实际测试,如果这个工作函数没有指定name 名字的话,默认就是 函数路径+函数名称 也就是 'celery_worker.email.tasks.add'

但是如果这函数添加name属性值的话 直接用名字也是可以的,也就是'chain.send_chains'

好了 小伙伴们也自己实操下吧!

03-04 06:08