Redis缓存穿透的几种解决方案

Redis 缓存穿透的几种解决方案

缓存穿透是 Redis 使用中的经典问题。当大量请求查询不存在的数据时,缓存无法命中,请求直接打到数据库,可能导致数据库崩溃。我在生产环境中实施过多种解决方案,这里分享一些实战经验。

问题分析

缓存穿透的典型场景

  1. 恶意攻击:故意查询不存在的数据
  2. 业务逻辑缺陷:代码 bug 导致查询无效 key
  3. 数据不一致:缓存与数据库数据不同步

影响评估

1
2
3
4
5
6
7
# 监控缓存命中率
redis-cli info stats | grep keyspace_hits
# keyspace_hits:1000000
# keyspace_misses:500000
# 命中率 = hits / (hits + misses) = 66.7%

# 正常情况下命中率应该在90%以上

解决方案对比

方案 实现难度 内存开销 误判率 适用场景
缓存空值 小规模、明确的 null 值
布隆过滤器 大规模、只读场景
缓存预热 数据量可控的场景
请求校验 有业务规则的场景

方案一:缓存空值

实现方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Service
public class UserService {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private UserMapper userMapper;

private static final String NULL_CACHE_VALUE = "NULL";
private static final int NULL_CACHE_TTL = 300; // 5分钟

public User getUser(Long userId) {
String key = "user:" + userId;

// 1. 先查缓存
Object cached = redisTemplate.opsForValue().get(key);
if (cached != null) {
if (NULL_CACHE_VALUE.equals(cached)) {
return null; // 缓存的空值
}
return (User) cached;
}

// 2. 查数据库
User user = userMapper.selectById(userId);

// 3. 写入缓存
if (user != null) {
redisTemplate.opsForValue().set(key, user, 3600, TimeUnit.SECONDS);
} else {
// 缓存空值,设置较短TTL
redisTemplate.opsForValue().set(key, NULL_CACHE_VALUE, NULL_CACHE_TTL, TimeUnit.SECONDS);
}

return user;
}
}

优化点

  • 差异化 TTL:空值缓存设置更短的过期时间
  • 空值标识:使用特殊标识而不是 null,避免序列化问题
  • 内存控制:定期清理过期的空值缓存

方案二:布隆过滤器

Redis + Lua 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
-- bloom_add.lua
local key = KEYS[1]
local value = ARGV[1]
local hash_count = tonumber(ARGV[2])
local bit_size = tonumber(ARGV[3])

for i = 1, hash_count do
local hash = redis.call('EVAL', 'return redis.sha1hex(ARGV[1] .. ARGV[2])', 0, value, i)
local bit_pos = tonumber(string.sub(hash, 1, 8), 16) % bit_size
redis.call('SETBIT', key, bit_pos, 1)
end

return 1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
-- bloom_exists.lua
local key = KEYS[1]
local value = ARGV[1]
local hash_count = tonumber(ARGV[2])
local bit_size = tonumber(ARGV[3])

for i = 1, hash_count do
local hash = redis.call('EVAL', 'return redis.sha1hex(ARGV[1] .. ARGV[2])', 0, value, i)
local bit_pos = tonumber(string.sub(hash, 1, 8), 16) % bit_size

if redis.call('GETBIT', key, bit_pos) == 0 then
return 0 -- 肯定不存在
end
end

return 1 -- 可能存在

Java 客户端封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Component
public class BloomFilter {

@Autowired
private RedisTemplate<String, String> redisTemplate;

private final int hashCount = 3;
private final int bitSize = 1000000; // 100万bit ≈ 125KB

public void add(String key, String value) {
redisTemplate.execute((RedisCallback<Long>) connection -> {
DefaultRedisScript<Long> script = new DefaultRedisScript<>();
script.setScriptText(bloomAddScript);
script.setResultType(Long.class);

return connection.eval(
script.getScriptAsString().getBytes(),
1,
key.getBytes(),
value.getBytes(),
String.valueOf(hashCount).getBytes(),
String.valueOf(bitSize).getBytes()
);
});
}

public boolean mightContain(String key, String value) {
Long result = redisTemplate.execute((RedisCallback<Long>) connection -> {
DefaultRedisScript<Long> script = new DefaultRedisScript<>();
script.setScriptText(bloomExistsScript);
script.setResultType(Long.class);

return connection.eval(
script.getScriptAsString().getBytes(),
1,
key.getBytes(),
value.getBytes(),
String.valueOf(hashCount).getBytes(),
String.valueOf(bitSize).getBytes()
);
});

return result != null && result == 1;
}
}

