08 | 最最最重要的集群参数配置(下)

今天我们继续来聊那些重要的Kafka集群配置,下半部分主要是Topic级别参数、JVM参数以及操作系统参数的设置。

在上一期中,我们讨论了Broker端参数设置的一些法则,但其实Kafka也支持为不同的Topic设置不同的参数值。当前最新的2.2版本总共提供了大约25个Topic级别的参数,当然我们也不必全部了解它们的作用,这里我挑出了一些最关键的参数,你一定要把它们掌握清楚。除了Topic级别的参数,我今天还会给出一些重要的JVM参数和操作系统参数,正确设置这些参数是搭建高性能Kafka集群的关键因素。

Topic级别参数

说起Topic级别的参数,你可能会有这样的疑问:如果同时设置了Topic级别参数和全局Broker参数,到底听谁的呢?哪个说了算呢?答案就是Topic级别参数会覆盖全局Broker参数的值,而每个Topic都能设置自己的参数值,这就是所谓的Topic级别参数。

举个例子说明一下,上一期我提到了消息数据的留存时间参数,在实际生产环境中,如果为所有Topic的数据都保存相当长的时间,这样做既不高效也无必要。更适当的做法是允许不同部门的Topic根据自身业务需要,设置自己的留存时间。如果只能设置全局Broker参数,那么势必要提取所有业务留存时间的最大值作为全局参数值,此时设置Topic级别参数把它覆盖,就是一个不错的选择。

下面我们依然按照用途分组的方式引出重要的Topic级别参数。从保存消息方面来考量的话,下面这组参数是非常重要的:

  • retention.ms:规定了该Topic消息被保存的时长。默认是7天,即该Topic只保存最近7天的消息。一旦设置了这个值,它会覆盖掉Broker端的全局参数值。
  • retention.bytes:规定了要为该Topic预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的Kafka集群中会有用武之地。当前默认值是-1,表示可以无限使用磁盘空间。

上面这些是从保存消息的维度来说的。如果从能处理的消息大小这个角度来看的话,有一个参数是必须要设置的,即max.message.bytes。它决定了Kafka Broker能够正常接收该Topic的最大消息大小。我知道目前在很多公司都把Kafka作为一个基础架构组件来运行,上面跑了很多的业务数据。如果在全局层面上,我们不好给出一个合适的最大消息值,那么不同业务部门能够自行设定这个Topic级别参数就显得非常必要了。在实际场景中,这种用法也确实是非常常见的。

好了,你要掌握的Topic级别的参数就这么几个。下面我来说说怎么设置Topic级别参数吧。其实说到这个事情,我是有点个人看法的:我本人不太赞同那种做一件事情开放给你很多种选择的设计方式,看上去好似给用户多种选择,但实际上只会增加用户的学习成本。特别是系统配置,如果你告诉我只能用一种办法来做,我会很努力地把它学会;反之,如果你告诉我说有两种方法甚至是多种方法都可以实现,那么我可能连学习任何一种方法的兴趣都没有了。Topic级别参数的设置就是这种情况,我们有两种方式可以设置:

  • 创建Topic时进行设置
  • 修改Topic时设置

我们先来看看如何在创建Topic时设置这些参数。我用上面提到的retention.msmax.message.bytes举例。设想你的部门需要将交易数据发送到Kafka进行处理,需要保存最近半年的交易数据,同时这些数据很大,通常都有几MB,但一般不会超过5MB。现在让我们用以下命令来创建Topic:

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic transaction --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880

我们只需要知道Kafka开放了kafka-topics命令供我们来创建Topic即可。对于上面这样一条命令,请注意结尾处的--config设置,我们就是在config后面指定了想要设置的Topic级别参数。

下面看看使用另一个自带的命令kafka-configs来修改Topic级别参数。假设我们现在要发送最大值是10MB的消息,该如何修改呢?命令如下:

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760

总体来说,你只能使用这么两种方式来设置Topic级别参数。我个人的建议是,你最好始终坚持使用第二种方式来设置,并且在未来,Kafka社区很有可能统一使用kafka-configs脚本来调整Topic级别参数。

JVM参数

我在专栏前面提到过,Kafka服务器端代码是用Scala语言编写的,但终归还是编译成Class文件在JVM上运行,因此JVM参数设置对于Kafka集群的重要性不言而喻。

首先我先说说Java版本,我个人极其不推荐将Kafka运行在Java 6或7的环境上。Java 6实在是太过陈旧了,没有理由不升级到更新版本。另外Kafka自2.0.0版本开始,已经正式摒弃对Java 7的支持了,所以有条件的话至少使用Java 8吧。

