我这里需要采集mysql的日志如二进制日志、通用查询日志、慢查询日志、错误日志等,使用flume能很方便的将其采集到kafka的topic中,而且能解决其中有些日志充斥大量换行等符号所导致的清洗难题。通过自定义select查询语句去为mysql日志进行结构化改造,相对网上别的办法这样做比较便捷。
我这里想要采集mysql日志所以需要第一步,若只是想采集mysql数据库表,就直接从第二步开始配就行了,首先mysql、flume、kafka安装我就略过了。
- //查看状态
- show variables like "%general%";
-
- //ON就是开启了,OFF没开启
- //然后是输出的路径(如果输出类型为file的话,日志就存在这里)
没开启的话通过修改mysql的my.cnf文件中[mysqld]
添加如下参数,将输出类型设为TABLE
- general_log=1
- log_output=TABLE
- //查看输出类型
- SHOW VARIABLES LIKE '%log_out%';
- //我这边是使用table类型,这样的话可以将日志输出为数据库表的样子,为后续flume采集提供便利

- //执行sql语句查看是否有general日志
- SELECT * FROM mysql.general_log;

这里想要通过flume采集mysql数据,flume暂时内置的功能还办不到,所以需要引入以下插件
flume-ng-sql-source-1.5.3.jar
mysql-connector-java-5.1.49-bin.jar
flume-ng-sql-source下载地址:https://github.com/keedio/flume-ng-sql-source
(这边下载是没编译过的,需要自己编译)
这部分配置有一点多,但是分块来看还是挺明了的
- # 定义sources、channels、sinks
- agent.sources=r1
- agent.channels= c1
- agent.sinks= k1
-
- # 设置source,这里type照着写,剩下能看出是设置mysql相关连接参数
- agent.sources.r1.type = org.keedio.flume.source.SQLSource
- agent.sources.r1.hibernate.connection.url=jdbc:mysql://{IP}:{PORT}/mysql
- agent.sources.r1.hibernate.connection.user=root
- agent.sources.r1.hibernate.connection.password=root
- agent.sources.r1.hibernate.connection.autocommit = true
- # mysql驱动
- agent.sources.r1.hibernate.dialect = org.hibernate.dialect.MySQLDialect
- agent.sources.r1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
- agent.sources.r1.run.query.delay=10000
- # 存放status文件
- agent.sources.r1.status.file.path = /opt/flumedata/status
- agent.sources.r1.status.file.name = sqlSource.status
- # 这个查询语句很关键,搜索结果就是你要采集的数据
- agent.sources.r1.custom.query = select `event_time`,`user_host`,`thread_id`,`server_id`,`command_type`,convert(`argument` using utf8mb4) as sql_text from mysql.general_log ORDER BY event_time desc
-
- agent.sources.r1.batch.size = 1000
- agent.sources.r1.max.rows = 1000
- # 每个查询字段中间的分割符号
- agent.sources.r1.delimiter.entry = |
-
- agent.sources.sqlSource.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
- agent.sources.sqlSource.hibernate.c3p0.min_size=1
- agent.sources.sqlSource.hibernate.c3p0.max_size=10
- agent.sources.r1.channels = c1
-
- # 剩下设置channel、sink和绑定关系啥的
-
- agent.channels.c1.type=memory
- agent.channels.c1.capacity=1000
- agent.channels.c1.transactionCapacity=1000
-
- # 设置目标kafka的ip端口号还有topic
- agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
- agent.sinks.k1.kafka.topic={mytopic
- }
- agent.sinks.k1.kafka.bootstrap.servers={ip:9092}
- agent.sinks.k1.kafka.flumeBatchSize=2000
- agent.sinks.k1.kafka.producer.acks=1
- agent.sinks.k1.channel=c1
kafka-console-consumer.sh --bootstrap-server ip:9092 --topic mytopic
