假设我们必须同步对共享资源的读/写访问。多个线程将以读写方式访问该资源(大多数情况下是读取,有时是写入)。我们还假设每次写入都将始终触发读取操作(对象是可观察到的)。
在此示例中,我将想象一个这样的类(原谅的语法和样式,仅用于说明目的):
class Container {
public ObservableCollection<Operand> Operands;
public ObservableCollection<Result> Results;
}
我很想为此目的使用
ReadWriterLockSlim
,我将其放在Container
级别(想象的对象不是那么简单,一个读/写操作可能涉及多个对象):public ReadWriterLockSlim Lock;
对于此示例,
Operand
和Result
的实现没有意义。现在,让我们想象一些观察
Operands
并将产生结果放入Results
的代码:void AddNewOperand(Operand operand) {
try {
_container.Lock.EnterWriteLock();
_container.Operands.Add(operand);
}
finally {
_container.ExitReadLock();
}
}
我们的假设观察者将执行类似的操作,但要消耗一个新元素,并且它将使用
EnterReadLock()
锁定以获取操作数,然后使用EnterWriteLock()
锁定以添加结果(为此,我省略代码)。由于递归,这将产生异常,但是如果我设置LockRecursionPolicy.SupportsRecursion
,那么我将把代码打开为死锁(来自MSDN):为了清楚起见,我重复相关部分:
递归[...]使您的代码更容易出现死锁。
如果我对
LockRecursionPolicy.SupportsRecursion
没错,如果我在同一线程中询问a,比如说,读锁,然后其他人要求写锁,那么我将遇到死锁,然后MSDN会说得通。而且,递归也会以一种可衡量的方式降低性能(如果我使用ReadWriterLockSlim
而不是ReadWriterLock
或Monitor
,那不是我想要的)。问题)
最后,我的问题是(请注意,我不是在搜索有关常规同步机制的讨论,我会知道这个生产者/可观察/观察者场景的问题):
ReadWriterLockSlim
而不是Monitor
(即使在现实世界中,代码的读取远比写入的要多)? 我知道没有最佳的同步机制,因此我们使用的工具必须适合我们的情况,但是我想知道是否存在一些最佳实践,或者我只是忽略了线程和观察者之间的一些非常重要的事情(设想使用Microsoft Reactive Extensions,但问题很笼统) ,而不依赖于该框架)。
可能的解决方案?
我想做的是使事件(以某种方式)推迟:
第一种解决方案
每次更改都不会触发任何
CollectionChanged
事件,它会保留在队列中。提供程序(推送数据的对象)完成后,将手动强制刷新队列(依次引发每个事件)。这可以在另一个线程中甚至在调用者线程中(但在锁之外)完成。它可能会起作用,但会使所有内容都不那么“自动”(每个更改通知必须由生产者本身手动触发,要编写更多代码,到处都是错误)。
第二个解决方案
另一个解决方案可能是提供对我们对可观察集合的锁定的引用。如果我将
ReadWriterLockSlim
包装在自定义对象中(用于将其隐藏在易于使用的IDisposable
对象中),我可能会添加ManualResetEvent
来通知所有锁定已通过这种方式释放,集合本身可能会引发事件(再次在同一线程中或在另一个线程)。第三种解决方案
另一个想法可能是使事件异步。如果事件处理程序需要锁定,则它将停止以等待其时间范围。为此,我担心可能会使用大量线程(尤其是从线程池中使用时)。
老实说,我不知道这些方法中的任何一个是否适用于现实世界中的应用程序(从用户的角度来看,个人而言-我更喜欢第二个方法,但是它隐含了对所有内容的自定义集合,并且它使集合了解线程,并且我会避免使用它,如果可能的)。我不想使代码变得不必要的复杂。
最佳答案
这听起来像是多线程的泡菜。在这种事件链模式中使用递归是相当有挑战性的,同时还要避免死锁。您可能要考虑完全解决问题。
例如,您可以使操作数的添加与事件的引发异步:
private readonly BlockingCollection<Operand> _additions
= new BlockingCollection<Operand>();
public void AddNewOperand(Operand operand)
{
_additions.Add(operand);
}
然后在后台线程中进行实际添加:
private void ProcessAdditions()
{
foreach(var operand in _additions.GetConsumingEnumerable())
{
_container.Lock.EnterWriteLock();
_container.Operands.Add(operand);
_container.Lock.ExitWriteLock();
}
}
public void Initialize()
{
var pump = new Thread(ProcessAdditions)
{
Name = "Operand Additions Pump"
};
pump.Start();
}
这种分离会牺牲一些一致性-在add方法之后运行的代码实际上不知道添加的实际发生时间,这可能是您的代码存在的问题。如果是这样,则可以重写此代码以订阅观察值,并使用
Task
指示添加完成的时间:public Task AddNewOperandAsync(Operand operand)
{
var tcs = new TaskCompletionSource<byte>();
// Compose an event handler for the completion of this task
NotifyCollectionChangedEventHandler onChanged = null;
onChanged = (sender, e) =>
{
// Is this the event for the operand we have added?
if (e.NewItems.Contains(operand))
{
// Complete the task.
tcs.SetCompleted(0);
// Remove the event-handler.
_container.Operands.CollectionChanged -= onChanged;
}
}
// Hook in the handler.
_container.Operands.CollectionChanged += onChanged;
// Perform the addition.
_additions.Add(operand);
// Return the task to be awaited.
return tcs.Task;
}
在处理添加消息的后台线程上引发了事件处理程序逻辑,因此它不可能阻塞前台线程。如果等待窗口的消息泵上的添加,则同步上下文足够智能,可以安排消息泵线程上的继续。
无论您是否沿
Task
路线走,此策略都意味着您可以从可观察的事件中安全地添加更多操作数,而无需重新输入任何锁。