一、异步通讯和消息队列

1.同步通讯

(1)不同微服务间的通讯有同步和异步两种方式:
  • 同步通讯:就像打电话,需要实时响应,时效性强。
  • 异步通讯:就像发邮件,不需要马上回复。
(2)同步通讯的缺点
  • 耦合度高
  • 性能和吞吐能力下降:调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用的时间之和
  • 资源浪费:调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源。
  • 级联失败问题:如果服务提供者出现问题,所有调用方都会跟着出问题,如同多米诺骨牌一样,迅速导致整个微服务群故障。

2.异步通讯

        异步调用可以避免上述同步调用的问题。
        
        以购买商品为例,用户支付后需要调用订单服务完成订单状态修改,调用仓储服务,从仓库分配响应的库存并准备发货。在事件驱动模式中,支付服务是事件发布者(publisher),在支付完成后只需要发布一个支付成功的事件(event),事件中带上订单id。订单服务和仓储服务是事件订阅者(Consumer),订阅支付成功的事件,监听事件完成自己业务即可。 为了解除事件发布者与订阅者之间的耦合,两者并不是直接通信,而是有一个中间人(Broker)。发布者发布事件到Broker,不关心谁来订阅事件。订阅者从Broker订阅事件,不关心谁发来的消息。
(1)优点
  • 吞吐量提升:无需等待订阅者处理完成,响应更快速
  • 故障隔离:服务没有直接调用,不存在级联失败问题
  • 调用间没有阻塞,不会造成无效的资源占用
  • 耦合度极低,每个服务都可以灵活插拔、可替换
  • 流量削峰:不管发布事件的流量波动多大,都由 Broker 接收,订阅者可以按照自己的速度去处理事件
(2)缺点
  • 架构复杂了,业务没有明显的流程线,不好管理
  • 需要依赖于 Broker 的可靠、安全、性能

3.消息队列MQ

        消息队列MQ(MessageQueue),就是事件驱动架构中的 Broker
        常见的 MQ 实现:ActiveMQ、RabbitMQRocketMQKafka
RabbitMQ ActiveMQ RocketMQ Kafka
公司/社区 Rabbit Apache 阿里 Apache
开发语言 Erlang Java Java Scala&Java
协议支持 AMQP、XMPP、SMTP、STOMP OpenWire、STOMP、REST、XMPP、AMQP 自定义协议 自定义协议
可用性 一般
单机吞吐量 一般 非常高
消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
消息可靠性 一般 一般

4.RabbitMQ——消息中间件

(1)安装RabbitMQ

        在 Centos7 虚拟机中使用 Docker 来安装RabbitMQ。
  • 获取mq镜像:
    • 方式一:在线拉取mq镜像
      docker pull rabbitmq:3-management
    • 方式二:从本地上传mq.tar包并加载
      docker load -i mq.tar
  • 安装并运行mq容器
    docker run \
     -e RABBITMQ_DEFAULT_USER=itcast \  
     -e RABBITMQ_DEFAULT_PASS=123321 \
     --name mq \ # 容器名
     --hostname mq1 \ # 配置主机名,集群时会用到
     -p 15672:15672 \ # RabbitMQ管理平台端口,提供一个Ui界面方便我们管理mq
     -p 5672:5672 \ # 消息通讯端口
     -d \
     rabbitmq:3-management # 基于该镜像创建容器
  • 访问测试:http://192.168.152.100:15672

(2)MQ 的基本结构

        
  • exchange:交换机,负责消息路由
  • queue:队列,存储消息
  • virtualHost:虚拟主机,隔离不同租户的 exchange、queue、消息

(3)RabbitMQ常见消息模型

        RabbitMQ 官方提供了 5 个不同的 Demo 示例,对应了不同的消息模型。
        

(4)HelloWorld案例——简单队列模型

        官方的 HelloWorld 是基于最基础的消息队列模型来实现的,只包括三个角色:
         
  • publisher:消息发布者,将消息发送到队列queue
  • queue:消息队列,负责接受并缓存消息
  • consumer:订阅队列,处理队列中的消息
        1)简单队列模型代码实现
                根据资料提供的mq-demo项目学习简单队列模型:
                
  • publisher的实现
    public class PublisherTest {
        @Test
        public void testSendMessage() throws IOException, TimeoutException {
            // 1.与消息队列建立连接
            ConnectionFactory factory = new ConnectionFactory();
            // 1.1.设置连接参数,分别是:主机名、消息通讯端口号、虚拟主机vhost、用户名、密码
            factory.setHost("192.168.152.100");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setUsername("itcast");
            factory.setPassword("123321");
            // 1.2.建立连接
            Connection connection = factory.newConnection();
    
            // 2.创建通道Channel,基于channel向队列发送消息
            Channel channel = connection.createChannel();
    
            // 3.创建队列
            String queueName = "simple.queue";
            channel.queueDeclare(queueName, false, false, false, null);
    
            // 4.发送消息
            String message = "hello, rabbitmq!";
            channel.basicPublish("", queueName, null, message.getBytes());
            System.out.println("发送消息成功:【" + message + "】");
    
            // 5.关闭通道和连接
            channel.close();
            connection.close();
        }
    }
  • consumer的实现
    public class ConsumerTest {
        public static void main(String[] args) throws IOException, TimeoutException {
            // 1.建立连接
            ConnectionFactory factory = new ConnectionFactory();
            // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
            factory.setHost("192.168.152.100");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setUsername("itcast");
            factory.setPassword("123321");
            // 1.2.建立连接
            Connection connection = factory.newConnection();
    
            // 2.创建通道Channel
            Channel channel = connection.createChannel();
    
            // 3.创建队列
            String queueName = "simple.queue";
            channel.queueDeclare(queueName, false, false, false, null);
    
            // 4.订阅消息
            channel.basicConsume(queueName, true, new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // 5.处理消息
                    String message = new String(body);
                    System.out.println("接收到消息:【" + message + "】");
                }
            });
            System.out.println("等待接收消息。。。。");
        }
    }
        2)简单消息队列消息发送和接收流程
            发送:
  • 与mq建立connection
  • 创建channel
  • 利用channel声明队列
  • 利用channel向队列发送消息
            接收:
  • 与mq建立connection
  • 创建channel
  • 利用channel声明队列
  • 定义consumer的消费行为handleDelivery()
  • 利用channel将消费者与队列绑定

二、SpringAMQP

        
        SpringAMQP 是基于 RabbitMQ 封装的一套模板,并且还利用 SpringBoot 对其实现了自动装配,使用方便。
        SpringAMQP 的官方地址:https://spring.io/projects/spring-amqp

1.SpringAMQP的功能

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了 RabbitTemplate 工具,用于发送消息

2.简单消息队列(Basic Queue)

        使用AMQP实现简单消息队列中publisher向simple.queue(把这个队列提前创建好)发送消息,consumer从simple.queue监听消息并接收。
      (springboot整合amqp)
      (amqp的使用)

  • 导入坐标因为publisher和consumer都需要AMQP依赖,所以直接在父工程中引入AMQP依赖——spring-boot-starter-amqp
    <!--AMQP依赖,包含RabbitMQ-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  • 配置MQ信息在 publisher、consumer 服务中的 application.yml配置与虚拟主机的连接信息
    spring:
      rabbitmq:
        host: 192.168.152.100 # 主机名
        port: 5672 # 消息通讯端口
        virtual-host: / # 虚拟主机
        username: itcast # 用户名(默认guest)
        password: 123321 # 密码(默认guest)
    【tips】可以看出,使用AMQP时不需要自己写代码与mq建立连接,只需要配置mq信息,AMQP会自动建立mq连接。
  • 开启rabbitmq功能:在启动类上用@EnableRabbit注解开启RabbitMQ功能。
  • 使用AmqpAdmin创建exchange、queue、binding
    • 创建exchange
    • 创建queue

    • 绑定exchange和queue——binding
  • publisher发送消息使用RabbitTemplate向simple.queue队列发送消息message
    @SpringBootTest
    public class AMQP_PublisherTest {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void sendMessage() {
            String queueName = "simple.queue";
            String message = "hello AMQP";
            //向simple.queue发送message
            rabbitTemplate.convertAndSend(queueName,message);
        }
    }
  • consumer监听消息在 consumer 服务中添加监听队列的类,用@RabbitListener注解配置监听信息
    /*
        consumer监听类
     */
    @Component
    public class MQListener {
        /**
         * 监听队列的方法
         * @param message ★从队列中接收到的消息
         * @throws InterruptedException
         */
        @RabbitListener(queues = "simple.queue") //指定要监听的队列为simple.queue
        public void listenMQ(String message) throws InterruptedException{
            System.out.println("consumer接收到消息:" + message);
        }
    }

★@RabbitListener与@RabbitHandler的比较?

        ——二者都是用来监听消息队列的。
  • @RabbitListener可以标在类或方法上,@RabbitHandler可以标在方法上:
    • @RabbitListener标在方法上,指定该方法监听某个mq,此时方法接收的参数必须与发送的消息类型一致
    • @RabbitListener标在类上,需要配合@RabbitHandler一起使用:@RabbitListener 标注在类上,当收到消息的时候,就交给@RabbitHandler标注的方法处理,根据接收的参数类型进入不同方法进行处理中(方法重载)。这样就可以通过重载来区分不同类型的消息

3.工作队列(Work Queue)

(1)简介

        
       工作队列(Work queues):也被称为任务模型(Task queues)。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息
        在简单队列模型中,当消息处理比较耗时的时候,可能消息的产生速度会远远大于消费速度。长此以往,消息就会堆积越来越多,无法及时处理。 此时就可以使用 work queue模型,多个消费者共同处理消息处理,速度就能大大提高了。

(2)AMQP模拟实现工作队列

  • 消息发送:循环发送消息,模拟大量消息堆积现象,在 publisher 服务的测试类中添加一个测试方法:
    @Test
    public void sendMessage2WorkQueue() throws InterruptedException {
        String queueName = "simple.queue";
        String message = "hello AMQP-";
        //模拟1s内发送50条消息
        for (int i = 1; i <= 50; i++) {
            rabbitTemplate.convertAndSend(queueName, message + i);
            Thread.sleep(20);
        }
    }
  • 消息接收:模拟多个消费者绑定同一个队列,在 consumer 服务的 Listener类 中添加新方法:
    /**
     *模拟两个不同处理速度的消费者
     */
    @RabbitListener(queues = "simple.queue") //指定要监听的队列为simple.queue
    public void listenWorkQueue1(String message) throws InterruptedException{
        System.out.println("consumer1接收到消息:" + message);
        Thread.sleep(20);
    }
    
    @RabbitListener(queues = "simple.queue") //指定要监听的队列为simple.queue
    public void listenWorkQueue2(String message) throws InterruptedException{
        System.err.println("consumer2接收到消息:" + message);
        Thread.sleep(200);
    }
  • ★结果:启动 ConsumerApplication 后,再启动 publisher 服务中发送测试方法。可以看到consumer1很快完成了自己的25条消息,而consumer2却在缓慢的处理自己的25条消息。 也就是说此时消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。这是因为 RabbitMQ 默认有一个消息预取机制,该机制让每个consumer轮流获取消息,不管他们处理的快慢,而我们希望消息处理快的consumer1多处理,consumer2少处理。因此可以通过限制每次只能取一条消息,处理完才能去下一条消息,就可以解决这个问题。
  • 通过配置消费者的yml:加一个  prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息  配置就能实现  每次只能取一条消息,处理完才能去下一条消息  。 
    spring:
      rabbitmq:
        host: 192.168.152.100 # 主机名
        port: 5672 # 消息通讯端口
        virtual-host: / # 虚拟主机
        username: itcast # 用户名
        password: 123321 # 密码
        listener:
          simple:
            prefetch: 1 # 规定每个消费者每次预处理只能获取一条消息,处理完才能获取下一个消息

4.发布订阅(Publish/Subscribe)模式

