• Flink Study Notes (5): DataStream API


    Table of Contents

    1. DataStream API (Basics)

    1.1 Execution Environment

    1.1.1 Creating an Execution Environment

    1.1.2 Execution Mode

    1.1.3 Triggering Program Execution

    1.2 Source Operators (Source)

    1.2.1 Preparation

    1.2.2 Three Ways to Read Data (from a Collection, from a File, from Elements)

    1.2.3 Reading Data from a Socket

    1.2.4 Reading Data from Kafka

    1.2.5 Custom Sources

    1.2.6 Data Types Supported by Flink

    1.3 Transformation Operators (Transformation)

    1.3.1 Basic Transformation Operators

    1.3.2 Aggregation Operators (Aggregation)

    1.3.3 User-Defined Functions (UDF)

    1.3.4 Physical Partitioning

    1.4 Sink Operators (Sink)

    1.4.1 Connecting to External Systems

    1.4.2 Writing to Files

    1.4.3 Writing to Kafka

    1.4.4 Writing to Redis

    1.4.5 Writing to Elasticsearch

    1.4.6 Writing to MySQL (JDBC)

    1.4.7 Custom Sink Output

    1.5 Chapter Summary


    1. DataStream API (Basics)

    1.1 Execution Environment

    A Flink program can run in a variety of contexts: it can be executed in a local JVM or submitted to a remote cluster.

    The way code is submitted and run differs between environments. Before submitting a job, we therefore first need to obtain the current Flink execution environment and thereby establish the connection to the Flink framework. Only with this environment context can concrete tasks be scheduled onto the different TaskManagers.

    1.1.1 Creating an Execution Environment

    The first step of every Flink program is to create an execution environment. The environment we need is an object of the StreamExecutionEnvironment class, which is the basis of all Flink programs. In code, the environment is created by calling one of this class's static methods; there are the following three.
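
    The original post omits the code listing here; as a brief sketch of the three static methods (the host, port, and jar path in the remote case are placeholder values):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class EnvironmentDemo {
        public static void main(String[] args) {
            // 1. Let Flink decide: returns a local environment when run in an IDE,
            //    and the cluster environment when the job is submitted to a cluster.
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // 2. Explicitly create a local environment.
            StreamExecutionEnvironment localEnv =
                    StreamExecutionEnvironment.createLocalEnvironment();

            // 3. Explicitly create a remote environment: JobManager host, port,
            //    and the jar(s) to ship to the cluster (placeholder values).
            StreamExecutionEnvironment remoteEnv =
                    StreamExecutionEnvironment.createRemoteEnvironment(
                            "host", 1234, "path/to/jarFile.jar");
        }
    }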

    1.1.2 Execution Mode

     

     In the example code that follows, we will use STREAMING mode uniformly even for bounded data sources. Our main goal is to build programs that process streaming data in real time; bounded sources are only used here for testing.
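
    The post does not show how the mode is chosen; a small sketch, assuming Flink 1.12+ where RuntimeExecutionMode is available:

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ExecutionModeDemo {
        public static void main(String[] args) {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // AUTOMATIC / STREAMING / BATCH; STREAMING is the default.
            env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

            // Alternatively, leave the code untouched and pass the mode at submit time:
            //   bin/flink run -Dexecution.runtime-mode=BATCH ...
        }
    }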

    1.1.3 Triggering Program Execution

    After the code is written, you must call the execute() method at the end; only then is the actual computation triggered.
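
    For reference, execute() returns a JobExecutionResult once the job finishes and an overload accepts a job name; a minimal sketch:

    // ... build the pipeline on env first ...
    env.execute();              // or: env.execute("my job name");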

    1.2 Source Operators (Source)

    1.2.1 Preparation

    package com.me.chapter05;

    import java.sql.Timestamp;

    public class Event {
        public String user;
        public String url;
        public Long timestamp;

        public Event() {
        }

        public Event(String user, String url, Long timestamp) {
            this.user = user;
            this.url = url;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "Event{" +
                    "user='" + user + '\'' +
                    ", url='" + url + '\'' +
                    ", timestamp=" + new Timestamp(timestamp) +
                    '}';
        }
    }

    Note that the Event class we defined has the following characteristics:

    ⚫ The class is public.

    ⚫ It has a no-argument constructor.

    ⚫ All fields are public.

    ⚫ All field types are serializable.

    Flink treats classes like this as a special POJO data type, which makes data parsing and serialization convenient. We also override the toString method so that test output is easier to read. The data types supported by Flink are discussed in detail in a later section.

    This custom Event POJO will be used frequently in the code that follows; whenever Event appears later, simply import this POJO class.

    Note: a good Java practice is to override toString for every class you write.

    1.2.2 Three Ways to Read Data (from a Collection, from a File, from Elements)

    package com.me.chapter05;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import java.util.ArrayList;

    public class SourceTest {
        public static void main(String[] args) throws Exception {
            // Create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            // 1. Read data from a file (the most common case)
            DataStreamSource<String> stream1 = env.readTextFile("input/clicks.txt");

            // 2. Read data from a collection
            ArrayList<Integer> nums = new ArrayList<>();
            nums.add(2);
            nums.add(5);
            DataStream<Integer> numStream = env.fromCollection(nums);

            ArrayList<Event> events = new ArrayList<>();
            events.add(new Event("Mary", "./home", 1000L));
            events.add(new Event("Bob", "./cart", 2000L));
            DataStream<Event> stream2 = env.fromCollection(events);

            // 3. Read data from elements
            DataStream<Event> stream3 = env.fromElements(
                    new Event("Mary", "./home", 1000L),
                    new Event("Bob", "./cart", 2000L)
            );

            stream1.print("1");
            numStream.print("nums");
            stream2.print("2");
            stream3.print("3");

            env.execute();
        }
    }

    All three of these methods read bounded streams.

    1.2.3 Reading Data from a Socket
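
    The post leaves this section empty. A minimal sketch, with "localhost" and port 7777 as placeholder values (for example a socket opened with nc -lk 7777):

    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SourceSocketTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            // Read an unbounded stream of text lines from a socket
            DataStreamSource<String> stream = env.socketTextStream("localhost", 7777);

            stream.print();
            env.execute();
        }
    }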

    1.2.4 Reading Data from Kafka

    First, add the Kafka connector dependency to the project's pom.xml:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    Then call env.addSource(), passing in a FlinkKafkaConsumer instance.

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    import java.util.Properties;

    public class SourceKafkaTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "hadoop102:9092");
            properties.setProperty("group.id", "consumer-group");
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("auto.offset.reset", "latest");

            DataStreamSource<String> stream = env.addSource(new FlinkKafkaConsumer<String>(
                    "clicks",
                    new SimpleStringSchema(),
                    properties
            ));

            stream.print("Kafka");
            env.execute();
        }
    }

    1.2.5 Custom Sources

    package com.me.chapter05;

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    import java.util.Calendar;
    import java.util.Random;

    public class ClickSource implements SourceFunction<Event> {
        // Flag that controls whether data generation keeps running
        private Boolean running = true;

        @Override
        public void run(SourceContext<Event> ctx) throws Exception {
            Random random = new Random(); // randomly pick values from the predefined data sets
            String[] users = {"Mary", "Alice", "Bob", "Cary"};
            String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};
            while (running) {
                ctx.collect(new Event(
                        users[random.nextInt(users.length)],
                        urls[random.nextInt(urls.length)],
                        Calendar.getInstance().getTimeInMillis()
                ));
                // Emit one click event per second so the output is easy to observe
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            running = false; // stop the generation loop when the source is cancelled
        }
    }
    package com.me.chapter05;

    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

    import java.util.Random;

    public class SourceCustomTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            // A plain SourceFunction like ClickSource can only run with parallelism 1
            // DataStreamSource<Event> customStream = env.addSource(new ClickSource());
            DataStreamSource<Integer> customStream = env.addSource(new ParallelCustomSource()).setParallelism(2);

            customStream.print();
            env.execute();
        }

        // A custom ParallelSourceFunction, which allows a parallelism greater than 1
        // (defined here as a static inner class rather than a separate file)
        public static class ParallelCustomSource implements ParallelSourceFunction<Integer> {
            private Boolean running = true;
            private Random random = new Random();

            @Override
            public void run(SourceContext<Integer> ctx) throws Exception {
                while (running) {
                    ctx.collect(random.nextInt());
                }
            }

            @Override
            public void cancel() {
                running = false; // stop emitting when the source is cancelled
            }
        }
    }

    1.2.6 Data Types Supported by Flink

    We have now seen how Flink reads data from different sources. In the code so far, our data has always been the Event type we defined ourselves, and the previous section spelled out the requirements for that class. Are there other, more flexible types we could use? Exactly which data types does Flink support?
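
    The post does not include example code for this section; as a small illustration of Flink's type-hint mechanism (reusing the Event class defined earlier), a lambda whose result type is generic, such as a Tuple2, needs an explicit hint via returns():

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TypeHintTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            env.fromElements(new Event("Mary", "./home", 1000L))
                    // Because of Java type erasure, Flink cannot infer the Tuple2's
                    // field types from the lambda, so we supply them with returns().
                    .map(event -> Tuple2.of(event.user, 1L))
                    .returns(Types.TUPLE(Types.STRING, Types.LONG))
                    .print();

            env.execute();
        }
    }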

    1.3 Transformation Operators (Transformation)

    1.3.1 Basic Transformation Operators

    package com.me.chapter05;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TransformMapTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            DataStreamSource<Event> stream = env.fromElements(
                    new Event("Mary", "./home", 1000L),
                    new Event("Bob", "./cart", 2000L)
            );

            // Pass in an anonymous class implementing MapFunction
            stream.map(new MapFunction<Event, String>() {
                @Override
                public String map(Event e) throws Exception {
                    return e.user;
                }
            });

            // Pass in an implementation class of MapFunction
            stream.map(new UserExtractor()).print();

            env.execute();
        }

        // A separately defined static class
        public static class UserExtractor implements MapFunction<Event, String> {
            @Override
            public String map(Event e) throws Exception {
                return e.user;
            }
        }
    }

    In the code above, the generic types of the MapFunction implementation correspond to the input and output data types. When implementing the MapFunction interface you must specify two type parameters, the input event type and the output event type, and override the map() method, which defines how one input event is transformed into one output event.

    A careful reader browsing the Flink source code will also notice that calling map on a DataStream actually returns a SingleOutputStreamOperator:

    public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper)

    This shows that map is a user-definable transformation operator: it acts on one data stream, and the result of the transformation has a definite output type. Since SingleOutputStreamOperator itself extends DataStream, it is still entirely correct to say that map transforms one DataStream into another DataStream.

    package com.me.chapter05;

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TransformFilterTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            DataStreamSource<Event> stream = env.fromElements(
                    new Event("Mary", "./home", 1000L),
                    new Event("Bob", "./cart", 2000L)
            );

            // Pass in an anonymous class implementing FilterFunction
            stream.filter(new FilterFunction<Event>() {
                @Override
                public boolean filter(Event e) throws Exception {
                    return e.user.equals("Mary");
                }
            });

            // Pass in an implementation class of FilterFunction
            stream.filter(new UserFilter()).print();

            env.execute();
        }

        public static class UserFilter implements FilterFunction<Event> {
            @Override
            public boolean filter(Event e) throws Exception {
                return e.user.equals("Mary");
            }
        }
    }

    package com.me.chapter05;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    public class TransformFlatMapTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            DataStreamSource<Event> stream = env.fromElements(
                    new Event("Mary", "./home", 1000L),
                    new Event("Bob", "./cart", 2000L)
            );

            stream.flatMap(new MyFlatMap()).print();

            env.execute();
        }

        public static class MyFlatMap implements FlatMapFunction<Event, String> {
            @Override
            public void flatMap(Event value, Collector<String> out) throws Exception {
                if (value.user.equals("Mary")) {
                    out.collect(value.user);
                } else if (value.user.equals("Bob")) {
                    out.collect(value.user);
                    out.collect(value.url);
                }
            }
        }
    }

    1.3.2 Aggregation Operators (Aggregation)

    package com.me.chapter05;

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TransformSimpleTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            DataStreamSource<Event> stream = env.fromElements(
                    new Event("Mary", "./home", 1000L),
                    new Event("Bob", "./cart", 2000L)
            );

            // Using a lambda expression
            KeyedStream<Event, String> keyedStream = stream.keyBy(e -> e.user);

            // Using an anonymous class implementing KeySelector
            KeyedStream<Event, String> keyedStream1 = stream.keyBy(new KeySelector<Event, String>() {
                @Override
                public String getKey(Event e) throws Exception {
                    return e.user;
                }
            });

            env.execute();
        }
    }

    For example, here is a test that aggregates a stream of tuples:

    package com.me.chapter05;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TransTupleAggreationTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            DataStreamSource<Tuple2<String, Integer>> stream = env.fromElements(
                    Tuple2.of("a", 1),
                    Tuple2.of("a", 3),
                    Tuple2.of("b", 3),
                    Tuple2.of("b", 4)
            );

            stream.keyBy(r -> r.f0).sum(1).print();
            stream.keyBy(r -> r.f0).sum("f1").print();
            stream.keyBy(r -> r.f0).max(1).print();
            stream.keyBy(r -> r.f0).max("f1").print();
            stream.keyBy(r -> r.f0).min(1).print();
            stream.keyBy(r -> r.f0).min("f1").print();
            stream.keyBy(r -> r.f0).maxBy(1).print();
            stream.keyBy(r -> r.f0).maxBy("f1").print();
            stream.keyBy(r -> r.f0).minBy(1).print();
            stream.keyBy(r -> r.f0).minBy("f1").print();

            env.execute();
        }
    }

    If the stream's element type is a POJO, however, the aggregation field can only be specified by name, not by position.

    package com.me.chapter05;

    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TransPojoAggregationTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            DataStreamSource<Event> stream = env.fromElements(
                    new Event("Mary", "./home", 1000L),
                    new Event("Bob", "./cart", 2000L)
            );

            stream.keyBy(e -> e.user).max("timestamp").print(); // specify the field by name

            env.execute();
        }
    }

     

    package com.me.chapter05;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TransReduceTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            // ClickSource is the custom source defined in the "Custom Sources" section
            env.addSource(new ClickSource())
                    // convert each Event into a tuple
                    .map(new MapFunction<Event, Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> map(Event e) throws Exception {
                            return Tuple2.of(e.user, 1L);
                        }
                    })
                    .keyBy(r -> r.f0) // key the stream by user name
                    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> reduce(Tuple2<String, Long> value1,
                                                           Tuple2<String, Long> value2) throws Exception {
                            // for every incoming record, increment the user's pv count by 1
                            return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                        }
                    })
                    .keyBy(r -> true) // assign the same key to every record so all results end up in one stream
                    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> reduce(Tuple2<String, Long> value1,
                                                           Tuple2<String, Long> value2) throws Exception {
                            // keep the record with the current maximum pv count and forward it downstream
                            return value1.f1 > value2.f1 ? value1 : value2;
                        }
                    })
                    .print();

            env.execute();
        }
    }

    Like the simple aggregation operators, reduce keeps state for every key. Because this state is never cleared, reduce should only be applied to streams with a finite key space.

    1.3.3 User-Defined Functions (UDF)

     

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TransFunctionUDFTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            DataStreamSource<Event> clicks = env.fromElements(
                    new Event("Mary", "./home", 1000L),
                    new Event("Bob", "./cart", 2000L)
            );

            DataStream<Event> stream = clicks.filter(new FlinkFilter());

            stream.print();
            env.execute();
        }

        public static class FlinkFilter implements FilterFunction<Event> {
            @Override
            public boolean filter(Event value) throws Exception {
                return value.url.contains("home");
            }
        }
    }
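
    For comparison, the same filter can also be passed as a lambda expression; a one-line sketch (not part of the original post):

    // equivalent lambda version of the filter above
    clicks.filter(data -> data.url.contains("home")).print();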

     

      

     

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RichFunctionTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(2);

            DataStreamSource<Event> clicks = env.fromElements(
                    new Event("Mary", "./home", 1000L),
                    new Event("Bob", "./cart", 2000L),
                    new Event("Alice", "./prod?id=1", 5 * 1000L),
                    new Event("Cary", "./home", 60 * 1000L)
            );

            // Convert each click event into its timestamp (a long value)
            clicks.map(new RichMapFunction<Event, Long>() {
                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    System.out.println("Subtask with index "
                            + getRuntimeContext().getIndexOfThisSubtask() + " started");
                }

                @Override
                public Long map(Event value) throws Exception {
                    return value.timestamp;
                }

                @Override
                public void close() throws Exception {
                    super.close();
                    System.out.println("Subtask with index "
                            + getRuntimeContext().getIndexOfThisSubtask() + " finished");
                }
            }).print();

            env.execute();
        }
    }

    public class MyFlatMap extends RichFlatMapFunction<IN, OUT> {
        @Override
        public void open(Configuration configuration) {
            // do some initialization work,
            // e.g. establish a connection to MySQL
        }

        @Override
        public void flatMap(IN in, Collector<OUT> out) {
            // read from / write to the database
        }

        @Override
        public void close() {
            // clean-up work, e.g. close the MySQL connection
        }
    }

    Rich function classes also provide the getRuntimeContext() method (used in the first example of this section), which gives access to information about the runtime context, such as the program's parallelism, the task name, and state. This greatly extends what a program can do; in particular, access to state is what gives Flink operators the ability to handle complex business logic. State management and stateful programming in Flink are covered step by step in later chapters.

    1.3.4 Physical Partitioning

    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ShuffleTest {
        public static void main(String[] args) throws Exception {
            // Create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            // Read from the source with parallelism 1
            DataStreamSource<Event> stream = env.addSource(new ClickSource());

            // Shuffle the stream, then print with parallelism 4
            stream.shuffle().print("shuffle").setParallelism(4);

            env.execute();
        }
    }

    The output takes a form like this:

    shuffle:1> Event{user='Bob', url='./cart', timestamp=...}
    shuffle:4> Event{user='Cary', url='./home', timestamp=...}
    shuffle:3> Event{user='Alice', url='./fav', timestamp=...}
    shuffle:4> Event{user='Cary', url='./cart', timestamp=...}
    shuffle:3> Event{user='Cary', url='./fav', timestamp=...}
    shuffle:1> Event{user='Cary', url='./home', timestamp=...}
    shuffle:2> Event{user='Mary', url='./home', timestamp=...}
    shuffle:1> Event{user='Bob', url='./fav', timestamp=...}
    shuffle:2> Event{user='Mary', url='./home', timestamp=...}
    ...

    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RebalanceTest {
        public static void main(String[] args) throws Exception {
            // Create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            // Read from the source with parallelism 1
            DataStreamSource<Event> stream = env.addSource(new ClickSource());

            // Repartition round-robin, then print with parallelism 4
            stream.rebalance().print("rebalance").setParallelism(4);

            env.execute();
        }
    }

     

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

    public class RescaleTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            // The rich, parallel version of the source function is used here,
            // so that getRuntimeContext() can be called for runtime context information
            env
                    .addSource(new RichParallelSourceFunction<Integer>() {
                        @Override
                        public void run(SourceContext<Integer> sourceContext) throws Exception {
                            for (int i = 0; i < 8; i++) {
                                // send odd numbers to the parallel subtask with index 1
                                // and even numbers to the subtask with index 0
                                if ((i + 1) % 2 == getRuntimeContext().getIndexOfThisSubtask()) {
                                    sourceContext.collect(i + 1);
                                }
                            }
                        }

                        @Override
                        public void cancel() {
                        }
                    })
                    .setParallelism(2)
                    .rescale()
                    .print().setParallelism(4);

            env.execute();
        }
    }

    With the rescale method used for repartitioning, the output is:

    4> 3
    3> 1
    1> 2
    1> 6
    3> 5
    4> 7
    2> 4
    2> 8

    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class BroadcastTest {
        public static void main(String[] args) throws Exception {
            // Create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            // Read from the source with parallelism 1
            DataStreamSource<Event> stream = env.addSource(new ClickSource());

            // Broadcast the stream, then print with parallelism 4
            stream.broadcast().print("broadcast").setParallelism(4);

            env.execute();
        }
    }

    import org.apache.flink.api.common.functions.Partitioner;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CustomPartitionTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            // Partition the natural numbers by parity
            env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
                    .partitionCustom(new Partitioner<Integer>() {
                        @Override
                        public int partition(Integer key, int numPartitions) {
                            return key % 2;
                        }
                    }, new KeySelector<Integer, Integer>() {
                        @Override
                        public Integer getKey(Integer value) throws Exception {
                            return value;
                        }
                    })
                    .print().setParallelism(2);

            env.execute();
        }
    }

    1.4 Sink Operators (Sink)

    1.4.1 Connecting to External Systems

    1.4.2 Writing to Files

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

    import java.util.concurrent.TimeUnit;

    public class SinkToFileTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4);

            DataStreamSource<Event> stream = env.fromElements(
                    new Event("Mary", "./home", 1000L),
                    new Event("Bob", "./cart", 2000L),
                    new Event("Alice", "./prod?id=100", 3000L),
                    new Event("Alice", "./prod?id=200", 3500L),
                    new Event("Bob", "./prod?id=2", 2500L),
                    new Event("Alice", "./prod?id=300", 3600L),
                    new Event("Bob", "./home", 3000L),
                    new Event("Bob", "./prod?id=1", 2300L),
                    new Event("Bob", "./prod?id=3", 3300L));

            StreamingFileSink<String> fileSink = StreamingFileSink
                    .<String>forRowFormat(new Path("./output"),
                            new SimpleStringEncoder<>("UTF-8"))
                    .withRollingPolicy(
                            DefaultRollingPolicy.builder()
                                    .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                    .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                    .withMaxPartSize(1024 * 1024 * 1024)
                                    .build())
                    .build();

            // Convert each Event to a String and write it to the file sink
            stream.map(Event::toString).addSink(fileSink);

            env.execute();
        }
    }

    1.4.3 Writing to Kafka

    Kafka is a distributed publish/subscribe messaging system that itself deals with streaming data, so it is a natural fit for Flink and is frequently used both as an input source and as an output system for Flink jobs.

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    import java.util.Properties;

    public class SinkToKafkaTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            Properties properties = new Properties();
            properties.put("bootstrap.servers", "hadoop102:9092");

            DataStreamSource<String> stream = env.readTextFile("input/clicks.csv");

            stream.addSink(new FlinkKafkaProducer<String>(
                    "clicks",
                    new SimpleStringSchema(),
                    properties
            ));

            env.execute();
        }
    }

    1.4.4 Writing to Redis

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.redis.RedisSink;
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

    public class SinkToRedisTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            // Configuration for the Redis connection
            FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop102").build();

            env.addSource(new ClickSource())
                    .addSink(new RedisSink<Event>(conf, new MyRedisMapper()));

            env.execute();
        }
    }
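
    The listing above references MyRedisMapper without showing it. A sketch of what it presumably looks like, based on the RedisMapper interface from the Bahir flink-connector-redis project (writing each Event into a Redis hash named "clicks", with the user as field and the url as value); it can be declared as a static inner class of SinkToRedisTest:

    // Sketch: assumed implementation, not shown in the original post
    public static class MyRedisMapper implements RedisMapper<Event> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            // Use HSET: every event becomes a field in the Redis hash "clicks"
            return new RedisCommandDescription(RedisCommand.HSET, "clicks");
        }

        @Override
        public String getKeyFromData(Event e) {
            return e.user; // hash field: the user name
        }

        @Override
        public String getValueFromData(Event e) {
            return e.url;  // hash value: the url the user visited
        }
    }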

    1.4.5 Writing to Elasticsearch

    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
    import org.apache.http.HttpHost;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.Requests;

    import java.sql.Timestamp;
    import java.util.ArrayList;
    import java.util.HashMap;

    public class SinkToEsTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            DataStreamSource<Event> stream = env.fromElements(
                    new Event("Mary", "./home", 1000L),
                    new Event("Bob", "./cart", 2000L),
                    new Event("Alice", "./prod?id=100", 3000L),
                    new Event("Alice", "./prod?id=200", 3500L),
                    new Event("Bob", "./prod?id=2", 2500L),
                    new Event("Alice", "./prod?id=300", 3600L),
                    new Event("Bob", "./home", 3000L),
                    new Event("Bob", "./prod?id=1", 2300L),
                    new Event("Bob", "./prod?id=3", 3300L));

            ArrayList<HttpHost> httpHosts = new ArrayList<>();
            httpHosts.add(new HttpHost("hadoop102", 9200, "http"));

            // Create an ElasticsearchSinkFunction
            ElasticsearchSinkFunction<Event> elasticsearchSinkFunction = new ElasticsearchSinkFunction<Event>() {
                @Override
                public void process(Event element, RuntimeContext ctx, RequestIndexer indexer) {
                    HashMap<String, String> data = new HashMap<>();
                    data.put(element.user, element.url);

                    IndexRequest request = Requests.indexRequest()
                            .index("clicks")
                            .type("type") // a type must be specified for ES 6; it can be dropped for ES 7
                            .source(data);

                    indexer.add(request);
                }
            };

            stream.addSink(new ElasticsearchSink.Builder<Event>(httpHosts, elasticsearchSinkFunction).build());

            env.execute();
        }
    }

    1.4.6 Writing to MySQL (JDBC)

    package com.me.chapter05;

    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
    import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
    import org.apache.flink.connector.jdbc.JdbcSink;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SinkToMySQL {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            DataStreamSource<Event> stream = env.fromElements(
                    new Event("Mary", "./home", 1000L),
                    new Event("Bob", "./cart", 2000L),
                    new Event("Alice", "./prod?id=100", 3000L),
                    new Event("Alice", "./prod?id=200", 3500L),
                    new Event("Bob", "./prod?id=2", 2500L),
                    new Event("Alice", "./prod?id=300", 3600L),
                    new Event("Bob", "./home", 3000L),
                    new Event("Bob", "./prod?id=1", 2300L),
                    new Event("Bob", "./prod?id=3", 3300L));

            stream.addSink(
                    JdbcSink.sink(
                            "INSERT INTO clicks (user, url) VALUES (?, ?)",
                            (statement, r) -> {
                                statement.setString(1, r.user);
                                statement.setString(2, r.url);
                            },
                            JdbcExecutionOptions.builder()
                                    .withBatchSize(1000)
                                    .withBatchIntervalMs(200)
                                    .withMaxRetries(5)
                                    .build(),
                            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                    .withUrl("jdbc:mysql://localhost:3306/userbehavior")
                                    // for MySQL 5.7, use "com.mysql.jdbc.Driver"
                                    .withDriverName("com.mysql.cj.jdbc.Driver")
                                    .withUsername("username")
                                    .withPassword("password")
                                    .build()
                    )
            );

            env.execute();
        }
    }

    1.4.7 Custom Sink Output

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;

    import java.nio.charset.StandardCharsets;

    public class SinkCustomtoHBase {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            env
                    .fromElements("hello", "world")
                    .addSink(
                            new RichSinkFunction<String>() {
                                // HBase configuration; the fully qualified name is used here
                                // because it clashes with Flink's own Configuration class
                                public org.apache.hadoop.conf.Configuration configuration;
                                public Connection connection; // the HBase connection

                                @Override
                                public void open(Configuration parameters) throws Exception {
                                    super.open(parameters);
                                    configuration = HBaseConfiguration.create();
                                    configuration.set("hbase.zookeeper.quorum", "hadoop102:2181");
                                    connection = ConnectionFactory.createConnection(configuration);
                                }

                                @Override
                                public void invoke(String value, Context context) throws Exception {
                                    Table table = connection.getTable(TableName.valueOf("test")); // table named "test"
                                    Put put = new Put("rowkey".getBytes(StandardCharsets.UTF_8)); // the rowkey
                                    put.addColumn("info".getBytes(StandardCharsets.UTF_8)   // column family
                                            , value.getBytes(StandardCharsets.UTF_8)        // column qualifier: the incoming string
                                            , "1".getBytes(StandardCharsets.UTF_8));        // cell value
                                    table.put(put);  // execute the put
                                    table.close();   // close the table
                                }

                                @Override
                                public void close() throws Exception {
                                    super.close();
                                    connection.close(); // close the connection
                                }
                            }
                    );

            env.execute();
        }
    }

    Note: you need to add the HBase client dependency yourself and match its version to your HBase installation; the original document does not include it.
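
    As a sketch, the dependency to add looks like this (hbase.version is a placeholder property you should set to match your own cluster):

    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <!-- placeholder: define hbase.version to match your HBase installation -->
        <version>${hbase.version}</version>
    </dependency>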

    1.5 Chapter Summary

  • Original post: https://blog.csdn.net/qq_64557330/article/details/127173044