我有一个关于计划某个事件的软件设计的问题,该事件将来会在Heroku的分布式环境中触发一次。

我相信写出我想要达到的目标更好,但是我当然做了研究,即使经过两个小时的工作也无法弄清楚。

假设在我的views.py中,我有一个函数:

def after_6_hours():
    print('6 hours passed.')


def create_game():
    print('Game created')
    # of course time will be error, but that's just an example
    scheduler.do(after_6_hours, time=now + 6)


因此,我要实现的是能够在调用after_6_hours后的6小时内准确运行create_game函数。现在,如您所见,此函数是在通常的clock.pytask.py或etc等文件中定义的。

现在,如何使我的整个应用程序始终在Heroku中运行,并能够将此作业添加到此虚构的scheduler库的队列中?

附带说明一下,我不能使用Heroku的Temporizer插件。 APScheduler和Python rq的组合看起来很有希望,但是示例很简单,所有示例都安排在clock.py中的同一文件上,而我只是不知道如何将所有内容与现有的设置捆绑在一起。提前致谢!

最佳答案

在Heroku中,您可以在Web Dyno中运行Django应用程序,这将负责服务您的应用程序并安排任务。
例如(请注意,我没有测试运行代码):

创建after_hours.py,它将具有您要计划的功能(请注意,我们也将在worker中使用相同的源代码)。

def after_6_hours():
        print('6 hours passed.')


views.py中使用rq(请注意,根据您的情况,仅rq是不够的,因为您必须计划任务)和rq-scheduler

from redis import Redis
from rq_scheduler import Scheduler
from datetime import timedelta

from after_hours import after_6_hours

def create_game():
    print('Game created')

    scheduler = Scheduler(connection=Redis()) # Get a scheduler for the "default" queue
    scheduler.enqueue_in(timedelta(hours=6), after_6_hours) #schedules the job to run 6 hours later.


调用create_game()应该安排after_6_hours()在6小时后运行。

提示:您可以使用Redis附加组件在Heroku中配置Redis To Go

下一步是运行rqscheduler工具,该工具每分钟对Redis进行一次轮询,以查看当时是否有要执行的作业,并将其放入队列中(rq工作者将在其中监听)。

现在,在Worker Dyno中创建文件after_hours.py

def after_6_hours():
    print('6 hours passed.')
    #Better return something


并创建另一个文件worker.py

import os

import redis
from rq import Worker, Queue, Connection

from after_hours import after_6_hours

listen = ['high', 'default', 'low'] # while scheduling the task in views.py we sent it to default

redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')

conn = redis.from_url(redis_url)

if __name__ == '__main__':
    with Connection(conn):
        worker = Worker(map(Queue, listen))
        worker.work()


并运行此worker.py

python worker.py


那应该在Worker Dyno中运行计划的任务(在本例中为afer_6_hours)。
请注意,这里的关键是使工作人员也可以使用相同的源代码(在本例中为after_hours.py)。 rq docs中强调了相同的内容


确保工人和工作产生者完全共享
相同的源代码。


如果有帮助,docs中将提示您处理不同的代码库。


对于Web进程无法访问源代码的情况
在工作进程中运行(即代码库X调用了延迟函数
从代码库Y),您可以将函数作为字符串引用传递,
太。

 q = Queue('low', connection=redis_conn)
 q.enqueue('my_package.my_module.my_func', 3, 4)



希望rq-scheduler也尊重这种传递字符串而不是函数对象的方式。

您可以使用任何模块/计划工具(Celery / RabbitMQ,APScheduler等),只要您了解这件事。

07-27 19:31