1、基本概念
Redis诞生于2009年全称是Remote Dictionary Server,远程词典服务器,是一个基于内存的键值型NoSQL数据库。
1.1、redis特性
- 键值(key-value)型,value支持多种不同数据结构,功能丰富
- 单线程,每个命令具备原子性
Redis的网络IO和键值对读写是由一个线程来完成的,但Redis的其他功能,例如持久化、异步删除、集群数据同步等操作依赖于其他线程来执行
- 低延迟,速度快
原因:基于内存、采用多路复用非阻塞I/O、单线程
- 支持数据持久化
- 支持主从集群、分片集群
- 支持多语言客户端
1.2、数据类型
Redis是一个key-value的数据库,key一般是String类型,不过value的类型多种多样
2、redis常见命令
2.1、String
String是Redis中最基本的数据类型,可以存储任何数据,包括二进制数据、序列化的数据、JSON化的对象甚至是图片。
String类型,也就是字符串类型,是Redis中最简单的存储类型
底层SDS结构。为什么不直接实用字符串?①C 语言字符数组最后一个元素总是 ‘\0’,而在Redis中\0可能会被判定为提前结束而识别不了字符串②获取字符串长度为O(n),因为C字符串需要去遍历,开销较大,SDS对象有len属性直接获取
其value是字符串,不过根据字符串的格式不同,又可以分为3类:
(1)string:普通字符串
(2)int:整数类型,可以做自增、自减操作
(3)float:浮点类型,可以做自增、自减操作
命令 |
描述 |
SET |
添加或者修改已经存在的一个String类型的键值对 |
GET |
根据key获取String类型的value |
MSET |
批量添加多个String类型的键值对 |
MGET |
根据多个key获取多个String类型的value |
INCR |
让一个整型的key自增1 |
INCRBY |
让一个整型的key自增并指定步长,例如:incrby num 2 让num值自增2 |
INCRBYFLOAT |
让一个浮点类型的数字自增并指定步长 |
SETNX |
添加一个String类型的键值对,前提是这个key不存在,否则不执行 |
SETEX |
添加一个String类型的键值对,并且指定有效期 |
Redis的key允许有多个单词形成层级结构,多个单词之间用” :“隔开,格式如下:
2.2、Hash
Hash类型,也叫散列,底层是hashtable,其value是一个无序字典,类似于Java中的HashMap结构。
Hash结构可以将对象中的每个字段独立存储,可以针对单个字段做CRUD
命令 |
描述 |
HSET key field value |
添加或者修改hash类型key的field的值 |
HGET key field |
获取一个hash类型key的field的值 |
HMSET |
hmset 和 hset 效果相同 ,4.0之后hmset可以弃用了 |
HMGET |
批量获取多个hash类型key的field的值 |
HGETALL |
获取一个hash类型的key中的所有的field和value |
HKEYS |
获取一个hash类型的key中的所有的field |
HVALS |
获取一个hash类型的key中的所有的value |
HINCRBY |
让一个hash类型key的字段值自增并指定步长 |
HSETNX |
添加一个hash类型的key的field值,前提是这个field不存在,否则不执行 |
2.3、List
list列表的数据结构使用的是压缩列表ziplist和普通的双向链表linkedlist组成。元素少的时候会用ziplist,元素多的时候会用linkedlist
ziplist是一种压缩链表,它的好处是更能节省内存空间,因为它所存储的内容都是在连续的内存区域当中的,当数据量较大的时候因为需要重新分配,开销较大
常用来存储一个有序数据,例如:朋友圈点赞列表,评论列表等
命令 |
描述 |
LPUSH key element … |
向列表左侧插入一个或多个元素 |
LPOP key |
移除并返回列表左侧的第一个元素,没有则返回nil |
RPUSH key element … |
向列表右侧插入一个或多个元素 |
RPOP key |
移除并返回列表右侧的第一个元素 |
LRANGE key star end |
返回一段角标范围内的所有元素 |
BLPOP和BRPOP |
与LPOP和RPOP类似,只不过在没有元素时等待指定时间,而不是直接返回nil |
2.4、SET
Redis的Set结构与Java中的HashSet类似,可以看做是一个value为null的HashMap。因为也是一个hash表,因此具备与HashSet类似的特征。数据结构的底层实现有两种方式:Intset 和 Hashtable。当集合中的所有元素都是整数,并且元素数量较少时,Redis 会使用 Intset 作为底层实现。当集合中的元素不仅限于整数,或者元素数量较多时,Redis 会使用 Hashtable 作为底层实现
命令 |
描述 |
SADD key member … |
向set中添加一个或多个元素 |
SREM key member … |
移除set中的指定元素 |
SCARD key |
返回set中元素的个数 |
SISMEMBER key member |
判断一个元素是否存在于set中 |
SMEMBERS |
获取set中的所有元素 |
SINTER key1 key2 … |
求key1与key2的交集 |
SDIFF key1 key2 … |
求key1与key2的差集 |
SUNION key1 key2 … |
求key1和key2的并集 |
2.5、SortedSet
Redis的SortedSet是一个可排序的set集合,与Java中的TreeSet有些类似,但底层数据结构却差别很大。SortedSet中的每一个元素都带有一个score属性,可以基于score属性对元素排序,底层的实现是一个跳表(SkipList)加 hash表。
因为SortedSet的可排序特性,经常被用来实现排行榜这样的功能
命令 |
描述 |
ZADD key score member |
添加一个或多个元素到sorted set ,如果已经存在则更新其score值 |
ZREM key member |
删除sorted set中的一个指定元素 |
ZSCORE key member |
获取sorted set中的指定元素的score值 |
ZRANK key member |
获取sorted set 中的指定元素的排名 |
ZCARD key |
获取sorted set中的元素个数 |
ZCOUNT key min max |
统计score值在给定范围内的所有元素的个数 |
ZINCRBY key increment member |
让sorted set中的指定元素自增,步长为指定的increment值 |
ZRANGE key min max |
按照score排序后,获取指定排名范围内的元素 |
ZRANGEBYSCORE key min max |
按照score排序后,获取指定score范围内的元素 |
ZDIFF、ZINTER、ZUNION |
求差集、交集、并集 |
注意:所有的排名默认都是升序,如果要降序则在命令的Z后面添加REV即可
3、java客户端
3.1、springboot整合redis
3.1.1、引入依赖
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
|
3.1.2、redis基本配置
1 2 3
| spring.redis.host=127.0.0.1 spring.redis.port=6379
|
3.1.3、redis固定配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| @Configuration public class RedisConfig { @Bean @SuppressWarnings("all") public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, Object> template = new RedisTemplate<String, Object>(); template.setConnectionFactory(factory);
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
template.setKeySerializer(stringRedisSerializer); template.setHashKeySerializer(stringRedisSerializer); template.setValueSerializer(jackson2JsonRedisSerializer); template.setHashValueSerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); return template; } }
|
redis使用JDK提供的序列化功能。 优点是反序列化时不需要提供类型信息(class),但缺点是需要实现Serializable接口,还有序列化后的结果非常庞大,是JSON格式的5倍左右,这样就会消耗redis服务器的大量内存
所以我们需要 使用Jackson库将对象序列化为JSON字符串。优点是速度快,序列化后的字符串短小精悍,易读
3.1.4、工具类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542
| package com.kaka.redis;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils;
import javax.annotation.Resource; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit;
@Component public final class RedisUtils {
@Resource private RedisTemplate<String, Object> redisTemplate;
public Set<String> keys(String keys){ try { return redisTemplate.keys(keys); }catch (Exception e){ e.printStackTrace(); return null; } }
public boolean expire(String key, long time) { try { if (time > 0) { redisTemplate.expire(key, time, TimeUnit.SECONDS); } return true; } catch (Exception e) { e.printStackTrace(); return false; } }
public long getExpire(String key) { return redisTemplate.getExpire(key, TimeUnit.SECONDS); }
public boolean hasKey(String key) { try { return redisTemplate.hasKey(key); } catch (Exception e) { e.printStackTrace(); return false; } }
@SuppressWarnings("unchecked") public void del(String... key) { if (key != null && key.length > 0) { if (key.length == 1) { redisTemplate.delete(key[0]); } else { redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key)); } } }
public Object get(String key) { return key == null ? null : redisTemplate.opsForValue().get(key); }
public boolean set(String key, Object value) { try { redisTemplate.opsForValue().set(key, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } }
public boolean setnx(String key, Object value) { try { redisTemplate.opsForValue().setIfAbsent(key,value); return true; } catch (Exception e) { e.printStackTrace(); return false; } }
public boolean set(String key, Object value, long time) { try { if (time > 0) { redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS); } else { set(key, value); } return true; } catch (Exception e) { e.printStackTrace(); return false; } }
public boolean setnx(String key, Object value, long time) { try { if (time > 0) { redisTemplate.opsForValue().setIfAbsent(key, value, time, TimeUnit.SECONDS); } else { set(key, value); } return true; } catch (Exception e) { e.printStackTrace(); return false; } }
public long incr(String key, long delta) { if (delta < 0) { throw new RuntimeException("递增因子必须大于0"); } return redisTemplate.opsForValue().increment(key, delta); }
public long decr(String key, long delta) { if (delta < 0) { throw new RuntimeException("递减因子必须大于0"); } return redisTemplate.opsForValue().increment(key, -delta); }
public Object hget(String key, String item) { return redisTemplate.opsForHash().get(key, item); }
public Map<Object, Object> hmget(String key) { return redisTemplate.opsForHash().entries(key); }
public boolean hmset(String key, Map<String, Object> map) { try { redisTemplate.opsForHash().putAll(key, map); return true; } catch (Exception e) { e.printStackTrace(); return false; } }
public boolean hmset(String key, Map<String, Object> map, long time) { try { redisTemplate.opsForHash().putAll(key, map); if (time > 0) { expire(key, time); } return true; } catch (Exception e) { e.printStackTrace(); return false; } }
public boolean hset(String key, String item, Object value) { try { redisTemplate.opsForHash().put(key, item, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } }
public boolean hset(String key, String item, Object value, long time) { try { redisTemplate.opsForHash().put(key, item, value); if (time > 0) { expire(key, time); } return true; } catch (Exception e) { e.printStackTrace(); return false; } }
public void hdel(String key, Object... item) { redisTemplate.opsForHash().delete(key, item); }
public boolean hHasKey(String key, String item) { return redisTemplate.opsForHash().hasKey(key, item); }
public double hincr(String key, String item, double by) { return redisTemplate.opsForHash().increment(key, item, by); }
public double hdecr(String key, String item, double by) { return redisTemplate.opsForHash().increment(key, item, -by); }
public Set<Object> sGet(String key) { try { return redisTemplate.opsForSet().members(key); } catch (Exception e) { e.printStackTrace(); return null; } }
public boolean sHasKey(String key, Object value) { try { return redisTemplate.opsForSet().isMember(key, value); } catch (Exception e) { e.printStackTrace(); return false; } }
public long sSet(String key, Object... values) { try { return redisTemplate.opsForSet().add(key, values); } catch (Exception e) { e.printStackTrace(); return 0; } }
public long sSetAndTime(String key, long time, Object... values) { try { Long count = redisTemplate.opsForSet().add(key, values); if (time > 0){ expire(key, time); } return count; } catch (Exception e) { e.printStackTrace(); return 0; } }
public long sGetSetSize(String key) { try { return redisTemplate.opsForSet().size(key); } catch (Exception e) { e.printStackTrace(); return 0; } }
public long setRemove(String key, Object... values) { try { Long count = redisTemplate.opsForSet().remove(key, values); return count; } catch (Exception e) { e.printStackTrace(); return 0; } }
public List<Object> lGet(String key, long start, long end) { try { return redisTemplate.opsForList().range(key, start, end); } catch (Exception e) { e.printStackTrace(); return null; } }
public long lGetListSize(String key) { try { return redisTemplate.opsForList().size(key); } catch (Exception e) { e.printStackTrace(); return 0; } }
public Object lGetIndex(String key, long index) { try { return redisTemplate.opsForList().index(key, index); } catch (Exception e) { e.printStackTrace(); return null; } }
public boolean lSet(String key, Object value) { try { redisTemplate.opsForList().rightPush(key, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } }
public boolean lSet(String key, Object value, long time) { try { redisTemplate.opsForList().rightPush(key, value); if (time > 0){ expire(key, time); } return true; } catch (Exception e) { e.printStackTrace(); return false; } }
public boolean lSet(String key, List<Object> value) { try { redisTemplate.opsForList().rightPushAll(key, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } }
public boolean lSet(String key, List<Object> value, long time) { try { redisTemplate.opsForList().rightPushAll(key, value); if (time > 0){ expire(key, time); } return true; } catch (Exception e) { e.printStackTrace(); return false; } }
public boolean lUpdateIndex(String key, long index, Object value) { try { redisTemplate.opsForList().set(key, index, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } }
public long lRemove(String key, long count, Object value) { try { Long remove = redisTemplate.opsForList().remove(key, count, value); return remove; } catch (Exception e) { e.printStackTrace(); return 0; } } }
|
- 使用
1 2 3 4 5 6 7 8 9 10
| @RestController public class Demo { @Autowired RedisUtils redisUtils; @RequestMapping("/redisTest") public String test(){ redisUtils.set("test3:key3","hello,redis"); return redisUtils.get("test3:key3").toString(); } }
|
4、进阶
4.1、redis持久化
4.1.1、rdb
把当前内存中的快照写入磁盘
(1)save:save指令执行会阻塞当前redis服务器,直到当前rdb过程执行完,可能造成长时间阻塞,线上环境不建议使用
(2)bgsave:调用fork函数生成子进程,解决了save的阻塞问题
(3)自动执行:(redis配置文件中配置)save 900 1 save 300 10 save 60 1000
4.1.2、aof
以日志的方式记录每次写命令,重启时再执行aof中的命令达到数据恢复的目的(是目前redis持久化的主流方式)
aof写数据策略:
(1)always:服务器每写入一个命令,就调用一次fdatasync(不会丢失数据)
(2)Everysec:服务器每一秒重调用一次fdatasync(数据同步),最多丢失1秒的数据
(3)NO:操作系统决定任何将缓冲区里面的命令写入磁盘里面,数据丢失量是不确定的
注:always策略持久化数据:先把写命令追加到aof buffer中,下一次进入事件循环循环后,再将buffer写到磁盘上。也就是说,这次写到磁盘上的内容是上一个事件循环产生的所以,即使设置为always,也会丢失一个循环的数据
4.1.3、对比
|
rdb |
aof |
占用存储空间 |
小(数据级) |
大(指令级) |
恢复速度 |
快 |
慢(需要执行指令) |
数据安全性 |
可能会丢失最后一次持久化后的数据 |
根据策略决定 |
4.2、redis数据删除策略
4.2.1、立即删除
方式:创建一个定时器,当设置的key到达到期时间时,由定时器任务立即执行对key的删除操作
优缺点:
(1)节约内存,到时就删,快速释放掉不必要的内存占用
(2)CPU压力变大,无论CPU此时负载量多高,均占用CPU
4.2.2、惰性删除
方式:数据到期时不做删除,等下次访问时进行删除
优缺点:
(1)节约cpu性能
(2)若大量的key在超出超时时间后,很久一段时间内,都没有被获取过,那么可能发生内存泄露(无用的垃圾占用了大量的内存)
4.2.3、定期删除
方式:每隔一段时间主动检查一批过期键,并将其删除。这样可以保证过期键及时地从内存中释放
(1)Redis 默认每秒进行 10 次过期扫描,此配置可通过 Redis 的配置文件 redis.conf 进行配置,配置健为
hz 它的默认值是 hz 10。
【注意】:Redis 每次扫描并不是遍历过期字典中的所有健,而是采用随机抽取判断并删除过期健的形式执行的。
删除流程:
(1)从过期字典随机取20个键
(2)删除这20个键中过期的键
(3)如果过期key的比例超过25%,重复步骤1
优缺点:
(1)分批处理,以避免对cpu产生过大的负载
4.3、redis内存淘汰策略
Redis的内存淘汰策略是指在Redis的用于缓存的内存不足时,怎么处理需要新写入且需要申请额外空间的数据
LRU:淘汰最长时间没有被使用的
LFU:一定时间内使用频次越低的
random:随机
ttl:越早过期的数据
淘汰策略名称 |
策略含义 |
noeviction |
默认策略,不淘汰数据;大部分写命令都将返回错误(DEL等少数除外) |
allkeys-lru |
从所有数据中根据 LRU 算法挑选数据淘汰 |
volatile-lru |
从设置了过期时间的数据中根据 LRU 算法挑选数据淘汰 |
allkeys-random |
从所有数据中随机挑选数据淘汰 |
volatile-random |
从设置了过期时间的数据中随机挑选数据淘汰 |
volatile-ttl |
从设置了过期时间的数据中,挑选越早过期的数据进行删除 |
allkeys-lfu |
从所有数据中根据 LFU 算法挑选数据淘汰(4.0及以上版本可用) |
volatile-lfu |
从设置了过期时间的数据中根据 LFU 算法挑选数据淘汰(4.0及以上版本可用) |
4.4、redis工作模式(高可用)
4.4.1、单机模式
单机模式是最简单的 Redis 工作模式。在单机模式下,Redis 只运行在单个节点上,数据存储在该节点的内存中。这种模式适用于小规模应用或开发环境
4.4.2、主从复制模式
主从复制模式通过将数据从主节点复制到一个或多个从节点来提高数据的可靠性和读取性能。主节点负责处理写入操作,从节点复制主节点的数据,并可以处理读取操作。主从复制模式适用于需要读取扩展和数据冗余的场景
4.4.3、哨兵模式
主从复制的基础上,引入了哨兵节点来监控主节点的状态。当主节点发生故障时,哨兵节点会自动将一个从节点升级为新的主节点,并将其他从节点重新配置为复制新的主节点。这种模式提供了故障转移和自动主节点切换的功能
4.4.4、集群模式
即使使用哨兵,redis每个实例也是全量存储,每个redis存储的内容都是完整的数据。cluster是为了解决单机Redis容量有限的问题,将数据按一定的规则分配到多台机器,提高并发量
4.5、redis发布订阅机制
Redis 发布订阅(Pus/Sub)是一种消息通信模式:发送者通过 PUBLISH发布消息,订阅者通过 SUBSCRIBE 订阅接收消息或通过UNSUBSCRIBE 取消订阅。
发布者和订阅者属于客户端,Channel 是 Redis 服务端,发布者将消息发布到频道,订阅这个频道的订阅者则收到消息。从而实现消息的广播和实时通知
Redis 的发布订阅机制是一种简单的消息传递方式,并不提供消息持久化和消息队列的功能。如果需要更高级的消息队列功能,可以考虑rabbitmq,kafka等
1 2 3 4 5 6
| # A订阅频道 SUBSCRIBE channel1
# B向频道发送消息,A就可以收到消息 PUBLISH channel1 "Redis PUBLISH test"
|
4.6、缓存穿透、击穿、雪崩
4.6.1、缓存穿透
缓存穿透:某些不存在的数据,被大量的查询访问,缓存层中没有这些数据的缓存,请求就直达存储层,造成宕机
解决方法:
1.返回空对象,将该key的空值返回给缓存层,缓存层会直接返回空对象。
2.布隆过滤器:将所有的key都存在过滤器中,在访问缓存层的时候会首先访问过滤器,如果过滤器中不存在这个值,那么直接返回空值。
布隆过滤器:它是一种类似哈希的数据结构,通过这个数据结构,可以快速的插入和查询,确定某个事件一定不存在或可能存在。特点是占用空间少,缺点是返回的结果是概率性
当一个元素加入集合时,就通过K个hash函数将这个映射成一个位数组中的K个点,把它们置为1。当查询时,只要检查这些点是否全为1,就能判断集合中是否可能存在。
如果k个点有任何一个0,则被检元素一定不在。如果都是1,则很可能存在,这个期望概率是可以设置
4.6.2、缓存击穿?
一份热点数据,在它缓存失效期间,大量的请求直接命中存储层
解决方法:
1.设置热点数据永不过期的策略。
2.加互斥锁,在一个请求访问时另一个不能访问,这样,在这个请求访问过后,缓存重建,其他线程就可以访问了
4.6.3、缓存雪崩?
当缓存服务器重启或者大量缓存集中在某一个时间段失效,这样在失效的时候,会给后端系统带来很大压力。导致系统崩溃。
解决方法:
1.不同的key,设置不同的过期时间,让缓存失效的时间点尽量均匀
2.在系统启动或低峰期(比如系统刚启动),提前加载热门数据到缓存中,避免在高峰期大量请求同时访问导致缓存失效
4.7、redis实现分布式锁
在分布式的环境下,会发生多个server并发修改同一个资源的情况,这种情况下,由于多个server是多个不同的JRE环境,而Java自带的锁局限于当前JRE,所以Java自带的锁机制在这个场景下是无效的,那么就需要我们自己来实现一个分布式锁
- 通过
set...nx...
命令,将加锁、过期命令编排到一起,把他们变成原子操作。完整命令:set key random-value nx ex seconds
其实目前通常所说的Setnx命令,并非单指Redis的setnx key value这条命令
一般代指Redis中对set命令加上nx参数进行使用
(1)nx ex 是set指令的两个参数: ex过期时间 nx只有key不存在时设置新的key/value
(2)key设置成随机数,避免一个线程过期时间内没释放掉锁,过期后有另一个线程获取到锁,该线程执行完后释放掉另一个线程获取的锁
(3)设置过期时间(EX)作用:如果客户端忘记解锁,那么这种情况就很有可能造成死锁
(4)NX的作用:避免重复获取锁
- 解锁的时候进行判断,是自己持有的锁才能释放,否则不能释放。另外判断,释放这两步需要保持原子性,所以通过Lua脚本将两个命令编排在一起,而整个Lua脚本的执行是原子的
if redis.call(“get”,KEYS[1]) == ARGV[1] then return redis.call(“del”,KEYS[1]) else return 0 end
这里为什么要用原子操作?
主要是怕误将其他客户端的锁解开。比如客户端A加锁,一段时间之后客户端A解锁,在进入unlock后执行jedis.del()之前,锁突然过期了,此时客户端B尝试加锁成功,然后客户端A再执行del()方法,则将客户端B的锁给解除
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| import redis.clients.jedis.Jedis;
public class RedisLock { private Jedis jedis;
public RedisLock(Jedis jedis) { this.jedis = jedis; }
public boolean tryLock(String lockKey, String requestId, int expireTime) { String result = jedis.set(lockKey, requestId, "NX", "EX", expireTime); if ("OK".equals(result)) { return true; } return false; }
public boolean releaseLock(String lockKey, String requestId) { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Long result = (Long) jedis.eval(script, 1, lockKey, requestId); if (result == 1) { return true; } return false; } }
|
另外可以通过Redisson框架,它的底层原理其实也是这个setnx
4.8、redis和数据库的双写一致性
假设有 A、B 两个并发请求:
- 先更新数据库再删除缓存:当请求 A 更新数据库之后,还未来得及进行缓存清除,此时请求 B 查询到并使用了 Cache 中的旧数据。
- 先删除缓存再更新数据库:当请求 A 执行清除缓存后,还未进行数据库更新,此时请求 B 进行查询,查到了旧数据并写入了 Cache
- 延迟双删
延迟双删是这样:先执行缓存清除操作,再执行数据库更新操作,延迟 N 秒之后再执行一次缓存清除操作,这样就不用担心缓存中的数据和数据库中的数据不一致了
一般来说,N 要大于一次写入缓存操作的时间,如果延迟时间小于写入缓存的时间,会导致请求 A 已经延迟清除了缓存,但是此时请求 B 缓存还未写入,具体是多少,就要结合自己的业务来统计这个数值了