线程的状态:
New
Runnable
Blocked : 某些操作被阻塞
Waiting: 某些操作在等待
Time Waiting: 因为sleep()的计时等待
Terminated: 打断
t.join()
t.join(毫秒);
线程打断:
-
interrupt() 中断一个线程,目标线程通过检测isInterrupted() 标志获取自身是否被打断。。如果目标线程处于等待状态,该线程会捕获到 InterruptException; 当你捕获到 InterruptException就意味着有线程希望打断了,这时候应该主动的结束自身线程,给自身线程运行的其他线程提示打断 public class Demo1 {
public static void main(String[] args) throws InterruptedException {
MyThread t = new MyThread();
t.start();
Thread.sleep(1);
t.interrupt();
t.join();
System.out.println("end");
}
}
class MyThread extends Thread{
@Override
public void run() {
HelloThread hello = new HelloThread();
hello.start();
try {
hello.join();
} catch (InterruptedException e) {
e.printStackTrace();
hello.interrupt();
}
System.out.println("mythread end");
}
}
class HelloThread extends Thread{
@Override
public void run() {
int n =0;
while (!isInterrupted()){
n++;
System.out.println("hello"+n);
}
System.out.println("hehe");
}
}
-
设置标志位。。表示这个线程是否在运行 public class Demo2 {
public static void main(String[] args) throws InterruptedException {
HelloThread1 t = new HelloThread1();
t.start();
Thread.sleep(1);
t.running=false;
}
}
class HelloThread1 extends Thread{
public volatile boolean running = true;
@Override
public void run() {
int n = 0;
while (running){
n++;
System.out.println("hello"+n);
}
System.out.println("end");
}
}
volatile 作用:
- 每次访问变量时,总是获取主内存的最新值
- 每次修改变量后,立刻回写到主内存
守护线程(Daemon Thread)
? 守护线程是指为其他线程服务的线程,在JVM中,所有非守护线程都执行完毕后,无论有没有守护线程,虚拟机都会自动退出
。守护线程不能持有任何需要关闭的资源,例如:打开文件等,因为虚拟机退出后,没有任何机会来关闭文件,会导致数据丢失
在调用start() 之前,调用setDeamon(true)
public class Demo4 {
public static void main(String[] args) throws InterruptedException {
TimerThread t = new TimerThread();
t.setDaemon(true);
t.start();
Thread.sleep(2000);
}
}
class TimerThread extends Thread{
@Override
public void run() {
while (true){
System.out.println(LocalTime.now());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
线程同步
临界区(critical section) : 这种加锁和解锁之间的代码块我们称之为临界区,任何时候临界区最多只有一个线程能执行
synchronized(lock){...}
注意事项:
-
使用synchronized的时候,不必担心抛出异常,因为无论是否有异常,都会在synchronized 结束处正确释放锁 -
组之间不存在竞争的,应该使用不同的锁,不然执行效率大大降低 -
JVM规范定义了几种原子操作
- 基本类型(long和double除外)赋值
- 引用类型的赋值
long和double是64位数据,JVM没有明确规定64位赋值操作是不是一个原子操作,不过在x64平台的JVM是把long和double的赋值作为原子操作实现的
public class Demo5 {
public static void main(String[] args) throws InterruptedException {
AddThread add = new AddThread();
DecThread dec = new DecThread();
add.start();
dec.start();
add.join();
dec.join();
System.out.println(Counter.count);
}
}
class Counter{
public static final Object lock = new Object();
public static int count = 0;
}
class AddThread extends Thread{
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
synchronized (Counter.lock){
Counter.count += 1;
}
}
}
}
class DecThread extends Thread{
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
synchronized (Counter.lock){
Counter.count -= 1;
}
}
}
}
- 有些时候,通过一些巧妙的转换,可以把非原子操作变成原子操作
class Pair {
int first;
int last;
public void set(int first, int last) {
synchronized(this) {
this.first = first;
this.last = last;
}
}
}
class Pair {
int[] pair;
public void set(int first, int last) {
int[] ps = new int[] { first, last };
this.pair = ps;
}
}
让线程自己选择锁对象往往使得代码逻辑混乱,也不利于封装。
一般的方法锁对象为this ,,静态方法为 Class.class
如果一个类被设计为多线程正确访问,我们就说这个类是线程安全的(thread-safe)…比如:java.lang.StringBuffer
还有一些不变类,例如:String,Integer,LocalDate 他们所有成员变量都是final,多线程同时访问时只能读不能写,这些不变类也是线程安全的,,
类似Math这些只提供静态方法,没有成员变量的类,也是线程安全的‘
没有特殊说明,一个类默认是非线程安全的
死锁
获取多个锁的时候,两个线程各自持有不同的锁,然后各自试图获取对方手里的锁,造成双方无限等待
可重入锁:JVM允许同一个线程重复获取同一个锁,这种能被同一个线程反复获取的锁,就叫做可重入锁。。获取锁的时候,不但要判断是否是第一次获取,还要记录这是第几次获取,每一次获取,记录+1,每次退出synchronized 块,记录 -1.减到0的时候才会真正释放锁
wait() 和 notify()
wait():
- 是一个native方法
- 只能在synchronized块中调用。因为调用完成后要释放锁。。。 锁对象.wait()
notify():
- 唤醒锁对象wait()的线程。。notify()只会唤醒其中一个(具体哪个依赖操作系统,有一定的随机性)
- notifyAll() 唤醒所有
ReentrantLock
java语言直接提供了synchronized 用于加锁,但是这种锁一是很重,二是获取时必须一直等待,没有额外的尝试机制
java.util.concurrent 包中的ReentrantLock 用于替代synchronized 加锁:
声明一个 ReentrantLock()
lock()
unlock()
tryLock() 重试
public class Demo2 {
private final Lock lock = new ReentrantLock();
private int count;
public void add(int n){
lock.lock();
try {
count += n;
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public boolean hello() throws InterruptedException {
boolean b = lock.tryLock(1, TimeUnit.SECONDS);
System.out.println(b);
return b;
}
public static void main(String[] args) throws InterruptedException {
Demo2 d = new Demo2();
new Thread(()->{
d.add(5);
}).start();
while (!d.hello()){
System.out.println("没有锁");
}
System.out.println("i get it");
}
}
Condition
synchronized 可以配合 wait 和 notify 实现条件不满足时等待,条件满足时唤醒。。
ReentrantLock 用 Condition对象来实现 wait 和 notify 功能
Condition对象必须从Lock实例的 newCondition()返回,这样才能获得了一个绑定了Lock实例的Condition实例
Condition方法:
- await() : await()可以在等待指定时间后,如果还没有被其他线程唤醒,可以自己醒来!!
- signal() // signal:标志暗号
- signalAll()
public class Demo3 {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private Queue<String> queue= new LinkedList<>();
public void addTask(String s){
lock.lock();
try {
queue.add(s);
condition.signalAll();
} finally {
lock.unlock();
}
}
public String getTask() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()){
condition.await();
}
return queue.remove();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
Demo3 d = new Demo3();
new Thread(()->{
try {
System.out.println(d.getTask());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(()->{
d.addTask("cc");
}).start();
}
}
ReadWriteLock
ReentrantLock保护的有些过头,只允许一个线程修改
- ReadWriteLock只允许一个线程写入(其他线程既不能写入也不能读取)。
- 没有写入时,多个线程允许同时读(提高性能)
public class Demo4 {
private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
private final Lock rlock=rwlock.readLock();
private final Lock wlock=rwlock.writeLock();
private String s;
public void set(String s){
wlock.lock();
try {
this.s= s;
Thread.sleep(1000);
System.out.println(s);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
wlock.unlock();
}
}
public String get(){
rlock.lock();
try {
System.out.println("----");
System.out.println(this.s);
Thread.sleep(1000);
return s;
} catch (InterruptedException e) {
e.printStackTrace();
return null;
} finally {
rlock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
Demo4 d = new Demo4();
long oldTime = System.currentTimeMillis();
Thread t1 = new Thread(() -> {
d.get();
});
Thread t2 = new Thread(() -> {
d.get();
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(System.currentTimeMillis()-oldTime);
}
}
StampedLock
ReadWriteLock:在读的过程中不允许写,这是一种悲观的读锁
为了进一步提升并发执行效率,Java 8 中引入了新的读写锁: StampedLock
- 读的过程中页允许写锁的写入!!这样一来,我们读的数据就可能不一致,所以需要一点额外的代码来判断读的过程中是否有写入。这种读锁是一种乐观锁
- StampedLock 是不可重入锁
乐观锁的意思就是:乐观的估计读的过程中大概率不会有写入。。
悲观锁就是:读的过程中拒绝有写入
显然,乐观锁的并发效率更高,但一旦有效概率的写入导致读取的数据不一致,需要能检测出来,再读一遍就好
public class Demo5 {
private final StampedLock stampedLock= new StampedLock();
private double x;
private double y;
public void move(double dx,double dy){
long stamp = stampedLock.writeLock();
try {
x += dx;
y += dy;
}finally{
stampedLock.unlockWrite(stamp);
}
}
public double distanceFromOrigin(){
long stamp = stampedLock.tryOptimisticRead();
double currentX = x;
double currentY=y;
if(!stampedLock.validate(stamp)){
stamp = stampedLock.readLock();
try {
currentX =x;
currentY=y;
}finally{
stampedLock.unlockRead(stamp);
}
}
return x+y;
}
}
Concurrent集合
java.util.concurrent 包提供了并发集合类
java.util.Collections 工具类还提供了一个旧的线程安全集合转换器:
Map unsafeMap = new HashMap();
Map threadSafeMap = Collections.synchronizedMap(unsafeMap)
实际上是用一个包装类包装了非线性安全的Map,然后对所有读写方法都用synchronized 加锁,。。比java.util.concurrent 集合要低得多,不推荐使用
线程池
public class Demo7 {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(4);
for (int i = 0; i < 6; i++) {
es.submit(new Task(""+i));
}
es.shutdown();
System.out.println(es.awaitTermination(10L, TimeUnit.SECONDS));
}
}
class Task implements Runnable{
private final String name;
Task(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println("start task" + name);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end task"+ name);
}
}
Future
java标准库还提供了一个Callable接口,和Runnable比,多了个返回值。。
当我们提交一个Callable任务后,会同时得到一个Future对象,然后主程序调用Future对象的get()方法,就可以获得异步执行的结果。。在调用get()时,如果异步任务已经完成,就可以直接获得结果,否则会阻塞,直到任务完成才返回结果
Future 接口:
? 表示一个未来可能返回的结果。。
- get()
- get(long timeout, TimeUnit unit) 获取结果,但只等待指定的时间
- cancel(boolean mayInterruptIfRunning) :取消当前任务
- isDone() : 判断任务是否完成
public class Demo8 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(4);
Future<?> future = es.submit(new Task1("123456"));
System.out.println(future.isDone());
System.out.println(future.get()+"---");
System.out.println(future.isDone());
es.shutdown();
}
}
class Task1 implements Callable<BigDecimal>{
public Task1(String code) {
}
@Override
public BigDecimal call() throws Exception {
Thread.sleep(1000);
double d = Math.random() * 20 + 5;
return new BigDecimal(d).setScale(2, RoundingMode.DOWN);
}
}
CompletableFuture
使用Future获得异步执行结果,要么调用异步阻塞方法 get() ,.,要么轮询 isDone() 是否为true。。这两种方法都不好,因为主线程会被迫等待。。
Java8引入了 CompletableFuture 针对Future做了改进,可以传入回调对象。当异步任务完成或者发生异常时,自动调用对应的回调方法
public static void main(String[] args) throws Exception {
CompletableFuture<Double> cf = CompletableFuture.supplyAsync(Main::fetchPrice);
cf.thenAccept((result) -> {
System.out.println("price: " + result);
});
cf.exceptionally((e) -> {
e.printStackTrace();
return null;
});
Thread.sleep(200);
}
CompletableFuture 会被提交给默认的线程池执行,我们需要定义的是CompletableFuture 完成时的异常和需要回调的实例
CompletableFuture 优点:
- 异步任务结束时,自动回调某个对象的方法
- 异步任务异常时,自动回调某个对象的方法
- 主线程设置好回调后,不再关心异步任务的执行
- 多个CompletableFuture可以串行执行
thenApplayAysnc() - 多个CompletableFuture可以并行执行 。。使用
anyOf(future1,future2) 获取其中任意一个返回的结果(只要一个成功),allOf() 需要所有CompletableFuture都必须成功
thenAccept() 处理正常结果
exceptional() 处理异常结果
thenApplyAsync() 串行化另一个CompletableFuture
anyOf()和allOf() 用于并行化多个 CompletableFuture
public class Demo9 {
public static void main(String[] args) throws InterruptedException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "123";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "456";
});
CompletableFuture<Object> future3 = CompletableFuture.anyOf(future1, future2);
CompletableFuture<String> future4= future3.thenApplyAsync((code) -> {
return code + "hehe";
});
future4.thenAccept((result)->{
System.out.println("sb"+result);
});
Thread.sleep(2000);
}
}
ForkJoin
将一个大任务拆分成若干个小任务,待续。。
ThreadLocal
ThreadLocal表示线程的局部变量,确保每个线程的ThreadLocal变量都是各自独立的
ThreadLocal适合在一个线程的处理流程中保持上下文(避免了同一参数在所有方法中传递)
使用ThreadLocal用使用try…finally结构,在finally中清除。。;否则该线程执行其他代码时,会把上一次的状态带进去
实现 AutoCloseable接口,,强制重写close()…关闭ThreaLocal
public class Hehe {
public static void main(String[] args) {
try(UserContext cc = new UserContext("cc")){
System.out.println(UserContext.getName());
}
}
}
class UserContext implements AutoCloseable{
static final ThreadLocal<String> tl = new ThreadLocal<>();
public UserContext(String user) {
tl.set(user);
}
public static String getName(){
return tl.get();
}
@Override
public void close() {
tl.remove();
}
}
抄袭: 廖雪峰官网
|