• logstash 消费kafka数据,转发到tcp端口


    1, logstash 配置文件

    [root@host1: ]  cat /opt/logstash/kafka-to-tcp.yml
    input { 
                kafka {
                  bootstrap_servers => "192.168.0.11:9092" #这里可以是kafka集群,如"192.168.149.101:9092,192.168.149.102:9092"
                  consumer_threads => 3 #等于 topic分区数
                  group_id => "logstash_123"
                  #client_id => "logstash1" #注意,多台logstash实例消费同一个topics时,client_id需要指定不同的名字
                  #auto_offset_reset => "latest"
                  auto_offset_reset => "earliest"
                  topics => ["alertTopic1"]
                  codec => json { charset => "UTF-8" }
                 }
    }
    
    filter { 
    				 #删除某些数据:正则取反,根据json字段ruleName字段内容删除数据
    				 if ([ruleName] !~ ".*主机告警.*") {
    					drop {}
    				 } 
    				 
    				 #只保留某些数据:正则匹配,删除其他的数据
    				 #if ([ruleName] =~ ".*主机告警.*") {
    				 #   drop {}
    				 #} 
    				 
    				  mutate {  
    						#删除某些json字段, 修改某些字段内容
    						remove_field => ["eventId","ruleId"]
    						gsub => [
    									"Msg" , "[\r|\n]" , ""                   
    								]
    				 }
    }
    
    output {
                    #输出到命令行窗口,方便调试
                    #stdout{}
    
                    #输出到文件,方便排查告警漏告等问题
                    file {
                            codec =>  json_lines  { charset => "UTF-8" }
                            path => "/tmp/b.log"
                    }
    
                    #输出UMP平台对接指定的ip、端口,以指定的格式推送到UMP集中告警平台
                    tcp {
                            host => "192.168.0.11"
                            port => "514"
                            codec => plain {
                                           format =>"%{TIME} 测试环境--ruleName:%{ruleName},Msg:%{Msg}\n"
                             }
                    }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    2,调试并后台启动

    • ./bin/logstash -f /xx/xx.yml
    [root@host1: ]  cat /usr/lib/systemd/system/logstashtcp.service
    [Unit]
    Description=Logstash
    Requires=network.service
    After=network.service
    
    [Service]
    LimitNOFILE=65536
    LimitMEMLOCK=infinity
    WorkingDirectory=/opt/logstash/
    ExecStart=/bin/sh bin/logstash  -f  kafka-to-tcp.yml
    ExecReload=/bin/kill -HUP $MAINPID
    KillMode=mixed
    SuccessExitStatus=143
    Restart=on-failure
    RestartSec=42s
    
    [Install]
    WantedBy=multi-user.target
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    3, 修改logstash 服务日志路径

     sed -i.bak 's@${sys:ls.logs}@/xx/yy@' config/log4j2.properties 
     重启logstash服务
    
    • 1
    • 2
  • 相关阅读:
    Kotlin 反射获取internal class中的成员变量LiveData并绑定观察
    教学资源共享平台的设计
    数据情报分析EXCEL篇
    C++ 测试框架 GoogleTest 初学者入门篇 乙
    Vue之Vue的介绍&安装&开发实例&生命周期钩子
    学会这些VRay渲染器HDRI照明技巧,轻松搞定3ds Max
    abaqus Isight学习
    基于ReAct机制的AI Agent
    Spring如何处理线程的并发问题?
    Git基本概念
  • 原文地址:https://blog.csdn.net/eyeofeagle/article/details/132718650