开发环境:WIN10+IDEA
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.14.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>2.0.3</slf4j.version>
    <log4j.version>2.17.2</log4j.version>
    <fastjson.version>2.0.19</fastjson.version>
    <lombok.version>1.18.24</lombok.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
</dependencies>
log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Hello {
    /**
     * Entry point: builds a bounded stream of four Long values,
     * prints each element, and runs the Flink job.
     *
     * @param args unused command-line arguments
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // Obtain the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run with a single parallel task so console output order is stable
        env.setParallelism(1);
        // Bounded source built from literal elements
        DataStreamSource<Long> source = env.fromElements(1L, 2L, 3L, 4L);
        //--------------------------------- Transform ----------------------------------------
        source.print();
        //--------------------------------- Transform ----------------------------------------
        env.execute();
    }
}
d.map(s -> s + 1L).print();
Use `returns(...)` to specify the generic output type explicitly.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
// flatMap: emit two outputs per input element (value² and -value²).
// The lambda erases the generic type information at compile time, so
// returns(Types.LONG) is required to tell Flink the output type.
d
.flatMap((FlatMapFunction<Long, Long>) (value, out) -> {
out.collect(value * value);
out.collect(-value * value);
})
.returns(Types.LONG)
.print();
d.filter(i -> (i % 2 == 0)).print();
// Create three source streams of the same element type
DataStreamSource<Integer> d1 = env.fromElements(1);
DataStreamSource<Integer> d2 = env.fromElements(2, 2);
DataStreamSource<Integer> d3 = env.fromElements(3, 3, 3);
// union: merge the three same-typed streams into one, then print
d1.union(d2).union(d3).print();
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
// Create two streams with different element types
DataStreamSource<Integer> d1 = env.fromElements(1, 2, 3, 4);
DataStreamSource<String> d2 = env.fromElements("a", "b", "c");
// connect: pair the two streams while keeping each side's type
ConnectedStreams<Integer, String> dd = d1.connect(d2);
// Retrieve each side of the connected stream individually
DataStream<Integer> s1 = dd.getFirstInput();
DataStream<String> s2 = dd.getSecondInput();
// Print each side with its own label prefix
s1.print("first");
s2.print("second");
DataStream => KeyedStream
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Hi {
    /**
     * Demonstrates keyBy + reduce: numeric strings are partitioned by
     * the parity of their integer value, and the rolling reduce joins
     * each partition's elements with commas.
     *
     * @param args unused command-line arguments
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // Obtain the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Single parallel task keeps console output deterministic
        env.setParallelism(1);
        // Bounded source of numeric strings
        DataStreamSource<String> source = env.fromElements("1", "4", "5", "2", "3");
        // Partition by parity of the parsed integer (key 0 = even, 1 = odd)
        KeyedStream<String, Integer> keyed = source.keyBy(s -> Integer.parseInt(s) % 2);
        // Rolling reduce: concatenate the values seen so far with commas
        keyed.reduce((left, right) -> left + "," + right).print();
        // Trigger job execution
        env.execute();
    }
}
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Hi {
    /**
     * Demonstrates keyBy + built-in rolling aggregations (sum/max/min)
     * over integers partitioned by parity.
     *
     * @param args unused command-line arguments
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // Obtain the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Single parallel task keeps console output deterministic
        env.setParallelism(1);
        // Bounded source of integers
        DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4, 5);
        // Partition by parity (key 0 = even, key 1 = odd)
        KeyedStream<Integer, Integer> keyed = source.keyBy(n -> n % 2);
        // Field position 0 addresses the whole value for atomic types
        keyed.sum(0).print("sum");
        keyed.max(0).print("max");
        keyed.min(0).print("min");
        // Trigger job execution
        env.execute();
    }
}
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
// process: low-level per-element transformation — converts each Long
// to a String by appending an "L" suffix and emits it via the Collector.
d.process(new ProcessFunction<Long, String>() {
@Override
public void processElement(Long value, Context ctx, Collector<String> out) {
out.collect(value + "L");
}
}).print();
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
// Partition the stream by parity (key 0 = even, key 1 = odd)
KeyedStream<Long, Integer> k = d.keyBy(i -> (int) (i % 2));
// KeyedProcessFunction exposes the current key through the Context;
// it logs the key and emits the value with an "L" suffix.
k.process(new KeyedProcessFunction<Integer, Long, String>() {
@Override
public void processElement(Long value, Context ctx, Collector<String> out) {
System.out.println("当前key:" + ctx.getCurrentKey());
out.collect(value + "L");
}
}).print("输出");