• Idea中flume的Interceptor的编写教程


    1.新建-项目-新建项目

    注意位置是将来打包文件存放的位置,即我们打包好的文件在这/export/data个目录下寻找

    2. 在maven项目中导入依赖

    Pom.xml文件中写入

           

                org.apache.flume

                flume-ng-core

                1.9.0

           

       

    3.创建包(scr-main-java右键-新建-软件包)

    4.创建Java类(右键包名-新建-java类)

    5. 继承(implements)flume 的拦截器接口

    //键入implements Interceptor{} 光标定位到Interceptor alt + enter键选择导入类导入flume的Interceptor即可 import org.apache.flume.interceptor.Interceptor;

        //此时会报错,点击红色灯泡,选择 实现方法 就会在下文写出需要Override的四个抽象类

    6.实现方法

    1. public class MyInterceptor implements Interceptor {
    2.     @Override
    3.     //初始化方法
    4.     public void initialize() {
    5.     }
    6. //单个事件拦截
    7. //需求:在event的头部信息中添加标记
    8.  //提供给channel selector 选择发送给不同的channel
    9.     @Override
    10. public Event intercept(Event event)
    11.        //Map也需要alt + enter 导入
    12.         Map<String, String> headers = event.getHeaders();
    13.         //输入even.getHeaders().var回车即可自行填充等号前面的变量信息
    14.         String log = new String(event.getBody());
    15.         //envent.getBody().var自行判断变量类型为byte,为方便使用改为String类型
    16.         // 键入new String(envent.getBody()).var回车,然后根据需要自行修改变量名
    17.         //判断log开头的第一个字符,字母则发到channel1,数字则发到channel2
    18.         char c = log.charAt(0);
    19.         //log.charAt(0).var回车即可自行填充等号前面的变量信息
    20.         if(c >= '0' && c <= '9'){
    21.             headers.put("type","number");
    22.         }else if ((c >= 'A' && c<= 'Z') || (c >= 'a' && c <= 'z')){
    23.             // 注意字符串类型要使用>=需要用单引号而不能用双引号
    24.             headers.put("type","letter");
    25.         }
    26.         //因为头部信息属性是一个引用数据类型 直接修改对象即可,也可以不调用以下的set方法   
    27.         event.setHeaders(headers);
    28.         //返回event
    29.         return event;
    30.     }
    31.     //批量事件拦截(处理多个event,系统调用这个方法)
    32.     @Override
    33.     public List<Event> intercept(List<Event> list) {
    34.         for (Event event : list){
    35.             intercept(event);
    36.         }
    37.         return list;
    38.     }
    39. //重写静态内部类Builder
    40.     @Override
    41.     public void close() {
    42. }
    43. public static class  Builder implements Interceptor.Builder{
    44.        //创建一个拦截器对象
    45.         @Override
    46.         public Interceptor build() {
    47.             return new MyInterceptor();
    48.         }
    49.        //配置方法
    50.         @Override
    51.         public void configure(Context context) {
    52.         }
    53.     }
    54. }

    7.打包(idea右侧菜单栏maven-生命周期-package)

    打包完成在idea左侧菜单栏 target 中可以看到我们的包

    8.将建好的包复制到flume家目录下的lib中即可使用

    cp /export/data/flume-interceptor-demo/target/flume-interceptor-demo-1.0-SNAPSHOT.jar $FLUME_HOME/lib

    9.测试

     9.1 编辑 flume 配置文件
           vim flume1.conf

    # agent

    a1.sources = r1

    a1.sinks = k1 k2

    a1.channels = c1 c2

    # Describe/configure the source

    a1.sources.r1.type = netcat

    a1.sources.r1.bind = node1

    a1.sources.r1.port = 44444

    # channel selector: multiplexing 多路复用 ;默认为replicating 复制

    a1.sources.r1.selector.type = multiplexing

    # 填写相应inerceptor的header上的key

    a1.sources.r1.selector.header = type

    # 分配不同value发送到的channel,number到c2,letter到 c1

    a1.sources.r1.selector.mapping.number = c2

    a1.sources.r1.selector.mapping.letter = c1

    #如果匹配不上默认选择的channel

    a1.sources.r1.selector.default = c2

    #interceptor

    a1.sources.r1.interceptors = i1

    a1.sources.r1.interceptors.i1.type = com.ljr.flume.MyInterceptor$Builder

    # Describe the sink

    a1.sinks.k1.type = avro

    a1.sinks.k1.hostname = node1

    a1.sinks.k1.port = 4545

    a1.sinks.k2.type = avro

    a1.sinks.k2.hostname = node1

    a1.sinks.k2.port = 4546

    # Use a channel which buffers events in memory

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

    a1.channels.c2.type = memory

    a1.channels.c2.capacity = 1000

    a1.channels.c2.transactionCapacity = 100

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1 c2

    # 接收c1中的数据

    a1.sinks.k1.channel = c1

    # 接收c2中的数据

    a1.sinks.k2.channel = c2

       vim flume2.conf

    a2.sources = r2

    a2.sinks = k2

    a2.channels = c2

    # Describe/configure the source

    a2.sources.r2.type = avro

    a2.sources.r2.bind = node1

    # flume1 中sink的输出端口

    a2.sources.r2.port = 4545

    # Describe the sink

    a2.sinks.k2.type = logger

    # Use a channel which buffers events in memory

    a2.channels.c2.type = memory

    a2.channels.c2.capacity = 1000

    a2.channels.c2.transactionCapacity = 100

    # Bind the source and sink to the channel

    a2.sources.r2.channels = c2

    a2.sinks.k2.channel = c2

    vim flume3.conf

    a3.sources = r3

    a3.sinks = k3

    a3.channels = c3

    # Describe/configure the source

    a3.sources.r3.type = avro

    a3.sources.r3.bind = node1

    # flume1 中sink的输出端口

    a3.sources.r3.port = 4546

    # Describe the sink

    a3.sinks.k3.type = logger

    # Use a channel which buffers events in memory

    a3.channels.c3.type = memory

    a3.channels.c3.capacity = 1000

    a3.channels.c3.transactionCapacity = 100

    # Bind the source and sink to the channel

    a3.sources.r3.channels = c3

    a3.sinks.k3.channel = c3

    9.2测试

           打开四个窗口,前三个分别运行flume1.conf、flume2.conf、flume3.conf 配置的进程

    第四个窗口启用necat,输入内容进行测试

    flume-ng agent -c conf/ -f /export/server/flume/job/group2-multiplexing-test/flume1.conf -n a1

    flume-ng agent -c conf/ -f /export/server/flume/job/group2-multiplexing-test/flume2.conf -n a2

    flume-ng agent -c conf/ -f /export/server/flume/job/group2-multiplexing-test/flume3.conf -n a3

    nc nc node1 44444  (flume1.conf中 source 填的主机名或IP地址 和端口号)

    第一个窗口报错 ConnectException: 拒绝连接 可先忽略,运行二、三窗口后即可连接

    在窗口4中输入数字、字母、符号

    分别在窗口二看到输出字母,窗口三输出数字和符号

    恭喜,Interceptor起作用!

  • 相关阅读:
    java.util.Date在json的格式化上的研究与成果
    Linux 用户管理操作指令
    【Web漏洞探索】外部实体注入漏洞
    腾讯云双11服务器优惠活动价格表预热!
    意大利Eni公司将与法国PASQAL公司共同研究能源量子计算
    MySQL基础篇之多表查询(内连接、外连接、自连接、子查询、union)
    [Web Server]Tomcat调优之SpringBoot内嵌Tomcat源码分析
    springboot:slf4j+logback日志
    node笔记记录35ES模块化规范3
    Python算法——树的直径
  • 原文地址:https://blog.csdn.net/v15220/article/details/139047118