Learning Spark Together - 2

Spark's core abstraction: the RDD (Resilient Distributed Dataset)

A quick warm-up 😜: Hadoop

Hadoop's core technologies are all about turning a traditional single-machine architecture into a distributed one:

  1. Single-machine file storage becomes distributed storage: HDFS;
  2. Single-machine computation becomes distributed computation: MapReduce;
  3. Single-machine databases become distributed databases: HBase and Hive.

Scala Basics

Scala is a lot like Java. If you have used Java 8 lambda expressions (I covered them in an earlier post), you will pick Scala up even faster.

In Scala, type names all start with a capital letter: Int, Long, Char, Array, String, Float, Double, Boolean, Null, Any, and so on.
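
A tiny illustration of my own (not from the original post): the same capitalized names are used when you write types out explicitly:

val count: Int = 42
val pi: Double = 3.14
val name: String = "hello spark"
val flags: Array[Boolean] = Array(true, false)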

1. Variables are created with var (they can later be reassigned to another value of the same type) or val (once assigned, the value cannot be changed).
For example:
scala> var num = 1
num: Int = 1

scala> num = "hello"
<console>:25: error: type mismatch;
found : String("hello")
required: Int
num = "hello"
^

And again:
scala> val num = 1
num: Int = 1

scala> num = 2
<console>:25: error: reassignment to val
num = 2
^

2. Anonymous functions and type inference
lines.filter(line => line.contains("world"))
Here the argument passed to filter is an anonymous function: the part before the arrow is the parameter list and the part after it is the body. The compiler infers that line is a String, so we can call String's contains method on it.
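
As a hedged aside (my own snippet, not from the post), the same style works on plain Scala collections, which is an easy way to practice before touching RDDs; here lines is just a local List:

val lines = List("hello world", "hello spark", "goodbye")
// the compiler infers that line is a String, so contains can be called directly
val matched = lines.filter(line => line.contains("world"))
// matched: List[String] = List(hello world)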

3. Functions:
def add(x: Int, y: Int): Int = {
  return x + y
}

A function with default parameter values:
def add(x: Int = 0, y: Int = 0): Int = {
  return x + y
}

The last parameter can be declared as a vararg parameter, so the function accepts any number of arguments of the same type:
scala> def echo (args: String *) { for (arg <- args) println(arg) }

scala> echo("Hello", "World")
Hello
World

Named arguments let you pass parameters in any order:
scala> def speed(dist:Double, time:Double):Double = {return dist / time}

scala> speed(time=2.0, dist=12.2)
res28: Double = 6.1

**Note**
Also note: Scala passes parameters by value, and inside the function the parameters behave like vals (constants), not vars:
scala> def swap(x:Int, y:Int) {var t = x; x = y; y = t;}
<console>: error: reassignment to val
def swap(x:Int, y:Int) {var t = x; x = y; y = t;}
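
Since parameters arrive as vals, a working swap has to return new values instead of mutating its arguments. A minimal sketch of my own:

// return a new tuple instead of trying to reassign the parameters
def swap(x: Int, y: Int): (Int, Int) = (y, x)

val (a, b) = swap(1, 2)
// a: Int = 2, b: Int = 1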

Spark's Core Concept: the RDD (Resilient Distributed Dataset)

1. An RDD is an immutable, distributed collection of objects.
2. In Spark, all computation is done by creating RDDs, transforming them, and running operations on them.
3. Internally, an RDD is made up of partitions.
4. Each partition holds a portion of the data, and partitions can be computed on different nodes of the cluster.
5. Partitions are Spark's unit of parallelism: elements within a partition are processed sequentially, while the partitions themselves are processed in parallel.
6. A SparkContext object represents a connection to a cluster. In the Spark shell it has already been created for us, so we can use it directly (sc is the SparkContext object).
For example:
scala> var lines = sc.textFile("../../hello.txt")
lines: org.apache.spark.rdd.RDD[String] = ../../hello.txt MapPartitionsRDD[1] at textFile at <console>:24

Here, lines is an RDD.

How to create an RDD:
1. Pass an existing collection to SparkContext's (sc's) parallelize() method; this is mostly used for testing:
val rdd = sc.parallelize(Array(1, 3, 2, 4), 4)
First argument: the collection to parallelize.
Second argument: the number of partitions.

scala> val rdd = sc.parallelize(Array(1, 3, 2, 4), 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> rdd.count
res1: Long = 4

scala> rdd.foreach(println)
2134


2. Load an external dataset:
val rdd = sc.textFile("hello.txt")
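
A quick sanity check after loading (my own snippet, assuming a hello.txt like the one used later in this post):

val rdd = sc.textFile("hello.txt")   // lazy: nothing is read yet
rdd.count()    // number of lines in the file
rdd.first()    // the first line of the file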

Basic RDD Operations

Transformations:
A transformation builds a new RDD from an existing one, e.g. map() and filter().
This is much like filter in Java 8 streams: from a list, keep the items that satisfy a condition and collect them into a new list.

1. map() takes a function, applies it to every element of the RDD, and returns a new RDD.
Sample data:
scala> val rdd = sc.parallelize(Array("hello", "dottie", "hello", "daejong", "hello", "China"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> rdd.foreach(println)
hello
daejong
hello
China
dottie
hello

Produce a new RDD, rdd1:
scala> val rdd1 = rdd.map(word=>(word,1))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[7] at map at <console>:26

scala> rdd1.foreach(println)
(hello,1)
(dottie,1)
(hello,1)
(daejong,1)
(hello,1)
(China,1)


2. filter(): returns the elements that satisfy the given condition.
scala> val rdd2 = rdd.filter(word=>word.contains("hello"))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[8] at filter at <console>:26

scala> rdd2.foreach(println)
hello
hello
hello


3. flatMap():
For each input element it can produce multiple output elements.
"flat" means flatten: the per-element results are flattened into a single new RDD.
Sample data:
scala> val input = sc.textFile("hello.txt")
input: org.apache.spark.rdd.RDD[String] = hello.txt MapPartitionsRDD[10] at textFile at <console>:24

scala> input.foreach(println)
i love RDD
hello daejong
hello dottie


scala> val lines = input.flatMap(line=>line.split(" "))
lines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at flatMap at <console>:26

Each line is split on spaces into multiple elements, and all the results are then flattened into one sequence of elements.

scala> lines.foreach(println)
hello
daejong
hello
dottie
i
love
RDD

If we use print instead of println:
scala> lines.foreach(print)
helloidaejongloveRDDhellodottie
all the words are printed run together.

We can also inspect lines itself:
scala> lines
res23: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at flatMap at <console>:26

Then we apply map to lines to produce another RDD.

scala> val maprdd = lines.map(word=>(word, 1))
maprdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[12] at map at <console>:28

scala> maprdd.foreach(println)
(i,1)
(love,1)
(RDD,1)
(hello,1)
(daejong,1)
(hello,1)
(dottie,1)

Finally, we can reproduce the WordCount program we wrote with MapReduce back when learning Hadoop:

scala> val maprdddd = maprdd.reduceByKey((x, y) => x + y)
maprdddd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[14] at reduceByKey at <console>:30

scala> maprdddd.foreach(println)
(love,1)
(i,1)
(dottie,1)
(daejong,1)
(hello,2)
(RDD,1)

The reduceByKey operator groups pairs by key: whenever two pairs have the same key, their values are combined with the given function (here, added together). The function's arguments are the values.
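
Putting the steps above together, the whole word count fits in a few lines; a sketch of the same pipeline, assuming the same hello.txt:

val counts = sc.textFile("hello.txt")      // load the file as an RDD of lines
  .flatMap(line => line.split(" "))        // split each line into words
  .map(word => (word, 1))                  // pair each word with a count of 1
  .reduceByKey((x, y) => x + y)            // sum the counts for each word

counts.foreach(println)                    // e.g. (hello,2), (daejong,1), ...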

Set Operations on RDDs

Sample data:
scala> val rdd1 = sc.parallelize(Array("apple", "orange", "banana", "pear", "milk"))
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[15] at parallelize at <console>:24

scala> rdd1.foreach(println)
banana
pear
milk
orange
apple

scala> val rdd2 = sc.parallelize(Array("apple", "black", "white", "banana", "yellow", "red", "apple", "white"))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[17] at parallelize at <console>:24

scala> rdd2.foreach(println)
yellow
white
banana
apple
black
apple
white
red


1. We can deduplicate the elements of rdd2:
scala> val rdd_distinct = rdd2.distinct
rdd_distinct: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[20] at distinct at <console>:26

scala> rdd_distinct.foreach(println)
yellow
white
banana
red
black
apple

2. The union of rdd1 and rdd2:
scala> val rdd_union = rdd1.union(rdd2)
rdd_union: org.apache.spark.rdd.RDD[String] = UnionRDD[21] at union at <console>:28

scala> rdd_union.foreach(println)
apple
banana
orange
pear
milk
apple
black
white
banana
yellow
red
apple
white

3. The intersection of rdd1 and rdd2:
scala> val rdd_intersection = rdd1.intersection(rdd2)
rdd_intersection: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[27] at intersection at <console>:28

scala> rdd_intersection.foreach(println)
apple
banana

4. The difference of rdd1 and rdd2 (elements of rdd1 that are not in rdd2):

scala> val rdd_subtract = rdd1.subtract(rdd2)
rdd_subtract: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[31] at subtract at <console>:28

scala> rdd_subtract.foreach(println)
orange
pear
milk

RDD Actions

All the RDD operations above were transformations.

An action computes an actual result from an RDD.

1. reduce()
Takes a function that combines two elements of the same type into a new element of that type; it can be used to implement sums, counts, and so on.

scala> val rdd = sc.parallelize(Array(1, 2, 3, 4, 4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at parallelize at <console>:24

scala> rdd.collect()
res36: Array[Int] = Array(1, 2, 3, 4, 4)

scala> rdd.reduce((x, y) => x + y)
res37: Int = 14

2. collect(): returns the entire contents of the RDD to the driver; generally used for testing.

3. take(n): returns n elements of the RDD (the first n it finds while scanning partitions); the order is not meaningful.
scala> rdd.take(4)
res41: Array[Int] = Array(1, 2, 3, 4)

4. top(n): returns the n largest elements, sorted in descending order.
scala> rdd.top(1)
res43: Array[Int] = Array(4)

scala> rdd.top(2)
res44: Array[Int] = Array(4, 4)

scala> rdd.top(3)
res45: Array[Int] = Array(4, 4, 3)

5. foreach(): applies a function to every element (here, printing it). Note that it returns nothing, so no data is brought back to the driver.
scala> rdd.foreach(e => println(e))
1
4
4
3
2
Shorthand:
scala> rdd.foreach(println)
4
4
2
1
3

Properties of RDDs

Spark keeps track of how each RDD depends on and is created from other RDDs; this is called the lineage graph.

Spark uses the lineage graph to compute each RDD on demand and to recover lost data.

For example: rdd goes through filter to produce rdd1, rdd1 goes through map to produce rdd2, and rdd1 and rdd2 go through union to produce rdd3.

If any of these RDDs is lost it does not matter: it can be recomputed from the lineage graph.
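
You can actually print an RDD's lineage with toDebugString. A small sketch of my own (the chain below mirrors the example above, with a map that keeps the element type so that union compiles):

val rdd  = sc.textFile("hello.txt")
val rdd1 = rdd.filter(line => line.contains("hello"))
val rdd2 = rdd1.map(line => line.toUpperCase)
val rdd3 = rdd1.union(rdd2)
// prints the chain of parent RDDs that rdd3 was built from
println(rdd3.toDebugString)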

1. Lazy evaluation
Spark delays the computation of an RDD until the first action is run on it.
Even loading data is lazy: the data is only read when it is actually needed.

2. Persistence: persist()
By default, Spark recomputes an RDD every time an action is run on it.
To reuse an RDD, call rdd.persist(); it optionally takes a storage level as an argument.
unpersist() removes the RDD from the cache. Both points are illustrated in the sketch right after this list.
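
A minimal sketch of both points, assuming the Spark shell (where sc is available; the StorageLevel import is only needed outside the shell):

import org.apache.spark.storage.StorageLevel

val lines  = sc.textFile("hello.txt")                       // lazy: the file is not read yet
val hellos = lines.filter(line => line.contains("hello"))   // still lazy

hellos.persist(StorageLevel.MEMORY_ONLY)   // only marks the RDD for caching
hellos.count()     // first action: computes hellos and caches it
hellos.count()     // second action: served from the cache, no recomputation
hellos.unpersist() // remove it from the cache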

Key-Value Pair RDDs

1. Creating key-value pairs
Use map to return key-value tuples.
For example, use the first word of each line as the key and the whole line as the value:

scala> val rdd = sc.textFile("hello.txt")
rdd: org.apache.spark.rdd.RDD[String] = hello.txt MapPartitionsRDD[37] at textFile at <console>:24

scala> rdd.foreach(println)
i love RDD
hello daejong
hello dottie

scala> val rdd1 = rdd.map(line => (line.split(" ")(0), line))
rdd1: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[38] at map at <console>:26

scala> rdd1.foreach(println)
(hello,hello daejong)
(i,i love RDD)
(hello,hello dottie)

2. Build a key-value RDD by hand, for testing:
scala> val rdd2 = sc.parallelize(Array((1, 2), (3, 4), (3, 6)))
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[40] at parallelize at <console>:24

scala> rdd2.foreach(println)
(3,4)
(1,2)
(3,6)

3. Spark also provides per-key operations:
reduceByKey (which we used in the WordCount program earlier):
rdd.reduceByKey((value1, value2) => value1 + value2)
adds together the values that share the same key.
scala> val rdd3 = rdd2.reduceByKey((x, y) => x + y)
rdd3: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[42] at reduceByKey at <console>:26

scala> rdd3.foreach(println)
(3,10)
(1,2)


groupByKey(): groups the values that share the same key.
scala> val rdd4 = rdd2.groupByKey()
rdd4: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[43] at groupByKey at <console>:26

scala> rdd4.foreach(println)
(1,CompactBuffer(2))
(3,CompactBuffer(4, 6))


mapValues(): applies a function to the value of each key-value pair, leaving the key untouched (its output is shown below, after the values example).
scala> val rdd5 = rdd2.mapValues(x => x+1)
rdd5: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[44] at mapValues at <console>:26

keys: returns an RDD of just the keys.
scala> val rdd6 = rdd2.keys
rdd6: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[45] at keys at <console>:26

scala> rdd6.foreach(println)
1
3
3


values: returns an RDD of just the values.
scala> val rdd7 = rdd2.values
rdd7: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[46] at values at <console>:26

scala> rdd7.foreach(println)
6
2
4

The output of rdd5, the mapValues result from above:
scala> rdd5.foreach(println)
(3,7)
(1,3)
(3,5)

sortByKey(): sorts the pairs by key.
scala> val rdd8 = rdd2.sortByKey().collect()
rdd8: Array[(Int, Int)] = Array((1,2), (3,4), (3,6))

scala> rdd8.foreach(println)
(1,2)
(3,4)
(3,6)
combineByKey()
(createCombiner, mergeValue, mergeCombiners, [partitioner])
The most general per-key aggregation function; its return type can differ from the input type.
Many of the per-key aggregation functions, such as groupByKey, are built on top of it.
It walks through the elements of each partition; each element's key is either new to that partition or already seen.
For a key the partition has not seen yet, the createCombiner function we pass in is called.
For a key the partition has already seen, the mergeValue function is called.
When the per-partition results are combined, the mergeCombiners function is used.


Example: computing an average score per key
scala> val rdd = sc.parallelize(Array(("dottie", 91.0), ("dottie", 78.0), ("dottie", 88.0), ("daejong", 89.0), ("daejong", 99.0), ("daejong", 97.0)))
rdd: org.apache.spark.rdd.RDD[(String, Double)] = ParallelCollectionRDD[54] at parallelize at <console>:24

scala> rdd.foreach(println)
(dottie,91.0)
(dottie,78.0)
(dottie,88.0)
(daejong,89.0)
(daejong,99.0)
(daejong,97.0)


A quick reminder on how to read the key and value out of a tuple:
scala> val r1 = (1, 1111)
r1: (Int, Int) = (1,1111)

scala> r1._1
res77: Int = 1

scala> r1._2
res78: Int = 1111


First compute, for each key, the number of scores and their sum:
scala> val score = rdd.combineByKey(score=>(1, score), ((c1:(Int, Double), newScore) => (c1._1+1, c1._2+newScore)), ((c1:(Int, Double), c2:(Int, Double)) => (c1._1+c2._1, c1._2+c2._2)))
score: org.apache.spark.rdd.RDD[(String, (Int, Double))] = ShuffledRDD[55] at combineByKey at <console>:26


Check the result:
scala> score.foreach(println)
(daejong,(3,285.0))
(dottie,(3,257.0))

Breaking it down:
rdd.combineByKey(
  // key not yet seen in this partition: wrap the value as the pair (1, score)
  score => (1, score),
  // key already seen in this partition: bump the count by 1 and add the new score
  (c1: (Int, Double), newScore) => (c1._1 + 1, c1._2 + newScore),
  // finally, merge the per-partition results by adding the counts and the sums
  (c1: (Int, Double), c2: (Int, Double)) => (c1._1 + c2._1, c1._2 + c2._2)
)

The last step is to compute the average:
scala> val score2 = score.map((name, (num, score)) => (name, score/num))
<console>:1: error: not a legal formal parameter.
Note: Tuples cannot be directly destructured in method or function parameters.
Either create a single parameter accepting the Tuple2,
or consider a pattern matching anonymous function: `{ case (num, score) => ... }
val score2 = score.map((name, (num, score)) => (name, score/num))
The error tells us to use a pattern-matching (case) anonymous function to destructure the tuple parameter:

scala> val score2 = score.map{case (name, (num, score)) => (name, score/num)}
score2: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[56] at map at <console>:30

scala> score2.foreach(println)
(daejong,95.0)
(dottie,85.66666666666667)


Of course, we can also format the result:
scala> "%.2f".format(1.14534)
res87: String = 1.15

scala> 1.1231.toInt
res88: Int = 1

Rounding to two decimal places:
scala> val score3 = score2.map{case(name, score)=>(name, "%.2f".format(score))}
score3: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[59] at map at <console>:32

scala> score3.foreach(println)
(dottie,85.67)
(daejong,95.00)
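
A side note on the design choice: since only the values change in these last steps, mapValues also works and avoids destructuring the key (my own variant, not from the original session):

// same result as score2/score3 above, computed on the values only
val avg = score.mapValues { case (num, sum) => "%.2f".format(sum / num) }
avg.foreach(println)   // e.g. (dottie,85.67), (daejong,95.00)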

Wrapping up:

Spark keeps its computations in memory, which is why it is so fast.

That covers most of the basics of Spark.

Next comes a deeper look at Spark: its architecture, how it runs jobs, deploying applications, and so on.

Let's keep at it together!