• 1 分布式锁


    1、什么是锁

    场景描述

            锁在JAVA中是一个非常重要的概念,尤其是在当今的互联网时代,高并发的场景,更是离不开锁。那么锁到底是什么呢?在计算机科学中,锁(lock)或互斥(mutex)是一种同步机制,用于在有许多执行线程的环境中强制对资源的访问限制。锁旨在强制实施互斥排他、并发控制策略。咱们举一个生活中的例子:大家都去过超市买东西,如果你随身带了包呢,要放到储物柜里。咱们把这个例子再极端一下,假如柜子只有一个,现在同时来了3个人A,B,C都要往这个柜子里放东西。这个场景就构造了一个多线程,多线程自然离不开锁。如下图所示:

            A,B,C都要往柜子里放东西,可是柜子只能放一件东西,那怎么办呢?这个时候就引出了锁的概念,3个人谁抢到了柜子的锁,谁就可以使用这个柜子,其他人只能等待。比如:C抢到了锁,C可以使用这个柜子。A和B只能等待,等C使用完了,释放锁以后,A和B再争抢锁,谁抢到了,再继续使用柜子。

    代码实例

            我们再将上面的场景反映到程序中,首先创建一个柜子的类:

    1. public class Cabinet {
    2. //柜子中存储的数字
    3. private int storeNumber;
    4. public void setStoreNumber(int storeNumber){
    5. this.storeNumber = storeNumber;
    6. }
    7. public int getStoreNumber(){
    8. return this.storeNumber;
    9. }
    10. }

            柜子中存储的是数字。

            然后我们将3个用户抽象成一个类:

    1. public class User {
    2. //柜子
    3. private Cabinet cabinet;
    4. //存储的数字
    5. private int storeNumber;
    6. public User(Cabinet cabinet,int storeNumber){
    7. this.cabinet = cabinet;
    8. this.storeNumber = storeNumber;
    9. }
    10. //使用柜子
    11. public void useCabinet(){
    12. cabinet.setStoreNumber(storeNumber);
    13. }
    14. }

            在用户的构造方法中,需要传入两个参数,一个是要使用的柜子,另一个是要存储的数字。到这里,柜子和用户都已经抽象成了类,接下来我们再写一个启动类,模拟一下3个用户使用柜子的场景:

    1. public class Starter {
    2. public static void main(String[] args){
    3. Cabinet cabinet = new Cabinet();
    4. ExecutorService es = Executors.newFixedThreadPool(3);
    5. for (int i = 0; i < 3; i++){
    6. final int storeNumber = i;
    7. es.execute(()->{
    8. User user = new User(cabinet,storeNumber);
    9. user.useCabinet();
    10. System.out.println("我是用户"+storeNumber+",我存储的数字是:"+cabinet.getStoreNumber());
    11. });
    12. }
    13. es.shutdown();
    14. }
    15. }

            我们仔细看一下这个main函数的过程:

    • 首先创建一个柜子的实例,由于场景中只有一个柜子,所以我们只创建了一个柜子实例。
    • 然后我们新建了一个线程池,线程池中有3个线程,每个线程执行一个用户的操作。
    • 再来看看每个线程具体的执行过程,新建用户实例,传入的是用户使用的柜子,我们这里只有一个柜子,所以传入这个柜子的实例,然后传入这个用户要存储的数字,分别是1,2,3,也分别对应着用户A,用户B和用户C。
    • 再调用使用柜子的操作,也就是向柜子中放入要存储的数字,然后立即从柜子中取出数字并打印出来。

            我们运行一下main函数,看看打印的结果是什么?

    1. 我是用户0,我存储的数字是:2
    2. 我是用户2,我存储的数字是:2
    3. 我是用户1,我存储的数字是:2

            从结果中我们可以看出,3个用户在柜子中存储的数字都变成了2。我们再次运行程序,结果如下:

    1. 我是用户1,我存储的数字是:1
    2. 我是用户2,我存储的数字是:1
    3. 我是用户0,我存储的数字是:1

            这次又变成了1。这是为什么呢?问题就出在user.useCabinet()这个方法上,这是因为柜子这个实例没有加锁的原因,3个用户并行的执行,向柜子中存储他们的数字,虽然是3个用户并行的同时操作,但是在具体赋值时,也是有顺序的,因为变量storeNumber只占有一块内存,storeNumber只存储一个值,存储最后的线程所设置的值。至于哪个线程排在最后,则完全不确定。赋值语句完成后,进入到打印语句,打印语句取storeNumber的值并打印,这时storeNumber存储的是最后一个线程所设置的值,3个线程取到的值是相同的,就像上面打印的结果一样。

            那么如何解决这个文体呢?这就引出了我们要讲解的重点内容——锁。我们在赋值语句上加锁,这样当多个线程(本文当中的多个用户)同时赋值时,谁抢到了这把锁,谁才能赋值。这样保证同一时刻只能有一个线程进行赋值操作,避免了之前的混乱的情况。

            那个在程序中如何加锁呢?这就要使用JAVA中的一个关键字——synchronized。synchronized分为synchronized方法和synchronized同步代码块。下面我们看一下两者的具体用法:

    • synchronized方法,顾名思义,是吧synchronized关键字写在方法上,它表示这个方法是加了所的,当多个线程同时调用这个方法时,只有获得锁的线程才可以执行。我们看一下下面的例子:
    1. public synchronized String getTicket(){
    2. return "xxx";
    3. }

            我们可以看到getTicket()方法加了锁,当多个线程并发执行的时候,只有获得锁的线程才可以执行,其他线程只能等待。

    • 我们再来看看synchronized块,synchronized块语法是:
    1. synchronized (对象锁){
    2. ……
    3. }

            我们将需要加锁的语句都写在synchronized块内,而在对象锁的位置,需要填写锁的对象,他的含义是,当多个线程并发执行时,只有获得你写的这个对象的锁,才能执行后面的语句,其他的线程只能等待。synchronized块通常的写法是synchronized(this),这个this是当前类的实例,也就是说获得当前这个类的对象的锁,才能执行这个方法,这样写的效果和synchronized方法时一样的。

            再回到我们的示例当中,如何解决storeNumber混乱的问题呢?咱们可以在设置storeNumber的方法上加锁,这样保证同时只有一个线程能调用这个方法。如下所示:

    1. public class Cabinet {
    2. //柜子中存储的数字
    3. private int storeNumber;
    4. public synchronized void setStoreNumber(int storeNumber){
    5. this.storeNumber = storeNumber;
    6. }
    7. public int getStoreNumber(){
    8. return this.storeNumber;
    9. }
    10. }

            我们在set方法上加了synchronized关键字,这样在存储数字时,就不会并行的执行了,而是哪个用户抢到锁,哪个用户执行存储数字的方法。我们再运行一下main函数,看看运行的结果:

    1. 我是用户1,我存储的数字是:1
    2. 我是用户2,我存储的数字是:1
    3. 我是用户0,我存储的数字是:1

            咦?结果还是混乱的,为什么?我们再检查一下代码:

    1. es.execute(()->{
    2. User user = new User(cabinet,storeNumber);
    3. user.useCabinet();
    4. System.out.println("我是用户"+storeNumber+",我存储的数字是:"+cabinet.getStoreNumber());
    5. });

            我们可以看到在useCabinet和打印的方法是两个语句,并没有保持原子性,虽然在set方法上加了锁,但是在打印时又存在了一个并发,打印语句是有锁的,但是不能确定哪个线程去执行。所以这里,我们要保证useCabinet和打印的方法的原子性,我们使用synchronized块,但是synchronized块里的对象我们使用谁的?这又是一个问题,user还是cabinet?当然是cabinet,因为每个每个线程都初始化了user,总共有3个user对象了,而cabinet对象只有一个,所以synchronized要用cabinet对象。如下:

    1. synchronized (cabinet){
    2. user.useCabinet();
    3. System.out.println("我是用户"+storeNumber+",我存储的数字是:"+cabinet.getStoreNumber());
    4. }

            我们再去运行一下:

    1. 我是用户1,我存储的数字是:1
    2. 我是用户2,我存储的数字是:2
    3. 我是用户0,我存储的数字是:0

            由于我们加了synchronized块,保证了存储和取出的原子性,这样用户存储的数字和取出的数字就对应上了,不会造成混乱。

            最后我们通过一张图说明一下上面的整体情况:

             如上图所示,线程A,线程B,线程C同时调用cabinet类的setStoreNumber方法,线程B获得了锁,所以线程B可以执行setStoreNumber的方法,线程A和线程C只能等待。

    2、Java中单体应用锁的局限性&分布式锁

            前面内容中讲到的锁都是由JDK官方提供的锁的解决方案,也就是说这些锁只能在一个JVM进程内有效,我们把这种锁叫做单体应用锁。但是,在互联网告诉发展的今天,单体应用锁能够满足我们的需求吗?

    互联网系统架构的演进

            在互联网系统发展之初,系统比较简单,消耗资源小,用户访问量也比较少,我们只部署一个Tomcat应用就可以满足需求。系统架构图如下:

             一个Tomcat可以看做是一个JVM进程,当大量的请求并发到达系统时,所有的请求都落在这唯一的一个Tomcat上,如果某些请求方法是需要加锁的,比如:秒杀扣减库存,是可以满足需求的,这和我们前面章节所讲的内容是一样的。但是随着访问量的增加,导致一个Tomcat难以支撑,这时我们就要集群部署Tomcat,使用多个Tomcat共同支撑整个系统。系统架构图如下:

             上图中,我们部署了两个Tomcat,共同支撑系统。当一个请求到达系统时,首先会经过Nginx,Nginx主要是做负载转发的,它会根据自己配置的负载均衡策略将请求转发到其中一个Tomcat中。当大量的请求并发访问时,两个Tomcat共同承担所有的访问量,这时,我们同样在秒杀扣减库存的场景中,使用单体应用锁还能满足要求吗?

    单体应用锁的局限性

            如上图所示,在整个系统架构中,存在两个Tomcat,每个Tomcat是一个JVM。在进行秒杀业务的时候,由于大家都在抢购秒杀商品,大量的请求同时到达系统,通过Nginx分发到两个Tomcat上。我们通过一个极端的案例场景,可以更好地理解单体应用的局限性。假如,秒杀商品的数量只有1个,这时,这些大量的请求当中,只有一个请求可以成功的抢到这个商品,这就需要在扣减库存的方法上加锁,扣减库存的动作只能一个一个去执行,而不能同时去执行,如果同时执行,这1个商品可能同时被多个人抢到,从而产生超卖现象。加锁之后,扣减库存的动作一个一个去执行,凡是将库存扣减为负数的,都抛出异常,提示该用户没有抢到商品。通过加锁看似解决了秒杀的问题,但是事实真的是这样吗?

            我们看到系统中存在两个Tomcat,我们加的锁是JDK官方提供的锁,这种锁只能在一个JVM下起作用,也就是在一个Tomcat内是没问题的。当存在两个或两个以上的Tomcat时,大量的并发请求分散到不同的Tomcat上,在每一个Tomcat中都可以防止并发的产生,但是在多个Tomcat之间,每个Tomcat中获得的这个请求,又产生了并发,从而产生超卖现象。这也就是单体应用锁的局限性,它只能在一个JVM内加锁,而不能从这个应用层面去加锁。

            那么这个问题如何解决呢?这就需要使用分布式锁了,在整个应用层面去加锁。什么是分布式锁呢?我们怎么去使用分布式锁呢?

    什么是分布式锁

            在说分布式锁之前,我们看一下单体应用锁的特点,单体应用锁是在一个JVM进程内有效,无法跨JVM、跨进程。那么分布式锁的定义就出来了,分布式锁就是可以跨越多个JVM、跨越多个进程的锁,这种锁就叫做分布式锁。

    分布式锁的设计思路

             在上图中,由于Tomcat是由Java启动的,所以每个Tomcat可以看出一个JVM,JVM内部的锁是无法跨越多个进程的。所以,我们要实现分布式锁,我们只能在这些JVM之外去寻找,通过其他的组件来实现分布式锁。系统架构如下图所示:

             两个Tomcat通过第三方的组件实现跨JVM、跨进程的分布式锁。这就是分布式锁的解决思路,找到所有JVM可以共同访问的第三方组件,通过第三方组件实现分布式锁。

    目前存在的分布式的解决方案

            分布式锁都是通过第三方组件来实现的,目前比较流行的分布式锁的解决方案有:

    • 数据库,通过数据库可以实现分布式分布式锁,但是在高并发的情况下对数据库压力比较大,所以很少使用。
    • Redis,借助Redis也可以实现分布式锁,而且Redis的Java客户端种类很多,使用的方法也不尽相同。
    • Zookeeper,Zookeeper也可以实现分布式锁,同样Zookeeper也存在多个Java客户端,使用方法也不相同。

    3、Java中锁的解决方案

    乐观锁与悲观锁

            乐观锁与悲观锁应该是每个开发人员最先接触的两种锁。小编最早接触的就是这两种锁,但是不是在Java中接触的,而是在数据库当中。当时的应用场景主要是在更新数据的时候,更新数据这个场景也是使用锁的非常主要的场景之一。更新数据的主要流程如下:

    1. 检索出要更新的数据,供操作人员查看;
    2. 操作人员更改需要修改的数据;
    3. 点击保存,更新数据。

            这个流程看似简单,但是我们用多线程的思维去思考,这也应该算是一种互联网思维吧,就会发现其中隐藏着问题。我们具体看一下:

    1. A检测出数据;
    2. B检测出数据;
    3. B修改了数据;
    4. A修改数据,系统会修改成功吗?

            当然啦,A修改成功与否,要看程序怎么写。咱们抛开程序,从常理考虑,A保存数据的时候,系统要给提示,说“您修改的数据已被其他人修改过,请重新查询确认”。那么我们程序中要怎么实现呢?

    1. 在检索数据时,将数据的版本号(version)或者最后更新时间一并检索出来;
    2. 操作员更改数据以后,点击保存,在书库执行update操作;
    3. 执行update操作时,用步骤1检索出来的版本号或者最后的更新时间与数据库中的记录作比较;
    4. 如果版本号或者最后更新时间一致,则可以更新;
    5. 如果不一致,就要给出上面的提示;

            上述的流程就是乐观锁的实现方式。在Java中乐观锁并没有确定的方法,或者关键字,他只是一个处理流程、策略。咱们看懂上面的例子之后,再来看看Java中乐观锁。

            乐观锁,它是假设一个线程在获取数据的时候不会被其他线程更改数据,就像上面的例子那样,但是在更新数据的时候会校验数据有没有被修改过。它是一种比较交换的机制,简称CAS(Compare And Swap)机制。一旦检测到有冲突产生,也就是上面说到的版本号或者最后更新时间不一致,它是会进行重试,直到没有冲突为止。

    乐观锁的机制如图所示:

             咱们看一下Java中最常见的i++,咱们思考一个问题,i++它的执行顺序是什么样子的?它是线程安全的吗?当多个线程并发执行i++的时候,会不会有问题?接下来咱们通过程序看一下:

    1. public class Test {
    2. private int i=0;
    3. public static void main(String[] args) {
    4. Test test = new Test();
    5. //线程池:50个线程
    6. ExecutorService es = Executors.newFixedThreadPool(50);
    7. //闭锁
    8. CountDownLatch cdl = new CountDownLatch(5000);
    9. for (int i = 0;i < 5000; i++){
    10. es.execute(()->{
    11. test.i++;
    12. cdl.countDown();
    13. });
    14. }
    15. es.shutdown();
    16. try {
    17. //等待5000个任务执行完成后,打印出执行结果
    18. cdl.await();
    19. System.out.println("执行完成后,i="+test.i);
    20. } catch (InterruptedException e) {
    21. e.printStackTrace();
    22. }
    23. }
    24. }

            上面的程序中,我们模拟了50个线程同时执行i++,总共执行5000次,按照常规的理解,得到的结果应该是5000,我们运行一下程序,看看之心结果如何:

    1. 执行完成后,i=4975
    2. 执行完成后,i=4986
    3. 执行完成后,i=4971

            这时我们运行3次以后得到的结果,可以看到每次执行的结果都不一样,而且不是5000,这是为什么?这就说明i++并不是一个原子性的操作,在多线程的情况下并不安全。我们把i++的详细执行步骤拆解一下:

    1. 从内存中取出i的当前值;
    2. 将i的值加1;
    3. 将计算好的值放入到内存当中;

            这个流程和我们上面讲解的数据库的操作流程是一样的。在多线程的场景下,我们可以想象一下,线程A和线程B同时从内存中取出i的值,假如i的值是1000,然后线程A和线程B再同时执行+1的操作,然后把值再放入内存当中,这时,内存中的值是1001,而我们期望的值是1002,正是这个原因,导致了上面的错误。那么我们如何解决呢?在Java1.5以后,JDK官方提供了大量的原子类,这些类的内部都是基于CAS机制的,也就是使用了乐观锁。我们将上面的程序稍微改造一下,如下:

    1. public class Test {
    2. private AtomicInteger i = new AtomicInteger(0);
    3. public static void main(String[] args) {
    4. Test test = new Test();
    5. ExecutorService es = Executors.newFixedThreadPool(50);
    6. CountDownLatch cdl = new CountDownLatch(5000);
    7. for (int i = 0;i < 5000; i++){
    8. es.execute(()->{
    9. test.i.incrementAndGet();
    10. cdl.countDown();
    11. });
    12. }
    13. es.shutdown();
    14. try {
    15. cdl.await();
    16. System.out.println("执行完成后,i="+test.i);
    17. } catch (InterruptedException e) {
    18. e.printStackTrace();
    19. }
    20. }
    21. }

            我们将变量i的类型改为AtomicInteger,AtomicInteger是一个原子类。我们在之前调用i++的地方改成了i.incrementAndGet(),incrementAndGet()方法采用了CAS机制,也就是说使用了乐观锁。我们再运行一下程序,看看结果如何。

    1. 执行完成后,i=5000
    2. 执行完成后,i=5000
    3. 执行完成后,i=5000

            我们同样执行了3次,3次的结果都是5000,符合了我们预期。这个就是乐观锁。我们对乐观锁稍加总结,乐观锁在读取数据的时不做任何限制,而是在更新数据的时候,进行数据的比较,保证数据的版本一致时再更新数据。根据他的这个特点,可以看出乐观锁适用于读操作多,而写操作少的场景。

            悲观锁与乐观锁恰恰相反,悲观锁从读取数据的时候就显式的加锁,直到数据更新完成,释放锁为止。在这期间只能有一个线程去操作,其他的线程只能等待。在java中,悲观锁可以使用synchronized关键字或者ReentrantLock类来实现。还是上面的例子,我们分别使用这两种方式来实现一下。首先是使用synchronized关键字来实现:

    1. public class Test {
    2. private int i=0;
    3. public static void main(String[] args) {
    4. Test test = new Test();
    5. ExecutorService es = Executors.newFixedThreadPool(50);
    6. CountDownLatch cdl = new CountDownLatch(5000);
    7. for (int i = 0;i < 5000; i++){
    8. es.execute(()->{
    9. //修改部分 开始
    10. synchronized (test){
    11. test.i++;
    12. }
    13. //修改部分 结束
    14. cdl.countDown();
    15. });
    16. }
    17. es.shutdown();
    18. try {
    19. cdl.await();
    20. System.out.println("执行完成后,i="+test.i);
    21. } catch (InterruptedException e) {
    22. e.printStackTrace();
    23. }
    24. }
    25. }

            我们唯一的改动就是增加了synchronized块,它锁住的对象是test,在所有线程中,谁获得了test对象的锁,谁才能执行i++操作。我们使用了synchronized悲观锁的方式,使得i++线程安全。我们运行一下,看看结果如何。

    1. 执行完成后,i=5000
    2. 执行完成后,i=5000
    3. 执行完成后,i=5000

            我们运行3次,结构都是5000,符合预期。接下来,我们再使用ReentrantLock类来实现悲观锁。

    1. public class Test {
    2. //添加了ReentrantLock锁
    3. Lock lock = new ReentrantLock();
    4. private int i=0;
    5. public static void main(String[] args) {
    6. Test test = new Test();
    7. ExecutorService es = Executors.newFixedThreadPool(50);
    8. CountDownLatch cdl = new CountDownLatch(5000);
    9. for (int i = 0;i < 5000; i++){
    10. es.execute(()->{
    11. //修改部分 开始
    12. test.lock.lock();
    13. test.i++;
    14. test.lock.unlock();
    15. //修改部分 结束
    16. cdl.countDown();
    17. });
    18. }
    19. es.shutdown();
    20. try {
    21. cdl.await();
    22. System.out.println("执行完成后,i="+test.i);
    23. } catch (InterruptedException e) {
    24. e.printStackTrace();
    25. }
    26. }
    27. }

            我们在类中显式的增加了Lock lock = new ReentrantLock();,而且在i++之前增加了lock.lock(),加锁操作,在i++之后增加了lock.unlock()释放锁的操作。我们同样运行3次,看看结果。

    1. 执行完成后,i=5000
    2. 执行完成后,i=5000
    3. 执行完成后,i=5000

            3次运行结果都是5000,完全符合预期。我们再来总结一下悲观锁,悲观锁从读取数据的时候就加了锁,而且在更新数据的时候,保证只有一个线程在执行更新操作,没有像乐观锁那样进行数据版本的比较。所以悲观锁适用于读相对少,写相对多的操作。

    公平锁与非公平锁

            从名字不难看出,公平锁在多线程情况下,对待每个线程都是公平的;而非公平锁恰好与之相反。从字面上理解还是有些晦涩难懂,我们还是举例说明,场景还是去超市买东西,在储物柜存东西的例子。储物柜只有一个,同时来了3个人使用储物柜,这时A先抢到了柜子,A去使用,B和C自觉进行排队。A使用完以后,后面排队的第一个人将继续使用柜子,这就是公平锁。在公平锁当中,所有的线程都自觉排队,一个线程执行完以后,排在后面的线程继续使用。

            非公平锁则不然,A在使用柜子的时候,B和C并不会排队,A使用完以后,将柜子的钥匙往后一抛,B和C谁抢到了谁用,甚至可能突然跑来一个D,这个D抢到了钥匙,那么D将使用柜子,这个就是非公平锁。

    公平锁如图所示:

             多个线程同时执行方法,线程A抢到了锁,A可以执行方法。其他线程则在队列里进行排队,A执行完方法后,会从队列里取出下一个线程B,再去执行方法。以此类推,对于每一个线程来说都是公平的,不会存在后加入的线程先执行的情况。

    非公平锁如下图所示:

             多个线程同时执行方法,线程A抢到了锁,A可以执行方法。其他的线程并没有排队,A执行完方法,释放锁后,其他的线程谁抢到了锁,谁去执行方法。会存在后加入的线程,反而先抢到锁的情况。

            公平锁与非公平锁都在ReentrantLock类里给出了实现,我们来看一下ReentrantLock的源码:

    1. /**
    2. * Creates an instance of {@code ReentrantLock}.
    3. * This is equivalent to using {@code ReentrantLock(false)}.
    4. */
    5. public ReentrantLock() {
    6. sync = new NonfairSync();
    7. }
    8. /**
    9. * Creates an instance of {@code ReentrantLock} with the
    10. * given fairness policy.
    11. *
    12. * @param fair {@code true} if this lock should use a fair ordering policy
    13. */
    14. public ReentrantLock(boolean fair) {
    15. sync = fair ? new FairSync() : new NonfairSync();
    16. }

            ReentrantLock有两个构造方法,默认的构造方法中,sync = new NonfairSync();我们可以从字面意思看出它是一个非公平锁。再看看第二个构造方法,它需要传入一个参数,参数是一个布尔型,true是公平锁,false是非公平锁。从字面的源代码我们可以看出sync有两个实现类,分别是FairSync和NonfairSync,我们再看看获取锁的核心方法,收拾公平锁FairSync的,

    1. @ReservedStackAccess
    2. protected final boolean tryAcquire(int acquires) {
    3. final Thread current = Thread.currentThread();
    4. int c = getState();
    5. if (c == 0) {
    6. if (!hasQueuedPredecessors() &&
    7. compareAndSetState(0, acquires)) {
    8. setExclusiveOwnerThread(current);
    9. return true;
    10. }
    11. }
    12. else if (current == getExclusiveOwnerThread()) {
    13. int nextc = c + acquires;
    14. if (nextc < 0)
    15. throw new Error("Maximum lock count exceeded");
    16. setState(nextc);
    17. return true;
    18. }
    19. return false;
    20. }

    然后是非公平锁NonfairSync的,

    1. @ReservedStackAccess
    2. final boolean nonfairTryAcquire(int acquires) {
    3. final Thread current = Thread.currentThread();
    4. int c = getState();
    5. if (c == 0) {
    6. if (compareAndSetState(0, acquires)) {
    7. setExclusiveOwnerThread(current);
    8. return true;
    9. }
    10. }
    11. else if (current == getExclusiveOwnerThread()) {
    12. int nextc = c + acquires;
    13. if (nextc < 0) // overflow
    14. throw new Error("Maximum lock count exceeded");
    15. setState(nextc);
    16. return true;
    17. }
    18. return false;
    19. }

            通过对比两个方法,我们可以看出唯一不同之处在于!hasQueuedPredecessors()这个方法,很明显这个方法是一个队列,由此可以推断,公平锁是将所有的线程放在一个队列中,一个线程执行完成后,从队列中取出下一个线程,而非公平锁则没有这个队列。这些都是公平锁与非公平锁底层的实现原理,我们在使用的时候不用追到这么深层次的代码,只需要了解公平锁与非公平锁的含义,并且在调用构造方法时,传入true和false即可。

       

    4、Redisson介绍

            Redis有很多Java客户端,我们比较常用的有Jedis,spring-data-redis,lettuce等。今天我们介绍一个非常好用的Redis的Java客户端——Redission。我们先看一下Redis官网中介绍的Java客户端列表:

             在这个列表中,我们可以看到Redission的后面有星,说明还是比较受欢迎的。再看看后面的简介,Redission是一个在Redis服务至上的,分布式、可扩展的Java数据结构。我们进入Redission的官网,看看官网是怎么介绍的。

             上面一段话看起来有点晦涩难懂,总结起来可以归结为以下几点:

    • Redission提供了使用Redis的最简单和最快捷的方法;
    • 开发人员不需要过分关注Redis,集中精力关注业务即可;
    • 基于Redis,提供了在Java中具有分布式特性的工具类;
    • 使Java中的并发工具包获得了协调多机多线程并发的能力;

    Redission特性

            上面我们对Redission有了一个整体的印象,接下来我们来看看它有哪些特点。

    支持Redis配置

            Redission支持多种Redis配置,无论你的Redis是单点、集群、主从还是哨兵模式,它都是支持的。只需要在Redission的配置文件中,增加相应的配置就可以了。

    支持的Java实体

            Redission支持多种Java实体,使其具有分布式的特性。我们比较常用的有:AtomicLong(原子Long)、AtomicDouble(原子Double)、PublishSubscribe(发布订阅)等。

    Java分布式锁与同步器

            Redission支持Java并发包中的多种锁,比如:Lock(可重入锁)、FairLock(公平锁)、MultiLock(联锁)、RedLock(红锁)、ReadWriteLock(读写锁)、Semaphore(信号量)、CountDownLatch(闭锁)等。我们注意到这些都是Java并发包中的类,Redission借助于Redis又重新实现了一套,使其具有分布式的特点。以后我们在使用Redission中的这些类的时候,可以跨进程跨JVM去使用。

    分布式Java集合

            Redission对Java的集合也进行了封装,使其具有分布式的特性。如:Map、Set、List、Queue、Deque、BlockingQueue等。以后我们就可以在分布式的环境中使用这些集合了。

    与Spring框架整合

            Redission可以与Spring大家族中的很多框架进行整合,其中包括:Spring基础框架、Spring Cache、Spring Session、Spring Data Redis、Spring Boot等。在项目中我们可以轻松的与这些框架整合,通过简单的配置就可以实现项目的需求。

    5、实战解决电商超卖问题

     

     

     

     

     

     

     

     

     

     

     

    Synchronized在方法上加锁,由于事务提交是交由spring管理,在锁释放后,该线程所在的事务有可能未提交,MySQL innodb的默认隔离级别是RR,这就会导致下一个事务并不能及时获取到更新后的值,从而导致超卖。解决方案是手动提交事务,并且提交事务的操作必须在锁的控制方法内。

    1. import com.lvxiaosha.distributeDemo.dao.OrderItemMapper;
    2. import com.lvxiaosha.distributeDemo.dao.OrderMapper;
    3. import com.lvxiaosha.distributeDemo.dao.ProductMapper;
    4. import com.lvxiaosha.distributeDemo.model.Order;
    5. import com.lvxiaosha.distributeDemo.model.OrderItem;
    6. import com.lvxiaosha.distributeDemo.model.Product;
    7. import lombok.extern.slf4j.Slf4j;
    8. import org.springframework.beans.factory.annotation.Autowired;
    9. import org.springframework.stereotype.Service;
    10. import org.springframework.transaction.PlatformTransactionManager;
    11. import org.springframework.transaction.TransactionDefinition;
    12. import org.springframework.transaction.TransactionStatus;
    13. import javax.annotation.Resource;
    14. import java.math.BigDecimal;
    15. import java.util.Date;
    16. import java.util.concurrent.locks.Lock;
    17. import java.util.concurrent.locks.ReentrantLock;
    18. @Service
    19. @Slf4j
    20. public class OrderService {
    21. @Resource
    22. private OrderMapper orderMapper;
    23. @Resource
    24. private OrderItemMapper orderItemMapper;
    25. @Resource
    26. private ProductMapper productMapper;
    27. //购买商品id
    28. private int purchaseProductId = 100100;
    29. //购买商品数量
    30. private int purchaseProductNum = 1;
    31. @Autowired
    32. private PlatformTransactionManager platformTransactionManager;
    33. @Autowired
    34. private TransactionDefinition transactionDefinition;
    35. private Lock lock = new ReentrantLock();
    36. // @Transactional(rollbackFor = Exception.class)
    37. public Integer createOrder() throws Exception{
    38. Product product = null;
    39. lock.lock();
    40. try {
    41. TransactionStatus transaction1 = platformTransactionManager.getTransaction(transactionDefinition);
    42. product = productMapper.selectByPrimaryKey(purchaseProductId);
    43. if (product==null){
    44. platformTransactionManager.rollback(transaction1);
    45. throw new Exception("购买商品:"+purchaseProductId+"不存在");
    46. }
    47. //商品当前库存
    48. Integer currentCount = product.getCount();
    49. System.out.println(Thread.currentThread().getName()+"库存数:"+currentCount);
    50. //校验库存
    51. if (purchaseProductNum > currentCount){
    52. platformTransactionManager.rollback(transaction1);
    53. throw
    54. new Exception("商品"+purchaseProductId+"仅剩"+currentCount+"件,无法购买");
    55. }
    56. productMapper.updateProductCount(purchaseProductNum,"xxx",new Date(),product.getId());
    57. platformTransactionManager.commit(transaction1);
    58. }finally {
    59. lock.unlock();
    60. }
    61. TransactionStatus transaction = platformTransactionManager.getTransaction(transactionDefinition);
    62. Order order = new Order();
    63. order.setOrderAmount(product.getPrice().multiply(new BigDecimal(purchaseProductNum)));
    64. order.setOrderStatus(1);//待处理
    65. order.setReceiverName("xxx");
    66. order.setReceiverMobile("13311112222");
    67. order.setCreateTime(new Date());
    68. order.setCreateUser("xxx");
    69. order.setUpdateTime(new Date());
    70. order.setUpdateUser("xxx");
    71. orderMapper.insertSelective(order);
    72. OrderItem orderItem = new OrderItem();
    73. orderItem.setOrderId(order.getId());
    74. orderItem.setProductId(product.getId());
    75. orderItem.setPurchasePrice(product.getPrice());
    76. orderItem.setPurchaseNum(purchaseProductNum);
    77. orderItem.setCreateUser("xxx");
    78. orderItem.setCreateTime(new Date());
    79. orderItem.setUpdateTime(new Date());
    80. orderItem.setUpdateUser("xxx");
    81. orderItemMapper.insertSelective(orderItem);
    82. platformTransactionManager.commit(transaction);
    83. return order.getId();
    84. }
    85. }

    数据库表设计:

    1. /*
    2. Navicat MySQL Data Transfer
    3. Source Server : 本地数据库
    4. Source Server Version : 80014
    5. Source Host : localhost:3306
    6. Source Database : distribute
    7. Target Server Type : MYSQL
    8. Target Server Version : 80014
    9. File Encoding : 65001
    10. Date: 2020-07-21 14:09:14
    11. */
    12. SET FOREIGN_KEY_CHECKS=0;
    13. -- ----------------------------
    14. -- Table structure for distribute_lock
    15. -- ----------------------------
    16. DROP TABLE IF EXISTS `distribute_lock`;
    17. CREATE TABLE `distribute_lock` (
    18. `id` int(11) NOT NULL AUTO_INCREMENT,
    19. `business_code` varchar(255) NOT NULL,
    20. `business_name` varchar(255) NOT NULL,
    21. PRIMARY KEY (`id`)
    22. ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
    23. -- ----------------------------
    24. -- Records of distribute_lock
    25. -- ----------------------------
    26. INSERT INTO `distribute_lock` VALUES ('1', 'demo', 'demo演示');
    27. -- ----------------------------
    28. -- Table structure for order
    29. -- ----------------------------
    30. DROP TABLE IF EXISTS `order`;
    31. CREATE TABLE `order` (
    32. `id` int(11) NOT NULL AUTO_INCREMENT,
    33. `order_status` int(1) NOT NULL DEFAULT '1' COMMENT '订单状态 1:待支付;',
    34. `receiver_name` varchar(255) NOT NULL COMMENT '收货人姓名',
    35. `receiver_mobile` varchar(11) NOT NULL COMMENT '收货人手机号',
    36. `order_amount` decimal(11,2) NOT NULL COMMENT '订单金额',
    37. `create_time` time NOT NULL COMMENT '创建时间',
    38. `create_user` varchar(255) NOT NULL COMMENT '创建人',
    39. `update_time` time NOT NULL COMMENT '更新时间',
    40. `update_user` varchar(255) NOT NULL COMMENT '更新人',
    41. PRIMARY KEY (`id`)
    42. ) ENGINE=InnoDB AUTO_INCREMENT=46 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
    43. -- ----------------------------
    44. -- Records of order
    45. -- ----------------------------
    46. INSERT INTO `order` VALUES ('35', '1', 'xxx', '13311112222', '5.00', '16:53:27', 'xxx', '16:53:27', 'xxx');
    47. INSERT INTO `order` VALUES ('36', '1', 'xxx', '13311112222', '5.00', '16:53:27', 'xxx', '16:53:27', 'xxx');
    48. INSERT INTO `order` VALUES ('37', '1', 'xxx', '13311112222', '5.00', '16:56:14', 'xxx', '16:56:14', 'xxx');
    49. INSERT INTO `order` VALUES ('38', '1', 'xxx', '13311112222', '5.00', '16:56:14', 'xxx', '16:56:14', 'xxx');
    50. INSERT INTO `order` VALUES ('39', '1', 'xxx', '13311112222', '5.00', '17:06:10', 'xxx', '17:06:10', 'xxx');
    51. INSERT INTO `order` VALUES ('40', '1', 'xxx', '13311112222', '5.00', '17:09:49', 'xxx', '17:09:49', 'xxx');
    52. INSERT INTO `order` VALUES ('41', '1', 'xxx', '13311112222', '5.00', '17:11:07', 'xxx', '17:11:07', 'xxx');
    53. INSERT INTO `order` VALUES ('42', '1', 'xxx', '13311112222', '5.00', '17:11:07', 'xxx', '17:11:07', 'xxx');
    54. INSERT INTO `order` VALUES ('43', '1', 'xxx', '13311112222', '5.00', '17:12:53', 'xxx', '17:12:53', 'xxx');
    55. INSERT INTO `order` VALUES ('44', '1', 'xxx', '13311112222', '5.00', '17:40:24', 'xxx', '17:40:24', 'xxx');
    56. INSERT INTO `order` VALUES ('45', '1', 'xxx', '13311112222', '5.00', '18:03:06', 'xxx', '18:03:06', 'xxx');
    57. -- ----------------------------
    58. -- Table structure for order_item
    59. -- ----------------------------
    60. DROP TABLE IF EXISTS `order_item`;
    61. CREATE TABLE `order_item` (
    62. `id` int(11) NOT NULL AUTO_INCREMENT,
    63. `order_id` int(11) NOT NULL COMMENT '订单id',
    64. `product_id` int(11) NOT NULL COMMENT '商品数量',
    65. `purchase_price` decimal(11,2) NOT NULL COMMENT '购买金额',
    66. `purchase_num` int(3) NOT NULL COMMENT '购买数量',
    67. `create_time` time NOT NULL COMMENT '创建时间',
    68. `create_user` varchar(255) NOT NULL COMMENT '创建人',
    69. `update_time` time NOT NULL COMMENT '更新时间',
    70. `update_user` varchar(255) NOT NULL COMMENT '更新人',
    71. PRIMARY KEY (`id`)
    72. ) ENGINE=InnoDB AUTO_INCREMENT=46 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
    73. -- ----------------------------
    74. -- Records of order_item
    75. -- ----------------------------
    76. INSERT INTO `order_item` VALUES ('35', '35', '100100', '5.00', '1', '16:53:27', 'xxx', '16:53:27', 'xxx');
    77. INSERT INTO `order_item` VALUES ('36', '36', '100100', '5.00', '1', '16:53:27', 'xxx', '16:53:27', 'xxx');
    78. INSERT INTO `order_item` VALUES ('37', '37', '100100', '5.00', '1', '16:56:14', 'xxx', '16:56:14', 'xxx');
    79. INSERT INTO `order_item` VALUES ('38', '38', '100100', '5.00', '1', '16:56:14', 'xxx', '16:56:14', 'xxx');
    80. INSERT INTO `order_item` VALUES ('39', '39', '100100', '5.00', '1', '17:06:10', 'xxx', '17:06:10', 'xxx');
    81. INSERT INTO `order_item` VALUES ('40', '40', '100100', '5.00', '1', '17:09:49', 'xxx', '17:09:49', 'xxx');
    82. INSERT INTO `order_item` VALUES ('41', '41', '100100', '5.00', '1', '17:11:07', 'xxx', '17:11:07', 'xxx');
    83. INSERT INTO `order_item` VALUES ('42', '42', '100100', '5.00', '1', '17:11:07', 'xxx', '17:11:07', 'xxx');
    84. INSERT INTO `order_item` VALUES ('43', '43', '100100', '5.00', '1', '17:12:53', 'xxx', '17:12:53', 'xxx');
    85. INSERT INTO `order_item` VALUES ('44', '44', '100100', '5.00', '1', '17:40:24', 'xxx', '17:40:24', 'xxx');
    86. INSERT INTO `order_item` VALUES ('45', '45', '100100', '5.00', '1', '18:03:06', 'xxx', '18:03:06', 'xxx');
    87. -- ----------------------------
    88. -- Table structure for product
    89. -- ----------------------------
    90. DROP TABLE IF EXISTS `product`;
    91. CREATE TABLE `product` (
    92. `id` int(11) NOT NULL AUTO_INCREMENT,
    93. `product_name` varchar(255) NOT NULL COMMENT '商品名称',
    94. `price` decimal(11,2) NOT NULL COMMENT '商品金额',
    95. `count` int(5) NOT NULL COMMENT '数量',
    96. `product_desc` varchar(255) NOT NULL COMMENT '商品描述',
    97. `create_time` time NOT NULL COMMENT '创建时间',
    98. `create_user` varchar(255) NOT NULL COMMENT '创建人',
    99. `update_time` time NOT NULL COMMENT '更新时间',
    100. `update_user` varchar(255) NOT NULL COMMENT '更新人',
    101. PRIMARY KEY (`id`)
    102. ) ENGINE=InnoDB AUTO_INCREMENT=100101 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
    103. -- ----------------------------
    104. -- Records of product
    105. -- ----------------------------
    106. INSERT INTO `product` VALUES ('100100', '测试商品', '5.00', '1', '测试商品', '11:01:57', 'xxx', '18:03:06', 'xxx');

     

            详细代码请见github地址:https://github.com/lvdapiaoliang/996-dev/tree/master/all-learning/DistributeLock/Distribute-demo

            

    6、基于数据库的分布式锁

     

     

    1. import com.lvxiaosha.distributeLock.dao.DistributeLockMapper;
    2. import com.lvxiaosha.distributeLock.model.DistributeLock;
    3. import com.lvxiaosha.distributeLock.model.DistributeLockExample;
    4. import lombok.extern.slf4j.Slf4j;
    5. import org.springframework.beans.factory.annotation.Autowired;
    6. import org.springframework.transaction.annotation.Transactional;
    7. import org.springframework.web.bind.annotation.RequestMapping;
    8. import org.springframework.web.bind.annotation.RestController;
    9. import javax.annotation.Resource;
    10. import java.util.concurrent.locks.Lock;
    11. import java.util.concurrent.locks.ReentrantLock;
    12. @RestController
    13. @Slf4j
    14. public class DemoController {
    15. @Resource
    16. private DistributeLockMapper distributeLockMapper;
    17. @RequestMapping("singleLock")
    18. @Transactional(rollbackFor = Exception.class)
    19. public String singleLock() throws Exception {
    20. log.info("我进入了方法!");
    21. DistributeLock distributeLock = distributeLockMapper.selectDistributeLock("demo");
    22. if (distributeLock==null) throw new Exception("分布式锁找不到");
    23. log.info("我进入了锁!");
    24. try {
    25. Thread.sleep(20000);
    26. } catch (InterruptedException e) {
    27. e.printStackTrace();
    28. }
    29. return "我已经执行完成!";
    30. }
    31. }
    DistributeLock selectDistributeLock(@Param("businessCode") String businessCode);
    1. <select id="selectDistributeLock" resultType="com.lvxiaosha.distributeLock.model.DistributeLock">
    2. select * from distribute_lock
    3. where business_code = #{businessCode,jdbcType=VARCHAR}
    4. for update
    5. </select>

    7、基于Redis的分布式锁

     

     

     

     

     

     Coding演示:

    • 启动redis

            Redis的安装配置请参考我的另外一篇文章:

     1.2 redis7.0.4安装与配置开机自启动_Iamlvxiaosha的博客-CSDN博客

    • 添加Maven依赖
    1. <dependency>
    2. <groupId>org.springframework.boot</groupId>
    3. <artifactId>spring-boot-starter-data-redis</artifactId>
    4. </dependency>
    • 在application.properties里面添加SpringBoot的Redis相关依赖

    spring.redis.host=192.168.110.130

    • 编写代码
    1. import com.lvxiaosha.distributeLock.lock.RedisLock;
    2. import com.lvxiaosha.distributeLock.lock.ZkLock;
    3. import lombok.extern.slf4j.Slf4j;
    4. import org.springframework.beans.factory.annotation.Autowired;
    5. import org.springframework.data.redis.core.RedisTemplate;
    6. import org.springframework.web.bind.annotation.RequestMapping;
    7. import org.springframework.web.bind.annotation.RestController;
    8. @RestController
    9. @Slf4j
    10. public class RedisLockController {
    11. @Autowired
    12. private RedisTemplate redisTemplate;
    13. @RequestMapping("redisLock")
    14. public String redisLock(){
    15. log.info("我进入了方法!");
    16. try (RedisLock redisLock = new RedisLock(redisTemplate,"redisKey",30)){
    17. if (redisLock.getLock()) {
    18. log.info("我进入了锁!!");
    19. Thread.sleep(15000);
    20. }
    21. } catch (InterruptedException e) {
    22. e.printStackTrace();
    23. } catch (Exception e) {
    24. e.printStackTrace();
    25. }
    26. log.info("方法执行完成");
    27. return "方法执行完成";
    28. }
    29. }
    1. import lombok.extern.slf4j.Slf4j;
    2. import org.springframework.data.redis.connection.RedisStringCommands;
    3. import org.springframework.data.redis.core.RedisCallback;
    4. import org.springframework.data.redis.core.RedisTemplate;
    5. import org.springframework.data.redis.core.script.RedisScript;
    6. import org.springframework.data.redis.core.types.Expiration;
    7. import java.util.Arrays;
    8. import java.util.List;
    9. import java.util.UUID;
    10. @Slf4j
    11. public class RedisLock implements AutoCloseable {
    12. private RedisTemplate redisTemplate;
    13. private String key;
    14. private String value;
    15. //单位:秒
    16. private int expireTime;
    17. public RedisLock(RedisTemplate redisTemplate,String key,int expireTime){
    18. this.redisTemplate = redisTemplate;
    19. this.key = key;
    20. this.expireTime=expireTime;
    21. this.value = UUID.randomUUID().toString();
    22. }
    23. /**
    24. * 获取分布式锁
    25. * @return
    26. */
    27. public boolean getLock(){
    28. RedisCallback<Boolean> redisCallback = connection -> {
    29. //设置NX
    30. RedisStringCommands.SetOption setOption = RedisStringCommands.SetOption.ifAbsent();
    31. //设置过期时间
    32. Expiration expiration = Expiration.seconds(expireTime);
    33. //序列化key
    34. byte[] redisKey = redisTemplate.getKeySerializer().serialize(key);
    35. //序列化value
    36. byte[] redisValue = redisTemplate.getValueSerializer().serialize(value);
    37. //执行setnx操作
    38. Boolean result = connection.set(redisKey, redisValue, expiration, setOption);
    39. return result;
    40. };
    41. //获取分布式锁
    42. Boolean lock = (Boolean)redisTemplate.execute(redisCallback);
    43. return lock;
    44. }
    45. public boolean unLock() {
    46. String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" +
    47. " return redis.call(\"del\",KEYS[1])\n" +
    48. "else\n" +
    49. " return 0\n" +
    50. "end";
    51. RedisScript<Boolean> redisScript = RedisScript.of(script,Boolean.class);
    52. List<String> keys = Arrays.asList(key);
    53. Boolean result = (Boolean)redisTemplate.execute(redisScript, keys, value);
    54. log.info("释放锁的结果:"+result);
    55. return result;
    56. }
    57. @Override
    58. public void close() throws Exception {
    59. unLock();
    60. }
    61. }

    8、基于Zookeeper与curator的分布式锁

     Zookeeper的下载与安装请参考我的另外一篇文章:

    1、Kafka急速入门_Iamlvxiaosha的博客-CSDN博客

    运行Zookeeper,在Zookeeper里面创建lock节点:

     

    基于 zookeeper的瞬时有序节点 和 zookeeper的观察器实现

    • 持久节点:会话消失不会持久节点不会消失

    • 瞬时节点: 连接zookeeper会话中断或者zookeeer关闭,那么瞬时节点就会消失;

      • 瞬时节点不能有子节点

      • 瞬时节点是有序的: 名称可排序

    • 观察器: 监测节点发生的变化,并通知客户端

      • 可以检测 getData()获取数据方法, getChildren()获取子节点方法,exists()判断当前节点是否存在

      • 节点数据发生变化,发送给客户端

      • 观察器只能监控一次,再监控需要重新设置(新版zookeeper可以解决这个缺点)

    实现原理:

    • Zookeeper不像Redis那样,并发请求进来的时候会形成队列,zookeeper会并发的创建瞬时节点,这些节点可能是在同一时间创建的,但是这个节点会是有序的

    • 这时我们可以规定,序号最小的线程获得 锁,其他瞬时节点的线程处于等待状态

    • 其他线程监听自己序号的前一个序号,

    • 当前一个线程执行完成,则删除自己序号节点,这时观察器会监测到节点发生变化,那么就会发送通知,则下一个线程就获得锁继续执行

    1. <dependency>
    2. <groupId>org.apache.zookeeper</groupId>
    3. <artifactId>zookeeper</artifactId>
    4. <version>3.8.0</version>
    5. </dependency>
    6. <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
    7. <dependency>
    8. <groupId>org.apache.curator</groupId>
    9. <artifactId>curator-recipes</artifactId>
    10. <version>5.3.0</version>
    11. </dependency>

    1. import lombok.extern.slf4j.Slf4j;
    2. import org.apache.zookeeper.*;
    3. import org.apache.zookeeper.data.Stat;
    4. import java.io.IOException;
    5. import java.util.Collections;
    6. import java.util.List;
    7. @Slf4j
    8. public class ZkLock implements AutoCloseable, Watcher {
    9. private ZooKeeper zooKeeper;
    10. private String znode;
    11. public ZkLock() throws IOException {
    12. this.zooKeeper = new ZooKeeper("http://192.168.110.1130:2181",
    13. 10000,this);
    14. }
    15. public boolean getLock(String businessCode) {
    16. try {
    17. //创建业务 根节点
    18. Stat stat = zooKeeper.exists("/" + businessCode, false);
    19. if (stat==null){
    20. zooKeeper.create("/" + businessCode,businessCode.getBytes(),
    21. ZooDefs.Ids.OPEN_ACL_UNSAFE,
    22. CreateMode.PERSISTENT);
    23. }
    24. //创建瞬时有序节点 /order/order_00000001
    25. znode = zooKeeper.create("/" + businessCode + "/" + businessCode + "_", businessCode.getBytes(),
    26. ZooDefs.Ids.OPEN_ACL_UNSAFE,
    27. CreateMode.EPHEMERAL_SEQUENTIAL);
    28. //获取业务节点下 所有的子节点
    29. List<String> childrenNodes = zooKeeper.getChildren("/" + businessCode, false);
    30. //子节点排序
    31. Collections.sort(childrenNodes);
    32. //获取序号最小的(第一个)子节点
    33. String firstNode = childrenNodes.get(0);
    34. //如果创建的节点是第一个子节点,则获得锁
    35. if (znode.endsWith(firstNode)){
    36. return true;
    37. }
    38. //不是第一个子节点,则监听前一个节点
    39. String lastNode = firstNode;
    40. for (String node:childrenNodes){
    41. if (znode.endsWith(node)){
    42. zooKeeper.exists("/"+businessCode+"/"+lastNode,true);
    43. break;
    44. }else {
    45. lastNode = node;
    46. }
    47. }
    48. synchronized (this){
    49. wait();
    50. }
    51. return true;
    52. } catch (Exception e) {
    53. e.printStackTrace();
    54. }
    55. return false;
    56. }
    57. @Override
    58. public void close() throws Exception {
    59. zooKeeper.delete(znode,-1);
    60. zooKeeper.close();
    61. log.info("我已经释放了锁!");
    62. }
    63. @Override
    64. public void process(WatchedEvent event) {
    65. if (event.getType() == Event.EventType.NodeDeleted){
    66. synchronized (this){
    67. notify();
    68. }
    69. }
    70. }
    71. }
    1. import com.lvxiaosha.distributezklock.lock.ZkLock;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.apache.curator.framework.CuratorFramework;
    4. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    5. import org.springframework.beans.factory.annotation.Autowired;
    6. import org.springframework.web.bind.annotation.RequestMapping;
    7. import org.springframework.web.bind.annotation.RestController;
    8. import java.io.IOException;
    9. import java.util.concurrent.TimeUnit;
    10. @RestController
    11. @Slf4j
    12. public class ZookeeperController {
    13. @Autowired
    14. private CuratorFramework client;
    15. @RequestMapping("zkLock")
    16. public String zookeeperLock(){
    17. log.info("我进入了方法!");
    18. try (ZkLock zkLock = new ZkLock()) {
    19. if (zkLock.getLock("order")){
    20. log.info("我获得了锁");
    21. Thread.sleep(10000);
    22. }
    23. } catch (IOException e) {
    24. e.printStackTrace();
    25. } catch (Exception e) {
    26. e.printStackTrace();
    27. }
    28. log.info("方法执行完成!");
    29. return "方法执行完成!";
    30. }
    31. @RequestMapping("curatorLock")
    32. public String curatorLock(){
    33. log.info("我进入了方法!");
    34. InterProcessMutex lock = new InterProcessMutex(client, "/order");
    35. try{
    36. if (lock.acquire(30, TimeUnit.SECONDS)){
    37. log.info("我获得了锁!!");
    38. Thread.sleep(10000);
    39. }
    40. } catch (IOException e) {
    41. e.printStackTrace();
    42. } catch (Exception e) {
    43. e.printStackTrace();
    44. }finally {
    45. try {
    46. log.info("我释放了锁!!");
    47. lock.release();
    48. } catch (Exception e) {
    49. e.printStackTrace();
    50. }
    51. }
    52. log.info("方法执行完成!");
    53. return "方法执行完成!";
    54. }
    55. }
    1. import com.lvxiaosha.distributezklock.lock.ZkLock;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.apache.curator.RetryPolicy;
    4. import org.apache.curator.framework.CuratorFramework;
    5. import org.apache.curator.framework.CuratorFrameworkFactory;
    6. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    7. import org.apache.curator.retry.ExponentialBackoffRetry;
    8. import org.junit.Test;
    9. import org.junit.runner.RunWith;
    10. import org.springframework.boot.test.context.SpringBootTest;
    11. import org.springframework.test.context.junit4.SpringRunner;
    12. import java.io.IOException;
    13. import java.util.concurrent.TimeUnit;
    14. @RunWith(SpringRunner.class)
    15. @SpringBootTest
    16. @Slf4j
    17. public class DistributeZkLockApplicationTests {
    18. @Test
    19. public void contextLoads() {
    20. }
    21. @Test
    22. public void testZkLock() throws Exception {
    23. ZkLock zkLock = new ZkLock();
    24. boolean lock = zkLock.getLock("order");
    25. log.info("获得锁的结果:"+lock);
    26. zkLock.close();
    27. }
    28. @Test
    29. public void testCuratorLock(){
    30. RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    31. CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
    32. client.start();
    33. InterProcessMutex lock = new InterProcessMutex(client, "/order");
    34. try {
    35. if ( lock.acquire(30, TimeUnit.SECONDS) ) {
    36. try {
    37. log.info("我获得了锁!!!");
    38. }
    39. finally {
    40. lock.release();
    41. }
    42. }
    43. } catch (Exception e) {
    44. e.printStackTrace();
    45. }
    46. client.close();
    47. }
    48. }

    注:比较推荐curator的分布式锁实现方法,实现更简单。

    9、基于Redisson实现分布式锁

    maven依赖引入:

    1. <dependency>
    2. <groupId>org.redisson</groupId>
    3. <artifactId>redisson-spring-boot-starter</artifactId>
    4. <version>3.17.6</version>
    5. </dependency>

    修改application.properties配置文件,添加Redis配置:

    spring.redis.host=192.168.110.130

    创建RedissonLockController:

    1. import lombok.extern.slf4j.Slf4j;
    2. import org.redisson.Redisson;
    3. import org.redisson.api.RLock;
    4. import org.redisson.api.RedissonClient;
    5. import org.redisson.config.Config;
    6. import org.springframework.beans.factory.annotation.Autowired;
    7. import org.springframework.web.bind.annotation.RequestMapping;
    8. import org.springframework.web.bind.annotation.RestController;
    9. import java.util.concurrent.TimeUnit;
    10. @RestController
    11. @Slf4j
    12. public class RedissonLockController {
    13. @Autowired
    14. private RedissonClient redisson;
    15. @RequestMapping("redissonLock")
    16. public String redissonLock() {
    17. RLock rLock = redisson.getLock("order");
    18. log.info("我进入了方法!!");
    19. try {
    20. rLock.lock(30, TimeUnit.SECONDS);
    21. log.info("我获得了锁!!!");
    22. Thread.sleep(10000);
    23. } catch (InterruptedException e) {
    24. e.printStackTrace();
    25. }finally {
    26. log.info("我释放了锁!!");
    27. rLock.unlock();
    28. }
    29. log.info("方法执行完成!!");
    30. return "方法执行完成!!";
    31. }
    32. }

  • 相关阅读:
    大学生静态HTML鲜花网页设计作品 DIV布局网上鲜花介绍网页模板代码 DW花店网站制作成品 web网页制作与实现
    Linux内核分析与应用2-内存寻址
    ACM-概率题(其一)
    可执行文件的装载与进程
    计算机网络初识
    【元宇宙】5个视角,重新看待区块链和元宇宙
    Javascript基础
    MySQL性能调优,必须掌握这一个工具!!!(1分钟系列)
    项目十结构体与共用体的基本应用
    JVM之垃圾回收
  • 原文地址:https://blog.csdn.net/Xx13624558575/article/details/126802036