将Token保存到Redis中

将手机号码(String)为key, 验证码(String)为value存入Redis

key,  value均是字符串的方法:

stringRedisTemplate.opsForValue().set(String key, String value, long timeout, TimeUnit unit)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Autowired
private StringRedisTemplate stringRedisTemplate;

@Override

public Result sendCode(String phone, HttpSession session) {
//1.校验手机号是否合法
if(RegexUtils.isPhoneInvalid(phone)) {

//2.不合法返回错误信息
return Result.fail("手机号不合法");
}

//3.合法发送验证码
String code = RandomUtil.randomString(6);

//4.保存验证码到redis
stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone, code, LOGIN_CODE_TTL, TimeUnit.MINUTES);

    //5.发送成功
log.debug("验证码是:" + code);

//6.返回ok
return Result.ok();
}

以Token(String)为key, 用户(Map)为value存入Redis

key为String, value为Map的方法:

stringRedisTemplate.opsForHash().putAll(String key, Map m)

设置存在时长:

stringRedisTemplate.expire(String key, long timeout, TimeUnit unit)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Override

public Result login(LoginFormDTO loginForm, HttpSession session) {

String phone = loginForm.getPhone();

//1.判断手机号是否相等
String code = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);
if(code == null) {
return Result.fail("手机号不一致");
}
//2.判断验证码是否相等
if(!(code.equals(loginForm.getCode()))) {
return Result.fail("验证码错误~");
}
//3.根据手机号在数据库中查询用户
User user = query().eq("phone", phone).one();

//4.查询失败新建用户
if(user == null) {
user = createUserByPhone(phone);
}

//5.用户保存到redis中
String token = UUID.randomUUID().toString();
String tokenKey = LOGIN_USER_KEY +token;
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),
CopyOptions.create().setIgnoreNullValue(true)
.setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString()));
stringRedisTemplate.opsForHash().putAll(tokenKey, userMap);
stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES);

//5.返回ok
return Result.ok(token);
}

前端拦截器

前端将会拦截每一次请求, 在每次请求头中加入Token

拦截器

全部拦截

访问完毕后需要将ThreadLocal中的信息删除

RefreshTokenInterceptor类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.StrUtil;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.servlet.HandlerInterceptor;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import java.util.Map;
import java.util.concurrent.TimeUnit;

import static com.hmdp.utils.RedisConstants.LOGIN_USER_KEY;
import static com.hmdp.utils.RedisConstants.LOGIN_USER_TTL;

public class RefreshTokenInterceptor implements HandlerInterceptor {

private StringRedisTemplate stringRedisTemplate;

public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate; //这里不采用注解注入, 使用构造器注入
}

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {

//1.获取请求头中的token
String token = request.getHeader("authorization");
String tokenKey = LOGIN_USER_KEY + token;

//2.根据token判断用户是否为空
if(StrUtil.isBlank(token)) {
return true; //不能返回false, 应该放行交给登录拦截器处理
}

//3.根据token在redis中获取用户
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(tokenKey);

//4.判断用户是否为空, 为空的原因可能有token过期
if(userMap == null) {
response.setStatus(401);
return false;
}

//5.将userDTO保存到ThreadLocal中供其他方法使用
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
UserHolder.saveUser(userDTO);

//6.刷新token有效期
stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES);

//7.放行
return true;
}

@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
UserHolder.removeUser();
}
}

登录拦截

LoginInterceptor类

用于登录拦截

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import com.hmdp.dto.UserDTO;

import org.springframework.web.servlet.HandlerInterceptor;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

public class LoginInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//1.从ThreadLocal中获取用户信息
UserDTO user = UserHolder.getUser();

//2.用户不存在,拦截
if(user == null) {
response.setStatus(401);
return false;
}

//3.用户存在,放行
return true;
}
}

对ThreadLocal的管理

UserHolder类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class UserHolder {

private static final ThreadLocal<User> tl = new ThreadLocal<>();

public static void saveUser(User user){
tl.set(user);
}

public static User getUser(){
return tl.get();
}

public static void removeUser(){
tl.remove();
}
}

启用登录拦截器

拦截的优先级可用order(int num)来指定, num越小优先级越高, 若不设置默认为0, 按添加的顺序拦截.

MvcConfig类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import com.hmdp.utils.LoginInterceptor;
import com.hmdp.utils.RefreshTokenInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

import javax.annotation.Resource;

@Configuration
public class MvcConfig implements WebMvcConfigurer {

@Resource
private StringRedisTemplate stringRedisTemplate;

@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**").order(0);
registry.addInterceptor(new LoginInterceptor()).excludePathPatterns(
"/shop/**", "shop-type/**", "/upload/**",
"/blog/hot", "/user/code", "/user/login"
).order(1);
}
}

添加Redis缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Resource
private StringRedisTemplate stringRedisTemplate;

@Override
public Result queryShopById(Long id) {
String key = CACHE_SHOP_KEY + id;

//1. 根据id在redis中查询
String shopStr = stringRedisTemplate.opsForValue().get(key);

//2. 判断是否存在
if (StrUtil.isNotBlank(shopStr)) {
//3. 存在直接返回
Shop shop = JSONUtil.toBean(shopStr, Shop.class);
return Result.ok(shop);
}

//4. 不存在在数据库中查询
Shop shop = getById(id);

//5. 数据库中查询不到则返回错误信息401
if(shop == null) {
return Result.fail("商铺不存在");
}



//6. 数据库查询到保存到redis中
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop));

//7. 返回Shop
return Result.ok(shop);
}

缓存更新策略

根据分析可得对于较为频繁访问的数据采用主动更新的策略, 更新时选择先操作数据库, 再删除缓存.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public Result update(Shop shop) {
//1. 判断该商品是否存在
Long id = shop.getId();

//2. 不存在返回错误
if(id == null) {
return Result.fail("商品不存在");
}

//3. 存在更新数据库
updateById(shop);

//4. 删除Redis
stringRedisTemplate.delete(CACHE_SHOP_KEY + id);

//5. 返回结果
return Result.ok();
}

