RocketMQ 系列 消费者

刨析rocketMQ的底层实现

Posted by lichao modified on September 29, 2019

概念

消费模式

消息消费以组的模式开展, 一个消费组内可以包含多个消费者,每一个消费组可订阅多个主题,消费组之间有集群模式与广播模式两种消费模式。

  • 集群模式,主题下的同一条消息只允许被其中一个消费者消费。
  • 广播模式,主题下的同一条消息将被集群内的所有消费者消费一次。

消息传送方式

消息服务器与消费者之间的消息传送也有两种方式:推模式、拉模式。所谓的拉模式,是消费端主动发起拉消息请求,而推模式是消息到达消息服务器后,推送给消息消费者。RocketMQ 消息推模式的实现基于拉模式,在拉模式上包装一层,一个拉取任务完成后开始下一个拉取任务。

消息过滤模式

问题

消息消费流程

消息消费

  1. 构建topic订阅消息SubscriptionData并加入到 Rebalancelmpl 的订阅消息中:
    • topic订阅信息
    • 订阅重试主题消息。RocketMQ 消息重试是以消费组为单位,而不是主题,消息重试主题名为%RETRY%+消费组名。消费者在启动的时候会自动订阅该 主题,参与该主题的消息队列负载。
  2. Rebalanccelmpl创建PullRequest对象放入pullRequestQueue,根据 PullRequest 拉取任务执行完一次消息拉取任务后,又将 PullRequest 对象放入到pullRequestQueue 消息消费
    • String consumerGroup :消费者组。
    • MessageQueue messageQueue:待拉取消费队列。
    • ProcessQueue processQueue :消息处理队列,从Broker 拉取到的消息先存人ProccessQueue, 然后再提交到消费者消费线程池消费。
    • long nextOffset :待拉取的MessageQueue 偏移量。
    • Boolean lockedFirst :是否被锁定。
  3. ProcessQueue 是 MessageQueue 在消费端的重现、快照。PullMessageService 从消息服务器默认每次拉取 32 条消息,按消息的队列偏移量顺序存放在 ProcessQueue 中,PullMessageService 然后将消息提交到消费者消费线程池,消息成功消费后从ProcessQueue 中移除 4.

集群模式下,多个消费者如何对消息队列进行负载呢?

消息队列负载机制遵循一个通用的思想: 一个消息队列同一时间只允许被一个消费者消费,一个消费者可以消费多个消息队列。 RocketMQ 支持局部顺序消息消费,也就是保证同一个消息队列上的消息顺序消费。不支持消息全局顺序消费, 如果要实现某一主题的全局顺序消息消费, 可以将该主题的队列数设置为1 ,牺牲高可用性。

集群内多个消费者是如何负载主题下的多个消费队列, 并且如果有新的消费者加入时,消息队列又会如何重新分布?

如何管理消息进度offset?

  1. 初始化消息进度。如果消息消费是集群模式,那么消息进度保存在Broker上;如果是广播模式,那么消息消费进度存储在消费端。

如何进行消息重试?

消息消费

  1. 构建topic订阅消息SubscriptionData并加入到 Rebalancelmpl 的订阅消息中:
    • topic订阅信息
    • 订阅重试主题消息。RocketMQ 消息重试是以消费组为单位,而不是主题,消息重试主题名为%RETRY%+消费组名。消费者在启动的时候会自动订阅该 主题,参与该主题的消息队列负载。 2.

  • 每一个消费组内维护一个线程池来消费消息