10 | 生产者压缩算法面面观

你好,我是胡夕。今天我要和你分享的内容是:生产者压缩算法面面观。

说起压缩(compression),我相信你一定不会感到陌生。它秉承了用时间去换空间的经典trade-off思想,具体来说就是用CPU时间去换磁盘空间或网络I/O传输量,希望以较小的CPU开销带来更少的磁盘占用或更少的网络I/O传输。在Kafka中,压缩也是用来做这件事的。今天我就来跟你分享一下Kafka中压缩的那些事儿。

怎么压缩?

Kafka是如何压缩消息的呢?要弄清楚这个问题,就要从Kafka的消息格式说起了。目前Kafka共有两大类消息格式,社区分别称之为V1版本和V2版本。V2版本是Kafka 0.11.0.0中正式引入的。

不论是哪个版本,Kafka的消息层次都分为两层:消息集合(message set)以及消息(message)。一个消息集合中包含若干条日志项(record item),而日志项才是真正封装消息的地方。Kafka底层的消息日志由一系列消息集合日志项组成。Kafka通常不会直接操作具体的一条条消息,它总是在消息集合这个层面上进行写入操作。

那么社区引入V2版本的目的是什么呢?V2版本主要是针对V1版本的一些弊端做了修正,和我们今天讨论的主题相关的修正有哪些呢?先介绍一个,就是把消息的公共部分抽取出来放到外层消息集合里面,这样就不用每条消息都保存这些信息了。

我来举个例子。原来在V1版本中,每条消息都需要执行CRC校验,但有些情况下消息的CRC值是会发生变化的。比如在Broker端可能会对消息时间戳字段进行更新,那么重新计算之后的CRC值也会相应更新;再比如Broker端在执行消息格式转换时(主要是为了兼容老版本客户端程序),也会带来CRC值的变化。鉴于这些情况,再对每条消息都执行CRC校验就有点没必要了,不仅浪费空间还耽误CPU时间,因此在V2版本中,消息的CRC校验工作就被移到了消息集合这一层。

V2版本还有一个和压缩息息相关的改进,就是保存压缩消息的方法发生了变化。之前V1版本中保存压缩消息的方法是把多条消息进行压缩然后保存到外层消息的消息体字段中;而V2版本的做法是对整个消息集合进行压缩。显然后者应该比前者有更好的压缩效果。

我对两个版本分别做了一个简单的测试,结果显示,在相同条件下,不论是否启用压缩,V2版本都比V1版本节省磁盘空间。当启用压缩时,这种节省空间的效果更加明显,就像下面这两张图展示的那样:

何时压缩?

在Kafka中,压缩可能发生在两个地方:生产者端和Broker端。

生产者程序中配置compression.type参数即表示启用指定类型的压缩算法。比如下面这段程序代码展示了如何构建一个开启GZIP的Producer对象:

 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 // 开启GZIP压缩
 props.put("compression.type", "gzip");
 
 Producer<String, String> producer = new KafkaProducer<>(props);

这里比较关键的代码行是props.put(“compression.type”, “gzip”),它表明该Producer的压缩算法使用的是GZIP。这样Producer启动后生产的每个消息集合都是经GZIP压缩过的,故而能很好地节省网络传输带宽以及Kafka Broker端的磁盘占用。

在生产者端启用压缩是很自然的想法,那为什么我说在Broker端也可能进行压缩呢?其实大部分情况下Broker从Producer端接收到消息后仅仅是原封不动地保存而不会对其进行任何修改,但这里的“大部分情况”也是要满足一定条件的。有两种例外情况就可能让Broker重新压缩消息。

情况一:Broker端指定了和Producer端不同的压缩算法。

先看一个例子。想象这样一个对话。

Producer说:“我要使用GZIP进行压缩。”

Broker说:“不好意思,我这边接收的消息必须使用Snappy算法进行压缩。”

你看,这种情况下Broker接收到GZIP压缩消息后,只能解压缩然后使用Snappy重新压缩一遍。如果你翻开Kafka官网,你会发现Broker端也有一个参数叫compression.type,和上面那个例子中的同名。但是这个参数的默认值是producer,这表示Broker端会“尊重”Producer端使用的压缩算法。可一旦你在Broker端设置了不同的compression.type值,就一定要小心了,因为可能会发生预料之外的压缩/解压缩操作,通常表现为Broker端CPU使用率飙升。

情况二:Broker端发生了消息格式转换。

