Spark-Shuffle原理

前言

Spark Shuffle是大众讨论的比较多的话题了。它是Spark任务执行过程中最为重要的过程之一。那么什么是Shuffle呢?
Shuffle一般被翻译成数据混洗,是类MapReduce分布式计算框架独有的机制,也是这类分布式计算框架最重要的执行机制。接下来会按照两个层面来谈谈Shuffle机制。分别为:

  • 逻辑层面
  • 物理层面

逻辑层面主要是从RDD的血缘出发,从DAG的角度来讲解Shuffle,另外也会说明Spark容错机制。物理层面是从执行角度来剖析Shuffle是如何发生的

1. RDD血缘与Spark容错

从血缘角度出发就需先了解DAG,DAG被称之为有向无环图。在DAG中,最初的RDD被成为基础RDD,在基础RDD之上使用算子的过程中后续生成RDD被成为一个个子RDD,它们之间存在依赖关系。无论哪个RDD出现问题,都可以由这种依赖关系重新计算而成。这种依赖关系就被成为RDD血缘。血缘的表现方式主要分为宽依赖与窄依赖。

1.1 窄依赖与宽依赖

窄依赖的标准定义是:子RDD中的分区与父RDD中的分区只存在一对一的映射关系。
宽依赖的标准定义是:子RDD中分区与父RDD中分区存在一对多的映射关系。

23abbd9ac0c0c032330c6ca695f98b37

从实际算子来说,map,filter,union等就是窄依赖,而groupByKey,reduceByKey就是典型的宽依赖。

宽依赖还有个名字,叫shuffle依赖,也就是说宽依赖必然会发生在shuffle操作,shuffle也是划分stage的重要依据。而窄依赖由于不需要发生shuffle,所有计算都是在分区所在节点完成,类似于MR中的ChainMapper。所以说,在如果在程序中选取的算子形成了宽依赖,那么就必然会触发shuffle。
所以当RDD在shuffle过程中某个分区出现了故障,只需要找到当前对应的Stage,而这个Stage必然是某个shuffle算子所进行划分的,找到了这个算子,就离定位错误原因越来越近了。
2613599ec6366c549584bd777e88523c

如上图所示,如果P1_0分区发生故障,那么按照依赖关系,则需要P0_0与P0_1的分区重算,P0_0与P0_1没有持久化,就会不断回溯,直到找到存在的父分区为止。至于为什么要持久化,原因就是当计算逻辑复杂时,就会引发依赖链过长,如果其中的某个RDD发生了问题。若没有进行持久化,Spark则会根据RDD血缘关系进行重头开始计算。重算显然对我们是代价极高的,所以用户可以在计算过程中,适当的调用RDD的checkpoint方法,保存好当前算好的中间结果,这样依赖关系链就会大大的缩短。因为checkpoint其实是会切断血缘的。这就是RDD的血缘机制即RDD的容错机制。

而Spark的容错机制则是主要分为资源管理平台的容错和Spark应用的容错。

1.2 Spark的容错机制

Spark的应用是基于资源管理平台运行的,所以资源管理平台的容错也是Spark容错的一部分,如Yarn的ResourceManager HA机制。在Spark应用执行的过程中,可能会遇到以下几种失败情况:

  • Driver出错
  • Executor出错
  • Task出错

Dirver执行失败是Spark应用最严重的一种情况,因为它标记着整个作业的执行失败,需要开发人员手动重启Driver。而Executor报错通常是所在的Worker出错,这时Driver就会将执行失败的Task调度到另一个Executor继续执行,重新执行的Task会根据RDD的依赖关系继续计算,并将报错的Executor从可用的Executor列表中移除。Spark会对执行失败的Task进行重试,重试3次后若依然出错,则整个作业就会失败。而在这个过程中,数据恢复和重试都依赖于RDD血缘机制。

2. Spark Shuffle

