TransactionListener接口,重写 executeLocalTransaction(执行本地事务) 方法与 checkLocalTransaction(事务回查) 方法public class TransactionListenerImpl implements TransactionListener {
//执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 本地事务业务...
//1.本地事务执行成功,生成消息索引(对消费者可见)
//return LocalTransactionState.COMMIT_MESSAGE;
//2.本地事务执行失败,删除消息
//return LocalTransactionState.ROLLBACK_MESSAGE;
//3.本地事务状态未知,待消息回查确认
return LocalTransactionState.UNKNOW;
}
//事务回查 默认是60s,一分钟检查一次
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 回查业务....
//1.本地事务执行成功,生成消息索引(对消费者可见)
//return LocalTransactionState.COMMIT_MESSAGE;
//2.本地事务执行失败,删除消息
//return LocalTransactionState.ROLLBACK_MESSAGE;
//3.本地事务状态未知,待消息回查确认
return LocalTransactionState.UNKNOW;
}
}
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionMQProducer producer = new TransactionMQProducer("TransactionProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
//创建线程池
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
//设置生产者回查线程池
producer.setExecutorService(executorService);
//生产者设置监听器(执行本地事务和事务回查)
producer.setTransactionListener(new TransactionListenerImpl());
//启动消息生产者
producer.start();
//半事务的发送
try {
Message msg =
new Message("TransactionTopic", null, ("A向B系统转100块钱 ").getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
System.out.println(sendResult.getSendStatus()+"-"+df.format(new Date()));//半事务消息是否成功
} catch (MQClientException | UnsupportedEncodingException e) {
//todo 回滚rollback
e.printStackTrace();
}
producer.shutdown();
}
}

源码入口:
// 半事务消息发送
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
DefaultMQProducerImpl::sendMessageInTransaction()
补偿流程:
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
// 1.得到 transactionListener
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
// 2.去除延迟等级
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}
// 3.校验消息
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
// 4.设置事务消息的标识
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
// 5.同步发送半事务消息
try {
//send(sync)
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
// 6.执行本地事务
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
try {
// 7.根据本地事务执行情况发送提交\回滚\未知
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}
入口:
//send(sync)
sendResult = this.send(msg);
Producer 消息发送流程
04 RocketMQ - Producer 源码分析
Broker 接受到消息后流程
03 RocketMQ - Broker 源码分析
broker收到消息写入的请求就会进入SendMessageProcessor::processRequest(),根据前面设置的消息事务标识,会进入到 TransactionalMessageServiceImpl::asyncPrepareMessage()中进行消息的存储

往下看,会看到在 TransactionalMessageBridge::parseHalfMessageInner() 中会对事务消息进行组装,包括给定固定的topic为 RMQ_SYS_TRANS_HALF_TOPIC,队列为 0,并保存实际的 topic 和 queueId

在发送完半事务消息后,会执行本地事务,也就是前面定义的 TransactionListenerImpl::executeLocalTransaction()

后面在根据本地事务的处理情况来对消息进行处理

跟踪源码会看到会发送 功能号为 END_TRANSACTION 的消息到 broker 端来处理半事务消息
Broker 端在启动时会注册相关处理类

查看 EndTransactionProcessor::processRequest(),会看到前面本地事务回滚时会删除半事务消息(将消息移到 RMQ_SYS_TRANS_OP_HALF_TOPIC主题队列下),而本地事务提交时会将消息写入真实的topic队列(消息此时对消费者可见)

在执行本地事务返回 LocalTransactionState.UNKNOW 后,会通过消息回查机制来进行闭环,
此时由 Broker 端定时发起回查,Producer 端进行业务回查
Broker 启动时会启动此定时任务,默认60s回查一次



在 TransactionalMessageServiceImpl::check() 中,会遍历 RMQ_SYS_TRANS_HALF_TOPIC 主题下的消息,发送功能号为 CHECK_TRANSACTION_STATE 的消息进行消息回查

在客户端启动时,会注册 CHECK_TRANSACTION_STATE 功能号的处理类

ClientRemotingProcessor::checkTransactionState()中,进行消息回查

进行消息回查,发送功能号为END_TRANSACTION的消息到 broker 端,更改消息(和执行本地事务流程后半段类似)
