07 | 最最最重要的集群参数配置(上)

你好,我是胡夕。今天我想和你聊聊最最最重要的Kafka集群配置。我这里用了3个“最”字并非哗众取宠,而是因为有些配置的重要性并未体现在官方文档中,并且从实际表现看,很多参数对系统的影响要比从文档上看更加明显,因此很有必要集中讨论一下。

我希望通过两期内容把这些重要的配置讲清楚。严格来说这些配置并不单单指Kafka服务器端的配置,其中既有Broker端参数,也有主题(后面我用我们更熟悉的Topic表示)级别的参数、JVM端参数和操作系统级别的参数。

需要你注意的是,这里所说的Broker端参数也被称为静态参数(Static Configs)。我会在专栏后面介绍与静态参数相对应的动态参数。所谓静态参数,是指你必须在Kafka的配置文件server.properties中进行设置的参数,不管你是新增、修改还是删除。同时,你必须重启Broker进程才能令它们生效。而主题级别参数的设置则有所不同,Kafka提供了专门的kafka-configs命令来修改它们。至于JVM和操作系统级别参数,它们的设置方法比较通用化,我介绍的也都是标准的配置参数,因此,你应该很容易就能够对它们进行设置。

下面我先从Broker端参数说起。

Broker端参数

目前Kafka Broker提供了近200个参数,这其中绝大部分参数都不用你亲自过问。当谈及这些参数的用法时,网上的文章多是罗列出一些常见的参数然后一个一个地给出它们的定义,事实上我以前写文章时也是这么做的。不过今天我打算换个方法,按照大的用途类别一组一组地介绍它们,希望可以更有针对性,也更方便你记忆。

首先Broker是需要配置存储信息的,即Broker使用哪些磁盘。那么针对存储信息的重要参数有以下这么几个:

  • log.dirs:这是非常重要的参数,指定了Broker需要使用的若干个文件目录路径。要知道这个参数是没有默认值的,这说明什么?这说明它必须由你亲自指定。
  • log.dir:注意这是dir,结尾没有s,说明它只能表示单个路径,它是补充上一个参数用的。

这两个参数应该怎么设置呢?很简单,你只要设置log.dirs,即第一个参数就好了,不要设置log.dir。而且更重要的是,在线上生产环境中一定要为log.dirs配置多个路径,具体格式是一个CSV格式,也就是用逗号分隔的多个路径,比如/home/kafka1,/home/kafka2,/home/kafka3这样。如果有条件的话你最好保证这些目录挂载到不同的物理磁盘上。这样做有两个好处:

  • 提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。
  • 能够实现故障转移:即Failover。这是Kafka 1.1版本新引入的强大功能。要知道在以前,只要Kafka Broker使用的任何一块磁盘挂掉了,整个Broker进程都会关闭。但是自1.1开始,这种情况被修正了,坏掉的磁盘上的数据会自动地转移到其他正常的磁盘上,而且Broker还能正常工作。还记得上一期我们关于Kafka是否需要使用RAID的讨论吗?这个改进正是我们舍弃RAID方案的基础:没有这种Failover的话,我们只能依靠RAID来提供保障。

下面说说与ZooKeeper相关的设置。首先ZooKeeper是做什么的呢?它是一个分布式协调框架,负责协调管理并保存Kafka集群的所有元数据信息,比如集群都有哪些Broker在运行、创建了哪些Topic,每个Topic都有多少分区以及这些分区的Leader副本都在哪些机器上等信息。

Kafka与ZooKeeper相关的最重要的参数当属zookeeper.connect。这也是一个CSV格式的参数,比如我可以指定它的值为zk1:2181,zk2:2181,zk3:2181。2181是ZooKeeper的默认端口。

现在问题来了,如果我让多个Kafka集群使用同一套ZooKeeper集群,那么这个参数应该怎么设置呢?这时候chroot就派上用场了。这个chroot是ZooKeeper的概念,类似于别名。