所谓的消息格式转换主要是为了兼容老版本的消费者程序。还记得之前说过的V1、V2版本吧?在一个生产环境中,Kafka集群中同时保存多种版本的消息格式非常常见。为了兼容老版本的格式,Broker端会对新版本消息执行向老版本格式的转换。这个过程中会涉及消息的解压缩和重新压缩。一般情况下这种消息格式转换对性能是有很大影响的,除了这里的压缩之外,它还让Kafka丧失了引以为豪的Zero Copy特性。

所谓“Zero Copy”就是“零拷贝”,我在专栏第6期提到过,说的是当数据在磁盘和网络进行传输时避免昂贵的内核态数据拷贝,从而实现快速的数据传输。因此如果Kafka享受不到这个特性的话,性能必然有所损失,所以尽量保证消息格式的统一吧,这样不仅可以避免不必要的解压缩/重新压缩,对提升其他方面的性能也大有裨益。如果有兴趣你可以深入地了解下Zero Copy的原理。

何时解压缩?

有压缩必有解压缩!通常来说解压缩发生在消费者程序中,也就是说Producer发送压缩消息到Broker后,Broker照单全收并原样保存起来。当Consumer程序请求这部分消息时,Broker依然原样发送出去,当消息到达Consumer端后,由Consumer自行解压缩还原成之前的消息。

那么现在问题来了,Consumer怎么知道这些消息是用何种压缩算法压缩的呢?其实答案就在消息中。Kafka会将启用了哪种压缩算法封装进消息集合中,这样当Consumer读取到消息集合时,它自然就知道了这些消息使用的是哪种压缩算法。如果用一句话总结一下压缩和解压缩,那么我希望你记住这句话:Producer端压缩、Broker端保持、Consumer端解压缩。

除了在Consumer端解压缩,Broker端也会进行解压缩。注意了,这和前面提到消息格式转换时发生的解压缩是不同的场景。每个压缩过的消息集合在Broker端写入时都要发生解压缩操作,目的就是为了对消息执行各种验证。我们必须承认这种解压缩对Broker端性能是有一定影响的,特别是对CPU的使用率而言。

事实上,最近国内京东的小伙伴们刚刚向社区提出了一个bugfix,建议去掉因为做消息校验而引入的解压缩。据他们称,去掉了解压缩之后,Broker端的CPU使用率至少降低了50%。不过有些遗憾的是,目前社区并未采纳这个建议,原因就是这种消息校验是非常重要的,不可盲目去之。毕竟先把事情做对是最重要的,在做对的基础上,再考虑把事情做好做快。针对这个使用场景,你也可以思考一下,是否有一个两全其美的方案,既能避免消息解压缩也能对消息执行校验。

各种压缩算法对比

那么我们来谈谈压缩算法。这可是重头戏!之前说了这么多,我们还是要比较一下各个压缩算法的优劣,这样我们才能有针对性地配置适合我们业务的压缩策略。

在Kafka 2.1.0版本之前,Kafka支持3种压缩算法:GZIP、Snappy和LZ4。从2.1.0开始,Kafka正式支持Zstandard算法(简写为zstd)。它是Facebook开源的一个压缩算法,能够提供超高的压缩比(compression ratio)。

对了,看一个压缩算法的优劣,有两个重要的指标:一个指标是压缩比,原先占100份空间的东西经压缩之后变成了占20份空间,那么压缩比就是5,显然压缩比越高越好;另一个指标就是压缩/解压缩吞吐量,比如每秒能压缩或解压缩多少MB的数据。同样地,吞吐量也是越高越好。

下面这张表是Facebook Zstandard官网提供的一份压缩算法benchmark比较结果:

从表中我们可以发现zstd算法有着最高的压缩比,而在吞吐量上的表现只能说中规中矩。反观LZ4算法,它在吞吐量方面则是毫无疑问的执牛耳者。当然对于表格中数据的权威性我不做过多解读,只想用它来说明一下当前各种压缩算法的大致表现。

在实际使用中,GZIP、Snappy、LZ4甚至是zstd的表现各有千秋。但对于Kafka而言,它们的性能测试结果却出奇得一致,即在吞吐量方面:LZ4 > Snappy > zstd和GZIP;而在压缩比方面,zstd > LZ4 > GZIP > Snappy。具体到物理资源,使用Snappy算法占用的网络带宽最多,zstd最少,这是合理的,毕竟zstd就是要提供超高的压缩比;在CPU使用率方面,各个算法表现得差不多,只是在压缩时Snappy算法使用的CPU较多一些,而在解压缩时GZIP算法则可能使用更多的CPU。

