前言
线程交替打印,想要实现并不难,难的是从不同的角度去实现。需要对Synchronized、LockSupport、ReentrantLock、CAS、Semaphore等并发技术有较为深刻的理解,才能随心所欲的写出相应的代码。那么如何灵活的使用以上技术,实现线程交替打印呢?
基于Synchronized实现
synchronized可以保证方法或代码块在运行时,同一时刻只有一个线程可以进入到临界区(互斥性),同时它还保证了共享变量的内存可见性。使用wait()和notify()可实现线程间通讯。
/**
* synchronized线程交替打印
*
* @author liudong
* @date 2021/12/15 16:59
*/
public class SynchronizedDemo {
public static void main(String[] args) {
final Object object = new Object();
char[] a1 = "1234567".toCharArray();
char[] a2 = "ABCDEFG".toCharArray();
new Thread(() -> {
synchronized (object) {
for (char c : a1) {
System.out.print(c);
try {
object.notify();
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
object.notify();//避免有线程未被唤醒
}
}, "t1").start();
new Thread(() -> {
synchronized (object) {
for (char c : a2) {
System.out.print(c);
try {
object.notify();
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
object.notify();//避免有线程未被唤醒
}
}, "t2").start();
}
}
-
wait(),当前线程进入无限等待状态,必须被唤醒才能继续执行,调用后会释放锁对象 -
wait(long timeout),wait(long timeout,int nanos),当前线程进入等待状态,可以被提前唤醒,但在指定时间后会自动唤醒 -
notify(),随机唤醒一个在锁对象上调用wait的线程 -
notifyAll(),唤醒全部在锁对象上调用wait的线程
基于ReentrantLock实现
Condition类能实现synchronized和wait、notify搭配的功能,另外比后者更灵活,Condition可以实现多路通知功能,也就是在一个Lock对象里可以创建多个Condition(即对象监视器)实例,线程对象可以注册在指定的Condition中,从而可以有选择的进行线程通知,在调度线程上更加灵活。而synchronized就相当于整个Lock对象中只有一个单一的Condition对象,所有的线程都注册在这个对象上。线程开始notifyAll时,需要通知所有的WAITING线程,没有选择权,会有相当大的效率问题。
/**
* ReentrantLock线程交替打印
*
* @author liudong
* @date 2021/12/15 17:16
*/
public class ReentrantLockDemo {
public static void main(String[] args) {
char[] a1 = "1234567".toCharArray();
char[] a2 = "ABCDEFG".toCharArray();
Lock lock = new ReentrantLock();//锁
Condition t1 = lock.newCondition();//t1队列
Condition t2 = lock.newCondition();//t2队列
new Thread(() -> {
try {
lock.lock();
for (char c : a1) {
System.out.print(c);
t2.signal();//唤醒t2队列中等待的线程
t1.await();//进入t1队列自旋等待
}
t1.signal();//避免有线程未被唤醒
t2.signal();//避免有线程未被唤醒
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "t1").start();
new Thread(() -> {
try {
lock.lock();
for (char c : a2) {
System.out.print(c);
t1.signal();//唤醒t1队列中等待的线程
t2.await();//进入t2队列自旋等待
}
t1.signal();//避免有线程未被唤醒
t2.signal();//避免有线程未被唤醒
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "t2").start();
}
}
-
void await():当前线程进入等待状态,如果其他线程调用condition的signal或者signalAll方法并且当前线程获取Lock从await方法返回,如果在等待状态中被中断会抛出被中断异常; -
long awaitNanos(long nanosTimeout):当前线程进入等待状态直到被通知,中断或者超时; -
boolean await(long time, TimeUnit unit):同第二种,支持自定义时间单位 -
boolean awaitUntil(Date deadline):当前线程进入等待状态直到被通知,中断或者到了某个时间
基于LockSupport实现
LockSupport类的核心方法其实就两个:park()和unpark(),其中park()方法用来阻塞当前调用线程,unpark()方法用于唤醒指定线程。
/**
* LockSupport线程交替打印
*
* @author liudong
* @date 2021/12/15 17:03
*/
public class LockSupportDemo {
private static Thread t1;
private static Thread t2;
public static void main(String[] args) {
char[] a1 = "1234567".toCharArray();
char[] a2 = "ABCDEFG".toCharArray();
t1 = new Thread(() -> {
for (char c : a1) {
System.out.print(c);
LockSupport.unpark(t2);//释放t2线程 设置锁标志位
LockSupport.park();//阻塞当前线程
}
}, "t1");
t2 = new Thread(() -> {
for (char c : a2) {
LockSupport.park();//阻塞当前线程
System.out.print(c);
LockSupport.unpark(t1);//释放t1线程
}
}, "t2");
t1.start();
t2.start();
}
}
-
void park(): 阻塞当前线程,如果调用 unpark(Thread thread) 方法或者当前线程被中断,才能从 park()方法返回。 -
void parkNanos(long nanos): 阻塞当前线程,最长不超过 nanos 纳秒,返回条件在park() 的基础上增加了超时返回。 -
void parkUntil(long deadline): 阻塞当前线程,知道 deadline 时间 (从 1970 年开始到 deadline 时间的毫秒数)。 -
void unpark(Thread thread): 唤醒处于阻塞状态的线程 thread。 -
void park(Object blocker): 阻塞当前线程,blocker 用来标识当前线程在等待的对象。 -
parkNanos(Object blocker, long nanos): 比 void park(Object blocker) 增加一个超时时间。 -
parkUntil(Object blocker, long deadline): 比 void parkUntil(long deadline) ?多一个阻塞当前对象。
基于CAS机制实现
CAS,全称Compare And Swap(比较与交换),解决多线程并行情况下使用锁造成性能损耗的一种机制。
CAS(V, A, B),V为内存地址、A为预期原值,B为新值。如果内存地址的值与预期原值相匹配,那么将该位置值更新为新值。否则,说明已经被其他线程更新,处理器不做任何操作;无论哪种情况,它都会在 CAS 指令之前返回该位置的值。而我们可以使用自旋锁,循环CAS,重新读取该变量再尝试再次修改该变量,也可以放弃操作。
/**
* cas线程交替打印
*
* @author liudong
* @date 2021/12/15 17:18
*/
public class CasDemo {
enum ReadyToRun {
T1, T2
}
private static volatile ReadyToRun readyToRun = ReadyToRun.T1;
public static void main(String[] args) {
char[] a1 = "1234567".toCharArray();
char[] a2 = "ABCDEFG".toCharArray();
new Thread(() -> {
for (char c : a1) {
while (readyToRun != ReadyToRun.T1) {
}//cas自旋
System.out.print(c);
readyToRun = ReadyToRun.T2;//线程可见性
}
}, "t1").start();
new Thread(() -> {
for (char c : a2) {
while (readyToRun != ReadyToRun.T2) {
}//cas自旋
System.out.print(c);
readyToRun = ReadyToRun.T1;//线程可见性
}
}, "t2").start();
}
}
基于TransferQueue实现
TransferQueue(java7引入)继承了BlockingQueue(BlockingQueue又继承了Queue)并扩展了一些新方法。生产者会一直阻塞直到所添加到队列的元素被某一个消费者所消费(不仅仅是添加到队列里就完事)。
/**
* TransferQueue线程交替打印
*
* @author liudong
* @date 2021/12/15 17:19
*/
public class TransferQueueDemo {
public static void main(String[] args) {
char[] a1 = "1234567".toCharArray();
char[] a2 = "ABCDEFG".toCharArray();
TransferQueue<Character> queue = new LinkedTransferQueue<>();
new Thread(() -> {
try {
for (char c : a1) {
System.out.print(queue.take());
queue.transfer(c);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
new Thread(() -> {
try {
for (char c : a2) {
queue.transfer(c);
System.out.print(queue.take());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2").start();
}
}
-
transfer(E e): 若当前存在一个正在等待获取的消费者线程,即立刻将e移交之;否则将元素e插入到队列尾部,并且当前线程进入阻塞状态,直到有消费者线程取走该元素。 -
tryTransfer(E e): 若当前存在一个正在等待获取的消费者线程,则该方法会即刻转移e,并返回true;若不存在则返回false,但是并不会将e插入到队列中。这个方法不会阻塞当前线程,要么快速返回true,要么快速返回false。 -
hasWaitingConsumer()和getWaitingConsumerCount(): 用来判断当前正在等待消费的消费者线程个数。 -
tryTransfer(E e, long timeout, TimeUnit unit): 若当前存在一个正在等待获取的消费者线程,会立即传输给它; 否则将元素e插入到队列尾部,并且等待被消费者线程获取消费掉。若在指定的时间内元素e无法被消费者线程获取,则返回false,同时该元素从队列中移除。
基于Semaphore实现
Semaphore 通常我们叫它信号量, 可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。
可以把它简单的理解成我们停车场入口立着的那个显示屏,每有一辆车进入停车场显示屏就会显示剩余车位减1,每有一辆车从停车场出去,显示屏上显示的剩余车辆就会加1,当显示屏上的剩余车位为0时,停车场入口的栏杆就不会再打开,车辆就无法进入停车场了,直到有一辆车从停车场出去为止。
/**
* Semaphore线程交替打印
*
* @author liudong
* @date 2021/12/15 17:21
*/
public class SemaphoreDemo {
private static char[] a1 = "1234567".toCharArray();
private static char[] a2 = "ABCDEFG".toCharArray();
static class ThreadDemo extends Thread {
private Semaphore current;
private Semaphore next;
private char[] content;
/**
* 构造方法
*
* @param current 要获取的当前锁
* @param next 要释放的下一把锁
* @param content 打印内容
*/
public ThreadDemo(Semaphore current, Semaphore next, char[] content) {
this.current = current;
this.next = next;
this.content = content;
}
@Override
public void run() {
for (char c : content) {
try {
// 获取当前锁,然后打印
current.acquire();
System.out.print(c);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 释放下一把锁
next.release();
}
}
}
public static void main(String[] args) {
// 初始化两把锁,只有A锁是可用的
Semaphore A = new Semaphore(1);
Semaphore B = new Semaphore(0);
// 创建并启动两个线程,线程1获取A锁,释放B锁
new ThreadDemo(A, B, a1).start();
// 线程2获取B锁,释放A锁
new ThreadDemo(B, A, a2).start();
}
}
-
acquire() : 获取一个令牌,在获取到令牌、或者被其他线程调用中断之前线程一直处于阻塞状态。 -
acquire(int permits) : 获取一个令牌,在获取到令牌、或者被其他线程调用中断、或超时之前线程一直处于阻塞状态。 -
acquireUninterruptibly(): 获取一个令牌,在获取到令牌之前线程一直处于阻塞状态(忽略中断) -
tryAcquire(): 尝试获得令牌,返回获取令牌成功或失败,不阻塞线程。 -
tryAcquire(long timeout, TimeUnit unit): 尝试获得令牌,在超时时间内循环尝试获取,直到尝试获取成功或超时返回,不阻塞线程。 -
release(): 释放一个令牌,唤醒一个获取令牌不成功的阻塞线程。 -
hasQueuedThreads(): 等待队列里是否还存在等待线程。 -
getQueueLength(): 获取等待队列里阻塞的线程数。 -
drainPermits(): 清空令牌把可用令牌数置为0,返回清空令牌的数量。 -
availablePermits(): 返回可用的令牌数量。
|