Featured image of post 自顶向下学习 RocketMQ(九):回溯消费

自顶向下学习 RocketMQ(九):回溯消费

定义“回溯消费是指 Consumer 已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Brok

定义

回溯消费是指 Consumer 已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker 在向 Consumer 投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于 Consumer 系统故障,恢复后需要重新消费 1 小时前的数据,那么 Broker 要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ 支持按照时间回溯消费,时间维度精确到毫秒。

Demo

我们仍然是利用 Spring Cloud Stream 的编程模型 + Spring Cloud Alibaba RocketMQ 来实现。

理论

在消费时,可以设置一个字段 ConsumeFromWhere(源码位置在:org.apache.rocketmq.common.consumer.ConsumeFromWhere),从哪开始消费。可选参数,去掉 Deprecated 的,剩下的就是

1public enum ConsumeFromWhere {
2  CONSUME_FROM_LAST_OFFSET,
3  CONSUME_FROM_FIRST_OFFSET,
4  CONSUME_FROM_TIMESTAMP,
5}
  • CONSUME_FROM_LAST_OFFSET:从最后的偏移量开始消费
  • CONSUME_FROM_FIRST_OFFSET:从最小偏移量开始消费
  • CONSUME_FROM_TIMESTAMP:从某个时间开始消费

我们需要设置从某个时间开始消费,即配置 CONSUME_FROM_TIMESTAMP 并设置好具体的时间点。

实现

首先还是看一下配置文件

 1server:
 2  port: 8080
 3  servlet:
 4    context-path: /mq-example
 5
 6spring:
 7 
 8  application:
 9    name: mq-example
10  cloud:
11    stream:
12      bindings:
13
14        input-backtracking:
15          content-type: application/json
16          destination: test-topic3
17          group: backtracking-consumer-group
18
19        # 定义 name 为 output 的 binding 生产
20        output-order:
21          content-type: application/json
22          destination: test-topic3
23
24      rocketmq:
25        # RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
26        binder:
27          # 配置 rocketmq 的 nameserver 地址
28          name-server: 127.0.0.1:9876
29          group: rocketmq-group
30        bindings:
31          output-order:
32            # RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
33            producer:
34              #group: producer-group # 生产者分组
35              sync: true # 是否同步发送消息,默认为 false 异步。
36          input-backtracking: # 回溯消息配置
37            # com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties
38            consumer:
39              consumeFromWhere: CONSUME_FROM_TIMESTAMP
40              consumeTimestamp: 20220117110148
41              enabled: true # 是否开启消费,默认为 true
42              broadcasting: false # 是否使用广播消费,默认为 false 使用集群消费

这里我们仍然用之前的 ouput-order 作为生产者,生产消息。

消息者配置上主要注意 input-backtracking 节点中的属性配置:

  • consumeFromWhere 即上文提到的从哪儿开始消费,这里我们指定时间消费
  • consumeTimestamp 即指定的时间点

程序入口:

1@SpringBootApplication
2@EnableBinding({ MySource.class})
3public class MqBootstrapApplication {
4    public static void main(String[] args) {
5        SpringApplication.run(MqBootstrapApplication.class);
6    }
7
8}

要加上 @EnableBinding

MySource:

1public interface MySource {
2
3    @Output("output-order")
4    MessageChannel output4Order();
5
6    @Input("input-backtracking")
7    MessageChannel inputBackTracking();
8
9}

controller 生产消息:

 1@GetMapping("/produce")
 2    public void produceMsg() {
 3
 4        Map<String, Object> headers = Maps.newHashMapWithExpectedSize(16);
 5        headers.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 2);
 6        headers.put(MessageConst.PROPERTY_TAGS, "test03");
 7
 8        Order order = Order.builder().id(1L).desc("test").build();
 9        Message message = MessageBuilder.createMessage(order, new MessageHeaders(headers));
10        mySource.output4Order().send(message);
11
12    }

ReceiveService 消费消息:

 1
 2@Service
 3@Log4j2
 4public class ReceiveService {
 5
 6    @StreamListener("input-backtracking")
 7    public void receiveBackTrackingInput(String receiveMsg, GenericMessage message, @Headers Map headers) {
 8
 9        log.info("接收到回溯消息:{}", receiveMsg);
10
11    }
12
13}

测试

可以先调用 controller 生产消息,或者不用 Demo 中的生产者生产消息,找一个之前发过消息的 topic , 看一下它的消息轨迹,找到存储时间

Image

如果你用之前发过消息的 topic 记得修改配置文件中的 topic名称 :

Image

确认找到的这条消息已经被消费过(因为要测回溯,至少是二次消费),将 consumeTimestamp 的时间配置在 存储时间之后。

这时启动项目,观察 ReceiveService 的输出:

1 接收到回溯消息:{"id":1,"desc":"test"}

证明消息回溯消费成功。

参考

位旅人路过 次翻阅 初次见面