• FLUME 安装配置及使用示例


    下载安装

    官网:http://flume.apache.org/

    下载 apache-flume-1.9.0-bin.tar.gz

    下载

    $ tar xzvf apache-flume-1.9.0-bin.tar.gz -C /home/hadoop/local

    $ cd /home/hadoop/local

    $ ln -s apache-flume-1.9.0-bin flume

    删除冲突的 Jar 包

    $ cd /home/hadoop/local/flume/lib

    $ rm -rf guava-11.0.2.jar

    配置环境变量

    $ vim /etc/profile.d/my_env.sh

    1. FLUME_HOME=/home/hadoop/local/flume
    2. PATH=$PATH:$FLUME_HOME/bin
    3. export FLUME_HOME PATH

    $ source /etc/proflie

    Flume 组件选型

    1)Source

    (1)Taildir Source 相比 Exec Source、Spooling Directory Source 的优势

    TailDir Source:断点续传、多目录。Flume1.6 以前需要自己自定义 Source 记录每次读取文件位置,实现断点续传。

    Exec Source:可以实时搜集数据,但是在 Flume 不运行或者 Shell 命令出错的情况下,数据将会丢失;

    Spooling Directory Source:监控目录,支持断点续传;

    (2)batchSize 大小如何设置?

    Event 1K 左右时,500 - 1000 合适(默认为 100)

    (3)KafkaSource

    从 Kafka 的 topic 中读取数据;

    2)Channel

    采用 Kafka Channel,省去了 Sink,提高了效率。KafkaChannel 数据存储在 Kafka 里面,所以数据时存储在磁盘中;

    Kafka Channel

    用 Kafka 作为一个数据缓冲,使用 Kafka Channel 数据已经写入到 Kafka 中了,就不需要再接 Kafka Sink 了;

    可使用的场景:

    • 使用 Flume 的 Source 和 Sink - 它为 events 提供可靠且高可用的通道;
    • 使用 Flume 的 Source 和拦截器,但不用 Sink - 它允许将 Flume 的 events 写入 Kafka 的 topic,以供其他应用程序使用;
    • 使用 Flume 的 Sink,但不用 Source - 它是一种低延迟、容错的方式,可将 events 从 Kafka 发送到 Flume 的 Sink,如 HDFS、HBASE 或 Solr;

    3)Sink

    Kafka Sink

    将消息发布到 Kafka 的 topic 中,相当于 Kafka 的生产者角色;

    实例

    1)Kafka Source

    场景

    通过 Kafka Source 从 Kafka 中获取数据,通过 Memory Channel,通过 Logger Sink 输出到日志文件

    Kafka Source -- Memory Channel -- Logger Sink

    配置

    配置文件中的配置都参考官网文档 https://flume.apache.org/releases/content/1.10.1/FlumeUserGuide.html

    $ mkdir /home/hadoop/local/flume/jobs

    $ vim /home/hadoop/local/flume/jobs/kafkasource.conf

    1. a1.sources = r1
    2. a1.channels = c1
    3. a1.sinks = k1
    4. a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    5. a1.sources.r1.kafka.bootstrap.servers = ns1:9092,ns2:9092,ns3:9092
    6. a1.sources.r1.kafka.topics = zsoft1
    7. a1.sources.r1.kafka.consumer.group.id = flume
    8. a1.sources.r1.useFlumeEventFormat = false
    9. a1.channels.c1.type = memory
    10. a1.channels.c1.capacity = 10000
    11. a1.channels.c1.transactionCapacity = 1000
    12. a1.sinks.k1.type = logger
    13. a1.sources.r1.channels = c1
    14. a1.sinks.k1.channel = c1

    运行 Flume

    $ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafkasource.conf -n a1 -Dflume.root.logger=INFO,console

    测试

    再在另外一个 shell 中启动一个生产者:

    $ kafka-console-producer.sh --topic zsoft1 --broker-list ns1:9092

    输入:

    > hello

    > zhangsan

    > lisi

    在启动 flume-ng 的窗口中看到打印出了相关信息:

    1. 2022-09-01 17:02:59,624 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { header
    2. s:{topic=zsoft1, partition=1, offset=4, timestamp=1662022979348} body: 68 65 6C 6C 6F hello }
    3. 2022-09-01 17:03:09,631 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { header
    4. s:{topic=zsoft1, partition=0, offset=6, timestamp=1662022988636} body: 7A 68 61 6E 67 73 61 6E zhangsan }
    5. 2022-09-01 17:03:09,632 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { header
    6. s:{topic=zsoft1, partition=1, offset=5, timestamp=1662022988636} body: 6C 69 73 69 lisi }

    2)Kafka Sink

    场景

    监控端口数据输入,通过 Memory Channel,通过 Kafka Sink 输出到 Kafka

    netcat Source -- Memory Channel -- Kafka Sink

    配置

    $ vim /home/hadoop/local/flume/jobs/kafkasink.conf

    1. a1.sources = r1
    2. a1.channels = c1
    3. a1.sinks = k1
    4. a1.sources.r1.type = netcat
    5. a1.sources.r1.bind = 0.0.0.0
    6. a1.sources.r1.port = 6666
    7. a1.channels.c1.type = memory
    8. a1.channels.c1.capacity = 10000
    9. a1.channels.c1.transactionCapacity = 1000
    10. a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    11. a1.sinks.k1.kafka.bootstrap.servers = ns1:9092,ns2:9092,ns3:9092
    12. a1.sinks.k1.kafka.topic = zsoft1
    13. a1.sinks.k1.kafka.producer.acks = -1
    14. a1.sinks.k1.useFlumeEventFormat = false # true:保留 header,会将 header 信息也存入 kafka 中
    15. a1.sources.r1.channels = c1
    16. a1.sinks.k1.channel = c1

    运行 FLUME

    $ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafkasink.conf -n a1 -Dflume.root.logger=INFO,console

    测试

    在另一个 shell 中启动一个 Kafka 的消费者:

    $ kafka-console-consumer.sh --topic zsoft1 --bootstrap-server ns1:9092

    在另一个 shell 中启动一个 nc(在 ns1 服务器上):

    $ nc localhost 6666

    在其中输入:

    hello

    zhangsan

    lisi

    在 Kafka 消费者终端中会打印出输入的内容:

    1. hello
    2. zhangsan
    3. lisi

    3)复杂 Kafka Sink(将数据发往多 topic)

    场景

    从 netcat source 获取数据,通过拦截器,通过 multiplexing channel selector 选择器,不同的内容推送到不同的 memory channel 通道中,并进入不同的 Kafka Sink,写入到对应的 Kafka 主题中;

    上面这个场景可通过 Kafka Sink 进行简化:

    Kafka Sink 可通过判断 event header 中的 topic 字段值放入对应的 topic 中;这样就不用在进入 channel 时候就分到不同的 channel 处理了;拦截器还需要有,在拦截器中要加 header 信息,选择器也可以使用简单的 replicating channel selector;

    netcat Source -- 拦截器 -- replicating channel selector -- memory Channel -- Kafka Sink

    拦截器工程

    用 IDEA 创建一个 Maven 工程 flume-interceptor

    pom.xml

    1. "1.0" encoding="UTF-8"?>
    2. <project xmlns="http://maven.apache.org/POM/4.0.0"
    3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    5. <modelVersion>4.0.0modelVersion>
    6. <groupId>org.zsoftgroupId>
    7. <artifactId>flume-interceptorartifactId>
    8. <version>1.0-SNAPSHOTversion>
    9. <dependencies>
    10. <dependency>
    11. <groupId>org.apache.flumegroupId>
    12. <artifactId>flume-ng-coreartifactId>
    13. <version>1.9.0version>
    14. <scope>providedscope>
    15. dependency>
    16. <dependency>
    17. <groupId>com.alibabagroupId>
    18. <artifactId>fastjsonartifactId>
    19. <version>1.2.62version>
    20. dependency>
    21. dependencies>
    22. <build>
    23. <plugins>
    24. <plugin>
    25. <artifactId>maven-compiler-pluginartifactId>
    26. <version>3.8.1version>
    27. <configuration>
    28. <source>1.8source>
    29. <target>1.8target>
    30. configuration>
    31. plugin>
    32. <plugin>
    33. <artifactId>maven-assembly-pluginartifactId>
    34. <configuration>
    35. <descriptorRefs>
    36. <descriptorRef>jar-with-dependenciesdescriptorRef>
    37. descriptorRefs>
    38. configuration>
    39. <executions>
    40. <execution>
    41. <id>make-assemblyid>
    42. <phase>packagephase>
    43. <goals>
    44. <goal>singlegoal>
    45. goals>
    46. execution>
    47. executions>
    48. plugin>
    49. plugins>
    50. build>
    51. project>

    新建拦截器:

    com.zsoft.flume.interceptor/EventHeaderInterceptor.java

    1. package com.zsoft.flume.interceptor;
    2. import org.apache.flume.Context;
    3. import org.apache.flume.Event;
    4. import org.apache.flume.interceptor.Interceptor;
    5. import java.nio.charset.StandardCharsets;
    6. import java.util.List;
    7. import java.util.Map;
    8. public class EventHeaderInterceptor implements Interceptor {
    9. public void initialize() {
    10. }
    11. /*
    12. * 拦截方法
    13. */
    14. public Event intercept(Event event) {
    15. //1.获取event的headers
    16. Map headers = event.getHeaders();
    17. //2.获取event的body
    18. byte[] body1 = event.getBody();
    19. String body = new String(body1, StandardCharsets.UTF_8);
    20. //3.判断是否包含"zhangsan" "lisi"
    21. if (body.contains("zhangsan")){
    22. headers.put("topic","zhangsan");
    23. }else if(body.contains("lisi")){
    24. headers.put("topic","lisi");
    25. }
    26. return event;
    27. }
    28. public List intercept(List events) {
    29. for (Event event : events) {
    30. intercept(event);
    31. }
    32. return events;
    33. }
    34. public void close() {
    35. }
    36. public static class MyBuilder implements Builder{
    37. @Override
    38. public Interceptor build() {
    39. return new EventHeaderInterceptor();
    40. }
    41. @Override
    42. public void configure(Context context) {
    43. }
    44. }
    45. }

    对项目打包:

    Maven:clean package

    将打包后项目 target 目录下的 flume-interceptor-1.0-SNAPSHOT.jar 拷贝到 ns1 服务器 /home/hadoop/local/flume/lib/ 下

    配置

    $ vim /home/hadoop/local/flume/jobs/kafkasink-topics.conf

    1. a1.sources = r1
    2. a1.channels = c1
    3. a1.sinks = k1
    4. a1.sources.r1.type = netcat
    5. a1.sources.r1.bind = 0.0.0.0
    6. a1.sources.r1.port = 6666
    7. a1.sources.r1.interceptors = i1
    8. a1.sources.r1.interceptors.i1.type = com.zsoft.flume.interceptor.EventHeaderInterceptor$MyBuilder
    9. a1.channels.c1.type = memory
    10. a1.channels.c1.capacity = 10000
    11. a1.channels.c1.transactionCapacity = 1000
    12. a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    13. a1.sinks.k1.kafka.bootstrap.servers = ns1:9092,ns2:9092,ns3:9092
    14. a1.sinks.k1.kafka.topic = other
    15. a1.sinks.k1.kafka.producer.acks = -1
    16. a1.sinks.k1.useFlumeEventFormat = false # true:保留 header,会将 header 信息也存入 kafka 中
    17. a1.sources.r1.channels = c1
    18. a1.sinks.k1.channel = c1

    启动 Flume

    $ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafkasink-topics.conf -n a1 -Dflume.root.logger=INFO,console

    测试

    新开 3 个 shell 窗口,分别消费 Kafka 的 3 个 topic:zhangsan、lisi、other:

    窗口1:

    $ kafka-console-consumer.sh --topic zhangsan --bootstrap-server ns1:9092

    窗口2:

    $ kafka-console-consumer.sh --topic lisi --bootstrap-server ns1:9092

    窗口3:

    $ kafka-console-consumer.sh --topic other --bootstrap-server ns1:9092

    再在新的 shell 窗口用 nc 打开端口:

    $ nc ns1 6666

    输入 hello,在窗口 3 监控 other topic 的输出中打印出 hello

    输入 zhangsan,在窗口 1 监控 zhangsan topic 的输出中打印出 zhangsan

    输入 lisi,在窗口 2 监控 lisi topic 的输出中打印出 lisi

    4)Kafka Channel

    场景

    netcat Source -- Kafka Channel -- Logger Sink

    这种既有 Source 又有 Sink 的 Kafka Channel 使用的比较少

    配置

    $ vim /home/hadoop/local/flume/jobs/kafkachannel.conf

    1. a1.sources = r1
    2. a1.channels = c1
    3. a1.sinks = k1
    4. a1.sources.r1.type = netcat
    5. a1.sources.r1.bind = 0.0.0.0
    6. a1.sources.r1.port = 6666
    7. a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    8. a1.channels.c1.kafka.bootstrap.servers = ns1:9092,ns2:9092,ns3:9092
    9. a1.channels.c1.kafka.topic = zsoft1
    10. a1.channels.c1.parseAsFlumeEvent = false
    11. a1.sinks.k1.type = logger
    12. a1.sources.r1.channels = c1
    13. a1.sinks.k1.channel = c1

    启动 Flume

    $ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafkachannel.conf -n a1 -Dflume.root.logger=INFO,console

    测试

    在另外 shell 启动 nc:

    $ nc ns1 6666

    输入:

    aaaaaa

    bbbbbb

    在 Flume 的进程中打印出:

    aaaaaa

    bbbbbb

    5)Kafka Channel no Source

    场景

    没有 Source,Kafka Channel 直接从 Kafka 获取数据

    Kafka Channel -- Logger Sink

    配置

    $ vim /home/hadoop/local/flume/jobs/kafkachannelnosource.conf

    1. a1.channels = c1
    2. a1.sinks = k1
    3. a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    4. a1.channels.c1.kafka.bootstrap.servers = ns1:9092,ns2:9092,ns3:9092
    5. a1.channels.c1.kafka.topic = zsoft1
    6. a1.channels.c1.parseAsFlumeEvent = false
    7. a1.sinks.k1.type = logger
    8. a1.sinks.k1.channel = c1

    启动 Flume

    $ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafkachannelnosource.conf -n a1 -Dflume.root.logger=INFO,console

    测试

    在另一个 shell 中启动一个 Kafka 生产者:

    $ kafka-console-producer.sh --topic zsoft1 --broker-list ns1:9092

    > hello

    在 Flume 终端打印出内容:

    2022-09-02 14:25:20,222 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { header s:{} body: 68 65 6C 6C 6F hello }

    6)Kafka Channel no Sink

    场景

    把数据直接写道 Kafka

    netcat Source -- Kafka Channel

    配置

    $ /home/hadoop/local/flume/jobs/kafkachannelnosink.conf

    1. a1.sources = r1
    2. a1.channels = c1
    3. a1.sources.r1.type = netcat
    4. a1.sources.r1.bind = 0.0.0.0
    5. a1.sources.r1.port = 6666
    6. a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    7. a1.channels.c1.kafka.bootstrap.servers = ns1:9092,ns2:9092,ns3:9092
    8. a1.channels.c1.kafka.topic = zsoft1
    9. a1.channels.c1.parseAsFlumeEvent = false
    10. a1.sources.r1.channels = c1

    启动 Flume

    $ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafkachannelnosink.conf -n a1 -Dflume.root.logger=INFO,console

    测试

    在另一个 shell 窗口中启动一个 Kafka 消费者

    $ kafka-console-consumer.sh --topic zosft1 --bootstrap-server ns1:9092

    在另一个 shell 窗口中打开 nc

    $ nc ns1 6666

    输入:

    hello

    在 Kafka 消费者 console 中打印出

    hello

  • 相关阅读:
    Linux增加/删除文件权限
    socket编程基础
    手把手教你做多重线性逐步回归
    简悦+Logseq 搭建本地化个人知识库
    乐鑫 ESP RainMaker® 系列文章
    科技与环卫的结合,是智慧公厕厂家的使命
    空间变形网络——STN
    SpringBoot+ElasticSearch 实现模糊查询,批量CRUD,排序,分页,高亮!
    本地安装多个node版本,gvnm来安装切换使用。vue2和vue3对node版本要求不一样
    windows nignx 常用操作命令(启动、停止、重启服务)
  • 原文地址:https://blog.csdn.net/zhy0414/article/details/126662860