缓存穿透

因为布隆过滤器实现较为复杂, 在此选择实现缓存空对象

商铺缓存中采用缓存空对象来解决缓存击穿的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Resource
private StringRedisTemplate stringRedisTemplate;

@Override
public Result queryShopById(Long id) {

String key = CACHE_SHOP_KEY + id;

//1. 根据id在redis中查询
String shopStr = stringRedisTemplate.opsForValue().get(key);

//2. 判断是否存在
if (StrUtil.isNotBlank(shopStr)) { //isNotBlank()能检测null、""和"\n\t"
//3. 存在直接返回
Shop shop = JSONUtil.toBean(shopStr, Shop.class);
return Result.ok(shop);
}

//4. 判断从Redis中查到的值是否为"", 是则直接返回错误信息
if(shopStr != null) { //如果不是null表明是""
return Result.fail("商铺信息出错~");
}

//4. 不存在在数据库中查询
Shop shop = getById(id);

//5. 数据库中查询不到则返回错误信息401.
// 并且为了防止缓存穿透, 将一个空值存入Redis中, 设置短TTL
if(shop == null) {
stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
return Result.fail("商铺不存在");
}

//6. 数据库查询到保存到redis中
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop));

//7. 返回Shop
return Result.ok(shop);
}

缓存雪崩

缓存击穿

互斥锁

用Redis实现锁

redis的SETNX命令(SET if Not eXists)

  语法:SETNX key value

  功能:当且仅当 key 不存在,将 key 的值设为 value ,并返回1;若给定的 key 已经存在,则 SETNX 不做任何动作,并返回0。

1
2
3
4
5
6
7
8
public boolean tryLock(String key) {
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}

public void unLock(String key) {
stringRedisTemplate.delete(key);
}

用递归实现互斥锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@Resource
private StringRedisTemplate stringRedisTemplate;

@Override
public Result queryShopById(Long id) {
String key = CACHE_SHOP_KEY + id;

//1. 根据id在redis中查询
String shopStr = stringRedisTemplate.opsForValue().get(key);

//2. 判断是否存在
if (StrUtil.isNotBlank(shopStr)) { //isNotBlank()能检测null、""和"\n\t"

//3. 存在直接返回
Shop shop = JSONUtil.toBean(shopStr, Shop.class);
return Result.ok(shop);
}

//4. 判断从Redis中查到的值是否为"", 是则直接返回错误信息
if(shopStr != null) { //如果不是null表明是""
return Result.fail("商铺信息出错~");
}

//4. 尝试获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;

//5. 判断是否获得互斥锁
if(tryLock(lockKey)) {

//5.1.1 如果有锁就根据id查询数据库
Shop shop = getById(id);

//5.1.2 数据库中查询不到则返回错误信息401.
// 并且为了防止缓存穿透, 将一个空值存入Redis中, 设置短TTL
if(shop == null) {
stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
return Result.fail("商铺不存在");
}

//5.1.3 数据库查询到保存到redis中
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop));

//5.1.4 释放锁
unLock(lockKey);

//5.1.5 返回Shop
return Result.ok(shop);
}

//5.2.1 无锁则休眠一段时间
try {
Thread.sleep(50);
String shopJSON = stringRedisTemplate.opsForValue().get(key);
//5.2.2 休眠过后再次在Redis中查询店铺缓存
if(StrUtil.isNotBlank(shopJSON)) {
Shop shop = JSONUtil.toBean(shopJSON, Shop.class);
return Result.ok(shop);
}

//5.2.3 未命中就递归
return queryShopById(id);
} catch (InterruptedException e) {
unLock(String key)
stringRedisTemplate.delete(key);
    }
}

用while循环实现互斥锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@Resource
private StringRedisTemplate stringRedisTemplate;

@Override
public Result queryShopById(Long id) {

String key = CACHE_SHOP_KEY + id;
while(true) {
//1. 根据id在redis中查询
String shopStr = stringRedisTemplate.opsForValue().get(key);

//2. 判断是否存在
if (StrUtil.isNotBlank(shopStr)) { //isNotBlank()能检测null、""和"\n\t"
//3. 存在直接返回
Shop shop = JSONUtil.toBean(shopStr, Shop.class);
return Result.ok(shop);
}

//4. 判断从Redis中查到的值是否为"", 是则直接返回错误信息
if(shopStr != null) { //如果不是null表明是""
return Result.fail("商铺信息出错~");
}

//4. 尝试获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;

//5. 判断是否获得互斥锁
if(tryLock(lockKey)) {
try {
//5.1.1 如果有锁就根据id查询数据库
Shop shop = getById(id);
Thread.sleep(200); //模拟查询操作进行了200ms
} catch (InterruptedException e) {
e.printStackTrace();
}

//5.1.2 数据库中查询不到则返回错误信息401.
// 并且为了防止缓存穿透, 将一个空值存入Redis中, 设置短TTL
if(shop == null) {
stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
return Result.fail("商铺不存在");
}

//5.1.3 数据库查询到保存到redis中
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop));

//5.1.4 释放锁
unLock(lockKey);

//5.1.5 返回Shop
return Result.ok(shop);
}

try {
//5.2.1 无锁则休眠一段时间后继续while循环
Thread.sleep(50);
} catch (InterruptedException e) {
return Result.fail("睡出事了~~");
}
}
}

逻辑过期

RedisData类

需要在原先类上加上一个逻辑过期日期, 为了避免改源代码, 所以有两种方法, 让RedisData类继承原始类, 或者设置一个Object类型的变量, 这里选择后者.

1
2
3
4
5
6
7
8
import lombok.Data;
import java.time.LocalDateTime;

@Data
public class RedisData {
private LocalDateTime expireTime;
private Object data;
}

