为了系统性能的提升,我们一般会将部分数据放入缓存中,加速访问。db承担数据落盘工作。
哪些数据适合存入缓存:
举例:电商类应用,商品分类,商品列表等适合缓存并加一个失效时间(根据数据更新频率 来定),后台如果发布一个商品,买家需要 5 分钟才能看到新的商品一般还是可以接受的。

缓存分为 本地缓存与分布式缓存,在单机情况下,本地缓存很快,推荐。但是在分布式模式下,本地缓存会有两个问题。
本地缓存在分布式模式下的问题:

分布式缓存
由本地缓存在分布式模式下产生的问题,解决方法很简单,将缓存提出来作为一个中间件供各个微服务使用即可。也就是大名鼎鼎的redis

服务器启动Redis

依赖:
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-data-redisartifactId>
dependency>
redis配置

使用spring boot自动配置好的 StringRedisTemplate来操作redis

测试

@Autowired
StringRedisTemplate stringRedisTemplate;
@Test
public void testStringRedisTemplate(){
ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();
ops.set("ribbit hunts"," cat! " + UUID.randomUUID().toString());
System.out.println("what does ribbit hunt? "+ ops.get("ribbit hunts"));
}

使用redis远程工具查看

优化代码:


重启访问:
获取成功

redis也有数据

如果不设定直接内存,默认和Java堆最大值一致。


需要注意的如果在-Xmx:300m时运行几分钟都不报错,应该是使用的新版本,像我的两万多个还是0异常

如果一启动就报上面的直接内存溢出异常,使用的是老版本了,最简单的方法就是升级依赖。
这里可以留个心眼,可能会用不到,但是在遇到一些旧项目时,可能会需要用到。但是这里我比较好奇为什么我的吞吐量这么小,博主比较笨,希望读者帮我解决一下。
缓存穿透:举例:假设100万数据并发访问到缓存中为空,那么这100万并发数据就会访问到数据库中,缓存也就没有意义了。

缓存雪崩:在为解决缓存穿透加上过期时间后,如果一段时间后缓存全面同时失效、雪崩,同时将压力加到db上。

缓存击穿:热点的key在过期时正好被高并发请求。


由于分布式锁的性能比本地锁要低,这里使用本地锁也是不错的,就算由一百个服务同时访问数据库,数据库也能承担。

测试,先删掉redis中之前的缓存。
查询了两次数据库,按照我们的代码来说应该是直查一次的。

这里就涉及到一个锁的时序问题了
在第一个查询数据库的线程查询完成了,就会释放锁,然后再将数据放到缓存中,在一号线程释放锁之后、存数据到缓存之前,二号线程已经获取到了锁,并进行查询数据库了。

解决方法也很简单,扩大锁的范围,把存数据到缓存这一步放到锁里就好了,确保查询数据库、存入缓存之后再放锁。

清除redis中的缓存,测试:仅查询一次

前面虽然用到了本地锁来解决问题,但是在某些情况下,本地锁会产生一些问题,所以使用分布式锁来解决分布式模式下的缓存问题。

redis 中有一个命令set NX:

public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() {
//占分布式锁 去Redis占坑
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "1");
if (lock){
//加锁成功 查询
Map<String, List<Catelog2Vo>> dataFromDB = getDataFromDB();
//删锁
redisTemplate.delete("lock");
return dataFromDB;
}else {
//加锁失败 重试
//休眠100ms重试
return getCatalogJsonFromDbWithRedisLock();//自旋的方式
}
}
上面代码存在一个很严重的问题:如果在加锁成功,查询的时候,抛出异常了,没有执行删锁,就会造成死锁。
解决:设置锁的过期时间。

为了保证获取到锁和设置过期时间的原子性,在redis的set命令中有NX可以设置过期时间,下面是StringRedisTemplate里的方法

