• Hadoop-MapReduce


    A、简介
    一、概述
    1. MapReduce是Hadoop提供的一套用于进行分布式计算的框架
    2. 将计算过程拆分为2个阶段:Map(映射)阶段和Reduce(规约)阶段
    在这里插入图片描述
    MapReduce在对文件进行计算的时候,会先将文件进行切片(注:切片和切块不一样),每一个切片对应的
    MapTask默认情况下,每一个MapTask在拿到切片之后会进行按行读取按行处理。
    入门例子:
    统计字母的数量

    package cn.Ajaxtxdy.wordcount;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    // 需要指定Map阶段的处理逻辑
    // MapReduce中,要求所有的对象都能够被序列化
    // MapReduce所默认采取的序列化机制是AVRO
    // KEYIN - 输入的键的类型 - 默认输入的键表示的是一行数据的字节偏移量 
    // VALUEIN - 输入的值的类型 - 默认输入的值表示的是读取的一行数据
    // KEYOUT - 输出的键的类型
    // VALUEOUT - 输出的值的类型
    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
    	// key - 行偏移量
    	// value - 一行数据
    	// context - 用于数据传递和指定环境参数
    	@Override
    	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
    			throws IOException, InterruptedException {
    
    		// hello tom hello bob
    		// 先将这一行中每一个单词拆分出来
    		String[] arr = value.toString().split(" ");
    		// hello 2 tom 1 bob 1
    		// hello 1 tom 1 hello 1 bob 1
    		for (String str : arr) {
    			context.write(new Text(str), new IntWritable(1));
    		}
    
    	}
    
    }
    
    
    package cn.Ajaxtxdy.wordcount;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    // Reduce的数据从Map来,那么也就意味着Map的输出就是Reduce的输入
    // KEYIN, VALUEIN
    // 最后应该是输出每一个单词对应的总次数
    // KEYOUT, VALUEOUT
    public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    
    	// key - 输入的键
    	// values - 键所对应的所有的值
    	// hello
    	// values = 1,1,1,1,1,1,1...
    	// context - 将结果写到HDFS上
    	@Override
    	protected void reduce(Text key, Iterable<IntWritable> values,
    			Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
    
    		int sum = 0;
    		for (IntWritable val : values) {
    			sum += val.get();
    		}
    
    		context.write(key, new IntWritable(sum));
    
    	}
    
    }
    
    
    package cn.Ajaxtxdy.wordcount;
    
    import java.io.IOException;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class WordCountDriver {
    
    	public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
    
    		// System.setProperty("hadoop.home.dir","hadoop安装路径");
    
    		// 先向Hadoop申请一个job任务执行逻辑
    		Job job = Job.getInstance();
    
    		// 设置入口类
    		job.setJarByClass(WordCountDriver.class);
    		// 设置Mapper类
    		job.setMapperClass(WordCountMapper.class);
    		// 设置Reducer类
    		job.setReducerClass(WordCountReducer.class);
    
    		// 设置Mapper的输出类型
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(IntWritable.class);
    		// 设置Reducer的输出类型
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    
    		// 设置输入文件
    		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.1.1:9000/txt/words.txt"));
    		// 设置输出路径
    		// 输出路径要求不存在
    		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.1:9000/result/wordcount"));
    
    		// 启动执行
    		job.waitForCompletion(true);
    	}
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120

    例1:统计文件中每一个非空字符的个数

    package cn.Ajaxtxdy.charcount;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class CharCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    
    	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
    		// 拆分字符
    		char[] cs = value.toString().replaceAll("\\s", "").toCharArray();
    		for (char c : cs) {
    			context.write(new Text(c + ""), new LongWritable(1));
    		}
    
    	}
    
    }
    
    
    package cn.Ajaxtxdy.charcount;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class CharCountReducer 
    	extends Reducer<Text, LongWritable, Text, LongWritable> {
    
    	public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
    
    		long sum = 0;
    		for (LongWritable val : values) {
    			sum += val.get();
    		}
    		context.write(key, new LongWritable(sum));
    	}
    
    }
    
    
    package cn.Ajaxtxdy.charcount;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class CharCountDriver {
    
    	public static void main(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    		Job job = Job.getInstance(conf, "JobName");
    		job.setJarByClass(cn.Ajaxtxdy.charcount.CharCountDriver.class);
    		job.setMapperClass(CharCountMapper.class);
    		job.setReducerClass(CharCountReducer.class);
    
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(LongWritable.class);
    
    		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.1.1:9000/txt/characters.txt"));
    		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.1:9000/result/charcount"));
    
    		if (!job.waitForCompletion(true))
    			return;
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76

    例2:IP去重

    package cn.Ajaxtxdy.ip;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class IPMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    
    	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
    		context.write(value, NullWritable.get());
    	}
    
    }
    
    package cn.Ajaxtxdy.ip;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class IPReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    
    	// key = IP
    	// value = null,null,null
    	public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
    		
    		context.write(key, NullWritable.get());
    		
    	}
    
    }
    
    package cn.Ajaxtxdy.ip;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class IPDriver {
    
    	public static void main(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    		Job job = Job.getInstance(conf, "JobName");
    		job.setJarByClass(cn.tedu.Ajaxtxdy.IPDriver.class);
    		job.setMapperClass(IPMapper.class);
    		job.setReducerClass(IPReducer.class);
    
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(NullWritable.class);
    
    		// 如果输入的路径是指定文件,则只读取这个文件
    		// 如果输入的路径是目录,则读取这个目录下所有的文件
    		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.1.1:9000/txt/ip.txt"));
    		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.1:9000/result/ip"));
    
    		if (!job.waitForCompletion(true))
    			return;
    	}
    
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72

    例3:统计每一个篮球运动员的总得分

    package cn.Ajaxtxdy.totalscore;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class TotalScoreMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
    	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
    		// LBJ 39
    		String[] arr = value.toString().split(" ");
    		context.write(new Text(arr[0]), 
    				new IntWritable(Integer.parseInt(arr[1])));
    
    	}
    
    }
    
    
    package cn.Ajaxtxdy.totalscore;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class TotalScoreReducer 
    	extends Reducer<Text, IntWritable, Text, IntWritable> {
    
    	public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    
    		int sum = 0;
    		for (IntWritable val : values) {
    			sum += val.get();
    		}
    		context.write(key, new IntWritable(sum));
    	}
    
    }
    
    
    package cn.Ajaxtxdy.totalscore;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class TotalScoreDriver {
    
    	public static void main(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    		Job job = Job.getInstance(conf, "JobName");
    		job.setJarByClass(cn.Ajaxtxdy.totalscore.TotalScoreDriver.class);
    		job.setMapperClass(TotalScoreMapper.class);
    		job.setReducerClass(TotalScoreReducer.class);
    
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    
    		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.1.1:9000/txt/score2/"));
    		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.1:9000/result/totalscore"));
    
    		if (!job.waitForCompletion(true))
    			return;
    	}
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77

    例4:获取最大值

    package cn.Ajaxtxdy.maxscore;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class MaxScoreMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
    	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
    		String[] arr = value.toString().split(" ");
    		context.write(new Text(arr[0]), new IntWritable(Integer.parseInt(arr[1])));
    
    	}
    
    }
    
    
    package cn.Ajaxtxdy.maxscore;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class MaxScoreReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    
    	public void reduce(Text key, Iterable<IntWritable> values, Context context)
    			throws IOException, InterruptedException {
    
    		// 在MapReduce中,为了减少对象的创建和销毁,采用了地址复用机制
    		// 在迭代过程中,被迭代的对象只创建一次
    		IntWritable max = new IntWritable(0);
    		// key = LBJ
    		// values = 684 512 340 312
    		// IntWritable val = new IntWritable();
    		// val.set(684);
    		// val.get() > max.get() -> 684 > 0 -> true
    		// max = val; - 将val赋值给max,给的是地址,所以max和val的指向地址一致
    		// val.set(512);
    		// val.get() > max.get() -> 512 > 512 -> false
    		// 最后max的值是最后一个被迭代的值
    		for (IntWritable val : values) {
    			if (val.get() > max.get())
    				// max = val;
    				max.set(val.get());
    		}
    		context.write(key, max);
    	}
    
    }
    
    
    package cn.Ajaxtxdy.maxscore;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class MaxScoreDriver {
    
    	public static void main(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    		Job job = Job.getInstance(conf, "JobName");
    		job.setJarByClass(cn.Ajaxtxdy.maxscore.MaxScoreDriver.class);
    		job.setMapperClass(MaxScoreMapper.class);
    		job.setReducerClass(MaxScoreReducer.class);
    
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    
    		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.1.1:9000/txt/score2.txt"));
    		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.1:9000/result/maxscore"));
    
    		if (!job.waitForCompletion(true))
    			return;
    	}
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88

    B、组件
    一、序列化
    1. 在MapReduce中,要求数据能够被序列化
    2. MapReduce的序列化机制默认采用的AVRO
    3. MapReduce对AVRO的序列化机制进行了封装,提供了更简便的序列化形式 - 实现接口Writable

    例1:求总分

    package cn.Ajaxtxdy.serialscore;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Writable;
    
    public class Score implements Writable {
    
    	private String name;
    	private int chinese;
    	private int math;
    	private int english;
    
    	public String getName() {
    		return name;
    	}
    
    	public void setName(String name) {
    		this.name = name;
    	}
    
    	public int getChinese() {
    		return chinese;
    	}
    
    	public void setChinese(int chinese) {
    		this.chinese = chinese;
    	}
    
    	public int getMath() {
    		return math;
    	}
    
    	public void setMath(int math) {
    		this.math = math;
    	}
    
    	public int getEnglish() {
    		return english;
    	}
    
    	public void setEnglish(int english) {
    		this.english = english;
    	}
    
    	@Override
    	public void readFields(DataInput in) throws IOException {
    		this.name = in.readUTF();
    		this.chinese = in.readInt();
    		this.math = in.readInt();
    		this.english = in.readInt();
    	}
    
    	@Override
    	public void write(DataOutput out) throws IOException {
    		out.writeUTF(name);
    		out.writeInt(chinese);
    		out.writeInt(math);
    		out.writeInt(english);
    	}
    
    }
    
    
    package cn.Ajaxtxdy.serialscore;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class SerialScoreMapper extends Mapper<LongWritable, Text, Text, Score> {
    
    	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    		String[] arr = value.toString().split(" ");
    		Score s = new Score();
    		s.setName(arr[0]);
    		s.setChinese(Integer.parseInt(arr[1]));
    		s.setMath(Integer.parseInt(arr[2]));
    		s.setEnglish(Integer.parseInt(arr[3]));
    		context.write(new Text(s.getName()), s);
    	}
    
    }
    
    
    package cn.Ajaxtxdy.serialscore;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class SerialScoreReducer extends Reducer<Text, Score, Text, IntWritable> {
    
    	public void reduce(Text key, Iterable<Score> values, Context context) throws IOException, InterruptedException {
    		int sum = 0;
    		for (Score val : values) {
    			sum = val.getChinese() + val.getMath() + val.getEnglish();
    		}
    		context.write(key, new IntWritable(sum));
    	}
    
    }
    
    
    package cn.Ajaxtxdy.serialscore;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class SerialScoreDriver {
    
    	public static void main(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    		Job job = Job.getInstance(conf, "JobName");
    		job.setJarByClass(cn.Ajaxtxdy.serialscore.SerialScoreDriver.class);
    		job.setMapperClass(SerialScoreMapper.class);
    		job.setReducerClass(SerialScoreReducer.class);
    
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(Score.class);
    
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    
    		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.1.1:9000/txt/score.txt"));
    		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.1:9000/result/serialscore"));
    
    		if (!job.waitForCompletion(true))
    			return;
    	}
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144

    二、分区 - Partition
    1. 分区的作用是将数据进行分类
    2. 有无分区并不影响Map和Reduce的执行逻辑
    3. 分区默认是从0开始依次递增的
    4. 在MapReduce中,每一个分区要对应一个ReduceTask,每一个ReduceTask都会产生一个结果文件。默认情况下只有1个分区,也就只有1个ReduceTask,只产生一个结果文件
    5. 在MapReduce中,如果没有手动指定Partitioner,那么默认使用的分区类是HashPartitioner
    在这里插入图片描述

    例子1:按地区统计每一个人的总流量

    package cn.Ajaxtxdy.partflow;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Writable;
    
    public class Flow implements Writable {
    
    	private String phone;
    	private String addr;
    	private String name;
    	private int flow;
    
    	public String getPhone() {
    		return phone;
    	}
    
    	public void setPhone(String phone) {
    		this.phone = phone;
    	}
    
    	public String getAddr() {
    		return addr;
    	}
    
    	public void setAddr(String addr) {
    		this.addr = addr;
    	}
    
    	public String getName() {
    		return name;
    	}
    
    	public void setName(String name) {
    		this.name = name;
    	}
    
    	public int getFlow() {
    		return flow;
    	}
    
    	public void setFlow(int flow) {
    		this.flow = flow;
    	}
    
    	// 反序列化
    	// 按照什么顺序写的,就得按照什么顺序读
    	@Override
    	public void readFields(DataInput in) throws IOException {
    		this.phone = in.readUTF();
    		this.addr = in.readUTF();
    		this.name = in.readUTF();
    		this.flow = in.readInt();
    	}
    
    	// 序列化
    	// 只需要将有必要的属性来依次写出即可序列化
    	@Override
    	public void write(DataOutput out) throws IOException {
    		out.writeUTF(phone);
    		out.writeUTF(addr);
    		out.writeUTF(name);
    		out.writeInt(flow);
    	}
    
    }
    
    
    package cn.Ajaxtxdy.partflow;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class AddrPartitioner extends Partitioner<Text, Flow> {
    
    	// 指定分类规则
    	@Override
    	public int getPartition(Text key, Flow value, int numReduceTasks) {
    
    		// 按照地区分类
    		// 先拿到地区
    		String addr = value.getAddr();
    
    		if (addr.equals("bj"))
    			return 0;
    		else if (addr.equals("sh"))
    			return 1;
    		else
    			return 2;
    
    	}
    
    }
    
    
    package cn.Ajaxtxdy.partflow;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class PartFlowMapper extends Mapper<LongWritable, Text, Text, Flow> {
    
    	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
    		String[] arr = value.toString().split(" ");
    		Flow f = new Flow();
    		f.setPhone(arr[0]);
    		f.setAddr(arr[1]);
    		f.setName(arr[2]);
    		f.setFlow(Integer.parseInt(arr[3]));
    
    		context.write(new Text(f.getName()), f);
    
    	}
    
    }
    
    
    package cn.Ajaxtxdy.partflow;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class PartFlowReducer extends Reducer<Text, Flow, Text, IntWritable> {
    
    	public void reduce(Text key, Iterable<Flow> values, Context context) throws IOException, InterruptedException {
    
    		int sum = 0;
    		for (Flow val : values) {
    			sum += val.getFlow();
    		}
    		context.write(key, new IntWritable(sum));
    	}
    
    }
    
    
    package cn.Ajaxtxdy.partflow;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class PartFlowDriver {
    
    	public static void main(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    		Job job = Job.getInstance(conf, "JobName");
    		job.setJarByClass(cn.Ajaxtxdy.serialflow.SerialFlowDriver.class);
    		job.setMapperClass(PartFlowMapper.class);
    		job.setReducerClass(PartFlowReducer.class);
    
    		// 设置分区类
    		job.setPartitionerClass(AddrPartitioner.class);
    		// 设置ReduceTask的数量
    		job.setNumReduceTasks(3);
    
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(Flow.class);
    
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    
    		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.1.1:9000/txt/flow.txt"));
    		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.1:9000/result/partflow"));
    
    		if (!job.waitForCompletion(true))
    			return;
    	}
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184

    例2:按月份统计每一个人的总成绩

    package cn.Ajaxtxdy.partscore;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Writable;
    
    public class Score implements Writable {
    
    	private int month;
    	private String name;
    	private int score;
    
    	public int getMonth() {
    		return month;
    	}
    
    	public void setMonth(int month) {
    		this.month = month;
    	}
    
    	public String getName() {
    		return name;
    	}
    
    	public void setName(String name) {
    		this.name = name;
    	}
    
    	public int getScore() {
    		return score;
    	}
    
    	public void setScore(int score) {
    		this.score = score;
    	}
    
    	@Override
    	public void readFields(DataInput in) throws IOException {
    		this.month = in.readInt();
    		this.name = in.readUTF();
    		this.score = in.readInt();
    	}
    
    	@Override
    	public void write(DataOutput out) throws IOException {
    		out.writeInt(month);
    		out.writeUTF(name);
    		out.writeInt(score);
    	}
    
    }
    
    
    package cn.Ajaxtxdy.partscore;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class PartScoreMapper extends Mapper<LongWritable, Text, Text, Score> {
    
    	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
    		String[] arr = value.toString().split(" ");
    		Score s = new Score();
    		s.setMonth(Integer.parseInt(arr[0]));
    		s.setName(arr[1]);
    		s.setScore(Integer.parseInt(arr[2]));
    
    		context.write(new Text(s.getName()), s);
    
    	}
    
    }
    
    
    package cn.Ajaxtxdy.partscore;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class PartScoreReducer extends Reducer<Text, Score, Text, IntWritable> {
    
    	public void reduce(Text key, Iterable<Score> values, Context context) throws IOException, InterruptedException {
    		int sum = 0;
    		for (Score val : values) {
    			sum += val.getScore();
    		}
    		context.write(key, new IntWritable(sum));
    	}
    
    }
    
    
    package cn.Ajaxtxdy.partscore;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class MonthPartitioner extends Partitioner<Text, Score> {
    
    	@Override
    	public int getPartition(Text key, Score value, int numReduceTasks) {
    
    		// 按月份进行分类
    		// 1 2 3
    		// 0 1 2
    		return value.getMonth() - 1;
    
    	}
    
    }
    
    
    
    package cn.Ajaxtxdy.partscore;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class PartScoreDriver {
    
    	public static void main(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    		Job job = Job.getInstance(conf, "JobName");
    		job.setJarByClass(cn.Ajaxtxdy.partscore.PartScoreDriver.class);
    		job.setMapperClass(PartScoreMapper.class);
    		job.setReducerClass(PartScoreReducer.class);
    
    		job.setPartitionerClass(MonthPartitioner.class);
    		job.setNumReduceTasks(3);
    
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(Score.class);
    
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    
    		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.1.1:9000/txt/score1/"));
    		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.1:9000/result/partscore"));
    
    		if (!job.waitForCompletion(true))
    			return;
    	}
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159

    三、排序
    1. 在MapReduce中,自动对键进行排序
    2. 要求键所对应的类必须实现Comparable接口,但是考虑到键需要序列化,所以一般实现的WritableComparable
    3. 在排序过程中,如果compareTo方法的返回值为0,则MapReduce会认为这两个键是同一个,则将这两个键的值放到一组,相当于去重过程

    例1:按分数升序排序

    package cn.Ajaxtxdy.sortscore;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    
    public class Score implements WritableComparable<Score> {
    
    	private String name;
    	private int score;
    
    	public String getName() {
    		return name;
    	}
    
    	public void setName(String name) {
    		this.name = name;
    	}
    
    	public int getScore() {
    		return score;
    	}
    
    	public void setScore(int score) {
    		this.score = score;
    	}
    
    	@Override
    	public void write(DataOutput out) throws IOException {
    		out.writeUTF(name);
    		out.writeInt(score);
    	}
    
    	@Override
    	public void readFields(DataInput in) throws IOException {
    		this.name = in.readUTF();
    		this.score = in.readInt();
    	}
    
    	// 按照分数降序排序
    	@Override
    	public int compareTo(Score o) {
    		return o.score - this.score;
    	}
    
    }
    
    
    package cn.Ajaxtxdy.sortscore;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class SortScoreMapper extends Mapper<LongWritable, Text, Score, NullWritable> {
    
    	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
    		String[] arr = value.toString().split("\t");
    		Score s = new Score();
    		s.setName(arr[0]);
    		s.setScore(Integer.parseInt(arr[1]));
    
    		context.write(s, NullWritable.get());
    
    	}
    
    }
    
    
    package cn.Ajaxtxdy.sortscore;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class SortScoreReducer extends Reducer<Score, NullWritable, Text, IntWritable> {
    
    	public void reduce(Score key, Iterable<NullWritable> values, Context context)
    			throws IOException, InterruptedException {
    		context.write(new Text(key.getName()), new IntWritable(key.getScore()));
    
    	}
    
    }
    
    
    package cn.Ajaxtxdy.sortscore;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class SortScoreDriver {
    
    	public static void main(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    		Job job = Job.getInstance(conf, "JobName");
    		job.setJarByClass(cn.Ajaxtxdy.sortscore.SortScoreDriver.class);
    		job.setMapperClass(SortScoreMapper.class);
    		job.setReducerClass(SortScoreReducer.class);
    
    		job.setMapOutputKeyClass(Score.class);
    		job.setMapOutputValueClass(NullWritable.class);
    
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    
    		// _开头的文件在MapReduce中会认为是一个隐藏文件不被读取
    		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.1.1:9000/result/totalscore/"));
    		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.1:9000/result/sortscore"));
    
    		if (!job.waitForCompletion(true))
    			return;
    	}
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131

    例2:按照月份升序排序,如果月份一样,则按照利润降序排序

    package cn.Ajaxtxdy.sortprofit;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    
    public class Profit implements WritableComparable<Profit> {
    
    	private int month;
    	private String name;
    	private int profit;
    
    	public int getMonth() {
    		return month;
    	}
    
    	public void setMonth(int month) {
    		this.month = month;
    	}
    
    	public String getName() {
    		return name;
    	}
    
    	public void setName(String name) {
    		this.name = name;
    	}
    
    	public int getProfit() {
    		return profit;
    	}
    
    	public void setProfit(int profit) {
    		this.profit = profit;
    	}
    
    	@Override
    	public void write(DataOutput out) throws IOException {
    		out.writeInt(month);
    		out.writeUTF(name);
    		out.writeInt(profit);
    	}
    
    	@Override
    	public void readFields(DataInput in) throws IOException {
    		this.month = in.readInt();
    		this.name = in.readUTF();
    		this.profit = in.readInt();
    	}
    
    	// 先按照月份排序,如果月份一致则按照利润降序
    	@Override
    	public int compareTo(Profit o) {
    
    		int r1 = this.month - o.month;
    
    		if (r1 == 0) {
    			int r2 = o.profit - this.profit;
    			return r2 == 0 ? 1 : r2;
    		}
    
    		return r1;
    	}
    
    	@Override
    	public String toString() {
    		return "Profit [month=" + month + ", name=" + name + ", profit=" + profit + "]";
    	}
    
    }
    
    
    package cn.Ajaxtxdy.sortprofit;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class SortProfitMapper extends Mapper<LongWritable, Text, Profit, NullWritable> {
    
    	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
    		String[] arr = value.toString().split(" ");
    		Profit p = new Profit();
    		p.setMonth(Integer.parseInt(arr[0]));
    		p.setName(arr[1]);
    		p.setProfit(Integer.parseInt(arr[2]));
    		context.write(p, NullWritable.get());
    	}
    
    }
    
    
    package cn.Ajaxtxdy.sortprofit;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class SortProfitReducer extends Reducer<Profit, NullWritable, Profit, NullWritable> {
    
    	public void reduce(Profit key, Iterable<NullWritable> values, Context context)
    			throws IOException, InterruptedException {
    		context.write(key, NullWritable.get());
    	}
    
    }
    
    
    package cn.Ajaxtxdy.sortprofit;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class SortProfitDriver {
    
    	public static void main(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    		Job job = Job.getInstance(conf, "JobName");
    		job.setJarByClass(cn.Ajaxtxdy.sortprofit.SortProfitDriver.class);
    		job.setMapperClass(SortProfitMapper.class);
    		job.setReducerClass(SortProfitReducer.class);
    
    		job.setOutputKeyClass(Profit.class);
    		job.setOutputValueClass(NullWritable.class);
    
    		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.1.1:9000/txt/profit3.txt"));
    		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.1:9000/result/sortprofit3"));
    
    		if (!job.waitForCompletion(true))
    			return;
    	}
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145

    四、合并 - Combine
    1. 大多数情况下,MapTask的数量要远远多于ReduceTask的数量,导致计算压力几乎全部落在了ReduceTask上,ReduceTask的计算效率就成为整个MapReduce的瓶颈
    2. 合并的逻辑和Reducer的逻辑是一样的 - 只需要在Driver中添加job.setCombinerClass(Reducer.class);
    3. 合并的特点:减少数据总量但是不改变计算结果
    4. 如果进行汇总、获取最值、去重之类操作可以使用Combiner,但是例如求平均之类的操作不能使用Combiner

    C、细节
    一、数据本地化策略
    1. 当JobTracker收到MR程序的时候,会访问NameNode获取文件信息。文件信息包含文件大小以及块信息
    2. JobTracker对这个文件进行切片处理。注意:切片是逻辑切分不是物理切分。切片数量决定了MapTask的数量。默认情况下,Split和Block是等大的
    3. JobTracker会将划分出来的MapTask分配到TaskTracker上执行
    4. 因为MapTask在执行过程中需要读取数据,而数据在DataNode上,所以将DataNode和TaskTracker部署在相同的节点上以减少跨集群的网络传输
    5. 为了减少网络资源的消耗,在分配任务的时候会考虑Block的位置。哪个节点上有要处理的数据,将任务分配给哪个节点,这个过程称之为数据本地化
    在这里插入图片描述
    6. 切片产生过程:
    a. 如果文件为空,则整个文件作为一个切片处理
    b. 在MapReduce中,文件要区分可切或者不可切,例如绝大部分的压缩文件就是不可切的
    c. 如果文件不可切,则整个文件作为一个切片处理
    d. 如果需要减小splitsize,需要调小maxsize;如果需要调大splitsize,需要调大minsize
    e. 在计算切片的时候,需要考虑切片阈值 - SPLIT_SLOP = 1.1

    二、MR执行流程
    1. 准备阶段:
    a. 检查输入和输出路径
    b. 计算切片数量
    c. 如果有必要,设置缓存存根
    d. 将jar包和配置上传到HDFS上
    e. 将任务提交给JobTracker,并且可以选择是否监控这个任务
    2. 执行阶段:
    a. JobTracker收到Job任务之后,会将这个任务进行拆分,拆分成MapTask和ReduceTask。MapTask的数量由切片决定;ReduceTask的数量由分区数量决定
    b. JobTracker在拆分完成任务之后,会等待TaskTracker的心跳,然后给TaskTracker分配任务。分配任务的时候,MapTask尽量满足数据本地化策略,ReduceTask无法满足数据本地化,所以ReduceTask在分配的时候是考虑节点的空闲
    c. TaskTracker通过心跳领取任务,领取到任务之后,会去对应节点上下载jar包,这一步体现的思想是逻辑移动数据固定
    d. TaskTracker会在本节点上开启JVM子进程执行MapTask或者ReduceTask。注意:每一个MapTask或者ReduceTask的执行都会开启一次JVM子进程

    D、Shuffle
    一、Map端的Shuffle
    1. map方法在处理完成数据之后会将结果写出到MapTask自带的缓冲区中 - 每一个MapTask自带一个缓冲区 - MapOutputCollector
    2. 数据在缓冲区中进行分区、排序,如果指定了Combiner,那么数据在缓冲区中还会进行combine。注意:在缓冲区中的排序是将完全无序的数据整理成有序数据,采取的是快速排序
    3. 缓冲区是维系在内存中,默认是100M
    4. 当缓冲区的使用达到一定限度(溢写阈值:0.8)的时候,会将缓冲区中的数据溢写(spill)到磁盘上,map方法后续产生的结果会继续写到缓冲区中
    5. 每一次溢写都会产生一个新的溢写文件 - 单个溢写文件中的数据是分区且有序的,所有的溢写文件之间是局部有序的
    6. 在map方法完成之后,将所有的溢写文件进行合并(merge),将所有的溢写文件合并成一个结果文件(final out),在merge过程中,数据会再次进行分区排序 - final out是整体分区且有序的。注意:merge过程中的排序是将局部有序变成整体有序,所以采用的是归并排序
    7. 如果map方法执行完成之后,缓冲区中依然有数据,则会直接合并到最后的final out中
    8. 在merge过程中,如果spill文件个数>=3并且指定了Combiner,则在merge的时候会再进行一次combine
    9. 注意问题:
    a. spill过程不一定产生
    b. 默认情况下,溢写文件的大小不一定是80M,考虑序列化因素
    c. 缓冲区本质上是一个环形的字节数组,设置为环形的目的是为了避免寻址,能够重复利用缓冲区
    d. 阈值的作用是为了减少写入的阻塞

    二、Reduce端的Shuffle
    1. ReduceTask启动多个fetch线程去MapTask处抓取对应分区的数据
    2. ReduceTask将从每一个MapTask上抓取过来的数据存储在一个本地文件中
    3. 将抓取来数据进行一次merge,合并成一个大文件,在merge过程中,会再次进行排序,采用的是归并排序
    4. merge完成之后,ReduceTask会再将相同的键对应的值放到一个迭代器中,这个过程称之为分组(group)
    5. 分组完成之后,每一个键对应一个迭代器,每一个键调用一次reduce方法
    6. 注意问题:
    a. ReduceTask的启动阈值:0.05 - 当5%的MapTask结束,就会启动ReduceTask去抓取数据
    b. fetch线程通过HTTP请求获取数据
    c. fetch线程的数量默认为5
    d. merge因子:10 - 每10个小文件合并成1个大文件

    三、Shuffle的调优
    1. 减少溢写次数:
    a. 增大缓冲区,实际过程中缓冲区的大小一般是在250~400M之间
    b. 增大缓冲区阈值,同时增加了写入阻塞的风险 - 不建议
    c. 增加Combine的过程
    2. 可以考虑将Map的结果文件进行压缩,这个方案是在网络资源和CPU资源之间的取舍
    3. 增加fetch线程的数量
    4. 增大merge因子,会增加底层计算的复杂度 - 不建议
    5. 减小ReduceTask的启动阈值,增加了ReduceTask的阻塞风险 - 不建议

    E、InputFormat

    1. InputFormat中定义了2个抽象方法:
      a. getSplits用于产生切片
      b. createRecordReader产生输入流读取切片
    2. InputFormat会把结果给到MapTask
      在这里插入图片描述
      3. 实际过程中,如果需要自定义输入格式类,一般不是直接继承InputFormat而是继承它的子类FileInputFormat,这个子类中已经覆盖了getSplits方法,而只需要考虑如何读取数据即可
      4. 如果没有指定输入格式,那么默认使用的TextInputFormat。除了第一个切片对应的MapTask以外,其余的MapTask都是从当前切片的第二行开始读取到下一个切片的第一个行
      5. 多源输入下,允许输入不同格式的文件,但是文件格式可以不同Mapper类也可以不一样,但是最后交给Reducer处理的时候要一样
      例1:
    package cn.Ajaxtxdy.authinput;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.URI;
    
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.util.LineReader;
    
    public class AuthInputFormat extends FileInputFormat<Text, Text> {
    
    	@Override
    	public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
    			throws IOException, InterruptedException {
    		return new AuthReader();
    	}
    
    }
    
    class AuthReader extends RecordReader<Text, Text> {
    
    	private static final Text blank = new Text(" ");
    	private LineReader reader;
    	private Text key = new Text();
    	private Text value = new Text();
    
    	// 初始化
    	// 在这个初始化方法中先获取到流
    	@Override
    	public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    
    		// 从切片中确定要读取的文件
    		FileSplit fSplit = (FileSplit) split;
    		// 获取文件路径
    		Path p = fSplit.getPath();
    		// 连接HDFS
    		FileSystem fs = FileSystem.get(URI.create(p.toString()), context.getConfiguration());
    		// 获取到针对文件的输入流
    		InputStream in = fs.open(p);
    		// 将字节流转化为字符流 --- 这个字符流最好能按行读取
    		reader = new LineReader(in);
    
    	}
    
    	// 读取文件
    	// 如果读到了,则表示还有键和值需要处理
    	// 如果没有读到,则表示已经没有数据了
    	@Override
    	public boolean nextKeyValue() throws IOException, InterruptedException {
    		// 按照当前说法,这个方法,只需要试着读取三行
    		// 如果读到了三行表示有数据需要处理,需要返回true
    
    		Text tmp = new Text();
    		// 表示会将读到这一行数据放入传的tmp中
    		// readLine方法返回值表示读取的这一行的字节数
    		if (reader.readLine(tmp) == 0)
    			return false;
    		// 读取完第一行,需要将第一行的数据作为键来使用
    		key.set(tmp.toString());
    		// 第二行和第三行拼接作为值来使用
    		// math 90
    		// english 98
    		// math 90 english 98
    		if (reader.readLine(tmp) == 0)
    			return false;
    		value.set(tmp.toString());
    		value.append(blank.getBytes(), 0, blank.getLength());
    		if (reader.readLine(tmp) == 0)
    			return false;
    		value.append(tmp.getBytes(), 0, tmp.getLength());
    
    		return true;
    	}
    
    	@Override
    	public Text getCurrentKey() throws IOException, InterruptedException {
    		return key;
    	}
    
    	@Override
    	public Text getCurrentValue() throws IOException, InterruptedException {
    		return value;
    	}
    
    	// 获取执行进度
    	@Override
    	public float getProgress() throws IOException, InterruptedException {
    		return 0;
    	}
    
    	@Override
    	public void close() throws IOException {
    		if (reader != null)
    			reader.close();
    		key = null;
    		value = null;
    	}
    
    }
    
    
    package cn.Ajaxtxdy.authinput;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Writable;
    
    public class Score implements Writable {
    
    	private int math;
    	private int english;
    
    	public int getMath() {
    		return math;
    	}
    
    	public void setMath(int math) {
    		this.math = math;
    	}
    
    	public int getEnglish() {
    		return english;
    	}
    
    	public void setEnglish(int english) {
    		this.english = english;
    	}
    
    	@Override
    	public void write(DataOutput out) throws IOException {
    		out.writeInt(math);
    		out.writeInt(english);
    	}
    
    	@Override
    	public void readFields(DataInput in) throws IOException {
    		this.math = in.readInt();
    		this.english = in.readInt();
    	}
    
    }
    
    
    package cn.Ajaxtxdy.authinput;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class AuthMapper extends Mapper<Text, Text, Text, Score> {
    
    	public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
    
    		// key = tom
    		// value = math 90 english 98
    		String[] arr = value.toString().split(" ");
    		Score s = new Score();
    		s.setMath(Integer.parseInt(arr[1]));
    		s.setEnglish(Integer.parseInt(arr[3]));
    		context.write(key, s);
    
    	}
    
    }
    
    
    package cn.Ajaxtxdy.authinput;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class AuthReducer extends Reducer<Text, Score, Text, IntWritable> {
    
    	public void reduce(Text key, Iterable<Score> values, Context context) throws IOException, InterruptedException {
    
    		int sum = 0;
    		for (Score val : values) {
    			sum = val.getMath() + val.getEnglish();
    		}
    		context.write(key, new IntWritable(sum));
    	}
    
    }
    
    
    package cn.Ajaxtxdy.authinput;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    //import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class AuthDriver {
    
    	public static void main(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    		conf.set("mapreduce.output.basename", "auth");
    		conf.set("mapreduce.output.textoutputformat.separator", "+++");
    		Job job = Job.getInstance(conf, "JobName");
    		job.setJarByClass(cn.tedu.authinput.AuthDriver.class);
    		job.setMapperClass(AuthMapper.class);
    		job.setReducerClass(AuthReducer.class);
    
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(Score.class);
    
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    
    		// 指定输入格式
    		job.setInputFormatClass(AuthInputFormat.class);
    
    
    		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.1.1:9000/txt/score3.txt"));
    		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.1:9000/result/authscore3"));
    
    		if (!job.waitForCompletion(true))
    			return;
    	}
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239

    F、其他细节
    一、数据倾斜
    1. 数据本身就有倾斜特性,即日常生活中所产生的数据本身就是不均等的
    2. 实际过程中,绝大部分的数据倾斜都会产生在Reduce端
    3. Map端产生倾斜的条件:多源输入、文件不可切且文件大小不均 - Map端的倾斜一旦产生无法解决 - 如果真的要解决,在特定条件下可以考虑缓存存根问题
    4. Reduce端的倾斜的本质是因为数据的倾斜性,但是直观原因是因为对数据进行了分类 - 分类规则往往是不可变的,所以在实际过程中往往考虑的是使用两阶段聚合 - 数据先打散后聚合
    在这里插入图片描述
    例1

    package cn.Ajaxtxdy.join;
    
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class JoinDriver {
    
    	public static void main(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    		Job job = Job.getInstance(conf, "JobName");
    		job.setJarByClass(cn.tedu.join.JoinDriver.class);
    		job.setMapperClass(JoinMapper.class);
    		job.setReducerClass(JoinReducer.class);
    
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(Order.class);
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(DoubleWritable.class);
    
    		// 将小文件缓存,处理大文件
    		// 设置缓存存根
    		URI[] uri = { URI.create("hdfs://192.168.1.1:9000/txt/union/product.txt") };
    		job.setCacheFiles(uri);
    		// 输入路径中给定的应该是大文件
    		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.1.1:9000/txt/union/order.txt"));
    		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.1:9000/result/joinprice"));
    
    		if (!job.waitForCompletion(true))
    			return;
    	}
    
    }
    
    
    package cn.Ajaxtxdy.join;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Writable;
    
    public class Order implements Writable {
    
    	private String orderid;
    	private String date;
    	private String proid;
    	private int num;
    	private String name;
    	private double price;
    
    	public String getOrderid() {
    		return orderid;
    	}
    
    	public void setOrderid(String orderid) {
    		this.orderid = orderid;
    	}
    
    	public String getDate() {
    		return date;
    	}
    
    	public void setDate(String date) {
    		this.date = date;
    	}
    
    	public String getProid() {
    		return proid;
    	}
    
    	public void setProid(String proid) {
    		this.proid = proid;
    	}
    
    	public int getNum() {
    		return num;
    	}
    
    	public void setNum(int num) {
    		this.num = num;
    	}
    
    	public String getName() {
    		return name;
    	}
    
    	public void setName(String name) {
    		this.name = name;
    	}
    
    	public double getPrice() {
    		return price;
    	}
    
    	public void setPrice(double price) {
    		this.price = price;
    	}
    
    	@Override
    	public void write(DataOutput out) throws IOException {
    		out.writeUTF(orderid);
    		out.writeUTF(date);
    		out.writeUTF(proid);
    		out.writeInt(num);
    		out.writeUTF(name);
    		out.writeDouble(price);
    	}
    
    	@Override
    	public void readFields(DataInput in) throws IOException {
    		this.orderid = in.readUTF();
    		this.date = in.readUTF();
    		this.proid = in.readUTF();
    		this.num = in.readInt();
    		this.name = in.readUTF();
    		this.price = in.readDouble();
    
    	}
    
    }
    
    
    package cn.Ajaxtxdy.join;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class JoinMapper extends Mapper<LongWritable, Text, Text, Order> {
    
    	private Map<String, Order> map = new HashMap<>();
    
    	// 在处理大文件的时候需要小文件中的数据
    	// 那也就意味着在处理大文件之前需要先把小文件解析
    	// 小文件解析一次之后放入内存中供我们进行查询
    	@Override
    	protected void setup(Mapper<LongWritable, Text, Text, Order>.Context context)
    			throws IOException, InterruptedException {
    		// 先将小文件从缓存中取出来
    		URI file = context.getCacheFiles()[0];
    		// 连接HDFS,读取小文件
    		FileSystem fs = FileSystem.get(file, context.getConfiguration());
    		// 获取到针对这个文件的输入流
    		InputStream in = fs.open(new Path(file.toString()));
    		// 考虑将字节流转化为字符流 --- 最好能够按行读取
    		BufferedReader reader = new BufferedReader(new InputStreamReader(in));
    		String line;
    		while ((line = reader.readLine()) != null) {
    
    			// 1 chuizi 3999
    			String[] arr = line.split(" ");
    			Order o = new Order();
    			o.setProid(arr[0]);
    			o.setName(arr[1]);
    			o.setPrice(Double.parseDouble(arr[2]));
    			map.put(o.getProid(), o);
    
    		}
    		reader.close();
    	}
    
    	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
    		// 1001 20170710 4 2
    		String[] arr = value.toString().split(" ");
    		Order o = new Order();
    		o.setOrderid(arr[0]);
    		o.setDate(arr[1]);
    		o.setProid(arr[2]);
    		o.setNum(Integer.parseInt(arr[3]));
    		o.setName(map.get(o.getProid()).getName());
    		o.setPrice(map.get(o.getProid()).getPrice());
    		context.write(new Text(o.getOrderid()), o);
    	}
    
    }
    
    
    package cn.Ajaxtxdy.join;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class JoinReducer extends Reducer<Text, Order, Text, DoubleWritable> {
    
    	public void reduce(Text key, Iterable<Order> values, Context context) throws IOException, InterruptedException {
    
    		double sum = 0;
    		for (Order val : values) {
    			sum = val.getNum() * val.getPrice();
    		}
    		context.write(key, new DoubleWritable(sum));
    	}
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217

    二、小文件
    1. 小文件的危害:
    a. 存储:大量小文件会产生大量的元数据,就导致内存被大量占用
    b. 计算:大量小文件就产生大量的切片,大量切片则意味着有大量的MapTask,会导致服务器的执行效率变低甚至会导致服务器崩溃
    2. 针对小文件的处理手段常见的有2种:合并和压缩
    3. Hadoop提供了一种原生的合并手段:Hadoop Archive,将多个小文件打成一个har包

    G、完全分布式架构
    在这里插入图片描述

  • 相关阅读:
    Java 集合面试题小结(1)
    达梦数据库V8(启动数据库实例服务)
    SpringBoot统一返回值与actuator的矛盾
    A First Look At Java
    vlan简单实验
    Spring Cloud Alibaba 容器化部署最佳实践 | 本地部署版本 | Rocketmq组件安装
    php——三篇夯实根基第一篇
    Redux 源码解析
    MySQL安装validate_password_policy插件
    杰理之MIC 免电容方案需要设置【篇】
  • 原文地址:https://blog.csdn.net/Ajaxtxdy/article/details/126593272