Kafka Exactly Once 语义实现原理:幂等性与事务消息

01 前言

在现代分布式系统中,确保数据处理的准确性和一致性是至关重要的。Apache Kafka,作为一个广泛使用的流处理平台,提供了强大的消息队列和流处理功能。随着业务需求的增长,Kafka 的事务消息功能应运而生,它允许应用程序以一种原子的方式处理消息,即要么所有消息都被正确处理,要么都不处理。本文将深入剖析 Kafka 的 Exactly-Once 语义实现原理,包括幂等性与事务消息的关键概念,以及它们是如何在 Kafka 中实现的。我们将探讨 Kafka 事务的流程,事务提供的 ACID 保证,以及在实际应用中可能遇到的一些限制。无论您是 Kafka 的新手还是经验丰富的开发者,本文都将为您提供有价值的见解和指导。

02 消息队列的事务场景

Kafka 目前用于流处理的场景:相当于一个有向无环图(DAG,Directed acyclic graph)每个节点是一个 Kafka Topic,每条边是一个流处理操作。在这样的场景下,有两种操作:ꔷ 消费上游消息并提交位点ꔷ 处理消息并发送到下游 Topic

对于由这两种操作构成的一组处理流程需要具备事务语义,这样我们就可以不重复(Exactly Once)的处理上游消息并将结果可靠地存储在下游 Topic 中。

上图是一个典型的 Kafka 事务的流程,我们可以看到:MySQL 的 binlog 作为上游数据源将数据写入到 Kafka 中,Spark Streaming 从 Kafka 中读取数据并进行处理,最后将处理结果写入到另外两个 Topic 中(图中三个 Topic 位于同一集群中)。其中消费 Topic A 与写入 Topic B 和 Topic C 的操作具备事务语义。

03 Kafka 的 Exactly Once 语义

从上述的场景中我们可以发现,事务消息最主要的动机是在流处理中实现 Exactly Once 的语义,这可以分为:
ꔷ 仅发送一次: 单分区仅发送一次由生产者幂等保证,多分区仅发送一次由事务机制保证
ꔷ 仅消费一次: Kafka 通过消费位点的提交来控制消费进度,而消费位点的提交被抽象成向系统 topic 发送消息。这就使得发送和消费行为统一起来,只要解决了多分区发送消息的一致性就能实现 Exactly Once 语义

04 生产者幂等性

在创建 Kafka 生产者时设置了 enable.idempotence 参数,用于开启生产者幂等性。

val props = new Properties()
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")

val producer = new KafkaProducer(props)

Kafka 的发送幂等是通过序列号来实现的,每个消息都会被分配一个序列号,序列号是递增的,这样就可以保证消息的顺序性。当生产者发送消息时,会将消息的序列号和消息内容一起写入到日志文件中,下次收到非预期序列号的消息就会返回 OutOfOrderSequenceException 异常。

