14 | 幂等生产者和事务生产者是一回事吗?

你好,我是胡夕。今天我要和你分享的主题是:Kafka消息交付可靠性保障以及精确处理一次语义的实现。

所谓的消息交付可靠性保障,是指Kafka对Producer和Consumer要处理的消息提供什么样的承诺。常见的承诺有以下三种:

  • 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
  • 至少一次(at least once):消息不会丢失,但有可能被重复发送。
  • 精确一次(exactly once):消息不会丢失,也不会被重复发送。

目前,Kafka默认提供的交付可靠性保障是第二种,即至少一次。在专栏第11期中,我们说过消息“已提交”的含义,即只有Broker成功“提交”消息且Producer接到Broker的应答才会认为该消息成功发送。不过倘若消息成功“提交”,但Broker的应答没有成功发送回Producer端(比如网络出现瞬时抖动),那么Producer就无法确定消息是否真的提交成功了。因此,它只能选择重试,也就是再次发送相同的消息。这就是Kafka默认提供至少一次可靠性保障的原因,不过这会导致消息重复发送。

Kafka也可以提供最多一次交付保障,只需要让Producer禁止重试即可。这样一来,消息要么写入成功,要么写入失败,但绝不会重复发送。我们通常不会希望出现消息丢失的情况,但一些场景里偶发的消息丢失其实是被允许的,相反,消息重复是绝对要避免的。此时,使用最多一次交付保障就是最恰当的。

无论是至少一次还是最多一次,都不如精确一次来得有吸引力。大部分用户还是希望消息只会被交付一次,这样的话,消息既不会丢失,也不会被重复处理。或者说,即使Producer端重复发送了相同的消息,Broker端也能做到自动去重。在下游Consumer看来,消息依然只有一条。

那么问题来了,Kafka是怎么做到精确一次的呢?简单来说,这是通过两种机制:幂等性(Idempotence)和事务(Transaction)。它们分别是什么机制?两者是一回事吗?要回答这些问题,我们首先来说说什么是幂等性。

什么是幂等性(Idempotence)?

“幂等”这个词原是数学领域中的概念,指的是某些操作或函数能够被执行多次,但每次得到的结果都是不变的。我来举几个简单的例子说明一下。比如在乘法运算中,让数字乘以1就是一个幂等操作,因为不管你执行多少次这样的运算,结果都是相同的。再比如,取整函数(floor和ceiling)是幂等函数,那么运行1次floor(3.4)和100次floor(3.4),结果是一样的,都是3。相反地,让一个数加1这个操作就不是幂等的,因为执行一次和执行多次的结果必然不同。

在计算机领域中,幂等性的含义稍微有一些不同:

  • 在命令式编程语言(比如C)中,若一个子程序是幂等的,那它必然不能修改系统状态。这样不管运行这个子程序多少次,与该子程序关联的那部分系统状态保持不变。
  • 在函数式编程语言(比如Scala或Haskell)中,很多纯函数(pure function)天然就是幂等的,它们不执行任何的side effect。

幂等性有很多好处,其最大的优势在于我们可以安全地重试任何幂等性操作,反正它们也不会破坏我们的系统状态。如果是非幂等性操作,我们还需要担心某些操作执行多次对状态的影响,但对于幂等性操作而言,我们根本无需担心此事。

幂等性Producer

在Kafka中,Producer默认不是幂等性的,但我们可以创建幂等性Producer。它其实是0.11.0.0版本引入的新功能。在此之前,Kafka向分区发送数据时,可能会出现同一条消息被发送了多次,导致消息重复的情况。在0.11之后,指定Producer幂等性的方法很简单,仅需要设置一个参数即可,即props.put(“enable.idempotence”, ture),或props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)。

enable.idempotence被设置成true后,Producer自动升级成幂等性Producer,其他所有的代码逻辑都不需要改变。Kafka自动帮你做消息的重复去重。底层具体的原理很简单,就是经典的用空间去换时间的优化思路,即在Broker端多保存一些字段。当Producer发送了具有相同字段值的消息后,Broker能够自动知晓这些消息已经重复了,于是可以在后台默默地把它们“丢弃”掉。当然,实际的实现原理并没有这么简单,但你大致可以这么理解。

