你好,我是胡夕。今天我要和你分享的内容是:消费者组重平衡能避免吗?
其实在专栏第15期中,我们讲过重平衡,也就是Rebalance,现在先来回顾一下这个概念的原理和用途。Rebalance就是让一个Consumer Group下所有的Consumer实例就如何消费订阅主题的所有分区达成共识的过程。在Rebalance过程中,所有Consumer实例共同参与,在协调者组件的帮助下,完成订阅主题分区的分配。但是,在整个过程中,所有实例都不能消费任何消息,因此它对Consumer的TPS影响很大。
你可能会对这里提到的“协调者”有些陌生,我来简单介绍下。所谓协调者,在Kafka中对应的术语是Coordinator,它专门为Consumer Group服务,负责为Group执行Rebalance以及提供位移管理和组成员管理等。
具体来讲,Consumer端应用程序在提交位移时,其实是向Coordinator所在的Broker提交位移。同样地,当Consumer应用启动时,也是向Coordinator所在的Broker发送各种请求,然后由Coordinator负责执行消费者组的注册、成员管理记录等元数据管理操作。
所有Broker在启动时,都会创建和开启相应的Coordinator组件。也就是说,所有Broker都有各自的Coordinator组件。那么,Consumer Group如何确定为它服务的Coordinator在哪台Broker上呢?答案就在我们之前说过的Kafka内部位移主题__consumer_offsets身上。
目前,Kafka为某个Consumer Group确定Coordinator所在的Broker的算法有2个步骤。
第1步:确定由位移主题的哪个分区来保存该Group数据:partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。
第2步:找出该分区Leader副本所在的Broker,该Broker即为对应的Coordinator。
简单解释一下上面的算法。首先,Kafka会计算该Group的group.id参数的哈希值。比如你有个Group的group.id设置成了“test-group”,那么它的hashCode值就应该是627841412。其次,Kafka会计算__consumer_offsets的分区数,通常是50个分区,之后将刚才那个哈希值对分区数进行取模加求绝对值计算,即abs(627841412 % 50) = 12。此时,我们就知道了位移主题的分区12负责保存这个Group的数据。有了分区号,算法的第2步就变得很简单了,我们只需要找出位移主题分区12的Leader副本在哪个Broker上就可以了。这个Broker,就是我们要找的Coordinator。
在实际使用过程中,Consumer应用程序,特别是Java Consumer API,能够自动发现并连接正确的Coordinator,我们不用操心这个问题。知晓这个算法的最大意义在于,它能够帮助我们解决定位问题。当Consumer Group出现问题,需要快速排查Broker端日志时,我们能够根据这个算法准确定位Coordinator对应的Broker,不必一台Broker一台Broker地盲查。
好了,我们说回Rebalance。既然我们今天要讨论的是如何避免Rebalance,那就说明Rebalance这个东西不好,或者说至少有一些弊端需要我们去规避。那么,Rebalance的弊端是什么呢?总结起来有以下3点:
-
Rebalance影响Consumer端TPS。这个之前也反复提到了,这里就不再具体讲了。总之就是,在Rebalance期间,Consumer会停下手头的事情,什么也干不了。
-
Rebalance很慢。如果你的Group下成员很多,就一定会有这样的痛点。还记得我曾经举过的那个国外用户的例子吧?他的Group下有几百个Consumer实例,Rebalance一次要几个小时。在那种场景下,Consumer Group的Rebalance已经完全失控了。
-
Rebalance效率不高。当前Kafka的设计机制决定了每次Rebalance时,Group下的所有成员都要参与进来,而且通常不会考虑局部性原理,但局部性原理对提升系统性能是特别重要的。
关于第3点,我们来举个简单的例子。比如一个Group下有10个成员,每个成员平均消费5个分区。假设现在有一个成员退出了,此时就需要开启新一轮的Rebalance,把这个成员之前负责的5个分区“转移”给其他成员。显然,比较好的做法是维持当前9个成员消费分区的方案不变,然后将5个分区随机分配给这9个成员,这样能最大限度地减少Rebalance对剩余Consumer成员的冲击。
遗憾的是,目前Kafka并不是这样设计的。在默认情况下,每次Rebalance时,之前的分配方案都不会被保留。就拿刚刚这个例子来说,当Rebalance开始时,Group会打散这50个分区(10个成员 * 5个分区),由当前存活的9个成员重新分配它们。显然这不是效率很高的做法。基于这个原因,社区于0.11.0.0版本推出了StickyAssignor,即有粘性的分区分配策略。所谓的有粘性,是指每次Rebalance时,该策略会尽可能地保留之前的分配方案,尽量实现分区分配的最小变动。不过有些遗憾的是,这个策略目前还有一些bug,而且需要升级到0.11.0.0才能使用,因此在实际生产环境中用得还不是很多。
总而言之,Rebalance有以上这三个方面的弊端。你可能会问,这些问题有解吗?特别是针对Rebalance慢和影响TPS这两个弊端,社区有解决办法吗?针对这两点,我可以很负责任地告诉你:“无解!”特别是Rebalance慢这个问题,Kafka社区对此无能为力。“本事大不如不摊上”,既然我们没办法解决Rebalance过程中的各种问题,干脆就避免Rebalance吧,特别是那些不必要的Rebalance。
就我个人经验而言,在真实的业务场景中,很多Rebalance都是计划外的或者说是不必要的。我们应用的TPS大多是被这类Rebalance拖慢的,因此避免这类Rebalance就显得很有必要了。下面我们就来说说如何避免Rebalance。
要避免Rebalance,还是要从Rebalance发生的时机入手。我们在前面说过,Rebalance发生的时机有三个:
- 组成员数量发生变化
- 订阅主题数量发生变化
- 订阅主题的分区数发生变化
后面两个通常都是运维的主动操作,所以它们引发的Rebalance大都是不可避免的。接下来,我们主要说说因为组成员数量变化而引发的Rebalance该如何避免。
如果Consumer Group下的Consumer实例数量发生变化,就一定会引发Rebalance。这是Rebalance发生的最常见的原因。我碰到的99%的Rebalance,都是这个原因导致的。
Consumer实例增加的情况很好理解,当我们启动一个配置有相同group.id值的Consumer程序时,实际上就向这个Group添加了一个新的Consumer实例。此时,Coordinator会接纳这个新实例,将其加入到组中,并重新分配分区。通常来说,增加Consumer实例的操作都是计划内的,可能是出于增加TPS或提高伸缩性的需要。总之,它不属于我们要规避的那类“不必要Rebalance”。
我们更在意的是Group下实例数减少这件事。如果你就是要停掉某些Consumer实例,那自不必说,关键是在某些情况下,Consumer实例会被Coordinator错误地认为“已停止”从而被“踢出”Group。如果是这个原因导致的Rebalance,我们就不能不管了。
Coordinator会在什么情况下认为某个Consumer实例已挂从而要退组呢?这个绝对是需要好好讨论的话题,我们来详细说说。
当Consumer Group完成Rebalance之后,每个Consumer实例都会定期地向Coordinator发送心跳请求,表明它还存活着。如果某个Consumer实例不能及时地发送这些心跳请求,Coordinator就会认为该Consumer已经“死”了,从而将其从Group中移除,然后开启新一轮Rebalance。Consumer端有个参数,叫session.timeout.ms,就是被用来表征此事的。该参数的默认值是10秒,即如果Coordinator在10秒之内没有收到Group下某Consumer实例的心跳,它就会认为这个Consumer实例已经挂了。可以这么说,session.timeout.ms决定了Consumer存活性的时间间隔。
除了这个参数,Consumer还提供了一个允许你控制发送心跳请求频率的参数,就是heartbeat.interval.ms。这个值设置得越小,Consumer实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更加快速地知晓当前是否开启Rebalance,因为,目前Coordinator通知各个Consumer实例开启Rebalance的方法,就是将REBALANCE_NEEDED标志封装进心跳请求的响应体中。
除了以上两个参数,Consumer端还有一个参数,用于控制Consumer实际消费能力对Rebalance的影响,即max.poll.interval.ms参数。它限定了Consumer端应用程序两次调用poll方法的最大时间间隔。它的默认值是5分钟,表示你的Consumer程序如果在5分钟之内无法消费完poll方法返回的消息,那么Consumer会主动发起“离开组”的请求,Coordinator也会开启新一轮Rebalance。
搞清楚了这些参数的含义,接下来我们来明确一下到底哪些Rebalance是“不必要的”。
第一类非必要Rebalance是因为未能及时发送心跳,导致Consumer被“踢出”Group而引发的。因此,你需要仔细地设置session.timeout.ms和heartbeat.interval.ms的值。我在这里给出一些推荐数值,你可以“无脑”地应用在你的生产环境中。
- 设置session.timeout.ms = 6s。
- 设置heartbeat.interval.ms = 2s。
- 要保证Consumer实例在被判定为“dead”之前,能够发送至少3轮的心跳请求,即session.timeout.ms >= 3 * heartbeat.interval.ms。
将session.timeout.ms设置成6s主要是为了让Coordinator能够更快地定位已经挂掉的Consumer。毕竟,我们还是希望能尽快揪出那些“尸位素餐”的Consumer,早日把它们踢出Group。希望这份配置能够较好地帮助你规避第一类“不必要”的Rebalance。
第二类非必要Rebalance是Consumer消费时间过长导致的。我之前有一个客户,在他们的场景中,Consumer消费数据时需要将消息处理之后写入到MongoDB。显然,这是一个很重的消费逻辑。MongoDB的一丁点不稳定都会导致Consumer程序消费时长的增加。此时,max.poll.interval.ms参数值的设置显得尤为关键。如果要避免非预期的Rebalance,你最好将该参数值设置得大一点,比你的下游最大处理时间稍长一点。就拿MongoDB这个例子来说,如果写MongoDB的最长时间是7分钟,那么你可以将该参数设置为8分钟左右。
总之,你要为你的业务处理逻辑留下充足的时间。这样,Consumer就不会因为处理这些消息的时间太长而引发Rebalance了。
如果你按照上面的推荐数值恰当地设置了这几个参数,却发现还是出现了Rebalance,那么我建议你去排查一下Consumer端的GC表现,比如是否出现了频繁的Full GC导致的长时间停顿,从而引发了Rebalance。为什么特意说GC?那是因为在实际场景中,我见过太多因为GC设置不合理导致程序频发Full GC而引发的非预期Rebalance了。
小结
总而言之,我们一定要避免因为各种参数或逻辑不合理而导致的组成员意外离组或退出的情形,与之相关的主要参数有:
- session.timeout.ms
- heartbeat.interval.ms
- max.poll.interval.ms
- GC参数
按照我们今天所说的内容,恰当地设置这些参数,你一定能够大幅度地降低生产环境中的Rebalance数量,从而整体提升Consumer端TPS。