如果你有两套Kafka集群,假设分别叫它们kafka1和kafka2,那么两套集群的zookeeper.connect参数可以这样指定:zk1:2181,zk2:2181,zk3:2181/kafka1zk1:2181,zk2:2181,zk3:2181/kafka2。切记chroot只需要写一次,而且是加到最后的。我经常碰到有人这样指定:zk1:2181/kafka1,zk2:2181/kafka2,zk3:2181/kafka3,这样的格式是不对的。

第三组参数是与Broker连接相关的,即客户端程序或其他Broker如何与该Broker进行通信的设置。有以下三个参数:

  • listeners:学名叫监听器,其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的Kafka服务。
  • advertised.listeners:和listeners相比多了个advertised。Advertised的含义表示宣称的、公布的,就是说这组监听器是Broker用于对外发布的。
  • host.name/port:列出这两个参数就是想说你把它们忘掉吧,压根不要为它们指定值,毕竟都是过期的参数了。

我们具体说说监听器的概念,从构成上来说,它是若干个逗号分隔的三元组,每个三元组的格式为<协议名称,主机名,端口号>。这里的协议名称可能是标准的名字,比如PLAINTEXT表示明文传输、SSL表示使用SSL或TLS加密传输等;也可能是你自己定义的协议名字,比如CONTROLLER: //localhost:9092

一旦你自己定义了协议名称,你必须还要指定listener.security.protocol.map参数告诉这个协议底层使用了哪种安全协议,比如指定listener.security.protocol.map=CONTROLLER:PLAINTEXT表示CONTROLLER这个自定义协议底层使用明文不加密传输数据。

至于三元组中的主机名和端口号则比较直观,不需要做过多解释。不过有个事情你还是要注意一下,经常有人会问主机名这个设置中我到底使用IP地址还是主机名。这里我给出统一的建议:最好全部使用主机名,即Broker端和Client端应用配置中全部填写主机名。 Broker源代码中也使用的是主机名,如果你在某些地方使用了IP地址进行连接,可能会发生无法连接的问题。

第四组参数是关于Topic管理的。我来讲讲下面这三个参数:

  • auto.create.topics.enable:是否允许自动创建Topic。
  • unclean.leader.election.enable:是否允许Unclean Leader选举。
  • auto.leader.rebalance.enable:是否允许定期进行Leader选举。

我还是一个个说。

auto.create.topics.enable参数我建议最好设置成false,即不允许自动创建Topic。在我们的线上环境里面有很多名字稀奇古怪的Topic,我想大概都是因为该参数被设置成了true的缘故。

你可能有这样的经历,要为名为test的Topic发送事件,但是不小心拼写错误了,把test写成了tst,之后启动了生产者程序。恭喜你,一个名为tst的Topic就被自动创建了。

所以我一直相信好的运维应该防止这种情形的发生,特别是对于那些大公司而言,每个部门被分配的Topic应该由运维严格把控,决不能允许自行创建任何Topic。

第二个参数unclean.leader.election.enable是关闭Unclean Leader选举的。何谓Unclean?还记得Kafka有多个副本这件事吗?每个分区都有多个副本来提供高可用。在这些副本中只能有一个副本对外提供服务,即所谓的Leader副本。

那么问题来了,这些副本都有资格竞争Leader吗?显然不是,只有保存数据比较多的那些副本才有资格竞选,那些落后进度太多的副本没资格做这件事。

好了,现在出现这种情况了:假设那些保存数据比较多的副本都挂了怎么办?我们还要不要进行Leader选举了?此时这个参数就派上用场了。

如果设置成false,那么就坚持之前的原则,坚决不能让那些落后太多的副本竞选Leader。这样做的后果是这个分区就不可用了,因为没有Leader了。反之如果是true,那么Kafka允许你从那些“跑得慢”的副本中选一个出来当Leader。这样做的后果是数据有可能就丢失了,因为这些副本保存的数据本来就不全,当了Leader之后它本人就变得膨胀了,认为自己的数据才是权威的。

