• SpringCloud Alibaba Series, Part 17: Seata AT Mode Source Code Analysis


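    Learning Objectives

    1. The Seata AT mode source code flow

    Chapter 1: AT Mode Flow

    1.1 Reasoning Through the Flow

    The previous article covered the general principle of AT mode, and the README in the source tree also shows how AT mode is used. This article analyzes how AT mode works at the source code level. Before digging in, let's look at three diagrams to understand its working approach and model:

    First, the reasoning diagram:

    1.2 Initialization Flow

    1.3 Execution Flow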

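    Chapter 2: Source Code Analysis

    2.1 SeataAutoConfiguration

    The study of the Seata source focuses on three questions: how Seata intercepts business SQL to generate undo_log data, how it commits the global transaction once phase one completes, and how it rolls back through undo_log to compensate when the phase-one business logic fails.

    Seata integrates with Spring, and for Spring Boot it provides auto-configuration.

    The auto-configuration class is named plainly: SeataAutoConfiguration. Let's open it: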

    @ComponentScan(basePackages = "io.seata.spring.boot.autoconfigure.properties")
    @ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
    @Configuration
    @EnableConfigurationProperties({SeataProperties.class})
    public class SeataAutoConfiguration {
    }

    First, @ComponentScan scans the properties package and loads a number of beans similar to SeataProperties.

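    @ConditionalOnProperty makes this configuration class take effect when seata.enabled=true. Because matchIfMissing = true, the feature is enabled by default, so this property works as an on/off switch for distributed transactions. It is set in the client application's Spring Boot configuration, for example application.yml.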

    @Configuration marks SeataAutoConfiguration as a Spring configuration class.

    @EnableConfigurationProperties binds the configuration into a SeataProperties bean.

    Next, read the code inside SeataAutoConfiguration:

    @Bean
    @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
    @ConditionalOnMissingBean(GlobalTransactionScanner.class)
    public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Automatically configure Seata");
        }
        return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
    }

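    The core of the auto-configuration is the bean defined above, GlobalTransactionScanner.

    Constructing this bean is simple: the constructor takes an applicationId and a txServiceGroup, plus the failureHandler.

    applicationId: the value of spring.application.name, that is, the name you gave the current application, for example userService.

    txServiceGroup: by default the applicationId followed by -seata-service-group, for example userService-seata-service-group. In older versions the project was still called fescar rather than seata, so the default suffix used fescar instead.

    Once the GlobalTransactionScanner object has been created, the job of SeataAutoConfiguration is done. It only serves as a bootstrap.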

    2.2 GlobalTransactionScanner

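    Since the core is GlobalTransactionScanner, let's keep following it. The name already hints at its role: it scans for the @GlobalTransactional annotation and intercepts the proxied methods to add transactional behavior.

    To understand this class, start with its UML diagram.

    GlobalTransactionScanner has four points worth noting:

    1) ApplicationContextAware: it can obtain the Spring container.

    2) InitializingBean: it performs some work during initialization.

    3) AbstractAutoProxyCreator: it applies aspect enhancement to beans in the Spring container, which matches the guess above about transactional interception.

    4) DisposableBean: it performs some work when the Spring container is destroyed.

    Let's briefly note the order in which these four run: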

    ApplicationContextAware -> InitializingBean -> AbstractAutoProxyCreator -> DisposableBean

    2.3 InitializingBean

    @Override
    public void afterPropertiesSet() {
        if (disableGlobalTransaction) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Global transaction is disabled.");
            }
            return;
        }
        initClient();
    }

    afterPropertiesSet initializes the Seata client side, which mainly consists of the TransactionManager and the ResourceManager. Perhaps for simplicity, initClient was not split out of GlobalTransactionScanner into a class of its own.

    Step into initClient:

    private void initClient() {
        // init TM
        TMClient.init(applicationId, txServiceGroup);
        // init RM
        RMClient.init(applicationId, txServiceGroup);
        registerSpringShutdownHook();
    }

    The initClient logic is not complicated: TMClient.init initializes the RPC client of the TransactionManager, and RMClient.init initializes the RPC client of the ResourceManager. Seata's RPC is built on Netty, wrapped by Seata to simplify usage. initClient also registers a Spring shutdown hook.

    2.3.1 TMClient Initialization

    @Override
    public void init() {
        // Periodically reconnect to the servers of this transaction service group
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                clientChannelManager.reconnect(getTransactionServiceGroup());
            }
        }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
        if (NettyClientConfig.isEnableClientBatchSendRequest()) {
            mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
                MAX_MERGE_SEND_THREAD,
                KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
            mergeSendExecutorService.submit(new MergedSendRunnable());
        }
        super.init();
        clientBootstrap.start();
    }

    init starts a timer that keeps reconnecting at a fixed rate by calling clientChannelManager.reconnect:

    void reconnect(String transactionServiceGroup) {
        List<String> availList = null;
        try {
            availList = getAvailServerList(transactionServiceGroup);
        } catch (Exception e) {
            LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
            return;
        }
        if (CollectionUtils.isEmpty(availList)) {
            String serviceGroup = RegistryFactory.getInstance()
                .getServiceGroup(transactionServiceGroup);
            LOGGER.error("no available service '{}' found, please make sure registry config correct", serviceGroup);
            return;
        }
        for (String serverAddress : availList) {
            try {
                acquireChannel(serverAddress);
            } catch (Exception e) {
                LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(), serverAddress, e.getMessage(), e);
            }
        }
    }

    reconnect obtains the list of seata-server addresses for transactionServiceGroup and then connects to each of them:

    private List<String> getAvailServerList(String transactionServiceGroup) throws Exception {
        List<InetSocketAddress> availInetSocketAddressList = RegistryFactory.getInstance()
            .lookup(transactionServiceGroup);
        if (CollectionUtils.isEmpty(availInetSocketAddressList)) {
            return Collections.emptyList();
        }
        return availInetSocketAddressList.stream()
            .map(NetUtil::toStringAddress)
            .collect(Collectors.toList());
    }

    RegistryFactory.getInstance().lookup(transactionServiceGroup) is adapted to different registries. Here we look at the Nacos implementation as an example.

    It first maps the transaction group to the name of the server cluster it belongs to (default in this case), and then resolves the cluster name to the servers' IP and port addresses.

    @Override
    public List<InetSocketAddress> lookup(String key) throws Exception {
        // default
        String clusterName = getServiceGroup(key);
        if (clusterName == null) {
            return null;
        }
        // Double-checked locking: query Nacos and subscribe only once per cluster
        if (!LISTENER_SERVICE_MAP.containsKey(clusterName)) {
            synchronized (LOCK_OBJ) {
                if (!LISTENER_SERVICE_MAP.containsKey(clusterName)) {
                    List<String> clusters = new ArrayList<>();
                    clusters.add(clusterName);
                    List<Instance> firstAllInstances = getNamingInstance().getAllInstances(getServiceName(), getServiceGroup(), clusters);
                    if (null != firstAllInstances) {
                        List<InetSocketAddress> newAddressList = firstAllInstances.stream()
                            .filter(instance -> instance.isEnabled() && instance.isHealthy())
                            .map(instance -> new InetSocketAddress(instance.getIp(), instance.getPort()))
                            .collect(Collectors.toList());
                        CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);
                    }
                    // Keep the cached address list in sync with later registry changes
                    subscribe(clusterName, event -> {
                        List<Instance> instances = ((NamingEvent) event).getInstances();
                        if (null == instances && null != CLUSTER_ADDRESS_MAP.get(clusterName)) {
                            CLUSTER_ADDRESS_MAP.remove(clusterName);
                        } else if (!CollectionUtils.isEmpty(instances)) {
                            List<InetSocketAddress> newAddressList = instances.stream()
                                .filter(instance -> instance.isEnabled() && instance.isHealthy())
                                .map(instance -> new InetSocketAddress(instance.getIp(), instance.getPort()))
                                .collect(Collectors.toList());
                            CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);
                        }
                    });
                }
            }
        }
        return CLUSTER_ADDRESS_MAP.get(clusterName);
    }
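
    The two-step resolution above (transaction group to cluster name, then cluster name to a cached address list) can be sketched with plain JDK types. This is only an illustration under stated assumptions: ClusterLookupSketch and its two maps are hypothetical stand-ins for the registry, not Seata or Nacos classes, and the subscription that refreshes the cache is left out.

```java
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a registry client; none of these names are Seata classes.
final class ClusterLookupSketch {
    private final Map<String, String> groupToCluster;                  // tx group -> cluster name
    private final Map<String, List<InetSocketAddress>> namingService;  // cluster -> instances
    private final Map<String, List<InetSocketAddress>> addressCache = new ConcurrentHashMap<>();
    private final Object lock = new Object();
    private int namingServiceCalls = 0;

    ClusterLookupSketch(Map<String, String> groupToCluster,
                        Map<String, List<InetSocketAddress>> namingService) {
        this.groupToCluster = groupToCluster;
        this.namingService = namingService;
    }

    List<InetSocketAddress> lookup(String txServiceGroup) {
        // Step 1: transaction group -> cluster name
        String cluster = groupToCluster.get(txServiceGroup);
        if (cluster == null) {
            return null;
        }
        // Step 2: cluster name -> addresses, queried once and then served from the cache
        if (!addressCache.containsKey(cluster)) {
            synchronized (lock) {
                if (!addressCache.containsKey(cluster)) {
                    namingServiceCalls++;
                    addressCache.put(cluster,
                        namingService.getOrDefault(cluster, Collections.emptyList()));
                }
            }
        }
        return addressCache.get(cluster);
    }

    int namingServiceCalls() {
        return namingServiceCalls;
    }
}
```

    Calling lookup twice for the same group queries the stand-in naming service only once; the second call is answered from the cache, which is the point of the double-checked block.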

    With the seata-server addresses in hand, acquireChannel is called next:

    Channel acquireChannel(String serverAddress) {
        // Reuse the existing channel if it is still alive
        Channel channelToServer = channels.get(serverAddress);
        if (channelToServer != null) {
            channelToServer = getExistAliveChannel(channelToServer, serverAddress);
            if (channelToServer != null) {
                return channelToServer;
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("will connect to " + serverAddress);
        }
        // One lock object per address, so only one thread connects to a given server at a time
        channelLocks.putIfAbsent(serverAddress, new Object());
        synchronized (channelLocks.get(serverAddress)) {
            return doConnect(serverAddress);
        }
    }

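    Finally, each seata-server address obtained is wrapped in a Netty channel, and TmClient initialization is complete.

    TmClient initialization summary:

    • Start a timer that periodically tries to reconnect to seata-server

    • On each reconnect, first resolve the transaction group name (service_group) to a cluster name (cluster_name) through Nacos (or another registry)

    • Then resolve the cluster name to the cluster's list of IP and port addresses

    • Connect to each address in the list over Netty, reusing a channel that is still alive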
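
    The summary can be modeled in a few lines of plain Java. The sketch below is an assumption-laden illustration, not Seata code: ReconnectSketch, the registry function, and the string "channels" are hypothetical placeholders for the Netty channel manager.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical model of the reconnect loop; a String stands in for a Netty channel.
final class ReconnectSketch {
    private final Function<String, List<String>> registry;              // tx group -> server addresses
    private final Map<String, String> channels = new ConcurrentHashMap<>();

    ReconnectSketch(Function<String, List<String>> registry) {
        this.registry = registry;
    }

    // One reconnect pass: connect to every known server that has no channel yet.
    int reconnect(String txServiceGroup) {
        List<String> servers = registry.apply(txServiceGroup);
        if (servers == null || servers.isEmpty()) {
            return 0; // nothing available now; the next scheduled pass tries again
        }
        int opened = 0;
        for (String address : servers) {
            if (channels.putIfAbsent(address, "channel-to-" + address) == null) {
                opened++;
            }
        }
        return opened;
    }

    int openChannels() {
        return channels.size();
    }

    // Runs reconnect at a fixed rate, in the spirit of timerExecutor.scheduleAtFixedRate above.
    ScheduledExecutorService start(String txServiceGroup, long periodMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(() -> reconnect(txServiceGroup), 0, periodMillis, TimeUnit.MILLISECONDS);
        return timer;
    }
}
```

    Because every pass walks the whole address list, a server that was down during one pass is picked up by a later one, and servers that already have a channel are skipped.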

    2.3.2 RMClient Initialization

    public static void init(String applicationId, String transactionServiceGroup) {
        // Obtain the singleton instance
        RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);
        // Set the ResourceManager singleton
        rmRpcClient.setResourceManager(DefaultResourceManager.get());
        // Add a listener for messages pushed by the server
        rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get()));
        // Initialize the RPC client
        rmRpcClient.init();
    }

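    Compared with TMClient, RMClient additionally has a mechanism that listens for and handles messages from the server. In other words, the TM's job is mostly to initiate communication with the server, for example the begin, commit, and rollback of a global transaction.

    The RM, besides operating on local resources on its own initiative, also acts on local resources in response to pushed messages such as the commit or rollback of a global transaction.

    init sets the resource manager and a message callback listener that receives the commit or rollback requests the TC sends in phase two. Seata applies SPI adaptation to ResourceManager and AbstractRMHandler. Taking ResourceManager as an example: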

    public class DefaultResourceManager implements ResourceManager {
        protected void initResourceManagers() {
            // init all resource managers
            List<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class);
            if (CollectionUtils.isNotEmpty(allResourceManagers)) {
                for (ResourceManager rm : allResourceManagers) {
                    resourceManagers.put(rm.getBranchType(), rm);
                }
            }
        }
    }

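    When DefaultResourceManager is initialized, it uses the class loader to load the implementations found in the corresponding JARs. AT mode, the default, uses the database implementation from the rm-datasource package. To find the implementation class, look under /resources/META-INF/ for the file named after the fully qualified extension interface.

    The implementation of ResourceManager is io.seata.rm.datasource.DataSourceManager, which defines the commit and rollback methods. The implementation behind DefaultRMHandler is io.seata.rm.RMHandlerAT, a callback handler that receives server messages and performs the corresponding commit or rollback.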
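
    The idea of keying each loaded implementation by its branch type can be shown without any SPI machinery. The following is a minimal sketch under assumptions: the implementations are passed in as a list instead of being discovered under META-INF, and every type here (including the two stub managers) is hypothetical rather than a Seata class.

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical registry that dispatches by branch type, mirroring resourceManagers.put(...) above.
final class ResourceManagerRegistrySketch {
    enum BranchType { AT, TCC }

    interface ResourceManager {
        BranchType getBranchType();
        String branchCommit(String xid);
    }

    // Stand-in for the database-backed implementation used in AT mode
    static final class DataSourceManagerStub implements ResourceManager {
        public BranchType getBranchType() { return BranchType.AT; }
        public String branchCommit(String xid) { return "AT commit " + xid; }
    }

    static final class TccManagerStub implements ResourceManager {
        public BranchType getBranchType() { return BranchType.TCC; }
        public String branchCommit(String xid) { return "TCC commit " + xid; }
    }

    private final Map<BranchType, ResourceManager> resourceManagers = new EnumMap<>(BranchType.class);

    // "loaded" plays the role of the list returned by the service loader
    ResourceManagerRegistrySketch(List<ResourceManager> loaded) {
        for (ResourceManager rm : loaded) {
            resourceManagers.put(rm.getBranchType(), rm);
        }
    }

    ResourceManager get(BranchType type) {
        ResourceManager rm = resourceManagers.get(type);
        if (rm == null) {
            throw new IllegalStateException("No ResourceManager for " + type);
        }
        return rm;
    }
}
```

    With this layout, a phase-two request only needs to carry its branch type for the right manager to be found.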

    The init() method of RMClient is essentially the same as that of TMClient.

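    2.3.3 Summary

    • At Spring startup, two clients are initialized: TmClient and RmClient

    • TmClient connects to seata-server over Netty and sends messages

    • RmClient connects to seata-server over Netty, receives phase-two commit and rollback messages, and handles them in the callback handler (RmHandler)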

    2.4 AbstractAutoProxyCreator

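    After GlobalTransactionScanner has initialized the TM and RM, let's turn to AbstractAutoProxyCreator, the auto-proxy creator.

    What does it proxy? Put differently, what capability does it add to the beans in Spring?

    GlobalTransactionScanner mainly overrides wrapIfNecessary of AbstractAutoProxyCreator.

    This method is the pre-check for proxy enhancement: it decides whether a bean needs enhancing and, if so, creates the proxy.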

    2.4.1 wrapIfNecessary

    @Override
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        if (disableGlobalTransaction) {
            return bean;
        }
        try {
            synchronized (PROXYED_SET) {
                // Skip beans that have already been processed
                if (PROXYED_SET.contains(beanName)) {
                    return bean;
                }
                interceptor = null;
                // Check whether TCC mode applies to this bean
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                    // Interceptor for the TCC implementation
                    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                } else {
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                    // Check whether @GlobalTransactional or @GlobalLock is present
                    if (!existsAnnotation(new Class[]{serviceInterface})
                        && !existsAnnotation(interfacesIfJdk)) {
                        return bean;
                    }
                    if (interceptor == null) {
                        // Interceptor for the non-TCC case
                        if (globalTransactionalInterceptor == null) {
                            globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                            ConfigurationCache.addConfigListener(
                                ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                                (ConfigurationChangeListener)globalTransactionalInterceptor);
                        }
                        interceptor = globalTransactionalInterceptor;
                    }
                }
                // Check whether the bean is already a Spring proxy
                if (!AopUtils.isAopProxy(bean)) {
                    // Not yet: go through Spring's normal proxy creation
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {
                    // Already a Spring proxy: obtain its existing advisor collection through reflection and add ours to it
                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                    for (Advisor avr : advisor) {
                        advised.addAdvisor(0, avr);
                    }
                }
                PROXYED_SET.add(beanName);
                return bean;
            }
        } catch (Exception exx) {
            // Rethrow instead of swallowing, so the method always returns or fails
            throw new RuntimeException(exx);
        }
    }

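    wrapIfNecessary is fairly long, so let's go through it step by step.

    1) isTccAutoProxy checks whether TCC mode applies. If so, TccActionInterceptor is chosen; otherwise GlobalTransactionalInterceptor is chosen. TCC is not enabled by default.

    2) existsAnnotation checks whether any method of the bean's class or interfaces carries @GlobalTransactional or @GlobalLock. If none does, the bean is returned as is.

    3) isAopProxy checks whether the bean is already a Spring proxy, whether a JDK dynamic proxy or a CGLIB class proxy. A plain bean simply goes through the usual proxy creation logic. For a bean that is already a proxy, the collection of interceptors (Advisors) inside the proxy object is obtained through reflection, and the new one is added directly to that collection.

    wrapIfNecessary is not complicated, but the details can be confusing if you are not familiar with proxies.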
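
    For readers less familiar with proxies, here is a stripped-down version of the same decision using only JDK dynamic proxies. It is a sketch under assumptions, not how Spring or Seata build proxies: the @GlobalTx annotation, the two service interfaces, and the log list are all hypothetical, and the advisor handling for beans that are already proxies is omitted.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;

// Hypothetical miniature of "scan for an annotation, wrap only if present".
final class ProxySketch {
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface GlobalTx {}

    interface OrderService {
        @GlobalTx String create(String item);
        String query(String id);
    }

    interface PlainService {
        String ping();
    }

    // True if any method of the given types carries the marker annotation
    static boolean existsAnnotation(Class<?>... types) {
        for (Class<?> type : types) {
            for (Method m : type.getMethods()) {
                if (m.isAnnotationPresent(GlobalTx.class)) {
                    return true;
                }
            }
        }
        return false;
    }

    // Returns the bean untouched when nothing is annotated, otherwise a proxy around it
    @SuppressWarnings("unchecked")
    static <T> T wrapIfNecessary(T bean, Class<T> iface, List<String> log) {
        if (!existsAnnotation(iface)) {
            return bean;
        }
        InvocationHandler handler = (proxy, method, args) -> {
            boolean tx = method.isAnnotationPresent(GlobalTx.class);
            if (tx) log.add("begin");
            try {
                Object result = method.invoke(bean, args);
                if (tx) log.add("commit");
                return result;
            } catch (InvocationTargetException e) {
                if (tx) log.add("rollback");
                throw e.getCause();
            }
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }
}
```

    Only the annotated create method triggers the begin and commit entries; query passes straight through, and a PlainService bean is never wrapped at all.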

    2.4.1.1 AT Phase One: Beginning the Global Transaction

    Interfaces that need global transaction management are annotated with @GlobalTransactional. A dedicated interceptor, GlobalTransactionalInterceptor, intercepts this annotation, and invoke is the intercepting method:

    @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        Class<?> targetClass =
            methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
            final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
            // Get the global transaction annotation on the method
            final GlobalTransactional globalTransactionalAnnotation =
                getAnnotation(method, targetClass, GlobalTransactional.class);
            // Get the global lock annotation on the method
            final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
            boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
            if (!localDisable) {
                if (globalTransactionalAnnotation != null) {
                    // @GlobalTransactional present: handleGlobalTransaction begins the global transaction
                    return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
                } else if (globalLockAnnotation != null) {
                    // @GlobalLock present: handleGlobalLock applies the global lock
                    return handleGlobalLock(methodInvocation);
                }
            }
        }
        // Neither annotation applies: run as a plain method for better performance
        return methodInvocation.proceed();
    }

    handleGlobalTransaction calls transactionalTemplate.execute, whose core steps are:

    // 2. Begin the global transaction
    beginTransaction(txInfo, tx);
    Object rs = null;
    try {
        // Run the business method
        rs = business.execute();
    } catch (Throwable ex) {
        // 3. On an exception, completeTransactionAfterThrowing rolls back
        completeTransactionAfterThrowing(txInfo, tx, ex);
        throw ex;
    }
    // 4. No exception: commit the transaction
    commitTransaction(tx);
    Beginning the global transaction ends up in io.seata.tm.api.DefaultGlobalTransaction#begin(int, java.lang.String):

    @Override
    public void begin(int timeout, String name) throws TransactionException {
        // The role check here is crucial: it tells whether the current code is the
        // launcher or a participant of the global transaction.
        // If a downstream service method in the distributed transaction is also annotated
        // with @GlobalTransactional, its role is Participant, so it skips begin and returns.
        // The role is decided by whether an XID already exists in the current context:
        // no XID means Launcher, an existing XID means Participant.
        if (role != GlobalTransactionRole.Launcher) {
            assertXIDNotNull();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNull();
        if (RootContext.getXID() != null) {
            throw new IllegalStateException();
        }
        xid = transactionManager.begin(null, null, name, timeout);
        status = GlobalStatus.Begin;
        RootContext.bind(xid);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction [{}]", xid);
        }
    }
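
    The launcher versus participant rule described in the comments can be reproduced with a ThreadLocal. This is a simplified sketch under assumptions: GlobalTxRoleSketch is hypothetical, the XID is generated locally instead of being requested from the server, and propagation of the XID between services is not modeled.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model: the role is fixed at creation time from the thread-bound XID.
final class GlobalTxRoleSketch {
    enum Role { LAUNCHER, PARTICIPANT }

    private static final ThreadLocal<String> CURRENT_XID = new ThreadLocal<>();
    private static final AtomicLong SEQUENCE = new AtomicLong();

    static final class Tx {
        final Role role;
        String xid;

        Tx() {
            // An XID already bound to the context means someone upstream launched the transaction
            this.xid = CURRENT_XID.get();
            this.role = (xid == null) ? Role.LAUNCHER : Role.PARTICIPANT;
        }

        void begin() {
            if (role != Role.LAUNCHER) {
                return; // participants ignore begin and simply join the existing XID
            }
            xid = "tc:" + SEQUENCE.incrementAndGet(); // stand-in for the call to the server
            CURRENT_XID.set(xid);
        }
    }

    static void unbind() {
        CURRENT_XID.remove();
    }
}
```

    Creating a second Tx after the first one has begun yields a PARTICIPANT that shares the launcher's XID, which is why a nested @GlobalTransactional method does not open a second global transaction.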

    The XID of the global transaction is requested from seata-server:

    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
        GlobalBeginRequest request = new GlobalBeginRequest();
        request.setTransactionName(name);
        request.setTimeout(timeout);
        // Step into syncCall
        GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
        if (response.getResultCode() == ResultCode.Failed) {
            throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
        }
        return response.getXid();
    }

    private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
        try {
            // The Netty client wrapped by TMClient
            return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
        } catch (TimeoutException toe) {
            throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
        }
    }

    The XID is bound to RootContext. This shows that the global transaction is initiated by the TM: the TM sends a begin request to seata-server, which handles it on receipt (the following is seata server code):

    @Override
    protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
        throws TransactionException {
        // Step into begin
        response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
            request.getTransactionName(), request.getTimeout()));
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
                rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
        }
    }

    io.seata.server.coordinator.DefaultCoordinator#doGlobalBegin receives the client's request to begin a global transaction and calls io.seata.server.coordinator.DefaultCore#begin to start it:

    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
        GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
            timeout);
        MDC.put(RootContext.MDC_KEY_XID, session.getXid());
        session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        // Begin the session
        session.begin();
        // transaction start event
        eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
            session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus()));
        return session.getXid();
    }

    The transaction is started through the current session, which notifies its lifecycle listeners:

    @Override
    public void begin() throws TransactionException {
        this.status = GlobalStatus.Begin;
        this.beginTime = System.currentTimeMillis();
        this.active = true;
        for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
            lifecycleListener.onBegin(this);
        }
    }

    This calls io.seata.server.session.AbstractSessionManager#onBegin, which in turn calls io.seata.server.storage.db.session.DataBaseSessionManager#addGlobalSession:

    @Override
    public void addGlobalSession(GlobalSession session) throws TransactionException {
        if (StringUtils.isBlank(taskName)) {
            // Step into writeSession
            boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);
            if (!ret) {
                throw new StoreException("addGlobalSession failed.");
            }
        } else {
            boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);
            if (!ret) {
                throw new StoreException("addGlobalSession failed.");
            }
        }
    }

    Here the data is written to the database:

    @Override
    public boolean writeSession(LogOperation logOperation, SessionStorable session) {
        if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
            return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
        } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
            return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
        } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
            return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
        } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
            return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
        } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
            return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
        } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
            return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
        } else {
            throw new StoreException("Unknown LogOperation:" + logOperation.name());
        }
    }

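    This inserts a row into the global_table table of the seata database. At this point the global transaction has begun.

    2.4.1.2 AT Phase One: Executing the Business SQL

    With the global transaction begun, the next step is to execute the business SQL and generate the undo_log data. After the global transaction interceptor has done its work, the business method is still executed. However, because Seata proxies the data source, SQL parsing and the undo_log insert happen inside the data source proxy, that is, the wrapper classes Seata provides for DataSource, Connection, and Statement.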

    /**
     * Build the DataSource proxy object, replacing the original DataSource
     */
    @Primary
    @Bean("dataSource")
    public DataSourceProxy dataSourceProxy(DataSource druidDataSource) {
        return new DataSourceProxy(druidDataSource);
    }

    Every data source used in the project is replaced by Seata's DataSourceProxy.

    The SQL parsing itself ultimately takes place in the StatementProxy class:

    @Override
    public boolean execute(String sql) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, (statement, args) -> statement.execute((String) args[0]), sql);
    }

    public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                     StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {
        if (!RootContext.requireGlobalLock() && !StringUtils.equals(BranchType.AT.name(), RootContext.getBranchType())) {
            // Not in a global transaction: execute directly for better performance
            return statementCallback.execute(statementProxy.getTargetStatement(), args);
        }
        String dbType = statementProxy.getConnectionProxy().getDbType();
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            sqlRecognizers = SQLVisitorFactory.get(
                statementProxy.getTargetSQL(),
                dbType);
        }
        Executor<T> executor;
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            executor = new PlainExecutor<>(statementProxy, statementCallback);
        } else {
            if (sqlRecognizers.size() == 1) {
                SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                // Each SQL type gets its own executor
                switch (sqlRecognizer.getSQLType()) {
                    case INSERT:
                        executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                            new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                            new Object[]{statementProxy, statementCallback, sqlRecognizer});
                        break;
                    case UPDATE:
                        executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case DELETE:
                        executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case SELECT_FOR_UPDATE:
                        executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    default:
                        executor = new PlainExecutor<>(statementProxy, statementCallback);
                        break;
                }
            } else {
                executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
            }
        }
        T rs;
        try {
            // Execute the SQL
            rs = executor.execute(args);
        } catch (Throwable ex) {
            if (!(ex instanceof SQLException)) {
                // Turn other exception into SQLException
                ex = new SQLException(ex);
            }
            throw (SQLException) ex;
        }
        return rs;
    }

    • First check whether a global transaction (or a global lock) is in effect. If not, bypass the proxy logic and skip SQL parsing for better performance

    • Call SQLVisitorFactory to parse the target SQL

    • Pick a dedicated executor for the specific SQL types (INSERT, UPDATE, DELETE, SELECT_FOR_UPDATE)

    • Execute the SQL and return the result
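
    The dispatch in these steps boils down to "classify the statement, then pick a strategy". The sketch below shows that shape only. It rests on loud assumptions: the keyword-prefix recognizer is a naive placeholder for a real SQL parser, the executors are represented by their names as strings, and ExecutorSelectionSketch itself is hypothetical.

```java
import java.util.Locale;

// Hypothetical dispatcher: classify the SQL, then name the executor that would handle it.
final class ExecutorSelectionSketch {
    enum SqlType { INSERT, UPDATE, DELETE, SELECT_FOR_UPDATE, OTHER }

    // Naive keyword check; an actual implementation would parse the statement properly
    static SqlType recognize(String sql) {
        String s = sql.trim().toUpperCase(Locale.ROOT);
        if (s.startsWith("INSERT")) return SqlType.INSERT;
        if (s.startsWith("UPDATE")) return SqlType.UPDATE;
        if (s.startsWith("DELETE")) return SqlType.DELETE;
        if (s.startsWith("SELECT") && s.endsWith("FOR UPDATE")) return SqlType.SELECT_FOR_UPDATE;
        return SqlType.OTHER;
    }

    static String chooseExecutor(boolean inGlobalTransaction, String sql) {
        if (!inGlobalTransaction) {
            return "direct"; // outside a global transaction the statement runs unmodified
        }
        switch (recognize(sql)) {
            case INSERT:
                return "InsertExecutor";
            case UPDATE:
                return "UpdateExecutor";
            case DELETE:
                return "DeleteExecutor";
            case SELECT_FOR_UPDATE:
                return "SelectForUpdateExecutor";
            default:
                return "PlainExecutor";
        }
    }
}
```

    Statements that change no data, such as a plain SELECT, fall through to the plain executor because they need no undo information.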

    Different SQL types are handled differently. Take INSERT as an example.

    INSERT uses InsertExecutor.execute, but in the end this is still io.seata.rm.datasource.exec.BaseTransactionalExecutor#execute:

    @Override
    public T execute(Object... args) throws Throwable {
        if (RootContext.inGlobalTransaction()) {
            String xid = RootContext.getXID();
            statementProxy.getConnectionProxy().bind(xid);
        }
        statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
        return doExecute(args);
    }

    This binds the XID from the context to the ConnectionProxy behind statementProxy and then calls doExecute. Here is doExecute in AbstractDMLBaseExecutor:

    @Override
    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        if (connectionProxy.getAutoCommit()) {
            return executeAutoCommitTrue(args);
        } else {
            return executeAutoCommitFalse(args);
        }
    }

    doExecute calls executeAutoCommitTrue or executeAutoCommitFalse, depending on the connection's auto-commit setting:

    protected T executeAutoCommitTrue(Object[] args) throws Throwable {
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        try {
            // Turn auto-commit off so the business SQL and the undo_log share one local transaction
            connectionProxy.setAutoCommit(false);
            return new LockRetryPolicy(connectionProxy).execute(() -> {
                T result = executeAutoCommitFalse(args);
                connectionProxy.commit();
                return result;
            });
        } catch (Exception e) {
            // when exception occur in finally,this exception will lost, so just print it here
            LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
            if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
                connectionProxy.getTargetConnection().rollback();
            }
            throw e;
        } finally {
            connectionProxy.getContext().reset();
            connectionProxy.setAutoCommit(true);
        }
    }

    On closer inspection, both paths end up calling executeAutoCommitFalse:

    protected T executeAutoCommitFalse(Object[] args) throws Exception {
        // Step into getTableMeta
        if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && getTableMeta().getPrimaryKeyOnlyName().size() > 1)
        {
            throw new NotSupportYetException("multi pk only support mysql!");
        }
        // Capture the beforeImage
        TableRecords beforeImage = beforeImage();
        // Execute the business SQL
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        // Capture the afterImage
        TableRecords afterImage = afterImage(beforeImage);
        // Save both images for the undo log
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }
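
    To make the before-image and after-image idea concrete, here is a toy model in which a HashMap plays the role of a table. It is a sketch under assumptions, not Seata's undo log format: UndoLogSketch and UndoItem are hypothetical, rows are single string values, and the rollback check that compares the current value with the after image is a simplified rendering of dirty-write detection.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical in-memory model: each write records what the row looked like before and after.
final class UndoLogSketch {
    static final class UndoItem {
        final String key;
        final String before;
        final String after;

        UndoItem(String key, String before, String after) {
            this.key = key;
            this.before = before;
            this.after = after;
        }
    }

    private final Map<String, String> table = new HashMap<>();
    private final List<UndoItem> undoLog = new ArrayList<>();

    // Loads initial data without recording undo information
    void seed(String key, String value) {
        table.put(key, value);
    }

    void update(String key, String newValue) {
        String before = table.get(key);   // before image
        table.put(key, newValue);         // the "business SQL"
        String after = table.get(key);    // after image
        undoLog.add(new UndoItem(key, before, after));
    }

    // Replays the undo items in reverse order, restoring each before image
    void rollback() {
        for (int i = undoLog.size() - 1; i >= 0; i--) {
            UndoItem item = undoLog.get(i);
            if (!Objects.equals(table.get(item.key), item.after)) {
                throw new IllegalStateException("dirty write detected on " + item.key);
            }
            if (item.before == null) {
                table.remove(item.key);
            } else {
                table.put(item.key, item.before);
            }
        }
        undoLog.clear();
    }

    String get(String key) {
        return table.get(key);
    }
}
```

    Recording the after image as well as the before image is what lets rollback notice that someone else changed the row in the meantime, instead of silently overwriting that change.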

    Obtaining the beforeImage relies on the table metadata returned by getTableMeta:

    // tableMeta holds the table name, columns, indexes, and so on
    protected TableMeta getTableMeta(String tableName) {
        if (tableMeta != null) {
            return tableMeta;
        }
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        tableMeta = TableMetaCacheFactory.getTableMetaCache(connectionProxy.getDbType())
            .getTableMeta(connectionProxy.getTargetConnection(), tableName, connectionProxy.getDataSourceProxy().getResourceId());
        return tableMeta;
    }

    With a Druid data source, the business SQL itself is still executed by com.alibaba.druid.pool.DruidPooledPreparedStatement#execute.

    Then the afterImage is obtained.

    The undo_log record is inserted when the transaction commits. In executeAutoCommitTrue, shown earlier, follow the connectionProxy.commit() call:

    public void commit() throws SQLException {
        try {
            LOCK_RETRY_POLICY.execute(() -> {
                // Step into doCommit
                doCommit();
                return null;
            });
        } catch (SQLException e) {
            throw e;
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }

    private void doCommit() throws SQLException {
        if (context.inGlobalTransaction()) {
            // Step into processGlobalTransactionCommit
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
            processLocalCommitWithGlobalLocks();
        } else {
            targetConnection.commit();
        }
    }

    private void processGlobalTransactionCommit() throws SQLException {
        try {
            // Register the branch with seata-server
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {
            // Insert the undo_log before committing; step into flushUndoLogs
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            targetConnection.commit();
        } catch (Throwable ex) {
            LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
            report(false);
            throw new SQLException(ex);
        }
        if (IS_REPORT_SUCCESS_ENABLE) {
            report(true);
        }
        context.reset();
    }

    public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
        ConnectionContext connectionContext = cp.getContext();
        if (!connectionContext.hasUndoLog()) {
            return;
        }
        String xid = connectionContext.getXid();
        long branchId = connectionContext.getBranchId();
        BranchUndoLog branchUndoLog = new BranchUndoLog();
        branchUndoLog.setXid(xid);
        branchUndoLog.setBranchId(branchId);
        branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());
        UndoLogParser parser = UndoLogParserFactory.getInstance();
        byte[] undoLogContent = parser.encode(branchUndoLog);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Flushing UNDO LOG: {}", new String(undoLogContent, Constants.DEFAULT_CHARSET));
        }
        // This call inserts the undo_log row
        insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName()), undoLogContent,
            cp.getTargetConnection());
    }

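    The branch transaction is registered by the register() call in processGlobalTransactionCommit above.

    When the local transaction commits, the branch information is registered with seata-server, and seata-server receives the request (seata server source):

    The entry point is io.seata.server.coordinator.DefaultCoordinator#doBranchRegister, which leads to branchRegister: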

    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                               String applicationData, String lockKeys) throws TransactionException {
        GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
        return SessionHolder.lockAndExecute(globalSession, () -> {
            globalSessionStatusCheck(globalSession);
            globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
            BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
                applicationData, lockKeys, clientId);
            MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));
            // Acquire the locks for this branch before registering it
            branchSessionLock(globalSession, branchSession);
            try {
                // Register the branch
                globalSession.addBranch(branchSession);
            } catch (RuntimeException ex) {
                // Registration failed, so release the locks again
                branchSessionUnlock(branchSession);
                throw new BranchTransactionException(FailedToAddBranch, String
                    .format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),
                        branchSession.getBranchId()), ex);
            }
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}",
                    globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);
            }
            return branchSession.getBranchId();
        });
    }

    Step into globalSession.addBranch:

    @Override
    public void addBranch(BranchSession branchSession) throws TransactionException {
        for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
            // Step into onAddBranch and choose the AbstractSessionManager implementation
            lifecycleListener.onAddBranch(this, branchSession);
        }
        branchSession.setStatus(BranchStatus.Registered);
        add(branchSession);
    }
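    The pattern in these two methods is "lock first, notify the listeners so they can persist the branch, and release the lock if anything fails". Below is a small sketch of that pattern only. BranchRegisterSketch, Listener, and the string lock keys are hypothetical names chosen for illustration; Seata's real lock manager and session classes are considerably richer.

    ```java
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // Hypothetical sketch of "lock, register, unlock on failure"; names are not Seata's.
    class BranchRegisterSketch {

        // Listener notified before a branch is added, e.g. to persist it.
        interface Listener {
            void onAddBranch(long branchId);
        }

        final Set<String> heldLocks = new HashSet<>();
        final List<Long> branches = new ArrayList<>();
        final List<Listener> listeners = new ArrayList<>();

        long register(long branchId, String lockKey) {
            // Acquire the lock first; a conflict aborts the registration.
            if (!heldLocks.add(lockKey)) {
                throw new IllegalStateException("lock conflict on " + lockKey);
            }
            try {
                // Let every listener persist the branch, then keep it in memory.
                for (Listener listener : listeners) {
                    listener.onAddBranch(branchId);
                }
                branches.add(branchId);
            } catch (RuntimeException ex) {
                // Registration failed: release the lock so other transactions can proceed.
                heldLocks.remove(lockKey);
                throw new IllegalStateException("failed to store branch " + branchId, ex);
            }
            return branchId;
        }

        public static void main(String[] args) {
            BranchRegisterSketch session = new BranchRegisterSketch();
            session.listeners.add(id -> System.out.println("persist branch " + id));
            System.out.println("registered " + session.register(1L, "account:1"));
        }
    }
    ```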

    onAddBranch hands off to the io.seata.server.storage.db.session.DataBaseSessionManager#addBranchSession method:

    @Override
    public void onAddBranch(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
        // Step in and choose DataBaseSessionManager
        addBranchSession(globalSession, branchSession);
    }

    @Override
    public void addBranchSession(GlobalSession globalSession, BranchSession session) throws TransactionException {
        if (StringUtils.isNotBlank(taskName)) {
            return;
        }
        // Step in
        boolean ret = transactionStoreManager.writeSession(LogOperation.BRANCH_ADD, session);
        if (!ret) {
            throw new StoreException("addBranchSession failed.");
        }
    }

    writeSession routes each operation type to the matching log store call:

    @Override
    public boolean writeSession(LogOperation logOperation, SessionStorable session) {
        if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
            return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
        } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
            return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
        } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
            return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
        } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
            return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
        } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
            return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
        } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
            return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
        } else {
            throw new StoreException("Unknown LogOperation:" + logOperation.name());
        }
    }

    For BRANCH_ADD, the branch row is inserted over JDBC:

    @Override
    public boolean insertBranchTransactionDO(BranchTransactionDO branchTransactionDO) {
        String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertBranchTransactionSQL(branchTable);
        Connection conn = null;
        PreparedStatement ps = null;
        try {
            int index = 1;
            conn = logStoreDataSource.getConnection();
            conn.setAutoCommit(true);
            ps = conn.prepareStatement(sql);
            // Bind the columns of the branch table in order
            ps.setString(index++, branchTransactionDO.getXid());
            ps.setLong(index++, branchTransactionDO.getTransactionId());
            ps.setLong(index++, branchTransactionDO.getBranchId());
            ps.setString(index++, branchTransactionDO.getResourceGroupId());
            ps.setString(index++, branchTransactionDO.getResourceId());
            ps.setString(index++, branchTransactionDO.getBranchType());
            ps.setInt(index++, branchTransactionDO.getStatus());
            ps.setString(index++, branchTransactionDO.getClientId());
            ps.setString(index++, branchTransactionDO.getApplicationData());
            return ps.executeUpdate() > 0;
        } catch (SQLException e) {
            throw new StoreException(e);
        } finally {
            IOUtil.close(ps, conn);
        }
    }
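    The dispatch in writeSession is easier to see without the JDBC details. The sketch below replaces the two database tables with in-memory maps and keeps only four operations. SessionStoreSketch, Op, and the map layout are assumptions made for this example; the real store persists full session objects through SQL.

    ```java
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical in-memory analogue of writeSession; the real store issues SQL via JDBC.
    class SessionStoreSketch {

        enum Op { GLOBAL_ADD, GLOBAL_REMOVE, BRANCH_ADD, BRANCH_REMOVE }

        // xid -> status, standing in for the global table
        final Map<String, String> globalTable = new HashMap<>();
        // branchId -> xid, standing in for the branch table
        final Map<Long, String> branchTable = new HashMap<>();

        // Routes one operation to the matching "table"; returns true if a row changed.
        boolean write(Op op, String xid, long branchId) {
            switch (op) {
                case GLOBAL_ADD:
                    return globalTable.putIfAbsent(xid, "Begin") == null;
                case GLOBAL_REMOVE:
                    return globalTable.remove(xid) != null;
                case BRANCH_ADD:
                    return branchTable.putIfAbsent(branchId, xid) == null;
                case BRANCH_REMOVE:
                    return branchTable.remove(branchId) != null;
                default:
                    throw new IllegalArgumentException("Unknown operation: " + op);
            }
        }

        public static void main(String[] args) {
            SessionStoreSketch store = new SessionStoreSketch();
            store.write(Op.GLOBAL_ADD, "xid-1", 0L);
            store.write(Op.BRANCH_ADD, "xid-1", 11L);
            System.out.println(store.globalTable + " " + store.branchTable);
        }
    }
    ```

    As in the source, a false return value lets the caller decide which exception to raise for the failed operation.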

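    seata-server has now stored the branch information. This ends phase one: the business data, the undo_log record, and the branch information have all been written to the database.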

    2.4.1.3 AT phase-two commit

    Back in handleGlobalTransaction, the transactionalTemplate.execute method is called:

    // 2. Begin the global transaction (beginTransaction)
    beginTransaction(txInfo, tx);
    Object rs = null;
    try {
        // Run the business method via business.execute()
        rs = business.execute();
    } catch (Throwable ex) {
        // Everything above is phase one
        // Everything below is phase two
        // 3. On an exception, roll back via completeTransactionAfterThrowing
        completeTransactionAfterThrowing(txInfo, tx, ex);
        throw ex;
    }
    // 4. No exception: commit via commitTransaction
    commitTransaction(tx);
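    The shape of this template (begin, run the business code, roll back and rethrow on failure, commit otherwise) can be tried out in isolation. The following is a minimal sketch under stated assumptions: TxTemplateSketch and its events list are hypothetical, and the begin, rollback, and commit steps only record that they were reached instead of talking to a transaction coordinator.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Supplier;

    // Hypothetical sketch of the begin / execute / commit-or-rollback template.
    class TxTemplateSketch {

        // Records lifecycle calls so the flow can be inspected.
        final List<String> events = new ArrayList<>();

        <T> T execute(Supplier<T> business) {
            events.add("begin");
            T result;
            try {
                // Phase one: the business logic runs inside the global transaction.
                result = business.get();
            } catch (Throwable ex) {
                // Phase two, failure path: roll back, then let the caller see the error.
                events.add("rollback");
                throw ex;
            }
            // Phase two, success path: commit.
            events.add("commit");
            return result;
        }

        public static void main(String[] args) {
            TxTemplateSketch template = new TxTemplateSketch();
            String result = template.execute(() -> "order created");
            System.out.println(result + " " + template.events);
        }
    }
    ```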

    Phase-two commit

    Step into commitTransaction(tx):

    private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            triggerBeforeCommit();
            // Step in
            tx.commit();
            triggerAfterCommit();
        } catch (TransactionException txe) {
            // 4.1 Failed to commit
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.CommitFailure);
        }
    }

    tx.commit() ends up in the transaction manager's commit(xid), which sends the request through syncCall:

    @Override
    public GlobalStatus commit(String xid) throws TransactionException {
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        globalCommit.setXid(xid);
        // Step into syncCall
        GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
        return response.getGlobalStatus();
    }

    private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
        try {
            return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
        } catch (TimeoutException toe) {
            throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
        }
    }
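    syncCall turns an asynchronous network exchange into a blocking call and maps a timeout onto a domain exception. A rough stand-alone illustration of that idea is shown here, using a CompletableFuture in place of the Netty client. SyncCallSketch and RpcException are made-up names for this example and say nothing about how TmNettyRemotingClient is implemented internally.

    ```java
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    // Hypothetical sketch of a synchronous call over an asynchronous transport.
    class SyncCallSketch {

        // Stand-in for the exception a transaction manager surfaces to callers.
        static class RpcException extends RuntimeException {
            RpcException(String message, Throwable cause) {
                super(message, cause);
            }
        }

        // Blocks until the response arrives or the timeout elapses.
        static String syncCall(CompletableFuture<String> pendingResponse, long timeoutMillis) {
            try {
                return pendingResponse.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException toe) {
                // Same mapping as in the source: a timeout becomes a transaction-level error.
                throw new RpcException("RPC timeout", toe);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new RpcException("interrupted", ie);
            } catch (ExecutionException ee) {
                throw new RpcException("remote failure", ee.getCause());
            }
        }

        public static void main(String[] args) {
            CompletableFuture<String> response = CompletableFuture.completedFuture("Committed");
            System.out.println(syncCall(response, 1000));
        }
    }
    ```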

    Finally, the TM sends the request to seata-server, and seata-server receives the global commit request (seata source code).

    In DefaultCoordinator:

    @Override
    protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)
        throws TransactionException {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        // Step into commit
        response.setGlobalStatus(core.commit(request.getXid()));
    }

     

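    After seata-server receives the client's global commit request, it first calls back the clients so that they delete their undo_log records, and then deletes the branch and global transaction records on the server.

    As mentioned earlier, when RMClient initializes, it sets the resource manager (resourceManager) and a message callback listener that receives the commit or rollback requests the TC sends in phase two.

    seata-server deletes the branch data and the global transaction data: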

    @Override
    public void removeBranch(BranchSession branchSession) throws TransactionException {
        // do not unlock if global status in (Committing, CommitRetrying, AsyncCommitting),
        // because it's already unlocked in 'DefaultCore.commit()'
        if (status != Committing && status != CommitRetrying && status != AsyncCommitting) {
            if (!branchSession.unlock()) {
                throw new TransactionException("Unlock branch lock failed, xid = " + this.xid + ", branchId = " + branchSession.getBranchId());
            }
        }
        for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
            // Step in
            lifecycleListener.onRemoveBranch(this, branchSession);
        }
        remove(branchSession);
    }

    The listener persists the change through writeSession, which raises an exception that matches the failed operation:

    private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException {
        if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) {
            if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
                throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,
                    "Fail to store global session");
            } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
                throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,
                    "Fail to update global session");
            } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
                throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,
                    "Fail to remove global session");
            } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
                throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,
                    "Fail to store branch session");
            } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
                throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,
                    "Fail to update branch session");
            } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
                throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,
                    "Fail to remove branch session");
            } else {
                throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,
                    "Unknown LogOperation:" + logOperation.name());
            }
        }
    }

    Once all branches are removed, the global session is ended:

    public static void endCommitted(GlobalSession globalSession) throws TransactionException {
        globalSession.changeStatus(GlobalStatus.Committed);
        // Delete the global transaction
        globalSession.end();
    }
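    Put together, the server-side order is: call back each branch, remove the branch record once its client has answered, and remove the global record last. The sketch below models only that ordering. GlobalCommitSketch and the LongPredicate callback are hypothetical, and stopping at the first failed branch is a simplifying assumption made here; it is not a description of Seata's retry logic.

    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.function.LongPredicate;

    // Hypothetical sketch of the server-side commit order; not Seata's classes.
    class GlobalCommitSketch {

        // Records each step so the ordering can be inspected.
        final List<String> steps = new ArrayList<>();

        // branchCommit stands in for the RPC callback to the client that owns the branch.
        boolean commit(String xid, List<Long> branchIds, LongPredicate branchCommit) {
            for (long branchId : branchIds) {
                if (!branchCommit.test(branchId)) {
                    // Assumed simplification: stop here and leave the rest for a later retry.
                    steps.add("retry-later:" + branchId);
                    return false;
                }
                // The client has cleaned up, so the branch record can go.
                steps.add("remove-branch:" + branchId);
            }
            // Only after every branch is gone is the global record removed.
            steps.add("remove-global:" + xid);
            return true;
        }

        public static void main(String[] args) {
            GlobalCommitSketch coordinator = new GlobalCommitSketch();
            coordinator.commit("xid-1", Arrays.asList(1L, 2L), branchId -> true);
            System.out.println(coordinator.steps);
        }
    }
    ```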

    The client deletes the undo_log data

    In the handler that receives the branch commit request:

    protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
        throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
        }
        // Step in
        BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
            applicationData);
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch commit result: " + status);
        }
    }

    getResourceManager returns the resource manager that was set when RMClient initialized, namely DataSourceManager:

    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                     String applicationData) throws TransactionException {
        // DataSourceManager delegates to the AsyncWorker
        return asyncWorker.branchCommit(branchType, xid, branchId, resourceId, applicationData);
    }

    @Override
    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                     String applicationData) throws TransactionException {
        // Queue the work; a full buffer is only logged, not treated as a failure
        if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
            LOGGER.warn("Async commit buffer is FULL. Rejected branch [{}/{}] will be handled by housekeeping later.", branchId, xid);
        }
        return BranchStatus.PhaseTwo_Committed;
    }

    This only adds a phase-two commit context to the ASYNC_COMMIT_BUFFER buffer. The actual work is done by the periodic task that AsyncWorker's init() method starts:

    public synchronized void init() {
        LOGGER.info("Async Commit Buffer Limit: {}", ASYNC_COMMIT_BUFFER_LIMIT);
        ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AsyncWorker", 1, true));
        // First run after 10 ms, then once every second
        timerExecutor.scheduleAtFixedRate(() -> {
            try {
                // Step in
                doBranchCommits();
            } catch (Throwable e) {
                LOGGER.info("Failed at async committing ... {}", e.getMessage());
            }
        }, 10, 1000 * 1, TimeUnit.MILLISECONDS);
    }

    doBranchCommits is where the undo_log records are deleted.
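    The client-side commit is therefore a producer and consumer arrangement: branchCommit enqueues and returns at once, and a timer drains the buffer and deletes the undo_log rows in bulk. Here is a small sketch of that arrangement. AsyncCommitSketch, Phase2, and the deleted map are illustrative names, grouping by resourceId is an assumption of this sketch, and the deletion is simulated by recording branch ids rather than running SQL.

    ```java
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Hypothetical sketch of the buffered phase-two commit on the client side.
    class AsyncCommitSketch {

        static class Phase2 {
            final String xid;
            final long branchId;
            final String resourceId;

            Phase2(String xid, long branchId, String resourceId) {
                this.xid = xid;
                this.branchId = branchId;
                this.resourceId = resourceId;
            }
        }

        final BlockingQueue<Phase2> buffer;
        // resourceId -> branch ids whose undo_log rows were "deleted"
        final Map<String, List<Long>> deleted = new HashMap<>();

        AsyncCommitSketch(int limit) {
            this.buffer = new LinkedBlockingQueue<>(limit);
        }

        // Returns immediately; false means the bounded buffer was full.
        boolean branchCommit(String xid, long branchId, String resourceId) {
            return buffer.offer(new Phase2(xid, branchId, resourceId));
        }

        // What a periodic task would run: drain the buffer and delete per resource.
        int drainAndDelete() {
            List<Phase2> batch = new ArrayList<>();
            buffer.drainTo(batch);
            for (Phase2 context : batch) {
                deleted.computeIfAbsent(context.resourceId, key -> new ArrayList<>()).add(context.branchId);
            }
            return batch.size();
        }

        public static void main(String[] args) {
            AsyncCommitSketch worker = new AsyncCommitSketch(10);
            worker.branchCommit("xid-1", 1L, "jdbc:mysql://db1");
            worker.branchCommit("xid-1", 2L, "jdbc:mysql://db2");
            // In the real worker this call is triggered by the scheduled executor.
            worker.drainAndDelete();
            System.out.println(worker.deleted);
        }
    }
    ```

    Because phase-two commit only has to clean up undo_log rows, deferring it this way keeps the commit callback cheap; the business data was already committed locally in phase one.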

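    Phase-two rollback

    On the seata-server side, the phase-two rollback code is similar to the phase-two commit code, so only the entry point is shown here: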

    protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response,
                                    RpcContext rpcContext) throws TransactionException {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        // seata-server receives the global rollback request
        response.setGlobalStatus(core.rollback(request.getXid()));
    }

    The main point of interest is how the client compensates the transaction during rollback:

    @Override
    public BranchRollbackResponse handle(BranchRollbackRequest request) {
        BranchRollbackResponse response = new BranchRollbackResponse();
        exceptionHandleTemplate(new AbstractCallback() {
            @Override
            public void execute(BranchRollbackRequest request, BranchRollbackResponse response)
                throws TransactionException {
                // Step in
                doBranchRollback(request, response);
            }
        }, request, response);
        return response;
    }

    doBranchRollback calls the resource manager's branchRollback:

    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                       String applicationData) throws TransactionException {
        DataSourceProxy dataSourceProxy = get(resourceId);
        if (dataSourceProxy == null) {
            throw new ShouldNeverHappenException();
        }
        try {
            // Compensate using the undo_log record of this branch
            UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
        } catch (TransactionException te) {
            StackTraceLogger.info(LOGGER, te,
                "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",
                new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
            // Tell the TC whether retrying the rollback makes sense
            if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
                return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
            } else {
                return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
            }
        }
        return BranchStatus.PhaseTwo_Rollbacked;
    }
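    The interesting part of branchRollback is how a failure is reported: the exception code decides whether the coordinator is told to retry. A compact sketch of that mapping follows. RollbackStatusSketch, its two enums, and UndoAction are invented for this example and only mirror the three outcomes seen in the source.

    ```java
    // Hypothetical sketch of mapping an undo failure to a retryable or final status.
    class RollbackStatusSketch {

        enum Code { UNRETRIABLE, RETRIABLE }

        enum Status { ROLLBACKED, FAILED_RETRYABLE, FAILED_UNRETRYABLE }

        // Stand-in for the exception thrown when compensation fails.
        static class UndoException extends Exception {
            final Code code;

            UndoException(Code code) {
                super("undo failed: " + code);
                this.code = code;
            }
        }

        // Stand-in for the call that applies the undo log.
        interface UndoAction {
            void run() throws UndoException;
        }

        static Status branchRollback(UndoAction undo) {
            try {
                undo.run();
            } catch (UndoException e) {
                // An unretriable failure is final; anything else may be retried later.
                return e.code == Code.UNRETRIABLE ? Status.FAILED_UNRETRYABLE : Status.FAILED_RETRYABLE;
            }
            return Status.ROLLBACKED;
        }

        public static void main(String[] args) {
            System.out.println(branchRollback(() -> { }));
            System.out.println(branchRollback(() -> { throw new UndoException(Code.RETRIABLE); }));
        }
    }
    ```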

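    In the end, the rollback calls UndoLogManager.undo(dataSourceProxy, xid, branchId);

    It checks whether an undo_log record exists for the branch. If one exists, the data is restored from it, the undo_log record is deleted, and both changes are committed together. This completes the source code analysis of Seata's AT mode.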
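    As a closing illustration, the compensation idea behind undo can be reduced to a few lines: remember the before image when the business update runs, and on rollback restore it and drop the undo record in one step. UndoSketch and its two maps are assumptions standing in for the business table and the undo_log table, and the sketch assumes the updated row already exists; the real implementation parses the stored images and generates reverse SQL.

    ```java
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical in-memory model of before-image compensation; not Seata's implementation.
    class UndoSketch {

        // Business table: account id -> balance
        final Map<Integer, Integer> accountBalance = new HashMap<>();
        // Undo log: branch id -> before image (account id -> old balance)
        final Map<Long, Map<Integer, Integer>> undoLog = new HashMap<>();

        // Phase one: record the before image, then apply the business update.
        void update(long branchId, int accountId, int newBalance) {
            undoLog.computeIfAbsent(branchId, key -> new HashMap<>())
                .putIfAbsent(accountId, accountBalance.get(accountId));
            accountBalance.put(accountId, newBalance);
        }

        // Phase-two rollback: restore the before image and drop the undo record together.
        boolean undo(long branchId) {
            Map<Integer, Integer> beforeImage = undoLog.remove(branchId);
            if (beforeImage == null) {
                // No undo record: nothing to compensate.
                return false;
            }
            accountBalance.putAll(beforeImage);
            return true;
        }

        public static void main(String[] args) {
            UndoSketch db = new UndoSketch();
            db.accountBalance.put(1, 100);
            db.update(7L, 1, 90);
            db.undo(7L);
            System.out.println(db.accountBalance + " " + db.undoLog);
        }
    }
    ```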

  • Original article: https://blog.csdn.net/Eclipse_2019/article/details/126317642