1. 事务

1.1 概念

简单理解为要做或者所做的事情称之为事务。

1.2 Redis事务

redis 支持事务,redis的<mark>单行命令</mark>支持事务,也就是说<mark>单行命令</mark> 可以保证原子性,但是redis事务并不保证原子性。

1.3 Redis事务的本质

Redis事务是一组命令的集合,使用队列存储,队列中所有的命令都会被序列化到内存中,并且一次按照队列的顺序执行。

特性:具有排他性,顺序性,一次性。
注意:redis不支持隔离级别,也就是不会出现脏读,幻读等情况。

1.4 Redis事务使用

  • 开启事务 multi
  • 命令入队列
  • 执行事务 exec
  1. 正常使用事务

    可见命令入队时并不会执行,而是使用exec命令时才会执行。

  2. 放弃事务

  3. 事务出现异常

    编译期异常

    代码有问题,编译器都过不了
    此时事务是不能被正常执行的

    运行时异常

    和Java的运行时异常类似(除0异常,数组越界),表示的是可以通过编译器,但是不能运行期过不了,此时不会影响整个事务,只是会影响错误的那条命令。

    可见它只会影响到错误执行的命令。

2. Redis实现乐观锁(面试重点)

  • 乐观锁:乐观的一面,认为每次操作都不会有人去修改数据,只是在更新数据的时后利用version字段去判断数据有没有被修改过,常用于多写 的场景。
  • 悲观锁:悲观的一面,认为每次操作数据时都有人去修改,因此每次都会上锁,例如我们在Java中的sychronized和同步代码块,常用于多读的场合。

面试官:redis支持乐观锁吗?
支持,使用的watch关键字,起到的作用相当于加上乐观锁。

redis的wathc监测:

此时并不能体现处乐观锁的作用,开启两个redis客户端进行测试:


执行完客户端2的修改后,重新执行客户端1的

发现事务并没有被执行成功,因为exec命令,在开启watch的时候,exec命令在提交事务的时候回去检测version是否被修改了,如果被修改了(此时被客户端2改成了1000),则事务提交失效。

解决方法:重新watch

3. Jedis

3.1 Jedis操作五大常见类型

涉及到的方法都是只是常见的方法,其它方法可以查阅API文档!

  • String:
ublic class StringTest {
   
    public static void main(String[] args) {
   
        //1.获取Jedis对象
        Jedis jedis = new Jedis("localhost",6379);

        //2.1普通存储与获取
        jedis.set("name", "liuzeyu");
        jedis.set("age", "21");
        jedis.del("age");
        System.out.println(jedis.get("name")+"--"+jedis.get("age"));
        System.out.println("================================");
        //2.1普通存储与获取
        jedis.setex("singer", 30, "jay");  //设置30s后过期,测试的时候需要注释掉
        String singer = jedis.get("singer");
        Long ttl = jedis.ttl("singer");
        System.out.println(singer+":"+ ttl);

        jedis.setnx("admin", "duyanting");
        System.out.println(jedis.get("admin"));
    }
}
  • Hash
public class HashTest {
   
    public static void main(String[] args) {
   

        Jedis jedis = new Jedis();
        jedis.hset("person", "user", "liuzeyu");
        jedis.hset("person", "age", "32");

        String age = jedis.hget("person", "age");
        System.out.println(age);

        Map<String, String> person = jedis.hgetAll("person");
        Set<String> keySet = person.keySet();
        for (String s : keySet) {
   
            System.out.println(s+"--->"+person.get(s));
        }
    }
}
  • List
public class ListTest {
   
    public static void main(String[] args) {
   
        Jedis jedis = new Jedis("localhost",6379);
        jedis.select(1);  //选择1数据库

        jedis.lpush("name","liuzeyu");
        jedis.lpush("name", "duyanting");

        jedis.rpush("name", "yellow");
        List<String> list = jedis.lrange("name", 0, -1);
        System.out.println(list);
    }
}

  • Set
public class SetTest {
   

    public static void main(String[] args) {
   
        Jedis jedis = new Jedis();

        jedis.sadd("name", "liuzeyu");
        jedis.sadd("name", "duyanting");

        Set<String> name = jedis.smembers("name");
        System.out.println(name);
    }
}
  • ZSet
public class ZSetTest {
   
    public static void main(String[] args) {
   
        Jedis jedis = new Jedis();
        jedis.zadd("name", 343, "xiaoming");
        jedis.zadd("name", 341, "xiaohon");
        jedis.zadd("name", 333, "xiaozhang");
        jedis.zadd("name", 112, "xialiu");

        Set<String> name = jedis.zrange("name", 0, -1);
        Long zcount = jedis.zcount("name", 33, 2343);
        System.out.println(zcount);
    }
}

3.2 Jedis操作事务

public class TestRedisTX {
   
    public static void main(String[] args) {
   
        Jedis jedis = new Jedis();
        jedis.flushDB();

        JSONObject jsonObject = new JSONObject();
        jsonObject.put("name", "liuzeyu");
        jsonObject.put("age", "10");

        String result = jsonObject.toString();

        Transaction multi = jedis.multi();//开启事务
        try {
   
            multi.set("k1", result);
            multi.set("k2", result);
            int i = 5/0;
            multi.exec();
        } catch (Exception e) {
   
            multi.discard();
            e.printStackTrace();
        }finally {
   
            System.out.println(jedis.get("k1"));
            System.out.println(jedis.get("k2"));
            jedis.close();
        }
    }
}

此时会因为catch中捕获异常后会放弃事务,所有json数据将没有存进去!

4. SpringBoot整合

4.1 新建spring boot项目

创建项目时需要勾选springboot整合redis的起步依赖

4.2 源码分析

  1. 在pom.xml中看到我们导入的依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
  1. 点进去可以看到此时操作redis的操作并非Jedis了,而是lettuce
<dependency>
	<groupId>io.lettuce</groupId>
	<artifactId>lettuce-core</artifactId>
	<scope>compile</scope>
</dependency>

