HDFS: 安装部署,配置参数,架构思想(DataNode, NameNode),基本原理 、文件读写, 元数据,命令操作,API操作,联邦路由,机架感知,基本调优,RPC。
YARN:架构思想,组成模块,基本原理,调度器,调度原理, API操作,日志查看,参数优化,运维调优。
MapReduce: 安装部署,配置参数,架构思想,基本原理、map reduce的过程,API操作,shuffle, 读写原理
日志聚集
Hadoop 是分布式系统基础架构。在某种程度上将多台计算机组织成了一台分布式计算机执行任务,
hadoop三大组件:
HDFS 相当于这台分布式计算机的硬盘,文件系统
YARN 相当于CPU、内存资源控制器
MapReduce 是这台计算机运行程序的框架
HDFS文件读写,用MapReduce框架编写程序,提交给YARN运行
三大发行版本:Apache、Cloudera、Hortonworks。
Apache 版本最原始(最基础)的版本
Cloudera 内部集成了很多大数据框架,对应产品 CDH。
Hortonworks 文档较好,对应产品 HDP。
2018被收购,推出新的品牌 CDP。
hadoop version
# 配置
cd $HADOOP_HOME/etc/hadoop
|-- workers 记录所有的数据节点的主机名或 IP 地址
|-- core-site.xml Hadoop 核心配置
|-- hdfs-site.xml HDFS 配置项
|-- mapred-site.xml MapReduce 配置项
|-- yarn-site.xml YRAN 配置项
# 启停
cd $HADOOP_HOME/sbin
# 启动HDFS(NameNode节点)
start-dfs.sh
# 启动YARN(ResourceManager节点)
start-yarn.sh
cd $HADOOP_HOME/bin
# 资料,案例
cd $HADOOP_HOME/share
高可靠性:数据块复制备份在不同服务器上
高扩展性:节点方便扩展
高性能:任务跟数据走
(1)任务和数据在同一节点
(2)任务和数据在同一机架
(3)任务和数据不在同一节点也不在同一机架
高容错性:能够自动将失败的任务重新分配。
高可用 HA模式
文件系统
HDFS 集群是建立在 Hadoop 集群之上
主从结构,NameNode=主,DataNode=从
一个文件分成多个数据块 block 存储到DataNode,文件元数据记录在NameNode
存储单位:block 块, 默认128M
节点数量:(2.x 1个)(3.x 多个)
作用:
保存元数据(文件目录结构,文件名,文件属性、文件的block列表 与所在的DataNode)
指挥其它节点存储的节点
元数据都先保存到内存、定时持久化到本地磁盘
一个block一条数据,block位置由DataNode维护
储存block的节点
每个 block 都会有一个校验码,并存放到独立的文件中,以便读的时候来验证其完整性
数据在 DataNode 之间是通过 pipeline 的方式进行复制的
节点数量:多个
作用:
分摊命名节点的压力、定时备份NN的状态并执行一些管理工作
提供备份数据以恢复命名节点
一个文件如果小于128MB,则按照真实的文件大小独占一个数据存储块,存放到DataNode节点中
1浪费DataNode空间 2NameNode存储元数据膨胀

SequenceFile 存储方案
二进制文件格式,类似key-value存储, 通过MR程序从文件中将文件取出
CombinedFile 存储方案
基于Map/Reduce将原文件进行转换,通过CombineFile InputFormat类将多个文件分别打包到一个split中, 将小文件进行整合
基于HBase的小文件存储方案
基于打包构建索引方案
基于压缩的思路,将多个小文件压缩成一个tar文件存放至HDFS上,通过HBASE记录文件名和HDFS文件的位置映射关系
对于既要存储大文件又要存储小文件的场景,我建议在上层作一个逻辑处理层,在存储时先判断是大文件还是小文件,再决定是否用打包压缩还是直接上传至HDFS
HDFS Client:本地磁盘的HDFS客户端(文件切分,提供HDFS命令,读写数据)