ServiceImpl类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public Result queryShopByIdByExpire(Long id) {

String key = CACHE_SHOP_KEY + id;

while(true) {
//1. 从Redis中查商铺缓存
String shopJSON = stringRedisTemplate.opsForValue().get(key);

//2. 判断是否命中
if(StrUtil.isBlank(shopJSON)) {

//3. 未命中返回空(因为是热点Key, 不存在不命中的情况, 如果不命中表明这不是热点Key, 直接返回空)
return Result.fail("店铺不存在");
}

//4. 命中判断逻辑是否过期
RedisData shopData = JSONUtil.toBean(shopJSON, RedisData.class);
if(!shopData.getExpireTime().isBefore(LocalDateTime.now())) {
//4.1 未过期则返回店铺信息
return Result.ok(shopData.getData());
}

//4.2 过期则尝试获取互斥锁
//4.3 判断是否获取锁
String lockKey = LOCK_SHOP_KEY + id;

if(!tryLock(lockKey)) {
//4.3.1 获取不到锁直接返回店铺信息
return Result.ok(shopData.getData());

}

//4.3.2.1 获取到判断逻辑日期是否过期
String shopJSON2 = stringRedisTemplate.opsForValue().get(key);
RedisData shopData2 = JSONUtil.toBean(shopJSON2, RedisData.class);
if(shopData2.getExpireTime().isAfter(LocalDateTime.now())) {

//4.3.2.3 未过期返回结果
return Result.ok(shopData2.getData());
}

//4.3.2.2 过期开启独立线程
CACHE_REBUILD_EXECUTOR.submit(() -> {
try {
//4.3.2.2.1 根据id查询数据库, 将查询到的店铺数据写入Redis, 并设置逻辑过期时间
this.saveByExpire(id, 20L);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
//4.3.2.2.2 释放互斥锁
unLock(lockKey);
}
});
}
}

封装Redis工具类

初始化与方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Slf4j
@Component
public class CacheClient {
private StringRedisTemplate stringRedisTemplate; //不采用注解注入, 采用构造器注入

public CacheClient(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}

private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);

public boolean tryLock(String key) {
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}

public void unLock(String key) {
stringRedisTemplate.delete(key);
}
}
1
2
3
4
5
6
7
8
import lombok.Data;
import java.time.LocalDateTime;

@Data
public class RedisData {
private LocalDateTime expireTime;
private Object data;
}

缓存与逻辑缓存(String)

1
2
3
4
5
6
7
8
public void set(String key, Object value, Long timeout, TimeUnit unit) {
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), timeout, unit);
}

public void setWithLogicalExpire(String key, Object value, Long timeout, TimeUnit unit) {
RedisData objectRedisData = new RedisData<>(LocalDateTime.now().plusSeconds(unit.toSeconds(timeout)), value);
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(objectRedisData));
}

缓存穿透

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public <R, ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long timeout, TimeUnit unit) {
String key = keyPrefix + id;

while(true) {
//1. 根据id在redis中查询
String json = stringRedisTemplate.opsForValue().get(key);

//2. 判断是否存在
if (StrUtil.isNotBlank(json)) { //isNotBlank()能检测null、""和"\n\t"
//3. 存在直接返回
R result = JSONUtil.toBean(json, type);
return result;
}

//4. 判断从Redis中查到的值是否为"", 是则直接返回错误信息
if(json != null) { //如果不是null表明是""
return null;
}

R result = dbFallback.apply(id);

//5.1.2 数据库中查询不到则返回错误信息401.
// 并且为了防止缓存穿透, 将一个空值存入Redis中, 设置短TTL
if(result == null) {
set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
return null;
}

//5.1.3 数据库查询到保存到redis中
set(key, result, timeout, unit);

//5.1.5 返回Shop
return result;
}
}

缓存穿透+缓存击穿(while互斥锁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public <R, ID> R queryWithPassThroughAndLock(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long timeout, TimeUnit unit) {
String key = keyPrefix + id;
while(true) {
//1. 根据id在redis中查询
String json = stringRedisTemplate.opsForValue().get(key);

//2. 判断是否存在
if (StrUtil.isNotBlank(json)) { //isNotBlank()能检测null、""和"\n\t"
//3. 存在直接返回
R result = JSONUtil.toBean(json, type);
return result;
}

//4. 判断从Redis中查到的值是否为"", 是则直接返回错误信息
if(json != null) { //如果不是null表明是""
return null;
}

//4. 尝试获取互斥锁
//5. 判断是否获得互斥锁
String lockKey = LOCK_SHOP_KEY + id;
if(tryLock(lockKey)) {
//5.1.1 如果有锁就根据id查询数据库
R result = dbFallback.apply(id);

//5.1.2 数据库中查询不到则返回错误信息401.
// 并且为了防止缓存穿透, 将一个空值存入Redis中, 设置短TTL
if(result == null) {
set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
return null;
}

//5.1.3 数据库查询到保存到redis中
set(key, result, timeout, unit);

//5.1.4 释放锁
unLock(lockKey);

//5.1.5 返回Shop
return result;
}

try {
//5.2.1 无锁则休眠一段时间后继续while循环
Thread.sleep(50);
} catch (InterruptedException e) {
throw new RuntimeException("睡出事了~~");
}
}
}

缓存击穿(逻辑过期)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public <R, ID> R queryWithLogicalExpire(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long timeout, TimeUnit unit) {
String key = keyPrefix + id;

/*R result = dbFallback.apply(id);
setWithLogicalExpire(key, result, timeout, unit);
return result;*/

while(true) {
//1. 从Redis中查商铺缓存
String json = stringRedisTemplate.opsForValue().get(key);

//2. 判断是否命中
if(StrUtil.isBlank(json)) {
//3. 未命中返回空(因为是热点Key, 不存在不命中的情况, 如果不命中表明这不是热点Key, 直接返回空)
return null;
}

//4. 命中判断逻辑是否过期
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
if(redisData.getExpireTime().isAfter(LocalDateTime.now())) {
//4.1 未过期则返回店铺信息
R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);
return r;
}

//4.2 过期则尝试获取互斥锁
//4.3 判断是否获取锁
String lockKey = LOCK_SHOP_KEY + id;
if(!tryLock(lockKey)) {
//4.3.1 获取不到锁直接返回店铺信息
return (R)redisData.getData();

}

//4.3.2.1 获取到判断逻辑日期是否过期
String json2 = stringRedisTemplate.opsForValue().get(key);
RedisData redisData2 = JSONUtil.toBean(json2, RedisData.class);
if(redisData2.getExpireTime().isAfter(LocalDateTime.now())) {
//4.3.2.3 未过期返回结果
R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);
return r;
}

//4.3.2.2 过期开启独立线程
CACHE_REBUILD_EXECUTOR.submit(() -> {
//4.3.2.2.1 根据id查询数据库, 将查询到的店铺数据写入Redis, 并设置逻辑过期时间
R r = dbFallback.apply(id);
setWithLogicalExpire(key, r, timeout, unit);

//4.3.2.2.2 释放互斥锁
unLock(lockKey);
return r
});
}
}

