16 | 揭开神秘的“位移主题”面纱

你好,我是胡夕。今天我要和你分享的内容是:Kafka中神秘的内部主题(Internal Topic)__consumer_offsets。

__consumer_offsets在Kafka源码中有个更为正式的名字,叫位移主题,即Offsets Topic。为了方便今天的讨论,我将统一使用位移主题来指代__consumer_offsets。需要注意的是,它有两个下划线哦。

好了,我们开始今天的内容吧。首先,我们有必要探究一下位移主题被引入的背景及原因,即位移主题的前世今生。

在上一期中,我说过老版本Consumer的位移管理是依托于Apache ZooKeeper的,它会自动或手动地将位移数据提交到ZooKeeper中保存。当Consumer重启后,它能自动从ZooKeeper中读取位移数据,从而在上次消费截止的地方继续消费。这种设计使得Kafka Broker不需要保存位移数据,减少了Broker端需要持有的状态空间,因而有利于实现高伸缩性。

但是,ZooKeeper其实并不适用于这种高频的写操作,因此,Kafka社区自0.8.2.x版本开始,就在酝酿修改这种设计,并最终在新版本Consumer中正式推出了全新的位移管理机制,自然也包括这个新的位移主题。

新版本Consumer的位移管理机制其实也很简单,就是将Consumer的位移数据作为一条条普通的Kafka消息,提交到__consumer_offsets中。可以这么说,__consumer_offsets的主要作用是保存Kafka消费者的位移信息。它要求这个提交过程不仅要实现高持久性,还要支持高频的写操作。显然,Kafka的主题设计天然就满足这两个条件,因此,使用Kafka主题来保存位移这件事情,实际上就是一个水到渠成的想法了。

这里我想再次强调一下,和你创建的其他主题一样,位移主题就是普通的Kafka主题。你可以手动地创建它、修改它,甚至是删除它。只不过,它同时也是一个内部主题,大部分情况下,你其实并不需要“搭理”它,也不用花心思去管理它,把它丢给Kafka就完事了。

虽说位移主题是一个普通的Kafka主题,但它的消息格式却是Kafka自己定义的,用户不能修改,也就是说你不能随意地向这个主题写消息,因为一旦你写入的消息不满足Kafka规定的格式,那么Kafka内部无法成功解析,就会造成Broker的崩溃。事实上,Kafka Consumer有API帮你提交位移,也就是向位移主题写消息。你千万不要自己写个Producer随意向该主题发送消息。

你可能会好奇,这个主题存的到底是什么格式的消息呢?所谓的消息格式,你可以简单地理解为是一个KV对。Key和Value分别表示消息的键值和消息体,在Kafka中它们就是字节数组而已。想象一下,如果让你来设计这个主题,你觉得消息格式应该长什么样子呢?我先不说社区的设计方案,我们自己先来设计一下。

首先从Key说起。一个Kafka集群中的Consumer数量会有很多,既然这个主题保存的是Consumer的位移数据,那么消息格式中必须要有字段来标识这个位移数据是哪个Consumer的。这种数据放在哪个字段比较合适呢?显然放在Key中比较合适。

现在我们知道该主题消息的Key中应该保存标识Consumer的字段,那么,当前Kafka中什么字段能够标识Consumer呢?还记得之前我们说Consumer Group时提到的Group ID吗?没错,就是这个字段,它能够标识唯一的Consumer Group。

说到这里,我再多说几句。除了Consumer Group,Kafka还支持独立Consumer,也称Standalone Consumer。它的运行机制与Consumer Group完全不同,但是位移管理的机制却是相同的。因此,即使是Standalone Consumer,也有自己的Group ID来标识它自己,所以也适用于这套消息格式。

Okay,我们现在知道Key中保存了Group ID,但是只保存Group ID就可以了吗?别忘了,Consumer提交位移是在分区层面上进行的,即它提交的是某个或某些分区的位移,那么很显然,Key中还应该保存Consumer要提交位移的分区。

好了,我们来总结一下我们的结论。位移主题的Key中应该保存3部分内容:<Group ID,主题名,分区号>。如果你认同这样的结论,那么恭喜你,社区就是这么设计的!

接下来,我们再来看看消息体的设计。也许你会觉得消息体应该很简单,保存一个位移值就可以了。实际上,社区的方案要复杂得多,比如消息体还保存了位移提交的一些其他元数据,诸如时间戳和用户自定义的数据等。保存这些元数据是为了帮助Kafka执行各种各样后续的操作,比如删除过期位移消息等。但总体来说,我们还是可以简单地认为消息体就是保存了位移值。

