前言

最近做的这个项目(基于Django),需要做个功能,实现定时采集车辆定位。

这让我想起来几年前那个OneCat项目,当时我用的是Celery这个很重的组件

Celery实在是太重了,后来我做公众号采集平台的时候,又接触了Django-RQ和Django-Q这俩,前者是对RQ的封装,让RQ和Django更好的结合在一起;后者是一个全新的「多进程任务队列」组件,相比起celery很轻量,当时使用的时候就给我留下不错的印象。

于是这个项目我决定继续使用Django-Q来实现一些异步操作和定时任务。

关于Django-Q

官方介绍:

快速开始

安装

pip install django-q

添加到 INSTALLED_APPS

INSTALLED_APPS = (
    # other apps
    'django_q',
)

数据库迁移

由于Django-Q会把执行结果放到数据库里,所以要执行一下数据库迁移的操作

python manage.py migrate

这个操作会生成 django_q_ormqdjango_q_scheduledjango_q_task 三个表

配置

因为本身项目用的缓存就是Redis,所以我直接用Redis作为消息队列的后端(broker)

Django-Q支持很多种后端,除了Redis还有Disque、IronMQ、Amazon SQS、MongoDB或者是Django的ORM~

settings.py 中添加以下配置:

Q_CLUSTER = {
    'name': 'project_name',
    'workers': 4,
    'recycle': 500,
    'timeout': 60,
    'compress': True,
    'cpu_affinity': 1,
    'save_limit': 250,
    'queue_limit': 500,
    'label': 'Django Q',
    'redis': {
        'host': 127.0.0.1',
        'port': 6379,
        'db': 0,
    }
}

启动服务

python manage.py qcluster

搞定,现在消息队列服务已经跑起来了

我们可以添加异步任务或者定时任务

异步任务

最简单的方式是使用它提供的 async_task 方法,添加一个新的异步任务到队列中

来写个例子,输入一个数,求阶乘之后开平方

import math

def demo_task(number: int):
    return math.sqrt(math.factorial(number))

启动任务

然后来添加一个异步任务

from django_q.tasks import async_task, Task

def task_finish(task: Task):
    print(f'任务 {task.name}(ID:{task.id})完成!')

task_id = async_task(
    demo_task, 10,
    task_name='任务名称',
    hook=task_finish,
)

可以看到,直接调用 async_task 方法就行

这个方法的定义是

async_task(func: Any, *args: Any, **kwargs: Any)

传入要异步执行的方法之后,可以把该方法的参数跟在后面传进去,也可以用 kwargs 的方式传入

这两种方式都可以的:

  • async_task(demo_task, 10)
  • async_task(demo_task, number=10)

我个人比较喜欢第一种,因为Django-Q本身有几个命名参数,比如 task_namehooktimeout之类的,用第一种方式传参不容易和Django-Q默认的命名参数冲突。

获取执行结果

有两种方式获取任务的执行结果:

  • admin后台
  • 使用 result 方法,在代码中获取

第一种方式无需赘述,在安装Django-Q组件后执行了数据库迁移,就会生成 Failed tasksScheduled tasksSuccessful tasks 三个admin模块,顾名思义,在 Failed tasksSuccessful tasks 中可以看到任务的执行结果,也就是我们写在 demo_task 里的返回值。

第二种方式,代码如下:

from django_q.tasks import result

task_result = result(task_id)

task_id 传入就可以查询任务执行的结果,如果任务还没执行完,那结果就是 None

这个 result 方法还有个 wait 参数,可以设置等待时间,单位是毫秒

执行完成回调

上面代码中,我们还设置了 hook 参数

作用就是任务执行完成之后,执行 task_finish 这个函数

task_finish 里可以通过 task 参数获取任务信息

就是这样~

async_task 的其他参数

创建异步任务的这个方法还有很多参数,官网文档写得还算可以,很多参数都是 Q_CLUSTER 配置里面有的,在 async_task 里设置这些参数就会覆盖默认的配置。

我直接搬运一波,权当翻译文档了~

除了上面介绍到的 task_namehook 还有这些参数:

  • group: str 任务的分组名称
  • save 配置任务运行结果的存储后端,不过文档里只是一句话的介绍,具体如何配置还得研究一下。(稍微看了一下源码,没搞懂,动态语言太折磨人了)
  • timeout: int 任务超时时间,单位是秒。回顾一下前面的 Q_CLUSTER 配置,里面有 timeout 配置,设置这个参数可以覆盖前面的配置,如果任务运行超出了这个时间,就会被直接终止。
  • ack_failures: bool 设置为True时,也承认失败的任务。这会导致失败的任务被视为成功交付,从而将其从任务队列中删除。默认值为False。(说实话我没看懂是啥意思)
  • sync: bool 设置为True的时候,所有异步任务会变成同步执行,这个功能在测试的时候比较有用。默认是False。
  • cached 这个参数既可以设置为True,也可以传入数字,代表缓存过期时间。根据文档描述,异步任务的执行结果会存在数据库里,当这个参数设置为True的时候,结果不写入数据库,而是保存在缓存里。这个功能在短时间内要大量执行异步任务,且不需要把结果立刻写入数据库的情况下比较有用,可以提高性能。
  • broker 需要传入一个 Broker 对象的实例,用来控制这个异步任务在哪个Broker里执行。
  • q_options: dict 这是最后一个参数了。我下面单独介绍一下

q_options 参数

根据前面启动任务的部分,我们启动异步任务的时候,可以通过命名参数向任务方法传递参数,比如:

