mq

MQ 系列 消息过滤

刨析rocketMQ的底层实现

Posted by lichao modified on September 29, 2019

消息过滤

目前业界流行的消息过滤方案有以下几种:

  • 基于 tag 的消息过滤: 一种常用的方案是基于 tag 进行消息过滤,tag 相当于对 topic 中消息再次进行逻辑分类
  • 基于 sql 的消息过滤: 消息 Message 对象中添加一些自定义用户属性,消费方在消息时,可以指定一个 sql 过滤表达式
  • 基于动态脚本过滤,以下两种方式,本质上都是动态脚本过滤,只不过实现策略不同:
    • DDMQ支持 groovy 脚本中对消息进行:过滤,修改等操作。
    • RocketMQ 支持根据一个实现了 MessageFilter 接口的类进行消息过滤,消费者需要实现这个接口,并将源码当做订阅条件上报给 RocketMQ。

kafka 不具备消息过滤的能力,意味着所有的下游消费者都会接受到所有的消息。

RocketMQ 消费者可以根据 Tag 进行消息过滤,一个消息只能指定一个 Tag,这是因为 RocketMQ 在存储时,将消息的 tag 整体内容计算出一个 hashcode 进行存储。消息过滤目前是在 Broker 端实现的,优点是减少了对于 Consumer 无用消息的网络传输,缺点是增加了 Broker 的负担、而且实现相对复杂。

RocketMQ 消费者支持基于自定义属性 sql 过滤。

ActiveMQ 支持基于 SQL 的消息过滤。

不支持代理端的消息过滤

  • RocketMQ支持两种代理端消息过滤方式
    • 根据消息变量来过滤,相当于子主题概念
    • 向服务器上传一段Java代码,可以对消息做任意形式的过滤,甚至可以做Message身体的过滤拆分。