
架构图
名词解释
Producer(消息生产者):一个应用只应该创建一个Producer(默认使用DefaultMQProducer),由应用来维护此对象,可以设置为全局对象或者单例ProducerGroup:一类Producer的集合名称.这类Producer通常发送一类消息,且发送逻辑一致。应用自己保证ProducerGroup名字唯一,ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时比较关键,因为服务器会回查这个Group下的任意一个ProducerConsumer:消息消费者Consumer Group:一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致Broker:消息中转角色,负责存储消息,转发消息,就是MQ服务器Topic:主题,不同类型的消息使用不同的topic区分Message Queue:消息队列,一个topic可以设置一个或者多个Message QueueOffset:消息在Message Queue中存储,都有一个偏移量Offset,通过此偏移量能访问消息。单个消息同步发送消息
@Test
public void testSync() {
String namesrvAddr = "rocketmq-nameserver1:9876;rocketmq-nameserver2:9876";
String producerGroup = "ProducerGroupName";
final DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup);
try {
defaultMQProducer.setInstanceName("producer");
//发送超时时间,也可以在send方法设置不同消息的超时时间
defaultMQProducer.setSendMsgTimeout(20000);
defaultMQProducer.setVipChannelEnabled(false);
defaultMQProducer.setNamesrvAddr(namesrvAddr);
//发送失败重试3次,默认2次
defaultMQProducer.setRetryTimesWhenSendFailed(3);
//Producer对象在使用之前必须要调用start初始化,初始化一次即可
defaultMQProducer.start();
String topic = "testTopic";
String tag = "TagA";
String keys = "keys";
for (int i = 0; i < 100; i++) {
String msg = "hello world " + i;
Message message = new Message(topic, tag, keys, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = defaultMQProducer.send(message);
logger.info("第{}条消息:{}返回状态{}", i, sendResult.getSendStatus());
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
defaultMQProducer.shutdown();
}
}
发送代码解析
instanceName默认DEFAULT,当一个JVM需要启动多个Producer的时候,通过设置不同的instanceName来区分KEYS:每个消息在业务层面的唯一标识码,要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic,key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。TAG:对于同一类创建一个Topic,但是消费此Topic的各个系统可能关心的数据不一样。消费者可订阅相同的Topic,通过TAG筛选自己的消息SendStatus发送返回的状态,不同状态在不同的刷盘策略和同步策略的配置下含义不相同
SEND_OK:发送成功(消息是否存盘,是否同步到slave,在slave上是否写入磁盘)FLUSH_DISK_TIMEOUT:在规定的时间内没有完成刷盘(需要broker的刷盘策略被设置成SYNC_FLUSH才会返回这个状态)FLUSH_SLAVE_TIMEOUT:主从方式下,并且Broker被设置成SYNC_MASTER方式,没有在设定时间内完成主从同步,会返回该状态SLAVE_NOT_AVAILABLE:主从模式下,并且Broker被设置成SYNC_MASTER方式,但是没有找到被配置成slave的Broker优雅退出:应用退出时,要调用shutdown来清理资源,关闭网络连接,从RocketMQ服务器上注销自己。如果是web应用,建议应用在Tomcat等容器的退出钩子里调用shutdown方法
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
defaultMQProducer.shutdown();
}
}));
System.exit(0);
批量发送:将多条消息打包发送到服务端,减少网络调用次数,提高传输效率
@Test
public void testBatchSync() {
String namesrvAddr = "rocketmq-nameserver1:9876;rocketmq-nameserver2:9876";
String producerGroup = "ProducerGroupName";
final DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup);
try {
defaultMQProducer.setInstanceName("producer");
//发送超时时间,也可以在send方法设置不同消息的超时时间
defaultMQProducer.setSendMsgTimeout(20000);
defaultMQProducer.setVipChannelEnabled(false);
defaultMQProducer.setNamesrvAddr(namesrvAddr);
//发送失败重试3次,默认2次
defaultMQProducer.setRetryTimesWhenSendFailed(3);
//Producer对象在使用之前必须要调用start初始化,初始化一次即可
defaultMQProducer.start();
String topic = "testTopic";
String tag = "TagA";
String keys = "keys";
List<Message> batchList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
String msg = "hello world " + i;
Message message = new Message(topic, tag, keys, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
batchList.add(message);
}
//批量发送
SendResult sendResult = defaultMQProducer.send(batchList);
logger.info("批量发送消息:返回状态{}", sendResult.getSendStatus());
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
defaultMQProducer.shutdown();
}
}
异步发送
/**
* 异步发送
*/
@Test
public void testAsync() {
String namesrvAddr = "rocketmq-nameserver1:9876;rocketmq-nameserver2:9876";
String producerGroup = "ProducerGroupName";
final DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup);
try {
defaultMQProducer.setSendMsgTimeout(20000);
defaultMQProducer.setVipChannelEnabled(false);
defaultMQProducer.setNamesrvAddr(namesrvAddr);
defaultMQProducer.setRetryTimesWhenSendFailed(3);
defaultMQProducer.start();
String topic = "testTopic";
String tag = "TagA";
String keys = "keys";
CountDownLatch countDownLatch = new CountDownLatch(100);
for (int i = 100; i < 200; i++) {
String msg = "hello world " + i;
Message message = new Message(topic, tag, keys, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
final int k = i;
defaultMQProducer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
logger.info("第{}条消息:返回状态{}", k, sendResult.getSendStatus());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
logger.error(e.getMessage(), e);
}
});
}
countDownLatch.await();
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
defaultMQProducer.shutdown();
}
}
RocketMQ支持发送延迟消息,支持特定的level,例如1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h。但是不支持任意时间精度的延迟,主要原因是如果要支持任意的时间精度,在Broker层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销。
Broker配置messageDelayLeve:messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 描述了各级别与延时时间的对应映射关系。
案例:只需要设置消息的setDelayTimeLevel
/**
* 延迟消息
*/
@Test
public void testDelayMessage() {
String namesrvAddr = "rocketmq-nameserver1:9876;rocketmq-nameserver2:9877";
String producerGroup = "ProducerGroupName";
final DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup);
try {
defaultMQProducer.setInstanceName("producer");
//发送超时时间,也可以在send方法设置不同消息的超时时间
defaultMQProducer.setSendMsgTimeout(20000);
defaultMQProducer.setVipChannelEnabled(false);
defaultMQProducer.setNamesrvAddr(namesrvAddr);
//发送失败重试3次,默认2次
defaultMQProducer.setRetryTimesWhenSendFailed(3);
//Producer对象在使用之前必须要调用start初始化,初始化一次即可
defaultMQProducer.start();
String topic = "testTopic";
String tag = "TagA";
String keys = "keys";
for (int i = 0; i < 100; i++) {
String msg = "hello world " + i;
Message message = new Message(topic, tag, keys, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
//messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
//这里就表示10s
message.setDelayTimeLevel(3);
SendResult sendResult = defaultMQProducer.send(message);
logger.info("第{}条消息:返回状态{}", i, sendResult.getSendStatus());
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
defaultMQProducer.shutdown();
}
}
一个Topic会有多个Message Queue(类似于kafka一个Topic有多个Partition),如果使用默认规则,会轮流向各个Message Queue发送消息
如果需要把消息发送到相同的Message Queue,可以实现MessageQueueSelector接口
public interface MessageQueueSelector {
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
// 使用以下方法发送消息,根据传入的Object arg或者根据消息内容把消息发送到指定的MessageQueue中
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
当需要顺序消息时,比如相同的订单不同的状态,此时需要将相同的订单发送到同一个消息队列中。
@Test
public void testSelectorOrder() {
String namesrvAddr = "rocketmq-nameserver1:9876;rocketmq-nameserver2:9876";
String producerGroup = "ProducerGroupName";
final DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup);
try {
defaultMQProducer.setSendMsgTimeout(20000);
defaultMQProducer.setVipChannelEnabled(false);
defaultMQProducer.setNamesrvAddr(namesrvAddr);
defaultMQProducer.setRetryTimesWhenSendFailed(3);
defaultMQProducer.start();
String topic = "testTopic";
String tag = "TagA";
String keys = "keys";
for (int i = 200; i < 300; i++) {
String msg = "hello world " + i;
Message message = new Message(topic, tag, keys, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = defaultMQProducer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//获取总共的队列数
int size = mqs.size();
int curIndex = Integer.parseInt(arg.toString());
int index = (curIndex / 10) % size;
MessageQueue messageQueue = mqs.get(index);
//225,index:22,queueId:6,size:32
logger.info("{},index:{},queueId:{},size:{}", curIndex, index, messageQueue.getQueueId(), size);
return messageQueue;
}
}, i);
logger.info("第{}条消息:返回状态{}", i, sendResult.getSendStatus());
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
defaultMQProducer.shutdown();
}
}
在发送消息时,可以为每一条消息设置一个TAG标签。比如一个主题下有多种不同类型的数据,不同的消费者订阅相同Topic下不同类型的数据。消费者订阅相同的主题不同的TAG,多个TAG使用"|"分隔,同一个消费组订阅的topic,TAG必须相同
1. 发送
for (int i = 0; i < 100; i++) {
String msg = "hello world " + i;
Message message = null;
if (i % 2 == 0) {
message = new Message("filterTopic", //topic
"EVEN", // tag
"keys", //key
msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
} else {
message = new Message("filterTopic", //topic
"ODD", // tag
"keys", //key
msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
}
SendResult sendResult = defaultMQProducer.send(message);
producerLogger.info("第{}条消息:{}发送成功", i, sendResult);
}
2. 订阅
consumer.subscribe("filterTopic", "ODD");
SQL模式过滤
1. 发送之前设置消息属性
message.putUserProperty("num", String.valueOf(i));
2. 订阅,通过消息属性过滤
consumer.subscribe("filterTopic", MessageSelector.bySql("num >=20 and num <= 50"));
3. 如果报错
The broker does not support consumer to filter message by SQL92
需要在broker配置
enablePropertyFilter=true
类过滤模式:自定义消息过滤类需要实现接口MessageFilter。使用类模式的前提是启动FilterServer。
public class MessageFilterImpl implements MessageFilter {
@Override
public boolean match(MessageExt msg, FilterContext context) {
String property = msg.getProperty("num");
if (property != null) {
int num = Integer.parseInt(property);
if (num > 100) {
return true;
}
}
return false;
}
}
consumer.subscribe("filterTopic","xxxx.filter.MessageFilterImpl",filterCode)
事务消息的限制
transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为transactionMsgTimeout 参数。编写回调实现类
public class TransactionListenerImpl implements TransactionListener {
private final static Logger logger = LoggerFactory.getLogger("rocketmq-producer");
private AtomicInteger DB_ID = new AtomicInteger(0);
private ConcurrentHashMap<Integer, String> localDB = new ConcurrentHashMap<>();
/**
* 当发送事务prepare消息成功时,将调用该方法执行本地事务。
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String messageStr = JSON.parseObject(msg.getBody(), String.class);
logger.info("事务ID:{},消息数据:{}", msg.getTransactionId(), msg.getBody());
try {
//模拟业务逻辑(向数据库添加一条数据)
localDB.put(DB_ID.incrementAndGet(), messageStr);
logger.info("事务ID:{},执行业务逻辑:{}", msg.getTransactionId(), messageStr);
} catch (Exception e) {
logger.error("业务处理异常,回滚事务");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
logger.info("消息回查,事务ID:{},消息数据:{}", msg.getTransactionId(), msg.getBody());
String messageStr = JSON.parseObject(msg.getBody(), String.class);
int i = Integer.parseInt(messageStr);
//模拟回查状态
if (i >= 0 && i < 3) {
//继续回查,直到回滚或者提交
return LocalTransactionState.UNKNOW;
} else if (i >= 3 && i < 6) {
//回滚
return LocalTransactionState.ROLLBACK_MESSAGE;
} else {
//提交事务
return LocalTransactionState.COMMIT_MESSAGE;
}
}
}
发送
public class TransactionProducerTest {
private final static Logger logger = LoggerFactory.getLogger("rocketmq-producer");
@Test
public void testTransaction() {
String namesrvAddr = "rocketmq-nameserver1:9876;rocketmq-nameserver2:9876";
String producerGroup = "ProducerTransactionGroupName";
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer(producerGroup);
producer.setNamesrvAddr(namesrvAddr);
try {
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
String msgStr = (i + "");
Message msg =
new Message("transactionTopic",
tags[i % tags.length],
"KEY" + i,
msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
logger.info("{}", sendResult);
}
LockSupport.park();
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
producer.shutdown();
}
}
}