Dynamic creation & safe usage of spsc_queues

Problem Description

The only boost::lockfree container that I've made work is spsc_queue, and it's amazing. However, I'd like to implement it where one thread passes information back and forth with (cores - 1) threads. I was thinking that each of the worker threads would have its own set of spsc_queues, in and out, which would be stored in vectors: the main thread would pass information to one outgoing queue, then move on to the next queue in the vector, and so on, as well as cycle through the incoming queues. Can the spsc_queues in these two vectors be pushed and popped safely? If not, is there an alternative way to use spsc_queues according to my intent?

Recommended Answer

You're basically proposing to use 2x(cores - 1) spsc_queues in their intended fashion. Yes, this would work.

I don't see how you would handle the responses (the "incoming" queues) on the main thread, though. Bear in mind that there is no "waiting" operation on an incoming queue, and neither would you want one (it wouldn't be very lock-free any more, and you would leave all the other workers unserviced while waiting for an incoming message).

Aside: if you dimension the response queues such that they can never overflow, you can get away with naive round-robin reading from them (just don't try to drain all the messages from a single response queue in one go, because that is a reliable way to starve the other response queues of servicing). Code sample at the bottom.

All of this leads me to strongly suspect that you are actually after asynchrony, as opposed to concurrency. I have the feeling you would be very happy to get your application running on one thread, just servicing each available message - whatever its source or content - as soon as possible.

This model will scale very well for large numbers of small messages that can be processed in very little time[¹]. When one thread is saturated, you can scale out by adding workers.

In services where some messages require significantly longer processing, you could offload those tasks to dedicated threads that only handle the low-frequency requests, in an asynchronous manner: they just push small "completion" messages back onto the main work queue once they're done.

All of this leads me to think of a library like libuv or Boost Asio. If you already know off-hand that you'll need to run lock-free to get the throughput you need (this is quite rare outside industrial-strength server solutions), you can emulate the same thing using lock-free queues. This is much more work, because you'll have to integrate an epoll/select/poll loop into your producers. I suggest you keep it simple and only adopt additional complexities as you actually need them.

Mantra: correct, well-factored first; optimize later. (Note the "well-factored" there. In this case it means you will /not/ allow slow processing tasks on your high-throughput queues.)
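As an aside (this sketch is mine, not part of the original answer): the single-threaded asynchronous style hinted at above might look roughly like this with Boost Asio, assuming Boost 1.66 or newer for boost::asio::post. Work items handed to one io_context are serviced in order by whichever thread calls run(), with no queues to dimension:

#include <boost/asio.hpp>
#include <iostream>

int main()
{
    boost::asio::io_context io;

    // Producers hand work to the io_context; a single thread then services
    // each item "as soon as possible", in posting order.
    for (int i = 0; i < 10; ++i)
        boost::asio::post(io, [i] { std::cout << "handled request " << i << "\n"; });

    io.run(); // returns once all posted handlers have run
}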
As promised, a simple Proof Of Concept that shows messaging over multiple bi-directional SPSC queue pairs with several worker threads.

Completely lockfree version: Live On Coliru

There are quite a few subtleties here. In particular, note how underdimensioning the queues will lead to silently dropped messages. This won't happen if the consumers can keep up with the producers, but as long as there is OS activity you can't know that, so you should be adding checks for this.

UPDATE

As per the request in the comments, here's a version that checks for queue saturation - without dropping messages. See it Live On Coliru too.

- No messages can be dropped.
- There are no more late arrivals (since the main loop doesn't exit until all responses have been received).
- The round-robin is no longer tied to the loop variable, because the sending might stall, which would result in reading the same response queue all the time. That is a recipe for deadlock or other worst-case performance.
- In case of saturated queues, we had to think of a proper way to balance the load. I opted for a tiny sleep. Technically this means that our lock-free, wait-free solution degrades to regular co-operative multithreading when queues are saturated. Perhaps you would prefer to grow the queues when this is detected. It all depends on your system.
- You will want to know when this happens; I've included simple congestion statistics for all threads. On my system, with the microsleep call sleep_for(nanoseconds(1)), the output is:

Received 1048576 responses (97727 100529 103697 116523 110995 115291 103048 102611 102583 95572 )
Total: 1048576 responses/1048576 requests
Main thread congestion: 21.2%
Worker #0 congestion: 1.7%
Worker #1 congestion: 3.1%
Worker #2 congestion: 2.0%
Worker #3 congestion: 2.5%
Worker #4 congestion: 4.5%
Worker #5 congestion: 2.5%
Worker #6 congestion: 3.0%
Worker #7 congestion: 3.2%
Worker #8 congestion: 3.1%
Worker #9 congestion: 3.6%

real    0m0.616s
user    0m3.858s
sys     0m0.025s

As you can see, the tuning on Coliru needed to be drastically different. This kind of tuning will be required whenever your system runs a risk of operating at maximum load.

Conversely, you'd have to think about how to throttle the load when a queue is empty: at that moment, workers will just busy-loop on the queue, waiting for messages to appear. In a real server environment, where loads happen in bursts, you will want to detect "idle" periods and reduce the polling frequency so as to conserve CPU power (at the same time allowing the CPU to maximize throughput on other threads).
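One way such throttling could look (a sketch of mine, not code from the answer; the parameter names queue and shutdown are illustrative) is an exponential backoff in the worker loop: poll eagerly while messages keep arriving, and sleep for progressively longer stretches while the queue stays empty, with a cap so worst-case latency stays bounded:

#include <boost/atomic.hpp>
#include <boost/chrono.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/thread.hpp>
#include <algorithm>
#include <string>

// Illustrative worker loop: backs off exponentially while its queue is idle.
void worker_loop(boost::lockfree::spsc_queue<std::string>& queue,
                 boost::atomic<bool>& shutdown)
{
    boost::chrono::nanoseconds backoff(1);
    boost::chrono::nanoseconds const max_backoff(1000 * 1000); // cap at 1 ms

    std::string msg;
    while (!shutdown) {
        if (queue.pop(msg)) {
            backoff = boost::chrono::nanoseconds(1); // busy again: poll eagerly
            // ... process msg ...
        } else {
            boost::this_thread::sleep_for(backoff);  // idle: conserve CPU
            backoff = std::min(backoff * 2, max_backoff);
        }
    }
}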
Included in this answer is the second, "hybrid" version (lock-free until queue saturation):

#include <boost/atomic.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/thread.hpp>
#include <algorithm>
#include <cstdlib>
#include <iomanip>
#include <iostream>
#include <iterator>
#include <memory>
#include <numeric>
#include <string>
#include <vector>

namespace blf = boost::lockfree;

static boost::atomic_bool shutdown(false);

static void nanosleep()
{
    //boost::this_thread::yield();
    boost::this_thread::sleep_for(boost::chrono::nanoseconds(1));
}

struct Worker
{
    typedef blf::spsc_queue<std::string> queue;
    typedef std::unique_ptr<queue> qptr;
    qptr incoming, outgoing;
    size_t congestion = 0;

    Worker() : incoming(new queue(64)), outgoing(new queue(64))
    {
    }

    void operator()()
    {
        std::string request;
        while (!shutdown)
        {
            // Drain the incoming queue; if the outgoing queue is full,
            // count the congestion event and sleep briefly before retrying.
            while (incoming->pop(request))
                while (!outgoing->push("Ack: " + request))
                    ++congestion, nanosleep();
        }
    }
};

int main()
{
    boost::thread_group g;
    std::vector<Worker> workers(10);
    std::vector<size_t> responses_received(workers.size());

    for (auto& w : workers)
        g.create_thread(boost::ref(w));

    // let's give them something to do
    const auto num_requests = (1ul << 20);
    std::string response;
    size_t congestion = 0;

    for (size_t total_sent = 0, total_received = 0; total_sent < num_requests || total_received < num_requests;)
    {
        if (total_sent < num_requests)
        {
            // send to a random worker
            auto& to = workers[rand() % workers.size()];
            if (to.incoming->push("request " + std::to_string(total_sent)))
                ++total_sent;
            else
                congestion++;
        }

        if (total_received < num_requests)
        {
            // independent round-robin over the response queues, so a stalled
            // send cannot pin us to a single queue
            static size_t round_robin = 0;
            auto from = (++round_robin) % workers.size();
            if (workers[from].outgoing->pop(response))
            {
                ++responses_received[from];
                ++total_received;
            }
        }
    }

    auto const sum = std::accumulate(begin(responses_received), end(responses_received), size_t());
    std::cout << "\nReceived " << sum << " responses (";
    std::copy(begin(responses_received), end(responses_received), std::ostream_iterator<size_t>(std::cout, " "));
    std::cout << ")\n";

    shutdown = true;
    g.join_all();

    std::cout << "\nTotal: " << sum << " responses/" << num_requests << " requests\n";
    std::cout << "Main thread congestion: " << std::fixed << std::setprecision(1) << (100.0 * congestion / num_requests) << "%\n";

    for (size_t idx = 0; idx < workers.size(); ++idx)
        std::cout << "Worker #" << idx << " congestion: " << std::fixed << std::setprecision(1)
                  << (100.0 * workers[idx].congestion / responses_received[idx]) << "%\n";
}

[¹] "Very little time" being, as ever, a relative notion that roughly means "less time than the average time between new messages". E.g. if you have 100 requests per second, then 5 ms of processing time would be "very little" for a single-threaded system. However, if you have 10k requests per second, 1 ms of processing time would be about the limit on a 16-core server (10,000 requests x 1 ms amounts to 10 full core-seconds of work every second, which leaves little headroom on 16 cores once coordination overhead is accounted for).
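For reference, a compiler invocation along these lines should build the hybrid version above (the file name and exact library list are assumptions, not taken from the original post; adjust to your Boost installation, and add -lboost_atomic if your platform requires it):

g++ -std=c++11 -O2 spsc_hybrid.cpp -o spsc_hybrid -lboost_thread -lboost_system -lboost_chrono -pthread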