异步的实现 ->Async方式
使用注意事项
- 需要在spring管理的bean中使用
- 可使用的地方 类上 接口 以及方法上
- 在放在类上的话 整个类的方法都是异步的
- 启动类上需要enableAsyc 激活配置
为什么可以实现异步
依赖于启动上的 EnableAsync
深入研究下
注解类如下
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
Class<? extends Annotation> annotation() default Annotation.class;
boolean proxyTargetClass() default false;
AdviceMode mode() default AdviceMode.PROXY;
int order() default Ordered.LOWEST_PRECEDENCE;
}
重点导入了一个类AsyncConfigurationSelector
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
@Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
}
重写了父类的selectImports 父类实现 ImportSelector 可以实现selectImports 来导入bean
注解类 EnableAsync 有个 mode 参数 在没指定的情况下 默认为 AdviceMode.PROXY; 根据AsyncConfigurationSelector类的selectImports 逻辑 根据代理类型不同 来用不同的类实现 如果是jdk的动态代理 使用 ProxyAsyncConfiguration 如果是 aspectj 则使用AspectJAsyncConfiguration 处理 我们这里传入AdviceMode.PROXY 则定位到 ProxyAsyncConfiguration 这个类
类如下
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
bpp.configure(this.executor, this.exceptionHandler);
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
return bpp;
}
}
重点注入了一个bean AsyncAnnotationBeanPostProcessor 设置启动的注解 EnableAsync 上携带的参数
重点看下 配置执行器 以及异常处理器
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
bpp.configure(this.executor, this.exceptionHandler);
this.executor和this.exceptionHandler 都是父类的变量
@Configuration(proxyBeanMethods = false)
public abstract class AbstractAsyncConfiguration implements ImportAware {
@Nullable
protected AnnotationAttributes enableAsync;
@Nullable
protected Supplier<Executor> executor;
@Nullable
protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;
@Override
public void setImportMetadata(AnnotationMetadata importMetadata) {
this.enableAsync = AnnotationAttributes.fromMap(
importMetadata.getAnnotationAttributes(EnableAsync.class.getName()));
if (this.enableAsync == null) {
throw new IllegalArgumentException(
"@EnableAsync is not present on importing class " + importMetadata.getClassName());
}
}
@Autowired
void setConfigurers(ObjectProvider<AsyncConfigurer> configurers) {
Supplier<AsyncConfigurer> configurer = SingletonSupplier.of(() -> {
List<AsyncConfigurer> candidates = configurers.stream().collect(Collectors.toList());
if (CollectionUtils.isEmpty(candidates)) {
return null;
}
if (candidates.size() > 1) {
throw new IllegalStateException("Only one AsyncConfigurer may exist");
}
return candidates.get(0);
});
this.executor = adapt(configurer, AsyncConfigurer::getAsyncExecutor);
this.exceptionHandler = adapt(configurer, AsyncConfigurer::getAsyncUncaughtExceptionHandler);
}
private <T> Supplier<T> adapt(Supplier<AsyncConfigurer> supplier, Function<AsyncConfigurer, T> provider) {
return () -> {
AsyncConfigurer configurer = supplier.get();
return (configurer != null ? provider.apply(configurer) : null);
};
}
}
断点看着两个变量值如下
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bcUvE0YG-1663811963617)(/usr/local/develop/markdown-source-image/image-20220919103138658.png)]
AsyncAnnotationBeanPostProcessor 配置执行器以及 异常处理器 值如下
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EKqJjRGU-1663811963618)(/usr/local/develop/markdown-source-image/image-20220919103448684.png)]
首先看下 AsyncAnnotationBeanPostProcessor 类图
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kzFruOWd-1663811963618)(/usr/local/develop/markdown-source-image/image-20220815161438722.png)]
顶层实现 beanPostProccessor 可以用来实现动态代理
重点看下实现的方法 postProcessBeforeInitialization 和 postProcessAfterInitialization
AsyncAnnotationBeanPostProcessor 对beanpostProccessor 接口的实现在父类的父类 也就是AbstractAdvisingBeanPostProcessor
public abstract class AbstractAdvisingBeanPostProcessor extends ProxyProcessorSupport implements BeanPostProcessor {
@Nullable
protected Advisor advisor;
protected boolean beforeExistingAdvisors = false;
private final Map<Class<?>, Boolean> eligibleBeans = new ConcurrentHashMap(256);
public Object postProcessBeforeInitialization(Object bean, String beanName) {
return bean;
}
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (this.advisor != null && !(bean instanceof AopInfrastructureBean)) {
if (bean instanceof Advised) {
Advised advised = (Advised)bean;
if (!advised.isFrozen() && this.isEligible(AopUtils.getTargetClass(bean))) {
if (this.beforeExistingAdvisors) {
advised.addAdvisor(0, this.advisor);
} else {
advised.addAdvisor(this.advisor);
}
return bean;
}
}
if (this.isEligible(bean, beanName)) {
ProxyFactory proxyFactory = this.prepareProxyFactory(bean, beanName);
if (!proxyFactory.isProxyTargetClass()) {
this.evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}
proxyFactory.addAdvisor(this.advisor);
this.customizeProxyFactory(proxyFactory);
ClassLoader classLoader = this.getProxyClassLoader();
if (classLoader instanceof SmartClassLoader && classLoader != bean.getClass().getClassLoader()) {
classLoader = ((SmartClassLoader)classLoader).getOriginalClassLoader();
}
return proxyFactory.getProxy(classLoader);
} else {
return bean;
}
} else {
return bean;
}
}
}
protected boolean isEligible(Object bean, String beanName) {
return isEligible(bean.getClass());
}
protected boolean isEligible(Class<?> targetClass) {
Boolean eligible = this.eligibleBeans.get(targetClass);
if (eligible != null) {
return eligible;
}
if (this.advisor == null) {
return false;
}
eligible = AopUtils.canApply(this.advisor, targetClass);
this.eligibleBeans.put(targetClass, eligible);
return eligible;
}
protected ProxyFactory prepareProxyFactory(Object bean, String beanName) {
ProxyFactory proxyFactory = new ProxyFactory();
proxyFactory.copyFrom(this);
proxyFactory.setTarget(bean);
return proxyFactory;
}
protected void customizeProxyFactory(ProxyFactory proxyFactory) {
}
advisor 的赋值在AsyncAnnotationBeanPostProcessor 的setBeanFactory方法中 方法如下
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
如果使用async符合最上的要求的话 可以被切面 切入然后进行增强
- 需要在spring管理的bean中使用
- 启动类上或者其他的配置类引入enableAsyc 激活配置
那我们看下切面类所做的事情
首先看下AsyncAnnotationAdvisor对应的构造方法
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor,
@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
}
catch (ClassNotFoundException ex) {
}
this.advice = buildAdvice(executor, exceptionHandler);
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor,
@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
ComposablePointcut result = null;
for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
if (result == null) {
result = new ComposablePointcut(cpc);
}
else {
result.union(cpc);
}
result = result.union(mpc);
}
return (result != null ? result : Pointcut.TRUE);
}
增强器中 有创建了新的类 AnnotationAsyncExecutionInterceptor
类图如下
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gO1OYIZ5-1663811963618)(/usr/local/develop/markdown-source-image/image-20220921161946181.png)]
还是一样 我们来看 构造方法
public AnnotationAsyncExecutionInterceptor(@Nullable Executor defaultExecutor) {
super(defaultExecutor);
}
public AsyncExecutionInterceptor(@Nullable Executor defaultExecutor) {
super(defaultExecutor);
}
public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor) {
this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
this.exceptionHandler = SingletonSupplier.of(SimpleAsyncUncaughtExceptionHandler::new);
}
defaultExecutor的实例为SimpleAsyncTaskExecutor 包裹了一层 SingletonSupplier
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EvLPcJWQ-1663811963618)(/usr/local/develop/markdown-source-image/image-20220921175542713.png)]
传入executor为空的话 会调用getDefaultExecutor 这个方法获取默认的 这里会有的方法重载 会调用 AsyncExecutionAspectSupport的子类AsyncExecutionInterceptor 的 getDefaultExecutor 方法 内容如下
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
if (beanFactory != null) {
try {
return beanFactory.getBean(TaskExecutor.class);
}
catch (NoUniqueBeanDefinitionException ex) {
logger.debug("Could not find unique TaskExecutor bean", ex);
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
if (logger.isInfoEnabled()) {
logger.info("More than one TaskExecutor bean found within the context, and none is named " +
"'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
"as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
}
}
}
catch (NoSuchBeanDefinitionException ex) {
logger.debug("Could not find default TaskExecutor bean", ex);
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
logger.info("No task executor bean found for async processing: " +
"no bean of type TaskExecutor and no bean named 'taskExecutor' either");
}
}
}
return null;
}
其次再来看 AnnotationAsyncExecutionInterceptor 类 实现了 MethodInterceptor 我们来看下具体的实现 这里会进行增强方法
具体的实现在他的父类 AsyncExecutionInterceptor 中
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
Executor targetExecutor;
String qualifier = getExecutorQualifier(method);
if (StringUtils.hasLength(qualifier)) {
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
}
else {
targetExecutor = this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
this.executors.put(method, executor);
}
return executor;
}
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
}
else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
else {
executor.submit(task);
return null;
}
}
public <T> Future<T> submit(Callable<T> task) {
FutureTask<T> future = new FutureTask(task);
this.execute(future, Long.MAX_VALUE);
return future;
}
public void execute(Runnable task, long startTimeout) {
Assert.notNull(task, "Runnable must not be null");
Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
this.concurrencyThrottle.beforeAccess();
doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
}else {
doExecute(taskToUse);
}
}
protected void doExecute(Runnable task) {
Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
thread.start();
}
看到这里是否发现了什么 原来都是 async在没配置的情况下 都是通过创建线程来执行任务
我们在来串一下整个流程
-
EnableAsync激活配置 根据不同的代理类型使用不同的类 进行处理 默认为jdk动态代理 -
jdk的动态代理 使用 ProxyAsyncConfiguration 类处理 -
使用AsyncAnnotationBeanPostProcessor bean进行处理 -
重点 在重写的父类 setBeanFactory 方法 会进行赋值一个 增强器 AsyncAnnotationAdvisor 给父类成员变量 -
其中 AsyncAnnotationBeanPostProcessor 实现 了 BeanPostProcessor 接口 -
重点看下 AsyncAnnotationBeanPostProcessor 实现的 postProcessAfterInitialization bean初始化完成进行增强 -
方法中会进行判断 是否有资格进行增强 有资格返回 返回代理类 -
在进行创建增强类 AsyncAnnotationAdvisor 构造方法中 创建增强点 和 切点 -
切点即为 增加了 Async 以及 Asynchronous (可能不存在)注解中 -
在buildAdvice方法中 创建增强器的过程中 又 创建了新的类 AnnotationAsyncExecutionInterceptor -
AnnotationAsyncExecutionInterceptor 初始化 会有个 defaultExecutor 赋值操作 其中 会有方法重载 -
具体使用的方法 org.springframework.aop.interceptor.AsyncExecutionInterceptor#getDefaultExecutor 同时调用父类的方法org.springframework.aop.interceptor.AsyncExecutionAspectSupport#getDefaultExecutor 同时会将defaultExecutor赋值为SimpleAsyncTaskExecutor -
AnnotationAsyncExecutionInterceptor 又实现了 MethodInterceptor 接口 同时 实现位于父类 AsyncExecutionInterceptor 的invoke方法 -
这个方法是重点 有两步 第一步在 determineAsyncExecutor 方法中 获取 AsyncTaskExecutor 实例 实例对象为 SimpleAsyncTaskExecutor -
第二步 封装为任务 调用 doSubmit 提供任务 -
doSubmit 中 根据执行方法不同返回值 进行不同的处理 我们看的是方法返回普通对象的处理方式 -
调用 executor 也就是 SimpleAsyncTaskExecutor 的 submit提交任务 -
默认 不启用资源节流 也就是 isThrottleActive为 false 故,调用 doExecute -
根据线程工厂是否为空 不为空则使用创建线程 否则自行创建线程 执行任务
so. 我们还是自定义配置线程池
推荐使用自定义配置 如下
@Configuration
@EnableAsync
public class AppConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(7);
executor.setMaxPoolSize(42);
executor.setQueueCapacity(11);
executor.setThreadNamePrefix("MyExecutor-");
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new SimpleAsyncUncaughtExceptionHandler();
}
}
|