使用SQLAlchemy和多处理程序挂在Python脚本中
问题内容:
考虑以下Python脚本,该脚本使用SQLAlchemy和Python多处理模块。这是在Debian squeeze上使用Python 2.6.6-8 +
b1(默认)和SQLAlchemy 0.6.3-3(默认)的情况。这是一些实际代码的简化版本。
import multiprocessing
from sqlalchemy import *
from sqlalchemy.orm import *
dbuser = ...
password = ...
dbname = ...
dbstring = "postgresql://%s:%s@localhost:5432/%s"%(dbuser, password, dbname)
db = create_engine(dbstring)
m = MetaData(db)
def make_foo(i):
t1 = Table('foo%s'%i, m, Column('a', Integer, primary_key=True))
conn = db.connect()
for i in range(10):
conn.execute("DROP TABLE IF EXISTS foo%s"%i)
conn.close()
db.dispose()
for i in range(10):
make_foo(i)
m.create_all()
def do(kwargs):
i, dbstring = kwargs['i'], kwargs['dbstring']
db = create_engine(dbstring)
Session = scoped_session(sessionmaker())
Session.configure(bind=db)
Session.execute("COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;")
Session.commit()
db.dispose()
pool = multiprocessing.Pool(processes=5) # start 4 worker processes
results = []
arglist = []
for i in range(10):
arglist.append({'i':i, 'dbstring':dbstring})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously
r.get()
r.wait()
pool.close()
pool.join()
该脚本因以下错误消息而挂起。
Exception in thread Thread-2:
Traceback (most recent call last):
File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
self.run()
File "/usr/lib/python2.6/threading.py", line 484, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib/python2.6/multiprocessing/pool.py", line 259, in _handle_results
task = get()
TypeError: ('__init__() takes at least 4 arguments (2 given)', <class 'sqlalchemy.exc.ProgrammingError'>, ('(ProgrammingError) syntax error at or near "%"\nLINE 1: COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;\n ^\n',))
当然,这里的语法错误是TRUNCATE foo%s;
。我的问题是,为什么该过程挂起了,我是否可以说服它退出而不显示错误,而无需对我的代码进行大量改动?此行为与我的实际代码非常相似。
请注意,如果将语句替换为,则不会发生挂起print foobarbaz
。另外,如果我们更换,挂起仍然会发生
Session.execute("COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;")
Session.commit()
db.dispose()
只是 Session.execute("TRUNCATE foo%s;")
我使用的是以前的版本,因为它更接近我的实际代码在做什么。
同样,multiprocessing
从图片中删除并依次遍历表格会使挂起消失,并且挂起时会出现错误。
我也对错误的形式感到困惑,尤其是一TypeError: ('__init__() takes at least 4 arguments (2 given)'
点点。这个错误来自哪里?似乎是从multiprocessing
代码中的某个地方来的。
PostgreSQL日志没有帮助。我看到很多线条
2012-01-09 14:16:34.174 IST [7810] 4f0aa96a.1e82/1 12/583 0 ERROR: syntax error at or near "%" at character 28
2012-01-09 14:16:34.175 IST [7810] 4f0aa96a.1e82/2 12/583 0 STATEMENT: COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;
但似乎没有其他相关的东西。
更新1:多亏了lbolla和他的深入分析,我得以提交有关此问题的Python错误报告。请参阅该报告中的sbt分析,以及此处。另请参阅Python错误报告修复异常酸洗。因此,按照sbt的解释,我们可以使用
import sqlalchemy.exc
e = sqlalchemy.exc.ProgrammingError("", {}, None)
type(e)(*e.args)
这使
Traceback (most recent call last):
File "<stdin>", line 9, in <module>
TypeError: __init__() takes at least 4 arguments (2 given)
更新2:至少对于SQLAlchemy,此问题已由Mike
Bayer修复,请参见错误报告StatementError不可拾取的异常。。根据Mike的建议,我也报告了与psycopg2类似的错误,尽管我没有(也没有)实际的损坏示例。无论如何,尽管他们没有提供修复的详细信息,但他们显然已经修复了它。请参见psycopg异常不能被腌制。出于良好的考虑,我还报告了一个Python错误ConfigParser异常,该异常与相应的lbolla的SO问题无关。看来他们想要对此进行测试。
无论如何,这似乎在可预见的将来仍将是一个问题,因为从总体上来说,Python开发人员似乎并不了解此问题,因此也不要对此加以防范。令人惊讶的是,似乎没有足够的人使用多重处理来解决这个问题,或者只是他们忍受了这个问题。我希望Python开发人员至少可以修复它,至少适用于Python
3,因为它很烦人。
我接受了lbolla的回答,因为没有他对问题与异常处理之间的关系的解释,我可能一无所知。我还要感谢sbt,他解释说Python无法腌制异常是问题所在。我非常感谢他们两个,请投票给他们答案。谢谢。
更新3:我发布了一个后续问题:捕获无法修复的异常并重新引发。
问题答案:
我相信,TypeError
来自multiprocessing
的get
。
我已经从脚本中删除了所有数据库代码。看看这个:
import multiprocessing
import sqlalchemy.exc
def do(kwargs):
i = kwargs['i']
print i
raise sqlalchemy.exc.ProgrammingError("", {}, None)
return i
pool = multiprocessing.Pool(processes=5) # start 4 worker processes
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously
# Use get or wait?
# r.get()
r.wait()
pool.close()
pool.join()
print results
使用会r.wait
返回预期结果,但会使用r.get
raises
TypeError
。如python文档中所述,请r.wait
在后面使用map_async
。
编辑 :我必须修改我以前的答案。我现在相信这些TypeError
来自SQLAlchemy。我已经修改了脚本以重现该错误。
编辑2
:问题似乎是,multiprocessing.pool
如果任何工作程序引发异常,而该异常的构造函数需要参数(请参见此处),则该问题将无法很好地发挥。
我修改了脚本以突出显示此内容。
import multiprocessing
class BadExc(Exception):
def __init__(self, a):
'''Non-optional param in the constructor.'''
self.a = a
class GoodExc(Exception):
def __init__(self, a=None):
'''Optional param in the constructor.'''
self.a = a
def do(kwargs):
i = kwargs['i']
print i
raise BadExc('a')
# raise GoodExc('a')
return i
pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
# set a timeout in order to be able to catch C-c
r.get(1e100)
except KeyboardInterrupt:
pass
print results
在您的情况下,鉴于您的代码引发了SQLAlchemy异常,我想到的唯一解决方案是捕获do
函数中的所有异常,然后重新引发一个普通的异常Exception
。像这样:
import multiprocessing
class BadExc(Exception):
def __init__(self, a):
'''Non-optional param in the constructor.'''
self.a = a
def do(kwargs):
try:
i = kwargs['i']
print i
raise BadExc('a')
return i
except Exception as e:
raise Exception(repr(e))
pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
# set a timeout in order to be able to catch C-c
r.get(1e100)
except KeyboardInterrupt:
pass
print results
编辑3
:因此,这似乎是Python的错误,但是SQLAlchemy中的适当异常可以解决该问题:因此,我也提出了SQLAlchemy的问题。
作为解决该问题的方法,我认为 Edit 2 末尾的解决方案可以解决(在try-except中包装回调并重新引发)。