flink 1.15.1集群安装部署
| 节点 | 服务 |
|---|---|
| lsyk01 | master |
| lsyk02 | worker |
| lsyk03 | worker |
| lsyk04 | worker |
java 11.0.15.1
hadoop:3.3.3
rhel:7.8
解压安装
tar -xvf flink-1.15.1-bin-scala_2.12.tgz -C /opt
# 创建目录
mkdir -p /opt/flink-1.15.1/tmp
mkdir -p /opt/flink-1.15.1/pids
mkdir -p /opt/flink-1.15.1/usrlib
配置maseters、works
# 配置masters
vi /opt/flink-1.15.1/conf/masters
# 修改为
lsyk01:8081
# 配置works
vi /opt/flink-1.15.1/conf/workers
# 修改为
lsyk01
lsyk02
lsyk03
配置flink-conf.yaml
# master地址
jobmanager.rpc.address: lsyk01
# master端口,不用改
jobmanager.rpc.port: 6123
# master地址绑定设置(master节点参数)
jobmanager.bind-host: 0.0.0.0
# worker地址绑定设置
taskmanager.bind-host: 0.0.0.0
# worker地址(注意:三个worker节点的host不一样)
taskmanager.host: lsyk02
# worker槽位数设置,默认为 1
taskmanager.numberOfTaskSlots: 2
# 默认并行度,默认为 1
parallelism.default: 2
# WEB UI 端口(master节点参数) 放开注释即可
rest.port: 8081
# WEB UI 管理地址
rest.address: lsyk01
# WEB UI 地址绑定设置,想让外部访问,可以设置具体的IP,或者直接设置成“0.0.0.0”(master节点参数)
rest.bind-address: 0.0.0.0
# IO临时目录,默认:/tmp
io.tmp.dirs: /opt/flink-1.15.1/tmp
# 集群节点进程ID存放目录,默认:/tmp
env.pid.dir: /opt/flink-1.15.1/pids
# Job文件目录(master节点参数)
web.upload.dir: /opt/flink-1.15.1/usrlib
分发到其他节点
scp -r /opt/flink-1.15.1 lsyk02:/opt
scp -r /opt/flink-1.15.1 lsyk03:/opt
scp -r /opt/flink-1.15.1 lsyk04:/opt
每个worker节点修改:taskmanager.host: 参数
启动停止
上述配置没有进行yarn配置,所以是以standalone启动。
只能在配置的master节点启动集群
/opt/flink-1.15.1/bin/start-cluster.sh

查看进程 :jps

/opt/flink-1.15.1/bin/stop-cluster.sh

http://lsyk01:8081


提交作业
直接管理页面提交"Submit New Job",上传jar包即可
在将 Flink 任务部署至 YARN 集群之前,需要确认集群是否安装有 Hadoop,保证 Hadoop版本至少在 2.2 以上,并且集群中安装有 HDFS 服务。
配置环境变量,增加环境变量配置如下
vi /etc/profile
# 添加如下
export HADOOP_HOME=/opt/hadoop-3.3.3
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`
export FLINK_HOME=/opt/flink-1.15.1
export PATH=$PATH:$FLINK_HOME/bin
# 分发到其他节点
启动hadoop
# 先启动hadoop,在hadoop的name node节点执行
start-all.sh
pplication Mode将在YARN上启动一个Flink集群,其中应用程序jar的main()方法将在YARN中的JobManager上执行。应用程序一完成,集群就会关闭。
可以通过yarn application -kill 或取消Flink作业手动停止集群。
为了释放Application Mode的全部潜力,请考虑与yarn.provided.lib.dirs配置项一起使用将flink的依赖jar、应用程序jar上传到集群中所有节点都可以访问的位置。
下述操作将使作业提交变得更加轻量级,因为所需的Flink jar和应用程序jar将由指定的远程位置提取,而不是由客户机发送到集群。
# 启动flink,在master节点执行
cd /opt/flink-1.15.1/
#将flink lib及应用的jar上传到hdfs
hadoop fs -mkdir -p hdfs://lsyk01:9000/jars/flink
hadoop fs -copyFromLocal lib/*.jar hdfs://lsyk01:9000/jars/flink/
hadoop fs -mkdir -p hdfs://lsyk01:9000/jars/apps
hadoop fs -copyFromLocal ./examples/streaming/TopSpeedWindowing.jar hdfs://lsyk01:9000/jars/apps/
hadoop fs -copyFromLocal ./examples/batch/WordCount.jar hdfs://lsyk01:9000/jars/apps/
hadoop fs -copyFromLocal ./examples/batch/README.txt hdfs://lsyk01:9000/tmp/
#运行TopSpeedWindowing
./bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://lsyk01:9000/jars/flink/" hdfs://lsyk01:9000/jars/apps/TopSpeedWindowing.jar


点击去:


查看作业
# 列出集群上正在运行的作业,列出jobId、jobName
flink list -t yarn-application -Dyarn.application.id=application_1659191584047_0001

终止作业
#取消任务: jobId
#请注意,取消应用程序集群上的作业将停止该集群。
flink cancel -t yarn-application -Dyarn.application.id=application_1659191584047_0001 142d134309edd4ab858e93b870d996a5
或者在flink的管理界面终止:

运行batch任务
#运行wordcount示例
./bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://lsyk01:9000/jars/flink/" hdfs://lsyk01:9000/jars/apps/WordCount.jar --input hdfs://lsyk01:9000/tmp/README.txt --output hdfs://lsyk01:9000/tmp/wordcount-result10.txt
hadoop fs -ls hdfs://lsyk01:9000/tmp/wordcount-result10.txt
hadoop fs -cat hdfs://lsyk01:9000/tmp/wordcount-result10.txt/1
hadoop fs -cat hdfs://lsyk01:9000/tmp/wordcount-result10.txt/2

Per-Job Cluster Mode将在YARN上启动一个Flink集群,然后在本地运行提供的应用程序jar,最后将JobGraph提交给YARN上的JobManager。如果传递–detached参数,客户端将在提交被接受后停止。
一个任务会对应一个Job,每提交一个作业会根据自身的情况,都会单独向yarn申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行。独享Dispatcher和ResourceManager,按需接受资源申请;适合规模大长时间运行的作业。
一旦作业停止,Flink集群就会停止。
#提交任务
./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar
#列出集群上正在运行的作业, 列出jobId、jobName
./bin/flink list -t yarn-per-job -Dyarn.application.id=application_1659191584047_0005
#取消任务: jobId
#请注意,取消应用程序集群上的作业将停止该集群。
./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_1659191584047_0005 639979328d1e2f5353c7c91deb07ad2a
Application Mode 与 Per-Job Mode的区别