这是spring boot 2.x之后所作的一大改变,两者的区别:

  • Jedis:采用直连,多线程环境下,并不安全,需要改用Jedis Pool技术来替换,像BIO
  • Lettuce:采用netty,实例可以在多线程中进行共享,不存在线程不安全的情况,更像NIO。

BIO NIO介绍

  1. 进入spring.factory中搜索redis
  2. 进入redis自动配置类
@Configuration(
    proxyBeanMethods = false
)
@ConditionalOnClass({
   RedisOperations.class})
@EnableConfigurationProperties({
   RedisProperties.class})
@Import({
   LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class})
public class RedisAutoConfiguration {
   
    public RedisAutoConfiguration() {
   
    }

    @Bean
    @ConditionalOnMissingBean(
        name = {
   "redisTemplate"}
    )
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
   
        RedisTemplate<Object, Object> template = new RedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }

    @Bean
    @ConditionalOnMissingBean
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
   
        StringRedisTemplate template = new StringRedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }
}

  1. 看到于配置文件相关联的RedisProperties类和两个重要的函数
    @Bean
    @ConditionalOnMissingBean(
        name = {
   "redisTemplate"}
    )
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
   
        RedisTemplate<Object, Object> template = new RedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }

    @Bean
    @ConditionalOnMissingBean
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
   
        StringRedisTemplate template = new StringRedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }
  1. RedisTemplate对象就是操作redis的关键对象,spring boot为我们提供了默认的获取方式redisTemplate,但是在日常开发中,我们通常需要自定义RedisTemplate对象,在redis中,所有的对象都需要序列化,包括RedisTemplate,由于我们在redis操作最频繁的是String类型,于是源码也提供了基于操作字符串stringRedisTemplate的对象。

4.3 配置连接

# redis配置
# redis自动配置类RedisProperties
spring.redis.host=localhost
spring.redis.port=6379

上面两个配置在RedisPropertis都提供了默认赋值

4.4 自定义RedisTemplate

@Configuration
public class RedisConfig {
   

    @Bean
    @Deprecated
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
   
        //我们自己使用RedisTemplate一般使用<String,Object>
        RedisTemplate<String, Object> template = new RedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }

    @Bean
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
   
        StringRedisTemplate template = new StringRedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }
}

4.5 封装操作Redis工具类

先说一下为什么要封装Redis工具类,我们提供的RedisTemplate不能操作数据吗?
先给测试类注入RedisTemplate对象

  @Autowired
  @Qualifier("redisTemplate")
  private RedisTemplate redisTemplate;

测试代码:

  @Test
    public void test(){
   
        //操作字符串
        redisTemplate.opsForValue().set("k1", "v1");
        System.out.println(redisTemplate.opsForValue().get("k1"));

        //操作hash结构
        redisTemplate.opsForHash().put("user", "name", "lzy");
        System.out.println(redisTemplate.opsForHash().get("user", "name"));

        //操作set
        redisTemplate.opsForSet().add("singer", "jay","jack");
        redisTemplate.opsForSet().add("age", 33);
        System.out.println(redisTemplate.opsForSet().size("singer"));

//....其它数据类型
    }

可见代码多处出现冗余。

RedisTemplate封装:

package com.liuzeyu.utils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

// 在我们真实的分发中,或者你们在公司,一般都可以看到一个公司自己封装RedisUtil
@Component
public final class RedisUtil {
   

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    // =============================common============================
    /** * 指定缓存失效时间 * @param key 键 * @param time 时间(秒) */
    public boolean expire(String key, long time) {
   
        try {
   
            if (time > 0) {
   
                redisTemplate.expire(key, time, TimeUnit.SECONDS);
            }
            return true;
        } catch (Exception e) {
   
            e.printStackTrace();
            return false;
        }
    }

    /** * 根据key 获取过期时间 * @param key 键 不能为null * @return 时间(秒) 返回0代表为永久有效 */
    public long getExpire(String key) {
   
        return redisTemplate.getExpire(key, TimeUnit.SECONDS);
    }


    /** * 判断key是否存在 * @param key 键 * @return true 存在 false不存在 */
    public boolean hasKey(String key) {
   
        try {
   
            return redisTemplate.hasKey(key);
        } catch (Exception e) {
   
            e.printStackTrace();
            return false;
        }
    }


    /** * 删除缓存 * @param key 可以传一个值 或多个 */
    @SuppressWarnings("unchecked")
    public void del(String... key) {
   
        if (key != null && key.length > 0) {
   
            if (key.length == 1) {
   
                redisTemplate.delete(key[0]);
            } else {
   
                redisTemplate.delete(CollectionUtils.arrayToList(key));
            }
        }
    }


    // ============================String=============================

    /** * 普通缓存获取 * @param key 键 * @return 值 */
    public Object get(String key) {
   
        return key == null ? null : redisTemplate.opsForValue().get(key);
    }
    
    /** * 普通缓存放入 * @param key 键 * @param value 值 * @return true成功 false失败 */

    public boolean set(String key, Object value) {
   
        try {
   
            redisTemplate.opsForValue().set(key, value);
            return true;
        } catch (Exception e) {
   
            e.printStackTrace();
            return false;
        }
    }


    /** * 普通缓存放入并设置时间 * @param key 键 * @param value 值 * @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期 * @return true成功 false 失败 */

    public boolean set(String key, Object value, long time) {
   
        try {
   
            if (time > 0) {
   
                redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
            } else {
   
                set(key, value);
            }
            return true;
        } catch (Exception e) {
   
            e.printStackTrace();
            return false;
        }
    }


