如何在PySpark 2.1.0中的事件时间窗口上定义UDAF
问题内容:
我正在编写一个Python应用程序,该程序可在带有时间戳的值序列上滑动窗口。我想对滑动窗口中的值应用一个函数,以便根据N个最新值计算分数,如图所示。我们已经使用Python库实现了该功能,以利用GPU。
我发现Apache Spark
2.0附带了结构化流,并且它支持事件时间的窗口操作。如果您想从.csv文件中读取有限的记录序列,并希望在这样的滑动窗口中对记录进行计数,则可以在PySpark中使用以下代码:
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import StructType
from pyspark.sql.functions import window
from os import getcwd
spark = SparkSession \
.builder \
.master('local[*]') \
.getOrCreate()
schema = StructType() \
.add('ts', 'timestamp') \
.add('value', 'double') \
sqlContext = SQLContext(spark)
lines = sqlContext \
.readStream \
.format('csv') \
.schema(schema) \
.load(path='file:///'+getcwd()+'/csv')
windowedCount = lines.groupBy(
window(lines.ts, '30 minutes', '10 minutes')
).agg({'value':'count'})
query = windowedCount \
.writeStream \
.outputMode('complete') \
.format('console') \
.start()
query.awaitTermination()
但是,我想在滑动窗口上应用除预定义聚合功能以外的UDAF。根据https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=agg#pyspark.sql.GroupedData.agg,可用的汇总函数仅为avg,max,min,求和和计数。
还不支持吗?如果是这样,何时在PySpark中支持它?
https://stackoverflow.com/a/32750733/1564381显示,可以在Java或Scala中定义UserDefinedAggregateFunction,然后在PySpark中调用它。看起来很有趣,但是我想将自己的Python函数应用于滑动窗口中的值。我想要一种纯粹的Python方式。
ps让我知道了除PySpark以外的Python中可以解决此类问题的任何框架(将UDAF应用于在流上滑动的窗口上)。
问题答案:
在Spark <2.3中,您不能执行此操作。
对于Spark> = 2.3,这对于分组数据是可行的,但对于使用“带有Pys的PySpark UDAF的Windows”而言,尚不可行。
当前,PySpark无法在Windows上运行UserDefined函数。
这是一个对此有一个很好描述的SO问题:在PySpark中的GroupedData上应用UDF(带有可运行的python示例)
这是添加了此功能的JIRA票证-https:
//issues.apache.org/jira/browse/SPARK-10915