Actor是有状态的,当每一步执行失败后,返回失败地方继续执行时,希望此时的状态是正确的,为了保证这一点,持久化就成了必要的环节了。

Proto.Actor提供了三种方式执久化:

  • Event Sourcing事件溯源
  • Snapshotting快照
  • Event Sourcing with Snapshotting带快照的事件溯源
不管是那种持久化方式,首先要构造一个持久化的提供者,这个提者是内存也好,数据库也罢,本例中用Sqlite作为持久化的载体;在Actor中,实现持久化,首先要创建一个Persistence对象,用来将快照或事件保存起来,最重要的一点是,我们用事件溯源或快照,是帮我们保留住Actor在某刻的状,保留下来,以便我们再次启动时能延续这个状态,所以Persistence有一个很关键的作用就是能从持久化的载体中把原来的状态回复过来,这里,Event Source的把原来的状态步骤走再走一次,到达当前流程的点,但快照不然,直接取的是最后时刻的状态;带快照的事件溯源则是两者的结合。
码友看码:

NuGet安装

Proto.Actor

Proto.Persistence

Proto.Persistence.Sqlite

 using Microsoft.Data.Sqlite;
using Proto;
using Proto.Persistence;
using Proto.Persistence.SnapshotStrategies;
using Proto.Persistence.Sqlite;
using System;
using System.Threading.Tasks; namespace P008_Persistence
{
class Program
{
static void Main(string[] args)
{
//用sqlite持久化后
var actorid = "myactorid";
var dbfile = @"C:\MyFile\Source\Repos\ProtoActorSample\ProtoActorSample\P008_Persistence\data.sqlite";
var sqliteProvider = new SqliteProvider(new SqliteConnectionStringBuilder() { DataSource = dbfile });
while (true)
{
Console.WriteLine("1、事件溯源 2、快照 3、带快照的事件溯源 4、退出");
switch (Console.ReadLine())
{
case "":
CallEventSource(actorid, sqliteProvider);
break;
case "":
CallSnapShoot(actorid, sqliteProvider);
break;
case "":
CallSnapShootEventSource(actorid, sqliteProvider);
break;
case "":
return;
}
}
}
/// <summary>
/// 事件溯源
/// </summary>
/// <param name="actorid"></param>
/// <param name="sqliteProvider"></param>
private static void CallEventSource(string actorid, SqliteProvider sqliteProvider)
{
var props = Actor.FromProducer(() => new EventSourceDataActor(sqliteProvider, actorid));
var pid = Actor.Spawn(props);
var result = true;
while (result)
{
Console.WriteLine("1、Tell 2、删除持久化 3、退出"); switch (Console.ReadLine())
{
case "":
var random = new Random();
var no = random.Next(, );
Console.WriteLine($"随机产生的数字:{no}");
pid.Tell(new Data { Amount = no });
break;
case "":
//完成处理后清理持久化的操作
sqliteProvider.DeleteEventsAsync(actorid, ).Wait();
break;
case "":
result = false;
break;
}
}
} /// <summary>
/// 快照
/// </summary>
/// <param name="actorid"></param>
/// <param name="sqliteProvider"></param>
private static void CallSnapShoot(string actorid, SqliteProvider sqliteProvider)
{
var props = Actor.FromProducer(() => new SnapShootDataActor(sqliteProvider, actorid));
var pid = Actor.Spawn(props);
var result = true;
while (result)
{
Console.WriteLine("1、Tell 2、删除持久化 3、退出"); switch (Console.ReadLine())
{
case "":
var random = new Random();
var no = random.Next(, );
Console.WriteLine($"随机产生的数字:{no}");
pid.Tell(new Data { Amount = no });
break;
case "":
//完成处理后清理持久化的操作
sqliteProvider.DeleteEventsAsync(actorid, ).Wait();
break;
case "":
result = false;
break;
}
} }
/// <summary>
/// 快照事件溯源
/// </summary>
/// <param name="actorid"></param>
/// <param name="sqliteProvider"></param>
private static void CallSnapShootEventSource(string actorid, SqliteProvider sqliteProvider)
{
var props = Actor.FromProducer(() => new SnapShootEventSourceDataActor(sqliteProvider, sqliteProvider, actorid));
var pid = Actor.Spawn(props);
var result = true;
while (result)
{
Console.WriteLine("1、Tell 2、删除持久化 3、退出"); switch (Console.ReadLine())
{
case "":
var random = new Random();
var no = random.Next(, );
Console.WriteLine($"随机产生的数字:{no}");
pid.Tell(new Data { Amount = no });
break;
case "":
//完成处理后清理持久化的操作
sqliteProvider.DeleteEventsAsync(actorid, ).Wait();
sqliteProvider.DeleteSnapshotsAsync(actorid, ).Wait();
break;
case "":
result = false;
break;
}
}
}
} public class Data
{
public long Amount { get; set; }
} #region 事件溯源
public class EventSourceDataActor : IActor
{
private long _value = ;
private readonly Persistence _persistence; public EventSourceDataActor(IEventStore eventStore, string actorId)
{
//事件溯源持久化方式
_persistence = Persistence.WithEventSourcing(eventStore, actorId, ApplyEvent);
}
private void ApplyEvent(Proto.Persistence.Event @event)
{
switch (@event.Data)
{
case Data msg:
_value = _value + msg.Amount;
Console.WriteLine($"累计:{_value}");
break;
}
}
public async Task ReceiveAsync(IContext context)
{
switch (context.Message)
{
case Started _:
await _persistence.RecoverStateAsync();
break;
case Data msg:
await _persistence.PersistEventAsync(new Data { Amount = msg.Amount });
break;
}
}
}
#endregion #region 快照
public class SnapShootDataActor : IActor
{
private long _value = ;
private readonly Persistence _persistence; public SnapShootDataActor(ISnapshotStore snapshotStore, string actorId)
{
//快照持久化方式
_persistence = Persistence.WithSnapshotting(snapshotStore, actorId, ApplySnapshot);
}
private void ApplySnapshot(Proto.Persistence.Snapshot snapshot)
{
switch (snapshot.State)
{
case long value:
_value = value;
Console.WriteLine($"累计:{_value}");
break;
}
}
public async Task ReceiveAsync(IContext context)
{
switch (context.Message)
{
case Started _:
await _persistence.RecoverStateAsync();
break;
case Data msg:
_value = _value + msg.Amount;
await _persistence.DeleteSnapshotsAsync();
await _persistence.PersistSnapshotAsync(_value);
break;
}
}
}
#endregion #region 事件溯源and快照
public class SnapShootEventSourceDataActor : IActor
{
private long _value = ;
private readonly Persistence _persistence; public SnapShootEventSourceDataActor(IEventStore eventStore, ISnapshotStore snapshotStore, string actorId)
{
//注释快照策略
//_persistence = Persistence.WithEventSourcingAndSnapshotting(eventStore, snapshotStore, actorId, ApplyEvent, ApplySnapshot, new IntervalStrategy(5), () => { return _value; });
//无快照策略
_persistence = Persistence.WithEventSourcingAndSnapshotting(eventStore, snapshotStore, actorId, ApplyEvent, ApplySnapshot);
}
private void ApplyEvent(Proto.Persistence.Event @event)
{
switch (@event.Data)
{
case Data msg:
_value = _value + msg.Amount;
Console.WriteLine($"事件溯源累计:{_value}");
break;
}
}
private void ApplySnapshot(Proto.Persistence.Snapshot snapshot)
{
switch (snapshot.State)
{
case long value:
_value = value;
Console.WriteLine($"快照累计:{_value}");
break;
}
}
public async Task ReceiveAsync(IContext context)
{
switch (context.Message)
{
case Started _:
await _persistence.RecoverStateAsync();
break;
case Data msg:
await _persistence.PersistEventAsync(new Data { Amount = msg.Amount });
//无快照策略时启用
await _persistence.PersistSnapshotAsync(_value);
break;
}
}
}
#endregion
}
通过代码看到,持久化是通过在Actor中定义Persistence时,关联一个参数为,Event或Snapshot的方法,并且Actor的Receive方法在Stared到达是恢复(从持久载体中读取数据来恢复),在具体消息到达时,调用Persistence.PersistEventAsync或Persistence.PersisSnapshotAsync来持久化状态数据,这两个方法,都会把调用似递到Persistence产生是关联的那个方法,并把消息实体类通过Event.Data或Snapshot.State传递进去。
 此例分别演示了事件溯源,快照,带快照事件溯源,例子很简单,就是把每次产生的随机数累加起来
