• 23、分布式锁


    一、并发下的库存扣减

    1 - 并发问题模拟

    • inventory_srv/tests/inventory/main.go:修改为并发下的库存扣减
    package main
    
    import (
    	"context"
    	"fmt"
    	"google.golang.org/grpc"
    	"google.golang.org/grpc/credentials/insecure"
    	"sync"
    
    	"nd/inventory_srv/proto"
    	"nd/inventory_srv/tests"
    )
    
    var invClient proto.InventoryClient
    var conn *grpc.ClientConn
    
    func Init() {
    	var err error
    	conn, err = grpc.Dial(tests.TargetAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
    	if err != nil {
    		panic(err)
    	}
    	invClient = proto.NewInventoryClient(conn)
    }
    
    func TestSell(wg *sync.WaitGroup) {
    	/*
    		1. 第一件扣减成功: 第二件: 1. 没有库存信息 2. 库存不足
    		2. 两件都扣减成功
    	*/
    	defer wg.Done()
    	_, err := invClient.Sell(context.Background(), &proto.SellInfo{
    		GoodsInfo: []*proto.GoodsInvInfo{
    			{GoodsId: 1, Num: 1},
    			//{GoodsId: 422, Num: 30},
    		},
    	})
    	if err != nil {
    		panic(err)
    	}
    	fmt.Println("库存扣减成功")
    }
    
    func main() {
    	Init()
    	//var i int32
    	//for i = 1; i <= 9; i++ {
    	//	TestSetInv(i, 90)
    	//}
    
    	//TestInvDetail(2)
    	//TestSell()
    	//TestReback()
    
    	//并发情况之下,库存无法正确的扣减
    	var wg sync.WaitGroup
    	wg.Add(20)
    	for i := 0; i < 20; i++ {
    		go TestSell(&wg)
    	}
    
    	wg.Wait()
    	conn.Close()
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    在这里插入图片描述

    2 - 并发问题分析

    在这里插入图片描述

    3 - 锁解决并发问题

    • inventory_srv/handler/inventory.go:非完美方案
    var m sync.Mutex
    
    func (*InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*emptypb.Empty, error) {
    	// 扣减库存,本地事务
    	// 数据库基本的一个应用场景:数据库事务
    	// 并发情况之下 可能会出现超卖 1
    	tx := global.DB.Begin()
    	m.Lock() // 获取锁
    	for _, goodInfo := range req.GoodsInfo {
    		var inv model.Inventory
    		if result := global.DB.Where(&model.Inventory{Goods: goodInfo.GoodsId}).First(&inv); result.RowsAffected == 0 {
    			tx.Rollback() // 回滚之前的操作
    			return nil, status.Errorf(codes.InvalidArgument, "没有库存信息")
    		}
    		// 判断库存是否充足
    		if inv.Stocks < goodInfo.Num {
    			tx.Rollback() // 回滚之前的操作
    			return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
    		}
    		// 扣减,这里会出现数据不一致的问题
    		inv.Stocks -= goodInfo.Num
    		tx.Save(&inv) // 一旦使用了事务的,保存修改数据库的操作就需要使用事务的tx,而不能使用db
    	}
    	tx.Commit() // 需要自己手动提交操作
    	m.Unlock()
    	return &emptypb.Empty{}, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    二、分布式锁

    1 - 为什么需要分布式锁

    在这里插入图片描述

    2 - 常见的分布式锁方案

    • 有三种常见的分布式锁方案
      • 基于musql的悲观锁、乐观锁
      • 基于redis的分布式锁
      • 基于zookeeper的分布式锁

    三、基于mysql实现的分布式锁

    悲观锁与乐观锁是人们定义出来的概念,可以理解为一种思想,是处理并发资源的常用手段;
    不要把他们与mysql提供的锁机制(表锁、行锁、排它锁、共享锁)混为一谈

    1 - 悲观锁概念

    • 悲观锁概念:顾名思义,就是对于数据的处理持悲观态度,总认为会发生并发冲突,获取和修改数据时,别人会修改数据;所以在整个数据处理过程中,需要将数据锁定
    • 悲观锁的实现:通常依靠数据库提供的锁机制实现,比如mysql的排他锁,select … for update来实现悲观锁;例如,商品秒杀过程中,库存数量的减少,避免出现超卖的情况

    在这里插入图片描述

    2 - 悲观锁实现

    • mysql中的悲观锁实现for update
      • mysql请求一把锁 for update
      • 使用 for update的时候要注意:每个语句mysql都是默认提交的
      • 需要关闭autocommit:set autocommit=0;(注意这个只针对当前窗口有效,不是全局的);(查询select @@autocommit;
      • 具体执行逻辑:select * from inventary where goods=1 for update;
      • 释放锁:commit;
    • for update的本质
      • 其实是行锁,只会锁住满足条件的数据,where goods=1where goods=2这2个是不会触发锁的
      • 如果条件部分没有索引goods,那么行锁会升级成表锁
      • 锁只是锁住要更新的语句 for update,普通的查询不会锁住
      • 如果没有满足条件,不会锁表

    3 - gorm实现 for update

    • inventory_srv/handler/inventory.go
      • 去掉之前的mutex锁
      • tx := global.DB.Begin()这个就相当于关闭了autocommit
      • global.DB替换为tx.Clauses(clause.Locking{Strength: "UPDATE"})
    func (*InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*emptypb.Empty, error) {
    	// 扣减库存,本地事务
    	// 数据库基本的一个应用场景:数据库事务
    	// 并发情况之下 可能会出现超卖 1
    	tx := global.DB.Begin()
    	for _, goodInfo := range req.GoodsInfo {
    		var inv model.Inventory
    		if result := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where(&model.Inventory{Goods: goodInfo.GoodsId}).First(&inv); result.RowsAffected == 0 {
    			tx.Rollback() // 回滚之前的操作
    			return nil, status.Errorf(codes.InvalidArgument, "没有库存信息")
    		}
    		// 判断库存是否充足
    		if inv.Stocks < goodInfo.Num {
    			tx.Rollback() // 回滚之前的操作
    			return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
    		}
    		// 扣减,这里会出现数据不一致的问题
    		inv.Stocks -= goodInfo.Num
    		tx.Save(&inv) // 一旦使用了事务的,保存修改数据库的操作就需要使用事务的tx,而不能使用db
    	}
    	tx.Commit() // 需要自己手动提交操作
    	return &emptypb.Empty{}, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    4 - 乐观锁的概念

    • 乐观锁概念:乐观锁准确的说不是一种锁,而是解决数据不一致的方案
    • 乐观锁的实现原理

    在这里插入图片描述

    5 - gorm 实现mysql乐观锁

    • inventory_srv/handler/inventory.go
    func (*InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*emptypb.Empty, error) {
    	// 扣减库存,本地事务
    	// 数据库基本的一个应用场景:数据库事务
    	// 并发情况之下 可能会出现超卖 1
    	tx := global.DB.Begin()
    	for _, goodInfo := range req.GoodsInfo {
    		var inv model.Inventory
    
    		for {
    			if result := global.DB.Where(&model.Inventory{Goods: goodInfo.GoodsId}).First(&inv); result.RowsAffected == 0 {
    				tx.Rollback() // 回滚之前的操作
    				return nil, status.Errorf(codes.InvalidArgument, "没有库存信息")
    			}
    			// 判断库存是否充足
    			if inv.Stocks < goodInfo.Num {
    				tx.Rollback() // 回滚之前的操作
    				return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
    			}
    			// 扣减,这里会出现数据不一致的问题
    			inv.Stocks -= goodInfo.Num
    			//update inventory set stocks = stocks-1, version=version+1 where goods=goods and version=version
    			//这种写法有瑕疵,为什么?
    			//零值 对于int类型来说 默认值是0 这种会被gorm给忽略掉
    			if result := tx.Model(&model.Inventory{}).Select("Stocks", "Version").Where("goods = ? and version= ?",
    				goodInfo.GoodsId, inv.Version).Updates(model.Inventory{Stocks: inv.Stocks, Version: inv.Version + 1});
    				result.RowsAffected == 0 {
    				zap.S().Info("库存扣减失败")
    			} else {
    				break
    			}
    		}
    	}
    	tx.Commit() // 需要自己手动提交操作
    	return &emptypb.Empty{}, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    四、基于redis实现分布式锁

    1 - redsync方案测试

    业务开发的最常用的方案:基于redis实现分布式锁

    package main
    
    import (
    	"fmt"
    	goredislib "github.com/go-redis/redis/v8"
    	"github.com/go-redsync/redsync/v4"
    	"github.com/go-redsync/redsync/v4/redis/goredis/v8"
    	"sync"
    	"time"
    )
    
    func main() {
    	// Create a pool with go-redis (or redigo) which is the pool redisync will
    	// use while communicating with Redis. This can also be any pool that
    	// implements the `redis.Pool` interface.
    	client := goredislib.NewClient(&goredislib.Options{
    		Addr: "192.168.91.129:6379", // 这个自己修改成redis的ip地址
    	})
    	pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)
    
    	// Create an instance of redisync to be used to obtain a mutual exclusion
    	// lock.
    	rs := redsync.New(pool)
    
    	// Obtain a new mutex by using the same name for all instances wanting the
    	// same lock.
    	gNum := 2 // 启动2个协程
    
    	mutexname := "1" // 假设以goods的id来命名锁
    	var wg sync.WaitGroup
    	wg.Add(gNum)
    	for i := 0; i < gNum; i++ {
    		go func() {
    			defer wg.Done()
    			mutex := rs.NewMutex(mutexname)
    			//zookeeper的分布式锁 -
    			fmt.Println("开始获取锁")
    			if err := mutex.Lock(); err != nil {
    				panic(err)
    			}
    			fmt.Println("获取锁成功")
    			time.Sleep(time.Second * 3)
    			fmt.Println("开始释放锁")
    			if ok, err := mutex.Unlock(); !ok || err != nil {
    				panic("unlock failed")
    			}
    			fmt.Println("释放锁成功")
    		}()
    	}
    	wg.Wait()
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    在这里插入图片描述

    2 - 库存服务集成redsync

    • inventory_srv/handler/inventory.go
      • 待优化1:将redis的配置集成到nacos中
      • 待优化2:将redis的初始化分离出来
    func (*InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*emptypb.Empty, error) {
    	client := goredislib.NewClient(&goredislib.Options{
    		Addr: "192.168.91.129:6379",
    	})
    	pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)
    	rs := redsync.New(pool)
    
    	tx := global.DB.Begin()
    	for _, goodInfo := range req.GoodsInfo {
    		var inv model.Inventory
    		mutex := rs.NewMutex(fmt.Sprintf("goods_%d", goodInfo.GoodsId))
    		if err := mutex.Lock(); err != nil {
    			return nil, status.Errorf(codes.Internal, "获取redis分布式锁异常")
    		}
    		if result := global.DB.Where(&model.Inventory{Goods: goodInfo.GoodsId}).First(&inv); result.RowsAffected == 0 {
    			tx.Rollback() //回滚之前的操作
    			return nil, status.Errorf(codes.InvalidArgument, "没有库存信息")
    		}
    		//判断库存是否充足
    		if inv.Stocks < goodInfo.Num {
    			tx.Rollback() //回滚之前的操作
    			return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
    		}
    		//扣减, 会出现数据不一致的问题 - 锁,分布式锁
    		inv.Stocks -= goodInfo.Num
    		tx.Save(&inv)
    
    		if ok, err := mutex.Unlock(); !ok || err != nil {
    			return nil, status.Errorf(codes.Internal, "释放redis分布式锁异常")
    		}
    	}
    	tx.Commit() // 需要自己手动提交操作
    	//m.Unlock() //释放锁
    	return &emptypb.Empty{}, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    五、redsync深入原理

    1 - SetNX原理

    • 非原子操作产生的问题

    在这里插入图片描述

    • SetNX:Redis Setnx(SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值;将获取和设置合并成一个原子操作

    在这里插入图片描述

    • redis的SetNX底层源码
      在这里插入图片描述

    2 - 过期与延长过期锁

    • 死锁场景:考虑如果业务逻辑执行过程中挂了,或者是其他原因服务挂了,没有删除key,就会导致死锁
    • 死锁解决方案
      • 设置过期时间

    在这里插入图片描述

    • 问题:服务宕机:如果设置了过期时间,那么如果过期时间到了我的业务逻辑没有执行完怎么办
      • 在过期之前刷新一下
      • 需要自己启动协程完成延时的工作
        • 延迟的接口可能会带来负面的影响-如果其中一个服务hung住了,2s就能执行完的逻辑但是hung住之后就会导致一直去申请延长锁;这样会导致其他人永远获取不到锁,这个是非常致命的

    在这里插入图片描述

    3 - 锁的安全性

    • 分布式锁需要解决的问题
      • ①.互斥性 —— setnx
      • ②.死锁 —— 过期删除
      • ③.安全性 —— 锁只能被持有该锁的用户删除,不能被其他用户删除
        • 当时设置的value值是多少,只有当时获取到锁的go程知道
        • 在删除的时候取出redis中的值和当前保存的值对比就能知道是否能删除
          在这里插入图片描述
          在这里插入图片描述

    六、redlock

    1 - 基于redis分布式锁在集群中的问题

    在这里插入图片描述

    2 - redlock原理

    在这里插入图片描述

    • 时钟漂移:如果redis服务器的机器时钟发生了向前跳跃,就会导致这个key过早失效,比如客户端1拿到锁后,key的过期时间是12:02分,但redis服务器本身的时钟比客户端快了2分钟,导致key在12:00的时候就失效了,这时候如果客户端1还没有释放锁的话,就可能导致多个客户端同时持有同一把锁的问题

    在这里插入图片描述

    七、完整源码

    • 完整源码下载mxshop_srvsV8.7.rar
    • 源码说明:(nacos的ip配置自行修改,全局变量DEV_CONFIG设置:1=zsz,2=comp,3=home)
      • goods_srv/model/sql/mxshop_goods.sql:包含了建表语句
      • other_import/api.json:YApi的导入文件
      • other_import/nacos_config_export_user.zip:nacos的user配置集导入文件
      • other_import/nacos_config_export_goods.zip:nacos的goods配置集导入文件
      • other_import/nacos_config_export_inventory.zip:nacos的inventory的配置导入文件
  • 相关阅读:
    verilog 从入门到看得懂---matlab 自动生成verilog
    柯桥实用口语学习,韩语口头禅系列短句-恋爱篇
    SaaS的本质其实是和网络游戏一样的
    【Qt高阶】Linux安装了多个版本的Qt 部署Qt程序,出包【2023.10.17】
    一个完全纯净的windows资源站
    如何免费下载百度指数数据
    「Vue3」手把手教你使用 Vite 快速搭建项目
    mp3stego(mp3隐写工具)使用手册
    如何把JavaWeb项目部署到服务器
    海艺互娱与亚马逊云科技合作,在生成式AI领域探索更多的训练方向
  • 原文地址:https://blog.csdn.net/qq23001186/article/details/126271485