当然了,位移主题的消息格式可不是只有这一种。事实上,它有3种消息格式。除了刚刚我们说的这种格式,还有2种格式:

  1. 用于保存Consumer Group信息的消息。
  2. 用于删除Group过期位移甚至是删除Group的消息。

第1种格式非常神秘,以至于你几乎无法在搜索引擎中搜到它的身影。不过,你只需要记住它是用来注册Consumer Group的就可以了。

第2种格式相对更加有名一些。它有个专属的名字:tombstone消息,即墓碑消息,也称delete mark。下次你在Google或百度中见到这些词,不用感到惊讶,它们指的是一个东西。这些消息只出现在源码中而不暴露给你。它的主要特点是它的消息体是null,即空消息体。

那么,何时会写入这类消息呢?一旦某个Consumer Group下的所有Consumer实例都停止了,而且它们的位移数据都已被删除时,Kafka会向位移主题的对应分区写入tombstone消息,表明要彻底删除这个Group的信息。

好了,消息格式就说这么多,下面我们来说说位移主题是怎么被创建的。通常来说,当Kafka集群中的第一个Consumer程序启动时,Kafka会自动创建位移主题。我们说过,位移主题就是普通的Kafka主题,那么它自然也有对应的分区数。但如果是Kafka自动创建的,分区数是怎么设置的呢?这就要看Broker端参数offsets.topic.num.partitions的取值了。它的默认值是50,因此Kafka会自动创建一个50分区的位移主题。如果你曾经惊讶于Kafka日志路径下冒出很多__consumer_offsets-xxx这样的目录,那么现在应该明白了吧,这就是Kafka自动帮你创建的位移主题啊。

你可能会问,除了分区数,副本数或备份因子是怎么控制的呢?答案也很简单,这就是Broker端另一个参数offsets.topic.replication.factor要做的事情了。它的默认值是3。

总结一下,如果位移主题是Kafka自动创建的,那么该主题的分区数是50,副本数是3

当然,你也可以选择手动创建位移主题,具体方法就是,在Kafka集群尚未启动任何Consumer之前,使用Kafka API创建它。手动创建的好处在于,你可以创建满足你实际场景需要的位移主题。比如很多人说50个分区对我来讲太多了,我不想要这么多分区,那么你可以自己创建它,不用理会offsets.topic.num.partitions的值。

不过我给你的建议是,还是让Kafka自动创建比较好。目前Kafka源码中有一些地方硬编码了50分区数,因此如果你自行创建了一个不同于默认分区数的位移主题,可能会碰到各种各样奇怪的问题。这是社区的一个Bug,目前代码已经修复了,但依然在审核中。

创建位移主题当然是为了用的,那么什么地方会用到位移主题呢?我们前面一直在说Kafka Consumer提交位移时会写入该主题,那Consumer是怎么提交位移的呢?目前Kafka Consumer提交位移的方式有两种:自动提交位移和手动提交位移。

Consumer端有个参数叫enable.auto.commit,如果值是true,则Consumer在后台默默地为你定期提交位移,提交间隔由一个专属的参数auto.commit.interval.ms来控制。自动提交位移有一个显著的优点,就是省事,你不用操心位移提交的事情,就能保证消息消费不会丢失。但这一点同时也是缺点。因为它太省事了,以至于丧失了很大的灵活性和可控性,你完全没法把控Consumer端的位移管理。

事实上,很多与Kafka集成的大数据框架都是禁用自动提交位移的,如Spark、Flink等。这就引出了另一种位移提交方式:手动提交位移,即设置enable.auto.commit = false。一旦设置了false,作为Consumer应用开发的你就要承担起位移提交的责任。Kafka Consumer API为你提供了位移提交的方法,如consumer.commitSync等。当调用这些方法时,Kafka会向位移主题写入相应的消息。

如果你选择的是自动提交位移,那么就可能存在一个问题:只要Consumer一直启动着,它就会无限期地向位移主题写入消息。

我们来举个极端一点的例子。假设Consumer当前消费到了某个主题的最新一条消息,位移是100,之后该主题没有任何新消息产生,故Consumer无消息可消费了,所以位移永远保持在100。由于是自动提交位移,位移主题中会不停地写入位移=100的消息。显然Kafka只需要保留这类消息中的最新一条就可以了,之前的消息都是可以删除的。这就要求Kafka必须要有针对位移主题消息特点的消息删除策略,否则这种消息会越来越多,最终撑爆整个磁盘。

