Preface
MapReduce is one of the essential programming models in the big-data world, and nearly every introduction to it starts with word counting. In this post we take a first look at the WordCount source code.
The code
The map part
package com.qqa.MapReduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class wordcount_Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Output objects are created once and reused for every record
    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the incoming line into words on single spaces
        String string = value.toString();
        String[] words = string.split(" ");
        // Emit a (word, 1) pair for every word in the line
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}
KEYIN: by default, the byte offset at which the line of text read by the MR framework starts; this would be a Long,
but Hadoop has its own, more compact serialization interface, so LongWritable is used instead of Long.
VALUEIN: by default, the content of the line of text read by the MR framework, a String; Text is used here.
KEYOUT: the key of the data output after the user-defined logic has run; here it is a word, a String, so Text is used.
VALUEOUT: the value of the data output after the user-defined logic has run; here it is the count of a word, an Integer, so IntWritable is used.
For example, the input record (0, "hello hadoop") produces the output records ("hello", 1) and ("hadoop", 1).
A quick explanation of what the offset is.
Suppose the text data looks like this (offset on the left, line content on the right):

0    hello
6    hadoop
13   mapreduce
23   ...

The offset of the first line is always 0; the offset of every later line is the total number of bytes in all preceding lines, counting spaces, punctuation, and the line terminator. Here "hello" is 5 bytes, plus 1 byte for its newline, which puts the second line at offset 6, and so on.
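To make the offset rule concrete, here is a minimal standalone sketch (plain Java, not part of the MapReduce job; the file name sample.txt and the single-byte '\n' line terminator are assumptions) that prints each line of a file prefixed with its byte offset, the same pairs that TextInputFormat hands to the mapper:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public class OffsetDemo {
    public static void main(String[] args) throws IOException {
        // Hypothetical input file; each line becomes one map() call
        List<String> lines = Files.readAllLines(Paths.get("sample.txt"), StandardCharsets.UTF_8);
        long offset = 0;
        for (String line : lines) {
            System.out.println(offset + "\t" + line);
            // Advance past the line's bytes plus one byte for the '\n' terminator
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
    }
}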
The reduce part
package com.qqa.MapReduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class wordcount_Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    // Reused output value object
    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum all the counts emitted for this word
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        // Store the sum in the reused output value before writing it out
        v.set(sum);
        context.write(key, v);
    }
}
KEYIN and VALUEIN match the mapper's KEYOUT and VALUEOUT types.
KEYOUT and VALUEOUT are the output types of the user-defined reduce logic: KEYOUT is the word, and VALUEOUT is its total count.
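To illustrate what happens between map and reduce, here is a minimal plain-Java simulation (the sample words are invented; in a real job the shuffle is performed by Hadoop, not by code like this) of grouping the map output by key and summing each group the way wordcount_Reducer does:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShuffleDemo {
    public static void main(String[] args) {
        // Simulated map output: one (word, 1) pair per occurrence
        String[] mapOutputKeys = {"hello", "hadoop", "hello", "mapreduce", "hello"};

        // The shuffle phase groups the values by key and sorts the keys
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String word : mapOutputKeys) {
            grouped.computeIfAbsent(word, w -> new ArrayList<>()).add(1);
        }

        // The reduce phase sums the value list of each key
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            int sum = 0;
            for (int one : entry.getValue()) {
                sum += one;
            }
            System.out.println(entry.getKey() + "\t" + sum);  // e.g. hello 3
        }
    }
}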
Job submission client (the driver):
package com.qqa.MapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class wordcount_Driver {
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf);

        // The jar that contains the driver, mapper, and reducer classes
        job.setJarByClass(wordcount_Driver.class);
        job.setMapperClass(wordcount_Mapper.class);
        job.setReducerClass(wordcount_Reducer.class);

        // Map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Final (reduce) output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Delete the output path if it already exists; otherwise the job fails
        if (fs.exists(new Path(args[1]))) {
            fs.delete(new Path(args[1]), true);
        }

        boolean result = job.waitForCompletion(true);
        System.out.println("Job " + (result ? "succeeded" : "failed"));
    }
}
The driver is almost always this same template. MapReduce itself is only a framework: the actual business logic is ordinary Java code that you write yourself, and the input and output types you configure on the job follow from your analysis of the data you need to process.
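For reference, once the three classes are packaged into a jar, a job like this is typically launched as follows (the jar name wordcount.jar and the HDFS paths /input and /output are placeholders, not from the original code):

hadoop jar wordcount.jar com.qqa.MapReduce.wordcount_Driver /input /output

Here /input becomes args[0], the input path, and /output becomes args[1], the output path that the driver checks and deletes if it already exists.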
Hadoop serialization types for common Java data types
Java type      Hadoop Writable type
boolean        BooleanWritable
byte           ByteWritable
int            IntWritable
float          FloatWritable
long           LongWritable
double         DoubleWritable
String         Text
map            MapWritable
array          ArrayWritable
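As a small sketch of what these Writable wrappers buy us, the snippet below round-trips a (Text, IntWritable) pair through a byte stream using the write and readFields methods of the Writable interface, which is essentially what the framework does when it serializes intermediate data (the sample values "hello" and 3 are invented):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WritableDemo {
    public static void main(String[] args) throws IOException {
        // Serialize a (Text, IntWritable) pair to a compact byte stream
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        new Text("hello").write(out);
        new IntWritable(3).write(out);

        // Deserialize the bytes back into fresh objects
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
        Text word = new Text();
        IntWritable count = new IntWritable();
        word.readFields(in);
        count.readFields(in);
        System.out.println(word + "\t" + count.get());  // prints: hello	3
    }
}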