

注意位置是将来打包文件存放的位置,即我们打包好的文件在这/export/data个目录下寻找
Pom.xml文件中写入



//键入implements Interceptor{} 光标定位到Interceptor alt + enter键选择导入类导入flume的Interceptor即可 import org.apache.flume.interceptor.Interceptor;

//此时会报错,点击红色灯泡,选择 实现方法 就会在下文写出需要Override的四个抽象类

- public class MyInterceptor implements Interceptor {
-
- @Override
-
- //初始化方法
-
- public void initialize() {
-
-
- }
-
- //单个事件拦截
-
- //需求:在event的头部信息中添加标记
-
- //提供给channel selector 选择发送给不同的channel
-
-
-
- @Override
-
- public Event intercept(Event event)
-
- //Map也需要alt + enter 导入
-
- Map<String, String> headers = event.getHeaders();
-
- //输入even.getHeaders().var回车即可自行填充等号前面的变量信息
-
- String log = new String(event.getBody());
-
- //envent.getBody().var自行判断变量类型为byte,为方便使用改为String类型
-
- // 键入new String(envent.getBody()).var回车,然后根据需要自行修改变量名
-
- //判断log开头的第一个字符,字母则发到channel1,数字则发到channel2
-
- char c = log.charAt(0);
-
- //log.charAt(0).var回车即可自行填充等号前面的变量信息
-
- if(c >= '0' && c <= '9'){
-
- headers.put("type","number");
-
- }else if ((c >= 'A' && c<= 'Z') || (c >= 'a' && c <= 'z')){
-
- // 注意字符串类型要使用>=需要用单引号而不能用双引号
-
- headers.put("type","letter");
-
- }
-
- //因为头部信息属性是一个引用数据类型 直接修改对象即可,也可以不调用以下的set方法
-
- event.setHeaders(headers);
-
- //返回event
-
- return event;
-
- }
-
- //批量事件拦截(处理多个event,系统调用这个方法)
-
- @Override
-
- public List<Event> intercept(List<Event> list) {
-
-
-
- for (Event event : list){
-
- intercept(event);
-
- }
-
- return list;
-
- }
-
-
-
- //重写静态内部类Builder
-
- @Override
-
- public void close() {
-
-
- }
-
- public static class Builder implements Interceptor.Builder{
-
- //创建一个拦截器对象
-
- @Override
-
- public Interceptor build() {
-
- return new MyInterceptor();
-
- }
-
- //配置方法
-
- @Override
-
- public void configure(Context context) {
-
-
- }
-
- }
-
- }

打包完成在idea左侧菜单栏 target 中可以看到我们的包

cp /export/data/flume-interceptor-demo/target/flume-interceptor-demo-1.0-SNAPSHOT.jar $FLUME_HOME/lib

|
# agent a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 c2
# Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = node1 a1.sources.r1.port = 44444
# channel selector: multiplexing 多路复用 ;默认为replicating 复制 a1.sources.r1.selector.type = multiplexing # 填写相应inerceptor的header上的key a1.sources.r1.selector.header = type # 分配不同value发送到的channel,number到c2,letter到 c1 a1.sources.r1.selector.mapping.number = c2 a1.sources.r1.selector.mapping.letter = c1 #如果匹配不上默认选择的channel a1.sources.r1.selector.default = c2
#interceptor a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.ljr.flume.MyInterceptor$Builder
# Describe the sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = node1 a1.sinks.k1.port = 4545
a1.sinks.k2.type = avro a1.sinks.k2.hostname = node1 a1.sinks.k2.port = 4546
# Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel a1.sources.r1.channels = c1 c2 # 接收c1中的数据 a1.sinks.k1.channel = c1 # 接收c2中的数据 a1.sinks.k2.channel = c2
|
|
a2.sources = r2 a2.sinks = k2 a2.channels = c2
# Describe/configure the source a2.sources.r2.type = avro a2.sources.r2.bind = node1 # flume1 中sink的输出端口 a2.sources.r2.port = 4545
# Describe the sink a2.sinks.k2.type = logger
# Use a channel which buffers events in memory a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2 |
|
a3.sources = r3 a3.sinks = k3 a3.channels = c3 # Describe/configure the source a3.sources.r3.type = avro a3.sources.r3.bind = node1 # flume1 中sink的输出端口 a3.sources.r3.port = 4546 # Describe the sink a3.sinks.k3.type = logger # Use a channel which buffers events in memory a3.channels.c3.type = memory a3.channels.c3.capacity = 1000 a3.channels.c3.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r3.channels = c3 a3.sinks.k3.channel = c3 |
打开四个窗口,前三个分别运行flume1.conf、flume2.conf、flume3.conf 配置的进程
第四个窗口启用necat,输入内容进行测试
flume-ng agent -c conf/ -f /export/server/flume/job/group2-multiplexing-test/flume1.conf -n a1
flume-ng agent -c conf/ -f /export/server/flume/job/group2-multiplexing-test/flume2.conf -n a2
flume-ng agent -c conf/ -f /export/server/flume/job/group2-multiplexing-test/flume3.conf -n a3
nc nc node1 44444 (flume1.conf中 source 填的主机名或IP地址 和端口号)
第一个窗口报错 ConnectException: 拒绝连接 可先忽略,运行二、三窗口后即可连接


在窗口4中输入数字、字母、符号

分别在窗口二看到输出字母,窗口三输出数字和符号


恭喜,Interceptor起作用!