• 【并发编程】同步容器与并发容器


    1.同步容器类

    (1)为什么会出现同步容器

    • Java集合框架中,主要有四大类别:List、Set、Queue、Map。
    • List、Set、Queue接口分别继承了Collection接口,Map本身是一个接口。
    • 注意Collection和Map是一个顶层接口,而List、Set、Queue则继承了Collection接口,分别代表数组、集合和对列这三大容器。
    • 像ArrayList、LinkedList都是实现List接口,HashSet实现了Set接口,而Deque继承了Queue接口,PriorityQueue实现了Queue接口。另外LinkedList实现了Deque接口。
    • 像ArrayList、LinkedList、HashMap这些容器都是非线程安全的。如果有多个线程并发地访问这些容器时,就会出现问题。
    • 因此,在编写程序时,必须要求程序员手动地在任何访问到这些容器的地方进行同步处理,这样导致在使用这些容器的时候非常地不方便。
    • 所以,Java提供了同步容器供用户使用。

    (2)Java中的同步容器类

    • Vector、Stack、HashTable
    • Collections类中提供的静态工厂方法创建的类
    • Vector实现了List接口,Vector实际上就是一个数组,和ArrayList类似,但是Vector中的方法都是synchronized方法,即进行了同步措施。
    • Stack也是一个同步容器,它的方法也是用synchronzied进行了同步,继承Vector类。
    • HashTable实现了Map接口,他和HashMap相似,但是HashTable进行了同步处理,而HashMap没有。
    • Collections类是一个工具提供类,注意,它和Collection不同,Collection是一个顶层的接口。在Collections类中提供了大量的方法,比如对集合或者容器进行排序、查找等操作。最重要的是,在它里面提供了几个静态工厂方法来创建同步容器类

    在这里插入图片描述

    (3)同步容器的缺陷

    • 1.性能问题:传统的非同步容器和同步容器的性能差异,我们以ArrayList和Vector为例
    public class Demo1 {
        public static void main(String[] args) {
            ArrayList<Integer> arrayList = new ArrayList<>();
            Vector<Integer> vector = new Vector<>();
    
            long startTime = System.currentTimeMillis();
            for (int i = 0; i < 100000; i++) {
                vector.add(i);
            }
            long endTime = System.currentTimeMillis();
            System.out.println("vector循环10万次添加元素消耗的时间:"+(endTime-startTime)+"ms");
    
            long startTime1 = System.currentTimeMillis();
            for (int i = 0; i < 100000; i++) {
                arrayList.add(i);
            }
            long endTime1 = System.currentTimeMillis();
            System.out.println("ArrayList循环10万次添加元素消耗的时间:"+(endTime1-startTime1)+"ms");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    在这里插入图片描述

    • 进行同样多的插入操作,Vector的耗时是ArrayList的两倍。

      • 这只是其中的一方面性能问题上的反映。
      • 另外,由于Vector中的add方法和get方法都进行了同步,因此,在有多个线程进行访问时,如果多个线程都只是进行读取操作,那么每个时刻就只能有一个线程进行读取,其他线程便只能等待,这些线程必须竞争同一把锁。
      • 因此为了解决同步容器的性能问题,在Java 1.5中提供了并发容器,位于java.util.concurrent目录下。
    • 2.同步容器的安全性问题

      • Vector中的方法都进行了同步处理,那么一定就是线程安全的,事实上这可不一定。
    public class Demo2 {
        private static Vector<Integer> vector = new Vector<>();
    
        public static void main(String[] args) {
    
            while (true) {
                for (int i = 0; i < 10; i++) {
                    vector.add(i);
                }
                Thread thread1 = new Thread(() -> {
    
                    for (int i = 0; i < vector.size(); i++) {
                        vector.remove(i);
                    }
                });
                Thread thread2 = new Thread(() -> {
    
                    for (int i = 0; i < vector.size(); i++) {
                        vector.get(i);
                    }
                });
                thread1.start();
                thread2.start();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    在这里插入图片描述

    正如大家所看到的,这段代码报错了:数组下标越界。

    也许有朋友会问:Vector是线程安全的,为什么还会报这个错?很简单,对于Vector,虽然能保证每一个时刻只能有一个线程访问它,但是不排除这种可能:

    当某个线程在某个时刻执行这句时:

    for(int i= 0;i < vector.size();i++) vector.get(i);
    
    • 1

    假若此时vector的size方法返回的是10,i的值为9

    然后另外一个线程执行了这句:

    for(int i= 0;i < vector.size();i++) vector.remove(i);
    
    • 1

    将下标为9的元素删除了。

    那么通过get方法访问下标为9的元素肯定就会出问题了。

    因此为了保证线程安全,必须在方法调用端做额外的同步措施,如下面所示:

    public class Demo2 {
        private static Vector<Integer> vector = new Vector<>();
        private static Object object = new Object();
    
        public static void main(String[] args) {
    
            while (true) {
                for (int i = 0; i < 10; i++) {
                    vector.add(i);
                }
                Thread thread1 = new Thread(() -> {
    
                    synchronized (object) {
                        for (int i = 0; i < vector.size(); i++) {
                            vector.remove(i);
                        }
                    }
                });
                Thread thread2 = new Thread(() -> {
    
                    synchronized (object) {
                        for (int i = 0; i < vector.size(); i++) {
                            vector.get(i);
                        }
                    }
                });
                thread1.start();
                thread2.start();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 3.ConcurrentModificationException异常
      • 在对Vector等容器并发地进行迭代修改时,会报ConcurrentModificationException异常,关于这个异常将会在后续文章中讲述。但是在并发容器中不会出现这个问题。
    2.并发容器类

    (1)ConcurrectHashMap

    在这里插入图片描述

    • HashMap、HashTable与ConcurrentHashMap都是实现的哈希表数据结构,在随机读取的时候效率很高。HashTable实现同步是利用synchronzied关键字进行锁定的,其实针对整张Hash表进行锁定,及每次锁住整张表让线程独占,在线程安全的背后是巨大的浪费。
    • ConcurrentHashMap和HashTable主要的区别就在于围绕这锁的粒度进行区别以及如何区锁定。
    • 上图中,左边是HashTable的实现方式,可以看到锁住的是整张哈希表,而右边是ConcurrectHashMap的实现方式,单独锁住每一个桶(segment),ConcurrentHashMap将哈希表分为16个桶(默认值),诸如get()、put()、remove()等常用操作之锁定当前需要的桶,而size()才锁定整张表。原来只能一个线程进入,现在却能同时接受16个写线程并发进入(写线程需要锁定,而读线程几乎不受限制),并发性的提升是显而易见的。
    • 而在迭代时,ConcurrentHashMap使用了不同于传统集合的快速失败迭代器(fast-fail iterator)的另外一种迭代方式,称为弱一致迭代器。在这种迭代方式中,当iterator被创建后集合在发生改变就不再是抛出ConcurrentModificationException,取而代之的是在改变时实例化出新的数据从而不影响原有的数据,iterator完成后在将头指针替换成为新的数据,这样iterator线程可以使用原来老的数据,而写线程也可以并发的完成改变,更重要的,这保证了多个线程并发执行的连续性和扩展性,是性能提升的关键。
    public class Demo3 {
        public static void main(String[] args) {
            //Map map = new ConcurrentHashMap<>();  //84ms
            //Map map = new ConcurrentSkipListMap<>();  //108ms
            //Map map = new Hashtable<>(); //92ms
            Map<String,String> map = new HashMap<>(); //139ms
    
            Random random = new Random();
    
            Thread[] threads = new Thread[100];
    
            CountDownLatch latch = new CountDownLatch(threads.length);
            long startTime = System.currentTimeMillis();
            for (int i = 0; i < threads.length; i++) {
                threads[i] = new Thread(()->{
                    for (int j = 0; j < 10000; j++) {
                        map.put(random.nextInt(1000000)+"",random.nextInt()+"");
                        latch.countDown();
                    }
                });
            }
    
            Arrays.asList(threads).forEach(thread -> thread.start());
    
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            long endTime = System.currentTimeMillis();
            System.out.println(endTime-startTime+"ms");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 运行结果:
    ConcurrentHashMap:84ms
    ConcurrentSkipListMap:108ms
    Hashtable:92ms
    HashMap:139ms
    
    • 1
    • 2
    • 3
    • 4
    • 启动100个线程,向每个容器中添加100000个元素,最终发现,ConcurrentHashMap要比HashMap效率高,ConcurrentHashMap是将大锁分成若干小锁,实现多个线程共同运行,锁以效率有很大差距。ConcurrentSkipListMap较ConcurrentHashMap除了实现高并发外还能对元素进行排序。

    (2)ConcurrentQueue

    • 与ConcurrentHashMap相同,ConcurrentQueue也是通过同样的方式来提高并发性能的。
    • 同步容器中提到过火车票问题:
      • 有N张火车票,每张车票都有一个编号,同时有10个窗口对外售票。
    • 使用ConcurrentQueue进一步提高并发性:
    public class Demo4 {
        private static Queue<String> queues = new ConcurrentLinkedDeque<>();
    
        static {
            for (int i = 0; i < 10000; i++) {
                queues.add("票编号:"+i);
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            long start = System.currentTimeMillis();
            for (int i = 0; i < 10; i++) {
                new Thread(()->{
                    while(true){
                        String s = queues.poll();
                        if (s ==null) break;
                        else System.out.println("销售了---"+s);
                    }
                }).start();
            }
            long end = System.currentTimeMillis();
            Thread.sleep(3000L);
            System.out.println("总耗时:"+(end-start)+"ms");
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    在这里插入图片描述

    • 常用的API
    Queue<String> strings = new ConcurrentLinkedQueue<String>();
    
    strings.offer(元素) //相当于add,放进队列
    
    strings.size() //获取当前队列的元素个数
        
    strings.poll() //取出并移除
        
    strings.peek() //取出不会移除,相当于get();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    (3)CopyOnWriteArrayList

    • 写时复制容器,即copy-on-write,多线程环境下,写时效率低,读时效率高,适合写少读多的环境。
    public class Demo5 implements Runnable{
        private static List<String> lists = new ArrayList<>();
        //private static List lists = new Vector<>();
        //private static List lists = new CopyOnWriteArrayList<>();
        private Random random = new Random();
    
        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                lists.add(random.nextInt()+"");
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            long start = System.currentTimeMillis();
            System.out.println("线程开始操作");
            for (int i = 0; i < 10; i++) {
                new Thread(new Demo5()).start();
            }
            for (int i = 0; i < 100; i++) {
                int finalI = i;
                new Thread(()->{
                    for (int j = 0; j < lists.size(); j++) {
                        lists.get(finalI);
                    }
                }).start();
            }
            long end = System.currentTimeMillis();
            Thread.sleep(6000L);
            System.out.println("耗时:"+(end-start)+"ms");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 运行结果:
    ArrayList:报错:Exception in thread "Thread-1" java.lang.ArrayIndexOutOfBoundsException: 244
    Vector:117ms
    CopyOnWriteArrayList:222ms
    
    • 1
    • 2
    • 3
    • 从JDK5开始Java并发包里面提供了两个使用CopyOnWrite机制实现的并发容器,它们是CopyOnWriteArrayList和CopyOnWriteArraySet。
    • 当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后向新的容器添加元素,添加完成元素后,再将原来的容器的引用指向新的容器。这样做的好处就是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为在当前读的容器不会添加任何元素。所以CopyOnWrite容器是一种读写分离的思想,读和写写对应不同的容器。

    (4)BlockingQueue

    • 在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。
    • 阻塞对列,顾名思义,首先他是一个队列,一个队列在数据结构当中起到的作用大致如下:

    在这里插入图片描述

    • 队列可以使得数据由队列的一端输入,从另一端输出。
    • 先进先出(FIFO):先插入的队列的元素也是最先出队列,这种队列体现了一种公平性。
    • 后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件。

    在这里插入图片描述

    (5)LinkedBlockingQueue

    • 这中并发容器,会自动实现阻塞式的生产者/消费者模式。使用队列解耦合,在实现异步事物的时候很有用。
    • 案例
    public class Demo6 {
    
        //实例化时指定容器容量
        private static LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(1000);
    
        public static void main(String[] args) {
            for (int i = 0; i < 100; i++) {
                int finalI = i;
                new Thread(() -> {
                    for (int j = 0; j < 1000; j++) {
                        try {
                            //向对列中添加元素,如果对列满了 就等待1s在进行添加
                            boolean b = linkedBlockingQueue.offer(finalI + "", 1, TimeUnit.SECONDS);
                            if (b) {
                                System.out.println(finalI + "队列添加成功");
                            } else {
                                System.out.println(finalI + "队列添加失败,进入等待");
                            }
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
            }
    
            for (int i = 0; i < 10; i++) {
                int finalI = i;
                new Thread(() -> {
                    for (int j = 0; j < 1000; j++) {
                        try {
                            //消费队列,如果为空就等待消费
                            String take = linkedBlockingQueue.take();
                            System.out.println("消费队列元素:" + take);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
    
                }).start();
            }
    
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    在这里插入图片描述

    • 常用API
    //实例化时指定容器容量
    private static LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(1000);
    
    linkedBlockingQueue.add(元素) //如果队列满了,再次添加就会抛出异常:java.lang.IllegalStateException: Queue full
    
    linkedBlockingQueue.offer(元素,时间,时间单位) //队列满了,等待时间后,再次添加,失败返回false
    
    linkedBlockingQueue.offer(元素) //队列满了,添加失败返回false,成功返回true
    
    linkedBlockingQueue.put(元素) //加入队列,如果满了就等待阻塞
    
    linkedBlockingQueue.take() //取出队列中的元素,如果空了,就会等待阻塞
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    (6)ArrayBlockingQueue

    • ArrayBlockingQueue和LinkedBlockingQueue对象的方法都是一样的,用法是一样的。
    • 二者的区别:
      • LinkedBlockingQueue是一个单向链表实现的阻塞队列,在链表一头加入元素,如果队列满了,就会阻塞,另一头取出元素,如果队列为空,就会阻塞。
      • LinkedBlockingQueue内部使用ReetrantLock实现插入锁(putLock)和取出锁(takeLock)。
      • ArrayBlockingQueue基于数组实现,成为有界队列,LinkedBlockingQueue认为是无界队列。当然LinkedBlockingQueue也可以指定队列容量。

    在这里插入图片描述

    (7)DelayQueue

    • DelayQueue也是一个BlockingQueue,用于放置实现了Delayed接口的对象,只能是实现了Delayed接口的对象,其中对象只能在其到期时才能从队列中取走。

    • Delayed扩展了Comparable接口,比较的基准为延时的时间,Delayed接口实现类getDelay()返回值为固定值(final),DelayedQueue内部是使用PriorityQueue实现的;即 (DelayQueue = BlockingQueue + PriorityQueue + Delayed

    • 可以说,DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准是时间。是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时能从队列中取走。这种队列是有序的,及队头对象的延迟到期时间最长。但是要注意不能将null元素放置到队列中。

    • Delayed,一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象。此接口的实现类必须重写一个compareTo()方法,该方法提供于此接口的getDelay()方法一致的排序。

    • DelayQueue存储的对象是实现了Delayed接口的对象,在这个对象中,需要重写compareTo()和getDelay()方法。

    • 自定义MyTask类实现Delayed

    public class MyTask implements Delayed {
    
        private long time;
        private String name;
        private long start = System.currentTimeMillis();
    
        public MyTask(String name,long time) {
            this.time = time;
            this.name = name;
        }
    
        @Override
        public long getDelay(TimeUnit unit) {
            return (start+time) - System.currentTimeMillis();
        }
    
        @Override
        public int compareTo(Delayed o) {
            return (int)(this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
        }
    
        @Override
        public String toString() {
            return "MyTask{" +
                    "time=" + time +
                    ", name='" + name + '\'' +
                    '}';
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 测试main
    public class Main {
        public static void main(String[] args) throws InterruptedException {
    
            DelayQueue <MyTask> myTasks = new DelayQueue<>();
    
            new Thread(()->{
                myTasks.offer(new MyTask("task1",10000));
                myTasks.offer(new MyTask("task2",4000));
                myTasks.offer(new MyTask("task3",4200));
                myTasks.offer(new MyTask("task4",6200));
                myTasks.offer(new MyTask("task5",9800));
            }).start();
            long start = System.currentTimeMillis();
            Thread.sleep(2000);
            System.out.println("队列中存放数据:");
            for (MyTask myTask : myTasks) {
                System.out.println(myTask);
            }
    
            System.out.println();
            System.out.println("队列中取出数据:");
            while(true){
                MyTask myTask = myTasks.take();
                System.out.println(myTask+":取出耗时:"+(System.currentTimeMillis()-start)+"ms");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    在这里插入图片描述

    • DelayQueue能做什么
      • 淘宝订单业务:下单之后如果30分钟之内没有付款就自动取消订单。
      • 饿了么定餐通知:下单成功后60s后给用户发短信。
      • 关闭空闲连接:服务器中,很多客户端的连接,空闲一段时间之后需要关闭。
      • 缓存:缓存中的对象,超过了空闲时间,需要从缓存中移出。
      • 任务超时处理:在网络协议滑动窗口请求应答交互时,处理超时未响应的请求等。

    (8)LinkedTransferQueue

    • TransferQueue是一个继承了BlockingQueue的接口,并且增加了若干新方法。
    • LinkedTransferQueue是TransferQueue接口的实现类,其定义一个无界的队列,具有先进先出(FIFO)的特性。
    • TransferQueue接口含有下面几个重要方法:
      • transfer(E e)
        • 若当前存在一个正在等待获取的消费者线程,即立刻移交之,否则,会插入当前元素e到队列尾部,并且等待进入阻塞状态,到有消费者线程取走该元素。
      • tryTransfer(E e)
        • 若当前存在一个正在等待获取的消费者线程(使用take()或者poll()函数),使用该方法会即刻转移\传输对象元素e;如不存在,则返回false,并且不进入队列。这是一个不阻塞的操作。
      • tryTransfer(E e,long timeout,TimeUnit nuit)
        • 若当前存在一个正在等待的消费者线程,会立即传输给它,否则将插入元素e到队列尾部,并且等待被消费者线程获取消费掉,若在指定的时间内元素e无法被消费者线程获取,则返回false,同时该元素被移除。
      • hasWaitingConsumer()
        • 判断是否由消费者线程。
      • getWaitingConsumerCount()
        • 获取所有等待获取元素的消费者线程数量。
      • size()
        • 因为队列的异步特性,检测当前队列的元素个数需要逐一迭代,无法保证原子性,可能会得到一个不太准确的结果,尤其是在遍历时有可能队列发生更改。
    • 消费者生产者案例
    • Producer
    public class Producer implements Runnable {
    
        private final TransferQueue<String> queue;
    
        //构造传入LinkedTransferQueue队列
        public Producer(TransferQueue<String> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            try {
                //生产者循环
                while (true){
                    //判断当前队列是否还有消费者,有的话就生产产品交由消费者线程
                    if(queue.hasWaitingConsumer()) queue.transfer(produce());
                    //休眠1s
                    TimeUnit.SECONDS.sleep(1);
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }
        //生产产品方法
        private String produce(){
            return "Your lucky number:"+(new Random().nextInt(100));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • Consumer
    public class Consumer implements Runnable{
        private final TransferQueue<String> queue;
    
        public Consumer(TransferQueue<String> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            try{
                //消费者线程取出队列元素
                System.out.println("Consumer--"+Thread.currentThread().getName()+"--"+queue.take());
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • main测试
    public class Main {
        public static void main(String[] args) {
            TransferQueue<String> queue = new LinkedTransferQueue<>();
    
            Thread producer = new Thread(new Producer(queue));
            producer.setDaemon(true);
            producer.start();
    
            for (int i = 0; i < 20; i++) {
                Thread consumer = new Thread(new Consumer(queue));
                consumer.setDaemon(true);
                consumer.start();
                try{
                    Thread.sleep(1000L);
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    在这里插入图片描述

    (9)SynchronousQueue

    • SynchronousQueue也是一种BlockingQueue,是一种无缓冲的等待队列。所以在某次添加元素后必须等待其他线程取走后才能继续添加,可以认为SynchronousQueue是一个缓存值为0的阻塞队列(也可以是1),它的isEmpty()方法永远返回时true,remainingCapacity()方法永远返回时0。
    • remove和removeAll方法返回永远是false,iterator()方法永远返回空,peek()方法永远返回null。
    • 使用put()方法时,会一直阻塞在这里,等待被消费。
    • 案例代码
    public class SynchronousQueueDemo {
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<String> strings = new SynchronousQueue<>();
    
            for (int i = 0; i < 2; i++) {
                new Thread(()->{
                    try{
                        System.out.println("取出数据:"+strings.take());
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }).start();
            }
            strings.put("aaa");
            strings.put("bbb");
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    在这里插入图片描述

    在这里插入图片描述

  • 相关阅读:
    【kali-漏洞利用】(3.3)Metasploit后渗透(下):后渗透模块使用
    mybatis:mybatis-generator插件使用
    从零使用TensorFlow搭建CNN(卷积)神经网络
    JS预解析/编译(变量提升):var(仅声明,无赋值)、function变量 创建作用域
    双十二选哪个品牌led灯好一点?国产led灯这些品牌护眼好
    【JavaWeb】之Tomcat介绍、安装与使用
    docker-compose概述与简单编排部署
    解决常见的电脑故障
    防爆对讲机在消防救援工作中的重要性
    Spring Boot插件化开发概念原理及实现
  • 原文地址:https://blog.csdn.net/weixin_47533244/article/details/127804138