等待通知机制
等待通知机制的实现
wait() 的作用是使当前执行代码的线程进入等待状态,只能在同步方法或同步块中调用。调用后当前线程会释放锁;在 wait() 返回之前,该线程需要与其它线程竞争,重新获得锁。notify() 方法同样只能在同步方法或同步块中调用。
package multiply.com.test;
/**
 * Entry point of the basic wait/notify demo.
 *
 * <p>Creates one shared monitor object, hands it to a {@code ThreadA} and a
 * {@code ThreadB}, and starts them 50 ms apart.
 */
public class Run {

    public static void main(String[] args) {
        final Object monitor = new Object();
        final ThreadA waiter = new ThreadA(monitor);
        final ThreadB notifier = new ThreadB(monitor);
        try {
            waiter.start();
            // Head start for ThreadA before ThreadB begins competing for the monitor.
            Thread.sleep(50);
            notifier.start();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
package multiply.com.test;
/**
 * Waiting side of the wait/notify demo.
 *
 * <p>While holding the shared monitor it checks {@code MyList.size()} once; unless the
 * size is exactly 5 it prints a timestamp, calls {@code wait()} on the monitor, and
 * prints a second timestamp once {@code wait()} returns.
 */
public class ThreadA extends Thread {

    private final Object lock;

    /**
     * @param lock monitor object this thread synchronizes and waits on
     */
    public ThreadA(Object lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        super.run();
        try {
            synchronized (lock) {
                if (MyList.size() == 5) {
                    // Nothing to wait for.
                    return;
                }
                // NOTE(review): the condition is tested once, not re-checked in a loop
                // after waking up.
                System.out.println("wait begin " + System.currentTimeMillis());
                lock.wait();
                System.out.println("wait end " + System.currentTimeMillis());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
package multiply.com.test;
/**
 * Notifying side of the wait/notify demo.
 *
 * <p>While holding the shared monitor it calls {@code MyList.add()} ten times and calls
 * {@code notify()} on the monitor at the moment {@code MyList.size()} equals 5. The
 * monitor is held for the whole loop, so a notified thread cannot resume until all ten
 * additions are done.
 */
public class ThreadB extends Thread {

    private final Object lock;

    /**
     * @param lock monitor object this thread synchronizes and notifies on
     */
    public ThreadB(Object lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        super.run();
        synchronized (lock) {
            for (int added = 1; added <= 10; added++) {
                MyList.add();
                final boolean reachedFive = MyList.size() == 5;
                if (reachedFive) {
                    lock.notify();
                    System.out.println("notify sent!");
                }
                System.out.println("add " + added);
            }
        }
    }
}
package multiply.com.test;
import java.util.ArrayList;
import java.util.List;
/**
 * Process-wide list used by the wait/notify demo as a simple counter.
 *
 * <p>The backing list is a plain {@link ArrayList}; this class adds no synchronization
 * of its own.
 */
public class MyList {

    private static final List<String> ITEMS = new ArrayList<>();

    /** Appends one placeholder element to the shared list. */
    public static void add() {
        ITEMS.add("anyString");
    }

    /**
     * @return the number of elements added so far
     */
    public static int size() {
        return ITEMS.size();
    }
}
wait begin 1635219350053 add 1 add 2 add 3 add 4 notify sent! add 5 add 6 add 7 add 8 add 9 add 10 wait end 1635219350104
当interrupt方法遇到了wait方法
方法wait(long)的使用
通知过早
如果通知过早,会打乱程序正常运行的逻辑。
package multiply.com.test;
/**
 * Demo of a notification that may arrive before the waiter starts waiting.
 *
 * <p>{@code runnableB} notifies on the lock and then sets {@code isFirstRunB};
 * {@code runnableA} only waits while that flag is still {@code false}, so a
 * notification that was sent earlier is not lost.
 */
public class MyRun1 {

    private String lock = new String("");
    private boolean isFirstRunB = false;

    private Runnable runnableA = new Runnable() {
        @Override
        public void run() {
            awaitNotificationIfNeeded();
        }
    };

    private Runnable runnableB = new Runnable() {
        @Override
        public void run() {
            sendNotification();
        }
    };

    /** Waits on the lock until {@code isFirstRunB} is true; returns at once if it already is. */
    private void awaitNotificationIfNeeded() {
        try {
            synchronized (lock) {
                while (!isFirstRunB) {
                    System.out.println("begin wait");
                    lock.wait();
                    System.out.println(" end wait");
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /** Notifies one waiter on the lock and records that the notification has been sent. */
    private void sendNotification() {
        synchronized (lock) {
            System.out.println("notify begin");
            lock.notify();
            System.out.println("notify end");
            // Set under the same lock, so the waiter sees it when it re-checks its condition.
            isFirstRunB = true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final MyRun1 demo = new MyRun1();
        final Thread waiter = new Thread(demo.runnableA);
        final Thread notifier = new Thread(demo.runnableB);
        waiter.start();
        notifier.start();
    }
}
notify begin notify end
生产者和消费者模式实现
多生产者与多消费者:操作值-假死
package multiply.com.test;
/**
 * Entry point of the multi-producer / multi-consumer "value" demo.
 *
 * <p>Starts two producer threads and two consumer threads that share one lock, waits
 * five seconds, and then prints the name and state of every thread enumerated from the
 * main thread's group so that stuck threads become visible.
 */
public class Run {

    public static void main(String[] args) throws InterruptedException {
        // A dedicated String instance instead of the "" literal: literals are interned,
        // so any other code synchronizing on "" would share the same monitor.
        String lock = new String("");
        int pLen = 2;
        int cLen = 2;
        P p = new P(lock);
        C c = new C(lock);
        ThreadP[] pThread = new ThreadP[pLen];
        ThreadCu[] cThread = new ThreadCu[cLen];
        for (int i = 0; i < pLen; i++) {
            pThread[i] = new ThreadP(p);
            pThread[i].setName("Provider " + (i + 1));
            pThread[i].start();
        }
        for (int i = 0; i < cLen; i++) {
            cThread[i] = new ThreadCu(c);
            cThread[i].setName("consumer " + (i + 1));
            cThread[i].start();
        }
        Thread.sleep(5000);

        ThreadGroup group = Thread.currentThread().getThreadGroup();
        Thread[] threads = new Thread[group.activeCount()];
        // activeCount() is only an estimate; enumerate() reports how many slots it
        // actually filled, and the remaining slots stay null.
        int filled = group.enumerate(threads);
        for (int i = 0; i < filled; i++) {
            System.out.println(threads[i].getName() + " " + threads[i].getState());
        }
    }
}
package multiply.com.test;
/**
 * Consumer thread: calls {@code C.getValue()} in an endless loop.
 */
public class ThreadCu extends Thread {

    private final C c;

    /**
     * @param c consumer service whose {@code getValue()} is invoked repeatedly
     */
    public ThreadCu(C c) {
        this.c = c;
    }

    @Override
    public void run() {
        super.run();
        for (;;) {
            c.getValue();
        }
    }
}
package multiply.com.test;
/**
 * Producer thread: calls {@code P.setValue()} in an endless loop.
 */
public class ThreadP extends Thread {

    private final P p;

    /**
     * @param p producer service whose {@code setValue()} is invoked repeatedly
     */
    public ThreadP(P p) {
        this.p = p;
    }

    @Override
    public void run() {
        super.run();
        for (;;) {
            p.setValue();
        }
    }
}
package multiply.com.test;
/**
 * Consumer side of the shared-value demo.
 *
 * <p>Waits on the lock while {@code ValueObject.value} is the empty string, then prints
 * the value, resets it to the empty string and wakes a single waiter with
 * {@code notify()}.
 *
 * <p>NOTE(review): {@code notify()} wakes only one arbitrary waiter; with several
 * producers and consumers on the same lock this looks like the intended cause of the
 * hang this demo illustrates - confirm before switching to {@code notifyAll()}.
 */
public class C {

    private final String lock;

    /**
     * @param lock monitor shared with the producer side
     */
    public C(String lock) {
        this.lock = lock;
    }

    /** Consumes one value, blocking on the lock while none is available. */
    public void getValue() {
        try {
            synchronized (lock) {
                // The sleep happens while the lock is held.
                Thread.sleep(100);
                while ("".equals(ValueObject.value)) {
                    report(" waiting ");
                    lock.wait();
                }
                report(" runnable ");
                System.out.println("get value = " + ValueObject.value);
                ValueObject.value = "";
                lock.notify();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /** Prints the current thread's name followed by the given state text. */
    private static void report(String state) {
        System.out.println("consumer " + Thread.currentThread().getName() + state);
    }
}
package multiply.com.test;
/**
 * Producer side of the shared-value demo.
 *
 * <p>Waits on the lock while {@code ValueObject.value} is non-empty, then stores a fresh
 * timestamp-based value and wakes a single waiter with {@code notify()}.
 *
 * <p>NOTE(review): {@code notify()} wakes only one arbitrary waiter; with several
 * producers and consumers on the same lock this looks like the intended cause of the
 * hang this demo illustrates - confirm before switching to {@code notifyAll()}.
 */
public class P {

    private final String lock;

    /**
     * @param lock monitor shared with the consumer side
     */
    public P(String lock) {
        this.lock = lock;
    }

    /** Produces one value, blocking on the lock while the previous one is unconsumed. */
    public void setValue() {
        try {
            synchronized (lock) {
                // The sleep happens while the lock is held.
                Thread.sleep(100);
                while (!ValueObject.value.equals("")) {
                    report(" waiting ");
                    lock.wait();
                }
                report(" runnable ");
                final String value = System.currentTimeMillis() + "_" + System.nanoTime();
                System.out.println("set value = " + value);
                ValueObject.value = value;
                lock.notify();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /** Prints the current thread's name followed by the given state text. */
    private static void report(String state) {
        System.out.println("provider " + Thread.currentThread().getName() + state);
    }
}
多生产与多消费:操作数栈
package multiply.com.test;
/**
 * Entry point of the multi-producer / multi-consumer stack demo.
 *
 * <p>Starts five producer threads and then six consumer threads, all operating on one
 * shared {@code MyStack}.
 */
public class Run {

    public static void main(String[] args) {
        final MyStack sharedStack = new MyStack();

        final P[] producers = new P[5];
        final C[] consumers = new C[6];
        final ThreadP[] producerThreads = new ThreadP[producers.length];
        final ThreadC[] consumerThreads = new ThreadC[consumers.length];

        // All producers are started before any consumer.
        for (int idx = 0; idx < producers.length; idx++) {
            producers[idx] = new P(sharedStack);
            final ThreadP thread = new ThreadP(producers[idx]);
            thread.setName("p thread " + idx);
            producerThreads[idx] = thread;
            thread.start();
        }

        for (int idx = 0; idx < consumers.length; idx++) {
            consumers[idx] = new C(sharedStack);
            final ThreadC thread = new ThreadC(consumers[idx]);
            thread.setName("c thread " + idx);
            consumerThreads[idx] = thread;
            thread.start();
        }
    }
}
package multiply.com.test;
/**
 * Consumer service for the stack demo: pops one element and prints it.
 */
public class C {

    private final MyStack myStack;

    /**
     * @param myStack stack to pop from
     */
    public C(MyStack myStack) {
        this.myStack = myStack;
    }

    /** Pops one element from the stack and prints it prefixed with {@code "pop = "}. */
    public void popService() {
        final String popped = myStack.pop();
        System.out.println("pop = " + popped);
    }
}
package multiply.com.test;
/**
 * Producer service for the stack demo: pushes one element per call.
 */
public class P {

    private final MyStack myStack;

    /**
     * @param myStack stack to push onto
     */
    public P(MyStack myStack) {
        this.myStack = myStack;
    }

    /** Delegates to {@code MyStack.push()}. */
    public void pushService() {
        this.myStack.push();
    }
}
package multiply.com.test;
/**
 * Consumer thread for the stack demo: calls {@code C.popService()} in an endless loop.
 */
public class ThreadC extends Thread {

    private final C c;

    /**
     * @param c consumer service invoked repeatedly by this thread
     */
    public ThreadC(C c) {
        this.c = c;
    }

    @Override
    public void run() {
        super.run();
        for (;;) {
            c.popService();
        }
    }
}
package multiply.com.test;
/**
 * Producer thread for the stack demo: calls {@code P.pushService()} in an endless loop.
 */
public class ThreadP extends Thread {

    private final P p;

    /**
     * @param p producer service invoked repeatedly by this thread
     */
    public ThreadP(P p) {
        this.p = p;
    }

    @Override
    public void run() {
        super.run();
        for (;;) {
            p.pushService();
        }
    }
}
package multiply.com.test;
import java.util.ArrayList;
import java.util.List;
/**
 * Single-slot buffer guarded by this object's monitor.
 *
 * <p>{@link #push()} blocks while the buffer already holds one element and
 * {@link #pop()} blocks while it is empty. Each successful operation wakes all waiters
 * with {@code notifyAll()}.
 */
public class MyStack {

    private final List<String> list = new ArrayList<>();

    /**
     * Adds one random-valued element, waiting while the buffer is full.
     * If the wait is interrupted, the stack trace is printed and nothing is added.
     */
    public synchronized void push() {
        try {
            while (list.size() == 1) {
                wait();
            }
            final String element = "anyString = " + Math.random();
            list.add(element);
            notifyAll();
            System.out.println("push = " + list.size());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * Removes and returns the first element, waiting while the buffer is empty.
     *
     * @return the removed element, or the empty string if the wait was interrupted
     */
    public synchronized String pop() {
        try {
            while (list.isEmpty()) {
                System.out.println("pop thread " + Thread.currentThread().getName() + " process is waiting");
                wait();
            }
            final String head = list.remove(0);
            notifyAll();
            System.out.println("pop = " + list.size());
            return head;
        } catch (InterruptedException e) {
            e.printStackTrace();
            return "";
        }
    }
}
通过管道进行线程间通信:字节流
管道流有两种:第一种是字节流(PipedInputStream / PipedOutputStream),第二种是字符流(PipedReader / PipedWriter)。
package multiply.com.test;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
/**
 * Entry point of the piped byte-stream demo.
 *
 * <p>Connects a {@link PipedOutputStream} to a {@link PipedInputStream}, starts the
 * reader thread, and starts the writer thread two seconds later.
 */
public class Run {

    public static void main(String[] args) throws IOException, InterruptedException {
        final WriteData writer = new WriteData();
        final ReadData reader = new ReadData();

        final PipedInputStream pipeIn = new PipedInputStream();
        final PipedOutputStream pipeOut = new PipedOutputStream();
        pipeOut.connect(pipeIn);

        final ReadThread readThread = new ReadThread(reader, pipeIn);
        final WriteThread writeThread = new WriteThread(writer, pipeOut);

        readThread.start();
        // The reader is started well before the writer produces any data.
        Thread.sleep(2000);
        writeThread.start();
    }
}
package multiply.com.test;
import java.io.PipedInputStream;
/**
 * Thread that hands its {@link PipedInputStream} to {@code ReadData.readMethod}.
 */
public class ReadThread extends Thread {

    private final ReadData read;
    private final PipedInputStream in;

    /**
     * @param read reader service that consumes the stream
     * @param in   read end of the pipe
     */
    public ReadThread(ReadData read, PipedInputStream in) {
        this.in = in;
        this.read = read;
    }

    @Override
    public void run() {
        super.run();
        this.read.readMethod(this.in);
    }
}
package multiply.com.test;
import java.io.PipedOutputStream;
/**
 * Thread that hands its {@link PipedOutputStream} to {@code WriteData.writeMethod}.
 */
public class WriteThread extends Thread {

    private final WriteData writeData;
    private final PipedOutputStream out;

    /**
     * @param writeData writer service that fills the stream
     * @param out       write end of the pipe
     */
    public WriteThread(WriteData writeData, PipedOutputStream out) {
        this.out = out;
        this.writeData = writeData;
    }

    @Override
    public void run() {
        super.run();
        this.writeData.writeMethod(this.out);
    }
}
package multiply.com.test;
import java.io.IOException;
import java.io.PipedInputStream;
/**
 * Reads everything from a piped input stream and echoes it to standard output.
 */
public class ReadData {

    /** Number of bytes requested per {@code read} call. */
    private static final int CHUNK_SIZE = 20;

    /**
     * Prints a {@code "read : "} header, copies the stream to standard output in chunks
     * until end-of-stream, prints a newline and closes the stream.
     *
     * <p>The stream is closed even when reading fails, so the pipe is never left open
     * after an error. Any {@link IOException} is reported via its stack trace and not
     * propagated.
     *
     * @param in read end of the pipe; closed by this method
     */
    public void readMethod(PipedInputStream in) {
        // try-with-resources guarantees close() on both the normal and the error path.
        try (PipedInputStream source = in) {
            System.out.println("read : ");
            byte[] bytes = new byte[CHUNK_SIZE];
            int length = source.read(bytes);
            while (length != -1) {
                String newData = new String(bytes, 0, length);
                System.out.print(newData);
                length = source.read(bytes);
            }
            System.out.println();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
package multiply.com.test;
import java.io.IOException;
import java.io.PipedOutputStream;
/**
 * Writes the decimal numbers 1 through 300 to a piped output stream.
 */
public class WriteData {

    /** Last number written (inclusive). */
    private static final int LAST_NUMBER = 300;

    /**
     * Prints a {@code "write : "} header, writes each number from 1 to 300 to the stream
     * (echoing it to standard output), prints a newline and closes the stream.
     *
     * <p>The stream is closed even when a write fails, so the reading side is always
     * told that no more data will arrive. Any {@link IOException} is reported via its
     * stack trace and not propagated.
     *
     * @param out write end of the pipe; closed by this method
     */
    public void writeMethod(PipedOutputStream out) {
        // try-with-resources guarantees close() on both the normal and the error path.
        try (PipedOutputStream sink = out) {
            System.out.println("write : ");
            for (int number = 1; number <= LAST_NUMBER; number++) {
                String outData = "" + number;
                sink.write(outData.getBytes());
                System.out.print(outData);
            }
            System.out.println();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
方法join的使用
package multiply.com.test;
/**
 * Demo of {@code join(long)}: starts a {@code MyThread}, waits at most two seconds for
 * it to finish, then prints the current time.
 */
public class Test {

    public static void main(String[] args) {
        final MyThread worker = new MyThread();
        try {
            worker.start();
            // Bounded join: returns after 2000 ms even if the worker is still running.
            worker.join(2000);
            System.out.println("End timer = " + System.currentTimeMillis());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
package multiply.com.test;
/**
 * Worker for the join demo: prints its start time and then sleeps for five seconds.
 */
public class MyThread extends Thread {

    @Override
    public void run() {
        super.run();
        try {
            final long startedAt = System.currentTimeMillis();
            System.out.println("begin timmer = " + startedAt);
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
begin timmer = 1635257078799 End timer = 1635257080818
类ThreadLocal的使用
验证线程变量的私有性
package multiply.com.test;
/**
 * Demo showing the value {@code Tools.t1} yields in the main thread and then in a
 * separately started {@code ThreadA}.
 *
 * <p>The main thread prints its value ten times (100 ms apart), pauses five seconds,
 * and only then creates and starts {@code ThreadA}.
 */
public class Run {

    public static void main(String[] args) {
        try {
            int remaining = 10;
            while (remaining-- > 0) {
                System.out.println("Main process value = " + Tools.t1.get());
                Thread.sleep(100);
            }
            Thread.sleep(5000);
            // Created only after the main thread has finished reading its own value.
            final ThreadA child = new ThreadA();
            child.start();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
package multiply.com.test;
import java.util.Date;
/**
 * {@link ThreadLocal} whose initial value is the wall-clock time, in milliseconds, at
 * the moment the value is first requested by a thread.
 */
public class ThreadLocalExt extends ThreadLocal<Long> {

    /**
     * @return the current time in milliseconds, boxed
     */
    @Override
    protected Long initialValue() {
        final long now = System.currentTimeMillis();
        return now;
    }
}
package multiply.com.test;
/**
 * Prints the value this thread sees in {@code Tools.t1} ten times, 100 ms apart.
 */
public class ThreadA extends Thread {

    @Override
    public void run() {
        super.run();
        try {
            int remaining = 10;
            while (remaining-- > 0) {
                System.out.println("ThreadA process value = " + Tools.t1.get());
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
package multiply.com.test;
/**
 * Holder for the thread-local instance shared by the demo classes.
 */
public class Tools {

    /** Thread-local read by both the main thread and {@code ThreadA}. */
    public static ThreadLocalExt t1 = new ThreadLocalExt();
}
Main process value = 1635257609963 Main process value = 1635257609963 Main process value = 1635257609963 Main process value = 1635257609963 Main process value = 1635257609963 Main process value = 1635257609963 Main process value = 1635257609963 Main process value = 1635257609963 Main process value = 1635257609963 Main process value = 1635257609963 ThreadA process value = 1635257615968 ThreadA process value = 1635257615968 ThreadA process value = 1635257615968 ThreadA process value = 1635257615968 ThreadA process value = 1635257615968 ThreadA process value = 1635257615968 ThreadA process value = 1635257615968 ThreadA process value = 1635257615968 ThreadA process value = 1635257615968 ThreadA process value = 1635257615968
package multiply.com.test;
/**
 * Demo showing the value {@code Tools.t1} yields in the main thread and then in a
 * {@code ThreadA} created afterwards.
 *
 * <p>The main thread prints its value ten times (102 ms apart), pauses five seconds,
 * and only then creates and starts {@code ThreadA}.
 */
public class Run {

    public static void main(String[] args) {
        try {
            int remaining = 10;
            while (remaining-- > 0) {
                System.out.println("Main process value = " + Tools.t1.get());
                Thread.sleep(102);
            }
            Thread.sleep(5000);
            // Keep the construction here: the child is created after the main thread
            // has already obtained its value from Tools.t1.
            final ThreadA child = new ThreadA();
            child.start();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
package multiply.com.test;
/**
 * Holder for the inheritable thread-local instance shared by the demo classes.
 */
public class Tools {

    /** Inheritable thread-local read by both the main thread and {@code ThreadA}. */
    public static InheritableThreadLocalExt t1 = new InheritableThreadLocalExt();
}
package multiply.com.test;
/**
 * Prints the value this thread sees in {@code Tools.t1} ten times, 101 ms apart.
 */
public class ThreadA extends Thread {

    @Override
    public void run() {
        super.run();
        try {
            int remaining = 10;
            while (remaining-- > 0) {
                System.out.println("ThreadA process value = " + Tools.t1.get());
                Thread.sleep(101);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
package multiply.com.test;
/**
 * {@link InheritableThreadLocal} whose initial value is the wall-clock time, in
 * milliseconds, at the moment the value is first requested by a thread that has not
 * inherited one.
 */
public class InheritableThreadLocalExt extends InheritableThreadLocal<Long> {

    /**
     * @return the current time in milliseconds, boxed
     */
    @Override
    protected Long initialValue() {
        final long now = System.currentTimeMillis();
        return now;
    }
}
Main process value = 1635258018157 Main process value = 1635258018157 Main process value = 1635258018157 Main process value = 1635258018157 Main process value = 1635258018157 Main process value = 1635258018157 Main process value = 1635258018157 Main process value = 1635258018157 Main process value = 1635258018157 Main process value = 1635258018157 ThreadA process value = 1635258018157 ThreadA process value = 1635258018157 ThreadA process value = 1635258018157 ThreadA process value = 1635258018157 ThreadA process value = 1635258018157 ThreadA process value = 1635258018157 ThreadA process value = 1635258018157 ThreadA process value = 1635258018157 ThreadA process value = 1635258018157 ThreadA process value = 1635258018157
|