阻塞队列
概念
队列
队列就可以想成是一个数组,从一头进入,一头出去,排队买饭
阻塞队列
BlockingQueue 阻塞队列,排队拥堵,首先它是一个队列,而一个阻塞队列在数据结构中所起的作用大致如下图所示:
线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素
也就是说 试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其它线程往空的队列插入新的元素
同理,试图往已经满的阻塞队列中添加新元素的线程,直到其它线程往满的队列中移除一个或多个元素,或者完全清空队列后,使队列重新变得空闲起来,并后续新增
阻塞队列的方法:
BlockingQueue核心方法
抛出异常 | 当阻塞队列满时:在往队列中add插入元素会抛出 IIIegalStateException:Queue full 当阻塞队列空时:再往队列中remove移除元素,会抛出NoSuchException |
---|
特殊性 | 插入方法,成功true,失败false 移除方法:成功返回出队列元素,队列没有就返回空 | 一直阻塞 | 当阻塞队列满时,生产者继续往队列里put元素,队列会一直阻塞生产线程(一直尝试放入元素)直到put数据or响应中断退出, 当阻塞队列空时,消费者线程试图(一直尝试取出元素)从队列里take元素,队列会一直阻塞消费者线程直到队列可用。 | 超时退出 | 当阻塞队列满时,队里会阻塞生产者线程一定时间,在这个时间内,一直尝试向队列中放入或取出元素,超过限时后生产者线程会退出 |
生产者消费者 传统版
题目:一个初始值为0的变量,两个线程对其交替操作,一个加1,一个减1,来5轮
/**
* 线程 操作 资源类
* 判断 干活 通知
* 防止虚假唤醒机制
*/
/**
* 资源类
*/
class ShareData {
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment() throws Exception{
// 同步代码块,加锁
lock.lock();
try {
// 判断
while(number != 0) {
// 等待不能生产
condition.await();
}
// 干活
number++;
System.out.println(Thread.currentThread().getName() + "\t " + number);
// 通知 唤醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void decrement() throws Exception{
// 同步代码块,加锁
lock.lock();
try {
// 判断
while(number == 0) {
// 等待不能消费
condition.await();
}
// 干活
number--;
System.out.println(Thread.currentThread().getName() + "\t " + number);
// 通知 唤醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class ProdConsumerTraditionDemo {
public static void main(String[] args) {
// 高内聚,低耦合 内聚指的是,一个空调,自身带有调节温度高低的方法
ShareData shareData = new ShareData();
// t1线程,生产
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
shareData.increment();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "t1").start();
// t2线程,消费
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
shareData.decrement();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "t2").start();
}
}
?生产者消费者 ?阻塞队列版
/**
* 生产者消费者 阻塞队列版
*
*/
class MyResource {
// 使用原子包装类,而不用number++
private AtomicInteger atomicInteger = new AtomicInteger();
// 这里不能为了满足条件,而实例化一个具体的SynchronousBlockingQueue
BlockingQueue<String> blockingQueue = null;
// 而应该采用依赖注入里面的,构造注入方法传入
public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
// 查询出传入的class是什么
System.out.println(blockingQueue.getClass().getName());
}
/**
* 生产
* @throws Exception
*/
public void myProd() throws Exception{
String data = null;
// 多线程环境的判断,一定要使用while进行,防止出现虚假唤醒
// 当FLAG为true的时候,开始生产
data = atomicInteger.incrementAndGet() + "";
blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName() + "\t 插入队列:" + data + "成功" );
}
/**
* 消费
* @throws Exception
*/
public void myConsumer() throws Exception{
String retValue= blockingQueue.poll(2L, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName() + "\t 消费队列:" + retValue + "成功" );
}
}
class ProdConsumerBlockingQueueDemo {
public static void main(String[] args) {
// 传入具体的实现类, ArrayBlockingQueue
MyResource myResource = new MyResource(new ArrayBlockingQueue<String>(1));
new Thread(() -> {
for(int i=0;i<100;i++){
try {
myResource.myProd();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "prod").start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 消费线程启动");
for(int i=0;i<100;i++){
try {
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
myResource.myConsumer();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "consumer").start();
// 5秒后,停止生产和消费
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
|