本文介绍了NewThreadScheduler.Default时间表上的同一主题的所有工作的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

目前,我正在试图总结我的周围RX .NET并发头被什么东西感到困惑。我想运行四个并行的比较慢的任务,所以我估计 NewThreadScheduler.Default 将要走的路,因为它的代表一个对象,日程表每个单元。在一个单独的线程作品

I'm currently trying to wrap my head around concurrency with RX .NET and getting confused by something. I want to run four relatively slow tasks in parallel, so I assumed NewThreadScheduler.Default would be the way to go, as it "Represents an object that schedules each unit of work on a separate thread.".

下面是我的设置代码:

    static void Test()
    {
        Console.WriteLine("Starting. Thread {0}", Thread.CurrentThread.ManagedThreadId);

        var query = Enumerable.Range(1, 4);
        var obsQuery = query.ToObservable(NewThreadScheduler.Default);
        obsQuery.Subscribe(DoWork, Done);

        Console.WriteLine("Last line. Thread {0}", Thread.CurrentThread.ManagedThreadId);
    }

    static void DoWork(int i)
    {
        Thread.Sleep(500);
        Console.WriteLine("{0} Thread {1}", i, Thread.CurrentThread.ManagedThreadId);
    }

    static void Done()
    {
        Console.WriteLine("Done. Thread {0}", Thread.CurrentThread.ManagedThreadId);
    }



我以为X线程Y将每一次输出不同的线程ID然而实际的输出是:

I assumed the "X Thread Y" would output a different thread id every time, however the actual output is:

Starting. Thread 1
Last line. Thread 1
1 Thread 3
2 Thread 3
3 Thread 3
4 Thread 3
Done. Thread 3



所有的工作正在一个在顺序相同的新的线程,这是不我所期待的。

All the work is being one on the same new thread in sequential order, which isn't what I was expecting.

我假设我失去了一些东西,但我想不出什么。

I'm assuming I'm missing something, but I can't figure out what.

推荐答案

有两部分可观察到的查询中,查询本身和订阅。 (这也是ObserveOn和SubscribeOn运营商之间的差异。)

There are two parts to an observable query, the Query itself and the Subscription. (This is also the difference between the ObserveOn and SubscribeOn operators.)

查询

Enumerable
    .Range(1, 4)
    .ToObservable(NewThreadScheduler.Default);

这将创建一个可观察的的生产上的缺省值 NewThreadScheduler 该系统。

This creates an observable that produces values on the default NewThreadScheduler for that system.

您认购

obsQuery.Subscribe(DoWork, Done);

这运行的DoWork 通过生成的每个值在查询完成查询与结束的onComplete 电话。我不认为有什么线程的订阅方法的功能任何保证如果查询的所有值都相同的线程是线程的认购将运行上产生将被要求,在实践中。看来他们也使得它使所有订阅的呼叫这是最有可能做的摆脱了很多常见的多线程错误的在同一个线程是由

This runs DoWork for each value produced by the Query and Done when the Query finishes with an OnComplete call. I don't think there are any guarantees about what thread the functions in the subscribe method will be called on, in practice if all values of the query are produced on the same thread that is the thread the subscription will be run on. It appears they are also making it so all of the subscription calls are made on the same thread which is most likely done to get rid of a lot of common multi-threading errors.

所以,你有两个问题,一个是你的记录,如果你改变你的查询

So you have two issues, one is with your logging, if you change your Query to

Enumerable
    .Range(1, 4)
    .Do(x => Console.WriteLine("Query Value {0} produced on Thread {1}", x, Thread.CurrentThread.ManagedThreadId);
    .ToObservable(NewThreadScheduler.Default);

您'会看到一个新的线程生成的每个值。

You'll see each value produced on a new thread.

另一个问题是接收的意图和设计的。这是的的的查询是长期运行的过程和订阅是的结果涉及一个简短的方法。如果你想运行长时间运行的函数作为接收可观察到的你的最佳选择是使用的。

The other issue is one of the intention and design of Rx. It's intended that the Query is the long-running process and the Subscription is a short method that deals with the results. If you want to run a long running function as an Rx Observable your best option is to use Observable.ToAsync.

这篇关于NewThreadScheduler.Default时间表上的同一主题的所有工作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-20 20:35