开放讨论
说说在你的业务场景中,Rebalance发生的频率、原因,以及你是怎么应对的,我们一起讨论下是否有更好的解决方案。
欢迎写下你的思考和答案,我们一起讨论。如果你觉得有所收获,也欢迎把文章分享给你的朋友。
精选留言
2019-07-11 10:13:58
这里想问的是,如果我有一个长耗时的业务逻辑需要处理,并且offset还未提交,这时候系统发生了Rebalance的话,是等待所有消费端当前消息都处理完成,再进行停止消费,并进行重新分配分区,还是说强制停止消费。
如果强制停止消费的话,那么那些已经处理完成一半的数据并offset未提交的数据,势必会导致Rebalance后重新进行消费,导致数据产生重复消费。
2019-07-11 09:34:27
2019-07-12 14:04:34
2020-02-03 22:03:10
请问如果使用 Standalone Consumer,是不是也不会发生 rebalance 了?
感觉专栏里对 Standalone Consumer 就是提了两句,没有太多的介绍,相较于订阅模式它们有什么特点嘛?
2019-07-11 09:34:58
2019-07-11 09:59:56
2019-07-11 12:20:25
2019-11-04 10:02:59
A :让一个Consumer Group下所有的consumer实例就如何消费订阅主题的所有分区达成共识的过程。
B :在重平衡过程中,所有Consumer实例共同参与,在协调者组件的帮助下,完成订阅分区的分配。
C :整个过程中,所有实例都不能消费任何消息,因此对Consumer的TPS影响很大
2 为什要避免重平衡
A :Rebalance影响Consumer端的TPS,因为重平衡过程中消费者不能消费消息
B :Rebalance很慢,如果有数百个消费者实例,整个过程耗时可能达到几个小时
C :Rebalance效率低,这个过程是全员参与,通常不考虑局部性原理,但局部性原理对系统性能提升特别重要。
D :真实的业务场景中,很多Rebalance都是计划外或是不必要的。
3 何时会触发重平衡
A :组成员数量发生变化
B :订阅主题数量发生变化
C :订阅主题分区数发生变化。
4, 要避免哪些重平衡
最常见的是消费者数发生变化触发的重平衡,其他的重平衡是不可避免的,但消费者数量变化是可避免的
A :Consumer实例增加
当启动一个配置相同的group.id值的consumer程序时,就是向这个组中增加一个消费者实例,这中秋情况一般是我们为了提升消费者端的TPS,是计划内的,所以也不用避免。
B :Consumer实例减少
(1)按计划的减少消费者实例,同样不用避免
(2)计划外的减少触发的重平衡才是我们要关注的。
5 如何避免重平衡
在某些情况下,Consumer实例会被Coordinateor错误地认为“已停止”,进而被踢出Group。这种情况导致的重平衡是需要避免的。
A :Consumer实例不能及时的发送心跳请求
当消费者组完成重平衡后,每个Consumer实例都会定期地向Coordinator发送心跳请求,如这个心跳请求没有被及时发送,Coordinator就会认为该Consumer已经掉线,将其从组中移除,并开启新一轮重平衡。
解决:Consumer端设置:
》Session.timeout.ms:默认为10秒,表示10秒内Coordinator没有收到Group下某个Consumer实例的心跳,就认为实例下线。这个可以适当的增大
》heartbeat.interval.ms:控制发送心跳请求的频率,频繁的发送心跳请求会额外消耗带库资源。
》max.poll.interval.ms:限定Consumer端应用程序两次调用poll方法的最大时间间隔。默认值是5分钟,表示如果Consumer程序在5分钟之内无法消费完poll方法返回的消息,那么consumer会主动的发起“离开组”的请求,
建议:session.timeout.ms=6s
Heartbeat.interval.ms=2s
保证Consumer实例在判定为“dead”之前,能够发送至少3轮的心跳请求,即session.timeout.ms >=3 * heartbeat.interval.ms。
B :Consumer消费时间过长
消费者端处理了一个很重的消费逻辑,耗时较长,导致Consumer端应用程序两次调用poll方法的时间超出设置的最大时间间隔。
解决:
1,将max.poll.interval.ms参数设置较大一些
2,优化消费者端业务逻辑,压缩消费耗时
C :GC影响
Consumer端的GC表现也会导致频繁的重平衡,频繁的Ful GC会导致长时间的断顿。
解决:
JVM调优。
2020-04-20 05:51:05
2019-07-11 20:52:16
2019-07-11 17:23:24
2021-01-14 11:05:15
2. 增加消费者端允许下游系统消费一批消息的最大时长:当消费者组完成重平衡之后,每个消费者实例都会定期地向协调者发送心跳请求,表明它还存活着。如果某个消费者实例不能及时地发送这些心跳请求,协调者就会认为该消费者已经“死”了,从而将其从组中移除,然后开启新一轮重平衡。消费者端有个参数,叫 session.timeout.ms,就是被用来表征此事的。该参数的默认值是 10 秒,即如果协调者在 10 秒之内没有收到组内某个消费者实例的心跳,它就会认为这个消费者实例已经挂了。可以这么说,session.timeout.ms 决定了消费者存活性的时间间隔
3. 控制发送心跳请求频率:消费者还提供了一个允许你控制发送心跳请求频率的参数,就是 heartbeat.interval.ms。这个值设置得越小,消费者实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更加快速地知晓当前是否开启重平衡
4. 减少下游系统一次性消费的消息总数:这取决于消费者端参数 max.poll.records 的值。当前该参数的默认值是 500 条,表明调用一次 KafkaConsumer.poll 方法,最多返回 500 条消息。可以说,该参数规定了单次 poll 方法能够返回的消息总数的上限。如果前两种方法对你都不适用的话,降低此参数值是避免 CommitFailedException 异常最简单的手段
5. 调整两次调用 poll 方法的最大时间间隔:消费者端还有一个参数,用于控制消费者实际消费能力对重平衡的影响,即 max.poll.interval.ms 参数。它限定了消费者端应用程序两次调用 poll 方法的最大时间间隔。它的默认值是 5 分钟,表示你的消费者程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么消费者会主动发起“离开组”的请求,协调者也会开启新一轮重平衡
6. 下游系统使用多线程来加速消费:具体的思路就是,让下游系统手动创建多个消费线程处理 poll 方法返回的一批消息。之前你使用 Kafka 消费者消费数据更多是单线程的,所以当消费速度无法匹及 Kafka 消费者消息返回的速度时,它就会抛出 CommitFailedException 异常。如果是多线程,你就可以灵活地控制线程数量,随时调整消费承载能力,再配以目前多核的硬件条件,该方法可谓是防止 CommitFailedException 最高档的解决之道。事实上,很多主流的大数据流处理框架使用的都是这个方法,比如 Apache Flink 在集成 Kafka 时,就是创建了多个 KafkaConsumerThread 线程,自行处理多线程间的数据消费。不过,凡事有利就有弊,这个方法实现起来并不容易,特别是在多个线程间如何处理位移提交这个问题上,更是极容易出错。
2019-09-06 11:37:34
2020-03-26 08:18:36
2020-03-30 21:45:08
2019-07-12 22:07:08
2019-09-16 14:41:49
2019-08-16 07:46:23
Rebalance 就是让一个 Consumer Group 下所有的 Consumer 实例就如何消费订阅主题的所有分区达成共识的过程。在 Rebalance 过程中,所有 Consumer 实例共同参与,在协调者组件的帮助下,完成订阅主题分区的分配。但是,在整个过程中,所有实例都不能消费任何消息,因此它对 Consumer 的 TPS 影响很大。
2:rebalance有啥弊端?
2-1:Rebalance 影响 Consumer 端 TPS。这个之前也反复提到了,这里就不再具体讲了。总之就是,在 Rebalance 期间,Consumer 会停下手头的事情,什么也干不了。
2-2:Rebalance 很慢。如果你的 Group 下成员很多,就一定会有这样的痛点。还记得我曾经举过的那个国外用户的例子吧?他的 Group 下有几百个 Consumer 实例,Rebalance 一次要几个小时。在那种场景下,Consumer Group 的 Rebalance 已经完全失控了。
2-3:Rebalance 效率不高。当前 Kafka 的设计机制决定了每次 Rebalance 时,Group 下的所有成员都要参与进来,而且通常不会考虑局部性原理,但局部性原理对提升系统性能是特别重要的。
3:rebalance啥时候发生?
3-1:组成员数量发生变化
3-2:订阅主题数量发生变化
3-3:订阅主题的分区数发生变化
4:rebalance的算法是啥?
4-1:全员参与的分区分配策略——目前的算法,也是rebalance慢的根源
4-2:粘性的分区分配策略——尽量不动没有问题的分区,重新分配有问题的分区
5:rebalance能否避免?
不能完全避免
只能最大限度的设置更为合理的参数,来避免非必要的rebalance,比如这些参数
5-1:session.timeout.ms
5-2:heartbeat.interval.ms
5-3:max.poll.interval.ms
5-4:GC参数
疑问?
rebalance的算法为啥最早是全员参与的方式?kafka起源于大数据,估计分区数比较多的情况应该早已经猜到。
另外,粘性的分区分配策略具体是怎么实现的,听起来不难,但是写kafka的人都实现的不佳,想必不是那么容易的,老师觉得实现的痛点在哪里?
2019-12-15 10:07:11
2019-07-12 20:53:51