[Seata Source Code Study] Part 6: Global Transaction Commit and Rollback

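Global Transaction Commit

After the TM has called the RMs over RPC, if no exception was thrown it sends a global commit request to the TC. The entry point is:

io.seata.tm.api.TransactionalTemplate#execute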

public Object execute(TransactionalExecutor business) throws Throwable {
        // 1. Get transactionInfo
        // Read the TransactionInfo built from the attributes of the @GlobalTransactional annotation
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
        // GlobalTransactionContext is the global transaction context; it creates a new transaction or returns the current one
        // GlobalTransactionContext.getCurrent -> RootContext.getXID -> ContextCore.get
        // ContextCore is an interface with two Seata implementations, FastThreadLocalContextCore and ThreadLocalContextCore; both keep the XID in thread-local storage
        GlobalTransaction tx = GlobalTransactionContext.getCurrent();

        // 1.2 Handle the transaction propagation.
        // Get the propagation behavior configured for this transaction
        Propagation propagation = txInfo.getPropagation();
        // Holds the XID of a suspended transaction
        SuspendedResourcesHolder suspendedResourcesHolder = null;
        try {
            // Handle the propagation behavior
            switch (propagation) {
                // NOT_SUPPORTED: run without a transaction by calling methodInvocation.proceed()
                // If this interceptor is not the last in the chain, the next interceptor's invoke is called; otherwise the target method runs directly
                case NOT_SUPPORTED:
                    // If a global transaction exists, suspend it.
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                    }
                    // Execute without transaction and return.
                    // Chain of responsibility: continue down the interceptor chain
                    return business.execute();
                case REQUIRES_NEW:
                    // If a transaction exists, suspend it, then begin a new transaction.
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                        tx = GlobalTransactionContext.createNew();
                    }
                    // Continue and execute with new transaction
                    break;
                case SUPPORTS:
                    // If no transaction exists, skip this transaction interceptor, run the interceptor chain, and return.
                    if (notExistingTransaction(tx)) {
                        return business.execute();
                    }
                    // Continue and execute with new transaction
                    break;
                case REQUIRED:
                    // If current transaction is existing, execute with current transaction,
                    // else continue and execute with new transaction.
                    break;
                case NEVER:
                    // If a transaction exists, throw an exception.
                    if (existingTransaction(tx)) {
                        throw new TransactionException(
                            String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                    , tx.getXid()));
                    } else {
                        // Execute without transaction and return.
                        return business.execute();
                    }
                case MANDATORY:
                    // A transaction is mandatory: if none exists, throw an exception.
                    if (notExistingTransaction(tx)) {
                        throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                    }
                    // Continue and execute with current transaction.
                    break;
                default:
                    throw new TransactionException("Not Supported Propagation:" + propagation);
            }

            // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
            // If the transaction context holds no transaction, this call initiates one as the TM, with role Launcher
            if (tx == null) {
                tx = GlobalTransactionContext.createNew();
            }

            // set current tx config to holder
            // Record the current global lock config, which is kept in a ThreadLocal
            GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

            try {
                // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
                //    else do nothing. Of course, the hooks will still be triggered.
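                // Run the hooks before and after beginning the global transaction
                // If the role is Participant (the transaction joined an existing global transaction) and RootContext holds no XID, an exception is thrown
                // If the role is Launcher (the initiator, i.e. the TM) and RootContext already holds an XID, an exception is thrown
                // Otherwise a synchronous GlobalBeginRequest is sent to the TC through TmNettyRemotingClient, and the XID returned by the TC is bound to RootContext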
                beginTransaction(txInfo, tx);

                Object rs;
                try {
                    // Do Your Business
                    // Run the interceptor chain
                    rs = business.execute();
                } catch (Throwable ex) {
                    // 3. The needed business exception to rollback.
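                    // If an exception is thrown, check whether it matches the rollback rules (by default Throwable and its subclasses)
                    // Run the hooks before and after rollback
                    // If the role is Launcher (the TM), send a synchronous GlobalRollbackRequest to the TC through TmNettyRemotingClient
                    // and record the transaction status returned by the TC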
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }

                // 4. everything is fine, commit.
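                // No exception was thrown while the method ran
                // Run the hooks before and after commit
                // If the role is Launcher (the TM), send a synchronous GlobalCommitRequest to the TC through TmNettyRemotingClient to commit the global transaction
                // and record the transaction status returned by the TC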
                commitTransaction(tx);

                return rs;
            } finally {
                //5. clear
                // Restore the previous global lock config
                resumeGlobalLockConfig(previousConfig);
                // Run the hooks that fire after the whole transaction completes
                triggerAfterCompletion();
                // Remove the transaction hooks currently bound
                cleanUp();
            }
        } finally {
            // If the transaction is suspended, resume it.
            // Once the current transaction has finished, resume the suspended one:
            // the XID held by suspendedResourcesHolder is re-bound through RootContext
            if (suspendedResourcesHolder != null) {
                tx.resume(suspendedResourcesHolder);
            }
        }
    }
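
The propagation switch in the method above boils down to a decision on two inputs: the configured propagation and whether a global transaction already exists. The following is a minimal stand-alone sketch of that decision table, not Seata code; the `Action` enum and the `decide` method are hypothetical names introduced only for illustration.

```java
public class PropagationSketch {
    // Mirrors the six propagation values handled by the switch in execute().
    enum Propagation { REQUIRED, REQUIRES_NEW, NOT_SUPPORTED, SUPPORTS, NEVER, MANDATORY }

    // Hypothetical summary of what the template does before running the business logic.
    enum Action {
        RUN_WITHOUT_TX,             // call business.execute() directly
        SUSPEND_AND_RUN_WITHOUT_TX, // suspend the existing transaction, then run without one
        JOIN_EXISTING,              // keep the existing transaction (role Participant)
        BEGIN_NEW,                  // create a new transaction (role Launcher)
        SUSPEND_AND_BEGIN_NEW,      // suspend the existing transaction, then create a new one
        FAIL                        // throw a TransactionException
    }

    static Action decide(Propagation propagation, boolean txExists) {
        switch (propagation) {
            case NOT_SUPPORTED:
                return txExists ? Action.SUSPEND_AND_RUN_WITHOUT_TX : Action.RUN_WITHOUT_TX;
            case REQUIRES_NEW:
                return txExists ? Action.SUSPEND_AND_BEGIN_NEW : Action.BEGIN_NEW;
            case SUPPORTS:
                return txExists ? Action.JOIN_EXISTING : Action.RUN_WITHOUT_TX;
            case REQUIRED:
                return txExists ? Action.JOIN_EXISTING : Action.BEGIN_NEW;
            case NEVER:
                return txExists ? Action.FAIL : Action.RUN_WITHOUT_TX;
            case MANDATORY:
                return txExists ? Action.JOIN_EXISTING : Action.FAIL;
            default:
                throw new IllegalArgumentException("Not supported propagation: " + propagation);
        }
    }

    public static void main(String[] args) {
        // Print the full decision table.
        for (Propagation p : Propagation.values()) {
            System.out.println(p + ": tx exists -> " + decide(p, true)
                + ", no tx -> " + decide(p, false));
        }
    }
}
```

Reading the table row by row makes it easier to see that only REQUIRED and REQUIRES_NEW can ever lead to a new global transaction being started.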

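Running the Hooks Before and After Commit

Seata reserves two hook methods, one before and one after the global commit, which can be extended to fit real production needs.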

io.seata.tm.api.TransactionalTemplate#commitTransaction

    private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            // Hook that runs before the global commit
            triggerBeforeCommit();
            // Commit the global transaction
            tx.commit();
            // Hook that runs after the global commit
            triggerAfterCommit();
        } catch (TransactionException txe) {
            // 4.1 Failed to commit
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.CommitFailure);
        }
    }
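
To illustrate the ordering of the two commit hooks, here is a small self-contained sketch. It does not use Seata's own hook types; `CommitHook`, `registerHook`, and `commit` are hypothetical stand-ins that only mimic the before/commit/after sequence of commitTransaction.

```java
import java.util.ArrayList;
import java.util.List;

public class CommitHookSketch {
    // Hypothetical hook interface; both callbacks are optional.
    interface CommitHook {
        default void beforeCommit() { }
        default void afterCommit() { }
    }

    private final List<CommitHook> hooks = new ArrayList<>();

    void registerHook(CommitHook hook) {
        hooks.add(hook);
    }

    // Same ordering as commitTransaction: before hooks, the commit itself, after hooks.
    // If the commit throws, the after hooks are not reached, as in the original method.
    void commit(Runnable doCommit) {
        for (CommitHook hook : hooks) {
            hook.beforeCommit();
        }
        doCommit.run();
        for (CommitHook hook : hooks) {
            hook.afterCommit();
        }
    }

    // Records the order in which the callbacks fire.
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        CommitHookSketch template = new CommitHookSketch();
        template.registerHook(new CommitHook() {
            @Override public void beforeCommit() { events.add("beforeCommit"); }
            @Override public void afterCommit() { events.add("afterCommit"); }
        });
        template.commit(() -> events.add("commit"));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [beforeCommit, commit, afterCommit]
    }
}
```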

TM Commits the Global Transaction

io.seata.tm.api.DefaultGlobalTransaction#commit

public void commit() throws TransactionException {
        // Only the Launcher (the TM that began the transaction) commits; a Participant returns without committing
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of committing
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
            }
            return;
        }

        // The XID must not be null
        assertXIDNotNull();

        // A failing global commit is attempted up to 5 times by default
        int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
        try {
            while (retry > 0) {
                try {
                    retry--;
                    // The TransactionManager performs the commit
                    status = transactionManager.commit(xid);
                    break;
                } catch (Throwable ex) {
                    LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                    if (retry == 0) {
                        throw new TransactionException("Failed to report global commit", ex);
                    }
                }
            }
        } finally {
            if (xid.equals(RootContext.getXID())) {
                // Suspend the transaction: unbind the current XID and wrap it in a SuspendedResourcesHolder
                suspend();
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[{}] commit status: {}", xid, status);
        }
    }
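
The retry countdown in commit() can be isolated into a small helper. This is a sketch under stated assumptions rather than Seata's implementation: `callWithRetry` and `attemptsNeeded` are hypothetical names, and failures are simulated with unchecked exceptions.

```java
import java.util.function.Supplier;

public class CommitRetrySketch {
    static final int DEFAULT_RETRY_COUNT = 5;

    // Same countdown shape as commit(): decrement first, try, rethrow once the budget hits zero.
    static <T> T callWithRetry(Supplier<T> action, int configuredRetries) {
        int retry = configuredRetries <= 0 ? DEFAULT_RETRY_COUNT : configuredRetries;
        while (true) {
            retry--;
            try {
                return action.get();
            } catch (RuntimeException ex) {
                if (retry == 0) {
                    throw new IllegalStateException("Failed to report global commit", ex);
                }
            }
        }
    }

    // Simulates a TC that fails a given number of times before answering,
    // and returns how many attempts were made in total.
    static int attemptsNeeded(int failuresBeforeSuccess, int configuredRetries) {
        int[] attempts = {0};
        callWithRetry(() -> {
            attempts[0]++;
            if (attempts[0] <= failuresBeforeSuccess) {
                throw new IllegalStateException("TC unreachable (simulated)");
            }
            return "Committed";
        }, configuredRetries);
        return attempts[0];
    }

    public static void main(String[] args) {
        // Two simulated failures, default budget of 5: the third attempt succeeds.
        System.out.println(attemptsNeeded(2, 0)); // prints 3
    }
}
```

With the default budget, five consecutive failures exhaust the countdown and the last exception is rethrown, so the budget counts attempts in total rather than retries after the first attempt.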

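A failing global commit is attempted up to 5 times by default. Seata delegates every transaction operation to the TransactionManager interface, and the TC, TM, and RM each have their own implementation class, as shown in the figure below.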

[Figure: implementation classes of the TransactionManager interface]

io.seata.tm.DefaultTransactionManager#commit

    public GlobalStatus commit(String xid) throws TransactionException {
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        // Attach the global transaction XID
        globalCommit.setXid(xid);
        // Send the GlobalCommitRequest to the TC
        GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
        return response.getGlobalStatus();
    }

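TC Handles the Global Commit

When the TC receives the message, ServerHandler calls processMessage, which uses the message type to look up the matching RemotingProcessor in the processor table (pair). A GlobalCommitRequest is matched to ServerOnRequestProcessor, which in turn hands the message to a TransactionMessageHandler.

TransactionMessageHandler is also an interface; the TC uses the DefaultCoordinator implementation. The message is ultimately upcast to AbstractTransactionRequestToTC, and the handle method of the concrete message subclass is invoked.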

io.seata.core.protocol.transaction.GlobalCommitRequest#handle

    public AbstractTransactionResponse handle(RpcContext rpcContext) {
        return handler.handle(this, rpcContext);
    }
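
The way a request calls back into its handler with `this` is a form of double dispatch: the handler overload is selected by the request's concrete type without any instanceof chain. Below is a minimal stand-alone sketch of the pattern; all class names are hypothetical simplifications and do not reproduce Seata's actual class hierarchy.

```java
public class DoubleDispatchSketch {
    // One overload per concrete request type.
    interface Handler {
        String handle(CommitRequest request);
        String handle(RollbackRequest request);
    }

    // Base request type: the dispatcher only ever sees this type.
    abstract static class Request {
        protected Handler handler;

        void setHandler(Handler handler) {
            this.handler = handler;
        }

        abstract String handle();
    }

    static class CommitRequest extends Request {
        @Override
        String handle() {
            return handler.handle(this); // resolves to handle(CommitRequest)
        }
    }

    static class RollbackRequest extends Request {
        @Override
        String handle() {
            return handler.handle(this); // resolves to handle(RollbackRequest)
        }
    }

    static class Coordinator implements Handler {
        @Override
        public String handle(CommitRequest request) {
            return "commit";
        }

        @Override
        public String handle(RollbackRequest request) {
            return "rollback";
        }

        // Entry point: works on the base type and lets the request pick the overload.
        String onRequest(Request request) {
            request.setHandler(this);
            return request.handle();
        }
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();
        System.out.println(coordinator.onRequest(new CommitRequest()));   // prints commit
        System.out.println(coordinator.onRequest(new RollbackRequest())); // prints rollback
    }
}
```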

io.seata.server.AbstractTCInboundHandler#handle(io.seata.core.protocol.transaction.GlobalCommitRequest, io.seata.core.rpc.RpcContext)

public GlobalCommitResponse handle(GlobalCommitRequest request, final RpcContext rpcContext) {
        GlobalCommitResponse response = new GlobalCommitResponse();
        response.setGlobalStatus(GlobalStatus.Committing);
        exceptionHandleTemplate(new AbstractCallback<GlobalCommitRequest, GlobalCommitResponse>() {
            @Override
            public void execute(GlobalCommitRequest request, GlobalCommitResponse response)
                throws TransactionException {
                try {
                    // The global commit actually starts here
                    doGlobalCommit(request, response, rpcContext);
                } catch (StoreException e) {
                    throw new TransactionException(TransactionExceptionCode.FailedStore,
                        String.format("global commit request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()),
                        e);
                }
            }
            @Override
            public void onTransactionException(GlobalCommitRequest request, GlobalCommitResponse response,
                                               TransactionException tex) {
                super.onTransactionException(request, response, tex);
                checkTransactionStatus(request, response);
            }

            @Override
            public void onException(GlobalCommitRequest request, GlobalCommitResponse response, Exception rex) {
                super.onException(request, response, rex);
                checkTransactionStatus(request, response);
            }


        }, request, response);
        return response;
    }

io.seata.server.coordinator.DefaultCoordinator#doGlobalCommit

    protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)
            throws TransactionException {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        response.setGlobalStatus(core.commit(request.getXid()));
    }

io.seata.server.coordinator.DefaultCore#commit

  @Override
    public GlobalStatus commit(String xid) throws TransactionException {
        // Look up the GlobalSession for this XID in storage
        GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
        // If none is found, the global transaction has already finished
        if (globalSession == null) {
            return GlobalStatus.Finished;
        }
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        // just lock changeStatus

        boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
            if (globalSession.getStatus() == GlobalStatus.Begin) {
                // Highlight: Firstly, close the session, then no more branch can be registered.
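                // In AT mode this releases the global locks
                globalSession.closeAndClean();
                // AT-mode transactions can be committed asynchronously
                if (globalSession.canBeCommittedAsync()) {
                    // Set the global transaction status to AsyncCommitting and add the session to the ASYNC_COMMITTING_SESSION_MANAGER held by SessionHolder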
                    globalSession.asyncCommit();
                    MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);
                    return false;
                } else {
                    globalSession.changeGlobalStatus(GlobalStatus.Committing);
                    return true;
                }
            }
            return false;
        });

        if (shouldCommit) {
            boolean success = doGlobalCommit(globalSession, false);
            //If successful and all remaining branches can be committed asynchronously, do async commit.
            if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
                globalSession.asyncCommit();
                return GlobalStatus.Committed;
            } else {
                return globalSession.getStatus();
            }
        } else {
            return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
        }
    }

DefaultCore first looks up the GlobalSession in storage by XID. In AT mode it releases the global locks and marks the global transaction as GlobalStatus.AsyncCommitting.
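
The status handling in DefaultCore#commit can be reduced to a small state model. The sketch below is a simplification under stated assumptions: `Session`, `asyncCapable`, and the single-step synchronous commit are hypothetical, and the synchronous phase two is assumed to always succeed.

```java
public class CommitDecisionSketch {
    enum Status { Begin, Committing, AsyncCommitting, Committed, Finished }

    // Hypothetical stand-in for GlobalSession.
    static final class Session {
        Status status = Status.Begin;
        final boolean asyncCapable; // true when every branch can be committed asynchronously (AT mode)

        Session(boolean asyncCapable) {
            this.asyncCapable = asyncCapable;
        }
    }

    // Returns the status reported back to the TM.
    static Status commit(Session session) {
        if (session == null) {
            return Status.Finished; // no session found: the transaction already ended
        }
        boolean shouldCommit = false;
        if (session.status == Status.Begin) {
            if (session.asyncCapable) {
                session.status = Status.AsyncCommitting; // picked up later by the background task
            } else {
                session.status = Status.Committing;
                shouldCommit = true;
            }
        }
        if (shouldCommit) {
            session.status = Status.Committed; // synchronous phase two, assumed to succeed here
            return session.status;
        }
        // The TM is told Committed even though phase two has not run yet.
        return session.status == Status.AsyncCommitting ? Status.Committed : session.status;
    }

    public static void main(String[] args) {
        Session at = new Session(true);
        System.out.println(commit(at) + " / internal: " + at.status); // prints Committed / internal: AsyncCommitting
    }
}
```

The point of the model is the gap between the two values printed: the TM sees Committed immediately, while the TC still holds the session in AsyncCommitting until the background task has cleaned up the branches.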

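When the Seata server starts it initializes DefaultCoordinator, which starts a periodic task that runs handleAsyncCommitting every ASYNC_COMMITTING_RETRY_PERIOD milliseconds (once per second here) to process asynchronous global commits.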

     asyncCommitting.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0,
            ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

protected void handleAsyncCommitting() {
        // Condition used to find matching sessions in storage
        SessionCondition sessionCondition = new SessionCondition(GlobalStatus.AsyncCommitting);
        // Find all global transactions whose status is GlobalStatus.AsyncCommitting
        Collection<GlobalSession> asyncCommittingSessions =
                SessionHolder.getAsyncCommittingSessionManager().findGlobalSessions(sessionCondition);
        if (CollectionUtils.isEmpty(asyncCommittingSessions)) {
            return;
        }
        SessionHelper.forEach(asyncCommittingSessions, asyncCommittingSession -> {
            try {
                asyncCommittingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
                // Delegate to DefaultCore.doGlobalCommit
                core.doGlobalCommit(asyncCommittingSession, true);
            } catch (TransactionException ex) {
                LOGGER.error("Failed to async committing [{}] {} {}", asyncCommittingSession.getXid(), ex.getCode(), ex.getMessage(), ex);
            }
        });
    }
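
The periodic pickup of asynchronously committing sessions follows a common polling pattern. The following sketch shows that pattern with the JDK's ScheduledExecutorService; the queue, the `pollOnce` method, and the string XIDs are hypothetical simplifications, and the distributed lock used by the real task is left out.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class AsyncCommitPollerSketch {
    // Sessions waiting for phase two, identified by XID only in this sketch.
    private final Queue<String> asyncCommitting = new ConcurrentLinkedQueue<>();
    private final List<String> committed = new CopyOnWriteArrayList<>();

    void enqueue(String xid) {
        asyncCommitting.add(xid);
    }

    // One polling round: drain everything that is currently waiting.
    void pollOnce() {
        String xid;
        while ((xid = asyncCommitting.poll()) != null) {
            committed.add(xid); // stands in for the phase-two commit of that session
        }
    }

    List<String> committed() {
        return committed;
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncCommitPollerSketch poller = new AsyncCommitPollerSketch();
        poller.enqueue("xid-1");
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Run immediately, then once per second, like the task described above.
        scheduler.scheduleAtFixedRate(poller::pollOnce, 0, 1000, TimeUnit.MILLISECONDS);
        Thread.sleep(200);
        scheduler.shutdownNow();
        System.out.println(poller.committed());
    }
}
```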

io.seata.server.coordinator.DefaultCore#doGlobalCommit

public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
        boolean success = true;
        // start committing event
        MetricsPublisher.postSessionDoingEvent(globalSession, retrying);
        // Saga mode is delegated to the Saga core
        if (globalSession.isSaga()) {
            success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
        } else {
            // Iterate over all branch transactions
            Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
                // if not retrying, skip the canBeCommittedAsync branches
                if (!retrying && branchSession.canBeCommittedAsync()) {
                    return CONTINUE;
                }

                BranchStatus currentStatus = branchSession.getStatus();
                if (currentStatus == BranchStatus.PhaseOne_Failed) {
                    SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                    return CONTINUE;
                }
                try {
                    // Strategy pattern: each branch type is handled by its own Core
                    BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
                    if (isXaerNotaTimeout(globalSession,branchStatus)) {
                        LOGGER.info("Commit branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                        branchStatus = BranchStatus.PhaseTwo_Committed;
                    }
                    switch (branchStatus) {
                        // The branch committed successfully: remove it from the global transaction
                        case PhaseTwo_Committed:
                            SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                            return CONTINUE;
                        case PhaseTwo_CommitFailed_Unretryable:
                            //not at branch
                            SessionHelper.endCommitFailed(globalSession, retrying);
                            LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
                            return false;

                        default:
                            if (!retrying) {
                                globalSession.queueToRetryCommit();
                                return false;
                            }
                            if (globalSession.canBeCommittedAsync()) {
                                LOGGER.error("Committing branch transaction[{}], status:{} and will retry later",
                                    branchSession.getBranchId(), branchStatus);
                                return CONTINUE;
                            } else {
                                LOGGER.error(
                                    "Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());
                                return false;
                            }
                    }
                } catch (Exception ex) {
                    StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",
                        new String[] {branchSession.toString()});
                    if (!retrying) {
                        globalSession.queueToRetryCommit();
                        throw new TransactionException(ex);
                    }
                }
                return CONTINUE;
            });
            // Return if the result is not null
            if (result != null) {
                return result;
            }
            //If has branch and not all remaining branches can be committed asynchronously,
            //do print log and return false
            if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
                LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
                return false;
            }
            if (!retrying) {
                //contains not AT branch
                globalSession.setStatus(GlobalStatus.Committed);
            }
        }
        // if it succeeds and there is no branch, retrying=true is the asynchronous state when retrying. EndCommitted is
        // executed to improve concurrency performance, and the global transaction ends..
        if (success && globalSession.getBranchSessions().isEmpty()) {
            SessionHelper.endCommitted(globalSession, retrying);
            LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
        }
        return success;
    }

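When committing a global transaction, the TC iterates over all branch transactions associated with it and sends each RM a BranchCommitRequest. Once an RM has committed its branch, that branch is removed from the global transaction. After every branch has committed successfully, the GlobalSession data is deleted from storage.

io.seata.server.coordinator.AbstractCore#branchCommit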

    public BranchStatus branchCommit(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
        try {
            BranchCommitRequest request = new BranchCommitRequest();
            request.setXid(branchSession.getXid());
            request.setBranchId(branchSession.getBranchId());
            request.setResourceId(branchSession.getResourceId());
            request.setApplicationData(branchSession.getApplicationData());
            request.setBranchType(branchSession.getBranchType());
            return branchCommitSend(request, globalSession, branchSession);
        } catch (IOException | TimeoutException e) {
            throw new BranchTransactionException(FailedToSendBranchCommitRequest,
                    String.format("Send branch commit failed, xid = %s branchId = %s", branchSession.getXid(),
                            branchSession.getBranchId()), e);
        }
    }
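
The branch loop in doGlobalCommit relies on a three-valued result: a null result means "continue with the next branch", while true or false ends the loop with that value. The sketch below reproduces that control flow in isolation. It is an illustration under assumptions: this `forEach` helper is written from scratch, branches are plain strings, and the outcomes are supplied through a map instead of RPC calls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class BranchLoopSketch {
    // null signals "keep going"; any non-null Boolean stops the loop.
    static final Boolean CONTINUE = null;

    enum BranchStatus { Committed, FailedUnretryable }

    static <T> Boolean forEach(Collection<T> items, Function<T, Boolean> handler) {
        for (T item : items) {
            Boolean result = handler.apply(item);
            if (result != null) {
                return result;
            }
        }
        return null;
    }

    // Removes every committed branch from the list; stops at the first failure.
    static boolean commitBranches(List<String> branches, Map<String, BranchStatus> outcomes) {
        // Iterate over a copy because committed branches are removed from the original list.
        Boolean result = forEach(new ArrayList<>(branches), branch -> {
            if (outcomes.get(branch) == BranchStatus.Committed) {
                branches.remove(branch);
                return CONTINUE;
            }
            return false;
        });
        return result != null ? result : true;
    }

    // Three branches where the second one fails: returns the branches left behind.
    static List<String> demoRemaining() {
        List<String> branches = new ArrayList<>(Arrays.asList("b1", "b2", "b3"));
        Map<String, BranchStatus> outcomes = new HashMap<>();
        outcomes.put("b1", BranchStatus.Committed);
        outcomes.put("b2", BranchStatus.FailedUnretryable);
        outcomes.put("b3", BranchStatus.Committed);
        commitBranches(branches, outcomes);
        return branches;
    }

    public static void main(String[] args) {
        System.out.println(demoRemaining()); // prints [b2, b3]
    }
}
```

Because the loop stops at the first failing branch, b3 is never attempted in this run even though it would have committed; a later retry round would pick up both remaining branches.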

RM Commits the Branch Transaction

After the TC sends the BranchCommitRequest, the RM receives it and hands it to RmBranchCommitProcessor.

io.seata.core.rpc.netty.RmNettyRemotingClient#registerProcessor

       // Branch transaction commit
        RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
        super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);

io.seata.core.rpc.processor.client.RmBranchCommitProcessor#process

   public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        // Get the TC address
        String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
        Object msg = rpcMessage.getBody();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("rm client handle branch commit process:" + msg);
        }
        // Handle the branch commit
        handleBranchCommit(rpcMessage, remoteAddress, (BranchCommitRequest) msg);
    }

io.seata.core.rpc.processor.client.RmBranchCommitProcessor#handleBranchCommit

    private void handleBranchCommit(RpcMessage request, String serverAddress, BranchCommitRequest branchCommitRequest) {
        BranchCommitResponse resultMessage;
        resultMessage = (BranchCommitResponse) handler.onRequest(branchCommitRequest, null);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("branch commit result:" + resultMessage);
        }
        try {
            this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);
        } catch (Throwable throwable) {
            LOGGER.error("branch commit error: {}", throwable.getMessage(), throwable);
        }
    }

io.seata.rm.AbstractRMHandler#onRequest

    public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
        if (!(request instanceof AbstractTransactionRequestToRM)) {
            throw new IllegalArgumentException();
        }
        AbstractTransactionRequestToRM transactionRequest = (AbstractTransactionRequestToRM)request;
        transactionRequest.setRMInboundMessageHandler(this);

        return transactionRequest.handle(context);
    }

io.seata.core.protocol.transaction.BranchCommitRequest#handle

    public AbstractTransactionResponse handle(RpcContext rpcContext) {
        return handler.handle(this);
    }

io.seata.rm.AbstractRMHandler#handle(io.seata.core.protocol.transaction.BranchCommitRequest)

    public BranchCommitResponse handle(BranchCommitRequest request) {
        BranchCommitResponse response = new BranchCommitResponse();
        // The branch commit is actually handled here
        exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
            @Override
            public void execute(BranchCommitRequest request, BranchCommitResponse response)
                throws TransactionException {
                doBranchCommit(request, response);
            }
        }, request, response);
        return response;
    }

io.seata.rm.AbstractRMHandler#doBranchCommit

protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
    throws TransactionException {
    String xid = request.getXid();
    long branchId = request.getBranchId();
    String resourceId = request.getResourceId();
    String applicationData = request.getApplicationData();
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
    }
    // In AT mode this deletes the undo log from storage
    BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
        applicationData);
    response.setXid(xid);
    response.setBranchId(branchId);
    response.setBranchStatus(status);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch commit result: " + status);
    }

}

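To summarize the global commit: the TC releases the global locks and, in AT mode, marks the global transaction as asynchronously committing. It then iterates over all branch transactions bound to the global transaction and sends each RM a BranchCommitRequest; the RM deletes its undo log and returns a status to the TC. Once every branch has deleted its undo log, the TC marks the global transaction as Committed and deletes the global transaction data from storage.

Global Transaction Rollback

TM Sends the Global Rollback Request

After the TM has called the RMs over RPC, if an exception occurs, or the TM's own business method throws, the completeTransactionAfterThrowing method is executed.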

io.seata.tm.api.TransactionalTemplate#completeTransactionAfterThrowing

    private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {
        //roll back
        if (txInfo != null && txInfo.rollbackOn(originalException)) {
            try {
                // Roll back the global transaction
                rollbackTransaction(tx, originalException);
            } catch (TransactionException txe) {
                // Failed to rollback
                throw new TransactionalExecutor.ExecutionException(tx, txe,
                        TransactionalExecutor.Code.RollbackFailure, originalException);
            }
        } else {
            // not roll back on this exception, so commit
            commitTransaction(tx);
        }
    }
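
The branch on rollbackOn means that an exception does not always lead to a rollback: if the exception is excluded by the configured rules, the transaction is committed instead. The sketch below models that choice with a simple exclusion list. It is a simplification, not Seata's rule matching: `rollbackOn` here is a hypothetical stand-in that only checks "no rollback for" classes and defaults to rolling back.

```java
public class RollbackRuleSketch {
    // Hypothetical stand-in for the rule check: roll back unless the exception
    // is an instance of one of the excluded classes.
    static boolean rollbackOn(Throwable ex, Class<?>... noRollbackFor) {
        for (Class<?> excluded : noRollbackFor) {
            if (excluded.isInstance(ex)) {
                return false;
            }
        }
        return true; // default: every Throwable triggers a rollback
    }

    // Mirrors the if/else in completeTransactionAfterThrowing.
    static String complete(Throwable ex, Class<?>... noRollbackFor) {
        return rollbackOn(ex, noRollbackFor) ? "rollback" : "commit";
    }

    public static void main(String[] args) {
        System.out.println(complete(new IllegalStateException("boom")));       // prints rollback
        System.out.println(complete(new IllegalArgumentException("ignored"),
            IllegalArgumentException.class));                                  // prints commit
    }
}
```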

The TM then sends a GlobalRollbackRequest to the TC.

io.seata.tm.DefaultTransactionManager#rollback

    public GlobalStatus rollback(String xid) throws TransactionException {
        GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
        globalRollback.setXid(xid);
        GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
        return response.getGlobalStatus();
    }

TC Handles the Global Rollback Request

io.seata.server.AbstractTCInboundHandler#handle(io.seata.core.protocol.transaction.GlobalRollbackRequest, io.seata.core.rpc.RpcContext)

 public GlobalRollbackResponse handle(GlobalRollbackRequest request, final RpcContext rpcContext) {
        GlobalRollbackResponse response = new GlobalRollbackResponse();
        response.setGlobalStatus(GlobalStatus.Rollbacking);
        exceptionHandleTemplate(new AbstractCallback<GlobalRollbackRequest, GlobalRollbackResponse>() {
            @Override
            public void execute(GlobalRollbackRequest request, GlobalRollbackResponse response)
                throws TransactionException {
                try {
                    doGlobalRollback(request, response, rpcContext);
                } catch (StoreException e) {
                    throw new TransactionException(TransactionExceptionCode.FailedStore, String
                        .format("global rollback request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()), e);
                }
            }

            @Override
            public void onTransactionException(GlobalRollbackRequest request, GlobalRollbackResponse response,
                                               TransactionException tex) {
                super.onTransactionException(request, response, tex);
                // may be appears StoreException outer layer method catch
                checkTransactionStatus(request, response);
            }

            @Override
            public void onException(GlobalRollbackRequest request, GlobalRollbackResponse response, Exception rex) {
                super.onException(request, response, rex);
                // may be appears StoreException outer layer method catch
                checkTransactionStatus(request, response);
            }
        }, request, response);
        return response;
    }

io.seata.server.coordinator.DefaultCore#rollback

 public GlobalStatus rollback(String xid) throws TransactionException {
        GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
        if (globalSession == null) {
            return GlobalStatus.Finished;
        }
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        // just lock changeStatus
        boolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {
            globalSession.close(); // Highlight: Firstly, close the session, then no more branch can be registered.
            if (globalSession.getStatus() == GlobalStatus.Begin) {
                // Mark the global transaction status as Rollbacking
                globalSession.changeGlobalStatus(GlobalStatus.Rollbacking);
                return true;
            }
            return false;
        });
        if (!shouldRollBack) {
            return globalSession.getStatus();
        }
        
        boolean rollbackSuccess = doGlobalRollback(globalSession, false);
        return rollbackSuccess ? GlobalStatus.Rollbacked : globalSession.getStatus();
    }

io.seata.server.coordinator.DefaultCore#doGlobalRollback

 public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
        boolean success = true;
        // start rollback event
        MetricsPublisher.postSessionDoingEvent(globalSession, retrying);

        if (globalSession.isSaga()) {
            success = getCore(BranchType.SAGA).doGlobalRollback(globalSession, retrying);
        } else {
            // Iterate over the branch transactions in reverse order
            Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> {
                BranchStatus currentBranchStatus = branchSession.getStatus();
                if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
                    SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                    return CONTINUE;
                }
                try {
                    // Roll back the branch transaction
                    BranchStatus branchStatus = branchRollback(globalSession, branchSession);
                    if (isXaerNotaTimeout(globalSession, branchStatus)) {
                        LOGGER.info("Rollback branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                        branchStatus = BranchStatus.PhaseTwo_Rollbacked;
                    }
                    switch (branchStatus) {
                        // On success, remove the branch transaction from storage
                        case PhaseTwo_Rollbacked:
                            SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                            LOGGER.info("Rollback branch transaction successfully, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                            return CONTINUE;
                        // On an unretryable failure, mark the global transaction as rollback-failed and release the global locks
                        case PhaseTwo_RollbackFailed_Unretryable:
                            SessionHelper.endRollbackFailed(globalSession, retrying);
                            LOGGER.info("Rollback branch transaction fail and stop retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                            return false;
                        default:
                            LOGGER.info("Rollback branch transaction fail and will retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                            if (!retrying) {
                                globalSession.queueToRetryRollback();
                            }
                            return false;
                    }
                } catch (Exception ex) {
                    StackTraceLogger.error(LOGGER, ex,
                        "Rollback branch transaction exception, xid = {} branchId = {} exception = {}",
                        new String[] {globalSession.getXid(), String.valueOf(branchSession.getBranchId()), ex.getMessage()});
                    if (!retrying) {
                        globalSession.queueToRetryRollback();
                    }
                    throw new TransactionException(ex);
                }
            });
            // Return if the result is not null
            if (result != null) {
                return result;
            }
        }

        // In db mode, lock and branch data residual problems may occur.
        // Therefore, execution needs to be delayed here and cannot be executed synchronously.
        if (success) {
            // Global rollback succeeded: release the global locks, mark the global transaction as Rollbacked, then delete the global transaction data
            SessionHelper.endRollbacked(globalSession, retrying);
            LOGGER.info("Rollback global transaction successfully, xid = {}.", globalSession.getXid());
        }
        return success;
    }

io.seata.server.coordinator.AbstractCore#branchRollback

    public BranchStatus branchRollback(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
        try {
            BranchRollbackRequest request = new BranchRollbackRequest();
            request.setXid(branchSession.getXid());
            request.setBranchId(branchSession.getBranchId());
            request.setResourceId(branchSession.getResourceId());
            request.setApplicationData(branchSession.getApplicationData());
            request.setBranchType(branchSession.getBranchType());
            return branchRollbackSend(request, globalSession, branchSession);
        } catch (IOException | TimeoutException e) {
            throw new BranchTransactionException(FailedToSendBranchRollbackRequest,
                    String.format("Send branch rollback failed, xid = %s branchId = %s",
                            branchSession.getXid(), branchSession.getBranchId()), e);
        }
    }

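As with a global commit, a global rollback iterates over all branch transactions and rolls each one back. Once a branch has been rolled back successfully, its information is deleted from the session store. After all branches have been rolled back, the global locks are released and the global transaction information is deleted.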
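The control flow of that loop can be sketched in isolation. The following is a minimal model, not Seata's implementation: `RollbackLoopSketch`, `Branch`, `Outcome` and `rollbackAll` are hypothetical names, the RM's answer is pre-recorded on each branch instead of being fetched over RPC, and lock handling is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-ins for Seata's session classes; illustration only.
final class RollbackLoopSketch {

    enum BranchStatus {
        Registered, PhaseOne_Failed, PhaseTwo_Rollbacked,
        PhaseTwo_RollbackFailed_Retryable, PhaseTwo_RollbackFailed_Unretryable
    }

    enum Outcome { ROLLBACKED, ROLLBACK_FAILED, QUEUED_FOR_RETRY }

    static final class Branch {
        final long id;
        final BranchStatus current;        // status left behind by phase one
        final BranchStatus rollbackResult; // what the RM would answer in phase two (assumed known here)

        Branch(long id, BranchStatus current, BranchStatus rollbackResult) {
            this.id = id;
            this.current = current;
            this.rollbackResult = rollbackResult;
        }
    }

    /** Walks the branches from last to first and records the ids of removed branches. */
    static Outcome rollbackAll(List<Branch> branches, List<Long> removed) {
        for (int i = branches.size() - 1; i >= 0; i--) {
            Branch branch = branches.get(i);
            if (branch.current == BranchStatus.PhaseOne_Failed) {
                removed.add(branch.id); // nothing was committed locally, just drop the branch
                continue;
            }
            switch (branch.rollbackResult) {
                case PhaseTwo_Rollbacked:
                    removed.add(branch.id);
                    break;
                case PhaseTwo_RollbackFailed_Unretryable:
                    return Outcome.ROLLBACK_FAILED; // stop, no retry
                default:
                    return Outcome.QUEUED_FOR_RETRY; // stop, a retry task picks it up later
            }
        }
        return Outcome.ROLLBACKED;
    }

    public static void main(String[] args) {
        List<Branch> branches = Arrays.asList(
            new Branch(1L, BranchStatus.Registered, BranchStatus.PhaseTwo_Rollbacked),
            new Branch(2L, BranchStatus.PhaseOne_Failed, null),
            new Branch(3L, BranchStatus.Registered, BranchStatus.PhaseTwo_Rollbacked));
        List<Long> removed = new ArrayList<>();
        // prints: ROLLBACKED [3, 2, 1]
        System.out.println(rollbackAll(branches, removed) + " " + removed);
    }
}
```

Note that the loop stops at the first branch that cannot be rolled back, so branches registered earlier stay in the session until a retry succeeds.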

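Next, let's look at how a branch transaction is rolled back.

RM handling of branch rollback

After the TC sends a BranchRollbackRequest message to the RM, the RM hands the message to RmBranchRollbackProcessor for processing: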

        RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
        super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);
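The registration above binds a message type to a processor, so that an incoming message can be routed by its type code. A minimal sketch of that idea follows; it assumes a plain map as the lookup table and models a processor as a function from request body to response body. `ProcessorTableSketch`, `dispatch` and the numeric type codes are hypothetical and do not reflect Seata's `MessageType` values or its executor handling.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical dispatch table; Seata's real processor table also carries an executor per entry.
final class ProcessorTableSketch {

    // Illustrative type codes only, not Seata's MessageType constants.
    static final short TYPE_BRANCH_COMMIT = 1;
    static final short TYPE_BRANCH_ROLLBACK = 2;

    private final Map<Short, Function<String, String>> processors = new HashMap<>();

    /** Binds a processor to a message type, replacing any previous binding. */
    void registerProcessor(short messageType, Function<String, String> processor) {
        processors.put(messageType, processor);
    }

    /** Routes a message body to the processor registered for its type. */
    String dispatch(short messageType, String body) {
        Function<String, String> processor = processors.get(messageType);
        if (processor == null) {
            throw new IllegalStateException("no processor for message type " + messageType);
        }
        return processor.apply(body);
    }

    public static void main(String[] args) {
        ProcessorTableSketch table = new ProcessorTableSketch();
        table.registerProcessor(TYPE_BRANCH_ROLLBACK, body -> "rollback handled: " + body);
        // prints: rollback handled: xid-1
        System.out.println(table.dispatch(TYPE_BRANCH_ROLLBACK, "xid-1"));
    }
}
```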

io.seata.core.rpc.processor.client.RmBranchRollbackProcessor#handleBranchRollback

    private void handleBranchRollback(RpcMessage request, String serverAddress, BranchRollbackRequest branchRollbackRequest) {
        BranchRollbackResponse resultMessage;
        resultMessage = (BranchRollbackResponse) handler.onRequest(branchRollbackRequest, null);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("branch rollback result:" + resultMessage);
        }
        try {
            this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);
        } catch (Throwable throwable) {
            LOGGER.error("send response error: {}", throwable.getMessage(), throwable);
        }
    }

io.seata.rm.AbstractRMHandler#handle(io.seata.core.protocol.transaction.BranchRollbackRequest)

    public BranchRollbackResponse handle(BranchRollbackRequest request) {
        BranchRollbackResponse response = new BranchRollbackResponse();
        exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() {
            @Override
            public void execute(BranchRollbackRequest request, BranchRollbackResponse response)
                throws TransactionException {
                doBranchRollback(request, response);
            }
        }, request, response);
        return response;
    }

io.seata.rm.AbstractRMHandler#doBranchRollback

    protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
        throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);
        }
        // In AT mode the resource manager is DataSourceManager
        BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
            applicationData);
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch Rollbacked result: " + status);
        }
    }

io.seata.rm.datasource.DataSourceManager#branchRollback

    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                       String applicationData) throws TransactionException {
        DataSourceProxy dataSourceProxy = get(resourceId);
        if (dataSourceProxy == null) {
            throw new ShouldNeverHappenException(String.format("resource: %s not found",resourceId));
        }
        try {
            // Roll back using the undo_log records
            UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
        } catch (TransactionException te) {
            StackTraceLogger.info(LOGGER, te,
                "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",
                new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
            if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
                return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
            } else {
                return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
            }
        }
        return BranchStatus.PhaseTwo_Rollbacked;

    }

io.seata.rm.datasource.undo.AbstractUndoLogManager#undo

public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
        Connection conn = null;
        ResultSet rs = null;
        PreparedStatement selectPST = null;
        boolean originalAutoCommit = true;

        for (; ; ) {
            try {
                conn = dataSourceProxy.getPlainConnection();

                // The entire undo process should run in a local transaction.
                if (originalAutoCommit = conn.getAutoCommit()) {
                    conn.setAutoCommit(false);
                }

                // Find UNDO LOG
                // Look up the undo log by global transaction xid and branch id
                selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
                selectPST.setLong(1, branchId);
                selectPST.setString(2, xid);
                rs = selectPST.executeQuery();
                // Whether an undo log row exists
                boolean exists = false;
                while (rs.next()) {
                    exists = true;

                    // It is possible that the server repeatedly sends a rollback request to roll back
                    // the same branch transaction to multiple processes,
                    // ensuring that only the undo_log in the normal state is processed.
                    int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
                    if (!canUndo(state)) {
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);
                        }
                        return;
                    }

                    String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
                    // Parse the undo log context
                    Map<String, String> context = parseContext(contextString);
                    byte[] rollbackInfo = getRollbackInfo(rs);

                    String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                    UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
                        : UndoLogParserFactory.getInstance(serializer);
                    // Deserialize into a BranchUndoLog object
                    BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);

                    try {
                        // put serializer name to local
                        setCurrentSerializer(parser.getName());
                        List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                        if (sqlUndoLogs.size() > 1) {
                            Collections.reverse(sqlUndoLogs);
                        }
                        for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                            TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
                                conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                            sqlUndoLog.setTableMeta(tableMeta);
                            AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                                dataSourceProxy.getDbType(), sqlUndoLog);
                            // Execute the undo for this SQLUndoLog entry
                            undoExecutor.executeOn(conn);
                        }
                    } finally {
                        // remove serializer name
                        removeCurrentSerializer();
                    }
                }

                // If undo_log exists, it means that the branch transaction has completed the first phase,
                // we can directly roll back and clean the undo_log
                // Otherwise, it indicates that there is an exception in the branch transaction,
                // causing undo_log not to be written to the database.
                // For example, the business processing timeout, the global transaction is the initiator rolls back.
                // To ensure data consistency, we can insert an undo_log with GlobalFinished state
                // to prevent the local transaction of the first phase of other programs from being correctly submitted.
                // See https://github.com/seata/seata/issues/489

                if (exists) {
                    // The undo log has been applied successfully: delete it
                    deleteUndoLog(xid, branchId, conn);
                    // Commit the local transaction
                    conn.commit();
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId,
                            State.GlobalFinished.name());
                    }
                } else {
                    insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
                    conn.commit();
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId,
                            State.GlobalFinished.name());
                    }
                }

                return;
            } catch (SQLIntegrityConstraintViolationException e) {
                // Possible undo_log has been inserted into the database by other processes, retrying rollback undo_log
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId);
                }
            } catch (Throwable e) {
                if (conn != null) {
                    try {
                        conn.rollback();
                    } catch (SQLException rollbackEx) {
                        LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
                    }
                }
                throw new BranchTransactionException(BranchRollbackFailed_Retriable, String
                    .format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,
                        branchId, e.getMessage()), e);

            } finally {
                try {
                    if (rs != null) {
                        rs.close();
                    }
                    if (selectPST != null) {
                        selectPST.close();
                    }
                    if (conn != null) {
                        if (originalAutoCommit) {
                            conn.setAutoCommit(true);
                        }
                        conn.close();
                    }
                } catch (SQLException closeEx) {
                    LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
                }
            }
        }
    }

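Using the xid and the branch transaction ID, the undo log is read from the undo_log table in the business database, then parsed, deserialized and executed. The SQLUndoLog entries are applied in reverse order, so the most recent change is undone first.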
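Why the entries have to be applied in reverse can be shown with a small in-memory model. This is only a sketch under simplifying assumptions: the "table" is a map, each undo entry stores a single key with its previous value, and `ReverseUndoSketch`, `UndoEntry`, `write` and `undo` are hypothetical names unrelated to Seata's classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of an undo log; illustration only.
final class ReverseUndoSketch {

    /** One recorded change: the key that was written and its value before the write (null = absent). */
    static final class UndoEntry {
        final String key;
        final Integer before;

        UndoEntry(String key, Integer before) {
            this.key = key;
            this.before = before;
        }
    }

    /** Applies a write and appends the before image to the log. */
    static void write(Map<String, Integer> table, List<UndoEntry> log, String key, int value) {
        log.add(new UndoEntry(key, table.get(key)));
        table.put(key, value);
    }

    /** Restores the before images, newest entry first. */
    static void undo(Map<String, Integer> table, List<UndoEntry> log) {
        List<UndoEntry> reversed = new ArrayList<>(log);
        Collections.reverse(reversed);
        for (UndoEntry entry : reversed) {
            if (entry.before == null) {
                table.remove(entry.key);
            } else {
                table.put(entry.key, entry.before);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> table = new HashMap<>();
        table.put("a", 1);
        List<UndoEntry> log = new ArrayList<>();
        write(table, log, "a", 2); // before image: a=1
        write(table, log, "a", 3); // before image: a=2
        write(table, log, "b", 7); // before image: b absent
        undo(table, log);
        // prints: {a=1}
        System.out.println(table);
    }
}
```

If the same log were replayed oldest first, the second entry would overwrite `a` with 2 after the first had restored it to 1, leaving the row in an intermediate state.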

io.seata.rm.datasource.undo.AbstractUndoExecutor#executeOn

public void executeOn(Connection conn) throws SQLException {
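        // Compare three versions of the data: the before image, the after image and the current data.
        // If before image == after image, the data did not change and no rollback is needed.
        // If after image != current data, the data is dirty and cannot be rolled back
        // (unless the current data already equals the before image, in which case no rollback is needed).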
        if (IS_UNDO_DATA_VALIDATION_ENABLE && !dataValidationAndGoOn(conn)) {
            return;
        }
        PreparedStatement undoPST = null;
        try {
            // Build the rollback SQL template
            String undoSQL = buildUndoSQL();
            undoPST = conn.prepareStatement(undoSQL);
            // Get the image of the data as it was before the change
            TableRecords undoRows = getUndoRows();
            // Iterate over every row of the before image
            for (Row undoRow : undoRows.getRows()) {
                ArrayList<Field> undoValues = new ArrayList<>();
                List<Field> pkValueList = getOrderedPkList(undoRows, undoRow, getDbType(conn));
                for (Field field : undoRow.getFields()) {
                    if (field.getKeyType() != KeyType.PRIMARY_KEY) {
                        undoValues.add(field);
                    }
                }
                // Bind the values to produce the actual rollback SQL
                undoPrepare(undoPST, undoValues, pkValueList);
                // Execute the rollback SQL
                undoPST.executeUpdate();
            }

        } catch (Exception ex) {
            if (ex instanceof SQLException) {
                throw (SQLException) ex;
            } else {
                throw new SQLException(ex);
            }
        }
        finally {
            //important for oracle
            IOUtil.close(undoPST);
        }

    }

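Before the rollback SQL is executed, the before image, the after image and the current data are compared:

    // If before image == after image, the data did not change and no rollback is needed
    // If after image != current data, the data is dirty and cannot be rolled back

If the after image equals the current data and differs from the before image, the rollback SQL is built and executed, which completes the branch rollback.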
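The comparison can be written as a small decision function. The sketch below is a simplified reading of the validation step, not Seata's code: images are reduced to single comparable values, and `UndoValidationSketch`, `Decision` and `decide` are hypothetical names. The `SKIP_ALREADY_RESTORED` case (current data already equal to the before image) is an assumption based on the validation logic in the Seata version I have seen; check it against the version you use.

```java
import java.util.Objects;

// Hypothetical decision helper for the three-way image comparison; illustration only.
final class UndoValidationSketch {

    enum Decision { SKIP_NO_CHANGE, UNDO, SKIP_ALREADY_RESTORED, FAIL_DIRTY }

    /** Decides what to do given the before image, the after image and the current data. */
    static <T> Decision decide(T before, T after, T current) {
        if (Objects.equals(before, after)) {
            return Decision.SKIP_NO_CHANGE;        // the branch changed nothing
        }
        if (Objects.equals(after, current)) {
            return Decision.UNDO;                  // nobody touched the row since, safe to restore
        }
        if (Objects.equals(before, current)) {
            return Decision.SKIP_ALREADY_RESTORED; // assumed: row is already back at the before image
        }
        return Decision.FAIL_DIRTY;                // row was modified elsewhere
    }

    public static void main(String[] args) {
        System.out.println(decide(100, 90, 90)); // prints: UNDO
        System.out.println(decide(100, 90, 80)); // prints: FAIL_DIRTY
    }
}
```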

Summary diagram

[Figure: summary diagram of global transaction commit and rollback]

12-07 10:25