Spark中groupByKey,reduceByKey和combineByKey的区别?

Spark中groupByKey,reduceByKey和combineByKey的区别?

[TOC]

相同

  • 三者都是涉及到聚合操作,故都为宽依赖算子,都会产生 shuffle 操作

区别

  • groupByKey
    • 用于对每个 Key 进行操作,将结果生成一个 sequence
    • groupByKey 本身不能自定义函数
    • 会将所有的键值对进行移动,不会进行局部的 merge
    • 会导致集群节点之间的开销很大,导致传输延时
  • reduceByKey
    • 用于对于每个 Key 对应的多个 Value 进行 merge 操作
    • 该算子能在本地进行 merge 操作(分区内先 merge)
    • merge 操作阔以对应函数进行自定义
  • combineByKey
    • 是一个比较底层的算子
    • reduceByKey 底层使用的就是 combineByKey

reduceByKey

groupByKey

三个函数的源码定义

reduceByKey

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
  /**
* 使用关联和可交换的归约函数合并每个键的值。 这也将在将结果发送到减速器之前在每个映射器上本地执行合并,类似于 MapReduce 中 * 的“组合器”。
*/
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
//

/**
* 使用一组自定义聚合函数组合每个键的元素的通用函数。 将 RDD[(K, V)] 转换为 RDD[(K, C)] 类型的结果,用于“组合类型” C
*
* 用户提供三个功能:
*
* - `createCombiner`,将 V 转换为 C(例如,创建一个单元素列表)
* - `mergeValue`,将 V 合并到 C(例如,将其添加到列表的末尾)
* - `mergeCombiners`,将两个 C 组合成一个。
*
* 另外,用户可以控制输出RDD的分区,以及是否进行map-side聚合(如果一个mapper可以产生多个相同key的item)。
*
* @note V 和 C 可以不同——例如,可以将类型为 (Int, Int) 的 RDD 分组为类型为 (Int, Seq[Int]) 的 RDD。
*/
def combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
if (keyClass.isArray) {
if (mapSideCombine) {
throw new SparkException("Cannot use map-side combining with array keys.")
}
if (partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("HashPartitioner cannot partition array keys.")
}
}
val aggregator = new Aggregator[K, V, C](
self.context.clean(createCombiner),
self.context.clean(mergeValue),
self.context.clean(mergeCombiners))
if (self.partitioner == Some(partitioner)) {
self.mapPartitions(iter => {
val context = TaskContext.get()
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else {
new ShuffledRDD[K, V, C](self, partitioner)
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine)
}
}

groupByKey

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
  /**
* 将 RDD 中每个键的值分组为一个序列。 使用现有的分区器/并行级别对生成的 RDD 进行哈希分区。 每个组中元素的顺序是不能保证的, * 甚至在每次评估结果 RDD 时都可能不同。
*
* @note 此操作可能非常昂贵。 如果您分组是为了对每个键执行聚合(例如求和或平均值),使用 * `PairRDDFunctions.aggregateByKey` 或 `PairRDDFunctions.reduceByKey` 将提供更好的性能。
*/
def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
groupByKey(defaultPartitioner(self))
}


/**
* 注意:按照目前的实现,groupByKey 必须能够在内存中保存任何键的所有键值对。 如果一个键有太多的值,它可能会导致 * [[OutOfMemoryError]]。
*/
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
val createCombiner = (v: V) => CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}

combineByKey

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
* 使用一组自定义聚合函数组合每个键的元素的通用函数。 此方法用于向后兼容。 它不向 shuffle 提供组合器类标签信息。
*
* @see `combineByKeyWithClassTag`
*/
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
partitioner, mapSideCombine, serializer)(null)
}


/**
* 使用一组自定义聚合函数组合每个键的元素的通用函数。 将 RDD[(K, V)] 转换为 RDD[(K, C)] 类型的结果,用于“组合类型” C
*
* 用户提供三个功能:
*
* - `createCombiner`,将 V 转换为 C(例如,创建一个单元素列表)
* - `mergeValue`,将 V 合并到 C(例如,将其添加到列表的末尾)
* - `mergeCombiners`,将两个 C 组合成一个。
*
* 另外,用户可以控制输出RDD的分区,以及是否进行map-side聚合(如果一个mapper可以产生多个相同key的item)。
*
* @note V 和 C 可以不同——例如,可以将类型为 (Int, Int) 的 RDD 分组为类型为 (Int, Seq[Int]) 的 RDD。
*/
def combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
if (keyClass.isArray) {
if (mapSideCombine) {
throw new SparkException("Cannot use map-side combining with array keys.")
}
if (partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("HashPartitioner cannot partition array keys.")
}
}
val aggregator = new Aggregator[K, V, C](
self.context.clean(createCombiner),
self.context.clean(mergeValue),
self.context.clean(mergeCombiners))
if (self.partitioner == Some(partitioner)) {
self.mapPartitions(iter => {
val context = TaskContext.get()
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else {
new ShuffledRDD[K, V, C](self, partitioner)
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine)
}
}