


开始配置
# Check whether telnet-server / xinetd are already installed (no output = not installed)
rpm -qa telnet-server
rpm -qa xinetd
# Install the telnet client and the xinetd super-server
yum -y install telnet
yum -y install xinetd
# Start xinetd now, enable it at boot, then restart so it picks up new config
systemctl start xinetd.service
systemctl enable xinetd.service
systemctl restart xinetd.service

# Environment for JDK / Hadoop / ZooKeeper / Flume (append to /etc/profile, then `source` it)
JAVA_HOME=/usr/java/jdk1.8.0_152
# FIX: JRE_HOME previously pointed at jdk1.8.0_151 while JAVA_HOME used
# jdk1.8.0_152; derive it from JAVA_HOME so the two can never diverge.
JRE_HOME=$JAVA_HOME/jre
HADOOP_HOME=/usr/local/hadoop-2.7.1
ZOOKEEPER_HOME=/usr/local/zookeeper-3.3.6
FLUME_HOME=/usr/local/flume-1.9.0
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$ZOOKEEPER_HOME/bin:$FLUME_HOME/bin
export PATH CLASSPATH JAVA_HOME JRE_HOME HADOOP_HOME ZOOKEEPER_HOME FLUME_HOME

# Find the process listening on TCP port 44444 (the Flume netcat source port)
lsof -i:44444
# PID is a placeholder: replace it with the process id reported by lsof above
kill -9 PID

注意: Flume 配置文件是 Java Properties 格式,值后面的行内 "# 中文注释" 不是注释,而会被当作值的一部分读取;使用前必须删去所有行内中文注释(本行说明也不要写入配置文件)。
# Name the components on this agent (a1 is the agent name).
# NOTE: Flume configs are Java Properties files — text after a value is read
# as part of the value, so all comments must be on their own lines.
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source: netcat source listening on localhost:44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink: write events to the console via the logger sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
# capacity: max events held in the channel; transactionCapacity: max events per transaction
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
命令解析
--conf conf:配置文件目录
--conf-file jobs/t1/flume-telnet-logger.conf:flume本次启动读取的配置文件位置
--name a1:agent的名字
-Dflume.root.logger=INFO,console
-D表示flume运行时动态修改flume.root.logger参数属性值
将控制台日志打印级别设置为INFO级别




开始配置


# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Source: exec source that tails the NameNode log.
# NOTE: the inline "#..." comment previously on the command line would have
# become part of the executed command (Properties has no inline comments).
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /usr/local/hadoop-2.7.1/logs/hadoop-root-namenode-hadoop100.log
a2.sources.r2.shell = /bin/bash -c
a2.sources.r2.batchSize = 10
a2.sources.r2.batchTimeout = 2000
# Sink: HDFS sink, path partitioned by date/hour
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://node1:8020/flume/%Y%m%d/%H
# prefix for files created in HDFS
a2.sinks.k2.hdfs.filePrefix = logs-
# roll the target directory by time: every 1 hour
a2.sinks.k2.hdfs.round = true
a2.sinks.k2.hdfs.roundValue = 1
a2.sinks.k2.hdfs.roundUnit = hour
# use the local timestamp (needed to resolve the %Y%m%d/%H escapes)
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# number of events flushed to HDFS per batch
a2.sinks.k2.hdfs.batchSize = 100
# DataStream = plain text output (supports changing to compressed types)
a2.sinks.k2.hdfs.fileType = DataStream
# roll a new file every 600 s or at ~128 MB; rollCount=0 disables rolling by event count
a2.sinks.k2.hdfs.rollInterval = 600
a2.sinks.k2.hdfs.rollSize = 134217700
a2.sinks.k2.hdfs.rollCount = 0
# minimum block replication
a2.sinks.k2.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 1000
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2



