目录
读写锁ReadWriteLock
阻塞队列(Array/Linked)BlockingQueue
什么情况会使用阻塞队列?
四组API
1.抛出异常
2.不会抛出异常
3.等待阻塞(一直阻塞)
4.等待超时
同步队列(SynchronousQueue)
读写锁ReadWriteLock
读的时候可以被多线程同时读,写的时候只能有一个线程去写。
- 独占锁(写锁):一次只能被一个线程占有
- 共享锁(读锁):可以被多个线程同时占有
- 读-读:可以共存
- 读-写:不能共存
- 写-写:不能共存
public class ReadWriteLockTest {
public static void main(String[] args) {
MyCacheLock myCache = new MyCacheLock();
//写入
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(()->{
myCache.put(temp+"",temp+"");
},String.valueOf(i)).start();
}
//读取
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(()->{
myCache.get(temp+"");
},String.valueOf(i)).start();
}
}
}
//加锁的
class MyCacheLock{
private volatile Map<String,Object> map = new HashMap<>();
//读写锁(更加细粒度的控制)
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
//存 写入的时候,只希望同时只有一个线程在写
public void put(String key, Object value){
//加写锁
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"写入"+key);
map.put(key,value);
System.out.println(Thread.currentThread().getName()+"写入成功");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock();
}
}
//读 取,所有人可以读
public void get(String key){
//加读锁
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"读取"+key);
Object o = map.get(key);
System.out.println(Thread.currentThread().getName()+"读取成功");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
}
}
阻塞队列(Array/Linked)BlockingQueue
写入:如果队列满了,就必须阻塞等待 读取:如果队列是空的,必须阻塞等待生产
?
什么情况会使用阻塞队列?
多线程(A调用B,必须等B先执行,B没有执行完,A就会挂起或者等待) 线程池(出了弹性大小之外,一般会用一个队列去维护里面的大小)
学会使用队列 添加,移除
四组API
方式 | 抛出异常 | 有返回值,不抛出异常 | 阻塞等待 | 超时等待 |
---|
添加 | add | offer | put | offer(,) | 移除 | remove | poll | take | poll(,) | 判断队列首 | element | peek | - | - |
?
1.抛出异常
public class Test {
public static void main(String[] args) {
//List,Set的父类Collection
//BlockQueue不是新的东西继承自 Collection
// 什么情况下会使用阻塞队列:多线程并发处理,线程池
test1();
}
// 1. 抛出异常
public static void test1 () {
//队列的大小
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(arrayBlockingQueue.add("a"));
System.out.println(arrayBlockingQueue.add("b"));
System.out.println(arrayBlockingQueue.add("c"));
System.out.println(arrayBlockingQueue.element());//查看队首元素
//ava.lang.IllegalStateException
//java.lang.IllegalStateException: Queue full 抛出异常
System.out.println(arrayBlockingQueue.add("d"));//队列满了抛出异常
System.out.println("=============");
//队列移除顺序
System.out.println(arrayBlockingQueue.remove());
System.out.println(arrayBlockingQueue.remove());
System.out.println(arrayBlockingQueue.remove());
//java.util.NoSuchElementException
System.out.println(arrayBlockingQueue.remove());//队列为空,抛出异常
}
}
2.不会抛出异常
// 2. 有返回值,不抛出异常
public static void test2(){
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(arrayBlockingQueue.offer("a"));
System.out.println(arrayBlockingQueue.offer("b"));
System.out.println(arrayBlockingQueue.offer("c"));
System.out.println(arrayBlockingQueue.peek());//查看队首元素
System.out.println(arrayBlockingQueue.offer("d")); //返回false 不抛出异常
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll()); //返回null 不抛出异常
}
3.等待阻塞(一直阻塞)
// 3. 等待,阻塞(一直阻塞)
public static void test3() throws InterruptedException {
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
arrayBlockingQueue.put("a");
arrayBlockingQueue.put("b");
arrayBlockingQueue.put("c");
//arrayBlockingQueue.put("d"); //队列没有位置,一直阻塞
System.out.println(arrayBlockingQueue.take());
System.out.println(arrayBlockingQueue.take());
System.out.println(arrayBlockingQueue.take());
//System.out.println(arrayBlockingQueue.take()); //没有这个元素,一直阻塞
}
4.等待超时
// 4. 等待,阻塞(等待超时)
public static void test4() throws InterruptedException {
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(arrayBlockingQueue.offer("a"));
System.out.println(arrayBlockingQueue.offer("b"));
System.out.println(arrayBlockingQueue.offer("c"));
//等待超过2秒就退出
System.out.println(arrayBlockingQueue.offer("d", 2, TimeUnit.SECONDS));
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
//等待超过2秒就退出
System.out.println(arrayBlockingQueue.poll(2, TimeUnit.SECONDS));
}
同步队列(SynchronousQueue)
- 没有容量,进去一个元素,必须等待取出来之后,才能再往里面放一个元素。
- 和其他的BlockingQueue 不一样, SynchronousQueue 不存储元素
- put进去一个元素,必须从里面先take取出来,否则不能再put进去值!
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> synchronousQueue = new SynchronousQueue<>();//同步队列
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+" put 1");
synchronousQueue.put("1");
System.out.println(Thread.currentThread().getName()+" put 2");
synchronousQueue.put("2");
System.out.println(Thread.currentThread().getName()+" put 3");
synchronousQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T1").start();
new Thread(()->{
try {
//模拟睡眠3s
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+" take "+synchronousQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+" take "+synchronousQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+" take "+synchronousQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T2").start();
}
}
?
|