总结:要保证创建锁和设置过期时间的原子性,可以用set NX EX 一条命令设置
在业务时间大于锁的过期时间时,第一个线程在业务处理完之后,删除的锁时第二个甚至第三、四个线程的锁。
需要将value设置为uuid,删锁前判断是不是当前加的锁。
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() {
//占分布式锁 去Redis占坑
String uuid = UUID.randomUUID().toString();
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid,300,TimeUnit.SECONDS);
if (lock){
// 加锁成功 查询
Map<String, List<Catelog2Vo>> dataFromDB = getDataFromDB();
String lockValue = redisTemplate.opsForValue().get("lock");
if (uuid.equals(lockValue)){
//删锁
redisTemplate.delete("lock");
}
return dataFromDB;
}else {
//加锁失败 重试
//休眠100ms重试
return getCatalogJsonFromDbWithRedisLock();//自旋的方式
}
}
还有一种情况就是,当业务在查询到锁的value值之后,redis发送value给业务服务器的路上,锁过期了。那么这样删除的锁同样还是其他线程的。
解决:使用 lua 脚本删锁,保证操作的原子性
if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() {
//占分布式锁 去Redis占坑
String uuid = UUID.randomUUID().toString();
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid,300,TimeUnit.SECONDS);
if (lock){
// 加锁成功 查询
Map<String, List<Catelog2Vo>> dataFromDB;
try {
dataFromDB = getDataFromDB();
}finally {//无论业务是否抛出异常 都执行删锁操作
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
redisTemplate.execute(new DefaultRedisScript<Long>(script,Long.class),
Arrays.asList("lock"),uuid);
}
return dataFromDB;
}else {
//加锁失败 重试
//休眠100ms重试
return getCatalogJsonFromDbWithRedisLock();//自旋的方式
}
}
官方文档:1. 概述 · redisson/redisson Wiki (github.com)
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map…)
Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
导入依赖
<dependency>
<groupId>org.redissongroupId>
<artifactId>redissonartifactId>
<version>3.12.0version>
dependency>
MyRedissonConfig.java:
@Configuration
public class MyRedissonConfig {
/**
* 所有对 Redisson 的使用都是通过 RedissonClient 对象
* @return
* @throws IOException
*/
@Bean(destroyMethod = "shutdown")
public RedissonClient redisson() throws IOException{
//1、创建配置
Config config = new Config();
config.useSingleServer().setAddress("redis://101.200.238.161:6379");
//2、根据Config 创建出 RedissonClient
RedissonClient redissonClient = Redisson.create(config);
return redissonClient;
}
}
看门狗:锁的自动续期,如果业务时间超长,在业务运行期间自动给锁续上30s。

@ResponseBody
@GetMapping("/redisson")
public void redisson(){
//1、获取一把锁 只要锁的名字一样就是同一把锁
RLock myLock = redissonClient.getLock("myLock");
//2、加锁
myLock.lock();//阻塞式等待
try {
System.out.println("加锁成功,执行业务" + Thread.currentThread().getId());
Thread.sleep(30000);
}catch (Exception e){
}finally {
//3、解锁
System.out.println("释放锁" + Thread.currentThread().getId());
myLock.unlock();
}
}
看门狗原理,如何解决死锁:

它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒。
=========不设置过期时间 使用默认过期时间=========
RLock fairLock = redisson.getFairLock("anyLock");
// 最常见的使用方法
fairLock.lock();
=========设置过期时间========
// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
fairLock.lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
...
fairLock.unlock();
基于Redis的Redisson分布式联锁RedissonMultiLock对象可以将多个RLock对象关联为一个联锁,每个RLock对象实例可以来自于不同的Redisson实例。
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 所有的锁都上锁成功才算成功。
lock.lock();
...
lock.unlock();
=========设置过期时间==========
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 给lock1,lock2,lock3加锁,如果没有手动解开的话,10秒钟后将会自动解开
lock.lock(10, TimeUnit.SECONDS);
// 为加锁等待100秒时间,并在加锁成功10秒钟后自动解开
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
基于Redis的Redisson红锁RedissonRedLock对象实现了Redlock介绍的加锁算法。该对象也可以用来将多个RLock对象关联为一个红锁,每个RLock对象实例可以来自于不同的Redisson实例。
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。
lock.lock();
...
lock.unlock();
==========设置过期时间==========
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 给lock1,lock2,lock3加锁,如果没有手动解开的话,10秒钟后将会自动解开
lock.lock(10, TimeUnit.SECONDS);
// 为加锁等待100秒时间,并在加锁成功10秒钟后自动解开
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
基于Redis的Redisson分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了RLock接口。
分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();//读锁
// 或
rwlock.writeLock().lock();//写锁
==========设置过期时间===========
// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
信号量也可以用作分布式限流
RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.acquire();
//或
semaphore.acquireAsync();
semaphore.acquire(23);
semaphore.tryAcquire();
//或
semaphore.tryAcquireAsync();
semaphore.tryAcquire(23, TimeUnit.SECONDS);
//或
semaphore.tryAcquireAsync(23, TimeUnit.SECONDS);
semaphore.release(10);
semaphore.release();
//或
semaphore.releaseAsync();
设置的计数为0,阻塞停止
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();//等待计数为0
// 在其他线程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();//计数减一
如果缓存中已经存在数据,但是数据在数据库被修改,那么如何保证缓存的数据与数据库的数据有一致性?
双写模式:

