(1)为什么会出现同步容器
(2)Java中的同步容器类

(3)同步容器的缺陷
public class Demo1 {
public static void main(String[] args) {
ArrayList<Integer> arrayList = new ArrayList<>();
Vector<Integer> vector = new Vector<>();
long startTime = System.currentTimeMillis();
for (int i = 0; i < 100000; i++) {
vector.add(i);
}
long endTime = System.currentTimeMillis();
System.out.println("vector循环10万次添加元素消耗的时间:"+(endTime-startTime)+"ms");
long startTime1 = System.currentTimeMillis();
for (int i = 0; i < 100000; i++) {
arrayList.add(i);
}
long endTime1 = System.currentTimeMillis();
System.out.println("ArrayList循环10万次添加元素消耗的时间:"+(endTime1-startTime1)+"ms");
}
}

进行同样多的插入操作,Vector的耗时是ArrayList的两倍。
2.同步容器的安全性问题
public class Demo2 {
private static Vector<Integer> vector = new Vector<>();
public static void main(String[] args) {
while (true) {
for (int i = 0; i < 10; i++) {
vector.add(i);
}
Thread thread1 = new Thread(() -> {
for (int i = 0; i < vector.size(); i++) {
vector.remove(i);
}
});
Thread thread2 = new Thread(() -> {
for (int i = 0; i < vector.size(); i++) {
vector.get(i);
}
});
thread1.start();
thread2.start();
}
}
}

正如大家所看到的,这段代码报错了:数组下标越界。
也许有朋友会问:Vector是线程安全的,为什么还会报这个错?很简单,对于Vector,虽然能保证每一个时刻只能有一个线程访问它,但是不排除这种可能:
当某个线程在某个时刻执行这句时:
for(int i= 0;i < vector.size();i++) vector.get(i);
假若此时vector的size方法返回的是10,i的值为9
然后另外一个线程执行了这句:
for(int i= 0;i < vector.size();i++) vector.remove(i);
将下标为9的元素删除了。
那么通过get方法访问下标为9的元素肯定就会出问题了。
因此为了保证线程安全,必须在方法调用端做额外的同步措施,如下面所示:
public class Demo2 {
private static Vector<Integer> vector = new Vector<>();
private static Object object = new Object();
public static void main(String[] args) {
while (true) {
for (int i = 0; i < 10; i++) {
vector.add(i);
}
Thread thread1 = new Thread(() -> {
synchronized (object) {
for (int i = 0; i < vector.size(); i++) {
vector.remove(i);
}
}
});
Thread thread2 = new Thread(() -> {
synchronized (object) {
for (int i = 0; i < vector.size(); i++) {
vector.get(i);
}
}
});
thread1.start();
thread2.start();
}
}
}
(1)ConcurrectHashMap

public class Demo3 {
public static void main(String[] args) {
//Map map = new ConcurrentHashMap<>(); //84ms
//Map map = new ConcurrentSkipListMap<>(); //108ms
//Map map = new Hashtable<>(); //92ms
Map<String,String> map = new HashMap<>(); //139ms
Random random = new Random();
Thread[] threads = new Thread[100];
CountDownLatch latch = new CountDownLatch(threads.length);
long startTime = System.currentTimeMillis();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(()->{
for (int j = 0; j < 10000; j++) {
map.put(random.nextInt(1000000)+"",random.nextInt()+"");
latch.countDown();
}
});
}
Arrays.asList(threads).forEach(thread -> thread.start());
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long endTime = System.currentTimeMillis();
System.out.println(endTime-startTime+"ms");
}
}
ConcurrentHashMap:84ms
ConcurrentSkipListMap:108ms
Hashtable:92ms
HashMap:139ms
(2)ConcurrentQueue
public class Demo4 {
private static Queue<String> queues = new ConcurrentLinkedDeque<>();
static {
for (int i = 0; i < 10000; i++) {
queues.add("票编号:"+i);
}
}
public static void main(String[] args) throws InterruptedException {
long start = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
new Thread(()->{
while(true){
String s = queues.poll();
if (s ==null) break;
else System.out.println("销售了---"+s);
}
}).start();
}
long end = System.currentTimeMillis();
Thread.sleep(3000L);
System.out.println("总耗时:"+(end-start)+"ms");
}
}

Queue<String> strings = new ConcurrentLinkedQueue<String>();
strings.offer(元素) //相当于add,放进队列
strings.size() //获取当前队列的元素个数
strings.poll() //取出并移除
strings.peek() //取出不会移除,相当于get();
(3)CopyOnWriteArrayList
public class Demo5 implements Runnable{
private static List<String> lists = new ArrayList<>();
//private static List lists = new Vector<>();
//private static List lists = new CopyOnWriteArrayList<>();
private Random random = new Random();
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
lists.add(random.nextInt()+"");
}
}
public static void main(String[] args) throws InterruptedException {
long start = System.currentTimeMillis();
System.out.println("线程开始操作");
for (int i = 0; i < 10; i++) {
new Thread(new Demo5()).start();
}
for (int i = 0; i < 100; i++) {
int finalI = i;
new Thread(()->{
for (int j = 0; j < lists.size(); j++) {
lists.get(finalI);
}
}).start();
}
long end = System.currentTimeMillis();
Thread.sleep(6000L);
System.out.println("耗时:"+(end-start)+"ms");
}
}
ArrayList:报错:Exception in thread "Thread-1" java.lang.ArrayIndexOutOfBoundsException: 244
Vector:117ms
CopyOnWriteArrayList:222ms
(4)BlockingQueue


