WCDriver
- package com.atguigu.mr.wordcount;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
- import java.io.IOException;
-
- /*
- 程序的入口
- 1.创建job实例并允许
- */
- public class WCDriver {
- public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
- // 创建job实例
- Configuration conf=new Configuration();
- Job job=Job.getInstance(conf);
-
- // 给job赋值
- // 关联本程序的jar 运行必须写
- job.setJarByClass(WCDriver.class);
- // 设置mapper reduce类
- job.setMapperClass(WCMapper.class);
- job.setReducerClass(WCReducer.class);
- // 设置mapper输出的key value的类型
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(LongWritable.class);
- // 设置最终输出的key value类型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(LongWritable.class);
-
- /*
- 设置输出路径
- windows:
- FileInputFormat.setInputPaths(job,new Path("//"));
- FileInputFormat.setOutputPaths(job,new Path("//"));
- */
- FileInputFormat.setInputPaths(job,new Path(args[0]));
- FileOutputFormat.setOutputPath(job,new Path(args[1]));
- // FileInputFormat.setInputPaths(job,new Path("F:\\input"));
- // FileOutputFormat.setOutputPath(job,new Path("F:\\aa\\output"));
- // 运行job
- boolean b=job.waitForCompletion(true);
- System.out.println("b===="+ b);
-
-
-
- }
- }
WCMapper
- package com.atguigu.mr.wordcount;
-
- import org.apache.commons.net.imap.IMAP;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.xbill.DNS.LOCRecord;
-
- import java.io.IOException;
-
- /*
- mapper阶段会运行MapTask -会调用Mapper类
- 在该类中实现业务逻辑
- */
- public class WCMapper extends Mapper
{ -
- private Text outKey=new Text();
- private LongWritable outValue=new LongWritable();
-
- /**
- *
- * @param key 读取数据时的偏移量
- * @param value 读取的数据
- * @param context 上下文
- * @throws IOException
- * @throws InterruptedException
- */
-
- @Override
- protected void map(LongWritable key, Text value, Mapper
.Context context) throws IOException, InterruptedException { - // super.map(key, value, context);
- // 1.将数据进行切割
-
-
-
- // 1.1将Text转换成string---为了使用String API
- String line=value.toString();
- // 1.2对数据切割
- String[] words =line.split("");
-
- // 2.遍历数据
- for (String word:words){
- // 3.封装key,value
- // 创建key,value对象
-
-
- // 赋值
- outKey.set(word);
- outValue.set(1);
- // 4.将 key,value写进去
- context.write(outKey,outValue);
-
-
- }
- //
- //
-
- }
- }
WCReducer
- package com.atguigu.mr.wordcount;
-
- import org.apache.hadoop.classification.InterfaceAudience;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Reducer;
-
- import java.io.IOException;
-
- /*
- reduce阶段会运行reduceTask -会调用reducer类
- 在该类中实现业务逻辑
- */
- public class WCReducer extends Reducer
{ - private LongWritable outValue=new LongWritable();
- /**
- *
- * @param key
- * @param values
- * @param context
- * @throws IOException
- * @throws InterruptedException
- */
- @Override
- protected void reduce(Text key, Iterable
values, Reducer.Context context) throws IOException, InterruptedException { - // super.reduce(key, values, context);
- // super.reduce(key, values, context);
- long sum =0; //value的和
- // 遍历所有的value
- for (LongWritable value : values){
- long v=value.get();
- // 累加
- sum+=v;
-
- }
- outValue.set(sum);
- context.write(key,outValue);
- }
- }