• RocketMQ实战之Producer


    环境以及版本

    1. 双主双从,版本4.3.1 在这里插入图片描述

    概念

    1. 架构图在这里插入图片描述

    2. 名词解释

      • Producer(消息生产者):一个应用只应该创建一个Producer(默认使用DefaultMQProducer),由应用来维护此对象,可以设置为全局对象或者单例
      • ProducerGroup:一类Producer的集合名称.这类Producer通常发送一类消息,且发送逻辑一致。应用自己保证ProducerGroup名字唯一,ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时比较关键,因为服务器会回查这个Group下的任意一个Producer
      • Consumer:消息消费者
      • Consumer Group:一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致
      • Broker:消息中转角色,负责存储消息,转发消息,就是MQ服务器
      • Topic:主题,不同类型的消息使用不同的topic区分
      • Message Queue:消息队列,一个topic可以设置一个或者多个Message Queue
      • Offset:消息在Message Queue中存储,都有一个偏移量Offset,通过此偏移量能访问消息。

    发送消息案例

    1. 单个消息同步发送消息

      @Test
      public void testSync() {
          String namesrvAddr = "rocketmq-nameserver1:9876;rocketmq-nameserver2:9876";
          String producerGroup = "ProducerGroupName";
          final DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup);
          try {
              defaultMQProducer.setInstanceName("producer");
              //发送超时时间,也可以在send方法设置不同消息的超时时间
              defaultMQProducer.setSendMsgTimeout(20000);
              defaultMQProducer.setVipChannelEnabled(false);
              defaultMQProducer.setNamesrvAddr(namesrvAddr);
              //发送失败重试3次,默认2次
              defaultMQProducer.setRetryTimesWhenSendFailed(3);
              //Producer对象在使用之前必须要调用start初始化,初始化一次即可
              defaultMQProducer.start();
              
              String topic = "testTopic";
              String tag = "TagA";
              String keys = "keys";
              for (int i = 0; i < 100; i++) {
                  String msg = "hello world " + i;
                  Message message = new Message(topic, tag, keys, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
                  SendResult sendResult = defaultMQProducer.send(message);
                  logger.info("第{}条消息:{}返回状态{}", i, sendResult.getSendStatus());
              }
          } catch (Exception e) {
              logger.error(e.getMessage(), e);
          } finally {
              defaultMQProducer.shutdown();
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
    2. 发送代码解析

      • instanceName默认DEFAULT,当一个JVM需要启动多个Producer的时候,通过设置不同的instanceName来区分
      • KEYS:每个消息在业务层面的唯一标识码,要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic,key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。
      • TAG:对于同一类创建一个Topic,但是消费此Topic的各个系统可能关心的数据不一样。消费者可订阅相同的Topic,通过TAG筛选自己的消息
      • SendStatus发送返回的状态,不同状态在不同的刷盘策略和同步策略的配置下含义不相同
        • SEND_OK:发送成功(消息是否存盘,是否同步到slave,在slave上是否写入磁盘)
        • FLUSH_DISK_TIMEOUT:在规定的时间内没有完成刷盘(需要broker的刷盘策略被设置成SYNC_FLUSH才会返回这个状态)
        • FLUSH_SLAVE_TIMEOUT:主从方式下,并且Broker被设置成SYNC_MASTER方式,没有在设定时间内完成主从同步,会返回该状态
        • SLAVE_NOT_AVAILABLE:主从模式下,并且Broker被设置成SYNC_MASTER方式,但是没有找到被配置成slave的Broker
    3. 优雅退出:应用退出时,要调用shutdown来清理资源,关闭网络连接,从RocketMQ服务器上注销自己。如果是web应用,建议应用在Tomcat等容器的退出钩子里调用shutdown方法

      Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
          public void run() {
              defaultMQProducer.shutdown();
          }
      }));
      System.exit(0);
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
    4. 批量发送:将多条消息打包发送到服务端,减少网络调用次数,提高传输效率

      @Test
      public void testBatchSync() {
          String namesrvAddr = "rocketmq-nameserver1:9876;rocketmq-nameserver2:9876";
          String producerGroup = "ProducerGroupName";
          final DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup);
          try {
              defaultMQProducer.setInstanceName("producer");
              //发送超时时间,也可以在send方法设置不同消息的超时时间
              defaultMQProducer.setSendMsgTimeout(20000);
              defaultMQProducer.setVipChannelEnabled(false);
              defaultMQProducer.setNamesrvAddr(namesrvAddr);
              //发送失败重试3次,默认2次
              defaultMQProducer.setRetryTimesWhenSendFailed(3);
              //Producer对象在使用之前必须要调用start初始化,初始化一次即可
              defaultMQProducer.start();
      
              String topic = "testTopic";
              String tag = "TagA";
              String keys = "keys";
              List<Message> batchList = new ArrayList<>();
              for (int i = 0; i < 100; i++) {
                  String msg = "hello world " + i;
                  Message message = new Message(topic, tag, keys, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
                  batchList.add(message);
              }
              //批量发送
              SendResult sendResult = defaultMQProducer.send(batchList);
              logger.info("批量发送消息:返回状态{}", sendResult.getSendStatus());
          } catch (Exception e) {
              logger.error(e.getMessage(), e);
          } finally {
              defaultMQProducer.shutdown();
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
    5. 异步发送

      /**
       * 异步发送
       */
      @Test
      public void testAsync() {
          String namesrvAddr = "rocketmq-nameserver1:9876;rocketmq-nameserver2:9876";
          String producerGroup = "ProducerGroupName";
          final DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup);
          try {
              defaultMQProducer.setSendMsgTimeout(20000);
              defaultMQProducer.setVipChannelEnabled(false);
              defaultMQProducer.setNamesrvAddr(namesrvAddr);
              defaultMQProducer.setRetryTimesWhenSendFailed(3);
              defaultMQProducer.start();
      
              String topic = "testTopic";
              String tag = "TagA";
              String keys = "keys";
              CountDownLatch countDownLatch = new CountDownLatch(100);
              for (int i = 100; i < 200; i++) {
                  String msg = "hello world " + i;
                  Message message = new Message(topic, tag, keys, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
                  final int k = i;
                  defaultMQProducer.send(message, new SendCallback() {
                      @Override
                      public void onSuccess(SendResult sendResult) {
                          countDownLatch.countDown();
                          logger.info("第{}条消息:返回状态{}", k, sendResult.getSendStatus());
                      }
      
                      @Override
                      public void onException(Throwable e) {
                          countDownLatch.countDown();
                          logger.error(e.getMessage(), e);
                      }
                  });
              }
              countDownLatch.await();
          } catch (Exception e) {
              logger.error(e.getMessage(), e);
          } finally {
              defaultMQProducer.shutdown();
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44

    延迟消息

    1. RocketMQ支持发送延迟消息,支持特定的level,例如1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h。但是不支持任意时间精度的延迟,主要原因是如果要支持任意的时间精度,在Broker层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销。

    2. Broker配置messageDelayLeve:messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 描述了各级别与延时时间的对应映射关系。

      • 这个配置项配置了从1级开始,各级延时的时间,可以修改这个指定级别的延时时间;
      • 时间单位支持:s、m、h、d,分别表示秒、分、时、天;
      • 默认值就是下面声明的,可手工调整; 默认值已够用,不建议修改这个值。
    3. 案例:只需要设置消息的setDelayTimeLevel

      /**
       * 延迟消息
       */
      @Test
      public void testDelayMessage() {
          String namesrvAddr = "rocketmq-nameserver1:9876;rocketmq-nameserver2:9877";
          String producerGroup = "ProducerGroupName";
          final DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup);
          try {
              defaultMQProducer.setInstanceName("producer");
              //发送超时时间,也可以在send方法设置不同消息的超时时间
              defaultMQProducer.setSendMsgTimeout(20000);
              defaultMQProducer.setVipChannelEnabled(false);
              defaultMQProducer.setNamesrvAddr(namesrvAddr);
              //发送失败重试3次,默认2次
              defaultMQProducer.setRetryTimesWhenSendFailed(3);
              //Producer对象在使用之前必须要调用start初始化,初始化一次即可
              defaultMQProducer.start();
      
              String topic = "testTopic";
              String tag = "TagA";
              String keys = "keys";
              for (int i = 0; i < 100; i++) {
                  String msg = "hello world " + i;
                  Message message = new Message(topic, tag, keys, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
                  //messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
                  //这里就表示10s
                  message.setDelayTimeLevel(3);
                  SendResult sendResult = defaultMQProducer.send(message);
                  logger.info("第{}条消息:返回状态{}", i, sendResult.getSendStatus());
              }
          } catch (Exception e) {
              logger.error(e.getMessage(), e);
          } finally {
              defaultMQProducer.shutdown();
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37

    消息路由规则

    1. 一个Topic会有多个Message Queue(类似于kafka一个Topic有多个Partition),如果使用默认规则,会轮流向各个Message Queue发送消息

    2. 如果需要把消息发送到相同的Message Queue,可以实现MessageQueueSelector接口

      public interface MessageQueueSelector {
      	MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
      }
      
      // 使用以下方法发送消息,根据传入的Object arg或者根据消息内容把消息发送到指定的MessageQueue中
      public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
    3. 当需要顺序消息时,比如相同的订单不同的状态,此时需要将相同的订单发送到同一个消息队列中。

      @Test
      public void testSelectorOrder() {
          String namesrvAddr = "rocketmq-nameserver1:9876;rocketmq-nameserver2:9876";
          String producerGroup = "ProducerGroupName";
          final DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup);
          try {
              defaultMQProducer.setSendMsgTimeout(20000);
              defaultMQProducer.setVipChannelEnabled(false);
              defaultMQProducer.setNamesrvAddr(namesrvAddr);
              defaultMQProducer.setRetryTimesWhenSendFailed(3);
              defaultMQProducer.start();
      
              String topic = "testTopic";
              String tag = "TagA";
              String keys = "keys";
              for (int i = 200; i < 300; i++) {
                  String msg = "hello world " + i;
                  Message message = new Message(topic, tag, keys, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
      
                  SendResult sendResult = defaultMQProducer.send(message, new MessageQueueSelector() {
                      @Override
                      public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                          //获取总共的队列数
                          int size = mqs.size();
                          int curIndex = Integer.parseInt(arg.toString());
                          int index = (curIndex / 10) % size;
                          MessageQueue messageQueue = mqs.get(index);
                          //225,index:22,queueId:6,size:32
                          logger.info("{},index:{},queueId:{},size:{}", curIndex, index, messageQueue.getQueueId(), size);
                          return messageQueue;
                      }
                  }, i);
                  logger.info("第{}条消息:返回状态{}", i, sendResult.getSendStatus());
              }
          } catch (Exception e) {
              logger.error(e.getMessage(), e);
          } finally {
              defaultMQProducer.shutdown();
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40

    消息过滤

    1. 在发送消息时,可以为每一条消息设置一个TAG标签。比如一个主题下有多种不同类型的数据,不同的消费者订阅相同Topic下不同类型的数据。消费者订阅相同的主题不同的TAG,多个TAG使用"|"分隔,同一个消费组订阅的topic,TAG必须相同

      1. 发送
      for (int i = 0; i < 100; i++) {
          String msg = "hello world " + i;
          Message message = null;
          if (i % 2 == 0) {
              message = new Message("filterTopic", //topic
                      "EVEN", // tag
                      "keys", //key
                      msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
      
          } else {
              message = new Message("filterTopic", //topic
                      "ODD", // tag
                      "keys", //key
                      msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
          }
          SendResult sendResult = defaultMQProducer.send(message);
          producerLogger.info("第{}条消息:{}发送成功", i, sendResult);
      }   
      
      2. 订阅
      consumer.subscribe("filterTopic", "ODD");
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
    2. SQL模式过滤

      1. 发送之前设置消息属性
      message.putUserProperty("num", String.valueOf(i));
      
      2. 订阅,通过消息属性过滤
      consumer.subscribe("filterTopic", MessageSelector.bySql("num >=20 and num <= 50"));
      
      3. 如果报错
      The broker does not support consumer to filter message by SQL92
      需要在broker配置
      enablePropertyFilter=true
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
    3. 类过滤模式:自定义消息过滤类需要实现接口MessageFilter。使用类模式的前提是启动FilterServer。

      public class MessageFilterImpl implements MessageFilter {
          @Override
          public boolean match(MessageExt msg, FilterContext context) {
              String property = msg.getProperty("num");
              if (property != null) {
                  int num = Integer.parseInt(property);
                  if (num > 100) {
                      return true;
                  }
              }
              return false;
          }
      }
      consumer.subscribe("filterTopic","xxxx.filter.MessageFilterImpl",filterCode)
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14

    事务消息

    1. 事务消息的限制

      • 事务消息不支持延时消息和批量消息。
      • 为了避免单个消息被检查太多次而导致半队列消息累积,默认将单个消息的检查次数限制为 15 次,用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为
      • 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
      • 事务性消息可能不止一次被检查或消费。
      • 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
    2. 编写回调实现类

      public class TransactionListenerImpl implements TransactionListener {
          private final static Logger logger = LoggerFactory.getLogger("rocketmq-producer");
      
          private AtomicInteger DB_ID = new AtomicInteger(0);
      
          private ConcurrentHashMap<Integer, String> localDB = new ConcurrentHashMap<>();
      
          /**
           * 当发送事务prepare消息成功时,将调用该方法执行本地事务。
           */
          @Override
          public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
              String messageStr = JSON.parseObject(msg.getBody(), String.class);
              logger.info("事务ID:{},消息数据:{}", msg.getTransactionId(), msg.getBody());
              try {
                  //模拟业务逻辑(向数据库添加一条数据)
                  localDB.put(DB_ID.incrementAndGet(), messageStr);
                  logger.info("事务ID:{},执行业务逻辑:{}", msg.getTransactionId(), messageStr);
              } catch (Exception e) {
                  logger.error("业务处理异常,回滚事务");
                  return LocalTransactionState.ROLLBACK_MESSAGE;
              }
              return LocalTransactionState.UNKNOW;
          }
      
          @Override
          public LocalTransactionState checkLocalTransaction(MessageExt msg) {
              logger.info("消息回查,事务ID:{},消息数据:{}", msg.getTransactionId(), msg.getBody());
              String messageStr = JSON.parseObject(msg.getBody(), String.class);
              int i = Integer.parseInt(messageStr);
              //模拟回查状态
              if (i >= 0 && i < 3) {
                  //继续回查,直到回滚或者提交
                  return LocalTransactionState.UNKNOW;
              } else if (i >= 3 && i < 6) {
                  //回滚
                  return LocalTransactionState.ROLLBACK_MESSAGE;
              } else {
                  //提交事务
                  return LocalTransactionState.COMMIT_MESSAGE;
              }
      
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
    3. 发送

      public class TransactionProducerTest {
      
          private final static Logger logger = LoggerFactory.getLogger("rocketmq-producer");
      
          @Test
          public void testTransaction() {
              String namesrvAddr = "rocketmq-nameserver1:9876;rocketmq-nameserver2:9876";
              String producerGroup = "ProducerTransactionGroupName";
      
      
              TransactionListener transactionListener = new TransactionListenerImpl();
              TransactionMQProducer producer = new TransactionMQProducer(producerGroup);
              producer.setNamesrvAddr(namesrvAddr);
              try {
                  ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                      @Override
                      public Thread newThread(Runnable r) {
                          Thread thread = new Thread(r);
                          thread.setName("client-transaction-msg-check-thread");
                          return thread;
                      }
                  });
      
                  producer.setExecutorService(executorService);
                  producer.setTransactionListener(transactionListener);
                  producer.start();
      
                  String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
                  for (int i = 0; i < 10; i++) {
                      String msgStr = (i + "");
                      Message msg =
                              new Message("transactionTopic",
                                      tags[i % tags.length],
                                      "KEY" + i,
                                      msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
                      SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                      logger.info("{}", sendResult);
      
                  }
                  LockSupport.park();
              } catch (Exception e) {
                  logger.error(e.getMessage(), e);
              } finally {
                  producer.shutdown();
              }
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
  • 相关阅读:
    RabbitMQ课件(尚)
    Redis 集群偶数节点跨地域部署之高可用测试
    浅谈游戏安全 (一)
    C51 存储类型与存储模式
    最新Workerman 在线客服系统源码/附搭建教程-ThinkPHP网站在线客服系统源码
    RAD Studio 11.2详解其务实改进(Delphi & C++ Builder)-Alexandria
    Python的requests库:解决文档缺失问题的策略与实践
    利用PHP开发具有注册、登陆、文件上传、发布动态功能的网站
    【Mybatis实战】02——XML方式的基本用法
    Baklib|如何才能做好企业内部知识管理?
  • 原文地址:https://blog.csdn.net/usagoole/article/details/126355042