Optimistic Lock#
Overview#
乐观锁是一种处理在高并发情况下的冲突的策略. 简单来说就是当两个 worker 同时想对一条记录更新时候, 并且这个更新会涉及到先 get, 然后处理, 最后才更新. 也就是说这个更新的业务逻辑的周期比较长, 不是瞬间完成的, 完全有可能期间其他的 worker 把这条记录已经更新了.
与乐观锁对应的策略是悲观锁, 也就是每个 worker 开始干活之前先将这条记录锁上. 这样做是安全了, 但是由于上锁对于数据库来说是有额外的性能开销, 需要隔离和事务进行保证. 在高并发情况下会占用大量数据库资源.
而乐观锁的本质其实没有真正上锁. 它的核心策略是在数据库中加一个单调递增的版本号字段, 例如 version, 值可以是 1, 2, 3, … worker 干活之前先获得这条记录, 并且获得了当前版本. 干完活之后更新这条记录的时候, 使用 UPDATE ... SET version = ${old_version + 1} WHERE id = ${id} AND version = ${old_version} 确保只有数据库端的 version 和你之前获得的版本一致才进行更新. 并且这条记录的返回值还会告诉你更新到底有没有发生, 客户端也就能判断这个更新是成功了还是失败了.
注意, 虽然你数据库更新失败了, 但是你的 worker 可是事实在在的干了活的. 在逻辑上, 你更新失败了是应该把干的活页回滚. 但是大量真实世界的业务完美回滚都不容易, 因为你要考虑回滚到一半程序挂了的情况. 如果你的活是会改变外部系统的状态, 并且不能回滚或是回滚不成功的代价很大, 那么你就不应该使用乐观锁策略.
乐观锁还有一个问题是频繁竞争导致浪费了大量时间在竞争上. 比如很多用到了乐观锁的业务实现都会在当更新失败时候, 自动重新 get 再 update 一次. 但是这样其实逻辑上是有问题的, 我们没有说这个活要不要重新干一次. 如果不重新干了, 那是不是你逻辑上就不对了? 因为你是基于别人刚刚更新过的新数据干的活, 跟之前那次的活不一样. 如果重新干, 等于你之前的活白干了, 那么你实际上是浪费了干活的资源. 并且你还要考虑你的活的业务逻辑得满足幂等.
总之乐观锁是一个非常好的并发冲突解决策略, 但是一定要仔细考虑你的应用场景.
Sample Code#
下面我们有两个程序, optimistic_lock_1.py 扮演花 10 秒钟干活的程序, optimistic_lock_2.py 扮演 1 在干活的过程中把记录更新了的情况. 你会发现 10 秒后 worker 1 失败了.
optimistic_lock_1.py
1# -*- coding: utf-8 -*-
2
3import time
4import sqlalchemy as sa
5import sqlalchemy.orm as orm
6import sqlalchemy_mate as sam
7
8Base = orm.declarative_base()
9
10
11class Job(Base):
12 """
13 version_id 就是用于乐观锁的属性.
14 """
15
16 __tablename__ = "jobs"
17
18 id: orm.Mapped[str] = orm.mapped_column(sa.String, primary_key=True)
19 version_id: orm.Mapped[int] = orm.mapped_column(sa.Integer)
20 value: orm.Mapped[int] = orm.mapped_column(sa.Integer)
21
22
23engine = sam.EngineCreator(
24 username="postgres",
25 password="password",
26 database="postgres",
27 host="localhost",
28 port=40311,
29).create_postgresql_pg8000()
30Base.metadata.create_all(engine)
31
32
33# 清理所有表中的数据, 确保以一个干净的表作为开始
34with engine.connect() as conn:
35 conn.execute(Job.__table__.delete())
36 conn.commit()
37
38# 插入一条数据用于实验
39with orm.Session(engine) as ses:
40 job = Job(id="job-1", version_id=0, value=0)
41 ses.add(job)
42 ses.commit()
43
44 job = ses.get(Job, "job-1")
45 # print(f"{job.id = }, {job.version_id = }, {job.value = }")
46
47with orm.Session(engine) as ses:
48 with ses.begin():
49 stmt = sa.select(Job).where(Job.id == "job-1")
50 job = ses.execute(stmt).scalar_one()
51 time.sleep(10)
52 # print(f"{job.id = }, {job.version_id = }, {job.value = }")
53 # 更新的时候也一并让 version + 1
54 new_version_id = job.version_id + 1
55 # 这里就是乐观锁的实现原理, 更新的时候要确认 version number 和自己的一致
56 # 如果不一致, 说明在 sleep 期间有其他人更新了这条记录
57 stmt = (
58 sa.update(Job)
59 .where(sa.and_(Job.id == job.id, Job.version_id == job.version_id))
60 .values(version_id=new_version_id, value=1)
61 )
62 res = ses.execute(stmt)
63 ses.commit()
64 if res.rowcount == 1:
65 print("optimistic lock update succeeded")
66 else:
67 raise ValueError("optimistic lock update failed")
68
69# 检验更新结果
70with orm.Session(engine) as ses:
71 job = ses.get(Job, "job-1")
72 print(f"{job.id = }, {job.version_id = }, {job.value = }")
optimistic_lock_2.py
1# -*- coding: utf-8 -*-
2
3import sqlalchemy as sa
4import sqlalchemy.orm as orm
5import sqlalchemy_mate as sam
6
7Base = orm.declarative_base()
8
9
10class Job(Base):
11 __tablename__ = "jobs"
12
13 id: orm.Mapped[str] = orm.mapped_column(sa.String, primary_key=True)
14 version_id: orm.Mapped[int] = orm.mapped_column(sa.Integer)
15 value: orm.Mapped[int] = orm.mapped_column(sa.Integer)
16
17
18engine = sam.EngineCreator(
19 username="postgres",
20 password="password",
21 database="postgres",
22 host="localhost",
23 port=40311,
24).create_postgresql_pg8000()
25Base.metadata.create_all(engine)
26
27
28# 清理所有表中的数据, 确保以一个干净的表作为开始
29with engine.connect() as conn:
30 conn.execute(Job.__table__.delete())
31 conn.commit()
32
33# 插入一条数据用于实验
34with orm.Session(engine) as ses:
35 job = Job(id="job-1", version_id=0, value=0)
36 ses.add(job)
37 ses.commit()
38
39
40with orm.Session(engine) as ses:
41 with ses.begin():
42 stmt = sa.select(Job).where(Job.id == "job-1")
43 job = ses.execute(stmt).scalar_one()
44 new_version_id = job.version_id + 1
45 stmt = (
46 sa.update(Job)
47 .where(sa.and_(Job.id == job.id, Job.version_id == job.version_id))
48 .values(version_id=new_version_id, value=2)
49 )
50 res = ses.execute(stmt)
51 ses.commit()
52 if res.rowcount == 1:
53 print("optimistic lock update succeeded")
54 else:
55 print("optimistic lock update failed")
56
57with orm.Session(engine) as ses:
58 job = ses.get(Job, "job-1")
59 print(f"{job.id = }, {job.version_id = }, {job.value = }")