网站首页 > 精选教程 正文
一、 Spring框架中的异步执行
在Spring Framework中分别使用TaskExecutor和TaskScheduler接口提供异步执行和任务调度的抽象,本节我们着重讲解基于TaskExecutor如何进行 异步处理的。
二、 Spring中对TaskExecutor的抽象
Spring 2.0版本中提供了一种新的处理执行器(executors)的抽象,即TaskExecutor接口。TaskExecutor接口与java.util.concurrent.Executor是等价的,其只有一个接口:
public interface TaskExecutor {
void execute(Runnable task);
}
实际上,TaskExecutor存在的主要原因是在使用线程池时抽象出对Java 5的需求。该接口具有单个方法execute(Runnable task),该方法基于线程池的语义和配置接受要执行的任务。
最初创建TaskExecutor是为了给其他Spring组件提供所需的线程池抽象。诸如ApplicationEventMulticaster,JMS的AbstractMessageListenerContainer和Quartz集成之类的组件都使用TaskExecutor抽象来池化线程。但是,如果您对线程池有定制需要,则可以根据自己的需要实现此抽象。
Spring框架本身内置了很多类型的TaskExecutor的实现,包含如下:
- SimpleAsyncTaskExecutor 这种TaskExecutor接口的实现不会复用线程,对应每个请求会新创建一个对应的线程来执行。但是,它确实支持并发限制,该限制将阻止任何超出限制的调用,这个可以通过调用setConcurrencyLimit方法来进行限制并发数,默认是不限制并发数。
- SyncTaskExecutor 这种TaskExecutor接口的实现不会异步的执行提交的任务,而是同步使用调用线程来执行,这种实现主要用于没有必要多线程进行处理的情况,比如进行简单的单元测试时候。
- ConcurrentTaskExecutor 这种TaskExecutor接口的实现是对JDK5中的 java.util.concurrent.Executor的一个包装,通过setConcurrentExecutor(Executor concurrentExecutor)接口可以设置一个JUC中的线程池到其内部来做适配。还有一个替代方案ThreadPoolTaskExecutor,它通过bean属性的方式配置Executor线程池的属性。一般会很少需要使用ConcurrentTaskExecutor,但如果ThreadPoolTaskExecutor不够健壮满足不了您的需求,则ConcurrentTaskExecutor是一种选择。
- SimpleThreadPoolTaskExecutor 这个实现实际上是Quartz的SimpleThreadPool的子类,它监听Spring的生命周期回调。当您有一个可能需要Quartz和非Quartz组件共享的线程池时,通常会使用此实现。
- ThreadPoolTaskExecutor 此实现只能在Java 5环境中使用,但也是该环境中最常用的实现。它公开了bean属性,用于配置java.util.concurrent.ThreadPoolExecutor并将其包装在TaskExecutor中。如果您需要一些高级的东西,例如ScheduledThreadPoolExecutor,建议您使用ConcurrentTaskExecutor。
- TimerTaskExecutor 此实现使用单个java.util.Timer对象作为其内部异步线程来执行任务。它与SyncTaskExecutor的不同之处在于该实现对所有提交的任务在Timer内的单独的线程中执行,尽管提交的多个任务的执行是顺序同步的。
如上,Spring框架本身提供了很多TaskExecutor的实现,但是如果不符合你的需要,你可以实现TaskExecutor接口来定制自己的执行器。
三 如何在Spring中使用异步执行
3.1 使用TaskExecutor实现异步执行
在Spring中TaskExecutor的实现类是以JavaBeans的方式提供服务的,比如下面例子,我们通过xml方式向Spring 容器中注入了TaskExecutor的实现者ThreadPoolTaskExecutor的实例:
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<!--1. 核心线程个数-->
<property name="corePoolSize" value="5" />
<!--2.最大线程个数 -->
<property name="maxPoolSize" value="10" />
<!--3.超过核心线程个数的线程空闲多久被回收 -->
<property name="keepAliveSeconds" value="60" />
<!--4.缓存队列大小 -->
<property name="queueCapacity" value="20" />
<!--5.拒绝策略 -->
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
</property>
</bean>
- 如上代码我们向Spring容器中注入了一个ThreadPoolTaskExecutor处理器实例,其配置属性与Java并发包中的线程池ThreadPoolExecutor类似。
- 其中代码1,2我们设置了处理器中核心线程个数为5,最大线程个数为10
- 代码3设置线程池中非核心线程空闲60s后会被自动回收
- 代码4则设置了当线程池的阻塞队列的大小为20
- 代码5设置线程池的拒绝策略,这里设置为CallerRunsPolicy,意思是当线程池里面队列满了,并且所有线程都在忙碌时候,如果这时候向处理器提交了新的任务,则新的任务不再是异步执行,而是使用调用线程来执行。
当我们向Spring容器中注入了TaskExecutor的实例后,我们就可以在Spring容器中使用它:
<bean id="asyncExecutorExample"
class="com.jiaduo.async.AsyncProgram.AsyncExecutorExample">
<property name="taskExecutor" ref="taskExecutor" />
</bean>
- 如上代码通过xml方式我们向Spring 容器注入了AsyncExecutorExample的实例,并且其属性taskExecutor注入了上面我们创建名称为taskExecutor的执行器,下面我们看AsyncExecutorExample的代码:
public class AsyncExecutorExample {
private class MessagePrinterTask implements Runnable {
private String message;
public MessagePrinterTask(String message) {
this.message = message;
}
public void run() {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " " + message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public TaskExecutor getTaskExecutor() {
return taskExecutor;
}
public void setTaskExecutor(TaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}
// 线程池执行器
private TaskExecutor taskExecutor;
public void printMessages() {
for (int i = 0; i < 6; i++) {
taskExecutor.execute(new MessagePrinterTask("Message" + i));
}
}
}
如上代码AsyncExecutorExample中有一个类型为TaskExecutor的属性,我们通过setter访问器注入了该属性,然后其有一个printMessages方法用来触发异步任务执行,这里异步任务被封装为了MessagePrinterTask,其run方法内先休眠1s模拟任务执行,然后打印输出。
下面我们看如何把上面内容组成可执行的程序,首先需要把上面两个xml配置汇总到beans.xml里面,其内容如下:
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-2.5.xsd">
<bean id="taskExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
...
</bean>
<bean id="asyncExecutorExample"
class="com.jiaduo.async.AsyncProgram.AsyncExecutorExample">
<property name="taskExecutor" ref="taskExecutor" />
</bean>
</beans>
然后我们需要编写测试代码如下:
public static void main(String arg[]) throws InterruptedException {
// 1.创建容器上下文
ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(
new String[] { "beans.xml" });
// 2. 获取AsyncExecutorExample实例并调用打印方法
System.out.println(Thread.currentThread().getName() + " begin ");
AsyncExecutorExample asyncExecutorExample = applicationContext.getBean(AsyncExecutorExample.class);
asyncExecutorExample.printMessages();
System.out.println(Thread.currentThread().getName() + " end ");
}
- 如上代码1我们使用ClassPathXmlApplicationContext创建了一个 Spring容器上下文,并且以beans.xml 作为容器中bean的元数据。
- 代码2则从容器上下文中获取到AsyncExecutorExample的实例,并且调用了printMessages方法,由于printMessages方法内6个任务提交到了执行器线程进行处理,所以main函数所在线程调用printMessages方法后马上返回,然后具体任务的执行是执行器中线程。
- 运行上面代码,一个可能的输出为:
main begin
main end
taskExecutor-1 Message0
taskExecutor-3 Message2
taskExecutor-2 Message1
taskExecutor-5 Message4
taskExecutor-4 Message3
taskExecutor-1 Message5
可知具体任务是在执行器线程里面执行的,而不是main函数所在线程执行;运行上面代码后,虽然main函数所在线程会马上结束,并且异步任务也执行完了,但是JVM进程并没有退出,这是因为执行器ThreadPoolTaskExecutor中的线程都是用户线程而不是Deamon线程。而JVM退出的条件是进程中不含有任何用户线程,所以我们需要和使用Java并发包中的线程池一样,需要显示关闭线程池。
为此我们在AsyncExecutorExample中添加shutdown方法:
public void shutdown() {
if (taskExecutor instanceof ThreadPoolTaskExecutor) {
((ThreadPoolTaskExecutor) taskExecutor).shutdown();
}
}
然在测试类的main函数最后添加下面代码:
// 3.关闭执行器,释放线程
asyncExecutorExample.shutdown();
添加代码后,运行测试代码,却输出如下:
main begin
main end
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at com.jiaduo.async.AsyncProgram.AsyncExecutorExample$MessagePrinterTask.run(AsyncExecutorExample.java:17)
...
可知我们的任务都被中断了(因为我们任务中调用了sleep方法),这是因为默认情况下执行器ThreadPoolTaskExecutor中的变量waitForTasksToCompleteOnShutdown为false,意思是关闭执行器时候不等待正在执行的任务执行完毕就中断执行任务的线程。所以我们需要修改ThreadPoolTaskExecutor注入的配置如下:
<bean id="taskExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
...
<property name="waitForTasksToCompleteOnShutdown"
value="true"></property>
</bean>
如上配置我们在注入ThreadPoolTaskExecutor的配置的属性最后添加了变量waitForTasksToCompleteOnShutdown为true的配置,然后我们运行测试类,就会发现等异步任务执行完毕后,当前jvm进程就不存在了,这说明我们的执行器被优雅退出了。
3.2 使用注解@Async实现异步执行
在Spring中可以在方法上添加@Async注释,以便异步执行该方法。换句话说,调用线程将在调用含有@Async注释的方法时立即返回,并且该方法的实际执行将发生在Spring的TaskExecutor异步处理器线程中。需要注意的是该注解@Async默认是不会解析的,你可以使用两种方式开开启该注解的解析:
- 基于xml配置Bean时候需要加入下面配置,才可以开启异步处理:
<task:annotation-driven />
- 在基于注解的情况下可以添加下面注解来启动异步处理:
@EnableAsync
下面我们看如何使用第一种方式开启并使用异步执行,首先我们需要在beans-annotation.xml里面配置如下:
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://www.springframework.org/schema/task
http://www.springframework.org/schema/task/spring-task.xsd">
<!--1.开启Async注解的解析 -->
<task:annotation-driven />
<!--2.注入业务Bean -->
<bean id="asyncCommentExample"
class="com.jiaduo.async.AsyncProgram.AsyncAnnotationExample">
</bean>
</beans>
如上代码1通过配置开启了对注解Async的解析,代码2注入了我们的业务Bean,其代码如下:
public class AsyncAnnotationExample {
@Async
public void printMessages() {
for (int i = 0; i < 6; i++) {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " Message" + i);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
如上代码printMessages方法上添加了@Async注解,方法内循环6次,循环内先让执行线程休眠1s,然后打印输出。
下面我们组合上面代码片段形成一个可执行程序进行测试,测试代码如下:
public static void main(String arg[]) throws InterruptedException {
// 1.创建容器上下文
ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(
new String[] { "beans-annotation.xml" });
// 2. 获取AsyncAnnotationExample实例并调用打印方法
System.out.println(Thread.currentThread().getName() + " begin ");
AsyncAnnotationExample asyncCommentExample = applicationContext.getBean(AsyncAnnotationExample.class);
asyncCommentExample.printMessages();
System.out.println(Thread.currentThread().getName() + " end ");
}
如上代码1我们使用beans-annotation.xml作为容器Bean的元数据创建了Spring上下文,代码2从中获取AsyncAnnotationExample的实例,然后调用其printMessages,main线程调用该方法后该方法会马上返回,printMessages内的任务是执行是使用Spring框架内的默认执行器SimpleAsyncTaskExecutor中的线程来执行的。运行上面代码一个可能的输出结果如下:
main begin
main end
SimpleAsyncTaskExecutor-1 Message0
SimpleAsyncTaskExecutor-1 Message1
SimpleAsyncTaskExecutor-1 Message2
SimpleAsyncTaskExecutor-1 Message3
SimpleAsyncTaskExecutor-1 Message4
SimpleAsyncTaskExecutor-1 Message5
可知具体执行异步任务的是SimpleAsyncTaskExecutor中的线程,而不是main函数所在线程。当然我们可以指定我们自己的执行器来执行我们的异步任务,这需要我们在xml配置自己的执行器,代码如下:
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
...
<!--0.创建自己的业务线程池处理器 -->
<task:executor id="myexecutor" pool-size="5" />
<!--1.开启Async注解的解析 -->
<task:annotation-driven executor="myexecutor"/>
<!--2.注入业务Bean -->
<bean id="asyncCommentExample"
class="com.jiaduo.async.AsyncProgram.AsyncAnnotationExample">
</bean>
</beans>
如上代码0我们创建了自己的线程池处理器,代码1则把我们的线程池处理器作为我们异步任务的处理器,运行上面代码,则可以看到一个可能的输出结果如下:
main begin
main end
myexecutor-1 Message0
myexecutor-1 Message1
myexecutor-1 Message2
myexecutor-1 Message3
myexecutor-1 Message4
myexecutor-1 Message5
如上代码可知异步任务的执行是使用我们自己的线程池执行器执行的。
下面我们看第二种方式如何使用注解方式开启异步处理,首先我们需要在xml里面配置如下:
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://www.springframework.org/schema/task
http://www.springframework.org/schema/task/spring-task.xsd">
<!--1.扫描bean的包路径 -->
<context:component-scan
base-package="com.jiaduo.async.AsyncProgram" />
</beans
如上代码1我们配置了包扫描路径,框架会扫描该包下面还有@Component注解的Bean 到Spring容器。
然后我们要在com.jiaduo.async.AsyncProgram包下的AsyncAnnotationExample类上加上下面注解:
@EnableAsync//开启异步执行
@Component//注入该Bean到Spring容器
public class AsyncAnnotationExample {
@Async
public void printMessages() {
...
}
}
如上代码我们使用注解@EnableAsync开启异步执行。
另外需要注意的是@Async注解本身也是有参数的,比如我们可以在某一个需要异步处理的方法上加@Async注解时候指定使用哪个线程池处理器来进行异步处理:
@Async("bizExecutor")
void doSomething(String s) {
....
}
如上代码指定了方法doSomething使用名称为bizExecutor的线程池处理器来执行异步任务。
上面我们讲解的异步任务都是没有返回结果的,其实基于@Async注解的异步处理也支持返回值的,但是返回值类型必须是Future或者其子类类型的,比如返回的Future类型可以是普通的java.util.concurrent.Future类型,也可以是Spring框架的org.springframework.util.concurrent.ListenableFuture类型,或者JDK8中的java.util.concurrent.CompletableFuture类型,或者Spring中的AsyncResult类型等等。这提供了异步执行的好处,以便调用者可以在调用Future上的get()之前处理其他任务。
以下代码展示了AsyncAnnotationExample中方法doSomething如何在具有返回值的方法上使用注解@Async:
@Async
public CompletableFuture<String> doSomething() {
// 1.创建future
CompletableFuture<String> result = new CompletableFuture<String>();
// 2.模拟任务执行
try {
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + "doSomething");
} catch (Exception e) {
e.printStackTrace();
}
result.complete("done");
// 3.返回结果
return result;
}
如上代码1我们创建了一个CompletableFuture类型的Future的实例,代码2休眠5s模拟任务执行,然后设置Future的执行结果,代码3则返回Future对象。
下面我们修改我们的测试代码对其进行测试,代码如下:
public static void main(String arg[]) throws InterruptedException {
// 1.创建容器上下文
ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(
new String[] { "beans-annotation.xml" });
// 2. 获取AsyncExecutorExample实例并调用打印方法
System.out.println(Thread.currentThread().getName() + " begin ");
AsyncAnnotationExample asyncCommentExample = applicationContext.getBean(AsyncAnnotationExample.class);
// 3.获取异步future并设置回调
CompletableFuture<String> resultFuture = asyncCommentExample.doSomething();
resultFuture.whenComplete(new BiConsumer<String, Throwable>() {
@Override
public void accept(String t, Throwable u) {
if (null == u) {
System.out.println(Thread.currentThread().getName() + " " + t);
} else {
System.out.println("error:" + u.getLocalizedMessage());
}
}
});
System.out.println(Thread.currentThread().getName() + " end ");
}
如上代码3main函数所在线程调用了AsyncAnnotationExample的doSomething方法,该方法会马上返回一个CompletableFuture,然后我们在其上设置了回调函数,然后main线程就退出了,最终doSomething方法内的代码是使用处理器线程池中的线程来执行的,并当执行完毕后回调我们设置的回调函数。
运行上面代码会输出如下:
main begin
main end
SimpleAsyncTaskExecutor-1doSomething
SimpleAsyncTaskExecutor-1 done
如上可知doSomething方法的执行是使用的SimpleAsyncTaskExecutor线程池处理器来执行的,而不是main函数所在线程。
最后我们看使用@Async注解时候当遇到异常时候该如何处理,当@Async方法具有Future类型返回值时,很容易管理在方法执行期间抛出的异常,因为会在调用get方法等待结果时抛出此异常。但是对于void返回类型,异常未被捕获且无法传输。这时候可以提供AsyncUncaughtExceptionHandler来处理此类异常。以下示例显示了如何执行此操作:
public class MyAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
// handle exception
}
}
然后我们在xml里面里面配置即可:
<task:annotation-driven
exception-handler="myAsyncUncaughtExceptionHandler" />
<bean id="myAsyncUncaughtExceptionHandler"
class="com.jiaduo.async.AsyncProgram.MyAsyncUncaughtExceptionHandler"></bean>
如上xml配置首先创建了实例myAsyncUncaughtExceptionHandler,然后设置到注解annotation-driven中,在异步任务中抛出异常时候会在MyAsyncUncaughtExceptionHandler的handleUncaughtException方法中得到处理。
如上可知基于@Async注解实现异步执行的方式时,大大简化了我们异步编程的心智负担,我们不必在显式的创建线程池并把任务手动提交到线程池内,而是直接在需要异步执行的方法上添加@Async注解即可,当然当我们需要使用自己的线程池来异步执行标注@Async的方法时,还是需要显式创建线程池的,但是这时并不需要显式提交任务到线程池。
4.总结
本节摘录自《Java异步编程实战》一书,讲解了如何使用Spring框架中@Async进行异步处理,可知使用@Async实现异步编程属于声明式编程,一般情况下不需要我们显式创建线程池并提交任务到线程池,这大大减轻了编程者的负担。另外《Java异步编程实战》一书中还含有剖析@Async注解异步执行原理的小节,欢迎大家翻看。
- 上一篇: Java异步编程指南
- 下一篇: Spring 异步调用,一行代码实现!舒服,不接受任何反驳
猜你喜欢
- 2024-11-19 你知道@Async 是怎么让方法异步执行的吗?
- 2024-11-19 Java 并发编程 11 - 异步执行框架 Executor
- 2024-11-19 Java中异步复用io
- 2024-11-19 JAVA异步方法笔记
- 2024-11-19 面试官:Redis分布式锁超时了,任务还没执行完怎么办?
- 2024-11-19 万能通用的异步实战方案,设计多线程,mq
- 2024-11-19 Java客户端Jedis 对Redis的几种调用方式包括事务、管道、分布式
- 2024-11-19 Dubbo 2.7新特性之异步化改造
- 2024-11-19 新手也能看懂的 SpringBoot 异步编程指南
- 2024-11-19 SpringBoot 异步编程
你 发表评论:
欢迎- 04-11Java面试“字符串三兄弟”String、StringBuilder、StringBuffer
- 04-11Java中你知道几种从字符串中找指定的字符的数量
- 04-11探秘Java面试中问的最多的String、StringBuffer、StringBuilder
- 04-11Python字符串详解与示例(python字符串的常见操作)
- 04-11java正则-取出指定字符串之间的内容
- 04-11String s1 = new String("abc");这句话创建了几个字符串对象?
- 04-11java判断字符串中是否包含某个字符
- 04-11关于java开发中正确的发牌逻辑编写规范
- 最近发表
-
- Java面试“字符串三兄弟”String、StringBuilder、StringBuffer
- Java中你知道几种从字符串中找指定的字符的数量
- 探秘Java面试中问的最多的String、StringBuffer、StringBuilder
- Python字符串详解与示例(python字符串的常见操作)
- java正则-取出指定字符串之间的内容
- String s1 = new String("abc");这句话创建了几个字符串对象?
- java判断字符串中是否包含某个字符
- 关于java开发中正确的发牌逻辑编写规范
- windows、linux如何后台运行jar(并且显示进程名)
- 腾讯大佬私人收藏,GitHub上最受欢迎的100个JAVA库,值得学习
- 标签列表
-
- nginx反向代理 (57)
- nginx日志 (56)
- nginx限制ip访问 (62)
- mac安装nginx (55)
- java和mysql (59)
- java中final (62)
- win10安装java (72)
- java启动参数 (64)
- java链表反转 (64)
- 字符串反转java (72)
- java逻辑运算符 (59)
- java 请求url (65)
- java信号量 (57)
- java定义枚举 (59)
- java字符串压缩 (56)
- java中的反射 (59)
- java 三维数组 (55)
- java插入排序 (68)
- java线程的状态 (62)
- java异步调用 (55)
- java中的异常处理 (62)
- java锁机制 (54)
- java静态内部类 (55)
- java怎么添加图片 (60)
- java 权限框架 (55)
本文暂时没有评论,来添加一个吧(●'◡'●)