设置 enable.idempotence 参数后,生产者会检查以下三个参数的值是否合法(ProducerConfig#postProcessAndValidateIdempotenceConfigs) ꔷ max.in.flight.requests.per.connection 必须小于 5
ꔷ retries 必须大于 0
ꔷ acks 必须设置为 all
Kafka 将消息的序列号信息保存在分区维度的 .snapshot 文件中,格式如下(ProducerStateManager#ProducerSnapshotEntrySchema):

我们可以发现,该文件中保存了 ProducerId、ProducerEpoch 和 LastSequence。所以幂等的约束为:相同分区、相同 Producer(id 和 epoch) 发送的消息序列号需递增。即 Kafka 的生产者幂等性只在单连接、单分区生效,Producer 重启或消息发送到其他分区就失去了幂等性的约束。

.snapshot 文件在 log segment 滚动时更新,发生重启后通过读取 .snapshot 文件和最新的日志文件即可恢复 Producer 的状态。Broker 的重启或分区迁移并不会影响幂等性。

05 事务消息流程

我们首先从 Demo 开始,来看一下如何使用 Kafka 客户端完成一个事务:

// 事务初始化
val props = new Properties()
...
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")

val producer = new KafkaProducer(props)
producer.initTransactions()
producer.beginTransaction()

// 消息发送
producer.send(RecordUtils.create(topic1, partition1, "message1"))
producer.send(RecordUtils.create(topic2, partition2, "message2"))

// 事务提交或回滚
producer.commitTransaction()

5.1 事务初始化
Kafka Producer 启动后我们使用两个 API 来初始化事务:initTransactions 和 beginTransaction。

回顾一下我们的 Demo,在发送消息时是发送到两个不同分区中,这两个分区可能在不同的 Broker 上,所以我们需要一个全局的协调者 TransactionCoordinator 来记录事务的状态。

所以,在 initTransactions 中,Producer 首先发送 ApiKeys.FIND_COORDINATOR 请求获取 TransactionCoordinator。

之后即可向其发送 ApiKeys.INIT_PRODUCER_ID 请求获取 ProducerId 及  ProducerEpoch(也是上文中用于幂等的字段)。此步骤生成的 id 和 epoch 会写入内部 Topic __transaction_state 中,并且将事务的状态置为 Empty。

__transaction_state 是 compaction Topic,其中消息的 key 为客户端设置的transactional.id(详见 TransactionStateManager#appendTransactionToLog)。

区别于 ProducerId 是服务端生成的内部属性;TransactionId 由用户设置,用于标识业务视角认为的“同一个应用”,启动具有相同 TransactionId 的新 Producer 会使得未完成的事务被回滚并且来自旧 Producer(具有较小 epoch)的请求被拒绝掉。

后续 beginTransaction 用于开始一个事务,该方法会创建一个 Producer 内部事务状态,标识这一个事务的开始,并不会有 RPC 产生。

5.2 消息发送
上一节说到 beginTransaction 只是更改 Producer 内部状态,那么在第一条消息发送时才隐式开启了事务:
首先,Producer 会发送 ApiKeys.ADD_PARTITIONS_TO_TXN 请求到 TransactionCoordinator。TransactionCoordinator 会将这个分区加入到事务中,并更改事务的状态为 Ongoing,这些信息被持久化到 __transaction_state 中。

然后 Producer 使用 ApiKeys.PRODUCE 请求正常发送消息到对应的分区中。这条消息的可见性控制在下文消息消费一节中会详细讨论。

5.3 事务提交与回滚
当所有消息发送完成后,Producer 可以选择提交或回滚事务,此时:
ꔷ TransactionCoordinator:具有当前事务所有相关分区的信息
ꔷ 其他 Broker:已经将消息持久化到日志文件中

接下来 Producer 调用 commitTransaction 会发送 ApiKeys.END_TXN 请求将事务状态更改为 PrepareCommit(回滚事务对应状态 PrepareAbort)并持久化到 __transaction_state 中,此时从 Producer 的视角来看整个事务已经结束了。

TransactionCoordinator 会异步向各个 Broker 发送 ApiKeys.WRITE_TXN_MARKERS 请求,当所有参加事务的 Broker 都返回成功后,TransactionCoordinator 会将事务状态更改为 CompleteCommit(回滚事务对应状态 CompleteAbort)并持久化到 __transaction_state 中。

5.4 消息的消费
某个分区的消息可能是事务消息与非事务消息混杂的,如下图所示:

在 Broker 处理 ApiKeys.PRODUCE 请求时,完成消息持久化会更新 LSO 到第一条未提交的事务消息的 offset。这样在消费者消费消息时,可以通过 LSO 来判断消息是否可见:如果设置了 isolation.level 为 read_committed 则只会消费 LSO 之前的消息。

LSO(log stable offset): 它表示的是已经被成功复制到所有副本(replicas)并且可以被消费者安全消费的消息的最大偏移量。 

但是我们可以发现 LSO 之前存在已回滚的消息(图中红色矩形)这些消息应该被过滤掉:在 Broker 处理 ApiKeys.WRITE_TXN_MARKERS 请求时,会将已回滚的消息索引写入到 .txnindex 文件中(LogSegmentKafka#updateTxnIndex)。

后续 Consumer 消费消息时还会收到对应区间的已取消事务消息列表,上图区间中的该列表为:

代表 offset 在 [2,5] 之间且由 id 为 11 的 Producer 发送的消息都已回滚。
上文我们讨论了 __transaction_state 的实现确保同一时间,同一 TransactionId 有且只有一个事务在进行中。所以可以使用 ProducerId 和 offset 区间定位回滚的消息不会发生冲突。

06 Kafka 事务提供的 ACID 保证

ꔷ 原子性(Atomicity)
Kafka 通过对 __transaction_state Topic 的写入实现了事务状态的转移,保证了事务要么同时提交,要么同时回滚。

ꔷ 一致性(Consistency)
在事务进入 PrepareCommit 或 PrepareAbort 阶段时, TransactionCoordinator 异步向所有参与事务的 Broker 提交或回滚事务。这使得 Kafka 的事务做不到强一致性,只能通过不断重试保证最终一致性。

ꔷ 隔离性(Isolation)
Kafka 通过 LSO 机制和 .txnindex 文件来避免脏读,实现读已提交(Read Committed)的隔离级别。

ꔷ 持久性(Durability)
Kafka 通过将事务状态写入到 __transaction_state Topic 和消息写入到日志文件中来保证持久性。

07 Kafka 事务的限制

从功能上看,Kafka 事务并不能支持业务方事务,强限制上游的消费和下游写入都需要是同一个 Kafka 集群,否则就不具备原子性保障。
从性能上看,Kafka 事务的性能开销主要体现在生产侧:

  1. 开启事务时需要额外的 RPC 请求定位 TransactionCoordinator 并初始化数据

  2. 消息发送需要在发送消息前向 TransactionCoordinator 同步请求添加分区,并将事务状态的变化写入到 __transaction_state Topic

  3. 事务提交或回滚时需要向所有参与事务的 Broker 发送请求

对于涉及分区较少且消息数量较多的事务,事务的开销可以被均摊;反之,较多的同步 RPC 带来的开销会极大影响性能。并且每个生产者只能有一个事务在进行中,这就意味着事务的吞吐量会受到限制。

消费侧也有一定的影响:消费者只能看到 LSO 以下的消息,并且需要额外的索引文件来过滤已回滚的消息,这无疑会增加端到端的延迟。

08 总结

通过本文的深入分析,我们了解到 Kafka 的事务消息功能是如何在流处理场景中提供 Exactly-Once 语义的。Kafka 通过其事务 API 和内部机制,实现了消息发送的原子性、最终一致性、隔离性和持久性,尽管在实际应用中可能存在一些性能和功能上的限制。开发者和架构师应当充分理解这些概念,并在设计系统时考虑如何有效地利用 Kafka 的事务功能,以构建更加健壮和可靠的数据处理流程。

AutoMQ 是构建于对象存储之上的云原生 Kafka fork,在解决了 Kafka 已有的成本和弹性问题基础上对 Kafka 100%兼容,因此在 AutoMQ 上也可以使用 Kafka 事务消息。AutoMQ 作为国内 Kafka 生态的忠实拥护者,我们将持续为 Kafka 技术爱好者带来优质的 Kafka 技术内容分享,欢迎关注我们。

关于我们

我们是来自 Apache RocketMQ 和 Linux LVS 项目的核心团队,曾经见证并应对过消息队列基础设施在大型互联网公司和云计算公司的挑战。现在我们基于对象存储优先、存算分离、多云原生等技术理念,重新设计并实现了 Apache Kafka 和 Apache RocketMQ,带来高达 10 倍的成本优势和百倍的弹性效率提升。

🌟 GitHub 地址:https://github.com/AutoMQ/automq
💻 官网:https://www.automq.com
👀 B站:AutoMQ官方账号
🔍 视频号:AutoMQ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/583075.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

MacPro(M1,M2芯片)Java开发和常用工具开源软件合集

目录 Java开发软件1 IDE1.1 idea1.2 Vs Code 2 开发工具2.1 数据库数据库模型管理数据库连接客户端 2.2 SSH/Telnet/Serial/Shell/Sftp客户端2.3 MarkDown编辑器2.3 代码片段管理粘贴 3小工具3.1 截图贴图3.2 Mac下修改hosts文件的图形化界面软件 Java开发软件 1 IDE 1.1 ide…

第三方软件测试机构-科技成果评价测试

科技成果评价测试是对科研成果的工作质量、学术水平、实际应用和成熟程度等方面进行的客观、具体、恰当的评价过程。这一评价过程有助于了解科技成果的质量和水平,以及其在学术和应用方面的价值和潜力。 科技成果评价测试主要包括以下几个方面: 工作质量…

设计不外流,保护创意的同时锁住图纸安全!

在设计行业中,图纸和创意文稿的安全至关重要,因为它们体现了企业的创新能力和核心竞争力。华企盾DSC数据防泄密系统提供了一系列功能,可以有效地保护这些珍贵的设计和文档不被外泄。以下是如何利用华企盾DSC系统保障设计图纸安全的关键措施&a…

tableau如何传参数到MySQL数据库

1、打开tableau连接本地MySQL-》新建自定义sql-》创建参数 2、新建一个简单的工作表-》把维度拖拽到行显示结果-》右键显示参数 3、参数传递到数据库sql写法 select * from yonghu where yonghu.姓名 like concat(%,<参数.姓名>,%)select * FROMabadata4WHERE abadata4…

mysql-sql-练习题-1

文章目录 环境注释建表 5张建库学生表课程表教师表分数表总表 语法书写顺序in学过/没学过完全相同 环境 Windows cmd&#xff08;普通用户/管理员&#xff09; mysql -uroot -pmysql版本&#xff0c;模式&#xff08;可自定义&#xff09; select version(),global.sql_mode…

不完全微分PD控制器(CODESYS源代码+算法详细介绍)

完全微分计算公式为Kp*Td/Ts(e(k)-e(k-1))。有关位置式PID和增量式PID更多相关内容,大家可以参考下面的文章链接: 1、CODESYS位置式PID CODESYS位置式PID(完整ST源代码)_codesys pid功能块-CSDN博客文章浏览阅读1.1k次,点赞2次,收藏2次。CODESYS增量式PID完整源代码请参看…

中国标准地图如何与卫星影像叠加

我们在《一幅SHP格式的中国标准地图》一文中&#xff0c;为你分享过一幅SHP格式的中国标准地图&#xff0c;但该数据为等积投影。 由于我们常用的卫星影像为WGS84经纬度投影或墨卡托投影&#xff0c;那么将该数据如何与卫星影像进行叠加制作专题图呢&#xff1f; 我们现在就来…

day17-day20_项目实战项目部署

万信金融 项目部署 目标&#xff1a; 理解DevOps概念 能够使用Docker Compose部署项目 理解持续集成的作用 会使用Jenkins进行持续集成 1 DevOps介绍 1.1 什么是DevOps DevOps是Development和Operations两个词的缩写&#xff0c;引用百度百科的定义&#xff1a; DevOps…

Windows Server配置网卡绑定:NIC组合

正文共&#xff1a;1024 字 12 图&#xff0c;预估阅读时间&#xff1a;1 分钟 在网络设备上&#xff0c;为了提高可靠性&#xff0c;一般会配置链路聚合&#xff08;Link Aggregation&#xff09;&#xff08;网络之路28&#xff1a;二层链路聚合&#xff09;&#xff0c;同样…

GNU Radio之OFDM Channel Estimation底层C++实现

文章目录 前言一、 OFDM Channel Estimation 模块简介二、C 具体实现1、初始化和配置参数2、forecast 函数3、计算载波偏移量4、提取信道响应5、核心的数据处理任务 前言 OFDM Channel Estimation 模块的功能是根据前导码&#xff08;同步字&#xff09;估计 OFDM 的信道和粗略…

FileLink内外网文件摆渡系统产品介绍

在现代企业中&#xff0c;往往存在着多个网络、系统之间的数据孤岛问题&#xff0c;数据难以互相访问和共享。 一、常用的内外网文件摆渡方式 传统的数据交换方式往往需要人工介入&#xff0c;效率低下且容易出错。如&#xff1a;U盘、FTP、VPN等&#xff0c;极易引发各种各样…

CSS常见的 9 个单位汇总!

你好&#xff0c;我是云桃桃。 一个希望帮助更多朋友快速入门 WEB 前端的程序媛。 云桃桃-大专生&#xff0c;一枚程序媛&#xff0c;感谢关注。回复 “前端基础题”&#xff0c;可免费获得前端基础 100 题汇总&#xff0c;回复 “前端工具”&#xff0c;可获取 Web 开发工具合…

银行卡归属地查询API接口快速对接

银行卡归属地查询API接口指的是通过银行卡号查询该银行卡详细信息&#xff0c;包括银行卡名称、卡种、卡品牌、发卡行、编号以及归属地等信息&#xff0c;支持一千多家银行返回归属地信息&#xff0c;那么银行卡归属地查询API接口如何快速对接呢&#xff1f; 首先找到有做银行…

短视频橱窗好物带货者必看:如何解决无商品素材无收益还限流的烦恼?

随着短视频橱窗带货越来越火爆&#xff0c;许多人发现通过短视频橱窗好物带货素材APP不仅可以提升创作效果&#xff0c;还能轻松赚取佣金。下面&#xff0c;为您推荐三款一键领取并直接发布的抖音短视频素材APP&#xff0c;确保您在创作短视频时&#xff0c;既有高质量的素材&a…

Facebook’s Tectonic Filesystem: Efficiency from Exascale——论文阅读

FAST 2021 Paper 分布式元数据论文阅读笔记整理 背景 Blob storage 用来存放大量的文本、图片、视频等非结构化数据 包含 EB 级别的数据 存储内容大小不一&#xff0c;大小几KB到几MB不等 要求低时延 使用 Haystack 和 F4 Data warehouse 存放用于数据分析和机器学习的…

AI与新能源催生新增长,电子制造业如何提升预测力与连接力?

国产替代和新基建带来的结构性机遇&#xff0c;AI和新能源汽车行业的增长所带来的需求提升&#xff0c;都给电子制造行业以乐观的理由。但是&#xff0c;不少企业的客户经营管理、供需平衡与供应链协同等所面临的挑战仍在&#xff0c;如何为行业高质量增长持续注入动能&#xf…

使用mmdetection来训练自己的数据集(visdrone)(四)结果分析

测试 python tools/test.py <your-config-file> <your-model-weights-file> --out <save-pickle-path>关于test.py 的命令行 parser.add_argument(--out,typestr,helpdump predictions to a pickle file for offline evaluation)计算量、参数量计算脚本 pyth…

PNPM - nodejs 包管理

文章目录 一、关于 PNPM开发动机1、节省磁盘空间2、提升安装速度3、创建一个 non-flat node_modules 文件夹 二、安装通过 npm 安装 pnpm通过 Homebrew 安装 pnpm 三、pnpm CLI1、与 npm 的差异2、参数-C <path>, --dir <path>-w, --workspace-root 3、命令4、环境…

如何监控ANR

在 Android 开发中&#xff0c;“应用程序无响应”&#xff08;ANR&#xff09;是一种常见的问题&#xff0c;当应用程序在主线程上执行过长时间的操作时就会出现。本文将详细介绍ANR的成因&#xff0c;以及如何有效监控和预防这一问题。 什么是ANR&#xff1f; ANR&#xff…

PADS看图常用操作

1.放大缩小&#xff1a; 方法1&#xff0c;CTRL滚轮 方法2&#xff0c;按住滚轮&#xff0c;前后移动鼠标。 方法3&#xff0c;PageUP&#xff0c;PageDown按键 2.PADS layout只显示单层&#xff08;当前层&#xff09;怎么操作&#xff0c;而不显示其他层 第一步&#xff1a;…