Kafka是怎么删除位移主题中的过期消息的呢?答案就是Compaction。国内很多文献都将其翻译成压缩,我个人是有一点保留意见的。在英语中,压缩的专有术语是Compression,它的原理和Compaction很不相同,我更倾向于翻译成压实,或干脆采用JVM垃圾回收中的术语:整理。

不管怎么翻译,Kafka使用Compact策略来删除位移主题中的过期消息,避免该主题无限期膨胀。那么应该如何定义Compact策略中的过期呢?对于同一个Key的两条消息M1和M2,如果M1的发送时间早于M2,那么M1就是过期消息。Compact的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起。我在这里贴一张来自官网的图片,来说明Compact过程。

图中位移为0、2和3的消息的Key都是K1。Compact之后,分区只需要保存位移为3的消息,因为它是最新发送的。

Kafka提供了专门的后台线程定期地巡检待Compact的主题,看看是否存在满足条件的可删除数据。这个后台线程叫Log Cleaner。很多实际生产环境中都出现过位移主题无限膨胀占用过多磁盘空间的问题,如果你的环境中也有这个问题,我建议你去检查一下Log Cleaner线程的状态,通常都是这个线程挂掉了导致的。

小结

总结一下,今天我跟你分享了Kafka神秘的位移主题__consumer_offsets,包括引入它的契机与原因、它的作用、消息格式、写入的时机以及管理策略等,这对我们了解Kafka特别是Kafka Consumer的位移管理是大有帮助的。实际上,将很多元数据以消息的方式存入Kafka内部主题的做法越来越流行。除了Consumer位移管理,Kafka事务也是利用了这个方法,当然那是另外的一个内部主题了。

社区的想法很简单:既然Kafka天然实现了高持久性和高吞吐量,那么任何有这两个需求的子服务自然也就不必求助于外部系统,用Kafka自己实现就好了。

开放讨论

今天我们说了位移主题的很多好处,请思考一下,与ZooKeeper方案相比,它可能的劣势是什么?

欢迎写下你的思考和答案,我们一起讨论。如果你觉得有所收获,也欢迎把文章分享给你的朋友。

