• Seata 源码篇之AT模式启动流程 - 中 - 03



    本系列文章:


    数据源代理

    当我们的数据源被代理后,代理数据源方法调用会走AOP拦截逻辑,也就是被SeataAutoDataSourceProxyAdvice的invoke方法拦截。invoke方法内部会将原本调用DataSource的方法转发给SeataDataSourceProxy执行:

       @Override
        public Object invoke(MethodInvocation invocation) throws Throwable {
            // 1. 检查当前是否存在全局事务,或者是否需要获取全局锁
            if (!inExpectedContext()) {
                return invocation.proceed();
            }
    
            // 2. 获取当前调用的是数据源的哪个方法
            Method method = invocation.getMethod();
            String name = method.getName();
            Class<?>[] parameterTypes = method.getParameterTypes();
    
            Method declared;
            try {
                declared = DataSource.class.getDeclaredMethod(name, parameterTypes);
            } catch (NoSuchMethodException e) {
                return invocation.proceed();
            }
    
            // 3. 取出当前数据源对应的SeataDataSourceProxy,然后调用代理数据源对应的方法
            DataSource origin = (DataSource) invocation.getThis();
            SeataDataSourceProxy proxy = DataSourceProxyHolder.get(origin);
            Object[] args = invocation.getArguments();
            return declared.invoke(proxy, args);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    在这里插入图片描述

    SeataDataSourceProxy这里采用装饰器模式实现对DataSource的增强,同时借助动态代理实现对用户的无感装饰。这里有趣的一点在于,为什么不直接在拦截器invoke方法内部实现拦截逻辑,而是借助装饰器倒了一手,大家可以思考一下原因。

    我们通常会通过DataSource的getConnection方法从连接池中获取一个空闲连接,然后借助Connection创建一个会话对象Statement,最后利用Statament对象完成SQL语句的执行。Seata需要拦截SQL执行,那么就不仅需要在DataSource层面做装饰增强,还需要在Connection和Statement层面同样进行装饰增强。

    public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
        @Override
        public ConnectionProxy getConnection() throws SQLException {
            Connection targetConnection = targetDataSource.getConnection();
            // 返回的Connection对象同样采用装饰器进行增强
            return new ConnectionProxy(this, targetConnection);
        }
    ...
    }    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    ConnectionProxy 内部会在创建Statement会话对象的时候进行装饰增强:

    public abstract class AbstractConnectionProxy implements Connection {
        @Override
        public Statement createStatement() throws SQLException {
            Statement targetStatement = getTargetConnection().createStatement();
            // 返回的Statement采用装饰器进行增强
            return new StatementProxy(this, targetStatement);
        }
        ... 
    }    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    但是ConnectionProxy不仅仅负责对Statement对象进行装饰,Seata还需要能够在commit和rollback等时间点进行拦截,因此ConnectionProxy的commit和rollback方法就不能只是简单的方法转发了,而是需要增加相关拦截逻辑,这一点后文会讲到。

    同时ConnectionProxy内部还需要维护本次连接期间的上下文信息,上下文信息由ConnectionContext保存:

    public class ConnectionContext {
        // 全局事务ID
        private String xid;
        // 分支事务ID
        private Long branchId;
        // 是否需要获取全局锁
        private boolean isGlobalLockRequire;
        // 自动提交的状态是否变更过
        private boolean autoCommitChanged;
        private final Map<String, Object> applicationData = new HashMap<>(2, 1.0001f);
        private final Map<Savepoint, Set<String>> lockKeysBuffer = new LinkedHashMap<>();
        private final Map<Savepoint, List<SQLUndoLog>> sqlUndoItemsBuffer = new LinkedHashMap<>();
        private final List<Savepoint> savepoints = new ArrayList<>(8);
        ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    StatementProxy 负责拦截本次会话中执行的每条SQL语句,并通过解析,查询前置和后置镜像,组装undo_log日志,最终完成本地事务的提交。

    下面我们将来仔细分析一下StatementProxy的模版流程实现。


    会话代理

    StatementProxy 类中的拦截逻辑也是以模版方法固定下来的,但是由于模版逻辑存在于query,insert,delete 和 update 逻辑中,所以这里将模版逻辑抽取到了ExecuteTemplate类中:

    public class StatementProxy<T extends Statement> extends AbstractStatementProxy<T> {
        @Override
        public ResultSet executeQuery(String sql) throws SQLException {
            this.targetSQL = sql;
            return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery((String) args[0]), sql);
        }
    
        @Override
        public int executeUpdate(String sql) throws SQLException {
            this.targetSQL = sql;
            return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate((String) args[0]), sql);
        }
        
        ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    到此,应该可以猜到,SQL解析和前后镜像组织的核心逻辑都汇聚于ExecteTemplate类的execute方法中,下面我们来详细看看具体实现:

    public class ExecuteTemplate {
        public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy,
                                                         StatementCallback<T, S> statementCallback,
                                                         Object... args) throws SQLException {
            return execute(null, statementProxy, statementCallback, args);
        }
    
        public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                         StatementProxy<S> statementProxy,
                                                         StatementCallback<T, S> statementCallback,
                                                         Object... args) throws SQLException {
            // 1. 当前事务执行无需获取全局锁,直接调用原本的Statement方法
            if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
                return statementCallback.execute(statementProxy.getTargetStatement(), args);
            }
            // 2. 根据DB类型,获取对应的SQL解析器
            String dbType = statementProxy.getConnectionProxy().getDbType();
            if (CollectionUtils.isEmpty(sqlRecognizers)) {
                sqlRecognizers = SQLVisitorFactory.get(
                        statementProxy.getTargetSQL(),
                        dbType);
            }
            Executor<T> executor;
            // 3. 当前SQL无需执行任何拦截处理,直接调用原本的Statement方法
            if (CollectionUtils.isEmpty(sqlRecognizers)) {
                executor = new PlainExecutor<>(statementProxy, statementCallback);
            } else {
            // 4. 如果SQL解析器只存在唯一的一个
                if (sqlRecognizers.size() == 1) {
                    SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                    // 5. 根据当前SQL类型,获取不同类型的执行拦截器
                    switch (sqlRecognizer.getSQLType()) {
                        case INSERT:
                            executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                                        new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                                        new Object[]{statementProxy, statementCallback, sqlRecognizer});
                            break;
                        // 下面就是case update , case delete 等同质的选择逻辑了
                        ...
                    }
                } else {
                // 6. 当存在多个SQL解析器时
                    executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
                }
            }
            T rs;
            // 7. 执行器的execute方法中包含SQL解析等逻辑,但是不同操作对于的处理逻辑不太一样,所以需要使用不同的执行器类型
            rs = executor.execute(args);
            ...
            return rs;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    ExecuteTemplate 会根据所执行的SQL语句类型不同,通过SPI加载不同类型的执行器来执行,执行器继承体系如下所示:
    在这里插入图片描述
    SQLRecognizer 作为SQL解析器,由于内部使用Druid作为最终解析工具,所有看做是Seata与Druid的一层隔离,防止两者直接耦合在一起。不同类型的SQL语句同样对应不同类型的SQLRecognizer实现,具体如下图所示:
    在这里插入图片描述
    执行器主要负责解析SQL语句来组织回滚日志,执行本地事务,获取全局锁以及提交本地事务。

    下面我们来看看不同场景下,执行器执行逻辑的区别。


    锁定查询执行器

    我们首先来看最简单的SelectForUpdateExecutor实现,针对select … for update 语句进行拦截增强:

    public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {
        @Override
        public T execute(Object... args) throws Throwable {
            // 1. 获取全局事务ID,并绑定到当前连接上下文中
            String xid = RootContext.getXID();
            if (xid != null) {
                statementProxy.getConnectionProxy().bind(xid);
            }
            // 2. 将是否需要全局锁这一标识设置到连接上下文中 
            statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
            // 3. 真正执行查询的方法
            return doExecute(args);
        }
        
        ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    public class SelectForUpdateExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
        @Override
        public T doExecute(Object... args) throws Throwable {
            // 1. 获取代理数据源连接
            Connection conn = statementProxy.getConnection();
            // 2. 从代理数据源连接中获取当前数据库元数据信息
            DatabaseMetaData dbmd = conn.getMetaData();
            T rs;
            Savepoint sp = null;
            boolean originalAutoCommit = conn.getAutoCommit();
            try {
                // 3. 如果当前数据源开启了事务自动提交,则将自动提交暂时关闭
                if (originalAutoCommit) {
                    conn.setAutoCommit(false);
                } else if (dbmd.supportsSavepoints()) {
                // 4. 如果当前数据源关闭了事务自动提交,则在当前数据源支持回滚点的前提下,创建一个回滚点
                    // 如果因为全局锁获取失败,需要执行全局回滚,则可以直接回滚到当前事务执行到此处的状态,而非把之前的操作全部执行回滚
                    sp = conn.setSavepoint();
                } else {
                    throw new SQLException("not support savepoint. please check your db version");
                }
    
                LockRetryController lockRetryController = new LockRetryController();
                ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
                // 5. 构建SQL语句负责查询出本次查询涉及到的所有记录: select 主键 from 表 需要执行的sql的where子句
                String selectPKSQL = buildSelectSQL(paramAppenderList);
                // 这里的while循环为的是全局锁获取失败后,进行重试
                while (true) {
                    try {
                        // 6. 执行目标SQL查询语句
                        rs = statementCallback.execute(statementProxy.getTargetStatement(), args);
                        // 7. 执行before image对应的SQL语句,同时利用返回的记录列表,构建全局锁的key,该全局锁覆盖本次查询得到的记录列表
                        TableRecords selectPKRows = buildTableRecords(getTableMeta(), selectPKSQL, paramAppenderList);
                        String lockKeys = buildLockKey(selectPKRows);
                        if (StringUtils.isNullOrEmpty(lockKeys)) {
                            break;
                        }
                        // 8. 本地事务执行完毕,提交前,尝试获取全局锁  
                        if (RootContext.inGlobalTransaction() || RootContext.requireGlobalLock()) {
                            // Do the same thing under either @GlobalTransactional or @GlobalLock, 
                            // that only check the global lock  here.
                            statementProxy.getConnectionProxy().checkLock(lockKeys);
                        } else {
                            throw new RuntimeException("Unknown situation!");
                        }
                        break;
                    } catch (LockConflictException lce) {
                        // 9. 获取全局锁失败,会先回滚当前本地事务,然后休眠指定时间后,再次重试
                        if (sp != null) {
                            conn.rollback(sp);
                        } else {
                            conn.rollback();
                        }
                        // trigger retry
                        lockRetryController.sleep(lce);
                    }
                }
            } finally {
                // 10. 如果有需要,则释放先前临时的创建的回滚点,同时将自动提交设置更改回来
                if (sp != null) {
                    try {
                        if (!JdbcConstants.ORACLE.equalsIgnoreCase(getDbType())) {
                            conn.releaseSavepoint(sp);
                        }
                    } catch (SQLException e) {
                        LOGGER.error("{} release save point error.", getDbType(), e);
                    }
                }
                // 11. 如果先前开启了自动提交,此处需要提交本地事务,同时将原本的自动提交设置更改回来
                if (originalAutoCommit) {
                    conn.setAutoCommit(true);
                }
            }
            // 12. 返回执行目标SQL语句得到的结果
            return rs;
        }
        ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78

    由于select … for update 锁定读不涉及数据修改,所以也就无需前置和后置镜像了,但是这里有四点需要注意一下:

    1. selectPKSQL 如何构建出来的 ?
    // 目标SQL语句
    select * from test where id = 1 and name = 'dhy' for update;
    // 构建得到的selectPKSQL,组装规则为: select + 主键 + from 表 + 目标SQL语句的where子句
    SELECT id FROM test WHERE id = 1 AND name = 'dhy' FOR UPDATE;
    
    • 1
    • 2
    • 3
    • 4
    1. 全局锁的key是如何构成的 ?
    // 对于只有单个主键的情况,例如上面的SQL语句,则key的模样如下
    // 表名:记录1主键值,记录2主键值,记录3主键值
    test:1,2
    // 对于联合主键的情况,则key的模样如下
    // 表名:记录1主键1值_记录1主键2值
    test:1_a,2_b
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    1. 如何判断全局锁是否获取成功 ?
    public class ConnectionProxy extends AbstractConnectionProxy {
        public void checkLock(String lockKeys) throws SQLException {
            if (StringUtils.isBlank(lockKeys)) {
                return;
            }
            try {
                // 全局锁等资源由默认的资源管理器管理
                boolean lockable = DefaultResourceManager.get().lockQuery(BranchType.AT,
                    getDataSourceProxy().getResourceId(), context.getXid(), lockKeys);
                // 获取全局锁失败,抛出锁冲突异常
                if (!lockable) {
                    throw new LockConflictException(String.format("get lock failed, lockKey: %s",lockKeys));
                }
            } catch (TransactionException e) {
                // 识别是否为锁冲突异常,如果是的话,抛出锁冲突异常
                recognizeLockKeyConflictException(e, lockKeys);
            }
        }
        ...
    }    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    获取全局锁是否成功完全由资源管理器说的算,所以下面我们来看看资源管理器是如何判断全局锁是否获取成功的:

    public class DataSourceManager extends AbstractResourceManager {
        @Override
        public boolean lockQuery(BranchType branchType, String resourceId, String xid, String lockKeys) throws TransactionException {
            // 1. 构建全局锁获取亲戚
            GlobalLockQueryRequest request = new GlobalLockQueryRequest();
            request.setXid(xid);
            request.setLockKey(lockKeys);
            request.setResourceId(resourceId);
            try {
                GlobalLockQueryResponse response;
                // 2. 借助RmNettyRemotingClient发送同步请求
                if (RootContext.inGlobalTransaction() || RootContext.requireGlobalLock()) {
                    response = (GlobalLockQueryResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
                } else {
                    throw new RuntimeException("unknow situation!");
                }
                // 3. 判断请求是否成功
                if (response.getResultCode() == ResultCode.Failed) {
                    throw new TransactionException(response.getTransactionExceptionCode(),
                        "Response[" + response.getMsg() + "]");
                }
                // 4. 判断全局锁是否获取成功
                return response.isLockable();
            } catch (TimeoutException toe) {
                throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);
            } catch (RuntimeException rex) {
                throw new RmTransactionException(TransactionExceptionCode.LockableCheckFailed, "Runtime", rex);
            }
        }
        
        ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    全局锁资源具体是如何管理的,由server端负责承接逻辑实现,这一点在本系列文章后面会进行详细分析,这里暂时不展开。

    最后还有一点,LockRetryController主要负责两件事:

    1. 维护重试次数和重试间隔
        public LockRetryController() {
            this.lockRetryInterval = getLockRetryInterval();
            this.lockRetryTimes = getLockRetryTimes();
        }
    
    • 1
    • 2
    • 3
    • 4
    1. 负责休眠等待逻辑实现
        public void sleep(Exception e) throws LockWaitTimeoutException {
            // prioritize the rollback of other transactions
            if (--lockRetryTimes < 0 || (e instanceof LockConflictException
                && ((LockConflictException)e).getCode() == TransactionExceptionCode.LockKeyConflictFailFast)) {
                throw new LockWaitTimeoutException("Global lock wait timeout", e);
            }
    
            try {
                Thread.sleep(lockRetryInterval);
            } catch (InterruptedException ignore) {
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    本地事务提交

    关于本地事务提交这一点,由于内容较多,我想单独开一节进行讲解。

    本地事务提交有两种方式,一种是在设置autoCommit属性为false的前提下,由开发者手动提交;另一种就是在设置autoCommit属性为true的前提下,由框架内部的模版代码先将自动提交关闭,执行完本地SQL语句和附加逻辑后,再最后由框架内部调用commit方法完成提交,同时恢复原先自动提交的属性设置。

    要想享受上面框架提供的事务模版服务,我们需要首先把要执行的事务交付于框架托管,比如在事务方法上标注@Transactional注解,表示当前事务执行交由Spring事务模块托管;下面我们复习一下Spring为我们提供的事务执行的整套模版流程:

    public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
    
    	protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
    			final InvocationCallback invocation) throws Throwable {
    
                ...
                // 1. 准备平台事务管理器  
    		   PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
               ...
               // 2. 根据事务传播行为,决定是否开启新的本地事务
    	       TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
               // 3. 执行本地事务
    		   Object retVal;
    		   try {
    				retVal = invocation.proceedWithInvocation();
    		   }
    		   catch (Throwable ex) {
    		       // 4. 出现异常则回滚
    			   completeTransactionAfterThrowing(txInfo, ex);
    			   throw ex;
    		   }
               ...
               // 5. 执行正常则提交
    		   commitTransactionAfterReturning(txInfo);
    		   return retVal;
    	  }
            ...
       }
    }    				
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    1. 开启新的本地事务过程中,会调用PlatformTransactionManager的doBegin方法真正开启一次新的事务连接
    	@Override
    	protected void doBegin(Object transaction, TransactionDefinition definition) {
    		    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
                ...
    			con = txObject.getConnectionHolder().getConnection();
    			// 核心: 如果连接开启了自动提交,则关闭自动提交,并设置标记
    			if (con.getAutoCommit()) {
    				txObject.setMustRestoreAutoCommit(true);
    				con.setAutoCommit(false);
    			}
    			...
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    1. 无论我们是设置了手动提交,还是默认的自动提交,只要事务交由了Spring托管,那么Spring便会在目标方法正常执行完毕后,进行commit
    	@Override
    	protected void doCommit(DefaultTransactionStatus status) {
    		DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    		Connection con = txObject.getConnectionHolder().getConnection();
    		...
    		con.commit();
    	    ...
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    1. 在事务提交完毕后的资源清理环节,会将先前更改的自动提交恢复过来
    	@Override
    	protected void doCleanupAfterCompletion(Object transaction) {
    		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    		...
    		// 重新将连接设置回自动提交
    		Connection con = txObject.getConnectionHolder().getConnection();
    		if (txObject.isMustRestoreAutoCommit()) {
    				con.setAutoCommit(true);
    		}
    		...
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    Spring声明式事务实现的特点在于,如果当前连接设置的是自动提交,则更改为手动提交,同时目标方法执行完毕后,统一由Spring框架内部帮我们调用commit方法完成本地事务提交。我们下面会看到Seata的实现有所不同。

    Seata在遇到设置为自动提交模式的连接处理上,和Spring处理思路一致,但是当连接处于手动提交模式时,Seata不会在目标方法执行完毕后,帮助我们统一调用commit进行提交,而是需要开发者自行调用commit方法进行提交。

    对于SelectForUpdateExecutor来说,其execute方法可以简化为如下过程:

        @Override
        public T doExecute(Object... args) throws Throwable {
            ... 
            // 1. 如果当前连接开启了自动提交,则先关闭自动提交
            boolean originalAutoCommit = conn.getAutoCommit();
            if (originalAutoCommit) {
                conn.setAutoCommit(false);
            }
            // 2. 执行业务SQL
            ...
            // 3. 如果先前为自动提交模式,则在此处提交本地事务,同时恢复自动提交
            if (originalAutoCommit) {
                conn.setAutoCommit(true);
            }
            return rs;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    Seata只会在自动提交模式下,才会由框架内部在业务SQL执行完毕后,帮助我们调用commit方法完成本地事务提交,这一点需要注意。

    下面看看本地事务提交的具体过程:

    public class ConnectionProxy extends AbstractConnectionProxy {
        @Override
        public void setAutoCommit(boolean autoCommit) throws SQLException {
            // 1. 在开启了全局事务场景下,并且原先是自动提交模式,则在此处由框架内部手动帮助我们提交事务
            if ((context.inGlobalTransaction() || context.isGlobalLockRequire()) && autoCommit && !getAutoCommit()) {
                doCommit();
            }
            // 2. 恢复自动提交设置
            targetConnection.setAutoCommit(autoCommit);
        }
        ...
    }    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    真正的本地事务提交逻辑在doCommit方法中,我们下面来看看其具体实现:

        private void doCommit() throws SQLException {
            // 1. 如果处于全局事务模式下,则走seata拦截逻辑
            if (context.inGlobalTransaction()) {
                processGlobalTransactionCommit();
            } else if (context.isGlobalLockRequire()) {
                processLocalCommitWithGlobalLocks();
            } else {
            // 2. 否则直接走正常事务提交逻辑  
                targetConnection.commit();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    下面看看对标注了@GlobalTransactional注解的目标方法的事务提交逻辑实现:

        private void processGlobalTransactionCommit() throws SQLException {
            try {
                // 1. 当前分支事务执行本地提交前,先执行分支事务注册(分支注册过程同时包含获取全局锁逻辑)
                register();
            } catch (TransactionException e) {
                // 识别是否为全局锁获取失败抛出的异常
                recognizeLockKeyConflictException(e, context.buildLockKeys());
            }
            try {
                // 2. 刷新UNDO日志到undo_log表中
                UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
                // 3. 执行本地事务提前(这里包括原有的业务逻辑和undo_log日志的落盘逻辑)
                targetConnection.commit();
            } catch (Throwable ex) {
                // 4. 上报分支事务执行失败
                report(false);
                throw new SQLException(ex);
            }
            // 5. 上报分支事务执行成功
            if (IS_REPORT_SUCCESS_ENABLE) {
                report(true);
            }
            // 6. 清空当前连接上下文
            context.reset();
        }
    
        private void register() throws TransactionException {
            if (!context.hasUndoLog() || !context.hasLockKey()) {
                return;
            }
            // 1. 分支事务提交前,到TC中进行注册
            Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
                null, context.getXid(), context.getApplicationData(),
                // 传入需要获取的全局锁key,不难猜出,分支事务注册的同时,还包括获取全局锁的逻辑
                context.buildLockKeys());
            // 2. 保存分支事务ID
            context.setBranchId(branchId);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    如果将连接设置为手动提交模式,则需要开发者手动调用ConnectionProxy的commit方法完成分支事务提交:

        @Override
        public void commit() throws SQLException {
            try {
                // 分支事务一阶段提交包括注册和获取全局锁两个过程,如果全局锁获取失败
                // 此处由lockRetryPolicy提供的模版方法,完成重试抢锁
                lockRetryPolicy.execute(() -> {
                    // 同样是调用doCommit方法完成分支事务一阶段提交
                    doCommit();
                    return null;
                });
            } catch (SQLException e) {
                if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
                    rollback();
                }
                throw e;
            } catch (Exception e) {
                throw new SQLException(e);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    SelectForUpdateExecutor 自动提交模式下无需在setAutoCommit方法中处理抢锁失败逻辑,是因为这段逻辑已经存在于了execute方法中,有遗忘的可以回看。而doCommit方法不只SelectForUpdateExecutor 会调用,所以内部需要处理获取全局锁失败逻辑。


    本地事务回滚

    本地事务回滚会调用ConnectionProxy的rollback方法,回滚逻辑实现比较简单,关键在于会向TC报告自己本地执行失败的状态:

        @Override
        public void rollback() throws SQLException {
            // 1. 执行正常的回滚操作
            targetConnection.rollback();
            // 2. 向TC报告当前分支事务的状态
            if (context.inGlobalTransaction() && context.isBranchRegistered()) {
                report(false);
            }
            // 3. 清空连接上下文
            context.reset();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    大家可以思考一下,当TC收到某个分支事务执行失败的状态后,它又是如何通知其他分支事务完成回滚的呢?这部分内容将在本系列后面揭晓。


    更新执行器

    UpdateExecutor 的逻辑是其次简单的,我们来看看其实现逻辑:

    public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {
        @Override
        public T execute(Object... args) throws Throwable {
            // 1. 获取全局事务ID,并绑定到当前连接上下文中
            String xid = RootContext.getXID();
            if (xid != null) {
                statementProxy.getConnectionProxy().bind(xid);
            }
            // 2. 将是否需要全局锁这一标识设置到连接上下文中 
            statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
            // 3. 真正执行查询的方法
            return doExecute(args);
        }
        
        ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
        @Override
        public T doExecute(Object... args) throws Throwable {
            AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
            // 如果当前连接本身已经开启了自动提交,则在事务执行前,关闭自动提交,执行结束后,再开启自动提交
            if (connectionProxy.getAutoCommit()) {
                return executeAutoCommitTrue(args);
            } else {
                return executeAutoCommitFalse(args);
            }
        }
        ...
    }    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    下面我们来看看处理更全面的一种情况,也就是连接本身开启了自动提交的前提下,是如何进行处理的:

    public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
        
        protected T executeAutoCommitTrue(Object[] args) throws Throwable {
            ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
            try {
                // 1. 关闭自动提交设置
                connectionProxy.changeAutoCommit();
                // 2. 分支事务一阶段提交逻辑包含获取全局锁的逻辑,所以需要处理抢锁失败的重试逻辑
                return new LockRetryPolicy(connectionProxy).execute(() -> {
                    // 执行SQL解析等拦截逻辑,然后执行最终的目标SQL语句
                    T result = executeAutoCommitFalse(args);
                    // 提交事务
                    connectionProxy.commit();
                    return result;
                });
            } catch (Exception e) {
                if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
                    connectionProxy.getTargetConnection().rollback();
                }
                throw e;
            } finally {
                // 恢复自动提交设置
                connectionProxy.getContext().reset();
                connectionProxy.setAutoCommit(true);
            }
        }
        
        ... 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
        protected T executeAutoCommitFalse(Object[] args) throws Exception {
            try {
                // 1. 获取前置镜像
                TableRecords beforeImage = beforeImage();
                // 2. 执行正常的SQL语句
                T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
                // 3. 获取后置镜像
                TableRecords afterImage = afterImage(beforeImage);
                // 4. 准备undo日志
                prepareUndoLog(beforeImage, afterImage);
                return result;
            } catch (TableMetaException e) {
                ...
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    更新过程中的拦截逻辑核心就三步,下面我们来详细看看每一步的具体实现过程:

    1. 准备前置镜像
        @Override
        protected TableRecords beforeImage() throws SQLException {
            ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
            TableMeta tmeta = getTableMeta();
            // 1. 构建组装前置镜像对应的SQL语句
            String selectSQL = buildBeforeImageSQL(tmeta, paramAppenderList);
            // 2. 执行该SQL语句,然后获取查询出来的记录
            return buildTableRecords(tmeta, selectSQL, paramAppenderList);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    这里简单讲讲前置镜像SQL语句组装的规则:

    // 目标SQL语句
    update test set name = 'WILL' where age = 18
    // 构建得到的SQL语句
    // 规则: select  + 主键,update涉及列 + from 表 + 目标SQL语句的where子句 
    SELECT id, name FROM test WHERE age = 18 FOR UPDATE
    
    • 1
    • 2
    • 3
    • 4
    • 5
    1. 准备后置镜像
        @Override
        protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
            TableMeta tmeta = getTableMeta();
            if (beforeImage == null || beforeImage.size() == 0) {
                return TableRecords.empty(getTableMeta());
            }
            // 1. 构建后置镜像查询SQL
            String selectSQL = buildAfterImageSQL(tmeta, beforeImage);
            ResultSet rs = null;
            // 2. 执行后置镜像查询SQL,返回查询结果
            try (PreparedStatement pst = statementProxy.getConnection().prepareStatement(selectSQL)) {
                SqlGenerateUtils.setParamForPk(beforeImage.pkRows(), getTableMeta().getPrimaryKeyOnlyName(), pst);
                rs = pst.executeQuery();
                return TableRecords.buildRecords(tmeta, rs);
            } finally {
                IOUtil.close(rs);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    后置镜像SQL语句组装是依赖于前置镜像SQL的,而非目标SQL语句,:

    // 目标SQL语句
    update test set name = 'WILL' where age = 18
    // 前置镜像SQL语句
    SELECT id, name FROM test WHERE age = 18 FOR UPDATE
    // 构建得到的SQL语句
    // 规则: select  + 主键,update涉及列 + from 表 + where 主键 in (前置镜像查询出来的记录列表的主键列聚合得到的主键列表)
    SELECT id, name FROM test WHERE (id) in ( (?),(?) )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    1. 准备undo_log日志
        protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
            // 0. 健壮性检查
            if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {
                return;
            }
            if (SQLType.UPDATE == sqlRecognizer.getSQLType()) {
                if (beforeImage.getRows().size() != afterImage.getRows().size()) {
                    throw new ShouldNeverHappenException("Before image size is not equaled to after image size, probably because you updated the primary keys.");
                }
            }
            ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
            // 1. 提交本地事务前,需要获取对应的全局锁,如果此处执行的时删除语句,则以前置镜像作为锁记录,否则以后置镜像作为锁记录(删除操作,无需记录后置镜像)
            TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
            // (此处key的组装规则和上文说的一致)
            String lockKeys = buildLockKey(lockKeyRecords);
            if (null != lockKeys) {
                // 2. 向当前连接上下文的lockKeysBuffer中追加需要获取的全局锁key
                connectionProxy.appendLockKey(lockKeys);
                // 3. 构建undo日志
                SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
                // 4. 像当前连接上下文的sqlUndoItemsBuffer中追加构建好的undo日志
                connectionProxy.appendUndoLog(sqlUndoLog);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    构建undo日志的具体过程如下:

        protected SQLUndoLog buildUndoItem(TableRecords beforeImage, TableRecords afterImage) {
            SQLType sqlType = sqlRecognizer.getSQLType();
            String tableName = sqlRecognizer.getTableName();
    
            SQLUndoLog sqlUndoLog = new SQLUndoLog();
            sqlUndoLog.setSqlType(sqlType);
            sqlUndoLog.setTableName(tableName);
            sqlUndoLog.setBeforeImage(beforeImage);
            sqlUndoLog.setAfterImage(afterImage);
            return sqlUndoLog;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    lockKeysBuffer 和 sqlUndoItemsBuffer 会在当前本地事务提交的时候用到。

    分支事务提交会调用ConnectionProxy的doCommit方法,这一点上面已经说过了,而doCommit方法会在注册分支事务的同时,传入需要获取的全局锁的key:
    在这里插入图片描述
    ConnectionContext 的buildLockKeys 方法中会遍历lockKeysBuffer 集合,对所有需要获取的全局锁key进行拼接:

        public String buildLockKeys() {
            if (lockKeysBuffer.isEmpty()) {
                return null;
            }
            // 1. 获取所有全局锁key
            Set<String> lockKeysBufferSet = new HashSet<>();
            for (Set<String> lockKeys : lockKeysBuffer.values()) {
                lockKeysBufferSet.addAll(lockKeys);
            }
    
            if (lockKeysBufferSet.isEmpty()) {
                return null;
            }
            // 2. 用;拼接在一起,然后返回
            StringBuilder appender = new StringBuilder();
            Iterator<String> iterable = lockKeysBufferSet.iterator();
            while (iterable.hasNext()) {
                appender.append(iterable.next());
                if (iterable.hasNext()) {
                    appender.append(";");
                }
            }
            return appender.toString();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    执行完分支事务注册和全局锁获取后,下一步就是向undo_log表中写入undo日志,然后提交本地事务了; 这里会调用UndoLogManager的flushUndoLogs写入undo日志,下面我们一起来看看:

        @Override
        public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
            ConnectionContext connectionContext = cp.getContext();
            if (!connectionContext.hasUndoLog()) {
                return;
            }
            // 1. 准备分支事务的undo日志
            String xid = connectionContext.getXid();
            long branchId = connectionContext.getBranchId();
            BranchUndoLog branchUndoLog = new BranchUndoLog();
            branchUndoLog.setXid(xid);
            branchUndoLog.setBranchId(branchId);
            branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());
            // 2. 调用undo日志解析器对undo日志进行编码
            UndoLogParser parser = UndoLogParserFactory.getInstance();
            byte[] undoLogContent = parser.encode(branchUndoLog);
            // 3. 尝试对undo日志进行压缩
            CompressorType compressorType = CompressorType.NONE;
            if (needCompress(undoLogContent)) {
                compressorType = ROLLBACK_INFO_COMPRESS_TYPE;
                undoLogContent = CompressorFactory.getCompressor(compressorType.getCode()).compress(undoLogContent);
            }
            // 4. 将undo日志插入undo_log表中,如果使用的是Mysql,这里会调用MySQLUndoLogManager的insertUndoLogWithNormal实现
            insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName(), compressorType), undoLogContent, cp.getTargetConnection());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    删除执行器

    本节来看看DeleteExecutor的执行流程,由于DeleteExecutor和UpdateExecutor都继承了AbstractDMLBaseExecutor,所以二者主要区别主要集中在前置和后置镜像构建的逻辑上,下面我们一起来看一下:

    1. 构建前置镜像
    public class DeleteExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {
        @Override
        protected TableRecords beforeImage() throws SQLException {
            SQLDeleteRecognizer visitor = (SQLDeleteRecognizer) sqlRecognizer;
            TableMeta tmeta = getTableMeta(visitor.getTableName());
            ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
            // 构建前置镜像对应的SQL,然后执行该SQL,返回查询得到的结果
            String selectSQL = buildBeforeImageSQL(visitor, tmeta, paramAppenderList);
            return buildTableRecords(tmeta, selectSQL, paramAppenderList);
        }
         
        ... 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    这里还是老规矩,来看看前置镜像SQL组装的逻辑:

    // 目标SQL
    delete from t where id = 1
    // 前置镜像SQL
    // 组装规则: select * from 表 + 目标SQL的where子句
    SELECT name, id FROM t WHERE id = 1 FOR UPDATE
    
    • 1
    • 2
    • 3
    • 4
    • 5
    1. delete操作无需后置镜像,所以返回结果集合为空
        @Override
        protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
            return TableRecords.empty(getTableMeta());
        }
    
    • 1
    • 2
    • 3
    • 4

    插入执行器

    本节我们再来看看MySQLInsertExecutor的执行流程,由于MySQLInsertExecutor同样继承了AbstractDMLBaseExecutor,所以这里我们也只对前置和后置镜像构建过程进行分析:

    1. 构建前置镜像
        @Override
        protected TableRecords beforeImage() throws SQLException {
             // insert操作无需前置镜像,所以返回的空集合
            return TableRecords.empty(getTableMeta());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    1. 构建后置镜像
        @Override
        protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
            // 1. 获取插入记录的主键值
            Map<String, List<Object>> pkValues = getPkValues();
            // 2. 正常情况下,此处返回的后置镜像里面的内容也是空的,因为记录插入之前并不存在
            TableRecords afterImage = buildTableRecords(pkValues);
            if (afterImage == null) {
                throw new SQLException("Failed to build after-image for insert");
            }
            return afterImage;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    除了简单的增删改查语句外,还有涉及Join的更新操作,InsertOnUpdate等操作,这部分操作对应的执行器实现大家可自行翻阅源码进行学习。


    小节

    本文和大家一起探索了Seata AT模式的一阶段实现,下篇文章将和大家一起来看看AT模式二阶段的实现和一阶段中漏掉的全局事务提交和回滚。

  • 相关阅读:
    数据结构期末复习(十套试卷)库题
    Django笔记十五之in查询及date日期相关过滤操作
    LIinux使用VIM掌握输入输出重定向与管道命令的应用
    ASP.NET Core - 缓存之分布式缓存
    构建可扩展的应用:六边形架构详解与实践
    docker安装Jenkins完整教程
    Spring Aop原理解析和使用示例
    机器学习(公式推导与代码实现)--sklearn机器学习库
    LeetCode之二:字母异位词分组
    长沙学院2022暑假训练赛(一)六级阅读
  • 原文地址:https://blog.csdn.net/m0_53157173/article/details/132999397