    /** * 递增 * @param key 键 * @param delta 要增加几(大于0) */
    public long incr(String key, long delta) {
   
        if (delta < 0) {
   
            throw new RuntimeException("递增因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, delta);
    }


    /** * 递减 * @param key 键 * @param delta 要减少几(小于0) */
    public long decr(String key, long delta) {
   
        if (delta < 0) {
   
            throw new RuntimeException("递减因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, -delta);
    }


    // ================================Map=================================

    /** * HashGet * @param key 键 不能为null * @param item 项 不能为null */
    public Object hget(String key, String item) {
   
        return redisTemplate.opsForHash().get(key, item);
    }
    
    /** * 获取hashKey对应的所有键值 * @param key 键 * @return 对应的多个键值 */
    public Map<Object, Object> hmget(String key) {
   
        return redisTemplate.opsForHash().entries(key);
    }
    
    /** * HashSet * @param key 键 * @param map 对应多个键值 */
    public boolean hmset(String key, Map<String, Object> map) {
   
        try {
   
            redisTemplate.opsForHash().putAll(key, map);
            return true;
        } catch (Exception e) {
   
            e.printStackTrace();
            return false;
        }
    }


    /** * HashSet 并设置时间 * @param key 键 * @param map 对应多个键值 * @param time 时间(秒) * @return true成功 false失败 */
    public boolean hmset(String key, Map<String, Object> map, long time) {
   
        try {
   
            redisTemplate.opsForHash().putAll(key, map);
            if (time > 0) {
   
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
   
            e.printStackTrace();
            return false;
        }
    }


    /** * 向一张hash表中放入数据,如果不存在将创建 * * @param key 键 * @param item 项 * @param value 值 * @return true 成功 false失败 */
    public boolean hset(String key, String item, Object value) {
   
        try {
   
            redisTemplate.opsForHash().put(key, item, value);
            return true;
        } catch (Exception e) {
   
            e.printStackTrace();
            return false;
        }
    }

    /** * 向一张hash表中放入数据,如果不存在将创建 * * @param key 键 * @param item 项 * @param value 值 * @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间 * @return true 成功 false失败 */
    public boolean hset(String key, String item, Object value, long time) {
   
        try {
   
            redisTemplate.opsForHash().put(key, item, value);
            if (time > 0) {
   
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
   
            e.printStackTrace();
            return false;
        }
    }


    /** * 删除hash表中的值 * * @param key 键 不能为null * @param item 项 可以使多个 不能为null */
    public void hdel(String key, Object... item) {
   
        redisTemplate.opsForHash().delete(key, item);
    }


    /** * 判断hash表中是否有该项的值 * * @param key 键 不能为null * @param item 项 不能为null * @return true 存在 false不存在 */
    public boolean hHasKey(String key, String item) {
   
        return redisTemplate.opsForHash().hasKey(key, item);
    }


    /** * hash递增 如果不存在,就会创建一个 并把新增后的值返回 * * @param key 键 * @param item 项 * @param by 要增加几(大于0) */
    public double hincr(String key, String item, double by) {
   
        return redisTemplate.opsForHash().increment(key, item, by);
    }


    /** * hash递减 * * @param key 键 * @param item 项 * @param by 要减少记(小于0) */
    public double hdecr(String key, String item, double by) {
   
        return redisTemplate.opsForHash().increment(key, item, -by);
    }


    // ============================set=============================

    /** * 根据key获取Set中的所有值 * @param key 键 */
    public Set<Object> sGet(String key) {
   
        try {
   
            return redisTemplate.opsForSet().members(key);
        } catch (Exception e) {
   
            e.printStackTrace();
            return null;
        }
    }


    /** * 根据value从一个set中查询,是否存在 * * @param key 键 * @param value 值 * @return true 存在 false不存在 */
    public boolean sHasKey(String key, Object value) {
   
        try {
   
            return redisTemplate.opsForSet().isMember(key, value);
        } catch (Exception e) {
   
            e.printStackTrace();
            return false;
        }
    }


    /** * 将数据放入set缓存 * * @param key 键 * @param values 值 可以是多个 * @return 成功个数 */
    public long sSet(String key, Object... values) {
   
        try {
   
            return redisTemplate.opsForSet().add(key, values);
        } catch (Exception e) {
   
            e.printStackTrace();
            return 0;
        }
    }


    /** * 将set数据放入缓存 * * @param key 键 * @param time 时间(秒) * @param values 值 可以是多个 * @return 成功个数 */
    public long sSetAndTime(String key, long time, Object... values) {
   
        try {
   
            Long count = redisTemplate.opsForSet().add(key, values);
            if (time > 0)
                expire(key, time);
            return count;
        } catch (Exception e) {
   
            e.printStackTrace();
            return 0;
        }
    }


    /** * 获取set缓存的长度 * * @param key 键 */
    public long sGetSetSize(String key) {
   
        try {
   
            return redisTemplate.opsForSet().size(key);
        } catch (Exception e) {
   
            e.printStackTrace();
            return 0;
        }
    }


    /** * 移除值为value的 * * @param key 键 * @param values 值 可以是多个 * @return 移除的个数 */

    public long setRemove(String key, Object... values) {
   
        try {
   
            Long count = redisTemplate.opsForSet().remove(key, values);
            return count;
        } catch (Exception e) {
   
            e.printStackTrace();
            return 0;
        }
    }

    // ===============================list=================================
    
    /** * 获取list缓存的内容 * * @param key 键 * @param start 开始 * @param end 结束 0 到 -1代表所有值 */
    public List<Object> lGet(String key, long start, long end) {
   
        try {
   
            return redisTemplate.opsForList().range(key, start, end);
        } catch (Exception e) {
   
            e.printStackTrace();
            return null;
        }
    }


    /** * 获取list缓存的长度 * * @param key 键 */
    public long lGetListSize(String key) {
   
        try {
   
            return redisTemplate.opsForList().size(key);
        } catch (Exception e) {
   
            e.printStackTrace();
            return 0;
        }
    }


    /** * 通过索引 获取list中的值 * * @param key 键 * @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推 */
    public Object lGetIndex(String key, long index) {
   
        try {
   
            return redisTemplate.opsForList().index(key, index);
        } catch (Exception e) {
   
            e.printStackTrace();
            return null;
        }
    }


    /** * 将list放入缓存 * * @param key 键 * @param value 值 */
    public boolean lSet(String key, Object value) {
   
        try {
   
            redisTemplate.opsForList().rightPush(key, value);
            return true;
        } catch (Exception e) {
   
            e.printStackTrace();
            return false;
        }
    }


    /** * 将list放入缓存 * @param key 键 * @param value 值 * @param time 时间(秒) */
    public boolean lSet(String key, Object value, long time) {
   
        try {
   
            redisTemplate.opsForList().rightPush(key, value);
            if (time > 0)
                expire(key, time);
            return true;
        } catch (Exception e) {
   
            e.printStackTrace();
            return false;
        }

    }


    /** * 将list放入缓存 * * @param key 键 * @param value 值 * @return */
    public boolean lSet(String key, List<Object> value) {
   
        try {
   
            redisTemplate.opsForList().rightPushAll(key, value);
            return true;
        } catch (Exception e) {
   
            e.printStackTrace();
            return false;
        }

    }


    /** * 将list放入缓存 * * @param key 键 * @param value 值 * @param time 时间(秒) * @return */
    public boolean lSet(String key, List<Object> value, long time) {
   
        try {
   
            redisTemplate.opsForList().rightPushAll(key, value);
            if (time > 0)
                expire(key, time);
            return true;
        } catch (Exception e) {
   
            e.printStackTrace();
            return false;
        }
    }

    /** * 根据索引修改list中的某条数据 * * @param key 键 * @param index 索引 * @param value 值 * @return */

    public boolean lUpdateIndex(String key, long index, Object value) {
   
        try {
   
            redisTemplate.opsForList().set(key, index, value);
            return true;
        } catch (Exception e) {
   
            e.printStackTrace();
            return false;
        }
    }

    /** * 移除N个值为value * * @param key 键 * @param count 移除多少个 * @param value 值 * @return 移除的个数 */

    public long lRemove(String key, long count, Object value) {
   
        try {
   
            Long remove = redisTemplate.opsForList().remove(key, count, value);
            return remove;
        } catch (Exception e) {
   
            e.printStackTrace();
            return 0;
        }

    }

}

4.6 测试

  1. 新建User类
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User{
   

    private Integer id;
    private String name;
}
  1. 注入工具类
@Autowired
private RedisUtil redisUtil;
  1. 测试
 @Test
 void contextLoads() {
   
     //存储普通的值
     redisUtil.set("name", "liuzeyu");
     System.out.println(redisUtil.get("name"));

     //存储对象
     redisUtil.set("user", new User(1, "liuzeyu"));
     System.out.println(redisUtil.get("user"));
 }
  1. 测试结果:

对象序列化错误,此时我们的user对象并没有被序列化到redis内存中,前面我们提到,redis的所有对象都需要被序列化。

  1. 解决方案
  • User对象实现序列化接口
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable{
   

    private Integer id;
    private String name;
}
  • 自定义序列化规则(redis开发中更用)
    修改redis的配置文件:
    @Bean
    @Deprecated
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
   
        //我们自己使用RedisTemplate一般使用<String,Object>
        RedisTemplate<String, Object> template = new RedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);

        //JSon序列化配置
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new
                Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);

        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        //String序列化
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        //key采用String的序列化方式
        template.setKeySerializer(stringRedisSerializer);
        //hash的key也采用String的序列化方式
        template.setHashKeySerializer(stringRedisSerializer);
        //value采用json的序列化方式
        template.setValueSerializer(jackson2JsonRedisSerializer);
        //hash的value也采用json的序列化方式
        template.setHashValueSerializer(jackson2JsonRedisSerializer);

        template.afterPropertiesSet();

        return template;
    }

两种方法,问题均可以解决!

5. Redis.conf文件详解

从第5部分起,是Redis的核心重难点!启动redis服务器通常我们需要从配置文件启动。

1、单位:GENERAL

规定了redis内存中的单位

# Note on units: when memory size is needed, it is possible to specify
# it in the usual form of 1k 5GB 4M and so forth:
#
# 1k => 1000 bytes
# 1kb => 1024 bytes
# 1m => 1000000 bytes
# 1mb => 1024*1024 bytes
# 1g => 1000000000 bytes
# 1gb => 1024*1024*1024 bytes
#
# units are case insensitive so 1GB 1Gb 1gB are all the same.

units 单位对大小写不敏感!

2、包含:INCLUDES

# Include one or more other config files here. This is useful if you
# have a standard template that goes to all Redis servers but also need
# to customize a few per-server settings. Include files can include
# other files, so use this wisely.
#
# Notice option "include" won't be rewritten by command "CONFIG REWRITE"
# from admin or Redis Sentinel. Since Redis always uses the last processed
# line as value of a configuration directive, you'd better put includes
# at the beginning of this file to avoid overwriting config change at runtime.
#
# If instead you are interested in using includes to override configuration
# options, it is better to use include as the last line.
#
# include /path/to/local.conf
# include /path/to/other.conf

redis.conf可以由多个配置文件导入,使用include关键字

3、网络:NETWORD

bind 127.0.0.1
protected-mode yes  # 保护模式
port 6379	# 默认端口

默认只会绑定本机,可以使用bind绑定其它主机,

4、通用:GENERAL

daemonize yes # 守护进程daemonize 默认关闭,需要我们手动开启
pidfile /var/run/redis_6379.pid #如果是后台运行,我们需要指定一个pid文件
# Specify the server verbosity level.
# This can be one of:
# debug (a lot of information, useful for development/testing)
# verbose (many rarely useful info, but not a mess like the debug level)
# notice (moderately verbose, what you want in production probably)
# warning (only very important / critical messages are logged)
loglevel notice  # 日志等级
logfile ""	#日志文件位置名
databases 16 # 数据库默认16个
always-show-logo yes  # 总是显示开启默认的LOG

5、快照:SNAPSHOTTING

因为redis数据是存储在内存中,所有容易 断电即失,因此我们需要将它持久化到硬盘上,持久化后的文件后缀有两种,.aof.rdb

# Note: you can disable saving completely by commenting out all "save" lines.
#
# It is also possible to remove all the previously configured save
# points by adding a save directive with a single empty string argument
# like in the following example:
#
# save ""

save 900 1 #900s内至少有1 个key被修改了,进行持久化
save 300 10 #300s内至少有10 个key被修改了,进行持久化
save 60 60 #60s内至少有60个key被修改了,进行持久化

6、复制:REPLICATION

主从复制学完回来补充

7、安全:SECURITY

8、客户端限制:CLIENTS

# maxclients 10000 # 运行客户端最大的连接数10000

9、aof配置:APPEND ONLY MODE

appendonly no  # 默认不开启aof持久化,采用的是rdb存储方式
# The name of the append only file (default: "appendonly.aof")
appendfilename "appendonly.aof" # 持久化文件的名字
appendfsync alway     # 每次修改都会进行sync 影响性能
appendfsync everysec  # 每秒执行一次sync 可能会丢失这1s的数据
appendfsync no 	# 不进行sync 这个时候操作系统自己同步数据,速度最快!!

6. Redis持久化

Redis持久化一直都是面试和工作的重点,因为redis是内存数据库,因此数据有断电即失的风险,所有redis提供了持久化功能。

6.1 rdb

rdb的运行原理图

在指定的时间间隔内,将内存中的数据集于快照的形式写入到磁盘文件中,恢复过程就是将快照读到内存中即可。

<mark>持久化原理:</mark>
Redis会单独(fork)一个子进程进行持久化处理,会先将数据写入到一个临时RDB文件中,然后用临时的RDB文件去覆盖已经持久化后的RBD文件。整个过程中,主进程是不会进行IO操作的,这就确保了极高的性能,如果是大规模的数据,并且对数据的完整性要求不是很敏感时,那么RDB方式要比AOF方式更加的高效。RDB有一个很大的缺点就是,如果在最后一次持久化时服务器宕机了,数据可能会丢失,我们redis默认采用的就是RDB,一般情况下不要去修改这些配置。

rdb的保存文件是dump.rdb,一般不进行修改,通常在生产环境下需要备份dump.rdb文件,都是在我们配置文件中的快照中进行配置。

一、触发机制

  1. 通过修改快照配置,满足save的某种规则:

    观察redis/bin目录下的dump.rdb的生成。

  2. redis中执行shutdown关机也会生成dump.rdb文件

    开机需要先开启服务器,才可以连接上客户端!

  3. 执行flushall命令也会生成dump.rdb文件,注意flushdb不可以!

二、如何恢复rdb文件?

只需要将dump.rdb文件置于redis的启动目录即可!

127.0.0.1:6379> config get dir
1) "dir"
2) "/usr/local/redis/bin"
127.0.0.1:6379>

三、采用rdb的优缺点

优点:
	1. 适合大规模的数据恢复
	2. 对数据的完整性要求不高
缺点:
	1. 需要一定的时间间隔操作,如果redis宕机了,最后一次修改的数据也就没有了
	2. fork子进程时,会占用一定的内存空间。

6.2 aof

什么是aof:也是一种持久化操作,aof过程会将我们的操作命令都记录下来,恢复时,读取这些操作命令到redis中。


redis将我们那些写操作以日志的形式记录下来,读操作不记录,然后写入到aof文件中,只允许追加aof文件,不允许修改aof文件,redis启动之初会读取改aof文件重新构建数据,换言之,redis重启过程中,就是将aof日志文件从前到后执行一遍以完成数据恢复操作!

aof保存文件为appendonly.aof,可以查看redis.conf的配置文件

appendonly no  # 默认不开启aof持久化,采用的是rdb存储方式
# The name of the append only file (default: "appendonly.aof")
appendfilename "appendonly.aof" # 持久化文件的名字
appendfsync alway     # 每次修改都会进行sync 影响性能
appendfsync everysec  # 每秒执行一次sync 可能会丢失这1s的数据
appendfsync no 	# 不进行sync 这个时候操作系统自己同步数据,速度最快!!

默认不会开启aof,将no改为yes开启aof。

一、触发机制