工具类的调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {

@Resource
private StringRedisTemplate stringRedisTemplate;

@Resource
private CacheClient cacheClient;

public Result queryShopByIdByLock(Long id) { /**lambda表达式: id -> getById(id) = this.getById*/
Shop shop = cacheClient.queryWithPassThrough(CACHE_SHOP_KEY, id, Shop.class, this::getById, CACHE_SHOP_TTL, TimeUnit.MINUTES);
if(shop == null) {
return Result.fail("店铺不存在");
}
return Result.ok(shop);
}

@Override
public Result queryShopByIdByExpire(Long id) { /**lambda表达式: id -> getById(id) = this.getById*/
Shop shop = cacheClient.queryWithLogicalExpire(CACHE_SHOP_KEY, id, Shop.class, this::getById, 30L, TimeUnit.SECONDS);
if(shop == null) {
return Result.fail("店铺为空~~");
}
return Result.ok(shop);
}
}

基于Redis的全局ID生成器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Component
public class RedisIdWorker {
private static final long BEGIN_TIMESTAMP = 1664582400L; //开始时间戳, 由main函数得到

@Resource
private StringRedisTemplate stringRedisTemplate;

public long nextId(String keyPrefix) {
//1. 生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;

//2. 生成序列号
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);

//3. 拼接并返回
return timestamp << 32 | count;
}

//得到当前时间戳
public static void main(String[] args) {
LocalDateTime time = LocalDateTime.of(2022, 10, 1, 0, 0, 0);
long second = time.toEpochSecond(ZoneOffset.UTC);
System.out.println(second);
}
}

乐观锁 & 悲观锁

乐观锁

版本号法

CAS法

用CAS法(乐观锁)解决超卖问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@Resource
private ISeckillVoucherService seckillVoucherService;

@Resource
private RedisIdWorker redisIdWorker;

@Transactional
@Override
public Result seckillVoucher(Long voucherId) {
//1. 查询优惠券, 判断是否存在
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
if(voucher == null) {
return Result.fail("优惠券不存在");

}

//2. 判断秒杀是否开始
if(voucher.getBeginTime().isAfter(LocalDateTime.now())) {
//秒杀尚未开始
return Result.fail("秒杀尚未开始");
}

//3. 判断秒杀是否结束
if(voucher.getEndTime().isBefore(LocalDateTime.now())) {
//秒杀已经结束
return Result.fail("秒杀已经结束");
}

//4. 判断库存是否充足
if(voucher.getStock() < 1) {
return Result.fail("库存不足");
}

//5. 扣减库存(乐观锁)
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") //set stock = stock - 1
.eq("voucher_id", voucherId) //where voucher_id = voucherId
.gt("stock", 0) //where stock>0 (乐观锁)
.update();
if(!success) {
return Result.fail("库存不足");
}

//6. 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setUserId(UserHolder.getUser().getId());
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);

//7. 返回订单id
return Result.ok(orderId);
}

用悲观锁解决一人一卖问题

注意: 事务的有效性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@Override
public Result seckillVoucher(Long voucherId) {
//1. 查询优惠券, 判断是否存在

SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
if(voucher == null) {
return Result.fail("优惠券不存在");
}

//2. 判断秒杀是否开始
if(voucher.getBeginTime().isAfter(LocalDateTime.now())) {
//秒杀尚未开始
return Result.fail("秒杀尚未开始");
}

//3. 判断秒杀是否结束
if(voucher.getEndTime().isBefore(LocalDateTime.now())) {
//秒杀已经结束
return Result.fail("秒杀已经结束");
}

//4. 判断库存是否充足
if(voucher.getStock() < 1) {
return Result.fail("库存不足");
}

Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) { //userId.toString().intern()保证了只对同一个对象上锁
//获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
}

@Transactional
public Result createVoucherOrder(Long voucherId) {
//根据用户id和优惠券id查询该用户是否购买过该优惠券
Long userId = UserHolder.getUser().getId();
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if(count > 0) {
return Result.fail("每位用户限购一张");
}

//5. 扣减库存(乐观锁)
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") //set stock = stock - 1
.eq("voucher_id", voucherId) //where voucher_id = voucherId
.gt("stock", 0) //where stock>0
.update();
if(!success) {
return Result.fail("库存不足");
}

//6. 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setUserId(UserHolder.getUser().getId());
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);

//7. 返回订单id
return Result.ok(orderId);
}

pom.xml

pom文件需要加入以下依赖

1
2
3
4
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>

分布式锁

前边用悲观锁实现的一人一卖存在的问题: 不同的JVM获取的不是同一把锁, 因为NGINX是负载均衡的, 当一个用户同时访问页面会被分配到不同的服务器, 不同的服务器之间的锁不共用, 从而出现错误. 而分布式锁可以解决这个问题.

基于Redis的分布式锁

如果释放锁时不判断锁标识是否是自己

用Redis分布式锁解决一人一卖

SimpleRedisLock类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import cn.hutool.core.lang.UUID;
import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.concurrent.TimeUnit;

public class SimpleRedisLock implements ILock{
private String name;
private StringRedisTemplate stringRedisTemplate;
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-"; //标识不同进程

public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}