查看 HDFS 上存储的数据信息
http://NameNode-IP:9870
hadoop fs [命令]
hdfs dfs [命令]
# 显示根目录 / 下的文件和子目录,绝对路径
hadoop fs -ls /
# 新建文件夹,绝对路径
hadoop fs -mkdir /hello
# 上传文件
hadoop fs -put hello.txt /hello/
# 下载文件
hadoop fs -get /hello/hello.txt
# 输出文件内容
hadoop fs -cat /hello/hello.txt
# 从本地磁盘复制文件到HDFS
hadoop fs -copyFromLocal [localSource] [hdfs]
# 检查文件的完整性
hadoop fsck
# 重新平衡HDFS
start-balancer.sh
<dependency>
<groupId>org.apache.hadoopgroupId>
<artifactId>hadoop-clientartifactId>
dependency>
代码向数据迁移:大数据系统会尽量地将任务分配到离数据最近的机器上运行(程序运行时,将程序及其依赖包都复制到数据所在的机器运行) 尽量让一段数据的计算发生在同一台机器上
传输时间 << 寻道时间
一般数据写入后不再修改
hadoop 数据只在自己集群节点传输, 需要序列化 不需要复杂的处理,自己实现了比java轻量的序列化
序列化接口:Writable
默认序列化类型
| Java类型 | Hadoop Writable类型 |
|---|---|
| boolean | BooleanWritable |
| byte | ByteWritable |
| int | IntWritable |
| float | FloatWritable |
| long | LongWritable |
| double | DoubleWritable |
| String | Text |
| map | MapWritable |
| array | ArrayWritable |
两个阶段
一个 Map 阶段和一个 Reduce 阶段
三类实例
MrAppMaster实例: 整个程序的过程调度及状态协调
map阶段:并发执行MapTask 实例
Reduce 阶段:并发执行ReduceTask 实例
job提交
FileInputFormat每个文件单独切片
数量:一个MapTask处理一个切片》MapTask个数=切片数
切片大小
切片规划文件提交到YARN上,YARN上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数。
shuffle阶段,数据按照条件输出到不同文件
分区号必须从零开始,逐一累加
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
自定义Partitioner
job.setPartitionerClass(CustomPartitioner.class);
job.setNumReduceTasks(n);
程序入口 driver类
作用:job驱动,提交job, yarn创建MrAppMaster进程(Mr=job)
job.waitForCompletion();
submit(){
// 1 建立连接
connect();
// 创建job代理
// 判断是 yarn集群运行环境 还是 本地运行环境
YarnClient
LocalClient
// 2 提交job
// 创建job路径
// 拷贝jar包到集群
// 计算切片,产生 切片规划文件
// 提交job,返回状态
}


