• flume系列(一)部署示例及组件介绍


    1.flume定义

            Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume 基于流式架构,灵活简单

    2. flume架构组件

    2.1 flume-source

    2.2 flume-channel

    2.3 flume-sink

    2.4 Event

    3. flume示例

    3.1官方demo

    1. #添加内容如下:
    2. # Name the components on this agent
    3. a1.sources = r1
    4. a1.sinks = k1
    5. a1.channels = c1
    6. # Describe/configure the source
    7. a1.sources.r1.type = netcat
    8. a1.sources.r1.bind = localhost
    9. a1.sources.r1.port = 44444
    10. # Describe the sink
    11. a1.sinks.k1.type = logger
    12. # Use a channel which buffers events in memory
    13. # 事务的容量transactionCapacity
    14. # 事件可以保存的数量capacity
    15. a1.channels.c1.type = memory
    16. a1.channels.c1.capacity = 1000
    17. a1.channels.c1.transactionCapacity = 100
    18. # Bind the source and sink to the channel
    19. # source和channels绑定
    20. # sinks和channel绑定
    21. # 一个source可以对应多个channel,一个channel可以对应多个sink
    22. a1.sources.r1.channels = c1
    23. a1.sinks.k1.channel = c1

    3.2 flume读取文件到hdfs

    ./bin/flume-ng agent -n a2 -c conf/ -f configs/flume-file-hdfs.conf -Dflume.root.logger=INFO,console
    1. a2.sources = r2
    2. a2.sinks = k2
    3. a2.channels = c2
    4. # Describe/configure the source
    5. a2.sources.r2.type = exec
    6. a2.sources.r2.command = tail -F /demo.log
    7. # Describe the sink
    8. a2.sinks.k2.type = hdfs
    9. a2.sinks.k2.hdfs.path = hdfs://hadoop11:8020/flume/%Y%m%d/%H
    10. #上传文件的前缀
    11. a2.sinks.k2.hdfs.filePrefix = logs-
    12. #是否按照时间滚动文件夹
    13. a2.sinks.k2.hdfs.round = true
    14. #多少时间单位创建一个新的文件夹
    15. a2.sinks.k2.hdfs.roundValue = 1
    16. #重新定义时间单位
    17. a2.sinks.k2.hdfs.roundUnit = hour
    18. #是否使用本地时间戳
    19. a2.sinks.k2.hdfs.useLocalTimeStamp = true
    20. #积攒多少个 Event 才 flush 到 HDFS 一次
    21. a2.sinks.k2.hdfs.batchSize = 100
    22. #设置文件类型,可支持压缩
    23. a2.sinks.k2.hdfs.fileType = DataStream
    24. #多久生成一个新的文件
    25. a2.sinks.k2.hdfs.rollInterval = 60
    26. #设置每个文件的滚动大小
    27. a2.sinks.k2.hdfs.rollSize = 134217700
    28. #文件的滚动与 Event 数量无关
    29. a2.sinks.k2.hdfs.rollCount = 0
    30. # Use a channel which buffers events in memory
    31. a2.channels.c2.type = memory
    32. a2.channels.c2.capacity = 1000
    33. a2.channels.c2.transactionCapacity = 100
    34. # Bind the source and sink to the channel
    35. a2.sources.r2.channels = c2
    36. a2.sinks.k2.channel = c2

    3.2 flume监测文件目录把文件传到hdfs

     目录中:

    1. 不能放入同名的文件(linux系统不能存在相同的文件 )
    2. 不能放入指定结尾的文件(不能上传对应的文件)
    3. 放入指定的忽略 的文件是无法上传的
    4. 文件动态修改后,是无法识别到的,再次修改后无法上传
    ./bin/flume-ng agent -n a3 -c conf/ -f configs/flume-dir-hdfs.conf -Dflume.root.logger=INFO,console
    1. a3.sources = r3
    2. a3.sinks = k3
    3. a3.channels = c3
    4. # Describe/configure the source
    5. a3.sources.r3.type = spooldir
    6. a3.sources.r3.spoolDir = /opt/upload
    7. a3.sources.r3.fileSuffix = .COMPLETED
    8. a3.sources.r3.fileHeader = true
    9. #忽略所有以.tmp 结尾的文件,不上传
    10. a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
    11. # Describe the sink
    12. a3.sinks.k3.type = hdfs
    13. a3.sinks.k3.hdfs.path = hdfs://hadoop11:8020/flume/upload/%Y%m%d/%H
    14. #上传文件的前缀
    15. a3.sinks.k3.hdfs.filePrefix = upload-
    16. #是否按照时间滚动文件夹
    17. a3.sinks.k3.hdfs.round = true
    18. #多少时间单位创建一个新的文件夹
    19. a3.sinks.k3.hdfs.roundValue = 1
    20. #重新定义时间单位
    21. a3.sinks.k3.hdfs.roundUnit = hour
    22. #是否使用本地时间戳
    23. a3.sinks.k3.hdfs.useLocalTimeStamp = true
    24. #积攒多少个 Event 才 flush 到 HDFS 一次
    25. a3.sinks.k3.hdfs.batchSize = 100
    26. #设置文件类型,可支持压缩
    27. a3.sinks.k3.hdfs.fileType = DataStream
    28. #多久生成一个新的文件
    29. a3.sinks.k3.hdfs.rollInterval = 60
    30. #设置每个文件的滚动大小大概是 128M
    31. a3.sinks.k3.hdfs.rollSize = 134217700
    32. #文件的滚动与 Event 数量无关
    33. a3.sinks.k3.hdfs.rollCount = 0
    34. # Use a channel which buffers events in memory
    35. a3.channels.c3.type = memory
    36. a3.channels.c3.capacity = 1000
    37. a3.channels.c3.transactionCapacity = 100
    38. # Bind the source and sink to the channel
    39. a3.sources.r3.channels = c3
    40. a3.sinks.k3.channel = c3

    3.3 监控目录下的多个追加文件(重要)

    flume-1.7版本后:Taildir Source支持断点续传,适用于多个实时追加的文件

    1. a3.sources = r3
    2. a3.sinks = k3
    3. a3.channels = c3
    4. # Describe/configure the source
    5. a3.sources.r3.type = TAILDIR
    6. a3.sources.r3.positionFile = /opt/module/flume-1.9.0/tail_dir.json
    7. a3.sources.r3.filegroups = f1 f2
    8. a3.sources.r3.filegroups.f1 = /opt/module/flume-1.9.0/upload2/files1/.*file.*
    9. a3.sources.r3.filegroups.f2 = /opt/module/flume-1.9.0/upload2/files2/.*log.*
    10. # Describe the sink
    11. a3.sinks.k3.type = hdfs
    12. a3.sinks.k3.hdfs.path = hdfs://hadoop11:8020/flume/upload2/%Y%m%d/%H
    13. #上传文件的前缀a3.sinks.k3.hdfs.filePrefix = upload-
    14. #是否按照时间滚动文件夹
    15. a3.sinks.k3.hdfs.round = true
    16. #多少时间单位创建一个新的文件夹
    17. a3.sinks.k3.hdfs.roundValue = 1
    18. #重新定义时间单位
    19. a3.sinks.k3.hdfs.roundUnit = hour
    20. #是否使用本地时间戳
    21. a3.sinks.k3.hdfs.useLocalTimeStamp = true
    22. #积攒多少个 Event 才 flush 到 HDFS 一次
    23. a3.sinks.k3.hdfs.batchSize = 100
    24. #设置文件类型,可支持压缩
    25. a3.sinks.k3.hdfs.fileType = DataStream
    26. #多久生成一个新的文件
    27. a3.sinks.k3.hdfs.rollInterval = 60
    28. #设置每个文件的滚动大小大概是 128M
    29. a3.sinks.k3.hdfs.rollSize = 134217700
    30. #文件的滚动与 Event 数量无关
    31. a3.sinks.k3.hdfs.rollCount = 0
    32. # Use a channel which buffers events in memory
    33. a3.channels.c3.type = memory
    34. a3.channels.c3.capacity = 1000
    35. a3.channels.c3.transactionCapacity = 100
    36. # Bind the source and sink to the channel
    37. a3.sources.r3.channels = c3
    38. a3.sinks.k3.channel = c3

  • 相关阅读:
    OpenShift 4 - 利用 RHSSO 实现应用认证和访问授权
    OpenCV图像处理学习十,图像的形态学操作——膨胀腐蚀
    MySQL8.0创建新用户并授权
    将本地jar打包到本地maven仓库或maven私服仓库中
    java基础09
    PID控制电机输出作为电机PWM占空比输入的理解
    迁移 MySQL 数据到 OceanBase 集群
    2022-7-5 随机过程之条件期望及其性质 (三)
    数据结构 冒泡排序 选择排序 学习笔记
    将可调用对象转换为 c 风格指针
  • 原文地址:https://blog.csdn.net/qq_38130094/article/details/126677424