上一篇:https://blog.csdn.net/LawssssCat/article/details/105169045
下一篇:https://blog.csdn.net/LawssssCat/article/details/105280108
上一章的代码可以在github获取 https://github.com/LawssssCat/v-security/tree/v1.2
概念
服务器(如tomcat)能异步处理为什么,为什么还要异步处理?
服务器(如tomcat)的线程数量是有限的,当线程数过多,多出的请求将不被处理。
<mark>为了加强服务器的吞吐量,需要我们提供异步处理服务</mark>
提供异步处理REST服务需要:
- 使用 Runnable 异步处理 Rest 服务
- 使用 DeferredResult 异步处理 Rest 服务
- 异步处理配置
Callable (SpringMVC例子)
先看结果
我们通过Callable(java 提供的一个工具类),可以让耗时的主业务放在新开的线程中,而主线程不用阻塞便可返回。
原理:
servlet3.0
以后提供了对异步处理的支持(对java提供的Callable和Future两个工具类的支持)
springmvc
做了进一步封装
- springmvc 的处理参考 Sping :官方文档
- Callable 参考:Java并发编程:Callable、Future和FutureTask
- 扩展关键词:JUC、CompletionService
.满足用户请求后,主线程很快结束,并开启其它线程处理任务,并将处理结果响应用户,而主线程就可以接收更多请求。
编写案例的 controller
用到了 java 提供的 concurrent utils
package cn.vshop.security.web.async;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.Callable;
/** * @author alan smith * @version 1.0 * @date 2020/4/1 22:54 */
@RestController
public class AsyncController {
private Logger logger = LoggerFactory.getLogger(getClass());
@RequestMapping("/order")
public Callable order() throws InterruptedException {
logger.info("主线程开始");
// 单开一个线程,处理业务逻辑
// 这个线程在Spring环境下运行
Callable<String> result = new Callable<String>() {
@Override
public String call() throws Exception {
logger.info("@ @ 副线程开始");
// 模拟业务逻辑的处理
Thread.sleep(1000);
logger.info("@ @ 副线程结束");
return "success";
}
};
logger.info("主线程返回");
return result;
}
}
测试脚本
直接 发送 get 请求即可
GET http://{{host}}/order
###
模拟消息队列
实际应用场景是这样的:
- 请求被
应用1
(tomcat) 接收 应用1
把请求放到 消息队列 中- 真正为请求提供服务的 <mark>应用2</mark> 只要看到 消息队列 中有任务,便拿请求处理
- <mark>应用2</mark> 处理完任务 把<mark>结果结果</mark> 放回 消息队列
应用1
监听 消息队列 中 <mark>结果数据</mark> 的更新,有新结果,便封装成响应返回给前端即可。
这种去情况下,应用1(tomcat) 只需要两个线程(最少)便可无阻塞的为 n多个的请求作出响应。
由于重点不是消息队列的搭建(是后面要讲的 Spring Security),因此接下来的 消息队列服务和 应用2(上图)的服务均用采用模拟方式提供
DeferredResult
下面我们模拟搭建如上面消息队列一样的任务处理机制(消息队列是模拟的)
要用到的核心类时 DeferredResult,它是 springmvc 提供的,能完成如 Callable 一样的多线程回调功能
为什么是 DeferredResult ?
延迟的(Deferred)处理(Result)是 SpringMVC 提供的一个类,用以替代 Servlet3 之后的 Callable。
DeferredResult 比 Callable 更强大的地方在于:
- 能够管理我们的线程
参考:https://www.cnblogs.com/aheizi/p/5659030.html
修改 AsyncController
这里注入了两个自定义的类 : MockQueue 和 DeferredResultHolder
(分别对应着上图中的,消息队列,和应用1两个线程的资源仓库)
package cn.vshop.security.web.async;
import org.apache.commons.lang.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
/** * @author alan smith * @version 1.0 * @date 2020/4/1 22:54 */
@RestController
public class AsyncController {
private Logger logger = LoggerFactory.getLogger(getClass());
/** * 模拟的消息队列 */
@Autowired
private MockQueue mockQueue;
/** * 请求响应应用的结果存储器 */
@Autowired
private DeferredResultHolder deferredResultHolder;
@RequestMapping("/order")
public DeferredResult<String> order() throws InterruptedException {
logger.info("主线程开始");
/* 注释掉:用消息队列代替 // 单开一个线程,处理业务逻辑 // 这个线程在Spring环境下运行 Callable<String> result = new Callable<String>() { @Override public String call() throws Exception { logger.info("@ @ 副线程开始"); // 模拟业务逻辑的处理 Thread.sleep; logger.info("@ @ 副线程结束"); return "success"; } }; */
// 生成一个8为随机数作为订单号
String orderNumber = RandomStringUtils.randomNumeric(8);
// 订单号放入消息队列
mockQueue.setPlaceOrder(orderNumber);
// 延迟(Deferred)处理结果
DeferredResult<String> result = new DeferredResult<>();
// 订单号和处理结果一同放进结果处理器的map中
deferredResultHolder.getMap().put(orderNumber, result);
logger.info("主线程返回");
return result;
}
}
应用1 的资源仓库
就是一个映射 名字和其 延迟结果集(DeferredResult) 的 map
package cn.vshop.security.web.async;
import lombok.Getter;
import lombok.Setter;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.HashMap;
import java.util.Map;
/** * 请求处理应用(tomcat)在请求和响应中传递 DefferredResult 的订单存储器 * * @author alan smith * @version 1.0 * @date 2020/4/2 1:22 */
@Component
@Getter
@Setter
public class DeferredResultHolder {
/** * key 订单号 * DeferredResult<String> 订单处理结果 */
private Map<String, DeferredResult<String>> map = new HashMap<>();
}
MockQueue
模拟消息队列,其中的异步线程是模拟消息队列后的业务处理应用(对应上图的 应用2)
package cn.vshop.security.web.async;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/** * 模拟消息队列 * * @author alan smith * @version 1.0 * @date 2020/4/2 1:16 */
@Slf4j
@Getter
@Setter
@Component
public class MockQueue {
/** * 代表下单消息 */
private String placeOrder;
/** * 代表订单完成消息 */
private String completeOrder;
/** * 模拟 * 消息中间件到业务处理应用 * 再到收到返回消息的过程 */
public void setPlaceOrder(String completeOrder) {
// 新开一个线程,模拟被处理程序处理
new Thread(() -> {
log.info("接到下单请求:{}", completeOrder);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.completeOrder = completeOrder;
log.info("下单请求处理完毕");
}).start();
}
}
消息队列结果***
最后还需要一个线程(应用1的线程2)来监听消息队列的结果,好让结果能返回回前端
package cn.vshop.security.web.async;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
/** * 请求响应进程的消息队列***, * 当监听到消息队列中有处理好的结果,便把结果封装交给SpringMVC,最终返回给前端 * <p> * ApplicationListener 是 spring 上下文(context)的***接口 * ContextRefreshedEvent 容器初始化完毕的事件,监听这个事件相当于指定当程序启动起来后,我们需要做的事情 * * @author alan smith * @version 1.0 * @date 2020/4/2 10:43 */
@Slf4j
@Component
public class QueueListener implements ApplicationListener<ContextRefreshedEvent> {
// 消息队列(自定义的、模拟的)
@Autowired
private MockQueue mockQueue;
// 请求和响应间的订单存储器
@Autowired
private DeferredResultHolder deferredResultHolder;
/** * 事件触发后执行代码 * * @param event 事件,这里是 ContextRefreshedEvent */
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
// 因为是死循环,因此必须新开一个线程,否则会阻塞程序的启动
new Thread(() -> {
while (true) {
if (StringUtils.isNotBlank(mockQueue.getCompleteOrder())) {
// 如果订单完成字段有值,那么就需要做出一些处理
String orderNumber = mockQueue.getCompleteOrder();
log.info("返回订单处理结果:{}", orderNumber);
// 模拟处理的结果为 "place order success"
deferredResultHolder.getMap().get(orderNumber).setResult("place order success");
mockQueue.setCompleteOrder(null);
} else {
// 如果没有,就等待一段时间,再查看
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
}
测试脚本
GET http://{{host}}/order
###
测试结果
引入第三方异步支持
最后,如果使用到第三方的异步支持,可以在 WebMvcConfigurerAdapter 的实现类里面配置
(如下)
package cn.vshop.security.web.config;
@Configuration
// 继承 WebMvc注册类的适配:WebMvcConfigurerAdapter
public class WebConfig extends WebMvcConfigurerAdapter {
// 省略
@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
// 选择1 用 Callable 做异步
// configurer.registerCallableInterceptors();
// 选择2 用 DeferredResult 做异步
// configurer.registerDeferredResultInterceptors()
// 设置超时时间
// configurer.setDefaultTimeout()
// 用于自定义线程池
// configurer.setTaskExecutor()
}
}
至此代码可以在github获取 https://github.com/LawssssCat/v-security/tree/v1.3