这个参数在最新版的Kafka中默认就是false,本来不需要我特意提的,但是比较搞笑的是社区对这个参数的默认值来来回回改了好几版了,鉴于我不知道你用的是哪个版本的Kafka,所以建议你还是显式地把它设置成false吧。

第三个参数auto.leader.rebalance.enable的影响貌似没什么人提,但其实对生产环境影响非常大。设置它的值为true表示允许Kafka定期地对一些Topic分区进行Leader重选举,当然这个重选举不是无脑进行的,它要满足一定的条件才会发生。严格来说它与上一个参数中Leader选举的最大不同在于,它不是选Leader,而是换Leader!比如Leader A一直表现得很好,但若auto.leader.rebalance.enable=true,那么有可能一段时间后Leader A就要被强行卸任换成Leader B。

你要知道换一次Leader代价很高的,原本向A发送请求的所有客户端都要切换成向B发送请求,而且这种换Leader本质上没有任何性能收益,因此我建议你在生产环境中把这个参数设置成false。

最后一组参数是数据留存方面的,我分别介绍一下。

  • log.retention.{hours|minutes|ms}:这是个“三兄弟”,都是控制一条消息数据被保存多长时间。从优先级上来说ms设置最高、minutes次之、hours最低。
  • log.retention.bytes:这是指定Broker为消息保存的总磁盘容量大小。
  • message.max.bytes:控制Broker能够接收的最大消息大小。

先说这个“三兄弟”,虽然ms设置有最高的优先级,但是通常情况下我们还是设置hours级别的多一些,比如log.retention.hours=168表示默认保存7天的数据,自动删除7天前的数据。很多公司把Kafka当作存储来使用,那么这个值就要相应地调大。

其次是这个log.retention.bytes。这个值默认是-1,表明你想在这台Broker上保存多少数据都可以,至少在容量方面Broker绝对为你开绿灯,不会做任何阻拦。这个参数真正发挥作用的场景其实是在云上构建多租户的Kafka集群:设想你要做一个云上的Kafka服务,每个租户只能使用100GB的磁盘空间,为了避免有个“恶意”租户使用过多的磁盘空间,设置这个参数就显得至关重要了。

最后说说message.max.bytes。实际上今天我和你说的重要参数都是指那些不能使用默认值的参数,这个参数也是一样,默认的1000012太少了,还不到1MB。实际场景中突破1MB的消息都是屡见不鲜的,因此在线上环境中设置一个比较大的值还是比较保险的做法。毕竟它只是一个标尺而已,仅仅衡量Broker能够处理的最大消息大小,即使设置大一点也不会耗费什么磁盘空间的。

小结

再次强调一下,今天我和你分享的所有参数都是那些要修改默认值的参数,因为它们的默认值不适合一般的生产环境。当然,我并不是说其他100多个参数就不重要。事实上,在专栏的后面我们还会陆续提到其他的一些参数,特别是那些和性能息息相关的参数。所以今天我提到的所有参数,我希望作为一个最佳实践给到你,可以有的放矢地帮助你规划和调整你的Kafka生产环境。

开放讨论

除了今天我分享的这些参数,还有哪些参数是你认为比较重要而文档中没有提及的?你曾踩过哪些关于参数配置的“坑”?欢迎提出来与我和大家一起讨论。

欢迎你写下自己的思考或疑问,我们一起讨论。如果你觉得有所收获,也欢迎把文章分享给你的朋友。