说到JVM端设置,堆大小这个参数至关重要。虽然在后面我们还会讨论如何调优Kafka性能的问题,但现在我想无脑给出一个通用的建议:将你的JVM堆大小设置成6GB吧,这是目前业界比较公认的一个合理值。我见过很多人就是使用默认的Heap Size来跑Kafka,说实话默认的1GB有点小,毕竟Kafka Broker在与客户端进行交互时会在JVM堆上创建大量的ByteBuffer实例,Heap Size不能太小。

JVM端配置的另一个重要参数就是垃圾回收器的设置,也就是平时常说的GC设置。如果你依然在使用Java 7,那么可以根据以下法则选择合适的垃圾回收器:

  • 如果Broker所在机器的CPU资源非常充裕,建议使用CMS收集器。启用方法是指定-XX:+UseCurrentMarkSweepGC
  • 否则,使用吞吐量收集器。开启方法是指定-XX:+UseParallelGC

当然了,如果你在使用Java 8,那么可以手动设置使用G1收集器。在没有任何调优的情况下,G1表现得要比CMS出色,主要体现在更少的Full GC,需要调整的参数更少等,所以使用G1就好了。

现在我们确定好了要设置的JVM参数,我们该如何为Kafka进行设置呢?有些奇怪的是,这个问题居然在Kafka官网没有被提及。其实设置的方法也很简单,你只需要设置下面这两个环境变量即可:

  • KAFKA_HEAP_OPTS:指定堆大小。
  • KAFKA_JVM_PERFORMANCE_OPTS:指定GC参数。

比如你可以这样启动Kafka Broker,即在启动Kafka Broker之前,先设置上这两个环境变量:

$> export KAFKA_HEAP_OPTS=--Xms6g  --Xmx6g
$> export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true
$> bin/kafka-server-start.sh config/server.properties

操作系统参数

最后我们来聊聊Kafka集群通常都需要设置哪些操作系统参数。通常情况下,Kafka并不需要设置太多的OS参数,但有些因素最好还是关注一下,比如下面这几个:

  • 文件描述符限制
  • 文件系统类型
  • Swappiness
  • 提交时间

首先是ulimit -n。我觉得任何一个Java项目最好都调整下这个值。实际上,文件描述符系统资源并不像我们想象的那样昂贵,你不用太担心调大此值会有什么不利的影响。通常情况下将它设置成一个超大的值是合理的做法,比如ulimit -n 1000000。还记得电影《让子弹飞》里的对话吗:“你和钱,谁对我更重要?都不重要,没有你对我很重要!”。这个参数也有点这么个意思。其实设置这个参数一点都不重要,但不设置的话后果很严重,比如你会经常看到“Too many open files”的错误。

其次是文件系统类型的选择。这里所说的文件系统指的是如ext3、ext4或XFS这样的日志型文件系统。根据官网的测试报告,XFS的性能要强于ext4,所以生产环境最好还是使用XFS。对了,最近有个Kafka使用ZFS的数据报告,貌似性能更加强劲,有条件的话不妨一试。

第三是swap的调优。网上很多文章都提到设置其为0,将swap完全禁掉以防止Kafka进程使用swap空间。我个人反倒觉得还是不要设置成0比较好,我们可以设置成一个较小的值。为什么呢?因为一旦设置成0,当物理内存耗尽时,操作系统会触发OOM killer这个组件,它会随机挑选一个进程然后kill掉,即根本不给用户任何的预警。但如果设置成一个比较小的值,当开始使用swap空间时,你至少能够观测到Broker性能开始出现急剧下降,从而给你进一步调优和诊断问题的时间。基于这个考虑,我个人建议将swappniess配置成一个接近0但不为0的值,比如1。

最后是提交时间或者说是Flush落盘时间。向Kafka发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就可以了,随后操作系统根据LRU算法会定期将页缓存上的“脏”数据落盘到物理磁盘上。这个定期就是由提交时间来确定的,默认是5秒。一般情况下我们会认为这个时间太频繁了,可以适当地增加提交间隔来降低物理磁盘的写操作。当然你可能会有这样的疑问:如果在页缓存中的数据在写入到磁盘前机器宕机了,那岂不是数据就丢失了。的确,这种情况数据确实就丢失了,但鉴于Kafka在软件层面已经提供了多副本的冗余机制,因此这里稍微拉大提交间隔去换取性能还是一个合理的做法。

