Learning Spark Together

Spark: a new-generation big data compute engine. Thanks to its in-memory computation, it is much faster than Hadoop MapReduce.

Spark: a fast and general-purpose cluster computing platform.

Concepts

Spark is general-purpose and highly open:

Spark is written in Scala and runs on the JVM. It requires JDK 1.7+ and Python 2.6+ or 3.4+.

It provides APIs for Python, Java, Scala, and SQL, plus a rich set of built-in libraries.

Spark integrates well with other big data tools, including Hadoop.

Spark has no HDFS-like storage of its own; it relies on systems such as HDFS to persist data.

At its core, Spark provides an engine (usually called Spark Core). On top of it sit a number of higher-level modules (also called components or tools): Spark SQL (compute over data by writing SQL), Spark Streaming (real-time computation, though it can also run offline jobs), MLlib (machine learning), and GraphX (graph processing).
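For instance, Spark SQL lets you register a dataset as a table and query it with ordinary SQL. A minimal sketch in the Spark 2.x shell, assuming a hypothetical people.json with name and age fields (`spark` is the SparkSession that spark-shell creates for you):

    val df = spark.read.json("people.json")        // load JSON into a DataFrame
    df.printSchema()                               // inspect the inferred schema
    df.createOrReplaceTempView("people")           // register it as a temporary table
    spark.sql("SELECT name, age FROM people WHERE age > 20").show()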

This is how the official site introduces Spark (Spark Overview):

Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.

Link to the Spark official website: https://spark.apache.org/

Setting Up the Development Environment

Spark 2.2.1 download link:

wget http://mirrors.hust.edu.cn/apache/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz

Extract it: tar -zxvf spark-2.2.1-bin-hadoop2.7.tgz

cd into the bin directory, where the Spark shells live.

Spark Shell
1. The Spark shell lets you work with data that is distributed across a cluster.
2. Spark loads the data into memory on the nodes, so distributed computations can finish in seconds.
3. Quick iterative computation, interactive queries, and ad-hoc analysis can usually be done right in the shell.
4. Spark ships with a Python shell and a Scala shell.

The Python shell is at bin/pyspark
The Scala shell is at bin/spark-shell

A Scala shell example.
First, check the contents of hello.txt:
➜ java more hello.txt
i love nanjing
i love China
nanjing is beautiful

scala> var lines = sc.textFile("../../hello.txt")
lines: org.apache.spark.rdd.RDD[String] = ../../hello.txt MapPartitionsRDD[1] at textFile at <console>:24

scala> lines.first()
res6: String = i love nanjing

scala> lines.count()
res7: Long = 3
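You can keep chaining transformations on the same RDD interactively. For example (a sketch, not part of the original session; two of the three lines contain "nanjing", so the count should be 2):

scala> val nanjingLines = lines.filter(line => line.contains("nanjing"))
scala> nanjingLines.count()   // expected: 2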
Download the Scala tarball.
URL: https://www.scala-lang.org/download/2.10.5.html
I'm on a Mac, so I downloaded:
wget https://downloads.lightbend.com/scala/2.10.5/scala-2.10.5.tgz
(Note that the Maven dependencies later in this post target Scala 2.11; if you write Scala code against them, use a matching 2.11.x install.)

Then open IDEA, search for the Scala plugin, install it, and restart IDEA.

The First Spark Program

Open IDEA and create a new Scala project (use the Scala project option IDEA offers on the left of the New Project dialog).

The WordCount program does the following (a short Scala sketch of these steps follows the list; the Java versions come after it):
1. Create a SparkContext
2. Load the source data
3. Split each line into words
4. Turn the words into (word, 1) pairs and count them
5. Output the result
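For reference, here is a minimal sketch of these five steps using the Scala API. It mirrors the argument handling of the Java versions below (args(0) is the input path, args(1) the output path); it is a sketch, not the program this post actually builds:

    import org.apache.spark.{SparkConf, SparkContext}

    object ScalaWordCount {
      def main(args: Array[String]): Unit = {
        // 1. Create a SparkContext (local mode for a quick test)
        val conf = new SparkConf().setAppName("ScalaWordCount").setMaster("local")
        val sc = new SparkContext(conf)

        // 2. Load the source data
        val lines = sc.textFile(args(0))

        // 3. Split each line into words
        val words = lines.flatMap(_.split(" "))

        // 4. Turn the words into (word, 1) pairs and sum the counts per word
        val counts = words.map(word => (word, 1)).reduceByKey(_ + _)

        // 5. Output the result
        counts.saveAsTextFile(args(1))

        sc.stop()
      }
    }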

First, the WordCount written with the Java API, without Java 8 lambda expressions.

Add the dependencies to pom.xml:

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.11</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.3</version>
    </dependency>

</dependencies>
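The lambda-based version later in this post needs the compiler language level set to Java 8. If the build complains about lambda expressions, a build section like the following can go in the same pom.xml (the plugin version here is an assumption; use whatever recent version you have):

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.7.0</version>
            <configuration>
                <!-- Java 8 source/target so lambda expressions compile -->
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>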

The main program:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;


/**
 * Created by Daejong
 */
public class WordCount {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("WordCount").setMaster("local");
        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
        JavaRDD<String> javaRDD = sparkContext.textFile(args[0]);

        // Split each line and flatten the result into individual words
        JavaRDD<String> stringJavaRDD = javaRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                String[] words = s.split(" ");
                List<String> stringList = Arrays.asList(words);
                Iterator<String> stringIterator = stringList.iterator();
                return stringIterator;
            }
        });

        // Pair each word with the number 1
        JavaPairRDD<String, Integer> stringIntegerJavaPairRDD = stringJavaRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // Aggregate: reduce by key to sum the counts per word
        JavaPairRDD<String, Integer> reduceValue = stringIntegerJavaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });

        // Sort by count.
        // JavaPairRDD only supports sortByKey,
        // but the pairs look like ("dottie", 3), ("hello", 2) and we want to sort by the value,
        // so temporarily swap key and value.
        JavaPairRDD<Integer, String> swapReduceValue = reduceValue.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return new Tuple2<Integer, String>(stringIntegerTuple2._2, stringIntegerTuple2._1);
            }
        });

        // Sort; the default is ascending, false means descending
        JavaPairRDD<Integer, String> sortSwapReduceValue = swapReduceValue.sortByKey(false);

        // Swap key and value back
        JavaPairRDD<String, Integer> resultReduceValue = sortSwapReduceValue.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                return new Tuple2<String, Integer>(integerStringTuple2._2, integerStringTuple2._1);
            }
        });

        // Save the result to a file
        resultReduceValue.saveAsTextFile(args[1]);

        // Release resources
        sparkContext.stop();
    }

}

In IDEA, open Edit Configurations,
set the main class, the program arguments and so on, and then run.

For example, my args[0] source file contains:
Hello Dottie
Hello Daejong
i love Dottie

When the job finishes, an output directory is created.
Open the part-00000 file inside it
to see the result:
(Hello,2)
(Dottie,2)
(love,1)
(i,1)
(Daejong,1)

Now the same WordCount written with Java 8 lambda expressions. Very concise! (Essentially no different from writing it in Scala.)

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

/**
 * Created by Daejong.
 */
public class WordCountLambda {

    public static void main(String[] args) {

        SparkConf sparkConf = new SparkConf().setAppName("WordCountLambda").setMaster("local");

        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        JavaRDD<String> lines = jsc.textFile(args[0]);

        // Split each line and flatten into individual words
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());

        // Pair each word with the number 1
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(word -> new Tuple2<String, Integer>(word, 1));

        // Reduce: sum the counts per word
        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey((x, y) -> x + y);

        // The elements of `reduced` are tuples; use swap() to exchange key and value
        JavaPairRDD<Integer, String> swapd = reduced.mapToPair(tuple -> tuple.swap());

        // Sort by key (the count), descending
        JavaPairRDD<Integer, String> sortSwap = swapd.sortByKey(false);

        // Swap key and value back
        JavaPairRDD<String, Integer> result = sortSwap.mapToPair(tuple -> tuple.swap());

        // Save the result to a file
        result.saveAsTextFile(args[1]);

        // Release resources
        jsc.stop();

    }

}
Check the output:

(Hello,2)
(Dottie,2)
(love,1)
(i,1)
(Daejong,1)