认识偏移量(Offset)

认识偏移量(Offset)快速了解 1 偏移量是什么 2 为什么需要 Offset 这个东西 3 消费者如何提交偏移量 4 消费者重复消费和消息丢失和偏移量有关吗问题一 偏移量是什么 偏移量是 kafka 的一个标识符 主要是用来跟踪消息在某个分区中的特定位置的 可分成分区消息

大家好,欢迎来到IT知识分享网。

快速了解:

1、偏移量是什么?

2、为什么需要Offset这个东西

3、消费者如何提交偏移量

4、消费者重复消费和消息丢失和偏移量有关吗

问题一:偏移量是什么?

偏移量是kafka的一个标识符,主要是用来跟踪消息在某个分区中的特定位置的。

可分成分区消息的偏移量、消费者消费的偏移量。当一条新消息被写入分区时,它的偏移量就会递增,消费者通过这个偏移量来确定已消费到哪条消息。

认识偏移量(Offset)

定义了 消费者消费的偏移量,使得消费者每次调用poll()方法之后,它总是返回由生产者写入Kafka但还没有被消费者读取过的记录。

问题二:为什么需要Offset这个东西

  1. 跟踪消息位置,确保消费者能准确读取数据;
  2. 实现消息的顺序处理,偏移量保证了消息消费的顺序;
  3. 支持消费进度管理,消费者可以在崩溃后从最后消费的偏移量继续读取。

问题三:消费者如何提交偏移量

主要有五种形式:

  • 自动提交偏移量
  • 手动提交当前偏移量
  • 异步提交当前偏移量
  • 同步和异步组合提交
  • 自定义存储 offset

自动提交偏移量

最简单的方式是让消费者自动提交偏移量。

enable.auto.commit设为true,每过5s,消费者会自动把从poll()方法接收到的最大偏移量提交上去 。提交时间间隔由auto.commit.interval.ms控制,默认值是5s。

缺点:

  1. 可能导致消息丢失,尤其是在消费者处理消息失败时;
  2. 消费进度与处理进度不一致,可能造成重复消费或漏消费;
  3. 对消费者的故障恢复不够灵活,降低了处理的可靠性

例如poll()拉取到100条消息,客户端同时进行逻辑处理,然后5s后客户端才处理完50条消息,此时由于到达了应该提交的时间,因此消费者客户端上报目前消费offset为100,上报完成后,正好客户端崩溃,当客户端重启后,从Broker获取到的同步位移却是100,此时便出现了消息的丢失。

手动提交当前偏移量(同步)

enable.auto.committ设为 false,让应用程序决定何时提交偏 移量。使用 commitSync 提交偏移量最简单也最可靠。这个 API会提交由 poll() 方法返回 的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常。

commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败)。

如果提交失败,或者在处理消息的过程中发生了再均衡,那么这部分的的消息也都是会被重复处理的

认识偏移量(Offset)

异步提交当前偏移量

enable.auto.committ设为 false。虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响

认识偏移量(Offset)

  • commitAsync 没有失败重试机制,故有可能提交失败。

为什么异步没有失败重试机制呢?主要避免一些极端问题导致重复消费的问题。 比如说轮询1异步提交offset的时候,offset为1000,这个时候假如网络短暂连接超时了,服务器没有给出响应。与此同时,轮询2开始了,并且把offset为2000成功给提交上去了。如果有失败重试,把轮询1更新上去,那就会导致数据重复消费问题了。

— 解决(1)为异步提交提供一个重试机制

是否重试异步提交:使用一个单调递增的序列号来维护异步提交的顺序。在每次提交偏 移量之后或在回调里提交偏移量时递增序列号。在进行重试前,先检查回调 的序列号和即将提交的偏移量是否相等,如果相等,说明没有新的提交,那么可以安全地进行重试。如果序列号比较大,说明有一个新的提交已经发送 出去了,应该停止重试。

