具有池/队列的Python多个子进程在一个完成时立即恢复输出并在队列中启动下一个作业
问题内容:
我当前正在启动一个子进程,并在旅途中解析stdout,而无需等待它完成对stdout的解析。
for sample in all_samples:
my_tool_subprocess = subprocess.Popen('mytool {}'.format(sample),shell=True, stdout=subprocess.PIPE)
line = True
while line:
myline = my_tool_subprocess.stdout.readline()
#here I parse stdout..
在我的脚本中,我实际上多次执行此操作,具体取决于输入样本的数量。
这里的主要问题是每个子进程都是一个程序/工具,在运行时会使用1个CPU占100%。这需要一些时间..也许每个输入20-40分钟。
我想要实现的是设置同时运行的N个最大子进程作业进程的池,队列(我不确定这里的确切术语是什么)。因此,我可以最大限度地提高性能,而不必按顺序进行。
因此,例如最多4个作业池的执行流应为:
- 启动4个子进程。
- 当一项作业完成时,解析stdout并随后启动。
- 这样做直到队列中的所有作业都完成。
如果我能做到这一点,我真的不知道如何确定哪个样本子流程已经完成。目前,我不需要识别它们,因为每个子进程都按顺序运行,并且当子进程正在打印stdout时,我将解析stdout。
这一点非常重要,因为我需要确定每个子流程的输出并将其分配给相应的输入/样本。
问题答案:
ThreadPool
可能非常适合您的问题,您可以设置工作线程的数量并添加作业,然后线程将在所有任务中正常工作。
from multiprocessing.pool import ThreadPool
import subprocess
def work(sample):
my_tool_subprocess = subprocess.Popen('mytool {}'.format(sample),shell=True, stdout=subprocess.PIPE)
line = True
while line:
myline = my_tool_subprocess.stdout.readline()
#here I parse stdout..
num = None # set to the number of workers you want (it defaults to the cpu count of your machine)
tp = ThreadPool(num)
for sample in all_samples:
tp.apply_async(work, (sample,))
tp.close()
tp.join()