• RocketMQ(3)之事务消息


    一、发送事务消息案例

            事务消息共有三种状态,提交状态、回滚状态、中间状态: 

    • TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
    • TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
    • TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。

            1.1创建事务性生产者

            使用 TransactionMQProducer类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态在上面说的。

    1. /**
    2. * 发送事务消息
    3. * @throws Exception
    4. */
    5. @Test
    6. public void testTransactionProduce() throws Exception {
    7. TransactionListener transactionListener = new TransactionListenerImpl();
    8. TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
    9. ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue(2000), new ThreadFactory() {
    10. @Override
    11. public Thread newThread(Runnable r) {
    12. Thread thread = new Thread(r);
    13. thread.setName("client-transaction-msg-check-thread");
    14. return thread;
    15. }
    16. });
    17. producer.setExecutorService(executorService);
    18. producer.setTransactionListener(transactionListener);
    19. producer.start();
    20. String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
    21. for (int i = 0; i < 10; i++) {
    22. try {
    23. Message msg =
    24. new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
    25. ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    26. SendResult sendResult = producer.sendMessageInTransaction(msg, null);
    27. System.out.printf("%s%n", sendResult);
    28. Thread.sleep(10);
    29. } catch (MQClientException | UnsupportedEncodingException e) {
    30. e.printStackTrace();
    31. }
    32. }
    33. for (int i = 0; i < 100000; i++) {
    34. Thread.sleep(1000);
    35. }
    36. producer.shutdown();
    37. }

    2.事务监听接口

            当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务。它返回前一节中提到的三个事务状态之一。checkLocalTransaction 方法用于检查本地事务状态,并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。

    1. class TransactionListenerImpl implements TransactionListener {
    2. private AtomicInteger transactionIndex = new AtomicInteger(0);
    3. private ConcurrentHashMap localTrans = new ConcurrentHashMap<>();
    4. /**
    5. * 本地事务
    6. */
    7. @Override
    8. public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    9. int value = transactionIndex.getAndIncrement();
    10. int status = value % 3;
    11. localTrans.put(msg.getTransactionId(), status);
    12. return LocalTransactionState.UNKNOW;
    13. }
    14. /**
    15. * 状态回查
    16. */
    17. @Override
    18. public LocalTransactionState checkLocalTransaction(MessageExt msg) {
    19. Integer status = localTrans.get(msg.getTransactionId());
    20. if (null != status) {
    21. switch (status) {
    22. case 0:
    23. return LocalTransactionState.UNKNOW;
    24. case 1:
    25. return LocalTransactionState.COMMIT_MESSAGE;
    26. case 2:
    27. return LocalTransactionState.ROLLBACK_MESSAGE;
    28. }
    29. }
    30. return LocalTransactionState.COMMIT_MESSAGE;
    31. }
    32. }

    1.3事务消息使用上的限制

    1. 事务消息不支持延时消息和批量消息。
    2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionalMessageCheckListener 类来修改这个行为。
    3. 事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。
    4. 事务性消息可能不止一次被检查或消费
    5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
    6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
  • 相关阅读:
    react-state hook
    高效搜索,提升编程效率
    面试题-消息中间件篇-主流的消息中间件
    端到端测试(End-to-end tests)重试策略
    (完美方案)解决mfc140u.dll文件丢失问题,快速且有效的修复
    新股发行基本流程及网下投资者参与过程介绍
    微信小程序云开发学习笔记No.01
    动漫新闻查询易语言代码
    Redis哨兵和集群模式
    Java泛型
  • 原文地址:https://blog.csdn.net/jokeMqc/article/details/132738254