• 【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么【云原生】


    一、前言

    至此,seata系列的内容包括:

    1. can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决
    2. Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
    3. Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
    4. 【微服务 31】超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
    5. 【微服务 32】Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
    6. 【微服务33】分布式事务Seata源码解析一:在IDEA中启动Seata Server

    本文着重聊一聊seata-server启动时都做了什么?

    PS:前文中搭建的Seata案例,seata的版本为1.3.0,而本文开始的源码分析将基于当前(2022年8月)最新的版本1.5.2进行源码解析。

    二、Seata Server启动

    Seata Server包含几个主要模块:Config(配置TC)、Store(TC运行时全局事务以及分支事务的相关信息通过Store持久化)、Coordinator(TC实现事务协调的核心)、Netty-RPC(负责TC与TM/RM交互)、Lock(资源全局锁的实现);

    1、找入口

    当要启动一个seata-server时,只需要执行压缩包中bin/目录下的seata-server.sh,在这个脚本中会运行seata-server.jar

    在这里插入图片描述

    即对应于源码工程中的server目录 / seata-server 模块,由于seata-server是一个SpringBoot项目,找到其启动类ServerApplication,里面仅仅指定了一个包扫描路径为io.seata,并无其余特殊配置;
    在这里插入图片描述

    在启动类的同级目录下,有一个ServerRunner类;
    在这里插入图片描述

    ServerRunner类实现了CommandLineRunner接口:
    在这里插入图片描述

    CommandLineRunner接口主要用于实现在Spring容器初始化后执行,并且在整个应用生命周期内只会执行一次;也就是说在Spring容器初始化后会执行ServerRunner#run()方法;
    在这里插入图片描述
    在这里插入图片描述

    ServerRunner#run()方法中仅仅调用了Server#start()方法;因此可以确定入口为io.seata.server.Server类的start()方法;

    2、整体执行流程

    Server#start()方法:

    public class Server {
        /**
         * The entry point of application.
         *
         * @param args the input arguments
         */
        public static void start(String[] args) {
            // create logger
            final Logger logger = LoggerFactory.getLogger(Server.class);
    
            //initialize the parameter parser
            //Note that the parameter parser should always be the first line to execute.
            //Because, here we need to parse the parameters needed for startup.
            // 1. 对配置文件做参数解析:包括registry.conf、file.conf的解析
            ParameterParser parameterParser = new ParameterParser(args);
    
            // 2、初始化监控,做metric指标采集
            MetricsManager.get().init();
    
            // 将Store资源持久化方式放到系统的环境变量store.mode中
            System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());
    
            // seata server里netty server 的io线程池(核心线程数50,最大线程数100)
            ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
                    NettyServerConfig.getMaxServerPoolSize(),
                    NettyServerConfig.getKeepAliveTime(),
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
                    new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());
    
            // 3、创建TC与RM/TM通信的RPC服务器--netty
            NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
    
            // 4、初始化UUID生成器(雪花算法)
            UUIDGenerator.init(parameterParser.getServerNode());
    
            //log store mode : file, db, redis
            // 5、设置事务会话的持久化方式,有三种类型可选:file/db/redis
            SessionHolder.init(parameterParser.getSessionStoreMode());
            LockerManagerFactory.init(parameterParser.getLockStoreMode());
    
            // 6、创建并初始化事务协调器,创建时后台会启动一堆线程
            DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);
            coordinator.init();
    
            // 将DefaultCoordinator作为Netty Server的transactionMessageHandler;
            // 用于做AT、TCC、SAGA等不同事务类型的逻辑处理
            nettyRemotingServer.setHandler(coordinator);
    
            // let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028
            // 7、注册ServerRunner销毁(Spring容器销毁)的回调钩子函数
            ServerRunner.addDisposable(coordinator);
    
            //127.0.0.1 and 0.0.0.0 are not valid here.
            if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
                XID.setIpAddress(parameterParser.getHost());
            } else {
                String preferredNetworks = ConfigurationFactory.getInstance().getConfig(REGISTRY_PREFERED_NETWORKS);
                if (StringUtils.isNotBlank(preferredNetworks)) {
                    XID.setIpAddress(NetUtil.getLocalIp(preferredNetworks.split(REGEX_SPLIT_CHAR)));
                } else {
                    XID.setIpAddress(NetUtil.getLocalIp());
                }
            }
            // 8、启动netty Server,用于接收TM/RM的请求
            nettyRemotingServer.init();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68

    Server端的启动流程大致做了八件事:

    1. 对配置文件(包括registry.conf、file.conf)做参数解析;
    2. 初始化监控,做metric指标采集;
    3. 创建TC与RM/TM通信的RPC服务器(NettyRemotingServer)–netty;
    4. 初始化UUID生成器(雪花算法),用于生成全局事务id和分支事务id;
    5. 设置事务会话(SessionHolder)、全局锁(LockManager)的持久化方式并初始化,有三种类型可选:file/db/redis;
    6. 创建并初始化事务协调器(DefaultCoordinator),后台启动一堆线程做定时任务,并将DefaultCoordinator绑定到RPC服务器上做为transactionMessageHandler
    7. 注册ServerRunner销毁(Spring容器销毁)的回调钩子函数DefaultCoordinator;
    8. 启动netty Server,用于接收TM/RM的请求;

    1)对配置文件做参数解析

    具体代码执行流程如下:

    在这里插入图片描述

    ParameterParser的init()方法中:

    1. 首先从启动命令(运行时参数)中解析;
    2. 接着判断server端是否在容器中启动,是则从容器环境中获取seata环境、host、port、serverNode、storeMode存储模式等信息;
    3. 如果storeMode不存在,则从配置中心/文件中获取配置。
    // 解析运行期参数,默认什么里面什么都没有
    private void getCommandParameters(String[] args) {
        JCommander jCommander = JCommander.newBuilder().addObject(this).build();
        jCommander.parse(args);
        if (help) {
            jCommander.setProgramName(PROGRAM_NAME);
            jCommander.usage();
            System.exit(0);
        }
    }
    
    // server端在容器中启动,则从容器环境中读取环境、host、port、server节点以及StoreMode存储模式
    private void getEnvParameters() {
        // 设置seata的环境
        if (StringUtils.isBlank(seataEnv)) {
            seataEnv = ContainerHelper.getEnv();
        }
        // 设置Host
        if (StringUtils.isBlank(host)) {
            host = ContainerHelper.getHost();
        }
        // 设置端口号
        if (port == 0) {
            port = ContainerHelper.getPort();
        }
        if (serverNode == null) {
            serverNode = ContainerHelper.getServerNode();
        }
        if (StringUtils.isBlank(storeMode)) {
            storeMode = ContainerHelper.getStoreMode();
        }
        if (StringUtils.isBlank(sessionStoreMode)) {
            sessionStoreMode = ContainerHelper.getSessionStoreMode();
        }
        if (StringUtils.isBlank(lockStoreMode)) {
            lockStoreMode = ContainerHelper.getLockStoreMode();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    2)初始化监控

    在这里插入图片描述

    默认不开启,此处不做过多介绍

    3)创建TC与RM/TM通信的RPC服务器

    在这里插入图片描述

    单纯的new一个NettyRemotingServer,也没啥可说的;

    4)初始化UUID生成器

    UUID底层采用雪花算法,其用于生成全局事务id和分支事务id;

    代码执行流程如下:

    在这里插入图片描述

    UUIDGenerator会委托IdWorker来生成雪花id,生成的雪花Id由0、10位的workerId、41位的时间戳、12位的sequence序列号组成。

    IdWorker

    IdWorker中有8个重要的成员变量/常量:

    /**
     * Start time cut (2020-05-03)
     */
    private final long twepoch = 1588435200000L;
    
    /**
     * The number of bits occupied by workerId
     */
    private final int workerIdBits = 10;
    
    /**
     * The number of bits occupied by timestamp
     */
    private final int timestampBits = 41;
    
    /**
     * The number of bits occupied by sequence
     */
    private final int sequenceBits = 12;
    
    /**
     * Maximum supported machine id, the result is 1023
     */
    private final int maxWorkerId = ~(-1 << workerIdBits);
    
    /**
     * business meaning: machine ID (0 ~ 1023)
     * actual layout in memory:
     * highest 1 bit: 0
     * middle 10 bit: workerId
     * lowest 53 bit: all 0
     */
    private long workerId;
    
    /**
     * 又是一个雪花算法(64位,8字节)
     * timestamp and sequence mix in one Long
     * highest 11 bit: not used
     * middle  41 bit: timestamp
     * lowest  12 bit: sequence
     */
    private AtomicLong timestampAndSequence;
    
    /**
     * 从一个long数组类型中抽取出一个时间戳伴随序列号,偏向一个辅助性质
     * mask that help to extract timestamp and sequence from a long
     */
    private final long timestampAndSequenceMask = ~(-1L << (timestampBits + sequenceBits));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    变量/常量解释:

    1. 常量twepoch表示我们的时间戳时间从2020-05-03开始计算,即当前时间的时间戳需要减去twepoch的值1588435200000L
    2. 常量workerIdBits表示机器号workerId占10位;
    3. 常量timestampBits表示时间戳timestamp占41位;
    4. 常量sequenceBits表示序列化占12位;
    5. 常量maxWorkerId表示机器号的最大值为1023;
    6. long类型的变量workerId本身也是一个雪花算法,只是从开头往后数,第2位开始,一共10位用来表示workerId,其余位全是0;
    7. AtomicLong类型的变量timestampAndSequence,其本身也是一个雪花算法,头11位不使用,中间41位表示timestamp,最后12位表示sequence;
    8. long类型的常量timestampAndSequenceMask,用于从一个完整的雪花ID(long类型)中摘出timestamp 和 sequence

    IdWorker构造器中会分别初始化TimestampAndSequence、WorkerId。

    1> initTimestampAndSequence()

    initTimestampAndSequence()方法负责初始化timestampsequence

    private void initTimestampAndSequence() {
        // 拿到当前时间戳 - (2020-05-03 时间戳)的数值,即当前时间相对2020-05-03的时间戳
        long timestamp = getNewestTimestamp();
        // 把时间戳左移12位,后12位流程sequence使用
        long timestampWithSequence = timestamp << sequenceBits;
        // 把混合sequence(默认为0)的时间戳赋值给timestampAndSequence
        this.timestampAndSequence = new AtomicLong(timestampWithSequence);
    }
    
    // 获取当前时间戳
    private long getNewestTimestamp() {
        //当前时间的时间戳减去2020-05-03的时间戳
        return System.currentTimeMillis() - twepoch;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    2> initWorkerId(Long)

    initWorkerId(Long workerId)方法负责初始化workId,默认不会传过来workerId,如果传过来则使用传过来的workerId,并校验其不能大于1023,然后将其左移53位;

    private void initWorkerId(Long workerId) {
       if (workerId == null) {
           // workid为null时,自动生成一个workerId
           workerId = generateWorkerId();
       }
       // workerId最大只能是1023,因为其只占10bit
       if (workerId > maxWorkerId || workerId < 0) {
           String message = String.format("worker Id can't be greater than %d or less than 0", maxWorkerId);
           throw new IllegalArgumentException(message);
       }
       this.workerId = workerId << (timestampBits + sequenceBits);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    如果没传则基于MAC地址生成;
    在这里插入图片描述

    如果基于MAC地址生成workerId出现异常,则以1023为基数生成一个随机的workerId;
    在这里插入图片描述

    最后同样,校验workerId不能大于1023,然后将其左移53位;

    5)设置事务会话(SessionHolder)、全局锁(LockManager)的持久化方式并初始化

    在这里插入图片描述

    1> SessionHolder

    SessionHolder负责事务会话Session的持久化,一个session对应一个事务,事务又分为全局事务和分支事务;

    SessionHolder支持db,file和redis的持久化方式,其中redis和db支持集群模式,项目上推荐使用redis或db模式;

    SessionHolder有五个重要的属性,如下:

    // 用于管理所有的Setssion,以及Session的创建、更新、删除等
    private static SessionManager ROOT_SESSION_MANAGER;
    // 用于管理所有的异步commit的Session,包括创建、更新以及删除
    private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
    // 用于管理所有的重试commit的Session,包括创建、更新以及删除
    private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
    // 用于管理所有的重试rollback的Session,包括创建、更新以及删除
    private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;
    // 用于管理分布式锁
    private static DistributedLocker DISTRIBUTED_LOCKER;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    这五个属性在SessionHolder#init()方法中初始化,init()方法源码如下:

    public static void init(String mode) {
        if (StringUtils.isBlank(mode)) {
            mode = CONFIG.getConfig(ConfigurationKeys.STORE_SESSION_MODE,
                    CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE));
        }
        StoreMode storeMode = StoreMode.get(mode);
        // 根据storeMode采用SPI机制初始化SessionManager
        // db模式
        if (StoreMode.DB.equals(storeMode)) {
            ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
            ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
                new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME});
            RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
                new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME});
            RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
                new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
    
            DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.DB.getName());
        } else if (StoreMode.FILE.equals(storeMode)) {
            // 文件模式
            String sessionStorePath = CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR,
                    DEFAULT_SESSION_STORE_FILE_DIR);
            if (StringUtils.isBlank(sessionStorePath)) {
                throw new StoreException("the {store.file.dir} is empty.");
            }
            ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
                new Object[]{ROOT_SESSION_MANAGER_NAME, sessionStorePath});
            ASYNC_COMMITTING_SESSION_MANAGER = ROOT_SESSION_MANAGER;
            RETRY_COMMITTING_SESSION_MANAGER = ROOT_SESSION_MANAGER;
            RETRY_ROLLBACKING_SESSION_MANAGER = ROOT_SESSION_MANAGER;
    
            DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.FILE.getName());
        } else if (StoreMode.REDIS.equals(storeMode)) {
            // redis模式
            ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.REDIS.getName());
            ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
                StoreMode.REDIS.getName(), new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME});
            RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
                StoreMode.REDIS.getName(), new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME});
            RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
                StoreMode.REDIS.getName(), new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
    
            DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.REDIS.getName());
        } else {
            // unknown store
            throw new IllegalArgumentException("unknown store mode:" + mode);
        }
        // 根据storeMode重新加载
        reload(storeMode);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    init()方法中根据storeMode采用SPI机制初始化SessionManager,SessionManager有三个实现类:
    在这里插入图片描述

    2> LockerManager

    在这里插入图片描述

    SessionHolder一样,LockManagerFactory#init()方法同样根据storeMode采用SPI机制初始化LockManager,LockManager有三个实现类:

    在这里插入图片描述

    6)创建并初始化事务协调器(DefaultCoordinator

    DefaultCoordinator是事务协调的核心,比如:开启、提交、回滚全局事务,注册、提交、回滚分支事务都是通过DefaultCoordinator进行协调处理的。

    在这里插入图片描述
    (1)先来看DefaultCoordinator的创建;
    在这里插入图片描述

    使用Double Check Lock(DCL-双重检查锁)机制获取到单例的DefaultCoordinator;如果DefaultCoordinator为实例化过,则new一个:
    在这里插入图片描述

    DefaultCoordinator的类构造器中,首先绑定远程通信的Server的具体实现到内部成员中,然后实例化一个DefaultCore,DefaultCore是AT、TCC、XA、Saga四种分布式事务模式的具体实现类;
    在这里插入图片描述

    DefaultCore的类构造器中首先通过SPI机制加载出所有的AbstractCore的子类,一共有四个:ATCore、TccCore、SagaCore、XACore;然后将AbstractCore子类可以处理的事务模式作为Key、AbstractCore子类作为Value存储到一个缓存Map(Map coreMap)中;

    private static Map<BranchType, AbstractCore> coreMap = new ConcurrentHashMap<>();
    
    • 1

    后续通过BranchType(分支类型)就可以从coreMap中获取到相应事务模式的具体AbstractCore实现类。

    (2)初始化DefaultCoordinator;

    在这里插入图片描述

    所谓的初始化,其实就是后台启动一堆线程做定时任务;去定时处理重试回滚、重试提交、异步提交、超时的检测,以及定时清理undo_log。

    除定时清理undo_log外,其余定时任务的处理逻辑基本都是:

    1. 首先获取所有可回滚的全局事务会话Session,如果可回滚的分支事务为空,则直接返回;
    2. 否者,遍历所有的可回滚Session;为了防止重复回滚,如果session的状态是正在回滚中并且session不是死亡的,则直接返回;
    3. 如果Session重试回滚超时,从缓存中删除已经超时的回滚Session;
    4. 发布session回滚完成事件给到Metric,对回滚中的Session添加Session生命周期的监听;
    5. 使用DefaultCoordinator组合的DefaultCore执行全局回滚。

    以处理重试回滚的方法handleRetryRollbacking()为例:

    protected void handleRetryRollbacking() {
        SessionCondition sessionCondition = new SessionCondition(rollbackingStatuses);
        sessionCondition.setLazyLoadBranch(true);
        // 获取所有的可回滚的全局事务session
        Collection<GlobalSession> rollbackingSessions =
            SessionHolder.getRetryRollbackingSessionManager().findGlobalSessions(sessionCondition);
        // 如果可回滚的分支事务为空,则直接返回
        if (CollectionUtils.isEmpty(rollbackingSessions)) {
            return;
        }
        long now = System.currentTimeMillis();
        // 遍历所有的可回滚Session,
        SessionHelper.forEach(rollbackingSessions, rollbackingSession -> {
            try {
                // prevent repeated rollback
                // 防止重复回滚:如果session的状态是正在回滚中并且session不是死亡的,则直接返回。
                if (rollbackingSession.getStatus().equals(GlobalStatus.Rollbacking)
                    && !rollbackingSession.isDeadSession()) {
                    // The function of this 'return' is 'continue'.
                    return;
                }
                // 判断回滚是否重试超时
                if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT.toMillis(), rollbackingSession.getBeginTime())) {
                    if (ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE) {
                        rollbackingSession.clean();
                    }
                    // Prevent thread safety issues
                    // 删除已经超时的回滚Session
                    SessionHolder.getRetryRollbackingSessionManager().removeGlobalSession(rollbackingSession);
                    LOGGER.error("Global transaction rollback retry timeout and has removed [{}]", rollbackingSession.getXid());
    
                    SessionHelper.endRollbackFailed(rollbackingSession, true);
    
                    // rollback retry timeout event
                    // 发布session回滚完成事件给到Metric
                    MetricsPublisher.postSessionDoneEvent(rollbackingSession, GlobalStatus.RollbackRetryTimeout, true, false);
    
                    //The function of this 'return' is 'continue'.
                    return;
                }
                // 对回滚中的Session添加Session生命周期的监听
                rollbackingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
                // 使用DefaultCoordinator组合的DefaultCore执行全局回滚
                core.doGlobalRollback(rollbackingSession, true);
            } catch (TransactionException ex) {
                LOGGER.info("Failed to retry rollbacking [{}] {} {}", rollbackingSession.getXid(), ex.getCode(), ex.getMessage());
            }
        });
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    7)注册ServerRunner销毁(Spring容器销毁)的回调钩子函数DefaultCoordinator

    8)启动NettyServer(NettyRemotingServer)

    启动NettyRemotingServer时会做两件事:注册消息处理器、初始化并启动NettyServerBootstrap
    在这里插入图片描述

    1> 首先注册消息处理器

    消息处理器是用来处理消息的,其根据消息的不同类型选择不同的消息处理器来处理消息(属于典型的策略模式);

    在这里插入图片描述

    每个消息类型和对应的处理器关系如下:

    所谓的注册消息处理器本质上就是将处理器RemotingProcessor和处理消息的线程池ExecutorService包装成一个Pair,然后将Pair作为Value,messageType作为key放入一个Map(processorTable)中;
    在这里插入图片描述

    /**
     * This container holds all processors.
     * processor type {@link MessageType}
     */
    protected final HashMap<Integer/*MessageType*/, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    2> 初始化NettyRemotingServer

    在初始化NettyRemotingServer之前会通过AtomicBoolean类型的原子变量initialized + CAS操作确保仅会有一个线程进行NettyRemotingServer的初始化;

    在这里插入图片描述

    再看NettyRemotingServer的类继承图:
    在这里插入图片描述

    CAS成功后进入到NettyRemotingServer的父类AbstractNettyRemotingServer#init()方法;

    在这里插入图片描述

    方法中:

    (1)首先调用父类AbstractNettyRemoting的init()方法:
    在这里插入图片描述
    启动一个延时3s,每3s执行一次的定时任务,做请求超时检查;

    (2)紧接着启动ServerBootstrap(就正常的nettyServer启动):

    在这里插入图片描述
    NettyRemotingServer在启动的过程中设置了4个ChannelHandler:

    1. IdleStateHandler:处理心跳
    2. ProtocolV1Decoder:消息解码器
    3. ProtocolV1Encoder:消息编码器
    4. AbstractNettyRemotingServer.ServerHandler:处理各种消息
    AbstractNettyRemotingServer.ServerHandler类

    在这里插入图片描述
    ServerHandler类上有个@ChannelHandler.Sharable注解,其表示所有的连接都会共用这一个ChannelHandler;所以当消息处理很慢时,会降低并发。

    processMessage(ctx, (RpcMessage) msg)方法中会根据消息类型获取到 请求处理组件(消息的处理过程是典型的策略模式),如果消息对应的处理器设置了线程池,则放到线程池中执行;如果对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;所以在seata-server中大部分处理器都有对应的线程池。

    /**
     * Rpc message processing.
     *
     * @param ctx        Channel handler context.
     * @param rpcMessage rpc message.
     * @throws Exception throws exception process message error.
     * @since 1.3.0
     */
    protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
        }
        Object body = rpcMessage.getBody();
        if (body instanceof MessageTypeAware) {
            MessageTypeAware messageTypeAware = (MessageTypeAware) body;
            // 根据消息的类型获取到请求处理组件和请求处理线程池组成的Pair
            final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
            if (pair != null) {
                // 如果消息对应的处理器设置了线程池,则放到线程池中执行
                if (pair.getSecond() != null) {
                    try {
                        pair.getSecond().execute(() -> {
                            try {
                                pair.getFirst().process(ctx, rpcMessage);
                            } catch (Throwable th) {
                                LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                            } finally {
                                MDC.clear();
                            }
                        });
                    } catch (RejectedExecutionException e) {
                        // 线程池拒绝策略之一,抛出异常:RejectedExecutionException
                        LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
                            "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                        if (allowDumpStack) {
                            String name = ManagementFactory.getRuntimeMXBean().getName();
                            String pid = name.split("@")[0];
                            long idx = System.currentTimeMillis();
                            try {
                                String jstackFile = idx + ".log";
                                LOGGER.info("jstack command will dump to " + jstackFile);
                                Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));
                            } catch (IOException exx) {
                                LOGGER.error(exx.getMessage());
                            }
                            allowDumpStack = false;
                        }
                    }
                } else {
                    // 对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;
                    try {
                        pair.getFirst().process(ctx, rpcMessage);
                    } catch (Throwable th) {
                        LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                    }
                }
            } else {
                LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
            }
        } else {
            LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    三、总结和后续

    本文我们聊了Seata Server启动时都做了哪些事?博主总结一共八件事:

    1. 对配置文件(包括registry.conf、file.conf)做参数解析;
    2. 初始化监控,做metric指标采集;
    3. 创建TC与RM/TM通信的RPC服务器(NettyRemotingServer)–netty;
    4. 初始化UUID生成器(雪花算法),用于生成全局事务id和分支事务id;
    5. 设置事务会话(SessionHolder)、全局锁(LockManager)的持久化方式并初始化,有三种类型可选:file/db/redis;
    6. 创建并初始化事务协调器(DefaultCoordinator),后台启动一堆线程做定时任务,并将DefaultCoordinator绑定到RPC服务器上做为transactionMessageHandler
    7. 注册ServerRunner销毁(Spring容器销毁)的回调钩子函数DefaultCoordinator;
    8. 启动netty Server,用于接收TM/RM的请求;

    下一篇文章我们聊一下Seata Client(AT模式下仅作为RM时)启动时都做了什么?

  • 相关阅读:
    cesium 鹰眼图1
    【IoT】如何快速了解一个行业?如何做市场洞察?
    系统韧性研究(5)| 常用的系统韧性技术
    关于小程序内嵌H5页面交互的问题?
    id 和 class的区别,使用避坑!
    IDEA集成Git环境
    缓存中的这7个坑,我把坑惨了!!!
    win10搭建ESP8266_RTOS_SDK编译环境
    数据治理-分类法
    【电子元件】常用电子元器件的识别之电阻器
  • 原文地址:https://blog.csdn.net/Saintmm/article/details/126457129