有没有一种方法可以将结果流传输到驱动程序,而无需等待所有分区完成执行?
问题内容:
有没有一种方法可以将结果流式传输到驱动程序,而无需等待所有分区完成执行?
我是Spark的新手,所以如果有更好的方法,请指出正确的方向。我想并行执行大量分区,并使用spark来处理分发/重新启动等操作。完成操作后,我想将结果收集到驱动程序中的单个存档中。
采用 toLocalIterator()
我已经能够做到这一点toLocalIterator()
,根据文档,它限制了驱动程序所需的资源。因此,它基本上可以工作。
问题在于,toLocalIterator()
不仅将驱动程序一次限制为一个分区,而且似乎一次只能执行一个分区。这对我没有用。下面的演示代码中演示了该行为。
使用persist()
+ count()
+toLocalIterator()
我发现我可以通过持久化然后用触发并行执行来解决这个问题count()
。之后,toLocalIterator()
可以快速提取预先计算的结果。
问题是我有很多分区(大约10 ^ 3或10 ^
4),每个分区大约需要15分钟。这样最终会保留大量数据(不是很多),但更糟糕的是,一旦整个工作持续太长时间,它似乎就失去了持久性。分区最终需要重新计算。我正在与可抢占的工作人员一起使用google
dataproc,所以这可能与它有关系,但是我很确定它最终甚至在固定工作人员上也要重新计算…我不确定到底发生了什么。
无论如何,在访问第一个结果之前必须执行所有分区似乎并不理想。
下面的演示代码演示了一切正常的情况,并且迭代不会触发重新计算时的最佳情况。
??? ->迭代数据而无需等待完整执行
有没有类似的东西?
复制/粘贴演示代码
import time
import pyspark.storagelevel
def slow_square(n):
time.sleep(5)
return n**2
with pyspark.SparkContext() as spark_context:
numbers = spark_context.parallelize(range(4), 4) # I think 4 is default executors locally
squares = numbers.map(slow_square)
# Use toLocalIterator()
start = time.time()
list(squares.toLocalIterator())
print('toLocalIterator() took {:.1f} seconds (expected about 5)'.format(time.time() - start))
# I get about 20s
# Use count() to show that it's faster in parallel
start = time.time()
squares.count()
print('count() took {:.1f} seconds (expected about 5)'.format(time.time() - start))
# I get about 5s
# Use persist() + count() + toLocalIterator()
start = time.time()
squares.persist(pyspark.storagelevel.StorageLevel.MEMORY_AND_DISK)
squares.count()
list(squares.toLocalIterator())
print('persisted toLocalIterator() took {:.1f} seconds (expected about 5)'.format(time.time() - start))
# I get about 5s
问题答案:
一般来说,这不是您通常在Spark中执行的操作。通常,我们尝试将通过驱动程序传递的数据量限制为最小。有两个主要原因:
- 将数据传递给Spark驱动程序很容易成为应用程序的瓶颈。
- 驱动程序实际上是批处理应用程序中的单点故障。
在正常情况下,您只需要继续工作,写入持久性存储,最后对结果应用进一步的处理步骤。
如果您希望能够迭代访问结果,则可以选择以下几种方法:
- 使用Spark Streaming。创建一个简单的过程,将数据推送到群集,然后收集每个批次。它简单,可靠,经过测试,并且不需要任何其他基础结构。
- 使用
foreach
/处理数据foreachPartition
,并在产生数据时将数据推送到外部消息传递系统,并使用另一个过程进行消耗和写入。这需要额外的组件,但从概念上讲可能更容易(您可以使用背压,缓冲结果,从驱动程序分离合并逻辑以最大程度地减少应用程序失败的风险)。 - Hack Spark累加器。任务完成后,火花累积器会更新,因此您可以分批处理累积的即将到来的数据。
警告 : 以下代码仅是概念验证。 它没有经过适当的测试,很可能是非常不可靠的。
AccumulatorParam
使用RXPy的示例
# results_param.py
from rx.subjects import Subject
from pyspark import AccumulatorParam, TaskContext
class ResultsParam(AccumulatorParam, Subject):
"""An observable accumulator which collects task results"""
def zero(self, v):
return []
def addInPlace(self, acc1, acc2):
# This is executed on the workers so we have to
# merge the results
if (TaskContext.get() is not None and
TaskContext().get().partitionId() is not None):
acc1.extend(acc2)
return acc1
else:
# This is executed on the driver so we discard the results
# and publish to self instead
for x in acc2:
self.on_next(x)
return []
简单的Spark应用程序(Python 3.x):
# main.py
import time
from pyspark import SparkContext, TaskContext
sc = SparkContext(master="local[4]")
sc.addPyFile("results_param.py")
from results_param import ResultsParam
# Define accumulator
acc = sc.accumulator([], ResultsParam())
# Dummy subscriber
acc.accum_param.subscribe(print)
def process(x):
"""Identity proccess"""
result = x
acc.add([result])
# Add some delay
time.sleep(5)
return result
sc.parallelize(range(32), 8).foreach(process)
这相对简单,但是如果多个任务同时完成,则有使驱动程序不堪重负的风险,因此您必须大幅超额分配驱动程序资源(成比例地达到并行度和任务结果的预期大小)。
runJob
直接使用Scala (不支持Python)。
实际上,Spark实际上是异步获取结果的,只要您不关心顺序,就不需要等待所有数据被处理。例如,reduce
您可以查看实现Scala。
应该可以使用这种机制将分区推到Python进程中,但是我还没有尝试过。