RocketMQ 系列 消息有序

消息中间件基本问题

Posted by lichao modified on June 30, 2020

RocketMQ 的顺序消息需要满足 2 点:

  1. Producer 端保证发送消息有序,且发送到同一个队列。
  2. Consumer 端保证消费同一个队列。

生产端

RocketMQ 可以严格的保证消息有序。但这个顺序,不是全局顺序,只是分区(queue)顺序。要全局顺序只能一个分区。

但是同一条 queue 里面,RocketMQ 的确是能保证 FIFO 的。

确保消息放到同一个 queue 中,需要使用 MessageQueueSelector。

1
2
3
4
5
6
7
8
9
10
11
12
String body = dateStr + " Hello RocketMQ " + orderList.get(i);
Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, body.getBytes());
//确保同一个订单号的数据放到同一个queue中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Long id = (Long) arg;
                        long index = id % mqs.size();
                        return mqs.get((int)index);
                    }
                }, orderList.get(i).getOrderId());//订单id

消费端

需要使用 MessageListenerOrderly 来消费数据。

MessageListenerOrderly 与 MessageListenerConcurrently 区别:

  • MessageListenerOrderly: 有序消费,同一队列的消息同一时刻只能一个线程消费,可保证消息在同一队列严格有序消费
  • MessageListenerConcurrently: 并发消费
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class ConsumerInOrder {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        consumer.setNamesrvAddr("10.11.11.11:9876;10.11.11.12:9876");
        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
 
        consumer.subscribe("TopicTestjjj", "TagA || TagC || TagD");
 
        consumer.registerMessageListener(new MessageListenerOrderly() {
 
            Random random = new Random();
 
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                System.out.print(Thread.currentThread().getName() + " Receive New Messages: " );
                for (MessageExt msg: msgs) {
                    System.out.println(msg + ", content:" + new String(msg.getBody()));
                }
                try {
                    //模拟业务逻辑处理中...
                    TimeUnit.SECONDS.sleep(random.nextInt(10));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
 
        consumer.start();
 
        System.out.println("Consumer Started.");
    }
}