前言
上一篇博客简单介绍了ConcurrentHashMap,这个也是并发容器之一,这一篇博客继续介绍另外两个并发容器,CopyOnWriteArrayList和BlockingQueue,同样不会深入到具体源码级别,只简单说说源码中认为比较常见的点即可。
CopyOnWriteArrayList
在上一篇博客中我们简单提到过,为了替代SynchronizedMap,才有了ConcurrentHashMap,原因是SynchronizedMap锁的粒度太大。ConcurrentHashMap锁的粒度稍微小一点,并发效率更高。同样的情况,Vector和SynchronizedList也存在同样的问题,为了解决Vector和SynchronizedList锁粒度太大和迭代时无法编辑的问题,于是有了CopyOnWriteArrayList。同类型的还有CopyOnWriteArraySet。
在CopyOnWriteArrayList中,读取是完全不用加锁的,写入也不会阻塞读取操作,只有写入与写入之间需要同步互斥等待
相关实例
其核心原理其实比较简单,CopyOnWrite本质上就是常说的读写分离,CopyOnWriteArrayList是在修改时对原有的List数据重新复制了一份,并开辟了新的内存空间,在新的数据上进行修改,修改完成之后,再将原有的数据引用指向当前新的内存地址。在修改数据期间读数据依旧读取的是原有的内存地址。
@Slf4j
public class CopyOnWriteArrayListUseDemo {
public static void main(String[] args) {
ordinalListDemo();
copyOnWriteArrayListDemo();
}
public static void ordinalListDemo(){
ArrayList<String> oridinalList = new ArrayList<>();
oridinalList.add("1");
oridinalList.add("2");
oridinalList.add("3");
oridinalList.add("4");
oridinalList.add("5");
Iterator<String> iterator = oridinalList.iterator();
while (iterator.hasNext()) {
System.out.println("oridinalList is " + oridinalList);
String next = iterator.next();
System.out.println("current node is " + next);
if (next.equals("2")) {
oridinalList.remove("5");
}
}
}
public static void copyOnWriteArrayListDemo(){
CopyOnWriteArrayList<String> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
copyOnWriteArrayList.add("1");
copyOnWriteArrayList.add("2");
copyOnWriteArrayList.add("3");
copyOnWriteArrayList.add("4");
copyOnWriteArrayList.add("5");
Iterator<String> iterator = copyOnWriteArrayList.iterator();
while (iterator.hasNext()) {
System.out.println("oridinalList is " + copyOnWriteArrayList);
String next = iterator.next();
System.out.println("current node is " + next);
if (next.equals("2")) {
copyOnWriteArrayList.remove("5");
}
if(next.equals("3")){
copyOnWriteArrayList.add("new node");
}
}
}
}
上述代码中的第二个方法运行实例如下所示
oridinalList is [1, 2, 3, 4, 5]
current node is 1
oridinalList is [1, 2, 3, 4, 5]
current node is 2
oridinalList is [1, 2, 3, 4]
current node is 3
oridinalList is [1, 2, 3, 4, new node]
current node is 4
oridinalList is [1, 2, 3, 4, new node]
current node is 5
注意最后一行输出,在编辑完成CopyOnWriteArrayList之后,输出的当前节点数据依旧是5,也就是说,修改的数据,并没有即时生效。似乎数据虽然修改了,但是读取依旧读取的是原来的数据。
第二个代码实例
@Slf4j
public class CopyOnWriteArrayListMultiIterDemo {
public static void main(String[] args) {
CopyOnWriteArrayList<Integer> nums = new CopyOnWriteArrayList<Integer>(new Integer[]{1,2,3});
Iterator<Integer> iteratorOne = nums.iterator();
System.out.println(nums);
nums.add(5);
System.out.println(nums);
Iterator<Integer> iteratorTwo = nums.iterator();
iteratorOne.forEachRemaining(System.out::print);
System.out.println();
iteratorTwo.forEachRemaining(System.out::print);
}
}
相关源码
其中的add方法
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
其中的get方法
public E get(int index) {
return get(getArray(), index);
}
private E get(Object[] a, int index) {
return (E) a[index];
}
get直接返回并没有任何加锁的逻辑
BlockingQueue
BlockingQueue是阻塞队列,这个相比于普通队列,在读取数据和存入数据的时候,存在可能阻塞的差异。关于Java中常见有以下几种队列。
阻塞队列的几个方法
这张图只是列出了几种常用的队列。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XyYdxbAP-1638009136590)(F:\blog_doc\blogPic\sync&JUC-learn\image-20211124200834327.png)]
ConcurrentLinkedQueue是一个非阻塞队列,同样也能保证线程安全。
所谓阻塞队列,其实就是带有阻塞功能的队列,其是线程安全的,通常用于生产者从一端存放数据,消费者从另一端读取数据(貌似队列大部分场景都是这个用法),而所谓的阻塞队列是指,如果队列为空,则消费者从队列读取数据会被阻塞;如果队列数据已满,生产者往队列存放数据会被阻塞。
通常的阻塞队列中会有如下一些方法
put,take | 这两个会阻塞, 队列满的时候put会阻塞,队列空的时候take会阻塞 |
---|
add,remove,element | add,如果队列满了,直接抛异常 remove,如果队列空了,直接抛异常 element,返回队列头元素,如果空抛异常 | offer,poll,peek | offer,添加,队列满了返回false poll,取出头元素,会在队列中删除头元素,队列空返回null peek,返回头元素,但不删除取出的元素 |
典型的阻塞队列
常用的阻塞队列有ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue,PriorityBlockingQueue。这里只简单介绍一下前两种
ArrayBlockingQueue
ArrayBlockingQueue是一个有界队列,在初始化的时候需要指定容量,同时还可以指定是否是公平的,如果指定其为公平的,则会将队列中等待了最长时间的数据优先处理。
@Slf4j
public class ArrayBlockingQueueDemo {
public static void main(String[] args) {
ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
Interviewer interviewer = new Interviewer(arrayBlockingQueue);
Consumer consumer = new Consumer(arrayBlockingQueue);
new Thread(interviewer).start();
new Thread(consumer).start();
}
}
class Interviewer implements Runnable {
private BlockingQueue queue;
public Interviewer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
System.out.println("10个候选人都来了");
for (int i = 0; i < 10; i++) {
String candidate = "Candidate" + (i+1);
try {
queue.put(candidate);
System.out.println("候选人" + (i + 1) + "等待面试");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
queue.put("allin");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Consumer implements Runnable {
BlockingQueue<String> queue;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String msg = "";
try {
while (!((msg = queue.take()).equals("allin"))) {
System.out.println(msg + "正在面试");
}
System.out.println("所有候选人都面试完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
其put源码如下
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
LinkedBlockingQueue
这是一个无界的阻塞队列,并不是真正意义上的无界,而是容量为Integer.MAX_VALUE,意味着很大程度生产者并不会出现阻塞,底层数据结构是链表。同时其维护了两把锁,一个putLock,一个takeLock。
其中的put源码
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
其中的take源码
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
其他阻塞队列
PriorityBlockingQueue可以简单理解为PriorityQueue的线程安全版本,支持优先级,在容量不够的时候,会进行扩容,通知支持按照元素的compareTo的结果进行排序。
SynchronousQueue没有peek等函数,因为peek是取出头结点,但是SynchronousQueue的容量是0,因此没有peek方法,newCachedThreadPool线程池就是使用的这个队列作为任务队列。
总结
简单梳理了一下CopyOnWriteArrayList和BlockingQueue的内容,其中CopyOnWriteArrayList适用于读多写少的场景,BlockingQueue其实除了用于生产者消费者模式之外,还大部分用于线程池中的任务队列。
|