• Flink(Pometheus监控)


    简介

    Flink发布监控全流程

    入门

    使用架构图

    特点

    能够监控进程内部的信息

    规范化的数据模型

    所有采集的监控数据均以指标(metric)的形式保存在内置的时间序列数据库当中(TSDB)。所有的样本除了基本的指标名称以外,还包含一组用于描述该样本特征的标签。如下所示:

    1. http_request_status{code='200',content_path='/api/path',environment='produment'} => [value1@timestamp1,value2@timestamp2...]
    2. http_request_status{code='200',content_path='/api/path2',environment='produment'} => [value1@timestamp1,value2@timestamp2...]

    每一条时间序列由指标名称(Metrics Name)以及一组标签(Labels)唯一标识。每条时间序列按照时间的先后顺序存储一系列的样本值。

    • http_request_status:指标名称(Metrics Name)
    • {code='200',content_path='/api/path',environment='produment'}:表示维度的标签,基于这些Labels我们可以方便地对监控数据进行聚合,过滤,裁剪。
    • [value1@timestamp1,value2@timestamp2...]:按照时间的先后顺序 存储的样本值。

    查询语言PromQL

    Prometheus内置了一个强大的数据查询语言PromQL。 通过PromQL可以实现对监控数据的查询、聚合。同时PromQL也被应用于数据可视化(如Grafana)以及告警当中。

    通过PromQL可以轻松回答类似于以下问题:

    • 在过去一段时间中95%应用延迟时间的分布范围?
    • CPU占用率前5位的服务有哪些?

    Prometheus的架构

    官网

    Prometheus - Monitoring system & time series database

    下载地址

    Download | Prometheus

    安装包

    链接:https://pan.baidu.com/s/1pvbFCCLv6XekPk8h6o1nkA 
    提取码:yyds 
    --来自百度网盘超级会员V4的分享

    使用

    解压

    部署情况

    masternode1node2

    prometheus

    pushgateway

    node exporter

    node exporternode exporter

    修改prometheus.yml

    1. scrape_configs:
    2. - job_name: 'prometheus'
    3. static_configs:
    4. - targets: ['master:9090']
    5. # 添加 PushGateway 监控配置
    6. - job_name: 'pushgateway'
    7. static_configs:
    8. - targets: ['master:9091']
    9. labels:
    10. instance: pushgateway
    11. # 添加 Node Exporter 监控配置
    12. - job_name: 'node exporter'
    13. static_configs:
    14. - targets: ['master:9100', 'node1:9100', 'node2:9100']

    参数说明

    • job_name:监控作业的名称
    • static_configs:表示静态目标配置,就是固定从某个target拉取数据
    • targets:指定监控的目标,其实就是从哪儿拉取数据。Prometheus会从http://hadoop202:9090/metrics上拉取数据。

    Prometheus是可以在运行时自动加载配置的。启动时需要添加:--web.enable-lifecycle

     修改配置如图

    分发node_exporter

    ./xsync /home/bigdata/prome/node_exporter-1.2.2.linux-amd64/

    启动

    启动prometheus

    nohup ./prometheus --config.file=prometheus.yml > ./prometheus.log 2>&1 &

    启动pushgateway

    nohup ./pushgateway --web.listen-address=":9091" > ./pushgateway.log 2>&1 &

    启动node_exporter(三台机器都启动)

    ./node_exporter  & 

    访问 prometheus的9090端口

    点击对应的界面进行查看

    使用PromSql

    按时间查询

    node_arp_entries[5m]

    m表示分钟 

    条件查询

    node_arp_entries{device='ens33',instance='node1:9100'}

    使用正则表达式

    node_arp_entries{device=~'^ens33'}

    使用条件

    node_arp_entries{device=~'^ens33'}[1m] offset 10m

    对于历史数据累加

    sum(node_arp_entries{device=~'^ens33'} offset 10m) by(device)

    监控Flink

    添加配置文件

    添加依赖

    1. <dependency>
    2. <groupId>org.apache.flinkgroupId>
    3. <artifactId>flink-metrics-prometheus_2.12artifactId>
    4. <version>1.13.5version>
    5. <scope>providedscope>
    6. dependency>

    打包插件

    1. <build>
    2. <plugins>
    3. <plugin>
    4. <groupId>org.apache.maven.pluginsgroupId>
    5. <artifactId>maven-assembly-pluginartifactId>
    6. <version>3.0.0version>
    7. <configuration>
    8. <descriptorRefs>
    9. <descriptorRef>jar-with-dependenciesdescriptorRef>
    10. descriptorRefs>
    11. configuration>
    12. <executions>
    13. <execution>
    14. <id>make-assemblyid>
    15. <phase>packagephase>
    16. <goals>
    17. <goal>singlegoal>
    18. goals>
    19. execution>
    20. executions>
    21. plugin>
    22. plugins>
    23. build>

    在resource下面添加配置文件

    log4j.properties

    1. monitorInterval=30
    2. # This affects logging for both user code and Flink
    3. rootLogger.level = error
    4. rootLogger.appenderRef.file.ref = MainAppender
    5. # Uncomment this if you want to _only_ change Flink's logging
    6. #logger.flink.name = org.apache.flink
    7. #logger.flink.level = INFO
    8. # The following lines keep the log level of common libraries/connectors on
    9. # log level INFO. The root logger does not override this. You have to manually
    10. # change the log levels here.
    11. logger.akka.name = akka
    12. logger.akka.level = INFO
    13. logger.kafka.name= org.apache.kafka
    14. logger.kafka.level = INFO
    15. logger.hadoop.name = org.apache.hadoop
    16. logger.hadoop.level = INFO
    17. logger.zookeeper.name = org.apache.zookeeper
    18. logger.zookeeper.level = INFO
    19. logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
    20. logger.shaded_zookeeper.level = INFO
    21. # Log all infos in the given file
    22. appender.main.name = MainAppender
    23. appender.main.type = RollingFile
    24. appender.main.append = true
    25. appender.main.fileName = ${sys:log.file}
    26. appender.main.filePattern = ${sys:log.file}.%i
    27. appender.main.layout.type = PatternLayout
    28. appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    29. appender.main.policies.type = Policies
    30. appender.main.policies.size.type = SizeBasedTriggeringPolicy
    31. appender.main.policies.size.size = 100MB
    32. appender.main.policies.startup.type = OnStartupTriggeringPolicy
    33. appender.main.strategy.type = DefaultRolloverStrategy
    34. appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}
    35. # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    36. logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    37. logger.netty.level = OFF

     flink-conf.yaml

    1. ##### 与Prometheus集成配置 #####
    2. metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
    3. # PushGateway的主机名与端口号
    4. metrics.reporter.promgateway.host: master
    5. metrics.reporter.promgateway.port: 9091
    6. ## Flink metric在前端展示的标签(前缀)与随机后缀
    7. metrics.reporter.promgateway.jobName: flink-metrics-ppg
    8. #如果jobName启动二次,那么第二次的时候会有一个随机的名字
    9. metrics.reporter.promgateway.randomJobNameSuffix: true
    10. metrics.reporter.promgateway.deleteOnShutdown: false
    11. #这里表示多久推一次数据
    12. metrics.reporter.promgateway.interval: 15 SECONDS

    启动程序的时候修改配置(由于加了provided,如果不修改配置那么就会加载不到)

    本地测试监控Flink

    传入参数

     对应的应用程序(本地测试)

    1. public class Demo01App {
    2. public static void main(String[] args) throws Exception {
    3. //0 调试取本地配置 ,打包部署前要去掉
    4. // Configuration configuration=new Configuration(); //此行打包部署专用
    5. // String resPath = Thread.currentThread().getContextClassLoader().getResource("flink-conf.yaml").getPath(); //本地调试专用
    6. Configuration configuration = GlobalConfiguration.loadConfiguration("C:\\Users\\zhang\\Desktop"); //本地调试专用
    7. //1. 读取初始化环境
    8. configuration.setString("metrics.reporter.promgateway.jobName","demo01App");
    9. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
    10. // 2. 指定nc的host和port
    11. ParameterTool parameterTool = ParameterTool.fromArgs(args);
    12. String hostname = parameterTool.get("host");
    13. int port = parameterTool.getInt("port");
    14. // 3. 接受socket数据源
    15. DataStreamSource dataStreamSource = env.socketTextStream(hostname, port);
    16. dataStreamSource.print();
    17. //appname
    18. env.execute("demo01App");
    19. }
    20. }

    测试程序

    查看控制台然后可以看到采集过来的数据 

     

    发布集群测试监控Flink

    先启动yarn

    修改linux里面flink的配置

    提交运行

    ./flink run -m node1:34982 -c com.atguigu.prome.app.Demo01App -p 2 ./flink-prome2022-1.1-SNAPSHOT.jar

    使用grafana

    安装

    解压

    tar -zxvf grafana-enterprise-8.1.2.linux-amd64.tar.gz

    启动

    nohup ./bin/grafana-server web > ./grafana.log 2>&1 &

    访问 

    监控Linux 

    先添加数据源

    如果和前一分钟比,它们的时间不在变化那么这个时候说明Flink挂掉了

    flink_jobmanager_job_uptime-flink_jobmanager_job_uptime offset 1m

    导入数据

     

    得到的效果为 

    监控Flink

    修改监控Flink配置文件的问题

    原因:是pushgateway不会主动的清理数据,监控面板的判断有误,如果我们改成现在和过去一分钟的数据进行减法如果等于零,也就是没有数据更新的时候改成complete

    原始值

    absent(flink_jobmanager_job_uptime{job_name="$JobName", job=~"$JobManager", job_id=~"$JobId", instance_id="$InstanceId"} > 0)

     修改后的值为

    absent(flink_jobmanager_job_uptime{job_name="$JobName", job=~"$JobManager", job_id=~"$JobId", instance_id="$InstanceId"} - flink_jobmanager_job_uptime{job_name="$JobName", job=~"$JobManager", job_id=~"$JobId", instance_id="$InstanceId"} offset 1m > 0)

    当程序停止以后可以看到

    添加心跳图

    因为只有图表才能发送报警

    配置查询参数 

    flink_jobmanager_job_uptime - flink_jobmanager_job_uptime offset 1m

     效果图

    Flink监控埋点

    示例程序

    1. public class Demo01App {
    2. public static void main(String[] args) throws Exception {
    3. //0 调试取本地配置 ,打包部署前要去掉
    4. // Configuration configuration=new Configuration(); //此行打包部署专用
    5. String resPath = Thread.currentThread().getContextClassLoader().getResource("flink-conf.yaml").getPath(); //本地调试专用
    6. Configuration configuration = GlobalConfiguration.loadConfiguration("C:\\Users\\zhang\\Desktop"); //本地调试专用
    7. //1. 读取初始化环境
    8. configuration.setString("metrics.r+eporter.promgateway.jobName","demo01App");
    9. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
    10. // 2. 指定nc的host和port
    11. ParameterTool parameterTool = ParameterTool.fromArgs(args);
    12. String hostname = parameterTool.get("host");
    13. int port = parameterTool.getInt("port");
    14. // 3. 接受socket数据源
    15. DataStreamSource dataStreamSource = env.socketTextStream(hostname, port);
    16. dataStreamSource.keyBy(new KeySelector() {
    17. @Override
    18. public String getKey(String s) throws Exception {
    19. return s;
    20. }
    21. }).process(new ProcessFunction() {
    22. Counter counter=null;
    23. @Override
    24. public void open(Configuration parameters) throws Exception {
    25. //TODO 申明埋点
    26. counter = getRuntimeContext().getMetricGroup().addGroup("mycount").counter("mycountTest");
    27. }
    28. @Override
    29. public void processElement(String s, ProcessFunction.Context context, Collector collector) throws Exception {
    30. // TODO 对于埋点的数据进行累加
    31. counter.inc();
    32. collector.collect(s);
    33. }
    34. }).print();
    35. //appname
    36. env.execute("demo01App");
    37. }
    38. }

    使用Prometheus得到指标

    http://master:9091/metrics

    上图可以看到自定义的指标收集到了 

    窗口最大值,求缓存命中率

    思想就是10分钟一个窗口,求出窗口的最大值,和上一个窗口进行减法然后就是10分钟的增量 

    自定义得到的数据

    使用grafana展示自定义指标

    添加图表,把查询Prometheus的查询得到的数据到grafana进行展示

    保存以后得到图标

    pushGetWay定期清理过期数据

    由于pushGetWay在任务挂掉一会不会自动清理掉数据,它是由最新的数据覆盖久数据的形式,如果任务挂了以后,那么就没有新的数据进行覆盖了,这个时候就会有数据的残留,我们得进行处理

    总结

    pushGetWay不会自动的删除过期的数据,Promethus默认保存15天的数据,自己会对每一次拉去过来的数据加上一个时间戳

  • 相关阅读:
    Mysql存储过程与存储函数
    GraphQL & Go,graphql基本知识,go-graphql使用
    ERROR 2003 (HY000) Can‘t connect to MySQL server on ‘localhost3306‘ (10061)解决办法
    批量输出生成宗地图斑界址点成果表,支持自动分页,纯FME实现
    深度学习入门(四十二)计算机视觉——目标检测和边界框
    这么分页,小心有坑
    计算机组成原理期末复习第三章-1(唐朔飞)
    基于Java+SpringBoot+Vue火车订票管理系统的设计与实现 前后端分离【Java毕业设计·文档报告·代码讲解·安装调试】
    Java基础面试题50题
    MySQL删除数据库(DROP DATABASE语句)
  • 原文地址:https://blog.csdn.net/S1124654/article/details/126159497