看上去,幂等性Producer的功能很酷,使用起来也很简单,仅仅设置一个参数就能保证消息不重复了,但实际上,我们必须要了解幂等性Producer的作用范围。

首先,它只能保证单分区上的幂等性,即一个幂等性Producer能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为Producer进程的一次运行。当你重启了Producer进程之后,这种幂等性保证就丧失了。

那么你可能会问,如果我想实现多分区以及多会话上的消息无重复,应该怎么做呢?答案就是事务(transaction)或者依赖事务型Producer。这也是幂等性Producer和事务型Producer的最大区别!

事务

Kafka的事务概念类似于我们熟知的数据库提供的事务。在数据库领域,事务提供的安全性保障是经典的ACID,即原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability)。

当然,在实际场景中各家数据库对ACID的实现各不相同。特别是ACID本身就是一个有歧义的概念,比如对隔离性的理解。大体来看,隔离性非常自然和必要,但是具体到实现细节就显得不那么精确了。通常来说,隔离性表明并发执行的事务彼此相互隔离,互不影响。经典的数据库教科书把隔离性称为可串行化(serializability),即每个事务都假装它是整个数据库中唯一的事务。

提到隔离级别,这种歧义或混乱就更加明显了。很多数据库厂商对于隔离级别的实现都有自己不同的理解,比如有的数据库提供Snapshot隔离级别,而在另外一些数据库中,它们被称为可重复读(repeatable read)。好在对于已提交读(read committed)隔离级别的提法,各大主流数据库厂商都比较统一。所谓的read committed,指的是当读取数据库时,你只能看到已提交的数据,即无脏读。同时,当写入数据库时,你也只能覆盖掉已提交的数据,即无脏写。

Kafka自0.11版本开始也提供了对事务的支持,目前主要是在read committed隔离级别上做事情。它能保证多条消息原子性地写入到目标分区,同时也能保证Consumer只能看到事务成功提交的消息。下面我们就来看看Kafka中的事务型Producer。

事务型Producer

事务型Producer能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型Producer也不惧进程的重启。Producer重启回来后,Kafka依然保证它们发送消息的精确一次处理。

设置事务型Producer的方法也很简单,满足两个要求即可:

  • 和幂等性Producer一样,开启enable.idempotence = true。
  • 设置Producer端参数transactional. id。最好为其设置一个有意义的名字。

此外,你还需要在Producer代码中做一些调整,如这段代码所示:

producer.initTransactions();
try {
            producer.beginTransaction();
            producer.send(record1);
            producer.send(record2);
            producer.commitTransaction();
} catch (KafkaException e) {
            producer.abortTransaction();
}

和普通Producer代码相比,事务型Producer的显著特点是调用了一些事务API,如initTransaction、beginTransaction、commitTransaction和abortTransaction,它们分别对应事务的初始化、事务开始、事务提交以及事务终止。

这段代码能够保证Record1和Record2被当作一个事务统一提交到Kafka,要么它们全部提交成功,要么全部写入失败。实际上即使写入失败,Kafka也会把它们写入到底层的日志中,也就是说Consumer还是会看到这些消息。因此在Consumer端,读取事务型Producer发送的消息也是需要一些变更的。修改起来也很简单,设置isolation.level参数的值即可。当前这个参数有两个取值:

  1. read_uncommitted:这是默认值,表明Consumer能够读取到Kafka写入的任何消息,不论事务型Producer提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型Producer,那么对应的Consumer就不要使用这个值。
  2. read_committed:表明Consumer只会读取事务型Producer成功提交事务写入的消息。当然了,它也能看到非事务型Producer写入的所有消息。

小结