精选留言

  • 大坏狐狸

    2020-03-16 16:42:05

    auto.create.topics.enable: 不能自立为王

    unclean.leader.election.enable: 宁缺毋滥

    auto.leader.rebalance.enable:江山不易改

    log.retention.{hours|minutes|ms} :数据寿命 hours=168
    log.rentention.bytes: 祖宅大小 -1 表示没限制
    message.max.bytes: 祖宅大门宽度,默认 1000012=976KB
    作者回复

    我去,这个过于形象了~

    2020-03-17 09:26:43

  • 草帽路飞

    2019-06-18 00:32:48

    老师 advertised.listeners 这个配置能否再解释一下。感觉配置了 listeners之后就不用配置这个了呀?
    作者回复

    advertised.listeners主要是为外网访问用的。如果clients在内网环境访问Kafka不需要配置这个参数。

    常见的玩法是:你的Kafka Broker机器上配置了双网卡,一块网卡用于内网访问(即我们常说的内网IP);另一个块用于外网访问。那么你可以配置listeners为内网IP,advertised.listeners为外网IP。

    2019-06-18 08:50:35

  • 第一片心意

    2019-12-13 14:36:21

    auto.leader.rebalance.enable
    关于这个参数的设置,我有一点不同的意见,官网说的是如果某个broker挂了,那分布在他上的leader副本就会自动切换到其他活着的broker上,但是挂掉的broker重启之后,集群并不会将他之前的leader副本再切换回来,这样就会使其他broker上leader副本数较多,而该broker上无leader副本(无新主题创建),从而造成负载不均衡的情况。
    这时我们可以通过 kafka-preferred-replica-election.sh 脚本来重新平衡集群中的leader副本。但是我们配置这个参数为true的话,controller角色就会每五分钟(默认)检查一下集群不平衡的状态,进而重新平衡leader副本。
    作者回复

    同意。不过实际上,线上环境贸然大面积迁移副本leader是非常有风险的事情:)

    2019-12-14 13:44:27

  • QQ怪

    2019-06-18 22:55:56

    老师帮我们讲讲这个参数吧auto.offset.reset,我有时候删除一个topic时会导致offset异常,出现重复消费问题,不知道跟这个参数有没有关系??
    作者回复

    不太懂“删除topic后还出现重复消费”是什么意思?删完了还要继续消费它吗?

    当consumer启动后它会从Kafka读取它上次消费的位移。情况1: 如果 Kafka broker端没有保存这个位移值,那么consumer会看auto.offset.reset的脸色
    情况2:consumer拿到位移值开始消费,如果后面发现它要读取消息的位移在Kafka中不存在(可能对应的消息已经被删除了),那么它也会看auto.offset.reset的脸色
    情况3:除以上这两种情况之外consumer不会再顾忌auto.offset.reset的值

    怎么看auto.offset.reset的脸色呢?简单说就是earliest从头消息;latest从当前新位移处消费。

    2019-06-19 08:50:50

  • 🇭 🇴 🇱 🇮 🇸

    2020-04-09 16:03:08

    老师 我把message.max.bytes设置地挺大,但是java生产者发送1M以上数据就失败,集群也重启过,版本0.10左右 是否有其他参数需要调?
    作者回复

    需要。producer、broker、consumer三端都需要调整

    broker: message.max.bytes和replica.fetch.max.bytes
    consumer:fetch.message.max.bytes

    2020-04-10 09:21:45

  • 杨陆伟

    2019-12-30 15:28:52

    你好,log.retention.bytes这个参数是针对主题的吧?比如设置为100M,Kafka定期会把每个主题的日志数据留存到100M以下?
    作者回复

    这个参数既有broker端也有topic端,不过最终都是作用于topic的。另外算法上也不是简单的比较大小。举个例子吧:假设日志段大小是700MB,当前分区共有4个日志段文件,大小分别是700MB,700MB,700MB和1234B——显然1234B那个文件就是active日志段。此时该分区总的日志大小是3*700MB+1234B=2100MB+1234B,如果阈值设置为2000MB,那么超出阈值的部分就是100MB+1234B,小于日志段大小700MB,故Kafka不会执行任何删除操作,即使总大小已经超过了阈值;反之如果阈值设置为1400MB,那么超过阈值的部分就是700MB+1234B > 700MB,此时Kafka会删除最老的那个日志段文件

    2019-12-31 10:36:38

  • 小头针

    2019-06-24 22:43:12

    胡老师,我在kafka升级过程中遇到过这样的问题,就是升级后的Kafka与之前的Kafka 的配置完全一样,就是版本不一样了。但是5个Broker后,Kafka Manager工具中,只有1个Broker有数据进入进出。后来同时添加了以下4个参数:
    rebalance.max.retries=4
    auto.leader.rebalance.enable=true
    leader.imbalance.check.interval.seconds=300
    leader.imbalance.per.broker.percentage=10
    再重启Kafka,5个Broker都有数据进入进出,但是我不清楚这到底是哪个参数起到了决定性的作用。其中就有老师讲的auto.leader.rebalance.enable这个参数,但是我这里设置的是true?
    作者回复

    只有一个broker有数据进出,我猜是因为这样的原因:1. 首先你的主题分区副本数是1;2. 在你升级的过程中所有分区的Leader副本都变更到了同一台broker上。

    后面开启了auto.leader.rebalance.enable=true之后它定期将Leader副本分散到不同broker上了。

    2019-06-25 08:39:06

  • 你好旅行者

    2019-06-18 01:37:07

    老师好!关于Unclean这个参数,将其设置为false之后,就意味着如果ISR内的所有broker都宕机,那么这个分区就不可用了。
    刚好我前几天看到饶军在2013年的一次报告上讲到Kafka在CAP问题上的取舍,他说,因为Kafka是部署在一个DataCenter中的,而一个DataCenter很少会出现Partitioning的情况,所以Kafka放弃了分区容忍性。
    我想问的是,Kafka舍弃了分区容忍性这一点是否可以体现在社区默认将Unclean设置为false上呢?
    附上报告的地址:https://www.youtube.com/watch?v=XcvHmqmh16g
    关于CAP的取舍出现在21:50左右的地方。谢谢老师!
    作者回复

    首先,CAP理论有很多有歧义的地方,我很好奇为什么国内很多人追捧CAP,其实对于分布式系统而言,很多一致性问题都是CAP覆盖不了的。
    其次,我个人觉得饶大神并不是说Kafka放弃了P,其实Kafka是依托于ZooKeeper以及合理配置minIsr等参数来规避脑裂的。
    第三,我翻看了社区对此提案的讨论,变更为false就是很朴素的思想:用户在默认情况下可能更加关心数据一致性,不想数据丢失。如果用户想要更高的可用性,手动调整即可。你可以看看社区对此问题的讨论:https://www.mail-archive.com/dev@kafka.apache.org/msg63086.html

    2019-06-18 09:04:43

  • 不了峰

    2019-06-18 20:22:20

    请教老师
    gg.handler.kafkahandler.Mode = tx
    gg.handler.kafkahandler.Mode = op
    这两个的差别。我们遇到时 dml 数据会丢失的情况。用的是 op 。
    谢谢
    作者回复

    搜了一下,像是Oracle GoldenGate Kafka Adapter的参数。我没有用过,从文档中看这两者的区别是:当设置成op单个数据库表的变更(插入、更新、删除)会被当成一条Kafka消息发送;当设置成tx时,数据库事务所做的所有变更统一被封装进一条Kafka消息,并在事务提交后被发送。

    显然,后者有事务性的保障,至少有原子性方面的保证,不会丢失部分CDC数据。

    2019-06-19 08:45:32

  • 咸淡一首诗

    2020-02-14 18:06:49

    老师,对于failover机制,kafka会新建副本,从leader处同步最新的数据给新建副本。如果坏掉的盘是leader持久化的盘并且其他副本没有来的及从坏掉的leader分区同步最新数据,重新选举leader后岂不是也会丢失数据???
    作者回复

    是的,这种情况会丢失数据。其实Kafka并没有承诺不丢失数据,而是在满足某些条件下才做持久化保证。

    2020-02-15 09:06:58

  • henry

    2019-06-19 09:27:40

    老师,最近别人问我一个问题,假如现有集群已经有3个分区,动态添加两个分区, 原有的分区会迁移数据到新增的分区吗?
    作者回复

    不会。已有数据将一直“躺在”原有分区中。

    2019-06-19 10:28:01

  • hunterlodge

    2019-11-10 19:46:25

    “坚决不能让那些落后太多的副本竞选 Leader”,请问落后多少算是太多呢?谢谢
    作者回复

    这个取决于broker端参数replica.lag.time.max.ms的取值

    2019-11-11 08:34:49

  • Liam

    2019-06-19 08:09:39

    请问老师,坏掉的数据是怎么自动转移到其他磁盘上的呢?
    作者回复

    可能有点没说清楚。

    1. Broker自动在好的路径上重建副本,然后从leader同步;
    2. Kafka支持工具能够将某个路径上的数据拷贝到其他路径上

    2019-06-19 09:11:21

  • 李 P

    2019-06-19 16:26:27

    和本节无关,消息队列重复消费问题有什么太好的办法吗?我们现在的做法是把offset和消费后的计算结果一并保存在业务系统中,有没有更好的做法
    作者回复

    可以试试Kafka 0.11引入的事务

    2019-06-19 18:25:33

  • 你看起来很好吃

    2019-06-18 17:21:54

    '如果设置成 false,那么就坚持之前的原则,坚决不能让那些落后太多的副本竞选 Leader。'想问一下老师,每个partition的副本保存的数据不是应该和leader是一模一样的吗?为什么会有丢失的?
    作者回复

    它们是异步拉取消息的,必然有一个时间窗口导致它和leader中的数据是不一致的,或者说它是落后于leader的。

    2019-06-19 08:38:45

  • 汉斯·冯·拉特

    2020-07-10 13:43:12

    老师,可以这样设置吗,listeners设置协议用PLAINTEXT,advertised.listeners设置协议为SASL_PLAINTEXT,让broker之间走plaintext,这样内部相互传输快些,但是外部访问则需要认证
    作者回复

    这是可以的,broker间的网络通讯是安全的话其实没必要做SSL

    2020-07-11 14:20:56

  • Geek_b809ff

    2019-08-21 20:28:51

    [2019-08-21 20:25:24,619] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 57 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

    老师,请教一下,这个错误是什么参数配置错了导致的呢?
    作者回复

    如果只是偶尔抛出不用管,通常是因为没有找到对应的主题所致。不是参数配置错导致

    2019-08-22 10:28:34

  • 明翼

    2019-06-19 07:12:29

    老师你好,message.max.bytes设置后是不是影响了kafka的内存占用大小?谢谢
    作者回复

    对于普通的消息处理,这个值不会增加额外的内存占用,它不像是数组的长度那样 ,即使用不完也要申请足量的内存空间。

    但对于Log Cleaner而言(就是为topic执行compact操作的线程),这个值的确会占用更多的内存,因为cleaner的读写buffer都要申请一块ByteBuffer。这个值越大这块buffer也就越大。好在cleaner thread也就那么几个。

    2019-06-19 09:09:44

  • 小猪

    2019-06-18 10:59:54

    请教老师,kafka可以部署到kubernetes云平台吗?效能如何?和直接部署在物理机上有什么区别?例如物理机可以选择分布在不同物理磁盘存数据提高速度等
    作者回复

    性能应该是不用担心的吧。很多大厂都有自己的Kafka PaaS服务,没有听说有性能方面的硬伤。用Docker跑也可以挂多个磁盘

    2019-06-19 08:33:58

  • Geek_jacky

    2019-06-18 09:49:40

    老师好,如果磁盘坏掉了,这些数据是什么机制读取到其他磁盘上的呢?不是都坏了吗?不应该读取其他副本中的数据了吗?这个磁盘上的数据就算是丢失了吗?
    作者回复

    Broker会在好的目录上重建副本。另外Kafka也提供了工具将某块磁盘上的数据直接搬移到另一个磁盘上,毕竟磁盘坏了也不是不能修好:)

    2019-06-18 10:10:27