失效模式:

解决方法:

spring可以根据不同的数据存储方式构造不同的缓存处理器:如 ConcurrentHashMap 、Redis 、MongoDB等。
缓存处理器接口提供了根据名字查询缓存、查询全部缓存的方法。缓存管理器下的组件在Cache 接口提供了对缓存的增删改查的方法。

<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-cacheartifactId>
dependency>
查看redis 作为缓存整合的类

spring:
cache:
type: redis
设置缓存的几个注解:
@Cacheable: Triggers cache population.@CacheEvict: Triggers cache eviction.@CachePut: Updates the cache without interfering with the method execution.@Caching: Regroups multiple cache operations to be applied on a method.@CacheConfig: Shares some common cache-related settings at class-level.


在生成缓存时的一些默认行为:
设置key: @Cacheable(value = {"category"},key = "'level1Categorys'") 、@Cacheable(value = {"category"},key = "#root.method.name")
设置缓存过期时间:spring.cache.redis.time-to-live=3600000

一些可以动态获取的属性

创建一个 RedisCacheConfiguration 放到容器中就能生效
@EnableConfigurationProperties(CacheProperties.class)
@Configuration
@EnableCaching
public class MyCacheConfig {
@Bean
RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties){
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();
// key 序列化
config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()));
// value 序列化
config = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));
// 配置文件中配置
CacheProperties.Redis redisProperties = cacheProperties.getRedis();
if (redisProperties.getTimeToLive() != null) {
config = config.entryTtl(redisProperties.getTimeToLive());
}
if (redisProperties.getKeyPrefix() != null) {
config = config.prefixCacheNameWith(redisProperties.getKeyPrefix());
}
if (!redisProperties.isCacheNullValues()) {
config = config.disableCachingNullValues();
}
if (!redisProperties.isUseKeyPrefix()) {
config = config.disableKeyPrefix();
}
return config;
}
}
一些配置:
#配置 redis 缓存
spring:
cache:
type: redis
redis:
key-prefix: CACHE_
#开启缓存空值 防止缓存穿透
cache-null-values: true
time-to-live: 36000
json、过期时间、前缀都设置成功:

失效模式
@CacheEvict(value = "category",key = "#root.methodName")
@Transactional
@Override
public Map<String,List<Catelog2Vo>> getCatalogJson(){
// 查询全部的分类
List<CategoryEntity> selectList = baseMapper.selectList(null);
//1 查出所有1级分类
List<CategoryEntity> level1Catrgorys = getParent_cid(selectList, 0L);
//2 封装分类
Map<String, List<Catelog2Vo>> parent_cid = level1Catrgorys.stream().collect(Collectors.toMap(k -> k.getCatId().toString(), v -> {
//1 查询二级分类
List<CategoryEntity> categoryEntities = getParent_cid(selectList, v.getCatId());
//2 封装上面的结果
List<Catelog2Vo> catelog2Vos = null;
if (categoryEntities != null) {
// map封装
catelog2Vos = categoryEntities.stream().map(l2 -> {
Catelog2Vo catelog2Vo = new Catelog2Vo(v.getCatId().toString(), null, l2.getCatId().toString(), l2.getName());
//1 找当前二级分类的三级分类封装成vo
List<CategoryEntity> level3Catelog = getParent_cid(selectList, l2.getCatId());
if (level3Catelog != null) {
List<Catelog2Vo.Catelog3Vo> collect = level3Catelog.stream().map(l3 -> {
//2 封装成指定格式
Catelog2Vo.Catelog3Vo catalog3Vo = new Catelog2Vo.Catelog3Vo(l2.getCatId().toString(), l3.getCatId().toString(), l3.getName());
return catalog3Vo;
}).collect(Collectors.toList());
// 填充三级分类
catelog2Vo.setCatelog3List(collect);
}
return catelog2Vo;
}).collect(Collectors.toList());
}
return catelog2Vos;
}));
return parent_cid;
}
可以用@Caching 注解实现多个操作

删除 value 下的所有分区

读模式:
写模式:(缓存与数据库一致):