RocketMQ 系列 高可用设计

刨析rocketMQ的底层实现

Posted by lichao modified on May 27, 2020

消息消费

这里将消息的整体处理阶段分为 3 个阶段进行分析:

  • Producer 发送消息阶段。
  • Broker 处理消息阶段。
  • Consumer 消费消息阶段。

概念

故障延迟机制

消息消费 消息发送端采用重试机制,选择消息队列有两种方式。

获取机制:

直接用 sendWhichQueue 自增再获取值,与当前路由表中消息队列个数取模,返回该位置的MessageQueue。 如果消息发送再失败的话,下次进行消息队列选择时规避上次 MesageQueue 所在的Broker, 否则还是很有可能再次失败。

消息消费模式

广播模式与集群模式:

  • 广播模式比较简单,每一个消费者需要去拉取订阅主题下所有消费队列的消息
  • 在集群模式下,同一个消费组内有多个消息消费者,同一个主题存在多个消费队列,那么消费者进行消息队列负载

Producer 发送消息阶段

发送消息阶段涉及到 Producer 到 broker 的网络通信,因此丢失消息的几率一定会有,那 RocketMQ 在此阶段用了哪些手段保证消息不丢失了(或者说降低丢失的可能性)。

手段一:提供 SYNC 的发送消息方式,等待 broker 处理结果

RocketMQ 提供了 3 种发送消息方式,分别是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
 // 1、同步发送
SendResult sendResult = producer.send(msg);

//2、异步发送
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
    }
    @Override
    public void onException(Throwable e) {
    }
});

//3、 Oneway发送
producer.sendOneway(msg);
  • 同步发送:Producer 向 broker 发送消息,阻塞当前线程等待 broker 响应 发送结果。
  • 异步发送:Producer 首先构建一个向 broker 发送消息的任务,把该任务提交给线程池,等执行完该任务时,回调用户自定义的回调函数,执行处理结果。
  • Oneway发送:Oneway 方式只负责发送请求,不等待应答,Producer 只负责把请求发出去,而不处理响应结果。

在调用 producer.send 方法时,不指定回调方法,则默认采用同步发送消息的方式,这也是丢失几率最小的一种发送方式。

手段二:发送消息如果失败或者超时,则重新发送

发送重试源码如下,本质其实就是一个 for 循环,当发送消息发生异常的时候重新循环发送。默认重试3次,重试次数可以通过 producer 指定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//只有同步发送的方式才会重试
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
 //for循环来重试
 for (; times < timesTotal; times++) {
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                    try {
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        //不同的发送消息方式
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }
                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                        continue;
                    } catch (MQClientException e) {
                        continue;
                    } catch (MQBrokerException e) {
                        exception = e;
                    }
                } 
            }

手段三:broker 提供多 master 模式,即使某台 broker 宕机了,保证消息可以投递到另外一台正常的 broker 上

如果 broker 只有一个节点,则 broker 宕机了,即使 producer 有重试机制,也没用,因此利用多主模式,当某台 broker 宕机了,换一台 broker 进行投递。

总结

producer 消息发送方式虽然有 3 种,但为了减小丢失消息的可能性尽量采用同步的发送方式,同步等待发送结果,利用同步发送+重试机制+多个 master 节点,尽可能减小消息丢失的可能性。

Broker 处理消息阶段

手段四:提供同步刷盘的策略

1
2
3
4
public enum FlushDiskType {
    SYNC_FLUSH, //同步刷盘
    ASYNC_FLUSH//异步刷盘(默认)
}

当消息投递到 broker 之后,会先存到 page cache,然后根据 broker 设置的刷盘策略是否立即刷盘,也就是如果刷盘策略为异步,broker 并不会等待消息落盘就会返回 producer 成功,也就是说当broker 所在的服务器突然宕机,则会丢失部分页的消息。

手段五:提供主从模式,同时主从支持同步双写

即使broker设置了同步刷盘,如果主broker磁盘损坏,也是会导致消息丢失。 因此可以给broker指定slave,同时设置master为SYNC_MASTER,然后将slave设置为同步刷盘策略。

此模式下,producer每发送一条消息,都会等消息投递到master和slave都落盘成功了,broker才会当作消息投递成功,保证休息不丢失。

总结:

在broker端,消息丢失的可能性主要在于刷盘策略和同步机制。 RocketMQ默认broker的刷盘策略为异步刷盘,如果有主从,同步策略也默认的是异步同步,这样子可以提高broker处理消息的效率,但是会有丢失的可能性。因此可以通过同步刷盘策略+同步slave策略+主从的方式解决丢失消息的可能。

Consumer消费消息阶段

手段六:consumer 默认提供的是 At least Once 机制

从 producer 投递消息到 broker,即使前面这些过程保证了消息正常持久化,但如果 consumer 消费消息没有消费到也不能理解为消息绝对的可靠。因此 RockerMQ 默认提供了 At least Once 机制保证消息可靠消费。

何为 At least Once?

Consumer 先 pull 消息到本地,消费完成后,才向服务器返回 ack。

通常消费消息的 ack 机制一般分为两种思路:

  1. 先提交后消费
  2. 先消费,消费成功后再提交

思路一可以解决重复消费的问题但是会丢失消息,因此Rocket默认实现的是思路二,由各自consumer业务方保证幂等来解决重复消费问题。

手段七:消费消息重试机制

当消费消息失败了,如果不提供重试消息的能力,则也不能算完全的可靠消费,因此 RocketMQ 本身提供了重新消费消息的能力。

consumer 端要保证消费消息的可靠性,主要通过 At least Once + 消费重试机制保证。

Consumer 的业务消费代码一定要保证幂等的原因?

  1. 由于先消费消息,再提交 offset,因此可能存在消费完消息之后,提交 offset 失败;当然这种可能性极低(因为消费完之后提交 offset 只是做了内存操作)
  2. 由于 offset 是先存在内存中,定时器间隔几秒提交给 broker,消费之后的 offset 是完全存在可能丢失的风险(例如 consumer 端突然宕机),从而会导致没有提交 offset 到 broker,再次启动consumer 客户端时,会重复消费。

参考文献

http://silence.work/2019/05/03/RocketMQ-Broker%E7%AB%AF%E5%A4%84%E7%90%86%E6%B6%88%E6%81%AF%E8%BF%87%E7%A8%8B%E5%88%86%E6%9E%90/

http://silence.work/2019/05/03/RocketMQ-Broker%E7%AB%AF%E5%A4%84%E7%90%86%E6%B6%88%E6%81%AF%E8%BF%87%E7%A8%8B%E5%88%86%E6%9E%90/