上一篇:https://blog.csdn.net/LawssssCat/article/details/105169045
下一篇:https://blog.csdn.net/LawssssCat/article/details/105280108

上一章的代码可以在github获取 https://github.com/LawssssCat/v-security/tree/v1.2

概念

服务器(如tomcat)能异步处理为什么,为什么还要异步处理?

服务器(如tomcat)的线程数量是有限的,当线程数过多,多出的请求将不被处理。

<mark>为了加强服务器的吞吐量,需要我们提供异步处理服务</mark>

提供异步处理REST服务需要:

  • 使用 Runnable 异步处理 Rest 服务
  • 使用 DeferredResult 异步处理 Rest 服务
  • 异步处理配置

Callable (SpringMVC例子)

先看结果

我们通过Callable(java 提供的一个工具类),可以让耗时的主业务放在新开的线程中,而主线程不用阻塞便可返回。

原理:
servlet3.0 以后提供了对异步处理的支持(对java提供的Callable和Future两个工具类的支持)
springmvc 做了进一步封装

满足用户请求后,主线程很快结束,并开启其它线程处理任务,并将处理结果响应用户,而主线程就可以接收更多请求。

编写案例的 controller

用到了 java 提供的 concurrent utils

package cn.vshop.security.web.async;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.Callable;

/** * @author alan smith * @version 1.0 * @date 2020/4/1 22:54 */
@RestController
public class AsyncController {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @RequestMapping("/order")
    public Callable order() throws InterruptedException {
        logger.info("主线程开始");
        // 单开一个线程,处理业务逻辑
        // 这个线程在Spring环境下运行
        Callable<String> result = new Callable<String>() {
            @Override
            public String call() throws Exception {
                logger.info("@ @ 副线程开始");
                // 模拟业务逻辑的处理
                Thread.sleep(1000);
                logger.info("@ @ 副线程结束");
                return "success";
            }
        };
        logger.info("主线程返回");
        return result;
    }

}

测试脚本

直接 发送 get 请求即可

GET http://{{host}}/order
###

模拟消息队列

实际应用场景是这样的:

  • 请求被 应用1(tomcat) 接收
  • 应用1 把请求放到 消息队列
  • 真正为请求提供服务的 <mark>应用2</mark> 只要看到 消息队列 中有任务,便拿请求处理
  • <mark>应用2</mark> 处理完任务 把<mark>结果结果</mark> 放回 消息队列
  • 应用1 监听 消息队列 中 <mark>结果数据</mark> 的更新,有新结果,便封装成响应返回给前端即可。


这种去情况下,应用1(tomcat) 只需要两个线程(最少)便可无阻塞的为 n多个的请求作出响应。


由于重点不是消息队列的搭建(是后面要讲的 Spring Security),因此接下来的 消息队列服务和 应用2(上图)的服务均用采用模拟方式提供

DeferredResult

下面我们模拟搭建如上面消息队列一样的任务处理机制(消息队列是模拟的)

要用到的核心类时 DeferredResult,它是 springmvc 提供的,能完成如 Callable 一样的多线程回调功能

为什么是 DeferredResult ?

延迟的(Deferred)处理(Result)是 SpringMVC 提供的一个类,用以替代 Servlet3 之后的 Callable。
DeferredResult 比 Callable 更强大的地方在于:

修改 AsyncController

这里注入了两个自定义的类 : MockQueue 和 DeferredResultHolder
(分别对应着上图中的,消息队列,和应用1两个线程的资源仓库)

package cn.vshop.security.web.async;

import org.apache.commons.lang.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;

/** * @author alan smith * @version 1.0 * @date 2020/4/1 22:54 */
@RestController
public class AsyncController {

    private Logger logger = LoggerFactory.getLogger(getClass());

    /** * 模拟的消息队列 */
    @Autowired
    private MockQueue mockQueue;

    /** * 请求响应应用的结果存储器 */
    @Autowired
    private DeferredResultHolder deferredResultHolder;

    @RequestMapping("/order")
    public DeferredResult<String> order() throws InterruptedException {
        logger.info("主线程开始");

        /* 注释掉:用消息队列代替 // 单开一个线程,处理业务逻辑 // 这个线程在Spring环境下运行 Callable<String> result = new Callable<String>() { @Override public String call() throws Exception { logger.info("@ @ 副线程开始"); // 模拟业务逻辑的处理 Thread.sleep; logger.info("@ @ 副线程结束"); return "success"; } }; */

        // 生成一个8为随机数作为订单号
        String orderNumber = RandomStringUtils.randomNumeric(8);
        // 订单号放入消息队列
        mockQueue.setPlaceOrder(orderNumber);
        // 延迟(Deferred)处理结果
        DeferredResult<String> result = new DeferredResult<>();
        // 订单号和处理结果一同放进结果处理器的map中
        deferredResultHolder.getMap().put(orderNumber, result);

        logger.info("主线程返回");
        return result;
    }

}

