Hadoop之MapReduce编程示例

根据前面介绍:

hdfs负责海量数据存储

而MapReduce负责海量数据计算

所以, 这篇博客 会用代码详细介绍MapReduce中典型的wordcount(单词计数)例子的过程.

新建一个java project, 这里我用maven构建.

项目构建

1
2
3
4
5
6
7
8
9
10
11
12
打开项目的pom.xml文件, 导入我们项目所需的依赖.
因为mapreduce中需要依赖的jar有很多, hdfs, common, mapreduce等等.
所以为了避免jar包之间依赖麻烦, 所以这里我推荐直接导入一个依赖即可:
直接在maven的中央仓库中搜索 hadoop-client 即可.

<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<!-- 该依赖同时依赖好多其他的jar, hdfs, common, yarn, mapreduce等等-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
</dependency>

程序分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
* 比如有一段话: (需要进行分词操作)
* i love beijing
* i love china
* beijing is the captial of china

*
*///////////////////////////////////////
* 过程一:
* k1 v1
* 1 i love beijing
* 4 i love china
* 7 beijing is the captial of china
*///////////////////////////////////////


* 过程二:
* 经过分词操作后,这里也就是mapper的主要操作.
*
* k2 v2
* i 1
* love 1
* beijing 1
* i 1
* love 1
* china 1
* beijing 1
* is 1
* the 1
* captial 1
* of 1
* china 1
*///////////////////////////////////////


* 过程三:
* 该结果作为reduce的输入:
* k3 v3
* i (1, 1)
* beijing (1, 1, 1)
* love (1, 1)
* china (1, 1)
* the (1)
* .......


*///////////////////////////////////////
* 过程四:
* 然后在reduce阶段要做的就是 将v3集合中的值求和即可, 然后输出k4, v4
* k4 v4
* i 2
* beijing 3
* love 2
* china 2
* the 2
* ........
*
*///////////////////////////////////////

编写代码

首先编写Map的处理过程代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
新建Mapper 继承 org.apache.hadoop.mapreduce.Mapper
注意: KEYIN, VALUEIN, KEYOUT, VALUEOUT的类型.
要使用hadoop中自带的数据类型, 不能使用Java的数据类型


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
/**
* key就是输入的key
* value就是数据
* context就是map的上下文, 上文就是hdfs, 下文就是reduce
*/
// 得到一行数据
String data = value.toString();
// 进行分词
String[] strings = data.split(" ");
// 输出每个单词
for (String word : strings) {
context.write(new Text(word), new LongWritable(1));
}
}
}

再编写 Reduce 过程代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
* KEYIN, VALUEIN, KEYOUT, VALUEOUT
* 这里的KEYIN 和 VALUEIN 就是Mapper的输出. 类型可以直接复制过来.
*/
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

/**
* 规约函数
* @param key mapper的输出key
* @param values mapper的输出value
* @param context context的上文是mapper的输出, 下文是hdfs
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
/**
* values是一个集合, 就是在mapper阶段的输出结果的一个集合.
*/
long total = 0;
for (LongWritable writable : values) {
total = total + writable.get();
}
context.write(key, new LongWritable(total));
}
}

最后编写主函数即入口函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountMain {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
/**
* 创建一个job = map + reduce;
*/
Configuration conf = new Configuration();
// 创建一个job
Job job = Job.getInstance(conf);
// 指定任务的入口
job.setJarByClass(WordCountMain.class);
// 指定job的 mapper是谁.
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 指定job的reducer是谁.
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 指定任务的输入和输出. 参数有运行时传入
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 提交任务. ture表示运行过程中 把日志打印出来.
job.waitForCompletion(true);
}

}

最后一步, 打成jar包.

由于是用maven构建项目的, 打包方式有很多, 这里我介绍一种最简单的方式, 在pom.xml中使用maven插件的方式打包, 只要配置下, 指定下jar的运行的主类即可. 我的主类是 mapreduce.WordCountMain(在idea中直接右键copy PATH) 即可.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<useUniqueVersions>false</useUniqueVersions>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>mapreduce.WordCountMain</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>

然后我是直接用的idea自带的maven projects生命周期中的packaeg点击直接打包, 当然 你也可以用命令行的方式 mvn package, 也可以先clean下,再package.

打包的结果 wc.jar

最后就是测试自己写的程序是否有问题.

直接在命令行中 hadoop jar wc.jar /input/data.txt /output (注意路径是hdfs上的文件路径, 不是本机上的)

这里的/input/data.txt /output 两个参数将作为wc.jar的主程序args[0], args[1] 传入.

就可以得到结果了, 使用 hdfs dfs -cat /output/part-r-00000 既可以查看各单词统计的结果

结束

到此, 一个mapreduce的wordcount程序就已经完成了, 相当于我们一开始写代码的时都要写一个Hello World程序.

注意, 重要的是一定要理解mapreduce这个过程. map + reduce = job, map的输出就是reduce的输入. 理解了这个过程, 再写其他程序之前也这样分析下, 写程序就不是什么难事了. 加油哦. 共勉!