public boolean tryLock(long timeout) {
String threadId = ID_PREFIX + Thread.currentThread().getId();
Boolean tryLock = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeout, TimeUnit.SECONDS);
return Boolean.TRUE.equals(tryLock);
}

public void unlock() {
String threadId = ID_PREFIX + Thread.currentThread().getId();
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
if(threadId.equals(id)) {
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
}
VoucherOrderServiceImpl类
1
2
3
4
5
6
7
8
9
10
11
12
13
Long userId = UserHolder.getUser().getId();
SimpleRedisLock redisLock = new SimpleRedisLock("order" + userId, stringRedisTemplate);
boolean isLock = redisLock.tryLock(1200);
if(!isLock) {
return Result.fail("黄牛滚啊[○・`Д´・ ○]");
}
try {
//获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
redisLock.unlock();
}

不足

Redisson

官网地址: https://redisson.org

GitHub地址: https://github.com/redisson/redisson

Redisson配置

注意: 不推荐使用在创建项目时SpringBoot选中Redisson

引入依赖
1
2
3
4
5
<dependency>    
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
配置Redisson客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Configuration
public class RedisConfig {
@Bean
public RedissonClient redissonClient() {
// 配置类
Config config = new Config();

// 添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址
config.useSingleServer().setAddress("redis://192.168.150.101:6379").setPassowrd("123321");
// 创建客户端

return Redisson.create(config);
}
}

使用Redisson分布式锁解决一人一卖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Resource
private RedissonClient redissonClient;
Long userId = UserHolder.getUser().getId();
RLock lock = redissonClient.getLock("lock:order:" + userId);
boolean isLock = lock.tryLock();
if(!isLock) {
return Result.fail("黄牛滚啊[○・`Д´・ ○]");
}
try {
//获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
lock.unlock();
}

Redisson分布式锁原理

Redisson可重入锁原理

获取锁的Lua脚本

释放锁的Lua脚本

Redisson分布式锁主从一致性

需要所有Redis服务器同时获得锁才可以执行业务代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Resource
private RedissonClient redissonClient;

@Resource
private RedissonClient redissonClient2;

@Resource
private RedissonClient redissonClient3;

private RLock lock;

@BeforeEach
void setUp() {
RLock lock1 = redissonClient.getLock("order");
RLock lock2 = redissonClient2.getLock("order");
RLock lock3 = redissonClient3.getLock("order");

//创建联锁
lock = redissonClient.getMultiLock(lock1, lock2, lock3);
}

Redis在点赞业务中的应用

上传图片

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@PostMapping("blog")
public Result uploadImage(@RequestParam("file") MultipartFile image) {
try {
// 获取原始文件名称
String originalFilename = image.getOriginalFilename();

// 生成新文件名
String fileName = createNewFileName(originalFilename);

// 保存文件
image.transferTo(new File(SystemConstants.IMAGE_UPLOAD_DIR, fileName));

// 返回结果
return Result.ok(fileName);
} catch (IOException e) {
throw new RuntimeException("文件上传失败", e);
}
}

private String createNewFileName(String originalFilename) {
// 获取后缀
String suffix = StrUtil.subAfter(originalFilename, ".", true);

// 生成目录
String name = UUID.randomUUID().toString();
int hash = name.hashCode();
int d1 = hash & 0xF;
int d2 = (hash >> 4) & 0xF;

// 判断目录是否存在
File dir = new File(SystemConstants.IMAGE_UPLOAD_DIR, StrUtil.format("/blogs/{}/{}", d1, d2));
if (!dir.exists()) {
dir.mkdirs();
}

// 生成文件名
return StrUtil.format("/blogs/{}/{}/{}.{}", d1, d2, name, suffix);
}

@TableField(exist = false)注解

@TableField(exist = false) 注解可以解决表中表的问题,加载bean属性上,表示当前属性不是数据库的字段但在项目中必须使用,这样可以用来把一个数据表当作一个字段来输出,用来实现表中表数据输出。这样设置在新增等使用bean的时候,mybatis-plus就会忽略这个,不会报错

点赞高亮

如果当前用户已经点赞,则点赞按钮高亮显示(前端已实现,判断字段Blog类的isLike属性)

Blog类

包含用@TableField(exist = false)注解的属性isLike

1
2
@TableField(exist = false)
private Boolean isLike;
BlogServiceImpl类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public Result getBlogById(Long blogId) {
Blog blog = getById(blogId);
if(blog == null) {
return Result.fail("Blog不存在");
}
isLike(blog);
return Result.ok(blog);
}

private void isLike(Blog blog) {
String key = BLOG_LIKED_KEY + blog.getId();
Long userId = UserHolder.getUser().getId();
Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
//(score == null) ? blog.setIsLike(false) : blog.setIsLike(true); 我的猪鼻写法
blog.setIsLike(score != null);
}

Redis在点赞业务中的应用

Redis数据结构的选择

点赞的实现

  • 将当前时间的毫秒数作为得分(score), 点赞越早得分越高

  • 将指定前缀和BlogId拼接起来作为key

  • 将已点赞的用id集合作为value

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Override
public Result likeBlog(Long blogId) {
//1. 获取Blog对象
Blog blog = getById(blogId);

//2. 判断Blog对象是否存在
if(blog == null) {
return Result.fail("Blog不存在");
}

//3. 存在则通过Thread获取当前用户
Long userId = UserHolder.getUser().getId();

//4. 判断该用户是否点过赞
String key = BLOG_LIKED_KEY + blogId;
Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());

//5. 点过赞则取消点赞(数据库), 将用户从Blog的点赞Redis中删除
if(score != null) {
boolean cancel = update().setSql("liked = liked - 1").eq("id", blogId).update();
if(cancel) {
stringRedisTemplate.opsForZSet().remove(key, userId.toString());
blog.setIsLike(false);
return Result.ok();
}
return Result.fail("点赞出错");
}

//6. 未点过赞则点赞(数据库), 将用户加入Blog的点赞Redis
boolean success = update().setSql("liked = liked + 1").eq("id", blogId).update();
if(!success) {
return Result.fail("点赞出错");
}
stringRedisTemplate.opsForZSet().add(key, userId.toString(), System.currentTimeMillis());
blog.setIsLike(true);

//7. 返回结果
return Result.ok();
}