简单来说,幂等性Producer和事务型Producer都是Kafka社区力图为Kafka实现精确一次处理语义所提供的工具,只是它们的作用范围是不同的。幂等性Producer只能保证单分区、单会话上的消息幂等性;而事务能够保证跨分区、跨会话间的幂等性。从交付语义上来看,自然是事务型Producer能做的更多。

不过,切记天下没有免费的午餐。比起幂等性Producer,事务型Producer的性能要更差,在实际使用过程中,我们需要仔细评估引入事务的开销,切不可无脑地启用事务。

开放讨论

你理解的事务是什么呢?通过今天的分享,你能列举出未来可能应用于你们公司实际业务中的事务型Producer使用场景吗?

欢迎写下你的思考和答案,我们一起讨论。如果你觉得有所收获,也欢迎把文章分享给你的朋友。

精选留言

  • 时隐时现

    2019-09-08 15:39:59

    幂等性producer和事务型producer实现原理都没有涉及,这篇有点太肤浅了
  • Jowin

    2019-07-26 17:40:50

    研究了一下kafka的设计文档,基本搞清楚了幂等和事务消息,一些理解,https://www.jianshu.com/p/f77ade3f41fd, 欢迎大家一起交流。
  • October

    2019-07-04 17:21:27

    我所理解的kafka事务是这样的:生产者的事务能够保证一条消息仅仅会保存在kafka的某一个分区上,不会出现在多个分区上,另外,能够保证多条消息原子性的发送到多个分区。也就是说它只保证了从producer端到broker端消息不丢失不重复。但对于consumer端,由于偏移量的提交和消息处理的顺序有前有后,依然可能导致重复消费或者消息丢失消费,如果要实现消费者消费的精确一次,还需要通过额外机制在消费端实现偏移量提交和消息消费的事务处理。不知道自己理解的对不对,希望老师指正。
    作者回复

    嗯嗯,我觉得很有道理:)

    2019-07-05 08:41:47

  • dream

    2019-07-04 10:21:56

    老师,请问一下,事务型 Producer 可以实现一组消息要么全部写入成功,要么全部失败,但是事务型 Producer 是具体怎么实现多分区以及多会话上的消息无重复的呢?

    作者回复

    主要的机制是两阶段提交(2PC)。引入了事务协调器的组件帮助完成分布式事务

    2019-07-05 08:35:58

  • 风中花

    2019-07-04 13:35:52

    我一直认为事务,不到必须时是不用得东西,那么我想知道,胡老师实际中,你们有用到过吗,在一些什么场景下使用?老师可以简单说下吗,谢谢
    作者回复

    我们没有使用。事务更多用在Kafka Streams中。如果要实现流处理中的精确一次语义,事务是不可少的。

    2019-07-05 08:37:43

  • 知易

    2019-08-02 10:54:10

    版本0.11.0.2,设置transactional.id并开启事务后,须同时保证retries>1
  • 2019-10-06 21:17:59

    老师,kafka中的事务提交异常,broker端的数据还是会写入日志,相当于只是记录一下失败状态,在消费端通过隔离级别,来过滤掉出这部分消息,不进行消费。为什么事务异常了,还要将数据写入日志呢?直接删除掉不好吗?像DB那样。
    作者回复

    Kafka broker基本上还是保持append-only的日志型风格,不做删除处理

    2019-10-08 08:44:09

  • Geek_bd6gy9

    2020-04-09 11:53:18

    老师,有个地方很困惑,这句话“实际上即使写入失败,Kafka 也会把它们写入到底层的日志中,也就是说 Consumer 还是会看到这些消息”这是什么意思?明明写入都失败了,为什么还会写到底层的commit log中呢?那这里的写入失败是指写入磁盘失败么?麻烦老师解答一下,谢谢~~
    作者回复

    这里的写入失败是指事务失败,可能没有说太清楚。如果事务失败中止了,Kafka没法向数据库那样执行回滚,写入的日志也只能继续“躺在”日志中了,但是Kafka依赖于LSO等机制来设定一个事务型Consumer的可见范围,保证事务的准确性

    2020-04-10 09:23:19

  • 注定非凡

    2019-11-01 09:10:30

    消息交付可靠性:
    (1)可靠性保障,常见有三种:
    最多一次(at most once):消息可能会丢失,但不会被重复发送
    至少一次(at least once):消息不会丢失,但可能会重复发送
    精确一次(exactly once):消息不会对丢失,也不会被重复发送
    (2)Kafka默认提供交付可靠性保障是至少一次
    (3)Kafka消息交付可靠性保障以及精确处理一次语义通过两种机制来实现的:冥等性(Idempotence)和事务(Transaction)。


    冥等性
    (1)什么是幂等性(Idempotence)
    A:“幂等”:原是数学概念,指某些操作或函数能够被执行多次,但每次得到的结果都不变。
    B:计算机领域的含义:
    a,在命令式编程语言(如C)中,若一个子程序是幂等的,那它必然不能修改系统状态。无论这个子程序运行多少次,与该子程序的关联的那部分系统保持不变。
    b,在函数式编程语言(比如Scala或Haskell)中,很多纯函数(pure function)天然就是幂等的,他们不执行任何的side effect。
    C:冥等性的优点:最大的优势是可以安全地重试任何冥等性操作,因为他们不会破坏系统状态
    (2)冥等性Producer
    A:开启:设置props.put(“enable.idempotence”,true)或props.put(ProducerConfig.ENABLE_IDEMPOTENC_CONFIG,true)。
    B:特征:开启后,Kafka自动做消息的重复去重。
    C:实现思路:用空间换取时间,Broker端多保存一些字段,当Producer发送了具有相同字段值的消息后,Broker就可以知道这些消息重复,就将这些消息丢弃。
    D:作用范围:
    (1)只能保证单分区上幂等性,无法实现多个分区的幂等性。
    (2)只能实现单会话上的冥等性,当Producer重启后,这种幂等性保证就失效了。

    事务
    (1)事务概念:
    A:事务提供的安全性保障是经典的ACID。原子性(Atomicity),一致性(Consistency),隔离性(Isolation),持久性(Durability)。
    B:kafka的事务机制可以保证多条消息原子性地写入到目标分区,同时也能保证Consumer只能看到事务成功提交的消息。

    (2)事务型Producer:
    A:开启:
    【1】设置enable.idempotence = true。
    【2】设置Producer端参数transactional.id。最好为其设置一个有意义的名字。
    【3】调整Producer代码,显示调用事务API。
    【4】设置Consumer端参数 isolation.level 值:
    read_uncommitted(默认值,能够读到kafka写入的任何消息)
    read_committed(Consumer只会读取事务型Producer成功事务写入的消息。)
    B:特征:
    【1】能够保证将消息原子性地写入到多个分区中。一批消息要么全部成功,要么全部失败。
    【2】不惧进程重启,Producer重启回来后,kafka依然能保证发送的消息的精确一次处理。

    关键事项:
    1,幂等性无法实现多个分区以及多会话上的消息无重复,但事务(transaction)或依赖事务型Produce可以做到。
    2,开启事务对性能影响很大,在使用时要充分考虑
  • calljson

    2019-07-09 19:34:26

    文中提到了幂等性,希望能够讲解下幂等性原理,如何实现,这样大家才能明白为何幂等性无法保证:跨回话、跨分区
  • 妥协

    2020-02-23 16:44:35

    不启用幂等也可以保证同分区下无消息乱序的。——消息发送失败重发时,在broker端不会导致收到的顺序,和producer端发送顺序不一致吗?如果是的话,是类似TCP那种保证有序的机制吗?
    作者回复

    需要配合参数max.in.flight.requests.per.connection = 1来实现。这样producer会等待之前请求的消息发送成功才会发送下一个,从而不会乱序

    2020-02-23 18:57:37

  • 南山

    2019-11-09 20:47:19

    老师,事务型producer不会重复发送消息吗?如果发送的这一批到broker了,但是broker返回的确认消息producer没有收到,再次尝试,broker会去重吗?或者consumer端会去重啊?
    作者回复

    producer端可能发送重复消息,broker端有一套机制来去重(幂等性依赖seq number机制,事务依赖各种marker来标记)

    2019-11-11 08:49:06

  • 冉冉

    2020-04-23 22:21:08

    想问下老师一个关于消费者的问题,如果一个消费者组里有两个消费者c1,c2,一个topic有两个分区p1,p2,那c1永远从p1收消息,而不会收到生产者发到p2的消息对吗?
    作者回复

    hmmm.... 应该这么来说:从Kafka设计的角度,不是永远的关系,c1理论上可以收到p2的消息,只要发生了rebalance。但实际使用过程中,一旦你的group确定了分配策略,其实这种分配关系也就是确定的了。特别是对于你这个场景,无论发生多少次rebalance,p1应该都会被分配给c1。当然我说的是c1,c2都运行的情况下,如果c2挂了,c1肯定能收到p2的消息

    2020-04-24 11:04:20

  • Liam

    2019-07-04 09:18:03

    retry的话producer会保证发送到同一个分区吧,不然幂等性就没法保证了
    作者回复

    是的,会保证

    2019-07-04 09:27:35

  • 妥协

    2020-03-12 06:03:15

    重启之后标识producer的PID就变化了,broker就不认识了——这个是幂等性的另一个限制条件,无法实现夸会话的幂等性。我理解的是:一个幂等性的producer,只保证单分区的幂等性,而producer的消息会发给一个主题的多个分区,每个单分区都保证幂等性,其实就是实现了多分区的幂等性,只是无法实现跨会话的幂等性,不知道理解的对不对?
    作者回复

    嗯,很有道理:)

    2020-03-12 09:18:27

  • 快跑

    2019-07-05 12:55:11

    老师,你好
    幂等性为什么只保证单分区有效?是因为下一次消息重试指不定发送到哪个分区么。如果这样的话是不是可以采用按消息键保序的方式?这样重试消息还发送到同一个分区。
    作者回复

    重启之后标识producer的PID就变化了,broker就不认识了。要想认识就要让broker和producer做更多的事,也就是事务机制做的那些事。

    重试还是发送到同一个分区

    2019-07-06 07:57:55

  • 明翼

    2019-07-05 09:06:15

    老师请教个问题啊,我们在一个生产环境是日志入kafka,然后读取kafka的数据入到es里面,由于数据比较多,所以入到kafka的数据可能要过半天到一天才可以处理完,结果发现一个很奇怪的现象就是kafka的入的数据越快,那么入es的速度也越快,本来怀疑是kafka数据在cache里面所以快的,但是我们的数据延迟了很久,不太可能在cache,而且通过读kafka的程序日志分析,读kafka环节一直很快,只是入es的时间有快又慢,这个可能是什么问题那?
    作者回复

    如果consumer能够读取page cache中的数据而不是要去磁盘执行物理读,那么可以用上zero copy,速度应该是很快的。你可以看下你的consumer消费时broker的物理读IO,如果很低,大概率走的是page cache。另外如果读kafka很快,es忽高忽低,那是不是应该查一下ES的写入?

    2019-07-06 07:59:34

  • 奇奇

    2019-08-28 11:20:25

    事务还有用 冥等生产者没什么用,反正消费端都是有可能重复消费的,业务上必须做去重处理
    作者回复

    Kafka Streams依靠幂等producer和事务机制来保证EOS

    2019-08-28 17:34:59

  • Geek_rebecca

    2020-06-08 10:22:03

    老师,kafka的幂等性仅限于单分区会话,producer重启PID会变。那是不是说明其实kafka还是不能保证不会重复消费消息,如果要做到不重复消费,只能consumer端的代码逻辑里面去重过滤。
    作者回复

    不能完全避免消费者重复消费。最好还是使用业务去重

    2020-06-08 23:44:30

  • 胡小禾

    2020-05-12 23:53:17

    即使consumer读到了事务消息,但还是可能由于rebalance等原因导致重复消费的吧?
    作者回复

    嗯, 是的~

    2020-05-13 09:08:52