异步注解相关
# 异步注解 @Async
# 1、启动类开启异步线程
启动类
上添加或者 自定义线程池
上添加注解:@EnableAsync
# 2、配置线程池
示例:
/**
* 全局异步任务配置类
*
* @author chenmeng
*/
@Slf4j
@EnableAsync
@Configuration
public class GlobalAsyncConfig implements AsyncConfigurer {
/**
* 创建自定义异步线程池
*
* @return Executor
*/
@Bean(name = "asyncExecutor")
public Executor asyncExecutor() {
// 1、核心线程数
int corePoolSize = 10;
// 2、最大线程数
int maxPoolSize = 50;
// 3、非核心线程空闲存活时间
long keepAliveTime = 60L;
// 4、时间单位: 秒
TimeUnit unit = TimeUnit.SECONDS;
// 5、工作队列
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
// 6、设置线程工厂(线程名称格式)
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("async-pool-%d")
.build();
// 7、设置拒绝策略
ThreadPoolExecutor.CallerRunsPolicy rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveTime,
unit,
workQueue,
threadFactory,
rejectedExecutionHandler
);
// 允许核心线程超时
executor.allowCoreThreadTimeOut(true);
return executor;
}
@Override
public Executor getAsyncExecutor() {
// 返回自定义线程池
return asyncExecutor();
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
// 设置异步执行时的异常处理器
return new GlobalAsyncUncaughtExceptionHandler();
}
public static class GlobalAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable ex, Method method, @NotNull Object... params) {
log.error("[handleUncaughtException][method({}) params({}) 发生异常]", method.getName(), Arrays.deepToString(params), ex);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
在这个配置类中,我们完成了以下内容:
- 通过
@Configuration
注解标注这个类是一个配置类。 - 通过
@EnableAsync
注解开启异步支持。 - 实现
AsyncConfigurer
接口,以自定义异步执行器和异常处理器。 - 创建了一个
ThreadPoolExecutor
作为自定义线程池,配置了核心线程数、最大线程数、空闲存活时间、工作队列等参数。 - 使用
ThreadFactoryBuilder
创建了一个命名的线程工厂。 - 设置了拒绝策略为
CallerRunsPolicy
,这意味着如果线程池达到饱和状态,任务将由调用者线程执行。 - 实现了自定义的异常处理器
SimpleAsyncUncaughtExceptionHandler
,用于处理异步方法执行中的未捕获异常。
# 3、方法体上添加异步注解
示例:
@Async
public void test() {
}
2
3
4
注意事项:
- 在内部类之间的方法调用,此注解无效,不会有异步线程
- 需要外部的类来调用这个异步方法才会开启异步线程
# 4、不使用自定义线程池可能引起的问题
在 Spring 框架中,@Async
注解可以用于异步执行方法。
当你使用 @Async
注解时,如果不自定义线程池,Spring 会为你自动创建一个默认的线程池。这个默认线程池是由 SimpleAsyncTaskExecutor
管理的,它为每个任务创建一个新的线程。
虽然这可以工作,但可能会遇到以下问题:
无限制的线程创建:
SimpleAsyncTaskExecutor
会为每个任务创建一个新的线程,而没有最大线程数的限制。如果异步任务的数量非常多,这可能导致大量的线程被创建,消耗大量的系统资源,最终可能导致OutOfMemoryError
或降低系统性能。线程管理:由于每次调用都会创建新线程,没有线程复用,这可能会导致线程管理上的开销,尤其是在高并发场景下。
调试和监控困难:默认线程池创建的线程名称没有明确的命名规则,这可能会使得在日志中或监控工具中跟踪异步任务变得困难。
资源竞争:大量的线程可能会引起CPU和内存资源的激烈竞争,尤其是在JVM和操作系统层面上的上下文切换。
安全性问题:如果异步任务执行的时间过长,而默认线程池没有适当的管理策略,可能会因为线程过多而影响到系统的稳定性和安全性。
为了避免这些问题,建议自定义线程池。自定义线程池可以提供以下优势:
- 可控的线程数量:通过配置最大线程数来限制并发执行的任务数量,避免过度创建线程。
- 线程复用:线程池可以重用已存在的线程来执行新的任务,减少了线程创建和销毁的开销。
- 优雅的关闭:可以配置线程池的关闭策略,确保在应用程序关闭时,正在执行的任务能够优雅地完成或中断。
- 更好的监控和调试:自定义线程池可以设置有意义的线程名称,便于在日志和监控系统中识别和跟踪。
在 Spring 中自定义线程池通常涉及到配置一个 ThreadPoolTaskExecutor
或者实现一个 AsyncConfigurer
接口,并定义线程池的各种参数,如核心线程数、最大线程数、队列容量等。这样可以更好地控制异步任务的执行和资源的使用。
# 如何自定义一个适合我项目的线程池?
自定义线程池以适应您的项目需求,需要考虑以下几个关键因素:
核心线程数(Core Pool Size):
- 核心线程是线程池中始终保持活跃的线程,即使它们处于空闲状态。通常,核心线程数取决于任务的类型(CPU密集型、IO密集型或混合型)以及系统的并发需求。
最大线程数(Maximum Pool Size):
- 最大线程数是线程池中允许的最大线程数。当所有核心线程都在忙碌时,并且任务队列已满,线程池会创建新的线程,直到达到这个数值。
工作队列(Work Queue):
- 工作队列用于存储等待执行的任务。队列的大小可以是有限或无限的。选择合适的队列类型(如
ArrayBlockingQueue
、LinkedBlockingQueue
、SynchronousQueue
等)对于控制线程池的行为至关重要。
- 工作队列用于存储等待执行的任务。队列的大小可以是有限或无限的。选择合适的队列类型(如
线程存活时间(Keep-Alive Time):
- 当线程池中的线程数超过核心线程数时,非核心线程在空闲状态下可以存活的时间。这个参数可以用来控制线程的生命周期,以避免长时间保持不必要的线程。
时间单位(Time Unit):
- 设置线程存活时间的单位
线程工厂(Thread Factory):
- 线程工厂用于创建新线程。通过自定义线程工厂,可以设置线程的名称、优先级和其他属性,这有助于调试和监控。
拒绝策略(Rejected Execution Handler):
- 当任务太多,无法被线程池及时处理时,拒绝策略决定了如何处理这些额外的任务。常见的拒绝策略包括
ThreadPoolExecutor.AbortPolicy
、CallerRunsPolicy
、DiscardPolicy
和DiscardOldestPolicy
。
- 当任务太多,无法被线程池及时处理时,拒绝策略决定了如何处理这些额外的任务。常见的拒绝策略包括
任务类型:
根据任务的特性(CPU密集型、IO密集型或混合型)来调整线程池的参数。CPU密集型任务通常需要较少的线程,而IO密集型任务可能需要更多的线程。
// 示例 @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(determineCorePoolSize()); // 根据任务类型确定核心线程数 executor.setMaxPoolSize(determineMaxPoolSize()); // 根据任务类型确定最大线程数 executor.setQueueCapacity(determineQueueCapacity()); // 根据任务类型确定队列容量 executor.setThreadNamePrefix("async-task-"); // 线程名称前缀 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略 executor.initialize(); return executor; } private int determineCorePoolSize() { // 根据任务类型和系统资源确定核心线程数 // 例如,对于CPU密集型任务,可以返回Runtime.getRuntime().availableProcessors(); // 对于IO密集型任务,可以返回更高的值 } private int determineMaxPoolSize() { // 根据任务类型和系统资源确定最大线程数 } private int determineQueueCapacity() { // 根据任务类型确定队列容量 }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26