点赞排行榜的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Override
public Result queryLikes(Long blogId) {
//1. 得到blog, 判断是否存在
Blog blog = getById(blogId);
if(blog == null) {
return Result.fail("blog不存在");
}

//2. 得到Key
String key = BLOG_LIKED_KEY + blogId;

//3. 得到点赞集合
Set<String> strings = stringRedisTemplate.opsForZSet().range(key, 0, 4);

//4. 判断集合是否为空
if(strings == null || strings.isEmpty()) {
//5. 为空则直接返回
return Result.ok(Collections.emptyList());
}

//5. 不为空, 则解析出其中的用户id
List<Long> ids = strings.stream().map(Long::valueOf).collect(Collectors.toList());

//6. 根据用户Id查询用户
List<UserDTO> userDTOS = userService.listByIds(ids)
.stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
//4. 返回前五名
return Result.ok(userDTOS);

}

Redis消息队列实现异步秒杀

!!!注意: 因为Redis Stream 是 Redis 5.0 版本新增加的数据结构, 因此需要Redis版本至少为5.0, 不然就会疯狂报错o(╥﹏╥)o

写在前面: 虽然主流的消息队列技术是MQ等, 这些技术与Redis的消息队列思想大同小异, 并且有些小企业因为业务较小, 不搭建MQ集群, 选择使用Redis来完成消息队列功能, 因此该技术的学习还是很有必要的.

异步秒杀

当前我们采用串行的方法执行一人一单, 下单流程如下:

当用户发起请求,此时会请求 nginx,nginx 会访问到 tomcat,而 tomcat 中的程序,会进行串行操作,分成如下几个步骤

1、查询优惠卷

2、判断秒杀库存是否足够

3、查询订单

4、校验是否是一人一单

5、扣减库存

6、创建订单

优化方案

将订单资格判断放入到 Redis 中,只要资格通过就返回一个订单消息, 并且将下单任务添加到消息队列中.

后台有一个线程时刻读取并执行消息队列中的任务, 当队列中没有任务时就阻塞等待, 直到有新任务进入队列.

该方案将判断订单资格和将订单写入数据库分开执行, 用户在很短时间内就可以得到下单成功与否的反馈, 而写入数据库的操作可以慢慢完成.

消息队列

消息队列数据结构的选择

基于Stream的消息队列

消费者组

基于Stream的异步秒杀实现

  1. 创建一个Stream类型的消息队列,名为stream.orders.

  2. 新增秒杀优惠券的同时,将优惠券信息保存到 Redis 中.

  3. 基于Lua 脚本,判断秒杀库存、一人一单,决定用户是否抢购成功. 成功直接向stream.orders中添加消息,内容包含voucherId、userId、orderId(因为整个过程需要保证原子性, 所以需要使用Lua脚本).

  4. 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单. 实现抢购与下单异步.

Lua脚本

用Lua脚本实现资格判断和向消息队列中添加信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]

-- 1.2.用户id
local userId = ARGV[2]

-- 1.3.订单id
local orderId = ARGV[3]

-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stoc:' .. voucherId

-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId

-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 3.2.库存不足,返回1
return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
-- 3.3.存在,说明是重复下单,返回2
return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0

VoucherOrderServiceImpl类

异步处理线程池

@PostConstruct注解: 被注解的方法,在对象加载完依赖注入后执行。

此注解是在Java EE5规范中加入的,在Servlet生命周期中有一定作用,它通常都是一些初始化的操作,但初始化可能依赖于注入的其他组件,所以要等依赖全部加载完再执行。

1
2
3
4
5
6
7
8
//异步处理线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

//在类初始化之后执行,因为当这个类初始化好了之后,随时都是有可能要执行的
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
线程处理业务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
private class VoucherOrderHandler implements Runnable {

@Override
public void run() {
while (true) {
try {
// 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
SreamOffset.create("stream.orders", ReadOffset.lastConsumed())
);
// 2.判断订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null,说明没有消息,继续下一次循环
continue;
}
// 解析数据
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);

// 3.创建订单
createVoucherOrder(voucherOrder);

// 4.确认消息 XACK
stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
} catch (Exception e) {
log.error("处理订单异常", e);
//处理异常消息
handlePendingList();
}
}
}

private void handlePendingList() {
while (true) {
try {
// 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create("stream.orders", ReadOffset.from("0"))
);

// 2.判断订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null,说明没有异常消息,结束循环
break;
}

// 解析数据
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);

// 3.创建订单
createVoucherOrder(voucherOrder);

// 4.确认消息 XACK
stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
} catch (Exception e) {
log.error("处理pendding订单异常", e);
try{
Thread.sleep(20);
}catch(Exception e){
e.printStackTrace();
}
}
}
}
}
调用Lua脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));//在Resource文件里调用Lua脚本, 脚本文件名:seckill.lua
SECKILL_SCRIPT.setResultType(Long.class);
}

IVoucherOrderService proxy;
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
// 1.执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId)
);
int r = result.intValue();
// 2.判断结果是否为0
if (r != 0) {
// 2.1.不为0 ,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}

// 3.获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();

// 4.返回订单id
return Result.ok(orderId);
}
添加秒杀券业务 & 创建订单业务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);

// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
// 保存秒杀库存到Redis中
//SECKILL_STOCK_KEY 这个变量定义在RedisConstans中
//private static final String SECKILL_STOCK_KEY ="seckill:stock:"
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}

@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
// 在数据库中扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣减失败
log.error("库存不足!");
return;
}

// 在数据库中创建订单
save(voucherOrder);

Redis实现共同关注

数据结构的选择

  • Redis中的集合元素可以做并差交集操作, 同时有具有唯一性, 所以数据结构上选择集合.

  • 没有有序性的需求, 所以选择无序的set集合.

