本文介绍了芹菜的SQLAlchemy会话问题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经为我们的网络应用安排了一些芹菜节拍的重复任务

I have scheduled a few recurring tasks with celery beat for our web app

该应用程序本身是使用金字塔Web框架构建的.使用zopetransaction扩展程序来管理会话

The app itself is build using pyramid web framework. Using the zopetransaction extension to manage session

在芹菜中,我正在将该应用程序用作图书馆.我正在重新定义具有功能的模型中的会话.

In celery, I am using the app as a library. I am redefining session in models with a function.

它工作良好,但偶尔会出现 InvalidRequestError:此会话处于准备"状态;在此事务中无法发出进一步的SQL

It works well but once in a while, it raises InvalidRequestError: This session is in 'prepared' state; no further SQL can be emitted within this transaction

我不确定出什么问题以及为什么会发出这些警告.

I am not sure what is wrong and why it issues these warnings.

示例代码:

tasks.py

def initialize_async_session():
    import sqlalchemy
    from webapp.models import Base, set_dbsession, engine

    Session = sqlalchemy.orm.scoped_session(
                sqlalchemy.orm.sessionmaker(autocommit=True, autoflush=True)
                            )
    Session.configure(bind=engine)
    session = Session()

    set_dbsession(session)
    Base.metadata.bind = engine
    return session


@celery.task
def rerun_scheduler():
    log.info("Starting pipeline scheduler")
    session = initialize_async_session()
    webapp.sheduledtask.service.check_for_updates(session)
    log.info("Ending pipeline scheduler")

在webapp的 models.py

DBSession = scoped_session(sessionmaker(bind=engine, expire_on_commit=False,
                    extension=ZopeTransactionExtension()))

def set_dbsession(db_session=None):
    """
    This function sets the db session
    """
    global DBSession
    if db_session:
        DBSession = db_session
        log.info("session changed to {0}".format(db_session))

更新:

回溯:

Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/edgem_common-0.0-py2.7.egg/common/utils.py", line 54, in new_function
    result = f(*args, **kwargs)
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/edgem_common-0.0-py2.7.egg/common/utils.py", line 100, in new_function
    result = f(*args, **kwargs)
  File "/home/ubuntu/modwsgi/env/mvc-service/webapp/webapp/data/mongo_service.py", line 1274, in run
    self.table_params.set_task_status_as_finished()
  File "/home/ubuntu/modwsgi/env/mvc-service/webapp/webapp/mem_objects.py", line 33, in set_task_status_as_finished
    task = Task.get_by_id(self.task_id)
  File "/home/ubuntu/modwsgi/env/mvc-service/webapp/webapp/models.py", line 162, in get_by_id
    return DBSession.query(cls).filter(cls.id == obj_id).first()
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2156, in first
    ret = list(self[0:1])
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2023, in __getitem__
    return list(res)
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2227, in __iter__
    return self._execute_and_instances(context)
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2240, in _execute_and_instances
    close_with_result=True)
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2231, in _connection_from_session
    **kw)
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 777, in connection
    close_with_result=close_with_result)
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 781, in _connection_for_bind
    return self.transaction._connection_for_bind(engine)
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 289, in _connection_for_bind
    self._assert_is_active()
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 217, in _assert_is_active
    "This Session's transaction has been rolled back "
InvalidRequestError: This Session's transaction has been rolled back by a nested rollback() call.  To begin a new transaction, issue Session.rollback() first.

#########################################################################

[2013-05-30 14:32:57,782: WARNING/PoolWorker-3] Exception in thread Thread-4:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/edgem_common-0.0-py2.7.egg/common/utils.py", line 54, in new_function
    result = f(*args, **kwargs)
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/edgem_common-0.0-py2.7.egg/common/utils.py", line 100, in new_function
    result = f(*args, **kwargs)
  File "/home/ranjith/wksp/mvc-service/webapp/webapp/data/mongo_service.py", line 1274, in run
    self.table_params.set_task_status_as_finished()
  File "/home/ranjith/wksp/mvc-service/webapp/webapp/mem_objects.py", line 33, in set_task_status_as_finished
    task = Task.get_by_id(self.task_id)
  File "/home/ranjith/wksp/mvc-service/webapp/webapp/models.py", line 166, in get_by_id
    return DBSession.query(cls).filter(cls.id == obj_id).first()
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2145, in first
    ret = list(self[0:1])
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2012, in __getitem__
    return list(res)
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2216, in __iter__
    return self._execute_and_instances(context)
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2229, in _execute_and_instances
    close_with_result=True)
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2220, in _connection_from_session
    **kw)
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 798, in connection
    close_with_result=close_with_result)
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 802, in _connection_for_bind
    return self.transaction._connection_for_bind(engine)
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 281, in _connection_for_bind
    self._assert_active()
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 181, in _assert_active
    "This session is in 'prepared' state; no further "
InvalidRequestError: This session is in 'prepared' state; no further SQL can be emitted within this transaction.

推荐答案

我相信问题是您正在尝试在Celery任务中使用SQLAlchemy会话.

I believe the problem is that you are attempting to use the SQLAlchemy session in your Celery task.

我建议做的第一件事是创建两个单独的作用域会话,一个用于Celery应用程序,另一个用于Web应用程序.接下来,我将确保您的Celery数据库会话在Celery初始化期间仅配置一次.您可以使用Celery worker_init.connect 来确保它在Celery启动期间创建数据库( http://hynek.me/articles/using-celery-with-pyramid/).

The first thing I recommend doing is creating two separate scoped sessions, one for your Celery application and another one for your web application. Next, I would make sure your Celery database session is only configured once during Celery initialization. You can use the Celery worker_init.connect to make sure it creates the database during Celery startup (http://hynek.me/articles/using-celery-with-pyramid/).

非常重要的是,您的Web应用程序不要使用与Celery应用程序相同的数据库会话.

It is very important that your web application does not use the same database session as your Celery application.

您的 tasks.py 文件的内容如下:

from celery import Celery
from celery.signals import worker_init
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

Session = sqlalchemy.orm.scoped_session(
    sqlalchemy.orm.sessionmaker(autocommit=True, autoflush=True))


@worker_init.connect
def initialize_session():
    some_engine = create_engine('database_url')    
    Session.configure(bind=some_engine)

@celery.task
def rerun_scheduler():
    log.info("Starting pipeline scheduler")
    webapp.sheduledtask.service.check_for_updates(Session)
    log.info("Ending pipeline scheduler")

这篇关于芹菜的SQLAlchemy会话问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-23 19:16