1.学习内容
- 并发编程的发展历史
- 线程在java中的应用
- 多线程的实际应用场景
- 线程的生命周期
- 线程的基本操作-启动/停止
2.学习方法
==公式:场景->需求->解决方案->应用->原理==
例子:
Doubbo:
分布式-> 服务治理 -> dubbo -> dubbo应用 -> 了解原理
jvm(从计算机的发展带来的进程和线程的概念):
场景(提高资源的利用率)->需求->解决方案(进程、线程)->应用(java)->原理(启动、终止)
3.并发的发展历史
3.1真空管/穿孔打卡
计算机大部分时间处于空闲状态
3.2晶体管/批处理系统
- IO问题
- 造成CPU资源浪费(痛点)
- 需求-> 如何最大化的利用cpu资源
3.3集成电路/多道程序设计
进程A(阻塞)/ 进程B
为什么需要线程?
1.单核->多核->真正意义上达到并行计算
2.实时性需求->线程
4.线程在java中的应用
- Runnable接口
- Thread类(本质上是对Runnable接口的实现)
- Callable/Future带返回值的线程
- ThreadPool
==线程可以合理的利用多核心CPU资源,提高程序的吞吐量==
5.多线程的实际应用场景
==线程池、文件跑批、收益文件、对接文件==
demo案例:模拟处理请求案例通过多线程提高系统资源利用率 entity / interface
@Data
public class Request {
private String name;
}
/**
* @PackageName: com.raven.multithreaded
* @ClassName: IProcessor
* @Blame: raven
* @Date: 2021-08-12 14:29
* @Description: 抽象处理器接口
*/
public interface IRequestProcessor {
/**
* 处理请求
* @param request
*/
void process(Request request);
}
==V1 :未使用多线程== processor
/**
* @PackageName: com.raven.multithreaded
* @ClassName: PreProcessor
* @Blame: raven
* @Date: 2021-08-12 14:31
* @Description: 预处理请求
*/
public class PreProcessor implements IRequestProcessor {
/**
* 下一请求处理过程
*/
private IRequestProcessor nextProcessor;
public PreProcessor() {
}
/**
* 通过构造参数传递其他请求过程 达到链式调用的目的
* @param requestProcessor
*/
public PreProcessor(IRequestProcessor requestProcessor) {
this.nextProcessor = requestProcessor;
}
@Override
public void process(Request request) {
System.out.println("PreProcessor do sth");
if (Objects.nonNull(nextProcessor)){
nextProcessor.process(request);
}
}
}
/**
* @PackageName: com.raven.multithreaded.processrequest.v1
* @ClassName: PrintProcessor
* @Blame: raven
* @Date: 2021-08-12 14:40
* @Description: 打印请求信息
*/
public class PrintProcessor implements IRequestProcessor {
/**
* 下一请求处理过程
*/
private IRequestProcessor nextProcessor;
public PrintProcessor() {
}
/**
* 通过构造参数传递其他请求过程 达到链式调用的目的
* @param requestProcessor
*/
public PrintProcessor(IRequestProcessor requestProcessor) {
this.nextProcessor = requestProcessor;
}
@Override
public void process(Request request) {
System.out.println("PrintProcessor do sth");
if (Objects.nonNull(nextProcessor)){
nextProcessor.process(request);
}
}
}
/**
* @PackageName: com.raven.multithreaded.processrequest.v1
* @ClassName: SaveProcessor
* @Blame: raven
* @Date: 2021-08-12 14:40
* @Description: 存储请求信息
*/
public class SaveProcessor implements IRequestProcessor {
/**
* 下一请求处理过程
*/
private IRequestProcessor nextProcessor;
public SaveProcessor() {
}
/**
* 通过构造参数传递其他请求过程 达到链式调用的目的
*
* @param requestProcessor
*/
public SaveProcessor(IRequestProcessor requestProcessor) {
this.nextProcessor = requestProcessor;
}
@Override
public void process(Request request) {
System.out.println("SaveProcessor do sth");
if (Objects.nonNull(nextProcessor)){
nextProcessor.process(request);
}
}
}
main
/**
* @PackageName: com.raven.multithreaded.processrequest.v1
* @ClassName: RequestProcessorTest
* @Blame: raven
* @Date: 2021-08-12 14:43
* @Description: 模拟处理请求案例 阻塞式处理问题
*/
public class RequestProcessorTest {
public static void main(String[] args) {
Request request = new Request();
request.setName("processTest");
// 基于责任链模式 自由组合完成对请求的处理
IRequestProcessor processor = new SaveProcessor();
processor = new PrintProcessor(processor);
processor = new PreProcessor(processor);
processor.process(request);
}
}
==V2 :使用多线程== processor
/**
* @PackageName: com.raven.multithreaded
* @ClassName: PreProcessor
* @Blame: raven
* @Date: 2021-08-12 14:31
* @Description: 预处理请求 并通过多线程的方式提高处理请求的效率
*/
public class PreProcessor extends Thread implements IRequestProcessor {
/**
* 阻塞队列 承载请求列表
*/
private LinkedBlockingDeque<Request> requests = new LinkedBlockingDeque<>();
/**
* 下一请求处理过程
*/
private IRequestProcessor nextProcessor;
public PreProcessor() {
}
/**
* 可通过有参构造进行责任链式处理请求
*
* @param nextProcessor 下一处理过程
*/
public PreProcessor(IRequestProcessor nextProcessor) {
this.nextProcessor = nextProcessor;
}
/**
* 定义标识符,通过 @See shunDown()方法终止线程 通过volatile保证内存可见性
*/
private volatile boolean isFinish = false;
/**
* 对外提供关闭的方法
*/
public void shunDown() {
isFinish = true;
}
@Override
public void run() {
while (!isFinish) {
try {
// 取出第一个请求
Request request = requests.take();
System.out.println("PreProcessor do sth " + request);
if (Objects.nonNull(nextProcessor)) {
// 消费者 从阻塞队列中取请求进行消费
nextProcessor.process(request);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void process(Request request) {
// 生产者,将请求加入到阻塞队列中
requests.add(request);
}
}
/**
* @PackageName: com.raven.multithreaded
* @ClassName: PrintProcessor
* @Blame: raven
* @Date: 2021-08-12 14:31
* @Description: 打印请求信息 并通过多线程的方式提高处理请求的效率
*/
public class PrintProcessor extends Thread implements IRequestProcessor {
/**
* 阻塞队列 承载请求列表
*/
private LinkedBlockingDeque<Request> requests = new LinkedBlockingDeque<>();
/**
* 下一请求处理过程
*/
private IRequestProcessor nextProcessor;
public PrintProcessor() {
}
/**
* 可通过有参构造进行责任链式处理请求
*
* @param nextProcessor
*/
public PrintProcessor(IRequestProcessor nextProcessor) {
this.nextProcessor = nextProcessor;
}
/**
* 定义标识符,通过 @See shunDown()方法终止线程 通过volatile保证内存可见性
*/
private volatile boolean isFinish = false;
/**
* 对外提供关闭的方法
*/
public void shunDown() {
isFinish = true;
}
@Override
public void run() {
while (!isFinish) {
try {
Request request = requests.take();
System.out.println("PrintProcessor do sth " + request);
if (Objects.nonNull(nextProcessor)) {
// 消费者 从阻塞队列中取请求进行消费
nextProcessor.process(request);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void process(Request request) {
// 生产者,将请求加入到阻塞队列中
requests.add(request);
}
}
/**
* @PackageName: com.raven.multithreaded
* @ClassName: PreProcessor
* @Blame: raven
* @Date: 2021-08-12 14:31
* @Description: 存储请求信息 并通过多线程的方式提高处理请求的效率
*/
public class SaveProcessor extends Thread implements IRequestProcessor {
/**
* 阻塞队列 承载请求列表
*/
private LinkedBlockingDeque<Request> requests = new LinkedBlockingDeque<>();
/**
* 下一请求处理过程
*/
private IRequestProcessor nextProcessor;
public SaveProcessor() {
}
/**
* 可通过有参构造进行责任链式处理请求
*
* @param nextProcessor
*/
public SaveProcessor(IRequestProcessor nextProcessor) {
this.nextProcessor = nextProcessor;
}
/**
* 定义标识符,通过 @See shunDown()方法终止线程 通过volatile保证内存可见性
*/
private volatile boolean isFinish = false;
/**
* 对外提供关闭的方法
*/
public void shunDown() {
isFinish = true;
}
@Override
public void run() {
while (!isFinish) {
try {
Request request = requests.take();
System.out.println("SaveProcessor do sth " + request);
if (Objects.nonNull(nextProcessor)) {
// 消费者 从阻塞队列中取请求进行消费
nextProcessor.process(request);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void process(Request request) {
// 生产者,将请求加入到阻塞队列中
requests.add(request);
}
}
main
/**
* @PackageName: com.raven.multithreaded.processrequest.v2
* @ClassName: MultiThreadRequestProcessorTest
* @Blame: raven
* @Date: 2021-08-12 20:57
* @Description: 模拟处理请求场景并使用责任链模式 通过多线程的方式调高处理请求的效率
*/
public class MultiThreadRequestProcessorTest {
private static IRequestProcessor requestProcessor;
/**
* 启动线程
*/
public void setUp() {
SaveProcessor saveProcessor = new SaveProcessor();
saveProcessor.start();
PrintProcessor printProcessor = new PrintProcessor(saveProcessor);
printProcessor.start();
requestProcessor = new PreProcessor(printProcessor);
((PreProcessor) requestProcessor).start();
}
public static void main(String[] args) {
Request request = new Request();
request.setName("MultiThreadRequestProcessorTest");
new MultiThreadRequestProcessorTest().setUp();
requestProcessor.process(request);
}
}
6.并发基础
线程指令最终被os系统所执行,各java方法与os系统方法的对应关系
#define ARRAY_LENGTH(a) (sizeof(a)/sizeof(a[0]))
static JNINativeMethod methods[] = {
{"start0", "()V", (void *)&JVM_StartThread},
{"stop0", "(" OBJ ")V", (void *)&JVM_StopThread},
{"isAlive", "()Z", (void *)&JVM_IsThreadAlive},
{"suspend0", "()V", (void *)&JVM_SuspendThread},
{"resume0", "()V", (void *)&JVM_ResumeThread},
{"setPriority0", "(I)V", (void *)&JVM_SetThreadPriority},
{"yield", "()V", (void *)&JVM_Yield},
{"sleep", "(J)V", (void *)&JVM_Sleep},
{"currentThread", "()" THD, (void *)&JVM_CurrentThread},
{"countStackFrames", "()I", (void *)&JVM_CountStackFrames},
{"interrupt0", "()V", (void *)&JVM_Interrupt},
{"isInterrupted", "(Z)Z", (void *)&JVM_IsInterrupted},
{"holdsLock", "(" OBJ ")Z", (void *)&JVM_HoldsLock},
{"getThreads", "()[" THD, (void *)&JVM_GetAllThreads},
{"dumpThreads", "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
{"setNativeName", "(" STR ")V", (void *)&JVM_SetNativeThreadName},
};
6.1 线程的声明周期
==demo:==
/**
* @PackageName: com.raven.multithreaded.threadstatus
* @ClassName: ThreadStatusTest
* @Blame: raven
* @Date: 2021-08-12 21:44
* @Description: 模拟线程不同的状态
* 线程六大状态:
* new (新建) runnable(可运行状态)[ready/running] terminated(终止状态)
* waiting(等待状态) time_waiting(有时间限制的等待) blocked(阻塞状态)
*/
public class ThreadStatusTest {
public static void main(String[] args) {
// Time_Waiting_Thread 模拟有时间限制的等待状态
new Thread(() -> {
while (true) {
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
// 当发生中断异常时,可通过catch中逻辑 终止线程
e.printStackTrace();
}
}
}, "Time_Waiting_Thread").start();
// Wait_Thread 模拟无时间限制的等待状态
new Thread(() -> {
while (true) {
synchronized (ThreadStatusTest.class) {
try {
ThreadStatusTest.class.wait();
} catch (InterruptedException e) {
// 当发生中断异常时,可通过catch中逻辑 终止线程
e.printStackTrace();
}
}
}
}, "Wait_Thread").start();
// Blocked 模拟锁阻塞状态 01线程拿到锁后不释放,02阻塞
new Thread(new BlockedThread(), "Blocked_01").start();
new Thread(new BlockedThread(), "Blocked_02").start();
// 测试方式:
// 0.运行demo程序
// 1.在target\classes\com\raven\multithreaded\threadstatus 目录下打开terminal命令窗口
// 2.通过jps指令知道进行id
// 3.通过jstack + 进程id值 查看线程运行状态 (jstack命令学习 https://blog.csdn.net/qq_41904194/article/details/104150372)
// 4.查看线程状态
/*
* "Blocked_02" #17 prio=5 os_prio=0 tid=0x000000001f280800 nid=0x53f8 waiting for monitor entry [0x000000001fdbf000]
* java.lang.Thread.State: BLOCKED (on object monitor)
*
* "Blocked_01" #15 prio=5 os_prio=0 tid=0x000000001f280000 nid=0x5d10 waiting on condition [0x000000001fcbf000]
* java.lang.Thread.State: TIMED_WAITING (sleeping)
*
* "Wait_Thread" #13 prio=5 os_prio=0 tid=0x000000001f27f000 nid=0x475c in Object.wait() [0x000000001fbbe000]
* java.lang.Thread.State: WAITING (on object monitor)
*
* "Time_Waiting_Thread" #12 prio=5 os_prio=0 tid=0x000000001f27c800 nid=0x2be0 waiting on condition [0x000000001fabe000]
* java.lang.Thread.State: TIMED_WAITING (sleeping)
*/
}
static class BlockedThread extends Thread {
@Override
public void run() {
// 设置synchronized 模拟死锁场景
synchronized (BlockedThread.class) {
while (true) {
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
6.2 线程中断
public class InterruptThreadTest {
private static int i;
public static void main(String[] args) throws InterruptedException {
Thread interruptThread = new Thread(() -> {
// isInterrupted 默认是false 默认是为中断
while (!Thread.currentThread().isInterrupted()) {
i++;
}
System.out.println("i:" + i);
}, "InterruptThread");
interruptThread.start();
TimeUnit.SECONDS.sleep(1);
// 把isInterrupted设置为true
// 底层是通过os系统设置 0,1来改变线程的中断状态,中断线程
interruptThread.interrupt();
}
}
6.3 线程的复位
/**
* @PackageName: com.raven.multithreaded.interruptthread
* @ClassName: ThreadResetTest
* @Blame: raven
* @Date: 2021-08-13 18:37
* @Description: 线程的复位
*/
public class ThreadResetTest {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
while (true){
// 默认为false status = 0
if (Thread.currentThread().isInterrupted()){
System.out.println("before:" + Thread.currentThread().isInterrupted());
// 复位 回到初始状态
Thread.interrupted();
System.out.println("after:" + Thread.currentThread().isInterrupted());
}
}
});
thread.start();
TimeUnit.SECONDS.sleep(1);
// 将中断标识改为 true
thread.interrupt();
}
}
6.4中断一个处于阻塞状态的线程
public class ExceptionThreadTest {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()){
try {
//中断一个处于阻塞状态的线程 会抛出 InterruptedException异常
TimeUnit.SECONDS.sleep(1);
// System.out.println("demo");
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
// System.out.println("ExceptionThreadTest");
});
thread.start();
TimeUnit.SECONDS.sleep(1);
// 将中断状态改为true
thread.interrupt();
System.out.println(thread.isInterrupted());
}
}
6.5总结
- 线程的启动,start 基于不同的操作系统来实现不同的线程创建和启动指令
- interrupt() 可以中断线程,底层是 ==_interrupted==
- 可以通过Thread.interrupted()重置线程的中断状态
- 中断一个阻塞中的线程(sleep /wait/queue.take)会抛出InterruptedException