Spark中的shuffle操作
Spark中的shuffle操作
[TOC]
Shuffle 操作
Spark 中的某些操作会触发一个称为 shuffle 的事件。 shuffle 是 Spark 用于重新分配数据的机制,以便在不同分区之间进行不同的分组。 这通常涉及在执行器和机器之间复制数据,从而使 shuffle 成为一项复杂且成本高昂的操作。
背景
要了解 shuffle 期间发生了什么,我们可以考虑 reduceByKey
操作的示例。 reduceByKey
操作生成一个新的 RDD,其中单个键的所有值都组合成一个元组 - 键和对与该键关联的所有值执行 reduce 函数的结果。挑战在于,单个键的所有值不一定位于同一分区,甚至同一台机器上,但它们必须位于同一位置以计算结果。
在 Spark 中,数据通常不会跨分区分布在特定操作的必要位置。在计算过程中,单个任务将在单个分区上运行 - 因此,为了组织单个“reduceByKey”reduce 任务执行的所有数据,Spark 需要执行所有操作。它必须从所有分区中读取以找到所有键的所有值,然后将跨分区的值组合在一起以计算每个键的最终结果 - 这称为 shuffle。
尽管新混洗数据的每个分区中的元素集是确定性的,分区本身的排序也是确定的,但这些元素的排序不是。如果在 shuffle 之后需要可预测的有序数据,那么可以使用:
mapPartitions
使用例如.sorted
对每个分区进行排序repartitionAndSortWithinPartitions
可在重新分区的同时有效地对分区进行排序sortBy
来创建一个全局有序的 RDD
可能导致 shuffle 的操作包括 repartition 操作,如 repartition
和 coalesce
,ByKey 操作(计数除外)如 [groupByKey
](https://spark .apache.org/docs/latest/rdd-programming-guide.html#GroupByLink) 和 reduceByKey
和 join 操作,如 cogroup
和 [join
](https:// spark.apache.org/docs/latest/rdd-programming-guide.html#JoinLink)。
性能影响
Shuffle 是一项昂贵的操作,因为它涉及磁盘 I/O、数据序列化和网络 I/O。为了组织 shuffle 的数据,Spark 生成一组任务 map 任务来组织数据,以及一组 reduce 任务来聚合它。这个命名法来自 MapReduce,与 Spark 的 map
和 reduce
操作没有直接关系。
在内部,单个map
任务的结果会保存在内存中,直到无法容纳为止。然后,根据目标分区对它们进行排序并写入单个文件。在reduce
方面,任务读取相关的排序块。
某些 shuffle
操作可能会消耗大量的堆内存,因为它们在传输之前或之后使用内存数据结构来组织记录。具体来说,reduceByKey
和aggregateByKey
在 map 端创建这些结构,而ByKey
操作在 reduce 端生成这些结构。当数据不适合内存时,Spark 会将这些表溢出到磁盘,从而导致磁盘 I/O 的额外开销和垃圾收集增加。
Shuffle
还会在磁盘上生成大量中间文件。从 Spark 1.3 开始,这些文件会一直保留,直到相应的 RDD 不再使用并被垃圾回收。这样做是为了在重新计算谱系时不需要重新创建 shuffle
文件。如果应用程序保留对这些 RDD 的引用或者 GC[垃圾回收器] 不频繁启动,则垃圾收集可能会在很长一段时间后发生。这意味着长时间运行的 Spark 作业可能会消耗大量磁盘空间。临时存储目录由配置 Spark 上下文时的 spark.local.dir
配置参数指定。
可以通过调整各种配置参数来调整 Shuffle 行为。请参阅 Spark 配置指南 中的Shuffle Behavior
部分。
Spark中shuffle的三种方式
shuffle的方式共三种,分别是:
- HashShuffle
- SortShuffle(默认)
- TungstenShuffle
在Spark程序中设置方式,通过设置spark.shuffle.manager
进行配置:
1 |
|
HashShuffleManager 特点
- 数据不进行排序,速度较快
- 直接
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!