MySQL中的Sqlalchemy批量更新工作非常缓慢


问题内容

我正在使用SQLAlchemy 1.0.0,并且要UPDATE ONLY批量进行一些查询(如果匹配主键则进行更新,否则不执行任何操作)。

我做了一些实验,发现批量更新看起来比批量插入或批量慢得多upsert

您能否帮我指出为什么它如此缓慢地工作,还是有其他替代方法/想法来制作BULK UPDATE (not BULK UPSERT) with SQLAlchemy

下表是MYSQL中的表:

CREATE TABLE `test` (
  `id` int(11) unsigned NOT NULL,
  `value` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

和测试代码:

from sqlalchemy import create_engine, text
import time

driver = 'mysql'
host = 'host'
user = 'user'
password = 'password'
database = 'database'
url = "{}://{}:{}@{}/{}?charset=utf8".format(driver, user, password, host, database)

engine = create_engine(url)
engine.connect()

engine.execute('TRUNCATE TABLE test')

num_of_rows = 1000

rows = []
for i in xrange(0, num_of_rows):
    rows.append({'id': i, 'value': i})

print '--------- test insert --------------'
sql = '''
    INSERT INTO test (id, value)
    VALUES (:id, :value)
'''
start = time.time()
engine.execute(text(sql), rows)
end = time.time()
print 'Cost {} seconds'.format(end - start)

print '--------- test upsert --------------'
for r in rows:
    r['value'] = r['id'] + 1

sql = '''
    INSERT INTO test (id, value)
    VALUES (:id, :value)
    ON DUPLICATE KEY UPDATE value = VALUES(value)
'''
start = time.time()
engine.execute(text(sql), rows)
end = time.time()
print 'Cost {} seconds'.format(end - start)

print '--------- test update --------------'
for r in rows:
    r['value'] = r['id'] * 10

sql = '''
    UPDATE test
    SET value = :value
    WHERE id = :id
'''
start = time.time()
engine.execute(text(sql), rows)
end = time.time()
print 'Cost {} seconds'.format(end - start)

num_of_rows = 100时的输出:

--------- test insert --------------
Cost 0.568960905075 seconds
--------- test upsert --------------
Cost 0.569655895233 seconds
--------- test update --------------
Cost 20.0891299248 seconds

num_of_rows = 1000时的输出:

--------- test insert --------------
Cost 0.807548999786 seconds
--------- test upsert --------------
Cost 0.584554195404 seconds
--------- test update --------------
Cost 206.199367046 seconds

数据库服务器的网络延迟约为500毫秒。

看起来像是批量更新,它一次又一次地发送并执行每个查询,而不是批量发送?

提前致谢。


问题答案:

即使数据库服务器(如您的情况)的延迟非常糟糕,您也可以使用技巧来加快批量更新操作的速度。不用直接更新表,而是使用 阶段表 非常快速地插入新数据,然后对
目标表 进行一次join-update 。这还有一个优点,您可以大大减少必须发送到数据库的语句数量。

UPDATE如何工作?

假设您有一个表,entries并且一直有新数据出现,但是您只想更新已存储的数据。您创建目标表的副本,entries_stage其中仅包含相关字段:

entries = Table('entries', metadata,
    Column('id', Integer, autoincrement=True, primary_key=True),
    Column('value', Unicode(64), nullable=False),
)

entries_stage = Table('entries_stage', metadata,
    Column('id', Integer, autoincrement=False, unique=True),
    Column('value', Unicode(64), nullable=False),
)

然后,使用大容量插入插入数据。如果您使用MySQL的多值插入语法,这可以进一步加快速度,SQLAlchemy本身不支持这种语法,但是构建起来并不难。

INSERT INTO enries_stage (`id`, `value`)
VALUES
(1, 'string1'), (2, 'string2'), (3, 'string3'), ...;

最后,使用阶段表中的值更新目标表的值,如下所示:

 UPDATE entries e
 JOIN entries_stage es ON e.id = es.id
 SET e.value = es.value;

这样就完成了。

插件呢?

当然,这也可以加快插入速度。由于您已经在 stage-table中 包含了数据,因此您所要做的就是发出一条INSERT INTO ... SELECT语句,使用尚未在 Destination-table中 的数据。

INSERT INTO entries (id, value)
SELECT FROM entries_stage es
LEFT JOIN entries e ON e.id = es.id
HAVING e.id IS NULL;

关于这样做的好处是,你没有做INSERT IGNOREREPLACE或者ON DUPLICATE KEY UPDATE,它
会增加你的主键,即使他们会做什么