| 输入 | 输出 | |
|---|---|---|
| input | 文件 | |
| mapper | list ( | |
| shuffle | list ( | |
| reduce | list ( |
作用:分割及映射
并发运行MapTask处理数据, 互不相干
各种InputFormat读取机制不同:
默认用TextInputFormat类
class TextInputFormat{
// 读取文件,返回kv给mapper
RecorderReader(){}
// 判断文件是否可切片
isSplitable(){}
}
FilelnputFormat。RecordReader,实现一次读取一个完整文件,将文件名为key,文件内容为value。SequenceFileOutPutFormat输出合并文件。Mapper抽象类
// 输入kv
Mapper< 输入k,输入v,输出可排序类型k,输出v> >{
//重写三个方法:reduce() setup() cleanup ()
map(输入k,v,Context);
setup(Context);
cleanup(Context);
public void run(Context context){
setup(context);
try {
while (context.nextKeyValue()){
map(...)
}
} finally {
cleanup(context);
}
}
}
在map阶段输出时,reducer拉取之前,混洗数据的过程

hadoop默认会对key进行排序
map阶段:环形缓冲区,溢写前快排,溢写后归并
reducer阶段:拉取数据后归并排序
可选流程。shuffle最后 相同key合并
优:以减少磁盘 IO、减少磁盘存储空间
缺:增加 CPU 开销
IO 密集型的 Job,多用压缩
作用:重排,还原,合并结果
依赖map阶段MapTask的输出,并发运行ReduceTask处理数据, 互不相干

Reducer抽象类
// 输入kv = map输出kv的聚合
Reducer< 输入kv<String,int>,输出kv<String,int> >{
//重写三个方法:reduce() setup() cleanup ()
reduce(k,Iterable<v>,Context);
setup(Context);
cleanup(Context);
public void run(Context context){
setup(context);
try {
while (context.nextKey()){
reduce(...)
}
} finally {
cleanup(context);
}
}
}
输出到文件、mysql、其他数据库等等
默认实现类是 TextOutputFormat,
功能逻辑是:将每一个 KV 对,向目标文本文件输出一行。

| 输入 | 输出 | |
|---|---|---|
| input | 源文件 | < 行偏移,行内容 > |
| mapper | < 行偏移,行内容 > | list ( |
| shuffle | list ( | <有序 归并word, list(1)> |
| reduce | list ( |
void map(key,value,context){
//获取一行
String line = value.toString();
//行分词
String[] words = line.split(" ");
//输出
Text outK = new Text();
IntWritable outV = new IntWritable(1);
outK.set(word);
context.write(outK,outV);
}
// 输入kv = map输出kv的聚合
Reducer< 输入kv<String,int>,输出kv<String,int> >
逻辑写在 reducer() 方法
ReducerTask 对每组相同k的输入kv调用一次reducer()
void reduce(key,Iterable<Int> values,context){
//遍历 values集合
for (IntWritable count : values) {
//累加value
sum += count.get();
}
//输出
IntWritable v = new IntWritable();
v.set(sum);
context.write(key,v);
}
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1 获取配置信息, 获取 job 对象实例
//框架会按照JobConf描述的信息去完成这个作业
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 关联本 Driver 程序的 jar
job.setJarByClass(WordCountDriver.class);
// 3 关联 Mapper 和 Reducer 的 jar
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 4 设置 Mapper 输出的 kv 类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5 设置最终输出 kv 类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6 设置输入和输出路径
job.setInputFormatClass(input类.class);
job.setOutputFormatClass(output类.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 提交 job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
# 运行job,本地方式或yarn方式
# hadoop jar [mainClass: driver类名] args...
hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.4.jar wordcount input.txt output
需求:读取不同的数据格式,进行join操作(2张表 数据join)
setup(Context context){
//判断数据源
FileSplit fileSplit = (FileSplit) context.getInputSplit();//文件切片信息
filename = fileSplit.getPath().getName();
}
map(...){
//不同filename 输出value的数据不同
}
reduce(...){
//相同key 不同value 数据join
}
Driver 驱动类{
//缓存小表
}
mapper类
setup(Context context){
//获取缓存
}
map(...){
//读大表
//join 大表-缓存
}
清理的过程往往只需要运行 Mapper 程序,不需要运行 Reduce 程序。
需求:筛选error日志
// 在 Map 阶段对输入的数据根据规则进行过滤清洗。
map(key,value,context){
if(不合法){
return;
}
//直接写出value
context.write(value, NullWritable.get());
}
job.setNumReduceTasks(0); // 设置 reducetask 个数为 0
为 MapReduce 提供资源管理服务 ,主要管理cpu、内存
管理整个集群的资源
管理单节点资源,每个节点上一个
单个作业的资源管理和任务监控
资源申请的单位和任务运行的容器


job.waitForCompletion(), 产生一个YarnRunner—
ResourceManager 任务调度队列,任务优先级
三种作业调度器
资源默认只考虑内存,DRF策略(内存+CPU 综合配比)
多队列
多用户

单个队列内所有任务共享资源,时间尺度公平资源
多队列
多用户

是否饥饿: 实际使用份额<最小资源份额
资源配比:实际使用份额 / 最小资源份额
饥饿优先,资源配比小优先,时间顺序
查看 YARN 上运行的 Job 信息
http://ResourceManager-ip:8088
# 列出所有 Application
yarn application -list
>>> Application-Id Application-Name Application-Type
# 根据 Application 状态过滤
yarn application -list -appStates FINISHED
# Kill Application:
yarn application -kill <Application-Id>
# 查看任务
yarn applicationattempt -list <ApplicationId>
>>> ApplicationAttempt-Id State Container-Id Tracking-URL
# 查看容器
yarn container -list <ApplicationAttemptId>
# 查询日志 Application :
yarn logs -applicationId <Application-Id>
# 查询日志 Container :
yarn logs -applicationId <ApplicationId> -containerId <ContainerId>
# 查看节点状态
yarn node -list -all
# 查看队列
yarn queue -status <QueueName>
# 按yarn平台运行job
yarn jar <jar> [mainClass] args...
新建hadoop集群,基础组件可直接使用
/usr/local/service/hadoophttps://cloud.tencent.com/document/product/589/12289