• Spring Cloud Stream + RocketMQ Transactional Message Configuration


    Versions used in this article

    spring-cloud-stream 3.2.6

    rocketmq-client 4.9.4

    spring-cloud-starter-stream-rocketmq 2021.0.5.0

    I. Importing the dependency

    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
    </dependency>

    With this version you no longer need to add the rocketmq-spring-boot-starter dependency.

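    In this version of Spring Cloud Stream the @EnableBinding annotation is deprecated in favor of the functional programming model, so it is not used here. This version of the RocketMQ binder does not support the txProducerGroup parameter.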

    II. Writing the producer

    1. Configuration

    Add the following configuration to application.yml:

    spring:
      cloud:
        stream:
          rocketmq:
            binder:
              name-server: 127.0.0.1:9876
            bindings:
              addBounsChannel-out-0:
                producer:
                  producerType: Trans
                  transactionListener: addBounsStreamTransactionListener
          bindings:
            ## Fixed format in the new version: <channel name>-{out/in}-{index}
            addBounsChannel-out-0:
              destination: add-bouns
              group: bouns-producer-group

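    The transaction configuration here follows the official documentation: https://github.com/alibaba/spring-cloud-alibaba/blob/rocketmq/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq-new.adoc

    Note: a channel under bindings can have only one in and one out. Do not configure multiple ins or multiple outs for it, otherwise the configuration gets mixed up.

    Note: the transaction configuration changed between the old and new versions.

    Old version: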

    spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true

    New version:

    spring.cloud.stream.rocketmq.bindings.output2.producer.producerType=Trans

    If you are not sure which version you have, look at the fields of this class directly: com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties
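    The old-to-new property change above can be sketched as a small migration helper. This is only an illustration: TransactionalPropertyMigrator is a hypothetical class (not part of Spring Cloud Alibaba), and it assumes that transactional=true is the only old setting that should become producerType=Trans.

```java
import java.util.Properties;

// Hypothetical helper for illustration only; not part of any library.
final class TransactionalPropertyMigrator {

    private static final String PREFIX = "spring.cloud.stream.rocketmq.bindings.";
    private static final String OLD_SUFFIX = ".producer.transactional";
    private static final String NEW_SUFFIX = ".producer.producerType";

    // Returns a copy of the properties with the old transactional flag rewritten.
    static Properties migrate(Properties source) {
        Properties result = new Properties();
        for (String key : source.stringPropertyNames()) {
            String value = source.getProperty(key);
            if (key.startsWith(PREFIX) && key.endsWith(OLD_SUFFIX)) {
                // Assumption: only transactional=true maps to producerType=Trans;
                // transactional=false is dropped so the binder default applies.
                if (Boolean.parseBoolean(value)) {
                    String base = key.substring(0, key.length() - OLD_SUFFIX.length());
                    result.setProperty(base + NEW_SUFFIX, "Trans");
                }
            } else {
                result.setProperty(key, value);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties old = new Properties();
        old.setProperty(PREFIX + "output2" + OLD_SUFFIX, "true");
        System.out.println(migrate(old));
    }
}
```

    Other keys are copied unchanged, so the helper can be run over a whole properties file.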

    2. Code

    Sending code

    @Autowired
    private StreamBridge streamBridge;

    // Inside the business method (share, id and auditDTO come from the surrounding code):
    String transactionId = UUID.randomUUID().toString();
    streamBridge.send("addBounsChannel-out-0",
        MessageBuilder.withPayload(
                UserAddBonusMsgDTO.builder()
                    .userId(share.getUserId())
                    .bonus(50)
                    .build()
            )
            // Headers are read back in the transaction listener.
            .setHeader("TRANSACTION_ID", transactionId)
            .setHeader("share_id", id)
            .setHeader("dto", JSON.toJSONString(auditDTO))
            .build()
    );

    The first argument of send must match the channel name in the yml.

    Transaction code

    package com.itmuch.contentcenter.rocketmq;

    import com.alibaba.fastjson.JSON;
    import com.itmuch.contentcenter.dao.content.RocketmqTransactionLogMapper;
    import com.itmuch.contentcenter.domain.dto.content.ShareAuditDTO;
    import com.itmuch.contentcenter.domain.entity.content.RocketmqTransactionLog;
    import com.itmuch.contentcenter.service.content.ShareService;
    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.TransactionListener;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    import javax.annotation.Resource;
    import java.util.Map;

    @Component
    public class AddBounsStreamTransactionListener implements TransactionListener {

        @Autowired
        private ShareService shareService;

        @Resource
        private RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

        // Runs the local transaction and decides whether the message is committed.
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            Map<String, String> headers = msg.getProperties();
            String transactionId = headers.get("TRANSACTION_ID");
            Integer shareId = Integer.valueOf(headers.get("share_id"));
            ShareAuditDTO auditDTO = JSON.parseObject(headers.get("dto"), ShareAuditDTO.class);
            try {
                shareService.auditByIdInDBWithRocketMqLog(shareId, auditDTO, transactionId);
                return LocalTransactionState.COMMIT_MESSAGE;
            } catch (Exception e) {
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }

        // Called back by the broker to check the local transaction:
        // commit only if a transaction log row exists for this transaction id.
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            Map<String, String> headers = msg.getProperties();
            String transactionId = headers.get("TRANSACTION_ID");
            RocketmqTransactionLog rocketmqTransactionLog = rocketmqTransactionLogMapper.selectOne(
                RocketmqTransactionLog.builder()
                    .transactionId(transactionId)
                    .build()
            );
            if (rocketmqTransactionLog != null) {
                return LocalTransactionState.COMMIT_MESSAGE;
            }
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
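    The decision logic of the two callbacks above can be sketched without RocketMQ or a database. This is a simplified model under stated assumptions: the State enum stands in for LocalTransactionState, the in-memory set stands in for the transaction log table, and TransactionCheckSketch is a hypothetical name. In the listener above the log row is written by shareService.auditByIdInDBWithRocketMqLog together with the business update, whereas the sketch records it right after the local work succeeds.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Simplified, in-memory model of the commit/rollback decisions; illustration only.
final class TransactionCheckSketch {

    // Stand-in for org.apache.rocketmq.client.producer.LocalTransactionState.
    enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE }

    // Stand-in for the transaction log table.
    private final Set<String> loggedTransactionIds = ConcurrentHashMap.newKeySet();

    // Mirrors executeLocalTransaction: commit if the local work succeeds, else roll back.
    State execute(String transactionId, Runnable localTransaction) {
        try {
            localTransaction.run();
            loggedTransactionIds.add(transactionId);
            return State.COMMIT_MESSAGE;
        } catch (RuntimeException e) {
            return State.ROLLBACK_MESSAGE;
        }
    }

    // Mirrors checkLocalTransaction: commit only if a log entry exists for the id.
    State check(String transactionId) {
        boolean logged = transactionId != null && loggedTransactionIds.contains(transactionId);
        return logged ? State.COMMIT_MESSAGE : State.ROLLBACK_MESSAGE;
    }

    public static void main(String[] args) {
        TransactionCheckSketch sketch = new TransactionCheckSketch();
        System.out.println(sketch.execute("tx-1", () -> { }));
        System.out.println(sketch.check("tx-1"));
        System.out.println(sketch.check("unknown"));
    }
}
```

    The point of the log entry is that the check callback can answer from persisted state even when the result of the first callback never reached the broker.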

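    The transaction code here is similar to the non-stream approach, where you implement the two methods of RocketMQLocalTransactionListener.

    The differences are:

    Reading headers: the non-stream approach calls getHeaders(), while the stream approach calls getProperties().

    The RocketMQHeaders.TRANSACTION_ID constant is not available in the stream approach; use the string "TRANSACTION_ID" instead.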
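    Because getProperties() returns a Map<String, String>, every header arrives as a string and a missing header comes back as null. A minimal sketch of reading such values defensively is shown here; MessagePropertyReader and its methods are hypothetical helpers, and the map in main only imitates what msg.getProperties() would return.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for reading string-typed message properties; illustration only.
final class MessagePropertyReader {

    // Returns the property value or fails with a clear message when it is absent.
    static String requireString(Map<String, String> properties, String key) {
        String value = properties.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Missing message property: " + key);
        }
        return value;
    }

    // Numeric headers such as share_id must be parsed back from their string form.
    static int requireInt(Map<String, String> properties, String key) {
        return Integer.parseInt(requireString(properties, key));
    }

    public static void main(String[] args) {
        // Imitates msg.getProperties() for the message sent earlier.
        Map<String, String> properties = new HashMap<>();
        properties.put("TRANSACTION_ID", "example-transaction-id");
        properties.put("share_id", "1");

        System.out.println(requireString(properties, "TRANSACTION_ID"));
        System.out.println(requireInt(properties, "share_id"));
    }
}
```

    Failing fast on a missing TRANSACTION_ID keeps a null id from reaching the transaction log lookup.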

    III. Writing the consumer

    1. Configuration

    application.yml

    spring:
      cloud:
        stream:
          rocketmq:
            binder:
              name-server: 127.0.0.1:9876
          bindings:
            ## Fixed format in the new version: <channel name>-{out/in}-{index}
            addBounsChannel-in-0:
              destination: add-bouns
              group: bouns-consumer-group

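    Note: a channel under bindings can have only one in and one out. Do not configure multiple ins or multiple outs for it, otherwise the configuration gets mixed up.

    2. Code

    Consumer code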

    package com.itmuch.usercenter.rocketmq;

    import com.itmuch.usercenter.domain.dto.message.UserAddBonusMsgDTO;
    import com.itmuch.usercenter.service.user.UserService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    import java.util.function.Consumer;

    @Slf4j
    @Configuration
    public class AddBounsStreamConsumer {

        @Autowired
        private UserService userService;

        // The type parameter lets the payload be passed to addBonus as a UserAddBonusMsgDTO.
        @Bean
        public Consumer<UserAddBonusMsgDTO> addBounsChannel() {
            return message -> {
                log.info("addBounsChannel received message: {}", message);
                userService.addBonus(message);
            };
        }
    }

    Note: the name of the @Bean method must match the first part of the channel name in the yml (the part before -in-0).
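    The naming rule can be written down as a tiny helper that builds the binding name from the bean method name, the direction and the index. BindingNames is a hypothetical class used only to make the <name>-{out/in}-{index} format concrete.

```java
// Hypothetical helper that spells out the binding-name format; illustration only.
final class BindingNames {

    enum Direction {
        IN("in"), OUT("out");

        private final String label;

        Direction(String label) {
            this.label = label;
        }
    }

    // Builds <functionName>-<in|out>-<index>, e.g. the key used under bindings in the yml.
    static String of(String functionName, Direction direction, int index) {
        if (functionName == null || functionName.isEmpty()) {
            throw new IllegalArgumentException("functionName must not be empty");
        }
        if (index < 0) {
            throw new IllegalArgumentException("index must not be negative");
        }
        return functionName + "-" + direction.label + "-" + index;
    }

    public static void main(String[] args) {
        // Consumer bean method addBounsChannel() maps to the input binding.
        System.out.println(of("addBounsChannel", Direction.IN, 0));
        // The producer side uses the same name with the out direction.
        System.out.println(of("addBounsChannel", Direction.OUT, 0));
    }
}
```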

    The referenced userService.addBonus:

    package com.itmuch.usercenter.service.user;

    import com.itmuch.usercenter.dao.user.BonusEventLogMapper;
    import com.itmuch.usercenter.dao.user.UserMapper;
    import com.itmuch.usercenter.domain.dto.message.UserAddBonusMsgDTO;
    import com.itmuch.usercenter.domain.entity.user.BonusEventLog;
    import com.itmuch.usercenter.domain.entity.user.User;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;

    import javax.annotation.Resource;
    import java.util.Date;

    @Service
    @Slf4j
    public class UserService {

        @Resource
        private UserMapper userMapper;

        @Resource
        private BonusEventLogMapper bonusEventLogMapper;

        @Transactional(rollbackFor = Exception.class)
        public void addBonus(UserAddBonusMsgDTO message) {
            log.info("Consuming message = {}", message);
            // Business logic executed when a message is received
            // 1. Add bonus points to the user
            Integer userId = message.getUserId();
            User user = userMapper.selectByPrimaryKey(userId);
            Integer bonus = message.getBonus();
            user.setBouns(user.getBouns() + bonus);
            userMapper.updateByPrimaryKeySelective(user);
            // 2. Record the change in the bonus event log table
            bonusEventLogMapper.insert(
                BonusEventLog.builder()
                    .userId(userId)
                    .value(bonus)
                    .event("CONTRIBUTE")
                    .createTime(new Date())
                    .description("Bonus for contribution..")
                    .build()
            );
            log.info("Bonus added.");
        }
    }

  • Original article: https://blog.csdn.net/gsls200808/article/details/133278138