事务:

一、可能同时存在不可预知数量的事务;

二、需要协调所有相关事务并行;

三、事务可能分布在不同的服务器上;

四、通过本地事务结合,实施分布式事务;(结合Redis发布订阅机制、结合接口入参,Redis用于做跨服务器发布订阅,通过消息传递实现对整体服务的最终要么提交、要么集体回滚的控制)

---------------------------------------------------

一、事务订阅消息约定:

  事务分为发起方、关联方;

  事务发起方,负责创建分布式事务(同时产生一个唯一事务ID),同时自动监听当前分布式事务的订阅消息,主题为:DistributedTransaction_StatusReport_TransactionID

  该主题的消息分为两个参数,一个是RequestID、一个是TransactionStatus,表示目标服务的执行状态。

  在分布式事务内调用其它服务时,同时将事务ID传递过去,同时为每个不同服务的请求产生一个RequestID;

  (微服务中的服务API请求基类需要为分布式事务预留事务ID字段,默认为null或空,需要以事务形式执行时传入,或者特定接口才实现分布式事务,则为接口添加事务ID专用参数)

      其它服务在执行时,判断有事务ID,则将操作基于事务的情形调度,但不执行提交,在提交前,做WaitOne和超时等待;

      等待前启动Redis订阅,订阅该事务ID关联的消息,消息为:DistributedTransaction_StatusCommit_TransactionID

      监听到该消息时,做Set操作,使超时退出,并完成事务操作;

  Redis消息会有三种结果:

    消息超时:回滚

    消息提交:提交

    消息回滚:回滚

    【服务不论执行回滚还是提交,都需要解除对上面事务消息的订阅;】

   分布式事务创建方,负责记录各个服务调用的结果,一发现有服务调用阶段失败、出错、没有完成,即时对前面的其它服务发送回滚通知,后续服务不再继续调用;

    创建方在全部服务、自身事务都执行后,确认没有问题后,向各个分布式机器推送消息,通知执行提交。

  ----------------------------------------------

  至此分布式事务完成。解决多个微服务在服务间调用时,各个本地事务协调的问题。

二、设计:

  DistributedTransaction.Create()//产生一个分布式事务ID,或者考虑使用Using写法事务范围内,分布调用其它服务,并检查每个服务的调度结果,

  DistributedTransaction.SessionID //保存上面调用Create创建的事务ID

  DistributedTransaction.Commit() //执行分布式事务的提交,事实就是向Redis发布提交消息,向全部订阅中的设备,发送订阅消息;

  DistributedTransaction.Rollback() //执行分布式事务的回滚。同样是向Redis发布回滚消息。

  DistributedTransaction.AddServiceTransaction({ServiceRequestID、TransactionStatus}) //添加一个事务关联方,用于本地记录有几个服务参与了事务,并且记录对方的状态(等待中、已提交、已回滚),也表示需要为每个被请求的服务关联一个请求ID,后续服务若发生回滚或提交,可以向创建方发送消息表明状态。

  DistributedTransaction.HasRollbacks() //用于判断是否有服务事务发生回滚

  DistributedTransaction.HasCommits() //用于判断是否有事务发生提交;

  DistributedTransaction.WaitComplate()

  //用于判断是否全部提交成功,只有全部状态为已提交时,视为全部提交,如果有提交有回滚,则事务有问题,视为false,如果有等待,有提交,则等待最终完成(也需要有超时机制)。

  //该方法会等待最终结果,并且,返回事务的最终结果及描述信息,但对于失败的部份,仅能记录请求ID,可以通过扩展,完成对请求ID和服务本地关联;


三、问题:

  1、事务的跟踪,上述没有跟踪机制,如果,假定有三个不同的服务参与事务,其中三个都执行完成,事务处于待提交或回滚状态,在发布提交消息前,

    其中一个服务A当机(假定断电),无法接收到消息,另外两个服务成功接收到消息,此时另外两个服务执行提交,服务A由于当机,没有提交;

    理论上,由于事务不参与业务,不跟踪具体影响的表、影响前后的数据,因此,无法执行人工回滚;

    但理论上可以增加回滚或提交、服务节点增加等机制来确保 分布式事务发起方,清楚是否全部完成,还是部份完成提交。

    但若假设事务发起发当机?本机事务尚未提交,但其它三个服务的事务均已提交?本机恢复后理论上应该执行本地事务的提交。

  2、超时时间的问题

    服务等待接收来自Redis的提交或回滚指令,在等待阶段,为了实现自动回滚,因此会进行超时监控,超时时间应该是比较短的时间,但由于不确定后续

    事务创建方还有多少个服务需要调用,且不能确定每个服务估计的执行时长,较短的时间有可能引发不正常回滚。

    如果可能,应该考虑多个服务并行化执行,确保合并相同的时间,等待的时间为最耗时服务的执行时间;任何事务都应该控制在尽可能短的时间内完成。

  2.1、超时引发回滚时

    上述未设计由于某服务等待超时自动回滚后,通知到创建方,或者关联服务批量执行回滚,另外,如果由发生回滚的服务发送通知。

四、解决的问题:

    上述机制,可以解决非异常情况下的分布式、微服务事务的同步执行或同步回滚,前提假设:不存在服务器突然当机、Redis当机的情况下。

----------------------------------------------------

下面再来一段EF/LINQ2SQL跨库事务代码,实现原理和上面的原理相似,只是没有跨服务器的场景,都是本地事务:未经过封装,仅为实验性代码,如何需要在生产环境中使用,建议写一个类做封装,方便集中控制,无差别控制,不区分需要跨库的数据库数量,实现出错时自动对已完成(未提交)的事务进行集中回滚。

 var dc = new DataContexts(0);
        dc.Ztb_shopdict.Connection.Open();
        var transaction1 = dc.Ztb_shopdict.Connection.BeginTransaction();
        dc.Ztb_shopinfo.Connection.Open();
        var tran2 = dc.Ztb_shopinfo.Connection.BeginTransaction();

        //事务是否成功的标记
        bool TransactionStatus = true;
        try
        {
            dc.Ztb_shopdict.Transaction = transaction1;
            dc.Ztb_shopdict.Dict_SystemPara.InsertOnSubmit(new API.Domain.ztb_shopdict.Dict_SystemPara() { ShopID = 0, ParaKey = "a", ParaValue = "v" });
            dc.Ztb_shopdict.SubmitChanges();
            try
            {
                dc.Ztb_shopinfo.Transaction = tran2;
                dc.Ztb_shopinfo.Activity_Item.InsertOnSubmit(new API.Domain.ztb_shopinfo.Activity_Item() { Activity_Info_ID = 0, Activity_Price = 0, addActivity_Price = 0, enable = true, firstActivity_Price = 0, levelId = 0, ServiceID = 0, ShopID = 0 });
                dc.Ztb_shopinfo.SubmitChanges();
            }
            catch (Exception ex)
            {
                tran2.Rollback();
                //同时抛出错误,引起1回滚
                throw ex;
            }
        }
        catch (Exception ex)
        {
            TransactionStatus = false;
            transaction1.Rollback();
            Response.Write(ex.ToString());
        }

        if (TransactionStatus)
        {
            //所有事务都成功了,一起提交
                transaction1.Commit();
                tran2.Commit();
        }

  

02-13 00:28