JUC并发编程
一、什么是JUC
1. JUC简介
- JUC就是java.util.concurrent工具包的简称,这是一个处理线程的工具包,JDK1.5开始出现的。
2. 进程与线程
进程
正在运行的程序(软件)就是一个独立的进程。
线程是属于进程的,一个进程中可以同时运行多个线程。
进程中的多线程其实是并发和并行执行的。
线程
是操作系统能够进行运算调度的最小单位。
3. 线程的状态
3.1 线程状态枚举类
public enum State {
/**
* Thread state for a thread which has not yet started.
*/
NEW,
/**
* Thread state for a runnable thread. A thread in the runnable
* state is executing in the Java virtual machine but it may
* be waiting for other resources from the operating system
* such as processor.
*/
RUNNABLE,
/**
* Thread state for a thread blocked waiting for a monitor lock.
* A thread in the blocked state is waiting for a monitor lock
* to enter a synchronized block/method or
* reenter a synchronized block/method after calling
* {@link Object#wait() Object.wait}.
*/
BLOCKED,
/**
* Thread state for a waiting thread.
* A thread is in the waiting state due to calling one of the
* following methods:
* <ul>
* <li>{@link Object#wait() Object.wait} with no timeout</li>
* <li>{@link #join() Thread.join} with no timeout</li>
* <li>{@link LockSupport#park() LockSupport.park}</li>
* </ul>
*
* <p>A thread in the waiting state is waiting for another thread to
* perform a particular action.
*
* For example, a thread that has called {@code Object.wait()}
* on an object is waiting for another thread to call
* {@code Object.notify()} or {@code Object.notifyAll()} on
* that object. A thread that has called {@code Thread.join()}
* is waiting for a specified thread to terminate.
*/
WAITING,
/**
* Thread state for a waiting thread with a specified waiting time.
* A thread is in the timed waiting state due to calling one of
* the following methods with a specified positive waiting time:
* <ul>
* <li>{@link #sleep Thread.sleep}</li>
* <li>{@link Object#wait(long) Object.wait} with timeout</li>
* <li>{@link #join(long) Thread.join} with timeout</li>
* <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
* <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
* </ul>
*/
TIMED_WAITING,
/**
* Thread state for a terminated thread.
* The thread has completed execution.
*/
TERMINATED;
}
3.2 wait和sleep的区别
3.2.1 所属类不同
- wait(): 属于Object类的方法,所有Java对象都可调用
- sleep(): 属于Thread类的静态方法,只能通过Thread类或线程对象调用
3.2.2 锁资源处理不同
- wait(): 释放当前持有的对象锁,允许其他线程获取该锁
- sleep(): 不释放锁资源,即使线程休眠,仍然保持对锁的占用
3.2.3 调用位置要求
- wait(): 必须在synchronized同步块或同步方法中调用,否则会抛出IllegalMonitorStateException
- sleep(): 可以在任何地方调用,无需在同步上下文中
3.2.4 唤醒机制不同
- wait(): 需要通过notify()/notifyAll()方法或中断来唤醒
- sleep(): 时间到后自动唤醒,或者可以通过interrupt()方法提前唤醒
3.2.5 使用场景不同
- wait(): 用于线程间协作与通信,实现线程间的等待/通知机制
- sleep(): 主要用于使当前线程暂停执行指定的时间,不涉及线程间通信
3.2.6 异常处理不同
- wait(): 抛出InterruptedException和IllegalMonitorStateException
- sleep(): 只抛出InterruptedException
3.3 并发与并行
3.3.1 串行模式
- 串行是一次只能取得一个任务,并执行这个任务
3.3.2 并行
- 在同一时刻,同时有多个线程在被CPU调度执行
3.3.3 并发
- 进程中的线程是由CPU负责调度执行的,但CPU能同时处理线程的数量有限,为了保证全部线程都能往前执行,CPU会轮询为系统的每个线程服务,由于CPU切换的速度很快,给我们的感觉这些线程在同时执行,这就是并发。
4. 管程
- 管程的描述为“Monitor”,操作系统中又称“监视器”,在线程中称之为“锁”
- Java多线程中的管程(Monitor)是一种同步机制,用于协调多个线程对共享资源的访问。作为Java并发编程的核心概念,管程提供了一种结构化的方式来实现线程间的互斥和同步。
4.1 基本概念
- 管程本质上是一种抽象数据类型,它封装了共享变量和对这些变量进行操作的过程集合。在Java中,每个对象都有一个与之关联的管程,用于控制对该对象的并发访问。
4.2 Java中管程的实现
synchronized关键字:Java中最基本的管程实现机制。当一个线程执行synchronized代码块或方法时,它获取对象的管程锁,其他线程必须等待锁被释放才能执行同样的代码块。
synchornized(Object){ //临界区 } //或者 public synchornized void method(){ //临界区 }
Object类的wait()、notify()和notifyAll()方法:这些方法提供了线程间协作的机制。wait()使线程进入等待状态并释放锁,notify()/notifyAll()唤醒等待的线程。
Lock接口和Condition接口:Java 5引入的更灵活的锁机制,允许更精细的线程控制。
Look look = new ReentrantLock(); Condition condition = lock.newCondition(); lock.lock(); try{ // 临界区代码 condition.await(); // 类似于wait() condition.signal(); // 类似于notify() }finally{ lock.unlock(); //确保锁被释放 }
4.3 管程特性
- 互斥访问:保证任一时刻只有一个线程能够执行临界区代码。
- 条件同步:通过条件变量(在Java中是wait/notify机制或Condition)支持线程间的协作。
- 封装性:将共享数据和对数据的操作封装在一起,提高代码的可维护性。
- 可重入性:Java的管程是可重入的,即同一个线程可以多次获取同一个锁。
4.4 管程与信号量的比较
- 相比于信号量,管程具有更高的抽象级别和更好的封装性,使并发编程更加结构化和安全。而信号量更为底层和灵活,但也更容易出错
5. 用户线程和守护线程
5.1 基本定义
- 用户线程:
- 也称为非守护线程,是Java应用程序的主要执行线程,如主线程(main thread)以及由它创建的子线程(默认情况下),可以理解为“自定义线程”
- 守护线程:
- 是一种在后台提供服务的线程,不会阻止JVM的退出。当所有用户线程结束时,无论守护线程是否结束,JVM都会退出。如垃圾回收
5.2 区别
5.2.1 生命周期管理
- 用户线程
- JVM会等待所有用户线程执行完毕后才会退出
- 用户线程的存在会阻止JVM的正常终止
- 当主线程结束时,如果有用户线程仍在运行,应用程序会继续执行
- 守护线程
- JVM不会等待守护线程执行完成
- 当所有用户线程执行完毕后,守护线程会被强制终止
- 守护线程不会阻止JVM的退出
5.2.2 创建方式
用户线程
Java中,线程默认创建为用户线程
守护线程
创建守护线程,需要在线程启动之前调用setDeamon()方法,传入true的boolean值
Thread thread2 = new Thread(() -> { for (int i = 1; i <= 10; i++) { System.out.println(Thread.currentThread().getName() + "--" + i); } }, "守护线程"); // 设置为守护线程 thread2.setDaemon(true); // 启动用户线程 thread.start(); // 启动守护线程 thread2.start();
注意:setDeamon()方法必须在start()方法之前调用,否则会抛出IllegalThreadStateException异常
5.2.3 优先级和资源分配
- 虽然从技术上讲,守护线程和用户线程的优先级机制相同,但在实际执行时:
- 用户线程通常获得更多的CPU时间和系统资源
- 守护线程的优先级往往较低,因为它们被设计为在后台执行辅助任务
5.2.4 应用场景
- 用户线程的应用场景:
- 执行应用程序的核心业务逻辑
- 处理用户交互
- 执行需要完整生命周期保证的任务
- 数据处理和计算任务
- 守护线程的应用场景:
- 后台支持服务,如垃圾回收器(GC)
- 定时任务调度
- 监控和维护任务
- 日志记录
- 资源清理
- 缓存维护
5.3 实际应用中的守护线程示例
- Java平台中的几个重要守护线程示例:
- 垃圾回收器线程(GC) - 最著名的守护线程,负责内存管理和垃圾回收
- JIT编译器线程 - 负责即时编译字节码为机器码
- Finalizer线程 - 执行对象的
finalize()
方法 - 引用处理线程 - 处理软引用、弱引用和虚引用
- 监控和管理线程 - 如JMX相关线程
5.4 使用守护线程的注意事项
- 资源清理问题:守护线程可能在任何时刻被终止,无法保证执行
finally
块或完成资源清理 - 不适合执行重要任务:不应将关键业务逻辑或需要可靠完成的任务放在守护线程中
- 状态继承:由守护线程创建的子线程默认也是守护线程
- 线程转换限制:线程启动后不能更改其守护状态,即不能将运行中的用户线程转换为守护线程,反之亦然
二、Lock接口
1. 复习synchronized
标注在方法内
在实例方法中使用synchronized关键字,也就是同步代码块,它的作用范围为当前对象
在静态方法中使用synchronized关键字,也就是同步代码块,它的作用范围为类对象
标注在方法上
在实例方法中使用synchronized关键字,也就是同步方法,它的作用范围为当前对象
在静态方法中使用synchronized关键字,也就是同步方法,它的作用范围为类对象
使用synchronized实现卖票
package com.leon.juc.synchro; /**
ClassName:SellTickets
Package:com.leon.juc.synchro
Description:
*@Author: leon
@Version: 1.0
*/
public class SellTickets {public static void main(String[] args) {
Ticket ticket = new Ticket(); // 线程一 new Thread(() -> { while (ticket.getTicket() > 0){ try { Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); } ticket.sellTickets(); } },"AA").start(); // 线程二 new Thread(() -> { while (ticket.getTicket() > 0){ try { Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); } ticket.sellTickets(); } },"BB").start(); // 线程三 new Thread(() -> { while (ticket.getTicket() > 0){ try { Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); } ticket.sellTickets(); } },"CC").start();
}
}
class Ticket {
// 设置票的数量
private int ticket = 30;// 卖票方法
public synchronized void sellTickets(){// 判断是否有票 if( ticket > 0 ){ // 打印输入日志 System.out.println(Thread.currentThread().getName() +" -- " + "卖出一张票,还剩"+ --ticket); }
}
public int getTicket() {
return ticket;
}
}
2. 什么是Lock接口
Lock锁是JDK5开始提供的一个新的锁定操作,通过它可以创建出锁对象进行加锁和解锁,更灵活、更方便、更强大。
Lock是接口,不能直接实例化,可以采用它的实现类ReentrantLock来构建Lock锁对象。
| 构造器 | 说明 |
| :——————–: | :——————: |
| public ReentrantLock() | 获得Lock的实现类对象 |Lock的常用方法
| 方法名称 | 说明 |
| :———–: | :—-: |
| void lock() | 获得锁 |
| void unlock() | 释放锁 |
3.使用Lock接口实现买票的例子
package com.leon.juc.lock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* ClassName:SellTicket
* Package:com.leon.juc.lock
* Description:
*
* @Author: leon
* @Version: 1.0
*/
public class SellTicket {
public static void main(String[] args) {
Ticket ticket = new Ticket();
// 线程一
new Thread(() -> {
while (ticket.getTicket() > 0){
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
ticket.sellTickets();
}
},"AA").start();
// 线程二
new Thread(() -> {
while (ticket.getTicket() > 0){
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
ticket.sellTickets();
}
},"BB").start();
// 线程三
new Thread(() -> {
while (ticket.getTicket() > 0){
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
ticket.sellTickets();
}
},"CC").start();
}
}
class Ticket {
// 设置票的数量
private int ticket = 30;
// 创建Lock对象
private final Lock lock = new ReentrantLock();
// 卖票方法
public void sellTickets(){
try {
// 加锁
lock.lock();
// 判断是否有票
if( ticket > 0 ){
// 打印输入日志
System.out.println(Thread.currentThread().getName() +" -- lock --- " + "卖出一张票,还剩"+ --ticket);
}
} finally {
// 释放锁
lock.unlock();
}
}
public int getTicket() {
return ticket;
}
}
4. Lock和synchronized之间的区别
实现方式:
synchronized是Java关键字,属于JVM层面的实现。
Lock是接口,是JDK层面的实现,需要手动编码。
灵活性:
Lock提供了更多灵活性。
synchronized无法中断一个正在等待获取锁的线程
Lock可以通过tryLock()尝试非阻塞获取锁,也可以设置超时时间
Lock可以通过lockInterruptibly()响应中断
公平性:
synchronized是非公平锁。
Lock可以通过构造函数指定为公平锁。
锁状态可查询:
Lock可以通过tryLock()查询锁状态;synchronized无法做到。
锁释放:
synchronized在同步代码块执行完或异常时自动释放锁
Lock需要手动释放,通常在finally块中调用unlock(),否则可能导致死锁
锁类型:
synchronized只有一种模式:独占锁
Lock接口的实现类提供了读写锁等多种模式(如ReentrantReadWriteLock)
性能:
在Java 6之后,synchronized进行了优化(偏向锁、轻量级锁、自旋锁),性能已接近Lock,但在复杂场景下Lock仍有优势。
条件变量:
synchronized使用Object的wait()/notify()机制
Lock可使用Condition接口,支持多个条件变量
三、线程之间的通信
1. 使用synchronized实现线程之间的通信
package com.leon.juc.synchro;
/**
* ClassName:Communication
* Package:com.leon.juc.synchro
* Description:
*
* @Author: leon
* @Version: 1.0
*/
public class Communication {
public static void main(String[] args) {
Share share = new Share();
// 线程一
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
// 进行加一
share.incr();
}
}, "AA").start();
// 线程二
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
// 进行减一
share.decr();
}
}, "BB").start();
// 线程三
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
// 进行加一
share.incr();
}
}, "CC").start();
// 线程四
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
// 进行减一
share.decr();
}
}, "DD").start();
}
}
class Share {
// 设置操作资源
private int num;
// 加一操作
public void incr() {
// 加锁
synchronized (this) {
// 这个使用while以防虚假唤醒
while (num != 0) {
try {
// 进行等待
this.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// 数字进行加一操作
num++;
// 打印日志信息
System.out.println(Thread.currentThread().getName() + " -- 进行了加一 " + num);
// 唤醒其他线程
this.notifyAll();
}
}
// 减一操作
public void decr() {
// 加锁
synchronized (this) {
// 这里使用while以防虚假唤醒
while (num == 0) {
try {
// 等待
this.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// 数字进行减一
num--;
// 输出打印日志
System.out.println(Thread.currentThread().getName() + " === 进行了减一 "+num );
// 唤醒其他线程
this.notifyAll();
}
}
}
2. 使用Lock实现线程之间的通信
package com.leon.juc.lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* ClassName:Communication
* Package:com.leon.juc.lock
* Description:
*
* @Author: leon
* @Version: 1.0
*/
public class Communication {
public static void main(String[] args) {
Share share = new Share();
// 线程一
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
// 进行加一
share.incr();
}
}, "AA").start();
// 线程二
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
// 进行减一
share.decr();
}
}, "BB").start();
// 线程三
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
// 进行加一
share.incr();
}
}, "CC").start();
// 线程四
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
// 进行减一
share.decr();
}
}, "DD").start();
}
}
class Share {
// 设置操作资源
private int num;
// 创建Lock对象
private Lock lock = new ReentrantLock();
// 创建Condition对象
private Condition condition = lock.newCondition();
// 加一操作
public void incr() {
try {
// 加锁
lock.lock();
// 这个使用while以防虚假唤醒
while (num != 0) {
// 等待
condition.await();
}
// 数字进行加一操作
num++;
// 打印日志信息
System.out.println(Thread.currentThread().getName() + " -- 进行了加一 " + num);
// 唤醒其他线程
condition.signalAll();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
// 释放锁
lock.unlock();
}
}
// 减一操作
public void decr() {
try {
// 加锁
lock.lock();
// 这里使用while以防虚假唤醒
while (num == 0) {
// 等待
condition.await();
}
// 数字进行减一
num--;
// 输出打印日志
System.out.println(Thread.currentThread().getName() + " === 进行了减一 " + num);
// 唤醒其他线程
condition.signalAll();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
// 释放锁
lock.unlock();
}
}
}
3. 多线程编程步骤
- 第一步:
- 创建资源类,在资源类创建属性和操作方法
- 第二步:
- 资源类操作方法
- 判断
- 操作
- 通知
- 第三步:
- 创建多个线程,调用资源类的操作方法
- 第四步:
- 防止虚假唤醒问题
四、线程间定制化通信
package com.leon.juc.lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* ClassName:CustomizationCommunication
* Package:com.leon.juc.lock
* Description:
*
* @Author: leon
* @Version: 1.0
*/
public class CustomizationCommunication {
public static void main(String[] args) {
LockCommunication lockCommunication = new LockCommunication();
// 线程一
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
// 进行打印日志
lockCommunication.print1(i);
}
}, "AA").start();
// 线程二
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
// 进行打印日志
lockCommunication.print2(i);
}
}, "BB").start();
// 线程三
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
// 进行打印日志
lockCommunication.print3(i);
}
}, "CC").start();
}
}
class LockCommunication{
// 设置初始旗帜
private int flag = 1;
// 创建Lock对象
private Lock lock = new ReentrantLock();
// 创建condition 对象
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
public void print1(int loop){
try {
// 加锁
lock.lock();
while (flag != 1){
// 睡眠
condition1.await();
}
// 循环遍历日志
for (int i = 1; i <= 5; i++) {
System.out.println(Thread.currentThread().getName() + " :: " + i + "--> 轮数 : " + loop);
}
//将旗帜设置为2
flag = 2 ;
// 唤醒第二线程
condition2.signal();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
// 释放锁
lock.unlock();
}
}
public void print2(int loop){
try {
// 加锁
lock.lock();
while (flag != 2){
// 睡眠
condition2.await();
}
// 循环遍历日志
for (int i = 1; i <= 10; i++) {
System.out.println(Thread.currentThread().getName() + " :: " + i + "--> 轮数 : " + loop);
}
//将旗帜设置为2
flag = 3 ;
// 唤醒第二线程
condition3.signal();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
// 释放锁
lock.unlock();
}
}
public void print3(int loop){
try {
// 加锁
lock.lock();
while (flag != 3){
// 睡眠
condition3.await();
}
// 循环遍历日志
for (int i = 1; i <= 15; i++) {
System.out.println(Thread.currentThread().getName() + " :: " + i + "--> 轮数 : " + loop);
}
//将旗帜设置为2
flag = 1 ;
// 唤醒第二线程
condition1.signal();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
// 释放锁
lock.unlock();
}
}
}
五、集合的线程安全
1. List集合不安全演示
package com.leon.juc.collection;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* ClassName:CoolectionNotSafetyTest
* Package:com.leon.juc.collection
* Description:
*
* @Author: leon
* @Version: 1.0
*/
public class CollectionNotSafetyTest {
public static void main(String[] args) {
// 创建一个list 集合
List<String> list = new ArrayList<>();
for (int i = 1; i <= 20; i++) {
new Thread(() -> {
list.add(String.valueOf(new Random().nextInt(1000)));
System.out.println(list);
}).start();
}
}
}
1.1 解决方案-Vector
package com.leon.juc.collection;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* ClassName:CoolectionNotSafetyTest
* Package:com.leon.juc.collection
* Description:
*
* @Author: leon
* @Version: 1.0
*/
public class CollectionNotSafetyTest {
public static void main(String[] args) {
// 创建一个list 集合
//List<String> list = new ArrayList<>();
// 创建一个Vector解决线程安全问题
List<String> list = new Vector<>();
for (int i = 1; i <= 20; i++) {
new Thread(() -> {
list.add(String.valueOf(new Random().nextInt(1000)));
System.out.println(list);
}).start();
}
}
}
1.2 解决方案-Collections
package com.leon.juc.collection;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* ClassName:CoolectionNotSafetyTest
* Package:com.leon.juc.collection
* Description:
*
* @Author: leon
* @Version: 1.0
*/
public class CollectionNotSafetyTest {
public static void main(String[] args) {
// 创建一个list 集合
//List<String> list = new ArrayList<>();
// 创建一个Vector解决线程安全问题
//List<String> list = new Vector<>();
// 使用Collections解决线程安全问题
List<String> list = Collections.synchronizedList(new ArrayList<>());
for (int i = 1; i <= 20; i++) {
new Thread(() -> {
list.add(String.valueOf(new Random().nextInt(1000)));
System.out.println(list);
}).start();
}
}
}
1.3 解决方案-CopyOnWriteArrayList
package com.leon.juc.collection;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* ClassName:CoolectionNotSafetyTest
* Package:com.leon.juc.collection
* Description:
*
* @Author: leon
* @Version: 1.0
*/
public class CollectionNotSafetyTest {
public static void main(String[] args) {
// 创建一个list 集合
//List<String> list = new ArrayList<>();
// 创建一个Vector解决线程安全问题
//List<String> list = new Vector<>();
// 使用Collections解决线程安全问题
//List<String> list = Collections.synchronizedList(new ArrayList<>());
// 创建CopyOnWriteArrayList对象
List<String> list = new CopyOnWriteArrayList();
for (int i = 1; i <= 20; i++) {
new Thread(() -> {
list.add(String.valueOf(new Random().nextInt(1000)));
System.out.println(list);
}).start();
}
}
}
2. HashSet线程不安全演示
package com.leon.juc.collection;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
/**
* ClassName:HashSetNotSafetyTest
* Package:com.leon.juc.collection
* Description:
*
* @Author: leon
* @Version: 1.0
*/
public class HashSetNotSafetyTest {
public static void main(String[] args) {
// 创建一个HashSet集合
Set<String> set = new HashSet<>();
for (int i = 1; i <= 20; i++) {
new Thread(() -> {
set.add(String.valueOf(new Random().nextInt(1000)));
System.out.println(set);
}).start();
}
}
}
2.1 解决方案CopyOnWriteArraySet
package com.leon.juc.collection;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* ClassName:HashSetNotSafetyTest
* Package:com.leon.juc.collection
* Description:
*
* @Author: leon
* @Version: 1.0
*/
public class HashSetNotSafetyTest {
public static void main(String[] args) {
// 创建一个HashSet集合
//Set<String> set = new HashSet<>();
// 创建一个Set集合
Set<String> set = new CopyOnWriteArraySet<>();
for (int i = 1; i <= 20; i++) {
new Thread(() -> {
set.add(String.valueOf(new Random().nextInt(1000)));
System.out.println(set);
}).start();
}
}
}
3. HashMap线程不安全演示
package com.leon.juc.collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
/**
* ClassName:HashMapNotSafetyTest
* Package:com.leon.juc.collection
* Description:
*
* @Author: leon
* @Version: 1.0
*/
public class HashMapNotSafetyTest {
public static void main(String[] args) {
// 创建一个HashMap
Map<String,String> map = new HashMap<>();
for (int i = 1; i <= 20; i++) {
final int num = i ;
new Thread(() -> {
map.put(String.valueOf(num),String.valueOf(new Random().nextInt(1000)));
System.out.println(map);
}).start();
}
}
}
3.1 解决方案ConcurrentHashMap
package com.leon.juc.collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
/**
* ClassName:HashMapNotSafetyTest
* Package:com.leon.juc.collection
* Description:
*
* @Author: leon
* @Version: 1.0
*/
public class HashMapNotSafetyTest {
public static void main(String[] args) {
// 创建一个HashMap
//Map<String,String> map = new HashMap<>();
// 创建一个ConcurrentHashMap解决线程安全问题
Map<String,String> map = new ConcurrentHashMap<>();
for (int i = 1; i <= 20; i++) {
final int num = i ;
new Thread(() -> {
map.put(String.valueOf(num),String.valueOf(new Random().nextInt(1000)));
System.out.println(map);
}).start();
}
}
}
六、多线程锁
1. 八锁问题
package com.leon.juc.eightlock;
/**
* ClassName:EigthLockTest
* Package:com.leon.juc.eightlock
* Description:
*
* @Author: leon
* @Version: 1.0
*/
/*
1. 标准访问,先打印短信还是邮件
先打印短信
2. 停4秒在短信方法内,先打印短信还是邮件
先打印短信
3. 新增普通的hello方法,先打印短信还是hello
先打印hello
4. 现在有两部手机,先打印短信还是邮件
先打印邮件
5. 两个静态同步方法,1部手机,先打印短信还是邮件
先打印短信
6. 两个静态同步方法,2部手机,先打印短信还是邮件
先打印短信
7. 一个静态同步方法,一个普通同步方法,一部手机,先打印短信还是邮件
先打印邮件
8. 一个静态同步方法,一个普通同步方法,二部手机,先打印短信还是邮件
先打印邮件
*/
public class EightLockTest {
public static void main(String[] args) {
Phone phone = new Phone();
//Phone phone2 = new Phone();
new Thread(() -> {
phone.sendSMS();
},"AA").start();
new Thread(() -> {
phone.sendEmail();
},"AA").start();
}
}
class Phone{
public static synchronized void sendSMS(){
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("Phone ........ sendSMS");
}
public /*static*/ synchronized void sendEmail(){
System.out.println("Phone ........ sendEmail");
}
public void getHello(){
System.out.println("Phone ........ getHello");
}
}
1. 1总结
标注在方法内
在实例方法中使用synchronized关键字,也就是同步代码块,它的作用范围为当前对象
在静态方法中使用synchronized关键字,也就是同步代码块,它的作用范围为类对象
标注在方法上
在实例方法中使用synchronized关键字,也就是同步方法,它的作用范围为当前对象
在静态方法中使用synchronized关键字,也就是同步方法,它的作用范围为类对象
2. 公平锁和非公平锁
package com.leon.juc.lock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* ClassName:SellTicket
* Package:com.leon.juc.lock
* Description:
*
* @Author: leon
* @Version: 1.0
*/
public class SellTicket {
public static void main(String[] args) {
Ticket ticket = new Ticket();
// 线程一
new Thread(() -> {
while (ticket.getTicket() > 0){
//try {
// Thread.sleep(100);
//} catch (InterruptedException e) {
// throw new RuntimeException(e);
//}
ticket.sellTickets();
}
},"AA").start();
// 线程二
new Thread(() -> {
while (ticket.getTicket() > 0){
//try {
// Thread.sleep(100);
//} catch (InterruptedException e) {
// throw new RuntimeException(e);
//}
ticket.sellTickets();
}
},"BB").start();
// 线程三
new Thread(() -> {
while (ticket.getTicket() > 0){
//try {
// Thread.sleep(100);
//} catch (InterruptedException e) {
// throw new RuntimeException(e);
//}
ticket.sellTickets();
}
},"CC").start();
}
}
class Ticket {
// 设置票的数量
private int ticket = 30;
// 创建Lock对象
// 在创建ReentrantLock对象时,构造器不传入值和传入false值时,都是非公平锁
// 传入true时为公平锁
private final Lock lock = new ReentrantLock(true);
// 卖票方法
public void sellTickets(){
try {
// 加锁
lock.lock();
// 判断是否有票
if( ticket > 0 ){
// 打印输入日志
System.out.println(Thread.currentThread().getName() +" -- lock --- " + "卖出一张票,还剩"+ --ticket);
}
} finally {
// 释放锁
lock.unlock();
}
}
public int getTicket() {
return ticket;
}
}
2.1 公平锁和非公平锁的区别
2.1.1 基本概念
- 公平锁:线程按照请求锁的顺序获取锁,遵循先来先得的原则,在锁被释放后,等待时间最长的线程优先获得锁。
- 非公平锁:线程获取锁的顺序不是按照请求锁的顺序,而是在锁被释放后,任何等待的线程都有机会竞争获取锁,可能导致某些线程"饿死”。
2.1.2 具体区别
获取锁的机制
公平锁:当线程请求获取锁时,会先检查锁等待队列,如果队列中有其他线程在等待,当前线程会加入队列尾部等待
非公平锁:线程请求锁时,会先尝试获取锁,如果此时锁恰好可用,则直接获取成功,不考虑是否有其他线程在等待
性能表现
公平锁:整体吞吐量较低,因为涉及更多的上下文切换和线程调度
非公平锁:整体吞吐量更高,因为减少了线程切换和调度的开销
等待时间
公平锁:各线程等待时间较为均衡,避免了饥饿现象
非公平锁:某些线程可能长时间无法获取锁(饥饿现象),但整体平均等待时间更短
使用场景
公平锁:适用于对线程等待公平性要求高的场景,如事务处理系统
非公平锁:适用于追求高吞吐量的场景,如高并发系统
内部实现差异
公平锁:在获取锁时会调用
hasQueuedPredecessors()
检查是否有线程在等待非公平锁:直接通过CAS尝试获取锁,不检查等待队列
唤醒机制
公平锁:严格按FIFO顺序唤醒等待的线程
非公平锁:被唤醒的线程与新到达的线程共同竞争锁资源
实现方式
// 创建Lock对象 // 在创建ReentrantLock对象时,构造器不传入值和传入false值时,都是非公平锁 private final Lock lock = new ReentrantLock(); private final Lock lock = new ReentrantLock(false); // 传入true时为公平锁 private final Lock lock = new ReentrantLock(true);
2.1.3 性能影响
- 公平锁通常比非公平锁性能差10-20%,主要原因是:
- 公平锁需要维护一个有序队列
- 线程唤醒和切换的开销较大
- 非公平锁利用了"插队"机制减少了一部分线程切换
2.1.4 选择建议
- 默认情况下优先使用非公平锁(ReentrantLock的默认实现)
- 当系统中出现线程饥饿问题,或者业务要求严格按请求顺序处理时,考虑使用公平锁
- 在实际应用中,通常需要通过性能测试来确定最适合的锁策略
3. 可重入锁
3.1 synchronized方式
package com.leon.juc.synchro;
/**
* ClassName:ReturnSynchronized
* Package:com.leon.juc.synchro
* Description:
*
* @Author: leon
* @Version: 1.0
*/
public class ReturnSynchronized {
public static void main(String[] args) {
Object o = new Object();
new Thread(() ->{
synchronized (o){
System.out.println(Thread.currentThread().getName() + "---这是最外层");
synchronized (o){
System.out.println(Thread.currentThread().getName() + "---这是中间层");
synchronized (o){
System.out.println(Thread.currentThread().getName() + "---这是内层");
}
}
}
},"AA").start();
}
}
3.2 Lock接口方式
package com.leon.juc.lock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* ClassName:ReturnLock
* Package:com.leon.juc.lock
* Description:
*
* @Author: leon
* @Version: 1.0
*/
public class ReturnLock {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
new Thread(() -> {
try {
// 加锁
lock.lock();
System.out.println(Thread.currentThread().getName() + " -- 最外层" );
try {
// 加锁
lock.lock();
System.out.println(Thread.currentThread().getName() + " -- 中间层" );
try {
// 加锁
lock.lock();
System.out.println(Thread.currentThread().getName() + " -- 内层" );
} finally {
// 释放锁
lock.unlock();
}
} finally {
// 释放锁
lock.unlock();
}
} finally {
// 释放锁
lock.unlock();
}
},"AA").start();
}
}
4. 死锁
- 死锁是多线程编程中的一种常见且严重的问题,指两个或多个线程互相等待对方持有的资源,导致所有相关线程都无法继续执行的状态。
4.1 死锁的基本特征
- 互斥条件:
- 资源不能被多个线程同时使用
- 占有并等待:
- 线程持有一部分资源,同时等待其他资源
- 不可抢占:
- 线程获取的资源在未主动释放前,其他线程无法强制获取
- 循环等待:
- 多个线程形成一个循环等待链,每个线程都在等待下一个线程持有的资源
4.2 死锁常见原因
- 资源竞争:
- 多线程同时争夺有限资源
- 嵌套锁:
- 线程在持有一个锁的同时请求另一个锁
- 锁顺序不一致:
- 不同线程获取多个锁的顺序不同
- 无限等待:
- 线程因某种原因无限期等待一个永远不会释放的锁
- 数据库事务死锁:
- 涉及多个数据库操作的事务互相阻塞
4.3 死锁预防策略
- 资源有序分配
- 对资源进行编号,所有线程按照相同的顺序请求资源
- 避免嵌套锁
- 尽量减少同时持有多个锁的情况
- 使用复合操作减少锁的交叉使用
- 使用显示锁的高级特性
- 使用
tryLock()
方法和超时设置避免无限等待 - 使用并发工具类
- 优先使用
java.util.concurrent
包中的高级并发工具 - 使用
ConcurrentHashMap
代替HashMap
配合synchronized
- 限时等待
- 避免无限期等待资源
- 在等待超时后主动释放已持有的资源并重试
- 使用锁层次结构
- 定义锁的层次结构并强制执行策略,不允许低层次的锁嵌套高层次的锁
4.4 死锁处理方式
- 重启应用:
- 最简单但不优雅的解决方案
- 终止部分线程:
- 识别并终止死锁循环中的一个或多个线程
- 资源抢占:
- 强制某些线程释放资源
- 回滚操作:
- 使用事务机制,回滚到死锁发生前的状态
4.5 实际开发建议
- 在设计多线程应用时必须考虑死锁风险
- 尽量避免同时获取多个锁
- 保持锁持有时间尽可能短
- 使用
java.util.concurrent
包提供的工具类 - 在开发测试阶段主动检测死锁
- 记录和监控线程状态,建立死锁自动检测机制
- 实现定期释放锁的机制,防止资源被无限期占用
- 进行压力测试,模拟高并发情况下的锁竞争
4.6 死锁复现
Lock锁死锁
package com.leon.juc.lock; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /**
ClassName:DeathLock
Package:com.leon.juc.lock
Description:
*@Author: leon
@Version: 1.0
*/
public class DeathLock {public static void main(String[] args) {
// 锁一 Lock lock1 = new ReentrantLock(); // 锁二 Lock lock2 = new ReentrantLock(); new Thread(() -> { try { // 加锁 lock1.lock(); System.out.println(Thread.currentThread().getName()+ "尝试获取锁 lock2"); try { // 加锁 lock2.lock(); System.out.println(Thread.currentThread().getName()+ "获取锁 lock2 成功"); } finally { // 释放锁 lock2.unlock(); } } finally { // 释放锁 lock1.unlock(); } },"AA").start(); new Thread(() -> { try { // 加锁 lock2.lock(); System.out.println(Thread.currentThread().getName()+ "尝试获取锁 lock1"); try { // 加锁 lock1.lock(); System.out.println(Thread.currentThread().getName()+ "获取锁 lock1 成功"); } finally { // 释放锁 lock1.unlock(); } } finally { // 释放锁 lock2.unlock(); } },"BB").start();
}
}
synchronized锁死锁
package com.leon.juc.synchro; /**
ClassName:DeathSynchronized
Package:com.leon.juc.synchro
Description:
*@Author: leon
@Version: 1.0
*/
public class DeathSynchronized {public static void main(String[] args) {
// 对象一 Object o1 = new Object(); // 对象二 Object o2 = new Object(); new Thread(() -> { synchronized (o1){ System.out.println(Thread.currentThread().getName() + "尝试获取锁对象o2"); synchronized (o2){ System.out.println(Thread.currentThread().getName() + "获取锁对象o2成功"); } } },"AA").start(); new Thread(() -> { synchronized (o2){ System.out.println(Thread.currentThread().getName() + "尝试获取锁对象o2"); synchronized (o1){ System.out.println(Thread.currentThread().getName() + "获取锁对象o2成功"); } } },"BB").start();
}
}
4.7 验证是否时死锁
- jps
- 通过该指令,获取到当前对象执行的进程号
- jstack
- jvm自带的指令,通过jps查询到的进程号,查询该进程号的堆栈信息
七、Callable接口
package com.leon.juc.callable;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
/**
* ClassName:CallableTest
* Package:com.leon.juc.callable
* Description:
*
* @Author: leon
* @Version: 1.0
*/
public class CallableTest {
public static void main(String[] args){
// 创建一个Callable接口
Callable callable = new MyCallable();
// 将Callable接口传入FutureTask构造器
FutureTask<Integer> futureTask = new FutureTask<>(callable);
// 创建一个新FutureTask,传入新的Callable对象
FutureTask<String> futureTask2 = new FutureTask<>(() -> {
System.out.println(Thread.currentThread().getName() + " --- call2");
return Thread.currentThread().getName();
});
// 将FutureTask传入到Thread构造器中
new Thread(futureTask,"AA").start();
new Thread(futureTask2,"BB").start();
// 判断是否有值,没有则等待
while (!futureTask.isDone()){
System.out.println("-----wait-----");
}
try {
// 获取返回值
System.out.println(futureTask.get());
System.out.println(futureTask2.get());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
class MyCallable implements Callable<Integer>{
@Override
public Integer call() throws Exception {
System.out.println(Thread.currentThread().getName() + " --- call" );
return 200;
}
}
八、JUC强大的辅助类
1. CountDownLatch(减少计数)
package com.leon.juc.assistjuc;
import java.util.concurrent.CountDownLatch;
/**
* ClassName:CountDownLacth
* Package:com.leon.juc.assistjuc
* Description:
*
* @Author: leon
* @Version: 1.0
*/
public class CountDownLatchTest {
public static void main(String[] args) {
// 创建一个减少计数的对象
CountDownLatch countDownLatch = new CountDownLatch(6);
// 循环创建线程
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
// 输出日志
System.out.println(Thread.currentThread().getName() + "走出教室");
// 建设计数
countDownLatch.countDown();
},String.valueOf(i)).start();
}
try {
// 等待计数完成
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "关闭教室门");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
2. CyclicBarrier(循环栅栏)
package com.leon.juc.assistjuc;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* ClassName:CyclicBarrirTest
* Package:com.leon.juc.assistjuc
* Description:
*
* @Author: leon
* @Version: 1.0
*/
public class CyclicBarrierTest {
public static void main(String[] args) {
// 创建循环栅栏对象
//CyclicBarrier cyclicBarrier = new CyclicBarrier(7,() -> {
// System.out.println("集齐七颗龙珠,召唤神龙~~~~~");
//});
CyclicBarrier cyclicBarrier = new CyclicBarrier(7);
// 循环打印线程
for (int i = 1; i <= 7 ; i++) {
new Thread(()->{
//打印日志
System.out.println("集齐了" + Thread.currentThread().getName() + "龙珠");
try {
// 等待
cyclicBarrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
},String.valueOf(i)).start();
}
}
}
3. Semaphore(信号灯)
package com.leon.juc.assistjuc;
import java.util.concurrent.Semaphore;
/**
* ClassName:SemaphoreTest
* Package:com.leon.juc.assistjuc
* Description:
*
* @Author: leon
* @Version: 1.0
*/
public class SemaphoreTest {
public static void main(String[] args) {
// 创建信号灯对象,并设置许可数量
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 10 ; i++) {
new Thread(() -> {
try {
// 获取许可
semaphore.acquire();
// 打印日志
System.out.println(Thread.currentThread().getName() +" 获取许可-----||||");
// 睡眠
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() +" 释放许可-----||||");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
// 释放许可
semaphore.release();
}
},String.valueOf(i)).start();
}
}
}
九、ReentrantReadWriteLock 读写锁
1. 锁的描述
悲观锁
在多线程中,当有线程获取到锁之后,对资源进行操作。其他线程需等待,等其操作数据完毕,剩余的线程去获取锁,然后操作数据。
乐观锁
在多线程中,所有线程去操作数据,当有线程修改数据之后,会设置新的版本号,如果其他线程需要修改,则需要进行版本号判断,查看当前线程所持有的版本号是否匹配,匹配则可以进行操作数据,然后设置新的版本号,不匹配则重新获取数据,持有最新的版本号去修改数据,最后再设置新的版本号。
表锁
在多线程中,当有线程在操作目标表时,其他线程需要等待,等该线程操作完之后,剩余的线程获取锁,然后进行操作。不会发生死锁。
行锁
在多线程中,当有线程在操作目标行时,其他线程需要等待,等该线程操作完之后,剩余的线程去获取锁,然后进行操作。会发生死锁。
读锁
读锁是共享的锁,当有线程读取数据时,其他线程也可以读数据。会发生死锁
写锁
写锁是独享锁,当有线程操作数据时,其他线程则需要等待,然后再去获取锁。会发生死锁。
2. 读写锁案例演示
package com.leon.juc.readwrite;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* ClassName:ReadWriteTest
* Package:com.leon.juc.readwrite
* Description:
*
* @Author: leon
* @Version: 1.0
*/
public class ReadWriteTest {
public static void main(String[] args) {
//
Cache cache = new Cache();
for (int i = 1; i <= 10; i++) {
final int num = i ;
// 创建线程写数据
new Thread(() -> {
cache.put(String.valueOf(num),num);
},String.valueOf(i)).start();
}
for (int i = 1; i <= 10; i++) {
final int num = i ;
// 创建线程写数据
new Thread(() -> {
cache.get(String.valueOf(num));
},String.valueOf(i)).start();
}
}
}
class Cache{
// 用于存放数据的容器
private final Map<String,Object> map = new HashMap<>();
// 创建读写锁对象
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
// 写操作
public void put(String key,Object value){
try {
// 加锁-写锁
readWriteLock.writeLock().lock();
// 打印日志
System.out.println(Thread.currentThread().getName() + " 正在写数据 " + key);
// 睡眠
TimeUnit.MILLISECONDS.sleep(300);
// 存放数据
map.put(key,value);
// 打印日志
System.out.println(Thread.currentThread().getName() + " 写完数据 ");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally{
// 释放锁
readWriteLock.writeLock().unlock();
}
}
// 读操作
public Object get(String key){
Object result = null;
try {
// 加锁-读锁
readWriteLock.readLock().lock();
// 打印日志
System.out.println(Thread.currentThread().getName() + " 正在读数据 " + key);
// 睡眠
TimeUnit.MILLISECONDS.sleep(300);
// 获取数据
result = map.get(key);
// 打印日志
System.out.println(Thread.currentThread().getName() + " 读完数据 ");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally{
// 释放锁
readWriteLock.readLock().unlock();
}
// 返回结果
return result;
}
}
3. 读写锁深入
读写锁
一个资源可以被多个读线程访问,或者被一个写线程访问,但是不能同时存在读写线程,读和写是互斥的,读是共享的
读写锁的演变
无锁
- 多线程抢夺资源,资源不安全
synchronized锁
- 独享的,每次执行只能有一个线程
ReadWriteLock
- 读锁是共享的,所有线程都可以一起读操作
- 写操作是独享的,只能有一个线程进行写操作
- 缺点:
- 容易造成线程饥饿,就是一直都是读,没有写
- 读的时候不可以写,只有读完之后才可以写,写的时候可以读
4. 锁降级
- 将写锁降级为读锁。
写锁 --> 读锁 --> 释放写锁 --> 释放读锁
- 锁可以降级,但不能说升级,也就是读锁不能升级为写锁,写锁可以降级为读锁
- 演示代码
package com.leon.juc.readwrite;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* ClassName:ReadWriteTest2
* Package:com.leon.juc.readwrite
* Description:
*
* @Author: leon
* @Version: 1.0
*/
public class ReadWriteTest2 {
public static void main(String[] args) {
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
new Thread(() -> {
try {
// 加锁-写锁
readWriteLock.writeLock().lock();
// 打印日志
System.out.println(Thread.currentThread().getName() + "-----正在写操作-----");
try {
// 加锁-读锁
readWriteLock.readLock().lock();
// 打印日志
System.out.println(Thread.currentThread().getName() + "-----正在读操作-----");
} finally {
// 释放锁
readWriteLock.readLock().unlock();
}
} finally {
// 释放锁
readWriteLock.writeLock().unlock();
}
},"AA").start();
new Thread(() -> {
try {
// 加锁-写锁
readWriteLock.readLock().lock();
// 打印日志
System.out.println(Thread.currentThread().getName() + "-----正在读操作-----");
try {
// 加锁-读锁
readWriteLock.writeLock().lock();
// 打印日志
System.out.println(Thread.currentThread().getName() + "-----正在写操作-----");
} finally {
// 释放锁
readWriteLock.writeLock().unlock();
}
} finally {
// 释放锁
readWriteLock.readLock().unlock();
}
},"BB").start();
}
}
十、BlockingQueue阻塞队列
1. 阻塞队列的概述
- 队列
- 先进先出
- 栈
- 先进后出
2.阻塞队列分类
- ArrayBlockingQueue(常用)
- 有数组结构组成的有界阻塞队列
- LinkedBlockQueue(常用)
- 由链表结构组成的有界(但大小默认值为Integer.MAX_VALUE)阻塞队列
- DelayQueue
- 使用优先级队列实现的延迟无界阻塞队列
- PriorityBlockingQueue
- 支持优先级排序的无界阻塞队列
- SynchronousQueue
- 不存储元素的阻塞队列,也就是单个元素的队列
- LinkedTrantferQueue
- 由链表组成的无界阻塞队列
- LinkedBlockingDeque
- 由链表组成的双向阻塞队列
3.阻塞队列的核心方法
| 方法类型 | 抛出异常 | 特殊值 | 阻塞 | 超时 |
| :——: | :——-: | :——: | :—-: | :—————-: |
| 插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
| 移除 | remove() | poll() | take() | poll(time,unit) |
| 检查 | element() | peek() | 不可用 | 不可用 |
| 抛出异常 | 当阻塞队列满时,再往队列中添加元素会抛IllegalStateException: Queue full
当阻塞队列为空时,再往队列中移除元素会抛NoSuchElementException |
| ———— | :———————————————————– |
| 特殊值 | 插入方法,成功返回true失败返回false
移除方法,成功返回出队列的元素,队列没有元素返回null |
| 阻塞 | 当阻塞队列满时,生产者线程继续往队列里添加元素,队列会一直阻塞生产者线程直到添加数据成功或响应中断
当阻塞队列空时,消费者线程试图从队列里移除元素,队列会一直阻塞消费者线程直到队列可用 |
| 超时退出 | 当阻塞队列满时,队列会阻塞生产者线程一定时间,超过限时后生产者线程会退出 |
4. 阻塞对列实例演示
package com.leon.juc.queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* ClassName:QueueTest
* Package:com.leon.juc.queue
* Description:
*
* @Author: leon
* @Version: 1.0
*/
public class QueueTest {
public static void main(String[] args) {
// 创建一个容量只有3的阻塞队列
ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
// 添加元素进阻塞队列
arrayBlockingQueue.add("aa");
arrayBlockingQueue.add("bb");
arrayBlockingQueue.add("cc");
// 超容量添加
arrayBlockingQueue.add("dd");
// 抛出异常-空移除
arrayBlockingQueue.remove();
// 抛出异常-空检查
String element = arrayBlockingQueue.element();
System.out.println(element);
// 超出容量添加-是否成功
boolean dd = arrayBlockingQueue.offer("dd");
System.out.println(dd);
// 为空移除-受否成功
String poll = arrayBlockingQueue.poll();
System.out.println(poll);
// 为空检查是否移除
String peek = arrayBlockingQueue.peek();
System.out.println(peek);
try {
// 容量满时-添加是否阻塞
arrayBlockingQueue.put("dd");
// 容量为空-移除是否阻塞
arrayBlockingQueue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
// 容量满时-添加是否按照预定时间阻塞
arrayBlockingQueue.offer("dd",1L, TimeUnit.SECONDS);
// 容量为空时-移除是否按照预定时间阻塞
arrayBlockingQueue.poll(1L,TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
十一、ThreadPool线程池
1. 什么是线程池
- 线程池就是一个可以复用线程的技术
2. 不使用线程池的问题
- 用户每发起一个请求,后台就需要创建一个新线程来处理,下次新任务来了肯定又要创建新线程处理的,而创建新线程的开销是很大的,并且请求过多时,肯定会产生大量的线程出来,这样会严重影响系统的性能。
3.线程池的工作原理
4. 谁代表了线程池?
- JDK5.0起提供了代表线程池的接口:ExecutorService
5. 如何得到线程池对象?
- 方式一
- 使用ExecutorService的实现类ThreadPoolExecutor自创建一个线程池对象
- 方式二
- 使用Executors(线程池的工具类)调用方法返回不同特点的线程池对象
6. ThreadPoolExecutor构造器
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- 参数一:int corePoolSize :指定线程池的核心线程的数量
- 参数二:int maximumPoolSize:指定线程池的最大线程数量
- 参数三:long keepAliveTime:指定临时线程的存活时间
- 参数四:TimeUnit unit:指定临时线程存活的时间单位(秒、分、时、天)
- 参数五:BlockingQueue< Runnable > workQueue:指定线程池的任务队列
- 参数六:ThreadFactory threadFactory:指定线程池的线程工厂
- 参数七:RejectedExecutionHandler handler:指定线程池的任务拒绝策略(线程都在忙,任务队列也满了的时候,新任务来了该怎么处理)
7. 注意事项
- 临时线程什么时候创建?(核心:任务队列满了,且还可以创建临时线程)
- 新任务提交时发现核心线程都在忙,任务队列也满了,并且还可以创建临时线程,此时才会创建临时线程。
- 什么时候会开始拒绝新任务?(核心:任务队列满了,临时线程也满了)
- 核心线程和临时线程都在忙,任务队列也满了,新任务过来的时候才会开始拒绝任务。
8. ExecutorService的常用方法
| 方法名称 | 说明 |
| ————————————– | ———————————————————— |
| void execute(Runnable command) | 执行Runnable任务 |
| Future< T > submit(Callable< T > task) | 执行Callable任务,返回未来任务对象,用于获取线程返回的结果 |
| void shutdown() | 等全部任务执行完毕后,再关闭线程池 |
| List< Runnable > shutdownNow() | 立即关闭线程池,停止正在执行的任务,并返回队列中未执行的任务 |
9. 新任务拒绝策略
| 策略 | 详解 |
| ————————————– | ———————————————————- |
| ThreadPoolExecutor.AbortPolicy | 丢弃任务并抛出RejectedExecutionException异常。是默认的策略 |
| ThreadPoolExecutor.DiscardPolicy | 丢弃任务,但是不抛出异常这是不推荐的做法 |
| ThreadPoolExecutor.DiscardOldestPolicy | 抛弃队列中等待最久的任务,然后把当前任务加入队列中 |
| ThreadPoolExecutor.CallerRunsPolicy | 由主线程负责调用任务的run()方法从而绕过线程池直接执行 |
10. Executors
- 是一个线程池的工具类,提供了很多静态方法用于返回不同特点的线程池对象
| 方法名称 | 说明 |
| ———————————————————— | ———————————————————— |
| public static ExecutorService newFixedThreadPool(int nThreads) | 创建固定线程数量的线程池,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程替代它 |
| public static ExecutorService newSingleThreadExecutor() | 创建只有一个线程的线程池对象,如果该线程出现异常而结束,那么线程池会补充一个新线程 |
| public static ExecutorService newCachedThreadPool() | 线程数量随着任务增加而增加,如果线程任务执行完毕且空闲了60s则会被回收掉 |
| public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) | 创建一个线程池,可以实现在给定的延迟后运行任务,或者定期执行任务 |
11. Executors使用可能存在的陷阱
- 大型并发系统环境中使用Executor如果不注意可能会出现系统风险。
- 阿里巴巴Java卡法手册
- 【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
- FixedThreadPool和SingleThreadPool:
- 允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
- CachedThreadPool和ScheduledThreadPool:
- 允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。
12. 使用Executors创建线程池
package com.leon.juc.executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* ClassName:Executor
* Package:com.leon.juc.executor
* Description:
*
* @Author: leon
* @Version: 1.0
*/
public class Executor {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
ExecutorService executorService2 = Executors.newFixedThreadPool(3);
ExecutorService executorService3 = Executors.newCachedThreadPool();
ExecutorService executorService4 = Executors.newScheduledThreadPool(3);
}
}
13. 自定义线程池
package com.leon.juc.executor;
import java.util.concurrent.*;
/**
* ClassName:ExecutorTest2
* Package:com.leon.juc.executor
* Description:
*
* @Author: leon
* @Version: 1.0
*/
public class ExecutorTest2 {
public static void main(String[] args) {
// 自定义线程池
ExecutorService executorService = new ThreadPoolExecutor(3,5,30L,
TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(3), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
// 提交任务线程
executorService.execute(() -> {
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName()+" : : " + i);
}
});
executorService.execute(() -> {
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName()+" : : " + i);
}
});
executorService.execute(() -> {
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName()+" : : " + i);
}
});
// 复用线程,在队列中等待
executorService.execute(() -> {
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName()+" : : " + i);
}
});
executorService.execute(() -> {
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName()+" : : " + i);
}
});
executorService.execute(() -> {
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName()+" : : " + i);
}
});
// 创建新的线程
executorService.execute(() -> {
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName()+" : : " + i);
}
});
}
}
十二、Fork/Join分支合并框架
1. 实例演示
package com.leon.juc.forkjoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
* ClassName:ForkJionTest
* Package:com.leon.juc.forkjion
* Description:
*
* @Author: leon
* @Version: 1.0
*/
public class ForkJoinTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建自定义分支合并对象
MyForkJoin myForkJoin = new MyForkJoin(1, 100);
// 直接调用方法
Integer compute = myForkJoin.compute();
System.out.println(compute);
// 创建分支合并池对象
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> submit = forkJoinPool.submit(myForkJoin);
// 获取最终合并之后结果
Integer result = submit.get();
System.out.println(result);
}
}
class MyForkJoin extends RecursiveTask<Integer>{
// 开头结尾的差值
private static final int VALUE = 10 ;
// 开头
private int start ;
// 结尾
private int end ;
// 最后的结果
private int result;
public MyForkJoin(int start, int end){
this.start = start ;
this.end = end ;
}
@Override
protected Integer compute() {
// 判断中间值是否符合条件
if((end - start) <= VALUE){
for (int i = start; i <= end; i++) {
result = result + i ;
}
}else {
// 获取中间值
int middle = (end + start) / 2;
// 拆分左边
MyForkJoin left = new MyForkJoin(start,middle);
// 拆分右边
MyForkJoin right = new MyForkJoin(middle+1 , end);
// 调用方法拆分
left.fork();
right.fork();
// 合并
result = left.join() + right.join();
}
return result;
}
}
十三、CompletableFuture异步回调
1. 实例演示
package com.leon.juc.synchronous;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* ClassName:ComletableTutureTest
* Package:com.leon.juc.synchronous
* Description:
*
* @Author: leon
* @Version: 1.0
*/
public class CompletableFutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 异步调用没有放回值
CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + "completableFuture1");
});
// 调用get方法
completableFuture1.get();
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "completableFuture2");
//int i= 1/0;
return 200;
});
// 能执行方法,获取不到返回值
completableFuture2.get();
// 执行方法,并返回值
completableFuture2.whenComplete((t,u) ->{
// 返回的是,方法返回值
System.out.println("completableFuture2: t ==" + t);
// 返回的是,异常
System.out.println("completableFuture2: u ==" + u);
});
}
}