业务步骤

  1. 在关注时以关注人id为key, 被关注人id为value放到一个set集合中.

  2. 取关用户时将被取关人id从set集合中删除.

  3. 将当前用户与被关注人的关注列表求交集, 即可得到共同关注列表.

代码实现

关注业务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public Result follow(Long followUserId, Boolean isFollow) {
//1. 获得登录用户的id
Long userId = UserHolder.getUser().getId();

//2. 判断是否关注
if(BooleanUtil.isTrue(isFollow)) {
Follow follow = new Follow();
follow.setUserId(userId);
follow.setFollowUserId(followUserId);
boolean save = save(follow);
if(save) {
stringRedisTemplate.opsForSet().add(LIKE_COMMON_KEY + userId, followUserId.toString()); //存入redis
}
} else {
boolean remove = remove(new QueryWrapper<Follow>()
.eq("user_id", userId).eq("follow_user_id", followUserId));
if(remove) {
stringRedisTemplate.opsForSet().remove(LIKE_COMMON_KEY + userId, followUserId.toString());//从redis中删除
}
}
return Result.ok();
}

@Override
public Result isFollow(Long followUserId) {
Long userId = UserHolder.getUser().getId();
Integer count = query().eq("user_id", userId).eq("follow_user_id", followUserId).count();
return Result.ok(count > 0);
}

共同关注业务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
public Result followCommons(Long followId) {
//1. 获取当前用户
Long userId = UserHolder.getUser().getId();

//2. 求交集
Set<String> intersect = stringRedisTemplate.opsForSet(
.intersect(LIKE_COMMON_KEY + userId, LIKE_COMMON_KEY + followId);

//4. 判断是否为空
if(intersect == null || intersect.isEmpty()) {
//为空返回一个空集合
return Result.ok(Collections.emptyList());
}

//5. 处理数据
List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
List<UserDTO> commonUsers = userService.listByIds(ids)
.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());

//6. 返回
return Result.ok(commonUsers);
}

Redis实现关注推送

Feed流模式

Feed流的三种实现方案

拉模式

推模式

推拉结合模式

三种模式的比较

 

在此我们选择实现最容易的推模式

基于推模式实现关注推送功能

业务需求

滚动分页VS角标分页

滚动分页更好更人性化. 可以参考刷朋友圈, 角标分页可能会出现向下滚动分页时出现了两条作者和内容都相同的朋友圈., 而滚动分页就不会出现这样的情况. 当需要阅读新发布的一条朋友圈时可以滑到最前面查看.

角标分页

滚动分页

当出现时间戳一样的情况时, 需要借助偏移量offset来实现

业务步骤

  1. 需要满足时间戳排序和滚动分页, 可以使用ZSet集合存放关注列表发布的Blog

  2. 作者发布Blog时需要获取其粉丝列表

  3. 发布成功后需要将BlogId推送至粉丝的ZSet集合中

  4. 当用户查看点击关注按钮时获取其ZSet集合中的BlogId集合

  5. 将BlogId集合转化为Blog集合, 再加上minTime和offset形成一个新的类对象返回给前端实现分页

代码实现

发布博客业务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
public Result saveBlog(Blog blog) {
// 获取登录用户
Long userId = UserHolder.getUser().getId();
if(userId == null) {
return Result.fail("用户不存在");
}
blog.setUserId(userId);
// 保存探店博文
boolean save = save(blog);
if(!save) {
return Result.fail("发布博客失败");
}

// 获取该用户粉丝列表
List<Follow> follows = followService.query().eq("follow_user_id", userId).list();

// 将博文Id推送到每个粉丝的Redis ZSet集合中
for (Follow follow : follows) {
Long followId = follow.getId();
stringRedisTemplate.opsForZSet()
.add(FEED_KEY + followId, blog.getId().toString(), System.currentTimeMillis());
}

// 返回id
return Result.ok(blog.getId());
}
查看关注列表博客
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@Override

public Result getLikeBlog(Long max, Integer offset) {

//1. 获得当前用户Id
Long userId = UserHolder.getUser().getId();

//2. 在Redis中获得关注列表的BlogId集合
Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet()
.reverseRangeByScoreWithScores(FEED_KEY + userId, 0, max, offset, PRE_BLOG);

//3. 非空判断
if (typedTuples == null || typedTuples.isEmpty()) {
return Result.ok(Collections.emptyList());
}

//4. 解析数据
ArrayList<Object> ids = new ArrayList<>(typedTuples.size()); //不采用自动扩容, 提高效率

long minTime = 0;
int os = 1;
for (ZSetOperations.TypedTuple<String> typedTuple : typedTuples) {

//5. 获得博客id
ids.add(Long.valueOf(typedTuple.getValue()));

//6. 获得分数(时间戳)
long time = typedTuple.getScore().longValue();
if(time == minTime) {
os++;
} else {
minTime = time;
os = 1;
}
}

//7. 防止出现多页最小时间相同的情况 1 1 1 1 1 1 5 5
if(max == minTime) {
os += offset;
}

//8. 将blogId集合转化成blog集合
String idStr = StrUtil.join(",", ids);
List<Blog> blogList = query().in("id", ids).last("order by field(id, " + idStr + ")").list();
for (Blog blog : blogList) {
isLike(blog);
}
//9. 返回封装类
LikeBlogs blogs = new LikeBlogs();
blogs.setList(blogList);
blogs.setLastId(minTime);
blogs.setOffset(os);
return Result.ok(blogs);
}

Redis实现附近商铺功能

GEO数据结构

 

附近商铺搜索

将店铺存入GEO集合

这里选择在for循环里将商铺位置放到集合中, 最后在将集合存到Redis里.

而不选择在for循环中将商铺一个一个存入Redis, 因为这样每次都要与Redis建立连接,  效率不如直接将集合存入Redis.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//1. 查询店铺信息
List<Shop> list = list();

//2. 将店铺按照typeId分组, 相同typeId的放到一组
Map<Long, List<Shop>> map = list.stream().collect(Collectors.groupingBy(Shop::getTypeId));

