This article looks at how to handle the problem where "./manage.py runserver" restarts when a celery map/reduce task is run, sometimes raising errors in inner_run. It should be a useful reference for anyone hitting the same issue.

Problem description

I have a view in my django project that fires off a celery task. The celery task itself triggers a few map/reduce jobs via subprocess/fabric and the results of the hadoop job are stored on disk --- nothing is actually stored in the database. After the hadoop job has been completed, the celery task sends a django signal that it is done, something like this:

# tasks.py
from models import MyModel
import signals

from fabric.operations import local

from celery.task import Task

class Hadoopification(Task):
    def run(self, my_model_id, other_args):
        my_model = MyModel.objects.get(pk=my_model_id)
        self.hadoopify_function(my_model, other_args)
        signals.complete_signal.send(
            sender=self,
            my_model_id=my_model_id,
            complete=True,
        )

    def hadoopify_function(self, my_model, other_args):
        local(
            "hadoop jar /usr/lib/hadoop/hadoop-streaming.jar "
            "-D mapred.reduce.tasks=0 "
            "-file hadoopify.py "
            '-mapper "parse_mapper.py 0 0" '
            "-input /user/me/input.csv "
            "-output /user/me/output.csv"
        )

What is truly baffling me is that the django runserver is reloading when the celery task is run, as if I had changed some code somewhere in the django project (which I have not, I can assure you!). From time to time, this even causes errors in the runserver command where I see output like the following before the runserver command reloads and is ok again (note: this error message is very similar to the problem described here).

Unhandled exception in thread started by <function inner_run at 0xa18cd14>
Error in sys.excepthook:
Traceback (most recent call last):
  File "/usr/lib/python2.6/dist-packages/apport_python_hook.py", line 48, in apport_excepthook
    if not enabled():
TypeError: 'NoneType' object is not callable

Original exception was:
Traceback (most recent call last):
  File "/home/rdm/Biz/Projects/Daegis/Server_Development/tar/env/lib/python2.6/site-packages/django/core/management/commands/runserver.py", line 60, in inner_run
    run(addr, int(port), handler)
  File "/home/rdm/Biz/Projects/Daegis/Server_Development/tar/env/lib/python2.6/site-packages/django/core/servers/basehttp.py", line 721, in run
    httpd.serve_forever()
  File "/usr/lib/python2.6/SocketServer.py", line 224, in serve_forever
    r, w, e = select.select([self], [], [], poll_interval)
AttributeError: 'NoneType' object has no attribute 'select'

I've narrowed the problem down to the calls made to hadoop: replacing local("""hadoop ...""") with local("ls") does not cause any problems with reloading the django runserver. There are no bugs in the hadoop code --- it runs just fine on its own when it's not called by celery.
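One further isolation step (not in the original post, sketched here as an assumption) is to take fabric out of the picture entirely and shell out with the stdlib subprocess module instead of local(); if the runserver stops reloading, fabric was the variable rather than hadoop:

```python
import subprocess

def run_streaming_job(cmd):
    """Run a shell command the way fabric's local() does, but via the
    stdlib, returning the exit code and captured output instead of
    printing to the worker's terminal."""
    proc = subprocess.Popen(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    stdout, stderr = proc.communicate()
    return proc.returncode, stdout, stderr

# Swap this in for local(...) inside hadoopify_function, first with a
# harmless command, then with the real hadoop invocation.
code, out, err = run_streaming_job("ls")
```

The hypothetical run_streaming_job helper keeps the same shell-string interface as local(), so the hadoop command line can be reused unchanged while testing.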

Any ideas on what might be causing this?

Solution

There is some discussion about this on the fabric github page here, here and here. Another option for raising an error is to use the settings context manager:

from fabric.api import settings
from fabric.operations import local

from celery.task import Task

class Hadoopification(Task):
    ...
    def hadoopify_function(self, my_model, other_args):
        # warn_only=True makes local() return a result object instead
        # of aborting when the command exits non-zero
        with settings(warn_only=True):
            result = local(...)
        if result.failed:
            # access result.return_code, result.stdout, result.stderr
            raise UsefulException(...)

This has the advantage of allowing access to the return code and all of the other attributes on the result.
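Fabric 1.x may not be installable everywhere (it is Python 2 only), so here is a minimal stdlib-only sketch of the same warn-only-then-inspect pattern; run_warn_only, Result and JobFailed are hypothetical names introduced for this example, not fabric API:

```python
import subprocess
from collections import namedtuple

# Mirrors the useful attributes of fabric's local() result object.
Result = namedtuple("Result", "return_code stdout stderr failed")

class JobFailed(Exception):
    """Raised when the wrapped shell command exits non-zero."""

def run_warn_only(cmd):
    """Run cmd without raising on failure, like wrapping local() in
    settings(warn_only=True): the caller inspects the result instead."""
    proc = subprocess.Popen(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    out, err = proc.communicate()
    return Result(proc.returncode, out, err, proc.returncode != 0)

result = run_warn_only("echo fine")
if result.failed:
    # return code, stdout and stderr are all still available here
    raise JobFailed(result.return_code, result.stderr)
```

The point of the pattern, in either library, is that a failing hadoop job becomes data you can log and re-raise deliberately rather than an abort deep inside the celery worker.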