应用1 的资源仓库

就是一个映射 名字和其 延迟结果集(DeferredResult) 的 map

package cn.vshop.security.web.async;

import lombok.Getter;
import lombok.Setter;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.async.DeferredResult;

import java.util.HashMap;
import java.util.Map;

/** * 请求处理应用(tomcat)在请求和响应中传递 DefferredResult 的订单存储器 * * @author alan smith * @version 1.0 * @date 2020/4/2 1:22 */
@Component
@Getter
@Setter
public class DeferredResultHolder {

    /** * key 订单号 * DeferredResult<String> 订单处理结果 */
    private Map<String, DeferredResult<String>> map = new HashMap<>();

}

MockQueue

模拟消息队列,其中的异步线程是模拟消息队列后的业务处理应用(对应上图的 应用2)

package cn.vshop.security.web.async;

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/** * 模拟消息队列 * * @author alan smith * @version 1.0 * @date 2020/4/2 1:16 */
@Slf4j
@Getter
@Setter
@Component
public class MockQueue {

    /** * 代表下单消息 */
    private String placeOrder;

    /** * 代表订单完成消息 */
    private String completeOrder;

    /** * 模拟 * 消息中间件到业务处理应用 * 再到收到返回消息的过程 */
    public void setPlaceOrder(String completeOrder) {

        // 新开一个线程,模拟被处理程序处理
        new Thread(() -> {
            log.info("接到下单请求:{}", completeOrder);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            this.completeOrder = completeOrder;
            log.info("下单请求处理完毕");
        }).start();
    }

}

消息队列结果***

最后还需要一个线程(应用1的线程2)来监听消息队列的结果,好让结果能返回回前端

package cn.vshop.security.web.async;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

/** * 请求响应进程的消息队列***, * 当监听到消息队列中有处理好的结果,便把结果封装交给SpringMVC,最终返回给前端 * <p> * ApplicationListener 是 spring 上下文(context)的***接口 * ContextRefreshedEvent 容器初始化完毕的事件,监听这个事件相当于指定当程序启动起来后,我们需要做的事情 * * @author alan smith * @version 1.0 * @date 2020/4/2 10:43 */
@Slf4j
@Component
public class QueueListener implements ApplicationListener<ContextRefreshedEvent> {

    // 消息队列(自定义的、模拟的)
    @Autowired
    private MockQueue mockQueue;

    // 请求和响应间的订单存储器
    @Autowired
    private DeferredResultHolder deferredResultHolder;

    /** * 事件触发后执行代码 * * @param event 事件,这里是 ContextRefreshedEvent */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {

        // 因为是死循环,因此必须新开一个线程,否则会阻塞程序的启动
        new Thread(() -> {
            while (true) {
                if (StringUtils.isNotBlank(mockQueue.getCompleteOrder())) {
                    // 如果订单完成字段有值,那么就需要做出一些处理
                    String orderNumber = mockQueue.getCompleteOrder();
                    log.info("返回订单处理结果:{}", orderNumber);
                    // 模拟处理的结果为 "place order success"
                    deferredResultHolder.getMap().get(orderNumber).setResult("place order success");

                    mockQueue.setCompleteOrder(null);
                } else {
                    // 如果没有,就等待一段时间,再查看
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            }
        }).start();

    }
}

测试脚本

GET http://{{host}}/order
###

测试结果


引入第三方异步支持

最后,如果使用到第三方的异步支持,可以在 WebMvcConfigurerAdapter 的实现类里面配置
(如下)

package cn.vshop.security.web.config;


@Configuration
// 继承 WebMvc注册类的适配:WebMvcConfigurerAdapter
public class WebConfig extends WebMvcConfigurerAdapter {

	// 省略

    @Override
    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
        // 选择1 用 Callable 做异步
        // configurer.registerCallableInterceptors();
        // 选择2 用 DeferredResult 做异步
        // configurer.registerDeferredResultInterceptors()

        // 设置超时时间
        // configurer.setDefaultTimeout()

        // 用于自定义线程池
        // configurer.setTaskExecutor()
    }
}

至此代码可以在github获取 https://github.com/LawssssCat/v-security/tree/v1.3