前言
最近做的这个项目(基于Django),需要做个功能,实现定时采集车辆定位。
这让我想起来几年前那个OneCat项目,当时我用的是Celery这个很重的组件
Celery实在是太重了,后来我做公众号采集平台的时候,又接触了Django-RQ和Django-Q这俩,前者是对RQ的封装,让RQ和Django更好的结合在一起;后者是一个全新的「多进程任务队列」组件,相比起celery很轻量,当时使用的时候就给我留下不错的印象。
于是这个项目我决定继续使用Django-Q来实现一些异步操作和定时任务。
关于Django-Q
官方介绍:
快速开始
安装
pip install django-q
添加到 INSTALLED_APPS
INSTALLED_APPS = (
# other apps
'django_q',
)
数据库迁移
由于Django-Q会把执行结果放到数据库里,所以要执行一下数据库迁移的操作
python manage.py migrate
这个操作会生成 django_q_ormq
、django_q_schedule
、django_q_task
三个表
配置
因为本身项目用的缓存就是Redis,所以我直接用Redis作为消息队列的后端(broker)
Django-Q支持很多种后端,除了Redis还有Disque、IronMQ、Amazon SQS、MongoDB或者是Django的ORM~
在 settings.py
中添加以下配置:
Q_CLUSTER = {
'name': 'project_name',
'workers': 4,
'recycle': 500,
'timeout': 60,
'compress': True,
'cpu_affinity': 1,
'save_limit': 250,
'queue_limit': 500,
'label': 'Django Q',
'redis': {
'host': 127.0.0.1',
'port': 6379,
'db': 0,
}
}
启动服务
python manage.py qcluster
搞定,现在消息队列服务已经跑起来了
我们可以添加异步任务或者定时任务
异步任务
最简单的方式是使用它提供的 async_task
方法,添加一个新的异步任务到队列中
来写个例子,输入一个数,求阶乘之后开平方
import math
def demo_task(number: int):
return math.sqrt(math.factorial(number))
启动任务
然后来添加一个异步任务
from django_q.tasks import async_task, Task
def task_finish(task: Task):
print(f'任务 {task.name}(ID:{task.id})完成!')
task_id = async_task(
demo_task, 10,
task_name='任务名称',
hook=task_finish,
)
可以看到,直接调用 async_task
方法就行
这个方法的定义是
async_task(func: Any, *args: Any, **kwargs: Any)
传入要异步执行的方法之后,可以把该方法的参数跟在后面传进去,也可以用 kwargs
的方式传入
这两种方式都可以的:
async_task(demo_task, 10)
async_task(demo_task, number=10)
我个人比较喜欢第一种,因为Django-Q本身有几个命名参数,比如 task_name
、hook
、timeout
之类的,用第一种方式传参不容易和Django-Q默认的命名参数冲突。
获取执行结果
有两种方式获取任务的执行结果:
- admin后台
- 使用
result
方法,在代码中获取
第一种方式无需赘述,在安装Django-Q组件后执行了数据库迁移,就会生成 Failed tasks
、Scheduled tasks
、Successful tasks
三个admin模块,顾名思义,在 Failed tasks
和Successful tasks
中可以看到任务的执行结果,也就是我们写在 demo_task
里的返回值。
第二种方式,代码如下:
from django_q.tasks import result
task_result = result(task_id)
把 task_id
传入就可以查询任务执行的结果,如果任务还没执行完,那结果就是 None
这个 result
方法还有个 wait
参数,可以设置等待时间,单位是毫秒
执行完成回调
上面代码中,我们还设置了 hook
参数
作用就是任务执行完成之后,执行 task_finish
这个函数
在 task_finish
里可以通过 task
参数获取任务信息
就是这样~
async_task
的其他参数
创建异步任务的这个方法还有很多参数,官网文档写得还算可以,很多参数都是 Q_CLUSTER
配置里面有的,在 async_task
里设置这些参数就会覆盖默认的配置。
我直接搬运一波,权当翻译文档了~
除了上面介绍到的 task_name
、hook
还有这些参数:
group: str
任务的分组名称save
配置任务运行结果的存储后端,不过文档里只是一句话的介绍,具体如何配置还得研究一下。(稍微看了一下源码,没搞懂,动态语言太折磨人了)timeout: int
任务超时时间,单位是秒。回顾一下前面的Q_CLUSTER
配置,里面有timeout
配置,设置这个参数可以覆盖前面的配置,如果任务运行超出了这个时间,就会被直接终止。ack_failures: bool
设置为True时,也承认失败的任务。这会导致失败的任务被视为成功交付,从而将其从任务队列中删除。默认值为False。(说实话我没看懂是啥意思)sync: bool
设置为True的时候,所有异步任务会变成同步执行,这个功能在测试的时候比较有用。默认是False。cached
这个参数既可以设置为True,也可以传入数字,代表缓存过期时间。根据文档描述,异步任务的执行结果会存在数据库里,当这个参数设置为True的时候,结果不写入数据库,而是保存在缓存里。这个功能在短时间内要大量执行异步任务,且不需要把结果立刻写入数据库的情况下比较有用,可以提高性能。broker
需要传入一个Broker
对象的实例,用来控制这个异步任务在哪个Broker里执行。q_options: dict
这是最后一个参数了。我下面单独介绍一下
q_options
参数
根据前面启动任务的部分,我们启动异步任务的时候,可以通过命名参数向任务方法传递参数,比如:
async_task(demo_task, number=10)
但 async_task
这个方法本身又有很多参数,如果这个参数名称和我们要执行的任务 demo_task
参数重名的话,这些参数就被 async_task
拿走了,我们的任务 demo_task
就拿不到这些参数了。
怎么办?
q_options
参数就是为了解决这个问题
可以把要传给 async_task
的参数都包装在一个 dict 里面,然后通过 q_options
参数传入
假如我们的 demo_task
是这样的:
def demo_task(number: int, timeout: int):
...
除了 number
这个参数,还要接收一个跟 async_task
自有参数重名的 timeout
参数,使用 q_options
的解决方案如下
opts = {
'hook': 'hooks.print_result',
'group': 'math',
'timeout': 30
}
async_task(demo_task, number=10, timeout=100, q_options=opts)
这样既能……又能……,完美啊~
当然我还是建议用 *args
的方式传参,这样就没有参数重名的问题了。
定时任务
有两种方式添加定时任务
- 在代码添加
- admin后台
在代码中添加
比较简单,直接上代码
from django_q.tasks import schedule
schedule(
'demo_task',
schedule_type=Schedule.MINUTES,
minutes=1,
task_name='任务名称',
)
有一点注意的是,因为添加后的定时任务是要保存在数据库中的
所以需要把要执行的方法(包含完整包名),以字符串的形式传入
假如在我们的Django项目中,要执行的是在 apps/test/tasks.py
文件中的 demo_task
方法
那么需要把 apps.test.tasks.demo_task
这个完整的名称传入
在admin中添加也是一样
时间间隔设置
Django-Q的定时任务有很多类型:
- 一次性
- 按x分钟执行一次
- 每小时一次
- 每天
- 每周
- 每月
- 每季度
- 每年
- Cron表达式
注意,即使是Cron表达式,定时任务执行的最短间隔也是1分钟
这点我一开始不知道,用Cron表达式写了个15秒的任务,但执行时间根本不对,然后我翻了一下github上的issues,看到作者的解答才知道~
那个Issues的地址:https://github.com/Koed00/django-q/issues/179
作者的回复:
这点感觉有些鸡肋,如果要高频执行的任务,那只能选择Celery了
在admin后台添加
这个更简单,傻瓜式操作
所以这部分略过了~
docker部署
现在后端服务基本是用docker部署的
为了能在docker中使用Django-Q
我们需要在原有Django容器的基础上,再起一个同样的容器,然后入口改成qcluster的启动命令
这里有个issues也有讨论这个问题:https://github.com/Koed00/django-q/issues/513
来个 docker-compose.yml
的例子
version: "3.9"
services:
redis:
image: redis:alpine
ports:
- 6379:6379
web:
build: .
command: python manage.py runserver 0.0.0.0:8000
volumes:
- .:/code
ports:
- "8000:8000"
depends_on:
- redis
- django_q
django_q:
build: .
command: python manage.py qcluster
volumes:
- .:/code
depends_on:
- redis
一个简单的例子
其他的类似环境变量这些,根据实际情况来
注意:
- Django容器有的东西(环境变量、依赖),Django-Q也要同步加进去
- Django项目代码修改之后,如果是通过uwsgi之类的自动重启服务,那要注意Django-Q不会自动重启,需要手动执行
docker-compose restart django_q
,才能使修改的代码生效
其他
命令行工具
Django-Q还提供了一些命令行工具
- 监控cluster执行情况:
python manage.py qmonitor
- 监控内容:
python manage.py qmemory
- 查看当前状态信息:
python manage.py qinfo
除了使用命令监控,还可以在代码里做监控,不过我暂时没用到,所以还没研究,有需要的同学可以直接看文档
admin自定义
安装完Django-Q后,会在admin出现三个菜单,跟普通的Django app一样,这些也是通过 admin 注册进去的,因此我们可以重新注册这些 ModelAdmin
来自定义admin上的操作界面
来一段官方关于失败任务界面的代码:
from django_q import models as q_models
from django_q import admin as q_admin
admin.site.unregister([q_models.Failure])
@admin.register(q_models.Failure)
class ChildClassAdmin(q_admin.FailAdmin):
list_display = (
'name',
'func',
'result',
'started',
# add attempt_count to list_display
'attempt_count'
)
跟普通的 ModelAdmin
是一样的
我们可以自行添加搜索框、过滤字段之类的。记得要先执行 admin.site.unregister([q_models.Failure])
取消之前Django-Q自己注册的 ModelAdmin
对象。
信号
Django内置信号系统,我之前有写过一篇简单的文章介绍:3分钟看懂Python后端必须知道的Django的信号机制
Django-Q提供了两类信号:
- 任务加入消息队列前
- 任务执行前
例子代码如下:
from django.dispatch import receiver
from django_q.signals import pre_enqueue, pre_execute
@receiver(pre_enqueue)
def my_pre_enqueue_callback(sender, task, **kwargs):
print("Task {} will be enqueued".format(task["name"]))
@receiver(pre_execute)
def my_pre_execute_callback(sender, func, task, **kwargs):
print("Task {} will be executed by calling {}".format(
task["name"], func))
有需要的话可以注册消息接收器,做一些处理。
小结
搞定~
Django-Q使用下来的体验还是不错的,足够轻量,部署足够方便,足以应付大部分场景了~
参考资料
- Django-Q官方文档:https://django-q.readthedocs.io/en/latest/index.html
- Django-Q项目主页:https://github.com/Koed00/django-q
- 相关issues:https://github.com/Koed00/django-q/issues/179
- 相关issues:https://github.com/Koed00/django-q/issues/513