  • 当关掉redis执行shutdown文件时会在redis的启动目录下生成appendonly.aof文件
  • 执行flushall命令时会在redis的启动目录下生成appendonly.aof文件

二、如和修复aof文件

如果生成appendonly.aof文件被修改了,并且有错误,在redis启动时会被会被加载成功,尝试一下!

  1. 打开appendonly.aof加入错误
  2. 重启redis,连接服务端

    注意:修复的过程中,可能也会将正确的文件一同删掉的,但毕竟也是少数的。

三、aof的优缺点

优点:
	1. 每一次修改都同步,数据的完整性比较好
	2. 每秒同步一次,可能会丢失数据
	3. 从不同步,效率是最高的。
缺点:
	1. 相对于数据文件来说,aof是远大于rdb的,修复的速度也更慢
	2. aof的效率比rdb慢,读写操作频繁,所以我们默认都是使用rdb做持久化的

四、重写规则说明


默认情况下,当aof文件大小达到64m时,父进程会fork一个新的进程来完成我们文件的重写,重写可以压缩aof文件的体积。

6.3 小结

  • RDB持久化可以在一定时间间隔内对内存中的数据进行快照存储
  • AOF持久化操作记录每次服务器的写操作,当服务器重启会重写执行aof文件中的命令来恢复内存中的数据,aof以追加的形式把服务器写操作存入aof文件末尾,redis还可以fork子进程对aof文件进行重写,不至于文件过大。
  • 只做缓存的话,可以不要开启持久化
  • 同时开启两种持久化:
    • 在这种情况下,redis启动会优先载入aof文件来恢复原有数据,因为通常情况下aof文件的完整性要比rdb的好。
    • 当rdb文件不实时,服务器重写启动也只会先加载aof文件,那要不要只使用aof文件呢?不要!!rdb文件适应于做备份数据库,因为aof是不断变化的不好备份,快速重启时而且不会像aof那样存在bug,rdb做备份最合适!

7. Redis发布订阅

Redis发布订阅(pub/sub)是一种消息通信模式,发布者(pub)发送消息,订阅者(sub)接收消息,适应场景:微信,微博,关注系统。

订阅/发布消息图:

例如:client 2,5,1订阅(sub)了channel 1

当有消息发布(pub)到频道1时,这个消息也会被发送到client 2,5,1

常用命令


测试:

<mark>原理:</mark>
Redis采用C语言编写,底层原理代码放在 pubsub.c 文件中。
通过subscribe命令订阅频道后,在redis的服务端维护者一个字典,字典的键就是一个个频道,而且字典的值的数据结构是一个链表,而且链表中保存了订阅了某频道的客户端。subscribe命令的关键在于将客户端添加到指定频道的的链表中。
通过publish命令向订阅者发布消息,redis服务端会使用给定的频道作为键,在它所维护的链表中遍历订阅了该频道的客户端,将消息依次推送到客户端。

PUB/SUB,译为发布与订阅,在redis中,可以可以设定为某一个key进行消息发布和消息订阅,这里的key就把它当作一个频道即可,当向这个频道推送消息后,所有订阅该频道的客户端都会收到相应的消息。这一功能明显可以用来开发实时消息系统,群聊功能等。

<mark>使用场景</mark>

