Optimistic Lock#

Overview#

乐观锁是一种处理在高并发情况下的冲突的策略. 简单来说就是当两个 worker 同时想对一条记录更新时候, 并且这个更新会涉及到先 get, 然后处理, 最后才更新. 也就是说这个更新的业务逻辑的周期比较长, 不是瞬间完成的, 完全有可能期间其他的 worker 把这条记录已经更新了.

与乐观锁对应的策略是悲观锁, 也就是每个 worker 开始干活之前先将这条记录锁上. 这样做是安全了, 但是由于上锁对于数据库来说是有额外的性能开销, 需要隔离和事务进行保证. 在高并发情况下会占用大量数据库资源.

而乐观锁的本质其实没有真正上锁. 它的核心策略是在数据库中加一个单调递增的版本号字段, 例如 version, 值可以是 1, 2, 3, … worker 干活之前先获得这条记录, 并且获得了当前版本. 干完活之后更新这条记录的时候, 使用 UPDATE ... SET version = ${old_version + 1} WHERE id = ${id} AND version = ${old_version} 确保只有数据库端的 version 和你之前获得的版本一致才进行更新. 并且这条记录的返回值还会告诉你更新到底有没有发生, 客户端也就能判断这个更新是成功了还是失败了.

注意, 虽然你数据库更新失败了, 但是你的 worker 可是事实在在的干了活的. 在逻辑上, 你更新失败了是应该把干的活页回滚. 但是大量真实世界的业务完美回滚都不容易, 因为你要考虑回滚到一半程序挂了的情况. 如果你的活是会改变外部系统的状态, 并且不能回滚或是回滚不成功的代价很大, 那么你就不应该使用乐观锁策略.

乐观锁还有一个问题是频繁竞争导致浪费了大量时间在竞争上. 比如很多用到了乐观锁的业务实现都会在当更新失败时候, 自动重新 get 再 update 一次. 但是这样其实逻辑上是有问题的, 我们没有说这个活要不要重新干一次. 如果不重新干了, 那是不是你逻辑上就不对了? 因为你是基于别人刚刚更新过的新数据干的活, 跟之前那次的活不一样. 如果重新干, 等于你之前的活白干了, 那么你实际上是浪费了干活的资源. 并且你还要考虑你的活的业务逻辑得满足幂等.

总之乐观锁是一个非常好的并发冲突解决策略, 但是一定要仔细考虑你的应用场景.

Sample Code#

下面我们有两个程序, optimistic_lock_1.py 扮演花 10 秒钟干活的程序, optimistic_lock_2.py 扮演 1 在干活的过程中把记录更新了的情况. 你会发现 10 秒后 worker 1 失败了.

optimistic_lock_1.py
 1# -*- coding: utf-8 -*-
 2
 3import time
 4import sqlalchemy as sa
 5import sqlalchemy.orm as orm
 6import sqlalchemy_mate as sam
 7
 8Base = orm.declarative_base()
 9
10
11class Job(Base):
12    """
13    version_id 就是用于乐观锁的属性.
14    """
15
16    __tablename__ = "jobs"
17
18    id: orm.Mapped[str] = orm.mapped_column(sa.String, primary_key=True)
19    version_id: orm.Mapped[int] = orm.mapped_column(sa.Integer)
20    value: orm.Mapped[int] = orm.mapped_column(sa.Integer)
21
22
23engine = sam.EngineCreator(
24    username="postgres",
25    password="password",
26    database="postgres",
27    host="localhost",
28    port=40311,
29).create_postgresql_pg8000()
30Base.metadata.create_all(engine)
31
32
33# 清理所有表中的数据, 确保以一个干净的表作为开始
34with engine.connect() as conn:
35    conn.execute(Job.__table__.delete())
36    conn.commit()
37
38# 插入一条数据用于实验
39with orm.Session(engine) as ses:
40    job = Job(id="job-1", version_id=0, value=0)
41    ses.add(job)
42    ses.commit()
43
44    job = ses.get(Job, "job-1")
45    # print(f"{job.id = }, {job.version_id = }, {job.value = }")
46
47with orm.Session(engine) as ses:
48    with ses.begin():
49        stmt = sa.select(Job).where(Job.id == "job-1")
50        job = ses.execute(stmt).scalar_one()
51        time.sleep(10)
52        # print(f"{job.id = }, {job.version_id = }, {job.value = }")
53        # 更新的时候也一并让 version + 1
54        new_version_id = job.version_id + 1
55        # 这里就是乐观锁的实现原理, 更新的时候要确认 version number 和自己的一致
56        # 如果不一致, 说明在 sleep 期间有其他人更新了这条记录
57        stmt = (
58            sa.update(Job)
59            .where(sa.and_(Job.id == job.id, Job.version_id == job.version_id))
60            .values(version_id=new_version_id, value=1)
61        )
62        res = ses.execute(stmt)
63        ses.commit()
64        if res.rowcount == 1:
65            print("optimistic lock update succeeded")
66        else:
67            raise ValueError("optimistic lock update failed")
68
69# 检验更新结果
70with orm.Session(engine) as ses:
71    job = ses.get(Job, "job-1")
72    print(f"{job.id = }, {job.version_id = }, {job.value = }")
optimistic_lock_2.py
 1# -*- coding: utf-8 -*-
 2
 3import sqlalchemy as sa
 4import sqlalchemy.orm as orm
 5import sqlalchemy_mate as sam
 6
 7Base = orm.declarative_base()
 8
 9
10class Job(Base):
11    __tablename__ = "jobs"
12
13    id: orm.Mapped[str] = orm.mapped_column(sa.String, primary_key=True)
14    version_id: orm.Mapped[int] = orm.mapped_column(sa.Integer)
15    value: orm.Mapped[int] = orm.mapped_column(sa.Integer)
16
17
18engine = sam.EngineCreator(
19    username="postgres",
20    password="password",
21    database="postgres",
22    host="localhost",
23    port=40311,
24).create_postgresql_pg8000()
25Base.metadata.create_all(engine)
26
27
28# 清理所有表中的数据, 确保以一个干净的表作为开始
29with engine.connect() as conn:
30    conn.execute(Job.__table__.delete())
31    conn.commit()
32
33# 插入一条数据用于实验
34with orm.Session(engine) as ses:
35    job = Job(id="job-1", version_id=0, value=0)
36    ses.add(job)
37    ses.commit()
38
39
40with orm.Session(engine) as ses:
41    with ses.begin():
42        stmt = sa.select(Job).where(Job.id == "job-1")
43        job = ses.execute(stmt).scalar_one()
44        new_version_id = job.version_id + 1
45        stmt = (
46            sa.update(Job)
47            .where(sa.and_(Job.id == job.id, Job.version_id == job.version_id))
48            .values(version_id=new_version_id, value=2)
49        )
50        res = ses.execute(stmt)
51        ses.commit()
52        if res.rowcount == 1:
53            print("optimistic lock update succeeded")
54        else:
55            print("optimistic lock update failed")
56
57with orm.Session(engine) as ses:
58    job = ses.get(Job, "job-1")
59    print(f"{job.id = }, {job.version_id = }, {job.value = }")