• flume采集mysql日志数据发送到kafka


    0、前言

            我这里需要采集mysql的日志如二进制日志、通用查询日志、慢查询日志、错误日志等,使用flume能很方便的将其采集到kafka的topic中,而且能解决其中有些日志充斥大量换行等符号所导致的清洗难题。通过自定义select查询语句去为mysql日志进行结构化改造,相对网上别的办法这样做比较便捷。

            我这里想要采集mysql日志所以需要第一步,若只是想采集mysql数据库表,就直接从第二步开始配就行了,首先mysql、flume、kafka安装我就略过了。

    1、首先将需要采集的mysql日志打开(我这里是用通用查询日志举例,即general_log日志)

    1. //查看状态
    2. show variables like "%general%";
    3. //ON就是开启了,OFF没开启
    4. //然后是输出的路径(如果输出类型为file的话,日志就存在这里)

          

            没开启的话通过修改mysql的my.cnf文件中[mysqld]
            添加如下参数,将输出类型设为TABLE

    1. general_log=1
    2. log_output=TABLE
    1. //查看输出类型
    2. SHOW VARIABLES LIKE '%log_out%';
    3. //我这边是使用table类型,这样的话可以将日志输出为数据库表的样子,为后续flume采集提供便利

     2、重启数据库看看是否生效

    1. //执行sql语句查看是否有general日志
    2. SELECT * FROM mysql.general_log;

     

    3、flume相关配置

    3.1、下载插件

    这里想要通过flume采集mysql数据,flume暂时内置的功能还办不到,所以需要引入以下插件

    flume-ng-sql-source-1.5.3.jar

    mysql-connector-java-5.1.49-bin.jar

    flume-ng-sql-source下载地址:https://github.com/keedio/flume-ng-sql-source

    (这边下载是没编译过的,需要自己编译)

    3.2 复制jar包放到$FLUME_HOME/lib文件下

    3.3修改flume配置文件flume-conf.properties

    这部分配置有一点多,但是分块来看还是挺明了的

    1. # 定义sources、channels、sinks
    2. agent.sources=r1
    3. agent.channels= c1
    4. agent.sinks= k1
    5. # 设置source,这里type照着写,剩下能看出是设置mysql相关连接参数
    6. agent.sources.r1.type = org.keedio.flume.source.SQLSource
    7. agent.sources.r1.hibernate.connection.url=jdbc:mysql://{IP}:{PORT}/mysql
    8. agent.sources.r1.hibernate.connection.user=root
    9. agent.sources.r1.hibernate.connection.password=root
    10. agent.sources.r1.hibernate.connection.autocommit = true
    11. # mysql驱动
    12. agent.sources.r1.hibernate.dialect = org.hibernate.dialect.MySQLDialect
    13. agent.sources.r1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
    14. agent.sources.r1.run.query.delay=10000
    15. # 存放status文件
    16. agent.sources.r1.status.file.path = /opt/flumedata/status
    17. agent.sources.r1.status.file.name = sqlSource.status
    18. # 这个查询语句很关键,搜索结果就是你要采集的数据
    19. agent.sources.r1.custom.query = select `event_time`,`user_host`,`thread_id`,`server_id`,`command_type`,convert(`argument` using utf8mb4) as sql_text from mysql.general_log ORDER BY event_time desc
    20. agent.sources.r1.batch.size = 1000
    21. agent.sources.r1.max.rows = 1000
    22. # 每个查询字段中间的分割符号
    23. agent.sources.r1.delimiter.entry = |
    24. agent.sources.sqlSource.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
    25. agent.sources.sqlSource.hibernate.c3p0.min_size=1
    26. agent.sources.sqlSource.hibernate.c3p0.max_size=10
    27. agent.sources.r1.channels = c1
    28. # 剩下设置channel、sink和绑定关系啥的
    29. agent.channels.c1.type=memory
    30. agent.channels.c1.capacity=1000
    31. agent.channels.c1.transactionCapacity=1000
    32. # 设置目标kafka的ip端口号还有topic
    33. agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
    34. agent.sinks.k1.kafka.topic={mytopic
    35. }
    36. agent.sinks.k1.kafka.bootstrap.servers={ip:9092}
    37. agent.sinks.k1.kafka.flumeBatchSize=2000
    38. agent.sinks.k1.kafka.producer.acks=1
    39. agent.sinks.k1.channel=c1

    4、启动,监听kafka的对应topic看是否有数据进来

    kafka-console-consumer.sh --bootstrap-server ip:9092 --topic mytopic

     

  • 相关阅读:
    网络编程
    java基于springboot+vue+elementui的会员制在线读书图书购物管理平台
    【翠花Vue之旅】vue打卡8
    LabVIEW合并VI
    沉睡者IT - 为你解密那些卖虚拟资源和知识付费课程的平台到底有多简单和多赚钱。
    爬虫学习——第一章 初识爬虫
    第一次前端笔试复盘(蔚来)
    AWVS使用手册
    2-网络架构和Netty系列-Java和IO网络模型
    人机交互复习专题
  • 原文地址:https://blog.csdn.net/qq_35515283/article/details/126158677