  • 实时消息系统
  • 实时群聊天系统
  • 订阅,关注系统

遇到稍微复杂一点的场景,可以使用消息中间件MQ…

8. Redis主从复制

8.1 主从复制概念

主从复制指的是一台服务器的数据复制到另一台服务器,前者是主节点(master/leader),后者是从节点(slaver/follower)。

<mark>数据的复制是单向的,只能由主节点到从节点,默认情况下,每台redis服务器都是主节点</mark>
并且一个主节点可以由多个从节点,但从节点只能由一个主节点!

<mark>主从复制的作用:</mark>

  • 数据冗余:主从复制实现了数据的热备份,是持久化之外的一种数据冗余的方式
  • 故障恢复:但主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复,实际上也是一种服务的冗余。
  • 负载均衡:在主从复制的基础上,配合读写分离,可以由主节点提供写服务,从节点提供读服务,分担服务器负载,尤其在写少读多的场景下,通过从节点分担读负载,可以大大提高redis服务器的并发量!
  • 高可用(集群)基石:除了上述作用外,主从复制还是哨兵和集群能够实施的基础,因此说主从复制是redis高可用的基石。

一般来说,在redis运用的工程中,使用一台redis服务器是万万不能(宕机)的,原因如下:

  • 从结构上来讲,单个redis服务器容易出现单点故障,并且一台服务器需要处理的请求负载,压力比较大;
  • 从容量上讲,单个redis服务器内存容量有限,单台redis服务器最大的使用内存不应该超过20G

电商网站上的商品往往都是一次上传,多次浏览,说的专业点就是 多读少写
对于这样的场景,可以使用如下架构:

主从复制,读写分离,80%的情况都是在读,为了减轻服务器的压力,一般可以使用一主二从。

8.2 传统的Redis集群设计

以一主二从为例来搭建redis服务器集群