(1)简介

        
        在简单队列和工作队列中,由于RabbitMQ阅完既焚的特点,所以多个消费者无法获得同一消息,而发布订阅模式解决了这个问题。
        1)在发布订阅模型中,多了一个 exchange,而且过程略有变化:
  • Publisher:发送消息不再发送到队列中,而是发给 exchange(交换机)
  • Consumer:与以前一样,订阅队列,没有变化
  • Queue:消息队列也与以前一样,接收消息、缓存消息
  • Exchange:交换机,一方面,接收publisher发送的消息;另一方面,该知道如何处理消息。例如把消息递交给某个队列还是递交给所有队列,或是将消息丢弃。到底如何操作,取决于 Exchange 的类型
  • Exchange的作用:
    • 接收 publisher 发送的消息
    • 将消息按照规则路由到与之绑定的队列
    • 不能缓存消息,路由失败则消息丢失

        2)★exchange的类型

        exchange 有以下3种类型
  • Fanout Exchange:广播,将消息交给所有绑定到交换机的队列
    • 可以有多个队列
    • 每个队列都要绑定到 Exchange(交换机)
    • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
    • 交换机把消息发送给所有绑定过的队列
    • 订阅队列的消费者都能拿到消息
  • Direct Exchange定向,把消息交给符合指定 routing key 的队列
    • 队列与交换机的绑定,不再是任意绑定,而是要指定一个RoutingKey(路由key)
    • publisher 向 Exchange 发送消息时,也必须指定消息的 RoutingKey
    • Exchange 不再把消息交给每一个与之绑定的队列,而是根据消息的RoutingKey进行判断,将消息路由到【所有】相同RoutingKey的队列
  • Topic Exchange通配符,把消息交给符合 routing pattern(路由模式)的队列
【tips】Exchange只负责转发,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!

(2)Fanout Exchange发布订阅模式案例

        
        如图,模拟两个consumer分别从各自订阅的队列中接收同一publisher发送的消息。
  • 在 consumer 服务中,创建一个配置类,用来声明队列、交换机、绑定(binding)对象——用来将队列和交换机绑定起来
    @Configuration
    public class FanoutConfig {
        //声明队列1、2
        @Bean
        public Queue fanoutQueue1() {
            return new Queue("fanout-queue1");
        }
        @Bean
        public Queue fanoutQueue2() {
            return new Queue("fanout-queue2");
        }
    
        //声明fount exchange
        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanout-exchange");
        }
    
        //声明绑定对象:有几个队列就有几个绑定对象
        //将队列1与交换机绑定
        @Bean
        public Binding bindingQueue1(FanoutExchange fanoutExchange, Queue fanoutQueue1) {
            return BindingBuilder
                    .bind(fanoutQueue1)//将fanoutQueue1绑定
                    .to(fanoutExchange);//到(to)fanoutExchange上
        }
        //将队列2与交换机绑定
        @Bean
        public Binding bindingQueue2(FanoutExchange fanoutExchange, Queue fanoutQueue2) {
            return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
        }
    }
  • 在 consumer 服务的监听类中编写两个消费者方法,分别监听 fanout-queue1 和 fanout-queue2
    //consumer1监听fanount-queue1接收消息
    @RabbitListener(queues = "fanout-queue1")
    public void listenFanoutQueue1(String message){
        System.out.println("consumer1接收到fanout-queue1的消息:" + message);
    }
    //consumer2监听fanount-queue2接收消息
    @RabbitListener(queues = "fanout-queue2")
    public void listenFanoutQueue2(String message){
        System.out.println("consumer2接收到fanout-queue2的消息:" + message);
    }
  • 在 publisher 中编写测试方法,向 fanout-exchange发送消息
    @Test
    public void sendMessage2FanoutQueue() throws InterruptedException {
        String exchangeName = "fanout-exchange";
        String message = "hello fanoutQueue";
        //publisher发消息给exchange而不是queue
        rabbitTemplate.convertAndSend(exchangeName, "", message);
    }
  • 结果:

(3)Direct Exchange发布订阅模式案例

        
        在 Fanout 模式中,一条消息会被所有订阅的队列都消费。但是,在某些场景下,我们希望消息被路由到指定队列
  • consumer声明并监听:Fanout中采用Bean的方式在配置类中声明队列、交换机、绑定对象;这里采用@RabbitListener注解监听Listener类中声明队列、交换机、绑定对象及routingkey
    /*
    采用注解声明队列、交换机、routingkey并监听队列
     */
    @RabbitListener(bindings = @QueueBinding(//声明绑定对象
            value = @Queue(name="direct.queue1"),//声明队列
            exchange = @Exchange(name="itcast.direct",type = ExchangeTypes.DIRECT),//声明交换机及其类型(默认direct)
            key ={"blue","red"}//声明routingkey
        ))
    public void listenDirectQueue1(String message){
        System.out.println("consumer1接收到direct.queue1的消息:" + message);
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name="direct.queue2"),
            exchange = @Exchange(name="itcast.direct",type = ExchangeTypes.DIRECT),
            key ={"yellow","red"}
    ))
    public void listenDirectQueue2(String message){
        System.out.println("consumer2接收到direct.queue2的消息:" + message);
    }
  • publisher发送消息:在 publisher 中编写测试方法,向 itcast.direct交换机发送消息
    @Test
    public void sendMessage2DirectQueue() throws InterruptedException {
        String exchangeName = "itcast.direct";
        String message = "hello,blue!";
        //publisher发消息给exchange并指定routingkey
        rabbitTemplate.convertAndSend(exchangeName, "blue", message);
    }
    【tips】在direct模式中,当每个队列的routingkey相同时,就变成了fonout模式(发给每个队列)。

(4)Topic Exchange发布订阅模式案例

        
        Topic 与 Direct相比,都可以根据RoutingKey把消息路由到不同的队列,但Topic 类型可以让队列在绑定Routing key 的时候使用通配符(#、*)。
        通配符规则: #:匹配一个或多个词 
                               *:只能匹配一个词
        Topic交换机Direct交换机的差异?
  • Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割
  • Topic交换机与队列绑定时的bindingKey可以指定通配符

5.AMQP消息转换器(MessageConverter)

(1)默认JDK序列化

        如果publisher发送的消息是Java对象,该对象必须实现Serializable,Spring 会把消息序列化为字节发送给 MQ,consumer接收消息时,需要再把字节反序列化为 Java 对象
        默认情况下 Spring 采用的序列化方式是 JDK 序列化。可以通过代码验证一下:
  • publisher:
    @Test
    public void sendMessage2Object() throws InterruptedException {
        String queue = "object.queue";
        Map<String, Object> message = new HashMap<>();
        message.put("name", "zhangsan");
        message.put("age", 22);
        rabbitTemplate.convertAndSend(queue, message);
    }
  • consumer:
    @RabbitListener(queuesToDeclare = @Queue(value = "object.queue"))
    public void listenObjectQueue(Map<String,Object> message) {
        System.out.println("object收到消息:" + message);
    }
  • 控制台输出:
  • UI收到的消息:
    
        可以看到,控制台输出的是反序列化以后的Java对象,而在UI控制台收到的却是一串字节。因此验证了上面的说法。而JDK序列化存在以下问题:
  • 数据体积过大
  • 有安全漏洞(注入问题)
  • 可读性差

(2)★ JSON序列化——Jackson2JsonMessageConverter

        通过使用JSON序列化来解决JDK序列化带来的问题。
  • 在父工程中导入json依赖(因为publisher和consumer都需要该依赖)
    <dependency>
        <groupId>com.fasterxml.jackson.dataformat</groupId>
        <artifactId>jackson-dataformat-xml</artifactId>
        <version>2.9.10</version>
    </dependency>
  • ★配置消息转换器:在二者各自的启动类中添加一个 Bean
    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
  • 结果:
        

三、分布式搜索ElasticSearch

1.倒排索引

(1)正向索引
        倒排索引的概念是相较于像MySQL 这样的正向索引而言的。
        例如给下表(tb_goods)中的 id 创建索引,根据 id 查询,那么直接走索引,查询速度非常快。 但如果是基于 title 做模糊查询,只能是逐行扫描数据全表扫描,随着数据量增加,其查询效率也会越来越低。流程如下:
        
  • 用户根据条件搜索数据,条件是 title 符合 "%手机%"
  • 逐行获取数据,比如 id 为 1 的数据
  • 判断数据中的 title 是否符合用户搜索条件
  • 如果符合则放入结果集;不符合则丢弃,然后回到步骤1

(2)倒排索引

        1)倒排索引中有两个非常重要的概念:
  • 文档(Document):搜索到的数据中的每一条数据就是一个文档。例如一个网页、一个商品信息。
  • 词条(Term):对文档数据或用户搜索数据,利用某种算法分词,得到的具备含义的词语就是词条。例如:我是中国人,就可以分为:我、是、中国人、中国、国人这样的几个词条。
        2)创建倒排索引是对正向索引的一种特殊处理,流程和示意图如下:
  • 将每一个文档的数据利用算法分词,得到一个个词条
  • 创建表,每行数据包括词条、词条所在文档 id、位置等信息
  • 因为词条唯一性,可以给词条创建索引,例如 hash 表结构索引
        

        3)倒排索引的搜索流程如下(以搜索"华为手机"为例):

  • 用户输入条件"华为手机"进行搜索
  • 对用户输入的条件分词,得到词条:华为、手机
  • 拿着词条在倒排索引中查找,可以得到包含词条的文档 id 有 1、2、3
  • 拿着文档 id 到正向索引中查找具体文档
        
        4)二者的比较
        虽然要先查询倒排索引,再查询正向索引,看似两次查询操作,比单纯正向索引要多查一次,但是词条和文档id 都建立了索引,查询速度非常快,无需全表扫描。 
        为什么一个叫做正向索引,一个叫做倒排索引呢? 
        ——正向索引是最传统的根据 id 索引的方式。但根据词条查询时,必须先逐条获取每个文档,然后判断文档中是否包含所需要的词条,是根据文档找词条的过程。
        ——倒排索引则相反,是先找到用户要搜索的词条,根据得到的文档 id 获取该文档。是根据词条找文档的过程。

2.ES与MySQL概念比较

(1)文档和字段

        
        es 是面向文档(Document)存储的,文档可以是数据库中的一条商品数据,一个订单信息。文档数据会被序列化为【 json 格式】存储在 es 中。
        JSON 文档中往往包含很多的字段(Field),类似于数据库中的

(2)索引和映射

        索引(Index):就是相同类型的文档的【集合】,可以把索引当做是数据库中的表 例如: 所有用户文档,就可以组织在一起,称为用户的索引; 所有商品的文档,可以组织在一起,称为商品的索引; 所有订单的文档,可以组织在一起,称为订单的索引。 
        数据库的表会有约束信息,用来定义表的结构、字段的名称、类型等信息。因此,索引库中就有映射(mapping),是索引中文档的字段约束信息,类似表的结构约束。
        

(3)ES与MySQL对比

  • 概念上

MySQL Elasticsearch 说明
Table Index 索引(index),就是文档的集合,类似数据库的表(table)
Row Document 文档(Document),就是一条条的数据,类似数据库中的行(Row),文档都是JSON格式
Column Field 字段(Field),就是JSON文档中的字段,类似数据库中的列(Column)
Schema Mapping Mapping(映射)是索引中文档的约束,例如字段类型约束。类似数据库的表结构(Schema)
SQL DSL DSL是elasticsearch提供的JSON风格的请求语句,用来操作elasticsearch,实现CRUD
  • 作用上
    • Mysql:擅长事务类型操作,可以确保数据的安全和一致性
    • Elasticsearch:擅长海量数据的搜索、分析、计算

    因此在企业中,往往是两者结合使用

    • 安全性要求较高的写操作,使用 MySQL 实现
    • 查询性能要求较高的搜索需求,使用 ELasticsearch 实现
    • 两者再基于某种方式,实现数据的同步,保证一致性
        