无论是同步提交还是异步提交 offset,都有可能会造成数据的漏消费或者重复消费。先提交 offset 后消费,有可能造成数据的漏消费;而先消费后提交 offset,有可能会造成数据的重复消费。

同步和异步结合提交当前偏移量

一般情况下,采取的异步提交,出现偶尔的失败,没有进行重试,也没影响,因为后面的轮询逻辑会将偏移量给提交上去。但是针对于关闭消费者的场景,关闭前的最好一次提交,如何确保提交成功呢?

一般会采取异步+同步结合的方式。

认识偏移量(Offset)

自定义存储 offset

Kafka 0.9 版本之前, offset 存储在 zookeeper, 0.9 版本及之后,默认将 offset 存储在 Kafka的一个内置的 topic 中。除此之外, Kafka 还可以选择自定义存储 offset。

offset 的维护是相当繁琐的, 因为需要考虑到消费者的 再均衡。

要实现自定义存储 offset,需要借助 ConsumerRebalanceListener

认识偏移量(Offset)

问题四:消费者重复消费和消息丢失和偏移量有关吗

偏移量提交的结果

认识偏移量(Offset)

如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失

认识偏移量(Offset)

那什么时候的场景会触发这个消息的重复消费以及消息的丢失呢?

消息重复消费

  1. 消费者重启:如果消费者在处理消息时崩溃或重启,可能会重新消费未确认的消息。
  2. 手动提交偏移量:如果消费者手动提交偏移量,并且在提交之前处理了消息,重启后可能会再次处理这些消息。
  3. 多消费组:当多个消费者在同一消费组中处理相同的主题时,如果某个消费者失败,其他消费者可能会接管其任务,导致消息被多次消费。

消息丢失

  1. 未正确配置的副本:如果 Kafka 的副本配置不正确(例如,min.insync.replicas 设置不合理),在主节点故障时可能会导致消息丢失。
  2. 生产者未确认:如果生产者发送消息后没有等待确认,可能会在网络问题或服务器故障时丢失消息。
  3. 日志清理策略:如果 Kafka 的保留策略设置得过于激进,例如使用时间或大小限制,可能会在消费者未及时消费时丢失旧消息。

解决方案

  • 使用 幂等性生产者 来确保消息只被发送一次。
  • 在消费者中实现 事务性处理,以确保消息的准确消费。
  • 配置适当的 副本因子 和 确认机制 以提高消息的持久性。

消息重复消费示例

场景:一个电商平台的订单处理系统。

  1. 消费者重启:
    • 假设有一个消费者负责处理订单消息。
    • 消费者在处理一条订单消息时发生崩溃(例如,由于内存溢出)。
    • 在消费者重启后,Kafka 会从最后提交的偏移量处继续消费,这可能导致消费者重新处理之前未完成的订单。
    • 结果:同一条订单被处理两次,可能导致客户的订单被多次确认。
  1. 手动提交偏移量:
    • 假设消费者手动处理消息并在处理完成后提交偏移量。
    • 如果在处理过程中,消费者意外崩溃而未提交偏移量,重启后将再次消费该消息。
    • 结果:相同的消息被处理多次,导致业务逻辑错误(如重复扣款)。

消息丢失示例

场景:日志记录系统。

  1. 未正确配置的副本:
    • 假设 Kafka 集群配置为只有一个副本,且没有设置适当的 min.insync.replicas。
    • 生产者发送消息,但主节点在发送后立即崩溃。
    • 如果没有备用副本来接管,消息将会丢失。
    • 结果:重要的日志信息丢失,导致后续问题无法追踪。
  1. 生产者未确认:
    • 生产者发送消息,但未设置 acks=all,只等待 leader 确认。
    • 在发送消息后,网络出现问题,消息未成功写入到所有副本。
    • 如果 leader 节点崩溃,未被保存的消息将会丢失。
    • 结果:数据丢失,影响系统的完整性。

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/169122.html

(0)
上一篇 2025-02-05 18:25
下一篇 2025-02-05 18:26

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信