从SPARK中的另一个RDD返回最大N个值的RDD
问题内容:
我正在尝试过滤元组的RDD以基于键值返回最大的N个元组。我需要返回格式为RDD。
因此,RDD:
[(4, 'a'), (12, 'e'), (2, 'u'), (49, 'y'), (6, 'p')]
筛选出最大的3个键应返回RDD:
[(6,'p'), (12,'e'), (49,'y')]
先执行a sortByKey()
,然后take(N)
返回值,并且不会导致RDD,所以它将无法正常工作。
我可以返回所有键,对它们进行排序,找到第N个最大值,然后为更大的键值筛选RDD,但这似乎效率很低。
最好的方法是什么?
问题答案:
用RDD
一个快速但不是特别有效的解决方案是跟随sortByKey
使用zipWithIndex
和filter
:
n = 3
rdd = sc.parallelize([(4, 'a'), (12, 'e'), (2, 'u'), (49, 'y'), (6, 'p')])
rdd.sortByKey().zipWithIndex().filter(lambda xi: xi[1] < n).keys()
如果n与RDD大小相比相对较小,则更有效的方法是避免完全排序:
import heapq
def key(kv):
return kv[0]
top_per_partition = rdd.mapPartitions(lambda iter: heapq.nlargest(n, iter, key))
top_per_partition.sortByKey().zipWithIndex().filter(lambda xi: xi[1] < n).keys()
如果键远小于值,并且最终输出的顺序无关紧要,则filter
方法可以正常工作:
keys = rdd.keys()
identity = lambda x: x
offset = (keys
.mapPartitions(lambda iter: heapq.nlargest(n, iter))
.sortBy(identity)
.zipWithIndex()
.filter(lambda xi: xi[1] < n)
.keys()
.max())
rdd.filter(lambda kv: kv[0] <= offset)
同样,如果出现平局,它将不会保留确切的n值。
用DataFrames
您可以orderBy
和limit
:
from pyspark.sql.functions import col
rdd.toDF().orderBy(col("_1").desc()).limit(n)