很多算子都会引起RDD中的数据进行重分区,新的分区被创建,旧的分区被合并或者打碎,在重分区过程中,如果数据发生了跨节点移动,就被称为Shuffle。 Spark对Shuffle的实现方式有两种:Hash Shuffle与Sort-based Shuffle,这其实是一个优化的过程。在较老的版本中,Spark Shuffle的方式可以通过spark.shuffle.manager配置项进行配置,而在最新的版本中,已经移除了该配置项,统一称为Sort-based Shuffle。

2.1 Hash Shuffle

在Spark 1.6.3之前,Hash Shuffle都是Spark Shuffle的解决方案之一。Shuffle的过程一般分为两个部分:Shuffle Write和Shuffle Fetch,前者是Map任务划分分区,输出中间结果,而后者则是Reduce任务获取到的这些中间结果。Hash Shuffle的过程如图下所示:
1b3f9d7e0de8684358ff919559d13a62

图中,Shuffle Write发生在一个节点上,执行shuffle任务的CPU核数为1,可以同时执行两个任务,每个任务输出的分区数与Reducer数相同,即为3。每个分区都有一个缓冲区(bucket)用来接收结果,每个缓冲区的大小由配置spark.shuffle.file.buffer.kb决定。这样每个缓冲区写满后,就会输出到一个文件段中。而Reducer就会去相应的节点拉取文件。

这样设计起来其实是不复杂的。但问题也很明显,主要有两个:

  • 生成的文件个数太大。理论上,每个Shuffle任务输出会产生R个文件(由Reduce个数决定),而Shuffle任务的个数往往是由Map任务个数M决定的,所以总共会生成M * R个中间结果文件,而在大型作业中,若是M和R都是很大的数字的话,就会出现文件句柄数突破操作系统的限制。
  • 缓冲区占用内存空间过大。 单节点在执行Shuffle任务时缓存区大小消耗(spark.shuffle.file.buffer.kb) × m × R , m为该节点运行的shuffle个数,如果一个核可以执行一个任务,那么m就与cpu核数相等。这对于有32,64核的服务器来说都是不小的内存开销。所有为了解决第一个问题,Spark引入了Flie Consolidation机制,指通过共同输出文件以降低文件数,如下图所示:

ac8c4e5d8e27ec9319930726a7f15068

每当Shuffle输出时,同一个CPU核心处理的Map任务的中间结果会输出到同分区的一个文件中,然后Reducer只需要一次性将整个文件拿到即可。这样,Shuffle产生的文件数为C(CPU核数)* R。Spark的FileConsolidation机制默认开启,可以通过spark.shuffle.consolidateFiles配置项进行配置。

2.2 Sort-based Shuffle

即便是引入了FlieConsolidation后,还是无法根本解决中间文件数太大的问题,这时候Sort-based Shuffle才算是真正的引入进来。如图所示:7d856c507ee118374475fa61c694203b

  • 每个Map任务会最后只输出两个文件(其中一个是索引文件),其中间过程采用MapReduce一样的归并排序,但是会用索引文件记录每个分区的偏移量,输出完成后,Reducer会根据索引文件得到属于自己的分区,这种情况下,shuffle产生的中间结果文件为2 * M(M为Map任务数)。
  • 在基于排序的 Shuffle 中, Spark 还提供了一种折中方案——Bypass Sort-based Shuffle,当 Reduce 任务小于 spark.shuffle.sort.bypassMergeThreshold 配置(默认 200)时,Spark Shuffle 开始按照 Hash Shuffle 的方式处理数据,而不用进行归并排序,只是在 Shuffle Write 步骤的最后,将其合并为 1 个文件,并生成索引文件。这样实际上还是会生成大量的中间文件,只是最后合并为 1 个文件并省去排序所带来的开销,该方案的准确说法是 Hash Shuffle 的Shuffle Fetch 优化版。

×

纯属好玩

扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

文章目录
  1. 1. 前言
    1. 1.1. 1. RDD血缘与Spark容错
      1. 1.1.1. 1.1 窄依赖与宽依赖
      2. 1.1.2. 1.2 Spark的容错机制
    2. 1.2. 2. Spark Shuffle
      1. 1.2.1. 2.1 Hash Shuffle
      2. 1.2.2. 2.2 Sort-based Shuffle
,