  1. 准备多个redis配置文件,用来启动不同的redis服务器
  2. 开启redis窗口
  3. 修改一主二从的配置文件

主要修改下面几个方面:

1. 端口号:主:6379 ;从1:6380 ;从:6381
2. pid名字:主:/var/run/redis6379.pid;从1:redis6379.pid ;从2:redis6379.pid
3. log文件名字:主:log6379;从1:log6380 ;从:log6381
4. rbd文件:主:dump6379.rdb ;从1:dump6380.rdb ;从:dump6381.rdb

这样可以避免发生启动端口占用等问题

5. 修改完成后,我们启动这三台服务器

6. 连接

三台都连接上后,查看进程

  1. 查看主从关系



    可见每一台服务器都是默认是maser,并且slaves = 0,表示没有从机!

  2. 分配主机和从机

在两台从机上运行:

127.0.0.1:6380> SLAVEOF 127.0.0.1 6379  # 找主机
OK
(0.86s)
127.0.0.1:6380>

重新查看两台从机和主机的状态:


此时的服务器结构图:

9. 特点

  1. 主机可以写数据,从机只能读数据,并且主机写完数据立马同步到主机,从机可以获取到数据
  2. 主机如果宕机,两个从机还是能获取到主机存进去的值,但是也不能写,如果主机重新起来了,那么它将重新是两个从机的maser
  3. 如果是从机宕机,重启后就会丢失数据,但是只要称为主机的从机,那么就可以立即从主机那边同步到数据

<mark>主从复制原理:</mark>
slave启动成功后会向maser发送一个sync同步命令,maser接收命令后,启动后台的存盘进程,同时收集所有接收到用于修改数据集的指令,在后台进程执行完毕后,maser将传送各个数据到slave,并完成一次完全同步!

  • 全量复制:slave服务器第一次连接到maser并接收到数据库文件数据后,将其存盘加载进内存!
  • 增量复制:maser继续将新的收集到的修改命令依次传给slave,完成同步,这就叫做增量复制。

但只要是重新连接maser,一次完全同步(全量复制)将被自动执行,我们的数据一定可以在从机中看到!

  1. 一种特殊情况
    主机maser连接着一个从机slave,此时这个slave又充当另外一个slave的注解存在。

    此时查看中间的slave的redis是什么状态
    仍然是属于从机的角色!!也连接着一台从机

此时如果遇到主机宕机,会出现什么情况??

两台从机哪一台会变成maser??


<mark>可见两台服务器仍然是从机,那么如何将它们之一变成主机呢??</mark>

只需要执行 slaveof no one即可,就拥有了写的操作!

8.3 Redis哨兵模式

传统的集群设计,层层链路结构,如果主机宕机,需要我们手动去开启新的主机,显然这是非常麻烦,且费时费力,还会造成一段时间服务器不可用,有没有那种自动开启?有!那就是哨兵模式!

哨兵模式由redis 2.8引入,是一种特殊的模式,哨兵是一个独立的进程,它会独立运行**,其原理是哨兵通过发送命令,来等待服务器响应,从而监控多个redis实例。**

运行原理图:

仍然以一主二从为例,配置哨兵的配置文件和启动哨兵:

如果主机宕机后:

6381端口的从机变成主机!
测试宕机的主机请重新起来:

<mark>需要注意的是:如果是宕机后的机子重新回来,那么它也只会当从机,因为主机已经被他人占用的了,这个过程有点像谋朝篡位</mark>

这里的哨兵由两个作用:

