public Object execute(TransactionalExecutor business)throws Throwable { // 1. Get transactionInfo // 获取事务信息 TransactionInfotxInfo= business.getTransactionInfo(); if (txInfo == null) { thrownewShouldNeverHappenException("transactionInfo does not exist"); } // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'. // 获取当前事务,主要获取Xid GlobalTransactiontx= GlobalTransactionContext.getCurrent();
// 1.2 Handle the transaction propagation. // 根据配置的不同事务传播行为,执行不同的逻辑 Propagationpropagation= txInfo.getPropagation(); SuspendedResourcesHoldersuspendedResourcesHolder=null; try { switch (propagation) { case NOT_SUPPORTED: // If transaction is existing, suspend it. if (existingTransaction(tx)) { suspendedResourcesHolder = tx.suspend(); } // Execute without transaction and return. return business.execute(); case REQUIRES_NEW: // If transaction is existing, suspend it, and then begin new transaction. if (existingTransaction(tx)) { suspendedResourcesHolder = tx.suspend(); tx = GlobalTransactionContext.createNew(); } // Continue and execute with new transaction break; case SUPPORTS: // If transaction is not existing, execute without transaction. if (notExistingTransaction(tx)) { return business.execute(); } // Continue and execute with new transaction break; case REQUIRED: // If current transaction is existing, execute with current transaction, // else continue and execute with new transaction. break; case NEVER: // If transaction is existing, throw exception. if (existingTransaction(tx)) { thrownewTransactionException( String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s" , tx.getXid())); } else { // Execute without transaction and return. return business.execute(); } case MANDATORY: // If transaction is not existing, throw exception. if (notExistingTransaction(tx)) { thrownewTransactionException("No existing transaction found for transaction marked with propagation 'mandatory'"); } // Continue and execute with current transaction. break; default: thrownewTransactionException("Not Supported Propagation:" + propagation); }
// 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'. // 当前没有事务,则创建一个新的事务 if (tx == null) { tx = GlobalTransactionContext.createNew(); }
// set current tx config to holder GlobalLockConfigpreviousConfig= replaceGlobalLockConfig(txInfo);
try { // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC, // else do nothing. Of course, the hooks will still be triggered. // 开始执行全局事务 beginTransaction(txInfo, tx);
Object rs; try { // Do Your Business // 执行当前业务逻辑: // 1. 在TC注册当前分支事务,TC会在branch_table中插入一条分支事务数据 // 2. 执行本地update语句,并在执行前后查询数据状态,并把数据前后镜像存入到undo_log表中 // 3. 远程调用其他应用,远程应用接收到xid,也会注册分支事务,写入branch_table及本地undo_log表 // 4. 会在lock_table表中插入全局锁数据(一个分支一条) rs = business.execute(); } catch (Throwable ex) { // 3. The needed business exception to rollback. // 发生异常全局回滚,各个数据通过undo_log表进行事务补偿 completeTransactionAfterThrowing(txInfo, tx, ex); throw ex; }
// 4. everything is fine, commit. // 全局提交事务 commitTransaction(tx);
return rs; } finally { //5. clear // 清除所有资源 resumeGlobalLockConfig(previousConfig); triggerAfterCompletion(); cleanUp(); } } finally { // If the transaction is suspended, resume it. if (suspendedResourcesHolder != null) { tx.resume(suspendedResourcesHolder); } } }