• (二)kafka的事务性和幂等性


    1:事务性

    事务性多个生产者向同一个集群中不同topic投递消息时,数据一致性的保证,即整体上不重不丢不乱序且原子性(要么都成功要么都失败)。在 Kafka 中关于事务性,是有三种层面上的含义:一是幂等性的支持(幂等性是事务性的基础);二是事务性的支持;三是 Kafka Streams 的 exactly once 的实现。
    幂等性:Producer 的幂等性指的是当向同一topic发送同一条消息时,数据在 Server 端只会被持久化一次,数据不丟不重。

    1.1:生产者的幂等性

    幂等性是保证单个生产者向同一个集群中同一个topic投递的数据不重不丢不乱序,幂等性是事务性实现的基础
    1:开启幂等性

    Properties props = new Properties();
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    //当开启幂等性时默认开启,不用再手动配置下面这三个参数
    //client单个connection阻塞前可持有的最大的未确认请求数量,默认为5
    MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION<=5
    //client发送失败后重试次数,默认为Integer.MAX_VALUE
    RETRIES_CONFIG>=0
    //消息可靠性确认数量,默认为1
    ACKS_CONFIG=all(相当于配置acks=-1)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    2:幂等性的实现原理
    Producer 设置 at least once 时,由于异常触发重试机制导致数据重复,幂等性的目的就是为了解决这个数据重复的问题,简单来说就是:
    at least once + 幂等 = exactly once

    Kafka Producer 在实现时有以下两个重要机制:

    • PID(Producer id),用来标识每个 producer client,每个生产者客户端唯一性;
    • sequence numbers,client 发送的每条消息都会带相应的 sequence number,Server 端就是根据这个值来判断数据是否重复,从0开始递增到。

    client端:被发送前的数据ProducerBatch 也提供了一个 setProducerState() 方法,它可以给一个 batch 添加一些 meta 信息(pid、baseSequence、isTransactional),这些信息是会伴随着 ProduceRequest 发到 Server 端,Server 端也正是通过这些 meta 来做相应的判断。

    server端处理:当 Broker 收到 ProduceRequest 请求之后,会通过 handleProduceRequest() 做相应的处理。
    1:检查是否开启了事务性
    2:检查数据是否带有pid,做数据校验
    3:再进行对应的数据写入。

    1.2:事务性

    多个生产者向同一个集群中不同topic投递消息时,数据一致性的保证,即整体上不重不丢不乱序且原子性(要么都成功要么都失败)。通过设置transactional.id 事务id实现(其必须是唯一的)。

    当用户使用 Kafka 的事务性时,Kafka 可以做到的保证:

    • 跨会话的幂等性写入:即使中间故障,恢复后依然可以保持幂等性;
    • 跨会话的事务恢复:如果一个应用实例挂了,启动的下一个实例依然可以保证上一个事务完成(commit 或者 abort);
    • 跨多个 Topic-Partition 的幂等性写入,Kafka 可以保证跨多个 Topic-Partition 的数据要么全部写入成功,要么全部失败,不会出现中间状态。

    1:开启事务性

    Properties props = new Properties();
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("client.id", "ProducerTranscationnalExample");
    props.put("bootstrap.servers", "localhost:9092");
    //通过设置事务id进行开启
    props.put("transactional.id", "test-transactional");
    props.put("acks", "all");
    KafkaProducer producer = new KafkaProducer(props);
    producer.initTransactions();
    try { 
    	String msg = "test";
    	producer.beginTransaction(); 
    	producer.send(new ProducerRecord(topic, "0", msg.toString())); 
    	producer.send(new ProducerRecord(topic, "1", msg.toString())); 
    	producer.send(new ProducerRecord(topic, "2", msg.toString())); 
    	//提交事务
    	producer.commitTransaction();
    	} 
    catch (ProducerFencedException e1) {
     	e1.printStackTrace(); producer.close();
     } catch (KafkaException e2) {
    	 e2.printStackTrace();
    	//出现异常时候进行事务回滚
     	producer.abortTransaction();
     }
     	producer.close();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    2:事务性要解决的问题
    事务性其实更多的是解决幂等性中没有解决的问题,比如:

    • 2.1:在写多个 Topic-Partition 时,执行的一批写入操作,有可能出现部分 Topic-Partition 写入成功,部分写入失败(比如达到重试次数),这相当于出现了中间的状态,这并不是我们期望的结果;
    • 2.2:Producer 应用中间挂之后再恢复,无法做到 Exactly-Once 语义保证;比如kafka-flink、kafka+spark等。

    Exactly-Once,仅仅靠 Kafka 是无法做到的,还需要应用本身做相应的容错设计,以 Flink 为例,其容错设计就是 checkpoint 机制,作业保证在每次 checkpoint 成功时,它之前的处理都是 Exactly-Once 的,如果中间作业出现了故障,恢复之后,只需要接着上次 checkpoint 的记录做恢复即可,对于失败前那个未完成的事务执行回滚操作(abort)就可以了,这样的话就是实现了 Flink + Kafka 端到端的 Exactly-Once
    3:事务性实现原理

    关于这点,最容易想到的应该是引用 2PC 协议(它主要是解决分布式系统数据一致性的问题)中协调者的角色,它的作用是统计所有参与者的投票结果,如果大家一致认为可以 commit,那么就执行 commit,否则执行 abort:

    我们来想一下,Kafka 是不是也可以引入一个类似的角色来管理事务的状态,只有当 Producer 真正 commit 时,事务才会提交,否则事务会还在进行中(实际的实现中还需要考虑 timeout 的情况),不会处于完成状态;

    Producer 在开始一个事务时,告诉【协调者】事务开始,然后开始向多个 Topic-Partition 写数据,只有这批数据全部写完(中间没有出现异常),Producer 会调用 commit 接口进行 commit,然后事务真正提交,否则如果中间出现异常,那么事务将会被 abort(Producer 通过 abort 接口告诉【协调者】执行 abort 操作);

    这里的协调者与 2PC 中的协调者略有不同,主要为了管理事务相关的状态信息,这就是 Kafka Server 端的 TransactionCoordinator 角色;

    为了保证TransactionCoordinator 的高可用和容错性,事务数据(transaction log)就是 __transaction_state 这个内部 topic,所有事务状态信息都会持久化到这个 topic,TransactionCoordinator 在做故障恢复也是从这个 topic 中恢复数据;

  • 相关阅读:
    火狐浏览器找不到书签了
    ipad触控笔是哪几款?开学季便宜的ipad电容笔推荐
    牛客java选择题每日打卡Day2
    HTML做一个个人博客页面(纯html代码)
    libusb系列-002-Windows下libusb源码编译
    计算机毕设 opencv python 深度学习垃圾图像分类系统
    【华为OD机试python】报数游戏【2023 B卷|100分】
    什么时候一个变量应该以state的形式出现?
    Spring的注解开发-注解原理解析-xml方式/注解方式组件扫描
    MySQL 使用 pt-archiver 删除数据
  • 原文地址:https://blog.csdn.net/weixin_43930865/article/details/126154306