

    MapReduce Extended Case Study

    Inverted Index Case (Multi-Job Chaining)

    1. Requirements

    There is a large volume of text (documents, web pages) over which a search index must be built, as shown in Figure 4-31.

    (1) Input data

    (image omitted: the sample input is three text files, a.txt, b.txt, and c.txt, whose lines consist of the words atguigu, pingping, and ss)

    (2) Expected output

    atguigu   c.txt-->2 b.txt-->2 a.txt-->3

    pingping  c.txt-->1 b.txt-->3 a.txt-->1

    ss        c.txt-->1 b.txt-->1 a.txt-->2

    2. Requirements analysis

    (analysis diagram omitted: the index is built with two chained MapReduce jobs — the first job counts occurrences of each word per file, keyed on word--filename, and the second job regroups those per-file counts under the word alone)
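    For intuition, the structure being produced is a map from each word to its per-file counts. Below is a minimal single-machine sketch of the same aggregation in plain Java; the sample lines are hypothetical stand-ins for the real input files, and the class name is illustrative, not part of the original code.

import java.util.HashMap;
import java.util.Map;

public class InMemoryInvertedIndex {

    public static void main(String[] args) {
        // word -> (file -> count); the two MapReduce jobs distribute this same aggregation
        Map<String, Map<String, Integer>> index = new HashMap<>();

        // Hypothetical sample lines standing in for the input files
        String[][] docs = { { "a.txt", "atguigu pingping" }, { "b.txt", "pingping ss" } };

        for (String[] doc : docs) {
            for (String word : doc[1].split(" ")) {
                // count one occurrence of word in file doc[0]
                index.computeIfAbsent(word, w -> new HashMap<>())
                     .merge(doc[0], 1, Integer::sum);
            }
        }

        // e.g. {ss={b.txt=1}, pingping={a.txt=1, b.txt=1}, atguigu={a.txt=1}}
        System.out.println(index);
    }
}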

    3. First pass

    (1) First pass: write the OneIndexMapper class

package com.atguigu.mapreduce.index;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class OneIndexMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    String name;
    Text k = new Text();
    IntWritable v = new IntWritable();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Get the name of the file this split comes from
        FileSplit split = (FileSplit) context.getInputSplit();
        name = split.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 Read one line
        String line = value.toString();

        // 2 Split into words
        String[] fields = line.split(" ");

        for (String word : fields) {
            // 3 Build the key as word--filename
            k.set(word + "--" + name);
            v.set(1);

            // 4 Emit
            context.write(k, v);
        }
    }
}
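    To make the mapper's behavior concrete, the trace below shows what it would emit for one hypothetical input line; it is an illustration, not output from the original code.

// Split file: a.txt, input line: "atguigu pingping"
// Pairs emitted by OneIndexMapper:
//   ("atguigu--a.txt", 1)
//   ("pingping--a.txt", 1)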


    (2) First pass: write the OneIndexReducer class

package com.atguigu.mapreduce.index;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class OneIndexReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;

        // 1 Sum the counts for this word--filename key
        for (IntWritable value : values) {
            sum += value.get();
        }

        v.set(sum);

        // 2 Emit
        context.write(key, v);
    }
}
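    Because this reducer only sums integers, it could also be registered as a combiner to shrink shuffle traffic. This is an optional optimization, not part of the original drivers; if used, the line below would go into OneIndexDriver's job configuration.

// Optional: run the sum reducer map-side as well (safe because addition is associative and commutative)
job.setCombinerClass(OneIndexReducer.class);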

    (3) First pass: write the OneIndexDriver class

package com.atguigu.mapreduce.index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OneIndexDriver {

    public static void main(String[] args) throws Exception {
        // Adjust the input and output paths to the actual paths on your machine
        args = new String[] { "e:/input/inputoneindex", "e:/output5" };

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);
        job.setJarByClass(OneIndexDriver.class);

        job.setMapperClass(OneIndexMapper.class);
        job.setReducerClass(OneIndexReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}


    (4) View the first-pass output

    atguigu--a.txt  3
    atguigu--b.txt  2
    atguigu--c.txt  2
    pingping--a.txt 1
    pingping--b.txt 3
    pingping--c.txt 1
    ss--a.txt 2
    ss--b.txt 1
    ss--c.txt 1
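    Each of these lines becomes one input record for the second pass. The standalone snippet below traces the second pass's string handling on a single sample line; the class name SecondPassTrace is illustrative, and the snippet is not part of the original code.

public class SecondPassTrace {

    public static void main(String[] args) {
        // One first-pass output line: key and value are tab-separated
        String line = "atguigu--a.txt\t3";

        String[] fields = line.split("--");             // ["atguigu", "a.txt\t3"]
        String word = fields[0];                        // "atguigu"   -> second-pass map key
        String entry = fields[1].replace("\t", "-->");  // "a.txt-->3" -> built later in TwoIndexReducer

        System.out.println(word + "\t" + entry);        // prints: atguigu  a.txt-->3
    }
}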

    4. Second pass

    (1) Second pass: write the TwoIndexMapper class

package com.atguigu.mapreduce.index;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TwoIndexMapper extends Mapper<LongWritable, Text, Text, Text> {

    Text k = new Text();
    Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 Read one line
        String line = value.toString();

        // 2 Split on "--"
        String[] fields = line.split("--");

        k.set(fields[0]);
        v.set(fields[1]);

        // 3 Emit
        context.write(k, v);
    }
}


    (2) Second pass: write the TwoIndexReducer class

package com.atguigu.mapreduce.index;

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TwoIndexReducer extends Reducer<Text, Text, Text, Text> {

    Text v = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Input, e.g.:
        //   atguigu  a.txt  3
        //   atguigu  b.txt  2
        //   atguigu  c.txt  2
        // Desired output:
        //   atguigu  c.txt-->2  b.txt-->2  a.txt-->3

        StringBuilder sb = new StringBuilder();

        // 1 Concatenate the per-file entries
        for (Text value : values) {
            sb.append(value.toString().replace("\t", "-->") + "\t");
        }

        v.set(sb.toString());

        // 2 Emit
        context.write(key, v);
    }
}

    (3) Second pass: write the TwoIndexDriver class

package com.atguigu.mapreduce.index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TwoIndexDriver {

    public static void main(String[] args) throws Exception {
        // Adjust the input and output paths to the actual paths on your machine
        args = new String[] { "e:/input/inputtwoindex", "e:/output6" };

        Configuration config = new Configuration();
        Job job = Job.getInstance(config);

        job.setJarByClass(TwoIndexDriver.class);
        job.setMapperClass(TwoIndexMapper.class);
        job.setReducerClass(TwoIndexReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
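    As run above, the two jobs use separate drivers, and the first pass's output must be copied into the second pass's input directory by hand. The sketch below chains both jobs in a single driver so the second job reads the first job's output directory directly; the class name IndexChainDriver and the paths are illustrative assumptions, not part of the original code. (For larger DAGs of dependent jobs, Hadoop's JobControl/ControlledJob classes are an alternative.)

package com.atguigu.mapreduce.index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical combined driver; class name and paths are illustrative
public class IndexChainDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path input = new Path("e:/input/inputoneindex");  // raw text files
        Path middle = new Path("e:/output5");             // first-pass output = second-pass input
        Path output = new Path("e:/output6");             // final inverted index

        // Job 1: count each word per file (word--filename -> count)
        Job job1 = Job.getInstance(conf, "one-index");
        job1.setJarByClass(IndexChainDriver.class);
        job1.setMapperClass(OneIndexMapper.class);
        job1.setReducerClass(OneIndexReducer.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(IntWritable.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job1, input);
        FileOutputFormat.setOutputPath(job1, middle);

        // Run job 2 only if job 1 succeeds
        if (!job1.waitForCompletion(true)) {
            System.exit(1);
        }

        // Job 2: regroup the per-file counts by word (word -> file-->count list)
        Job job2 = Job.getInstance(conf, "two-index");
        job2.setJarByClass(IndexChainDriver.class);
        job2.setMapperClass(TwoIndexMapper.class);
        job2.setReducerClass(TwoIndexReducer.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job2, middle);
        FileOutputFormat.setOutputPath(job2, output);

        System.exit(job2.waitForCompletion(true) ? 0 : 1);
    }
}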


    (4) View the final result of the second pass

    atguigu   c.txt-->2 b.txt-->2 a.txt-->3

    pingping  c.txt-->1 b.txt-->3 a.txt-->1

    ss        c.txt-->1 b.txt-->1 a.txt-->2

  • Original source: https://blog.csdn.net/zjjcchina/article/details/127841472