3.ES、kibana、IK分词器的安装

(1)安装ES

  • 因为到时还需要部署 kibana 容器,需要让 es 和 kibana 容器处在同一个网络中进行互联,所以先创建一个网络
    docker network create es-net 
  • 加载es.tar
    docker load -i es.tar
    
  • 运行docker命令,部署单点es
    docker run -d \
    	--name es \
        -e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \
        -e "discovery.type=single-node" \
        -v es-data:/usr/share/elasticsearch/data \
        -v es-plugins:/usr/share/elasticsearch/plugins \
        --privileged \
        --network es-net \
        -p 9200:9200 \
        -p 9300:9300 \
    elasticsearch:7.12.1
    【tips】命令解释:
    • -e "cluster.name=es-docker-cluster":设置集群名称
    • -e "http.host=0.0.0.0":监听的地址,可以外网访问
    • -e "ES_JAVA_OPTS=-Xms512m -Xmx512m":内存大小
    • -e "discovery.type=single-node":非集群模式
    • -v es-data:/usr/share/elasticsearch/data:挂载逻辑卷,绑定es的数据目录
    • -v es-logs:/usr/share/elasticsearch/logs:挂载逻辑卷,绑定es的日志目录
    • -v es-plugins:/usr/share/elasticsearch/plugins:挂载逻辑卷,绑定es的插件目录
    • --privileged:授予逻辑卷访问权
    • --network es-net :加入一个名为 es-net 的网络中
    • -p 9200:9200:端口映射配置
  • 访问地址:http://192.168.152.100:9200 即可看到 elasticsearch 的响应结果
        

(2)安装kibana

  • 加载kibana.tar
    docker load -i kibana.tar
  • 运行docker命令,部署kibana
    docker run -d \  --name kibana \ -e ELASTICSEARCH_HOSTS=http://es:9200 \ --network=es-net \ -p 5601:5601  \ kibana:7.12.1
    
    【tips】命令解释:
    • --network es-net :加入一个名为 es-net 的网络中,与 elasticsearch 在同一个网络中
    • -e ELASTICSEARCH_HOSTS=http://es:9200":设置 elasticsearch 的地址,因为 kibana 已经与 elasticsearch 在一个网络,因此可以用容器名直接访问 elasticsearch
    • -p 5601:5601:端口映射配置
  • 访问地址:http://192.168.152.100:5601,即可看到结果
        

(3)安装IK分词器

        离线模式安装:
  • 安装es插件需要知道 es 的 plugins 目录位置,而我们用了数据卷挂载,因此需要查看 es 的数据卷目录,通过下面命令查看
    docker volume inspect es-plugins

        可以看到 plugins 目录被挂载到了 /var/lib/docker/volumes/es-plugins/_data 这个目录中
  • 进到上述目录,将解压好的ik文件夹放到里面
        
  • 重启es容器
    docker restart es
  • IK分词器包含两种模式:
            ik_smart:智能切分,粗粒度
            ik_max_word:最细切分,细粒度
  • 测试分词器
    POST /_analyze
    {
      "analyzer": "ik_max_word",
      "text": "学习java太棒啦!"
    }

(4)扩展/屏蔽词典

        我们可以对IK分词器进行新词扩展和屏蔽某些词,让其不参与分词。只需要修改ik分词器 config 目录下的 IKAnalyzer.cfg.xml 文件
        
  • 扩展新词:
        法一:添加一个文件名,以 ext.dic 文件名为例。
        法二:配置远程扩展字典——使用nginx,在下面的目录中创建一个txt文件,以后将要扩展的词加到该txt文件中即可,然后在IKAnalyzer.cfg.xml中进行配置:
        
  • 屏蔽词:添加一个文件名,以 stopword.dic 文件名为例。
        
  • 在config目录下创建ext.dic和stopword.dic(已存在),在里面添加要扩展和屏蔽的词,一个词一行
  • 重启es容器即可

4.★ 索引库 的常用操作

(1)mapping属性

        要向 es 中存储数据,必须先创建索引库,而mapping属性是对索引库中文档的约束
        常见的 mapping 属性:
  • type:字段的数据类型,常见的简单类型有:
    • 字符串:text(可分词的文本)keyword(精确值,例如:品牌、国家、ip地址)
    • 数值:long、integer、short、byte、double、float、
    • 布尔:boolean
    • 日期:date
    • 对象:object
        【tips】:可使用  GET  /索引库名/_mapping  来查看索引库中所有字段的数据类型。
  • index是否创建索引默认为 true,创建了索引就可以被搜索了。
  • analyzer:使用哪种分词器,一般与text属性一起用。
  • properties:该字段的子字段,只有在object字段才会用到。

(2)_cat操作

  • GET /_cat/nodes:查看所有节点
  • GET /_cat/health:查看 es 健康状况
  • GET /_cat/master:查看主节点
  • GET /_cat/indices:查看所有索引 ,相当于MySQL里的show databases

(3)创建索引库

        ES中通过Restful请求操作索引库、文档。请求内容用DSL语句来表示。创建索引库和mapping的DSL语法如下:
        

(4)查看、删除索引库、添加字段

  • 查看
    GET /索引库名
  • 删除
    DELETE /索引库名
  • 修改——添加新字段:倒排索引结构虽然不复杂,但是一旦修改mapping属性(比如改变了分词器),就需要重新创建倒排索引。因此索引库一旦创建,无法修改 mapping虽然无法修改 mapping 中已有的字段,但是却允许添加新字段到 mapping 中,不会对倒排索引产生影响。
    PUT /索引库名/_mapping
    {
      "properties": {
        "新字段名":{
          "type": "integer"
        }
      }
    }

(5)修改mapping映射(修改索引库)

        上面我们提到,索引库一旦创建就无法修改 mapping。但我们可以通过 数据迁移 来实现mapping的修改。即按新的mapping需求创建一个新的索引库newBase,然后将之前的索引库oldBase的数据迁移到newBase中。
POST _reindex  //数据迁移的固定写法
{
  "source": {
    "index": "twitter" 
  },
  "dest": {
    "index": "new_twitter"
  }
}

5.DSL操作文档(document)

  • POST /索引库名/_doc/文档id
    {
        "字段1": "值1",
        "字段2": "值2",
        "字段3": {
            "子属性1": "值3",
            "子属性2": "值4"
        }
        // ...
    }
  • DELETE /索引库名/_doc/文档id
  • 改:修改文档有两种方式:
    • 全量修改:直接覆盖原来的文档,即删除旧文档、新增新文档
      PUT /索引库名/_doc/文档id
      {
          "字段1": "值1",
          "字段2": "值2",
          // ... 略
      }
    • 增量修改:修改文档中的部分字段
      POST /索引库名/_update/文档id
      {
          "doc": {
              "字段名": "新的值",
          }
      }
  • GET /索引库名/_doc/文档id

6.es中操作类型(type)——已过时

        类型(type):一个索引可以存储多个用于不同用途的对象,可以通过类型来区分索引中的不同对象,有点类似mysql中的6.0版本被废弃,是为了提高搜索效率)。
(1)保存数据(文档)
  • PUT方式:需要指定保存在哪个索引哪个类型下,指定用哪个唯一标识
    //在 customer 索引下的 external 类型下保存以下数据作为1号数据
    PUT customer/external/1
    {
      "name": "John Doe" 
    }
    【tips】①当使用PUT多次对 customer/external/1 进行数据保存时,第一次是新增操作,以后就都是修改操作了。
                 ②PUT 必须指定 id不指定 id 会报错。由于 PUT 需要指定 id,因此我们一般用PUT来做修改操作。
  • POST方式:可以不指定id,如果不指定 id,会自动生成 id;指定 id 就会修改这个数据,并新增版本号version
    PUT customer/external/1
    {
      "name": "John Doe" 
    }
(2)查询数据(文档)
  • 需要指定哪个索引哪个类型下的哪个id的文档
    GET customer/external/1
(3)修改文档
  • PUT方式:
    PUT customer/external/1
    { 
      "name": "John Doe"
    }
  • POST方式:
    //带_update就要带"doc"
    POST customer/external/1/_update
    {
      "doc":{
           "name": "John Doew"
      }
    }
    //不带就都不带
    POST customer/external/1
    {
      "name": "John Doe2"
    }
    
  • 区别:
    POST 操作会对比原文档数据,如果修改的内容与原文档内容相同,则不会有什么操作,文档 version 不增加;
    PUT 操作总会将数据重新保存并增加 version 版本
(4)删除索引库和删除文档
//删除文档
DELETE customer/external/1
//删除索引库
DELETE customer
(5)bulk批量API
  • bulk API 按顺序执行所有的 action(动作)。如果单个的动作因任何原因而失败,它将继续处理它后面剩余的动作。当 bulk API 返回时,它将提供每个动作的状态(与发送的顺序相同)。
    POST customer/external/_bulk
    //每两个为一条数据:
    //索引(新增)操作:给id为1的文档新增数据
    {"index":{"_id":"1"}}
    //要新增的数据
    {"name": "John Doe" }
    //同上
    {"index":{"_id":"2"}}
    {"name": "Jane Doe" }
    语法格式:
    //action——执行什么操作:index/delete/create/update
    //metadata——指定索引、类型、id
    { action: { metadata }}\n
    { request body }\n
    { action: { metadata }}\n
    { request body }\n

7.RestClient操作索引库

        ES 官方提供了各种不同语言的客户端,用来操作 ES。这些客户端的本质就是组装 DSL 语句,通过【 http 请求】发送给 ES
        其中
Java Rest Client又包括两种:Java Low Level Rest Clien t和 Java High Level Rest Client官方文档地址:https://www.elastic.co/guide/en/elasticsearch/client/index.html
        这里基于资料提供的hotel-demotb_hotel来学习Java High Level Rest Client。
(1)根据tb_hotel分析索引库mapping映射
        
        在分析mapping映射时要考虑:字段名、数据类型、是否参与搜索、是否分词、如果分词,分词器是什么?
  • 字段名、字段数据类型,可以参考数据表结构的名称和类型
  • 是否参与搜索要分析业务来判断,例如图片地址,就无需参与搜索
  • 是否分词要看内容,内容如果是一个整体就无需分词
  • 分词器,这里统一使用 ik_max_word
        通过分析可以写出创建索引库的DSL语句:
PUT /hotel
{
  "mappings": {
    "properties": {
      "id": {
        "type": "keyword"
      },
      "name":{
        "type": "text",
        "analyzer": "ik_max_word",
        "copy_to": "all"
      },
      "address":{
        "type": "keyword",
        "index": false
      },
      "price":{
        "type": "integer"
      },
      "score":{
        "type": "integer"
      },
      "brand":{
        "type": "keyword",
        "copy_to": "all"
      },
      "city":{
        "type": "keyword",
        "copy_to": "all"
      },
      "starName":{
        "type": "keyword"
      },
      "business":{
        "type": "keyword"
      },
      "location":{
        "type": "geo_point"
      },
      "pic":{
        "type": "keyword",
        "index": false
      },
      "all":{
        "type": "text",
        "analyzer": "ik_max_word"
      }
    }
  }
}
【tips】特殊字段说明:
  • location:地理坐标,里面包含经度、纬度。
  • ES中支持两种地理坐标数据类型:
  •         geo_point:由纬度(latitude)经度(longitude)确定的一个点。例如:"32.8752345, 120.2981576"
  •         geo_shape:有多个 geo_point 组成的复杂几何图形。例如一条直线,"LINESTRING (-77.03653 38.897676, -77.009051 38.889939)"
  • all:一个组合字段,其目的是将多字段的值利用 copy_to 合并,提供给用户搜索,这样一来就只需要搜索all这一个字段就可以得到结果,性能更好。
