我有一个关于计划某个事件的软件设计的问题,该事件将来会在Heroku的分布式环境中触发一次。
我相信写出我想要达到的目标更好,但是我当然做了研究,即使经过两个小时的工作也无法弄清楚。
假设在我的views.py
中,我有一个函数:
def after_6_hours():
print('6 hours passed.')
def create_game():
print('Game created')
# of course time will be error, but that's just an example
scheduler.do(after_6_hours, time=now + 6)
因此,我要实现的是能够在调用
after_6_hours
后的6小时内准确运行create_game
函数。现在,如您所见,此函数是在通常的clock.py
或task.py
或etc等文件中定义的。现在,如何使我的整个应用程序始终在Heroku中运行,并能够将此作业添加到此虚构的
scheduler
库的队列中?附带说明一下,我不能使用Heroku的Temporizer插件。 APScheduler和Python rq的组合看起来很有希望,但是示例很简单,所有示例都安排在
clock.py
中的同一文件上,而我只是不知道如何将所有内容与现有的设置捆绑在一起。提前致谢! 最佳答案
在Heroku中,您可以在Web Dyno中运行Django应用程序,这将负责服务您的应用程序并安排任务。
例如(请注意,我没有测试运行代码):
创建after_hours.py
,它将具有您要计划的功能(请注意,我们也将在worker中使用相同的源代码)。
def after_6_hours():
print('6 hours passed.')
在
views.py
中使用rq
(请注意,根据您的情况,仅rq
是不够的,因为您必须计划任务)和rq-scheduler
:from redis import Redis
from rq_scheduler import Scheduler
from datetime import timedelta
from after_hours import after_6_hours
def create_game():
print('Game created')
scheduler = Scheduler(connection=Redis()) # Get a scheduler for the "default" queue
scheduler.enqueue_in(timedelta(hours=6), after_6_hours) #schedules the job to run 6 hours later.
调用
create_game()
应该安排after_6_hours()在6小时后运行。提示:您可以使用
Redis
附加组件在Heroku中配置Redis To Go
。下一步是运行
rqscheduler
工具,该工具每分钟对Redis进行一次轮询,以查看当时是否有要执行的作业,并将其放入队列中(rq
工作者将在其中监听)。现在,在Worker Dyno中创建文件
after_hours.py
def after_6_hours():
print('6 hours passed.')
#Better return something
并创建另一个文件
worker.py
:import os
import redis
from rq import Worker, Queue, Connection
from after_hours import after_6_hours
listen = ['high', 'default', 'low'] # while scheduling the task in views.py we sent it to default
redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')
conn = redis.from_url(redis_url)
if __name__ == '__main__':
with Connection(conn):
worker = Worker(map(Queue, listen))
worker.work()
并运行此
worker.py
python worker.py
那应该在Worker Dyno中运行计划的任务(在本例中为
afer_6_hours
)。请注意,这里的关键是使工作人员也可以使用相同的源代码(在本例中为
after_hours.py
)。 rq
docs中强调了相同的内容确保工人和工作产生者完全共享
相同的源代码。
如果有帮助,docs中将提示您处理不同的代码库。
对于Web进程无法访问源代码的情况
在工作进程中运行(即代码库X调用了延迟函数
从代码库Y),您可以将函数作为字符串引用传递,
太。
q = Queue('low', connection=redis_conn)
q.enqueue('my_package.my_module.my_func', 3, 4)
希望
rq-scheduler
也尊重这种传递字符串而不是函数对象的方式。您可以使用任何模块/计划工具(Celery / RabbitMQ,APScheduler等),只要您了解这件事。