开始配置
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Replicating selector: copy every event to all listed channels.
# c2 is optional — a failed write to c2 is ignored, while a failed
# write to c1 causes the transaction to roll back.
a1.sources.r1.selector.type = replicating
a1.sources.r1.selector.optional = c2
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /tmp/a.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sinks: two avro sinks forwarding to downstream agents
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node1
a1.sinks.k2.port = 4142
# Describe the channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sinks to the channels
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
# Environment for JDK / Hadoop / ZooKeeper / Flume (append to /etc/profile, then `source` it)
JAVA_HOME=/usr/java/jdk1.8.0_152
# FIX: JRE_HOME previously pointed at jdk1.8.0_151 while JAVA_HOME used
# jdk1.8.0_152; derive it from JAVA_HOME so the two can never diverge.
JRE_HOME=$JAVA_HOME/jre
HADOOP_HOME=/usr/local/hadoop-2.7.1
ZOOKEEPER_HOME=/usr/local/zookeeper-3.3.6
FLUME_HOME=/usr/local/flume-1.9.0
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$ZOOKEEPER_HOME/bin:$FLUME_HOME/bin
export PATH CLASSPATH JAVA_HOME JRE_HOME HADOOP_HOME ZOOKEEPER_HOME FLUME_HOME
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source: avro source receiving the replicated stream on node3:4141
a2.sources.r1.type = avro
a2.sources.r1.bind = node3
a2.sources.r1.port = 4141
# Describe the sink: HDFS sink, path partitioned by date/hour
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://node3:8020/flume2/%Y%m%d/%H
# prefix for files created in HDFS
a2.sinks.k1.hdfs.filePrefix = flume2-
# roll the target directory by time: every 1 hour
a2.sinks.k1.hdfs.round = true
a2.sinks.k1.hdfs.roundValue = 1
a2.sinks.k1.hdfs.roundUnit = hour
# use the local timestamp (needed to resolve the %Y%m%d/%H escapes)
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# number of events flushed to HDFS per batch
a2.sinks.k1.hdfs.batchSize = 100
# DataStream = plain text output (supports changing to compressed types)
a2.sinks.k1.hdfs.fileType = DataStream
# roll a new file every 600 s or at ~128 MB; rollCount=0 disables rolling by event count
a2.sinks.k1.hdfs.rollInterval = 600
a2.sinks.k1.hdfs.rollSize = 134217700
a2.sinks.k1.hdfs.rollCount = 0
# minimum block replication
a2.sinks.k1.hdfs.minBlockReplicas = 1
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
# Name the components on this agent
# a3: second receiver of the replicated stream; writes events to local files
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
# avro source receiving events forwarded by the upstream agent on node3:4142
a3.sources.r1.type = avro
a3.sources.r1.bind = node3
a3.sources.r1.port = 4142
# Describe the sink
# file_roll writes events into files under the given local directory
# NOTE(review): the directory must exist before the agent starts — verify
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /tmp/flumedatatest
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2



开始配置
# Name the components on this agent
# a1: fans events out to a sink group of two avro sinks (load balancing)
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Sink-group processor: load-balance events across the group's sinks
# round-robin; failed sinks are backed off, up to maxTimeOut milliseconds
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000
# Describe the sink
# two avro sinks forwarding to downstream agents on node1:4141 / node1:4142
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node1
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
# both sinks share channel c1; the group processor picks which sink drains it
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
-------------------------------------
a1.sinkgroups = g1
为了消除数据处理管道中的单点故障,Flume可以使用负载平衡或故障转移策略,将event发送到不同的sink
sink组是用来创建逻辑上的一组sink,这个组的行为是由sink处理器来决定的,它决定了event的路由策略
a1.sinkgroups.g1.processor.type = load_balance #负载均衡,除了这个还有default, failover(故障转移)
a1.sinkgroups.g1.processor.backoff = true #Should failed sinks be backed off exponentially
a1.sinkgroups.g1.processor.selector = round_robin #负载均衡策略
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000
# Name the components on this agent
# a2: downstream collector; receives avro events on node1:4141 and logs them
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = node1
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = logger
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
# Name the components on this agent
# a3: downstream collector; receives avro events on node1:4142 and logs them
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = node1
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2






开始配置
# Name the components on this agent
# a1: tails /tmp/a.log and forwards events to the aggregator on node4:4141
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /tmp/a.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node4
a1.sinks.k1.port = 4141
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# Name the components on this agent
# a2: netcat source on localhost:44444, forwards to the same aggregator (node4:4141)
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = localhost
a2.sources.r1.port = 44444
# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = node4
a2.sinks.k1.port = 4141
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
# Name the components on this agent
# a3: aggregator; a1 and a2 both send avro events to node4:4141, a3 logs them
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = node4
a3.sources.r1.port = 4141
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1


