Hadoop系列(七)Hadoop三大核心之MapReduce-程序编写

喵了个喵,我又遇到瓶颈了

[TOC]

接下来以一个简单的WordCount为例子,介绍Java版本的MapReduce的程序编写。

mapreduce程序主要分三部分:1.map部分,2.reduce部分,3.提交部分。

1. 准备部分

hadoop中,针对数据类型自成一体,与java的数据类型对应。封装在hadoop.io包中,主要分为基本类型和其它类型。

  • 基本数据类型
数据类型 hadoop数据类型 Java数据类型
布尔 BooleanWritable boolean
整型 IntWritable int
长整型 LongWritable long
浮点型 FloatWritable float
双精浮点 DoubleWritable double
字节 ByteWritable byte
  • 其它类型
数据类型 hadoop数据类型 Java数据类型
字符串 Text String
数组 ArrayWritable Array
Map MapWritable Map

2. jar包依赖

创建一个maven工程,pom.xml文件中,添加以下依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
</dependencies>

3. Map部分

映射部分,将数据逐条处理

首先,需要继承Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>类

四个泛型参数分别代表:输入key 输入value 输出key 输出value

然后重写mapper的map方法

key, VALUEIN value, Context context) throws IOException, InterruptedException```
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

主体代码:

```java
public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {

/**
* 流程,输入key和value,map的结果写入到context中
*/
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//读取一行数据
String line = value.toString();
//因为英文字母是以“ ”为间隔的,因此使用“ ”分隔符将一行数据切成多个单词并存在数组中
String str[] = line.split(" ");
//循环迭代字符串,将一个单词变成<key,value>形式,及<"hello",1>
for (String s : str) {
context.write(new Text(s), new IntWritable(1));
}
}

}

4.Reduce部分

归并部分,将map处理和shuffle之后的数据进行归并。shuffle过程由hadoop控制

Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>

四个泛型参数分别代表:输入key 输入value 输出key 输出value

然后重写reducer的reduce方法

key, Iterable values, Context context) throws IOException, InterruptedException```
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

主体代码:

```java
public class WordCountReduce extends Reducer<Text,IntWritable,Text, IntWritable> {
/**
* 流程,输入key和value,map的结果写入到context中
*/
@Override
public void reduce(Text key, Iterable<IntWritable> values,Context context)throws IOException,InterruptedException{
int count = 0;

for(IntWritable value: values) {
count++;
}
context.write(key,new IntWritable(count));
}
}

5.提交部分

mapreduce的入口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
// 构造一个job对象来封装本mapreduce业务到所有信息
Job mrJob = Job.getInstance(conf);
// 指定本job工作用到的jar包位置
mrJob.setJarByClass(WordCount.class);
// 指定本job用到的mapper类
mrJob.setMapperClass(WordCountMap.class);
// 指定本job用到的reducer类
mrJob.setReducerClass(WordCountReduce.class);

// 指定mapper输出的kv类型
mrJob.setMapOutputKeyClass(Text.class);
mrJob.setMapOutputValueClass(IntWritable.class);


// 指定reducer输出到kv数据类型(setOutputKeyClass 会对mapper和reducer都起作用,如果上面mapper不设置的话)
mrJob.setOutputKeyClass(Text.class);
mrJob.setOutputValueClass(IntWritable.class);


FileInputFormat.addInputPath(mrJob,new Path(args[0]));

//设置mapreduce程序的输出路径,MapReduce的结果都是输入到文件中
FileOutputFormat.setOutputPath(mrJob,new Path(args[1]));

// 最后提交任务
boolean waitForCompletion = mrJob.waitForCompletion(true);
System.exit(waitForCompletion ? 0 : 1);
}

6.打包提交

将程序打成jar包,提交到hadoop集群中,然后使用命令行进行任务提交

对于输入输出路径,均可接受本地路径和hdfs路径。

本地路径前缀:file://

hdfs路径前缀:hdfs://

1
/hadoop-3.1.2/bin/hadoop jar my_mapreduce-1.0-SNAPSHOT.jar com.breakthrough.wordcount.WordCount file:///home/hadoop/english.txt file:///home/hadoop/output
文章目录
  1. 1. 1. 准备部分
  2. 2. 2. jar包依赖
  3. 3. 3. Map部分
  4. 4. 4.Reduce部分
  5. 5. 5.提交部分
  6. 6. 6.打包提交
|