Spark中的shuffle操作

Spark中的shuffle操作

[TOC]

Shuffle 操作

​ Spark 中的某些操作会触发一个称为 shuffle 的事件。 shuffle 是 Spark 用于重新分配数据的机制,以便在不同分区之间进行不同的分组。 这通常涉及在执行器和机器之间复制数据,从而使 shuffle 成为一项复杂且成本高昂的操作。

背景

​ 要了解 shuffle 期间发生了什么,我们可以考虑 reduceByKey 操作的示例。 reduceByKey 操作生成一个新的 RDD,其中单个键的所有值都组合成一个元组 - 键和对与该键关联的所有值执行 reduce 函数的结果。挑战在于,单个键的所有值不一定位于同一分区,甚至同一台机器上,但它们必须位于同一位置以计算结果。

​ 在 Spark 中,数据通常不会跨分区分布在特定操作的必要位置。在计算过程中,单个任务将在单个分区上运行 - 因此,为了组织单个“reduceByKey”reduce 任务执行的所有数据,Spark 需要执行所有操作。它必须从所有分区中读取以找到所有键的所有值,然后将跨分区的值组合在一起以计算每个键的最终结果 - 这称为 shuffle

​ 尽管新混洗数据的每个分区中的元素集是确定性的,分区本身的排序也是确定的,但这些元素的排序不是。如果在 shuffle 之后需要可预测的有序数据,那么可以使用:

  • mapPartitions 使用例如 .sorted 对每个分区进行排序
  • repartitionAndSortWithinPartitions 可在重新分区的同时有效地对分区进行排序
  • sortBy 来创建一个全局有序的 RDD

​ 可能导致 shuffle 的操作包括 repartition 操作,如 repartitioncoalesceByKey 操作(计数除外)如 [groupByKey](https://spark .apache.org/docs/latest/rdd-programming-guide.html#GroupByLink) 和 reduceByKeyjoin 操作,如 cogroup 和 [join](https:// spark.apache.org/docs/latest/rdd-programming-guide.html#JoinLink)。

性能影响

Shuffle 是一项昂贵的操作,因为它涉及磁盘 I/O、数据序列化和网络 I/O。为了组织 shuffle 的数据,Spark 生成一组任务 map 任务来组织数据,以及一组 reduce 任务来聚合它。这个命名法来自 MapReduce,与 Spark 的 mapreduce 操作没有直接关系。

​ 在内部,单个map任务的结果会保存在内存中,直到无法容纳为止。然后,根据目标分区对它们进行排序并写入单个文件。在reduce 方面,任务读取相关的排序块。

​ 某些 shuffle 操作可能会消耗大量的堆内存,因为它们在传输之前或之后使用内存数据结构来组织记录。具体来说,reduceByKeyaggregateByKeymap 端创建这些结构,而ByKey 操作在 reduce 端生成这些结构。当数据不适合内存时,Spark 会将这些表溢出到磁盘,从而导致磁盘 I/O 的额外开销和垃圾收集增加。

Shuffle 还会在磁盘上生成大量中间文件。从 Spark 1.3 开始,这些文件会一直保留,直到相应的 RDD 不再使用并被垃圾回收。这样做是为了在重新计算谱系时不需要重新创建 shuffle 文件。如果应用程序保留对这些 RDD 的引用或者 GC[垃圾回收器] 不频繁启动,则垃圾收集可能会在很长一段时间后发生。这意味着长时间运行的 Spark 作业可能会消耗大量磁盘空间。临时存储目录由配置 Spark 上下文时的 spark.local.dir 配置参数指定。

​ 可以通过调整各种配置参数来调整 Shuffle 行为。请参阅 Spark 配置指南 中的Shuffle Behavior部分。

Spark中shuffle的三种方式

shuffle的方式共三种,分别是:

  1. HashShuffle
  2. SortShuffle(默认)
  3. TungstenShuffle

​ 在Spark程序中设置方式,通过设置spark.shuffle.manager 进行配置:

1
2
3
4
5
6
val session = SparkSession.builder()
.appName("XXXXX")
.master("local[*]")
// 可设置 hash、sort、tungsten-sort
.config("spark.shuffle.manager","hash")
.getOrCreate()

HashShuffleManager 特点

  • 数据不进行排序,速度较快
  • 直接

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!