低延迟,高吞吐、结果精准,良好的容错
? 支持事件时间(event-time)和处理时间(processing-time)语义
? 精确一次(exactly-once)的状态一致性保证
? 低延迟,每秒处理数百万个事件,毫秒级延迟
? 与众多常用存储系统的连接
? 高可用,动态扩展,实现7*24小时全天候运行
越顶层越抽象,表达含义越简明,使用越方便
越底层越具体,表达能力越丰富,使用越灵活

?数据模型
-spark采用RDD模型,spark streaming的DStream实际上也就是一组组小批数据RDD的集合
-flink基秀数据模型是数据流,以及事件(Event)序列
?运行时架构
-spark是批计算,将DAG划分为不同的stage,一个完成后才可以计算下一个
-flink是标准的流执行模式,一个事件在一个节点处理完后可以直接发往下一个节
点进行处理













1、通常jobmanager 的配置比 taskmanager,因为干活的是taskmanager
2、并行度不一定比slots小,一定比集群总的slots小





















作业管理器 JobManager作用
1、控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager所控制执行。
2、JobManager会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph)、逻辑数据流图(logical dataflow Graph)和打包了所有的类、库和其它资源的JAR包。
3、JobManager会把JobGraph转换成一个物理层面的数据流图,这个图被叫做"执行图"(ExecutionGraph),包含了所有可以并发执行的任务。
4、JobManager会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(兀skManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的hskManager上。而在运行过程中,JobManager会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。
任务管理器 TaskManager
1、Flink中的工作进程。通常在Flink中会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots),插槽的数量限制了TaskManager能够执行的任务数量。
2、启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。
3、在执行过程中,TgskManager可以跟其它运行同一应用程序的TaskManager交换数据。
资源管理器 ResourceManager
1、主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger插槽是Flink中定义的处理资源单元。
2、Flink为不同的环境和资源管理工具提供了不同资源管理器,比如SRN、Mesos、K8s, 以及Standalone部署。
3、当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分酉已给JobManager。如果ResourceManager没有有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动心TaskManager进程的容器。
分发器
1、可以跨作业运行,它为应用提交提供了REST接口。
2、当一个应用被提交执行时,分发器就会启动并将应用移交给一个
JobManager
3、Dispatcher也会启动一个Web UI,用来方便地展示和监控作业执
行的信息。
4、Dispatcher在架构中可能并不是必需的,这取决于应用提交运行
的方式。














import com.tan.flink.bean.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
public class SourceFromCollection {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream inputDataStream = env.fromCollection(Arrays.asList(
new SensorReading("sensor_1", 1547718199L, 35.8),
new SensorReading("sensor_6", 1547718201L, 15.4),
new SensorReading("sensor_7", 1547718202L, 6.7),
new SensorReading("sensor_10", 1547718205L, 38.1)
));
inputDataStream.print();
env.execute();
}
}
env.readTextFile(path);
pom 依赖
org.apache.flink
flink-connector-kafka-0.11_2.12
1.10.1
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.util.Properties;
public class SourceFromKafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// kafka 配置
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.200.102:9092,192.168.200.102:9092,192.168.200.104:9092");
properties.setProperty("group.id", "flink-kafka");
properties.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
DataStreamSource inputDataStream = env.addSource(new FlinkKafkaConsumer011(
"sensor",
new SimpleStringSchema(),
properties
));
inputDataStream.print();
env.execute();
}
}
需要实现SourceFunction 或者继承SourceFunction的富函数RichSourceFunction
import com.tan.flink.bean.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;
import java.util.UUID;
public class SourceFromCustom {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource inputDataStream = env.addSource(new CustomSource());
inputDataStream.print();
env.execute();
}
public static class CustomSource implements SourceFunction {
boolean running = true;
@Override
public void run(SourceContext sourceContext) throws Exception {
Random random = new Random();
while (running) {
// 每隔 100 秒数据
for (int i = 0; i < 5; i++) {
String id = UUID.randomUUID().toString().substring(0, 8);
long timestamp = System.currentTimeMillis();
double temperature = 60 + random.nextGaussian() * 20;
sourceContext.collect(new SensorReading(id, timestamp, temperature));
Thread.sleep(100L);
}
Thread.sleep(1000L);
}
}
@Override
public void cancel() {
running = false;
}
}
}