• Flume(二)


    目录

    日志采集Flume启动停止脚本

    项目经验之Flume组件选型

    消费者Flume配置

    Flume时间戳拦截器

    消费者Flume启动停止脚本

     采集通道启动/停止脚本


    日志采集Flume启动停止脚本

    [doudou@hadoop102 bin]$ vim fl.sh
    
    1. #! /bin/bash
    2. case $1 in
    3. "start"){
    4. for i in hadoop102 hadoop103
    5. do
    6. echo " --------启动 $i 采集flume-------"
    7. ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1 &"
    8. done
    9. };;
    10. "stop"){
    11. for i in hadoop102 hadoop103
    12. do
    13. echo " --------停止 $i 采集flume-------"
    14. ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "
    15. done
    16. };;
    17. esac
    [doudou@hadoop102 bin]$ chmod 777 fl.sh 
    

    项目经验之Flume组件选型

    FileChannel和MemoryChannel区别

    MemoryChannel传输数据速度更快,但因为数据保存在JVM的堆内存中,Agent进程挂掉会导致数据丢失,适用于对数据质量要求不高的需求。

    FileChannel传输速度相对于Memory慢,但数据安全保障高,Agent进程挂掉也可以从失败中恢复数据。

    选型:

    金融类公司、对钱要求非常准确的公司通常会选择FileChannel

    传输的是普通日志信息(京东内部一天丢100万-200万条,这是非常正常的),通常选择MemoryChannel。

    消费者Flume配置

    Flume时间戳拦截器

    在com.atguigu.flume.interceptor包下创建TimeStampInterceptor类

    1. import com.alibaba.fastjson.JSONObject;
    2. import org.apache.flume.Context;
    3. import org.apache.flume.Event;
    4. import org.apache.flume.interceptor.Interceptor;
    5. import java.nio.charset.StandardCharsets;
    6. import java.util.List;
    7. import java.util.Map;
    8. public class TimeStampInterceptor implements Interceptor {
    9. @Override
    10. public void initialize() {
    11. }
    12. @Override
    13. public Event intercept(Event event) {
    14. //获取header
    15. Map headers=event.getHeaders();
    16. //获取body中的ts
    17. byte[] body=event.getBody();//获取body
    18. String log=new String(body, StandardCharsets.UTF_8);//转换编译语言
    19. JSONObject jsonObject=JSONObject.parseObject(log);//处理该日志
    20. String ts=jsonObject.getString("ts");//截取ts
    21. //将ts赋值给timestamp
    22. headers.put("timestamp",ts);
    23. return event;
    24. }
    25. @Override
    26. public List intercept(List list) {
    27. for (Event event:list){
    28. intercept(event);
    29. }
    30. return list;
    31. }
    32. @Override
    33. public void close() {
    34. }
    35. public static class Builder implements Interceptor.Builder{
    36. @Override
    37. public Interceptor build() {
    38. return new TimeStampInterceptor();
    39. }
    40. @Override
    41. public void configure(Context context) {
    42. }
    43. }
    44. }

    打包上传jar包

    1. [doudou@hadoop104 lib]$ rz -E
    2. [doudou@hadoop104 lib]$ ls | grep interceptor
    3. flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar

    在hadoop104的/opt/module/flume/conf目录下创建kafka-flume-hdfs.conf文件

    [doudou@hadoop104 conf]$ vim kafka-flume-hdfs.conf
    
    1. ## 组件
    2. a1.sources=r1
    3. a1.channels=c1
    4. a1.sinks=k1
    5. ## source1
    6. a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    7. a1.sources.r1.batchSize = 5000
    8. a1.sources.r1.batchDurationMillis = 2000
    9. a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    10. a1.sources.r1.kafka.topics=topic_log
    11. a1.sources.r1.interceptors = i1
    12. a1.sources.r1.interceptors.i1.type = com.doudouflume.interceptor.TimeStampInterceptor$Builder
    13. ## channel1
    14. a1.channels.c1.type = file
    15. a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
    16. a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
    17. ## sink1
    18. a1.sinks.k1.type = hdfs
    19. a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
    20. a1.sinks.k1.hdfs.filePrefix = log-
    21. a1.sinks.k1.hdfs.round = false
    22. #控制生成的小文件
    23. a1.sinks.k1.hdfs.rollInterval = 10
    24. a1.sinks.k1.hdfs.rollSize = 134217728
    25. a1.sinks.k1.hdfs.rollCount = 0
    26. ## 控制输出文件是原生文件。
    27. a1.sinks.k1.hdfs.fileType = CompressedStream
    28. a1.sinks.k1.hdfs.codeC = lzop
    29. ## 拼装
    30. a1.sources.r1.channels = c1
    31. a1.sinks.k1.channel= c1

    消费者Flume启动停止脚本

    [doudou@hadoop102 bin]$ vim f2.sh
    
    1. #! /bin/bash
    2. case $1 in
    3. "start"){
    4. for i in hadoop104
    5. do
    6. echo " --------启动 $i 消费flume-------"
    7. ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log2.txt 2>&1 &"
    8. done
    9. };;
    10. "stop"){
    11. for i in hadoop104
    12. do
    13. echo " --------停止 $i 消费flume-------"
    14. ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
    15. done
    16. };;
    17. esac
    [doudou@hadoop102 bin]$ chmod 777 f2.sh 
    

     采集通道启动/停止脚本

    [doudou@hadoop102 bin]$ vim cluster.sh 
    1. #!/bin/bash
    2. case $1 in
    3. "start"){
    4. echo ================== 启动 集群 ==================
    5. #启动 Zookeeper集群
    6. zk.sh start
    7. #启动 Hadoop集群
    8. hdp.sh start
    9. #启动 Kafka采集集群
    10. kf.sh start
    11. #启动 Flume采集集群
    12. f1.sh start
    13. #启动 Flume消费集群
    14. f2.sh start
    15. };;
    16. "stop"){
    17. echo ================== 停止 集群 ==================
    18. #停止 Flume消费集群
    19. f2.sh stop
    20. #停止 Flume采集集群
    21. f1.sh stop
    22. #停止 Kafka采集集群
    23. kf.sh stop
    24. #停止 Hadoop集群
    25. hdp.sh stop
    26. #停止 Zookeeper集群
    27. zk.sh stop
    28. };;
    29. esac
    [doudou@hadoop102 bin]$ chmod 777 cluster.sh

  • 相关阅读:
    Delphi时间戳转日期、日期转时间戳
    设备流程复用
    Talk | ACL‘23 杰出论文,MultiIntruct:通过多模态指令集微调提升VLM的零样本学习
    【从0-1成为架构师】对于架构你要知道的性能那些事
    2.6 动态规划—lc炒股系列
    mysql操作 sql语句中的完整性约束有哪些,主键约束、外键约束、引用完整性约束,主键外键、唯一性
    「经验总结」高效开发,老代码可以这样动
    mysql锁表的原因及mysql行锁
    gRPC使用案例-使用gRPC获取数据库版本
    普中51单片机:串口通信原理与应用指南(八)
  • 原文地址:https://blog.csdn.net/qq_61741696/article/details/126458000