您好,我是码农飞哥(wei158556),感谢您阅读本文,欢迎一键三连哦。
💪🏻 1. Python基础专栏,基础知识一网打尽,9.9元买不了吃亏,买不了上当。 Python从入门到精通
😁 2. 毕业设计专栏,毕业季咱们不慌忙,几百款毕业设计等你选。
❤️ 3. Python爬虫专栏,系统性的学习爬虫的知识点。9.9元买不了吃亏,买不了上当 。python爬虫入门进阶
❤️ 4. Ceph实战,从原理到实战应有尽有。 Ceph实战
❤️ 5. Java高并发编程入门,打卡学习Java高并发。 Java高并发编程入门
事务消息可以认为是一个两阶段的提交消息实现,以确保分布式事务的最终一致性。事务性消息确保本地事务的执行和消息的发送可以原子执行。
两阶段提交主要保证了分布式事务的原子性:即所有结点要么全做要么全不做,所谓的两个阶段是指:第一阶段:准备阶段;第二阶段:提交阶段。
事务消息有三种状态:

TransactionListener.executeTransaction() 方法执行本地事务。TransactionListener.checkLocalTransaction() 方法回查本地事务执行状态,并再次执行5,6,7三步骤,若回查次数超过15次则丢弃。使用限制:
transactionCheckMax 来更改此限制,如果一条消息的检查次数超过 transactionCheckMax 次,broker默认会丢弃这条消息,同时打印错误日志。用户可以重写 AbstractTransactionCheckListener 类来改变这种行为。transactionTimeout 确定。并且用户也可以在发送事务消息时通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,这个参数优先于 transactionMsgTimeout 参数。事务消息的消费者与普通消息的消费者基本相同,也就是说事务消息是控制生产者端和broker端。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_transaction_consumer");
consumer.setNamesrvAddr("172.31.184.89:9876");
consumer.subscribe("TransactionTopic", "*");
// 4.创建一个回调函数
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// 5.处理消息
for (MessageExt msg : msgs) {
System.out.println(msg);
System.out.println("收到的消息内容:" + new String(msg.getBody()));
}
// 返回消费成功的对象
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 6.启动消费者
consumer.start();
System.out.println("消费者已经启动");
事务消息最关键的地方是生产者本地事务的实现,生产者本地事务实现 TransactionListener 接口,并实现该接口中的executeLocalTransaction方法和checkLocalTransaction方法。
其中,executeLocalTransaction 方法的作用是执行本地事务。它在生产者每次发送half消息的时候被调用,
LocalTransactionState.COMMIT_MESSAGE状态,则此消息会被消费者消费到。LocalTransactionState.ROLLBACK_MESSAGE 状态,则此消息会被broker丢弃LocalTransactionState.UNKNOW 状态,即中间状态,则broker会调用checkLocalTransaction方法进行回查,最多回查15次。checkLocalTransaction方法的作用是检查本地事务, 它是生产者发送完所有消息的时候调用,主要是针对的是中间状态的消息进行调用。
同样的如果调用此方法返回前面提到的三种状态,broker也会做出相同的处理。
public class TransactionListenerImpl implements TransactionListener {
/**
* 执行本地事务
* 当事务half消息发送成功,这个方法将被执行
* 事务的half消息是发到 RMQ_SYS_TRANS_OP_HALF_TOPIC 的topic中
*
* @param msg 消息
* @param arg arg 自定义业务参数
* @return {@link LocalTransactionState}
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String tags = msg.getTags();
System.out.println("============执行executeLocalTransaction方法,;消息内容是="+new String(msg.getBody()));
if (StringUtils.contains(tags, "tagA")) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.contains(tags, "tagB")) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
/**
* 检查本地事务
* 回查本地事务状态,当half消息没响应时调用。
*
* @param msg 消息
* @return {@link LocalTransactionState}
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String tags = msg.getTags();
System.out.println("============执行checkLocalTransaction方法,;消息内容是="+new String(msg.getBody()));
if (StringUtils.contains(tags, "tagC")) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.contains(tags, "tagD")) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
}
事务消息的生产者与普通消息的生产者最核心的区别是事务消息的生产者需要事务监听器,并且是调用sendMessageInTransaction 方法发送 half 消息。
//1.定义事务监听器
TransactionListener transactionListener = new TransactionListenerImpl();
//2.定义生产者
TransactionMQProducer producer = new TransactionMQProducer("transaction_produce_group");
producer.setNamesrvAddr("172.31.184.89:9876");
//3.定义线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100), (runnable, executor) -> {
BlockingQueue<Runnable> queue = executor.getQueue();
try {
queue.put(runnable);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
//4.设置线程池
producer.setExecutorService(threadPoolExecutor);
//5.设置事务监听器
producer.setTransactionListener(transactionListener);
// 启动生产者
producer.start();
String[] tags = {"tagA", "tagB", "tagC", "tagD","tagE"};
//发送10条half消息,消费者是收不到half消息的
for (int i = 0; i < 10; i++) {
Message message = new Message("TransactionTopic", tags[i % tags.length],
"key" + i, ("飞哥测试事务消息" + tags[i % tags.length]+"_"+i).getBytes(StandardCharsets.UTF_8));
TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null);
System.out.println("本次发送的消息是=" + new String(message.getBody()));
System.out.printf("%s%n", transactionSendResult);
Thread.sleep(10);
}
System.out.println("==========所有消息发送完成======");
生产者:

从运行结果可以看出中间状态的消息最多回查15次,就像图中的消息 执行checkLocalTransaction方法,;消息内容是=飞哥测试事务消息tagE_9 broker调用checkLocalTransaction 方法回查了15次。

消费者:
我们可以看到最终消费者消费到的是消费的tags是tagA以及tagC的四条消息。

那么,为啥生产者发送的half消息,消费者不会里面收到呢?这是因为half消息会被放到 RMQ_SYS_TRANS_OP_HALF_TOPIC 的topic中,直到本地事务返回 COMMIT_MESSAGE 状态时,消费者才能消费到此消息 。