BigData - Apache Spark
Apache Spark
basic
- 统一的分布式内存计算引擎
- 支持语言: Python Java Scala和R
库: SQL结构化查询到流计算、机器学习
- Apache Spark 是当今最流行的开源大数据处理框架。
- 和 MapReduce 一样,Spark 用于进行分布式、大规模的数据处理
- 但 Spark 作为 MapReduce 的接任者,提供了更高级的编程接口、更高的性能。
- 除此之外,Spark 不仅能进行常规的批处理计算,还提供了流式计算支持。
Apache Spark 诞生于大名鼎鼎的 AMPLab(这里还诞生过 Mesos 和 Alluxio)
- 设计目标是为各种大数据处理需求提供一个统一的技术栈。
如今 Spark 背后的商业公司 Databricks 创始人也是来自 AMPLab 的博士毕业生。
- Spark 本身使用 Scala 语言编写,Scala 是一门融合了面向对象与函数式的“双范式”语言,运行在 JVM 之上。
- Spark 大量使用了它的函数式、即时代码生成等特性。
- Spark 目前提供了 Java、Scala、Python、R 四种语言的 API,前两者因为同样运行在 JVM 上可以达到更原生的支持。
Spark
- 同时支持批处理和流计算的分布式计算系统。
- Spark 的所有计算均构建于
RDD
之上,RDD 通过算子
连接形成DAG
的执行计划 - RDD 的确定性及不可变性是 Spark 实现故障恢复的基础。
Spark Streaming 的
D-Stream
本质上也是将输入数据分成一个个micro-batch
的 RDD。- Spark SQL 是在 RDD 之上的一层封装
- 相比原始 RDD,DataFrame API 支持数据表的 schema 信息,从而可以执行 SQL 关系型查询,大幅降低了开发成本。
- Spark Structured Streaming 为 Spark SQL 提供了流计算支持,它将输入的数据流看作不断追加的数据行。
确定性: deterministic 持续的: continuous 弹性: Resilient 分布式: RDistributed
基本概念
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
Cluster = Master + Master
Master = Worker Node + Worker Node + Worker Node
worker node = RDD
RDD = Partition + Partition + Partition
Driver: Application main() -> SparkContext
Job -> Task -> Executor
n Transformation + 1 Action = 1 Job
1 Job = Stage + Stage + Stage = n Stage
1 Stage = n Task
1 Job = Task + Task + Task = n Task
1 Task: 1 Partition in RDD
1 Job = Stage(RDDs) + Stage(RDDs) + Stage(RDDs) = n Stage(RDDs)
RDD = Partition + Partition + Partition
Application:
- 基于 Spark 的用户程序
- 即由用户编写的调用 Spark API 的应用程序
- User program built on Spark.
- Consists of a driver program and executors on the cluster.
- 其中应用程序的入口为用户所定义的 main 方法。
Application jar
- A jar containing the user’s Spark application.
- In some cases users will want to create an “uber jar” containing their application along with its dependencies.
- The user’s jar should never include Hadoop or Spark libraries, however, these will be added at runtime.
SparkContext:
- 是 Spark 所有功能的主要入口点
- 是用户逻辑与 Spark 集群主要的交互接口。
- 通过 SparkContext ,可以连接到集群管理器( Cluster Manager )
- 能够直接与集群 Master 节点进行交互,并能够向 Master 节点申请计算资源,
- 也能够将应用程序用到的 JAR 包或 Python 文件发送到多个执行器( Executor )节点上。
- SparkContext can connect to several types of
cluster managers
. - Once connected, Spark acquires
executors
on nodes in the cluster. - Next, it sends your application code to the
executors
. - Finally, SparkContext sends tasks to the executors to run.
Task:
- 由 SparkContext 发送到 Executor 节点上执行的一个工作单元。
- A unit of work that will be sent to one executor
Job
- A parallel computation consisting of multiple tasks that gets spawned in response to a Spark action (e.g. save, collect);
- you’ll see this term used in the driver’s logs.
- 前面提到,RDD 支持两种类型的算子操作: Transformation 和 Action 。
- Spark 采用惰性机制,
- Transformation 算子的代码不会被立即执行
- 只有当遇到第一个 Action 算子时,会生成一个 Job ,并执行前面的一系列 Transformation 操作。
- 一个 Job 包含 N 个 Transformation 和 1 个 Action 。
- 而每个 Job 会分解成一系列可并行处理的 Task
- 然后将 Task 分发到不同的 Executor 上运行,这也是 Spark 分布式执行的简要流程。
Stage
- Each job gets divided into smaller sets of tasks called stages that depend on each other (similar to the map and reduce stages in MapReduce);
- you’ll see this term used in the driver’s logs.
Cluster Manager:
- 集群管理器
- 存在于 Master 进程中
- 主要用来对应用程序申请的资源进行管理。
- either Spark’s own standalone cluster manager, Mesos, YARN or Kubernetes
- allocate resources across applications.
- 负责集群的资源分配,Spark 自带的 Spark Master 支持任务的资源分配,并包含一个 Web UI 用来监控任务运行状况。
- 多个 Master 可以构成一主多备,通过 ZooKeeper 进行协调和故障恢复。
- 通常 Spark 集群使用 Spark Master 即可,但如果用户的集群中不仅有 Spark 框架、还要承担其他任务,官方推荐使用 Mesos 作为集群调度器。
Worker Node: 节点
- 负责执行计算任务,上面保存了 RDD 等数据。
- 任何能够在集群中能够运行 Spark 应用程序的节点。
- processes that run computations and store data for your application
Driver Program:
- 驱动器节点
- 即用户编写的程序,对应一个
SparkContext
- 负责任务的构造、调度、故障恢复等。
- 驱动程序可以
- 直接运行在客户端,例如用户的应用程序中;
- 也可以托管在 Master 上,这被称为集群模式(cluster mode),通常用于流计算等长期任务。
- The process running the main() function of the application and creating the SparkContext
- 运行 Application 中 main() 函数并创建 SparkContext 的进程。
- Driver 节点也负责提交 Job ,并将 Job 转化为 Task ,在各个 Executor 进程间协调 Task 的调度。
- Driver 节点可以不运行于集群节点机器上。
Executor:
- 执行器节点
- 是在一个在工作节点( Worker Node )上为 Application 启动的进程
- 能够运行 Task 并将数据保存在内存或磁盘存储中
- 也能够将结果数据返回给 Driver
- A process launched for an application on a worker node
- runs tasks and keeps data in memory or disk storage across them.
- Each application has its own executors.
Deploy mode
- Distinguishes where the driver process runs.
- In “cluster” mode, the framework launches the driver inside of the cluster.
- In “client” mode, the submitter launches the driver outside of the cluster.
Spark 程序在运行时的内部协调过程:
除了以上几个基本概念外,Spark 中还有几个比较重要的概念。
Spark产生的背景:大数据问题
处理器速度提升,可以在不改动代码的情况下,使应用程序自动的变快。
- 然而,2005年左右这种趋势停止了
- 由于散热方面的严格限制,硬件开发人员停止让单个处理器的速度更快,转而使用相同的速度增加更多的并行CPU内核
- 这种变化意味着需要修改应用程序以增加并行性,以便更快地运行,这为新的编程模型(如Apache Spark)创造了舞台
- 与此同时,存储成本也在下降,可以获得的数据量在增加
- 如随着互联网的发展,视频数据、图像数据随处
- 搜集了大数据量的内容,处理这些内容需要大的、并行度高的计算引擎,通常需要运行在集群之上。
- 此外,过去50年开发的软件不能自动伸缩,传统的数据处理程序的编程模型也不能满足新的编程模型的需求。
Spark的现在和未来
- Spark已经存在了许多年,仍然是当前最流行的大数据计算框架
- 使用Spark的公司和项目都在不断增加。Spark本身也在不断改进,新功能不断增加
例如,2016年引入了一种新的高级流处理引擎,即Structured Streaming结构化流处理。
- Spark将继续成为在可预见的未来进行大数据分析的公司的基石,尤其是考虑到该项目仍在快速发展。
- 任何需要解决大数据问题的数据科学家或工程师都可能需要在他们的机器上安装一个Spark。
相关概念
- 统一
- Spark关键驱动目标是为编写大数据应用程序提供一个统一的平台。
设计目的是支持广泛的数据分析任务,从简单的数据加载 —>SQL查询—>机器学习和流计算,这些都通过 相同的计算引擎 和 一致的api集合 实现。
现实世界的数据分析任务——无论它们是工具中的交互式分析,还是用于生产应用程序的传统软件开发——都倾向于结合许多不同的处理引擎类型和库。
- Spark的统一特性使得这些任务的编写更加简单和有效。
- Spark提供了一致的、可组合的api,可以使用这些api从较小的部分或现有的库中构建应用程序。
- 然而,可组合的api是不够的。Spark的api也被设计为通过优化在用户程序中组合的不同库和函数来是实现高性能。
- Spark优化api的组合
- 比如:使用SQL查询加载数据,然后使用Spark的ML库评机器学习模型,那么Spark计算引擎可以将这些步骤合并到一个 步骤中 ,来扫描数据。
- 在Spark之前,没有一个开源系统试图提供这种类型的统一引擎来进行并行数据处理,这意味着用户应用程序中需要整合多个api来完成一项任务。
- 因此,Spark很快成为了这种类型开发的标准。
- 随着时间的推移,Spark继续扩展其内置的api,以覆盖更多的工作任务 。
- 与此同时,该项目的开发人员继续完善其统一引擎的主题。
- 计算引擎
Spark在打造一个统一平台的同时,它小心地将其范围限制在计算引擎上。
Spark处理从存储系统加载数据并在其上执行计算,但最终数据并不永久存储在Spark中
Spark可以和多种存储系统结合使用:Kafka、HBase、Hive、HDFS以及关系型数据库。
Spark如此设计的原因是:大多数数据已经存在于现有的存储系统中,数据移动成本非常昂贵,所以Spark关注于对数据进行计算。
Spark对计算的关注使得它有别于早期的大数据软件平台
如Apache Hadoop,Hadoop包括一个存储系统(Hadoop文件系统HDFS)和一个紧密集成的计算引擎(MapReduce),Hadoop这种设计在某些场景下会出现难以决择的问题,如:如果只使用计算引擎MapReduce,而不使用HDFS,此时无法割裂两者,只能同时安装。
- 尽管Spark在Hadoop存储上运行得很好,但今天它在没有Hadoop的环境中也广泛使用。
- 如Spark+kafka联合起来,进行流处理。
- 函数库
- Spark的最终组件是它的库,它以统一引擎的设计为基础,为公共数据分析任务提供统一的API。
Spark既支持使用内置的标准库(主要部分),也支持由开源社区发布为第三方包的大量外部库。
Spark 核心计算引擎自发布以来几乎没有变化, 但是,函数库已经提供了越来越多的功能类型。
- Spark包括用于
- SQL和结构化数据的库(Spark SQL)、
- 机器学习(MLlib)、
- 流处理(Spark流和新的结构化流处理Structured Streaming)
和图形分析(GraphX)。
- 还有数百个开放源代码的外部库,从各种存储系统的连接器到机器学习算法。
- 在spark-packages.org上有一个外部库索引。
计算模型
Hadoop与Spark的区别
- Hadoop 是大数据处理领域的开创者。
- 严格来说,Hadoop 不只是一个软件,而是一整套生态系统
- 例如
MapReduce
负责进行分布式计算,而HDFS
负责存储大量文件。
- 例如
- MapReduce 模型的诞生是大数据处理从无到有的飞跃。
- 但随着技术的进步,对大数据处理的需求也变得越来越复杂,MapReduce 的问题也日渐凸显。
- 通常,我们将 MapReduce 的输入和输出数据保留在 HDFS 上
- 很多时候,复杂的 ETL、数据清洗等工作无法用一次 MapReduce 完成,所以需要将多个 MapReduce 过程连接起来
上图中只有两个 MapReduce 串联,实际上可能有几十个甚至更多,依赖关系也更复杂。
这种方式下,
- 每次中间结果都要写入 HDFS 落盘保存,代价很大,HDFS 的每份数据都需要冗余若干份拷贝
- 另外,由于本质上是多次 MapReduce 任务,调度也比较麻烦,实时性无从谈起。
何时使用 Apache Spark,何时使用 Apache Hadoop?
- 两者都是目前市场上最杰出的分布式系统,也是功能类似的 Apache 顶级项目,经常一起使用。
- Hadoop
- 采用 MapReduce 范式,主要用于大量磁盘操作。
- Spark
- 更灵活、通常费用更高的内存中处理架构。
- 解决问题的层面不一样
- 都是大数据框架,但是各自存在的目的不尽相同。
- Hadoop
- 实质上更多是一个分布式数据基础设施:
- 将巨大的数据集分派到一个由普通计算机组成的集群中的多个节点进行存储
- 意味着您不需要购买和维护昂贵的服务器硬件。
- 同时,Hadoop还会索引和跟踪这些数据,让大数据处理和分析效率达到前所未有的高度。 5.
- Spark
- 则是那么一个专门用来对那些分布式存储的大数据进行处理的工具,它并不会进行分布式数据的存储。
- 两者可合可分
- Hadoop除了提供HDFS分布式数据存储功能之外,还提供了叫做MapReduce的数据处理功能。
- 完全可以抛开Spark,使用Hadoop自身的MapReduce来完成数据的处理。
- MapReduce
- 数图书馆中的所有书。你数1号书架,我数2号书架。这就是“Map”。我们人越多,数书就更快。
- 现在我们到一起,把所有人的统计数加在一起。这就是“Reduce”。
- Spark也不是非要依附在Hadoop身上才能生存。
- 但毕竟它没有提供文件管理系统,所以必须和其他的分布式文件系统进行集成才能运作。
- 可以选择Hadoop的HDFS,也可以选择其他的基于云的数据系统平台。
- 但Spark默认来说还是被用在Hadoop上面的,毕竟它们的结合是最好的。
- Hadoop除了提供HDFS分布式数据存储功能之外,还提供了叫做MapReduce的数据处理功能。
- Spark数据处理速度秒杀MapReduce
- Spark因为其处理数据的方式不一样,会比MapReduce快上很多。
- Spark 基于 Hadoop MapReduce 算法实现的分布式计算
- 拥有 Hadoop MapReduce 所具有的优点,并且具有更高的运算速度。
- MapReduce
- 分步对数据进行处理的
- 从集群中读取数据,进行一次处理,将结果写到集群,从集群中读取更新后的数据,进行下一次的处理,将结果写到集群,等等
- 在一次 MapReduce 运算之后,会将数据的运算结果从内存写入到磁盘中第, 二次 MapReduce 运算时在从磁盘中读取数据
- 两次对磁盘的操作,增加了多余的 IO 消耗
- Spark
- 在 内存中 以接近“实时”的时间完成所有的数据分析
- 从集群中读取数据,完成所有必须的分析处理,将结果写回集群,完成
- 将数据一直缓存在内存中,运算时直接从内存读取数据,只有在必要时,才将部分数据写入到磁盘中。
- 使用 DAG(Directed Acyclic Graph,有向无环图)
调度程序、查询优化器和物理执行引擎
- 在处理批量处理以及处理流数据时具有较高的性能。
- 在 内存中 以接近“实时”的时间完成所有的数据分析
按照Spark 官网的说法,Spark 相对于 Hadoop 能够达到 100 倍以上的运行负载。
- Spark的批处理速度比MapReduce快近10倍,内存中的数据分析速度则快近100倍。
- 如果需要处理的数据和结果需求大部分情况下是静态的,且你也有耐心等待批处理的完成的话,MapReduce的处理方式也是完全可以接受的。
- 但如果需要对流数据进行分析,比如那些来自于工厂的传感器收集回来的数据,又或者说你的应用是需要多重数据处理的,那么你也许更应该使用Spark进行处理。
- 大部分机器学习算法都是需要多重数据处理的。
- 此外,通常会用到Spark的应用场景有以下方面:实时的市场活动,在线产品推荐,网络安全分析,机器日记监控等。
- Spark因为其处理数据的方式不一样,会比MapReduce快上很多。
- 灾难恢复
- 两者的灾难恢复方式迥异,但是都很不错。
- Hadoop
- 将每次处理后的数据都写入到磁盘上
- 其天生就能很有弹性的对系统错误进行处理。
- Spark
- Spark的数据对象 存储在分布于数据集群中 的
弹性分布式数据集(RDD: Resilient Distributed Dataset)
中 - 这些数据对象既可以放在内存,也可以放在磁盘
- 所以RDD同样也可以提供完成的灾难恢复功能
- Spark的数据对象 存储在分布于数据集群中 的
RDD - Resilient Distributed Datasets
如果能把中间结果保存在内存里,岂不是快的多?
- 之所以不能这么做,最大的障碍是:分布式系统必须能容忍一定的故障,fault-tolerance。
- 如果只是放在内存中,一旦某个计算节点宕机,其他节点无法恢复出丢失的数据,只能重启整个计算任务
- 这对于动辄成百上千节点的集群来说是不可接受的。
- 一般来说,想做到 fault-tolerance 只有两个方案:
- 要么存储到外部(例如 HDFS)
- 要么拷贝到多个副本
Spark 大胆地提出了第三种——重算一遍。
能做到这一点是依赖于一个额外的假设:所有计算过程都是确定性的(deterministic)。
Spark 借鉴了函数式编程思想,提出了 RDD(Resilient Distributed Datasets),译作“弹性分布式数据集”。
一个简单的例子,其中节点对应 RDD,边对应算子。
basic
- 一个只读的、分区的(partitioned)数据集合。
- RDD 要么来源于不可变的外部文件(例如 HDFS 上的文件),要么由确定的算子由其他 RDD 计算得到。
- RDD 通过算子连接构成有向无环图(DAG)
RDD 如何做到 fault-tolerance
- RDD 中的每个分区都能被确定性的计算出来
- 所以一旦某个分区丢失了,另一个计算节点可以从它的前继节点出发、用同样的计算过程重算一次,即可得到完全一样的 RDD 分区。
- 这个过程可以递归的进行下去。
RDD 分区的恢复。为了简洁并没有画出分区,实际上恢复是以分区为单位的。
弹性分布式数据集(Resilient Distributed Datasets)
- 一种容错的、可以被并行操作的元素集合,
- 是 Spark 中最重要的一个概念,是 Spark 对所有数据处理的一种基本抽象。
- Spark 中的计算过程可以简单抽象为对 RDD 的创建、转换和返回操作结果的过程:
Spark 的 RDD 计算抽象过程:
makeRDD:
- 可以通过访问外部物理存储(如 HDFS),通过调用
SparkContext.textFile()
方法来读取文件并创建一个 RDD, - 也可以对输入数据集合通过调用
SparkContext.parallelize()
方法来创建一个 RDD。 - RDD 被创建后不可被改变,只可以对 RDD 执行 Transformation 及 Action 操作。
Transformation(转换):
- 对已有的 RDD 中的数据执行计算进行转换,并产生新的 RDD,
- 在这个过程中有时会产生中间 RDD。
- Spark 对于 Transformation 采用惰性计算机制,
- 即在 Transformation 过程并不会立即计算结果,而是在 Action 才会执行计算过程。
- 如 map 、 filter 、 groupByKey、cache 等方法,只执行 Transformation 操作,而不计算结果。
Action(执行):
- 对已有的 RDD 中的数据执行计算产生结果,将结果返回 Driver 程序或写入到外部物理存储(如 HDFS)。
- 如 reduce 、 collect 、 count 、 saveAsTextFile 等方法,会对 RDD 中的数据执行计算。
算子
Spark 的编程接口和 Java 8 的 Stream 很相似:
- RDD 作为数据,在多种算子间变换,构成对执行计划 DAG 的描述。
- 最后,一旦遇到类似
collect()
这样的输出命令,执行计划会被发往 Spark 集群、开始计算。 - 不难发现,算子分成两类:
map()
、filter()
、join()
等算子称为 Transformation- 输入一个或多个 RDD,输出一个 RDD。
collect()
、count()
、save()
等算子称为 Action- 将数据收集起来返回;
例子: 收集包含“HDFS”关键字的错误日志时间戳。当执行到 collect() 时,右边的执行计划开始运行。
partition 分区
partition(分区) 是 Spark 中的重要概念
是 RDD 的最小单元
- RDD 是由分布在各个节点上的 partition 组成的。
- RDD 的数据由多个分区(partition)构成
- 这些分区可以分布在集群的各个机器上,这也就是 RDD 中 “distributed” 的含义。
- 熟悉 DBMS 的同学可以把 RDD 理解为逻辑执行计划,partition 理解为物理执行计划。
- partition 的数量决定了 task 的数量
- 每个 task 对应着一个 partition 。
- partition 的数量可以在创建 RDD 时指定
- 如果未指定 RDD 的 partition 大小,则在创建 RDD 时,Spark 将使用默认值
- 默认值为 spark.default.parallelism 配置的参数。
例如
- 使用 Spark 来读取本地文本文件内容
- 读取完后,这些内容将会被分成多个 partition
- 这些 partition 就组成了一个 RDD
- 同时这些 partition 可以分散到不同的机器上执行。
RDD 的 partition 描述:
Partition 数量影响及调整
Partition 数量的影响:
- 如果 partition 数量太少,则直接影响是计算资源不能被充分利用。
- 例如分配 8 个核,但 partition 数量为 4,则将有一半的核没有利用到。
- 如果 partition 数量太多,计算资源能够充分利用,但会导致 task 数量过多
- 而 task 数量过多会影响执行效率,主要是 task 在序列化和网络传输过程带来较大的时间开销。
- 根据 Spark RDD Programming Guide 上的建议,集群节点的每个核分配 2-4 个 partitions 比较合理。
Partition 调整:
1
2
3
4
5
6
7
8
9
10
def coalesce(self, numPartitions, shuffle=False):
# Return a new RDD that is reduced into \`numPartitions\` partitions.
def repartition(self, numPartitions):
# Return a new RDD that has exactly numPartitions partitions.
# Can increase or decrease the level of parallelism in this RDD.
# Internally, this uses a shuffle to redistribute data.
# If you are decreasing the number of partitions in this RDD, consider
# using \`coalesce\`, which can avoid performing a shuffle.
return self.coalesce(numPartitions, shuffle=True)
Spark 中主要有两种调整 partition 的方法:coalesce、repartition
- reparation 是直接调用 coalesce(numPartitions, shuffle=True) ,
- 不同的是, reparation 函数可以增加或减少 partition 数量
- 调用 repartition 函数时,还会产生 shuffle 操作。
- 而 coalesce 函数可以控制是否 shuffle
- 但当 shuffle 为 False 时,只能减小 partition 数,而无法增大。
Dependency 依赖关系
此外,RDD 还包含它的每个分区的依赖关系(dependency),以及一个函数指出如何计算出本分区的数据。
Spark 中 RDD 的每一次 Transformation 都会生成一个新的 RDD
- 这样 RDD 之间就会形成类似于流水线一样的前后依赖关系
- 在 Spark 中,依赖关系被定义为两种类型,分别是窄依赖和宽依赖:
窄依赖(Narrow Dependency)
- 生产的每个分区只依赖父 RDD 中的一个分区。
- 每个父 RDD 的一个分区最多被子 RDD 的一个分区所使用
- 即 RDD 之间是一对一的关系。
- 窄依赖的情况下,如果下一个 RDD 执行时,某个分区执行失败(数据丢失),只需要重新执行父 RDD 的对应分区即可进行数恢复。
- 例如
map 、 filter 、 union
等算子都会产生窄依赖。
和宽依赖(Wide Dependency)
groupByKey()
等算子构成宽依赖- 生成的每个分区依赖父 RDD 中的多个分区(往往是全部分区)
- 是指一个父 RDD 的分区会被子 RDD 的多个分区所使用
- 即 RDD 之间是一对多的关系。
- 当遇到宽依赖操作时,数据会产生 Shuffle ,所以也称之为 ShuffleDependency 。
- 宽依赖情况下,如果下一个 RDD 执行时,某个分区执行失败(数据丢失),则需要将父 RDD 的所有分区全部重新执行才能进行数据恢复。
- 例如 groupByKey 、 reduceByKey 、 sortByKey 等操作都会产生宽依赖。
左图展示了宽依赖和窄依赖,其中 Join 算子因为 Join key 分区情况不同二者皆有; 右图展示了执行过程,由于宽依赖的存在,执行计划被分成 3 个阶段。
在执行时,窄依赖可以很容易的按流水线(pipeline)的方式计算
- 对于每个分区从前到后依次代入各个算子即可。
- 然而,宽依赖需要等待前继 RDD 中所有分区计算完成;换句话说,宽依赖就像一个栅栏(barrier)会阻塞到之前的所有计算完成。
- 整个计算过程被宽依赖分割成多个阶段(stage)
宽依赖本质上就是一个 MapReduce 过程。
- 但是相比 MapReduce 自己写 Map 和 Reduce 函数的编程接口,Spark 的接口要容易的多;
- 并且在 Spark 中,多个阶段的 MapReduce 只需要构造一个 DAG 即可。
Job
Job
- A parallel computation consisting of multiple tasks that gets spawned in response to a Spark action (e.g. save, collect);
- you’ll see this term used in the driver’s logs.
- 前面提到,RDD 支持两种类型的算子操作: Transformation 和 Action 。
- Spark 采用惰性机制,
- Transformation 算子的代码不会被立即执行
- 只有当遇到第一个 Action 算子时,会生成一个 Job ,并执行前面的一系列 Transformation 操作。
- 一个 Job 包含 N 个 Transformation 和 1 个 Action 。
- 而每个 Job 会分解成一系列可并行处理的 Task
- 然后将 Task 分发到不同的 Executor 上运行,这也是 Spark 分布式执行的简要流程。
Stage
Stage
- Each job gets divided into smaller sets of tasks called stages that depend on each other (similar to the map and reduce stages in MapReduce);
you’ll see this term used in the driver’s logs.
- Spark 在对 Job 中的所有操作划分 Stage 时,一般会按照倒序进行,依据 RDD 之间的依赖关系(宽依赖或窄依赖)进行划分。
- 即从 Action 开始
- 当遇到窄依赖类型的操作时,则划分到同一个执行阶段;
- 遇到宽依赖操作,则划分一个新的执行阶段
- 新的阶段为之前阶段的 Parent ,之前的阶段称作 Child Stage ,然后依次类推递归执行。
- Child Stage 需要等待所有的 Parent Stage 执行完之后才可以执行,这时 Stage 之间根据依赖关系构成了一个大粒度的 DAG。
DAG Stage 划分示意图:
上图为一个 Job,该 Job 生成的 DAG 划分成了 3 个 Stage。
- 上图的 Stage 划分过程是这样的:
- 从最后的 Action 开始,从后往前推,当遇到操作为 NarrowDependency 时,则将该操作划分为同一个 Stage ,
- 当遇到操作为 ShuffleDependency 时,则将该操作划分为新的一个 Stage 。
Task
Task:
- 由 SparkContext 发送到 Executor 节点上执行的一个工作单元。
- A unit of work that will be sent to one executor
- 为一个 Stage 中的一个执行单元
- 是 Spark 中的最小执行单元
- 一般来说,一个 RDD 有多少个 Partition ,就会有多少个 Task
- 因为每一个 Task 只是处理一个 Partition 上的数据。
- 在一个 Stage 内,所有的 RDD 操作以串行的 Pipeline 方式,由一组并发的 Task 完成计算,这些 Task 的执行逻辑完全相同,只是作用于不同的 Partition 。
- 每个 Stage 里面 Task 的数目由该 Stage 最后一个 RDD 的 Partition 个数决定。
Spark 中 Task 分为两种类型,ShuffleMapTask 和 ResultTask
- 位于最后一个 Stage 的 Task 为 ResultTask,其他阶段的属于 ShuffleMapTask。
- ShuffleMapTask 和 ResultTask 分别类似于 Hadoop 中的 Map 和 Reduce。
Spark生态系统
Spark 除了 Spark Core 外,还有其它由多个组件组成,
- 目前主要有四个组件:Spark SQL、Spark Streaming、MLlib、GraphX。
- 这四个组件加上 Spark Core 组成了 Spark 的生态。
- 通常,我们在编写一个 Spark 应用程序,需要用到 Spark Core 和其余 4 个组件中的至少一个。
Spark 的整体构架图
- Spark Core:
- Spark 的核心
- 主要负责任务调度等管理功能。
- Spark Core 的实现依赖于 RDDs(Resilient Distributed Datasets, 弹性分布式数据集)的程序抽象概念。
- 通用的分布式数据处理引擎。
- 在其上有 SQL、流式处理、机器学习和图计算的库,所有这些库都可以在应用中一起使用。
- Spark Core 是整个项目的基础,提供分布式任务调度、安排和基本的 I/O 功能。
- Spark SQL:
- Spark 处理结构化数据的模块
- 该模块旨在将 SQL 数据库查询与更复杂的基于算法的分析相结合,
- Spark SQL 支持开源 Hive 项目及其类似 SQL 的 HiveQL 查询语法。
- Spark SQL 还支持 JDBC 和 ODBC 连接,能够直接连接现有的数据库。
- 支持访问各种数据源的通用方法。
- 它支持您使用 SQL 或熟悉的 DataFrame API 在 Spark 程序中查询结构化数据。
- Spark SQL 支持 HiveQL 语法,并允许访问现有的 Apache Hive 仓库。
- 服务器模式通过 Java 数据库连接或开放数据库连接提供标准连接。
- Spark Streaming:
- 模块主要是对流数据的处理
- 支持流数据的可伸缩和容错处理
- 可以与 Flume(针对数据日志进行优化的一个系统)和 Kafka(针对分布式消息传递进行优化的流处理平台)等已建立的数据源集成。
- Spark Streaming 的实现,也使用 RDD 抽象的概念,使得在为流数据(如批量历史日志数据)编写应用程序时,能够更灵活,也更容易实现。
- MLlib:
- 主要用于机器学习领域
- 实现了一系列常用的机器学习和统计算法,如分类、回归、聚类、主成分分析等算法。
- 它还包含工作流和其他实用程序,包括特征转换、机器学习流水线构造、模型评估、分布式线性代数和统计信息。
- GraphX:
- 模块主要支持数据图的分析和计算
- 并支持图形处理的 Pregel API 版本。
- GraphX 包含了许多被广泛理解的图形算法,如 PageRank。
- 用于图和图并行计算的 Spark API。它非常灵活,可以无缝处理图和集合 - 统一提取、转换、加载;进行探索性分析;在一个系统中迭代图计算。
- 除了灵活性较高的 API 外,GraphX 还提供了多种图算法。
- 它在性能上比肩最快的图系统,同时保留了 Spark 的灵活性、容错性和易用性。
Spark 运行模式
Cluster Manager Types:
Spark 有多种运行模式
- 本地运行模式(Local 模式)
- 最简单的一种模式,也可称作伪分布式模式
- 独立运行模式(Standalone 模式)
- Spark 自带的一种集群管理模式
- 相比较 Mesos 及 YARN 两种模式而言,独立运行模式是最简单,也最容易部署的一种集群运行模式。
- Mesos、YARN(Yet Another Resource Negotiator)
- 比较常用的集群管理模式。
- Kubernetes 模式
- 用于自动化部署、扩展和管理容器化应用程序的开源系统。
Spark 底层还支持多种数据源,能够从其它文件系统读取数据
- 如 HDFS、Amazon S3、Hypertable、HBase 等。
- Spark 对这些文件系统的支持,同时也丰富了整个 Spark 生态的运行环境。
Spark 部署模式
Spark 支持多种分布式部署模式,主要支持三种部署模式,分别是: Standalone 、 Spark on YARN 和 Spark on Mesos 模式。
- Standalone 模式
- Spark 自带的一种集群管理模式,即独立模式
- 自带完整的服务,可单独部署到一个集群中,无需依赖任何其他资源管理系统。
- 它是 Spark 实现的资源调度框架,其主要的节点有
Driver 节点
、Master 节点
和Worker 节点
。 - Standalone 模式也是最简单最容易部署的一种模式。
- Spark on YARN 模式
- Spark 运行在 Hadoop YARN 框架之上的一种模式。
- Hadoop YARN (Yet Another Resource Negotiator 另一种资源协调者)
- 一种新的 Hadoop 资源管理器
- 一个通用资源管理系统,可为上层应用提供统一的资源管理和调度。
- Spark 运行在 Hadoop YARN 框架之上的一种模式。
- Spark on Mesos 模式
- Spark 运行在 Apache Mesos 框架之上的一种模式。
- Apache Mesos
- 一个更强大的分布式资源管理框架,负责集群资源的分配
- 它允许多种不同的框架部署在其上,包括 YARN 。
- 它被称为是分布式系统的内核。
- Spark 运行在 Apache Mesos 框架之上的一种模式。
三种架构都采用了 Master / Worker ( Slave )的架构,
Spark 分布式运行架构大致如下:
Spark 调度原理
Spark 集群整体运行架构
Spark 集群 分为 Master 节点和 Worker 节点
- 相当于 Hadoop 的 Master 和 Slave 节点。
- Master 节点上常驻 Master 守护进程,负责管理全部的 Worker 节点。
- Worker 节点上常驻 Worker 守护进程,负责与 Master 节点通信并管理 Executors。
Worker Node 节点
- 负责执行计算任务,上面保存了 RDD 等数据。
- 任何能够在集群中能够运行 Spark 应用程序的节点。
- processes that run computations and store data for your application
Driver Program:驱动程序
- 驱动器节点
- 即用户编写的程序,对应一个
SparkContext
- Driver 为用户编写的 Spark 应用程序所运行的进程。
负责任务的构造、调度、故障恢复等。
- Driver 程序:
- 可以运行在 Master 节点上,也可运行在 Worker 节点上,还可运行在非 Spark 集群的节点上。
- 直接运行在客户端,例如用户的应用程序中;
- 也可以托管在 Master 上,这被称为集群模式(cluster mode),通常用于流计算等长期任务。
- 可以不运行于集群节点机器上。
- The process running the main() function of the application and creating the SparkContext
- 运行 Application 中 main() 函数并创建 SparkContext 的进程。
- Driver 节点也负责提交 Job ,并将 Job 转化为 Task ,在各个 Executor 进程间协调 Task 的调度。
Spark 调度器
Spark 中主要有两种调度器:DAGScheduler 和 TaskScheduler,
- DAGScheduler
- 主要是把一个 Job 根据 RDD 间的依赖关系,划分为多个 Stage,
- 对于划分后的 每个 Stage 都抽象为一个由多个 Task 组成的任务集(TaskSet),并交给 TaskScheduler 来进行进一步的任务调度。
- TaskScheduler
- 负责对每个具体的 Task 进行调度。
DAGScheduler
- 当创建一个 RDD 时,每个 RDD 中包含一个或多个分区
- 当执行 Action 操作时,相应的产生一个 Job
而一个 Job 会根据 RDD 间的依赖关系分解为多个 Stage,每个 Stage 由多个 Task 组成(即 TaskSet),每个 Task 处理 RDD 中的一个 Partition。
- 一个 Stage 里面所有分区的任务集合被包装为一个 TaskSet 交给 TaskScheduler 来进行任务调度。这个过程由是由 DAGScheduler 来完成的。
DAGScheduler 对 RDD 的调度过程:
TaskScheduler
- DAGScheduler 将一个 TaskSet 交给 TaskScheduler
- TaskScheduler 会为每个 TaskSet 进行任务调度
- Spark 中的任务调度分为两种:FIFO(先进先出)调度和 FAIR(公平调度)调度。
FIFO 调度:
- 即谁先提交谁先执行,后面的任务需要等待前面的任务执行。
- 这是 Spark 的默认的调度模式。
FAIR 调度:
- 支持将作业分组到池中,并为每个池设置不同的调度权重
- 任务可以按照权重来决定执行顺序。
在 Spark 中使用哪种调度器可通过配置 spark.scheduler.mode 参数来设置,可选的参数有 FAIR 和 FIFO,默认是 FIFO。
FIFO 调度算法为 FIFOSchedulingAlgorithm
- 该算法的 comparator 方法的 Scala 源代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
override def comparator(s1: Schedulable, s2: Schedulable):
Boolean = {
val priority1 = s1.priority // priority实际为Job ID
val priority2 = s2.priority
var res = math.signum(priority1 - priority2)
if (res == 0) {
val stageId1 = s1.stageId
val stageId2 = s2.stageId
res = math.signum(stageId1 - stageId2)
}
res < 0
}
根据以上代码,FIFO 调度算法实现的是:
- 对于两个调度任务 s1 和 s2,
- 首先比较两个任务的优先级(Job ID)大小,
- 如果 priority1 比 priority2 小,那么返回 true,表示 s1 的优先级比 s2 的高。
- 由于 Job ID 是顺序生成的,先生成的 Job ID 比较小,所以先提交的 Job 肯定比后提交的 Job 优先级高,也即先提交的 Job 会被先执行。
- 如果 s1 和 s2 的 priority 相同,表示为同一个 Job 的不同 Stage,则比较 Stage ID,Stage ID 小则优先级高。
FAIR 调度算法为 FairSchedulingAlgorithm
,该算法的 comparator 方法的 Scala 源代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
override def comparator(s1: Schedulable, s2: Schedulable):
Boolean = {
val minShare1 = s1.minShare
val minShare2 = s2.minShare
val runningTasks1 = s1.runningTasks
val runningTasks2 = s2.runningTasks
val s1Needy = runningTasks1 < minShare1
val s2Needy = runningTasks2 < minShare2
val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
var compare = 0
if (s1Needy && !s2Needy) return true
else if (!s1Needy && s2Needy) return false
else if (s1Needy && s2Needy) compare = minShareRatio1.compareTo(minShareRatio2)
else compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
if (compare < 0) true
else if (compare > 0) false
else s1.name < s2.name
}
- 由以上代码可以看到,FAIR 任务调度主要由两个因子来控制(关于 FAIR 调度的配置,可参考 ${SPARK_HOME}/conf/fairscheduler.xml.template 文件):
- weight:
- 相对于其它池,它控制池在集群中的份额。
- 默认情况下,所有池的权值为 1。
- 例如,如果给定一个特定池的权重为 2,它将获得比其它池多两倍的资源。
- 设置高权重(比如 1000)也可以实现池与池之间的优先级。
- 如果设置为-1000,则该调度池一有任务就会马上运行。
- minShare:
- 最小 CPU 核心数,默认是 0,
- 它能确保池总是能够快速地获得一定数量的资源(例如 10 个核),
- 在权重相同的情况下,minShare 越大,可以获得更多的资源。
- weight:
对以上代码的理解:
- 如果 s1 所在的任务池正在运行的任务数量比 minShare 小,而 s2 所在的任务池正在运行的任务数量比 minShare 大,那么 s1 会优先调度。反之,s2 优先调度。
- 如果 s1 和 s2 所在的任务池正在运行的 task 数量都比各自 minShare 小,那么 minShareRatio 小的优先被调度。
- 如果 s1 和 s2 所在的任务池正在运行的 task 数量都比各自 minShare 大,那么 taskToWeightRatio 小的优先被调度。
- 如果 minShareRatio 或 taskToWeightRatio 相同,那么最后比较各自 Pool 的名字。
Spark RDD 调度过程
- Spark 对 RDD 执行调度的过程,
- 创建 RDD 并生成 DAG,由 DAGScheduler 分解 DAG 为包含多个 Task(即 TaskSet)的 Stages,
- 再将 TaskSet 发送至 TaskScheduler
- 由 TaskScheduler 来调度每个 Task,并分配到 Worker 节点上执行,最后得到计算结果。
Spark RDD 常用函数
Transformation
Action
声明式接口:Spark-SQL
Spark 诞生后,大幅简化了 MapReduce 编程模型,但人们并不满足于此。
与命令式(imperative)编程相对的是声明式(declarative)编程
- 前者需要告诉程序怎样得到我需要的结果
- 后者则是告诉程序我需要的结果是什么。
- 举例而言:你想知道,各个部门
<dept_id, dept_name>;
中性别为女'female'
的员工分别有多少?
命令式编程中,你需要编写一个程序
。
1
2
3
4
5
6
7
8
9
10
11
12
13
employees = db.getAllEmployees()
countByDept = dict() // 统计各部门女生人数 (dept_id ->; count)
for employee in employees:
if (employee.gender == 'female')
countByDept[employee.dept_id] += 1
results = list() // 加上 dept.name 列
depts = db.getAllDepartments()
for dept in depts:
if (countByDept containsKey dept.id)
results.add(row(dept.id, dept.name, countByDept[dept.id]))
return results;
声明式编程中,只要用关系代数的运算
表达出结果:
- 声明式的要简洁的多
- 但声明式编程依赖于执行者产生真正的程序代码
- 所以除了上面这段程序,还需要把
数据模型(schema)
一并告知执行者。
1
2
3
4
5
6
7
8
9
10
11
12
13
employees.join(dept, employees.deptId == dept.id)
.where(employees.gender == 'female')
.groupBy(dept.id, dept.name)
.agg()
-- 等价地 也可以写成这样:
SELECT dept.id, dept.name, COUNT(\*)
FROM employees
JOIN dept ON employees.dept_id == dept.id
WHERE employees.gender = 'female'
GROUP BY dept.id, dept.name
声明式编程最广为人知的形式就是 SQL。
Spark SQL 就是这样一个基于 SQL 的声明式编程接口。
你可以将它看作在 Spark 之上的一层封装,在 RDD 计算模型的基础上,提供了 DataFrame API 以及一个内置的 SQL 执行计划优化器 Catalyst。
上图黄色部分是 Spark SQL 中新增的部分。
DataFrame
- 就像数据库中的表,保存了
数据
+数据的 schema 信息
。 - 计算中,schema 信息也会经过算子进行相应的变换。
- DataFrame 的数据是行(row)对象组成的 RDD,对 DataFrame 的操作最终会变成对底层 RDD 的操作。
- 就像数据库中的表,保存了
Catalyst
- 一个内置的 SQL 优化器
- 负责把用户输入的 SQL 转化成执行计划。
- Catelyst 强大之处是它利用了 Scala 提供的代码生成(codegen)机制
- 物理执行计划经过编译,产出的执行代码效率很高,和直接操作 RDD 的命令式代码几乎没有分别。
上图是 Catalyst 的工作流程 与大多数 SQL 优化器一样是一个 Cost-Based Optimizer (CBO) 但最后使用代码生成(codegen)转化成直接对 RDD 的操作。
流计算框架:Spark-Streaming
以往,批处理和流计算被看作大数据系统的两个方面。
- 我们常常能看到这样的架构——以 Kafka、Storm 为代表的流计算框架用于实时计算
- 而 Spark 或 MapReduce 则负责每天、每小时的数据批处理。
- 在 ETL 等场合,这样的设计常常导致同样的计算逻辑被实现两次,耗费人力不说,保证一致性也是个问题。
Spark Streaming 正是诞生于此类需求。
- 传统的流计算框架大多注重于低延迟,采用了持续的(continuous)算子模型;
- 而 Spark Streaming 基于 Spark,提出了
D-Stream(Discretized Streams)
方案
D-Stream(Discretized Streams)
- 将流数据切成很小的批(micro-batch),用一系列的短暂、无状态、确定性的批处理实现流处理。
Spark Streaming 的做法在流计算框架中很有创新性
- 它牺牲了 低延迟
- 一般流计算能做到 100ms 级别,Spark Streaming 延迟一般为 1s 左右
- 但是带来了三个优势:
- 更高的吞吐量
- 大约是 Storm 的 2-5 倍
- 更快速的失败恢复
- 通常只要 1-2s
- 对于
straggler
(性能拖后腿的节点)直接杀掉即可
- 开发者只需要维护一套 ETL 逻辑
- 即可同时用于批处理和流计算
- 更高的吞吐量
左图中,为了在持续算子模型的流计算系统中保证一致性,不得不在主备机之间使用同步机制,导致性能损失 右图是 D-Stream 的原理示意图, Spark Streaming 完全没有这个问题
流计算中的状态一直是个难题
- D-Stream 方案是无状态的
- 那诸如 word count 之类的问题,怎么做到保持 count 算子的状态呢?
- 答案是通过 RDD:
- 将前一个时间步的 RDD 作为当前时间步的 RDD 的前继节点,就能造成状态不断更替的效果。
- 实际上,新的 State RDD 总是不断生成,而旧的 RDD 并不会被“替代”,而是作为新 RDD 的前继依赖。
- 对于底层的 Spark 框架来说,并没有时间步的概念,有的只是不断扩张的 DAG 图和新的 RDD 节点。
流式计算 word count 的例子,count 结果在不同时间步中不断累积。
另一个问题: 恢复过程
- 随着时间的推进,上图中的 State RDD
counts
会越来越多,他的祖先(lineage)变得越来越长 - 极端情况下,恢复过程可能溯源到很久之前。这是不可接受的
- solution: lineage cut
- Spark Streming 会定期地对 State RDD 做 checkpoint,将其持久化到 HDFS 等存储中
- 在它之前更早的 RDD 就可以没有顾虑地清理掉了。
流行的几个开源流计算框架的对比,可以参考文章 [Comparison of Apache Stream Processing Frameworks]
流计算与-SQL:Spark-Structured-Streaming
Spark 通过 Spark Streaming 拥有了流计算能力
- 那 Spark SQL 是否也能具有类似的流处理能力呢?
- 答案是肯定的,只要将数据流建模成一张不断增长、没有边界的表,在这样的语义之下,很多 SQL 操作等就能直接应用在流数据上。
Spark Structured Streaming 的流式计算引擎 并没有复用 Spark Streaming,而是在 Spark SQL 上设计了新的一套引擎。
- 因此从 Spark SQL 迁移到 Spark Structured Streaming 十分容易
但从 Spark Streaming 迁移过来就要困难得多。
Spark SQL 中的大部分
接口、实现
都得以在 Spark Structured Streaming 中直接复用。- 将用户的 SQL 执行计划 转化成 流计算执行计划 的过程被称为 增量化(incrementalize)
- 这一步是由 Spark 框架自动完成的。
- 对于用户来说只要知道:每次计算的输入是某一小段时间的流数据,而输出是对应数据产生的计算结果。
左图是 Spark Structured Streaming 模型示意图; 右图展示了同一个任务的批处理、流计算版本,可以看到,除了输入输出不同,内部计算过程完全相同。
与 Spark SQL 相比,流式 SQL 计算还有两个额外的特性,分别是窗口(window)和水位(watermark)。
窗口(window)
- 是对过去某段时间的定义。
- 批处理中,查询通常是全量的(例如:总用户量是多少);
- 流计算中,我们通常关心近期一段时间的数据(例如:最近24小时新增的用户量是多少)。
- 用户通过选用合适的窗口来获得自己所需的计算结果,常见的窗口有滑动窗口(Sliding Window)、滚动窗口(Tumbling Window)等。
水位(watermark)
- 用来丢弃过早的数据。
- 在流计算中,上游的输入事件可能存在不确定的延迟,而流计算系统的内存是有限的、只能保存有限的状态,一定时间之后必须丢弃历史数据。
- 以双流 A JOIN B 为例,假设窗口为 1 小时
- 那么 A 中比当前时间减 1 小时更早的数据(行)会被丢弃;如果 B 中出现 1 小时前的事件,因为无法处理只能忽略。
上图为水位的示意图,“迟到”太久的数据(行)由于已经低于当前水位无法处理,将被忽略。
水位和窗口的概念都是因时间而来。在其他流计算系统中,也存在相同或类似的概念。
关于 SQL 的流计算模型,常常被拿来对比的还有另一个流计算框架 Apache Flink。 与 Spark 相比,它们的实现思路有很大不同,但在模型上是很相似的。
运行Spark
- 可以使用Python、Java、Scala、R或SQL与Spark进行交互。
- Spark本身是用Scala编写的,并在Java虚拟机(JVM)上运行,因此在笔记本或集群上运行Spark,所需要的只是安装Java环境。
- 如果想要使用Python API,还需要一个Python解释器(版本2.7或更高版本)。
- 如果想使用R,需要在机器上安装R语言环境。
有两种选择,我们建议开始使用Spark: 在笔记本电脑上下载并安装Apache Spark。或者在Databricks Community Edition中运行基于web的版本,这是一个学习Spark的免费云环境,其中包含了本书中的代码。我们接下来解释这两个选项。
- 下载spark到本地
1
$ pip install pyspark.
- 启动 Spark交互式控制台
您可以在Spark中为几种不同的编程语言启动交互式shell。
1
2
3
4
5
./bin/pyspark
./bin/spark-shell
./bin/spark-sql
Deploy
standalone
Spark’s primary abstraction is a distributed collection of items called a Dataset
.
- Datasets can be created from Hadoop InputFormats (such as HDFS files) or by transforming other Datasets.
- Due to Python’s dynamic nature, we don’t need the Dataset to be strongly-typed in Python.
- As a result, all Datasets in Python are Dataset[Row], and we call it
DataFrame
to be consistent with the data frame concept in Pandas and R. - Let’s make a new DataFrame from the text of the README file in the Spark source directory:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
./bin/pyspark
# Or if PySpark is installed with pip in your current environment:
pyspark
>>> textFile = spark.read.text("README.md")
# You can get values from DataFrame directly, by calling some actions, or transform the DataFrame to get a new one.
>>> textFile.count() # Number of rows in this DataFrame
126
>>> textFile.first() # First row in this DataFrame
Row(value=u'# Apache Spark')
# transform this DataFrame to a new one.
# call filter to return a new DataFrame with a subset of the lines in the file.
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
# chain together transformations and actions:
>>> textFile.filter(textFile.value.contains("Spark")).count() # How many lines contain "Spark"?
15
Dataset Operations Dataset actions and transformations can be used for more complex computations.
to find the line with the most words:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
>>> from pyspark.sql.functions import *
>>> textFile.select( size( split(textFile.value, "\s+") ).name("numWords") ).agg( max( col("numWords") ) ).collect()
[Row(max(numWords)=15)]
# This first maps a line to an integer value and aliases it as “numWords”, creating a new DataFrame.
# agg is called on that DataFrame to find the largest word count.
# The arguments to select and agg are both Column, we can use df.colName to get a column from a DataFrame. We can also import pyspark.sql.functions, which provides a lot of convenient functions to build a new Column from an old one.
# One common data flow pattern is MapReduce, as popularized by Hadoop.
# Spark can implement MapReduce flows easily:
>>> wordCounts = textFile.select( explode( split(textFile.value, "\s+") ).alias("word") ).groupBy("word").count()
# use the explode function in select, to transform a Dataset of lines to a Dataset of words, and then combine groupBy and count to compute the per-word counts in the file as a DataFrame of 2 columns: “word” and “count”.
# To collect the word counts in our shell, we can call collect:
>>> wordCounts.collect()
[Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]
Caching
- Spark also supports pulling data sets into a cluster-wide in-memory cache.
- This is very useful when data is accessed repeatedly,
- such as when querying a small “hot” dataset or when running an iterative algorithm like PageRank.
- As a simple example, let’s mark our linesWithSpark dataset to be cached:
1
2
3
4
5
6
7
>>> linesWithSpark.cache()
>>> linesWithSpark.count()
15
>>> linesWithSpark.count()
15
It may seem silly to use Spark to explore and cache a 100-line text file.
- The interesting part is that these same functions can be used on very large data sets, even when they are striped across tens or hundreds of nodes.
- You can also do this interactively by connecting bin/pyspark to a cluster, as described in the RDD programming guide.
Self-Contained Applications
This example will use Maven to compile an application JAR, but any similar build system will work.
- create a very simple Spark application, SimpleApp.java:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
public class SimpleApp {
public static void main(String[] args) {
String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
Dataset<String> logData = spark.read().textFile(logFile).cache();
long numAs = logData.filter(s -> s.contains("a")).count();
long numBs = logData.filter(s -> s.contains("b")).count();
System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
spark.stop();
}
}
This program just counts the number of lines containing ‘a’ and the number containing ‘b’ in the Spark README.
- Note that you’ll need to replace YOUR_SPARK_HOME with the location where Spark is installed.
- Unlike the earlier examples with the Spark shell, which initializes its own SparkSession, we initialize a SparkSession as part of the program.
To build the program, we also write a Maven pom.xml file that lists Spark as a dependency.
- Note that Spark artifacts are tagged with a Scala version.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<!-- pom.xml -->
<project>
<groupId>edu.berkeley</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.2.1</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
lay out these files according to the canonical Maven directory structure:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java
# Now, package the application using Maven and execute it with ./bin/spark-submit.
# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23
# Other dependency management tools such as Conda and pip can be also used for custom classes or third-party libraries.
CentOS7 环境准备
本文主要介绍 Spark 的 Standalone 模式的部署。
将 Spark 部署在安装有 CentOS7 系统的 VirtualBox 虚拟机中。
搭建 Spark 集群,需要准备以下文件及环境:
- jdk-8u211-linux-x64.tar.gz
- spark-2.4.3-bin-hadoop2.7.tgz
- 3 个独立的 CentOS7 虚拟机系统,机器集群规划如下:
安装
master
配置环境
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 配置 jdk 环境
# 解压文件:
tar -zxf jdk-8u211-linux-x64.tar.gz
# 配置环境变量:
export JAVA_HOME=/path/to/jdk1.8.0_211
export PATH=$PATH:$JAVA_HOME/bin
# 配置 Spark 环境
tar -xf spark-2.4.3-bin-hadoop2.7.tgz
export SPARK_HOME=/path/to/spark-2.4.3-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin
# 修改 spark-env.sh 文件
cd spark-2.4.3-bin-hadoop2.7
cp conf/spark-env.sh.template conf/spark-env.sh
vim conf/spark-env.sh
# 增加如下内容:
export JAVA_HOME=/path/to/jdk1.8.0_211
export SPARK_MASTER_HOST=192.168.56.106
# 修改 **slaves** 文件
cp conf/slaves.template conf/slaves
vim conf/slaves
# 增加如下内容:
192.168.56.106
192.168.56.107
192.168.56.108
配置 ssh 免密登录
- 配置 ssh 免密登录,是为了能够在 master 机器上来启动所有 worker 节点
- 如果不配置免密登录,则在启动每个 worker 时,都需要输入一遍密码,会很麻烦。
- 当然,如果机器少的话,也可以登录到 worker 节点上,手动一个一个启动 worker 。
1
2
3
4
5
6
7
8
9
10
11
ssh-keygen -t rsa
# - 在用户目录下会自动生成 **.ssh** + 两个文件:
# - id_rsa 生成的私钥文件
# - id_rsa.pub 生成的公钥文件
# 将 **id_rsa.pub** 复制到其它机器上,执行以下几条命令:
ssh-copy-id -i ~/.ssh/id_rsa.pub royran@192.168.56.106 # master所在的主机,如果master不做woker可以不需要。
ssh-copy-id -i ~/.ssh/id_rsa.pub royran@192.168.56.107
ssh-copy-id -i ~/.ssh/id_rsa.pub royran@192.168.56.108
worker 节点
当前已在 master 节点配置好了环境,还需要在其它 worker 节点上配置相类似的环境。
- 要将 jdk1.8.0_211 及 spark-2.4.3-bin-hadoop2.7 两个目录复制到其它 worker 节点机器上即可。
- 但要注意,这两个目录在其它 worker 上的绝对路径需要与 master 上的绝对路径一致,不然无法直接在 master 上启动其它 worker 节点。
- 依次执行以下命令
- 如果已经配置好 ssh 免密,可以发现执行 scp 指令不需要两次输入密码
1
2
3
4
scp -r /path/to/jdk1.8.0_211username@192.168.56.107:/path/to/jdk1.8.0_211
scp -r /path/to/jdk1.8.0_211username@192.168.56.108:/path/to/jdk1.8.0_211
scp -r /path/to/spark-2.4.3-bin-hadoop2.7username@192.168.56.107:/path/to/spark-2.4.3-bin-hadoop2.7
scp -r /path/to/spark-2.4.3-bin-hadoop2.7username@192.168.56.108:/path/to/spark-2.4.3-bin-hadoop2.7
启动 master
1
2
3
4
5
sbin/start-master.sh
jps
输入 jps 指令( 该指令在 $JAVA_HOME/bin
目录下)
- 可以查看 java 进程名,如输入 jps 后,会显示这样的信息
- 看到有 Master 字样,说明 master 进程启动成功了
- 启动 master 后,spark 默认会监听 8080 端口,并可以通过浏览器打开 web 界面
- 在地址栏输入 https://192.168.56.106:8080 ,查看集群状态。
- 当前只启动了 master ,所以看不到任何 worker 信息。
启动 worker 节点
1
2
3
4
5
sbin/slaves.sh
jps
会看到类似这样的输出:
- 输入 jps ,会列出当前启动的 java 进程
- 显示 Worker 字样,说明 worker 进程启动成功了。
再刷新下打开的浏览器界面 https://192.168.56.106:8080
- 可以看到当前启动了三个 Worker 节点。`
界面上显示的 Address 列是 10 开头的 ip 地址,并且都是一样的,不是 192 开头的三个不同的 ip 地址。
- 这是因为虚拟机内有两块虚拟网卡
- Spark 会读取环境变量 SPARK_LOCAL_IP
- 如果没设置这个变量,Spark 就会使用 getHostByName 来获取 ip 地址,会得到 10.0.2.15 这个 ip 地址。
要解决这个问题,有两种方法:
- 设置网卡
- 将 仅主机(Host-Only)网络 设置为网卡 1
- 将 网络地址转换(NAT) 设置为网卡 2
- 不过如果使用这种方法,重启虚拟机后,如果是动态 ip,则 ip 地址会变化,会影响之前的配置。
- 另一种方法
- 在 conf/spark-env.sh 中设置 SPARK_LOCAL_IP 这个变量,可以固定为一个 ip 地址
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
vim conf/spark-env.sh
# 添加一行:
export SPARK_LOCAL_IP=192.168.56.106
# 在其他机器上同样需要手动添加这一行,不过要修改为对应的机器 ip。
# 觉得这样有点麻烦。可以通过脚本动态获取本机 ip 地址,
# 在 **conf/spark-env.sh** 中添加这两行:
SPARK_LOCAL_IP=`python -c "\
import socket;\
import fcntl;\
import struct;\
print([(socket.inet_ntoa(fcntl.ioctl(s.fileno(),0x8915,struct.pack('256s', 'enp0s8'))[20:24]), s.close()) for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][0])"`
export SPARK_LOCAL_IP
# 这样就可以自动获取本机的enp0s8这块网卡的 ip 地址。
# 将修改后的conf/spark-env.sh这个文件复制到其它机器上:
scp conf/spark-env.sh username@192.168.56.107:/path/to/spark-2.4.3-bin-hadoop2.7/conf/spark-env.sh scp conf/spark-env.sh username@192.168.56.108:/path/to/spark-2.4.3-bin-hadoop2.7/conf/spark-env.sh
# 重新启动所有节点:
sbin/stop-all.sh
sbin/start-all.sh
刷新浏览器界面
- 有 3 个 Woker 启动了,
- 并且在 Address 列也可以看到都变为 192 开头的 ip 地址了。
测试
在 {SPARK_HOME}/examples/src/main 目录下,有一些 spark 自带的示例程序
- 有 java、python、r、scala 四种语言版本的程序。
测试 python 版的计算 PI 的程序。
1
2
3
4
5
cd ${SPARK_HOME}/examples/src/main/python
# 将 **pi.py** 程序提交到 spark 集群,执行:
spark-submit --master spark://192.168.56.106:7077 pi.py
最后可以看到输出这样的日志:
刷新浏览器界面
- 在 Completed Applications 栏可以看到一条记录,即刚才执行的计算 PI 的 python 程序。
如果觉得在终端中输出的日志太多,可以修改日志级别:
1
2
3
cp ${SPARK_HOME}/conf/log4j.properties.template ${SPARK_HOME}/conf/log4j.properties
vim ${SPARK_HOME}/conf/log4j.properties
修改日志级别为 WARN :
再重新执行可以看到输出日志少了很多。
除了提交 python 程序外,spark-submit 还可以提交打包好的 java 、 scala 程序
Spark 配置文件说明
在下载下来的 spark-2.4.3-bin-hadoop2.7.tgz 中,conf 目录下会默认存在几个文件
- 均为 Spark 的配置示例模板文件:
- 这些模板文件,均不会被 Spark 读取
- 需要将 .template 后缀去除,Spark 才会读取这些文件。
- 在 Spark 集群中主要需要关注的是 log4j.properties 、 slaves 、 spark-defaults.conf 、 spark-env.sh 这四个配置文件。
log4j.properties 的配置
- 可以参考 Apache Log4j 官网上的 Propertities 属性配置说明。
slaves 的配置
- 里面为集群的所有 worker 节点的主机信息
- 可以为主机名,也可以为 ip 地址。
spark-defaults.conf 的配置
- 可以参考 Spark 官网的属性配置 页。
- 比如指定 master 节点地址,可以设置 spark.master 属性;
- 指定 executor 的运行时的核数,可以设置 spark.executor.cores 属性等。
spark-env.sh
- 是 Spark 运行时,会读取的一些环境变量,
- 在本文中,主要设置了三个环境变量: JAVA_HOME 、 SPARK_HOME 、 SPARK_LOCAL_IP ,这是 Spark 集群搭建过程中主要需要设置的环境变量。
- 其它未设置的环境变量,Spark 均采用默认值。
- 其它环境变量的配置说明,可以参考 Spark 官网的环境变量配置 页。
至此,Spark 集群的 Standalone 模式部署全部结束。
Run spark
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
nums = list(range(0,10000001))
nums_rdd = sc.parallelize(nums)
nums_rdd.take(5)
squared_nums_rdd = nums_rdd.map(lambda x: x**2)
squared_nums_rdd.take()
str(56)
len(str(56))
pairs = squared_nums_rdd.map(lambda x: (x, len(str(X))))
pairs.take(10)
even_digit_pairs = pairs.filter(lambda x: x[1]%2==0 )
even_digit_pairs.take(10)
flipped_pairs = even_digit_pairs.map(lambda x: (x[1], x[0]))
grouped = flipped_pairs.groupByKey()
grouped = grouped.map(lambda x: (x[0], list(x[1])))
grouped.take(10)
averaged = grouped.map(lambda x: (x[0], sum(x[1])/len(x[1]) ) )
averaged.collet()
.
Comments powered by Disqus.