最佳实践

了解了这些算法对比,我们就能根据自身的实际情况有针对性地启用合适的压缩算法。

首先来说压缩。何时启用压缩是比较合适的时机呢?

你现在已经知道Producer端完成的压缩,那么启用压缩的一个条件就是Producer程序运行机器上的CPU资源要很充足。如果Producer运行机器本身CPU已经消耗殆尽了,那么启用消息压缩无疑是雪上加霜,只会适得其反。

除了CPU资源充足这一条件,如果你的环境中带宽资源有限,那么我也建议你开启压缩。事实上我见过的很多Kafka生产环境都遭遇过带宽被打满的情况。这年头,带宽可是比CPU和内存还要珍贵的稀缺资源,毕竟万兆网络还不是普通公司的标配,因此千兆网络中Kafka集群带宽资源耗尽这件事情就特别容易出现。如果你的客户端机器CPU资源有很多富余,我强烈建议你开启zstd压缩,这样能极大地节省网络资源消耗。

其次说说解压缩。其实也没什么可说的。一旦启用压缩,解压缩是不可避免的事情。这里只想强调一点:我们对不可抗拒的解压缩无能为力,但至少能规避掉那些意料之外的解压缩。就像我前面说的,因为要兼容老版本而引入的解压缩操作就属于这类。有条件的话尽量保证不要出现消息格式转换的情况。

小结

总结一下今天分享的内容:我们主要讨论了Kafka压缩的各个方面,包括Kafka是如何对消息进行压缩的、何时进行压缩及解压缩,还对比了目前Kafka支持的几个压缩算法,最后我给出了工程化的最佳实践。分享这么多内容,我就只有一个目的:就是希望你能根据自身的实际情况恰当地选择合适的Kafka压缩算法,以求实现最大的资源利用率。

开放讨论

最后给出一道作业题,请花时间思考一下:前面我们提到了Broker要对压缩消息集合执行解压缩操作,然后逐条对消息进行校验,有人提出了一个方案:把这种消息校验移到Producer端来做,Broker直接读取校验结果即可,这样就可以避免在Broker端执行解压缩操作。你认同这种方案吗?

欢迎写下你的思考和答案,我们一起讨论。如果你觉得有所收获,也欢迎把文章分享给你的朋友。