布隆过滤器应用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Service
public class ProductService {

@Autowired
private BloomFilter bloomFilter;

private static final String BLOOM_KEY = "product:bloom";

public Product getProduct(Long productId) {
// 1. 布隆过滤器检查
if (!bloomFilter.mightContain(BLOOM_KEY, productId.toString())) {
return null; // 肯定不存在
}

// 2. 查缓存
String cacheKey = "product:" + productId;
Object cached = redisTemplate.opsForValue().get(cacheKey);
if (cached != null) {
return (Product) cached;
}

// 3. 查数据库
Product product = productMapper.selectById(productId);
if (product != null) {
redisTemplate.opsForValue().set(cacheKey, product, 3600, TimeUnit.SECONDS);
}

return product;
}

@PostConstruct
public void initBloomFilter() {
// 启动时将所有商品ID加入布隆过滤器
List<Long> productIds = productMapper.selectAllIds();
for (Long id : productIds) {
bloomFilter.add(BLOOM_KEY, id.toString());
}
}
}

方案三:分布式锁 + 双重检查

防止缓存击穿

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@Service
public class OrderService {

@Autowired
private RedissonClient redissonClient;

public Order getOrder(Long orderId) {
String cacheKey = "order:" + orderId;

// 1. 查缓存
Object cached = redisTemplate.opsForValue().get(cacheKey);
if (cached != null) {
return (Order) cached;
}

// 2. 获取分布式锁
String lockKey = "lock:order:" + orderId;
RLock lock = redissonClient.getLock(lockKey);

try {
// 尝试获取锁,最多等待1秒,锁10秒后自动释放
if (lock.tryLock(1, 10, TimeUnit.SECONDS)) {
// 3. 双重检查
cached = redisTemplate.opsForValue().get(cacheKey);
if (cached != null) {
return (Order) cached;
}

// 4. 查数据库
Order order = orderMapper.selectById(orderId);

// 5. 写缓存
if (order != null) {
redisTemplate.opsForValue().set(cacheKey, order, 1800, TimeUnit.SECONDS);
} else {
// 缓存空值,防止穿透
redisTemplate.opsForValue().set(cacheKey, "NULL", 300, TimeUnit.SECONDS);
}

return order;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}

// 获取锁失败,直接查数据库(降级策略)
return orderMapper.selectById(orderId);
}
}

方案四:参数校验

业务层拦截

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Component
public class RequestValidator {

// 用户ID校验规则
public boolean isValidUserId(Long userId) {
return userId != null && userId > 0 && userId < 10000000000L;
}

// 商品ID校验规则
public boolean isValidProductId(String productId) {
return StringUtils.isNotBlank(productId) &&
productId.matches("^PRD[0-9]{10}$");
}

// 订单号校验规则
public boolean isValidOrderNo(String orderNo) {
return StringUtils.isNotBlank(orderNo) &&
orderNo.matches("^ORD[0-9]{8}[A-Z]{2}[0-9]{6}$");
}
}

@RestController
public class UserController {

@Autowired
private RequestValidator validator;

@GetMapping("/users/{userId}")
public ResponseEntity<User> getUser(@PathVariable Long userId) {
// 参数校验
if (!validator.isValidUserId(userId)) {
return ResponseEntity.badRequest().build();
}

User user = userService.getUser(userId);
return ResponseEntity.ok(user);
}
}

监控和告警

缓存穿透监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Component
public class CacheMetrics {

private final Counter cacheMissCounter = Counter.build()
.name("cache_miss_total")
.help("Total cache misses")
.labelNames("cache_name", "key_type")
.register();

private final Counter cachePenetrationCounter = Counter.build()
.name("cache_penetration_total")
.help("Total cache penetrations")
.labelNames("cache_name")
.register();

public void recordCacheMiss(String cacheName, String keyType) {
cacheMissCounter.labels(cacheName, keyType).inc();
}

public void recordCachePenetration(String cacheName) {
cachePenetrationCounter.labels(cacheName).inc();
}
}

告警规则

1
2
3
4
5
6
7
8
9
10
11
12
# Prometheus告警规则
- alert: HighCacheMissRate
expr: rate(cache_miss_total[5m]) / rate(cache_requests_total[5m]) > 0.3
for: 2m
annotations:
summary: "Cache miss rate > 30%"

- alert: CachePenetrationAttack
expr: rate(cache_penetration_total[1m]) > 100
for: 1m
annotations:
summary: "Possible cache penetration attack"

最佳实践总结

  1. 多层防护:结合多种方案,建立纵深防御体系
  2. 监控先行:先建监控,再优化性能
  3. 渐进演化:从简单方案开始,根据业务发展逐步优化
  4. 成本平衡:在防护效果和系统成本间找到平衡点

缓存穿透的解决需要结合具体业务场景选择合适的方案。记住,没有银弹,只有最适合的解决方案。