1 Lock接口
JDK5加入,与synchronized比较,显示定义,结构更灵活。 提供更多实用性方法,功能更强大、性能更优越。(同步锁、互斥锁)
常用方法: |
---|
void lock() //获取锁,如锁被占用,则等待。 | boolean tryLock() //尝试获取锁(成功返回true。失败返回false,不阻塞)。 | void unlock() //释放锁。 |
2 重入锁
所谓重入锁,指的是以线程为单位,当一个线程获取对象锁之后,这个线程可以再次获取本对象上的锁,而其他的线程是不可以的。 synchronized 和 ReentrantLock 都是可重入锁。(参考API) 可重入锁的意义在于防止死锁。
ReentrantLock:Lock接口的实现类,与synchronized一样具有互斥锁功能。
示例案例1(多个线程向数组中添加数据)
class TestMyList {
private Lock lock = new ReentrantLock();
private String[] strs = {"A","B","","",""};
private int count = 2;
public void add(String val){
lock.lock();
try {
strs[count] = val;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
count++;
System.out.println(Thread.currentThread().getName()+"添加了数据:"+val);
} finally {
lock.unlock();
}
}
public String[] getStr(){
return strs;
}
}
public class MyList {
public static void main(String[] args) throws InterruptedException {
final TestMyList list = new TestMyList();
Runnable runnable1 = new Runnable(){
@Override
public void run() {
list.add("hello");
}
};
Runnable runnable2 = new Runnable(){
@Override
public void run() {
list.add("world");
}
};
Thread th1 = new Thread(runnable1);
Thread th2 = new Thread(runnable2);
th1.start();
th2.start();
th1.join();
th2.join();
System.out.println(Arrays.toString(list.getStr()));
}
}
示例案例2(4个窗口卖火车票)
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class Ticket implements Runnable{
private int ticket = 100;
private Lock lock = new ReentrantLock();
@Override
public void run() {
while(true){
lock.lock();
try {
if(ticket <= 0){
break;
}
System.out.println(Thread.currentThread().getName()+"卖了第"+ticket+"张票");
ticket--;
} finally {
lock.unlock();
}
}
}
}
public class TestTicket {
public static void main(String[] args) {
Ticket ticket = new Ticket();
ExecutorService es = Executors.newFixedThreadPool(4);
for (int i = 0; i < 4; i++) {
es.submit(ticket);
}
es.shutdown();
}
}
可重入锁可以参考以下链接:https://blog.csdn.net/yanyan19880509/article/details/52345422.
2 读写锁
ReentrantReadWriteLock implements ReadWriteLock(查看jdk API) 一种支持一写多读的同步锁,读写分离,可分别分配读锁、写锁。 支持多次分配读锁,使多个读操作可以并发执行。
互斥规则 |
---|
写-写:互斥,阻塞 | 读-写:互斥,读阻塞写、写阻塞读 | 读-读:不互斥、不阻塞。(效率高) 可以采用异步方式 |
读写锁在读操作远远高于写操作的环境中,可在保障线程安全的情况下,提高运行效率。
示例案例1(演示 读写锁 使用,4秒左右)
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
class ReadWriteDemo {
private String val;
ReentrantReadWriteLock rrw = new ReentrantReadWriteLock();
private ReadLock readLock = rrw.readLock();
private WriteLock writeLock = rrw.writeLock();
public void setVal(String val){
writeLock.lock();
try {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" 写入:"+val);
this.val = val;
} finally {
writeLock.unlock();
}
}
public String getVal(){
readLock.lock();
try {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" 读取:"+this.val);
return this.val;
} finally{
readLock.unlock();
}
}
}
public class Test {
public static void main(String[] args) {
final ReadWriteDemo readWriteDemo = new ReadWriteDemo();
ExecutorService es = Executors.newFixedThreadPool(20);
Runnable readRunnable = new Runnable(){
@Override
public void run() {
readWriteDemo.getVal();
}
};
Runnable writeRunnable = new Runnable(){
@Override
public void run() {
readWriteDemo.setVal("Hello"+new Random().nextInt(100));
}
};
long start = System.currentTimeMillis();
for (int i = 0; i < 2; i++) {
es.submit(writeRunnable);
}
for (int i = 0; i < 18; i++) {
es.submit(readRunnable);
}
es.shutdown();
while(!es.isTerminated()){
}
long end = System.currentTimeMillis();
System.out.println("用时:"+(end-start));
}
}
示例案例2(演示 互斥锁 使用,20秒左右)
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;
class ReadWriteDemo {
private String val;
private ReentrantLock lock = new ReentrantLock();
public void setVal(String val){
lock.lock();
try {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" 写入:"+val);
this.val = val;
} finally {
lock.unlock();
}
}
public String getVal(){
lock.lock();
try {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" 读取:"+this.val);
return this.val;
} finally{
lock.unlock();
}
}
}
public class Test2 {
public static void main(String[] args) {
final ReadWriteDemo readWriteDemo = new ReadWriteDemo();
ExecutorService es = Executors.newFixedThreadPool(20);
Runnable readRunnable = new Runnable(){
@Override
public void run() {
readWriteDemo.getVal();
}
};
Runnable writeRunnable = new Runnable(){
@Override
public void run() {
readWriteDemo.setVal(" Hello "+new Random().nextInt(100));
}
};
long start = System.currentTimeMillis();
for (int i = 0; i < 2; i++) {
es.submit(writeRunnable);
}
for (int i = 0; i < 18; i++) {
es.submit(readRunnable);
}
es.shutdown();
while(!es.isTerminated()){
}
long end = System.currentTimeMillis();
System.out.println("用时:"+(end-start));
}
}
boolean isTerminated() 若关闭后所有任务都已完成,则返回true。注意除非首先调用shutdown或shutdownNow,否则isTerminated永不为true。
3 线程安全的集合
线程安全集合介绍
Collection体系集合、以及线程安全集合。
Collections工具类中提供了多个可以获得线程安全集合的方法。 |
---|
public static Collection synchronizedCollection(Collection c) | public static List synchronizedList(List list) | public static Set synchronizedSet(Set s) | public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) | public static SortedSet synchronizedSortedSet(SortedSet s) | public static <K,V> SortedMap<K,V> synchronizedSortedMap(SortedMap<K,V> m) |
4 CopyOnWriteArrayList
import java.util.ArrayList;
public class Demo1 {
public static void main(String[] args) {
ArrayList list = new ArrayList();
for (int i = 0; i < 20; i++) {
int temp = i;
new Thread(new Runnable(){
@Override
public void run() {
for (int j = 0; j < 10; j++) {
list.add(Thread.currentThread().getName()+"======"+temp+"====="+j);
System.out.println(list.toString());
System.out.println("--------");
}
}
}).start();
}
}
}
Exception in thread "Thread-18" --------java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
at java.util.ArrayList$Itr.next(ArrayList.java:851)
at java.util.AbstractCollection.toString(AbstractCollection.java:461)
at Demo1$1.run(Demo1.java:20)
at java.lang.Thread.run(Thread.java:745)
集合线程不安全问题解决方案
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class Demo2 {
public static void main(String[] args) {
ArrayList list1 = new ArrayList();
List list = Collections.synchronizedList(list1);
for (int i = 0; i < 20; i++) {
int temp = i;
new Thread(new Runnable(){
@Override
public void run() {
for (int j = 0; j < 10; j++) {
list.add(Thread.currentThread().getName()+"======"+temp+"====="+j);
System.out.println(list.toString());
}
}
}).start();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("元素个数:"+list.size());
for (Object str:list) {
System.out.println(str);
}
}
}
CopyOnWriteArrayList 特点
线程安全的ArrayList,加强版读写分离。 写有锁,读无锁,读写之间不阻塞,优于读写锁。 写入时,先copy一个容器副本、再添加新元素,最后替换引用。 使用方式与ArrayList无异。
public class TestCopyOnWriteArrayList {
public static void main(String[] args) {
List<String> list = new CopyOnWriteArrayList<String>();
}
}
特点:以空间换取性能,缺点是比较耗费空间。
import java.util.concurrent.CopyOnWriteArrayList;
public class Demo3 {
public static void main(String[] args) {
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<String>();
for (int i = 0; i < 5; i++) {
int temp = i;
new Thread(new Runnable(){
@Override
public void run() {
for (int j = 0; j < 10; j++) {
list.add(Thread.currentThread().getName()+"======"+temp+"====="+j);
System.out.println(list.toString());
}
}
}).start();
}
}
}
注意:结果可能没有产生50个数据,因为在输出最终结果的时候,可能线程还没结束!
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Demo4 {
public static void main(String[] args) {
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<String>();
ExecutorService es = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
es.submit(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 10; j++) {
list.add(Thread.currentThread().getName()+"..."+new Random().nextInt(1000));
}
}
});
}
es.shutdown();
while(!es.isTerminated()) {
}
System.out.println("元素个数:"+list.size());
for (String str:list) {
System.out.println(str);
}
}
}
CopyOnWriteArrayList 源碼
final void setArray(Object[] a) {
array = a;
}
public CopyOnWriteArrayList() {
setArray(new Object[0]);
}
5 CopyOnWriteArraySet示例代码
import java.util.concurrent.CopyOnWriteArraySet;
public class Demo5 {
public static void main(String[] args) {
CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<String>();
set.add("apple");
set.add("huawei");
set.add("xiaomi");
set.add("lenovo");
set.add("apple");
System.out.println("元素个数:"+set.size());
System.out.println(set.toString());
}
}
CopyOnWriteArraySet源碼
public class CopyOnWriteArraySet<E> extends AbstractSet<E>
implements java.io.Serializable {
private static final long serialVersionUID = 5457747651344034263L;
private final CopyOnWriteArrayList<E> al;
public CopyOnWriteArraySet() {
al = new CopyOnWriteArrayList<E>();
}
public CopyOnWriteArraySet(Collection<? extends E> c) {
if (c.getClass() == CopyOnWriteArraySet.class) {
@SuppressWarnings("unchecked") CopyOnWriteArraySet<E> cc =
(CopyOnWriteArraySet<E>)c;
al = new CopyOnWriteArrayList<E>(cc.al);
}
else {
al = new CopyOnWriteArrayList<E>();
al.addAllAbsent(c);
}
}
线程安全的Set,底层使用CopyOnWriteArrayList实现。 |
---|
唯一不同在于,使用addIfAbsent()添加元素,会遍历数组, | 如存在元素,则不添加(扔掉副本) |
6 ConcurrentHashMap
简单理解为线程安全的HashMap |
---|
初始容量默认为16段(Segment) ,使用分段锁设计 | 不对整个Map加锁,而是为每个Segment加锁 | 当多个对象存入同-个Segment时,才需要互斥 | 最理想状态为1 6个对象分别存入16个Segment,并行数量16 | 使用方式与HashMap无异 |
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class TestConcurrentHashMap {
public static void main(String[ ] args) {
Map<String,String> map = new ConcurrentHashMap<String, String>();
}
}
注意:jdk1.8后,Oracle对ConcurrentHashMap做了调整,不再采用分段锁设计方式,而采用的是无锁算法(CAS)。 参考链接:https://www.cnblogs.com/williamjie/p/9099861.html.
ConcurrentHashMap示例代码
import java.util.concurrent.ConcurrentHashMap;
public class Demo6 {
public static void main(String[] args) {
ConcurrentHashMap<String, String> hashMap=new ConcurrentHashMap<String, String>();
for(int i=0;i<5;i++) {
new Thread(new Runnable() {
@Override
public void run() {
for(int k=0;k<10;k++) {
hashMap.put(Thread.currentThread().getName()+"--"+k, k+"");
System.out.println(hashMap);
}
}
}).start();
}
}
}
7 Queue接口(Collection的子接口,表示队列FIFO(First In First Out)。)
常用方法: 抛出异常:
- boolean add(E e) //顺序添加-个元素(到达上限后,再添加则会抛出异常)
- E remove() //获得第一一个元素井移除(如果队列没有元素时,则抛异常)
- E element() //获得第- -个元素但不移除(如果队列没有元素时,则抛异常)
返回特殊值:推荐使用
- boolean offer(E e) //顺序添加- -个元素 (到达上限后,再添加则会返回false)
- E poll() //获得第一个元素并移除(如果队列没有元素时,则返回null)
- E keep( //获得第- 个元素但不移除 (如果队列没有元素
示例代码
import java.util.LinkedList;
import java.util.Queue;
public class Demo7 {
public static void main(String[] args) {
Queue<String> queue=new LinkedList<>();
queue.offer("苹果");
queue.offer("橘子");
queue.offer("葡萄");
queue.offer("西瓜");
queue.offer("榴莲");
System.out.println(queue.peek());
System.out.println("----------------");
System.out.println("元素个数:"+queue.size());
int size=queue.size();
for(int i=0;i<size;i++) {
System.out.println(queue.poll());
}
System.out.println("出队完毕:"+queue.size());
}
}
注意:LinkedList 是线程不安全的,如果需要用到多线程且安全的队列怎么办呢?
ConcurrentLinkedQueue
线程安全、可高效读写的队列,高并发下性能最好的队列。 无锁、CAS比较交换算法,修改的方法包含三个核心参数(V,E,N) V:要更新的变量、E:预期值、N:新值。 只有当V==E时, V=N;否则表示已被更新过,则取消当前操作。
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
public class TestConcurrentLinkedQueue {
public static void main(String[] args) {
Queue<String> queue = new ConcurrentLinkedQueue<String>();
queue.offer("Hello");
queue.offer("World");
queue.poll();
queue.peek();
}
}
ConcurrentLinkedQueue示例代码
import java.util.concurrent.ConcurrentLinkedQueue;
public class Demo8 {
public static void main(String[] args) throws Exception {
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
for(int i=1;i<=5;i++) {
queue.offer(i);
}
}
});
Thread t2=new Thread(new Runnable() {
@Override
public void run() {
for(int i=6;i<=10;i++) {
queue.offer(i);
}
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("-------------出队-------------");
int size=queue.size();
for(int i=0;i<size;i++) {
System.out.println(queue.poll());
}
}
}
8 BlockingQueue(阻塞队列`有界队列)
Queue的子接口,阻塞的队列,增加了两个线程状态为无限期等待的法。
方法:
- void put(E e) //将指定元素插入此队列中,如果没有可用空间,则等待。
- E take() //获取并移除此队列头部元素,如果没有可用元素,则等待。
可用于解决生产生、消费者问题。
ArrayBlockingQueue数组结构实现,有界队列。(手工固定. 上限)
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class TestArrayBlockingQueue {
public static void main(String[ ] args) {
BlockingQueue<String> abq = new ArrayBlockingQueue<String>(10);
}
}
LinkedBlockingQueue链表结构实现,无界队列。(默认. 上限Integer.MAX_ VALUE)
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class TestLinkedBlockingQueue {
public static void main(String[] args) {
BlockingQueue<String> lbq = new LinkedBlockingQueue<String>();
}
}
ArrayBlockingQueue示例代码
import java.util.concurrent.ArrayBlockingQueue;
public class Demo9 {
public static void main(String[] args) throws Exception{
ArrayBlockingQueue<String> queue=new ArrayBlockingQueue<>(5);
queue.put("aaa");
queue.put("bbb");
queue.put("ccc");
queue.put("ddd");
queue.put("eee");
queue.take();
System.out.println("已经添加了"+queue.size()+"个元素");
queue.put("xyz");
System.out.println("已经添加了6个元素");
System.out.println(queue.toString());
}
}
LinkedBlockingQueue示例代码
import java.util.concurrent.LinkedBlockingQueue;
public class Demo10 {
public static void main(String[] args) {
LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(6);
Thread t1=new Thread(new Runnable() {
@Override
public void run() {
for(int i=0;i<30;i++) {
try {
queue.put(i);
System.out.println(Thread.currentThread().getName()+"生产了第"+i+"号面包");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "生产者");
Thread t2=new Thread(new Runnable() {
@Override
public void run() {
for(int i=0;i<30;i++) {
try {
Integer num=queue.take();
System.out.println(Thread.currentThread().getName()+"消费了第"+i+"号面包");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "消费者");
t1.start();
t2.start();
}
}
总结
ExecutorService线程池接口、Executors工厂。 Callable线程任务、Future异步返回值。 Lock、ReentrantLock重入锁、 ReentrantReadWriteLock读写锁。 CopyOnWriteArrayList线程安全的ArrayList。 CopyOnWriteArraySet线程安全的Set。 ConcurrentHashMap线程安全的HashMap. ConcurrentLinkedQueue线程安全的Queue。 ArrayBlockingQueue线程安全的阻塞Queue。(生产者、 消费者)
|