异步注解相关
# 异步注解 @Async
# 1、启动类开启异步线程
启动类
上添加或者 自定义线程池
上添加注解:@EnableAsync
# 2、配置线程池
示例:
/**
* 全局异步任务配置类
*
* @author chenmeng
*/
@Slf4j
@EnableAsync
@Configuration
public class GlobalAsyncConfig implements AsyncConfigurer {
/**
* 创建自定义异步线程池
*
* @return Executor
*/
@Bean(name = "asyncExecutor")
public Executor asyncExecutor() {
// 1、核心线程数
int corePoolSize = 10;
// 2、最大线程数
int maxPoolSize = 50;
// 3、非核心线程空闲存活时间
long keepAliveTime = 60L;
// 4、时间单位: 秒
TimeUnit unit = TimeUnit.SECONDS;
// 5、工作队列
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
// 6、设置线程工厂(线程名称格式)
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("async-pool-%d")
.build();
// 7、设置拒绝策略
ThreadPoolExecutor.CallerRunsPolicy rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveTime,
unit,
workQueue,
threadFactory,
rejectedExecutionHandler
);
// 允许核心线程超时
executor.allowCoreThreadTimeOut(true);
return executor;
}
@Override
public Executor getAsyncExecutor() {
// 返回自定义线程池
return asyncExecutor();
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
// 设置异步执行时的异常处理器
return new GlobalAsyncUncaughtExceptionHandler();
}
public static class GlobalAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable ex, Method method, @NotNull Object... params) {
log.error("[handleUncaughtException][method({}) params({}) 发生异常]", method.getName(), Arrays.deepToString(params), ex);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
在这个配置类中,我们完成了以下内容:
- 通过
@Configuration
注解标注这个类是一个配置类。 - 通过
@EnableAsync
注解开启异步支持。 - 实现
AsyncConfigurer
接口,以自定义异步执行器和异常处理器。 - 创建了一个
ThreadPoolExecutor
作为自定义线程池,配置了核心线程数、最大线程数、空闲存活时间、工作队列等参数。 - 使用
ThreadFactoryBuilder
创建了一个命名的线程工厂。 - 设置了拒绝策略为
CallerRunsPolicy
,这意味着如果线程池达到饱和状态,任务将由调用者线程执行。 - 实现了自定义的异常处理器
SimpleAsyncUncaughtExceptionHandler
,用于处理异步方法执行中的未捕获异常。
# 3、方法体上添加异步注解
示例:
@Async
public void test() {
}
2
3
4
注意事项:
- 在内部类之间的方法调用,此注解无效,不会有异步线程
- 需要外部的类来调用这个异步方法才会开启异步线程
# 4、不使用自定义线程池可能引起的问题
在 Spring 框架中,@Async
注解可以用于异步执行方法。
当你使用 @Async
注解时,如果不自定义线程池,Spring 会为你自动创建一个默认的线程池。这个默认线程池是由 SimpleAsyncTaskExecutor
管理的,它为每个任务创建一个新的线程。
虽然这可以工作,但可能会遇到以下问题:
无限制的线程创建:
SimpleAsyncTaskExecutor
会为每个任务创建一个新的线程,而没有最大线程数的限制。如果异步任务的数量非常多,这可能导致大量的线程被创建,消耗大量的系统资源,最终可能导致OutOfMemoryError
或降低系统性能。线程管理:由于每次调用都会创建新线程,没有线程复用,这可能会导致线程管理上的开销,尤其是在高并发场景下。
调试和监控困难:默认线程池创建的线程名称没有明确的命名规则,这可能会使得在日志中或监控工具中跟踪异步任务变得困难。
资源竞争:大量的线程可能会引起CPU和内存资源的激烈竞争,尤其是在JVM和操作系统层面上的上下文切换。
安全性问题:如果异步任务执行的时间过长,而默认线程池没有适当的管理策略,可能会因为线程过多而影响到系统的稳定性和安全性。
为了避免这些问题,建议自定义线程池。自定义线程池可以提供以下优势:
- 可控的线程数量:通过配置最大线程数来限制并发执行的任务数量,避免过度创建线程。
- 线程复用:线程池可以重用已存在的线程来执行新的任务,减少了线程创建和销毁的开销。
- 优雅的关闭:可以配置线程池的关闭策略,确保在应用程序关闭时,正在执行的任务能够优雅地完成或中断。
- 更好的监控和调试:自定义线程池可以设置有意义的线程名称,便于在日志和监控系统中识别和跟踪。
在 Spring 中自定义线程池通常涉及到配置一个 ThreadPoolTaskExecutor
或者实现一个 AsyncConfigurer
接口,并定义线程池的各种参数,如核心线程数、最大线程数、队列容量等。这样可以更好地控制异步任务的执行和资源的使用。
# 如何自定义一个适合我项目的线程池?
自定义线程池以适应您的项目需求,需要考虑以下几个关键因素:
核心线程数(Core Pool Size):
- 核心线程是线程池中始终保持活跃的线程,即使它们处于空闲状态。通常,核心线程数取决于任务的类型(CPU密集型、IO密集型或混合型)以及系统的并发需求。
最大线程数(Maximum Pool Size):
- 最大线程数是线程池中允许的最大线程数。当所有核心线程都在忙碌时,并且任务队列已满,线程池会创建新的线程,直到达到这个数值。
工作队列(Work Queue):
- 工作队列用于存储等待执行的任务。队列的大小可以是有限或无限的。选择合适的队列类型(如
ArrayBlockingQueue
、LinkedBlockingQueue
、SynchronousQueue
等)对于控制线程池的行为至关重要。
- 工作队列用于存储等待执行的任务。队列的大小可以是有限或无限的。选择合适的队列类型(如
线程存活时间(Keep-Alive Time):
- 当线程池中的线程数超过核心线程数时,非核心线程在空闲状态下可以存活的时间。这个参数可以用来控制线程的生命周期,以避免长时间保持不必要的线程。
时间单位(Time Unit):
- 设置线程存活时间的单位
线程工厂(Thread Factory):
- 线程工厂用于创建新线程。通过自定义线程工厂,可以设置线程的名称、优先级和其他属性,这有助于调试和监控。
拒绝策略(Rejected Execution Handler):
- 当任务太多,无法被线程池及时处理时,拒绝策略决定了如何处理这些额外的任务。常见的拒绝策略包括
ThreadPoolExecutor.AbortPolicy
、CallerRunsPolicy
、DiscardPolicy
和DiscardOldestPolicy
。
- 当任务太多,无法被线程池及时处理时,拒绝策略决定了如何处理这些额外的任务。常见的拒绝策略包括
任务类型:
根据任务的特性(CPU密集型、IO密集型或混合型)来调整线程池的参数。CPU 密集型任务通常需要较少的线程,而 IO 密集型任务可能需要更多的线程。
// 示例 @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(determineCorePoolSize()); // 根据任务类型确定核心线程数 executor.setMaxPoolSize(determineMaxPoolSize()); // 根据任务类型确定最大线程数 executor.setQueueCapacity(determineQueueCapacity()); // 根据任务类型确定队列容量 executor.setThreadNamePrefix("async-task-"); // 线程名称前缀 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略 executor.initialize(); return executor; } private int determineCorePoolSize() { // 根据任务类型和系统资源确定核心线程数 // 例如,对于CPU密集型任务,可以返回Runtime.getRuntime().availableProcessors(); // 对于IO密集型任务,可以返回更高的值 } private int determineMaxPoolSize() { // 根据任务类型和系统资源确定最大线程数 } private int determineQueueCapacity() { // 根据任务类型确定队列容量 }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 为什么需要自定义线程池?
@Async
默认使用SimpleAsyncTaskExecutor
SimpleAsyncTaskExecutor
的问题@Async
默认使用 无界线程池,每次请求都会创建新线程,可能会导致线程资源耗尽,最终引发OOM
。- 没有线程复用机制,对高并发业务不友好。
自定义线程池的好处
- 解决默认线程池的局限性
- 自定义线程池可以控制线程数量、防止资源耗尽,提高系统的吞吐量和稳定性。
- 提高
@Async
任务执行的可控性- 默认
@Async
线程池可能会无限创建线程,自定义线程池后,可以设定最大线程数,防止应用崩溃。
- 默认
# 线程池参数的调优建议
参数 | 作用 | 推荐值(取决于业务场景) |
---|---|---|
corePoolSize | 核心线程数,长期存活 | CPU 密集型:CPU 核心数,IO 密集型:2 * CPU 核心数 |
maxPoolSize | 最大线程数 | corePoolSize * 2 或更多(高并发场景) |
queueCapacity | 任务队列大小 | corePoolSize * 10 (视情况而定) |
keepAliveTime | 非核心线程存活时间 | 60s |
ThreadFactory | 自定义线程名称 | 方便排查问题 |
RejectedExecutionHandler | 线程池满时的策略 | CallerRunsPolicy(让调用线程执行任务) |
# 重点参数
参数 | 作用 | 推荐配置 |
---|---|---|
corePoolSize | 核心线程数,常驻线程数 | 通常设置为 CPU 核心数 + N(N 视业务情况,通常 0~2) |
maxPoolSize | 最大线程数 | 通常为 corePoolSize * 2~4 (具体看业务峰值) |
queueCapacity | 队列容量,缓存等待任务 | 通常设置为 500~10000 ,根据 QPS 及任务量调整 |
keepAliveSeconds | 非核心线程空闲存活时间 | 建议设置为 30~60秒 |
threadNamePrefix | 线程名前缀,便于监控和排查问题 | 业务相关前缀,例如 order-executor- |
rejectedExecutionHandler | 拒绝策略(超出 maxPoolSize + queueCapacity) | 通常选用 CallerRunsPolicy ,防止丢消息 |
更多拒绝策略相关的内容:线程池详解 | 沉梦听雨的编程指南 (opens new window)
# 动态配置 vs 固定配置
类型 | 优点 | 缺点 | 使用场景 |
---|---|---|---|
固定配置 | 稳定可靠,线程池规模明确,利于运维和监控 | 高并发时可能资源浪费或不足,无法根据硬件动态调整(缺乏灵活性) | 并发压力可预估,流量稳定的系统 |
动态配置 | 可自动适应机器配置,充分利用CPU资源 | 在业务高峰期仍需手动扩容调优 | 部署在多规格服务器、资源敏感型系统 |
动态配置示例:
@Bean(name = "myExecutor")
public ThreadPoolTaskExecutor myExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 动态配置(根据可用的 JVM 处理器数)
int cores = Runtime.getRuntime().availableProcessors();
executor.setCorePoolSize(cores);
executor.setMaxPoolSize(cores * 2);
executor.setQueueCapacity(cores * 10);
// 固定配置(可根据配置文件动态传参)
// executor.setCorePoolSize(10);
// executor.setMaxPoolSize(20);
// executor.setQueueCapacity(200);
executor.setThreadNamePrefix("myExecutor-");
executor.setKeepAliveSeconds(60);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# IO密集型 vs CPU密集型
类型 | 特点 | 典型场景 | 线程数建议 |
---|---|---|---|
CPU密集型 | 主要消耗 CPU 运算资源,线程大部分时间在做计算 | 加密解密、图像处理、压缩解压、数据分析、复杂算法 | 线程数 ≈ CPU核心数 + 1 (一般 ≤ 核心数 × 2) |
IO密集型 | 主要在等待 IO 操作(磁盘、网络、数据库) | 文件读写、数据库操作、网络请求、RPC调用、接口聚合 | 线程数 ≈ 核心数 × 2 ~ 核心数 × 4 |
# 如何快速判断?
1. 看业务流程
- 是否大量涉及磁盘读写、网络请求? → IO密集型
- 是否纯 CPU 计算、几乎无 IO? → CPU密集型
2. 用性能监控工具 可以通过 top、htop、jstack 等工具观察:
- CPU占用高,Load Average接近或超过核心数 → CPU密集
- CPU占用低,线程经常处于 WAITING、TIMED_WAITING 状态 → IO密集
3. 代码逻辑特征
- 循环大量计算 → CPU密集
- 多次调用第三方接口、文件读写、数据库交互 → IO密集
# 为什么要区分?
因为 线程池配置策略不同:
- CPU密集型:不宜设置过多线程,避免上下文切换带来的开销。
- IO密集型:线程阻塞等待 IO,所以需要多开一些线程,以提高吞吐。
# 实际企业开发举例
业务 | 类型 | 线程池设置建议 |
---|---|---|
视频转码、图片处理 | CPU密集型 | 核心数 + 1 |
日志收集、异步写库、发送消息队列 | IO密集型 | 核心数 × 2~4 |
Web接口聚合调用第三方服务 | IO密集型 | 核心数 × 2~3 |