(2)初始化JavaRestClient
        在 es 提供的 API 中,es 一切交互都封装在一个名为 RestHighLevelClient 的类中,必须先完成这个对象的初始化,建立与 es 的连接。
  • 导入坐标
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
    </dependency>
    【tips】SpringBoot 默认的 ES 版本是 7.6.2,因此我们需要覆盖默认的ES版本,将版本改为7.12.1。
  • 覆盖ES版本
    <properties>
        <java.version>1.8</java.version>
        <elasticsearch.version>7.12.1</elasticsearch.version>
    </properties>
  • 初始化 RestHighLevelClient
    RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(
            HttpHost.create("http://192.168.152.100:9200")  //es访问地址
    ));
  • 创建一个测试类 HotelIndexTest,然后将初始化的代码编写在 @BeforeEach 方法
    public class HotelIndexTest {
        private RestHighLevelClient restHighLevelClient;
    
        @Test
        void testInit(){
            System.out.println(this.restHighLevelClient);
        }
        @BeforeEach  //将初始化的代码编写在 @BeforeEach 方法 ,这样每个测试方法运行前都会初始化一个client     
            void init(){
            this.restHighLevelClient = new RestHighLevelClient(RestClient.builder(
                    HttpHost.create("http://192.168.152.100:9200")
            ));
        }
        @AfterEach
        void down() throws IOException {
            this.restHighLevelClient.close();
        }
    }
(3)创建索引库
  • 使用API创建索引库的本质是根据DSL语句,因此我们写的代码与DSL语句是对应的:
        
  • 这个MAPPING_TEMPLATE就是DSL语句,我们把他定义成一个静态常量:
    public class HotelConstants {
        public static final String MAPPING_TEMPLATE ="{\n" +
                "  \"mappings\": {\n" +
                "    \"properties\": {\n" +
                "      \"id\": {\n" +
                "        \"type\": \"keyword\"\n" +
                "      },\n" +
                "      \"name\":{\n" +
                "        \"type\": \"text\",\n" +
                "        \"analyzer\": \"ik_max_word\",\n" +
                "        \"copy_to\": \"all\"\n" +
                "      },\n" +
                "      \"address\":{\n" +
                "        \"type\": \"keyword\",\n" +
                "        \"index\": false\n" +
                "      },\n" +
                "      \"price\":{\n" +
                "        \"type\": \"integer\"\n" +
                "      },\n" +
                "      \"score\":{\n" +
                "        \"type\": \"integer\"\n" +
                "      },\n" +
                "      \"brand\":{\n" +
                "        \"type\": \"keyword\",\n" +
                "        \"copy_to\": \"all\"\n" +
                "      },\n" +
                "      \"city\":{\n" +
                "        \"type\": \"keyword\",\n" +
                "        \"copy_to\": \"all\"\n" +
                "      },\n" +
                "      \"starName\":{\n" +
                "        \"type\": \"keyword\"\n" +
                "      },\n" +
                "      \"business\":{\n" +
                "        \"type\": \"keyword\"\n" +
                "      },\n" +
                "      \"location\":{\n" +
                "        \"type\": \"geo_point\"\n" +
                "      },\n" +
                "      \"pic\":{\n" +
                "        \"type\": \"keyword\",\n" +
                "        \"index\": false\n" +
                "      },\n" +
                "      \"all\":{\n" +
                "        \"type\": \"text\",\n" +
                "        \"analyzer\": \"ik_max_word\"\n" +
                "      }\n" +
                "    }\n" +
                "  }\n" +
                "}";
    }
(4)删除索引库
@Test
void deleteHotelIndex() throws IOException {
    DeleteIndexRequest hotel = new DeleteIndexRequest("hotel");
    restHighLevelClient.indices().delete(hotel,RequestOptions.DEFAULT);
}
(5)判断索引库是否存在
@Test
void existHotelIndex() throws IOException {
    GetIndexRequest hotel = new GetIndexRequest("hotel");
    boolean exists = restHighLevelClient.indices().exists(hotel, RequestOptions.DEFAULT);
    System.out.println(exists);
}

8.★RestClient操作文档

(1)新增文档
        新增文档与新建索引库的道理一样,都是需要根据DSL来写代码:
        
  • 先根据id查询酒店数据,然后给这条数据创建倒排索引,即可完成添加:
    @Test
    void testAddDocument() throws IOException {
        //根据id查询酒店数据
        Hotel hotel = hotelService.getById(61083L);
        HotelDoc hotelDoc = new HotelDoc(hotel);
    
        //1.创建request对象,想一下DSL新增文档时都需要哪些信息?——索引库名和id
        IndexRequest request = new IndexRequest("hotel").id(hotelDoc.getId().toString());
            
        //2.准备JSON文档:把Java对象序列化成json格式
            //source 方法用于保存数据,数据的格式为 键值对形式 的类型
        request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
    
        //3.发送请求
        client.index(request, RequestOptions.DEFAULT);
    }
(2)根据文档id查询文档
@Test
void testGetDocuments() throws IOException {
    //1.准备request
    GetRequest request = new GetRequest("hotel","61083");
    //2.发送请求
    GetResponse response = client.get(request, RequestOptions.DEFAULT);
    //3.解析响应结果
    String json = response.getSourceAsString();
    //4.json转java对象
    HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);

    System.out.println(hotelDoc);
}
(3)修改文档
        前面我们说过修改文档有全量修改和增量修改两种方式:全量修改与新增的 API 完全一致,判断依据是 ID:如果新增时,ID已经存在,则修改;如果新增时,ID不存在,则新增。
        所以全量修改写法与新增文档一样,下面我们主要是介绍增量修改。 
@Test
void testUpdateDocument() throws IOException {
    // 1.准备Request
    UpdateRequest request = new UpdateRequest("hotel", "61083");
    // 2.准备请求参数
    request.doc(
        "price", "952",
        "starName", "四钻"
    );
    // 3.发送请求
    client.update(request, RequestOptions.DEFAULT);
}
(4)删除文档
@Test
void testDeleteDocument() throws IOException {
    DeleteRequest request=new DeleteRequest("hotel","61083");
    client.delete(request,RequestOptions.DEFAULT);
}
(5)批量新建文档
        需求:将数据库酒店数据批量(bulk)导入到索引库中。
  • 利用 mybatis-plus 查询酒店数据
  • 将查询到的酒店数据(Hotel)转换为文档类型数据(HotelDoc)
  • 利用 JavaRestClient 中的 BulkRequest 批处理,实现批量新增文档:批量处理 BulkRequest,其本质就是将多个普通的 CRUD 请求组合在一起发送。 因此Bulk中添加了多个IndexRequest,就是批量新增功能了。示例:
    @Test
    void testBulkDocument() throws IOException {
        //1.创建批量操作的request对象
        BulkRequest request = new BulkRequest();
        //2.查询hotel数据
        List<Hotel> hotels = hotelService.list();
        //3.将每个hotel数据转为hotelDoc
        for (Hotel hotel : hotels) {
            HotelDoc hotelDoc = new HotelDoc(hotel);
            //4.创建新增文档的request对象
            request.add(new IndexRequest("hotel").id(hotelDoc.getId().toString()).source(JSON.toJSONString(hotelDoc), XContentType.JSON));
        }
        //5.发送请求
        client.bulk(request, RequestOptions.DEFAULT);
    }
(6)总结
        在 Java 代码中,client 无论针对操作索引库还是文档,基本都是一样的代码:
            restHighLevelClient.indices().xxx——代表操作索引库
            restHighLevelClient.xxx——代表操作文档 
        而其中所需要的request参数,直接通过 ctrl+p 这样的快捷键去查看就可以,需要什么样的request对象我们就创建什么样的request对象。

四、★DSL文档【查询】语句(_search)

        Elasticsearch 提供了基于 JSON 的 DSL(Domain Specific Language)来定义查询。
        常见的查询类型包括:

1.查询所有(match_all)

        查询出所有数据,一般测试用。例如:
  • match_all
    GET /indexName/_search
    {    
        // query{}用来定义如何查询
        "query": {
            //具体的查询类型,如match_all、match、multi_match、 range、term等     "match_all": {
        }
      }
    }
  • 只返回部分字段"_source"
    GET bank/_search
    { 
        "query": {
            "match_all": {}
        },
        "from": 0,
        "size": 5,
        //_source:以数组的形式指定要返回的字段名
        "_source": ["age","balance"]
    }

2.全文检索查询——要分词

        利用分词器对用户输入内容分词,然后去倒排索引库中匹配。例如:
  • match:
    GET /indexName/_search
    {
      "query": {
        "match": {
                //要查询的字段FIELD: 输入的内容TEXT
          "FIELD": "TEXT"
        }
      }
    }
  • multi_match:多字段匹配
    GET /indexName/_search
    {
      "query": {
        "multi_match": {
          "query": "TEXT",
          "fields": ["FIELD1", "FIELD12"]
        }
      }
    }
    【tips】搜索字段越多,对查询性能影响越大,因此建议采用 copy_to 将多个字段合并为一个,然后使用单字段查询的方式。

3.精确查询——不分词

        根据精确词条值查找数据,一般是查找 keyword、数值、日期、boolean 等类型字段,因此 不会对搜索条件分词 。例如:
  • range范围查询,一般应用在对数值类型做范围过滤的时候。比如做价格范围过滤。
    GET /indexName/_search
    {
      "query": {
        "range": {
          "FIELD": {
            "gte": 10, // 这里的gte代表大于等于,gt则代表大于
            "lte": 20 // lte代表小于等于,lt则代表小于
          }
        }
      }
    }
  • term:精确查询的字段是不分词的字段(非文本字段),因此查询的条件也必须是不分词的词条。查询时,用户输入的内容跟自动值完全匹配时才认为符合条件。
    GET /indexName/_search
    {
      "query": {
        "term": {
          "FIELD": {
            "value": "VALUE"
          }
        }
      }
    }
    

【tips】一般情况下,全文检索字段用match,其他非文本字段匹配用term。

4.地理(geo)查询

        根据经纬度查询。例如:
  • geo_distance:附近查询,也叫做距离查询(geo_distance):查询到指定中心点小于某个距离值的所有文档 在地图上找一个点作为圆心,以指定距离为半径,画一个圆,落在圆内的坐标都算符合条件:
  •         
    GET /indexName/_search
    {
      "query": {
        "geo_distance": {
          "distance": "15km", // 半径
          "FIELD": "31.21,121.5" // 圆心
        }
      }
    }
  • geo_bounding_box:矩形范围查询,查询坐标落在某个矩形范围的所有文档。查询时,需要指定矩形的左上、右下两个点的坐标,然后画出一个矩形,落在该矩形内的都是符合条件的点。
  •         
    GET /indexName/_search
    {
      "query": {
        "geo_bounding_box": {
          "FIELD": {
            "top_left": { // 左上点
              "lat": 31.1,
              "lon": 121.5
            },
            "bottom_right": { // 右下点
              "lat": 30.9,
              "lon": 121.7
            }
          }
        }
      }
    }

5.复合(compound)查询

   复合查询可以将上述各种查询条件组合起来,合并查询条件。例如:

(1)相关性算分

        当我们利用 match 查询时,文档结果会根据与搜索词条的关联度打分(_score),返回结果时按照分值降序排列。例如,我们搜索 "虹桥如家",结果如下:
        
  •  elasticsearch 早期(5.0之前)使用的打分算法是TF-IDF 算法,公式如下:
  •         
  • 在后来的5.1版本升级中,elasticsearch 将算法改进为 BM25 算法,公式如下:
  •         
  • TF-IDF 算法有个缺陷,就是词条频率越高,文档得分也会越高,单个词条对文档影响较大。而 BM25 则会让单个词条的算分有一个上限,曲线更加平滑:
  •         

(2)function_scores

        算分函数查询:可以使用 elasticsearch 中的 function_score 查询人为控制相关性算分
        
        因此,其中的关键点是
  • 过滤条件:决定哪些文档的算分被修改 
  • 算分函数:决定函数算分的算法 
  • 运算模式:决定最终算分结果

