• 离线数仓 (四) --------- 用户行为数据采集模块



    一、集群所有进程查看脚本

    A、在 /home/fancy/bin 目录下创建脚本 xcall.sh

    [fancy@node1 bin]$ vim xcall.sh
    
    • 1

    B、在脚本中编写如下内容

    #! /bin/bash
     
    for i in node101 node102 node103
    do
        echo --------- $i ----------
        ssh $i "$*"
    done
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    C、修改脚本执行权限

    [fancy@node101 bin]$ chmod 777 xcall.sh
    
    • 1

    D、启动脚本

    [fancy@node101 bin]$ xcall.sh jps
    
    • 1

    二、Hadoop安装

    具体安装过程详见 Hadoop (四) --------- Hadoop 运行模式 这篇文章。。。

    集群规划:

    服务器 node101服务器 node102服务器103
    HDFSNameNode、DataNodeDataNodeDataNode、SecondaryNameNode
    YarnNodeManagerResourcemanager、NodeManagerNodeManager

    注意:尽量使用离线方式安装

    1. HDFS存储多目录

    A、给Linux系统新增加一块硬盘

    参考:https://www.cnblogs.com/yujianadu/p/10750698.html

    B、生产环境服务器磁盘情况

    在这里插入图片描述

    C、在hdfs-site.xml文件中配置多目录,注意新挂载磁盘的访问权限问题

    HDFS 的 DataNode 节点保存数据的路径由 dfs.datanode.data.dir 参数决定,其默认值为 file://${hadoop.tmp.dir}/dfs/data,若服务器有多个磁盘,必须对该参数进行修改。如服务器磁盘如上图所示,则该参数应修改为如下的值。

    <property>
        <name>dfs.datanode.data.dirname>
    	<value>file:///dfs/data1,file:///hd2/dfs/data2,file:///hd3/dfs/data3,file:///hd4/dfs/data4value>
    property>
    
    • 1
    • 2
    • 3
    • 4

    注意:因为每台服务器节点的磁盘情况不同,所以这个配置配完之后,不需要分发。

    2. 集群数据均衡

    A、节点间数据均衡

    开启数据均衡命令

    start-balancer.sh -threshold 10
    
    • 1

    对于参数10,代表的是集群中各个节点的磁盘空间利用率相差不超过10%,可根据实际情况进行调整。

    停止数据均衡命令

    stop-balancer.sh
    
    • 1

    注意:于HDFS需要启动单独的 Rebalance Server 来执行 Rebalance 操作,所以尽量不要在 NameNode 上执行 start-balancer.sh,而是找一台比较空闲的机器。

    B、磁盘间数据均衡

    生成均衡计划 (我们只有一块磁盘,不会生成计划)

    hdfs diskbalancer -plan node102
    
    • 1

    执行均衡计划

    hdfs diskbalancer -execute node102.plan.json
    
    • 1

    查看当前均衡任务的执行情况

    hdfs diskbalancer -query node102
    
    • 1

    取消均衡任务

    hdfs diskbalancer -cancel node102.plan.json
    
    • 1

    3. 支持 LZO 压缩配置

    A、hadoop-lzo编译

    hadoop 本身并不支持 lzo 压缩,故需要使用 twitter 提供的 hadoop-lzo 开源组件。hadoop-lzo 需依赖
    hadoop 和 lzo 进行编译,编译步骤如下。

    0. 环境准备
    maven (下载安装,配置环境变量,修改sitting.xml加阿里云镜像)
    gcc-c++
    zlib-devel
    autoconf
    automake
    libtool
    通过yum安装即可, yum -y install gcc-c++ lzo-devel zlib-devel autoconf automake libtool
    
    1. 下载、安装并编译LZO
    
    wget http://www.oberhumer.com/opensource/lzo/download/lzo-2.10.tar.gz
    
    tar -zxvf lzo-2.10.tar.gz
    
    cd lzo-2.10
    
    ./configure -prefix=/usr/local/hadoop/lzo/
    
    make
    
    make install
    
    2. 编译hadoop-lzo源码
    
    2.1 下载hadoop-lzo的源码,下载地址:https://github.com/twitter/hadoop-lzo/archive/master.zip
    2.2 解压之后,修改pom.xml
         <hadoop.current.version>3.1.3</hadoop.current.version>
    2.3 声明两个临时环境变量
         export C_INCLUDE_PATH=/usr/local/hadoop/lzo/include
         export LIBRARY_PATH=/usr/local/hadoop/lzo/lib 
    2.4 编译
         进入hadoop-lzo-master,执行maven编译命令
         mvn package -Dmaven.test.skip=true
    2.5 进入target,hadoop-lzo-0.4.21-SNAPSHOT.jar 即编译成功的hadoop-lzo组件
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    B、将编译好后的 hadoop-lzo-0.4.20.jar 放入 hadoop-3.1.3/share/hadoop/common/

    [fancy@node101 common]$ pwd
    /opt/module/hadoop-3.1.3/share/hadoop/common
    
    • 1
    • 2
    [fancy@node101 common]$ ls
    hadoop-lzo-0.4.20.jar
    
    • 1
    • 2

    C、同步 hadoop-lzo-0.4.20.jar 到 node102、node103

    [fancy@node101 common]$ xsync hadoop-lzo-0.4.20.jar
    
    • 1

    D、core-site.xml增加配置支持LZO压缩

    <configuration>
        <property>
            <name>io.compression.codecsname>
            <value>
                org.apache.hadoop.io.compress.GzipCodec,
                org.apache.hadoop.io.compress.DefaultCodec,
                org.apache.hadoop.io.compress.BZip2Codec,
                org.apache.hadoop.io.compress.SnappyCodec,
                com.hadoop.compression.lzo.LzoCodec,
                com.hadoop.compression.lzo.LzopCodec
            value>
        property>
    
        <property>
            <name>io.compression.codec.lzo.classname>
            <value>com.hadoop.compression.lzo.LzoCodecvalue>
        property>
    configuration>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    E、同步 core-site.xml 到 node102、node103

    [fancy@node101 hadoop]$ xsync core-site.xml
    
    • 1

    F、启动及查看集群

    [fancy@node101 hadoop-3.1.3]$ sbin/start-dfs.sh
    [fancy@node102 hadoop-3.1.3]$ sbin/start-yarn.sh
    
    • 1
    • 2

    G、测试-数据准备

    [fancy@node101 hadoop-3.1.3]$ hadoop fs -mkdir /input
    [fancy@node101 hadoop-3.1.3]$ hadoop fs -put README.txt /input
    
    • 1
    • 2

    H、测试-压缩

    [fancy@node101 hadoop-3.1.3]$ hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount -Dmapreduce.output.fileoutputformat.compress=true -Dmapreduce.output.fileoutputformat.compress.codec=com.hadoop.compression.lzo.LzopCodec  /input /output
    
    • 1

    4. LZO 创建索引

    A、创建LZO文件的索引

    LZO 压缩文件的可切片特性依赖于其索引,故我们需要手动为LZO压缩文件创建索引。若无索引,则LZO 文件的切片只有一个。

    hadoop jar /path/to/your/hadoop-lzo.jar com.hadoop.compression.lzo.DistributedLzoIndexer big_file.lzo
    
    • 1

    B、测试

    将 bigtable.lzo (200M) 上传到集群的根目录

    [fancy@node101 module]$ hadoop fs -mkdir /input
    [fancy@node101 module]$ hadoop fs -put bigtable.lzo /input
    
    • 1
    • 2

    执行 wordcount 程序

    [fancy@node101 module]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount -Dmapreduce.job.inputformat.class=com.hadoop.mapreduce.LzoTextInputFormat /input /output1
    
    • 1

    在这里插入图片描述
    对上传的LZO文件建索引

    [fancy@node101 module]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar  com.hadoop.compression.lzo.DistributedLzoIndexer /input/bigtable.lzo
    
    • 1

    再次执行WordCount程序

    [fancy@node101 module]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount -Dmapreduce.job.inputformat.class=com.hadoop.mapreduce.LzoTextInputFormat /input /output2
    
    • 1

    在这里插入图片描述

    C、注意

    如果以上任务,在运行过程中报如下异常

    Container [pid=8468,containerID=container_1594198338753_0001_01_000002] 
    is running 318740992B beyond the 'VIRTUAL' memory limit. Current usage: 
    111.5 MB of 1 GB physical memory used; 2.4 GB of 2.1 GB virtual memory used. Killing container.
    Dump of the process-tree for container_1594198338753_0001_01_000002 :
    
    • 1
    • 2
    • 3
    • 4

    解决办法:在 node101 的 /opt/module/hadoop-3.1.3/etc/hadoop/yarn-site.xml 文件中增加如下配置,然后分发到 node102、node103 服务器上,并重新启动集群。

    
    <property>
       <name>yarn.nodemanager.vmem-check-enabledname>
       <value>falsevalue>
    property>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    5. 基准测试

    在企业中非常关心每天从 Java 后台拉取过来的数据,需要多久能上传到集群?消费者关心多久能从HDFS 上拉取需要的数据?

    为了搞清楚 HDFS 的读写性能,生产环境上非常需要对集群进行压测。

    HDFS 的读写性能主要受网络和磁盘影响比较大。为了方便测试,将 node101、node102、node103 虚拟机网络都设置为 100mbps。

    100Mbps单位是bit;10M/s单位是byte ; 1byte=8bit,100Mbps/8=12.5M/s。

    在这里插入图片描述

    测试网速:

    A、来到 node101 的 /opt/module 目录,创建一个

    [fancy@node101 software]$ python -m SimpleHTTPServer
    
    • 1

    B、在 Web 页面上访问

    node101:8000
    
    • 1

    ① 测试HDFS写性能

    写测试底层原理

    在这里插入图片描述

    测试内容: 向HDFS集群写10个128M的文件

    [fancy@node101 mapreduce]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -write -nrFiles 10 -fileSize 128MB
    
    • 1
    2021-02-09 10:43:16,853 INFO fs.TestDFSIO: ----- TestDFSIO ----- : write
    2021-02-09 10:43:16,854 INFO fs.TestDFSIO:             Date & time: Tue Feb 09 10:43:16 CST 2021
    2021-02-09 10:43:16,854 INFO fs.TestDFSIO:         Number of files: 10
    2021-02-09 10:43:16,854 INFO fs.TestDFSIO:  Total MBytes processed: 1280
    2021-02-09 10:43:16,854 INFO fs.TestDFSIO:       Throughput mb/sec: 1.61
    2021-02-09 10:43:16,854 INFO fs.TestDFSIO:  Average IO rate mb/sec: 1.9
    2021-02-09 10:43:16,854 INFO fs.TestDFSIO:   IO rate std deviation: 0.76
    2021-02-09 10:43:16,854 INFO fs.TestDFSIO:      Test exec time sec: 133.05
    2021-02-09 10:43:16,854 INFO fs.TestDFSIO:
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    注意:nrFiles n 为生成 mapTask 的数量,生产环境一般可通过 node102:8088 查看CPU核数,设置为(CPU核数 - 1)

    • Number of files :生成 mapTask 数量,一般是集群中 (CPU核数 - 1),我们测试虚拟机就按照实际的物理内存 -1 分配即可。(目标,让每个节点都参与测试)

    • Total MBytes processed :单个map处理的文件大小

    • Throughput mb/sec : 单个mapTak的吞吐量
      计算方式:处理的总文件大小/每一个mapTask写数据的时间累加
      集群整体吞吐量:生成mapTask数量*单个mapTak的吞吐量

    • Average IO rate mb/sec 平均mapTak的吞吐量
      计算方式:每个mapTask处理文件大小/每一个mapTask写数据的时间,全部相加除以task数量

    • IO rate std deviation:方差、反映各个 mapTask 处理的差值,越小越均衡

    注意: 如果测试过程中,出现异常

    ① 可以在 yarn-site.xml 中设置虚拟内存检测为 false

    
    <property>
         <name>yarn.nodemanager.vmem-check-enabledname>
         <value>falsevalue>
    property>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    ②分发配置并重启Yarn集群

    测试结果分析

    ①由于副本1就在本地,所以该副本不参与测试

    在这里插入图片描述

    一共参与测试的文件:10个文件 * 2个副本 = 20个
    压测后的速度:1.61
    实测速度:1.61M/s * 20个文件 ≈ 32M/s
    三台服务器的带宽:12.5 + 12.5 + 12.5 ≈ 30m/s
    
    • 1
    • 2
    • 3
    • 4

    所有网络资源都已经用满。

    如果实测速度远远小于网络,并且实测速度不能满足工作需求,可以考虑采用固态硬盘或者增加磁盘个数。

    ② 如果客户端不在集群节点,那就三个副本都参与计算

    在这里插入图片描述

    ② 测试HDFS读性能

    测试内容: 读取 HDFS 集群 10 个 128M 的文件

    [fancy@node101 mapreduce]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -read -nrFiles 10 -fileSize 128MB
    
    • 1
    2021-02-09 11:34:15,847 INFO fs.TestDFSIO: ----- TestDFSIO ----- : read
    2021-02-09 11:34:15,847 INFO fs.TestDFSIO:             Date & time: Tue Feb 09 11:34:15 CST 2021
    2021-02-09 11:34:15,847 INFO fs.TestDFSIO:         Number of files: 10
    2021-02-09 11:34:15,847 INFO fs.TestDFSIO:  Total MBytes processed: 1280
    2021-02-09 11:34:15,848 INFO fs.TestDFSIO:       Throughput mb/sec: 200.28
    2021-02-09 11:34:15,848 INFO fs.TestDFSIO:  Average IO rate mb/sec: 266.74
    2021-02-09 11:34:15,848 INFO fs.TestDFSIO:   IO rate std deviation: 143.12
    2021-02-09 11:34:15,848 INFO fs.TestDFSIO:      Test exec time sec: 20.83
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    删除测试生成数据

    [fancy@node101 mapreduce]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -clean
    
    • 1

    测试结果分析:

    为什么读取文件速度大于网络带宽?由于目前只有三台服务器,且有三个副本,数据读取就近原则,相当于都是读取的本地磁盘数据,没有走网络。

    在这里插入图片描述

    ③ 使用 Sort 程序评测 MapReduce

    使用 RandomWriter 来产生随机数,每个节点运行 10 个 Map 任务,每个 Map 产生大约 1G 大小的二进制随机数

    [fancy@node101 mapreduce]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar randomwriter random-data
    
    • 1

    执行Sort程序

    [fancy@node101 mapreduce]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar sort random-data sorted-data
    
    • 1

    验证数据是否真正排好序了

    [fancy@node101 mapreduce]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar testmapredsort -sortInput random-data -sortOutput sorted-data
    
    • 1

    6. Hadoop 参数调优

    A、HDFS参数调优hdfs-site.xml

    The number of Namenode RPC server threads that listen to requests from clients. If dfs.namenode.servicerpc-address is not configured then Namenode RPC server threads listen to requests from all nodes.
    NameNode有一个工作线程池,用来处理不同 DataNode 的并发心跳以及客户端并发的元数据操作。
    对于大集群或者有大量客户端的集群来说,通常需要增大参数dfs.namenode.handler.count的默认值10。

    <property>
        <name>dfs.namenode.handler.countname>
        <value>10value>
    property>
    
    • 1
    • 2
    • 3
    • 4

    dfs.namenode.handler.count=20×〖log〗_e^(Cluster Size),比如集群规模为 8 台时,此参数设置为 41。可通过简单的 python 代码计算该值,代码如下。

    [fancy@node101 ~]$ python
    Python 2.7.5 (default, Apr 11 2018, 07:36:10) 
    [GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux2
    Type "help", "copyright", "credits" or "license" for more information.
    >>> import math
    >>> print int(20*math.log(8))
    41
    >>> quit()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    B、YARN参数调优yarn-site.xml

    情景描述:总共7台机器,每天几亿条数据,数据源->Flume->Kafka->HDFS->Hive
    面临问题:数据统计主要用HiveSQL,没有数据倾斜,小文件已经做了合并处理,开启的JVM重用,而且IO没有阻塞,内存用了不到50%。但是还是跑的非常慢,而且数据量洪峰过来时,整个集群都会宕掉。基于这种情况有没有优化方案。

    解决办法:

    NodeManager 内存和服务器实际内存配置尽量接近,如服务器有 128g 内存,但是 NodeManager 默认内存 8G,不修改该参数最多只能用 8G 内存。NodeManager 使用的 CPU 核数和服务器 CPU 核数尽量接近。
    yarn.nodemanager.resource.memory-mb NodeManager 使用内存数
    yarn.nodemanager.resource.cpu-vcores NodeManager 使用CPU核数

    三、Zookeeper 安装

    1. 安装 ZK

    详见 Zookeeper (二) --------- Zookeeper 本地安装 这一篇文章

    集群规划

    服务器 node101服务器 node102服务器 node103
    ZookeeperZookeeperZookeeperZookeeper

    2. ZK集群启动停止脚本

    A、在 node101 的 /home/fancy/bin 目录下创建脚本

    [fancy@node101 bin]$ vim zk.sh
    
    • 1

    在脚本中编写如下内容

    #!/bin/bash
    
    case $1 in
    "start"){
    	for i in node101 node102 node103
    	do
            echo ---------- zookeeper $i 启动 ------------
    		ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
    	done
    };;
    "stop"){
    	for i in node101 node102 node103
    	do
            echo ---------- zookeeper $i 停止 ------------    
    		ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
    	done
    };;
    "status"){
    	for i in node101 node102 node103
    	do
            echo ---------- zookeeper $i 状态 ------------    
    		ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
    	done
    };;
    esac
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    B、增加脚本执行权限

    [fancy@node101 bin]$ chmod u+x zk.sh
    
    • 1

    C、Zookeeper集群启动脚本

    [fancy@node101 module]$ zk.sh start
    
    • 1

    D、Zookeeper集群停止脚本

    [fancy@node101 module]$ zk.sh stop
    
    • 1

    四、Kafka 安装

    在这里插入图片描述

    1. Kafka 集群安装

    具体安装详见 Kafka (二) ---------- Kafka 快速入门 这篇文章

    集群规划:

    服务器node101服务器node102服务器node103
    KafkaKafkaKafkaKafka

    2. Kafka 集群启动停止脚本

    A、在 /home/fancy/bin 目录下创建脚本kf.sh

    [fancy@node101 bin]$ vim kf.sh
    
    • 1

    在脚本中填写如下内容

    #! /bin/bash
    case $1 in
    "start"){
        for i in node101 node102 node103
        do
            echo " --------启动 $i Kafka-------"
            ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
        done
    };;
    "stop"){
        for i in node101 node102 node103
        do
            echo " --------停止 $i Kafka-------"
            ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh stop"
        done
    };;
    esac
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    B、增加脚本执行权限

    [fancy@node101 bin]$ chmod u+x kf.sh
    
    • 1

    C、kf 集群启动脚本

    [fancy@node101 module]$ kf.sh start
    
    • 1

    D、kf集群停止脚本

    [fancy@node101 module]$ kf.sh stop
    
    • 1

    3. Kafka 常用命令

    A、查看Kafka Topic列表

    [fancy@node101 kafka]$ bin/kafka-topics.sh --zookeeper node101:2181/kafka --list
    
    • 1

    B、创建Kafka Topic

    进入到 /opt/module/kafka/ 目录下创建日志主题

    [fancy@node101 kafka]$ bin/kafka-topics.sh --zookeeper node101:2181,node102:2181,node103:2181/kafka  --create --replication-factor 1 --partitions 1 --topic topic_log
    
    • 1

    C、删除Kafka Topic

    [fancy@node101 kafka]$ bin/kafka-topics.sh --delete --zookeeper node101:2181,node102:2181,node103:2181/kafka --topic topic_log
    
    • 1

    D、Kafka生产消息

    [fancy@node101 kafka]$ bin/kafka-console-producer.sh \
    --broker-list node101:9092 --topic topic_log
    
    • 1
    • 2
    >hello world
    >fancy fancy
    
    • 1
    • 2

    E、Kafka消费消息

    [fancy@node101 kafka]$ bin/kafka-console-consumer.sh \
    --bootstrap-server node101:9092 --from-beginning --topic topic_log
    
    • 1
    • 2

    --from-beginning: 会把主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。

    F、查看 Kafka Topic 详情

    [fancy@node101 kafka]$ bin/kafka-topics.sh --zookeeper node101:2181/kafka \
    --describe --topic topic_log
    
    • 1
    • 2

    4. Kafka 机器数量计算

    Kafka机器数量(经验公式) = 2 *(峰值生产速度 * 副本数 / 100)+ 1
    
    • 1

    先拿到峰值生产速度,再根据设定的副本数,就能预估出需要部署Kafka的数量。

    A、峰值生产速度

    峰值生产速度可以压测得到。

    B、副本数

    副本数默认是1个,在企业里面2-3个都有,2个居多。
    副本多可以提高可靠性,但是会降低网络传输效率。
    比如我们的峰值生产速度是50M/s。副本数为2。

    Kafka机器数量 = 2 *(50 * 2 / 100)+ 1 = 3
    • 1

    5. Kafka压力测试

    A、Kafka压测

    用Kafka官方自带的脚本,对Kafka进行压测。

    kafka-consumer-perf-test.sh
    kafka-producer-perf-test.sh
    
    • 1
    • 2

    Kafka压测时,在硬盘读写速度一定的情况下,可以查看到哪些地方出现了瓶颈(CPU,内存,网络IO)。一般都是网络IO达到瓶颈。

    B、Kafka Producer压力测试

    在这里插入图片描述

    Ⅰ、压测环境准备:

    ① node101、node102、node103 的网络带宽都设置为 100mbps 。
    ② 关闭 node101 主机,并根据 node101 克隆出node104 (修改IP和主机名称)
    ③ node104 的带宽不设限
    ④ 创建一个test topic,设置为 3 个分区 2 个副本

    [fancy@node101 kafka]$ bin/kafka-topics.sh --zookeeper node101:2181,node102:2181,node103:2181/kafka --create --replication-factor 2 --partitions 3 --topic test
    
    • 1

    /opt/module/kafka/bin 目录下面有这两个文件。我们来测试一下

    [fancy@node101 kafka]$ bin/kafka-producer-perf-test.sh  --topic test --record-size 100 --num-records 10000000 --throughput -1 --producer-props bootstrap.servers=node101:9092,node102:9092,node103:9092
    
    • 1

    说明:

    • record-size是一条信息有多大,单位是字节。
    • num-records是总共发送多少条信息。
    • throughput 是每秒多少条信息,设成-1,表示不限流,尽可能快的生产数据,可测出生产者最大吞吐量。

    Ⅱ、Kafka 会打印下面的信息

    699884 records sent, 139976.8 records/sec (13.35 MB/sec), 1345.6 ms avg latency, 2210.0 ms max latency.
    713247 records sent, 141545.3 records/sec (13.50 MB/sec), 1577.4 ms avg latency, 3596.0 ms max latency.
    773619 records sent, 153862.2 records/sec (14.67 MB/sec), 2326.8 ms avg latency, 4051.0 ms max latency.
    773961 records sent, 154206.2 records/sec (15.71 MB/sec), 1964.1 ms avg latency, 2917.0 ms max latency.
    776970 records sent, 154559.4 records/sec (15.74 MB/sec), 1960.2 ms avg latency, 2922.0 ms max latency.
    776421 records sent, 154727.2 records/sec (15.76 MB/sec), 1960.4 ms avg latency, 2954.0 ms max latency.
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    参数解析: Kafka 的吞吐量 15m/s 左右是否符合预期呢?
    node101、node102、node103 三台集群的网络总带宽 30m/s 左右,由于是两个副本,所以 Kafka 的吞吐量 30m/s ➗ (2副本) = 15m/s
    结论:网络带宽和副本都会影响吞吐量。

    Ⅲ、调整batch.size

    batch.size默认值是16k。
    batch.size较小,会降低吞吐量。比如说,批次大小为0则完全禁用批处理,会一条一条发送消息);
    batch.size过大,会增加消息发送延迟。比如说,Batch设置为64k,但是要等待5秒钟Batch才凑满了64k,才能发送出去。那这条消息的延迟就是5秒钟。

    [fancy@node101 kafka]$ bin/kafka-producer-perf-test.sh  --topic test --record-size 100 --num-records 10000000 --throughput -1 --producer-props bootstrap.servers=node101:9092,node102:9092,node103:9092 batch.size=500
    
    • 1

    输出结果

    69169 records sent, 13833.8 records/sec (1.32 MB/sec), 2517.6 ms avg latency, 4299.0 ms max latency.
    105372 records sent, 21074.4 records/sec (2.01 MB/sec), 6748.4 ms avg latency, 9016.0 ms max latency.
    113188 records sent, 22637.6 records/sec (2.16 MB/sec), 11348.0 ms avg latency, 13196.0 ms max latency.
    108896 records sent, 21779.2 records/sec (2.08 MB/sec), 12272.6 ms avg latency, 12870.0 ms max latency.
    
    • 1
    • 2
    • 3
    • 4

    Ⅳ、linger.ms
    如果设置 batch size 为64k,但是比如过了10分钟也没有凑够64k,怎么办?
    可以设置 linger.ms。比如 linger.ms=5ms,那么就是要发送的数据没有到 64k,5ms 后,数据也会发出去。

    Ⅴ、总结

    同时设置batch.size和 linger.ms,就是哪个条件先满足就都会将消息发送出去。Kafka需要考虑高吞吐量与延时的平衡。

    C、Kafka Consumer压力测试

    在这里插入图片描述

    Ⅰ、Consumer的测试,如果这四个指标 (IO,CPU,内存,网络) 都不能改变,考虑增加分区数来提升性能。

    [fancy@node101 kafka]$ bin/kafka-consumer-perf-test.sh --broker-list node101:9092,node102:9092,node103:9092 --topic test --fetch-size 10000 --messages 10000000 --threads 1
    
    • 1

    ①参数说明:

    --broker-list 指定 Kafka 集群地址
    --topic 指定 topic 的名称
    --fetch-size 指定每次 fetch 的数据的大小
    --messages 总共要消费的消息个数

    ②测试结果说明:

    start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
    2021-08-03 21:17:21:778, 2021-08-03 21:18:19:775, 514.7169, 8.8749, 5397198, 93059.9514
    
    • 1
    • 2

    开始测试时间,测试结束数据,共消费数据514.7169MB,吞吐量8.8749MB/s

    Ⅱ、调整fetch-size

    ① 增加fetch-size值,观察消费吞吐量。

    [fancy@node101 kafka]$ bin/kafka-consumer-perf-test.sh --broker-list node101:9092,node102:9092,node103:9092 --topic test --fetch-size 100000 --messages 10000000 --threads 1
    
    • 1

    ②测试结果说明:
    start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
    2021-08-03 21:22:57:671, 2021-08-03 21:23:41:938, 514.7169, 11.6276, 5397198, 121923.7355

    Ⅲ、总结
    吞吐量受网络带宽和fetch-size的影响

    6. Kafka分区数计算

    • 创建一个只有1个分区的 topic
    • 测试这个 topic 的 producer 吞吐量和 consumer 吞吐量。
    • 假设他们的值分别是Tp和Tc,单位可以是MB/s。
    • 然后假设总的目标吞吐量是Tt,那么分区数 = Tt / min(Tp,Tc)

    例如:producer吞吐量 = 20m/s;consumer吞吐量 = 50m/s,期望吞吐量100m/s;分区数 = 100 / 20 = 5分区
    https://blog.csdn.net/weixin_42641909/article/details/89294698 分区数一般设置为:3-10个

    五、采集日志 Flume

    1. 日志采集Flume安装

    具体安装步骤详见:https://blog.csdn.net/m0_51111980/article/details/126199070

    集群规划:

    服务器node101服务器node102服务器node103
    Flume(采集日志)FlumeFlume

    2. Flume组件选型

    A、Source

    Taildir Source 相比 Exec Source、Spooling Directory Source 的优势:

    • TailDir Source :断点续传、多目录。Flume1.6 以前需要自己自定义 Source 记录每次读取文件位置,实现断点续传。不会丢数据,但是有可能会导致数据重复。
    • Exec Source :可以实时搜集数据,但是在 Flume 不运行或者 Shell 命令出错的情况下,数据将会丢失。
    • Spooling Directory Source 监控目录,支持断点续传。

    batchSize大小如何设置?

    答:Event 1K左右时,500-1000合适(默认为100)

    B、Channel

    采用 Kafka Channel,省去了 Sink,提高了效率。KafkaChannel 数据存储在 Kafka 里面,所以数据是存储在磁盘中。注意在 Flume1.7 以前,Kafka Channel 很少有人使用,因为发现 parseAsFlumeEvent 这个配置起不了作用。也就是无论parseAsFlumeEvent 配置为 true 还是 false,都会转为 Flume Event。这样的话,造成的结果是,会始终都把 Flume 的 headers中的信息混合着内容一起写入Kafka的消息中,这显然不是我所需要的,我只是需要把内容写入即可。

    3. 日志采集 Flume 配置

    A、Flume配置分析

    在这里插入图片描述
    Flume直接读log日志的数据,log日志的格式是app.yyyy-mm-dd.log。

    B、Flume的具体配置如下

    /opt/module/flume/conf 目录下创建 file-flume-kafka.conf 文件

    [fancy@node101 conf]$ vim file-flume-kafka.conf
    
    • 1

    在文件配置如下内容

    #为各组件命名
    a1.sources = r1
    a1.channels = c1
    
    #描述source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
    a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
    a1.sources.r1.interceptors =  i1
    a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.ETLInterceptor$Builder
    
    #描述channel
    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c1.kafka.bootstrap.servers = node101:9092,node102:9092
    a1.channels.c1.kafka.topic = topic_log
    a1.channels.c1.parseAsFlumeEvent = false
    
    #绑定source和channel以及sink和channel的关系
    a1.sources.r1.channels = c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    注意:com.fancy.flume.interceptor.ETLInterceptor是自定义的拦截器的全类名。需要根据用户自定义的拦截器做相应修改。

    4. Flume 拦截器

    A、创建Maven工程flume-interceptor

    B、创建包名:com.fancy.flume.interceptor

    C、在pom.xml文件中添加如下配置

    <dependencies>
        <dependency>
            <groupId>org.apache.flumegroupId>
            <artifactId>flume-ng-coreartifactId>
            <version>1.9.0version>
            <scope>providedscope>
        dependency>
    
        <dependency>
            <groupId>com.alibabagroupId>
            <artifactId>fastjsonartifactId>
            <version>1.2.62version>
        dependency>
    dependencies>
    
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-pluginartifactId>
                <version>2.3.2version>
                <configuration>
                    <source>1.8source>
                    <target>1.8target>
                configuration>
            plugin>
            <plugin>
                <artifactId>maven-assembly-pluginartifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependenciesdescriptorRef>
                    descriptorRefs>
                configuration>
                <executions>
                    <execution>
                        <id>make-assemblyid>
                        <phase>packagephase>
                        <goals>
                            <goal>singlegoal>
                        goals>
                    execution>
                executions>
            plugin>
        plugins>
    build>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    注意:scope中provided的含义是编译时用该jar包。打包时时不用。因为集群上已经存在flume的jar包。只是本地编译时用一下。

    D、在com.fancy.flume.interceptor包下创建JSONUtils类

    package com.fancy.flume.interceptor;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONException;
    
    public class JSONUtils {
        public static boolean isJSONValidate(String log){
            try {
                JSON.parse(log);
                return true;
            }catch (JSONException e){
                return false;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    E、在com.fancy.flume.interceptor包下创建LogInterceptor类

    package com.fancy.flume.interceptor;
    
    import com.alibaba.fastjson.JSON;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.nio.charset.StandardCharsets;
    import java.util.Iterator;
    import java.util.List;
    
    public class ETLInterceptor implements Interceptor {
    
        @Override
        public void initialize() {
    
        }
    
        @Override
        public Event intercept(Event event) {
    
            byte[] body = event.getBody();
            String log = new String(body, StandardCharsets.UTF_8);
    
            if (JSONUtils.isJSONValidate(log)) {
                return event;
            } else {
                return null;
            }
        }
    
        @Override
        public List<Event> intercept(List<Event> list) {
    
            Iterator<Event> iterator = list.iterator();
    
            while (iterator.hasNext()){
                Event next = iterator.next();
                if(intercept(next)==null){
                    iterator.remove();
                }
            }
    
            return list;
        }
    
        public static class Builder implements Interceptor.Builder{
    
            @Override
            public Interceptor build() {
                return new ETLInterceptor();
            }
            @Override
            public void configure(Context context) {
    
            }
    
        }
    
        @Override
        public void close() {
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    F、打包

    在这里插入图片描述

    G、需要先将打好的包放入到 node101 的 /opt/module/flume/lib 文件夹下面

    [fancy@node101 lib]$ ls | grep interceptor
    flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar
    
    • 1
    • 2

    H、分发 Flume 到 node102、node103

    [fancy@node101 module]$ xsync flume/
    
    • 1

    I、分别在 node101、node102 上启动Flume

    [fancy@node101 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &
    
    [fancy@node102 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &
    
    • 1
    • 2
    • 3

    5. 测试 Flume-Kafka 通道

    A、生成日志

    [fancy@node101 ~]$ lg.sh
    
    • 1

    B、消费Kafka数据,观察控制台是否有数据获取到

    [fancy@node101 kafka]$ bin/kafka-console-consumer.sh \
    --bootstrap-server node101:9092 --from-beginning --topic topic_log
    
    • 1
    • 2

    说明:如果获取不到数据,先检查 Kafka、Flume、Zookeeper 是否都正确启动。再检查 Flume 的拦截器代码是否正常。

    6. Flume 启动停止脚本

    A、在 /home/fancy/bin 目录下创建脚本 f1.sh

    [fancy@node101 bin]$ vim f1.sh
    
    • 1

    在脚本中填写如下内容

    #! /bin/bash
    
    case $1 in
    "start"){
            for i in node101 node102
            do
                    echo " --------启动 $i 采集flume-------"
                    ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1  &"
            done
    };;	
    "stop"){
            for i in node101 node102
            do
                    echo " --------停止 $i 采集flume-------"
                    ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "
            done
    
    };;
    esac
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 说明1:nohup,该命令可以在你退出帐户/关闭终端之后继续运行相应的进程。nohup 就是不挂起的意思,不挂断地运行命令。

    • 说明2:awk 默认分隔符为空格

    • 说明3:$2是在""双引号内部会被解析为脚本的第二个参数,但是这里面想表达的含义是 awk 的第二个值,所以需要将他转义,用\$2表示。

    • 说明4:xargs 表示取出前面命令运行的结果,作为后面命令的输入参数。

    B、增加脚本执行权限

    [fancy@node101 bin]$ chmod u+x f1.sh
    
    • 1

    C、f1集群启动脚本

    [fancy@node101 module]$ f1.sh start
    
    • 1

    D、f1集群停止脚本

    [fancy@node101 module]$ f1.sh stop
    
    • 1

    六、消费 Kafka 数据 Flume

    集群规划

    服务器node101服务器node102服务器103
    Flume(消费Kafka)Flume

    1. Flume 组件选型

    A、FileChannel和MemoryChannel区别

    MemoryChannel 传输数据速度更快,但因为数据保存在 JVM 的堆内存中,Agent 进程挂掉会导致数据丢失,适用于对数据质量要求不高的需求。

    FileChannel 传输速度相对于 Memory 慢,但数据安全保障高,Agent 进程挂掉也可以从失败中恢复数据。

    选型:

    • 金融类公司、对钱要求非常准确的公司通常会选择 FileChannel
    • 传输的是普通日志信息(京东内部一天丢100万-200万条,这是非常正常的),通常选择MemoryChannel。

    B、FileChannel优化

    通过配置 dataDirs 指向多个路径,每个路径对应不同的硬盘,增大 Flume 吞吐量。

    官方说明如下:

    Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance
    
    • 1

    checkpointDir 和 backupCheckpointDir 也尽量配置在不同硬盘对应的目录中,保证 checkpoint 坏掉后,可以快速使用backupCheckpointDir 恢复数据。

    在这里插入图片描述
    C、Sink:HDFS Sink

    • HDFS存入大量小文件,有什么影响?
      元数据层面: 每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在 Namenode 内存中。所以小文件过多,会占用 Namenode 服务器大量内存,影响 Namenode 性能和使用寿命。
      计算层面: 默认情况下 MR 会对每个小文件启用一个 Map 任务计算,非常影响计算性能。同时也影响磁盘寻址时间。

    • HDFS小文件处理
      官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
      基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0几个参数综合作用,效果如下:
      ①文件在达到128M时会滚动生成新文件
      ②文件创建超3600秒时会滚动生成新文件

    2. 消费者 Flume 配置

    A、Flume配置分析
    在这里插入图片描述
    Flume的具体配置如下:

    在 node103 的 /opt/module/flume/conf目录下创建 kafka-flume-hdfs.conf 文件

    [fancy@node103 conf]$ vim kafka-flume-hdfs.conf
    
    • 1

    在文件配置如下内容

    ## 组件
    a1.sources=r1
    a1.channels=c1
    a1.sinks=k1
    
    
    ## source1
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.batchSize = 5000
    a1.sources.r1.batchDurationMillis = 2000
    a1.sources.r1.kafka.bootstrap.servers = node101:9092,node102:9092,node103:9092
    a1.sources.r1.kafka.topics=topic_log
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.TimeStampInterceptor$Builder
    
    ## channel1
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
    a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
    
    
    ## sink1
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
    a1.sinks.k1.hdfs.filePrefix = log-
    a1.sinks.k1.hdfs.round = false
    
    #控制生成的小文件
    a1.sinks.k1.hdfs.rollInterval = 10
    a1.sinks.k1.hdfs.rollSize = 134217728
    a1.sinks.k1.hdfs.rollCount = 0
    
    ## 控制输出文件是原生文件。
    a1.sinks.k1.hdfs.fileType = CompressedStream
    a1.sinks.k1.hdfs.codeC = lzop
    
    ## 拼装
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel= c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    3. Flume 时间戳拦截器

    由于 Flume 默认会用 Linux 系统时间,作为输出到 HDFS 路径的时间。如果数据是 23:59 分产生的。Flume 消费 Kafka 里面的数据时,有可能已经是第二天了,那么这部门数据会被发往第二天的 HDFS 路径。我们希望的是根据日志里面的实际时间,发往HDFS的路径,所以下面拦截器作用是获取日志中的实际时间。

    解决的思路:拦截json日志,通过 fastjson 框架解析 json,获取实际时间ts。将获取的ts时间写入拦截器header头,header的key必须是timestamp,因为Flume框架会根据这个key的值识别为时间,写入到HDFS。

    A、在com.fancy.flume.interceptor包下创建TimeStampInterceptor类

    package com.fancy.flume.interceptor;
    import com.alibaba.fastjson.JSONObject;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    public class TimeStampInterceptor implements Interceptor {
    
        private ArrayList<Event> events = new ArrayList<>();
    
        @Override
        public void initialize() {
    
        }
    
        @Override
        public Event intercept(Event event) {
    
            Map<String, String> headers = event.getHeaders();
            String log = new String(event.getBody(), StandardCharsets.UTF_8);
    
            JSONObject jsonObject = JSONObject.parseObject(log);
    
            String ts = jsonObject.getString("ts");
            headers.put("timestamp", ts);
    
            return event;
        }
    
        @Override
        public List<Event> intercept(List<Event> list) {
            events.clear();
            for (Event event : list) {
                events.add(intercept(event));
            }
    
            return events;
        }
    
        @Override
        public void close() {
    
        }
    
        public static class Builder implements Interceptor.Builder {
            @Override
            public Interceptor build() {
                return new TimeStampInterceptor();
            }
    
            @Override
            public void configure(Context context) {
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    B、重新打包

    在这里插入图片描述
    C、需要先将打好的包放入到 node101 的 /opt/module/flume/lib 文件夹下面

    [fancy@node101 lib]$ ls | grep interceptor
    flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar
    
    • 1
    • 2

    D、分发 Flume 到 node102、node103

    [fancy@node101 module]$ xsync flume/
    
    • 1

    4. 消费者 Flume 启动停止脚本

    A、在 /home/fancy/bin 目录下创建脚本 f2.sh

    [fancy@node101 bin]$ vim f2.sh
    
    • 1

    在脚本中填写如下内容

    #! /bin/bash
    
    case $1 in
    "start"){
            for i in node103
            do
                    echo " --------启动 $i 消费flume-------"
                    ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log2.txt   2>&1 &"
            done
    };;
    "stop"){
            for i in node103
            do
                    echo " --------停止 $i 消费flume-------"
                    ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
            done
    
    };;
    esac
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    B、增加脚本执行权限

    [fancy@node101 bin]$ chmod u+x f2.sh
    
    • 1

    C、f2集群启动脚本

    [fancy@node101 module]$ f2.sh start
    
    • 1

    D、f2集群停止脚本

    [fancy@node101 module]$ f2.sh stop
    
    • 1

    5. Flume 内存优化

    A、问题描述:

    如果启动消费Flume抛出如下异常

    ERROR hdfs.HDFSEventSink: process failed
    java.lang.OutOfMemoryError: GC overhead limit exceeded
    
    • 1
    • 2

    B、解决方案步骤

    在 node101 服务器的 /opt/module/flume/conf/flume-env.sh 文件中增加如下配置

    export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
    
    • 1

    同步配置到 node102、node103 服务器

    [fancy@node101 conf]$ xsync flume-env.sh
    
    • 1

    C、Flume内存参数设置及优化

    JVM heap一般设置为4G或更高
    -Xmx-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc。
    -Xms表示JVM Heap(堆内存) 最小尺寸,初始分配;
    -Xmx 表示 JVM Heap(堆内存) 最大允许的尺寸,按需分配。如果不设置一致,容易在初始化时,由于内存不够,频繁触发 fullgc。

    6. 采集通道启动/停止脚本

    A、在 /home/fancy/bin 目录下创建脚本cluster.sh

    [fancy@node101 bin]$ vim cluster.sh
    
    • 1

    在脚本中填写如下内容

    #!/bin/bash
    
    case $1 in
    "start"){
            echo ================== 启动 集群 ==================
    
            #启动 Zookeeper集群
            zk.sh start
    
            #启动 Hadoop集群
            hdp.sh start
    
            #启动 Kafka采集集群
            kf.sh start
    
            #启动 Flume采集集群
            f1.sh start
    
            #启动 Flume消费集群
            f2.sh start
    
            };;
    "stop"){
            echo ================== 停止 集群 ==================
    
            #停止 Flume消费集群
            f2.sh stop
    
            #停止 Flume采集集群
            f1.sh stop
    
            #停止 Kafka采集集群
            kf.sh stop
    
            #停止 Hadoop集群
            hdp.sh stop
    
            #停止 Zookeeper集群
            zk.sh stop
    
    };;
    esac
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    B、增加脚本执行权限

    [fancy@node101 bin]$ chmod u+x cluster.sh
    
    • 1

    C、cluster集群启动脚本

    [fancy@node101 module]$ cluster.sh start
    
    • 1

    D、cluster集群停止脚本

    [fancy@node101 module]$ cluster.sh stop
    
    • 1

    七、常见问题及解决方案

    1. 2NN页面不能显示完整信息

    A、问题描述

    访问 2NN 页面 http://node103:9868,看不到详细信息

    B、解决办法

    在浏览器上按F12,查看问题原因。定位bug在61行

    找到要修改的文件

    [fancy@node101 static]$ pwd
    /opt/module/hadoop-3.1.3/share/hadoop/hdfs/webapps/static
    
    [fancy@node101 static]$ vim dfs-dust.js
    :set nu
    
    • 1
    • 2
    • 3
    • 4
    • 5

    修改61行

    return new Date(Number(v)).toLocaleString();
    
    • 1

    C、分发dfs-dust.js

    [fancy@node101 static]$ xsync dfs-dust.js
    
    • 1

    D、在 http://node103:9868/status.html 页面强制刷新

  • 相关阅读:
    如何申请百度apikey
    Elasticsearch实战(七)--- 词条为中心的 CrossFields 多字段搜索策略
    重复的DNA序列[hash判定重复+滑动窗口+二进制编码之位运算]
    ansible User 模块
    7月11日学习打卡,数据结构栈
    TCP和UDP和端口
    2023湖北汽车工业学院计算机考研信息汇总
    C++知识点总结(22):模拟算法
    c语言练习63:用malloc开辟二维数组的三种办法
    Mysql数据库 8.SQL语言 外键约束
  • 原文地址:https://blog.csdn.net/m0_51111980/article/details/127413835