1、事件溯源

《通过C#学Proto.Actor模型》之Persistence-LMLPHP

三个绿色箭头,意思是进了三次“1、事件溯源”这个选项
三次蓝色箭头,意思是调用了三次Tell方法,用来获取三次随机数,蓝色椭圆是产生的三个数字,分别是7,7,6,蓝色方框是累计结果,从上往下,第一次是(0+7)7,第二次是(7+7)14,第三次是(14+6)20
红色箭头是退出事件溯源的方法,返回上一级
绿色方框是绿色箭头再次进入,自动恢复,事件溯源后的结果(即从持久化载体中把之前的所有事件重新走一次)还是之前退出时的结果,累计20,所以不管这个Actor在什么地方退出,再次运行,都会把之前的补运行回来。
也可以打开sqlite库进行查看保存的事件结果
2、 快照

《通过C#学Proto.Actor模型》之Persistence-LMLPHP

快照与事件溯源类似,差别在于每次再次进来,只取上次退出时的结果,同时,在数据里,只保存了最后一次的结果。
 
3、带快照的事件溯源

《通过C#学Proto.Actor模型》之Persistence-LMLPHP

与快照类似,上面代码我们是一个事件,一个快照。 
官方给出带快照的事件可以通过快照策略来保存快照
在创建持久化对象时,可以添加快照策略
 _persistence = Persistence.WithEventSourcingAndSnapshotting(eventStore, snapshotStore, actorId, ApplyEvent, ApplySnapshot, new IntervalStrategy(), () => { return _value; });
您可以选择ISnapshotStrategy在保存事件时指定自动保存快照。提供的策略是:
  • EventTypeStrategy - 根据保存的事件类型保存快照
  • IntervalStrategy - 根据保存的事件数量,即每100个事件,定期保存快照
  • TimeStrategy - 根据时间以固定间隔保存快照,即在快照之间等待至少6小时
同时要在Actor的Receive把保存快照注释掉,Demo中我用的是5个事件后保存一次快照,如下图结果

《通过C#学Proto.Actor模型》之Persistence-LMLPHP

绿色是第一次,要保存一下快照,然后之后第五个事件过来后保存第二次快照,如果在第四个事件后程序就退出,那快照保存的只有第一次的,不有担心,当再次调用时,因为记录下了所有事件,Actor会取出最后一次快照,再支执行快照后的事件,这是因为在保存快照和事件时,会把他们的索引保存起来,索引是一样的,就能用最后的快照+这个快照索引后的事件,恢复到退出的地方。

记得执行后查看Sqlite数据,有助于你更好的了解Proto.Actor的Persistence机制哦!

04-14 06:50