• kafka 发送文件二进制流及使用header发送附属信息


    背景

    需要使用kafka发送文件二进制以及附属信息

    案例

    发送方

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.Properties;
    
    public class SendFileToKafka {
    
        public static void main(String[] args) {
    
            String filePath = "com/example/kafka/file/ConsumerFileByteArrayFromKafka.java";
    
            Properties kafkaProps = new Properties();
            kafkaProps.put("bootstrap.servers", "192.168.56.112:9092");
            kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            KafkaProducer<String, byte[]> producer = new KafkaProducer<>(kafkaProps);
            InputStream in = SendFileToKafka.class.getResourceAsStream("/com/example/kafka/file/ConsumerFileByteArrayFromKafka.java");
            try {
                byte[] buffer = new byte[in.available()];
                // 读到buffer字节数组中
                in.read(buffer);
                ProducerRecord<String, byte[]> record = new ProducerRecord<>("dataTopic", buffer);
                String header = "aaa";
                record.headers().add("test_header", header.getBytes(StandardCharsets.UTF_8));
                producer.send(record);
                in.close();
                producer.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    

    接收方

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.header.Headers;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    public class ConsumerFileByteArrayFromKafka {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.56.112:9092");
            props.put("group.id", "group1");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    
            KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("dataTopic"));
            try {
                while (true) {
                    ConsumerRecords<String, byte[]> records = consumer.poll(100);
                    for (ConsumerRecord<String, byte[]> record : records) {
                        Headers headers = record.headers();
                        Iterable<Header> testHeader = headers.headers("test_header");
                        for (Header header : testHeader) {
                            String recordHeader = new String(header.value(), "UTF-8");
                            System.out.println("recordHeader => " + recordHeader);
                        }
                        byte[] message = record.value();
                        System.out.println(new String(message));
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                consumer.close();
            }
        }
    }
    

    在这里插入图片描述

  • 相关阅读:
    如何用Redis实现分布式锁?
    java计算机毕业设计基于springboo+vue的学生毕业离校系统
    Python3,5行代码,制作Gif动图,太简单了。
    主成分分析法在图像压缩和重建中的应用研究-含Matlab代码
    性能测试-如何进行监控设计
    计算机毕业设计Java机械生产企业办公设备管理系统(源码+系统+mysql数据库+lw文档)
    HDU 2648:Shopping ← STL map
    深入了解快速排序:原理、性能分析与 Java 实现
    华为云HECS安装docker-compose
    UE4 中可全局获取的变量(例如游戏实例、玩家控制器等) 详解
  • 原文地址:https://blog.csdn.net/qq_37362891/article/details/139390778