本文用到的版本
spring-cloud-stream 3.2.6
rocketmq-client 4.9.4
spring-cloud-starter-stream-rocketmq 2021.0.5.0
<!-- RocketMQ binder for Spring Cloud Stream (Spring Cloud Alibaba starter). -->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
这个版本不再需要引入rocketmq-spring-boot-starter这个依赖
这个版本的stream不支持@EnableBinding注解,这个版本的rocketmq不支持txProducerGroup参数。
application.yml增加如下配置
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          addBounsChannel-out-0:
            producer:
              # Transactional producer; replaces the old `transactional: true` switch.
              producerType: Trans
              # Bean name of the TransactionListener implementation.
              transactionListener: addBounsStreamTransactionListener
      bindings:
        ## Fixed format in the new version: <channel name>-{out/in}-{index}
        addBounsChannel-out-0:
          destination: add-bouns
          group: bouns-producer-group
这里事务的配置参考官方文档:https://github.com/alibaba/spring-cloud-alibaba/blob/rocketmq/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq-new.adoc
注意:bindings下面的channel只能有一个in和一个out,不能配置多个in 多个out,否则会引起配置混乱。
注意:事务配置新旧版本有变化
旧版为
spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true
新版为
spring.cloud.stream.rocketmq.bindings.output2.producer.producerType=Trans
如果不确定版本,可以直接查看下面这个类的属性。com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties
发送代码
- @Autowired
- private StreamBridge streamBridge;
-
- String transactionId = UUID.randomUUID().toString();
-
- streamBridge.send("addBounsChannel-out-0",
- MessageBuilder.withPayload(
- UserAddBonusMsgDTO.builder()
- .userId(share.getUserId())
- .bonus(50)
- .build()
- )
- .setHeader("TRANSACTION_ID", transactionId)
- .setHeader("share_id", id)
- .setHeader("dto", JSON.toJSONString(auditDTO))
- .build()
send的第一个参数与yml里的channel名保持一致
事务代码
- package com.itmuch.contentcenter.rocketmq;
-
- import com.alibaba.fastjson.JSON;
- import com.itmuch.contentcenter.dao.content.RocketmqTransactionLogMapper;
- import com.itmuch.contentcenter.domain.dto.content.ShareAuditDTO;
- import com.itmuch.contentcenter.domain.entity.content.RocketmqTransactionLog;
- import com.itmuch.contentcenter.service.content.ShareService;
- import org.apache.rocketmq.client.producer.LocalTransactionState;
- import org.apache.rocketmq.client.producer.TransactionListener;
- import org.apache.rocketmq.common.message.Message;
- import org.apache.rocketmq.common.message.MessageExt;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.Resource;
- import java.util.Map;
-
- @Component
- public class AddBounsStreamTransactionListener implements TransactionListener {
-
- @Autowired
- private ShareService shareService;
-
- @Resource
- private RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
-
- @Override
- public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
- Map<String, String> headers = msg.getProperties();
- String transactionId = (String) headers.get("TRANSACTION_ID");
- Integer shareId = Integer.valueOf((String) headers.get("share_id"));
- ShareAuditDTO auditDTO = JSON.parseObject(headers.get("dto"), ShareAuditDTO.class);
- try {
- shareService.auditByIdInDBWithRocketMqLog(shareId, auditDTO, transactionId);
- return LocalTransactionState.COMMIT_MESSAGE;
- } catch (Exception e) {
- return LocalTransactionState.ROLLBACK_MESSAGE;
- }
- }
-
- @Override
- public LocalTransactionState checkLocalTransaction(MessageExt msg) {
- Map<String, String> headers = msg.getProperties();
- String transactionId = (String) headers.get("TRANSACTION_ID");
- RocketmqTransactionLog rocketmqTransactionLog = rocketmqTransactionLogMapper.selectOne(
- RocketmqTransactionLog.builder()
- .transactionId(transactionId)
- .build()
- );
- if (rocketmqTransactionLog != null) {
- return LocalTransactionState.COMMIT_MESSAGE;
- }
- return LocalTransactionState.ROLLBACK_MESSAGE;
- }
- }
这里的事务代码类似非stream方式 实现RocketMQLocalTransactionListener里的两个方法。
不同点为:
获取header,非stream方式为 调用getHeaders()方法,stream方式为调用getProperties()方法。
RocketMQHeaders.TRANSACTION_ID这个常量在stream方式里没有了,使用字符串"TRANSACTION_ID"替换就行。
消费者端 application.yml增加如下配置
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
      bindings:
        ## Fixed format in the new version: <channel name>-{out/in}-{index}
        addBounsChannel-in-0:
          destination: add-bouns
          group: bouns-consumer-group
注意:bindings下面的channel只能有一个in和一个out,不能配置多个in 多个out,否则会引起配置混乱。
消费者代码
- package com.itmuch.usercenter.rocketmq;
-
- import com.itmuch.usercenter.domain.dto.message.UserAddBonusMsgDTO;
- import com.itmuch.usercenter.service.user.UserService;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import java.util.function.Consumer;
-
- @Slf4j
- @Configuration
- public class AddBounsStreamConsumer {
-
- @Autowired
- private UserService userService;
-
- @Bean
- public Consumer
addBounsChannel() { - return message -> {
- log.info("addBounsChannel接到消息:{}", message);
- userService.addBonus(message);
- };
- }
- }
注意:@Bean注解的方法名和yml里的channel名前半段保持一致
引用的 userService.addBonus
- package com.itmuch.usercenter.service.user;
-
- import com.itmuch.usercenter.dao.user.BonusEventLogMapper;
- import com.itmuch.usercenter.dao.user.UserMapper;
- import com.itmuch.usercenter.domain.dto.message.UserAddBonusMsgDTO;
- import com.itmuch.usercenter.domain.entity.user.BonusEventLog;
- import com.itmuch.usercenter.domain.entity.user.User;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Service;
- import org.springframework.transaction.annotation.Transactional;
-
- import javax.annotation.Resource;
- import java.util.Date;
-
- @Service
- @Slf4j
- public class UserService {
- @Resource
- private UserMapper userMapper;
-
- @Resource
- private BonusEventLogMapper bonusEventLogMapper;
-
-
- @Transactional(rollbackFor = Exception.class)
- public void addBonus(UserAddBonusMsgDTO message) {
- log.info("消费消息 message ={}",message);
-
- //当收到消息的时候,执行的业务
- //1.为用户加积分
- Integer userId = message.getUserId();
- User user = userMapper.selectByPrimaryKey(userId);
- Integer bonus = message.getBonus();
- user.setBouns(user.getBouns() + bonus);
- userMapper.updateByPrimaryKeySelective(user);
-
- //2.记录日志到bounus_event_log表里面
- bonusEventLogMapper.insert(
- BonusEventLog.builder()
- .userId(userId)
- .value(bonus)
- .event("CONTRIBUTE")
- .createTime(new Date())
- .description("投稿加积分..")
- .build()
- );
- log.info("积分添加完毕..");
- }
- }