• Seata四大模式之TCC模式详解及代码实现


      😊 @ 作者: 一恍过去
      🎊 @ 社区: Java技术栈交流
      🎉 @ 主题: Seata四大模式之TCC模式详解及代码实现
      ⏱️ @ 创作时间: 2022年06月24日

      1、实现机制

      1.1 提交阶段

      TCC模式是一种需要在业务代码中进行编码的分布式事务解决方案。

      • 一阶段:Try,进行资源的检测和预留。
      • 二阶段:
        • 提交:Confirm,完成资源操作业务;要求 Try 成功 Confirm 一定要能成功。
        • 回滚:Cancel,释放预留资源,可以理解为try的反向操作。

      1.2 实现逻辑

      在这里插入图片描述

      一阶段:
      try,尝试将资源进行锁定,比如需要扣减金额,并且记录一条扣减记录,执行业务(生成订单等操作)。

      二阶-事务提交
      将金额的扣减记录进行删除,表示完成整个事务过程。

      二阶-事务回滚
      获取扣减记录,从扣减中将金额进行恢复,表示数据回滚。

      TCC中的空回滚和业务悬挂:

      当某个分支事务在执行Try操作时,因为阻塞导致全局获取状态超时,从而执行Cancel操作,在未执行Try操作时执行了Cancel操作这就是空回滚。当执行空回滚的业务如果没有了阻塞并且继续执行Try操作,会导致无法执行后续的Confirm或者Cancel操作,这就是业务悬挂。

      1.3 优缺点

      • 优点:

        • 一阶段完成直接提交事务,释放数据库资源,性能好。
        • 相比AT模型,无需生成快照,无需使用全局锁,性能强。
        • 不依赖数据库事务,而是依赖补偿操作,可以用于非事务型数据库。
      • 缺点:

        • 有代码侵入,需要人为编写try、Confirm和Cancel接口,比较麻烦。
        • 软状态,事务是最终一致。
        • 需要考虑Confirm和Cancel的失败情况,做好幂等处理。

      2、代码实现

      创建两个SpringBoot工程,分别为storage-serviceorder-service,模拟从在order-service服务中新增订单,然后调用storage-service服务新增库存扣减记录,TCC的是需要开发者通过设计代码自行实现回滚补偿机制;核心代码如下,完整代码参考文末github地址

      2.1 建表语句

      -- 数据库名称: seata-tcc-demo.sql
      
      -- 订单表
      CREATE TABLE `tb_order`
      (
          `id`    int(11) NOT NULL COMMENT '主键',
          `count` int(11) NULL DEFAULT 0 COMMENT '下单数量',
          `money` int(11) NULL DEFAULT 0 COMMENT '金额',
          `status` int(11) NULL DEFAULT 1 COMMENT '状态:1:预处理,2-完成',
          PRIMARY KEY (`id`) USING BTREE
      ) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = COMPACT;
      
      
      -- 库存表
      CREATE TABLE `tb_storage`
      (
          `id`       int(11) NOT NULL COMMENT '主键',
          `order_id` int(11) NOT NULL COMMENT '订单ID',
          `count`    int(11) NOT NULL DEFAULT 0 COMMENT '库存',
          `status` int(11) NULL DEFAULT 1 COMMENT '状态:1:预处理,2-完成',
          PRIMARY KEY (`id`) USING BTREE
      ) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = COMPACT;
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23

      2.2 order-service服务

      2.2.1 yaml配置

      server:
        port: 8082
      spring:
        application:
          name: order-service
        datasource:
          driver-class-name: com.mysql.cj.jdbc.Driver
          url: jdbc:mysql://127.0.0.1:3307/seata-at-demo?useUnicode=true&useSSL=false&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF-8&allowMultiQueries=true&serverTimezone=Asia/Shanghai
          username: root
          password: lhzlx
        cloud:
          nacos:
            discovery:
              server-addr: 127.0.0.1:8848
              namespace: 64ed9ca7-d705-4655-b4e4-f824e420a12a
              group: test
      
      seata:
        enabled: true
        application-id: ${spring.application.name}
        # 事务组的名称,对应service.vgroupMapping.default_tx_group=xxx中配置的default_tx_group
        tx-service-group: default_tx_group
        # 配置事务组与集群的对应关系
        service:
          vgroup-mapping:
            # default_tx_group为事务组的名称,default为集群名称
            default_tx_group: default
          disable-global-transaction: false
        registry:
          type: nacos
          nacos:
            application: seata-server
            server-addr: 127.0.0.1:8848
            group: SEATA_GROUP
            namespace: 64ed9ca7-d705-4655-b4e4-f824e420a12a
            username: nacos
            password: nacos
            cluster: default
        config:
          type: nacos
          nacos:
            server-addr: 162.14.115.18:8848
            group: SEATA_GROUP
            namespace: 64ed9ca7-d705-4655-b4e4-f824e420a12a
            username: nacos
            password: nacos
            data-id: seataServer.properties
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47

      2.2.2 Service接口

      在接口上使用@LocalTCC注解表示开启TCC模式,否则seata会认为是AT模式;

      @LocalTCC
      public interface OrderService {
      
          /**
           * 创建订单
           * @TwoPhaseBusinessAction 描述⼆阶段提交
           * name: 为 tcc⽅法的 bean 名称,需要全局唯⼀,⼀般写⽅法名即可
           * commitMethod: Commit⽅法的⽅法名
           * rollbackMethod:Rollback⽅法的⽅法名
           * @BusinessActionContextParamete 该注解⽤来修饰 Try⽅法的⼊参,
           * 被修饰的⼊参可以在 Commit ⽅法和 Rollback ⽅法中通过BusinessActionContext 获取。
           * @param order
           * @return
           */
          @TwoPhaseBusinessAction(name = "createOrderPrepare", commitMethod = "createOrderCommit", rollbackMethod = "createOrderRollBack")
          Order createOrderPrepare(@BusinessActionContextParameter(paramName = "order") Order order);
      
      
          /**
           * 提交
           * @param context
           * @return
           */
          Boolean createOrderCommit(BusinessActionContext context);
      
          /**
           * 回滚
           * @param context
           * @return
           */
          Boolean createOrderRollBack(BusinessActionContext context);
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32

      2.2.3 Service实现类

      @Slf4j
      @Service
      public class OrderServiceImpl implements OrderService {
      
          private static final Map<String, String> STATUS_MAP = new ConcurrentHashMap<>();
      
          @Resource
          private OrderMapper orderMapper;
      
      
          /**
           * 创建订单
           *
           * @param order
           * @return
           */
          @Override
          public Order createOrderPrepare(Order order) {
              // 0.获取事务id
              String xid = RootContext.getXID();
              log.info("创建订单预处理,xid={}",xid );
      
              // 设置为预处理状态
              order.setStatus(1);
      
              // 判断是否已经执行过了Cancel或者Confirm
              if(STATUS_MAP.get(xid)!=null){
                  // 表示已经执行了Cancel或者Confirm实现业务悬挂
                  return null;
              }
      
              orderMapper.insert(order);
              return order;
          }
          /**
           * 提交
           * @param context
           * @return
           */
          @Override
          public  Boolean createOrderCommit(BusinessActionContext context){
              try {
                  String xid = context.getXid();
                  // 将订单的状态修改为完成
                  log.info("创建订单提交处理,xid={}",xid );
      
                  // 幂等处理
                  if(STATUS_MAP.get(xid)!=null){
                      return true;
                  }
                  STATUS_MAP.put(xid,"Confirm");
      
                  Object obj = context.getActionContext("order");
                  if(obj!=null) {
                      Order order = JSON.parseObject(obj.toString(), Order.class);
                      if (order != null) {
                          order.setStatus(2);
                          orderMapper.updateById(order);
                      }
                  }
      
              }catch (Exception e){
                  log.error(e.getMessage());
              }
              return true;
          }
      
          /**
           * 回滚
           * @param context
           * @return
           */
          @Override
          public  Boolean createOrderRollBack(BusinessActionContext context){
              try {
                  String xid = context.getXid();
                  log.info("创建订单回滚处理,xid={}",xid );
      
                  // 幂等处理
                  if(STATUS_MAP.get(xid)!=null){
                      return true;
                  }
                  STATUS_MAP.put(xid,"Cancel");
      
                  // 将订单的状态修改为完成
                  Object obj = context.getActionContext("order");
                  if(obj!=null) {
                      Order order = JSON.parseObject(obj.toString(), Order.class);
                      // 将订单进行删除,表示回滚
                      if (order != null) {
                          log.info("删除订单ID:"+order.getId());
                          orderMapper.deleteById(order.getId());
                      }
                  }
              }catch (Exception e){
                  log.error(e.getMessage());
              }
              return true;
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85
      • 86
      • 87
      • 88
      • 89
      • 90
      • 91
      • 92
      • 93
      • 94
      • 95
      • 96
      • 97
      • 98
      • 99
      • 100

      2.2.4 Controller

      @RestController
      @RequestMapping("order")
      public class OrderController {
          @Resource
          private TccHandler tccHandler;
      
          @PostMapping
          public ResponseEntity<String> createOrder(@RequestBody Order order) {
              long id = new Random().nextInt(999999999);
              order.setId(id);
              tccHandler.createOrderAndStorage(order);
              return ResponseEntity.status(HttpStatus.OK).body("操作成功");
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14

      2.2.5 TCC处理器

      @Component
      @Slf4j
      public class TccHandler {
      
          @Resource
          private OrderService orderService;
      
          @Resource
          private StorageClient storageClient;
      
          /**
           * 创建订单和库存记录的TCC处理器
           * 使用@GlobalTransactional开启全局事务
           * @param order
           * @return
           */
          @GlobalTransactional
          public void createOrderAndStorage(Order order) {
      
              // 记录订单数据
              log.info("开始记录订单数据...");
              Order orderPrepare = orderService.createOrderPrepare(order);
              log.info("结束记录订单数据...");
      
              // feign调用记录库存数据
              log.info("开始记录库存数据...");
              storageClient.deduct(orderPrepare.getId(),orderPrepare.getCount());
              log.info("结束记录库存数据...");
      
              // 模拟最后出现异常情况
              int a=1/0;
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33

      2.2.6 StorageClient

      @FeignClient("storage-service")
      public interface StorageClient {
          /**
           * 扣减库存
           *
           * @param orderId
           * @param count
           */
          @PostMapping("/storage")
          void deduct(@RequestParam("orderId") Long orderId, @RequestParam("count") Integer count);
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11

      2.3 storage-service服务

      2.3.1 yaml配置

      server:
        port: 8081
      spring:
        application:
          name: storage-service
        datasource:
          driver-class-name: com.mysql.cj.jdbc.Driver
          url: jdbc:mysql://127.0.0.1:3307/seata-at-demo?useUnicode=true&useSSL=false&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF-8&allowMultiQueries=true&serverTimezone=Asia/Shanghai
          username: root
          password: lhzlx
        cloud:
          nacos:
            discovery:
              server-addr: 127.0.0.1:8848
              namespace: 64ed9ca7-d705-4655-b4e4-f824e420a12a
              group: test
              # 在dev环境进行debug时,可以将时间设置长一些
              #heart-beat-interval: 1000 #心跳间隔。单位为毫秒,默认5*1000
              heart-beat-timeout: 300000 #心跳暂停,收不到心跳,会将实例设为不健康。单位为毫秒,默认15*1000
              ip-delete-timeout: 4000000 #Ip删除超时,收不到心跳,会将实例删除。单位为毫秒,默认30*1000
      
      
      seata:
        enabled: true
        application-id: ${spring.application.name}
        # 事务组的名称,对应service.vgroupMapping.default_tx_group=xxx中配置的default_tx_group
        tx-service-group: default_tx_group
        # 配置事务组与集群的对应关系
        service:
          vgroup-mapping:
            # default_tx_group为事务组的名称,default为集群名称
            default_tx_group: default
          disable-global-transaction: false
        registry:
          type: nacos
          nacos:
            application: seata-server
            server-addr: 162.14.115.18:8848
            group: SEATA_GROUP
            namespace: 64ed9ca7-d705-4655-b4e4-f824e420a12a
            username: nacos
            password: nacos
            cluster: default
        config:
          type: nacos
          nacos:
            server-addr: 162.14.115.18:8848
            group: SEATA_GROUP
            namespace: 64ed9ca7-d705-4655-b4e4-f824e420a12a
            username: nacos
            password: nacos
            data-id: seataServer.properties
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52

      2.3.2 Service接口

      在接口上使用@LocalTCC注解表示开启TCC模式,否则seata会认为是AT模式;

      @LocalTCC
      public interface StorageService {
      
              /**
               * 创建订单
               * @TwoPhaseBusinessAction 描述⼆阶段提交
               * name: 为 tcc⽅法的 bean 名称,需要全局唯⼀,⼀般写⽅法名即可
               * commitMethod: Commit⽅法的⽅法名
               * rollbackMethod:Rollback⽅法的⽅法名
               * @BusinessActionContextParamete 该注解⽤来修饰 Try⽅法的⼊参,
               * 被修饰的⼊参可以在 Commit ⽅法和 Rollback ⽅法中通过BusinessActionContext 获取。
               *
               * @param storage
               * @return
               */
              @TwoPhaseBusinessAction(name = "createPrepare", commitMethod = "deductCommit", rollbackMethod = "deductRollBack")
              void deductPrepare(@BusinessActionContextParameter(paramName = "storage") Storage storage);
      
              /**
               * 提交
               * @param context
               * @return
               */
              Boolean deductCommit(BusinessActionContext context);
      
              /**
               * 回滚
               * @param context
               * @return
               */
              Boolean deductRollBack(BusinessActionContext context);
      
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33

      2.3.3 Service实现类

      @Slf4j
      @Service
      public class StorageServiceImpl implements StorageService {
      
          private static final Map<String, String> STATUS_MAP = new ConcurrentHashMap<>();
      
          @Resource
          private StorageMapper storageMapper;
      
          /**
           * 扣除存储数量
           *
           */
          @Override
          public void deductPrepare( Storage storage) {
              // 0.获取事务id
              String xid = RootContext.getXID();
              log.info("记录库存信息预处理,xid={}",xid );
              try {
                  // 设置为预处理状态
                  storage.setStatus(1);
      
                  // 判断是否已经执行过了Cancel或者Confirm
                  if(STATUS_MAP.get(xid)!=null){
                      // 表示已经执行了Cancel或者Confirm实现业务悬挂
                      return ;
                  }
      
                  storageMapper.insert(storage);
      
                  // 下游服务抛出异常
                  // int a = 1 / 0;
              } catch (Exception e) {
                  throw new RuntimeException("扣减库存失败,可能是库存不足!", e);
              }
              log.info("库存信息记录成功");
          }
      
          /**
           * 提交
           * @param context
           * @return
           */
          @Override
          public  Boolean deductCommit(BusinessActionContext context){
              try {
                  String xid = context.getXid();
                  // 将状态修改为完成
                  log.info("记录库存信息提交处理,xid={}", xid);
                  // 幂等处理
                  if(STATUS_MAP.get(xid)!=null){
                      return true;
                  }
      
                  STATUS_MAP.put(xid,"Confirm");
      
                  Object obj = context.getActionContext("storage");
                  if (obj != null) {
                      Storage storage = JSON.parseObject(obj.toString(), Storage.class);
                      if (storage != null) {
                          storage.setStatus(2);
                          storageMapper.updateById(storage);
                      }
                  }
              }catch (Exception e){
                  log.error(e.getMessage());
              }
              return true;
          }
      
          /**
           * 回滚
           * @param context
           * @return
           */
          @Override
          public  Boolean deductRollBack(BusinessActionContext context){
              try {
                  String xid = context.getXid();
                  log.info("记录库存信息回滚处理,xid={}",xid );
      
                  // 幂等处理
                  if(STATUS_MAP.get(xid)!=null){
                      return true;
                  }
                  STATUS_MAP.put(xid,"Cancel");
      
                  // 将订单的状态修改为完成
                  Object obj = context.getActionContext("storage");
                  if(obj!=null) {
                      Storage storage = JSON.parseObject(obj.toString(), Storage.class);
                      if (storage != null) {
                          // 将记录进行删除,表示回滚
                          log.info("删除记录ID:"+storage.getId());
                          storageMapper.deleteById(storage.getId());
                      }
                  }
              }catch (Exception e){
                  log.error(e.getMessage());
              }
              return true;
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85
      • 86
      • 87
      • 88
      • 89
      • 90
      • 91
      • 92
      • 93
      • 94
      • 95
      • 96
      • 97
      • 98
      • 99
      • 100
      • 101
      • 102
      • 103

      2.3.4 Controller

      @RestController
      @RequestMapping("storage")
      public class StorageController {
      
          @Resource
          private StorageService storageService;
      
      
          /**
           * 扣减库存
           *
           * @param orderId 商品ID
           * @param count   要扣减的数量
           * @return
           */
          @PostMapping
          public ResponseEntity<Void> deduct(@RequestParam("orderId") Long orderId, @RequestParam("count") Integer count) {
              Storage storage  = new Storage();
              long id = new Random().nextInt(999999999);
              storage.setId(id);
              storage.setOrderId(orderId);
              storage.setCount(count);
              storageService.deductPrepare(storage);
              return ResponseEntity.status(HttpStatus.OK).body(null);
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26

      注意: TCC就是通过手动编写自定义代码,实现事务的回滚与提交,而全局事务的控制还是由seata完成

      3 测试

      测试时没有做截图进行演示,只说明了结果,可以运行代码设置异常进行验证

      3.1 下游服务异常

      order-service服务中正常,在storage-service服务的service中抛出异常,观察数据是否成功回滚;如果tb_ordertb_storage都不存在数据,则表示全局事务成功;

      3.2 上游服务异常

      order-service服务的TccHandler中在执行storageClient.deduct()方法后抛出异常,在storage-service服务中正常,观察数据是否成功回滚;如果tb_ordertb_storage都不存在数据,则表示全局事务成功;

      3.3 数据最终一致性验证

      我们可以在上游服务执行完storageClient.deduct()后马上进入断点,测试去观察数据库会发现tb_ordertb_storage中存在数据,再放行断点使程序执行异常,再次观察数据库会发现tb_ordertb_storage中的数据已经被删除了;

      4、源码地址

      Seata值AT模式代码实现:《seata-tcc-demo》

    • 相关阅读:
      【开源】基于JAVA的音乐偏好度推荐系统
      0906几个内核前缀
      94. 二叉树的中序遍历(递归+迭代)
      04【@RequestMapping注解详解】
      CSS之复合选择器与伪类选择器
      Arbitrum奥德赛第一周跨链桥任务教程
      推荐算法详解
      PyCharm搭建Scrapy环境
      Feign远程调用时的步骤
      MySQL 游标的详解
    • 原文地址:https://blog.csdn.net/zhuocailing3390/article/details/125351942