小结

今天我和你分享了关于Kafka集群设置的各类配置,包括Topic级别参数、JVM参数以及操作系统参数,连同上一篇一起构成了完整的Kafka参数配置列表。我希望这些最佳实践能够在你搭建Kafka集群时助你一臂之力,但切记配置因环境而异,一定要结合自身业务需要以及具体的测试来验证它们的有效性。

开放讨论

很多人争论Kafka不需要为Broker设置太大的堆内存,而应该尽可能地把内存留给页缓存使用。对此你是怎么看的?在你的实际使用中有哪些好的法则来评估Kafka对内存的使用呢?

欢迎写下你的思考和答案,我们一起讨论。如果你觉得有所收获,也欢迎把文章分享给你的朋友。

精选留言

  • 丰富

    2019-06-20 08:16:02

    G1是jdk9中默认的,jdk8还是需要显式指定的
    作者回复

    嗯嗯,笔误了。多谢纠正 :)

    2019-06-20 16:10:20

  • saup007

    2019-06-21 12:55:43

    修改 Topic 级 max.message.bytes,还要考虑以下两个吧?
    还要修改 Broker的 replica.fetch.max.bytes 保证复制正常
    消费还要修改配置 fetch.message.max.bytes
    作者回复

    是的,您考虑得很全面:)

    2019-06-21 16:10:08

  • Hello world

    2019-06-20 16:06:40

    老师说的无脑配置给jvm heap 6G大小,这应该也看机器的吧,现在机器的内存也越来越大,我们这的机器都是64G 内存,配了16G的heap,老师觉得可以优化吗
    作者回复

    虽然无脑推荐6GB,但绝不是无脑推荐>6GB。一个16GB的堆Full GC一次要花多长时间啊,所以我觉得6GB可以是一个初始值,你可以实时监控堆上的live data大小,根据这个值调整heap size。只是因为大内存就直接调整到16GB,个人觉得不可取。

    另外堆越小留给页缓存的空间也就越大,这对Kafka是好事啊。

    2019-06-20 17:01:49

  • 赌神很低调

    2019-06-24 10:05:04

    胡老师,kafka认为写入成功是指写入页缓存成功还是数据刷到磁盘成功算成功呢?还是上次刷盘宕机失败的问题,页缓存的数据如果刷盘失败,是不是就丢了?这个异常会不会响应给生产者让其重发呢?
    作者回复

    写入到页缓存即认为成功。如果在flush之前机器就宕机了,的确这条数据在broker上就算丢失了。producer端表现如何取决于acks的设定。如果是acks=1而恰恰是leader broker在flush前宕机,那么的确有可能消息就丢失了,而且producer端不会重发——因为它认为是成功了。

    2019-06-26 09:05:28

  • aoe

    2019-06-20 02:32:06

    ulimit -n这个参数说的太好了!如果不设置,单机在Centos7上几百的并发就报“Too many open files”了。网上搜索后设置成65535,用JMater压测单机也只能支撑到1000左右的并发,原来这个值可以设置到1000000!《Kafka权威指南》上说Kafka单机可以轻松处理300万并发;《响应式架构:消息模式Actor实现与Scala、Akka应用集成》上说Scala用Actor单机可以处理5000万并发。请问胡老师有没有推荐的Linux方面的书籍,可以详细解答ulimit -n参数、如何知道单台Linux机器可以处理的连接数上线?
    我在mac笔记本上用Go开启了10万个goroutine,压测服务器,结果得到异常“Too many open files”,后来也修改了ulimit -65535,但也只能保证1万左右的请求正常,请问Mac上也是只要设置ulimit -n参数就可以将请求的连接数提升到上限吗?
  • 小头针

    2019-06-26 22:01:49

    胡老师,在本课程最后留的问题,又成功的引起了我的注意,我曾经因为kafka假死,不知原因为何,而尝试过设置Broker的内存为(32G/256G),然而进程假死更加频繁(后面检测是那个版本存在线程死锁)。后来还是设置为16G了。当然我这真真的是无脑设置。我也看到了评论了胡老师的建议,很值得参考。
    另外,胡老师在这节课里,讲到了页缓存,我想请问一下这个页缓存它存在的意义和作用,以及它在整个过程中的机制又是怎样的呢?
    作者回复

    页缓存属于磁盘缓存(Disk cache)的一种,主要是为了改善系统性能。重复访问磁盘上的磁盘块是常见的操作,把它们保存在内存中可以避免昂贵的磁盘IO操作。

    既然叫页缓存,它是根据页(page)来组织的内存结构。每一页包含了很多磁盘上的块数据。Linux使用Radix树实现页缓存,主要是加速特定页的查找速度。另外一般使用LRU策略来淘汰过期页数据。总之它是一个完全由内核来管理的磁盘缓存,用户应用程序通常是无感知的。

    如果要详细了解page cache,可以参见《Understanding the Linux Kernel》一书的第15章

    2019-06-27 08:54:54

  • cricket1981

    2019-06-21 17:14:13

    kafka streams或者ksql的性能参数调优有什么建议和参考资料吗?
    作者回复

    Kafka Streams的性能调优建议:https://www.confluent.io/blog/optimizing-kafka-streams-applications

    KSQL本专栏不会涉及,目前我也给不出相应的建议,因为我。。。。我也不会😳

    2019-06-24 08:46:16

  • theivanxu

    2019-06-20 09:45:39

    最近环境中有一台3G堆内存的节点在某个topic handle request的时候一直OOM,调整到5G重启后恢复正常,很想知道如何评判堆内存大小设置的标准。
    作者回复

    没有通用的标准,只有一个最佳实践值:6GB。最好还是监控一下实时的堆大小,特别是GC之后的live data大小,通常将heapsize设置成其1.5~2倍就足以了

    2019-06-20 16:46:44

  • 张振宇

    2019-11-19 17:21:48

    老师,怎么能限制消费者的消费速度,或者限制消费带宽啊,
    作者回复

    这是我之前写的,可以参考下:https://www.cnblogs.com/huxi2b/p/8609453.html

    2019-11-20 08:48:27

  • 张洋

    2020-05-11 15:36:39

    老师我想问下,写入到pageCache 根据配置的时间‘脏数据’Flush到磁盘,kafka 把数据同步到磁盘只在这个地方做吗。意思是:只有每次‘判断’的脏数据才入盘吗,其他的数据呢?
    作者回复

    Kafka其实只是把数据写入到pagecache中,后面的刷盘是由os完成的,什么时候刷,刷那些数据都是由os决定

    2020-05-12 09:59:58

  • 王晨光

    2020-04-13 22:14:32

    老师,kafka消费段,过一段时间jvm内存就会超过设置上线,有什么好的思路调整吗
    作者回复

    OOM的问题首先看下到底是那OOM的问题可以这样排查:
    1. 到底是哪部分内存。大部分是堆溢出
    2. 如果是heap溢出,主要看stacktrace,看看到底是哪段代码导致的
    3. 再看导致的原因,到底是内存泄露还是内存溢出。这两者是有区别的。前者是程序写的有问题,后者是程序确实需要这么多内存,那么只能增加heap size

    不管怎么样,你可以先增加一下heap size试试,如果还是OOM,那么很有可能出现了内存泄露

    2020-04-14 09:46:05

  • 刘朋

    2019-06-20 09:37:34

    系统会根据LRU算法定期将页缓存上的 脏 数据落盘到物理磁盘上. 这个定期就是由提交时间来确定的,默认是5秒.

    这个时间如何设置? 是内核参数吗?
    作者回复

    不算内核参数,是文件系统的参数。你可以查询一下文件系统手册。比如ext4就是commit=Nseconds这样设置

    2019-06-20 16:45:42

  • Xiao

    2019-06-20 16:59:45

    帅气的胡老师,后边是否会将Kafka数据丢失和消息重复的场景以以及解决思路!
    作者回复

    会有的,后面有防止消息丢失和重复消费,到时候一起讨论哈

    2019-06-21 09:27:02

  • 匿名

    2020-04-14 12:22:06

    swap设置成1,可以观测到Broker 性能开始出现急剧下降,从而进一步调优和诊断问题。这个地方的意思是观测运维指标,发现Kafka性能下降然后被kill。基本可以推断是出发了OOM Killer 是吗?
    作者回复

    这里想表达的意思是,如果swap=0,内存不足时oom killer触发可能直接将broker进程杀死。而如果设置swap为一个比较小的值,在oom killer触发前你至少有一段时间观察到broker性能变差,给你监控和调优都留了一些时间

    2020-04-16 10:42:12

  • 2019-08-13 07:16:22

    胡老师或者其他同学,初学kafka有些概念没完全弄明白,请帮忙解答一下,多谢。
    1:broker的本质是什么?启动一个kafka应用程序的进程就相当于一个broker在跑了嘛?还是说可以通过设置会存在多个broker在跑?
    2:broker和topic的关系是怎么样的?目前我确定的是一个kafka集群中topic一定是一个唯一的,但肯定会有多个broker,是不是说启一台kafka服务器就是一个broker在跑,多个一块构成一个集群?还是说一台服务器可以跑多个kafka程序也有多个broker在跑,也能构成一个集群,只是比较脆弱?
    3:分区的底层数据结构是什么?队列?数组还是列表?还是说分区这一次还不够底层和具体的数据结构关系不大?
    请老师帮忙解答一下,学到目前还不太明确一个消息从生产出来到消费掉,都经历了那些关键的路程,感觉理解其他的越发困难了,如果方便也请老师大致勾画一下一个消息的生命轨迹,其中那些是关键的转变?
    多谢啦!
    作者回复

    1. Broker是Kafka集群的服务器。启动集群就是指启动若干个Broker进程
    2. Topic在一个Kafka集群上可能有很多个,Topic数据的确保存在Broker上,但它们之间没有什么关系。1台Broker也可以对外提供服务,只是如你所说:很脆弱。
    3. 分区下面是副本,副本的存储结构是日志(log)。每条消息写入分区时,其实是写入副本的日志文件中

    2019-08-13 08:42:28

  • King Yao

    2019-07-04 10:02:06

    老师请教一个问题,我们设置了过期时间3小时,但是客户端还是会消费到昨天的昨天的消息,这个如何查找原因呢
    作者回复

    你要有多个日志段文件消息删除才可能生效。只有一个日志段文件是没用的。

    2019-07-05 08:35:00

  • Geek_osqiyw

    2019-06-29 10:33:34

    老师的美式英语发音真标准
  • bunny

    2019-06-20 08:38:03

    胡老师,这个参数retention.bytes应该是指使用的磁盘空间吧,而且是针对于单个分区的;之前遇到过kafka将磁盘写满的情况,导致broker不可用,请问有什么好的预防措施和监控手段么?
    作者回复

    这是topic级别的参数,控制每个分区能占用的最大磁盘空间。设置它就好了~~

    监控的话,好像没有现成的JMX指标。我之前写过一个方法可以监控磁盘占用,你不妨一试:https://www.cnblogs.com/huxi2b/p/7929690.html

    2019-06-20 16:14:48

  • James

    2020-05-11 22:13:21

    随便写一下,方便以后查看

    二.Topic级别参数(以Topic级别参数为主,可覆盖全局Broker参数的值)
    1.retention.ms
    (消息被保存的时长)
    2.retention.bytes
    (预留多大的磁盘空间)
    3.max.message.bytes
    (能够正常接收该Topic的最大消息大小)
    4.调整参数
    bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760

    三.JVM参数(java版本1.8以上,自2.0版本以上仅支持jdk1.8以上)
    1.Heap Size设置为6GB(默认为1GB)
    (Broker与客户端交互会在堆上创建大量的ByteBuffer)
    2.垃圾回收器的设置
    2.1jdk1.7,在cpu充足下使用CMS收集器
    否则使用吞吐量收集器。开启方法是指定-XX:+UseParallelGC
    2.2jdk1.8,使用默认Parallel GC,Java9默认才是G1

    以上参数,在启动Broker前配置
    $> export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g
    $> export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true
    $> bin/kafka-server-start.sh config/server.properties


    四.操作系统参数
    4.1文件描述符
    (设置ulimit -n 1000000 ,否则Too many open files)
    4.2限制文件系统类型
    (ext3,ext4,XFS日志型文件系统;XFS的性能要强于ext4最好还是使用XFS)
    4.3Swappiness
    (设置为0,当无物理内存时会随机杀死进程;设置接近0不为0的值,可以观测到Broker性能下降时,给予调优的时间)
    4.4提交时间(Flush落盘时间)
    (向Kafka发送数据并不是真正等数据写入磁盘才算成功;而是只要数据被写入到操作系统的页缓存就算成功)
    默认为5s,应该提高这个时间,降低物理磁盘的写操作.
    作者回复

    👍

    2020-05-12 09:56:02

  • poettian

    2020-03-27 11:50:17

    请教老师一个问题,我在使用 kafka 客户端应用时,有时会需要 broker list 这个参数,在我的集群有三个broker 的情况下,我发现 只填一个 和 三个都填上 都可以用,这个有什么区别吗?网上搜了一圈也没搜着
    作者回复

    确实只需要写1个就可以,因为Kafka能够通过这1个找到集群所有的Broker。当然最好还是多写几个

    2020-03-30 09:57:51