• Kafka与Flink的整合 -- sink、source


    1、首先导入依赖:
    1. <dependency>
    2. <groupId>org.apache.flink</groupId>
    3. <artifactId>flink-connector-kafka</artifactId>
    4. <version>1.15.2</version>
    5. </dependency>
    2、 source:Flink从Kafka中读取数据
    1. public class Demo01KafkaSource {
    2. public static void main(String[] args) throws Exception{
    3. //构建环境
    4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    5. //构建kafka source 环境
    6. KafkaSource source = KafkaSource.builder()
    7. //指定broker列表
    8. .setBootstrapServers("master:9092,node1:9092,node2:9092")
    9. //指定topic
    10. .setTopics("bigdata")
    11. //指定消费组
    12. .setGroupId("my-group")
    13. //指定数据的读取的位置,earliest指的是读取最早的数据,latest:指定的读取的是最新的数据
    14. .setStartingOffsets(OffsetsInitializer.earliest())
    15. //读取数据格式:
    16. .setValueOnlyDeserializer(new SimpleStringSchema())
    17. .build();
    18. //使用kafka数据源
    19. DataStreamSource kafkaSourceDS = env.
    20. fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
    21. kafkaSourceDS.print();
    22. //启动flink
    23. env.execute();
    24. }
    25. }
            启动生产kafka:
    kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bigdata
    3、sink:Flink向Kafka中写入数据
    1. public class Demo02KafkaSink {
    2. public static void main(String[] args) throws Exception{
    3. //构建flink的环境
    4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    5. //读取数据文件:
    6. DataStreamSource studentDS = env.readTextFile("flink/data/students.txt");
    7. //创建kafka sink
    8. KafkaSink sink = KafkaSink.builder()
    9. //指定flink broker列表
    10. .setBootstrapServers("master:9092,node1:9092,node2:9092")
    11. //指定数据的格式:
    12. .setRecordSerializer(KafkaRecordSerializationSchema.builder()
    13. //指定topic,如果topic不存在就会自动的创建一个分区是1个副本是1个的topic
    14. .setTopic("student")
    15. //指定数据的格式
    16. .setValueSerializationSchema(new SimpleStringSchema())
    17. .build()
    18. )
    19. //指定数据处理的语义:
    20. .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    21. .build();
    22. //执行flink
    23. studentDS.sinkTo(sink);
    24. //构建flink环境
    25. env.execute();
    26. }
    27. }
            启动消费kafka:
    kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic student

  • 相关阅读:
    【论文阅读】MSGNet:学习多变量时间序列预测中的多尺度间序列相关性
    媳妇面试了一家公司,期望月薪20K,对方没多问就答应了,只要求3天内到岗,可我总觉得哪里不对劲。
    GAN的原理与应用PPT
    drools-加载流程
    OA项目之会议通知(查询&是否参会&反馈详情)
    基于动态资源使用策略的SMT执行端口侧信道安全防护
    Windows下Redis3.0集群搭建
    【Flink】flink 状态恢复 because the operator is not available in the new program
    Linux中top 实时监控系统进程状态
    qt 信号与槽机制,登录界面跳转
  • 原文地址:https://blog.csdn.net/m0_62078954/article/details/134274702