精选留言

  • 注定非凡

    2019-11-03 19:54:46

    1,诞生背景
    A :老版本的Kafka会把位移信息保存在Zk中,当Consumer重启后,自动从Zk中读取位移信息。这种设计使Kafka Broker不需要保存位移数据,可减少Broker端需要持有的状态空间,有利于实现高伸缩性。
    B :但zk不适用于高频的写操作,这令zk集群性能严重下降,在新版本中将消费者的位移数据作为一条条普通的Kafka消息,提交至内部主题(_consumer_offsets)中保存。实现高持久性和高频写操作。

    2,特点:
    A :位移主题是一个普通主题,同样可以被手动创建,修改,删除。。
    B :位移主题的消息格式是kafka定义的,不可以被手动修改,若修改格式不正确,kafka将会崩溃。
    C :位移主题保存了三部分内容:Group ID,主题名,分区号。

    3,创建:
    A :当Kafka集群中的第一个Consumer程序启动时,Kafka会自动创建位移主题。也可以手动创建
    B :分区数依赖于Broker端的offsets.topic.num.partitions的取值,默认为50
    C :副本数依赖于Broker端的offsets.topic.replication.factor的取值,默认为3

    4,使用:
    A :当Kafka提交位移消息时会使用这个主题
    B :位移提交得分方式有两种:手动和自动提交位移。
    C :推荐使用手动提交位移,自动提交位移会存在问题:只有consumer一直启动设置,他就会无限期地向主题写入消息。

    5,清理:
    A :Kafka使用Compact策略来删除位移主题中的过期消息,避免位移主题无限膨胀。
    B :kafka提供专门的后台线程定期巡检待compcat的主题,查看是否存在满足条件的可删除数据。

    6,注意事项:
    A :建议不要修改默认分区数,在kafka中有些许功能写死的是50个分区
    B :建议不要使用自动提交模式,采用手动提交,避免消费者无限制的写入消息。
    C :后台定期巡检线程叫Log Cleaner,若线上遇到位移主题无限膨胀占用过多磁盘,应该检查此线程的工作状态。
  • 此方彼方Francis

    2020-02-14 14:31:04

    之前遇到过的一个问题跟大家分享一下,原因描述不正确的地方还请大佬指正:
    log cleaner线程挂掉还有可能导致消费端出现:Marking Coordinator Dead!

    原因大概如下:
    log cleaner线程挂掉之后会导致磁盘上位移主题的文件越来越多(当然,大部分是过期数据,只是依旧存在),broker内存中会维护offsetMap,从名字上看这个map就是维护消费进度的,而这个map和位移主题的文件有关联,文件越来越多会导致offsetMap越来越大,甚至导致offsetMap构建失败(为什么会失败没有搞明白),offsetMap构建失败之后broker不会承认自己是coordinator。
    消费者组找coordinator的逻辑很简单:abs(consumer_groupName.hashCode) % __consumer_offset.partition.num对应的partition所在的broker就是这个group的coordinate,一旦这个broker的offsetMap构建失败,那么这个broker就不承认自己是这个group的coordinate,这个group的消费就无法继续进行,会出现Marking Coordinator Dead错误。
    此时需要删除过期的位移主题的文件(根据文件名很容易确定哪个是最新的),重启broker。重启过程中需要关注log cleaner是否会再次挂掉。

    PS:上述问题在broker重启和正常运行中都有可能遇到。
    作者回复

    是个很好的梳理思路~

    2020-02-15 09:09:15

  • Eco

    2020-01-15 22:47:38

    有个问题想请教一下,这个位移主题,Consumer是像消费其他主题的分区的内容一样去获取数据的话,那么这本身不也得有个位移,那这个位移又保存到哪里的呢?这样下去不就陷入了一个死循环了吗?要么就不是像正常的消费消息那样去从位移主题获取当前消费者对于某个主题的分区的位移?
    作者回复

    好问题!其实Kafka并不太关注__consumer_offsets消费的情况,不过Coordinator的确会在JVM中把所有分区当前已提交的最新位移缓存起来,并且通过这个缓存来决定哪个consumer当前消费到了哪个位移。

    2020-01-16 08:40:13

  • mellow

    2019-07-09 11:56:43

    老师能讲一下,同一个group下的consumer启动之后是怎么去offset topic 拿到该group上次消费topic每个partition的最新offset呢?是根据key来定位offset topic的partition吗,然后拿到所有消息得到最新的offset吗
    作者回复

    它会去寻找其Coordinator Leader副本对应的broker去拿。根据group.id找到对应Coordinator的分区数

    2019-07-10 09:26:15

  • 蛋炒番茄

    2019-07-09 10:20:41

    “自动提交位移有一个显著的优点,就是省事,你不用操心位移提交的事情,就能保证消息消费不会丢失”,对于这一点我表示疑问啊❓我记得你在之前的章节里面讲过自动提交不仅会增加消息可重复消费的可能,也可能导致部分消息丢失。比如说虽然消息拉取下来但是还没消费完就已经提交,此时服务挂了这样情况。
  • 王藝明

    2019-10-14 11:37:18

    老师好!
    为什么位移主题写入消息时,不直接替换掉原来的数据,像 HashMap 一样呢?而是要堆积起来,另起线程来维护位移主题
    作者回复

    位移主题也是主题,也要遵循Kafka底层的日志设计思路,即append-only log

    2019-10-15 10:03:33

  • sharpdeep

    2020-03-26 09:49:19

    有个困惑: 位移主题用来记住位移,那么这个位移主题的位移由谁来记住呢?
    作者回复

    位移主题的位移由Kafka内部的Coordinator自行管理

    2020-03-27 08:54:55

  • 🤡

    2019-08-12 10:00:08

    对GroupId 还有疑惑,假设一个Group下有 3 个Consumer , 那这三个Consumer 对应的groupid 应该是一样的。这样的话怎么做key做唯一区分呢
    作者回复

    每个client都有自己的member id和client id用于区分彼此

    2019-08-12 10:39:43

  • HZ

    2020-02-24 16:45:44

    老师 有两点不太清楚 1. 位移主题里面,对于同一个consumer group的位移提交记录,是分布在50个partitions中吗? 2. 如果把位移主题当作kafka里面的一个普通主题,那么写入这个主题的数据可以保证不丢失吗? 写入是ack=all? 同时,broker端的min.insync.replicas的设置有影响吗?
    作者回复

    1. 同一个group的位移记录只保存在一个partition上
    2. 没错,写入就是acks=all的设置
    3. min.insync.replicas对位移主题依然适用

    2020-02-25 09:14:52

  • 帆船出航

    2020-01-04 21:56:17

    胡老师,消费者提交的位移消息,保存到位移主题分区是随机的吗?就是某一个消费者的提交第一个位移数据保存在位移主题的A分区里面,第二个位移数据保存在位移主题的B分区里面
    还有消费者是怎样获取已消费的位移数据
    作者回复

    不是随机的。通常来说,同一个group下的所有消费者提交的位移数据保存在位移主题的同一个分区下

    2020-01-06 11:05:17

  • Geek_cd6rkj

    2019-07-15 09:41:12

    consumer端 日常业务发版呢,那每次发版需要重启consumer不是也会导致Rebalance,这个如何规避
    作者回复

    可以考虑使用standalone consumer,否则group机制无法避免

    2019-07-15 10:00:28

  • 张申傲

    2020-06-29 19:56:57

    说到Kafka的Compaction,就想到了Redis的AOF Rewrite,都是类似的机制
  • Coder4

    2019-07-11 21:27:08

    老师好,前几年一直有个说法,说kafka不适合创建过多topic,请问现在的新版还有这个问题么?
    作者回复

    topic过多其实是指分区数过多。会有两个可能的问题:1. controller无法管理这么多分区;2. 分区数过多导致broker物理随机IO增加,减少吞吐量。

    第一个问题社区算是修复了吧,目前单controller能够支持20w的分区数,况且社区也在考虑做多controller方案;第二个问题目前没有太多直接的修复举措,只能说具体问题具体分析吧

    2019-07-12 08:38:13

  • gogogo

    2019-07-09 01:12:48

    请问offset是以最新的为准,还是值最大的为准?
    作者回复

    最新的

    2019-07-09 09:17:02

  • 永光

    2019-07-11 11:37:49

    位移主题,适用于高频写的操作,为什么ZooKeeper不适用于这种高频的写操作?zookeeper 也可以按照<Group ID,主题名,分区号 > 来写入呀?
    作者回复

    ZooKeeper本身只是一个分布式协调框架,znode中保存的数据多是那些不怎么频繁修改的元数据,本身不适合频繁更新。

    是的,旧版本consumer就是这么使用ZooKeeper来保存位移的

    2019-07-12 08:51:53

  • 南辕北辙

    2019-07-10 10:20:25

    老师,请教一下consumer 是如何从这个位移主题中拿到曾经属于自己组的offset呢
    作者回复

    首先找到对应的Coordinator,Coordinator保存了这些数据,然后consumer向Coordinator发送请求去请求这些数据

    2019-07-10 11:38:27

  • 燃烧的M豆

    2019-07-09 14:15:21

    老师如果超大规模集群超大规模消费者对这一个 50 个 partitions 的 topic 进行消费是不是会引起性能问题?topic 下的 partitions 设置有上线吗 面对超大规模并发除了提升 partitions 数量还有什么办法?谢谢
    作者回复

    partition数量在kafka中没有上限,只受os限制。除了提升partition,增加broker挂载的磁盘数也是一个方法

    2019-07-10 09:23:38

  • LJK

    2019-09-23 07:35:05

    老师好,请教一个小白问题,一个位移主题是在第一个consumer启动时建立的,是说对于一个kafka集群只有一个位移主题么?另外我对kafka框架还是有点迷惑,kafka集群是不是没有NameNode这个概念啊?每一个leader partition就相当于一个NameNode?谢谢老师
    作者回复

    每个Kafka集群都只有一个位移主题。Kafka没有NameNode的概念。如果硬搬Hadoop中的概念,我倒倾向于认为分区的leader副本是datanode,而namenode的作用在Kafka中由ZooKeeper承接

    2019-09-23 09:53:06

  • 小鱼

    2019-07-31 15:54:10

    老师,你好,请问控制Kafka 使用Compact 策略来删除位移主题中的过期消息的参数是哪个?

    作者回复

    offsets.retention.minutes

    2019-08-01 10:48:52

  • WL

    2019-07-09 19:39:20

    请教老师三个问题:
    1. 在consumer group中的一个consumer消费一条消息后,是往它拉取消息的那个broker写一条offset消息还是往所有它连接的broker都广播一条消息。
    2. 一个broker中的位移主题保存的是他自己上面的主题和分区的位移还是整个集群的所有主题所有分区的位移都有保存。
    3. 位移主题的50个分区分配在各个broker的方式是啥,轮询,hash,还是随机?
    作者回复

    1. 你是指提交位移吗?如果是,它是向对应的Coordinator所在的broker发送一条位移写请求
    2. 都有保存
    3. 你基本上可以认为是轮训的

    2019-07-10 09:16:53