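Recap
In the previous article we demonstrated RocketMQ ordered messages with spring cloud. As a recap:
Ordered messages come in two kinds: partition-ordered messages and globally ordered messages.
Global ordering is really a special case of partition ordering: if a topic has only one partition and only one consumer thread consumes it at any time, its messages can be regarded as globally ordered.
When RocketMQ creates a topic, the default number of queues (partitions) is 8, and this default applies to all topics.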

To set the number of queues (partitions) for a single topic, spring cloud alibaba can be configured like this:
spring:
  application:
    name: mq-example
  cloud:
    stream:
      bindings:
        # Consumer side: the binding named "input"
        input:
          content-type: application/json
          destination: test-topic3
          group: consumer-group
        # Producer side: the binding named "output-order"
        output-order:
          content-type: application/json
          destination: test-topic3
          # Producer settings, mapped to the ProducerProperties class
          producer:
            partitionCount: 1 # number of partitions
Note partitionCount here: if it is set to 1, the producer sends every message to the same partition.
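How it works
In this article we focus on how RocketMQ implements ordered messages.
In the MQ model, ordering has to be guaranteed in three stages:
- messages are sent in order
- messages are stored in the same order in which they were sent
- messages are consumed in the same order in which they were stored
For RocketMQ to deliver ordered messages, the core idea is this: the Producer sends synchronously and makes sure a group of ordered messages goes to the same partition queue, and the Consumer makes sure each queue is consumed by only one thread.
Ordered sending
Let's walk through the code to see what the producer does when it sends a message: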
private SendResult sendDefaultImpl(Message msg, CommunicationMode communicationMode, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);
    long invokeID = this.random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];

        while(true) {
            label122: {
                String info;
                if (times < timesTotal) {
                    info = null == mq ? null : mq.getBrokerName();
                    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, info);
                    if (mqSelected != null) {
                        mq = mqSelected;
                        brokersSent[times] = mqSelected.getBrokerName();

                        long endTimestamp;
                        try {
                            beginTimestampPrev = System.currentTimeMillis();
                            long costTime = beginTimestampPrev - beginTimestampFirst;
                            if (timeout >= costTime) {
                                sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                                endTimestamp = System.currentTimeMillis();
                                this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                                switch(communicationMode) {
                                case ASYNC:
                                    return null;
                                case ONEWAY:
                                    return null;
                                case SYNC:
                                    if (sendResult.getSendStatus() == SendStatus.SEND_OK || !this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        return sendResult;
                                    }
                                default:
                                    break label122;
                                }
                            }

                            callTimeout = true;
                        } catch (RemotingException var26) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var26);
                            this.log.warn(msg.toString());
                            exception = var26;
                            break label122;
                        } catch (MQClientException var27) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var27);
                            this.log.warn(msg.toString());
                            exception = var27;
                            break label122;
                        } catch (MQBrokerException var28) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var28);
                            this.log.warn(msg.toString());
                            exception = var28;
                            switch(var28.getResponseCode()) {
                            case 1:
                            case 14:
                            case 16:
                            case 17:
                            case 204:
                            case 205:
                                break label122;
                            default:
                                if (sendResult != null) {
                                    return sendResult;
                                }

                                throw var28;
                            }
                        } catch (InterruptedException var29) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                            this.log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var29);
                            this.log.warn(msg.toString());
                            this.log.warn("sendKernelImpl exception", var29);
                            this.log.warn(msg.toString());
                            throw var29;
                        }
                    }
                }

                if (sendResult != null) {
                    return sendResult;
                }

                info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst, msg.getTopic(), Arrays.toString(brokersSent));
                info = info + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/");
                MQClientException mqClientException = new MQClientException(info, (Throwable)exception);
                if (callTimeout) {
                    throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
                }

                if (exception instanceof MQBrokerException) {
                    mqClientException.setResponseCode(((MQBrokerException)exception).getResponseCode());
                } else if (exception instanceof RemotingConnectException) {
                    mqClientException.setResponseCode(10001);
                } else if (exception instanceof RemotingTimeoutException) {
                    mqClientException.setResponseCode(10002);
                } else if (exception instanceof MQClientException) {
                    mqClientException.setResponseCode(10003);
                }

                throw mqClientException;
            }

            ++times;
        }
    } else {
        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
        if (null != nsList && !nsList.isEmpty()) {
            throw (new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null)).setResponseCode(10005);
        } else {
            throw (new MQClientException("No name server address, please set it." + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null)).setResponseCode(10004);
        }
    }
}
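The code above sends a message. The main flow is as follows.
When sending, the producer first looks up the TopicPublishInfo for the topic, using its local cache and refreshing it from the NameServer when necessary. It contains all the MessageQueues of the topic.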
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

private TopicPublishInfo tryToFindTopicPublishInfo(String topic) {
    TopicPublishInfo topicPublishInfo = (TopicPublishInfo)this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = (TopicPublishInfo)this.topicPublishInfoTable.get(topic);
    }

    if (!topicPublishInfo.isHaveTopicRouterInfo() && !topicPublishInfo.ok()) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = (TopicPublishInfo)this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    } else {
        return topicPublishInfo;
    }
}

public class TopicPublishInfo {
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    private List<MessageQueue> messageQueueList = new ArrayList();
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private TopicRouteData topicRouteData;

    public TopicPublishInfo() {
    }
    ...
Next, a target queue is selected:
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, info);
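To make the selection step more concrete, here is a minimal sketch of a round-robin selector that, on a retry, tries to avoid the broker that just failed. It is not RocketMQ's code: QueueRef and RoundRobinQueueSelector are hypothetical names introduced only for illustration, and the real selectOneMessageQueue may apply additional fault-tolerance logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for MessageQueue: just a broker name and a queue id.
final class QueueRef {
    final String brokerName;
    final int queueId;

    QueueRef(String brokerName, int queueId) {
        this.brokerName = brokerName;
        this.queueId = queueId;
    }

    @Override
    public String toString() {
        return brokerName + "#" + queueId;
    }
}

// Sketch only: rotates through the queues and, on a retry,
// skips queues that live on the broker that just failed.
final class RoundRobinQueueSelector {
    private final List<QueueRef> queues;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinQueueSelector(List<QueueRef> queues) {
        if (queues.isEmpty()) {
            throw new IllegalArgumentException("at least one queue is required");
        }
        this.queues = new ArrayList<>(queues);
    }

    QueueRef selectOne(String lastBrokerName) {
        for (int i = 0; i < queues.size(); i++) {
            QueueRef candidate = nextInRotation();
            if (lastBrokerName == null || !candidate.brokerName.equals(lastBrokerName)) {
                return candidate;
            }
        }
        // Every queue is on the failed broker: fall back to plain rotation.
        return nextInRotation();
    }

    private QueueRef nextInRotation() {
        // floorMod keeps the index valid even after the counter overflows.
        return queues.get(Math.floorMod(next.getAndIncrement(), queues.size()));
    }

    public static void main(String[] args) {
        RoundRobinQueueSelector selector = new RoundRobinQueueSelector(Arrays.asList(
                new QueueRef("broker-a", 0), new QueueRef("broker-a", 1), new QueueRef("broker-b", 0)));
        System.out.println(selector.selectOne(null));
        System.out.println(selector.selectOne(null));
        System.out.println(selector.selectOne("broker-a"));
    }
}
```

With a rotation like this, consecutive messages land on different queues, which is why an ordinary send cannot preserve order by itself and an ordered send has to pin related messages to one queue.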
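Then the core send method is called to send the message to the broker:
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
Sending must be synchronous; asynchronous and one-way sends cannot guarantee that messages are written to the queue in order.
sendKernelImpl branches on the communication mode, and an ordered send should take the case SYNC branch: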
case ASYNC:
    Message tmpMessage = msg;
    if (msgBodyCompressed) {
        tmpMessage = MessageAccessor.cloneMessage(msg);
        msg.setBody(prevBody);
    }

    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
    if (timeout < costTimeAsync) {
        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
    }

    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr, mq.getBrokerName(), tmpMessage, requestHeader, timeout - costTimeAsync, communicationMode, sendCallback, topicPublishInfo, this.mQClientFactory, this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), context, this);
    break;
case ONEWAY:
case SYNC:
    long costTimeSync = System.currentTimeMillis() - beginStartTime;
    if (timeout < costTimeSync) {
        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
    }

    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout - costTimeSync, communicationMode, context, this);
    break;
default:
    assert false;
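For the Producer, ordered sending only requires that messages with the same ShardingKey go to the same queue. Taking the spring cloud stream implementation as an example, see the determinePartition method of the PartitionHandler class: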
public int determinePartition(Message<?> message) {
    Object key = this.extractKey(message);
    int partition;
    if (this.producerProperties.getPartitionSelectorExpression() != null) {
        partition = (Integer)this.producerProperties.getPartitionSelectorExpression().getValue(this.evaluationContext, key, Integer.class);
    } else {
        partition = this.partitionSelectorStrategy.selectPartition(key, this.partitionCount);
    }

    return Math.abs(partition % this.partitionCount);
}
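Before the partition is chosen, extractKey derives the key from the message, for example through a partition key expression configured like this:
producer:
  partition-key-expression: payload['id']
If a partition selector expression is configured as well, partition is the result of evaluating that expression against the key. Otherwise the default strategy applies, and its implementation uses the hash code of the key: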
private static class DefaultPartitionSelector implements PartitionSelectorStrategy {
    private DefaultPartitionSelector() {
    }

    public int selectPartition(Object key, int partitionCount) {
        int hashCode = key.hashCode();
        if (hashCode == -2147483648) {
            hashCode = 0;
        }

        return Math.abs(hashCode);
    }
}
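Finally, the queue is chosen by taking partition modulo the total number of queues (partitions). This guarantees that messages with the same ShardingKey are sent to the same queue.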
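The hash-then-modulo rule can be condensed into a few lines. The following is a minimal, self-contained sketch of the idea rather than the spring cloud stream implementation itself; KeyHashPartitioner and partitionFor are names made up for this example.

```java
// Sketch only: maps a sharding key to a partition index in [0, partitionCount).
final class KeyHashPartitioner {

    static int partitionFor(Object key, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        int hash = key.hashCode();
        // Math.abs(Integer.MIN_VALUE) is still negative, so treat it as 0.
        if (hash == Integer.MIN_VALUE) {
            hash = 0;
        }
        return Math.abs(hash) % partitionCount;
    }

    public static void main(String[] args) {
        // The same order id always maps to the same partition.
        for (String orderId : new String[] {"order-1", "order-2", "order-1"}) {
            System.out.println(orderId + " -> partition " + partitionFor(orderId, 4));
        }
    }
}
```

Because the result depends only on the key and the partition count, every message of one order goes to one queue. The flip side is that changing the partition count changes the mapping, so messages of the same key sent before and after the change may end up in different queues.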
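The overall flow is shown in the figure below:

After a message is sent, the FIFO nature of the queue means it is stored on the broker in order as well.
Ordered consumption
By default a Consumer consumes the same MessageQueue concurrently with multiple threads, so even if messages arrive in order, they are not guaranteed to be consumed in order.
So how does RocketMQ guarantee ordered consumption?
As with the producer, let's walk through the code along the consumer's flow.
When the consumer starts, the start method of the MQClientInstance class kicks off rebalancing: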
public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}
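The key line is rebalanceService.start(), whose purpose is to assign MessageQueues to this consumer. To guarantee that a queue is consumed by only one consumer, the consumer must request a lock on the queue from the broker before it pulls and consumes messages. If it obtains the lock, it pulls messages; otherwise it gives up pulling and tries again in the next rebalance cycle (20s).
For the locking logic, see the updateProcessQueueTableInRebalance and lock methods of the RebalanceImpl class: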
// For ordered messages, the queue must be locked on the Broker first; consumption starts only after the lock succeeds.
for (MessageQueue mq : mqSet) {
    if (!this.processQueueTable.containsKey(mq)) {
        if (isOrder && !this.lock(mq)) {
            log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
            continue;
        }

        this.removeDirtyOffset(mq);
        ProcessQueue pq = new ProcessQueue();
        ...

public boolean lock(final MessageQueue mq) {
    // Look up the address of the Broker master
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
    if (findBrokerResult != null) {
        // Build the request body
        LockBatchRequestBody requestBody = new LockBatchRequestBody();
        requestBody.setConsumerGroup(this.consumerGroup); // consumer group
        requestBody.setClientId(this.mQClientFactory.getClientId()); // client instance ID
        requestBody.getMqSet().add(mq); // the queues to lock

        try {
            // Send the request; the Broker returns the set of queues it locked
            Set<MessageQueue> lockedMq =
                this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
            for (MessageQueue mmqq : lockedMq) {
                ProcessQueue processQueue = this.processQueueTable.get(mmqq);
                if (processQueue != null) {
                    processQueue.setLocked(true);
                    processQueue.setLastLockTimestamp(System.currentTimeMillis());
                }
            }
            // If the target queue is in the set, the lock was acquired
            boolean lockOK = lockedMq.contains(mq);
            log.info("the message queue lock {}, {} {}",
                lockOK ? "OK" : "Failed",
                this.consumerGroup,
                mq);
            return lockOK;
        } catch (Exception e) {
            log.error("lockBatchMQ exception, " + mq, e);
        }
    }
    return false;
}
This lock is a global lock maintained by the Broker.
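To illustrate what such a broker-side lock has to do, here is a minimal sketch of a lock table with one owner per queue and an expiry, so that a crashed client does not hold a queue forever. It is an assumption-laden model, not the Broker's implementation: QueueLockTable, the string queue key, the injected clock, and the 60-second lifetime used in main are all made up for this example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Sketch only: at most one live owner per queue key; a lock that is not renewed expires.
final class QueueLockTable {

    private static final class Entry {
        final String clientId;
        final long lastUpdateMillis;

        Entry(String clientId, long lastUpdateMillis) {
            this.clientId = clientId;
            this.lastUpdateMillis = lastUpdateMillis;
        }
    }

    private final Map<String, Entry> locks = new ConcurrentHashMap<>();
    private final long maxLiveMillis;   // assumed expiry, chosen by the caller
    private final LongSupplier clock;   // injected so the behaviour is easy to test

    QueueLockTable(long maxLiveMillis, LongSupplier clock) {
        this.maxLiveMillis = maxLiveMillis;
        this.clock = clock;
    }

    // Returns true if clientId owns the lock on queueKey after this call.
    boolean tryLock(String queueKey, String clientId) {
        long now = clock.getAsLong();
        Entry result = locks.compute(queueKey, (key, current) -> {
            boolean free = current == null || now - current.lastUpdateMillis > maxLiveMillis;
            if (free || current.clientId.equals(clientId)) {
                return new Entry(clientId, now);   // acquire, or renew our own lock
            }
            return current;                        // held by another live client
        });
        return result.clientId.equals(clientId);
    }

    // Only the current owner can release the lock.
    void unlock(String queueKey, String clientId) {
        locks.computeIfPresent(queueKey, (key, current) ->
                current.clientId.equals(clientId) ? null : current);
    }

    public static void main(String[] args) {
        QueueLockTable table = new QueueLockTable(60_000, System::currentTimeMillis);
        System.out.println("c1 locks: " + table.tryLock("group@topic@0", "c1"));
        System.out.println("c2 locks: " + table.tryLock("group@topic@0", "c2"));
    }
}
```

The expiry is the reason a consumer has to renew its locks periodically: if it stops renewing, another consumer in the group can eventually take over the queue.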
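Once the lock is acquired, the consumer builds a PullRequest object and starts pulling messages. The pull logic is implemented in PullMessageService. After a successful pull, the PullCallback puts the pulled messages into the ProcessQueue and submits a consume request so that ConsumeMessageOrderlyService starts consuming them.
When consuming, a thread first obtains the lock object of the MessageQueue and then uses the synchronized keyword to guarantee that only one thread consumes the queue at a time.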
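The "lock object per queue plus synchronized" idea can be sketched as follows. This is a simplified model, not ConsumeMessageOrderlyService itself: OrderedQueueWorker, the string queue key, and the runDemo helper are hypothetical and exist only to show that several worker threads still process one queue's messages in order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

// Sketch only: one lock object per queue; a message is taken and processed while holding it.
final class OrderedQueueWorker {
    private final ConcurrentMap<String, Object> queueLocks = new ConcurrentHashMap<>();

    private Object lockFor(String queueKey) {
        return queueLocks.computeIfAbsent(queueKey, key -> new Object());
    }

    // Takes the next pending message of the queue and "consumes" it into sink.
    // Returns false when nothing is left.
    boolean consumeNext(String queueKey, Queue<String> pending, List<String> sink) {
        synchronized (lockFor(queueKey)) {
            String message = pending.poll();
            if (message == null) {
                return false;
            }
            sink.add(message);   // stands in for the user's message listener
            return true;
        }
    }

    // Runs threadCount workers against a single queue and returns the consumption order.
    static List<String> runDemo(int messageCount, int threadCount) {
        OrderedQueueWorker worker = new OrderedQueueWorker();
        Queue<String> pending = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < messageCount; i++) {
            pending.add("msg-" + i);
        }
        List<String> sink = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int t = 0; t < threadCount; t++) {
            Thread thread = new Thread(() -> {
                while (worker.consumeNext("topic@0", pending, sink)) {
                    // keep consuming until the queue is drained
                }
            });
            threads.add(thread);
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(10, 4));
    }
}
```

Taking a message and processing it happen under the same lock, so the threads take turns and the consumption order matches the queue order. Different queues use different lock objects and can still be consumed in parallel.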

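For ordered messages, when the consumer fails to consume a message, RocketMQ automatically keeps retrying it (at 1-second intervals); the maximum number of retries is Integer.MAX_VALUE.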
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
    // Check the maximum number of retries, Integer.MAX_VALUE by default
    if (checkReconsumeTimes(msgs)) {
        // Mark the messages to be consumed again
        consumeRequest.getProcessQueue().makeMessageToConsumeAgain(msgs);
        // Submit the consume request again to retry later
        this.submitConsumeRequestLater(
            consumeRequest.getProcessQueue(),
            consumeRequest.getMessageQueue(),
            context.getSuspendCurrentQueueTimeMillis());
        continueConsume = false;
    } else {
        commitOffset = consumeRequest.getProcessQueue().commit();
    }
    break;
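The retry behaviour can be modelled as a small loop. The sketch below is a simplification under stated assumptions, not RocketMQ's code: OrderlyRetryLoop and consumeWithRetry are hypothetical names, the listener is reduced to a Predicate, and the retry happens inline instead of being resubmitted to a thread pool.

```java
import java.util.function.Predicate;

// Sketch only: retry the same message in place until it succeeds or retries run out.
final class OrderlyRetryLoop {

    // Returns true if the message was consumed, false if retries were exhausted
    // (or the thread was interrupted while waiting).
    static boolean consumeWithRetry(String message, Predicate<String> listener,
                                    int maxReconsumeTimes, long suspendMillis) {
        int reconsumeTimes = 0;
        while (true) {
            if (listener.test(message)) {
                return true;              // success: the offset may move past this message
            }
            if (reconsumeTimes >= maxReconsumeTimes) {
                return false;             // give up: the caller decides what happens next
            }
            reconsumeTimes++;
            if (!pause(suspendMillis)) {  // later messages of the queue wait meanwhile
                return false;
            }
        }
    }

    private static boolean pause(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical listener that fails twice before succeeding.
        boolean consumed = consumeWithRetry("order-1:paid", m -> ++calls[0] >= 3, 5, 10);
        System.out.println("consumed=" + consumed + ", attempts=" + calls[0]);
    }
}
```

With maxReconsumeTimes at Integer.MAX_VALUE the loop effectively never gives up, which is exactly how a single failing message can hold back everything behind it in the same queue.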
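One last point: during consumption the processing queue (ProcessQueue) is locked as well, which guarantees that in-flight messages finish being consumed before another consumer can take over the queue after a rebalance.
For example, queue q3 is currently assigned to consumer C2, which has already pulled 32 messages and is processing them in its thread pool. The consumer group is then scaled out, and q3 is reassigned from C2 to C3. C2 has already processed some of the messages but has not yet committed the offset, so if C3 started consuming q3 immediately, some messages would be consumed twice. Therefore, as long as C2 has not finished consuming the messages from q3, the rebalance cannot drop the queue and the lock on the broker is not released, so no other consumer can consume from it. This avoids duplicate consumption as far as possible and preserves the ordering semantics.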
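The C2/C3 hand-over can be sketched with a consume lock that the rebalance logic must obtain before it may give up a queue. This is a minimal model under assumptions, not RocketMQ's ProcessQueue: ReassignableQueue, consumeBatch, and tryRelease are hypothetical names, and the wait time is arbitrary.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Sketch only: a queue may be handed over only when no batch is being consumed.
final class ReassignableQueue {
    private final ReentrantLock consumeLock = new ReentrantLock();
    private volatile boolean dropped;

    // Called by the consuming thread around one batch of messages.
    void consumeBatch(Runnable work) {
        consumeLock.lock();
        try {
            work.run();
        } finally {
            consumeLock.unlock();
        }
    }

    // Called by the rebalance thread when the queue has been assigned to someone else.
    // Returns false if a batch is still in flight; the caller keeps the queue
    // (and its broker lock) and tries again in a later rebalance round.
    boolean tryRelease(long waitMillis) {
        try {
            if (consumeLock.tryLock(waitMillis, TimeUnit.MILLISECONDS)) {
                try {
                    dropped = true;   // from here on the broker lock could be released
                    return true;
                } finally {
                    consumeLock.unlock();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return false;
    }

    boolean isDropped() {
        return dropped;
    }

    public static void main(String[] args) {
        ReassignableQueue q3 = new ReassignableQueue();
        q3.consumeBatch(() -> System.out.println("C2 is consuming a batch from q3"));
        System.out.println("released after the batch: " + q3.tryRelease(100));
    }
}
```

Note that tryRelease has to run on a different thread than the consumer, as the rebalance logic would; ReentrantLock is reentrant, so the consuming thread itself would always get the lock.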
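Consumption summary:
- When a pull task is created, the client asks the broker to lock the MessageQueue, so that a MessageQueue can be consumed by only one consumer client at a time.
- When consuming, threads working on the same message queue first try to take an exclusive lock with synchronized and consume only after acquiring it, so that a MessageQueue is consumed by only one thread of one consumer client at a time.
- In RocketMQ each consumer group has its own thread pool that consumes the pulled messages concurrently, i.e. the consumer side is multi-threaded. For ordered consumption, the concurrency equals the number of queues assigned to the consumer. Parallelism should in theory not be a big problem, because the number of MessageQueues can be adjusted.
- During consumption the processing queue (ProcessQueue) is locked, which guarantees that in-flight messages finish being consumed.
- Once an ordered message fails to be consumed, it is retried indefinitely by default and never skipped, because skipping it would break the ordering semantics.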
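Potential problems with ordered messages
Message blocking
Under ordering semantics, if a message has not been consumed successfully, the next message cannot be consumed; skipping a failed message to consume the ones behind it would violate ordered consumption.
When using ordered messages, make sure the application can monitor and handle consumption failures promptly so that the queue does not block. One option is to offer strategies that let the user decide, based on the error type, whether to skip a message, together with something like a retry queue, so that a skipped message can still be consumed "elsewhere" later.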
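One possible shape for such a skip strategy is sketched below: after a fixed number of failed attempts the message is parked in a side queue and the main queue is allowed to move on. Everything here is an assumption made for illustration: SkipAfterMaxAttempts, the in-memory parked queue, and the attemptsSoFar parameter are hypothetical. In a real consumer the attempt count could come from MessageExt.getReconsumeTimes(), and the parked messages would go to a durable store or a separate topic.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Predicate;

// Sketch only: trade strict ordering for liveness after maxAttempts failures.
final class SkipAfterMaxAttempts {
    private final int maxAttempts;
    private final Queue<String> parked = new ArrayDeque<>();

    SkipAfterMaxAttempts(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    // Returns true when the queue may advance past the message
    // (it was consumed or parked), false when it should be retried.
    boolean handle(String message, int attemptsSoFar, Predicate<String> businessLogic) {
        if (businessLogic.test(message)) {
            return true;
        }
        if (attemptsSoFar + 1 >= maxAttempts) {
            parked.add(message);   // to be re-processed "elsewhere" later
            return true;
        }
        return false;              // later messages of the queue stay blocked
    }

    Queue<String> parkedMessages() {
        return parked;
    }

    public static void main(String[] args) {
        SkipAfterMaxAttempts policy = new SkipAfterMaxAttempts(3);
        for (int attempt = 0; attempt < 3; attempt++) {
            boolean advance = policy.handle("order-1:paid", attempt, m -> false);
            System.out.println("attempt " + (attempt + 1) + ", advance=" + advance);
        }
        System.out.println("parked: " + policy.parkedMessages());
    }
}
```

Parking a message deliberately gives up ordering for that one key, so whether this is acceptable depends on the error type and the business semantics, as noted above.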
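Failover no longer applies
Sending ordered messages cannot take advantage of the cluster's failover feature, because a retry cannot switch to a different MessageQueue.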
