• Flink1.17 DataStream API


    目录

    一.执行环境(Execution Environment)

    1.1 创建执行环境

    1.2 执行模式

    1.3 触发程序执行

    二.源算子(Source)

    2.1 从集合中读取数据

    2.2 从文件读取数据

    2.3 从 RabbitMQ 中读取数据

    2.4 从数据生成器读取数据 

    2.5 Flink支持的数据类型

    2.5.1 Flink的类型系统

    2.5.2 Flink支持的数据类型

    2.5.3 类型提示(Type Hints)

    三.转换算子(Transformation)

    3.1 基本转换算子(map/ filter/ flatMap)

    3.1.1 映射(map)

    3.1.2 过滤(filter)

    3.1.2 扁平映射(flatMap)

    3.2 聚合算子(Aggregation)

    3.2.1 按键分区(keyBy)

     3.2.2 简单聚合(sum/min/max/minBy/maxBy)

    3.2.3 归约聚合(reduce)

    3.3 用户自定义函数(UDF)

    3.3.1 函数类(Function Classes)

    3.3.2 富函数类(Rich Function Classes)

    3.4 物理分区算子(Physical Partitioning)

    3.4.1 随机分区(shuffle)

    3.4.2 轮询分区(Round-Robin)

    3.4.3 重缩放分区(rescale)

     3.4.4 广播(broadcast)

    3.4.5 全局分区(global)

    3.4.6 自定义分区(Custom) 

    3.5 分流

    3.5.1 Filter 实现分流

     3.5.2 使用侧输出流

    3.6 基本合流操作

    3.6.1 联合(Union)

    3.6.2 连接(Connect)

    3.6.2.1 连接流(ConnectedStreams)

    3.6.2.2 CoProcessFunction

    四.输出算子(Sink)

     4.1 连接到外部系统

    4.2 输出到文件 

    4.3 输出到RabbitMQ

    4.4 输出到MySQL(JDBC) 

     4.5 自定义Sink输出


    DataStream API是Flink的核心层API。一个Flink程序,其实就是对DataStream的各种转换。具体来说,代码基本上都由以下几部分构成:

    一.执行环境(Execution Environment)

            Flink程序可以在各种上下文环境中运行:既可以可以在本地JVM中执行程序,也可以提交到远程集群上运行。

    1.1 创建执行环境

    获取的执行环境是StreamExecutionEnvironment类的对象(流处理,批处理已经标记为过时),创建执行环境一般有以下三种方式:

    1. // 创建一个本地执行环境并返回,可传入并行度,默认是本地CPU核心数
    2. StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
    3. // 返回远程集群执行环境,需传入远程IobManager的主机名与端口、及在集群中需运行的Jar包
    4. StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("JobManager ip", "JobManager port","提交给JobManager的JAR包");
    5. // (推荐)根据当前环境自动选择执行环境,无脑选这个即可
    6. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

     当使用 getExecutionEnvironment() 创建环境时,可以传入 org.apache.flink.configuration.Configuration 类来手动指定默认的参数,例如端口等。

    1. Configuration conf = new Configuration();
    2. conf.set(RestOptions.PORT,8088);
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

    1.2 执行模式

    从Flink 1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理。不建议使用DataSet API(已过时)。

    通过代码指定:

    1. // 流 执行模式
    2. env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
    3. // 批 执行模式
    4. env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    5. // 自动模式,根据数据源是否有界自动选择执行模式
    6. env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

    提交任务时命令行指定(推荐):

    bin/flink run -Dexecution.runtime-mode=BATCH ...

    同一套代码/API,既可以指定流处理也可以指定批处理,这就是“流批一体”的其中一个解释。

    1.3 触发程序执行

    1. // 程序执行
    2. env.execute();

    写完输出(sink)操作并不代表程序已经结束。因为当 main() 方法被调用时,其实只是定义了作业的每个执行操作,然后添加到数据流图中;这时并没有真正处理数据——因为数据可能还没来。Flink是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”

    默认 main 方法的一个 env.execute() 会触发一个 Flink Job,并且一个 main 方法可以调用多个 env.execute() ,但无意义,因为第一个会阻塞住。可使用 env.executeAsync() 可以异步触发,而且不会产生阻塞。

    在application模式下,代码中有多少个 env.executeAsync() ,就会有多少个Job,对应就会有多少个 JboManager。

    二.源算子(Source)

    Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端。

    从Flink1.12开始,主要使用流批统一的新Source架构:

    DataStreamSource stream = env.fromSource(…)

    Flink直接提供了很多预实现的接口,此外还有很多外部连接工具也帮我们实现了对应的Source,通常情况下足以应对我们的实际需求。

    2.1 从集合中读取数据

            最简单的读取数据的方式,就是在代码中直接创建一个Java集合,然后调用执行环境的fromCollection方法进行读取。这相当于将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用,一般用于测试。

    1. public static void main(String[] args) throws Exception {
    2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    3. // 从集合中读取数据
    4. DataStreamSource source = env.fromCollection(Arrays.asList(1, 10, 99, 53));
    5. source.print();
    6. env.execute();
    7. }

    输出结果:

    1. 6> 1
    2. 8> 99
    3. 1> 53
    4. 7> 10

    2.2 从文件读取数据

    在实际场景中,可能要读取、处理日志文件这样的需求,这也是批处理最常见的读取方式。 

    读取文件,需要添加文件连接器依赖:

    1. <dependency>
    2. <groupId>org.apache.flinkgroupId>
    3. <artifactId>flink-connector-filesartifactId>
    4. <version>${flink.version}version>
    5. dependency>

     代码如下:

    1. public static void main(String[] args) throws Exception {
    2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    3. FileSource fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("input/words.txt")).build();
    4. // !使用官方推荐的新的 Source 架构 => env.fromSource(Source实现类,Watermark,资源名称)
    5. env.fromSource(fileSource,WatermarkStrategy.noWatermarks(),"file").print();
    6. env.execute();
    7. }

    输出结果:

    1. 3> hello flink
    2. 3> hello world
    3. 3> hello java

    2.3 从 RabbitMQ 中读取数据

    导入相关依赖:

    1. <dependency>
    2. <groupId>org.apache.flinkgroupId>
    3. <artifactId>flink-connector-rabbitmqartifactId>
    4. <version>3.0.1-1.17version>
    5. dependency>
    6. <dependency>
    7. <groupId>com.rabbitmqgroupId>
    8. <artifactId>amqp-clientartifactId>
    9. <version>5.14.1version>
    10. dependency>

    相关代码:

    1. import org.apache.flink.api.common.serialization.SimpleStringSchema;
    2. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    4. import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
    5. import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
    6. /**
    7. * 从 RabbitMQ读取数据
    8. */
    9. public class RabbitMQSourceDemo {
    10. public static void main(String[] args) throws Exception {
    11. // 获取执行环境
    12. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    13. // 配置 RabbitMQ 连接信息
    14. RMQConnectionConfig config = new RMQConnectionConfig.Builder()
    15. .setHost("RabbitMQ服务器地址")
    16. .setPort(RabbitMQ端口)
    17. .setUserName(用户名)
    18. .setPassword(密码)
    19. .setVirtualHost(虚拟主机)
    20. .build();
    21. // 添加 RabbitMQ 数据源(Flink 1.17 并不支持使用 env.fromSource 在 RabbitMQ 读取数据!)
    22. RMQSource<String> source = new RMQSource<>(
    23. config, // 连接配置
    24. "test_queue", // 队列名称
    25. new SimpleStringSchema()); // 反序列化器
    26. // 添加数据源
    27. DataStreamSource<String> rabbitMQStream = env.addSource(source);
    28. // 打印
    29. rabbitMQStream.print();
    30. // 执行
    31. env.execute("RabbitMQ job");
    32. }
    33. }

    进入 RabbitMQ Web 页面,在对应的虚拟主机下创建相关的队列,进入队列中,使用 Web 中的 Publish message给队列发送消息:

    输出结果:

    2.4 从数据生成器读取数据 

    Flink从1.11开始提供了一个内置的DataGen 连接器,主要是用于生成一些随机数,用于在没有数据源的时候,进行流任务的测试以及性能测试等。1.17提供了新的Source写法,需要导入依赖:

    1. <dependency>
    2. <groupId>org.apache.flinkgroupId>
    3. <artifactId>flink-connector-datagenartifactId>
    4. <version>${flink.version}version>
    5. dependency>

     代码:

    1. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    2. import org.apache.flink.api.common.typeinfo.Types;
    3. import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
    4. import org.apache.flink.connector.datagen.source.DataGeneratorSource;
    5. import org.apache.flink.connector.datagen.source.GeneratorFunction;
    6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    7. /**
    8. * Flink 数据生成器
    9. */
    10. public class DataGeneratorDemo {
    11. public static void main(String[] args) throws Exception {
    12. // 获取执行环境
    13. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    14. /**
    15. * 数据生成器的四个参数:
    16. * 1、GeneratorFunction的map实现。重写返回值
    17. * 2、返回的个数 会从0开始依次返回(使用Long.MAX_VALUE可模拟出无界流)
    18. * 3、限速,每秒多少个数据
    19. * 4、返回值类型
    20. */
    21. DataGeneratorSource dataGeneratorSource = new DataGeneratorSource<>(
    22. (GeneratorFunction) num -> "Number:" + num,
    23. 30,
    24. RateLimiterStrategy.perSecond(3),
    25. Types.STRING
    26. );
    27. env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(),"datagenerator-source").print();
    28. // 执行
    29. env.execute();
    30. }
    31. }

    输出:

    1. 1> Number:8
    2. 8> Number:12
    3. 4> Number:27
    4. 3> Number:0
    5. 2> Number:4
    6. 6> Number:24
    7. 5> Number:16
    8. 7> Number:20
    9. 3> Number:1
    10. 7> Number:21
    11. 5> Number:17
    12. .
    13. .
    14. .

    2.5 Flink支持的数据类型

    2.5.1 Flink的类型系统

    Flink使用“类型信息”(TypeInformation)来统一表示数据类型。TypeInformation类是Flink中所有类型描述符的基类。它涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化器、反序列化器和比较器。

    2.5.2 Flink支持的数据类型

    对于常见的Java和Scala数据类型,Flink都是支持的。Flink在内部,Flink对支持不同的类型进行了划分,这些类型可以在Types工具类中找到:

    其中包含 Java 基本类型包装类数组类型复合数据类型、辅助类型(List、Map等)、泛型类型(GENERIC)。

    符合类型又包括:

    • Java元组类型(TUPLE):这是Flink内置的元组类型,是Java API的一部分。最多25个字段,也就是从Tuple0~Tuple25,不支持空字段。
    • Scala 样例类及Scala元组:不支持空字段。
    • 行类型(ROW):可以认为是具有任意个字段的元组,并支持空字段。
    • POJO:Flink自定义的类似于Java bean模式的类。(POJO的类和属性是公有的、有一个无参构造、属性可序列化)

    2.5.3 类型提示(Type Hints)

    Flink还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。例如:

    1. .map(word -> Tuple2.of(word, 1L))
    2. .returns(Types.TUPLE(Types.STRING, Types.LONG));

    可写作:

    1. .map(word -> Tuple2.of(word, 1L))
    2. .returns(new TypeHint>(){})

    三.转换算子(Transformation)

    数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个DataStream转换为新的DataStream。

    3.1 基本转换算子(map/ filter/ flatMap

    准备工作

    为了方便练习,这里使用WaterSensor作为数据模型。

    字段名

    数据类型

    说明

    id

    String

    水位传感器类型

    ts

    Long

    传感器记录时间戳

    vc

    Integer

    水位记录

    代码如下:

    1. public class WaterSensor {
    2. public String id;
    3. public Long ts;
    4. public Integer vc;
    5. // 省略getter、setter、构造器、toString
    6. }

    3.1.1 映射(map)

    与 JDK1.8 中的Stream中的 Map 类似。Map 就是将一个元素映射成另一个元素。基于DataStream调用map()方法就可以进行转换处理。

    例子:需要提取 WaterSensor 中的 id 属性:

    1. public class MapDemo {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. // 构造数据
    5. DataStreamSource source = env.fromElements(
    6. new WaterSensor("id_1", 1l, 1),
    7. new WaterSensor("id_2", 2l, 2),
    8. new WaterSensor("id_3", 3l, 3)
    9. );
    10. // 方法1:实现匿名内部类
    11. source.map(new MapFunction() {
    12. @Override
    13. public String map(WaterSensor waterSensor) throws Exception {
    14. return waterSensor.getId();
    15. }
    16. }).print();
    17. // 方法2:Lambda 表达式
    18. source.map(WaterSensor::getId).print();
    19. // 方法三:定义 MapFunction 实现类
    20. source.map(new MyMapFunction()).print();
    21. env.execute();
    22. }
    23. // 实现 MapFunction , 可以复用
    24. static class MyMapFunction implements MapFunction{
    25. @Override
    26. public String map(WaterSensor waterSensor) throws Exception {
    27. return waterSensor.getId();
    28. }
    29. }
    30. }

    结果输出:

    1. 2> id_3
    2. 8> id_1
    3. 1> id_2

    3.1.2 过滤(filter)

    与 JDK1.8 中的Stream中的 Fliter类似。对数据流进行过滤,满足条件的元素则会被输出,不满足则被过滤。

    例子:过滤掉 WaterSensor 中 id 不为 “id_1” 的元素。

    1. public class FilterDemo {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. // 构造数据
    5. DataStreamSource source = env.fromElements(
    6. new WaterSensor("id_1", 1l, 1),
    7. new WaterSensor("id_1", 12l, 2),
    8. new WaterSensor("id_2", 2l, 2),
    9. new WaterSensor("id_3", 3l, 3)
    10. );
    11. // 过滤数据中 id 不为 id_1 的元素
    12. source.filter(new FilterFunction() {
    13. @Override
    14. public boolean filter(WaterSensor waterSensor) throws Exception {
    15. return "id_1".equals(waterSensor.getId());
    16. }
    17. }).print();
    18. env.execute();
    19. }
    20. }

    结果输出:

    1. 3> WaterSensor{id='id_1', ts=1, vc=1}
    2. 4> WaterSensor{id='id_1', ts=12, vc=2}

    3.1.2 扁平映射(flatMap)

    flatMap操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生0到多个元素。flatMap可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。

    例子:如果 id 为 id_1 则输出 vc 属性,如果 id 为 id_2 则输出 ts、vc 属性。

    1. public class FlatmapDemo {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. // 构造数据
    5. DataStreamSource source = env.fromElements(
    6. new WaterSensor("id_1", 1l, 1),
    7. new WaterSensor("id_1", 12l, 2),
    8. new WaterSensor("id_2", 2l, 2),
    9. new WaterSensor("id_3", 3l, 3)
    10. );
    11. /**
    12. * 如果 id 为 id_1 则输出 vc 属性
    13. * 如果 id 为 id_2 则输出 ts、vc 属性
    14. */
    15. source.flatMap(new FlatMapFunction() {
    16. @Override
    17. public void flatMap(WaterSensor waterSensor, Collector collector) throws Exception {
    18. if("id_1".equals(waterSensor.getId())){
    19. // 将 vc 放入采集器
    20. collector.collect(waterSensor.getVc().toString());
    21. } else if ("id_2".equals(waterSensor.getId())) {
    22. // 将 ts、vc 放入采集器
    23. collector.collect(waterSensor.getVc().toString());
    24. collector.collect(waterSensor.getTs().toString());
    25. }
    26. }
    27. }).print();
    28. env.execute();
    29. }
    30. }

    结果输出:

    1. 2> 1
    2. 3> 2
    3. 4> 2
    4. 4> 2

    3.2 聚合算子(Aggregation)

    计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并——这就是所谓的“聚合”(Aggregation),类似于MapReduce中的reduce操作。

    3.2.1 按键分区(keyBy)

    在Flink中,要做聚合,需要先进行分区;这个操作就是通过keyBy来完成的。keyBy通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务。 

    • 基于不同的key,流中的数据将被分配到不同的分区中去;这样一来,所有具有相同的key的数据,都将被发往同一个分区一个子任务就可以理解为一个分区
    • KeyBy 返回的是 KeyedStream 键控流。
    • KeyBy 不是转换算子,只是对数据做重分区,不能设置并行度。
    • 分区是通过对 Key 进行 Hash 再对分区数取模来实现的。

    例子:以 id 作为 Key 进行分区:

    1. public class KeyByDemo {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. // 构造数据
    5. DataStreamSource source = env.fromElements(
    6. new WaterSensor("id_1", 1l, 1),
    7. new WaterSensor("id_1", 12l, 2),
    8. new WaterSensor("id_2", 2l, 2),
    9. new WaterSensor("id_3", 3l, 3)
    10. );
    11. // 以 id 为 Key 进行分区
    12. source.keyBy(new KeySelector() {
    13. @Override
    14. public String getKey(WaterSensor waterSensor) throws Exception {
    15. return waterSensor.getId();
    16. }
    17. }).print();
    18. env.execute();
    19. }
    20. }

     结果输出:

    1. 2> WaterSensor{id='id_2', ts=2, vc=2}
    2. 3> WaterSensor{id='id_1', ts=1, vc=1}
    3. 3> WaterSensor{id='id_1', ts=12, vc=2}
    4. 3> WaterSensor{id='id_3', ts=3, vc=3}

     3.2.2 简单聚合(sum/min/max/minBy/maxBy)

    所有的聚合操作都要基于按键分区的数据流KeyedStream。 Flink为我们内置实现了一些最基本、最简单的聚合API,主要有以下几种:

    • sum():在输入流上,对指定的字段做叠加求和的操作。
    • min():在输入流上,对指定的字段求最小值。
    • max():在输入流上,对指定的字段求最大值。
    • sumBy()、minBy()、maxBy():功能类似,xxxBy() 会返回包含符合要求的整条数据。而不加 By 只会保留第一次的非比较字段。

    例子:

    1. public class AggrDemo {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. env.setParallelism(2);
    5. // 构造数据
    6. DataStreamSource source = env.fromElements(
    7. new WaterSensor("id_1", 1l, 1),
    8. new WaterSensor("id_1", 12l, 22),
    9. new WaterSensor("id_2", 2l, 2),
    10. new WaterSensor("id_3", 3l, 3)
    11. );
    12. // 以 id 为 Key 进行分区
    13. KeyedStream keyBySource = source.keyBy(new KeySelector() {
    14. @Override
    15. public String getKey(WaterSensor waterSensor) throws Exception {
    16. return waterSensor.getId();
    17. }
    18. });
    19. keyBySource.sum("vc").print();
    20. keyBySource.min("vc").print();
    21. keyBySource.max("vc").print();
    22. /**
    23. * max结果:
    24. * 1> WaterSensor{id='id_1', ts=1, vc=1}
    25. * 1> WaterSensor{id='id_1', ts=1, vc=22}
    26. * 1> WaterSensor{id='id_2', ts=2, vc=2}
    27. * 1> WaterSensor{id='id_3', ts=3, vc=3}
    28. * ts 还是 第一次的值
    29. */
    30. keyBySource.maxBy("vc").print();
    31. /**
    32. * max结果:
    33. * 1> WaterSensor{id='id_1', ts=1, vc=1}
    34. * 1> WaterSensor{id='id_1', ts=12, vc=22}
    35. * 1> WaterSensor{id='id_2', ts=2, vc=2}
    36. * 1> WaterSensor{id='id_3', ts=3, vc=3}
    37. * 取当前整列值
    38. */
    39. env.execute();
    40. }
    41. }

    3.2.3 归约聚合(reduce)

    reduce可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算。

    ReduceFunction接口里需要实现reduce()方法,这个方法接收两个输入事件,经过转换处理之后输出一个相同类型的事件。在流处理的底层实现过程中,实际上是将中间“合并的结果”作为任务的一个状态保存起来的;之后每来一个新的数据,就和之前的聚合状态进一步做归约。

    例子:只保存每个分组中 VC 最大的那条数据

    1. public class ReduceDemo {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. env.setParallelism(1);
    5. // 构造数据
    6. DataStreamSource source = env.fromElements(
    7. new WaterSensor("id_1", 1l, 1),
    8. new WaterSensor("id_1", 21l, 21),
    9. new WaterSensor("id_1", 31l, 31),
    10. new WaterSensor("id_2", 2l, 2),
    11. new WaterSensor("id_3", 3l, 3)
    12. );
    13. // 以 id 为 Key 进行分区
    14. KeyedStream sensorKs = source.keyBy(new KeySelector() {
    15. @Override
    16. public String getKey(WaterSensor waterSensor) throws Exception {
    17. return waterSensor.getId();
    18. }
    19. });
    20. /**
    21. * reduce:
    22. * 1.必须在KeyBy后调用
    23. * 2.输入类型 = 输出类型
    24. * 3.每个分区的第一条数据来的时候不会执行reduce,但是会存起来保存状态,直接输出,,“Flink有状态的体现”
    25. * 4.reduce( value1, value2)
    26. * a.value1 是上一次的计算结果
    27. * b.value2 是当前进入的数据
    28. */
    29. SingleOutputStreamOperator reduce = sensorKs.reduce(new ReduceFunction() {
    30. @Override
    31. public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
    32. if(value2.getVc() > value1.getVc()){
    33. return new WaterSensor(value2.getId(), value2.getTs(), value2.getVc());
    34. }else {
    35. return value1;
    36. }
    37. }
    38. });
    39. reduce.print();
    40. env.execute();
    41. }
    42. }

    结果输出:

    1. WaterSensor{id='id_1', ts=1, vc=1} // 分组的第一条数据直接返回
    2. WaterSensor{id='id_1', ts=21, vc=21}
    3. WaterSensor{id='id_1', ts=31, vc=31}
    4. WaterSensor{id='id_2', ts=2, vc=2} // 分组的第一条数据直接返回
    5. WaterSensor{id='id_3', ts=3, vc=3} // 分组的第一条数据直接返回

    reduce同简单聚合算子一样,也要针对每一个key保存状态。因为状态不会清空,所以我们需要将reduce算子作用在一个有限key的流上。

    3.3 用户自定义函数(UDF

    用户自定义函数(user-defined function,UDF),即用户可以根据自身需求,重新实现算子的逻辑。

    用户自定义函数分为:函数类匿名函数富函数类

    3.3.1 函数类(Function Classes)

    Flink暴露了所有UDF函数的接口,具体实现方式为接口或者抽象类,例如MapFunction、FilterFunction、ReduceFunction等。所以用户可以自定义一个函数类,实现对应的接口。

    匿名内部类实现

    1. source.filter(new FilterFunction() {
    2. @Override
    3. public boolean filter(WaterSensor waterSensor) throws Exception {
    4. return "id_1".equals(waterSensor.getId());
    5. }
    6. }).print();

    Lambda表达式实现:

    source.filter((FilterFunction) waterSensor -> "id_1".equals(waterSensor.getId())).print();

    实现 XxxFunction 接口

    1. public class FilterFunctionImpl implements FilterFunction {
    2. public String id ;
    3. public FilterFunctionImpl(String id) {
    4. this.id = id;
    5. }
    6. @Override
    7. public boolean filter(WaterSensor waterSensor) throws Exception {
    8. return this.id.equals(waterSensor.getId());
    9. }
    10. }

    3.3.2 富函数类(Rich Function Classes)

    “富函数类”也是DataStream API提供的一个函数类的接口,所有的Flink函数类都有其Rich版本。

    RichXxxFunction 与 XxxFunction 的区别是可以获取到任务运行时的一些上下文信息、环境信息以及对任务生命周期的管理。

    典型的生命周期方法有:

    • 重写open()方法,每个子任务在启动时,会调用一次。
    • 重写close()方法,每个子任务在结束时会调用一次。
      • 程序异常退出不会调用 close() 方法。
      • 手动取消任务会调用 close() 方法。

    在open、close中可以使用 getRuntimeContext() 来获取运行时上下文信息。

    1. public class RichMapFunctionDemo {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. env.setParallelism(1);
    5. DataStreamSource source = env.fromElements(1, 2, 3, 4);
    6. SingleOutputStreamOperator map = source.map(new RichMapFunction() {
    7. @Override
    8. public void open(Configuration parameters) throws Exception {
    9. System.out.println("open:子任务名称"+getRuntimeContext().getTaskNameWithSubtasks());
    10. System.out.println("open:子任务编号"+getRuntimeContext().getIndexOfThisSubtask());
    11. super.open(parameters);
    12. }
    13. @Override
    14. public void close() throws Exception {
    15. System.out.println("close:子任务名称"+getRuntimeContext().getTaskNameWithSubtasks());
    16. System.out.println("close:子任务编号"+getRuntimeContext().getIndexOfThisSubtask());
    17. super.close();
    18. }
    19. @Override
    20. public Integer map(Integer value) throws Exception {
    21. return value + 1;
    22. }
    23. });
    24. map.print();
    25. env.execute();
    26. }
    27. }

    输出结果:

    1. open:子任务名称Source: Collection Source -> Map -> Sink: Print to Std. Out (1/1)#0
    2. open:子任务编号0
    3. 2
    4. 3
    5. 4
    6. 5
    7. close:子任务名称Source: Collection Source -> Map -> Sink: Print to Std. Out (1/1)#0
    8. close:子任务编号0

    3.4 物理分区算子(Physical Partitioning

    Flink 为我们提供了7种分区策略和一个用户自定义分区器。常见的物理分区策略有:随机分配(Random)轮询分配(Round-Robin)重缩放(Rescale)广播(Broadcast)

    分区算子就是将数据按照某种策略分配到下游算子的子任务分区中。 

    3.4.1 随机分区(shuffle)

    通过调用DataStream的.shuffle()方法,将数据随机地分配到下游算子的并行任务分区中去。

    shuffle底层实现采用的是 生成随机数

    1. public static void main(String[] args) throws Exception {
    2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    3. env.setParallelism(2);
    4. DataStreamSource source = env.socketTextStream("ip", 端口);
    5. // 随机分区 random.nextInt(下游算子并行度)
    6. source.shuffle().print();
    7. env.execute();
    8. }
    1. 输入:
    2. [root@VM-55-24-centos ~]# nc -lk 1234
    3. 1
    4. 2
    5. 3
    6. 4
    7. 5
    8. 输出:
    9. 1> 1
    10. 2> 2
    11. 2> 3
    12. 1> 4
    13. 1> 5

    从控制台输出的左侧子任务编号可以看出子任务分区是随机分配的。

    3.4.2 轮询分区(Round-Robin)

    通过调用DataStream的.rebalance()方法,就可以实现轮询重分区。rebalance采用的是对并行度取模,可以将输入流数据平均分配到下游的并行任务中去。可以解决 数据源数据倾斜 的问题。

    1. // 轮询重分区
    2. source.rebalance().print();
    1. 输入:
    2. [root@VM-55-24-centos ~]# nc -lk 1234
    3. 1
    4. 2
    5. 3
    6. 4
    7. 5
    8. 输出:
    9. 1> 1
    10. 2> 2
    11. 1> 3
    12. 2> 4
    13. 1> 5

    3.4.3 重缩放分区(rescale)

    与 rebalance 类似,也是轮询的效果,不过比轮询更加高效。rescale的做法是将数据在固定的几个分区中进行轮询,而不是轮询所有分区。

    1. // 缩放轮询
    2. source.rescale().print();

     3.4.4 广播(broadcast)

    通过调用DataStream的broadcast()方法,会将数据发送到下游算子的所有并行任务中去。慎用!

    1. // 广播
    2. source.broadcast().print();

    3.4.5 全局分区(global)

    全局分区做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了1。慎用,可能对程序造成很大的压力!

    1. // 全局分区
    2. source.global().print();

    3.4.6 自定义分区(Custom) 

    当Flink提供的所有分区策略都不能满足用户的需求时,我们可以通过使用partitionCustom()方法来自定义分区策略。

    例子:实现将奇数与偶数分配到不同的分区

    自定义分区器实现 Partitioner:

    1. public class MyPartitioner implements Partitioner {
    2. @Override
    3. public int partition(String key, int numPartitions) {
    4. // key 为当前数据,numPartitions 为下游并行度
    5. return Integer.parseInt(key) % numPartitions;
    6. }
    7. }

    使用自定义分区

    1. public class PartitionCustomDemo {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. env.setParallelism(2);
    5. DataStreamSource source = env.socketTextStream("IP", 端口);
    6. source.partitionCustom(new MyPartitioner(), v -> v).print();
    7. env.execute();
    8. }
    9. }

    结果输出:

    1. 输入:
    2. [root@VM-55-24-centos ~]# nc -lk 1234
    3. 2
    4. 3
    5. 4
    6. 6
    7. 8
    8. 10
    9. 输出:
    10. 2> 1
    11. 1> 2
    12. 2> 3
    13. 1> 4
    14. 1> 6
    15. 1> 8
    16. 1> 10

    3.5 分流

    所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里

    与分区不同的是,分流是是将一条数据流拆分成多条流。而分区是将数据分配到下游算子的子任务中。

    3.5.1 Filter 实现分流

    例子:读取一个整数数字流,将数据流划分为奇数流和偶数流。

    1. public class SplitByFilterDemo {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. env.setParallelism(2);
    5. DataStreamSource source = env.socketTextStream("IP", 端口);
    6. source.filter(num -> Integer.parseInt(num) % 2 == 0).print("偶数流:");
    7. source.filter(num -> Integer.parseInt(num) % 2 == 1).print("奇数流:");
    8. env.execute();
    9. }
    10. }

    输入输出结果:

    1. 输入:
    2. [root@VM-55-24-centos ~]# nc -lk 1234
    3. 1
    4. 2
    5. 45
    6. 324321
    7. 234325235
    8. 12312
    9. 11412
    10. 输出:
    11. 奇数流::1> 1
    12. 偶数流::2> 2
    13. 奇数流::1> 45
    14. 偶数流::2> 324321
    15. 奇数流::1> 234325235
    16. 偶数流::2> 12312
    17. 偶数流::2> 11412

    用 Filter 实现虽然简单但不够高效,因为每次数据流都会经过两次 Filter 过滤 。

     3.5.2 使用侧输出流

    一条未被分类操作的流被称为“主流”,经过分流操作后,侧输出流可以理解为“主流”的“支流”。

    需求:id 为 s1 、s2 的数据被到另外两条侧流 ,非 s1 、s2不受影响,放在主流:

    1. public class SideOutputDemo {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. env.setParallelism(2);
    5. DataStreamSource source = env.socketTextStream("IP", 端口);
    6. SingleOutputStreamOperator sensorDs = source.map(new MapFunction() {
    7. @Override
    8. public WaterSensor map(String value) throws Exception {
    9. String[] data = value.split(",");
    10. return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]));
    11. }
    12. });
    13. // 侧输出流的标记
    14. OutputTag s1Tag = new OutputTag<>("s1", Types.POJO(WaterSensor.class));
    15. OutputTag s2Tag = new OutputTag<>("s2", Types.POJO(WaterSensor.class));
    16. SingleOutputStreamOperator process = sensorDs.process(new ProcessFunction() {
    17. @Override
    18. public void processElement(WaterSensor value, ProcessFunction.Context ctx, Collector out) throws Exception {
    19. if ("s1".equals(value.getId())) {
    20. ctx.output(s1Tag, value);
    21. } else if ("s2".equals(value.getId())) {
    22. ctx.output(s2Tag, value);
    23. } else {
    24. out.collect(value);
    25. }
    26. }
    27. });
    28. // process 默认只会返回主流数据
    29. process.print("主流");
    30. // 根据输出标签(流的标签)找到 s1 这条支流斌输出
    31. process.getSideOutput(s1Tag).printToErr("测输出流S1");
    32. // 根据输出标签(流的标签)找到 s2 这条支流斌输出
    33. process.getSideOutput(s2Tag).printToErr("测输出流S2");
    34. env.execute();
    35. }
    36. }

    输入与输出结果:

    1. 输入:
    2. [root@VM-55-24-centos ~]# nc -lk 1234
    3. s1,1,1
    4. s3,3,3
    5. s2,2,2
    6. s9,9,9
    7. s1,33,1134
    8. 输出:
    9. 测输出流S1:2> WaterSensor{id='s1', ts=1, vc=1}
    10. 主流:1> WaterSensor{id='s3', ts=3, vc=3}
    11. 测输出流S2:2> WaterSensor{id='s2', ts=2, vc=2}
    12. 主流:1> WaterSensor{id='s9', ts=9, vc=9}
    13. 测输出流S1:2> WaterSensor{id='s1', ts=33, vc=1134}
    • Process 算子非常灵活,基础算子底层都是调用 Process 来实现的。
    • OutputTag 可以理解为侧输出流的名称以流的数据类型。
    • 将数据放入侧输出流中需要使用 ctx.output()传入输出流标签和数据 ;
    • process 返回的流是主流,想获取侧输出流必须通过 process.getSideOutput()传入输出流标签来获取。

    3.6 基本合流操作

    在实际应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理。所以Flink中合流的操作会更加普遍,对应的API也更加丰富。

    3.6.1 联合(Union)

    通过调用数据源的 Union() 就可以将一条或者多条流进行合并。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。

    1. public class UnionDemo {
    2. /**
    3. * Union : 合并一条或多条相同数据类型的流
    4. */
    5. public static void main(String[] args) throws Exception {
    6. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    7. env.setParallelism(1);
    8. DataStreamSource source1 = env.fromElements(1, 2, 3);
    9. DataStreamSource source2 = env.fromElements(44, 55, 66);
    10. DataStreamSource source3 = env.fromElements("777", "888", "999");
    11. // 写法1 : 一次合并一个流
    12. DataStream union = source1.union(source2).union(source3.map(Integer::parseInt));
    13. // 写法2 : 一次合并多个流
    14. source1.union(source2,source3.map(Integer::parseInt));
    15. union.print();
    16. env.execute();
    17. }
    18. }

    结果输出:

    1. 1
    2. 2
    3. 3
    4. 44
    5. 55
    6. 66
    7. 777
    8. 888
    9. 999

    3.6.2 连接(Connect)

    Union 虽然使用简单,但是受限于只能合并相同类型的流,不太灵活。Flink 提供了另一个更方便的河流操作:连接(Connect)。

    3.6.2.1 连接流(ConnectedStreams

    通过 Connect 可以将两条不同类型的流进行连接,但是不再返回 DataStream ,而是返回 ConnectedStreams(连接流)。

    且两条流连接后只是形式上的“合并”,对这条流进行处理转换则需要对原本的两条流单独处理。

    1. public class ConnectDemo {
    2. /**
    3. * Connect : 连接(合并)两条流
    4. * 返回的是 ConnectedStreams(连接流) 而不是 DataStream
    5. * 只是名义上的统一,处理逻辑需要每条流单独处理
    6. */
    7. public static void main(String[] args) throws Exception {
    8. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    9. env.setParallelism(1);
    10. DataStreamSource source1 = env.fromElements(1, 2, 3);
    11. DataStreamSource source2 = env.fromElements("777", "888", "999");
    12. ConnectedStreams connect = source1.connect(source2);
    13. // 需要对两条单独处理 CoMapFunction(第一条流的类型,第二条流的类型,输出的类型)
    14. SingleOutputStreamOperator map = connect.map(new CoMapFunction() {
    15. @Override
    16. public String map1(Integer value) throws Exception {
    17. return value.toString();
    18. }
    19. @Override
    20. public String map2(String value) throws Exception {
    21. return value;
    22. }
    23. });
    24. map.print();
    25. env.execute();
    26. }
    27. }

    结果输出:

    1. 1
    2. 777
    3. 2
    4. 888
    5. 3
    6. 999
    3.6.2.2 CoProcessFunction

    与CoMapFunction类似,如果是调用.map()就需要传入一个CoMapFunction,需要实现map1()、map2()两个方法;而调用.process()时,传入的则是一个CoProcessFunction。它也是“处理函数”家族中的一员,用法非常相似。它需要实现的就是processElement1()、processElement2()两个方法,在每个数据到来时,会根据来源的流调用其中的一个方法进行处理。

    例子:有两条数据类型不同的流,需要根据各自数据的第一个字段进行匹配。类似于 MySQL中的 Inner Join。

    1. public class ConnectKeyByDemo {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. env.setParallelism(1);
    5. DataStreamSource> source1 = env.fromElements(
    6. Tuple2.of(1, "a1"),
    7. Tuple2.of(1, "a2"),
    8. Tuple2.of(2, "b"),
    9. Tuple2.of(3, "c")
    10. );
    11. DataStreamSource> source2 = env.fromElements(
    12. Tuple3.of(1, "aa1", 1),
    13. Tuple3.of(1, "aa2", 2),
    14. Tuple3.of(2, "bb", 1),
    15. Tuple3.of(3, "cc", 1)
    16. );
    17. ConnectedStreams, Tuple3> connect = source1.connect(source2);
    18. SingleOutputStreamOperator process = connect.process(new CoProcessFunction, Tuple3, String>() {
    19. // 为各自两条流定义中间变量用于存储匹配时的数据
    20. Map>> s1Cache = new HashMap<>();
    21. Map>> s2Cache = new HashMap<>();
    22. @Override
    23. public void processElement1(Tuple2 value, CoProcessFunction, Tuple3, String>.Context ctx, Collector out) throws Exception {
    24. Integer id = value.f0;
    25. // 第一次出现该 Key 则直接将数据put进s1的数据集合中
    26. if (!s1Cache.containsKey(id)) {
    27. ArrayList> s1Values = new ArrayList<>();
    28. s1Values.add(value);
    29. s1Cache.put(id, s1Values);
    30. } else {
    31. // 不是第一次出现该 Key ,直接添加进该 Key 的数组中
    32. s1Cache.get(id).add(value);
    33. }
    34. // 去另外一条流的数据中寻找有没有 id 相匹配的,有则放入采集器
    35. if (s2Cache.containsKey(id)) {
    36. for (Tuple3 s2Element : s2Cache.get(id)) {
    37. out.collect("S1:" + value + "<---->" + "s2:" + s2Element);
    38. }
    39. }
    40. }
    41. @Override
    42. public void processElement2(Tuple3 value, CoProcessFunction, Tuple3, String>.Context ctx, Collector out) throws Exception {
    43. Integer id = value.f0;
    44. // 第一次出现该 Key 则直接将数据put进s2的数据集合中
    45. if (!s2Cache.containsKey(id)) {
    46. ArrayList> s2Values = new ArrayList<>();
    47. s2Values.add(value);
    48. s2Cache.put(id, s2Values);
    49. } else {
    50. // 不是第一次出现该 Key ,直接添加进该 Key 的数组中
    51. s2Cache.get(id).add(value);
    52. }
    53. // 去另外一条流的数据中寻找有没有 id 相匹配的,有则放入采集器
    54. if (s1Cache.containsKey(id)) {
    55. for (Tuple2 s1Element : s1Cache.get(id)) {
    56. out.collect("S2:" + value + "<---->" + "s1:" + s1Element);
    57. }
    58. }
    59. }
    60. });
    61. process.print();
    62. env.execute();
    63. }
    64. }

    结果:

    1. S2:(1,aa1,1)<---->s1:(1,a1)
    2. S1:(1,a2)<---->s2:(1,aa1,1)
    3. S2:(1,aa2,2)<---->s1:(1,a1)
    4. S2:(1,aa2,2)<---->s1:(1,a2)
    5. S2:(2,bb,1)<---->s1:(2,b)
    6. S2:(3,cc,1)<---->s1:(3,c)

    注意:在多并行度下,以上匹配会出错,因为多并行度下,数据会被发往 Process 不同的子任务中(Slot),而不同的子任务间数据无法共享,导致读取不到另一个子任务的数组,从而匹配错误。所以需要在连接流后对要匹配的字段进行 KeyBy 操作,确保同一个 Key 被分配到同一个子任务中。

    四.输出算子(Sink

    Flink作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持。

     4.1 连接到外部系统

    Flink 1.17 中的DataStream API专门提供了向外部写入数据的方法:sinkTo,对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。

    stream.sinkTo(…)

    在大部分情况下,Sink 并不需要我们手动实现,Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示,列出了Flink官方目前支持的第三方系统连接器:

    及第三方提供的连接器:

     地址:Overview | Apache Flink

    4.2 输出到文件 

    Flink专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。

    FileSink支持行编码(Row-encoded)批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用FileSink的静态方法:

    例子:使用数据生成器源源不断生成数据,并输出到文件夹的文本文件中,每隔一个小时生成一个新的文件夹,且每隔20秒或者文件大小达到 3KB 则新建一个文本文件。

    导入依赖:

    1. <dependency>
    2. <groupId>org.apache.flinkgroupId>
    3. <artifactId>flink-connector-filesartifactId>
    4. <version>${flink.version}version>
    5. dependency>
    1. public class SinkFileDemo {
    2. public static void main(String[] args) throws Exception {
    3. // 获取执行环境
    4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    5. // 全局并行度设置为 2
    6. env.setParallelism(2);
    7. // 开启checkpoint
    8. env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
    9. /**
    10. * 数据生成器:无限生成数字,一秒生成 1000 条
    11. */
    12. DataGeneratorSource dataGenSource = new DataGeneratorSource<>(
    13. (GeneratorFunction) num -> "Number:" + num,
    14. Long.MIN_VALUE,
    15. RateLimiterStrategy.perSecond(1000),
    16. Types.STRING
    17. );
    18. DataStreamSource streamSource = env.fromSource(dataGenSource, WatermarkStrategy.noWatermarks(),"data-generator");
    19. /**
    20. * 输出到文件系统
    21. * Sink 算子同样会受到 并行度 的影响:例如会同时有 并行度个 个文件被写入
    22. */
    23. FileSink fileSink = FileSink
    24. // 指定要输出的 文件目录 及 文件编码
    25. .forRowFormat(new Path("D:/tmp"), new SimpleStringEncoder<>("UTF-8"))
    26. // 指定要生成文件的 前后缀
    27. .withOutputFileConfig(
    28. OutputFileConfig.builder() // 建造者模式
    29. // 文件的前缀
    30. .withPartPrefix("flink-file-test")
    31. // 文件的后缀
    32. .withPartSuffix(".txt")
    33. .build()
    34. )
    35. // 指定目录分桶:按照小时进行分桶(一小时生成一个新的目录),并设置时区为 Asia/Shanghai
    36. .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH", ZoneId.of("Asia/Shanghai")))
    37. // 文件滚动策略:每隔多少秒 或 文件超过多大 就生成新的文件
    38. .withRollingPolicy(
    39. DefaultRollingPolicy.builder() // 建造者模式
    40. // 每隔 20S 生成一个新的文件
    41. .withRolloverInterval(Duration.ofSeconds(20))
    42. // 文件大小超过大于 3KB 则生成一个新的文件
    43. .withMaxPartSize(new MemorySize(1024 * 3))
    44. .build()
    45. ).build();
    46. // 输出
    47. streamSource.sinkTo(fileSink);
    48. // 执行
    49. env.execute();
    50. }
    51. }

     结果:

    FileSink

            .forRowFormat:指定要输出的文件目录及文件编码 

            .withOutputFileConfig:指定要生成文件的前后缀

            .withBucketAssigner:指定目录分桶

            .withRollingPolicy:文件滚动策略

    4.3 输出到RabbitMQ

    想要输出到 RabbitMQ,也需要调用对应的 Sink 算子--RMQSink 。 

    例子:从 Socket 读数据,写入到 RabbitMQ 中,作为一条消息。

    添加Kafka 连接器依赖:

    1. <dependency>
    2. <groupId>org.apache.flinkgroupId>
    3. <artifactId>flink-connector-rabbitmqartifactId>
    4. <version>3.0.1-1.17version>
    5. dependency>
    1. public class SinkRabbitMqDemo {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. env.setParallelism(1);
    5. env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
    6. DataStreamSource streamSource = env.socketTextStream("ip", 1234);
    7. // 配置 RabbitMQ 连接信息
    8. RMQConnectionConfig mqConfig = new RMQConnectionConfig.Builder()
    9. .setHost("xxx.xxx.xxx.xxx") // RabbitMQ 服务地址
    10. .setPort(5379) // RabbitMQ 服务端口
    11. .setUserName("用户名") // 用户名
    12. .setPassword("密码") // 密码
    13. .setVirtualHost("/") // 虚拟主机名
    14. .build();
    15. // 创建一个RMQSink,用于将数据发送到RabbitMQ队列
    16. // mq配置信息,队列名称,序列化器
    17. RMQSink rmqSink = new RMQSink(mqConfig, "test_queue", new SimpleStringSchema());
    18. // 将数据流写入RabbitMQ队列 Flink 1.17 并不支持使用 sinkTo 对第三方系统进行输出
    19. streamSource.addSink(rmqSink);
    20. env.execute("flink connectors rabbitmq");
    21. }
    22. }

    输入:

    1. [root@VM-55-24-centos ~]# nc -lk 1234
    2. hello flink
    3. hello rabbitmq

    结果:

    4.4 输出到MySQL(JDBC) 

    同样的,要输出到 MySQL ,需要调用 JdbcSink.sink() 算子,且也只能使用 addSink 来添加输出。

    例子: 在 Socket 中写入数据,写入MySQL中。

    在 MySQL 中新建表:

    1. CREATE TABLE `ws` (
    2. `id` varchar(100) NOT NULL,
    3. `ts` bigint(20) DEFAULT NULL,
    4. `vc` int(11) DEFAULT NULL,
    5. PRIMARY KEY (`id`)
    6. ) ENGINE=InnoDB DEFAULT CHARSET=utf8

    导入 MySQL 驱动:

    1. <dependency>
    2. <groupId>mysqlgroupId>
    3. <artifactId>mysql-connector-javaartifactId>
    4. <version>5.1.47version>
    5. dependency>

    导入 Flink - MySQL 连接器:

    1. <dependency>
    2. <groupId>org.apache.flinkgroupId>
    3. <artifactId>flink-connector-jdbcartifactId>
    4. <version>3.1.1-1.17version>
    5. dependency>

    代码:

    1. public class SinkMySQLDemo {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. env.setParallelism(1);
    5. DataStreamSource streamSource = env.socketTextStream("xxx.xxx.xxx.xxx", 1234);
    6. // 将从 Socket 中读到的字符串转成实体类
    7. SingleOutputStreamOperator map = streamSource.map((MapFunction) s -> {
    8. String[] data = s.split(",");
    9. return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]));
    10. });
    11. /**
    12. * jdbcSink 四大参数:
    13. * 1、要执行的 SQL 语句
    14. * 2、为占位符填充值
    15. * 3、执行选项:重试次数,攒批
    16. * 4、MySQL 连接信息
    17. */
    18. SinkFunction jdbcSink = JdbcSink.sink(
    19. "insert into ws values( ? , ? , ?)",
    20. new JdbcStatementBuilder() {
    21. @Override
    22. public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
    23. preparedStatement.setString(1, waterSensor.getId());
    24. preparedStatement.setLong(2, waterSensor.getTs());
    25. preparedStatement.setInt(3, waterSensor.getVc());
    26. }
    27. },
    28. JdbcExecutionOptions.builder()
    29. .withBatchIntervalMs(3000) // 批次的时间
    30. .withBatchSize(100) // 批次的大小:条数
    31. .withMaxRetries(3) // 重试次数
    32. .build(),
    33. new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    34. .withUrl("jdbc:mysql://localhost:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
    35. .withUsername("Username")
    36. .withPassword("Password")
    37. .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
    38. .build()
    39. );
    40. map.addSink(jdbcSink);
    41. env.execute("flink connectors MySQL");
    42. }
    43. }

    输入:

    1. [root@VM-55-24-centos ~]# nc -lk 1234
    2. hello,1,1
    3. flink,2,2
    4. mysql,3,3

    输出:

     4.5 自定义Sink输出

    Flink 为我们提供很多常用的连接器,一般不推荐自定义Sink,因为需要自行处理连接逻辑及错误逻辑。

    如果要自定义Sink,Flink 为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。

    streamSource.addSink(new MySink());

    推荐继承RichSinkDunction,实现其中的三个方法open()、close()、invoke(String value, Context context)。

    1. public class MySink extends RichSinkFunction{
    2. @Override
    3. public void open(Configuration parameters) throws Exception {
    4. // 启动时会被调用一次
    5. // 可以在这里创建连接
    6. }
    7. @Override
    8. public void close() throws Exception {
    9. // 销毁时会被调用一次
    10. // 可以在这里销毁连接
    11. }
    12. // Sink 的核心逻辑
    13. @Override
    14. public void invoke(String value, Context context) throws Exception {
    15. // 每条数据来都会调用一次
    16. // 具体的写入逻辑...
    17. }
    18. }
  • 相关阅读:
    数据库设计中如何选择主键 (自然键或代理键 )| Part 1
    2022年电工杯数学建模B题5G网络环境下应急物资配送问题求解全过程论文及程序
    Android视角看鸿蒙第四课(module.json中的各字段含义之description&mainElement)修改程序入口
    Spring整合RabbitMQ——生产者(利用配置类)
    纯手码优质JAVA面试八股文
    给一个数组赋1到10的初始值(指针)
    AI芯片的性能评价
    C认证笔记 - Web基础 - 知识点1:语义化标签
    前端性能优化方法与实战08 诊断清单:如何实现监控预警并进行问题诊断
    数据结构入门-13-图
  • 原文地址:https://blog.csdn.net/weixin_53922163/article/details/133975274