• Kafka - 图解生产者消息发送流程



    在这里插入图片描述


    发送原理

    在这里插入图片描述


    Kafka的Producer发送消息采用的是异步发送的方式。

    在消息发送的过程中,涉及到了两个线程:main线程和Sender线程,以及一个线程共享变量:RecordAccumulator

    • ①main线程中创建了一个双端队列RecordAccumulator,将消息发送给RecordAccumulator。

    • ②Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。


    Kafka的Producer发送消息采用了异步发送的方式,这个过程确实涉及到多个线程以及共享变量。下面详细展开说明这个过程:

    1. 主线程 (main thread):

    主线程是生产者应用的线程,它负责创建消息并将这些消息发送给Kafka Producer API。主要的操作包括:

    • 创建消息:主线程创建消息,将它们封装成ProducerRecord对象。ProducerRecord通常包括消息的主题(topic)、分区(partition)、键(key)和值(value)等信息。

    • 发送消息到RecordAccumulator:主线程将创建的消息发送到一个双端队列(deque)叫做RecordAccumulator。这个队列用于缓冲消息,允许Producer线程将消息异步发送到Kafka集群,而不需要等待每条消息都被立刻发送。

    2. Sender 线程:

    Sender线程是Kafka Producer内部的一个后台线程,它负责从RecordAccumulator中拉取消息并发送到Kafka broker。Sender线程的主要工作如下:

    • 从RecordAccumulator拉取消息:Sender线程定期轮询(poll)RecordAccumulator,检查是否有新消息需要发送。这个轮询是异步的,因此主线程不需要等待消息被发送。

    • 构建请求:当Sender线程发现有消息需要发送,它会构建一个或多个ProducerRequest,每个请求包含多个消息,以便进行有效的批量发送。

    • 发送消息到Kafka broker:Sender线程将构建的请求发送到Kafka broker,等待来自broker的响应。一旦消息被成功接收并记录在Kafka broker中,Sender线程会通知RecordAccumulator,以便它可以更新消息的状态。

    3. RecordAccumulator:

    RecordAccumulator是Producer内部的一个共享变量,用于暂存即将发送到Kafka broker的消息。主要功能包括:

    • 暂存消息:主线程将消息发送到RecordAccumulator中,使其在等待Sender线程处理。

    • 管理消息的状态:RecordAccumulator跟踪每条消息的发送状态,以确保消息被成功发送到Kafka broker。一旦消息被成功写入到Kafka broker的日志中,RecordAccumulator会将消息的状态标记为已发送。

    • 负责消息批量化:RecordAccumulator也有助于消息的批量发送,以减少网络开销和提高性能。

    发送原理小结

    总结一下,Kafka的Producer采用异步发送消息的方式,

    • 主线程负责创建和发送消息到RecordAccumulator,

    • 而Sender线程负责从RecordAccumulator中拉取消息并将其发送到Kafka broker。

    • RecordAccumulator充当缓冲区,用于管理消息的状态以及批量发送,以提高性能和降低延迟。

    这个架构充分利用了多线程和异步操作,使得Producer能够高效地发送消息到Kafka集群


    重要参数

    参数名称描述
    bootstrap.servers生产者连接集群所需的broker地址清单。可以设置1个或者多个,中间用逗号隔开。生产者从给定的broker里查找到其他broker信息。
    key.serializer, value.serializer指定发送消息的key和value的序列化类型。要写全类名。(反射获取)
    buffer.memoryRecordAccumulator缓冲区总大小,默认32m。
    batch.size缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
    linger.ms如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms之间。
    acks0:生产者发送过来的数据,不需要等数据落盘应答。
    1:生产者发送过来的数据,Leader数据落盘后应答.
    -1(all):生产者发送过来的数据,Leader和isr队列里面的所有节点数据都落盘后应答。默认值是-1
    max.in.flight.requests.per.connection允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是 1-5的数字。
    Retries当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是int最大值,2147483647。 如果设置了重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
    retry.backoff.ms两次重试之间的时间间隔,默认是100ms。
    enable.idempotence是否开启幂等性,默认true,开启幂等性。
    compression.type生产者发送的所有数据的压缩方式。默认是none,不压缩。支持压缩类型:none、gzip、snappy、lz4和zstd。

    在这里插入图片描述

  • 相关阅读:
    SpringBoot限制接口访问频率 - 这些错误千万不能犯
    谁给乡镇夫妻店的数字化铺路?
    CircRNA+代谢组如何冲击22分高分文章?
    基于 Python 的简单域名反查 IP 脚本
    2022-02-12 数据安全/付出会有回报
    基于PyTorch使用LSTM实现新闻文本分类任务
    牛客刷题之图论-最小生成树
    机器学习 | Python实现KNN(K近邻)算法模型
    【无标题】
    【深入浅出Java并发编程指南】「原理分析篇」底层角度去分析线程的实现原理
  • 原文地址:https://blog.csdn.net/yangshangwei/article/details/134036676