精选留言

  • 胡夕

    2019-06-26 09:28:08

    刚刚看到4天前京东提的那个jira已经修复了,看来规避了broker端为执行校验而做的解压缩操作,代码也merge进了2.4版本。有兴趣的同学可以看一下:https://issues.apache.org/jira/browse/KAFKA-8106
  • 常超

    2019-06-28 13:52:25

    文中对于消息结构的描述,确实引起了一些混乱,下面试图整理一下,希望对大家有帮助。
    消息(v1叫message,v2叫record)是分批次(batch)读写的,batch是kafka读写(网络传输和文件读写)的基本单位,不同版本,对相同(或者叫相似)的概念,叫法不一样。
    v1(kafka 0.11.0之前):message set, message
    v2(kafka 0.11.0以后):record batch,record
    其中record batch对英语message set,record对应于message。
    一个record batch(message set)可以包含多个record(message)。

    对于每个版本的消息结构的细节,可以参考kafka官方文档的5.3 Message Format 章,里面对消息结构列得非常清楚。
  • Geek_8441fd

    2019-06-25 21:09:27

    broker端校验可以分两步走。
    第1步,message set 层面,增加一个 crc,这样可以不用解压缩,直接校验压缩后的数据。
    如果校验不成功,说明message set 中有损坏的message;
    这时,再做解压操作,挨个校验message,找出损坏的那一个。

    这样的话,绝大部分情况下,是不用做解压操作的;只有在确实发生错误时,才需要解压。
    请指正。
    作者回复

    嗯嗯,挺好的。我自己也学到了一些。另外校验不仅仅是CRC校验,还有消息级别的检查。

    2019-06-26 09:57:28

  • 你好旅行者

    2019-06-25 01:56:34

    不行吧,校验操作应该也是为了防止在网络传输的过程中出现数据丢失的情况,在Producer端做完校验之后如果在传输的时候出现了错误,那这个校验就没有意义了。
    我有一个问题想请教老师,如果每次传到Broker的消息都要做一次校验,那是不是都要把消息从内核态拷贝到用户态做校验?如果是这样的话那零拷贝机制不是就没有用武之地了?
  • Li Shunduo

    2019-06-25 08:46:04

    假如一个消息集合里有10条消息,并且被压缩,但是消费端配置每次只poll 5条消息。这种情况下,消费端怎么解压缩?矛盾点是 如果只取5条消息,需要broker帮助解压缩;如果取整个消息集合10条消息,会有贷款等资源的浪费?
    作者回复

    目前java consumer的设计是一次取出一批,缓存在客户端内存中,然后再过滤出max.poll.records条消息返给你,也不算太浪费吧,毕竟下次可以直接从缓存中取,不用再发请求了。

    2019-06-26 09:15:11

  • 衣申人

    2019-12-11 21:34:36

    老师,我看源码,broker在接收producer消息并落盘这块貌似没有用零拷贝啊!只有传输给consumer时用了,求解答
    作者回复

    是的,就是你理解的那样。Kafka使用Zero Copy优化将页缓存中的数据直接传输到Socket——这的确是发生在broker到consumer的链路上。这种优化能成行的前提是consumer端能够识别磁盘上的消息格式。

    2019-12-12 08:41:42

  • 风中花

    2019-06-26 11:05:03

    胡老师您好! 我们已经学历10多节课了! 针对我们得留言和反馈,不知道您有没有给我们一些后续得课程得学习建议和方法?我目前得学习就是您告诉我们得,我必须学会记住。但是看同学们得评论和反馈,我觉得貌似还有很多很多知识啊且不知也不懂,故有此一问!希望老师能给与一点一点学习建议? 感谢老师
    作者回复

    个人觉得学一个东西最重要的还是要用,如果只是参加一些培训课程很难全面的理解。您这么多的留言我一直坚持回复。我也一直是这个观点:用起来,自然问题就来了。

    我学机器学习的经历和您现在学Kafka很像。没有实际使用场景怎么学都觉得深入不了。

    我给您的建议是:把Kafka官网通读几遍然后再实现一个实时日志收集系统(比如把服务器日志实时放入Kafka)。事实上,能把官网全面理解的话已经比很多Kafka使用者要强了。

    2019-06-27 08:48:01

  • Hello world

    2019-06-25 17:34:34

    做一个笔记
    怎么压缩:
    1、新版本改进将每个消息公共部分取出放在外层消息集合,例如消息的 CRC 值
    2、新老版本的保存压缩消息的方法变化,新版本是对整个消息集合进行压缩
    何时压缩:
    1、正常情况下都是producer压缩,节省带宽,磁盘存储
    2、例外情况 a、broker端和producer端使用的压缩方法不同 b、broker与client交互,消息版本不同
    何时解压缩:
    1、consumer端解压缩
    2、broker端解压缩,用来对消息执行验证

    优化:选择适合自己的压缩算法,是更看重吞吐量还是压缩率。其次尽量server和client保持一致,这样不会损失kafka的zero copy优势
  • 南辕北辙

    2019-06-27 10:46:25

    老师有一点不是很明白,在正常情况下broker端会原样保存起来,但是为了检验需要解压缩。该怎么去理解这个过程呢,broker端解压缩以后还会压缩还原吗?
    这个过程是在用户态执行的吗,总感觉怪怪的
    作者回复

    它只是解压缩读取而已,不会将解压缩之后的数据回写到磁盘。另外就像我置顶的留言那样,目前社区已经接纳了京东小伙伴的修改,貌似可以绕过这部分解压缩了,。

    2019-06-28 08:17:23

  • 星期八

    2019-07-12 08:46:02

    老师那再问一下,如果多条消息组成消息集合发送,那是什么条件控制消息发送,如果是一条又是什么条件控制触发发送的呢
    作者回复

    主要是这两个参数:batch.size和linger.ms。如果是生产了一条消息且linger.ms=0,通常producer就会立即发送者一条消息了。

    2019-07-12 09:14:54

  • cricket1981

    2019-06-25 09:17:25

    校验的目的是防止因为网络传输出现问题导致broker端接收了受损的消息,所以应该放在作为serverr broker端进行,而不是在作为client端的producer。改进的方案可以是针对一次传输的整个message set进行CRC检验,而不是针对一条条消息,这能够大大提高校验效率,因为避免了解压缩。
  • dream

    2019-06-25 11:10:22

    老师,我对消息层次、消息集合、消息、日志项这些概念与它们之间的关系感觉很懵,
    消息层次都分消息集合以及消息,消息集合中包含日志项,日志项中封装消息,
    那么日志项中封装的是producer发送的消息吗?
    一个日志项中会包含多条消息吗?
    消息集合中消息项封装的的消息与消息层次包含的消息有什么关系呢?
    这两个消息与producer发送的消息有什么关系呢?
    一个消息集合对应是producer发送的一条消息还是多条消息呢?
    最后,老师能不能详细说一下CRC校验,谢谢!
    作者回复

    消息批次RecordBatch里面包含若干条消息(record)。
    你可以认为消息批次和消息集合是等价的,消息和日志项是等价的。
    这样消息层次有两层:外层是消息批次(或消息集合);里层是消息(或日志项)。
    Producer以recordbatch为单位发送消息,对于V2版本一个batch中通常包含多条消息。

    在V2版本中,在batch层面计算CRC值;在V1版本中,每条消息都要计算CRC值。

    2019-06-26 10:08:17

  • dream

    2019-06-25 10:53:09

    老师,对于消息层次、消息集合、消息这三者的概念与关系我有点懵,能不能详细说一下?谢谢!
  • 曾轼麟

    2019-06-25 09:01:34

    不认同,因为网络传输也会造成丢失,但是我建议可以在消息里面使用一种消耗较小的签名方式sign,比如多使用位移等方式,broke端也这么操纵,如果签名不一致证明有数据丢失,同时签名的方式可以避免CPU大量消耗
    作者回复

    有一定道理。有机会我向社区反馈下:)

    2019-06-26 09:08:52

  • 南辕北辙

    2019-07-01 10:14:15

    老师有一点有点迷惑,broker为了多版本消息兼容,意思是一份消息有多个版本存在吗,是这个意思吗?
    作者回复

    同一台broker上可能存在多个版本的消息,但每条消息只会以1个版本的形式保存。

    2019-07-01 11:32:26

  • 2019-06-25 12:47:41

    我看了三遍老师的课,得到了我要的答案:
    1.如果生产者使用了压缩,broker为了crc校验,会启动解压,这个解压过程不可避免;
    2.v2的broker为了低版本的消费者,会把消息再次解压并进行协议转换。
    所以消费者的兼容成本较大,需要避免这个情况。
  • 归零

    2020-12-10 10:29:52

    这节觉得老师讲的有点没看懂,前面说只有两个场景broker会解压,大部分场景是"**Producer 端压缩、Broker 端保持、Consumer 端解压缩**。" 后面又说每个压缩过的消息集合在 Broker 端写入时都要发生解压缩操作,目的就是为了对消息执行各种验证。这一块能再解释下吗?谢谢!
    作者回复

    也许应该这么说,解压的完整步骤包括:1、解压;2、把解压后的数据保存下来。broker端避免解压的意思是broker只需要把数据解压之后动态进行校验,而无需把解压之后的结果保存,是这个意思

    2020-12-16 09:02:17

  • 代码小生

    2019-09-18 17:25:00

    原来在 V1 版本中,每条消息都需要执行 CRC 校验,但是CRC在某些情况下会变化,所以crc拿到消息集和中更好,这个逻辑我没有明白呢,既然CRC会变,为了消息的正确性不更应该每条消息都校验吗?为什么说拿到消息集和中一次校验更好呢?
    作者回复

    V2依然是做CRC校验的,只不过是在record batch这个层级上做,而不是一条一条消息地做了。如果CRC校验失败,重传batch。也就是说不会以消息作为传输单位进行校验,这样效率太低

    2019-09-19 08:43:49

  • What for

    2019-08-08 22:12:50

    老师您好,您的课程很棒,又很实用又有原理性的分析!
    我想问一个问题,Producer 发送数据时以批次为单位,那么 batch 与 broker 端的消息集合又是怎么样的对应关系呢?每个消息集合的 record 数量是否固定呢?
    就是说在 Producer 端即使消息并没有达到 batch.size 的数量,linger.ms 也可以让它发送一批数据,那 broker 在低峰期的时候收到一批数据之后是会写入缓存等凑够一定数量组成一个消息集合还是说会立即(或设置超时时间)组成一个消息集合写入磁盘?
    谢谢!
    作者回复

    不是固定数量。
    “在 Producer 端即使消息并没有达到 batch.size 的数量,linger.ms 也可以让它发送一批数据” --- 是的

    取决于linger.ms的值

    2019-08-08 23:04:06

  • juan

    2019-07-09 07:29:36

    如果在配置中不指定压缩算法,kafka有默认的压缩算法吗?
    作者回复

    没有

    2019-07-09 09:08:01