(3)bool

        布尔查询是一个或多个查询子句的组合,每一个子句就是一个子查询。子查询的组合方式有
  • must:必须匹配每个子查询,类似“与”,并且的意思
  • should:选择性匹配子查询,类似“或”
  • must_not:必须不匹配,不参与算分,类似“非”
  • filter必须匹配,但不参与算分
    GET /hotel/_search
    {
      "query": {
        "bool": {
          "must": [
            {"term": {"city": "上海" }}
          ],
          "should": [
            {"term": {"brand": "皇冠假日" }},
            {"term": {"brand": "华美达" }}
          ],
          "must_not": [
            { "range": { "price": { "lte": 500 } }}
          ],
          "filter": [
            { "range": {"score": { "gte": 45 } }}
          ]
        }
      }
    }
        在搜索酒店时,除了关键字搜索外,我们还可能根据品牌、价格、城市等字段做过滤。每一个不同的字段,其查询的条件、方式都不一样,必须是多个不同的查询,而要组合这些查询,就必须用 bool查询了。 
        需要注意的是,搜索时,参与打分的字段越多,查询的性能也越差。因此这种多条件查询时,建议这样做: 搜索框的关键字搜索,是全文检索查询,使用 must 查询,参与算分 其它过滤条件,采用 filter 查询,不参与算分。

五、搜索结果的处理

1.排序

        elasticsearch 默认根据相关度算分(_score)排序,但是也支持自定义方式对搜索结果排序。
        可以排序字段类型有:keyword 类型、数值类型、地理坐标类型、日期类型等
  • keyword、数值、日期类型排序的语法基本一致:
    GET /indexName/_search
    {
      "query": {
        "match_all": {}
      },
      "sort": [
        {
          "FIELD": "desc"  // 排序字段、排序方式ASC、DESC
        }
      ]
    }
    
    【tips】排序条件是一个数组,也就是可以写多个排序条件。按照声明的顺序,当第一个条件都满足时,再按照第二个条件排序
  • 地理坐标排序:
    GET /indexName/_search
    {
      "query": {
        "match_all": {}
      },
      "sort": [
        {
          "_geo_distance" : {
              "FIELD" : "纬度,经度", // 文档中geo_point类型的字段名、目标坐标点
              "order" : "asc", // 排序方式
              "unit" : "km" // 排序的距离单位
          }
        }
      ]
    }

2.分页

        elasticsearch 默认只返回 top10 的数据,如果要查询更多数据就需要修改分页参数了。
        elasticsearch 通过修改 from、size 参数来控制要返回的分页结果,类似于mysql中的limit ?, ?
  • from:从第几个文档开始
  • size:总共查询几个文档
    GET /hotel/_search
    {
      "query": {
        "match_all": {}
      },
      "from": 0,  // 分页开始的位置,默认为0
      "size": 10, // 期望获取的文档总数
      "sort": [
        {"price": "asc"}
      ]
    }

3.★深度分页问题

        如果要用 from + size 查询第990~1000条数据,必须先查询0~1000条,然后截取其中的 990 ~ 1000 的这10条。如果 es 是单点模式,这并无太大影响。 但是 elasticsearch 将来一定是集群,例如集群有5个节点,要查询 TOP1000 的数据,并不是每个节点查询200条就可以了。节点A的 TOP200,在另一个节点可能排到10000名以外了。 因此要想获取整个集群的 TOP1000,必须先查询出每个节点的 TOP1000,汇总结果后,重新排名,重新截取 TOP1000。
        
        当查询分页深度较大时,汇总数据过多,对内存和CPU会产生非常大的压力,因此 elasticsearch 会禁止from+ size 超过10000的请求。 
        针对深度分页,ES提供了两种解决方案: 
  • search after:分页时需要排序,原理是从上一次的排序值开始,查询下一页数据。官方推荐使用的方式。
  • scroll:原理将排序后的文档id形成快照保存在内存。官方已经不推荐使用。
        分页查询的常见实现方案以及优缺点:
  • from + size
    • 优点:支持随机翻页
    • 缺点:深度分页问题,默认查询上限(from + size)是10000
    • 场景:百度、京东、谷歌、淘宝这样的随机翻页搜索
  • after search
    • 优点:没有查询上限(单次查询的size不超过10000)
    • 缺点:只能向后逐页查询,不支持随机翻页
    • 场景:没有随机翻页需求的搜索,例如手机向下滚动翻页
  • scroll
    • 优点:没有查询上限(单次查询的size不超过10000)
    • 缺点:会有额外内存消耗,并且搜索结果是非实时的
    • 场景:海量数据的获取和迁移。从ES7.1开始不推荐,建议用 after search方案。

4.高亮显示

        高亮(highlight)显示的实现分为两步:
  • 给文档中的所有关键字都添加一个标签,例如<em>标签
    GET /hotel/_search
    {
      "query": {
        "match": {
          "FIELD": "TEXT" // 查询条件,高亮一定要使用全文检索查询
        }
      },
      "highlight": {
        "fields": { // 指定要高亮的字段
          "FIELD": {
            "pre_tags": "<em>",  // 用来标记高亮字段的前置标签
            "post_tags": "</em>" // 用来标记高亮字段的后置标签
          }
        }
      }
    }
    【tips】
    • 高亮是对关键字高亮,因此搜索条件必须带有关键字,而不能是范围这样的查询
    • 默认情况下,高亮的字段,必须与搜索指定的字段一致,否则无法高亮
    • 如果要对非搜索字段高亮,则需要添加一个属性:"required_field_match": "false"
  • 页面给<em>标签编写CSS样式

六、RestClient实现DSL查询语句

1.match_all

(1)发送请求,获取结果

         
  • 第一步:创建SearchRequest对象,指定索引库名 
  • 第二步:利用request.source()构建 DSL,DSL 中可以包含查询、分页、排序、高亮等
  •         query():代表查询条件,利用 QueryBuilders.matchAllQuery() 构建一个 match_all DSL
  •         
  • 第三步:利用 client.search() 发送请求,得到响应
        关键的 API 有两个,一个是 request.source(),其中包含了查询、排序、分页、高亮等所有功能。

(2)解析结果,获取数据

        
        Elasticsearch 返回的结果是一个 JSON 字符串,结构包含
  • hits:命中的结果
    • total:总条数,其中的value是具体的总条数值
    • max_score:所有结果中得分最高的文档的相关性算分
    • hits:搜索结果的文档数组,其中的每个文档都是一个 json 对象
      • _source:文档中的原始数据,也是 json 对象
        因此,我们解析响应结果,就是逐层解析 JSON 字符串,流程如下:
  • SearchHits:通过 response.getHits() 获取,就是 json 中的最外层的 hits,代表命中的结果
    • SearchHits.getTotalHits().value:获取总条数信息
    • SearchHits.getHits():获取 SearchHit 数组,也就是文档数组
      • SearchHit.getSourceAsString():获取文档结果中的 _source,也就是原始的 json 文档数据

(3)match_all整体代码

@Test
void testMatchAll() throws IOException {
    //1.创建SearchRequest对象
    SearchRequest request = new SearchRequest("hotel");
    //2.创建 match_all DSL语句
    request.source().query(QueryBuilders.matchAllQuery());
    //3.发送请求
    SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    //4.处理结果
    SearchHits searchHits = response.getHits();
    //4.1 获取到的总文档数
    long total = searchHits.getTotalHits().value;
    System.out.println("共搜索到" + total + "条数据");
    //4.2 获取文档数组
    SearchHit[] hits = searchHits.getHits();
    for (SearchHit hit : hits) {
        //获取文档source
        String json = hit.getSourceAsString();
        //反序列化
        HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
        System.out.println("HotelDoc:" + hotelDoc);
    }
}

2.全文查询

        全文检索的match和multi_match查询与match_all的API基本一致。差别是查询条件,即query的部分。 同样是利用QueryBuilders提供的方法:
        
        那么match和mutil_match的代码只需要各改一行即可:
        

3.精确查询

        精确查询主要是两者 term 和 range,同样利用QueryBuilders实现:
        

4.复合查询——bool

        布尔查询是用 must、must_not、filter等方式组合其它查询,代码示例如下:
        

5.搜索结果处理——排序和分页

        搜索结果的排序和分页是与 query 同级的参数,因此同样是使用 request.source() 来设置。
        

6.高亮显示

        高亮显示的API包括两部分:
  • 查询的 DSL:其中除了查询条件,还需要添加高亮条件,同样是与 query 同级。
  •         
  • 结果解析:高亮的结果与查询的文档结果默认是分离的,并不在一起。 因此解析高亮的代码需要额外处理(逐层解析JSON):
  •      
    @Test
    void testHighLight() throws IOException {
        //1.创建SearchRequest对象
        SearchRequest request = new SearchRequest("hotel");
        //2.创建DSL
        //2.1 query
        request.source().query(QueryBuilders.matchQuery("all","如家"));
        //2.2 高亮
        request.source().highlighter(new HighlightBuilder().field("name").requireFieldMatch(false));
        //3.发送请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        //4.处理结果
        SearchHits searchHits = response.getHits();
        //4.1 获取到的总文档数
        long total = searchHits.getTotalHits().value;
        System.out.println("共搜索到" + total + "条数据");
        //4.2 获取文档数组
        SearchHit[] hits = searchHits.getHits();
        for (SearchHit hit : hits) {
            //获取文档source
            String json = hit.getSourceAsString();
            //反序列化
            HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
            //处理高亮
            Map<String, HighlightField> highlightFields = hit.getHighlightFields();
            if(!CollectionUtils.isEmpty(highlightFields)){
                //获取高亮字段
                HighlightField highlightField = highlightFields.get("name");
                if(highlightField != null){
                    //取出高亮结果数组中的第一个,就是酒店名称
                    String name = highlightField.getFragments()[0].string();
                    hotelDoc.setName(name);
                }
            }
            System.out.println("HotelDoc:" + hotelDoc);
        }
    }
       

7.算分控制查询

        function_score 查询结构如下:
        
        对应的API如下:
        

七、★DSL数据聚合

1.DSL聚合的分类

        聚合(aggregations)可以方便的实现对数据的统计、分析、运算。在 Elasticsearch 实现这些统计功能比数据库的 sql 要方便的多,而且查询速度非常快,可以实现近实时搜索效果。 
        聚合常见的有三类:
  • 桶(Bucket)聚合:用来对文档做分组
    • TermAggregation:按照文档字段分组,例如按照品牌值分组、按照国家分组
    • Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
  • 度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等
    • avg:求平均值 
    • max:求最大值 
    • min:求最小值 
    • stats:同时求 max、min、avg、sum 等
  • 管道(Pipeline)聚合:以其它聚合的结果为基础做聚合
tips】参加聚合的字段必须是像keyword、日期、数值、布尔类型等不可分词的字段

2.DSL实现聚合

(1)DSL实现Bucket聚合

        aggs 代表聚合,与query同级,此时query的作用是限定聚合的的文档范围
        聚合三要素: 聚合名称聚合类型聚合字段
        聚合可以配置属性有:size:指定聚合结果数量;order:指定聚合结果排序方式;field:指定聚合字段。
  • 案例1:统计所有数据中的酒店品牌有几种(其实就是按照品牌对数据分组,也就是 Bucket 聚合。)
    GET /hotel/_search
    {
      "size": 0,  // 设置size为0,表示结果中不包含文档,只包含聚合结果
      "aggs": { // 定义聚合
        "brandAgg": { //自定义聚合名称
          "terms": { // 聚合的类型,按照品牌值聚合,所以选择terms
            "field": "brand", // 参与聚合的字段
            "size": 20 // 最终看到的聚合结果个数,默认是只显示10个
          }
        }
      }
    }

  • 案例2:统计所有数据中的酒店品牌有几种,并按数量升序排序。(默认情况下,Bucket 聚合会统计 Bucket 内的文档数量,记为 _count,并且按照 _count 降序排序。)
    GET /hotel/_search
    {
      "size": 0, 
      "aggs": {
        "brandAgg": {
          "terms": {
            "field": "brand",
            "order": {
              "_count": "asc" // 按_count升序排列
            },
            "size": 20
          }
        }
      }
    }
  • 案例3:统计价格在200以内酒店品牌有几种。(默认情况下,Bucket 聚合是对索引库的所有文档做聚合。但是也可以限定要聚合的文档范围,只要添加 query 条件即可,即在查询到的文档里做聚合)
    GET /hotel/_search
    {
      "query": {
        "range": {
          "price": {
            "lte": 200 // 只对200元以下的文档聚合
          }
        }
      }, 
      "size": 0, 
      "aggs": {
        "brandAgg": {
          "terms": {
            "field": "brand",
            "size": 20
          }
        }
      }
    }

