Lock Row for Update#

Overview#

在高并发的环境下, 我们要慎重考虑多个 Worker 对同一个 row 进行 Update 的情况. 为了方便说明, 我们将情况简化为 2 个 Worker A 和 B 同时对一个 row update 的情况.

在实际生产环境中, 常见的并发可以分为两种情况.

情况 1: 两个 worker 同时发起了一个 update 操作, 两个操作相差 0.001 秒到达数据库, 而这个 update 操作需要耗时 0.005 秒. 在这种情况下, 数据库引擎内部会将对同一个 row 的两个操作序列化执行. 换言之哪个请求先到就执行哪个, 然后再执行第二个. 这种情况比较简单, 数据库引擎本身就把冲突通过序列化解决了.

情况 2: 在这种情况下, 执行 update 之前我们需要先执行一次 get 获取目前的数据, 然后决定 update 的值应该是什么. 而从 get 到目前的值和真正 update 之间是有一段间隔的, 在这个期间其他人可能会对这个 row 进行 update 并生效, 导致你这个 update 从逻辑上不成立. 在这种情况下我们需要在 Application Code 层面做一些加锁操作进行处理.

本文介绍了如何用 sqlalchemy 正确处理情况 2.

How it Work#

在现代关系数据库系统中, 通常都会有 SELECT FOR UPDATE 这一手动加锁的语法. 用户可以用 SELECT 选定一部分 row, 所有被选定的 row 在一个 Transaction 的生命周期内会被加锁, 也就是当你 commit 或是 rollback 后锁才会取消. 这个动作也被称为 “获取锁”. 之所以叫它获取锁是因为这个动作是尝试获取锁, 如果不成功 (已经被其他客户端锁住了), 那么用户能选择几种处理方案中的一种. 默认是一直等到这个锁被释放, 也就是自己获得了一把新锁. 你还可以用 NOWAIT 来选择立刻抛出异常. 你还可以用 SKIP LOCKED 来自动跳过已经有锁的行.

例如我们在 postgres 的官方文档中 (查看 [ FOR { UPDATE | NO KEY UPDATE | SHARE | KEY SHARE } [ OF table_name [, ...] ] [ NOWAIT | SKIP LOCKED ] [...] ] 这一部分) 就可以看到这一语法的详细说明.

sqlalchemy 的官方文档 中我们可以看到这里的关键点有两点:

  1. with orm.Session().begin(): 或者类似的语法先创建一个 Transaction 的 context manager.

  2. 所有要竞争锁的人用 select(...).where(...).with_for_update(nowait=True) 语法获取锁. (建议用 no wait, 如果被锁住了则立刻抛出异常, 这适用于大多数情况).

总结一下. SELECT ... FOR UPDATE 是一种悲观锁的做法. 也就是手动将 row 锁住, 然后再开始干活.

Sample Code#

下面我们有两个程序, select_for_update_1.py 扮演先锁住 row 30 秒的程序, select_for_update_2.py 扮演后来尝试想获取锁的程序.

select_for_update_1.py
 1# -*- coding: utf-8 -*-
 2
 3import time
 4import sqlalchemy as sa
 5import sqlalchemy.orm as orm
 6import sqlalchemy_mate as sam
 7
 8Base = orm.declarative_base()
 9
10
11class Job(Base):
12    __tablename__ = "jobs"
13
14    id: orm.Mapped[str] = orm.mapped_column(sa.String, primary_key=True)
15    value: orm.Mapped[int] = orm.mapped_column(sa.Integer)
16
17
18engine = sam.EngineCreator(
19    username="postgres",
20    password="password",
21    database="postgres",
22    host="localhost",
23    port=40311,
24).create_postgresql_pg8000()
25Base.metadata.create_all(engine)
26
27
28# 清理所有表中的数据, 确保以一个干净的表作为开始
29with engine.connect() as conn:
30    conn.execute(Job.__table__.delete())
31    conn.commit()
32
33# 插入一条数据用于实验
34with orm.Session(engine) as ses:
35    job = Job(id="job-1", value=0)
36    ses.add(job)
37    ses.commit()
38
39# 用 with_for_update 获取一条锁, 并且在 30 秒内不会释放
40with orm.Session(engine) as ses:
41    with ses.begin():
42        # 加了 nowait 则当锁被占用时会立刻抛出异常
43        stmt = sa.select(Job).where(Job.id == "job-1").with_for_update(nowait=True)
44        # 如果不加 nowait, 则不会立刻抛出异常, 而是一直等到获得锁
45        # stmt = sa.select(Job).where(Job.id == "job-1").with_for_update()
46        job = ses.execute(stmt).scalar_one()
47        print(f"Got job {job.id = }, doing a long transaction ...")
48        time.sleep(30)
49        ses.commit()
50        print("Transaction finished")
select_for_update_2.py
 1# -*- coding: utf-8 -*-
 2
 3import sqlalchemy as sa
 4import sqlalchemy.orm as orm
 5import sqlalchemy_mate as sam
 6
 7Base = orm.declarative_base()
 8
 9
10class Job(Base):
11    __tablename__ = "jobs"
12
13    id: orm.Mapped[str] = orm.mapped_column(sa.String, primary_key=True)
14    value: orm.Mapped[int] = orm.mapped_column(sa.Integer)
15
16
17engine = sam.EngineCreator(
18    username="postgres",
19    password="password",
20    database="postgres",
21    host="localhost",
22    port=40311,
23).create_postgresql_pg8000()
24Base.metadata.create_all(engine)
25
26# 在一个 transaction 中尝试获取锁, 当然由于另一个程序已经在运行了, 所以肯定会失败.
27with orm.Session(engine) as ses:
28    with ses.begin():
29        stmt = sa.select(Job).where(Job.id == "job-1").with_for_update(nowait=True)
30        job = ses.execute(stmt).scalar_one()
31        print(f"before: {job.id = }, {job.value = }")  # this will print
32        job.value = 1
33        ses.commit()  # this will not work
34
35    job = ses.get(Job, "job-1")
36    print(f"after: {job.id = }, {job.value = }")