async_task(demo_task, number=10)

async_task 这个方法本身又有很多参数,如果这个参数名称和我们要执行的任务 demo_task 参数重名的话,这些参数就被 async_task 拿走了,我们的任务 demo_task 就拿不到这些参数了。

怎么办?

q_options 参数就是为了解决这个问题

可以把要传给 async_task 的参数都包装在一个 dict 里面,然后通过 q_options 参数传入

假如我们的 demo_task 是这样的:

def demo_task(number: int, timeout: int):
  ...

除了 number 这个参数,还要接收一个跟 async_task 自有参数重名的 timeout 参数,使用 q_options 的解决方案如下

opts = {
    'hook': 'hooks.print_result',
    'group': 'math',
    'timeout': 30
}

async_task(demo_task, number=10, timeout=100, q_options=opts)

这样既能……又能……,完美啊~

当然我还是建议用 *args 的方式传参,这样就没有参数重名的问题了。

定时任务

有两种方式添加定时任务

  • 在代码添加
  • admin后台

在代码中添加

比较简单,直接上代码

from django_q.tasks import schedule

schedule(
  'demo_task',
  schedule_type=Schedule.MINUTES,
  minutes=1,
  task_name='任务名称',
)

有一点注意的是,因为添加后的定时任务是要保存在数据库中的

所以需要把要执行的方法(包含完整包名),以字符串的形式传入

假如在我们的Django项目中,要执行的是在 apps/test/tasks.py 文件中的 demo_task 方法

那么需要把 apps.test.tasks.demo_task 这个完整的名称传入

在admin中添加也是一样

时间间隔设置

Django-Q的定时任务有很多类型:

  • 一次性
  • 按x分钟执行一次
  • 每小时一次
  • 每天
  • 每周
  • 每月
  • 每季度
  • 每年
  • Cron表达式

注意,即使是Cron表达式,定时任务执行的最短间隔也是1分钟

这点我一开始不知道,用Cron表达式写了个15秒的任务,但执行时间根本不对,然后我翻了一下github上的issues,看到作者的解答才知道~

那个Issues的地址:https://github.com/Koed00/django-q/issues/179

作者的回复:

这点感觉有些鸡肋,如果要高频执行的任务,那只能选择Celery了

在admin后台添加

这个更简单,傻瓜式操作

所以这部分略过了~

docker部署

现在后端服务基本是用docker部署的

为了能在docker中使用Django-Q

我们需要在原有Django容器的基础上,再起一个同样的容器,然后入口改成qcluster的启动命令

这里有个issues也有讨论这个问题:https://github.com/Koed00/django-q/issues/513

来个 docker-compose.yml 的例子

version: "3.9"
services:
  redis:
    image: redis:alpine
    ports:
      - 6379:6379
  web:
    build: .
    command: python manage.py runserver 0.0.0.0:8000
    volumes:
      - .:/code
    ports:
      - "8000:8000"
    depends_on:
      - redis
      - django_q
  django_q:
    build: .
    command: python manage.py qcluster
    volumes:
      - .:/code
    depends_on:
      - redis

一个简单的例子

其他的类似环境变量这些,根据实际情况来

注意:

  • Django容器有的东西(环境变量、依赖),Django-Q也要同步加进去
  • Django项目代码修改之后,如果是通过uwsgi之类的自动重启服务,那要注意Django-Q不会自动重启,需要手动执行 docker-compose restart django_q ,才能使修改的代码生效

其他

命令行工具

Django-Q还提供了一些命令行工具

  • 监控cluster执行情况:python manage.py qmonitor
  • 监控内容:python manage.py qmemory
  • 查看当前状态信息:python manage.py qinfo

除了使用命令监控,还可以在代码里做监控,不过我暂时没用到,所以还没研究,有需要的同学可以直接看文档

admin自定义

安装完Django-Q后,会在admin出现三个菜单,跟普通的Django app一样,这些也是通过 admin 注册进去的,因此我们可以重新注册这些 ModelAdmin 来自定义admin上的操作界面

来一段官方关于失败任务界面的代码:

from django_q import models as q_models
from django_q import admin as q_admin

admin.site.unregister([q_models.Failure])
@admin.register(q_models.Failure)
class ChildClassAdmin(q_admin.FailAdmin):
    list_display = (
        'name',
        'func',
        'result',
        'started',
        # add attempt_count to list_display
        'attempt_count'
    )

跟普通的 ModelAdmin 是一样的

我们可以自行添加搜索框、过滤字段之类的。记得要先执行 admin.site.unregister([q_models.Failure]) 取消之前Django-Q自己注册的 ModelAdmin 对象。

信号

Django内置信号系统,我之前有写过一篇简单的文章介绍:3分钟看懂Python后端必须知道的Django的信号机制

Django-Q提供了两类信号:

  • 任务加入消息队列前
  • 任务执行前

例子代码如下:

from django.dispatch import receiver
from django_q.signals import pre_enqueue, pre_execute

@receiver(pre_enqueue)
def my_pre_enqueue_callback(sender, task, **kwargs):
    print("Task {} will be enqueued".format(task["name"]))

@receiver(pre_execute)
def my_pre_execute_callback(sender, func, task, **kwargs):
    print("Task {} will be executed by calling {}".format(
          task["name"], func))

有需要的话可以注册消息接收器,做一些处理。

小结

搞定~

Django-Q使用下来的体验还是不错的,足够轻量,部署足够方便,足以应付大部分场景了~

参考资料

09-01 10:57