(2)DSL实现Metric聚合

  • 案例1:获取每个品牌用户评分最小值、最大值、平均值等数据。(上面的案例对酒店按品牌分组,形成了一个个桶,因此现在需要对桶内的酒店做聚合运算。)
    GET /hotel/_search
    {
      "size": 0, 
      "aggs": {
        "brandAgg": { 
          "terms": { 
            "field": "brand", 
            "size": 20
          },
          "aggs": { // 是对品牌聚合的子聚合,也就是分组后分别对每组再聚合
            "score_stats": { // 聚合名称
              "stats": { // 聚合类型,这里stats可以计算min、max、avg等
                "field": "score" // 参与聚合的字段,这里是用户评分score
              }
            }
          }
        }
      }
    }

  • 案例2:获取每个品牌用户评分最小值、最大值、平均值等数据,并按品牌平均分进行降序排序。
        

3.RestClient实现聚合

        以上面的酒店品牌案例为例。
        aggregation条件与 query 条件同级别,因此也需要使用 request.source() 来指定聚合条件。
          
  • 构建DSL语句,发送请求
    //1.创建request
    SearchRequest request = new SearchRequest("hotel");
    //2.创建DSL
    request.source().aggregation(AggregationBuilders
            .terms("brandAgg")
            .field("brand")
            .size(10)
    );
    //3.发送请求 
    SearchResponse response = client.search(request, RequestOptions.DEFAULT);
  • 获取结果,解析结果
    //4.解析结果
    //4.1 获取聚合结果
    Aggregations aggregations = response.getAggregations();
    //4.2 根据名称获取具体的聚合结果
    Terms brandAgg = aggregations.get("brandAgg");
    //4.3 获取桶
    List<? extends Terms.Bucket> buckets = brandAgg.getBuckets();
    for (Terms.Bucket bucket : buckets) {
        //4.4 获取key
        String key = bucket.getKeyAsString();
        System.out.println(key);
    }

八、DSL自动补全查询

1.自动补全及拼音分词器

        自动补全:当用户在搜索框输入字符时,可以提示出与该字符有关的搜索项,提示完整词条的功能,这就是自动补全。
        如果我们需要根据拼音字母来推断,就要用到拼音分词功能。 要实现根据拼音做补全,就必须对文档按照拼音分词。
  • 插件地址:https://github.com/medcl/elasticsearch-analysis-pinyin
  • 通过docker volume inspect es-plugins命令查看插件目录,将下载好的插件解压上传到该目录
  • 重启es
  • 测试用例:
    POST /_analyze
    {
      "text": "测试用例",
      "analyzer": "pinyin"  //指定分词为拼音分词器
    }

2.自定义分词器

        拼音分词器默认会将每个汉字单独分为拼音,而我们希望的是每个词条形成一组拼音。因此需要对拼音分词器进行配置。
        elasticsearch 中分词器(analyzer)的组成包含三部分:
  • character filters:在 tokenizer 之前对文本进行处理。例如删除字符、替换字符。
  • tokenizer:将文本按照一定的规则切割成词条(term)。例如 keyword,就是不分词;还有 ik_smart。
  • filter:将 tokenizer 输出的词条做进一步处理。例如大小写转换、同义词处理、拼音处理等。
        文档分词时会依次由这三部分来处理文档:
(1)自定义分词器
        可以在创建索引库时自定义分词,如在创建test索引库时自定义分词器my_analyzer:
