本文介绍了扭曲:为什么将延迟的回调传递给延迟的线程会导致线程突然阻塞?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我尝试使用txredis(redis的非阻塞式扭曲api)作为持久性消息队列,但尝试通过我正在处理的抓项目来建立持久性消息队列.我发现,尽管客户端没有阻塞,但是它变得比原先要慢得多,因为反应堆循环中原本应该成为一个事件的事件被分成了数千个步骤.

I unsuccessfully tried using txredis (the non blocking twisted api for redis) for a persisting message queue I'm trying to set up with a scrapy project I am working on. I found that although the client was not blocking, it became much slower than it could have been because what should have been one event in the reactor loop was split up into thousands of steps.

因此,我改为尝试使用redis-py(常规阻止扭曲的api)并将调用包装在延迟的线程中.它很好用,但是我想在调用redis时执行一次内部延迟,因为我想建立连接池以试图进一步加快速度.

So instead, I tried making use of redis-py (the regular blocking twisted api) and wrapping the call in a deferred thread. It works great, however I want to perform an inner deferred when I make a call to redis as I would like to set up connection pooling in attempts to speed things up further.

以下是我对扭曲文档中的一些示例代码的解释,这些示例代码用于递延线程以说明我的用例:

Below is my interpretation of some sample code taken from the twisted docs for a deferred thread to illustrate my use case:

#!/usr/bin/env python
from twisted.internet import reactor,threads
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'


def aBlockingRedisCall():
    print 'doing lookup... this may take a while'
    time.sleep(10)
    return 'results from redis'

def result(res):
    print res

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)
    d = threads.deferToThread(aBlockingRedisCall)
    d.addCallback(result)
    reactor.run()

if __name__=='__main__':
    main()

这是我对连接池的更改,使延迟线程中的代码受阻:

And here is my alteration for connection pooling that makes the code in the deferred thread blocking :

#!/usr/bin/env python
from twisted.internet import reactor,defer
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'

def aBlockingRedisCall(x):
    if x<5: #all connections are busy, try later
        print '%s is less than 5, get a redis client later' % x
        x+=1
        d = defer.Deferred()
        d.addCallback(aBlockingRedisCall)
        reactor.callLater(1.0,d.callback,x)
        return d

    else: 
        print 'got a redis client; doing lookup.. this may take a while'
        time.sleep(10) # this is now blocking.. any ideas?
        d = defer.Deferred()
        d.addCallback(gotFinalResult)
        d.callback(x)
        return d

def gotFinalResult(x):
    return 'final result is %s' % x

def result(res):
    print res

def aBlockingMethod():
    print 'going to sleep...'
    time.sleep(10)
    print 'woke up'

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)


    d = defer.Deferred()
    d.addCallback(aBlockingRedisCall)
    d.addCallback(result)
    reactor.callInThread(d.callback, 1)
    reactor.run()

if __name__=='__main__':
    main()

所以我的问题是,有人知道为什么我的更改会导致延迟的线程被阻塞和/或有人可以提出更好的解决方案吗?

So my question is, does anyone know why my alteration causes the deferred thread to be blocking and/or can anyone suggest a better solution?

推荐答案

好,作为扭曲的文档说:

无论何时使用阻塞代码(例如sleep),都必须将其推迟到新线程中.

Whenever you're using blocking code, such as sleep, you have to defer it to a new thread.

#!/usr/bin/env python
from twisted.internet import reactor,defer, threads
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'

def aBlockingRedisCall(x):
    if x<5: #all connections are busy, try later
        print '%s is less than 5, get a redis client later' % x
        x+=1
        d = defer.Deferred()
        d.addCallback(aBlockingRedisCall)
        reactor.callLater(1.0,d.callback,x)
        return d

    else: 
        print 'got a redis client; doing lookup.. this may take a while'
        def getstuff( x ):
            time.sleep(3)
            return "stuff is %s" % x

        # getstuff is blocking, so you need to push it to a new thread
        d = threads.deferToThread(getstuff, x)
        d.addCallback(gotFinalResult)
        return d

def gotFinalResult(x):
    return 'final result is %s' % x

def result(res):
    print res

def aBlockingMethod():
    print 'going to sleep...'
    time.sleep(10)
    print 'woke up'

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)


    d = defer.Deferred()
    d.addCallback(aBlockingRedisCall)
    d.addCallback(result)
    reactor.callInThread(d.callback, 1)
    reactor.run()

if __name__=='__main__':
    main()

如果redis api不太复杂,则可以使用twisted.web重写它,而不是仅在很多线程中调用阻塞的api.

In case the redis api is not very complex it might be more natural to rewrite it using twisted.web, instead of just calling the blocking api in a lot threads.

这篇关于扭曲:为什么将延迟的回调传递给延迟的线程会导致线程突然阻塞?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-21 03:03