Official site: http://flume.apache.org/
Download apache-flume-1.9.0-bin.tar.gz
$ tar xzvf apache-flume-1.9.0-bin.tar.gz -C /home/hadoop/local
$ cd /home/hadoop/local
$ ln -s apache-flume-1.9.0-bin flume
$ cd /home/hadoop/local/flume/lib
$ rm -f guava-11.0.2.jar  # the bundled old Guava conflicts with the newer Guava shipped by Hadoop 3.x
$ vim /etc/profile.d/my_env.sh
- FLUME_HOME=/home/hadoop/local/flume
- PATH=$PATH:$FLUME_HOME/bin
- export FLUME_HOME PATH
$ source /etc/profile
TailDir Source: resumable reads and multiple directories. Before Flume 1.6 you had to write a custom Source that recorded the read position in each file to get resumable behavior.
Exec Source: collects data in real time, but data is lost if Flume is not running or the shell command fails;
Spooling Directory Source: monitors a directory and supports resuming where it left off;
With events around 1 KB each, a batchSize of 500-1000 is appropriate (the default is 100)
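Putting the notes above together, a TailDir source definition might look like the following sketch (the agent/component names, log paths, and position-file location are illustrative; property names follow the Flume user guide):

```properties
# Hypothetical TailDir source: resumes from the JSON position file after restarts
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /home/hadoop/local/flume/taildir_position.json
# Multiple directories via file groups
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/app1/.*log
a1.sources.r1.filegroups.f2 = /var/log/app2/.*log
# batchSize: events per transaction; 500-1000 suits ~1 KB events (default 100)
a1.sources.r1.batchSize = 500
```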
Read data from a Kafka topic;
Using a Kafka Channel removes the need for a Sink and improves efficiency. A Kafka Channel stores its data in Kafka, so the data is persisted on disk;
With Kafka acting as a data buffer, a Kafka Channel has already written the data into Kafka, so no separate Kafka Sink is needed;
Usage scenarios:
Publish messages to a Kafka topic, acting as a Kafka producer;
Read data from Kafka with a Kafka Source, pass it through a Memory Channel, and output it to the log via a Logger Sink
Kafka Source -- Memory Channel -- Logger Sink
All settings in the configuration files follow the official user guide: https://flume.apache.org/releases/content/1.10.1/FlumeUserGuide.html
$ mkdir /home/hadoop/local/flume/jobs
$ vim /home/hadoop/local/flume/jobs/kafkasource.conf
- a1.sources = r1
- a1.channels = c1
- a1.sinks = k1
-
- a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
- a1.sources.r1.kafka.bootstrap.servers = ns1:9092,ns2:9092,ns3:9092
- a1.sources.r1.kafka.topics = zsoft1
- a1.sources.r1.kafka.consumer.group.id = flume
- a1.sources.r1.useFlumeEventFormat = false
-
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 10000
- a1.channels.c1.transactionCapacity = 1000
-
- a1.sinks.k1.type = logger
-
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafkasource.conf -n a1 -Dflume.root.logger=INFO,console
In another shell, start a Kafka producer:
$ kafka-console-producer.sh --topic zsoft1 --broker-list ns1:9092
Enter:
> hello
> zhangsan
> lisi
The window running flume-ng prints the corresponding events:
- 2022-09-01 17:02:59,624 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{topic=zsoft1, partition=1, offset=4, timestamp=1662022979348} body: 68 65 6C 6C 6F hello }
- 2022-09-01 17:03:09,631 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{topic=zsoft1, partition=0, offset=6, timestamp=1662022988636} body: 7A 68 61 6E 67 73 61 6E zhangsan }
- 2022-09-01 17:03:09,632 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{topic=zsoft1, partition=1, offset=5, timestamp=1662022988636} body: 6C 69 73 69 lisi }
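The `body` shown by the Logger Sink is the raw event bytes in hex, followed by a printable preview. As a quick check, this small standalone snippet (the class and method names are illustrative, not part of Flume) decodes such a hex string as UTF-8:

```java
import java.nio.charset.StandardCharsets;

public class HexBody {
    // Decode a space-separated hex byte string, as printed by LoggerSink,
    // back into a UTF-8 string.
    public static String decode(String hex) {
        String[] parts = hex.trim().split("\\s+");
        byte[] bytes = new byte[parts.length];
        for (int i = 0; i < parts.length; i++) {
            bytes[i] = (byte) Integer.parseInt(parts[i], 16);
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(decode("68 65 6C 6C 6F")); // hello
        System.out.println(decode("6C 69 73 69"));    // lisi
    }
}
```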
Monitor data arriving on a network port, pass it through a Memory Channel, and write it to Kafka via a Kafka Sink
netcat Source -- Memory Channel -- Kafka Sink
$ vim /home/hadoop/local/flume/jobs/kafkasink.conf
- a1.sources = r1
- a1.channels = c1
- a1.sinks = k1
-
- a1.sources.r1.type = netcat
- a1.sources.r1.bind = 0.0.0.0
- a1.sources.r1.port = 6666
-
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 10000
- a1.channels.c1.transactionCapacity = 1000
-
- a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
- a1.sinks.k1.kafka.bootstrap.servers = ns1:9092,ns2:9092,ns3:9092
- a1.sinks.k1.kafka.topic = zsoft1
- a1.sinks.k1.kafka.producer.acks = -1
- # useFlumeEventFormat = true would keep the Flume event headers and store them in Kafka as well
- a1.sinks.k1.useFlumeEventFormat = false
-
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafkasink.conf -n a1 -Dflume.root.logger=INFO,console
In another shell, start a Kafka consumer:
$ kafka-console-consumer.sh --topic zsoft1 --bootstrap-server ns1:9092
In another shell, start nc (on the ns1 server):
$ nc localhost 6666
Type the following:
hello
zhangsan
lisi
The Kafka consumer terminal prints the input:
- hello
- zhangsan
- lisi
Read data from a netcat Source, pass it through an interceptor, and use a multiplexing channel selector to route different content into different memory channels, each feeding its own Kafka Sink, which writes to the corresponding Kafka topic;
This scenario can be simplified using the Kafka Sink itself:
The Kafka Sink reads the topic field from the event header and writes the event to that topic, so events no longer need to be split into different channels on the way in; the interceptor is still required to set the header, and a simple replicating channel selector is sufficient;
netcat Source -- interceptor -- replicating channel selector -- memory Channel -- Kafka Sink
Create a Maven project named flume-interceptor in IDEA
pom.xml
- "1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0modelVersion>
-
- <groupId>org.zsoftgroupId>
- <artifactId>flume-interceptorartifactId>
- <version>1.0-SNAPSHOTversion>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flumegroupId>
- <artifactId>flume-ng-coreartifactId>
- <version>1.9.0version>
- <scope>providedscope>
- dependency>
-
- <dependency>
- <groupId>com.alibabagroupId>
- <artifactId>fastjsonartifactId>
- <version>1.2.62version>
- dependency>
- dependencies>
-
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-compiler-pluginartifactId>
- <version>3.8.1version>
- <configuration>
- <source>1.8source>
- <target>1.8target>
- configuration>
- plugin>
- <plugin>
- <artifactId>maven-assembly-pluginartifactId>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependenciesdescriptorRef>
- descriptorRefs>
- configuration>
- <executions>
- <execution>
- <id>make-assemblyid>
- <phase>packagephase>
- <goals>
- <goal>singlegoal>
- goals>
- execution>
- executions>
- plugin>
- plugins>
- build>
- project>
Create the interceptor:
com.zsoft.flume.interceptor/EventHeaderInterceptor.java
- package com.zsoft.flume.interceptor;
-
- import org.apache.flume.Context;
- import org.apache.flume.Event;
- import org.apache.flume.interceptor.Interceptor;
-
- import java.nio.charset.StandardCharsets;
- import java.util.List;
- import java.util.Map;
-
- public class EventHeaderInterceptor implements Interceptor {
-     public void initialize() {
-
-     }
-
-     /*
-      * Intercept a single event
-      */
-     public Event intercept(Event event) {
-         // 1. Get the event headers
-         Map<String, String> headers = event.getHeaders();
-         // 2. Get the event body
-         byte[] bodyBytes = event.getBody();
-         String body = new String(bodyBytes, StandardCharsets.UTF_8);
-         // 3. Check whether the body contains "zhangsan" or "lisi"
-         if (body.contains("zhangsan")) {
-             headers.put("topic", "zhangsan");
-         } else if (body.contains("lisi")) {
-             headers.put("topic", "lisi");
-         }
-         return event;
-     }
-
-     public List<Event> intercept(List<Event> events) {
-         for (Event event : events) {
-             intercept(event);
-         }
-         return events;
-     }
-
-     public void close() {
-
-     }
-
-     public static class MyBuilder implements Builder {
-
-         @Override
-         public Interceptor build() {
-             return new EventHeaderInterceptor();
-         }
-
-         @Override
-         public void configure(Context context) {
-
-         }
-     }
- }
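The routing rule the interceptor implements can be checked in isolation. The following standalone sketch (the `TopicRouter` class and `routeTopic` method are hypothetical names, not part of the Flume API) reproduces the same decision, with "other" standing in for the Kafka Sink's default topic from the configuration:

```java
// Standalone sketch of the interceptor's routing rule, checkable without
// a Flume deployment. Class and method names are illustrative.
public class TopicRouter {
    // Mirrors EventHeaderInterceptor: bodies containing "zhangsan" get the
    // zhangsan topic header, bodies containing "lisi" get lisi; any other
    // body carries no topic header, so the sink's default topic applies.
    public static String routeTopic(String body) {
        if (body.contains("zhangsan")) {
            return "zhangsan";
        } else if (body.contains("lisi")) {
            return "lisi";
        }
        return "other"; // KafkaSink default topic when no header is set
    }

    public static void main(String[] args) {
        System.out.println(routeTopic("hello"));    // other
        System.out.println(routeTopic("zhangsan")); // zhangsan
        System.out.println(routeTopic("lisi"));     // lisi
    }
}
```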
Package the project:
Maven: clean package
Copy flume-interceptor-1.0-SNAPSHOT.jar from the project's target directory to /home/hadoop/local/flume/lib/ on the ns1 server
$ vim /home/hadoop/local/flume/jobs/kafkasink-topics.conf
- a1.sources = r1
- a1.channels = c1
- a1.sinks = k1
-
- a1.sources.r1.type = netcat
- a1.sources.r1.bind = 0.0.0.0
- a1.sources.r1.port = 6666
-
- a1.sources.r1.interceptors = i1
- a1.sources.r1.interceptors.i1.type = com.zsoft.flume.interceptor.EventHeaderInterceptor$MyBuilder
-
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 10000
- a1.channels.c1.transactionCapacity = 1000
-
- a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
- a1.sinks.k1.kafka.bootstrap.servers = ns1:9092,ns2:9092,ns3:9092
- a1.sinks.k1.kafka.topic = other
- a1.sinks.k1.kafka.producer.acks = -1
- # useFlumeEventFormat = true would keep the Flume event headers and store them in Kafka as well
- a1.sinks.k1.useFlumeEventFormat = false
-
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafkasink-topics.conf -n a1 -Dflume.root.logger=INFO,console
Open 3 new shell windows and consume the three Kafka topics zhangsan, lisi, and other:
Window 1:
$ kafka-console-consumer.sh --topic zhangsan --bootstrap-server ns1:9092
Window 2:
$ kafka-console-consumer.sh --topic lisi --bootstrap-server ns1:9092
Window 3:
$ kafka-console-consumer.sh --topic other --bootstrap-server ns1:9092
In a new shell window, open the port with nc:
$ nc ns1 6666
Type hello: window 3, watching the other topic, prints hello
Type zhangsan: window 1, watching the zhangsan topic, prints zhangsan
Type lisi: window 2, watching the lisi topic, prints lisi
netcat Source -- Kafka Channel -- Logger Sink
This pattern, with both a Source and a Sink around the Kafka Channel, is rarely used
$ vim /home/hadoop/local/flume/jobs/kafkachannel.conf
- a1.sources = r1
- a1.channels = c1
- a1.sinks = k1
-
- a1.sources.r1.type = netcat
- a1.sources.r1.bind = 0.0.0.0
- a1.sources.r1.port = 6666
-
- a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
- a1.channels.c1.kafka.bootstrap.servers = ns1:9092,ns2:9092,ns3:9092
- a1.channels.c1.kafka.topic = zsoft1
- a1.channels.c1.parseAsFlumeEvent = false
-
- a1.sinks.k1.type = logger
-
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafkachannel.conf -n a1 -Dflume.root.logger=INFO,console
Start nc in another shell:
$ nc ns1 6666
Enter:
aaaaaa
bbbbbb
The Flume process prints:
aaaaaa
bbbbbb
No Source: the Kafka Channel reads data directly from Kafka
Kafka Channel -- Logger Sink
$ vim /home/hadoop/local/flume/jobs/kafkachannelnosource.conf
- a1.channels = c1
- a1.sinks = k1
-
- a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
- a1.channels.c1.kafka.bootstrap.servers = ns1:9092,ns2:9092,ns3:9092
- a1.channels.c1.kafka.topic = zsoft1
- a1.channels.c1.parseAsFlumeEvent = false
-
- a1.sinks.k1.type = logger
-
- a1.sinks.k1.channel = c1
$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafkachannelnosource.conf -n a1 -Dflume.root.logger=INFO,console
In another shell, start a Kafka producer:
$ kafka-console-producer.sh --topic zsoft1 --broker-list ns1:9092
> hello
The Flume terminal prints:
2022-09-02 14:25:20,222 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F hello }
Write data directly to Kafka
netcat Source -- Kafka Channel
$ vim /home/hadoop/local/flume/jobs/kafkachannelnosink.conf
- a1.sources = r1
- a1.channels = c1
-
- a1.sources.r1.type = netcat
- a1.sources.r1.bind = 0.0.0.0
- a1.sources.r1.port = 6666
-
- a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
- a1.channels.c1.kafka.bootstrap.servers = ns1:9092,ns2:9092,ns3:9092
- a1.channels.c1.kafka.topic = zsoft1
- a1.channels.c1.parseAsFlumeEvent = false
-
- a1.sources.r1.channels = c1
$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafkachannelnosink.conf -n a1 -Dflume.root.logger=INFO,console
Start a Kafka consumer in another shell window
$ kafka-console-consumer.sh --topic zsoft1 --bootstrap-server ns1:9092
Open nc in another shell window
$ nc ns1 6666
Enter:
hello
The Kafka consumer console prints:
hello