通过保留基于另一个变量的顺序来收集列表


问题内容

我试图在现有列集上使用groupby聚合在Pyspark中创建列表的新列。下面提供了一个示例输入数据帧:

------------------------
id | date        | value
------------------------
1  |2014-01-03   | 10 
1  |2014-01-04   | 5
1  |2014-01-05   | 15
1  |2014-01-06   | 20
2  |2014-02-10   | 100   
2  |2014-03-11   | 500
2  |2014-04-15   | 1500

预期输出为:

id | value_list
------------------------
1  | [10, 5, 15, 20]
2  | [100, 500, 1500]

列表中的值按日期排序。

我尝试使用collect_list,如下所示:

from pyspark.sql import functions as F
ordered_df = input_df.orderBy(['id','date'],ascending = True)
grouped_df = ordered_df.groupby("id").agg(F.collect_list("value"))

但是,即使我在聚合之前按日期对输入数据帧进行排序,collect_list也不能保证顺序。

有人可以通过根据第二个(日期)变量保留顺序来帮助进行汇总吗?


问题答案:

如果您同时收集日期和值作为列表,则可以使用和根据日期对结果列进行排序udf,然后仅将值保留在结果中。

import operator
import pyspark.sql.functions as F

# create list column
grouped_df = input_df.groupby("id") \
               .agg(F.collect_list(F.struct("date", "value")) \
               .alias("list_col"))

# define udf
def sorter(l):
  res = sorted(l, key=operator.itemgetter(0))
  return [item[1] for item in res]

sort_udf = F.udf(sorter)

# test
grouped_df.select("id", sort_udf("list_col") \
  .alias("sorted_list")) \
  .show(truncate = False)
+---+----------------+
|id |sorted_list     |
+---+----------------+
|1  |[10, 5, 15, 20] |
|2  |[100, 500, 1500]|
+---+----------------+