• 介绍分布式锁


    前言

    本文介绍了分布式锁遇到的问题及对应的解决方案。

    数据库更新问题

    在数据库中创建一个商品表,包含id、name、count字段,这里使用的peewee来操作数据库。

    from peewee import *
    
    db = SqliteDatabase('people.db')
    
    
    class Goods(Model):
        id = IntegerField()
        count = IntegerField()
        name = CharField()
    
        class Meta:
            database = db  # This model uses the "people.db" database.
    

    初始化数据表中数据。给id=1的商品初始化商品数量为100

    idnamecount
    1clothes100

    使用两个线程消费商品卖出的场景,每次消费数量为10,当商品数量充足时,商品数量减少10。因为程序可能会在任何地方暂停运行,我们使用time.Sleep来构造程序暂停的场景。

    import time
    
    def main():
        # 卖出的数量
        num = 10
    
        goods = Goods.get(Goods.id == 1)
        time.sleep(random.randint(1, 3))
        if goods.count < num:
            print("商品数量不足")
        else:
            goods.count -= num
            goods.save()
    
    if __name__ == '__main__':
        import threading
    
        t1 = threading.Thread(target=main)
        t2 = threading.Thread(target=main)
    
        t1.start()
        t2.start()
        t1.join()
        t2.join()
    
    '
    运行

    运行后会发现,商品数量变成了90,这明显不符合我们的预期。两个线程都消费了10个,预期结果应该是80才对。

    idnamecount
    1clothes90

    执行过程

    在t1线程查询到的goods的商品数量为100,保存在变量中,停止,然后t2线程开始查询,查询到的数量也是100,然后往下执行时,t1减10,调用save时是告诉数据库保存的数量为90,结束。t2线程也是100-10,save时,也是保存90。

    解决方案

    应该让数据库根据自己当前的值更新,而不是使用变量中的值进行更新。

    我们恢复商品的数量到100,然后修改代码如下,使用update来让数据库根据当前的值进行更新。

    def main():
        # 卖出的数量
        num = 10
    
        goods = Goods.get(Goods.id == 1)
        time.sleep(random.randint(1, 3))
        if goods.count < num:
            print("商品数量不足")
        else:
            query = Goods.update(count=Goods.count - num).where(Goods.id == 1)
            ok = query.execute()
            if ok:
                print("更新成功")
            else:
                print("更新失败")
    '
    运行

    运行结果符合我们的预期,商品数量变成了80。

    idnamecount
    1clothes80

    超卖问题

    虽然解决了更新数量不一致的问题,依然没有解决商品超卖问题。商品数量依然是100,但是我们两个线程都想买99件。

    def main():
        # 卖出的数量
        num = 99
    
        goods = Goods.get(Goods.id == 1)
        time.sleep(random.randint(1, 3))
        if goods.count < num:
            print("商品数量不足")
        else:
            query = Goods.update(count=Goods.count - num).where(Goods.id == 1)
            ok = query.execute()
            if ok:
                print("更新成功")
            else:
                print("更新失败")
    '
    运行

    运行结果是,两个线程都成功买入,数据库表中的数量变成了-98。我们肯定期望一个线程买入成功,而另一个线程执行失败。

    idnamecount
    1clothes-98

    加锁解决

    我们在买入之前,加一个锁,这样同时只能有一个用户在执行买入。

    import threading
    R = threading.Lock()
    
    def main():
        # 卖出的数量
        num = 99
    
        R.acquire()
        goods = Goods.get(Goods.id == 1)
        time.sleep(random.randint(1, 3))
        if goods.count < num:
            print("商品数量不足")
        else:
            query = Goods.update(count=Goods.count - num).where(Goods.id == 1)
            ok = query.execute()
            if ok:
                print("更新成功")
            else:
                print("更新失败")
    
        R.release()
    '
    运行

    运行之后,可以看到库存为1件,只有一个线程更新成功,另一个更新失败。

    当前是同一个服务的两个线程中,可以拿到通一把锁,但是如果在微服务中,每一次请求因为负载均衡可能请求在不同的服务中,这两个服务甚至不在同一台服务器上,那么这个锁就失效了。

    这个时候需要使用分布式锁来解决该问题。

    基于mysql乐观锁机制实现

    什么是乐观锁?

    乐观锁,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制,乐观锁适用于多读的应用类型,这样可以提高吞吐量

    实现

    从业务中实现乐观锁机制。

    在之前的Goods表中添加version字段

    class Goods(Model):
        id = IntegerField()
        count = IntegerField()
        name = CharField()
        version = IntegerField()
    
        class Meta:
            database = db  # This model uses the "people.db" database.
    

    数据表如下:

    idnamecountversion
    1clothes1001

    在更新条件中添加版本的判断,确认在更新库存数量时,是否有其他服务更改了该条记录,如果没有则进行更新。并且在更新库存时,给版本号+1,代表着该记录已被修改。

    如果没有更新成功,则一直重试,直至成功为止。

    def main():
        # 卖出的数量
        num = 99
    
        while True:
            goods = Goods.get(Goods.id == 1)
            time.sleep(random.randint(1, 3))
            if goods.count < num:
                print("商品数量不足")
                break
            else:
                query = Goods.update(count=Goods.count - num, version=Goods.version + 1).where(Goods.id == 1,
                                                                                               Goods.version == goods.version)
                ok = query.execute()
                if ok:
                    print("更新成功")
                    break
                else:
                    print("更新失败")
    '
    运行

    运行结果:

    更新成功
    更新失败
    商品数量不足
    

    库存剩下1件,版本号更新为2,符合我们的预期。

    idnamecountversion
    1clothes11


    优点:

    1. 简单
    2. 不需要额外的组件

    缺点:

    并发高时,不断的对数据库进行查询,一样会增加数据库的压力。性能差。

    基于mysql的悲观锁机制实现

    悲观锁,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。

    缺点:并发性不高,不建议使用

    redis分布式锁

    分布式锁需要解决的问题

    1. 互斥性,任何时刻只能有一个客户拥有锁,不能同时多个客户获取

    2. 安全性,只有被持有该锁的用户删除,而不能被其他用户删除

    3. 死锁,获取锁的客户单因为某些原因而宕机,而未能释放锁,其他客户端无法获取锁,需要有机制来避免该类问题的发生

      1. 代码异常,导致无法运行到release
      2. 你的当前服务器网络问题-脑裂

    抛出问题

    我创建一个redis锁的类,使用acquire加锁,release解锁。

    class Lock:
        def __init__(self, name):
            self.redis_client = redis.Redis(host="10.61.74.37")
            self.name = name
    
        def acquire(self):
            if not self.redis_client.get(self.name):
                self.redis_client.set(self.name, 1)
                return True
            else:
                while True:
                    import time
                    time.sleep(1)
                    if self.redis_client.get(self.name):
                        self.redis_client.set(self.name, 1)
                        return True
    
        def release(self):
            self.redis_client.delete(self.name)
    '
    运行

    在入口处加锁,在出口处释放锁,这样同时只有一个服务能够执行更新操作。

    def main():
        # 卖出的数量
        num = 99
        # 商品ID
        goods_id = 1
    
        lock = Lock("lock:goods_{}".format(goods_id))
        lock.acquire()
        goods = Goods.get(Goods.id == goods_id)
        time.sleep(random.randint(1, 3))
        if goods.count < num:
            print("商品数量不足")
        else:
            query = Goods.update(count=Goods.count - num).where(Goods.id == 1)
            ok = query.execute()
            if ok:
                print("更新成功")
            else:
                print("更新失败")
        lock.release()
    '
    运行

    运行之后,发现库存的数量是-98,没有达到预期的效果。

    idnamecount
    1clothes-98

    我通过打日志的方式,在redis_client.get之后和release中打日志。

    class Lock:
        def __init__(self, name):
            self.redis_client = redis.Redis(host="10.61.74.37")
            self.name = name
    
        def acquire(self):
    
            if not self.redis_client.get(self.name):
                print("acquire\n")
                self.redis_client.set(self.name, 1)
                return True
            else:
                while True:
                    import time
                    time.sleep(1)
                    if self.redis_client.get(self.name):
                        self.redis_client.set(self.name, 1)
                        return True
    
        def release(self):
            print("release")
            self.redis_client.delete(self.name)
    '
    运行

    运行结果如下:

    acquireacquire
    
    更新成功
    release
    更新成功
    release
    

    在没有释放锁的时候,两个线程竟然都拿到锁了?

    因为,线程t1在执行redis_client.get(self.name)之后还没有redis_client.set(self.name, 1)时,线程t2也进来到这一步了,也就是两个线程同时在self.redis_client.get(self.name)self.redis_client.set(self.name, 1)之间。

    我们需要保证get和set是原子性的,才能解决该问题。

    redis中原子操作setnx

    redis中自带了一个原子性操作setnx,可以进行查询并更新。

    class Lock:
        def __init__(self, name):
            self.redis_client = redis.Redis(host="10.61.74.37")
            self.name = name
    
        def acquire(self):
    #       # 如果不存在,设置值为1,返回1. 否则返回0. 原子操作。
            if self.redis_client.setnx(self.name, 1):
                return True
            else:
                while True:
                    import time
                    time.sleep(1)
                    if self.redis_client.setnx(self.name, 1):
                        return True
    
        def release(self):
            self.redis_client.delete(self.name)
    '
    运行

    运行后,库存数量为1,符合我们的预期。

    idnamecount
    1clothes1

    死锁问题

    获取锁的客户单因为某些原因而宕机,而未能释放锁,其他客户端无法获取锁,需要有机制来避免该类问题的发生

    1. 代码异常,导致无法运行到release
    2. 断点


    解决方案:

    通过设置过期时间来解决,每次在拿锁时,给redis中对应的key设置一个过期时间,即使出现上面的问题,key也能自动被删除,解决死锁问题。

    class Lock:
        def __init__(self, name):
            self.redis_client = redis.Redis(host="10.61.74.37")
            self.name = name
    
        def acquire(self):
            if self.redis_client.set(self.name, 1, nx=True, ex=15):
                return True
            else:
                while True:
                    import time
                    time.sleep(1)
                    if self.redis_client.set(self.name, 1, nx=True, ex=15):
                        return True
    '
    运行

    但是会有新问题:

    • 当前线程如果在一段时间后没有执行完,当前的程序没有执行完,然后key过期
    • 不安全,另一个线程进来以后会将当前的key给删掉,另一个线程删掉了本该属于我设置的值。

    解决方案:

    如果当前线程没有执行完,那我的这个线程还应该在适当的时候去续租,将过期时间重新设置。一般是在快要过期的2/3的时候去续租。定时程序可以使用另一个线程去完成。

    class Lock:
        def __init__(self, name):
            self.redis_client = redis.Redis(host="10.61.74.37")
            self.name = name
    
        def acquire(self):
            if self.redis_client.set(self.name, 1, nx=True, ex=15):
                # 启动一个线程然后去定时的刷新这个过期,这个操作最好也是使用lua脚本来完成。
                return True
            else:
                while True:
                    import time
                    time.sleep(1)
                    if self.redis_client.set(self.name, 1, nx=True, ex=15):
                        return True
    '
    运行


    如何防止我设置的值被其他的线程给删除掉?

    解决方法

    可以拿锁的时候生成一个ID,并将其设置redis中键对应的值,在删除的时候,判断从redis中拿出的值是否为该程序设置的ID,如果不是,则删除失败。

    class Lock:
        def __init__(self, name, id=None):
            self.redis_client = redis.Redis(host="10.61.74.37")
            self.name = name
            self.id = id if id else str(uuid.uuid4())
    
        def acquire(self):
            if self.redis_client.set(self.name, self.id, nx=True, ex=15):
                # 启动一个线程然后去定时的刷新这个过期,这个操作最好也是使用lua脚本来完成。
                return True
            else:
                while True:
                    import time
                    time.sleep(1)
                    if self.redis_client.set(self.name, self.id, nx=True, ex=15):
                        return True
    
        def release(self):
            val = str(self.redis_client.get(self.name), encoding="utf8")
            if val == self.id:
                self.redis_client.delete(self.name)
            else:
                print("不能删除自己的锁")
    '
    运行


    但是还会有新的问题,上面的release方法,get和delete redis中key分成了两个步骤,还是有可能在两者之间中断,所以需要使用redis的lua脚本来实现两者的原子操作

    py-redis-lock和redis-py

    该库是开源的分布式锁py实现库,解决了上面的问题。后面有空可以分析下该库的源码。

    redis的分布式锁优缺点

    优点

    • 性能高
    • 简单
    • redis本身使用很频繁,不需要额外维护

    缺点

    • 依赖了第三方组件

    • 单机的redis挂掉的可能性相对较高,需要引入哨兵机制

    • redis的cluster的引入会导致刚才的redis的锁会有问题 - redlock

    欢迎关注,互相学习,共同进步~

    我的个人博客

    我的微信公众号:编程黑洞

  • 相关阅读:
    vue2不同版本下如何分环境打包
    深度解析:Stable Diffusion中negative prompt是如何作用的?
    macOS Big Sur(macos11版本)
    Excel转换为Lua的配置文件
    UE5、CesiumForUnreal实现加载GeoJson绘制单面(Polygon)功能(StaticMesh方式)
    T31开发笔记:metaipc测试
    16 Linux 内核定时器
    【Python从入门到进阶】34、selenium基本概念及安装流程
    Python常用基础语法知识点大全
    Vulnhub靶机:CEREAL_ 1
  • 原文地址:https://blog.csdn.net/qq_22918243/article/details/127062941