Redis缓存更新策略与缓存穿透、雪崩等问题的解决
一、缓存更新策略
1、三种策略
- 内存淘汰:redis自带的内存淘汰机制
- 过期淘汰:利用expire命令给数据设置过期时间
- 主动更新:主动完成数据库和缓存的同时更新
2、策略选择
- 低一致性需求:内存淘汰或过期淘汰
- 高一致性需求:主动更新为主,过期淘汰兜底
3、主动更新的方案
- Cache Aside:缓存调用者在更新数据库的同时完成对缓存的更新
- 一致性良好
- 实现难度一般
- Read/Write Through:缓存与数据库成为一个服务,服务保证两者的一致性,对外暴露的API接口。调用者调用API,无需知道自己操作的数据库还是缓存,不关心一致性
- 一致性优秀
- 实现复杂
- 性能一般
- Write Back:缓存调用者的CRUD都针对缓存完成。由独立线程异步的将缓存写到数据库,实现最终一致
- 一致性差
- 性能好
- 实现复杂
二、缓存存在的问题
1、缓存穿透
产生原因:客户端请求的数据在缓存和数据库中都不存在。当这种情况大量出现或被恶意攻击时,接口的访问全部透过Redis访问数据库,而数据库中也没有这些数据,我们称这种现象为"缓存穿透"。
解决方案:
- 缓存空对象:对于不存在的数据也在Redis建立缓存,值为空,设置一个较短的TTL时间
- 优点:实现简单,维护方便
- 缺点:额外消耗内存,短期的数据不一致
- 布隆过滤:利用布隆过滤算法,在请求Redis之前先判断是否存在,如果不存在则直接拒绝访问
- 优点:内存占用少
- 缺点:实现复杂,存在误判的可能性
- 其他方法:
- 做好数据的基础格式校验
- 加强用户权限校验
- 做好热点数据的限流
布隆过滤器:
一种数据结构,由一串很长的二进制向量组成,可以将其看成一个二进制数组。
当要向布隆过滤器中添加一个元素key时,我们通过多个hash函数,算出一个值,然后将这个值所在的方格置为1。
因为多个不同的数据通过hash函数算出来的结果是会有重复的,所以布隆过滤器可以判断某个数据一定不存在,但是无法判断一定存在。
优点:优点很明显,二进制组成的数组,占用内存极少,并且插入和查询速度都足够快。
缺点:随着数据的增加,误判率会增加;还有无法判断数据一定存在;另外还有一个重要缺点,无法删除数据。
2、缓存雪崩
产生原因:在同一时间段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力
解决方案:
- 给不同的Key的TTL设置随机值
- 利用Redis集群提高服务的可用性
- 诶缓存业务添加降级限流策略
- 给业务添加多级缓存
3、缓存击穿
产生原因:热点Key在某一个时间段被高并发访问,而此时Key正好过期,如果重建缓存时间耗时长,在这段时间内大量请求剾数据库,带来巨大冲击
解决方案:
- 设置value永不过期:通过定时任务进行数据库查询更新缓存,当然前提时不会给数据库造成压力过大
- 优点:最可靠,性能好
- 缺点:占空间,内存消耗大,一致性差
- 互斥锁:给缓存重建过程加锁,确保重建过程只有一个线程执行,其他线程等待
- 优点:实现简单,没有额外内存消耗,一致性好
- 缺点:等待导致性能下降,有死锁风险
- 逻辑过期:热点Key缓存永不过期,认识设置一个逻辑过期时间,查询到数据时通过对逻辑时间判断,来决定是否需要进行缓存重建。重建过程也通过互斥锁来保证单线程执行。利用独立线程异步执行,其他线程无需等待,直接查询到旧的数据即可。
- 优点:线程无需等待,性能较好
- 缺点:不保证一致性,有额外内存消耗,实现复杂
private final RedisTemplate<String, String> redisTemplate;
private static final ExecutorService CACHE_REBUILD_EXECUTOR = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(20), r -> new Thread(r, "cache_rebuild"));
public CacheClient(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
}
public void setWithLogicalExpire(String key, Object value, Long expireTime, TimeUnit unit) {
// 设置逻辑过期时间
RedisData redisData = new RedisData();
redisData.setValue(value);
redisData.setExpireTime(LocalDateTime.now().plusNanos(unit.toNanos(expireTime)));
redisTemplate.opsForValue().set(key, JSON.toJSONString(redisData));
}
/**
* 逻辑过期,互斥锁获取值,用于避免热点数据出现缓存击穿
*/
public <R, V> R getMutex(String keyPrefix, V id, Class<R> clazz, Function<V, R> dbFallback, Long expireTime, TimeUnit unit) {
String key = keyPrefix + id;
String value = redisTemplate.opsForValue().get(key);
if (StringUtils.isBlank(value)) {
return null;
}
RedisData redisData = JSON.parseObject(value, RedisData.class);
R result = JSONUtil.toBean((JSONObject) redisData.getValue(), clazz);
if (redisData.getExpireTime().isAfter(LocalDateTime.now())) {
return result;
}
// 如果缓存已过期,则尝试更新
String localKey = RedisConstant.LOCK + id;
// 获取锁成功
if (getLock(localKey)) {
// 异步更新缓存
CACHE_REBUILD_EXECUTOR.submit(
() -> {
try {
R res = dbFallback.apply(id);
this.setWithLogicalExpire(key, res, expireTime, unit);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
unLock(localKey);
}
}
);
}
return result;
}
private boolean getLock(String key) {
// 直接返回会进行自动拆箱,可能会出现空指针异常
return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, "1"));
}
private void unLock(String key) {
redisTemplate.delete(key);
}
三、解决缓存问题
1、自定义分布式锁
/**
* <pre>
* 简易实现的Redis分布式锁
* </pre>
*
* @author <a href="https://github.com/Ken-Chy129">Ken-Chy129</a>
* @date 2023/2/26 21:18
*/
public class SimpleRedisLock {
private final RedisTemplate<String, String> redisTemplate;
/**
锁的名字,根据业务设置
*/
private final String lockName;
/**
* key前缀
*/
private static final String KEY_PREFIX = "lock:";
/**
* value中线程标识的前缀(为每个节点提供一个随机的前缀,避免集群部署下线程id出现重复而导致value出现相同的情况)
*/
private static final String ID_PREFIX = UUID.fastUUID().toString(true);
/**
* 释放锁逻辑的lua脚本
*/
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
public SimpleRedisLock(String lockName, RedisTemplate<String, String> redisTemplate) {
this.lockName = lockName;
this.redisTemplate = redisTemplate;
}
public boolean tryLock(long timeoutSec) {
long threadId = Thread.currentThread().getId();
// 返回的是Boolean类型,直接return会进行自动拆箱,可能会出现空指针异常
// 需要为锁设置过期时间,防止因服务宕机而导致锁无法释放
return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + lockName, ID_PREFIX + threadId, timeoutSec, TimeUnit.SECONDS));
}
public void unlock() {
redisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + lockName),
ID_PREFIX + Thread.currentThread().getId()
);
}
}
Lua脚本——unlock.lua
--- 比较线程标识与锁中的标识是否一致 if(redis.call('get', KEYS[1]) == ARGS[1]) then --- 释放锁 return redis.call('del', KEYS[1]) end return 0
使得释放锁的操作具有原子性
Redis是单线程处理,本身不会存在并发问题,但是由于可能有多个客户端访问,每个客户端会有一个线程,之间存在竞争,所以服务端收到的指令有可能出现多个客户端的指令穿插,而lua脚本可以保证多条指令的原子性从而解决并发问题
2、解决缓存穿透问题
/**
* 避免缓存穿透的获取
*/
public <R, V> R get(String keyPrefix, V id, Class<R> clazz, Function<V, R> dbFallback, Long expireTime, TimeUnit unit) {
String key = keyPrefix + id;
// 查询缓存
String value = redisTemplate.opsForValue().get(key);
// 缓存存在则直接返回
if (StringUtils.isNotBlank(value)) {
return JSON.parseObject(value, clazz);
}
// 缓存不存在(到此处说明value要么是空,要么是null)
if (value != null) {
// 不为null则说明为“”,代表数据不存在,直接返回null,不用查询数据库(解决缓存穿透问题)
return null;
}
// value为null则查询数据库获取数据进行更新
R result = dbFallback.apply(id);
if (result == null) {
// 数据库查询不到结果,则存入空串避免缓存穿透
redisTemplate.opsForValue().set(key, "", RedisConstant.CACHE_NULL_TTL, TimeUnit.MINUTES);
return null;
}
// 查询到结果,写回缓存
this.set(key, result, expireTime, unit);
return result;
}
3、解决缓存击穿问题
/**
* 逻辑过期,互斥锁获取值,用于避免热点数据出现缓存击穿
*/
public <R, V> R getMutex(String keyPrefix, V id, Class<R> clazz, Function<V, R> dbFallback, Long expireTime, TimeUnit unit) {
String key = keyPrefix + id;
String value = redisTemplate.opsForValue().get(key);
if (StringUtils.isBlank(value)) {
return null;
}
RedisData redisData = JSON.parseObject(value, RedisData.class);
R result = JSONUtil.toBean((JSONObject) redisData.getValue(), clazz);
if (redisData.getExpireTime().isAfter(LocalDateTime.now())) {
return result;
}
// 如果缓存已过期,则获取锁尝试更新
SimpleRedisLock lock = new SimpleRedisLock(key, redisTemplate);
// 获取锁成功
if (lock.tryLock(5)) {
// 异步更新缓存
CACHE_REBUILD_EXECUTOR.submit(
() -> {
try {
R res = dbFallback.apply(id);
this.setWithLogicalExpire(key, res, expireTime, unit);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
);
}
return result;
}
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 Ken·勇者の小栈
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果