• 06 RocketMQ - 分布式事务 源码分析


    RocketMQ 分布式事务使用

    1. 定义监听器
      实现TransactionListener接口,重写 executeLocalTransaction(执行本地事务) 方法与 checkLocalTransaction(事务回查) 方法
    public class TransactionListenerImpl implements TransactionListener {
        //执行本地事务
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            // 本地事务业务...
    
            //1.本地事务执行成功,生成消息索引(对消费者可见)
            //return LocalTransactionState.COMMIT_MESSAGE;
            //2.本地事务执行失败,删除消息
            //return LocalTransactionState.ROLLBACK_MESSAGE;
            //3.本地事务状态未知,待消息回查确认
            return LocalTransactionState.UNKNOW;
        }
        //事务回查  默认是60s,一分钟检查一次
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            // 回查业务....
    
            //1.本地事务执行成功,生成消息索引(对消费者可见)
            //return LocalTransactionState.COMMIT_MESSAGE;
            //2.本地事务执行失败,删除消息
            //return LocalTransactionState.ROLLBACK_MESSAGE;
            //3.本地事务状态未知,待消息回查确认
            return LocalTransactionState.UNKNOW;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    1. Producer 端发送事务消息
    public class TransactionProducer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
            TransactionMQProducer producer = new TransactionMQProducer("TransactionProducer");
            producer.setNamesrvAddr("127.0.0.1:9876");
            //创建线程池
            ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
            });
            //设置生产者回查线程池
            producer.setExecutorService(executorService);
            //生产者设置监听器(执行本地事务和事务回查)
            producer.setTransactionListener(new TransactionListenerImpl());
            //启动消息生产者
            producer.start();
            //半事务的发送
            try {
                Message msg =
                    new Message("TransactionTopic", null, ("A向B系统转100块钱 ").getBytes(RemotingHelper.DEFAULT_CHARSET));
    
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
                System.out.println(sendResult.getSendStatus()+"-"+df.format(new Date()));//半事务消息是否成功
            } catch (MQClientException | UnsupportedEncodingException e) {
                //todo 回滚rollback
                e.printStackTrace();
            }
            producer.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    RocketMQ 分布式事务方案

    在这里插入图片描述

    RocketMQ 分布式事务源码

    源码入口:

    // 半事务消息发送
    SendResult sendResult = producer.sendMessageInTransaction(msg, null);
    
    • 1
    • 2

    DefaultMQProducerImpl::sendMessageInTransaction()

    1. 校验 transactionListener
    2. 去除延迟等级
    3. 校验消息
    4. 设置事务消息的标识
    5. 同步发送半事务消息
    6. 执行本地事务
    7. 根据本地事务状态执行 Commit 或者 Rollback

    补偿流程:

    • 对没有Commit/Rollback的事务消息(pending状态的消息),定时任务从服务端发起一次回查
    • Producer收到回查消息,检查回查消息对应的本地事务的状态
    • 根据本地事务状态,重新Commit或者Rollback
    public TransactionSendResult sendMessageInTransaction(final Message msg,
            final LocalTransactionExecuter localTransactionExecuter, final Object arg)
            throws MQClientException {
            // 1.得到 transactionListener
            TransactionListener transactionListener = getCheckListener();
            if (null == localTransactionExecuter && null == transactionListener) {
                throw new MQClientException("tranExecutor is null", null);
            }
    
            // 2.去除延迟等级
            if (msg.getDelayTimeLevel() != 0) {
                MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
            }
            // 3.校验消息
            Validators.checkMessage(msg, this.defaultMQProducer);
    
            SendResult sendResult = null;
            // 4.设置事务消息的标识
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
            // 5.同步发送半事务消息
            try {
                //send(sync)
                sendResult = this.send(msg);
            } catch (Exception e) {
                throw new MQClientException("send message Exception", e);
            }
    
            LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
            Throwable localException = null;
            switch (sendResult.getSendStatus()) {
                case SEND_OK: {
                    try {
                        if (sendResult.getTransactionId() != null) {
                            msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                        }
                        String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                        if (null != transactionId && !"".equals(transactionId)) {
                            msg.setTransactionId(transactionId);
                        }
                        // 6.执行本地事务
                        if (null != localTransactionExecuter) {
                            localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                        } else if (transactionListener != null) {
                            log.debug("Used new transaction API");
                            localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                        }
                        if (null == localTransactionState) {
                            localTransactionState = LocalTransactionState.UNKNOW;
                        }
    
                        if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                            log.info("executeLocalTransactionBranch return {}", localTransactionState);
                            log.info(msg.toString());
                        }
                    } catch (Throwable e) {
                        log.info("executeLocalTransactionBranch exception", e);
                        log.info(msg.toString());
                        localException = e;
                    }
                }
                break;
                case FLUSH_DISK_TIMEOUT:
                case FLUSH_SLAVE_TIMEOUT:
                case SLAVE_NOT_AVAILABLE:
                    localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                    break;
                default:
                    break;
            }
    
            try {
                // 7.根据本地事务执行情况发送提交\回滚\未知
                this.endTransaction(sendResult, localTransactionState, localException);
            } catch (Exception e) {
                log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
            }
    
            TransactionSendResult transactionSendResult = new TransactionSendResult();
            transactionSendResult.setSendStatus(sendResult.getSendStatus());
            transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
            transactionSendResult.setMsgId(sendResult.getMsgId());
            transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
            transactionSendResult.setTransactionId(sendResult.getTransactionId());
            transactionSendResult.setLocalTransactionState(localTransactionState);
            return transactionSendResult;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87

    半事务消息发送

    入口:

    //send(sync)
    sendResult = this.send(msg);
    
    • 1
    • 2

    Producer 消息发送流程
    04 RocketMQ - Producer 源码分析


    Broker 接受到消息后流程
    03 RocketMQ - Broker 源码分析


    broker收到消息写入的请求就会进入SendMessageProcessor::processRequest(),根据前面设置的消息事务标识,会进入到 TransactionalMessageServiceImpl::asyncPrepareMessage()中进行消息的存储
    在这里插入图片描述
    往下看,会看到在 TransactionalMessageBridge::parseHalfMessageInner() 中会对事务消息进行组装,包括给定固定的topic为 RMQ_SYS_TRANS_HALF_TOPIC,队列为 0,并保存实际的 topic 和 queueId
    在这里插入图片描述


    执行本地事务

    在发送完半事务消息后,会执行本地事务,也就是前面定义的 TransactionListenerImpl::executeLocalTransaction()
    在这里插入图片描述
    后面在根据本地事务的处理情况来对消息进行处理
    在这里插入图片描述
    跟踪源码会看到会发送 功能号为 END_TRANSACTION 的消息到 broker 端来处理半事务消息

    Broker 端在启动时会注册相关处理类
    在这里插入图片描述
    查看 EndTransactionProcessor::processRequest(),会看到前面本地事务回滚时会删除半事务消息(将消息移到 RMQ_SYS_TRANS_OP_HALF_TOPIC主题队列下),而本地事务提交时会将消息写入真实的topic队列(消息此时对消费者可见)
    在这里插入图片描述

    消息回查

    在执行本地事务返回 LocalTransactionState.UNKNOW 后,会通过消息回查机制来进行闭环,
    此时由 Broker 端定时发起回查,Producer 端进行业务回查

    Broker 启动时会启动此定时任务,默认60s回查一次
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    在 TransactionalMessageServiceImpl::check() 中,会遍历 RMQ_SYS_TRANS_HALF_TOPIC 主题下的消息,发送功能号为 CHECK_TRANSACTION_STATE 的消息进行消息回查
    在这里插入图片描述
    在客户端启动时,会注册 CHECK_TRANSACTION_STATE 功能号的处理类
    在这里插入图片描述
    ClientRemotingProcessor::checkTransactionState()中,进行消息回查
    在这里插入图片描述
    进行消息回查,发送功能号为END_TRANSACTION的消息到 broker 端,更改消息(和执行本地事务流程后半段类似)

    在这里插入图片描述

  • 相关阅读:
    使用 Vue 官方脚手架初始化 Vue3 项目
    DenseNet 和 FractalNet学习笔记
    XUST——Kcsoftware Part3 题目题解
    Cache与内存映射
    快速上手Linux核心命令(八):网络相关命令
    二分图及其多个扩展用法详解及其证明 + 模板题: 关押罪犯 棋盘覆盖
    MySQL集群高可用架构之MMM
    Gradle 自动化构建开源工具
    java计算机毕业设计旅游系统源码+系统+mysql数据库+lw文档
    ElasticSearch--配置--大全/详解
  • 原文地址:https://blog.csdn.net/qq_33512765/article/details/126851920