PUT /test
{
  "settings": {
    "analysis": {
      "analyzer": { // 自定义分词器
        "my_analyzer": {  // 自定义分词器的名称
          "tokenizer": "ik_max_word",
          "filter": "py"
        }
      },
      "filter": { // 自定义tokenizer filter
        "py": { // 过滤器名称
          "type": "pinyin", // 过滤器类型,这里是pinyin
          "keep_full_pinyin": false,
          "keep_joined_full_pinyin": true,
          "keep_original": true,
          "limit_first_letter_length": 16,
          "remove_duplicated_term": true,
          "none_chinese_pinyin_tokenize": false
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "name": {  //指定对name字段使用my_analyzer自定义分词器
        "type": "text",
        "analyzer": "my_analyzer",
        "search_analyzer": "ik_smart"
      }
    }
  }
}
(2)设置不同分词器
        拼音分词器适合在创建倒排索引时使用,而为了避免搜索到同音字,搜索时不要使用拼音分词器。
        设置在创建倒排索引时用my_analyzer分词器,搜索时用ik_smart分词器:
PUT /test
{
  "settings": {
    "analysis": {
      "analyzer": {
        "my_analyzer": {
          "tokenizer": "ik_max_word", 
          "filter": "py"
        }
      },
      "filter": {
        "py": { ... }
      }
    }
  },
  "mappings": {
    "properties": {
      "name": {
        "type": "text",
        "analyzer": "my_analyzer",   //创建时用my_analyzer
        "search_analyzer": "ik_smart"//搜索时用ik_smart
      }
    }
  }
}

3.自动补全查询(suggest)

        elasticsearch 提供了 Completion Suggester 查询来实现自动补全功能。这个查询会匹配以用户输入内容为开头的词条并返回
        为了提高补全查询的效率,对于文档中字段的类型有一些要求:
  • 参与补全查询的字段必须是 completion 类型
  • 字段的内容一般是用来补全的多个词条形成的数组
    // 自动补全的索引库
    PUT test
    {
      "mappings": {
        "properties": {
          "title":{
            "type": "completion"
          }
        }
      }
    }
    // 示例数据
    POST test/_doc
    {
      "title": ["Sony", "WH-1000XM3"]
    }
    POST test/_doc
    {
      "title": ["SK-II", "PITERA"]
    }
    POST test/_doc
    {
      "title": ["Nintendo", "switch"]
    }
    
    // 自动补全查询
    POST /test/_search
    {
      "suggest": {
        "title_suggest": { //自定义的名字
          "text": "s", // 搜索关键字
          "completion": {
            "field": "title", // 要补全的字段
            "skip_duplicates": true, // 跳过重复的词条
            "size": 10 // 获取前10条结果
          }
        }
      }
    }

九、ES数据同步问题

        elasticsearch 中的数据来自于 mysq l数据库,因此 mysql 数据发生改变时,elasticsearch 也必须跟着改变,这个就是 elasticsearch 与 mysql 之间的数据同步。

1.数据同步方案

(1)常见的三种数据同步方案
  • 同步调用
    • hotel-demo对外提供接口,用来修改 elasticsearch 中的数据
    • 酒店管理服务在完成数据库操作后,直接调用 hotel-demo 提供的接口
  • 异步通知
    • hotel-admin 对 mysql 数据库数据完成增、删、改后,发送 MQ 消息
    • hotel-demo监听 MQ,接收到消息后完成 elasticsearch 数据修改
           
  • 监听 binlog
    • mysql 开启 binlog 功能
    • mysql 完成增、删、改操作都会记录在 binlog 中
    • hotel-demo 基于canal 监听 binlog 变化,实时更新 elasticsearch 中的内容
           
(2)三种数据同步方案的优缺点
  • 同步调用
    • 优点:实现简单粗暴
    • 缺点:业务耦合度高 
  • 异步通知
    • 优点:低耦合,实现难度一般
    • 缺点:依赖 mq 的可靠性
  • 监听binlog
    • 优点:完全解除服务间耦合
    • 缺点:开启 binlog 增加数据库负担、实现复杂度高

2.异步通知实现数据同步

        
        以异步通知为例,使用 MQ 消息中间件,实现es与mysql间的数据同步。当酒店数据库发生增删改时,es中的数据也要完成相同的操作。
  • 将hotel-admin项目导入idea,作为酒店管理的微服务
  • 在两个项目中都导入AMQP坐标
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  • 配置AMQP信息
    spring:
      rabbitmq:
        host: 192.168.152.100 # 主机名
        port: 5672 # 消息通讯端口
        virtual-host: / # 虚拟主机
        username: itcast # 用户名
        password: 123321 # 密码
  • 在两个微服务中声明exchange、queue、routingKey的常量名
    public class MQConstants {
        /**
         * 交换机
         */
        public final static String HO***CHANGE = "hotel.topic";
        /**
         * 监听新增和修改的队列
         */
        public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
        /**
         * 监听删除的队列
         */
        public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
        /**
         * 新增或修改的RoutingKey
         */
        public final static String HOTEL_INSERT_KEY = "hotel.insert";
        /**
         * 删除的RoutingKey
         */
        public final static String HOTEL_DELETE_KEY = "hotel.delete";
    }
  • 在消费者(hotel-demo)中定义exchange和queue间的绑定关系
    @Configuration
    public class MQConfig {
        /**
         * 定义Topic交换机
         * @return
         */
        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange(MQConstants.HO***CHANGE, true, false);
        }
    
        /**
         * 定义两个队列
         */
        @Bean
        public Queue insertQueue() {
            return new Queue(MQConstants.HOTEL_INSERT_QUEUE, true);
        }
        @Bean
        public Queue deleteQueue() {
            return new Queue(MQConstants.HOTEL_DELETE_QUEUE, true);
        }
    
        /**
         * 分别定义交换机和两个队列的绑定关系
         * @return
         */
        @Bean
        public Binding insertQueueBinding() {
            return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MQConstants.HOTEL_INSERT_KEY);
        }
        @Bean
        public Binding deleteQueueBinding() {
            return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MQConstants.HOTEL_DELETE_KEY);
        }
    }
  • 消息发送方(hotel-admin):在controller中对应的方法中添加发送消息的功能:
    rabbitTemplate.convertAndSend(MQConstants.HO***CHANGE, MQConstants.HOTEL_INSERT_KEY, hotel.getId());
    
    rabbitTemplate.convertAndSend(MQConstants.HO***CHANGE, MQConstants.HOTEL_DELETE_KEY, id);
  • 消息接收方(hotel-demo):监听MQ消息。做出对应操作:
    @Override
    public void insertById(Long id) {
        try {
            // 根据id查询酒店数据
            Hotel hotel = getById(id);
            // 转换为文档类型
            HotelDoc hotelDoc = new HotelDoc(hotel);
    
            IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
            request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
            client.index(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
    @Override
    public void deleteById(Long id) {
        try {
            DeleteRequest request = new DeleteRequest("hotel", id.toString());
            client.delete(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
    @Component
    public class HotelListener {
        @Autowired
        private IHotelService hotelService;
    
        /**
         * 监听酒店管理服务新增或修改的业务
         * @param id 酒店id
         */
        @RabbitListener(queues = MQConstants.HOTEL_INSERT_QUEUE)
        public void listenInsertOrUpdate(Long id){
            hotelService.InsertById(id);
        }
    
        /**
         * 监听酒店管理服务删除的业务
         * @param id 酒店id
         */
        @RabbitListener(queues = MQConstants.HOTEL_DELETE_QUEUE)
        public void listenDelete(Long id){
            hotelService.deleteById(id);
        }
    }

十、ES集群

1.搭建es集群

(1)单机es存在的问题及es集群的解决方案
        
  • 海量数据存储问题
    • es集群:将索引库中的数据分为多个分片(shard),存储到多个节点
  • 单点故障问题
    • es集群:将分片数据进行备份(replica)并存储到与原分片不同的节点中
(2)搭建集群
        在单机上利用 Docker 容器运行3个 Elasticsearch 实例来模拟搭建es集群。
  • 准备docker-composer文件
    version: '2.2'
    services:
      es01:
        image: elasticsearch:7.12.1
        container_name: es01  #容器名
        environment:
          - node.name=es01  #节点名
          - cluster.name=es-docker-cluster  #集群名,只要节点的集群名一致,es会自动将这些节点部署成一个集群。
          - discovery.seed_hosts=es02,es03
          - cluster.initial_master_nodes=es01,es02,es03  #参与竞选的所有节点
          - "ES_JAVA_OPTS=-Xms512m -Xmx512m"  #最大最小内存
        volumes:
          - data01:/usr/share/elasticsearch/data
        ports:
          - 9200:9200  #宿主机端口:容器内端口
        networks:
          - elastic
      es02:
        image: elasticsearch:7.12.1
        container_name: es02
        environment:
          - node.name=es02
          - cluster.name=es-docker-cluster
          - discovery.seed_hosts=es01,es03
          - cluster.initial_master_nodes=es01,es02,es03
          - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
        volumes:
          - data02:/usr/share/elasticsearch/data
        ports:
          - 9201:9200
        networks:
          - elastic
      es03:
        image: elasticsearch:7.12.1
        container_name: es03
        environment:
          - node.name=es03
          - cluster.name=es-docker-cluster
          - discovery.seed_hosts=es01,es02
          - cluster.initial_master_nodes=es01,es02,es03
          - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
        volumes:
          - data03:/usr/share/elasticsearch/data
        networks:
          - elastic
        ports:
          - 9202:9200
    volumes:
      data01:
        driver: local
      data02:
        driver: local
      data03:
        driver: local
    networks:
      elastic:
        driver: bridge
  • 修改 Linux 系统权限,修改 /etc/sysctl.conf 文件
     vi /etc/sysctl.conf 
  • 添加下面的内容
     vm.max_map_count=262144 
  • 让配置生效:
     sysctl -p 
  • 通过docker-compose启动集群
     docker-compose up -d
  • 集群状态监控
    • kibana 可以监控 Elasticsearch 集群,但是更推荐使用 cerebro 
    • 在windows下下载并解压安装包,打开 /bin/cerebro.bat,访问 http://localhost:9000 即可进入管理界面


  • 利用cerebro创建索引库,并进行分片和备份

2.集群节点职责划分及脑裂问题

(1)节点职责划分

        
【tips】默认情况下,集群中的任何一个节点都同时兼职上述四种角色。
        
        职责分离可以让我们根据不同节点的需求分配不同的硬件去部署,而且避免业务之间的互相干扰。真实的集群一定要将集群职责分离:
  • 主(master)节点:对 CPU 要求高,但是内存要求低
  • 数据(data)节点:对 CPU 和内存要求都高
  • 协同(coordinating)节点:对网络带宽、CPU 要求高

(2)脑裂问题及解决方案

        1)脑裂问题
        脑裂是由于集群中的节点失联导致的。 我们知道主节点一旦宕机,其余候选节点会重选主节点,但主节点失联时(主节点并没有宕机),其余候选节点(eligible node)联系不上主节点,误以为他宕机了,所以也会重选节点。这时就有了两个主节点(脑裂),一旦网络恢复,就会出现数据不同步问题。
      【tips】节点失联指的是节点与其他节点发生网络故障,而不是节点宕机。
        2)解决方案
        解决脑裂的方案是:要求选票超过 (eligible节点数量+1)/2 才能当选为 master,因此 eligible 节点数量最好是奇数。对应配置项是discovery.zen.minimum_master_nodes。
        例如:3个节点形成的集群,选票必须超过 (3+1)/2 = 2 票。node3 得到 node2 和 node3 的选票,当选为 master。node1 只有自己 1 票,没有当选。集群中依然只有1个主节点,没有出现脑裂。
        在版本 7.0 以后,已经成为默认配置,因此一般不会发生脑裂问题。

3.es集群的分布式存储

        上面提到协调节点(coordinating node)负责请求路由,将请求路由到数据节点(data node),完成对应的业务操作,并返回结果。那么协调节点如何确定请求该路由到哪个数据节点呢?
        es 会通过 hash 算法来计算文档应该存储到哪个分片:
        
【tips】①_routing 默认是文档id
             ②算法与分片数量有关,因此索引库一旦创建,分片数量不能修改

(1)es的分布式新增

        

(2)es的分布式查询

        es查询分成两个阶段——先分散再聚集:
  • scatter phase:分散阶段,coordinating node 会把请求分发到每一个分片
  • gather phase:聚集阶段,coordinating node 汇总 data node 的搜索结果,并处理为最终结果集返回给用户
        

4.es集群的故障转移

        集群的 master 节会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其它正常节点,确保数据安全,这个叫做故障转移。
【tips】若主节点宕机,候选节点会先选举产生新主节点,然后新主节点进行故障迁移。

十一、微服务保护——Sentinel

1.雪崩问题

(1)概念
        微服务之间相互调用,由于微服务调用链中的【某个】微服务故障,引起整个链路都无法访问的情况,称为雪崩。
        
(2)解决方案
        解决雪崩问题的常见方式有四种
  • 超时处理:设定超时时间,请求超过一定时间没有响应就返回错误信息,不会无休止等待。
  • 线程隔离(舱壁模式):如图,船舱都会被隔板分离为多个独立空间,当船体破损时,只会导致部分船舱进水,这样就将故障控制在一定范围内,避免整个船体都被淹没。类似地,我们可以限定每个服务能使用的线程数,这样一旦某个服务故障,他也只能占据一部分资源,避免耗尽整个 tomcat 的资源,因此也叫线程隔离。

  • 降级熔断:是一种断路器模式:由断路器统计业务执行的异常比例如果超出阈值则会熔断该业务,拦截访问该业务的一切请求。
  • 限流(流量控制):限制业务访问的 QPS(每秒处理请求数),避免服务因流量的突增而故障
(3)服务保护技术对比
        在 SpringCloud 当中支持多种服务保护技术:
  • Netfix Hystrix
  • Sentinel
  • Resilience4J
早期比较流行的是 Hystrix 框架,但目前国内实用最广泛的还是阿里巴巴的 Sentinel 框架
        

2.Sentinel简介及安装

(1)简介
        Sentinel是阿里巴巴开源的一款微服务流量控制组件。官网地址:https://sentinelguard.io/zh-cn/index.html Sentinel
        具有以下特征
  • 丰富的应用场景:Sentinel 承接了阿里巴巴近 10 年的双十一大促流量的核心场景,例如秒杀(即突发流量控制在系统容量可以承受的范围)、消息削峰填谷、集群流量控制、实时熔断下游不可用应用等。
  • 完备的实时监控:Sentinel 同时提供实时的监控功能。可以在控制台中看到接入应用的单台机器秒级数据,甚至 500 台以下规模的集群的汇总运行情况。
  • 广泛的开源生态:Sentinel 提供开箱即用的与其它开源框架/库的整合模块,例如与 Spring Cloud、Dubbo、gRPC 的整合。只需要引入相应的依赖并进行简单的配置即可快速地接入 Sentinel。
  • 完善的 SPI 扩展点:Sentinel 提供简单易用、完善的 SPI 扩展接口。可以通过实现扩展接口来快速地定制逻辑。例如定制规则管理、适配动态数据源等。
(2)安装Sentinel控制台
  • 下载 jar 包至非中文目录,运行代码:
    java -jar sentinel-dashboard-1.8.1.jar
  • 访问localhost:8080即可看到控制台页面,默认账号密码都是sentinel
        
  • 如果要修改 Sentinel 的默认端口、账户、密码,可以通过下列配置:
    • server.port:默认端口8080
    • sentinel.dashboard.auth.username:默认用户名sentinel
    • sentinel.dashboard.auth.password:默认密码sentinel
    • 例如:修改端口为8090:
      java -jar sentinel-dashboard-1.8.1.jar -Dserver.port=8090

3.微服务整合sentinel

        以cloud-demo项目为例,演示微服务整合sentinel。在 order-service 中整合 Sentinel,并连接 Sentinel 的控制台,步骤如下:
  • 引入sentinel依赖
    <dependency>
        <groupId>com.alibaba.cloud</groupId> 
        <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
    </dependency>
  • 配置sentinel控制台信息
    server:
      port: 8088
    spring:
      cloud: 
        sentinel:
          transport:
            dashboard: localhost:8080
  • 访问 order-service 的任意端点:访问 http://localhost:10010/order/101,多访问几次,多点几次刷新,这样才能触发 Sentinel 的监控。
  • 访问 Sentinel 的控制台,查看效果
        

4.限流规则

        虽然雪崩问题有四种解决方案,但是限流(流控)是避免服务因突发的流量而发生故障,是对微服务雪崩问题的预防。因此先学习流量控制。

(1)簇点链路

        当请求进入微服务时,首先会访问 DispatcherServlet,然后进入 Controller、Service、Mapper,这样的一个调用链就叫做簇点链路。
        簇点链路中被监控的(没被监控的不是资源)每一个接口就是一个资源默认情况下 Sentinel 会监控 SpringMVC 的每一个端点(Endpoint,也就是 Controller 中的方法),因此 SpringMVC 的每一个端点(Endpoint)就是调用链路中的一个资源。 
        一个方法要想被 Sentinel 监控,需要通过@SentinelResource注解来标记要监控的方法:
        
  • 例如,我们刚才访问的 order-service 中的 OrderController 中的端点:/order/{orderId}
        
  • 流控、熔断等都是针对簇点链路中的资源来设置的,因此可以点击对应资源后面的按钮来设置限流规则
    • 流控:流量控制
    • 降级:降级熔断
    • 热点:热点参数限流,是限流的一种
    • 授权:请求的权限控制

(2)流控★

        点击资源 /order/{orderId} 后面的流控按钮,就可以弹出表单。表单中可以填写限流规则,如下:
        
        1)案例:给 /order/{orderId} 这个资源设置流控规则,QPS 不能超过 5,然后使用JMeter测试。
  • 在 sentinel 控制台添加限流规则:
        
  • 利用 jmeter 测试:20 个用户,2 秒内运行完,这样的话 QPS 就是 10,超过了我们设置的 QPS阈值5,所以会出发限流
        
  • 右键运行
        
        2)流控模式
        在添加限流规则时,点击高级选项,可以选择三种流控模式
  • 直接:统计当前资源的请求,触发阈值时对当前资源(即触发限流的资源)直接限流,也是默认的模式。
    • 上面的案例我们用的就是这种模式。
  • 关联:统计与当前资源相关的另一个资源,触发阈值时,对当前资源限流。相当于高优先级资源触发阈值,对低优先级资源限流
    • 使用场景:比如用户支付时需要修改订单状态,同时用户要查询订单。查询和修改操作会争抢数据库锁,产生竞争。业务需求是优先支付和更新订单的业务,因此当修改订单业务触发阈值时,需要查询订单业务限流
        
        【tips】满足竞争关系的两个优先级不同的资源可以使用关联模式:想对谁限流,就对谁加规则。
  • 链路:统计从指定链路访问到本资源的请求,触发阈值时,对指定链路限流,就是针对请求来源的限流
                
【tips】链路模式中,是对不同来源的两个链路做监控。但是 sentinel 默认会给进入 SpringMVC 的所有请求设置同一个 root 资源,会导致链路模式失效。因此需要通过修改服务的 application.yml 文件来关闭这种对 SpringMVC 的资源聚合:
spring:
  cloud:
    sentinel:
      web-context-unify: false # 关闭context整合
        3)流控效果——请求达到阈值时采取的措施
  • 快速失败:达到阈值后,新请求会被立即拒绝并抛出 FlowException 异常,是默认的处理方式。之前我们用的都是这种快速失败的模式。
  • Warm Up:阈值一般是一个微服务能承担的最大 QPS,但是一个服务刚刚启动时,一切资源尚未初始化(冷启动),如果直接将 QPS 跑到最大值,可能导致服务瞬间宕机。 预热模式是应对服务冷启动的一种方案,虽然同样会拒绝超出阈值的请求并抛出异常,但这种模式下阈值会动态变化,从一个较小值逐渐增加到最大阈值请求阈值初始值 = maxThreshold / coldFactor,持续指定时长后,逐渐提高到 maxThreshold 值。而 coldFactor 的默认值是 3。
        
  • 排队等待:当请求超过 QPS 阈值时,「快速失败」和 「Warm Up」会拒绝新请求并抛出异常。 而排队等待则是让所有请求进入一个队列中,然后按照阈值允许的时间间隔依次执行。后来的请求必须等待前面执行完成,如果请求预期的等待时间超出最大时长,该请求则会被拒绝。这种方式严格控制了请求通过的间隔时间,即让请求以均匀的速度通过,对应的是漏桶算法
    • 假设第 1 秒同时接收到 10 个请求,但第 2 秒只有 1 个请求,此时 QPS 的曲线如左图;如果使用排队等待的流控效果,所有进入的请求都要排队,以固定的 200ms 的间隔执行,QPS 曲线会变的很平滑(右图):
    •                                        
           

(3)热点参数限流

        之前的限流是统计访问某个资源的所有请求,判断是否超过 QPS 阈值。而「热点参数限流」是分别统计参数值相同的请求,判断是否超过 QPS 阈值。
        例如,一个根据 id 查询商品的接口,访问 /goods/{id} 的请求中,id 参数值会有变化,「热点参数限流」会根据参数值分别统计 QPS。当 id=1 的请求触发阈值被限流时,id值不为1的请求则不受影响。
        
  • 全局参数限流
    • 对 hot 这个资源的 0 号参数(也就是第一个参数)做统计,每 1s 相同参数值的请求数不能超过 5:
            
  • 热点参数限流
    • 全局参数限流的配置中,对这个接口的所有商品一视同仁,QPS 都限定为 5。而在实际开发中,可能部分商品是热点商品,例如秒杀商品,我们希望这部分商品的 QPS 限制与其它商品不一样,高一些。那就需要配置「热点参数限流」的高级选项了——参数例外项
    • 结合上面的配置,这里的含义是对 0 号的 long 类型参数限流,每个相同参数的 QPS 不能超过 5,但如下两个参数例外:如果参数值是 100,则每 1s 允许的 QPS 为 10;如果参数值是 101,则每 1s 允许的 QPS 为 15。
            
【tips】热点参数限流对默认的SpringMVC资源(controller中的方法)是无效的需要利用 @SentinelResource 注解标记资源!

5.隔离和降级

        限流只是一种预防措施,虽然限流可以尽量避免因高并发而引起的服务故障,但服务还会因为其它原因而故障。 而要将这些故障控制在一定范围,避免雪崩,就要靠线程隔离(舱壁模式)和熔断降级手段了。
        

(1)Feign整合sentinel

        可以看到,不管是线程隔离还是熔断降级,都是对客户端(调用方)的保护。需要在调用方发起远程调用时做线程隔离、或者服务熔断。 而我们的微服务远程调用都是基于 Feign 来完成的,因此我们需要 Feign 与 Sentinel 整合在 Feign 里面实现线程隔离和服务熔断
  • 修改配置,开启 sentinel 功能:修改服务调用方的 application.yml 文件,开启 Feign 的 sentinel 功能
    feign:
      sentinel:
        enabled: true # 开启feign对sentinel的支持
  • 给 FeignClient 编写失败后的降级逻辑
    • 方式一:FallbackClass,但无法对远程调用的异常做处理
    • 方式二:FallbackFactory可以对远程调用的异常做处理(推荐)
        【tips】业务失败后,一般不会直接给用户报错,而应该给用户返回一个友好提示或者默认结果,这个就是失败降级逻辑。
  • FallbackFactory实现失败降级处理
    • 在 feing-api 项目中定义类并实现 FallbackFactory 
      @Slf4j
      public class UserClientFallbackFactory implements FallbackFactory<UserClient> {
          @Override
          public UserClient create(Throwable throwable) {
             return userClient -> {
                 log.error("查询用户失败",throwable);
                 return new User();
             };
          }
      }
    • 在 feing-api 项目中的配置类中将 UserClientFallbackFactory 注册为Bean
      @Bean
      public UserClientFallbackFactory userClientFallbackFactory(){
          return new UserClientFallbackFactory();
      }
    • 在 feing-api 项目中的 UserClient 接口中使用 UserClientFallbackFactory
      @FeignClient(value = "userservice", fallbackFactory = UserClientFallbackFactory.class)
      public interface UserClient {
          @GetMapping("/user/{id}")
          User findById(@PathVariable("id") Long id);
      }

(2)线程隔离

        
        1)线程隔离(舱壁模式)有两种方式实现
  • 线程池隔离:给每个服务调用业务分配一个线程池,利用线程池本身实现隔离效果
    • 优点:支持主动超时和异步调用,适用于低扇出的场景。
    • 缺点:线程的额外开销比较大。
  • 信号量隔离(Sentinel默认采用):不创建线程池,而是利用计数器模式记录业务使用的线程数量,达到信号量上限时,禁止新的请求。
    • 优点:轻量级,无额外开销,适用于高频调用和高扇出的场景。
    • 缺点:不支持主动超时和异步调用
        2)sentinel实现信号量线程隔离
        线程数是指该资源能使用的 Tomcat 线程数的最大值。也就是通过限制线程数量,实现线程隔离(舱壁模式)。
        

