A. Introduction
I. Overview
1. MapReduce is a framework provided by Hadoop for distributed computation.
2. The computation is split into two phases: the Map phase and the Reduce phase.

Before computing over a file, MapReduce first splits it into input splits (note: a split is not the same thing as a block); each split is handled by one MapTask. By default, a MapTask reads and processes its split line by line.
Introductory example:
Count the number of occurrences of each word
package cn.Ajaxtxdy.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// The Map-phase processing logic has to be specified here
// In MapReduce, all keys and values must be serializable
// By default, serialization is handled by Hadoop's own Writable mechanism (Avro serialization is also supported)
// KEYIN - type of the input key - by default, the byte offset of the line within the file
// VALUEIN - type of the input value - by default, the line of text that was read
// KEYOUT - type of the output key
// VALUEOUT - type of the output value
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
// key - byte offset of the line
// value - one line of data
// context - used to pass data downstream and to access job/environment parameters
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
// e.g. the input line: hello tom hello bob
// first split the line into individual words
String[] arr = value.toString().split(" ");
// desired final counts: hello 2, tom 1, bob 1
// what map emits: (hello,1) (tom,1) (hello,1) (bob,1)
for (String str : arr) {
context.write(new Text(str), new IntWritable(1));
}
}
}
package cn.Ajaxtxdy.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
// The Reducer's data comes from the Mapper, so the Mapper's output types are the Reducer's input types
// KEYIN, VALUEIN
// The final output should be each word together with its total count
// KEYOUT, VALUEOUT
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
// key - the input key
// values - all of the values associated with that key
// e.g. key = hello
// values = 1,1,1,1,1,1,1...
// context - used to write the result out (to HDFS)
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
package cn.Ajaxtxdy.wordcount;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// System.setProperty("hadoop.home.dir", "local Hadoop installation path");
// First request a Job from Hadoop to hold the execution logic
Job job = Job.getInstance();
// Set the entry (driver) class
job.setJarByClass(WordCountDriver.class);
// Set the Mapper class
job.setMapperClass(WordCountMapper.class);
// Set the Reducer class
job.setReducerClass(WordCountReducer.class);
// Set the Mapper's output types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// Set the Reducer's output types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// Set the input file
FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.1.1:9000/txt/words.txt"));
// Set the output path
// The output path must not already exist
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.1:9000/result/wordcount"));
// Launch the job and wait for it to complete
job.waitForCompletion(true);
}
}
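A brief usage sketch, assuming the three classes above are packaged into a jar named wordcount.jar (the jar name is an assumption): the job can be submitted with hadoop jar wordcount.jar cn.Ajaxtxdy.wordcount.WordCountDriver. For the sample line "hello tom hello bob" from the mapper comments, the output file would contain (key and value separated by a tab by default):
bob	1
hello	2
tom	1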
Example 1: count the occurrences of every non-whitespace character in a file
package cn.Ajaxtxdy.charcount;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class CharCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// Strip whitespace and split the line into individual characters
char[] cs = value.toString().replaceAll("\\s", "").toCharArray();
for (char c : cs) {
context.write(new Text(c + ""), new LongWritable(1));
}
}
}
package cn.Ajaxtxdy.charcount;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class CharCountReducer
extends Reducer<Text, LongWritable, Text, LongWritable> {
public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long sum = 0;
for (LongWritable val : values) {
sum += val.get();
}
context.write(key, new LongWritable(sum));
}
}
package cn.Ajaxtxdy.charcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class CharCountDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "JobName");
job.setJarByClass(cn.Ajaxtxdy.charcount.CharCountDriver.class);
job.setMapperClass(CharCountMapper.class);
job.setReducerClass(CharCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.1.1:9000/txt/characters.txt"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.1:9000/result/charcount"));
if (!job.waitForCompletion(true))
return;
}
}
Example 2: IP deduplication
package cn.Ajaxtxdy.ip;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class IPMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
context.write(value, NullWritable.get());
}
}
package cn.Ajaxtxdy.ip;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class IPReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
// key = an IP address
// values = null, null, null, ...
public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
package cn.Ajaxtxdy.ip;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class IPDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "JobName");
job.setJarByClass(cn.Ajaxtxdy.ip.IPDriver.class);
job.setMapperClass(IPMapper.class);
job.setReducerClass(IPReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// If the input path points to a single file, only that file is read
// If the input path points to a directory, every file in that directory is read
FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.1.1:9000/txt/ip.txt"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.1:9000/result/ip"));
if (!job.waitForCompletion(true))
return;
}
}
Example 3: compute each basketball player's total score
package cn.Ajaxtxdy.totalscore;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class TotalScoreMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// LBJ 39
String[] arr = value.toString().split(" ");
context.write(new Text(arr[0]),
new IntWritable(Integer.parseInt(arr[1])));
}
}
package cn.Ajaxtxdy.totalscore;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class TotalScoreReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
package cn.Ajaxtxdy.totalscore;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TotalScoreDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "JobName");
job.setJarByClass(cn.Ajaxtxdy.totalscore.TotalScoreDriver.class);
job.setMapperClass(TotalScoreMapper.class);
job.setReducerClass(TotalScoreReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.1.1:9000/txt/score2/"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.1:9000/result/totalscore"));
if (!job.waitForCompletion(true))
return;
}
}
Example 4: find the maximum value
package cn.Ajaxtxdy.maxscore;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MaxScoreMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] arr = value.toString().split(" ");
context.write(new Text(arr[0]), new IntWritable(Integer.parseInt(arr[1])));
}
}
package cn.Ajaxtxdy.maxscore;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MaxScoreReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
// To reduce object creation and destruction, MapReduce reuses the same object (address reuse):
// during iteration, the iterated value object is created only once and refilled on each step
IntWritable max = new IntWritable(0);
// key = LBJ
// values = 684 512 340 312
// IntWritable val = new IntWritable();
// val.set(684);
// val.get() > max.get() -> 684 > 0 -> true
// max = val; - assigning val to max only copies the reference, so max and val would point to the same object
// val.set(512);
// val.get() > max.get() -> 512 > 512 -> false
// so with max = val, max would simply end up holding whatever the last iterated value was
for (IntWritable val : values) {
if (val.get() > max.get())
// max = val;
max.set(val.get());
}
context.write(key, max);
}
}
package cn.Ajaxtxdy.maxscore;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MaxScoreDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "JobName");
job.setJarByClass(cn.Ajaxtxdy.maxscore.MaxScoreDriver.class);
job.setMapperClass(MaxScoreMapper.class);
job.setReducerClass(MaxScoreReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.1.1:9000/txt/score2.txt"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.1:9000/result/maxscore"));
if (!job.waitForCompletion(true))
return;
}
}
B. Components
I. Serialization
1. In MapReduce, keys and values must be serializable.
2. By default this is handled by Hadoop's own Writable serialization mechanism (Avro serialization is also supported).
3. MapReduce exposes this in a simple form: a custom type only needs to implement the Writable interface.
Example 1: compute total scores
package cn.Ajaxtxdy.serialscore;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class Score implements Writable {
private String name;
private int chinese;
private int math;
private int english;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getChinese() {
return chinese;
}
public void setChinese(int chinese) {
this.chinese = chinese;
}
public int getMath() {
return math;
}
public void setMath(int math) {
this.math = math;
}
public int getEnglish() {
return english;
}
public void setEnglish(int english) {
this.english = english;
}
@Override
public void readFields(DataInput in) throws IOException {
this.name = in.readUTF();
this.chinese = in.readInt();
this.math = in.readInt();
this.english = in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(name);
out.writeInt(chinese);
out.writeInt(math);
out.writeInt(english);
}
}
package cn.Ajaxtxdy.serialscore;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class SerialScoreMapper extends Mapper<LongWritable, Text, Text, Score> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] arr = value.toString().split(" ");
Score s = new Score();
s.setName(arr[0]);
s.setChinese(Integer.parseInt(arr[1]));
s.setMath(Integer.parseInt(arr[2]));
s.setEnglish(Integer.parseInt(arr[3]));
context.write(new Text(s.getName()), s);
}
}
package cn.Ajaxtxdy.serialscore;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class SerialScoreReducer extends Reducer<Text, Score, Text, IntWritable> {
public void reduce(Text key, Iterable<Score> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (Score val : values) {
sum += val.getChinese() + val.getMath() + val.getEnglish();
}
context.write(key, new IntWritable(sum));
}
}
package cn.Ajaxtxdy.serialscore;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SerialScoreDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "JobName");
job.setJarByClass(cn.Ajaxtxdy.serialscore.SerialScoreDriver.class);
job.setMapperClass(SerialScoreMapper.class);
job.setReducerClass(SerialScoreReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Score.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.1.1:9000/txt/score.txt"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.1:9000/result/serialscore"));
if (!job.waitForCompletion(true))
return;
}
}
II. Partitioning - Partition
1. Partitioning classifies the map output into categories.
2. Whether or not partitioning is used does not change the Map or Reduce logic itself.
3. Partition numbers start at 0 and increase consecutively.
4. In MapReduce, each partition is handled by one ReduceTask, and each ReduceTask produces one result file. By default there is only 1 partition, hence only 1 ReduceTask and a single result file.
5. In MapReduce, if no Partitioner is specified manually, the default partitioning class is HashPartitioner (see the sketch below).
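For reference, the default HashPartitioner derives the partition number from the key's hash code; the following sketch is equivalent to its logic (the class name is illustrative):
import org.apache.hadoop.mapreduce.Partitioner;
// Same logic as the built-in HashPartitioner: mask the hash code so it is
// non-negative, then take it modulo the number of ReduceTasks.
public class HashLikePartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}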

Example 1: compute each person's total traffic, partitioned by region
package cn.Ajaxtxdy.partflow;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class Flow implements Writable {
private String phone;
private String addr;
private String name;
private int flow;
public String getPhone() {
return phone;
}
public void setPhone(String phone) {
this.phone = phone;
}
public String getAddr() {
return addr;
}
public void setAddr(String addr) {
this.addr = addr;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getFlow() {
return flow;
}
public void setFlow(int flow) {
this.flow = flow;
}
// Deserialization
// Fields must be read back in exactly the same order they were written
@Override
public void readFields(DataInput in) throws IOException {
this.phone = in.readUTF();
this.addr = in.readUTF();
this.name = in.readUTF();
this.flow = in.readInt();
}
// Serialization
// Just write out the required fields one by one
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(phone);
out.writeUTF(addr);
out.writeUTF(name);
out.writeInt(flow);
}
}
package cn.Ajaxtxdy.partflow;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class AddrPartitioner extends Partitioner<Text, Flow> {
// Define the classification rule
@Override
public int getPartition(Text key, Flow value, int numReduceTasks) {
// Classify by region
// First get the region from the value
String addr = value.getAddr();
if (addr.equals("bj"))
return 0;
else if (addr.equals("sh"))
return 1;
else
return 2;
}
}
package cn.Ajaxtxdy.partflow;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class PartFlowMapper extends Mapper<LongWritable, Text, Text, Flow> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] arr = value.toString().split(" ");
Flow f = new Flow();
f.setPhone(arr[0]);
f.setAddr(arr[1]);
f.setName(arr[2]);
f.setFlow(Integer.parseInt(arr[3]));
context.write(new Text(f.getName()), f);
}
}
package cn.Ajaxtxdy.partflow;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class PartFlowReducer extends Reducer<Text, Flow, Text, IntWritable> {
public void reduce(Text key, Iterable<Flow> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (Flow val : values) {
sum += val.getFlow();
}
context.write(key, new IntWritable(sum));
}
}
package cn.Ajaxtxdy.partflow;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class PartFlowDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "JobName");
job.setJarByClass(cn.Ajaxtxdy.partflow.PartFlowDriver.class);
job.setMapperClass(PartFlowMapper.class);
job.setReducerClass(PartFlowReducer.class);
// Set the Partitioner class
job.setPartitionerClass(AddrPartitioner.class);
// Set the number of ReduceTasks
job.setNumReduceTasks(3);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Flow.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.1.1:9000/txt/flow.txt"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.1:9000/result/partflow"));
if (!job.waitForCompletion(true))
return;
}
}
Example 2: compute each person's total score, partitioned by month
package cn.Ajaxtxdy.partscore;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class Score implements Writable {
private int month;
private String name;
private int score;
public int getMonth() {
return month;
}
public void setMonth(int month) {
this.month = month;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getScore() {
return score;
}
public void setScore(int score) {
this.score = score;
}
@Override
public void readFields(DataInput in) throws IOException {
this.month = in.readInt();
this.name = in.readUTF();
this.score = in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(month);
out.writeUTF(name);
out.writeInt(score);
}
}
package cn.Ajaxtxdy.partscore;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class PartScoreMapper extends Mapper<LongWritable, Text, Text, Score> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] arr = value.toString().split(" ");
Score s = new Score();
s.setMonth(Integer.parseInt(arr[0]));
s.setName(arr[1]);
s.setScore(Integer.parseInt(arr[2]));
context.write(new Text(s.getName()), s);
}
}
package cn.Ajaxtxdy.partscore;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class PartScoreReducer extends Reducer<Text, Score, Text, IntWritable> {
public void reduce(Text key, Iterable<Score> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (Score val : values) {
sum += val.getScore();
}
context.write(key, new IntWritable(sum));
}
}
package cn.Ajaxtxdy.partscore;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class MonthPartitioner extends Partitioner<Text, Score> {
@Override
public int getPartition(Text key, Score value, int numReduceTasks) {
// Classify by month
// months:     1 2 3
// partitions: 0 1 2
return value.getMonth() - 1;
}
}
package cn.Ajaxtxdy.partscore;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class PartScoreDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "JobName");
job.setJarByClass(cn.Ajaxtxdy.partscore.PartScoreDriver.class);
job.setMapperClass(PartScoreMapper.class);
job.setReducerClass(PartScoreReducer.class);
job.setPartitionerClass(MonthPartitioner.class);
job.setNumReduceTasks(3);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Score.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.1.1:9000/txt/score1/"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.1:9000/result/partscore"));
if (!job.waitForCompletion(true))
return;
}
}
III. Sorting
1. MapReduce automatically sorts the output by key.
2. The key's class must implement the Comparable interface; since keys also have to be serialized, in practice it implements WritableComparable.
3. During sorting, if compareTo returns 0, MapReduce treats the two keys as the same key and puts their values into one group, which effectively deduplicates them.
Example 1: sort scores in descending order
package cn.Ajaxtxdy.sortscore;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class Score implements WritableComparable<Score> {
private String name;
private int score;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getScore() {
return score;
}
public void setScore(int score) {
this.score = score;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(name);
out.writeInt(score);
}
@Override
public void readFields(DataInput in) throws IOException {
this.name = in.readUTF();
this.score = in.readInt();
}
// Sort by score in descending order
@Override
public int compareTo(Score o) {
return o.score - this.score;
}
}
package cn.Ajaxtxdy.sortscore;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class SortScoreMapper extends Mapper<LongWritable, Text, Score, NullWritable> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] arr = value.toString().split("\t");
Score s = new Score();
s.setName(arr[0]);
s.setScore(Integer.parseInt(arr[1]));
context.write(s, NullWritable.get());
}
}
package cn.Ajaxtxdy.sortscore;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class SortScoreReducer extends Reducer<Score, NullWritable, Text, IntWritable> {
public void reduce(Score key, Iterable<NullWritable> values, Context context)
throws IOException, InterruptedException {
context.write(new Text(key.getName()), new IntWritable(key.getScore()));
}
}
package cn.Ajaxtxdy.sortscore;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SortScoreDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "JobName");
job.setJarByClass(cn.Ajaxtxdy.sortscore.SortScoreDriver.class);
job.setMapperClass(SortScoreMapper.class);
job.setReducerClass(SortScoreReducer.class);
job.setMapOutputKeyClass(Score.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// Files whose names start with _ (such as _SUCCESS) are treated as hidden by MapReduce and are not read
FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.1.1:9000/result/totalscore/"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.1:9000/result/sortscore"));
if (!job.waitForCompletion(true))
return;
}
}
Example 2: sort by month in ascending order; for the same month, sort by profit in descending order
package cn.Ajaxtxdy.sortprofit;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class Profit implements WritableComparable<Profit> {
private int month;
private String name;
private int profit;
public int getMonth() {
return month;
}
public void setMonth(int month) {
this.month = month;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getProfit() {
return profit;
}
public void setProfit(int profit) {
this.profit = profit;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(month);
out.writeUTF(name);
out.writeInt(profit);
}
@Override
public void readFields(DataInput in) throws IOException {
this.month = in.readInt();
this.name = in.readUTF();
this.profit = in.readInt();
}
// Sort by month first (ascending); if the months are equal, sort by profit in descending order.
// On a full tie, return a non-zero value so that distinct records are not merged into one key.
@Override
public int compareTo(Profit o) {
int r1 = this.month - o.month;
if (r1 == 0) {
int r2 = o.profit - this.profit;
return r2 == 0 ? 1 : r2;
}
return r1;
}
@Override
public String toString() {
return "Profit [month=" + month + ", name=" + name + ", profit=" + profit + "]";
}
}
package cn.Ajaxtxdy.sortprofit;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class SortProfitMapper extends Mapper<LongWritable, Text, Profit, NullWritable> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] arr = value.toString().split(" ");
Profit p = new Profit();
p.setMonth(Integer.parseInt(arr[0]));
p.setName(arr[1]);
p.setProfit(Integer.parseInt(arr[2]));
context.write(p, NullWritable.get());
}
}
package cn.Ajaxtxdy.sortprofit;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class SortProfitReducer extends Reducer<Profit, NullWritable, Profit, NullWritable> {
public void reduce(Profit key, Iterable<NullWritable> values, Context context)
throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
package cn.Ajaxtxdy.sortprofit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SortProfitDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "JobName");
job.setJarByClass(cn.Ajaxtxdy.sortprofit.SortProfitDriver.class);
job.setMapperClass(SortProfitMapper.class);
job.setReducerClass(SortProfitReducer.class);
job.setOutputKeyClass(Profit.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.1.1:9000/txt/profit3.txt"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.1:9000/result/sortprofit3"));
if (!job.waitForCompletion(true))
return;
}
}
IV. Combining - Combine
1. In most cases there are far more MapTasks than ReduceTasks, so almost all of the computational load ends up on the ReduceTasks, and the ReduceTasks become the bottleneck of the whole MapReduce job.
2. The combine logic is the same as the Reducer logic - it only takes one extra line in the Driver: job.setCombinerClass(Reducer.class); (see the sketch after this list).
3. What a Combiner does: it reduces the total amount of intermediate data without changing the computed result.
4. Summing, taking the maximum/minimum, deduplication and similar operations can use a Combiner; operations such as averaging cannot.
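For the WordCount job from the introduction, enabling a Combiner is a one-line change in the Driver; a minimal sketch, reusing WordCountReducer (safe here because summation is associative and commutative):
job.setMapperClass(WordCountMapper.class);
// run the reducer logic locally on each MapTask's output before it is shuffled
job.setCombinerClass(WordCountReducer.class);
job.setReducerClass(WordCountReducer.class);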
C. Details
I. Data locality strategy
1. When the JobTracker receives an MR job, it contacts the NameNode to obtain the file information, which includes the file size and the block information.
2. The JobTracker then slices the file into splits. Note: a split is a logical division, not a physical one. The number of splits determines the number of MapTasks. By default, a split (Split) is the same size as a block (Block).
3. The JobTracker assigns the resulting MapTasks to TaskTrackers for execution.
4. Because a MapTask needs to read its data and the data lives on DataNodes, DataNodes and TaskTrackers are deployed on the same nodes to reduce cross-cluster network traffic.
5. To save network bandwidth, block locations are taken into account when tasks are assigned: a task is preferably given to a node that already holds the data it needs to process. This is called data locality.

6. How splits are produced:
a. An empty file is treated as a single split.
b. MapReduce distinguishes splittable from non-splittable files; most compressed formats, for example, are not splittable.
c. A non-splittable file is treated as a single split.
d. To make splits smaller, lower maxsize; to make splits larger, raise minsize (see the sketch after this list).
e. When computing splits, the split slack threshold SPLIT_SLOP = 1.1 is taken into account.
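If the split size needs to be adjusted for a job, the bounds can be set on FileInputFormat in the Driver; a minimal sketch (the concrete sizes are illustrative assumptions):
// Lower/upper bounds used when computing the split size
// (by default a split ends up the same size as a block).
FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);   // raise minsize to enlarge splits
FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);  // lower maxsize to shrink splits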
II. MR execution flow
1. Preparation phase:
a. Check the input and output paths.
b. Compute the number of splits.
c. Set up the distributed-cache entries if necessary.
d. Upload the job jar and configuration to HDFS.
e. Submit the job to the JobTracker, optionally monitoring it.
2. Execution phase:
a. After the JobTracker receives the job, it breaks it down into MapTasks and ReduceTasks. The number of MapTasks is determined by the splits; the number of ReduceTasks is determined by the number of partitions.
b. After breaking the job down, the JobTracker waits for TaskTracker heartbeats and assigns tasks through them. MapTasks are assigned so as to satisfy data locality whenever possible; ReduceTasks cannot benefit from data locality, so they are assigned to whichever nodes are idle.
c. A TaskTracker picks up its tasks via the heartbeat and then downloads the job jar - this reflects the principle of moving the computation while keeping the data in place.
d. The TaskTracker starts a child JVM on its node to run the MapTask or ReduceTask. Note: every single MapTask or ReduceTask execution starts its own child JVM.
D. Shuffle
I. Map-side shuffle
1. After map() finishes processing a record, it writes the result into the MapTask's own buffer - every MapTask has one (MapOutputCollector).
2. Inside the buffer the data is partitioned and sorted; if a Combiner is specified, it is also combined there. Note: the in-buffer sort turns completely unordered data into ordered data, so quicksort is used.
3. The buffer is kept in memory and is 100 MB by default (see the configuration sketch after this list).
4. When buffer usage reaches a certain limit (the spill threshold, 0.8), the buffered data is spilled to disk; results produced by subsequent map() calls continue to go into the buffer.
5. Every spill produces a new spill file. The data inside a single spill file is partitioned and sorted; across spill files the ordering is only local.
6. After the map method finishes, all spill files are merged into a single result file (final out). During the merge the data is partitioned and sorted again, so final out is partitioned and sorted as a whole. Note: the merge turns locally ordered data into globally ordered data, so merge sort is used.
7. If data is still left in the buffer when the map method finishes, it is merged directly into the final out file.
8. During the merge, if there are at least 3 spill files and a Combiner is specified, combine runs once more as part of the merge.
9. Caveats:
a. The spill does not necessarily happen at all.
b. By default, a spill file is not necessarily 80 MB; serialization overhead has to be taken into account.
c. The buffer is essentially a circular byte array; making it circular avoids re-addressing and allows the buffer to be reused.
d. The purpose of the spill threshold is to reduce blocking of writes while the spill is in progress.
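The buffer size and spill threshold mentioned above are configurable per job; a minimal sketch using the standard property names (the concrete values are illustrative assumptions):
// in the Driver, before the Job is created:
Configuration conf = new Configuration();
// size of each MapTask's in-memory sort buffer, in MB (default 100)
conf.set("mapreduce.task.io.sort.mb", "200");
// fraction of the buffer that triggers a spill to disk (default 0.80)
conf.set("mapreduce.map.sort.spill.percent", "0.80");
Job job = Job.getInstance(conf, "JobName");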
II. Reduce-side shuffle
1. The ReduceTask starts several fetch threads that pull the data of its partition from every MapTask.
2. The data fetched from each MapTask is stored in a local file.
3. The fetched data is merged into one large file; during this merge the data is sorted again, using merge sort.
4. After the merge, the ReduceTask puts the values belonging to the same key into one iterator; this step is called grouping.
5. After grouping, every key has one iterator, and reduce() is called once per key.
6. Caveats (see the configuration sketch after this list):
a. ReduceTask start threshold: 0.05 - once 5% of the MapTasks have finished, the ReduceTasks are started and begin fetching data.
b. The fetch threads pull the data via HTTP requests.
c. The default number of fetch threads is 5.
d. Merge factor: 10 - up to 10 smaller files are merged into one larger file at a time.
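These Reduce-side values are also configurable; a minimal sketch with the standard property names (the values shown are the defaults unless noted, and would be set on the same Configuration in the Driver):
// number of fetch threads per ReduceTask (default 5)
conf.set("mapreduce.reduce.shuffle.parallelcopies", "5");
// merge factor: how many segment files are merged at a time (default 10)
conf.set("mapreduce.task.io.sort.factor", "10");
// fraction of MapTasks that must finish before ReduceTasks start fetching (default 0.05)
conf.set("mapreduce.job.reduce.slowstart.completedmaps", "0.05");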
III. Shuffle tuning
1. Reduce the number of spills:
a. Enlarge the buffer; in practice it is usually set somewhere between 250 and 400 MB.
b. Raise the spill threshold - not recommended, as it also increases the risk of blocking writes.
c. Add a Combiner.
2. Consider compressing the map output files; this is a trade-off between network bandwidth and CPU time (see the sketch after this list).
3. Increase the number of fetch threads.
4. Raise the merge factor - not recommended, as it increases the cost of the underlying merge.
5. Lower the ReduceTask start threshold - not recommended, as it increases the risk of ReduceTasks blocking while they wait.
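A sketch of point 2, compressing the intermediate map output (SnappyCodec is an assumption and requires the native Snappy library; any installed CompressionCodec can be substituted):
// compress the map output to trade CPU time for network and disk I/O
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec",
        org.apache.hadoop.io.compress.SnappyCodec.class,
        org.apache.hadoop.io.compress.CompressionCodec.class);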
E. InputFormat

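The example below defines a custom InputFormat whose RecordReader reads three lines per record: the first line (a name) becomes the key, and the next two lines are concatenated into the value. Judging from the comments in the code, the input file is assumed to look like this (the second record is purely illustrative):
tom
math 90
english 98
bob
math 85
english 95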
package cn.Ajaxtxdy.authinput;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;
public class AuthInputFormat extends FileInputFormat<Text, Text> {
@Override
public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
return new AuthReader();
}
}
class AuthReader extends RecordReader<Text, Text> {
private static final Text blank = new Text(" ");
private LineReader reader;
private Text key = new Text();
private Text value = new Text();
// Initialization
// Open the input stream here
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
// Determine which file to read from the split
FileSplit fSplit = (FileSplit) split;
// Get the file path
Path p = fSplit.getPath();
// Connect to HDFS
FileSystem fs = FileSystem.get(URI.create(p.toString()), context.getConfiguration());
// Open an input stream for the file
InputStream in = fs.open(p);
// Wrap the byte stream in a reader that can read line by line
reader = new LineReader(in);
}
// Read the file
// If a record was read, there is another key/value pair to process
// If nothing was read, there is no more data
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
// For this format, this method only has to try to read three lines
// If three lines were read, there is data to process, so return true
Text tmp = new Text();
// readLine puts the line it reads into the tmp that is passed in
// its return value is the number of bytes read for that line
if (reader.readLine(tmp) == 0)
return false;
// The first line becomes the key
key.set(tmp.toString());
// The second and third lines are concatenated to form the value, e.g.:
// math 90
// english 98
// math 90 english 98
if (reader.readLine(tmp) == 0)
return false;
value.set(tmp.toString());
value.append(blank.getBytes(), 0, blank.getLength());
if (reader.readLine(tmp) == 0)
return false;
value.append(tmp.getBytes(), 0, tmp.getLength());
return true;
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}
// Report execution progress
@Override
public float getProgress() throws IOException, InterruptedException {
return 0;
}
@Override
public void close() throws IOException {
if (reader != null)
reader.close();
key = null;
value = null;
}
}
package cn.Ajaxtxdy.authinput;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class Score implements Writable {
private int math;
private int english;
public int getMath() {
return math;
}
public void setMath(int math) {
this.math = math;
}
public int getEnglish() {
return english;
}
public void setEnglish(int english) {
this.english = english;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(math);
out.writeInt(english);
}
@Override
public void readFields(DataInput in) throws IOException {
this.math = in.readInt();
this.english = in.readInt();
}
}
package cn.Ajaxtxdy.authinput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class AuthMapper extends Mapper<Text, Text, Text, Score> {
public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
// key = tom
// value = math 90 english 98
String[] arr = value.toString().split(" ");
Score s = new Score();
s.setMath(Integer.parseInt(arr[1]));
s.setEnglish(Integer.parseInt(arr[3]));
context.write(key, s);
}
}
package cn.Ajaxtxdy.authinput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class AuthReducer extends Reducer<Text, Score, Text, IntWritable> {
public void reduce(Text key, Iterable<Score> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (Score val : values) {
sum += val.getMath() + val.getEnglish();
}
context.write(key, new IntWritable(sum));
}
}
package cn.Ajaxtxdy.authinput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
//import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class AuthDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("mapreduce.output.basename", "auth");
conf.set("mapreduce.output.textoutputformat.separator", "+++");
Job job = Job.getInstance(conf, "JobName");
job.setJarByClass(cn.Ajaxtxdy.authinput.AuthDriver.class);
job.setMapperClass(AuthMapper.class);
job.setReducerClass(AuthReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Score.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// Specify the custom input format
job.setInputFormatClass(AuthInputFormat.class);
FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.1.1:9000/txt/score3.txt"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.1:9000/result/authscore3"));
if (!job.waitForCompletion(true))
return;
}
}
F. Other details
I. Data skew
1. Data is inherently skewed: the data produced in everyday life is not evenly distributed to begin with.
2. In practice, the vast majority of data skew shows up on the Reduce side.
3. Map-side skew requires multiple input sources plus non-splittable files of uneven sizes. Once it occurs it essentially cannot be fixed; in specific cases the distributed-cache approach can be considered.
4. Reduce-side skew is ultimately caused by the skew of the data itself, but the direct cause is that the data is classified by key. Since the classification rule usually cannot be changed, the usual remedy in practice is two-stage aggregation: scatter the data first, then aggregate (see the sketch below).
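A common way to implement the scatter-then-aggregate idea is salting: a first job prepends a random prefix to each (hot) key so that one logical key is spread over several ReduceTasks, and a second job strips the prefix and aggregates the partial results. A minimal sketch of the first-stage mapper for the WordCount example (the salt range of 10 is an assumption):
package cn.Ajaxtxdy.salt;
import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// Stage 1: prepend a random salt (0-9) to each word so that a single hot key is
// spread over up to 10 ReduceTasks. Stage 2 (not shown) splits off the "salt_"
// prefix and sums the partial counts per original word.
public class SaltedWordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final Random random = new Random();
    private final IntWritable one = new IntWritable(1);
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String word : value.toString().split(" ")) {
            int salt = random.nextInt(10);
            context.write(new Text(salt + "_" + word), one);
        }
    }
}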

Example 1: map-side join using the distributed cache (joining order data with product data)
package cn.Ajaxtxdy.join;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class JoinDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "JobName");
job.setJarByClass(cn.Ajaxtxdy.join.JoinDriver.class);
job.setMapperClass(JoinMapper.class);
job.setReducerClass(JoinReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Order.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
// Cache the small file and process the large file as the regular input
// Register the small file in the distributed cache
URI[] uri = { URI.create("hdfs://192.168.1.1:9000/txt/union/product.txt") };
job.setCacheFiles(uri);
// The input path should point to the large file
FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.1.1:9000/txt/union/order.txt"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.1:9000/result/joinprice"));
if (!job.waitForCompletion(true))
return;
}
}
package cn.Ajaxtxdy.join;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class Order implements Writable {
private String orderid;
private String date;
private String proid;
private int num;
private String name;
private double price;
public String getOrderid() {
return orderid;
}
public void setOrderid(String orderid) {
this.orderid = orderid;
}
public String getDate() {
return date;
}
public void setDate(String date) {
this.date = date;
}
public String getProid() {
return proid;
}
public void setProid(String proid) {
this.proid = proid;
}
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(orderid);
out.writeUTF(date);
out.writeUTF(proid);
out.writeInt(num);
out.writeUTF(name);
out.writeDouble(price);
}
@Override
public void readFields(DataInput in) throws IOException {
this.orderid = in.readUTF();
this.date = in.readUTF();
this.proid = in.readUTF();
this.num = in.readInt();
this.name = in.readUTF();
this.price = in.readDouble();
}
}
package cn.Ajaxtxdy.join;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class JoinMapper extends Mapper<LongWritable, Text, Text, Order> {
private Map<String, Order> map = new HashMap<>();
// Processing the large file requires the data from the small file,
// so the small file is parsed before the large file is processed
// and kept in memory so it can be looked up during map()
@Override
protected void setup(Mapper<LongWritable, Text, Text, Order>.Context context)
throws IOException, InterruptedException {
// Get the small file's URI from the distributed cache
URI file = context.getCacheFiles()[0];
// Connect to HDFS to read the small file
FileSystem fs = FileSystem.get(file, context.getConfiguration());
// Open an input stream for the file
InputStream in = fs.open(new Path(file.toString()));
// Wrap the byte stream in a reader that reads line by line
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
String line;
while ((line = reader.readLine()) != null) {
// 1 chuizi 3999
String[] arr = line.split(" ");
Order o = new Order();
o.setProid(arr[0]);
o.setName(arr[1]);
o.setPrice(Double.parseDouble(arr[2]));
map.put(o.getProid(), o);
}
reader.close();
}
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1001 20170710 4 2
String[] arr = value.toString().split(" ");
Order o = new Order();
o.setOrderid(arr[0]);
o.setDate(arr[1]);
o.setProid(arr[2]);
o.setNum(Integer.parseInt(arr[3]));
o.setName(map.get(o.getProid()).getName());
o.setPrice(map.get(o.getProid()).getPrice());
context.write(new Text(o.getOrderid()), o);
}
}
package cn.Ajaxtxdy.join;
import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class JoinReducer extends Reducer<Text, Order, Text, DoubleWritable> {
public void reduce(Text key, Iterable<Order> values, Context context) throws IOException, InterruptedException {
double sum = 0;
for (Order val : values) {
sum += val.getNum() * val.getPrice();
}
context.write(key, new DoubleWritable(sum));
}
}
II. Small files
1. Why small files are harmful:
a. Storage: a large number of small files produces a large amount of metadata, which takes up a large amount of (NameNode) memory.
b. Computation: a large number of small files produces a large number of splits, and therefore a large number of MapTasks, which lowers the cluster's efficiency and can even bring the servers down.
2. The two common ways of dealing with small files are merging and compression.
3. Hadoop ships with a native merging tool, Hadoop Archive, which packs many small files into a single har archive (see the usage sketch below).
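Hadoop Archive is driven from the command line; a usage sketch, with the paths and archive name being assumptions: hadoop archive -archiveName small.har -p /txt/smallfiles /result/har packs everything under /txt/smallfiles into /result/har/small.har, and the archived files can then be accessed through the har:// scheme, e.g. hdfs dfs -ls har:///result/har/small.har.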
G. Fully distributed architecture