  • 通过发送命令,让redis返回其监控其运行状态,包括主服务器和从服务器。
  • 当哨兵检测到maser宕机后,会自动将slave转成master,并发布订阅模式通知其它的从服务器修改配置文件,让它们切换主机!

8.4 多哨兵模式

面对复杂一点的架构,一个哨兵往往是不能满足需求的,这个时候需要引入多哨兵模式,如图:

多个哨兵除了监视主机外,还互相监督,确保多集群服务器的正常工作。
<mark>原理</mark>:如果哨兵1检测到master宕机了,服务器不会立马选举新的master,而是哨兵1进入主观下线,由于多个服务器同时监控着master,这个宕机的消息也会传到多个哨兵,当主观下线的服务器达到一定数量时,哨兵之间会发起投票,投票结果由其中一个哨兵发出,然后将票数最高的从机角色转成master,(故障转移),最后,其它的各个哨兵把自己监控的从服务器切换主机,这个过程称之为客观下线,这就完成了一次完整的主备的切换,然后怎么样,智能吧!

8.5 哨兵模式的优缺点

优点:

  • 哨兵集群,基于主从配置,所有的主从配置优点,它都有
  • 主从可以切换,故障可以转移,系统的可用性更好
  • 哨兵模式就是主从模式的升级,手动到自动,更加健壮!

缺点:

  • 在线扩容难度大,集群数据到达一定量,扩容会非常麻烦!
  • 集群到达一定量,哨兵模式的配置文件也会复杂起来

哨兵模式的配置参考

# Example sentinel.conf
# 哨兵sentinel实例运行的端口 默认26379
port 26379
# 哨兵sentinel的工作目录
dir /tmp
# 哨兵sentinel监控的redis主节点的 ip port
# master-name 可以自己命名的主节点名字 只能由字母A-z、数字0-9 、这三个字符".-_"组成。
# quorum 配置多少个sentinel哨兵统一认为master主节点失联 那么这时客观上认为主节点失联了
# sentinel monitor <master-name> <ip> <redis-port> <quorum>
sentinel monitor mymaster 127.0.0.1 6379 2
# 当在Redis实例中开启了requirepass foobared 授权密码 这样所有连接Redis实例的客户端都要提供
密码
# 设置哨兵sentinel 连接主从的密码 注意必须为主从设置一样的验证密码
# sentinel auth-pass <master-name> <password>
sentinel auth-pass mymaster MySUPER--secret-0123passw0rd
# 指定多少毫秒之后 主节点没有应答哨兵sentinel 此时 哨兵主观上认为主节点下线 默认30秒
# sentinel down-after-milliseconds <master-name> <milliseconds>
sentinel down-after-milliseconds mymaster 30000
# 这个配置项指定了在发生failover主备切换时最多可以有多少个slave同时对新的master进行 同步,
这个数字越小,完成failover所需的时间就越长,
但是如果这个数字越大,就意味着越 多的slave因为replication而不可用。
可以通过将这个值设为 1 来保证每次只有一个slave 处于不能处理命令请求的状态。
# sentinel parallel-syncs <master-name> <numslaves>
sentinel parallel-syncs mymaster 1
# 故障转移的超时时间 failover-timeout 可以用在以下这些方面:
#1. 同一个sentinel对同一个master两次failover之间的间隔时间。
#2. 当一个slave从一个错误的master那里同步数据开始计算时间。直到slave被纠正为向正确的master那
里同步数据时。
#3.当想要取消一个正在进行的failover所需要的时间。
#4.当进行failover时,配置所有slaves指向新的master所需的最大时间。不过,即使过了这个超时,
slaves依然会被正确配置为指向master,但是就不按parallel-syncs所配置的规则来了
# 默认三分钟
# sentinel failover-timeout <master-name> <milliseconds>
sentinel failover-timeout mymaster 180000
# SCRIPTS EXECUTION
#配置当某一事件发生时所需要执行的脚本,可以通过脚本来通知管理员,例如当系统运行不正常时发邮件通知
相关人员。
#对于脚本的运行结果有以下规则:
#若脚本执行后返回1,那么该脚本稍后将会被再次执行,重复次数目前默认为10
#若脚本执行后返回2,或者比2更高的一个返回值,脚本将不会重复执行。
#如果脚本在执行过程中由于收到系统中断信号被终止了,则同返回值为1时的行为相同。
#一个脚本的最大执行时间为60s,如果超过这个时间,脚本将会被一个SIGKILL信号终止,之后重新执行。
#通知型脚本:当sentinel有任何警告级别的事件发生时(比如说redis实例的主观失效和客观失效等等),
将会去调用这个脚本,这时这个脚本应该通过邮件,SMS等方式去通知系统管理员关于系统不正常运行的信
息。调用该脚本时,将传给脚本两个参数,一个是事件的类型,一个是事件的描述。如果sentinel.conf配
置文件中配置了这个脚本路径,那么必须保证这个脚本存在于这个路径,并且是可执行的,否则sentinel无
法正常启动成功。
#通知脚本
# shell编程
# sentinel notification-script <master-name> <script-path>
sentinel notification-script mymaster /var/redis/notify.sh
# 客户端重新配置主节点参数脚本
# 当一个master由于failover而发生改变时,这个脚本将会被调用,通知相关的客户端关于master地址已
经发生改变的信息。
# 以下参数将会在调用脚本时传给脚本:
# <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>
# 目前<state>总是“failover”,
# <role>是“leader”或者“observer”中的一个。
# 参数 from-ip, from-port, to-ip, to-port是用来和旧的master和新的master(即旧的slave)通
信的
# 这个脚本应该是通用的,能被多次调用,不是针对性的。
# sentinel client-reconfig-script <master-name> <script-path>
sentinel client-reconfig-script mymaster /var/redis/reconfig.sh # 一般都是由运维来配
置!

9. Redis缓存穿透,击穿和雪崩(面试重点)

缓存穿透和缓存雪崩都是属于服务器的高可用问题。

Rdis的使用,极大的提升了应用程序的性能和效率,特别是在查询方面,但同时也引入了新的问题,最要害的是,解决数据的一致性问题,高并发数据时,如果对数据一致性要求比较高,就不能使用缓存!

还有另外一系列问题,例如,缓存穿透,缓存雪崩,缓存击穿,目前业界也有比较流行的解决方案。

9.1 缓存穿透

看一个概念图:

概念很简单:其实就是客户端如果要去查询数据,如果配置了缓存,肯定会先走缓存,此时缓存没有查到或者说缓存没有命中(该场景常出现在秒杀系统,而你总是没有杀到),就回去走数据库,但是数据库其实也没有这条数据,如果此时数据的访问量过大,就会给DB造成太大的压力,简单的说就是绕过缓存直接访问DB,此时缓存的设计就显得可有可无了,这就是缓存穿透

存在的问题1:
如果遇到一个低端的黑客,无数次发送id = -1的请求去查询数据库信息,类似于一种DDOS攻击,显然在缓存和数据库中都不会存在,这样一条数据!
0)
数据量大的时候,这就造成了缓存可能会被穿透

