• 一次logstash的实践解锁了如此多的玩法....


    前言

    logstash作为elastic公司elkb体系中的一环,在日志处理中被广泛应用。它可以实现数据传输,数据过滤,格式化输出,还有强大的官方以及自定义插件功能,因而在很久之前就已经闻其大名了。也进行过简单的功能测试,对其有了基本的认知。

    但是真正的生产环境的产品化应用却不是很多,这次终于找到了一个合适的场景来真正使用下这个产品。本来一个简单的需求应该不会掀起什么波澜,但是结果却令我很意外,logstash给我留下了深刻的印象,并为我解锁了很多的玩法。下面我们就来看看全过程,顺便梳理下logstash在使用过程中的一些常规用法以及需要注意的地方。

    方案

    需求

    其实这次的需求很简单,就是把流数据中的部分种类的数据按照一定的规则重新组织,并推送到远端的硬盘按天存储。

    这个需求看起来其实很简单,最直接的方案就是在数据处理的过程中进行分支业务处理,当符合条件时,将数据进行解析并重新组织,通过文件流的方式输出到远端的硬盘即可。

    虽然逻辑很简单,但是考虑到效率,容错以及对主程序性能的影响,直接使用上面的处理方式其实并不是最佳的选择。于是,使用logstash来搞定这件事情的方案就被优先选择了。

    方案

    • 在数据处理主程序中,进行分支处理,将数据拼装好以后直接吐到kafka的topic中,主程序的任务就结束了,剩下的事情就交给logstash来处理

    • logstash读kafka,然后简单的处理数据后,直接输出到file中,问题解决

    • 使用logstash不仅将主程序数据处理与数据落盘解耦合,不会影响到主程序的效率,更重要的是logstash能将性能,容错等这些不太好处理的脏活累活都大包大揽了

    实践

    根据上述的方案,logstash的配置就很显然了。主要的选型如下:

    • input:kafka,即从kafka的topic中读取数据

    • filter:mutate,由于kafka input输出的数据格式有很多多余的字段,所以使用mutate filter来删掉这些字段,用来减少网络IO以及磁盘存储的消耗

    • output:file,即输出到硬盘存储成文件

    准备

    首先就是安装logstash了,安装步骤很简单,把tar包下载到服务器本地,tar命令解压即可。

    这里需要注意下,安装完毕后,需要使用命令查看当前版本的logstash自带插件的情况:

    .\bin\logstash-plugin list
    

    果不其然,并没有发现我需要的kafka相关的plugin,所以第一步是安装kafka相关的插件。

    进入logstash的安装目录,执行下面的命令:

    命令如下:

    1. //在线
    2. .\bin\logstash-plugin install logstash-output-logservice
    3. //离线
    4. .\bin\logstash-plugin install /root/logstash-offline-plugins.zip

    此处需要注意两点:

    • 必须在logstash的安装目录下执行命令,而不能在bin目录下执行,因为该插件托管于RubyGems,需要相关的依赖,而这些依赖都在logstash的安装目录下。

    • 离线安装必须下载logstash-offline-plugins.zip,安装插件时需要依赖此压缩包

    安装完毕后,再次使用上面插件查看的命令,发现kafka的插件安装完毕。

    至此kafka的plugin安装完毕,准备任务完成。

    first config

    环境准备完毕后,接下来就开始编写conf文件了,毕竟所有的业务和处理都依赖于这个配置文件。根据上面的业务需求,第一个配置文件内容如下:

    1. input{
    2.       kafka{
    3.         bootstrap_servers => ["172.16.45.157:9092"]
    4.         client_id => "test"
    5.         group_id => "test"
    6.         auto_offset_reset => "latest" 
    7.         consumer_threads => 5
    8.         topics => ["bb_risk_control"
    9.       }
    10. }
    11. output {
    12.     file {
    13.         ceate_if_deleted=>true
    14.         file_mode=>777
    15.         filename_failure=>"log_error"
    16.         flush_interval=>0
    17.         path=>"/data1/data/%{+YYYYMMdd}.log"
    18.         gzip=>true
    19.     }
    20. }

    第一个配置文件本着先调通流程,看看数据长啥样的目的,所以仅仅加了些常规配置,也没有加filter,然后得到的结果如下:

    1. {
    2.     "event":{
    3.         "original":"{\"country\":\"中国\",\"is_xxxxxx_xxxx\":\"1\",\"city\":\"北京\",\"user_id\":\"admin\",\"ip\":\"172.16.43.157\",\"is_xxxx\":\"0\",\"deviceID\":\"1234567890\",\"xxxxxx_num\":\"1660111513693\",\"dev_xx\":\"7423208c-25fd-3482-98b9-951dfe5af148\",\"is_xxxxxxxx\":\"0\"}"
    4.     },
    5.     "@timestamp":"2022-08-10T06:00:23.695733Z",
    6.     "message":"{\"country\":\"中国\",\"is_xxxxxx_xxxx\":\"1\",\"city\":\"北京\",\"user_id\":\"admin\",\"ip\":\"172.16.43.157\",\"is_xxxx\":\"0\",\"deviceID\":\"1234567890\",\"xxxxxx_num\":\"1660111513693\",\"dev_xx\":\"7423208c-25fd-3482-98b9-951dfe5af148\",\"is_xxxxxxxx\":\"0\"}",
    7.     "@version":"1"
    8. }

    虽然内容看起来是没问题的,但是默认的kafka input输出的默认结果字段冗余,结构混乱,这显然不是我们期望的结果,下一步就是对这些字段进行格式化以及过滤了。

    input改成json格式

    首先,通过学习发现想输出json格式的数据,最简单的就是增加input的一个属性即codec=”json“,这样数据就会按照json输出,配置文件如下:

    1. input{
    2.       kafka{
    3.         bootstrap_servers => ["172.16.45.157:9092"]
    4.         client_id => "test"
    5.         group_id => "test"
    6.         auto_offset_reset => "latest" 
    7.         consumer_threads => 5
    8.         topics => ["bb_risk_control"
    9.         codec => "json"
    10.       }
    11. }
    12. output {
    13.     file {
    14.         ceate_if_deleted=>true
    15.         file_mode=>777
    16.         filename_failure=>"log_error"
    17.         flush_interval=>0
    18.         path=>"/data1/data/%{+YYYYMMdd}.log"
    19.         gzip=>true
    20.     }
    21. }

    看看输出结果,果然好多了。

    1. {"xxxxxx_num":"1660114709040",
    2. "deviceID":"1234567890",
    3. "@timestamp":"2022-08-10T06:53:38.960117Z",
    4. "is_xxxxxx_xxxx":"1",
    5. "user_id":"admin",
    6. "is_xxxxxxxx":"0",
    7. "event":{"original":"{\"country\":\"中国\",\"is_xxxxxx_xxxx\":\"1\",\"city\":\"北京\",\"user_id\":\"admin\",\"ip\":\"172.16.43.157\",\"is_xxxx\":\"0\",\"deviceID\":\"1234567890\",\"xxxxxx_num\":\"1660114709040\",\"dev_xx\":\"7423208c-25fd-3482-98b9-951dfe5af148\",\"is_xxxxxxxx\":\"0\"}"},
    8. "is_xxxx":"0",
    9. "city":"北京",
    10. "dev_xx":"7423208c-25fd-3482-98b9-951dfe5af148",
    11. "ip":"172.16.43.157",
    12. "country":"中国",
    13. "@version":"1"}

    结果是在一级json中,但是还多了event,@timestamp以及@version字段,下一步的目标就是去掉这三个字段。

    进入误区

    过滤字段,就轮到filter模块上场了,由于需要使用的字段remove_field是filter的公共字段,所以理论上所有的filter都可以实现此功能。

    经过筛选,分别尝试了json filter以及mutate filter,但是奇怪的是不管是哪个filter都无文件输出,且日志无报错。 脑袋上出现了无数个问号,到底哪里出问题了?

    后来经过了无数的尝试,终于发现了罪魁祸首,原来是output中的生成输出文件名中的配置导致的:

    path=>"/data1/data/%{+YYYYMMdd}.log"
    

    这个YYYYMMdd是通过@timestamp字段算出来的,如果把@timestamp字段干掉,这里就无法生成数据,输出的文件名就会变成.log文件,非法的文件名,自然就不会输出数据了。

    所以当前配置下,直接干掉这几个字段是不可能了,既然如此,那就先用着吧,但是另外一个大问题又慢慢的浮出水面了。

    时区问题

    在走出误区后,又发现了logstash的另一个大问题,即时区问题。logstash默认时区为UTC,与我们东八区相差八小时,所以@timestamp字段的时间其实比我们的正常时间少八个小时,这对output中elasticsearch按天创建索引或者file按天创建文件带来了很大的影响。

    那么当前版本下如何解决时区问题呢?常规的做法就是手动处理@timestamp字段,在这个值的基础上加8小时,手动改成北京时间(这个做法仅适用于file以日期或者时间格式生成文件名的场景,elasticsearch按天创建索引可能不是很适用,因为kibana是自动帮我们解决时区的问题,结果就是又多了八小时)。

    具体的解决方式参见下面的配置:

    1. filter {
    2.   date {
    3.     match => ["message","UNIX_MS"]
    4.     target => "@timestamp"   
    5.   }
    6.   ruby { 
    7.     code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)" 
    8.   }
    9.   ruby {
    10.     code => "event.set('@timestamp',event.get('timestamp'))"
    11.   }
    12.   mutate {
    13.     remove_field => ["timestamp"]
    14.   }

    此处仅粘贴了filter相关的代码,逻辑也很简单:

    • 使用date filter将message中的时间字段转化为绝对秒数,并赋值给@timestamp

    • 创建一个新字段timestamp,并将@timestamp的时间加8小时后赋值给timestamp

    • 再将timestamp的值赋值给@timestamp

    • 删除timestamp字段

    是不是很眼熟?没错,和代码中两个值互换的操作异曲同工:

    1. c=a;
    2. a=b;
    3. b=c;

    操作完后,a和b的值就互换了。

    最终版本

    时区的问题解决了,理论上就可以解决按日期生成文件名后数据不准确的问题了。但是这个方案并没有从根本上解决我的问题,主要还有以下两个问题:

    • @timestamp是操作时间,而非业务时间,如果kafka数据积压或者来了部分历史数据,那么@timestamp生成的文件名称就会有问题

    • 字段冗余的问题也未能解决

    考虑到上面两个问题,我经过尝试放弃了原本的根据YYYYMMdd生成文件名的方式,而是在input的kafka消息中增加了一个字段date_str来标识业务日期,格式是YYYYMMdd格式,文件名的生成根据该字段进行拼接。然后在filter中remove掉event,@timestamp以及@version三个字段。

    最终配置如下:

    1. input{
    2.       kafka{
    3.         bootstrap_servers => ["172.16.45.157:9092"]
    4.         client_id => "test"
    5.         group_id => "test"
    6.         auto_offset_reset => "latest"
    7.         consumer_threads => 5
    8.         decorate_events => false
    9.         topics => ["bb_risk_control"]
    10.         codec => "json"
    11.       }
    12. }
    13. filter {
    14.     mutate {
    15.         remove_field => [ "event""@version""@timestamp" ]
    16.       }
    17. }
    18. output {
    19.     file {
    20.         create_if_deleted=>true
    21.         file_mode=>777
    22.         filename_failure=>"log_error"
    23.         flush_interval=>0
    24.         path=>"/data1/data/%{date_str}.gz"
    25.         gzip=>true
    26.     }
    27. }

    至此问题解决,输出的字段完全正常,达到预期。

    1. {"dev_xx":"7423208c-25fd-3482-98b9-951dfe5af148",
    2. "city":"北京",
    3. "user_id":"admin",
    4. "deviceID":"1234567890",
    5. "is_xxxxxx_xxxx":"1",
    6. "is_xxxx":"0",
    7. "is_xxxxxxxx":"0",
    8. "ip":"172.16.43.157",
    9. "xxxxxx_num":"1660136246082",
    10. "country":"中国",
    11. "date_str":"20220810"}

    节外生枝

    上面的问题解决后,本来该志得意满的去解决下一个问题,但是中间出了个小的插曲,也让我对上述的配置做了一个优化。

    由于之前的file output的数据启用了gzip压缩,且文件名是yyyyMMdd.log。一切都很正常除了在服务器上解压缩文件,浪费了点脑筋,使用gunzip,unzip以及tar命令都无法解压缩。而使用file命令来查看文件的属性,发现确实是压缩的gzip类型的数据,一时间进入了死胡同。

    后来经过尝试后发现将20220811.log重命名成20220811.log.gz,然后再使用gunzip 20220811.log.gz对文件进行解压缩,虽然得到的文件仍然叫20220811.log,但是已经可以使用vim查看了,解压缩成功。

    由于需要使用脚本对file output的文件进行解压缩,所以重命名这个命令其实是可以省略的,于是修改logstash的file output的path属性,直接改成:

    path=>"/data1/data/%{date_str}.gz"
    

    直接输出20220811.gz文件,直接进行解压缩,但是此处解压缩必须使用-f参数即gunzip -f 20220811.gz,否则会报错:

    总结

    上文就是使用logstash实现整个需求的全过程,可以看到,需求虽小,但是涉及到的问题和内容还是很丰富的,logstash常规的问题基本上都遇到了。

    但这也是件好事,毕竟这样后续再次使用logstash的时候试错成本就会低很多。只要能正常的用起来,logstash的效率以及稳定性还是比我们手写的软件强很多,更重要的是节省了我们很多的开发以及测试工作量。

    虽然如此,但是logstash也有自己的缺点。其中最让人诟病的就是logstash耗资源较大,运行占用CPU和内存高。但是这个也是和整个硬件投入以及数据量有关的,需要具体问题具体分析。

    另外,如果是轻量级的数据采集,业务简单的话,可以直接使用filebeat,filebeat更轻量,占用资源更少。也可以将filebeat作为 Logstash Forwarder,底层通过filebeat更快速稳定轻量低耗地进行收集工作,然后根据需求可以很方便地与 Logstash还有直接与Elasticsearch进行对接。而如此高的灵活性和选择扩展性代表着不同细分场景下不同的选择,完全由业务决定,无关乎优劣,这也正是开源软件的魅力所在!

    文章到这里就结束了,最后路漫漫其修远兮,大数据之路还很漫长。如果想一起大数据的小伙伴,欢迎点赞转发加关注,下次学习不迷路,我们在大数据的路上共同前进!

  • 相关阅读:
    DataFrame API入门操作及代码展示
    【Azure API 管理】Azure APIM服务集成在内部虚拟网络后,在内部环境中打开APIM门户使用APIs中的TEST功能失败
    springboot打包笔记
    fastlio2 论文笔记
    简历编写指南及注意事项
    做期权卖方一般会怎么选择合约?
    linux screen会话管理 断开连接恢复会话
    YOLOv8-pose针对视频实时提取打印对应关节点序号及坐标
    2023年中职组“网络安全”赛项南昌市竞赛任务书
    本科阶段学习方向的建议
  • 原文地址:https://blog.csdn.net/microGP/article/details/126410000