事务消息共有三种状态,提交状态、回滚状态、中间状态:
使用 TransactionMQProducer类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态在上面说的。
- /**
- * 发送事务消息
- * @throws Exception
- */
- @Test
- public void testTransactionProduce() throws Exception {
- TransactionListener transactionListener = new TransactionListenerImpl();
- TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
- ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue
(2000), new ThreadFactory() { - @Override
- public Thread newThread(Runnable r) {
- Thread thread = new Thread(r);
- thread.setName("client-transaction-msg-check-thread");
- return thread;
- }
- });
- producer.setExecutorService(executorService);
- producer.setTransactionListener(transactionListener);
- producer.start();
- String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
- for (int i = 0; i < 10; i++) {
- try {
- Message msg =
- new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
- ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
- SendResult sendResult = producer.sendMessageInTransaction(msg, null);
- System.out.printf("%s%n", sendResult);
- Thread.sleep(10);
- } catch (MQClientException | UnsupportedEncodingException e) {
- e.printStackTrace();
- }
- }
- for (int i = 0; i < 100000; i++) {
- Thread.sleep(1000);
- }
- producer.shutdown();
- }
当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务。它返回前一节中提到的三个事务状态之一。checkLocalTransaction 方法用于检查本地事务状态,并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。
- class TransactionListenerImpl implements TransactionListener {
- private AtomicInteger transactionIndex = new AtomicInteger(0);
- private ConcurrentHashMap
localTrans = new ConcurrentHashMap<>(); -
- /**
- * 本地事务
- */
- @Override
- public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
- int value = transactionIndex.getAndIncrement();
- int status = value % 3;
- localTrans.put(msg.getTransactionId(), status);
- return LocalTransactionState.UNKNOW;
- }
-
- /**
- * 状态回查
- */
- @Override
- public LocalTransactionState checkLocalTransaction(MessageExt msg) {
- Integer status = localTrans.get(msg.getTransactionId());
- if (null != status) {
- switch (status) {
- case 0:
- return LocalTransactionState.UNKNOW;
- case 1:
- return LocalTransactionState.COMMIT_MESSAGE;
- case 2:
- return LocalTransactionState.ROLLBACK_MESSAGE;
- }
- }
- return LocalTransactionState.COMMIT_MESSAGE;
- }
- }
transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionalMessageCheckListener 类来修改这个行为。transactionTimeout 参数。