Spark: a new-generation big data compute engine; thanks to in-memory computation it is much faster than Hadoop MapReduce.
Spark: a fast and general-purpose cluster computing platform.
Concepts
Spark is general-purpose and highly open:
Spark is written in Scala and runs on the JVM; it requires JDK 1.7+ (and Python 2.6+ or 3.4+ for the Python API).
It provides APIs for Python, Java, Scala and SQL, plus a rich set of built-in libraries.
Spark integrates well with other big data tools, including Hadoop.
Spark has no HDFS-style storage of its own; it relies on HDFS or a similar system to persist data.
At its core Spark provides an engine (usually called Spark Core); on top of it sit several higher-level modules (components, also called tools): Spark SQL (compute over data by writing SQL), Spark Streaming (real-time computation, and batch as well), MLlib (machine learning) and GraphX (graph processing). A small Spark SQL example is sketched after the overview quote below.
Here is the Spark Overview from the official site:
Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.
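As a quick taste of one of those higher-level modules, the sketch below uses Spark SQL from Java to run a SQL query over a JSON file. It is only a minimal, hypothetical example: the file people.json and the name/age columns are made up for illustration.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSqlExample {

    public static void main(String[] args) {
        // SparkSession is the entry point for Spark SQL in Spark 2.x
        SparkSession spark = SparkSession.builder()
                .appName("SparkSqlExample")
                .master("local")
                .getOrCreate();

        // Load a JSON file into a DataFrame and register it as a temporary view
        Dataset<Row> people = spark.read().json("people.json");
        people.createOrReplaceTempView("people");

        // Compute over the data by writing SQL
        Dataset<Row> adults = spark.sql("SELECT name, age FROM people WHERE age >= 18");
        adults.show();

        spark.stop();
    }
}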
Setting up the development environment
Spark 2.2.1 download link:
Download the Scala archive.
The first Spark program, written in Scala
Open IDEA and create a new Scala project (build it from the Scala option that IDEA offers on the left of the new-project dialog).
The WordCount program
1. Create a SparkContext.
2. Load the source data.
3. Split each line into individual words.
4. Map the words into (word, 1) pairs and count them.
5. Output the result.
First, write WordCount without Java 8 lambda expressions.
Import the dependencies (a typical Maven dependency is sketched below).
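Assuming a Maven project, a spark-core dependency matching the Spark 2.2.1 download mentioned above would look roughly like this (spark-core_2.11 is Spark 2.2.1's default Scala build; the versions here are assumptions, so match them to your install):

<!-- spark-core built against Scala 2.11; versions are assumptions, match them to your install -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.1</version>
</dependency>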
The main program:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * Created by Daejong
 */
public class WordCount {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("WordCount").setMaster("local");
        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
        JavaRDD<String> javaRDD = sparkContext.textFile(args[0]);

        // Split each line into words and flatten
        JavaRDD<String> stringJavaRDD = javaRDD.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String s) throws Exception {
                String[] words = s.split(" ");
                List<String> stringList = Arrays.asList(words);
                Iterator<String> stringIterator = stringList.iterator();
                return stringIterator;
            }
        });

        // Pair each word with the number 1
        JavaPairRDD<String, Integer> stringIntegerJavaPairRDD = stringJavaRDD.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // Aggregate: reduce by key
        JavaPairRDD<String, Integer> reduceValue = stringIntegerJavaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });

        // Sorting:
        // a JavaPairRDD only supports sortByKey,
        // but we now have pairs like ("dottie", 3), ("hello", 2) and want to sort by value,
        // so temporarily swap the key and the value.
        JavaPairRDD<Integer, String> swapReduceValue = reduceValue.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return new Tuple2<Integer, String>(stringIntegerTuple2._2, stringIntegerTuple2._1);
            }
        });

        // Sort: ascending by default; passing false sorts in descending order
        JavaPairRDD<Integer, String> sortSwapReduceValue = swapReduceValue.sortByKey(false);

        // Swap the key and the value back
        JavaPairRDD<String, Integer> resultReduceValue = sortSwapReduceValue.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            public Tuple2<String, Integer> call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                return new Tuple2<String, Integer>(integerStringTuple2._2, integerStringTuple2._1);
            }
        });

        // Save the result to a file
        resultReduceValue.saveAsTextFile(args[1]);

        // Release resources
        sparkContext.stop();
    }
}
In IDEA, set up the run configuration under Edit Configurations, passing the input path and the output path as the program arguments (args[0] and args[1]).
The same program written with Java 8 lambda expressions is much shorter. Very simple! (It is barely any different from writing it in Scala.)
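A minimal sketch of that lambda version, assuming the same steps as above (split, map to (word, 1), reduceByKey, swap key and value, sortByKey descending, swap back, save):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class WordCountLambda {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("WordCountLambda").setMaster("local");
        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        JavaRDD<String> lines = sparkContext.textFile(args[0]);

        // Split each line into words and flatten
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());

        // Pair each word with 1 and sum the counts per word
        JavaPairRDD<String, Integer> counts = words
                .mapToPair(word -> new Tuple2<String, Integer>(word, 1))
                .reduceByKey((a, b) -> a + b);

        // Sort by count in descending order: swap key and value,
        // sort by the (numeric) key, then swap back
        JavaPairRDD<String, Integer> sorted = counts
                .mapToPair(tuple -> new Tuple2<Integer, String>(tuple._2, tuple._1))
                .sortByKey(false)
                .mapToPair(tuple -> new Tuple2<String, Integer>(tuple._2, tuple._1));

        // Save the result and release resources
        sorted.saveAsTextFile(args[1]);
        sparkContext.stop();
    }
}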
View the output result: the directory passed as args[1] will contain part-* files, with one (word, count) tuple per line.