一、异步通讯和消息队列
1.同步通讯
(1)不同微服务间的通讯有同步和异步两种方式:
- 同步通讯:就像打电话,需要实时响应,时效性强。
- 异步通讯:就像发邮件,不需要马上回复。
- 耦合度高
- 性能和吞吐能力下降:调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用的时间之和。
- 资源浪费:调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源。
- 级联失败问题:如果服务提供者出现问题,所有调用方都会跟着出问题,如同多米诺骨牌一样,迅速导致整个微服务群故障。
2.异步通讯
异步调用可以避免上述同步调用的问题。
以购买商品为例,用户支付后需要调用订单服务完成订单状态修改,调用仓储服务,从仓库分配响应的库存并准备发货。在事件驱动模式中,支付服务是事件发布者(publisher),在支付完成后只需要发布一个支付成功的事件(event),事件中带上订单id。订单服务和仓储服务是事件订阅者(Consumer),订阅支付成功的事件,监听事件完成自己业务即可。 为了解除事件发布者与订阅者之间的耦合,两者并不是直接通信,而是有一个中间人(Broker)。发布者发布事件到Broker,不关心谁来订阅事件。订阅者从Broker订阅事件,不关心谁发来的消息。
(1)优点
- 吞吐量提升:无需等待订阅者处理完成,响应更快速
- 故障隔离:服务没有直接调用,不存在级联失败问题
- 调用间没有阻塞,不会造成无效的资源占用
- 耦合度极低,每个服务都可以灵活插拔、可替换
- 流量削峰:不管发布事件的流量波动多大,都由 Broker 接收,订阅者可以按照自己的速度去处理事件
(2)缺点
- 架构复杂了,业务没有明显的流程线,不好管理
- 需要依赖于 Broker 的可靠、安全、性能
3.消息队列MQ
消息队列MQ(MessageQueue),就是事件驱动架构中的 Broker。
常见的 MQ 实现:ActiveMQ、RabbitMQ、RocketMQ、Kafka。
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP、XMPP、SMTP、STOMP | OpenWire、STOMP、REST、XMPP、AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
4.RabbitMQ——消息中间件
(1)安装RabbitMQ
在 Centos7 虚拟机中使用 Docker 来安装RabbitMQ。
-
获取mq镜像:
-
方式一:在线拉取mq镜像
docker pull rabbitmq:3-management
-
方式二:从本地上传mq.tar包并加载
docker load -i mq.tar
-
方式一:在线拉取mq镜像
-
安装并运行mq容器
docker run \ -e RABBITMQ_DEFAULT_USER=itcast \ -e RABBITMQ_DEFAULT_PASS=123321 \ --name mq \ # 容器名 --hostname mq1 \ # 配置主机名,集群时会用到 -p 15672:15672 \ # RabbitMQ管理平台端口,提供一个Ui界面方便我们管理mq -p 5672:5672 \ # 消息通讯端口 -d \ rabbitmq:3-management # 基于该镜像创建容器
- 访问测试:http://192.168.152.100:15672
(2)MQ 的基本结构
- exchange:交换机,负责消息路由
- queue:队列,存储消息
- virtualHost:虚拟主机,隔离不同租户的 exchange、queue、消息
(3)RabbitMQ常见消息模型
RabbitMQ 官方提供了 5 个不同的 Demo 示例,对应了不同的消息模型。
(4)HelloWorld案例——简单队列模型
官方的 HelloWorld 是基于最基础的消息队列模型来实现的,只包括三个角色:
- publisher:消息发布者,将消息发送到队列queue
- queue:消息队列,负责接受并缓存消息
- consumer:订阅队列,处理队列中的消息
1)简单队列模型代码实现
根据资料提供的mq-demo项目学习简单队列模型:
-
publisher的实现
public class PublisherTest { @Test public void testSendMessage() throws IOException, TimeoutException { // 1.与消息队列建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、消息通讯端口号、虚拟主机vhost、用户名、密码 factory.setHost("192.168.152.100"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("itcast"); factory.setPassword("123321"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel,基于channel向队列发送消息 Channel channel = connection.createChannel(); // 3.创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.发送消息 String message = "hello, rabbitmq!"; channel.basicPublish("", queueName, null, message.getBytes()); System.out.println("发送消息成功:【" + message + "】"); // 5.关闭通道和连接 channel.close(); connection.close(); } }
-
consumer的实现
public class ConsumerTest { public static void main(String[] args) throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost("192.168.152.100"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("itcast"); factory.setPassword("123321"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.订阅消息 channel.basicConsume(queueName, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 5.处理消息 String message = new String(body); System.out.println("接收到消息:【" + message + "】"); } }); System.out.println("等待接收消息。。。。"); } }
2)简单消息队列消息发送和接收流程
发送:
- 与mq建立connection
- 创建channel
- 利用channel声明队列
- 利用channel向队列发送消息
- 与mq建立connection
- 创建channel
- 利用channel声明队列
- 定义consumer的消费行为handleDelivery()
- 利用channel将消费者与队列绑定
二、SpringAMQP
SpringAMQP 是基于 RabbitMQ 封装的一套模板,并且还利用 SpringBoot 对其实现了自动装配,使用方便。
1.SpringAMQP的功能
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了 RabbitTemplate 工具,用于发送消息
2.简单消息队列(Basic Queue)
使用AMQP实现简单消息队列中publisher向simple.queue(把这个队列提前创建好)发送消息,consumer从simple.queue监听消息并接收。
(springboot整合amqp)
(amqp的使用)
(amqp的使用)
-
导入坐标:因为publisher和consumer都需要AMQP依赖,所以直接在父工程中引入AMQP依赖——spring-boot-starter-amqp
<!--AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
配置MQ信息:在 publisher、consumer 服务中的 application.yml 中配置与虚拟主机的连接信息
spring: rabbitmq: host: 192.168.152.100 # 主机名 port: 5672 # 消息通讯端口 virtual-host: / # 虚拟主机 username: itcast # 用户名(默认guest) password: 123321 # 密码(默认guest)
【tips】可以看出,使用AMQP时不需要自己写代码与mq建立连接,只需要配置mq信息,AMQP会自动建立mq连接。 - 开启rabbitmq功能:在启动类上用@EnableRabbit注解开启RabbitMQ功能。
-
使用AmqpAdmin创建exchange、queue、binding:
-
创建exchange
-
创建queue
-
绑定exchange和queue——binding
-
创建exchange
-
publisher发送消息:使用RabbitTemplate向simple.queue队列发送消息message
@SpringBootTest public class AMQP_PublisherTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void sendMessage() { String queueName = "simple.queue"; String message = "hello AMQP"; //向simple.queue发送message rabbitTemplate.convertAndSend(queueName,message); } }
-
consumer监听消息:在 consumer 服务中添加监听队列的类,用@RabbitListener注解配置监听信息
/* consumer监听类 */ @Component public class MQListener { /** * 监听队列的方法 * @param message ★从队列中接收到的消息 * @throws InterruptedException */ @RabbitListener(queues = "simple.queue") //指定要监听的队列为simple.queue public void listenMQ(String message) throws InterruptedException{ System.out.println("consumer接收到消息:" + message); } }
★@RabbitListener与@RabbitHandler的比较?
——二者都是用来监听消息队列的。-
@RabbitListener可以标在类或方法上,@RabbitHandler可以标在方法上:
- @RabbitListener标在方法上,指定该方法监听某个mq,此时方法接收的参数必须与发送的消息类型一致。
- ★@RabbitListener标在类上,需要配合@RabbitHandler一起使用:@RabbitListener 标注在类上,当收到消息的时候,就交给@RabbitHandler标注的方法处理,根据接收的参数类型进入不同方法进行处理中(方法重载)。这样就可以通过重载来区分不同类型的消息。
3.工作队列(Work Queue)
(1)简介
工作队列(Work queues):也被称为任务模型(Task queues)。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
在简单队列模型中,当消息处理比较耗时的时候,可能消息的产生速度会远远大于消费速度。长此以往,消息就会堆积越来越多,无法及时处理。 此时就可以使用 work queue模型,多个消费者共同处理消息处理,速度就能大大提高了。
(2)AMQP模拟实现工作队列
-
消息发送:循环发送消息,模拟大量消息堆积现象,在 publisher 服务的测试类中添加一个测试方法:
@Test public void sendMessage2WorkQueue() throws InterruptedException { String queueName = "simple.queue"; String message = "hello AMQP-"; //模拟1s内发送50条消息 for (int i = 1; i <= 50; i++) { rabbitTemplate.convertAndSend(queueName, message + i); Thread.sleep(20); } }
-
消息接收:模拟多个消费者绑定同一个队列,在 consumer 服务的 Listener类 中添加新方法:
/** *模拟两个不同处理速度的消费者 */ @RabbitListener(queues = "simple.queue") //指定要监听的队列为simple.queue public void listenWorkQueue1(String message) throws InterruptedException{ System.out.println("consumer1接收到消息:" + message); Thread.sleep(20); } @RabbitListener(queues = "simple.queue") //指定要监听的队列为simple.queue public void listenWorkQueue2(String message) throws InterruptedException{ System.err.println("consumer2接收到消息:" + message); Thread.sleep(200); }
- ★结果:启动 ConsumerApplication 后,再启动 publisher 服务中发送测试方法。可以看到consumer1很快完成了自己的25条消息,而consumer2却在缓慢的处理自己的25条消息。 也就是说此时消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。这是因为 RabbitMQ 默认有一个消息预取机制,该机制让每个consumer轮流获取消息,不管他们处理的快慢,而我们希望消息处理快的consumer1多处理,consumer2少处理。因此可以通过限制每次只能取一条消息,处理完才能去下一条消息,就可以解决这个问题。
-
通过配置消费者的yml:加一个 prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息 配置就能实现 每次只能取一条消息,处理完才能去下一条消息 。
spring: rabbitmq: host: 192.168.152.100 # 主机名 port: 5672 # 消息通讯端口 virtual-host: / # 虚拟主机 username: itcast # 用户名 password: 123321 # 密码 listener: simple: prefetch: 1 # 规定每个消费者每次预处理只能获取一条消息,处理完才能获取下一个消息
4.发布订阅(Publish/Subscribe)模式
(1)简介
在简单队列和工作队列中,由于RabbitMQ阅完既焚的特点,所以多个消费者无法获得同一消息,而发布订阅模式解决了这个问题。
1)在发布订阅模型中,多了一个 exchange,而且过程略有变化:
- Publisher:发送消息不再发送到队列中,而是发给 exchange(交换机)
- Consumer:与以前一样,订阅队列,没有变化
- Queue:消息队列也与以前一样,接收消息、缓存消息
- Exchange:交换机,一方面,接收publisher发送的消息;另一方面,该知道如何处理消息。例如把消息递交给某个队列还是递交给所有队列,或是将消息丢弃。到底如何操作,取决于 Exchange 的类型。
-
Exchange的作用:
- 接收 publisher 发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息,路由失败则消息丢失
2)★exchange的类型
exchange 有以下3种类型:
-
Fanout Exchange:广播,将消息交给所有绑定到交换机的队列
- 可以有多个队列
- 每个队列都要绑定到 Exchange(交换机)
- 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
- 交换机把消息发送给所有绑定过的队列
- 订阅队列的消费者都能拿到消息
-
Direct Exchange:定向,把消息交给符合指定 routing key 的队列
- 队列与交换机的绑定,不再是任意绑定,而是要指定一个RoutingKey(路由key)
- publisher 向 Exchange 发送消息时,也必须指定消息的 RoutingKey
- Exchange 不再把消息交给每一个与之绑定的队列,而是根据消息的RoutingKey进行判断,将消息路由到【所有】相同RoutingKey的队列
- Topic Exchange:通配符,把消息交给符合 routing pattern(路由模式)的队列
(2)Fanout Exchange发布订阅模式案例
如图,模拟两个consumer分别从各自订阅的队列中接收同一publisher发送的消息。
-
在 consumer 服务中,创建一个配置类,用来声明队列、交换机、绑定(binding)对象——用来将队列和交换机绑定起来
@Configuration public class FanoutConfig { //声明队列1、2 @Bean public Queue fanoutQueue1() { return new Queue("fanout-queue1"); } @Bean public Queue fanoutQueue2() { return new Queue("fanout-queue2"); } //声明fount exchange @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanout-exchange"); } //声明绑定对象:有几个队列就有几个绑定对象 //将队列1与交换机绑定 @Bean public Binding bindingQueue1(FanoutExchange fanoutExchange, Queue fanoutQueue1) { return BindingBuilder .bind(fanoutQueue1)//将fanoutQueue1绑定 .to(fanoutExchange);//到(to)fanoutExchange上 } //将队列2与交换机绑定 @Bean public Binding bindingQueue2(FanoutExchange fanoutExchange, Queue fanoutQueue2) { return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } }
-
在 consumer 服务的监听类中编写两个消费者方法,分别监听 fanout-queue1 和 fanout-queue2
//consumer1监听fanount-queue1接收消息 @RabbitListener(queues = "fanout-queue1") public void listenFanoutQueue1(String message){ System.out.println("consumer1接收到fanout-queue1的消息:" + message); } //consumer2监听fanount-queue2接收消息 @RabbitListener(queues = "fanout-queue2") public void listenFanoutQueue2(String message){ System.out.println("consumer2接收到fanout-queue2的消息:" + message); }
-
在 publisher 中编写测试方法,向 fanout-exchange发送消息
@Test public void sendMessage2FanoutQueue() throws InterruptedException { String exchangeName = "fanout-exchange"; String message = "hello fanoutQueue"; //publisher发消息给exchange而不是queue rabbitTemplate.convertAndSend(exchangeName, "", message); }
-
结果:
(3)Direct Exchange发布订阅模式案例
在 Fanout 模式中,一条消息会被所有订阅的队列都消费。但是,在某些场景下,我们希望消息被路由到指定队列。
-
consumer声明并监听:Fanout中采用Bean的方式在配置类中声明队列、交换机、绑定对象;这里采用@RabbitListener注解在监听Listener类中声明队列、交换机、绑定对象及routingkey
/* 采用注解声明队列、交换机、routingkey并监听队列 */ @RabbitListener(bindings = @QueueBinding(//声明绑定对象 value = @Queue(name="direct.queue1"),//声明队列 exchange = @Exchange(name="itcast.direct",type = ExchangeTypes.DIRECT),//声明交换机及其类型(默认direct) key ={"blue","red"}//声明routingkey )) public void listenDirectQueue1(String message){ System.out.println("consumer1接收到direct.queue1的消息:" + message); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name="direct.queue2"), exchange = @Exchange(name="itcast.direct",type = ExchangeTypes.DIRECT), key ={"yellow","red"} )) public void listenDirectQueue2(String message){ System.out.println("consumer2接收到direct.queue2的消息:" + message); }
-
publisher发送消息:在 publisher 中编写测试方法,向 itcast.direct交换机发送消息
@Test public void sendMessage2DirectQueue() throws InterruptedException { String exchangeName = "itcast.direct"; String message = "hello,blue!"; //publisher发消息给exchange并指定routingkey rabbitTemplate.convertAndSend(exchangeName, "blue", message); }
【tips】在direct模式中,当每个队列的routingkey相同时,就变成了fonout模式(发给每个队列)。
(4)Topic Exchange发布订阅模式案例
Topic 与 Direct相比,都可以根据RoutingKey把消息路由到不同的队列,但Topic 类型可以让队列在绑定Routing key 的时候使用通配符(#、*)。
通配符规则: #:匹配一个或多个词
*:只能匹配一个词
Topic交换机与Direct交换机的差异?
- Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割
- Topic交换机与队列绑定时的bindingKey可以指定通配符
5.AMQP消息转换器(MessageConverter)
(1)默认JDK序列化
如果publisher发送的消息是Java对象,该对象必须实现Serializable,Spring 会把消息序列化为字节发送给 MQ,consumer接收消息时,需要再把字节反序列化为 Java 对象。
默认情况下 Spring 采用的序列化方式是 JDK 序列化。可以通过代码验证一下:
-
publisher:
@Test public void sendMessage2Object() throws InterruptedException { String queue = "object.queue"; Map<String, Object> message = new HashMap<>(); message.put("name", "zhangsan"); message.put("age", 22); rabbitTemplate.convertAndSend(queue, message); }
-
consumer:
@RabbitListener(queuesToDeclare = @Queue(value = "object.queue")) public void listenObjectQueue(Map<String,Object> message) { System.out.println("object收到消息:" + message); }
-
控制台输出:
- UI收到的消息:
可以看到,控制台输出的是反序列化以后的Java对象,而在UI控制台收到的却是一串字节。因此验证了上面的说法。而JDK序列化存在以下问题:
- 数据体积过大
- 有安全漏洞(注入问题)
- 可读性差
(2)★ JSON序列化——Jackson2JsonMessageConverter
通过使用JSON序列化来解决JDK序列化带来的问题。
-
在父工程中导入json依赖(因为publisher和consumer都需要该依赖)
<dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.9.10</version> </dependency>
-
★配置消息转换器:在二者各自的启动类中添加一个 Bean
@Bean public MessageConverter jsonMessageConverter(){ return new Jackson2JsonMessageConverter(); }
- 结果:
三、分布式搜索ElasticSearch
1.倒排索引
(1)正向索引
倒排索引的概念是相较于像MySQL 这样的正向索引而言的。
例如给下表(tb_goods)中的 id 创建索引,根据 id 查询,那么直接走索引,查询速度非常快。 但如果是基于 title 做模糊查询,只能是逐行扫描数据,全表扫描,随着数据量增加,其查询效率也会越来越低。流程如下:
- 用户根据条件搜索数据,条件是 title 符合 "%手机%"
- 逐行获取数据,比如 id 为 1 的数据
- 判断数据中的 title 是否符合用户搜索条件
- 如果符合则放入结果集;不符合则丢弃,然后回到步骤1
(2)倒排索引
1)倒排索引中有两个非常重要的概念:
- 文档(Document):搜索到的数据中的每一条数据就是一个文档。例如一个网页、一个商品信息。
- 词条(Term):对文档数据或用户搜索数据,利用某种算法分词,得到的具备含义的词语就是词条。例如:我是中国人,就可以分为:我、是、中国人、中国、国人这样的几个词条。
2)创建倒排索引是对正向索引的一种特殊处理,流程和示意图如下:
- 将每一个文档的数据利用算法分词,得到一个个词条
- 创建表,每行数据包括词条、词条所在文档 id、位置等信息
- 因为词条唯一性,可以给词条创建索引,例如 hash 表结构索引
3)倒排索引的搜索流程如下(以搜索"华为手机"为例):
- 用户输入条件"华为手机"进行搜索
- 对用户输入的条件分词,得到词条:华为、手机
- 拿着词条在倒排索引中查找,可以得到包含词条的文档 id 有 1、2、3
- 拿着文档 id 到正向索引中查找具体文档
4)二者的比较
虽然要先查询倒排索引,再查询正向索引,看似两次查询操作,比单纯正向索引要多查一次,但是词条和文档id 都建立了索引,查询速度非常快,无需全表扫描。
为什么一个叫做正向索引,一个叫做倒排索引呢?
——正向索引是最传统的根据 id 索引的方式。但根据词条查询时,必须先逐条获取每个文档,然后判断文档中是否包含所需要的词条,是根据文档找词条的过程。
——倒排索引则相反,是先找到用户要搜索的词条,根据得到的文档 id 获取该文档。是根据词条找文档的过程。
2.ES与MySQL概念比较
(1)文档和字段
es 是面向文档(Document)存储的,文档可以是数据库中的一条商品数据,一个订单信息。文档数据会被序列化为【 json 格式】存储在 es 中。
JSON 文档中往往包含很多的字段(Field),类似于数据库中的列。
(2)索引和映射
索引(Index):就是相同类型的文档的【集合】,可以把索引当做是数据库中的表。 例如: 所有用户文档,就可以组织在一起,称为用户的索引; 所有商品的文档,可以组织在一起,称为商品的索引; 所有订单的文档,可以组织在一起,称为订单的索引。
数据库的表会有约束信息,用来定义表的结构、字段的名称、类型等信息。因此,索引库中就有映射(mapping),是索引中文档的字段约束信息,类似表的结构约束。
(3)ES与MySQL对比
-
概念上
MySQL | Elasticsearch | 说明 |
---|---|---|
Table | Index | 索引(index),就是文档的集合,类似数据库的表(table) |
Row | Document | 文档(Document),就是一条条的数据,类似数据库中的行(Row),文档都是JSON格式 |
Column | Field | 字段(Field),就是JSON文档中的字段,类似数据库中的列(Column) |
Schema | Mapping | Mapping(映射)是索引中文档的约束,例如字段类型约束。类似数据库的表结构(Schema) |
SQL | DSL | DSL是elasticsearch提供的JSON风格的请求语句,用来操作elasticsearch,实现CRUD |
-
作用上
- Mysql:擅长事务类型操作,可以确保数据的安全和一致性
- Elasticsearch:擅长海量数据的搜索、分析、计算
因此在企业中,往往是两者结合使用:
- 对安全性要求较高的写操作,使用 MySQL 实现
- 对查询性能要求较高的搜索需求,使用 ELasticsearch 实现
- 两者再基于某种方式,实现数据的同步,保证一致性
3.ES、kibana、IK分词器的安装
(1)安装ES
-
因为到时还需要部署 kibana 容器,需要让 es 和 kibana 容器处在同一个网络中进行互联,所以先创建一个网络
docker network create es-net
-
加载es.tar
docker load -i es.tar
-
运行docker命令,部署单点es
docker run -d \ --name es \ -e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \ -e "discovery.type=single-node" \ -v es-data:/usr/share/elasticsearch/data \ -v es-plugins:/usr/share/elasticsearch/plugins \ --privileged \ --network es-net \ -p 9200:9200 \ -p 9300:9300 \ elasticsearch:7.12.1
【tips】命令解释:- -e "cluster.name=es-docker-cluster":设置集群名称
- -e "http.host=0.0.0.0":监听的地址,可以外网访问
- -e "ES_JAVA_OPTS=-Xms512m -Xmx512m":内存大小
- -e "discovery.type=single-node":非集群模式
- -v es-data:/usr/share/elasticsearch/data:挂载逻辑卷,绑定es的数据目录
- -v es-logs:/usr/share/elasticsearch/logs:挂载逻辑卷,绑定es的日志目录
- -v es-plugins:/usr/share/elasticsearch/plugins:挂载逻辑卷,绑定es的插件目录
- --privileged:授予逻辑卷访问权
- --network es-net :加入一个名为 es-net 的网络中
- -p 9200:9200:端口映射配置
- 访问地址:http://192.168.152.100:9200 即可看到 elasticsearch 的响应结果
(2)安装kibana
-
加载kibana.tar
docker load -i kibana.tar
-
运行docker命令,部署kibana
docker run -d \ --name kibana \ -e ELASTICSEARCH_HOSTS=http://es:9200 \ --network=es-net \ -p 5601:5601 \ kibana:7.12.1
【tips】命令解释:- --network es-net :加入一个名为 es-net 的网络中,与 elasticsearch 在同一个网络中
- -e ELASTICSEARCH_HOSTS=http://es:9200":设置 elasticsearch 的地址,因为 kibana 已经与 elasticsearch 在一个网络,因此可以用容器名直接访问 elasticsearch
- -p 5601:5601:端口映射配置
- 访问地址:http://192.168.152.100:5601,即可看到结果
(3)安装IK分词器
离线模式安装:
-
安装es插件需要知道 es 的 plugins 目录位置,而我们用了数据卷挂载,因此需要查看 es 的数据卷目录,通过下面命令查看
docker volume inspect es-plugins
- 进到上述目录,将解压好的ik文件夹放到里面
-
重启es容器
docker restart es
- IK分词器包含两种模式:
ik_max_word:最细切分,细粒度
-
测试分词器
POST /_analyze { "analyzer": "ik_max_word", "text": "学习java太棒啦!" }
(4)扩展/屏蔽词典
我们可以对IK分词器进行新词扩展和屏蔽某些词,让其不参与分词。只需要修改ik分词器 config 目录下的 IKAnalyzer.cfg.xml 文件:
-
扩展新词:
法一:添加一个文件名,以 ext.dic 文件名为例。
法二:配置远程扩展字典——使用nginx,在下面的目录中创建一个txt文件,以后将要扩展的词加到该txt文件中即可,然后在IKAnalyzer.cfg.xml中进行配置:
- 屏蔽词:添加一个文件名,以 stopword.dic 文件名为例。
- 在config目录下创建ext.dic和stopword.dic(已存在),在里面添加要扩展和屏蔽的词,一个词一行
- 重启es容器即可
4.★ 索引库 的常用操作
(1)mapping属性
要向 es 中存储数据,必须先创建索引库,而mapping属性是对索引库中文档的约束。
常见的 mapping 属性:
- type:字段的数据类型,常见的简单类型有:
- 字符串:text(可分词的文本)、keyword(精确值,例如:品牌、国家、ip地址)
- 数值:long、integer、short、byte、double、float、
- 布尔:boolean
- 日期:date
-
对象:object
- index:是否创建索引,默认为 true,创建了索引就可以被搜索了。
- analyzer:使用哪种分词器,一般与text属性一起用。
- properties:该字段的子字段,只有在object字段才会用到。
(2)_cat操作
- GET /_cat/nodes:查看所有节点
- GET /_cat/health:查看 es 健康状况
- GET /_cat/master:查看主节点
- GET /_cat/indices:查看所有索引 ,相当于MySQL里的show databases
(3)创建索引库
ES中通过Restful请求来操作索引库、文档。请求内容用DSL语句来表示。创建索引库和mapping的DSL语法如下:
(4)查看、删除索引库、添加字段
-
查看
GET /索引库名
-
删除
DELETE /索引库名
-
修改——添加新字段:倒排索引结构虽然不复杂,但是一旦修改mapping属性(比如改变了分词器),就需要重新创建倒排索引。因此索引库一旦创建,无法修改 mapping。虽然无法修改 mapping 中已有的字段,但是却允许添加新字段到 mapping 中,不会对倒排索引产生影响。
PUT /索引库名/_mapping { "properties": { "新字段名":{ "type": "integer" } } }
(5)修改mapping映射(修改索引库)
上面我们提到,索引库一旦创建就无法修改 mapping。但我们可以通过 数据迁移 来实现mapping的修改。即按新的mapping需求创建一个新的索引库newBase,然后将之前的索引库oldBase的数据迁移到newBase中。
POST _reindex //数据迁移的固定写法 { "source": { "index": "twitter" }, "dest": { "index": "new_twitter" } }
5.DSL操作文档(document)
-
增
POST /索引库名/_doc/文档id { "字段1": "值1", "字段2": "值2", "字段3": { "子属性1": "值3", "子属性2": "值4" } // ... }
-
删
DELETE /索引库名/_doc/文档id
-
改:修改文档有两种方式:
-
全量修改:直接覆盖原来的文档,即删除旧文档、新增新文档
PUT /索引库名/_doc/文档id { "字段1": "值1", "字段2": "值2", // ... 略 }
-
增量修改:修改文档中的部分字段
POST /索引库名/_update/文档id { "doc": { "字段名": "新的值", } }
-
全量修改:直接覆盖原来的文档,即删除旧文档、新增新文档
-
查
GET /索引库名/_doc/文档id
6.es中操作类型(type)——已过时
类型(type):一个索引可以存储多个用于不同用途的对象,可以通过类型来区分索引中的不同对象,有点类似mysql中的表(6.0版本被废弃,是为了提高搜索效率)。
(1)保存数据(文档)
-
PUT方式:需要指定保存在哪个索引的哪个类型下,指定用哪个唯一标识
//在 customer 索引下的 external 类型下保存以下数据作为1号数据 PUT customer/external/1 { "name": "John Doe" }
【tips】①当使用PUT多次对 customer/external/1 进行数据保存时,第一次是新增操作,以后就都是修改操作了。
②PUT 必须指定 id,不指定 id 会报错。由于 PUT 需要指定 id,因此我们一般用PUT来做修改操作。 -
POST方式:可以不指定id,如果不指定 id,会自动生成 id;指定 id 就会修改这个数据,并新增版本号version。
PUT customer/external/1 { "name": "John Doe" }
(2)查询数据(文档)
-
需要指定要哪个索引的哪个类型下的哪个id的文档
GET customer/external/1
-
PUT方式:
PUT customer/external/1 { "name": "John Doe" }
-
POST方式:
//带_update就要带"doc" POST customer/external/1/_update { "doc":{ "name": "John Doew" } } //不带就都不带 POST customer/external/1 { "name": "John Doe2" }
-
区别:
POST 操作会对比原文档数据,如果修改的内容与原文档内容相同,则不会有什么操作,文档 version 不增加;
PUT 操作总会将数据重新保存并增加 version 版本。
//删除文档 DELETE customer/external/1 //删除索引库 DELETE customer
(5)bulk批量API
-
bulk API 按顺序执行所有的 action(动作)。如果单个的动作因任何原因而失败,它将继续处理它后面剩余的动作。当 bulk API 返回时,它将提供每个动作的状态(与发送的顺序相同)。
POST customer/external/_bulk //每两个为一条数据: //索引(新增)操作:给id为1的文档新增数据 {"index":{"_id":"1"}} //要新增的数据 {"name": "John Doe" } //同上 {"index":{"_id":"2"}} {"name": "Jane Doe" } 语法格式: //action——执行什么操作:index/delete/create/update //metadata——指定索引、类型、id { action: { metadata }}\n { request body }\n { action: { metadata }}\n { request body }\n
7.RestClient操作索引库
ES 官方提供了各种不同语言的客户端,用来操作 ES。这些客户端的本质就是组装 DSL 语句,通过【 http 请求】发送给 ES。
其中Java Rest Client又包括两种:Java Low Level Rest Clien t和 Java High Level Rest Client。官方文档地址:https://www.elastic.co/guide/en/elasticsearch/client/index.html
其中Java Rest Client又包括两种:Java Low Level Rest Clien t和 Java High Level Rest Client。官方文档地址:https://www.elastic.co/guide/en/elasticsearch/client/index.html
这里基于资料提供的hotel-demo和tb_hotel来学习Java High Level Rest Client。
(1)根据tb_hotel分析索引库mapping映射
在分析mapping映射时要考虑:字段名、数据类型、是否参与搜索、是否分词、如果分词,分词器是什么?
- 字段名、字段数据类型,可以参考数据表结构的名称和类型
- 是否参与搜索要分析业务来判断,例如图片地址,就无需参与搜索
- 是否分词要看内容,内容如果是一个整体就无需分词
- 分词器,这里统一使用 ik_max_word
PUT /hotel { "mappings": { "properties": { "id": { "type": "keyword" }, "name":{ "type": "text", "analyzer": "ik_max_word", "copy_to": "all" }, "address":{ "type": "keyword", "index": false }, "price":{ "type": "integer" }, "score":{ "type": "integer" }, "brand":{ "type": "keyword", "copy_to": "all" }, "city":{ "type": "keyword", "copy_to": "all" }, "starName":{ "type": "keyword" }, "business":{ "type": "keyword" }, "location":{ "type": "geo_point" }, "pic":{ "type": "keyword", "index": false }, "all":{ "type": "text", "analyzer": "ik_max_word" } } } }【tips】特殊字段说明:
- location:地理坐标,里面包含经度、纬度。
- ES中支持两种地理坐标数据类型:
- geo_point:由纬度(latitude)和经度(longitude)确定的一个点。例如:"32.8752345, 120.2981576"
- geo_shape:有多个 geo_point 组成的复杂几何图形。例如一条直线,"LINESTRING (-77.03653 38.897676, -77.009051 38.889939)"
- all:一个组合字段,其目的是将多字段的值利用 copy_to 合并,提供给用户搜索,这样一来就只需要搜索all这一个字段就可以得到结果,性能更好。
在 es 提供的 API 中,es 一切交互都封装在一个名为 RestHighLevelClient 的类中,必须先完成这个对象的初始化,建立与 es 的连接。
-
导入坐标
<dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> </dependency>
【tips】SpringBoot 默认的 ES 版本是 7.6.2,因此我们需要覆盖默认的ES版本,将版本改为7.12.1。 -
覆盖ES版本
<properties> <java.version>1.8</java.version> <elasticsearch.version>7.12.1</elasticsearch.version> </properties>
-
初始化 RestHighLevelClient
RestHighLevelClient client = new RestHighLevelClient(RestClient.builder( HttpHost.create("http://192.168.152.100:9200") //es访问地址 ));
-
创建一个测试类 HotelIndexTest,然后将初始化的代码编写在 @BeforeEach 方法
public class HotelIndexTest { private RestHighLevelClient restHighLevelClient; @Test void testInit(){ System.out.println(this.restHighLevelClient); } @BeforeEach //将初始化的代码编写在 @BeforeEach 方法 ,这样每个测试方法运行前都会初始化一个client void init(){ this.restHighLevelClient = new RestHighLevelClient(RestClient.builder( HttpHost.create("http://192.168.152.100:9200") )); } @AfterEach void down() throws IOException { this.restHighLevelClient.close(); } }
(3)创建索引库
- 使用API创建索引库的本质是根据DSL语句,因此我们写的代码与DSL语句是对应的:
-
这个MAPPING_TEMPLATE就是DSL语句,我们把他定义成一个静态常量:
public class HotelConstants { public static final String MAPPING_TEMPLATE ="{\n" + " \"mappings\": {\n" + " \"properties\": {\n" + " \"id\": {\n" + " \"type\": \"keyword\"\n" + " },\n" + " \"name\":{\n" + " \"type\": \"text\",\n" + " \"analyzer\": \"ik_max_word\",\n" + " \"copy_to\": \"all\"\n" + " },\n" + " \"address\":{\n" + " \"type\": \"keyword\",\n" + " \"index\": false\n" + " },\n" + " \"price\":{\n" + " \"type\": \"integer\"\n" + " },\n" + " \"score\":{\n" + " \"type\": \"integer\"\n" + " },\n" + " \"brand\":{\n" + " \"type\": \"keyword\",\n" + " \"copy_to\": \"all\"\n" + " },\n" + " \"city\":{\n" + " \"type\": \"keyword\",\n" + " \"copy_to\": \"all\"\n" + " },\n" + " \"starName\":{\n" + " \"type\": \"keyword\"\n" + " },\n" + " \"business\":{\n" + " \"type\": \"keyword\"\n" + " },\n" + " \"location\":{\n" + " \"type\": \"geo_point\"\n" + " },\n" + " \"pic\":{\n" + " \"type\": \"keyword\",\n" + " \"index\": false\n" + " },\n" + " \"all\":{\n" + " \"type\": \"text\",\n" + " \"analyzer\": \"ik_max_word\"\n" + " }\n" + " }\n" + " }\n" + "}"; }
(4)删除索引库
@Test void deleteHotelIndex() throws IOException { DeleteIndexRequest hotel = new DeleteIndexRequest("hotel"); restHighLevelClient.indices().delete(hotel,RequestOptions.DEFAULT); }
(5)判断索引库是否存在
@Test void existHotelIndex() throws IOException { GetIndexRequest hotel = new GetIndexRequest("hotel"); boolean exists = restHighLevelClient.indices().exists(hotel, RequestOptions.DEFAULT); System.out.println(exists); }
8.★RestClient操作文档
(1)新增文档
新增文档与新建索引库的道理一样,都是需要根据DSL来写代码:
-
先根据id查询酒店数据,然后给这条数据创建倒排索引,即可完成添加:
@Test void testAddDocument() throws IOException { //根据id查询酒店数据 Hotel hotel = hotelService.getById(61083L); HotelDoc hotelDoc = new HotelDoc(hotel); //1.创建request对象,想一下DSL新增文档时都需要哪些信息?——索引库名和id IndexRequest request = new IndexRequest("hotel").id(hotelDoc.getId().toString()); //2.准备JSON文档:把Java对象序列化成json格式 //source 方法用于保存数据,数据的格式为 键值对形式 的类型 request.source(JSON.toJSONString(hotelDoc), XContentType.JSON); //3.发送请求 client.index(request, RequestOptions.DEFAULT); }
(2)根据文档id查询文档
@Test void testGetDocuments() throws IOException { //1.准备request GetRequest request = new GetRequest("hotel","61083"); //2.发送请求 GetResponse response = client.get(request, RequestOptions.DEFAULT); //3.解析响应结果 String json = response.getSourceAsString(); //4.json转java对象 HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class); System.out.println(hotelDoc); }(3)修改文档
前面我们说过修改文档有全量修改和增量修改两种方式:全量修改与新增的 API 完全一致,判断依据是 ID:如果新增时,ID已经存在,则修改;如果新增时,ID不存在,则新增。
所以全量修改写法与新增文档一样,下面我们主要是介绍增量修改。
@Test void testUpdateDocument() throws IOException { // 1.准备Request UpdateRequest request = new UpdateRequest("hotel", "61083"); // 2.准备请求参数 request.doc( "price", "952", "starName", "四钻" ); // 3.发送请求 client.update(request, RequestOptions.DEFAULT); }(4)删除文档
@Test void testDeleteDocument() throws IOException { DeleteRequest request=new DeleteRequest("hotel","61083"); client.delete(request,RequestOptions.DEFAULT); }(5)批量新建文档
需求:将数据库酒店数据批量(bulk)导入到索引库中。
- 利用 mybatis-plus 查询酒店数据
- 将查询到的酒店数据(Hotel)转换为文档类型数据(HotelDoc)
-
利用 JavaRestClient 中的 BulkRequest 批处理,实现批量新增文档:批量处理 BulkRequest,其本质就是将多个普通的 CRUD 请求组合在一起发送。 因此Bulk中添加了多个IndexRequest,就是批量新增功能了。示例:
@Test void testBulkDocument() throws IOException { //1.创建批量操作的request对象 BulkRequest request = new BulkRequest(); //2.查询hotel数据 List<Hotel> hotels = hotelService.list(); //3.将每个hotel数据转为hotelDoc for (Hotel hotel : hotels) { HotelDoc hotelDoc = new HotelDoc(hotel); //4.创建新增文档的request对象 request.add(new IndexRequest("hotel").id(hotelDoc.getId().toString()).source(JSON.toJSONString(hotelDoc), XContentType.JSON)); } //5.发送请求 client.bulk(request, RequestOptions.DEFAULT); }
在 Java 代码中,client 无论针对操作索引库还是文档,基本都是一样的代码:
restHighLevelClient.indices().xxx——代表操作索引库
restHighLevelClient.xxx——代表操作文档
而其中所需要的request参数,直接通过 ctrl+p 这样的快捷键去查看就可以,需要什么样的request对象我们就创建什么样的request对象。
四、★DSL文档【查询】语句(_search)
Elasticsearch 提供了基于 JSON 的 DSL(Domain Specific Language)来定义查询。
常见的查询类型包括:
1.查询所有(match_all)
查询出所有数据,一般测试用。例如:
-
match_all
GET /indexName/_search { // query{}用来定义如何查询 "query": { //具体的查询类型,如match_all、match、multi_match、 range、term等 "match_all": { } } }
-
只返回部分字段"_source"
GET bank/_search { "query": { "match_all": {} }, "from": 0, "size": 5, //_source:以数组的形式指定要返回的字段名 "_source": ["age","balance"] }
2.全文检索查询——要分词
利用分词器对用户输入内容分词,然后去倒排索引库中匹配。例如:
-
match:
GET /indexName/_search { "query": { "match": { //要查询的字段FIELD: 输入的内容TEXT "FIELD": "TEXT" } } }
-
multi_match:多字段匹配
GET /indexName/_search { "query": { "multi_match": { "query": "TEXT", "fields": ["FIELD1", "FIELD12"] } } }
【tips】搜索字段越多,对查询性能影响越大,因此建议采用 copy_to 将多个字段合并为一个,然后使用单字段查询的方式。
3.精确查询——不分词
根据精确词条值查找数据,一般是查找 keyword、数值、日期、boolean 等类型字段,因此 不会对搜索条件分词 。例如:
-
range:范围查询,一般应用在对数值类型做范围过滤的时候。比如做价格范围过滤。
GET /indexName/_search { "query": { "range": { "FIELD": { "gte": 10, // 这里的gte代表大于等于,gt则代表大于 "lte": 20 // lte代表小于等于,lt则代表小于 } } } }
-
term:精确查询的字段是不分词的字段(非文本字段),因此查询的条件也必须是不分词的词条。查询时,用户输入的内容跟自动值完全匹配时才认为符合条件。
GET /indexName/_search { "query": { "term": { "FIELD": { "value": "VALUE" } } } }
【tips】一般情况下,全文检索字段用match,其他非文本字段匹配用term。
4.地理(geo)查询
根据经纬度查询。例如:
- geo_distance:附近查询,也叫做距离查询(geo_distance):查询到指定中心点小于某个距离值的所有文档 在地图上找一个点作为圆心,以指定距离为半径,画一个圆,落在圆内的坐标都算符合条件:
-
GET /indexName/_search { "query": { "geo_distance": { "distance": "15km", // 半径 "FIELD": "31.21,121.5" // 圆心 } } }
- geo_bounding_box:矩形范围查询,查询坐标落在某个矩形范围的所有文档。查询时,需要指定矩形的左上、右下两个点的坐标,然后画出一个矩形,落在该矩形内的都是符合条件的点。
-
GET /indexName/_search { "query": { "geo_bounding_box": { "FIELD": { "top_left": { // 左上点 "lat": 31.1, "lon": 121.5 }, "bottom_right": { // 右下点 "lat": 30.9, "lon": 121.7 } } } } }
5.复合(compound)查询
复合查询可以将上述各种查询条件组合起来,合并查询条件。例如:
(1)相关性算分
当我们利用 match 查询时,文档结果会根据与搜索词条的关联度打分(_score),返回结果时按照分值降序排列。例如,我们搜索 "虹桥如家",结果如下:
- elasticsearch 早期(5.0之前)使用的打分算法是TF-IDF 算法,公式如下:
-
- 在后来的5.1版本升级中,elasticsearch 将算法改进为 BM25 算法,公式如下:
-
- TF-IDF 算法有个缺陷,就是词条频率越高,文档得分也会越高,单个词条对文档影响较大。而 BM25 则会让单个词条的算分有一个上限,曲线更加平滑:
-
(2)function_scores
算分函数查询:可以使用 elasticsearch 中的 function_score 查询人为控制相关性算分。
因此,其中的关键点是
- 过滤条件:决定哪些文档的算分被修改
- 算分函数:决定函数算分的算法
- 运算模式:决定最终算分结果
(3)bool
布尔查询是一个或多个查询子句的组合,每一个子句就是一个子查询。子查询的组合方式有
- must:必须匹配每个子查询,类似“与”,并且的意思
- should:选择性匹配子查询,类似“或”
- must_not:必须不匹配,不参与算分,类似“非”
-
filter:必须匹配,但不参与算分
GET /hotel/_search { "query": { "bool": { "must": [ {"term": {"city": "上海" }} ], "should": [ {"term": {"brand": "皇冠假日" }}, {"term": {"brand": "华美达" }} ], "must_not": [ { "range": { "price": { "lte": 500 } }} ], "filter": [ { "range": {"score": { "gte": 45 } }} ] } } }
需要注意的是,搜索时,参与打分的字段越多,查询的性能也越差。因此这种多条件查询时,建议这样做: 搜索框的关键字搜索,是全文检索查询,使用 must 查询,参与算分 其它过滤条件,采用 filter 查询,不参与算分。
五、搜索结果的处理
1.排序
elasticsearch 默认根据相关度算分(_score)排序,但是也支持自定义方式对搜索结果排序。
可以排序字段类型有:keyword 类型、数值类型、地理坐标类型、日期类型等。
-
keyword、数值、日期类型排序的语法基本一致:
GET /indexName/_search { "query": { "match_all": {} }, "sort": [ { "FIELD": "desc" // 排序字段、排序方式ASC、DESC } ] }
【tips】排序条件是一个数组,也就是可以写多个排序条件。按照声明的顺序,当第一个条件都满足时,再按照第二个条件排序。 -
地理坐标排序:
GET /indexName/_search { "query": { "match_all": {} }, "sort": [ { "_geo_distance" : { "FIELD" : "纬度,经度", // 文档中geo_point类型的字段名、目标坐标点 "order" : "asc", // 排序方式 "unit" : "km" // 排序的距离单位 } } ] }
2.分页
elasticsearch 默认只返回 top10 的数据,如果要查询更多数据就需要修改分页参数了。
elasticsearch 通过修改 from、size 参数来控制要返回的分页结果,类似于mysql中的limit ?, ?:
- from:从第几个文档开始
-
size:总共查询几个文档
GET /hotel/_search { "query": { "match_all": {} }, "from": 0, // 分页开始的位置,默认为0 "size": 10, // 期望获取的文档总数 "sort": [ {"price": "asc"} ] }
3.★深度分页问题
如果要用 from + size 查询第990~1000条数据,必须先查询0~1000条,然后截取其中的 990 ~ 1000 的这10条。如果 es 是单点模式,这并无太大影响。 但是 elasticsearch 将来一定是集群,例如集群有5个节点,要查询 TOP1000 的数据,并不是每个节点查询200条就可以了。节点A的 TOP200,在另一个节点可能排到10000名以外了。 因此要想获取整个集群的 TOP1000,必须先查询出每个节点的 TOP1000,汇总结果后,重新排名,重新截取 TOP1000。
当查询分页深度较大时,汇总数据过多,对内存和CPU会产生非常大的压力,因此 elasticsearch 会禁止from+ size 超过10000的请求。
针对深度分页,ES提供了两种解决方案:
- search after:分页时需要排序,原理是从上一次的排序值开始,查询下一页数据。官方推荐使用的方式。
- scroll:原理将排序后的文档id形成快照,保存在内存。官方已经不推荐使用。
分页查询的常见实现方案以及优缺点:
-
from + size
- 优点:支持随机翻页
- 缺点:深度分页问题,默认查询上限(from + size)是10000
- 场景:百度、京东、谷歌、淘宝这样的随机翻页搜索
-
after search
- 优点:没有查询上限(单次查询的size不超过10000)
- 缺点:只能向后逐页查询,不支持随机翻页
- 场景:没有随机翻页需求的搜索,例如手机向下滚动翻页
-
scroll
- 优点:没有查询上限(单次查询的size不超过10000)
- 缺点:会有额外内存消耗,并且搜索结果是非实时的
- 场景:海量数据的获取和迁移。从ES7.1开始不推荐,建议用 after search方案。
4.高亮显示
高亮(highlight)显示的实现分为两步:
-
给文档中的所有关键字都添加一个标签,例如<em>标签
GET /hotel/_search { "query": { "match": { "FIELD": "TEXT" // 查询条件,高亮一定要使用全文检索查询 } }, "highlight": { "fields": { // 指定要高亮的字段 "FIELD": { "pre_tags": "<em>", // 用来标记高亮字段的前置标签 "post_tags": "</em>" // 用来标记高亮字段的后置标签 } } } }
【tips】- 高亮是对关键字高亮,因此搜索条件必须带有关键字,而不能是范围这样的查询
- 默认情况下,高亮的字段,必须与搜索指定的字段一致,否则无法高亮
- 如果要对非搜索字段高亮,则需要添加一个属性:"required_field_match": "false"
- 页面给<em>标签编写CSS样式
六、RestClient实现DSL查询语句
1.match_all
(1)发送请求,获取结果
- 第一步:创建SearchRequest对象,指定索引库名
- 第二步:利用request.source()构建 DSL,DSL 中可以包含查询、分页、排序、高亮等
- query():代表查询条件,利用 QueryBuilders.matchAllQuery() 构建一个 match_all DSL
-
- 第三步:利用 client.search() 发送请求,得到响应
(2)解析结果,获取数据
Elasticsearch 返回的结果是一个 JSON 字符串,结构包含:
-
hits:命中的结果
- total:总条数,其中的value是具体的总条数值
- max_score:所有结果中得分最高的文档的相关性算分
-
hits:搜索结果的文档数组,其中的每个文档都是一个 json 对象
- _source:文档中的原始数据,也是 json 对象
-
SearchHits:通过 response.getHits() 获取,就是 json 中的最外层的 hits,代表命中的结果
- SearchHits.getTotalHits().value:获取总条数信息
-
SearchHits.getHits():获取 SearchHit 数组,也就是文档数组
- SearchHit.getSourceAsString():获取文档结果中的 _source,也就是原始的 json 文档数据
(3)match_all整体代码
@Test void testMatchAll() throws IOException { //1.创建SearchRequest对象 SearchRequest request = new SearchRequest("hotel"); //2.创建 match_all DSL语句 request.source().query(QueryBuilders.matchAllQuery()); //3.发送请求 SearchResponse response = client.search(request, RequestOptions.DEFAULT); //4.处理结果 SearchHits searchHits = response.getHits(); //4.1 获取到的总文档数 long total = searchHits.getTotalHits().value; System.out.println("共搜索到" + total + "条数据"); //4.2 获取文档数组 SearchHit[] hits = searchHits.getHits(); for (SearchHit hit : hits) { //获取文档source String json = hit.getSourceAsString(); //反序列化 HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class); System.out.println("HotelDoc:" + hotelDoc); } }
2.全文查询
全文检索的match和multi_match查询与match_all的API基本一致。差别是查询条件,即query的部分。 同样是利用QueryBuilders提供的方法:
那么match和mutil_match的代码只需要各改一行即可:
3.精确查询
精确查询主要是两者 term 和 range,同样利用QueryBuilders实现:
4.复合查询——bool
布尔查询是用 must、must_not、filter等方式组合其它查询,代码示例如下:
5.搜索结果处理——排序和分页
搜索结果的排序和分页是与 query 同级的参数,因此同样是使用 request.source() 来设置。
6.高亮显示
高亮显示的API包括两部分:
- 查询的 DSL:其中除了查询条件,还需要添加高亮条件,同样是与 query 同级。
-
- 结果解析:高亮的结果与查询的文档结果默认是分离的,并不在一起。 因此解析高亮的代码需要额外处理(逐层解析JSON):
-
@Test void testHighLight() throws IOException { //1.创建SearchRequest对象 SearchRequest request = new SearchRequest("hotel"); //2.创建DSL //2.1 query request.source().query(QueryBuilders.matchQuery("all","如家")); //2.2 高亮 request.source().highlighter(new HighlightBuilder().field("name").requireFieldMatch(false)); //3.发送请求 SearchResponse response = client.search(request, RequestOptions.DEFAULT); //4.处理结果 SearchHits searchHits = response.getHits(); //4.1 获取到的总文档数 long total = searchHits.getTotalHits().value; System.out.println("共搜索到" + total + "条数据"); //4.2 获取文档数组 SearchHit[] hits = searchHits.getHits(); for (SearchHit hit : hits) { //获取文档source String json = hit.getSourceAsString(); //反序列化 HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class); //处理高亮 Map<String, HighlightField> highlightFields = hit.getHighlightFields(); if(!CollectionUtils.isEmpty(highlightFields)){ //获取高亮字段 HighlightField highlightField = highlightFields.get("name"); if(highlightField != null){ //取出高亮结果数组中的第一个,就是酒店名称 String name = highlightField.getFragments()[0].string(); hotelDoc.setName(name); } } System.out.println("HotelDoc:" + hotelDoc); } }
7.算分控制查询
function_score 查询结构如下:
对应的API如下:
七、★DSL数据聚合
1.DSL聚合的分类
聚合(aggregations)可以方便的实现对数据的统计、分析、运算。在 Elasticsearch 实现这些统计功能比数据库的 sql 要方便的多,而且查询速度非常快,可以实现近实时搜索效果。
聚合常见的有三类:
-
桶(Bucket)聚合:用来对文档做分组
- TermAggregation:按照文档字段值分组,例如按照品牌值分组、按照国家分组
- Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
-
度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等
- avg:求平均值
- max:求最大值
- min:求最小值
- stats:同时求 max、min、avg、sum 等
- 管道(Pipeline)聚合:以其它聚合的结果为基础做聚合
【tips】参加聚合的字段必须是像keyword、日期、数值、布尔类型等不可分词的字段!
2.DSL实现聚合
(1)DSL实现Bucket聚合
aggs 代表聚合,与query同级,此时query的作用是限定聚合的的文档范围。
聚合三要素: 聚合名称、聚合类型、聚合字段。
聚合可以配置属性有:size:指定聚合结果数量;order:指定聚合结果排序方式;field:指定聚合字段。
-
案例1:统计所有数据中的酒店品牌有几种(其实就是按照品牌对数据分组,也就是 Bucket 聚合。)
GET /hotel/_search { "size": 0, // 设置size为0,表示结果中不包含文档,只包含聚合结果 "aggs": { // 定义聚合 "brandAgg": { //自定义聚合名称 "terms": { // 聚合的类型,按照品牌值聚合,所以选择terms "field": "brand", // 参与聚合的字段 "size": 20 // 最终看到的聚合结果个数,默认是只显示10个 } } } }
-
案例2:统计所有数据中的酒店品牌有几种,并按数量升序排序。(默认情况下,Bucket 聚合会统计 Bucket 内的文档数量,记为 _count,并且按照 _count 降序排序。)
GET /hotel/_search { "size": 0, "aggs": { "brandAgg": { "terms": { "field": "brand", "order": { "_count": "asc" // 按_count升序排列 }, "size": 20 } } } }
-
案例3:统计价格在200以内的酒店品牌有几种。(默认情况下,Bucket 聚合是对索引库的所有文档做聚合。但是也可以限定要聚合的文档范围,只要添加 query 条件即可,即在查询到的文档里做聚合)
GET /hotel/_search { "query": { "range": { "price": { "lte": 200 // 只对200元以下的文档聚合 } } }, "size": 0, "aggs": { "brandAgg": { "terms": { "field": "brand", "size": 20 } } } }
(2)DSL实现Metric聚合
-
案例1:获取每个品牌的用户评分的最小值、最大值、平均值等数据。(上面的案例对酒店按品牌分组,形成了一个个桶,因此现在需要对桶内的酒店做聚合运算。)
GET /hotel/_search { "size": 0, "aggs": { "brandAgg": { "terms": { "field": "brand", "size": 20 }, "aggs": { // 是对品牌聚合的子聚合,也就是分组后分别对每组再聚合 "score_stats": { // 聚合名称 "stats": { // 聚合类型,这里stats可以计算min、max、avg等 "field": "score" // 参与聚合的字段,这里是用户评分score } } } } } }
- 案例2:获取每个品牌的用户评分的最小值、最大值、平均值等数据,并按品牌平均分进行降序排序。
3.RestClient实现聚合
以上面的酒店品牌案例为例。
aggregation条件与 query 条件同级别,因此也需要使用 request.source() 来指定聚合条件。
-
构建DSL语句,发送请求
//1.创建request SearchRequest request = new SearchRequest("hotel"); //2.创建DSL request.source().aggregation(AggregationBuilders .terms("brandAgg") .field("brand") .size(10) ); //3.发送请求 SearchResponse response = client.search(request, RequestOptions.DEFAULT);
-
获取结果,解析结果
//4.解析结果 //4.1 获取聚合结果 Aggregations aggregations = response.getAggregations(); //4.2 根据名称获取具体的聚合结果 Terms brandAgg = aggregations.get("brandAgg"); //4.3 获取桶 List<? extends Terms.Bucket> buckets = brandAgg.getBuckets(); for (Terms.Bucket bucket : buckets) { //4.4 获取key String key = bucket.getKeyAsString(); System.out.println(key); }
八、DSL自动补全查询
1.自动补全及拼音分词器
自动补全:当用户在搜索框输入字符时,可以提示出与该字符有关的搜索项,提示完整词条的功能,这就是自动补全。
如果我们需要根据拼音字母来推断,就要用到拼音分词功能。 要实现根据拼音做补全,就必须对文档按照拼音分词。
- 插件地址:https://github.com/medcl/elasticsearch-analysis-pinyin
- 通过docker volume inspect es-plugins命令查看插件目录,将下载好的插件解压上传到该目录
- 重启es
-
测试用例:
POST /_analyze { "text": "测试用例", "analyzer": "pinyin" //指定分词为拼音分词器 }
2.自定义分词器
拼音分词器默认会将每个汉字单独分为拼音,而我们希望的是每个词条形成一组拼音。因此需要对拼音分词器进行配置。
elasticsearch 中分词器(analyzer)的组成包含三部分:
- character filters:在 tokenizer 之前对文本进行处理。例如删除字符、替换字符。
- tokenizer:将文本按照一定的规则切割成词条(term)。例如 keyword,就是不分词;还有 ik_smart。
- filter:将 tokenizer 输出的词条做进一步处理。例如大小写转换、同义词处理、拼音处理等。
(1)自定义分词器
可以在创建索引库时自定义分词器,如在创建test索引库时自定义分词器my_analyzer:
PUT /test { "settings": { "analysis": { "analyzer": { // 自定义分词器 "my_analyzer": { // 自定义分词器的名称 "tokenizer": "ik_max_word", "filter": "py" } }, "filter": { // 自定义tokenizer filter "py": { // 过滤器名称 "type": "pinyin", // 过滤器类型,这里是pinyin "keep_full_pinyin": false, "keep_joined_full_pinyin": true, "keep_original": true, "limit_first_letter_length": 16, "remove_duplicated_term": true, "none_chinese_pinyin_tokenize": false } } } }, "mappings": { "properties": { "name": { //指定对name字段使用my_analyzer自定义分词器 "type": "text", "analyzer": "my_analyzer", "search_analyzer": "ik_smart" } } } }(2)设置不同分词器
拼音分词器适合在创建倒排索引时使用,而为了避免搜索到同音字,搜索时不要使用拼音分词器。
设置在创建倒排索引时用my_analyzer分词器,搜索时用ik_smart分词器:
PUT /test { "settings": { "analysis": { "analyzer": { "my_analyzer": { "tokenizer": "ik_max_word", "filter": "py" } }, "filter": { "py": { ... } } } }, "mappings": { "properties": { "name": { "type": "text", "analyzer": "my_analyzer", //创建时用my_analyzer "search_analyzer": "ik_smart"//搜索时用ik_smart } } } }
3.自动补全查询(suggest)
elasticsearch 提供了 Completion Suggester 查询来实现自动补全功能。这个查询会匹配以用户输入内容为开头的词条并返回。
为了提高补全查询的效率,对于文档中字段的类型有一些要求:
- 参与补全查询的字段必须是 completion 类型。
-
字段的内容一般是用来补全的多个词条形成的数组。
// 自动补全的索引库 PUT test { "mappings": { "properties": { "title":{ "type": "completion" } } } } // 示例数据 POST test/_doc { "title": ["Sony", "WH-1000XM3"] } POST test/_doc { "title": ["SK-II", "PITERA"] } POST test/_doc { "title": ["Nintendo", "switch"] } // 自动补全查询 POST /test/_search { "suggest": { "title_suggest": { //自定义的名字 "text": "s", // 搜索关键字 "completion": { "field": "title", // 要补全的字段 "skip_duplicates": true, // 跳过重复的词条 "size": 10 // 获取前10条结果 } } } }
九、ES数据同步问题
elasticsearch 中的数据来自于 mysq l数据库,因此 mysql 数据发生改变时,elasticsearch 也必须跟着改变,这个就是 elasticsearch 与 mysql 之间的数据同步。
1.数据同步方案
(1)常见的三种数据同步方案
-
同步调用
- hotel-demo对外提供接口,用来修改 elasticsearch 中的数据
- 酒店管理服务在完成数据库操作后,直接调用 hotel-demo 提供的接口
-
异步通知
- hotel-admin 对 mysql 数据库数据完成增、删、改后,发送 MQ 消息
- hotel-demo监听 MQ,接收到消息后完成 elasticsearch 数据修改
-
监听 binlog
- mysql 开启 binlog 功能
- mysql 完成增、删、改操作都会记录在 binlog 中
- hotel-demo 基于canal 监听 binlog 变化,实时更新 elasticsearch 中的内容
(2)三种数据同步方案的优缺点
-
同步调用
- 优点:实现简单粗暴
- 缺点:业务耦合度高
-
异步通知
- 优点:低耦合,实现难度一般
- 缺点:依赖 mq 的可靠性
-
监听binlog
- 优点:完全解除服务间耦合
- 缺点:开启 binlog 增加数据库负担、实现复杂度高
2.异步通知实现数据同步
以异步通知为例,使用 MQ 消息中间件,实现es与mysql间的数据同步。当酒店数据库发生增删改时,es中的数据也要完成相同的操作。
- 将hotel-admin项目导入idea,作为酒店管理的微服务
-
在两个项目中都导入AMQP坐标
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
配置AMQP信息
spring: rabbitmq: host: 192.168.152.100 # 主机名 port: 5672 # 消息通讯端口 virtual-host: / # 虚拟主机 username: itcast # 用户名 password: 123321 # 密码
-
在两个微服务中声明exchange、queue、routingKey的常量名
public class MQConstants { /** * 交换机 */ public final static String HO***CHANGE = "hotel.topic"; /** * 监听新增和修改的队列 */ public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue"; /** * 监听删除的队列 */ public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue"; /** * 新增或修改的RoutingKey */ public final static String HOTEL_INSERT_KEY = "hotel.insert"; /** * 删除的RoutingKey */ public final static String HOTEL_DELETE_KEY = "hotel.delete"; }
-
在消费者(hotel-demo)中定义exchange和queue间的绑定关系
@Configuration public class MQConfig { /** * 定义Topic交换机 * @return */ @Bean public TopicExchange topicExchange() { return new TopicExchange(MQConstants.HO***CHANGE, true, false); } /** * 定义两个队列 */ @Bean public Queue insertQueue() { return new Queue(MQConstants.HOTEL_INSERT_QUEUE, true); } @Bean public Queue deleteQueue() { return new Queue(MQConstants.HOTEL_DELETE_QUEUE, true); } /** * 分别定义交换机和两个队列的绑定关系 * @return */ @Bean public Binding insertQueueBinding() { return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MQConstants.HOTEL_INSERT_KEY); } @Bean public Binding deleteQueueBinding() { return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MQConstants.HOTEL_DELETE_KEY); } }
-
消息发送方(hotel-admin):在controller中对应的方法中添加发送消息的功能:
rabbitTemplate.convertAndSend(MQConstants.HO***CHANGE, MQConstants.HOTEL_INSERT_KEY, hotel.getId()); rabbitTemplate.convertAndSend(MQConstants.HO***CHANGE, MQConstants.HOTEL_DELETE_KEY, id);
-
消息接收方(hotel-demo):监听MQ消息。做出对应操作:
@Override public void insertById(Long id) { try { // 根据id查询酒店数据 Hotel hotel = getById(id); // 转换为文档类型 HotelDoc hotelDoc = new HotelDoc(hotel); IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString()); request.source(JSON.toJSONString(hotelDoc), XContentType.JSON); client.index(request, RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException(e); } } @Override public void deleteById(Long id) { try { DeleteRequest request = new DeleteRequest("hotel", id.toString()); client.delete(request, RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException(e); } }
@Component public class HotelListener { @Autowired private IHotelService hotelService; /** * 监听酒店管理服务新增或修改的业务 * @param id 酒店id */ @RabbitListener(queues = MQConstants.HOTEL_INSERT_QUEUE) public void listenInsertOrUpdate(Long id){ hotelService.InsertById(id); } /** * 监听酒店管理服务删除的业务 * @param id 酒店id */ @RabbitListener(queues = MQConstants.HOTEL_DELETE_QUEUE) public void listenDelete(Long id){ hotelService.deleteById(id); } }
十、ES集群
1.搭建es集群
(1)单机es存在的问题及es集群的解决方案
-
海量数据存储问题
- es集群:将索引库中的数据分为多个分片(shard),存储到多个节点
-
单点故障问题
- es集群:将分片数据进行备份(replica)并存储到与原分片不同的节点中
在单机上利用 Docker 容器运行3个 Elasticsearch 实例来模拟搭建es集群。
-
准备docker-composer文件
version: '2.2' services: es01: image: elasticsearch:7.12.1 container_name: es01 #容器名 environment: - node.name=es01 #节点名 - cluster.name=es-docker-cluster #集群名,只要节点的集群名一致,es会自动将这些节点部署成一个集群。 - discovery.seed_hosts=es02,es03 - cluster.initial_master_nodes=es01,es02,es03 #参与竞选的所有节点 - "ES_JAVA_OPTS=-Xms512m -Xmx512m" #最大最小内存 volumes: - data01:/usr/share/elasticsearch/data ports: - 9200:9200 #宿主机端口:容器内端口 networks: - elastic es02: image: elasticsearch:7.12.1 container_name: es02 environment: - node.name=es02 - cluster.name=es-docker-cluster - discovery.seed_hosts=es01,es03 - cluster.initial_master_nodes=es01,es02,es03 - "ES_JAVA_OPTS=-Xms512m -Xmx512m" volumes: - data02:/usr/share/elasticsearch/data ports: - 9201:9200 networks: - elastic es03: image: elasticsearch:7.12.1 container_name: es03 environment: - node.name=es03 - cluster.name=es-docker-cluster - discovery.seed_hosts=es01,es02 - cluster.initial_master_nodes=es01,es02,es03 - "ES_JAVA_OPTS=-Xms512m -Xmx512m" volumes: - data03:/usr/share/elasticsearch/data networks: - elastic ports: - 9202:9200 volumes: data01: driver: local data02: driver: local data03: driver: local networks: elastic: driver: bridge
-
修改 Linux 系统权限,修改 /etc/sysctl.conf 文件
vi /etc/sysctl.conf
-
添加下面的内容
vm.max_map_count=262144
-
让配置生效:
sysctl -p
-
通过docker-compose启动集群
docker-compose up -d
-
集群状态监控
- kibana 可以监控 Elasticsearch 集群,但是更推荐使用 cerebro
- 在windows下下载并解压安装包,打开 /bin/cerebro.bat,访问 http://localhost:9000 即可进入管理界面
-
-
-
利用cerebro创建索引库,并进行分片和备份
-
2.集群节点职责划分及脑裂问题
(1)节点职责划分
【tips】默认情况下,集群中的任何一个节点都同时兼职上述四种角色。
职责分离可以让我们根据不同节点的需求分配不同的硬件去部署,而且避免业务之间的互相干扰。真实的集群一定要将集群职责分离:
- 主(master)节点:对 CPU 要求高,但是内存要求低
- 数据(data)节点:对 CPU 和内存要求都高
- 协同(coordinating)节点:对网络带宽、CPU 要求高
(2)脑裂问题及解决方案
1)脑裂问题
脑裂是由于集群中的节点失联导致的。 我们知道主节点一旦宕机,其余候选节点会重选主节点,但主节点失联时(主节点并没有宕机),其余候选节点(eligible node)联系不上主节点,误以为他宕机了,所以也会重选节点。这时就有了两个主节点(脑裂),一旦网络恢复,就会出现数据不同步问题。
【tips】节点失联指的是节点与其他节点发生网络故障,而不是节点宕机。
2)解决方案
解决脑裂的方案是:要求选票超过 (eligible节点数量+1)/2 才能当选为 master,因此 eligible 节点数量最好是奇数。对应配置项是discovery.zen.minimum_master_nodes。
例如:3个节点形成的集群,选票必须超过 (3+1)/2 = 2 票。node3 得到 node2 和 node3 的选票,当选为 master。node1 只有自己 1 票,没有当选。集群中依然只有1个主节点,没有出现脑裂。
在版本 7.0 以后,已经成为默认配置,因此一般不会发生脑裂问题。
3.es集群的分布式存储
上面提到协调节点(coordinating node)负责请求路由,将请求路由到数据节点(data node),完成对应的业务操作,并返回结果。那么协调节点如何确定请求该路由到哪个数据节点呢?
es 会通过 hash 算法来计算文档应该存储到哪个分片:
【tips】①_routing 默认是文档id
②算法与分片数量有关,因此索引库一旦创建,分片数量不能修改!
(1)es的分布式新增
(2)es的分布式查询
es查询分成两个阶段——先分散再聚集:
- scatter phase:分散阶段,coordinating node 会把请求分发到每一个分片
- gather phase:聚集阶段,coordinating node 汇总 data node 的搜索结果,并处理为最终结果集返回给用户
4.es集群的故障转移
集群的 master 节会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其它正常节点,确保数据安全,这个叫做故障转移。
【tips】若主节点宕机,候选节点会先选举产生新主节点,然后新主节点进行故障迁移。
十一、微服务保护——Sentinel
1.雪崩问题
(1)概念
微服务之间相互调用,由于微服务调用链中的【某个】微服务故障,引起整个链路都无法访问的情况,称为雪崩。
(2)解决方案
解决雪崩问题的常见方式有四种
- 超时处理:设定超时时间,请求超过一定时间没有响应就返回错误信息,不会无休止等待。
-
线程隔离(舱壁模式):如图,船舱都会被隔板分离为多个独立空间,当船体破损时,只会导致部分船舱进水,这样就将故障控制在一定范围内,避免整个船体都被淹没。类似地,我们可以限定每个服务能使用的线程数,这样一旦某个服务故障,他也只能占据一部分资源,避免耗尽整个 tomcat 的资源,因此也叫线程隔离。
-
- 降级熔断:是一种断路器模式:由断路器统计业务执行的异常比例,如果超出阈值则会熔断该业务,拦截访问该业务的一切请求。
- 限流(流量控制):限制业务访问的 QPS(每秒处理请求数),避免服务因流量的突增而故障。
(3)服务保护技术对比
在 SpringCloud 当中支持多种服务保护技术:
- Netfix Hystrix
- Sentinel
- Resilience4J
2.Sentinel简介及安装
(1)简介
Sentinel是阿里巴巴开源的一款微服务流量控制组件。官网地址:https://sentinelguard.io/zh-cn/index.html Sentinel
具有以下特征
- 丰富的应用场景:Sentinel 承接了阿里巴巴近 10 年的双十一大促流量的核心场景,例如秒杀(即突发流量控制在系统容量可以承受的范围)、消息削峰填谷、集群流量控制、实时熔断下游不可用应用等。
- 完备的实时监控:Sentinel 同时提供实时的监控功能。可以在控制台中看到接入应用的单台机器秒级数据,甚至 500 台以下规模的集群的汇总运行情况。
- 广泛的开源生态:Sentinel 提供开箱即用的与其它开源框架/库的整合模块,例如与 Spring Cloud、Dubbo、gRPC 的整合。只需要引入相应的依赖并进行简单的配置即可快速地接入 Sentinel。
- 完善的 SPI 扩展点:Sentinel 提供简单易用、完善的 SPI 扩展接口。可以通过实现扩展接口来快速地定制逻辑。例如定制规则管理、适配动态数据源等。
(2)安装Sentinel控制台
-
下载 jar 包至非中文目录,运行代码:
java -jar sentinel-dashboard-1.8.1.jar
- 访问localhost:8080即可看到控制台页面,默认账号密码都是sentinel
-
如果要修改 Sentinel 的默认端口、账户、密码,可以通过下列配置:
- server.port:默认端口8080
- sentinel.dashboard.auth.username:默认用户名sentinel
- sentinel.dashboard.auth.password:默认密码sentinel
-
例如:修改端口为8090:
java -jar sentinel-dashboard-1.8.1.jar -Dserver.port=8090
3.微服务整合sentinel
以cloud-demo项目为例,演示微服务整合sentinel。在 order-service 中整合 Sentinel,并连接 Sentinel 的控制台,步骤如下:
-
引入sentinel依赖
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId> </dependency>
-
配置sentinel控制台信息
server: port: 8088 spring: cloud: sentinel: transport: dashboard: localhost:8080
- 访问 order-service 的任意端点:访问 http://localhost:10010/order/101,多访问几次,多点几次刷新,这样才能触发 Sentinel 的监控。
- 访问 Sentinel 的控制台,查看效果
4.限流规则
虽然雪崩问题有四种解决方案,但是限流(流控)是避免服务因突发的流量而发生故障,是对微服务雪崩问题的预防。因此先学习流量控制。
(1)簇点链路
当请求进入微服务时,首先会访问 DispatcherServlet,然后进入 Controller、Service、Mapper,这样的一个调用链就叫做簇点链路。
簇点链路中被监控的(没被监控的不是资源)每一个接口就是一个资源。默认情况下 Sentinel 会监控 SpringMVC 的每一个端点(Endpoint,也就是 Controller 中的方法),因此 SpringMVC 的每一个端点(Endpoint)就是调用链路中的一个资源。
一个方法要想被 Sentinel 监控,需要通过@SentinelResource注解来标记要监控的方法:
- 例如,我们刚才访问的 order-service 中的 OrderController 中的端点:/order/{orderId}
-
流控、熔断等都是针对簇点链路中的资源来设置的,因此可以点击对应资源后面的按钮来设置限流规则:
- 流控:流量控制
- 降级:降级熔断
- 热点:热点参数限流,是限流的一种
- 授权:请求的权限控制
(2)流控★
点击资源 /order/{orderId} 后面的流控按钮,就可以弹出表单。表单中可以填写限流规则,如下:
1)案例:给 /order/{orderId} 这个资源设置流控规则,QPS 不能超过 5,然后使用JMeter测试。
-
在 sentinel 控制台添加限流规则:
- 利用 jmeter 测试:20 个用户,2 秒内运行完,这样的话 QPS 就是 10,超过了我们设置的 QPS阈值5,所以会出发限流
- 右键运行
2)流控模式
在添加限流规则时,点击高级选项,可以选择三种流控模式
-
直接:统计当前资源的请求,触发阈值时对当前资源(即触发限流的资源)直接限流,也是默认的模式。
- 上面的案例我们用的就是这种模式。
-
关联:统计与当前资源相关的另一个资源,触发阈值时,对当前资源限流。相当于高优先级资源触发阈值,对低优先级资源限流。
- 使用场景:比如用户支付时需要修改订单状态,同时用户要查询订单。查询和修改操作会争抢数据库锁,产生竞争。业务需求是优先支付和更新订单的业务,因此当修改订单业务触发阈值时,需要对查询订单业务限流。
【tips】满足竞争关系的两个优先级不同的资源可以使用关联模式:想对谁限流,就对谁加规则。
- 链路:统计从指定链路访问到本资源的请求,触发阈值时,对指定链路限流,就是针对请求来源的限流。
【tips】链路模式中,是对不同来源的两个链路做监控。但是 sentinel 默认会给进入 SpringMVC 的所有请求设置同一个 root 资源,会导致链路模式失效。因此需要通过修改服务的 application.yml 文件来关闭这种对 SpringMVC 的资源聚合:
spring: cloud: sentinel: web-context-unify: false # 关闭context整合3)流控效果——请求达到阈值时采取的措施
- 快速失败:达到阈值后,新请求会被立即拒绝并抛出 FlowException 异常,是默认的处理方式。之前我们用的都是这种快速失败的模式。
- Warm Up:阈值一般是一个微服务能承担的最大 QPS,但是一个服务刚刚启动时,一切资源尚未初始化(冷启动),如果直接将 QPS 跑到最大值,可能导致服务瞬间宕机。 预热模式是应对服务冷启动的一种方案,虽然同样会拒绝超出阈值的请求并抛出异常,但这种模式下阈值会动态变化,从一个较小值逐渐增加到最大阈值。请求阈值初始值 = maxThreshold / coldFactor,持续指定时长后,逐渐提高到 maxThreshold 值。而 coldFactor 的默认值是 3。
-
排队等待:当请求超过 QPS 阈值时,「快速失败」和 「Warm Up」会拒绝新请求并抛出异常。 而排队等待则是让所有请求进入一个队列中,然后按照阈值允许的时间间隔依次执行。后来的请求必须等待前面执行完成,如果请求预期的等待时间超出最大时长,该请求则会被拒绝。这种方式严格控制了请求通过的间隔时间,即让请求以均匀的速度通过,对应的是漏桶算法。
- 假设第 1 秒同时接收到 10 个请求,但第 2 秒只有 1 个请求,此时 QPS 的曲线如左图;如果使用排队等待的流控效果,所有进入的请求都要排队,以固定的 200ms 的间隔执行,QPS 曲线会变的很平滑(右图):
-
(3)热点参数限流
之前的限流是统计访问某个资源的所有请求,判断是否超过 QPS 阈值。而「热点参数限流」是分别统计参数值相同的请求,判断是否超过 QPS 阈值。
例如,一个根据 id 查询商品的接口,访问 /goods/{id} 的请求中,id 参数值会有变化,「热点参数限流」会根据参数值分别统计 QPS。当 id=1 的请求触发阈值被限流时,id值不为1的请求则不受影响。
-
全局参数限流
- 对 hot 这个资源的 0 号参数(也就是第一个参数)做统计,每 1s 相同参数值的请求数不能超过 5:
-
热点参数限流
- 全局参数限流的配置中,对这个接口的所有商品一视同仁,QPS 都限定为 5。而在实际开发中,可能部分商品是热点商品,例如秒杀商品,我们希望这部分商品的 QPS 限制与其它商品不一样,高一些。那就需要配置「热点参数限流」的高级选项了——参数例外项。
-
结合上面的配置,这里的含义是对 0 号的 long 类型参数限流,每个相同参数的 QPS 不能超过 5,但如下两个参数例外:如果参数值是 100,则每 1s 允许的 QPS 为 10;如果参数值是 101,则每 1s 允许的 QPS 为 15。
【tips】热点参数限流对默认的SpringMVC资源(controller中的方法)是无效的!需要利用 @SentinelResource 注解标记资源!
5.隔离和降级
限流只是一种预防措施,虽然限流可以尽量避免因高并发而引起的服务故障,但服务还会因为其它原因而故障。 而要将这些故障控制在一定范围,避免雪崩,就要靠线程隔离(舱壁模式)和熔断降级手段了。
(1)Feign整合sentinel
可以看到,不管是线程隔离还是熔断降级,都是对客户端(调用方)的保护。需要在调用方发起远程调用时做线程隔离、或者服务熔断。 而我们的微服务远程调用都是基于 Feign 来完成的,因此我们需要将 Feign 与 Sentinel 整合,在 Feign 里面实现线程隔离和服务熔断。
-
修改配置,开启 sentinel 功能:修改服务调用方的 application.yml 文件,开启 Feign 的 sentinel 功能
feign: sentinel: enabled: true # 开启feign对sentinel的支持
-
给 FeignClient 编写失败后的降级逻辑
- 方式一:FallbackClass,但无法对远程调用的异常做处理
- 方式二:FallbackFactory,可以对远程调用的异常做处理(推荐)
-
FallbackFactory实现失败降级处理
-
在 feing-api 项目中定义类并实现 FallbackFactory
@Slf4j public class UserClientFallbackFactory implements FallbackFactory<UserClient> { @Override public UserClient create(Throwable throwable) { return userClient -> { log.error("查询用户失败",throwable); return new User(); }; } }
-
在 feing-api 项目中的配置类中将 UserClientFallbackFactory 注册为Bean
@Bean public UserClientFallbackFactory userClientFallbackFactory(){ return new UserClientFallbackFactory(); }
-
在 feing-api 项目中的 UserClient 接口中使用 UserClientFallbackFactory
@FeignClient(value = "userservice", fallbackFactory = UserClientFallbackFactory.class) public interface UserClient { @GetMapping("/user/{id}") User findById(@PathVariable("id") Long id); }
-
在 feing-api 项目中定义类并实现 FallbackFactory
(2)线程隔离
1)线程隔离(舱壁模式)有两种方式实现
-
线程池隔离:给每个服务调用业务分配一个线程池,利用线程池本身实现隔离效果。
- 优点:支持主动超时和异步调用,适用于低扇出的场景。
- 缺点:线程的额外开销比较大。
-
信号量隔离(Sentinel默认采用):不创建线程池,而是利用计数器模式,记录业务使用的线程数量,达到信号量上限时,禁止新的请求。
- 优点:轻量级,无额外开销,适用于高频调用和高扇出的场景。
- 缺点:不支持主动超时和异步调用
2)sentinel实现信号量线程隔离
线程数是指该资源能使用的 Tomcat 线程数的最大值。也就是通过限制线程数量,实现线程隔离(舱壁模式)。
(3)熔断降级
熔断降级的思路是由断路器统计服务调用的异常比例、慢请求比例,如果超出阈值则会熔断该服务,即拦截访问该服务的一切请求;而当服务恢复时,断路器会放行访问该服务的请求。
降级:某些服务不处理或只做简单处理,如抛异常、返回null、调用Mock数据、调用Fallback处理逻辑等。
1)断路器状态机
断路器控制熔断和放行是通过状态机来完成的,如下图就是一个断路器的状态机:
状态机包括三个状态:
- closed:关闭状态,断路器放行所有请求,并开始统计异常比例、慢请求比例,并判断是否达到熔断条件(熔断策略),达到该条件则切换到 open 状态,打开断路器。
- open:打开状态,服务调用被熔断,访问被熔断服务的请求都会被拒绝,快速失败,直接走降级逻辑。Open 状态 5 秒后会进入 half-open 状态。
-
half-open:半开状态,会一段时间放行一次请求,根据执行结果来判断接下来的操作:
- 请求成功:切换到 closed 状态
- 请求失败:切换到 open 状态
2)断路器熔断策略
断路器熔断策略有三种:慢调用比例、异常比例、异常数。
-
慢调用比例:业务的响应时长(RT)大于指定时长的请求认定为慢调用请求。在指定时间内,如果请求数量超过设定的最小数量,慢调用比例大于设定的阈值,则触发熔断。
- 设置 RT 超过 500ms 的调用是慢调用,统计最近 10000ms 内的请求,如果请求量超过 10 次,并且慢调用比例高于50%(0.5),则触发熔断,熔断时长为 5s,然后进入 half-open 状态,放行一次请求做测试。
-
异常比例或异常数:统计指定时间内的调用,如果调用次数超过指定请求数,并且出现异常的比例(或异常次数)达到设定的比例阈值(或超过指定异常数),则触发熔断。
- 异常比例设置如左图,统计最近 1000ms 内的请求,如果请求量超过 10 次且异常比例高于0.4,则触发熔断。
-
异常数设置如下,统计最近 1000ms 内的请求,如果请求量超过 10 次且异常次数高于 2 次,则触发熔断。
6.授权规则
授权规则可以对请求方来源做判断和控制,有白名单和黑名单两种方式:
- 白名单:来源(origin)在白名单内的调用者允许访问
- 黑名单:来源(origin)在黑名单内的调用者不允许访问
【案例】:比如我们允许请求从 网关gateway 到 order-service,不允许浏览器访问 order-service,那么白名单中就要填写网关的来源名称(origin)。
-
sentinel 是通过 RequestOriginParser 这个接口的 parseOrigin() 方法来获取请求的来源的
public interface RequestOriginParser { /** * 从请求request对象中获取来源origin,获取方式自定义 */ String parseOrigin(HttpServletRequest request); }
【tips】默认情况下,sentinel 不管请求者从哪里来,返回值永远是 default,也就是说一切请求的来源都被认为是一样的值 default。因此,我们需要自定义这个接口的方法:让不同的请求,返回不同的 origin。 -
在order-service 服务中,定义一个 RequestOriginParser 的实现类 HeaderOriginParser ,重写parseOrigin方法
@Component public class HeaderOriginParser implements RequestOriginParser { @Override public String parseOrigin(HttpServletRequest request) { // 1.获取来源的请求头 String origin = request.getHeader("origin"); // 2.非空判断 if (StringUtils.isEmpty(origin)) { origin = "blank"; } //3.返回来源 return origin; } }
【tips】默认情况下,来自gateway和浏览器的请求,请求头里的origin都是空的,所以我们需要让所有从 gateway 路由到微服务的请求都带上请求头origin:通过网关过滤器实现。 -
修改 gateway 服务中的 application.yml,使用 AddRequestHeader 过滤器给所有经过gateway的请求加上请求头 origin = gateway
spring: cloud: gateway: default-filters: - AddRequestHeader=origin,gateway #键值对形式
- 在sentinel控制台添加一个授权规则,放行 origin 值为 gateway 的请求
- 测试
7.自定义异常结果
默认情况下,发生限流、降级、授权拦截时,都会抛出异常到调用方,但是异常结果都是 flow limmiting(限流),这样无法得知是限流还是降级还是授权拦截。
我们可以通过实现BlockExceptionHandler接口来自定义异常时的返回结果。
public interface BlockExceptionHandler { /** * 该方法用来处理请求被限流、降级、授权拦截时抛出的BlockException异常。 */ void handle(HttpServletRequest request, HttpServletResponse response, BlockException e) throws Exception; }BlockException 包含多个不同的子类:
【案例】:自定义异常处理
-
在 order-service 定义一个自定义异常处理类
@Component public class SentinelExceptionHandler implements BlockExceptionHandler { @Override public void handle(HttpServletRequest request, HttpServletResponse response, BlockException e) throws Exception { String msg = "未知异常"; int status = 429; if (e instanceof FlowException) { msg = "请求被限流了"; } else if (e instanceof ParamFlowException) { msg = "请求被热点参数限流"; } else if (e instanceof DegradeException) { msg = "请求被降级了"; } else if (e instanceof AuthorityException) { msg = "没有权限访问"; status = 401; } response.setContentType("application/json;charset=utf-8"); response.setStatus(status); response.getWriter().println("{\"msg\": " + msg + ", \"status\": " + status + "}"); } }
- 测试
8.规则持久化
sentinel 的所有规则都是内存存储,重启后所有规则都会丢失。在生产环境下,我们必须确保这些规则的持久化,避免丢失。
规则管理模式
规则能否持久化,取决于规则的管理模式。sentinel 支持三种规则管理模式
- 原始模式:Sentinel 的默认模式,将规则保存在内存,重启服务会丢失。
- pull模式:控制台将配置的规则推送到 Sentinel 客户端,而客户端会将配置规则保存在本地文件或数据库中。以后会定时去本地文件或数据库中查询,更新本地规则。
- push模式:控制台将配置规则推送到远程配置中心,例如 Nacos,Sentinel 客户端监听 Nacos,获取配置变更的推送消息,完成本地配置实时更新。