前言

作为与大数据相关的一种重要的编程模型,mapreduce是我们需要知道的,而mapreduce入门基本都是从单词计数开始的,我们本次就来粗解一下wordcount的源码.

代码部分

map部分

package com.qqa.MapReduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
* @author qqa
* @date 2020/7/17 22:02
*/
public class wordcount_Mapper extends Mapper<LongWritable, Text,Text, IntWritable> {
Text k=new Text();
IntWritable v=new IntWritable(1);

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//获取一行
String string = value.toString();
//分割
String[] words = string.split(" ");
//循环写出
for (String word:words){
//存储word
k.set(word);
//然后写出去
context.write(k,v);

}
}
}



KEYIN:默认情况下,是mr框架所读到的一行文本的起始偏移量,Long;

而在hadoop中有自己的更精简的序列化接口,所以不直接用Long,而是用LongWritable

VALUEIN:默认情况下,是mr框架所读到的一行文本内容,String;此处用Text

KEYOUT:是用户自定义逻辑处理完成之后输出数据中的key,在此处是单词,String;此处用Text

VALUEOUT,是用户自定义逻辑处理完成之后输出数据中的value,在此处是单词次数,Integer,此处用IntWritable

在这里解释一下什么是偏移量

比如文本数据是这样的

0   hello
5 hadoop
11 mapreduce
20 .......

文本偏移量在第一行默认都是0,在第二行则是第一行的文本字符数(包括空格和符号)

reduce部分

package com.qqa.MapReduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
* @author qqa
* @date 2020/7/17 22:02
*/
public class wordcount_Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
IntWritable v = new IntWritable();

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
//累加求和
for (IntWritable value : values) {
sum += value.get();
}

//写出
context.write(key, v);


}
}


KEYIN , VALUEIN 对应mapper输出的KEYOUT, VALUEOUT类型

KEYOUT,VALUEOUT 对应自定义reduce逻辑处理结果的输出数据类型 KEYOUT是单词 VALUEOUT是总次数

job提交客户端实现:

package com.qqa.MapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
* @author qqa
* @date 2020/7/17 22:03
*/
public class wordcount_Driver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//获取配置信息
Configuration conf = new Configuration();
//获取文件系统
FileSystem fs=FileSystem.get(conf);
//1.获取job对象
Job job = Job.getInstance(conf);
//2.设置jar包存储路径
job.setJarByClass(wordcount_Driver.class);
//3.关联map和red类
job.setMapperClass(wordcount_Mapper.class);
job.setReducerClass(wordcount_Reducer.class);
//4.设置map阶段输出的key和value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//5.设置最终输出的key和value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//6.设置输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

//如果输出路径已经存在删除它
if (fs.exists(new Path(args[1]))){
fs.delete(new Path(args[1]),true);
}
//7.提交job
boolean result = job.waitForCompletion(true);
System.out.println("job提交"+(result ? "成功": "失败"));


}
}

在实例化驱动程序时,基本都是这样模版,而mapreduce只是一个框架,具体的业务逻辑都是看自己的java逻辑水平来编写具体的业务代码.而框架的使用是自己对于自己需要完成的目标数据的分析来决定的mapreduce的输入输出类型的.

常用的数据类型对应的Hadoop数据序列化类型

Java类型 Hadoop Writable类型
boolean BooleanWritable
byte ByteWritable
int IntWritable
float FloatWritable
long LongWritable
double DoubleWritable
String Text
map MapWritable
array ArrayWritable