(3)熔断降级

        熔断降级的思路是由断路器统计服务调用的异常比例、慢请求比例,如果超出阈值则会熔断该服务,即拦截访问该服务的一切请求;而当服务恢复时,断路器会放行访问该服务的请求。 
        降级:某些服务不处理或只做简单处理,如抛异常、返回null、调用Mock数据、调用Fallback处理逻辑等。
        1)断路器状态机
        断路器控制熔断和放行是通过状态机来完成的,如下图就是一个断路器的状态机:
        
        状态机包括三个状态:
  • closed:关闭状态,断路器放行所有请求,并开始统计异常比例、慢请求比例,并判断是否达到熔断条件(熔断策略),达到该条件则切换到 open 状态,打开断路器
  • open:打开状态,服务调用被熔断,访问被熔断服务的请求都会被拒绝,快速失败,直接走降级逻辑。Open 状态 5 秒后会进入 half-open 状态。
  • half-open:半开状态,会一段时间放行一次请求,根据执行结果来判断接下来的操作:
    • 请求成功:切换到 closed 状态
    • 请求失败:切换到 open 状态
        2)断路器熔断策略
        断路器熔断策略有三种:慢调用比例、异常比例、异常数
  • 慢调用比例:业务的响应时长(RT)大于指定时长的请求认定为慢调用请求。在指定时间内,如果请求数量超过设定的最小数量,慢调用比例大于设定的阈值,则触发熔断。
    • 设置 RT 超过 500ms 的调用是慢调用,统计最近 10000ms 内的请求,如果请求量超过 10 次,并且慢调用比例高于50%(0.5),则触发熔断,熔断时长为 5s,然后进入 half-open 状态,放行一次请求做测试。
            
  • 异常比例或异常数:统计指定时间内的调用,如果调用次数超过指定请求数,并且出现异常的比例(或异常次数)达到设定的比例阈值(或超过指定异常数),则触发熔断。
    • 异常比例设置如左图,统计最近 1000ms 内的请求,如果请求量超过 10 次且异常比例高于0.4,则触发熔断。
    • 异常数设置如下,统计最近 1000ms 内的请求,如果请求量超过 10 次且异常次数高于 2 次,则触发熔断。
            

6.授权规则

        授权规则可以对请求方来源做判断和控制,有白名单和黑名单两种方式:
  • 白名单:来源(origin)在白名单内的调用者允许访问 
  • 黑名单:来源(origin)在黑名单内的调用者不允许访问
        
案例】:比如我们允许请求从 网关gateway 到 order-service,不允许浏览器访问 order-service,那么白名单中就要填写网关的来源名称(origin)。
        
  • sentinel 是通过 RequestOriginParser 这个接口的 parseOrigin() 方法来获取请求的来源
    public interface RequestOriginParser {
        /**
         * 从请求request对象中获取来源origin,获取方式自定义
         */
        String parseOrigin(HttpServletRequest request);
    }
    【tips】默认情况下,sentinel 不管请求者从哪里来,返回值永远是 default,也就是说一切请求的来源都被认为是一样的值 default。因此,我们需要自定义这个接口的方法:让不同的请求,返回不同的 origin。
  • 在order-service 服务中,定义一个 RequestOriginParser 的实现类 HeaderOriginParser ,重写parseOrigin方法
    @Component
    public class HeaderOriginParser implements RequestOriginParser {
        @Override
        public String parseOrigin(HttpServletRequest request) {
            // 1.获取来源的请求头
            String origin = request.getHeader("origin");
            // 2.非空判断
            if (StringUtils.isEmpty(origin)) {
                origin = "blank";
            }
                   //3.返回来源
            return origin;
        }
    }
    【tips】默认情况下,来自gateway和浏览器的请求,请求头里的origin都是空的,所以我们需要让所有从 gateway 路由到微服务的请求都带上请求头origin:通过网关过滤器实现。
  • 修改 gateway 服务中的 application.yml,使用 AddRequestHeader 过滤器给所有经过gateway的请求加上请求头 origin = gateway
    spring:
      cloud:
        gateway:
          default-filters:
            - AddRequestHeader=origin,gateway #键值对形式
  • 在sentinel控制台添加一个授权规则,放行 origin 值为 gateway 的请求
        
        
  • 测试
        
        

7.自定义异常结果

        默认情况下,发生限流、降级、授权拦截时,都会抛出异常到调用方,但是异常结果都是 flow limmiting(限流),这样无法得知是限流还是降级还是授权拦截。
        我们可以通过实现BlockExceptionHandler接口自定义异常时的返回结果。
public interface BlockExceptionHandler {
    /**
     * 该方法用来处理请求被限流、降级、授权拦截时抛出的BlockException异常。
     */
    void handle(HttpServletRequest request, HttpServletResponse response, BlockException e) throws Exception;
}
        BlockException 包含多个不同的子类:
        
【案例】:自定义异常处理
  • 在 order-service 定义一个自定义异常处理类
    @Component
    public class SentinelExceptionHandler implements BlockExceptionHandler {
        @Override
        public void handle(HttpServletRequest request, HttpServletResponse response, BlockException e) throws Exception {
            String msg = "未知异常";
            int status = 429;
            if (e instanceof FlowException) {
                msg = "请求被限流了";
            } else if (e instanceof ParamFlowException) {
                msg = "请求被热点参数限流";
            } else if (e instanceof DegradeException) {
                msg = "请求被降级了";
            } else if (e instanceof AuthorityException) {
                msg = "没有权限访问";
                status = 401;
            }
            response.setContentType("application/json;charset=utf-8");
            response.setStatus(status);
            response.getWriter().println("{\"msg\": " + msg + ", \"status\": " + status + "}");
        }
    }
  • 测试

8.规则持久化

        sentinel 的所有规则都是内存存储,重启后所有规则都会丢失。在生产环境下,我们必须确保这些规则的持久化,避免丢失。

规则管理模式

        规则能否持久化,取决于规则的管理模式。sentinel 支持三种规则管理模式
  • 原始模式:Sentinel 的默认模式,将规则保存在内存,重启服务会丢失。
  • pull模式:控制台将配置的规则推送到 Sentinel 客户端,而客户端会将配置规则保存在本地文件或数据库中。以后会定时去本地文件或数据库中查询,更新本地规则。
                
  • push模式:控制台将配置规则推送到远程配置中心,例如 Nacos,Sentinel 客户端监听 Nacos,获取配置变更的推送消息,完成本地配置实时更新