Java充电社
专辑
博文
联系我
本人继续续收门徒,亲手指导
RocketMQ第13篇:原理篇-消息消费
相关专辑:
RocketMQ专题
<div style="display:none"></div> 消费者从Broker中获取消息的方式有两种:pull拉取方式和push推动方式。消费者组对于消息消费的模式又分为两种:集群消费Clustering和广播消费Broadcasting。 ## 1、获取消费类型 ### 1.1、拉取式消费 Consumer主动从Broker中拉取消息,主动权由Consumer控制。一旦获取了批量消息,就会启动消费过程。不过,该方式的实时性较弱,即Broker中有了新的消息时消费者并不能及时发现并消费。 > 由于拉取时间间隔是由用户指定的,所以在设置该间隔时需要注意平稳:间隔太短,空请求比例会增加;间隔太长,消息的实时性太差 ### 1.2、推送式消费 该模式下Broker收到数据后会主动推送给Consumer。该获取方式一般实时性较高。 该获取方式是典型的发布-订阅模式,即Consumer向其关联的Queue注册了监听器,一旦发现有新的消息到来就会触发回调的执行,回调方法是Consumer去Queue中拉取消息。而这些都是基于Consumer与Broker间的长连接的。长连接的维护是需要消耗系统资源的。 ### 1.3、对比 - pull:需要应用去实现对关联Queue的遍历,实时性差;但便于应用控制消息的拉取 - push:封装了对关联Queue的遍历,实时性强,但会占用较多的系统资 ## 2、消费模式 ### 2.1、广播消费 ![](https://itsoku.oss-cn-hangzhou.aliyuncs.com/itsoku/blog/article/361/e809633b-7e1a-4e1b-a635-164bb4fdf811.png) 广播消费模式下,相同Consumer Group的每个Consumer实例都接收同一个Topic的全量消息。即每条消息都会被发送到Consumer Group中的每个Consumer。 ### 2.2、集群消费 ![](https://itsoku.oss-cn-hangzhou.aliyuncs.com/itsoku/blog/article/361/224e4ad9-4ad5-4605-a5ed-6ee7ff7e727e.png) 集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊同一个Topic的消息。即每条消息只会被发送到Consumer Group中的某个Consumer。 ### 2.3、消息进度保存 - 广播模式:消费进度保存在consumer端。因为广播模式下consumer group中每个consumer都会消费所有消息,但它们的消费进度是不同。所以consumer各自保存各自的消费进度。 - 集群模式:消费进度保存在broker中。consumer group中的所有consumer共同消费同一个Topic中的消息,同一条消息只会被消费一次。消费进度会参与到了消费的负载均衡中,故消费进度是需要共享的。下图是broker中存放的各个Topic的各个Queue的消费进度。 ![](https://itsoku.oss-cn-hangzhou.aliyuncs.com/itsoku/blog/article/361/7b6066ed-f3c6-4cfc-82b3-e5a83921f5c4.png) ## 3、Rebalance机制 Rebalance机制讨论的前提是:集群消费。 ### 3.1、什么是Rebalance Rebalance即再均衡,指的是,将⼀个Topic下的多个Queue在同⼀个Consumer Group中的多个Consumer间进行重新分配的过程。 ![](https://itsoku.oss-cn-hangzhou.aliyuncs.com/itsoku/blog/article/361/d7eb0d67-37f1-48a0-9dc1-85aceaacd21b.png) Rebalance机制的本意是为了提升消息的并行消费能力。例如,⼀个Topic下5个队列,在只有1个消费者的情况下,这个消费者将负责消费这5个队列的消息。如果此时我们增加⼀个消费者,那么就可以给其中⼀个消费者分配2个队列,给另⼀个分配3个队列,从而提升消息的并行消费能力。 ### 3.2、Rebalance限制 由于⼀个队列最多分配给⼀个消费者,因此当某个消费者组下的消费者实例数量大于队列的数量时,多余的消费者实例将分配不到任何队列。 ### 3.3、Rebalance危害 Rebalance的在提升消费能力的同时,也带来一些问题: **消费暂停**:在只有一个Consumer时,其负责消费所有队列;在新增了一个Consumer后会触发Rebalance的发生。此时原Consumer就需要暂停部分队列的消费,等到这些队列分配给新的Consumer后,这些暂停消费的队列才能继续被消费。 **消费重复**:Consumer 在消费新分配给自己的队列时,必须接着之前Consumer 提交的消费进度的offset继续消费。然而默认情况下,offset是异步提交的,这个异步性导致提交到Broker的offset与Consumer实际消费的消息并不一致。这个不一致的差值就是可能会重复消费的消息。 > 同步提交:consumer提交了其消费完毕的一批消息的offset给broker后,需要等待broker的成功ACK。当收到ACK后,consumer才会继续获取并消费下一批消息。在等待ACK期间,consumer是阻塞的。 > > 异步提交:consumer提交了其消费完毕的一批消息的offset给broker后, 不需要等待broker的成功ACK。consumer可以直接获取并消费下一批消息。 > > 对于一次性读取消息的数量,需要根据具体业务场景选择一个相对均衡的是很有必要的。因为数量过大,系统性能提升了,但产生重复消费的消息数量可能会增加;数量过小,系统性能会下降,但被重复消费的消息数量可能会减少。 **消费突刺**:由于Rebalance可能导致重复消费,如果需要重复消费的消息过多,或者因为Rebalance暂停时间过长从而导致积压了部分消息。那么有可能会导致在Rebalance结束之后瞬间需要消费很多消息。 ### 3.4、Rebalance产生的原因 导致Rebalance产生的原因,无非就两个:消费者所订阅Topic的Queue数量发生变化,或消费者组中消费者的数量发生变化 > 1)Queue数量发生变化的场景: > > - Broker扩容或缩容 > - Broker升级运维 > - Broker与NameServer间的网络异常 > - Queue扩容或缩容 > > 2)消费者数量发生变化的场景: > > - Consumer Group扩容或缩容 > - Consumer升级运维 > - Consumer与NameServer间网络异常 ### 3.5、Rebalance过程 在Broker中维护着多个Map集合,这些集合中动态存放着当前Topic中Queue的信息、Consumer Group中Consumer实例的信息。一旦发现消费者所订阅的Queue数量发生变化,或消费者组中消费者的数量发生变化,立即向Consumer Group中的每个实例发出Rebalance通知。 > TopicConfigManager:key是topic名称,value是TopicConfig。TopicConfig中维护着该Topic中所有Queue的数据。 > > ConsumerManager:key是Consumser Group Id,value是ConsumerGroupInfo。 > > ConsumerGroupInfo中维护着该Group中所有Consumer实例数据。 > > ConsumerOffsetManager:key为 Topic与订阅该Topic的Group的组合,即topic@group,value是一个内层Map。内层Map的key为QueueId,内层Map的value为该Queue的消费进度offset。 Consumer实例在接收到通知后会采用Queue分配算法自己获取到相应的Queue,即由Consumer实例自主进行Rebalance。 ### 3.5、与Kafka对比 在Kafka中,一旦发现出现了Rebalance条件,Broker会调用Group Coordinator来完成Rebalance。 Coordinator是Broker中的一个进程。Coordinator会在Consumer Group中选出一个Group Leader。由这个Leader根据自己本身组情况完成Partition分区的再分配。这个再分配结果会上报给Coordinator,并由Coordinator同步给Group中的所有Consumer实例。 Kafka中的Rebalance是由Consumer Leader完成的。而RocketMQ中的Rebalance是由每个Consumer自身完成的,Group中不存在Leader。 ## 4、Queue分配算法 一个Topic中的Queue只能由Consumer Group中的一个Consumer进行消费,而一个Consumer可以同时消费多个Queue中的消息。那么Queue与Consumer间的配对关系是如何确定的,即Queue要分配给哪个Consumer进行消费,也是有算法策略的。常见的有四种策略。这些策略是通过在创建Consumer时的构造器传进去的。 ### 4.1、平均分配策略 ![](https://itsoku.oss-cn-hangzhou.aliyuncs.com/itsoku/blog/article/361/9468376e-918b-488d-b19c-f17e84c313cf.png) 该算法是要根据avg = QueueCount / ConsumerCount 的计算结果进行分配的。如果能够整除,则按顺序将avg个Queue逐个分配Consumer;如果不能整除,则将多余出的Queue按照Consumer顺序逐个分配。 该算法即,先计算好每个Consumer应该分得几个Queue,然后再依次将这些数量的Queue逐个分配个Consumer。 ### 4.2、环形平均策略 ![](https://itsoku.oss-cn-hangzhou.aliyuncs.com/itsoku/blog/article/361/664d5269-0fe2-4152-9f32-602bad8b9e5b.png) 环形平均算法是指,根据消费者的顺序,依次在由queue队列组成的环形图中逐个分配。 该算法不用事先计算每个Consumer需要分配几个Queue,直接一个一个分即可。 ### 4.3、一致性hash策略 ![](https://itsoku.oss-cn-hangzhou.aliyuncs.com/itsoku/blog/article/361/eddfd75b-d600-4d17-8430-7c4b6c83ee83.png) 该算法会将consumer的hash值作为Node节点存放到hash环上,然后将queue的hash值也放到hash环上,通过顺时针方向,距离queue最近的那个consumer就是该queue要分配的consumer。 > 该算法存在的问题:分配不均。 ### 4.4、同机房策略 ![](https://itsoku.oss-cn-hangzhou.aliyuncs.com/itsoku/blog/article/361/f596e7c1-97ce-4a76-849b-5fe2a3a59f6d.png) 该算法会根据queue的部署机房位置和consumer的位置,过滤出当前consumer相同机房的queue。然后按照平均分配策略或环形平均策略对同机房queue进行分配。如果没有同机房queue,则按照平均分配策略或环形平均策略对所有queue进行分配。 ### 4.5、对比 一致性hash算法存在的问题: 两种平均分配策略的分配效率较高,一致性hash策略的较低。因为一致性hash算法较复杂。另外,一致性hash策略分配的结果也很大可能上存在不平均的情况。 一致性hash算法存在的意义: 其可以有效减少由于消费者组扩容或缩容所带来的大量的Rebalance。 ![](https://itsoku.oss-cn-hangzhou.aliyuncs.com/itsoku/blog/article/361/5693c2cc-b20f-4a36-8ece-866adb8c8257.png) ![](https://itsoku.oss-cn-hangzhou.aliyuncs.com/itsoku/blog/article/361/4a5e2d79-a59a-4b0d-a775-968e1f2db600.png) 一致性hash算法的应用场景:Consumer数量变化较频繁的场景。 ## 5、至少一次原则 RocketMQ有一个原则:每条消息必须要被成功消费一次。 那么什么是成功消费呢?Consumer在消费完消息后会向其消费进度记录器提交其消费消息的offset,offset被成功记录到记录器中,那么这条消费就被成功消费了。 > 什么是消费进度记录器? > > 对于广播消费模式来说,Consumer本身就是消费进度记录器。 > > 对于集群消费模式来说,Broker是消费进度记录器。 <a style="display:none" target="_blank" href="https://mp.weixin.qq.com/s/_S1DD2JADnXvpexxaBwLLg" style="color:red; font-size:20px; font-weight:bold">继续收门徒,亲手带,月薪 4W 以下的可以来找我</a> ## 最新资料 1. <a href="https://mp.weixin.qq.com/s?__biz=MzkzOTI3Nzc0Mg==&mid=2247484964&idx=2&sn=c81bce2f26015ee0f9632ddc6c67df03&scene=21#wechat_redirect" target="_blank">尚硅谷 Java 学科全套教程(总 207.77GB)</a> 2. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247484192&idx=1&sn=505f2faaa4cc911f553850667749bcbb&scene=21#wechat_redirect" target="_blank">2021 最新版 Java 微服务学习线路图 + 视频</a> 3. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247484573&idx=1&sn=7f3d83892186c16c57bc0b99f03f1ffd&scene=21#wechat_redirect" target="_blank">阿里技术大佬整理的《Spring 学习笔记.pdf》</a> 4. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247484544&idx=2&sn=c1dfe907cfaa5b9ae8e66fc247ccbe84&scene=21#wechat_redirect" target="_blank">阿里大佬的《MySQL 学习笔记高清.pdf》</a> 5. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247485167&idx=1&sn=48d75c8e93e748235a3547f34921dfb7&scene=21#wechat_redirect" target="_blank">2021 版 java 高并发常见面试题汇总.pdf</a> 6. <a href="https://mp.weixin.qq.com/s?__biz=MzkwOTAyMTY2NA==&mid=2247485664&idx=1&sn=435f9f515a8f881642820d7790ad20ce&scene=21#wechat_redirect" target="_blank">Idea 快捷键大全.pdf</a> ![](https://itsoku.oss-cn-hangzhou.aliyuncs.com/itsoku/blog/article/1/2883e86e-3eff-404a-8943-0066e5e2b454.png)
相关专辑:
RocketMQ专题