mq

MQ 系列 消息不丢

刨析rocketMQ的底层实现

Posted by lichao modified on September 29, 2019

消息重投

对于普通消息,消息发送默认采用 round-robin 机制来选择发送到哪一个队列,如果发送失败,默认重试 2 次。对于普通有序消息,RocketMQ 是不会进行重试的。如果需要重试,那么业务 RD 同学需要自己编写重试代码,例如通过一个 for 循环,最多重试几次。对于严格有序消息,由于直接指定了一个 MessageQueue。如果这个 MessageQueue 所在的 Broker 宕机了,那么之后的重试必然都失败。

生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway 没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在RocketMQ中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer负载变化也会导致重复消息。如下方法可以设置消息重试策略:

  • retryTimesWhenSendFailed: 同步发送失败重投次数,默认为2,因此生产者会最多尝试发送retryTimesWhenSendFailed + 1次。不会选择上次失败的broker,尝试向其他broker发送,最大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。当出现RemotingException、MQClientException和部分MQBrokerException时会重投。
  • retryTimesWhenSendAsyncFailed: 异步发送失败重试次数,异步重试不会选择其他broker,仅在同一个broker上做重试,不保证消息不丢。
  • retryAnotherBrokerWhenNotStoreOK: 消息刷盘(主或备)超时或slave不可用(返回状态非SEND_OK),是否尝试发送到其他 broker,默认false。十分重要消息可以开启。

消费重试

Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer 消费消息失败通常可以认为有以下几种情况:

  • 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过 10 秒后再重试。
  • 由于依赖的下游应用服务不可用,例如db连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用sleep 30s,再消费下一条消息,这样可以减轻Broker重试消息的压力。

RocketMQ 会为每个消费组都设置一个 Topic 名称为 “%RETRY%+consumerGroup” 的重试队列(这里需要注意的是,这个 Topic 的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致 Consumer 端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ 对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。

消费失败重试

  • Kafka 消费失败不支持重试。
  • RocketMQ 消费失败支持定时重试,每次重试间隔时间顺延。

死信队列

死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

RocketMQ 将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在RocketMQ中,可以通过使用console控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。

数据可靠性

  • RocketMQ 支持异步刷盘,同步刷盘,同步复制,异步复制。
  • Kafka 使用异步刷盘方式,异步复制/同步复制。

总结:RocketMQ 的同步刷盘在单机可靠性上比 Kafka 更高,不会因为操作系统 Crash 导致数据丢失。Kafka 同步 Replication 理论上性能低于 RocketMQ 的同步 Replication,原因是 Kafka 的数据以分区为单位组织,意味着一个 Kafka 实例上会​​有几百个数据分区,RocketMQ 一个实例上只有一个数据分区,RocketMQ 可以充分利用 IO 组 Commit 机制,批量传输数据,配置同步 Replication 与异步 Replication 相比,性能损耗约 20%~30%.

消息可靠性

影响消息可靠性的几种情况:

  1. Broker 正常关闭
  2. Broker 异常 Crash
  3. OS Crash
  4. 机器掉电,但是能立即恢复供电情况。
  5. 机器无法开机(可能是 cpu 、主板、内存等关键设备损坏)
  6. 磁盘设备损坏。

1)、2)、3)、4) 四种情况都属于硬件资源可立即恢复情况,RocketMQ在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。

5)、6)属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ在这两种情况下,通过异步复制,可保证99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与Money相关的应用。注:RocketMQ从3.0版本开始支持同步双写。

消息持久化

消息中间件通常采用的几种持久化方式:

  1. 持久化到数据库,例如 MySQL。
  2. 持久化到 KV 存储,例如 levelDB 、伯克利 DB 等 KV 存储系统。
  3. 文件记录形式持久化,例如 Kafka RocketMQ
  4. 对内存数据做一个持久化镜像,例如 beanstalkd VisiNotify

1.2.3 三种持久化方式都具有将内存队列 Buffer 进行扩展的能力,4 只是一个内存的镜像,作用是当 Broker 挂掉重启后仍然能将之前内存的数据恢复出来。

JMS 与 CORBA Notification 规范没有明确说明如何持久化,但是持久化部分的性能直接决定了整个 消息中间件的性能。

RocketMQ 参考了 Kafka 的持久化方式,充分利用 Linux 文件系统内存 cache 来提高性能。