(5)LinkedBlockingQueue
public class Demo6 {
//实例化时指定容器容量
private static LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(1000);
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
int finalI = i;
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
try {
//向对列中添加元素,如果对列满了 就等待1s在进行添加
boolean b = linkedBlockingQueue.offer(finalI + "", 1, TimeUnit.SECONDS);
if (b) {
System.out.println(finalI + "队列添加成功");
} else {
System.out.println(finalI + "队列添加失败,进入等待");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
try {
//消费队列,如果为空就等待消费
String take = linkedBlockingQueue.take();
System.out.println("消费队列元素:" + take);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
}

//实例化时指定容器容量
private static LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(1000);
linkedBlockingQueue.add(元素) //如果队列满了,再次添加就会抛出异常:java.lang.IllegalStateException: Queue full
linkedBlockingQueue.offer(元素,时间,时间单位) //队列满了,等待时间后,再次添加,失败返回false
linkedBlockingQueue.offer(元素) //队列满了,添加失败返回false,成功返回true
linkedBlockingQueue.put(元素) //加入队列,如果满了就等待阻塞
linkedBlockingQueue.take() //取出队列中的元素,如果空了,就会等待阻塞
(6)ArrayBlockingQueue

(7)DelayQueue
DelayQueue也是一个BlockingQueue,用于放置实现了Delayed接口的对象,只能是实现了Delayed接口的对象,其中对象只能在其到期时才能从队列中取走。
Delayed扩展了Comparable接口,比较的基准为延时的时间,Delayed接口实现类getDelay()返回值为固定值(final),DelayedQueue内部是使用PriorityQueue实现的;即 (DelayQueue = BlockingQueue + PriorityQueue + Delayed)
可以说,DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准是时间。是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时能从队列中取走。这种队列是有序的,及队头对象的延迟到期时间最长。但是要注意不能将null元素放置到队列中。
Delayed,一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象。此接口的实现类必须重写一个compareTo()方法,该方法提供于此接口的getDelay()方法一致的排序。
DelayQueue存储的对象是实现了Delayed接口的对象,在这个对象中,需要重写compareTo()和getDelay()方法。
自定义MyTask类实现Delayed
public class MyTask implements Delayed {
private long time;
private String name;
private long start = System.currentTimeMillis();
public MyTask(String name,long time) {
this.time = time;
this.name = name;
}
@Override
public long getDelay(TimeUnit unit) {
return (start+time) - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
return (int)(this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString() {
return "MyTask{" +
"time=" + time +
", name='" + name + '\'' +
'}';
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
DelayQueue <MyTask> myTasks = new DelayQueue<>();
new Thread(()->{
myTasks.offer(new MyTask("task1",10000));
myTasks.offer(new MyTask("task2",4000));
myTasks.offer(new MyTask("task3",4200));
myTasks.offer(new MyTask("task4",6200));
myTasks.offer(new MyTask("task5",9800));
}).start();
long start = System.currentTimeMillis();
Thread.sleep(2000);
System.out.println("队列中存放数据:");
for (MyTask myTask : myTasks) {
System.out.println(myTask);
}
System.out.println();
System.out.println("队列中取出数据:");
while(true){
MyTask myTask = myTasks.take();
System.out.println(myTask+":取出耗时:"+(System.currentTimeMillis()-start)+"ms");
}
}
}

(8)LinkedTransferQueue
public class Producer implements Runnable {
private final TransferQueue<String> queue;
//构造传入LinkedTransferQueue队列
public Producer(TransferQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
//生产者循环
while (true){
//判断当前队列是否还有消费者,有的话就生产产品交由消费者线程
if(queue.hasWaitingConsumer()) queue.transfer(produce());
//休眠1s
TimeUnit.SECONDS.sleep(1);
}
}catch (Exception e){
e.printStackTrace();
}
}
//生产产品方法
private String produce(){
return "Your lucky number:"+(new Random().nextInt(100));
}
}
public class Consumer implements Runnable{
private final TransferQueue<String> queue;
public Consumer(TransferQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try{
//消费者线程取出队列元素
System.out.println("Consumer--"+Thread.currentThread().getName()+"--"+queue.take());
}catch (Exception e){
e.printStackTrace();
}
}
}
public class Main {
public static void main(String[] args) {
TransferQueue<String> queue = new LinkedTransferQueue<>();
Thread producer = new Thread(new Producer(queue));
producer.setDaemon(true);
producer.start();
for (int i = 0; i < 20; i++) {
Thread consumer = new Thread(new Consumer(queue));
consumer.setDaemon(true);
consumer.start();
try{
Thread.sleep(1000L);
}catch (Exception e){
e.printStackTrace();
}
}
}
}

(9)SynchronousQueue
public class SynchronousQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> strings = new SynchronousQueue<>();
for (int i = 0; i < 2; i++) {
new Thread(()->{
try{
System.out.println("取出数据:"+strings.take());
}catch (Exception e){
e.printStackTrace();
}
}).start();
}
strings.put("aaa");
strings.put("bbb");
}
}