解决方案

低端黑客请求第一次走的缓存,当缓存查询数据不存在时,这个时候走数据库,显然数据库也是没有的,所有返回一个data = null的数据存入缓存,下次请求肯定先访问缓存时,就会直接拿到null的值,也不会走数据库了,缓存击穿的风险就被解决了!)

存在的问题2

如果遇到一个高端点的黑客,它不在发送id = -1的请求了,而是利用DDOS分布式攻击,发送id = UUID的请求。此时我们想,如果也是想上面那样存入缓存中,那么缓存是一个内存数据库,况且id的值是变化的而且唯一,缓存中能存如这么大的数据量吗,这种做法肯定是不被允许的!

又如果黑客的的请求条件比较苛刻,例如where id = uuid and score = xx…
可选的解决方案是讲db上面的数据同步到过滤器上,但是此时此时的过滤器负载又要太大,也是不被允许的!

所以,我们想要的是一个轻量级的过滤器!!

采用布隆过滤器
布隆过滤器是一种数据结构,对可能查询的参数以hash形式存储,在控制层先进行校验,不符合则丢弃,从而避免了底层存储系统的查询压力!

布隆算法是一种数据标识算法,采用错误率换取空间。

原理:假设数组长度10,过滤器上id = 1的数据通过hash函数,在bit数组的1号位置被标示,id = 10的也在1号位置被标识,id = 100的在4号位置被标示。此时如果c3黑客查询条件为id = 100,经过过滤器,看到4号被标识,说明可以数据库查询,但是如果cl1和c2分别发送id = 1和id = 10,经过过滤器的hash运算发现都被标识为1,也不一定能进入数据库查询,因为此时发送了hash碰撞,id = 1或id =10,不一定会在数据库中,因此拦截器可能会把它们拦截!!

<mark>小结:</mark>
布隆算法的错误率体现在:

  • 如果,过滤器告诉你数据存在,那么它可能不存在,因为可能发生了hash碰撞
  • 如果它告诉你数据不存在,那么它肯定不存在!
  • 错误率在实际情况下,影响不大,因为缓存穿透的风险已经被大大降低了。

布隆算法的错误率受什么影响:

  • 数组长度
  • hash函数个数

但是!布隆过滤器是在什么时刻开始标识数据的?在用户插入数据 or 数据库同步过去?

显然,显示开发中选的是第二种,是在用户插入完到数据库后,数据库开启异步线程,向过滤器同步数据,期间可设置一个时间策略,一段时间更新一次。
代码实现(bitmap)

后面再过来补充

9.2 缓存击穿

缓存击穿是一种特殊的缓存穿透


概念其实也很简单!就是客户端发送请求查询某一条热点数据,但是在缓存中,热点数据已经过期了,此时客户端就不会走缓存,而直接查询DB,如果数据量请求过大,就会造成缓存击穿!!

解决方案

  1. 设置热点数据永不过期,这样就只会走缓存了,但是这种情况如果用在热点数据较多的场景下,对缓存无疑是一种负担!
  2. 使用分布式锁
    <mark>分布式锁能够解决多个节点上多个进程之间的排队问题!</mark>

    分布式锁lock的运用:
    当多个客户端没有查询到热点数据时,直接拐向锁,获取到所之后,再去访问DB,如此就可以降低缓存被击穿的风险。
    其中可能有死锁的风险:就是如果某一个客户端在拿到锁之后宕机了,还没到释放锁,其它客户端在没拿到锁,也就访问不到DB了。
    解决方案:另起一个线程监控锁资源,如果达到一定时间客户端没有是释放锁,这个线程就去释放锁,这个时间要适当,不能太长也不能太短,太短容易出现数据的一致性问题(并发冲突),太长单位时间内的效率就会降低!

9.3 缓存雪崩

缓存雪崩,也是缓存穿透的另外一种表现形式,指的是在某一时间段内,缓存多条数据集中过期失效,redis宕机!

一个例子:淘宝双11晚上0点,那些热门降价的商品会先放到缓存中,假设缓存都设置为1h,从0点开始抢购后,到1点缓存中的商品就过期了,此时如果还存在大量的客户抢购,就会直接走存储层,也就是数据库,就会造成缓存雪崩!

更多情况下,是redis因为某一结点,或者服务器宕机不能使用,导致某一时间段,大量数据涌入数据库造成的缓存雪崩!

解决方案

  • Redis挂了导致的缓存雪崩:搭建redis集群,如果某一台主要redis服务器挂了,还可以进行主备切换!
  • 限流降低:通过锁和队列来控制数据库写入缓存的数量,比如一个key只允许一个线程查询和访问
  • 有效期一致导致的缓存雪崩:给每条数据设置随机的过期时间!

9.4 三者区别

<mark>重点在于理解缓存穿透,缓存击穿,缓存雪崩之间的区别和各自的解决方法!!</mark>

参考学习:https://www.bilibili.com/video/BV1S54y1R7SB
https://www.bilibili.com/video/BV1xf4y1278g?p=1