//3. 分批写入redis
for (Map.Entry<Long, List<Shop>> entry : map.entrySet()) {
//3.1 获取类型id
Long typeId = entry.getKey();
String key = "shop:geo:" + typeId;
//3.2 获取相同类型店铺集合
List<Shop> value = entry.getValue();
List<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList<>(value.size());
for (Shop shop : value) {
//不选择:
//stringRedisTemplate.opsForGeo().add(key, new Point(shop.getX(), shop.getY()), shop.getId().toString));
locations.add(new RedisGeoCommands.GeoLocation<>(
shop.getId().toString(), new Point(shop.getX(), shop.getY())
));
}
stringRedisTemplate.opsForGeo().add(key, locations);
}

代码实现

环境配置

spring-data-redis 2.3.9 版本并不支持 Redis 6.2 提供的 GEOSEARCH 命令,因此我们需要提升版本,修改自己的 POM文件.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<artifactId>spring-data-redis</artifactId>
<groupId>org.springframework.data</groupId>
</exclusion>
<exclusion>
<artifactId>lettuce-core</artifactId>
<groupId>io.lettuce</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.6.2</version>
</dependency>
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.1.6.RELEASE</version>
</dependency>

实现类方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@Override

public Result queryShopp(Integer typeId, Integer current, Double x, Double y) {

// 1.判断是否需要根据坐标查询
if (x == null || y == null) {
// 不需要坐标查询,按数据库查询
Page<Shop> page = query()
.eq("type_id", typeId)
.page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE));
// 返回数据
return Result.ok(page.getRecords());
}

// 2.计算分页参数
int from = (current - 1) * SystemConstants.DEFAULT_PAGE_SIZE;
int end = current * SystemConstants.DEFAULT_PAGE_SIZE;

// 3.查询redis、按照距离排序、分页。结果:shopId、distance
String key = SHOP_GEO_KEY + typeId;
GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo() // GEOSEARCH key BYLONLAT x y BYRADIUS 10 WITHDISTANCE
.search(
key,
GeoReference.fromCoordinate(x, y),
new Distance(5000),
RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end)
);

// 4.解析出id
if (results == null) {
return Result.ok(Collections.emptyList());
}
List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = results.getContent();
if (list.size() <= from) {
// 没有下一页了,结束
return Result.ok(Collections.emptyList());
}

// 4.1.截取 from ~ end的部分
List<Long> ids = new ArrayList<>(list.size());
Map<String, Distance> distanceMap = new HashMap<>(list.size());
list.stream().skip(from).forEach(result -> {
// 4.2.获取店铺id
String shopIdStr = result.getContent().getName();
ids.add(Long.valueOf(shopIdStr));
// 4.3.获取距离
Distance distance = result.getDistance();
distanceMap.put(shopIdStr, distance);
});

// 5.根据id查询Shop
String idStr = StrUtil.join(",", ids);
List<Shop> shops = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();
for (Shop shop : shops) {
shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());
}

// 6.返回
return Result.ok(shops);
}

Redis实现签到功能

BitMap

BitMap的用法

签到功能

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override

public Result sign() {

//1. 获得当前用户id, 作为key的中间部分
Long userId = UserHolder.getUser().getId();
if(userId == null) {
return Result.fail("用户未登录");
}

//2. 获得当前年月作为key的后缀
LocalDateTime now = LocalDateTime.now();
String key = USER_SIGN_KEY + userId + now.format(DateTimeFormatter.ofPattern(":yyyyMM"));

//3. 获得今天是本月的第几天
int dayOfMonth = now.getDayOfMonth();

//4. 写入Redis setbit key offset 1
stringRedisTemplate.opsForValue().setBit(key, dayOfMonth - 1, true);
return Result.ok("签到成功");
}

签到统计

实现签到统计功能

代码实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Override
public Result signCount() {
//1. 获得当前用户id, 作为key的中间部分
Long userId = UserHolder.getUser().getId();
if(userId == null) {
return Result.fail("用户未登录");
}

//2. 获得当前年月作为key的后缀
LocalDateTime now = LocalDateTime.now();
String key = USER_SIGN_KEY + userId + now.format(DateTimeFormatter.ofPattern(":yyyyMM"));

//3. 获得今天是本月的第几天
int dayOfMonth = now.getDayOfMonth();

//4. 从Redis中获得签到表
List<Long> result = stringRedisTemplate.opsForValue().bitField(
key,
BitFieldSubCommands.create().get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0)
);

//5. 非空/非零判断
if(result == null || result.isEmpty()) {
return Result.ok(0);
}

Long num = result.get(0);
if(num == null || num == 0) {
return Result.ok(0);
}

//5. 循环
int count = 0;
while(true) {
if((num.intValue() & 1) == 1 ) {
count++;
} else {
break;
}
num >>>= 1;
}
return Result.ok("已连续签到" + count + "天");
}

Redis实现UV统计

HyperLogLog用法

Hyperloglog(HLL)是从Loglog算法派生的概率算法,用于确定非常大的集合的基数,而不需要存储其所有值。

相关算法原理大家可以参考:https://juejin.cn/post/6844903785744056333#heading-0

Redis中的HLL是基于string结构实现的,单个HLL的内存永远小于16kb,内存占用低的令人发指!作为代价,其测量结果是概率性的,有小于0.81%的误差。不过对于UV统计来说,这完全可以忽略。

实现UV统计

在这里我们直接在UserController类里加入一个uv统计接口,向HyperLogLog中添加100万条数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@GetMapping("/uv")
public Result uv() {
String[] values = new String[1000];
int j = 0;
for (int i = 0; i < 1000000; i++) {
j = i % 1000; //为防止数组越界, j的值为0-999
values[j] = "user_" + i;
if(j == 999) {
//每一千次写入一次
stringRedisTemplate.opsForHyperLogLog().add("h12", values);
}
}
Long count = stringRedisTemplate.opsForHyperLogLog().size("h12"); //统计数量
return Result.ok(count);
}

可见结果与实际十分接近, 用计算器求得误差为: 0.2407%

并且内存占用很少!!!