Create a start/stop script f1.sh for the log-collection Flume agents in the bin directory on hadoop102:
[doudou@hadoop102 bin]$ vim f1.sh
- #! /bin/bash
-
- case $1 in
- "start"){
- for i in hadoop102 hadoop103
- do
- echo " --------启动 $i 采集flume-------"
- ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1 &"
- done
- };;
- "stop"){
- for i in hadoop102 hadoop103
- do
- echo " --------停止 $i 采集flume-------"
- ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "
- done
-
- };;
- esac
[doudou@hadoop102 bin]$ chmod 777 f1.sh
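Once the permission is set, the script can be run directly from hadoop102 (a quick check, assuming the bin directory is already on the user's PATH):
[doudou@hadoop102 bin]$ f1.sh start
[doudou@hadoop102 bin]$ f1.sh stop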
Differences between FileChannel and MemoryChannel
MemoryChannel is faster, but because events are buffered in the JVM heap, they are lost if the Agent process dies; it suits use cases where some data loss is tolerable.
FileChannel is slower than MemoryChannel, but it offers much stronger durability: events are persisted to disk, so they can be recovered even if the Agent process crashes.
Choosing between them:
Financial companies, or any business where the numbers must be exact, typically choose FileChannel.
For ordinary log data, where losing a small share of events is acceptable (for example, dropping one to two million log lines per day inside JD.com is considered perfectly normal), MemoryChannel is the usual choice.
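For reference, the two channel types differ in only a few properties; a minimal sketch (the directories are placeholders that match the conf file used later in this pipeline):
- ## MemoryChannel: events are buffered in the JVM heap
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 10000
- a1.channels.c1.transactionCapacity = 1000
-
- ## FileChannel: events are persisted to local disk
- a1.channels.c1.type = file
- a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
- a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/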
Create a TimeStampInterceptor class under the com.atguigu.flume.interceptor package:
- package com.atguigu.flume.interceptor;
-
- import com.alibaba.fastjson.JSONObject;
- import org.apache.flume.Context;
- import org.apache.flume.Event;
- import org.apache.flume.interceptor.Interceptor;
-
- import java.nio.charset.StandardCharsets;
- import java.util.List;
- import java.util.Map;
-
- public class TimeStampInterceptor implements Interceptor {
- @Override
- public void initialize() {
-
- }
-
- @Override
- public Event intercept(Event event) {
- //get the event headers
- Map<String, String> headers = event.getHeaders();
- //get the event body so the ts field can be pulled out of it
- byte[] body = event.getBody();
- String log = new String(body, StandardCharsets.UTF_8); //decode the body as a UTF-8 string
- JSONObject jsonObject = JSONObject.parseObject(log); //parse the log line as JSON
- String ts = jsonObject.getString("ts"); //read the ts field
-
- //store ts in the headers as "timestamp" so the HDFS Sink can partition by event time
- headers.put("timestamp", ts);
- return event;
- }
-
- @Override
- public List<Event> intercept(List<Event> list) {
- for (Event event : list) {
- intercept(event);
- }
- return list;
- }
-
- @Override
- public void close() {
-
- }
-
- public static class Builder implements Interceptor.Builder{
-
- @Override
- public Interceptor build() {
- return new TimeStampInterceptor();
- }
-
- @Override
- public void configure(Context context) {
-
- }
- }
- }
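As a quick local sanity check (not part of the project code), the interceptor can be fed a hand-built event; the test class name and the sample log line below are made up for illustration:
- package com.atguigu.flume.interceptor;
-
- import org.apache.flume.Event;
- import org.apache.flume.event.EventBuilder;
-
- import java.nio.charset.StandardCharsets;
-
- public class TimeStampInterceptorTest {
- public static void main(String[] args) {
- //a sample log line carrying a ts field, as the log generator would produce
- String log = "{\"ts\":1609459200000,\"common\":{\"mid\":\"mid_1\"}}";
- Event event = EventBuilder.withBody(log, StandardCharsets.UTF_8);
-
- //run the interceptor and check that the timestamp header was stamped
- Event out = new TimeStampInterceptor.Builder().build().intercept(event);
- System.out.println(out.getHeaders().get("timestamp")); //expected: 1609459200000
- }
- }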
Package and upload the jar
- [doudou@hadoop104 lib]$ rz -E
-
- [doudou@hadoop104 lib]$ ls | grep interceptor
- flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar
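The jar-with-dependencies is built before uploading. Assuming the interceptor code lives in a Maven module named flume-interceptor with the maven-assembly-plugin configured, packaging might look roughly like this:
[doudou@hadoop102 flume-interceptor]$ mvn clean package -DskipTests
[doudou@hadoop102 flume-interceptor]$ ls target/ | grep with-dependencies
flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar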
Create a kafka-flume-hdfs.conf file in the /opt/module/flume/conf directory on hadoop104:
[doudou@hadoop104 conf]$ vim kafka-flume-hdfs.conf
- ## Components
- a1.sources=r1
- a1.channels=c1
- a1.sinks=k1
-
- ## source1
- a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
- a1.sources.r1.batchSize = 5000
- a1.sources.r1.batchDurationMillis = 2000
- a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
- a1.sources.r1.kafka.topics=topic_log
- a1.sources.r1.interceptors = i1
- a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.TimeStampInterceptor$Builder
-
- ## channel1
- a1.channels.c1.type = file
- a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
- a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
-
-
- ## sink1
- a1.sinks.k1.type = hdfs
- a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
- a1.sinks.k1.hdfs.filePrefix = log-
- a1.sinks.k1.hdfs.round = false
-
- #Roll settings that keep small files from piling up on HDFS
- a1.sinks.k1.hdfs.rollInterval = 10
- a1.sinks.k1.hdfs.rollSize = 134217728
- a1.sinks.k1.hdfs.rollCount = 0
-
- ## Output files are written as a compressed stream using the lzop codec
- a1.sinks.k1.hdfs.fileType = CompressedStream
- a1.sinks.k1.hdfs.codeC = lzop
-
- ## 拼装
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel= c1
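Optionally, the FileChannel can be tuned further; the properties below are standard Flume FileChannel settings shown with illustrative values, not something this pipeline strictly requires:
- ## Optional FileChannel tuning (illustrative values)
- a1.channels.c1.capacity = 1000000
- a1.channels.c1.keep-alive = 6
- a1.channels.c1.maxFileSize = 2146435071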
Create a start/stop script f2.sh for the Flume consumer agent:
[doudou@hadoop102 bin]$ vim f2.sh
- #! /bin/bash
-
- case $1 in
- "start"){
- for i in hadoop104
- do
- echo " --------启动 $i 消费flume-------"
- ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log2.txt 2>&1 &"
- done
- };;
- "stop"){
- for i in hadoop104
- do
- echo " --------停止 $i 消费flume-------"
- ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
- done
-
- };;
- esac
[doudou@hadoop102 bin]$ chmod 777 f2.sh
Create a cluster.sh script that starts and stops the whole pipeline in the right order:
[doudou@hadoop102 bin]$ vim cluster.sh
- #!/bin/bash
-
- case $1 in
- "start"){
- echo ================== Starting cluster ==================
-
- #Start the Zookeeper cluster
- zk.sh start
-
- #Start the Hadoop cluster
- hdp.sh start
-
- #Start the Kafka cluster
- kf.sh start
-
- #Start the Flume collection agents
- f1.sh start
-
- #Start the Flume consumer agent
- f2.sh start
-
- };;
- "stop"){
- echo ================== Stopping cluster ==================
-
- #Stop the Flume consumer agent
- f2.sh stop
-
- #Stop the Flume collection agents
- f1.sh stop
-
- #Stop the Kafka cluster
- kf.sh stop
-
- #Stop the Hadoop cluster
- hdp.sh stop
-
- #Stop the Zookeeper cluster
- zk.sh stop
-
- };;
- esac
[doudou@hadoop102 bin]$ chmod 777 cluster.sh
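With all scripts in place, the whole pipeline can be brought up or torn down with one command, and the landing directory configured in kafka-flume-hdfs.conf can be checked afterwards:
[doudou@hadoop102 bin]$ cluster.sh start
[doudou@hadoop102 bin]$ hadoop fs -ls /origin_data/gmall/log/topic_log
[doudou@hadoop102 bin]$ cluster.sh stop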