• 分布式事务解决方案Seata-Golang浅析


    前言

    Seata 是一款简单易用,高性能、开源的一站式分布式事务解决方案,由阿里开源。

    2020 年 4 月,刘晓敏开始基于 Seata 着手做多语言 golang 项目,经过一年时间的开发,seata-golang 发布了 1.0.0 版本。

    Seata架构

    在这里插入图片描述

    ● TC:Transaction coordinator,它是一个分布式事务协调器。
    ● TM:Transaction manager,它是一个事务管理器,负责全局事务的开启、提交和回滚。
    ● RM:Resource Manager,它是管理分支事务资源的,它加入全局事务组后,向 TC 报告分支事务的执行状态。
    ● XID:TM 开启全局事务时,会在 TC 创建一个 GlobalSession,GlobalSession 的全局唯一标识即为 XID。
    ● BranchID:RM 向 TC 注册分支事务后,在 TC 侧生成一个 BranchSession,BranchID 全局唯一标识这个 BranchSession。

    当 RM 向 TC 报告分支执行失败时,TC 会标记这个 BranchSession 的状态为失败,然后 TM 发起回滚时,TC 根据 XID 找到所有成功执行的事务分支,通知他们进行回滚。

    下面具体是在下单购买业务上的一个完整流程
    在这里插入图片描述

    用户向聚合层发起下单请求

    1. 聚合层TM向TC请求开始一个全局事务,TC创建GlobalSession,代表一个全局事务的会话
    2. TC返回给聚合层TM一个Xid,用于标示这个全局事务,之后聚合层TM将获取的Xid分别给到订单服务和库存服务
    3. 订单服务向TC注册事务分支,TC会创建一个BranchSession,并添加到全局事务下
    4. TC返回给订单服务一个BranchId,用于唯一表示这个分支事务
    5. 库存服务向TC注册事务分支,TC会创建一个BranchSession,并添加到全局事务下
    6. TC返回给库存服务一个BranchId,用于唯一表示这个分支事务
    7. 库存服务如果执行本地逻辑失败,会向TC报告,发起BranchReportRequest;并会返回err给TM,
    8. TM感知到库存服务本地分支事务执行失败后,TM会向TC发起全局回滚,即GlobalRollbackRequest
    9. TC认为全局事务失败,通知各个服务的RM,让RM分别在本地的事务分支发起回滚

    把上述流程中涉及到的seata方法抽象出来:

    这里可以先忽略SAGA模式,seata-golang中没有去实现
    在这里插入图片描述

    Seata Golang 支持的模式

    Seata-Golang支持AT和TCC两种模式。

    概括地说:

    • AT依赖于本地数据库事务特性,包括回滚、提交等

    • TCC模式依赖于用户自定义实现对事务的回滚和提交,抽象出三个方法:Try、Confirm、Cancel,这也是为什么叫TCC

    AT模式

    AT模式的回滚依赖对执行操作的补偿,seata-golang基于TIDB的Parser来辨别用户做了什么操作,构造成Undo Log 之后针对这些操作进行回滚补偿。

    TCC模式

    依赖开发者实现seata的TCC接口,即自定义Try、Confirm、Cancel的方法

    对开发者对约束较多

    Seata源码浅析

    1.TC 全局事务协调器

    type TransactionCoordinator struct {
    	sync.Mutex
    	maxCommitRetryTimeout            int64
    	maxRollbackRetryTimeout          int64
    	rollbackDeadSeconds              int64
    	rollbackRetryTimeoutUnlockEnable bool
    
    	asyncCommittingRetryPeriod time.Duration
    	committingRetryPeriod      time.Duration
    	rollingBackRetryPeriod     time.Duration
    	timeoutRetryPeriod         time.Duration
    
    	streamMessageTimeout time.Duration
    
    	holder             holder.SessionHolderInterface
    	resourceDataLocker lock.LockManagerInterface
    	locker             GlobalSessionLocker
    
    	idGenerator        *atomic.Uint64
    	futures            *sync.Map
    	activeApplications *sync.Map
    	callBackMessages   *sync.Map
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    Begin()方法开启全局事务

    func (tc *TransactionCoordinator) Begin(ctx context.Context, request *apis.GlobalBeginRequest) (*apis.GlobalBeginResponse, error) {
    	// 生成XID
    	transactionID := uuid.NextID()
    	xid := common.GenerateXID(request.Addressing, transactionID)
    	// 全局事务
    	gt := model.GlobalTransaction{
    	    // 全局会话
    		GlobalSession: &apis.GlobalSession{
    			Addressing:      request.Addressing,
    			XID:             xid,
    			TransactionID:   transactionID,
    			TransactionName: request.TransactionName,
    			Timeout:         request.Timeout,
    		},
    	}
    	// 开启全局事务,实际上是修改了全局事务的状态
    	gt.Begin()
    	// 把全局事务加到holder中
    	err := tc.holder.AddGlobalSession(gt.GlobalSession)
    	if err != nil {
    		return &apis.GlobalBeginResponse{
    			ResultCode:    apis.ResultCodeFailed,
    			ExceptionCode: apis.BeginFailed,
    			Message:       err.Error(),
    		}, nil
    	}
    	// 启动协程,接收全局事务事件
    	runtime.GoWithRecover(func() {
    		evt := event.NewGlobalTransactionEvent(gt.TransactionID, event.RoleTC, gt.TransactionName, gt.BeginTime, 0, gt.Status)
    		event.EventBus.GlobalTransactionEventChannel <- evt
    	}, nil)
    
    	log.Infof("successfully begin global transaction xid = {}", gt.XID)
    	return &apis.GlobalBeginResponse{
    		ResultCode: apis.ResultCodeSuccess,
    		XID:        xid,
    	}, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    TC 全局事务协调器有三大概念

    1.1 LockManager

    AT模式下,将分支事务上的数据锁到TC上,保证事务数据的隔离
    在这里插入图片描述

    1.2 SessionManager

    管理BranchSession,记录事务执行状态、持久化事务数据

    目前有两种实现方式:

    • FileBasedSessionManager:事务数据保存到内存,并持久化到本地文件
    • DBBasedSessionManager:事务数据持久化到数据库中,采用这种方式,TC可做到高可用

    后续可能会考虑ETCD或者对TC做Raft,是后续社区会考虑到议题。

    1.3 Transaction Manager

    即TM,事务管理器,下面会介绍

    2.TM 事务管理器

    根据前面的描述,TM负责全局的一些操作,包括请求TC开启全局事务、全局提交、全局回滚等操作

    type TransactionManagerInterface interface {
    	// GlobalStatus_Begin a new global transaction.
    	Begin(ctx context.Context, name string, timeout int32) (string, error)
    
    	// Global commit.
    	Commit(ctx context.Context, xid string) (apis.GlobalSession_GlobalStatus, error)
    
    	// Global rollback.
    	Rollback(ctx context.Context, xid string) (apis.GlobalSession_GlobalStatus, error)
    
    	// Get current status of the give transaction.
    	GetStatus(ctx context.Context, xid string) (apis.GlobalSession_GlobalStatus, error)
    
    	// Global report.
    	GlobalReport(ctx context.Context, xid string, globalStatus apis.GlobalSession_GlobalStatus) (apis.GlobalSession_GlobalStatus, error)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    值得注意的是,TransactionManagerInterface中的方法,其实是通知TC去做对应操作

    以Rollback方法为例,具体实现就是请求TC进行全局事务的回滚,TC会通知各个RM进行本地分支事务的回滚。

    func (manager *TransactionManager) Rollback(ctx context.Context, xid string) (apis.GlobalSession_GlobalStatus, error) {
        // RPC请求TC,让其进行全局事务回滚
    	request := &apis.GlobalRollbackRequest{XID: xid}
    	resp, err := manager.rpcClient.Rollback(ctx, request)
    	if err != nil {
    		return 0, err
    	}
    	if resp.ResultCode == apis.ResultCodeSuccess {
    		return resp.GlobalStatus, nil
    	}
    	return 0, &exception.TransactionException{
    		Code:    resp.GetExceptionCode(),
    		Message: resp.GetMessage(),
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    3.RM 资源管理器

    RM主要负责本地分支事务的管理

    type ResourceManagerInterface interface {
    	BranchCommit(ctx context.Context, request *apis.BranchCommitRequest) (*apis.BranchCommitResponse, error)
    
    	BranchRollback(ctx context.Context, request *apis.BranchRollbackRequest) (*apis.BranchRollbackResponse, error)
    
    	// RegisterResource Register a Resource to be managed by Resource Manager.
    	RegisterResource(resource model.Resource)
    
    	// UnregisterResource Unregister a Resource from the Resource Manager.
    	UnregisterResource(resource model.Resource)
    
    	// GetBranchType ...
    	GetBranchType() apis.BranchSession_BranchType
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    除此之外,RM还负责在全局事务启动的时候,向TC注册事务分支,所以seata-golang抽取出来三个通知TC的接口

    type ResourceManagerOutbound interface {
    	// BranchRegister 向TC注册事务分支
    	BranchRegister(ctx context.Context, xid string, resourceID string, branchType apis.BranchSession_BranchType,
    		applicationData []byte, lockKeys string) (int64, error)
    
    	// BranchReport 报告分支事务的执行状态
    	BranchReport(ctx context.Context, xid string, branchID int64, branchType apis.BranchSession_BranchType,
    		status apis.BranchSession_BranchStatus, applicationData []byte) error
    
    	// LockQuery lock resource by lockKeys.
    	LockQuery(ctx context.Context, xid string, resourceID string, branchType apis.BranchSession_BranchType, lockKeys string) (bool, error)
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
  • 相关阅读:
    多模态大模型时代下的文档图像智能分析与处理
    基于51单片机的两路数字电压表Protues仿真设计
    Spring Boot 技术架构图(InsCode AI 创作助手辅助)
    以客户为中心,CRM系统可以给企业带来哪些价值?值得一看
    几十万换来的DDoS攻击防护经验(转载)
    研发效能|DevOps 是运维还是开发?
    Linux安装 spark 教程详解
    DNA脱氧核糖核酸修饰金属铂纳米颗粒PtNPS-DNA|科研试剂
    PDF有限制密码,不能复制怎么办?
    docker的基础命令&docker镜像和docker容器操作
  • 原文地址:https://blog.csdn.net/weixin_43889841/article/details/125441843