喵了个喵,我又遇到瓶颈了
[TOC]
接下来以一个简单的WordCount为例子,介绍Java版本的MapReduce的程序编写。
mapreduce程序主要分三部分:1.map部分,2.reduce部分,3.提交部分。
1. 准备部分
hadoop中,针对数据类型自成一体,与java的数据类型对应。封装在hadoop.io包中,主要分为基本类型和其它类型。
数据类型 |
hadoop数据类型 |
Java数据类型 |
布尔 |
BooleanWritable |
boolean |
整型 |
IntWritable |
int |
长整型 |
LongWritable |
long |
浮点型 |
FloatWritable |
float |
双精浮点 |
DoubleWritable |
double |
字节 |
ByteWritable |
byte |
|
|
|
数据类型 |
hadoop数据类型 |
Java数据类型 |
字符串 |
Text |
String |
数组 |
ArrayWritable |
Array |
Map |
MapWritable |
Map |
2. jar包依赖
创建一个maven工程,pom.xml文件中,添加以下依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.6.0</version> </dependency>
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.6.0</version> </dependency>
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.0</version> </dependency> </dependencies>
|
3. Map部分
映射部分,将数据逐条处理
首先,需要继承Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>类
四个泛型参数分别代表:输入key 输入value 输出key 输出value
然后重写mapper的map方法
key, VALUEIN value, Context context) throws IOException, InterruptedException```1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| 主体代码:
```java public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
/** * 流程,输入key和value,map的结果写入到context中 */ @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //读取一行数据 String line = value.toString(); //因为英文字母是以“ ”为间隔的,因此使用“ ”分隔符将一行数据切成多个单词并存在数组中 String str[] = line.split(" "); //循环迭代字符串,将一个单词变成<key,value>形式,及<"hello",1> for (String s : str) { context.write(new Text(s), new IntWritable(1)); } }
}
|
4.Reduce部分
归并部分,将map处理和shuffle之后的数据进行归并。shuffle过程由hadoop控制
Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
四个泛型参数分别代表:输入key 输入value 输出key 输出value
然后重写reducer的reduce方法
key, Iterable values, Context context) throws IOException, InterruptedException```1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| 主体代码:
```java public class WordCountReduce extends Reducer<Text,IntWritable,Text, IntWritable> { /** * 流程,输入key和value,map的结果写入到context中 */ @Override public void reduce(Text key, Iterable<IntWritable> values,Context context)throws IOException,InterruptedException{ int count = 0;
for(IntWritable value: values) { count++; } context.write(key,new IntWritable(count)); } }
|
5.提交部分
mapreduce的入口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public static void main(String[] args) throws Exception{ Configuration conf = new Configuration(); Job mrJob = Job.getInstance(conf); mrJob.setJarByClass(WordCount.class); mrJob.setMapperClass(WordCountMap.class); mrJob.setReducerClass(WordCountReduce.class);
mrJob.setMapOutputKeyClass(Text.class); mrJob.setMapOutputValueClass(IntWritable.class);
mrJob.setOutputKeyClass(Text.class); mrJob.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(mrJob,new Path(args[0]));
FileOutputFormat.setOutputPath(mrJob,new Path(args[1]));
boolean waitForCompletion = mrJob.waitForCompletion(true); System.exit(waitForCompletion ? 0 : 1); }
|
6.打包提交
将程序打成jar包,提交到hadoop集群中,然后使用命令行进行任务提交
对于输入输出路径,均可接受本地路径和hdfs路径。
本地路径前缀:file://
hdfs路径前缀:hdfs://
1
| /hadoop-3.1.2/bin/hadoop jar my_mapreduce-1.0-SNAPSHOT.jar com.breakthrough.wordcount.WordCount file:///home/hadoop/english.txt file:///home/hadoop/output
|