阅读完需:约 72 分钟
Redis 是用单线程来处理多个客户端的访问,因此作为 Redis 的开发和运维人员需要了解 Redis 服务端和客户端的通信协议,以及主流编程语言的 Redis 客户端使用方法,同时还需要了解客户端管理的相应 API 以及开发运维中可能遇到的问题。
Redis 客户端通信协议
Redis制定了RESP(Redis Serialization Protocol,Redis序列化协议)实现客户端与服务端的正常交互,这种协议简单高效,既能够被机器解析,又容易被人类识别。
RESP
可以序列化不同的数据类型,如整型、字符串、数组还有一种特殊的Error
类型。需要执行的Redis
命令会封装为类似于字符串数组的请求然后通过Redis
客户端发送到Redis
服务端。Redis
服务端会基于特定的命令类型选择对应的一种数据类型进行回复。
RESP 响应内容
在RESP
中,发送的数据类型取决于数据报的第一个字节:
- 单行字符串的第一个字节为
+
。 - 错误消息的第一个字节为
-
。 - 整型数字的第一个字节为
:
。 - 定长字符串的第一个字节为
$
。 -
RESP
数组的第一个字节为*
。
数据类型 | 本文翻译名称 | 基本特征 | 例子 |
Simple String |
单行字符串 | 第一个字节是+ ,最后两个字节是\r\n ,其他字节是字符串内容 |
+OK\r\n |
Error |
错误消息 | 第一个字节是- ,最后两个字节是\r\n ,其他字节是异常消息的文本内容 |
-ERR\r\n |
Integer |
整型数字 | 第一个字节是: ,最后两个字节是\r\n ,其他字节是数字的文本内容 |
:100\r\n |
Bulk String |
定长字符串 | 第一个字节是$ ,紧接着的字节是内容字符串长度\r\n ,最后两个字节是\r\n ,其他字节是字符串内容 |
$4\r\ndoge\r\n |
Array |
RESP 数组 |
第一个字节是* ,紧接着的字节是元素个数\r\n ,最后两个字节是\r\n ,其他字节是各个元素的内容,每个元素可以是任意一种数据类型 |
*2\r\n:100\r\n$4\r\ndoge\r\n |
发送的命令格式如下,CRLF代表”\r\n”:
*<参数数量> CRLF
$<参数1的字节数量> CRLF
<参数1> CRLF
...
$<参数N的字节数量> CRLF
<参数N> CRLF
以set hello world
这个命令为例,发送的内容就是这样的:
*3
$3
SET
$5
hello
$5
world
第一行*3表示有3个参数,3表示接下来的一个参数有3个字节,接下来是参数,3表示接下来的一个参数有3个字节,接下来是参数,5表示下一个参数有5个字节,接下来是参数,$5表示下一个参数有5个字节,接下来是参数。
所以set hello world最终发送给redis服务器的命令是:
*3\r\n$3\r\nSET\r\n$5\r\nhello\r\n$5\r\nworld\r\n
RESP 响应内容
Redis的返回结果类型分为以下五种:
正确回复:在RESP中第一个字节为"+"
错误回复:在RESP中第一个字节为"-"
整数回复:在RESP中第一个字节为":"
字符串回复:在RESP中第一个字节为"$"
多条字符串回复:在RESP中第一个字节为"*"
(+) 表示一个正确的状态信息,具体信息是当前行+后面的字符。
(-) 表示一个错误信息,具体信息是当前行-后面的字符。
(*) 表示消息体总共有多少行,不包括当前行,*后面是具体的行数。
($) 表示下一行数据长度,不包括换行符长度\r\n,$后面则是对应的长度的数据。
(:) 表示返回一个数值,:后面是相应的数字节符。
有了这个协议,我们就可以编写程序来和 Redis 服务端进行通信。由于 Redis 的流行,已经存在了很多流行的开源客户端。本文主要选择 Java 领域 Redis 官方推荐的客户端进行介绍。
Redis 的 Java 客户端
Redis 官方推荐的 Java 客户端有Jedis、lettuce 和 Redisson。
Jedis
Jedis 是老牌的 Redis 的 Java 实现客户端,提供了比较全面的 Redis 命令的支持。
- 优点:
- 支持全面的 Redis 操作特性(可以理解为API比较全面)。
- 缺点:
- 使用阻塞的 I/O,且其方法调用都是同步的,程序流需要等到 sockets 处理完 I/O 才能执行,不支持异步;
- Jedis 客户端实例不是线程安全的,所以需要通过连接池来使用 Jedis。
lettuce
lettuce ([ˈletɪs]),是一种可扩展的线程安全的 Redis 客户端,支持异步模式。如果避免阻塞和事务操作,如BLPOP和MULTI/EXEC,多个线程就可以共享一个连接。lettuce 底层基于 Netty,支持高级的 Redis 特性,比如哨兵,集群,管道,自动重新连接和Redis数据模型。
- 优点:
- 支持同步异步通信模式;
- Lettuce 的 API 是线程安全的,如果不是执行阻塞和事务操作,如BLPOP和MULTI/EXEC,多个线程就可以共享一个连接。
Redisson
Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务。其中包括( BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson 提供了使用Redis 的最简单和最便捷的方法。
Redisson 的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
- 优点
- 使用者对 Redis 的关注分离,可以类比 Spring 框架,这些框架搭建了应用程序的基础框架和功能,提升开发效率,让开发者有更多的时间来关注业务逻辑;
- 提供很多分布式相关操作服务,例如,分布式锁,分布式集合,可通过Redis支持延迟队列等。
- 缺点
- Redisson 对字符串的操作支持比较差。
文档地址
https://github.com/redisson/redisson
https://www.bookstack.cn/read/redisson-doc-cn/README.md
https://www.cnblogs.com/crazymakercircle/p/14731826.html#autoid-h2-7-3-0
Redisson 项目介绍
Redisson是架设在Redis基础上的一个Java驻内存数据网格(In-Memory Data Grid)。充分的利用了Redis键值数据库提供的一系列优势,基于Java实用工具包中常用接口,为使用者提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式系统的难度。同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间的协作。
Redisson采用了基于NIO的Netty框架,不仅能作为Redis底层驱动客户端,具备提供对Redis各种组态形式的连接功能,对Redis命令能以同步发送、异步形式发送、异步流形式发送或管道形式发送的功能,LUA脚本执行处理,以及处理返回结果的功能,还在此基础上融入了更高级的应用方案,不但将原生的Redis Hash
,List
,Set
,String
,Geo
,HyperLogLog
等数据结构封装为Java里大家最熟悉的映射(Map)
,列表(List)
,集(Set)
,通用对象桶(Object Bucket)
,地理空间对象桶(Geospatial Bucket)
,基数估计算法(HyperLogLog)
等结构,在这基础上还提供了分布式的多值映射(Multimap)
,本地缓存映射(LocalCachedMap)
,有序集(SortedSet)
,计分排序集(ScoredSortedSet)
,字典排序集(LexSortedSet)
,列队(Queue)
,阻塞队列(Blocking Queue)
,有界阻塞列队(Bounded Blocking Queue)
,双端队列(Deque)
,阻塞双端列队(Blocking Deque)
,阻塞公平列队(Blocking Fair Queue)
,延迟列队(Delayed Queue)
,布隆过滤器(Bloom Filter)
,原子整长形(AtomicLong)
,原子双精度浮点数(AtomicDouble)
,BitSet
等Redis原本没有的分布式数据结构。不仅如此,Redisson还实现了Redis文档中提到像分布式锁Lock
这样的更高阶应用场景。事实上Redisson并没有不止步于此,在分布式锁的基础上还提供了联锁(MultiLock)
,读写锁(ReadWriteLock)
,公平锁(Fair Lock)
,红锁(RedLock)
,信号量(Semaphore)
,可过期性信号量(PermitExpirableSemaphore)
和闭锁(CountDownLatch)
这些实际当中对多线程高并发应用至关重要的基本部件。正是通过实现基于Redis的高阶应用方案,使Redisson成为构建分布式系统的重要工具。
在提供这些工具的过程当中,Redisson广泛的使用了承载于Redis订阅发布功能之上的分布式话题(Topic)
功能。使得即便是在复杂的分布式环境下,Redisson的各个实例仍然具有能够保持相互沟通的能力。在以这为前提下,结合了自身独有的功能完善的分布式工具,Redisson进而提供了像分布式远程服务(Remote Service)
,分布式执行服务(Executor Service)
和分布式调度任务服务(Scheduler Service)
这样适用于不同场景的分布式服务。使得Redisson成为了一个基于Redis的Java中间件(Middleware)。
Redisson Node
的出现作为驻内存数据网格的重要特性之一,使Redisson能够独立作为一个任务处理节点,以系统服务的方式运行并自动加入Redisson集群,具备集群节点弹性增减的能力。然而在真正意义上让Redisson发展成为一个完整的驻内存数据网格的,还是具有将基本上任何复杂、多维结构的对象都能变为分布式对象的分布式实时对象服务(Live Object Service)
,以及与之相结合的,在分布式环境中支持跨节点对象引用(Distributed Object Reference)的功能。这些特色功能使Redisson具备了在分布式环境中,为Java程序提供了堆外空间(Off-Heap Memory)储存对象的能力。
Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。如果您现在正在使用其他的Redis的Java客户端,希望Redis命令和Redisson对象匹配列表 能够帮助您轻松的将现有代码迁徙到Redisson里来。如果目前Redis的应用场景还仅限于作为缓存使用,您也可以将Redisson轻松的整合到像Spring和Hibernate这样的常用框架里。除此外您也可以间接的通过Java缓存标准规范JCache API (JSR-107)接口来使用Redisson。
Redisson生而具有的高性能,分布式特性和丰富的结构等特点恰巧与Tomcat这类服务程序对会话管理器(Session Manager)的要求相吻合。利用这样的特点,Redisson专门为Tomcat提供了会话管理器(Tomcat Session Manager)。
在此不难看出,Redisson同其他Redis Java客户端有着很大的区别,相比之下其他客户端提供的功能还仅仅停留在作为数据库驱动层面上,比如仅针对Redis提供连接方式,发送命令和处理返回结果等。像上面这些高层次的应用则只能依靠使用者自行实现。
Redisson支持Redis 2.8以上版本,支持Java1.6+以上版本。
Redisson 同步接口外,还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
RedissonClient
、RedissonReactiveClient
和RedissonRxClient
实例本身和Redisson
提供的所有分布式对象都是线程安全的。
Redisson为每个操作都提供了自动重试策略,当某个命令执行失败时,Redisson会自动进行重试。自动重试策略可以通过修改retryAttempts(默认值:3)参数和retryInterval(默认值:1000毫秒)参数来进行优化调整。当等待时间达到retryInterval指定的时间间隔以后,将自动重试下一次。全部重试失败以后将抛出错误。
Redisson框架提供的几乎所有对象都包含了同步
和异步
相互匹配的方法。这些对象都可以通过RedissonClient
接口获取。同时还为大部分Redisson对象提供了满足异步流处理标准
的程序接口RedissonReactiveClient
。除此外还提供了RxJava2
规范的RedissonRxClient
程序接口。
以下是关于使用RAtomicLong
对象的范例:
RedissonClient client = Redisson.create(config);
RAtomicLong longObject = client.getAtomicLong('myLong');
// 同步执行方式
longObject.compareAndSet(3, 401);
// 异步执行方式
RFuture<Boolean> result = longObject.compareAndSetAsync(3, 401);
RedissonReactiveClient client = Redisson.createReactive(config);
RAtomicLongReactive longObject = client.getAtomicLong('myLong');
// 异步流执行方式
Mono<Boolean> result = longObject.compareAndSet(3, 401);
RedissonRxClient client = Redisson.createRx(config);
RAtomicLongRx longObject= client.getAtomicLong("myLong");
// RxJava2方式
Flowable<Boolean result = longObject.compareAndSet(3, 401);
几乎所有的Redisson对象都实现了一个异步接口,异步接口提供的方法名称与其同步接口的方法名称相互匹配。例如:
// RAtomicLong接口继承了RAtomicLongAsync接口
RAtomicLongAsync longObject = client.getAtomicLong("myLong");
RFuture<Boolean> future = longObject.compareAndSetAsync(1, 401);
异步执行的方法都会返回一个实现了RFuture
接口的对象。该对象同时提供了java.util.concurrent.CompletionStage
和java.util.concurrent.Future
两个异步接口。
future.whenComplete((res, exception) -> {
// ...
});
// 或者
future.thenAccept(res -> {
// 处理返回
}).exceptionally(exception -> {
// 处理错误
});
Redisson
为大多数分布式数据结构提供了满足Reactor
项目的异步流处理标准的程序接口。该接口通过两种方式实现:
- 基于
Project Reactor
标准的实现方式。使用范例如下:
RedissonReactiveClient client = Redisson.createReactive(config);
RAtomicLongReactive atomicLong = client.getAtomicLong("myLong");
Mono<Boolean> cs = longObject.compareAndSet(10, 91);
Mono<Long> get = longObject.get();
Publisher<Long> getPublisher = longObject.get();
- 基于
RxJava2
标准的实现方式。使用范例如下:
RedissonRxClient client = Redisson.createRx(config);
RAtomicLongRx atomicLong = client.getAtomicLong("myLong");
Single<Boolean> cs = longObject.compareAndSet(10, 91);
Single<Long> get = longObject.get();
映射(Map)
基于Redis的Redisson的分布式映射结构的RMap
Java对象实现了java.util.concurrent.ConcurrentMap
接口和java.util.Map
接口。与HashMap不同的是,RMap保持了元素的插入顺序。该对象的最大容量受Redis限制,最大元素数量是4 294 967 295
个。
@PostConstruct
public void map() throws ExecutionException, InterruptedException {
// 按名称返回map实例
RMap<String, MultipartData> map = redissonClient.getMap("anyMap123");
// 存储由指定键映射的指定值。如果具有指定键的映射项已存在,则返回上一个值。
MultipartData prevObject = map.put("123", new MultipartData().include("345fff","thfgd"));
// 按指定的键移除映射项并返回值。
MultipartData obj = map.remove("123");
// 存储由指定键映射的指定值,工作速度比put(Object,Object) 快,但不返回以前的值。
map.fastPut("321", new MultipartData().include("fdgfdgdfg7","wsdsdjfkdsfdsf"));
// 删除由指定键映射的映射项,比remove(Object) 更快,但不返回值
map.fastRemove("321");
// 异步,存储由指定键映射的指定值。如果具有指定键的映射项已存在,则返回上一个值。
RFuture<MultipartData> putAsyncFuture = map.putAsync("123A",new MultipartData().include("999999","00000"));
// 异步,存储由指定键映射的指定值,工作速度比putAsync (对象,对象) 快,但不返回以前的值。
RFuture<Boolean> fastPutAsyncFuture = map.fastPutAsync("321A",new MultipartData().include("ooooooo---","00000"));
putAsyncFuture.get();
fastPutAsyncFuture.get();
// 存储由指定键映射的指定值,工作速度比putAsync (对象,对象) 快,但不返回以前的值。
map.fastPutAsync("321E", new MultipartData());
// 删除由指定键映射的映射项,工作速度比removeAsync(Object) 快,但不返回值
map.fastRemoveAsync("321E");
}
案例
public class RedissionTest {
@Resource
RedissonManager redissonManager;
@Test
public void testListExamples() {
// 默认连接上 127.0.0.1:6379
RedissonClient client = redissonManager.getRedisson();
// RMap 继承了 java.util.concurrent.ConcurrentMap 接口
RMap<String, Object> map = client.getMap("redission:test:personalMap");
map.put("name", "张三");
map.put("address", "北京");
map.put("age", new Integer(50));
System.out.println("Map size: " + map.size());
boolean contains = map.containsKey("age");
System.out.println("Is map contains key 'age': " + contains);
String value = String.valueOf(map.get("name"));
System.out.println("Value mapped by key 'name': " + value);
client.shutdown();
}
}
结果:
Map size: 3
Is map contains key 'age': true
Value mapped by key 'name': 张三
Map eviction
元素淘汰功能(Eviction)
Redisson的分布式的RMapCache
Java对象在基于RMap
的前提下实现了针对单个元素的淘汰机制。同时仍然保留了元素的插入顺序。由于RMapCache
是基于RMap
实现的,使它同时继承了java.util.concurrent.ConcurrentMap
接口和java.util.Map
接口。
目前的Redis自身并不支持散列(Hash)当中的元素淘汰,因此所有过期元素都是通过org.redisson.EvictionScheduler
实例来实现定期清理的。为了保证资源的有效利用,每次运行最多清理300个过期元素。任务的启动时间将根据上次实际清理数量自动调整,间隔时间趋于1秒到1小时之间。比如该次清理时删除了300条元素,那么下次执行清理的时间将在1秒以后(最小间隔时间)。一旦该次清理数量少于上次清理数量,时间间隔将增加1.5倍。
RMapCache<String, MultipartData> map2 = redissonClient.getMapCache("anyMap456");
// 有效时间 ttl = 10分钟
map2.put("key1", new MultipartData(), 10, TimeUnit.MINUTES);
// 有效时间 ttl = 10分钟, 最长闲置时间 maxIdleTime = 10秒钟
map2.put("key1", new MultipartData(), 10, TimeUnit.MINUTES, 10, TimeUnit.SECONDS);
// 有效时间 = 3 秒钟
map2.putIfAbsent("key2", new MultipartData(), 3, TimeUnit.SECONDS);
// 有效时间 ttl = 40秒钟, 最长闲置时间 maxIdleTime = 10秒钟
map2.putIfAbsent("key2", new MultipartData(), 40, TimeUnit.SECONDS, 10, TimeUnit.SECONDS);
List
基于Redis的Redisson分布式列表(List)结构的RList
Java对象在实现了java.util.List
接口的同时,确保了元素插入时的顺序。该对象的最大容量受Redis限制,最大元素数量是4 294 967 295
个。
public void list() {
RList<MultipartData> list = redissonClient.getList("anyList");
list.add(new MultipartData());
list.get(0);
list.remove(new MultipartData());
}
案例
public class RedissionTest {
@Resource
RedissonManager redissonManager;
@Test
public void testListExamples() {
// 默认连接上 127.0.0.1:6379
RedissonClient client = redissonManager.getRedisson();
// RList 继承了 java.util.List 接口
RList<String> nameList = client.getList("redission:test:nameList");
nameList.clear();
nameList.add("张三");
nameList.add("李四");
nameList.add("王五");
nameList.remove(-1);
System.out.println("List size: " + nameList.size());
boolean contains = nameList.contains("李四");
System.out.println("Is list contains name '李四': " + contains);
nameList.forEach(System.out::println);
client.shutdown();
}
}
结果:
List size: 2
Is list contains name '李四': true
张三
李四
集(Set)
基于Redis的Redisson的分布式Set结构的RSet
Java对象实现了java.util.Set
接口。通过元素的相互状态比较保证了每个元素的唯一性。该对象的最大容量受Redis限制,最大元素数量是4 294 967 295
个。
@PostConstruct
public void set() {
RSet<MultipartData> set = redissonClient.getSet("anySet");
set.add(new MultipartData());
set.remove(new MultipartData());
}
Set eviction
基于Redis的Redisson的分布式RSetCache
Java对象在基于RSet
的前提下实现了针对单个元素的淘汰机制。由于RSetCache
是基于RSet
实现的,使它还集成了java.util.Set
接口。
目前的Redis自身并不支持Set当中的元素淘汰,因此所有过期元素都是通过org.redisson.EvictionScheduler
实例来实现定期清理的。为了保证资源的有效利用,每次运行最多清理100个过期元素。任务的启动时间将根据上次实际清理数量自动调整,间隔时间趋于1秒到2小时之间。比如该次清理时删除了100条元素,那么下次执行清理的时间将在1秒以后(最小间隔时间)。一旦该次清理数量少于上次清理数量,时间间隔将增加1.5倍。
RSetCache<MultipartData> setc = redissonClient.getSetCache("anySet");
// ttl = 10 seconds
setc.add(new MultipartData(), 10, TimeUnit.SECONDS);
通用对象桶(Object Bucket)
Redisson的分布式RBucket
Java对象是一种通用对象桶可以用来存放任类型的对象。
RBucket<AnyObject> bucket = redisson.getBucket("anyObject");
bucket.set(new AnyObject(1));
AnyObject obj = bucket.get();
bucket.trySet(new AnyObject(3));
bucket.compareAndSet(new AnyObject(4), new AnyObject(5));
bucket.getAndSet(new AnyObject(6));
还可以通过RBuckets接口实现批量操作多个RBucket对象:
RBuckets buckets = redisson.getBuckets();
List<RBucket<V>> foundBuckets = buckets.find("myBucket*");
Map<String, V> loadedBuckets = buckets.get("myBucket1", "myBucket2", "myBucket3");
Map<String, Object> map = new HashMap<>();
map.put("myBucket1", new MyObject());
map.put("myBucket2", new MyObject());
// 利用Redis的事务特性,同时保存所有的通用对象桶,如果任意一个通用对象桶已经存在则放弃保存其他所有数据。
buckets.trySet(map);
// 同时保存全部通用对象桶。
buckets.set(map);
案例
public class RedissionTest {
@Resource
RedissonManager redissonManager;
@Test
public void testRBucketExamples() {
// 默认连接上 127.0.0.1:6379
RedissonClient client = redissonManager.getRedisson();
// RList 继承了 java.util.List 接口
RBucket<String> rstring = client.getBucket("redission:test:bucket:string");
rstring.set("this is a string");
RBucket<UserDTO> ruser = client.getBucket("redission:test:bucket:user");
UserDTO dto = new UserDTO();
dto.setToken(UUID.randomUUID().toString());
ruser.set(dto);
System.out.println("string is: " + rstring.get());
System.out.println("dto is: " + ruser.get());
client.shutdown();
}
}
结果:
string is: this is a string
dto is: UserDTO(id=null, userId=null, username=null, password=null, nickname=null, token=183b6eeb-65a8-4b2a-80c6-cf17c08332ce, createTime=null, updateTime=null, headImgUrl=null, mobile=null, sex=null, enabled=null, type=null, openId=null, isDel=false)
地理空间对象桶(Geospatial Bucket)
Redisson的分布式RGeo
Java对象是一种专门用来储存与地理位置有关的对象桶。
RGeo<String> geo = redisson.getGeo("test");
geo.add(new GeoEntry(13.361389, 38.115556, "Palermo"),
new GeoEntry(15.087269, 37.502669, "Catania"));
geo.addAsync(37.618423, 55.751244, "Moscow");
Double distance = geo.dist("Palermo", "Catania", GeoUnit.METERS);
geo.hashAsync("Palermo", "Catania");
Map<String, GeoPosition> positions = geo.pos("test2", "Palermo", "test3", "Catania", "test1");
List<String> cities = geo.radius(15, 37, 200, GeoUnit.KILOMETERS);
Map<String, GeoPosition> citiesWithPositions = geo.radiusWithPosition(15, 37, 200, GeoUnit.KILOMETERS);
基数估计算法(HyperLogLog)
Redisson利用Redis实现了Java分布式基数估计算法(HyperLogLog)对象。该对象可以在有限的空间内通过概率算法统计大量的数据。
RHyperLogLog<Integer> log = redisson.getHyperLogLog("log");
log.add(1);
log.add(2);
log.add(3);
log.count();
MultiMap
基于Redis的Redisson的分布式RMultimap
Java对象允许Map中的一个字段值包含多个元素。 字段总数受Redis限制,每个Multimap最多允许有4 294 967 295
个不同字段。
基于集(Set)的多值映射(Multimap)
基于Set的Multimap不允许一个字段值包含有重复的元素。
RSetMultimap<SimpleKey, SimpleValue> map = redisson.getSetMultimap("myMultimap");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("3"), new SimpleValue("4"));
Set<SimpleValue> allValues = map.get(new SimpleKey("0"));
List<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5"));
Set<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues);
Set<SimpleValue> removedValues = map.removeAll(new SimpleKey("0"));
基于列表(List)的多值映射(Multimap)
基于List的Multimap在保持插入顺序的同时允许一个字段下包含重复的元素。
RListMultimap<SimpleKey, SimpleValue> map = redisson.getListMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("3"), new SimpleValue("4"));
List<SimpleValue> allValues = map.get(new SimpleKey("0"));
Collection<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5"));
List<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues);
List<SimpleValue> removedValues = map.removeAll(new SimpleKey("0"));
多值映射(Multimap)淘汰机制(Eviction)
Multimap对象的淘汰机制是通过不同的接口来实现的。它们是RSetMultimapCache
接口和RListMultimapCache
接口,分别对应的是Set和List的Multimaps。
所有过期元素都是通过org.redisson.EvictionScheduler
实例来实现定期清理的。为了保证资源的有效利用,每次运行最多清理100个过期元素。
RSetMultimapCache<String, String> multimap = redisson.getSetMultimapCache("myMultimap");
multimap.put("1", "a");
multimap.put("1", "b");
multimap.put("1", "c");
multimap.put("2", "e");
multimap.put("2", "f");
multimap.expireKey("2", 10, TimeUnit.MINUTES);
有序集(SortedSet)
基于Redis的Redisson的分布式RSortedSet
Java对象实现了java.util.SortedSet
接口。在保证元素唯一性的前提下,通过比较器(Comparator)接口实现了对元素的排序。
RSortedSet<Integer> set = redisson.getSortedSet("anySet");
set.trySetComparator(new MyComparator()); // 配置元素比较器
set.add(3);
set.add(1);
set.add(2);
set.removeAsync(0);
set.addAsync(5);
计分排序集(ScoredSortedSet)
基于Redis的Redisson的分布式RScoredSortedSet
Java对象是一个可以按插入时指定的元素评分排序的集合。它同时还保证了元素的唯一性。
RScoredSortedSet<SomeObject> set = redisson.getScoredSortedSet("simple");
set.add(0.13, new SomeObject(a, b));
set.addAsync(0.251, new SomeObject(c, d));
set.add(0.302, new SomeObject(g, d));
set.pollFirst();
set.pollLast();
int index = set.rank(new SomeObject(g, d)); // 获取元素在集合中的位置
Double score = set.getScore(new SomeObject(g, d)); // 获取元素的评分
字典排序集(LexSortedSet)
基于Redis的Redisson的分布式RLexSortedSet
Java对象在实现了java.util.Set<String>
接口的同时,将其中的所有字符串元素按照字典顺序排列。它公式还保证了字符串元素的唯一性。
RLexSortedSet set = redisson.getLexSortedSet("simple");
set.add("d");
set.addAsync("e");
set.add("f");
set.lexRangeTail("d", false);
set.lexCountHead("e");
set.lexRange("d", true, "z", false);
队列(Queue)
基于Redis的Redisson分布式无界队列(Queue)结构的RQueue
Java对象实现了java.util.Queue
接口。尽管RQueue
对象无初始大小(边界)限制,但对象的最大容量受Redis限制,最大元素数量是4 294 967 295
个。
RQueue<SomeObject> queue = redisson.getQueue("anyQueue");
queue.add(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
双端队列(Deque)
基于Redis的Redisson分布式无界双端队列(Deque)结构的RDeque
Java对象实现了java.util.Deque
接口。尽管RDeque
对象无初始大小(边界)限制,但对象的最大容量受Redis限制,最大元素数量是4 294 967 295
个。
RDeque<SomeObject> queue = redisson.getDeque("anyDeque");
queue.addFirst(new SomeObject());
queue.addLast(new SomeObject());
SomeObject obj = queue.removeFirst();
SomeObject someObj = queue.removeLast();
阻塞队列(Blocking Queue)
基于Redis的Redisson分布式无界阻塞队列(Blocking Queue)结构的RBlockingQueue
Java对象实现了java.util.concurrent.BlockingQueue
接口。尽管RBlockingQueue
对象无初始大小(边界)限制,但对象的最大容量受Redis限制,最大元素数量是4 294 967 295
个。
RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue");
queue.offer(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);
poll
, pollFromAny
, pollLastAndOfferFirstTo
和take
方法内部采用话题订阅发布实现,在Redis节点故障转移(主从切换)或断线重连以后,内置的相关话题监听器将自动完成话题的重新订阅。
有界阻塞队列(Bounded Blocking Queue)
基于Redis的Redisson分布式有界阻塞队列(Bounded Blocking Queue)结构的RBoundedBlockingQueue
Java对象实现了java.util.concurrent.BlockingQueue
接口。该对象的最大容量受Redis限制,最大元素数量是4 294 967 295
个。队列的初始容量(边界)必须在使用前设定好。
RBoundedBlockingQueue<SomeObject> queue = redisson.getBoundedBlockingQueue("anyQueue");
// 如果初始容量(边界)设定成功则返回`真(true)`,
// 如果初始容量(边界)已近存在则返回`假(false)`。
queue.trySetCapacity(2);
queue.offer(new SomeObject(1));
queue.offer(new SomeObject(2));
// 此时容量已满,下面代码将会被阻塞,直到有空闲为止。
queue.put(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);
poll
, pollFromAny
, pollLastAndOfferFirstTo
和take
方法内部采用话题订阅发布实现,在Redis节点故障转移(主从切换)或断线重连以后,内置的相关话题监听器将自动完成话题的重新订阅。
阻塞双端队列(Blocking Deque)
基于Redis的Redisson分布式无界阻塞双端队列(Blocking Deque)结构的RBlockingDeque
Java对象实现了java.util.concurrent.BlockingDeque
接口。尽管RBlockingDeque
对象无初始大小(边界)限制,但对象的最大容量受Redis限制,最大元素数量是4 294 967 295
个。
RBlockingDeque<Integer> deque = redisson.getBlockingDeque("anyDeque");
deque.putFirst(1);
deque.putLast(2);
Integer firstValue = queue.takeFirst();
Integer lastValue = queue.takeLast();
Integer firstValue = queue.pollFirst(10, TimeUnit.MINUTES);
Integer lastValue = queue.pollLast(3, TimeUnit.MINUTES);
延迟队列(Delayed Queue)
基于Redis的Redisson分布式延迟队列(Delayed Queue)结构的RDelayedQueue
Java对象在实现了RQueue
接口的基础上提供了向队列按要求延迟添加项目的功能。该功能可以用来实现消息传送延迟按几何增长或几何衰减的发送策略。
RQueue<String> distinationQueue = ...
RDelayedQueue<String> delayedQueue = getDelayedQueue(distinationQueue);
// 10秒钟以后将消息发送到指定队列
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
// 一分钟以后将消息发送到指定队列
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);
在该对象不再需要的情况下,应该主动销毁。仅在相关的Redisson对象也需要关闭的时候可以不用主动销毁。
RDelayedQueue<String> delayedQueue = ...
delayedQueue.destroy();
案例
@GetMapping("test2")
public void test() throws InterruptedException {
RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue("dest_queue1");
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
new Thread(() -> {
while(true) {
try {
//阻塞队列有数据就返回,否则wait
System.out.println( blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
for(int i=1;i<=5;i++) {
// 向阻塞队列放入数据
delayedQueue.offer("fffffffff"+i, 30+i*5, TimeUnit.SECONDS);
}
}
优先队列(Priority Queue)
基于Redis的Redisson分布式优先队列(Priority Queue)Java对象实现了java.util.Queue
的接口。可以通过比较器(Comparator)接口来对元素排序。
RPriorityQueue<Integer> queue = redisson.getPriorityQueue("anyQueue");
queue.trySetComparator(new MyComparator()); // 指定对象比较器
queue.add(3);
queue.add(1);
queue.add(2);
queue.removeAsync(0);
queue.addAsync(5);
queue.poll();
优先双端队列(Priority Deque)
基于Redis的Redisson分布式优先双端队列(Priority Deque)Java对象实现了java.util.Deque
的接口。可以通过比较器(Comparator)接口来对元素排序。
RPriorityDeque<Integer> queue = redisson.getPriorityDeque("anyQueue");
queue.trySetComparator(new MyComparator()); // 指定对象比较器
queue.addLast(3);
queue.addFirst(1);
queue.add(2);
queue.removeAsync(0);
queue.addAsync(5);
queue.pollFirst();
queue.pollLast();
优先阻塞队列(Priority Blocking Queue)
基于Redis的分布式无界优先阻塞队列(Priority Blocking Queue)Java对象的结构与java.util.concurrent.PriorityBlockingQueue
类似。可以通过比较器(Comparator)接口来对元素排序。PriorityBlockingQueue
的最大容量是4 294 967 295
个元素。
RPriorityBlockingQueue<Integer> queue = redisson.getPriorityBlockingQueue("anyQueue");
queue.trySetComparator(new MyComparator()); // 指定对象比较器
queue.add(3);
queue.add(1);
queue.add(2);
queue.removeAsync(0);
queue.addAsync(5);
queue.take();
当Redis服务端断线重连以后,或Redis服务端发生主从切换,并重新建立连接后,断线时正在使用poll
,pollLastAndOfferFirstTo
或take
方法的对象Redisson将自动再次为其订阅相关的话题。
优先阻塞双端队列(Priority Blocking Deque)
基于Redis的分布式无界优先阻塞双端队列(Priority Blocking Deque)Java对象实现了java.util.Deque
的接口。addLast
、 addFirst
、push
方法不能再这个对里使用。PriorityBlockingDeque
的最大容量是4 294 967 295
个元素。
RPriorityBlockingDeque<Integer> queue = redisson.getPriorityBlockingDeque("anyQueue");
queue.trySetComparator(new MyComparator()); // 指定对象比较器
queue.add(2);
queue.removeAsync(0);
queue.addAsync(5);
queue.pollFirst();
queue.pollLast();
queue.takeFirst();
queue.takeLast();
当Redis服务端断线重连以后,或Redis服务端发生主从切换,并重新建立连接后,断线时正在使用poll
,pollLastAndOfferFirstTo
或take
方法的对象Redisson将自动再次为其订阅相关的话题。
原子整长形(AtomicLong)
Redisson的分布式整长形RAtomicLong
对象和Java中的java.util.concurrent.atomic.AtomicLong
对象类似。
RAtomicLong atomicLong = redisson.getAtomicLong("myAtomicLong");
atomicLong.set(3);
atomicLong.incrementAndGet();
atomicLong.get();
案例
public class RedissionTest {
@Resource
RedissonManager redissonManager;
@Test
public void testRAtomicLongExamples() {
// 默认连接上 127.0.0.1:6379
RedissonClient redisson = redissonManager.getRedisson();
RAtomicLong atomicLong = redisson.getAtomicLong("redission:test:myLong");
// 线程数
final int threads = 10;
// 每条线程的执行轮数
final int turns = 1000;
ExecutorService pool = Executors.newFixedThreadPool(threads);
for (int i = 0; i < threads; i++)
{
pool.submit(() ->
{
try
{
for (int j = 0; j < turns; j++)
{
atomicLong.incrementAndGet();
}
} catch (Exception e)
{
e.printStackTrace();
}
});
}
ThreadUtil.sleepSeconds(5);
System.out.println("atomicLong: " + atomicLong.get());
redisson.shutdown();
}
}
结果:
atomicLong: 10000
原子双精度浮点(AtomicDouble)
Redisson还提供了分布式原子双精度浮点RAtomicDouble
,弥补了Java自身的不足。
RAtomicDouble atomicDouble = redisson.getAtomicDouble("myAtomicDouble");
atomicDouble.set(2.81);
atomicDouble.addAndGet(4.11);
atomicDouble.get();
BitSet
Redisson的分布式RBitSet
Java对象采用了与java.util.BiteSet
类似结构的设计风格。可以理解为它是一个分布式的可伸缩式位向量。需要注意的是RBitSet
的大小受Redis限制,最大长度为4 294 967 295
。
RBitSet set = redisson.getBitSet("simpleBitset");
set.set(0, true);
set.set(1812, false);
set.clear(0);
set.addAsync("e");
set.xor("anotherBitset");
布隆过滤器(Bloom Filter)
Redisson利用Redis实现了Java分布式布隆过滤器(Bloom Filter)。所含最大比特数量为2^32
。
RBloomFilter<SomeObject> bloomFilter = redisson.getBloomFilter("sample");
// 初始化布隆过滤器,预计统计元素数量为55000000,期望误差率为0.03
bloomFilter.tryInit(55000000L, 0.03);
bloomFilter.add(new SomeObject("field1Value", "field2Value"));
bloomFilter.add(new SomeObject("field5Value", "field8Value"));
bloomFilter.contains(new SomeObject("field1Value", "field8Value"));
分布式锁
可重入锁(Reentrant Lock)
基于Redis的Redisson分布式可重入锁RLock
Java对象实现了java.util.concurrent.locks.Lock
接口。
RLock lock = redisson.getLock("anyLock");
// 最常见的使用方法
lock.lock();
大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout
来另行指定。
另外Redisson还通过加锁的方法提供了leaseTime
的参数来指定加锁的时间。超过这个时间后锁便自动解开了。
// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
}
Redisson同时还为分布式锁提供了异步执行的相关方法:
RLock lock = redisson.getLock("anyLock");
lock.lockAsync();
lock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);
RLock
对象完全符合Java的Lock规范。也就是说只有拥有锁的进程才能解锁,其他进程解锁则会抛出IllegalMonitorStateException
错误。
案例
public class RedissionTest {
@Resource
RedissonManager redissonManager;
@Test
public void testLockExamples() {
// 默认连接上 127.0.0.1:6379
RedissonClient redisson = redissonManager.getRedisson();
// RLock 继承了 java.util.concurrent.locks.Lock 接口
RLock lock = redisson.getLock("redission:test:lock:1");
final int[] count = {0};
int threads = 10;
ExecutorService pool = Executors.newFixedThreadPool(10);
CountDownLatch countDownLatch = new CountDownLatch(threads);
long start = System.currentTimeMillis();
for (int i = 0; i < threads; i++) {
pool.submit(() ->
{
for (int j = 0; j < 1000; j++) {
lock.lock();
count[0]++;
lock.unlock();
}
countDownLatch.countDown();
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("10个线程每个累加1000为: = " + count[0]);
//输出统计结果
float time = System.currentTimeMillis() - start;
System.out.println("运行的时长为:" + time);
System.out.println("每一次执行的时长为:" + time/count[0]);
}
}
结果:
10个线程每个累加1000为: = 10000
运行的时长为:14172.0
每一次执行的时长为:1.4172
案例二
public void testReentrantLock(RedissonClient redisson){
RLock lock = redisson.getLock("anyLock");
try{
// 1. 最常见的使用方法
//lock.lock();
// 2. 支持过期解锁功能,10秒钟以后自动解锁, 无需调用unlock方法手动解锁
//lock.lock(10, TimeUnit.SECONDS);
// 3. 尝试加锁,最多等待3秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(3, 10, TimeUnit.SECONDS);
if(res){ //成功
// do your business
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
异步执行的相关方法
public void testAsyncReentrantLock(RedissonClient redisson){
RLock lock = redisson.getLock("anyLock");
try{
lock.lockAsync();
lock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = lock.tryLockAsync(3, 10, TimeUnit.SECONDS);
if(res.get()){
// do your business
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
公平锁(Fair Lock)
基于Redis的Redisson分布式可重入公平锁也是实现了java.util.concurrent.locks.Lock
接口的一种RLock
对象。
它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒。
RLock fairLock = redisson.getFairLock("anyLock");
// 最常见的使用方法
fairLock.lock();
也会提供看门狗机制,另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。
// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
fairLock.lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
...
fairLock.unlock();
Redisson同时还为分布式可重入公平锁提供了异步执行的相关方法:
RLock fairLock = redisson.getFairLock("anyLock");
fairLock.lockAsync();
fairLock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);
案例
public void testFairLock(RedissonClient redisson){
RLock fairLock = redisson.getFairLock("anyLock");
try{
// 最常见的使用方法
fairLock.lock();
// 支持过期解锁功能, 10秒钟以后自动解锁,无需调用unlock方法手动解锁
fairLock.lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
fairLock.unlock();
}
}
联锁(MultiLock)
基于Redis的Redisson分布式联锁RedissonMultiLock
对象可以将多个RLock
对象关联为一个联锁,每个RLock
对象实例可以来自于不同的Redisson实例。
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 所有的锁都上锁成功才算成功。
lock.lock();
...
lock.unlock();
同样也提供了一样的看门狗机制,指定加锁的时间
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 给lock1,lock2,lock3加锁,如果没有手动解开的话,10秒钟后将会自动解开
lock.lock(10, TimeUnit.SECONDS);
// 为加锁等待100秒时间,并在加锁成功10秒钟后自动解开
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
案例
public void testMultiLock(RedissonClient redisson1,RedissonClient redisson2, RedissonClient redisson3){
RLock lock1 = redisson1.getLock("lock1");
RLock lock2 = redisson2.getLock("lock2");
RLock lock3 = redisson3.getLock("lock3");
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
try {
// 同时加锁:lock1 lock2 lock3, 所有的锁都上锁成功才算成功。
lock.lock();
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
红锁(RedLock)
基于Redis的Redisson红锁RedissonRedLock
对象实现了Redlock介绍的加锁算法。该对象也可以用来将多个RLock
对象关联为一个红锁,每个RLock
对象实例可以来自于不同的Redisson实例。
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。
lock.lock();
...
lock.unlock();
指定加锁的时间
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 给lock1,lock2,lock3加锁,如果没有手动解开的话,10秒钟后将会自动解开
lock.lock(10, TimeUnit.SECONDS);
// 为加锁等待100秒时间,并在加锁成功10秒钟后自动解开
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
案例
public void testRedLock(RedissonClient redisson1,RedissonClient redisson2, RedissonClient redisson3){
RLock lock1 = redisson1.getLock("lock1");
RLock lock2 = redisson2.getLock("lock2");
RLock lock3 = redisson3.getLock("lock3");
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
try {
// 同时加锁:lock1 lock2 lock3, 红锁在大部分节点上加锁成功就算成功。
lock.lock();
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
读写锁(ReadWriteLock)
基于Redis的Redisson分布式可重入读写锁ReadWriteLock
Java对象实现了java.util.concurrent.locks.ReadWriteLock
接口。其中读锁和写锁都继承了RLock
接口。
分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();
来指定加锁的时间
// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
- 读锁 + 读锁:相当于没加锁,可以并发读。
- 读锁 + 写锁:写锁需要等待读锁释放锁。
- 写锁 + 写锁:互斥,需要等待对方的锁释放。
- 写锁 + 读锁:读锁需要等待写锁释放。
信号量(Semaphore)
基于Redis的Redisson的分布式信号量(Semaphore)Java对象RSemaphore
采用了与java.util.concurrent.Semaphore
相似的接口和用法。
RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.acquire();
//或
semaphore.acquireAsync();
semaphore.acquire(23);
semaphore.tryAcquire();
//或
semaphore.tryAcquireAsync();
semaphore.tryAcquire(23, TimeUnit.SECONDS);
//或
semaphore.tryAcquireAsync(23, TimeUnit.SECONDS);
semaphore.release(10);
semaphore.release();
//或
semaphore.releaseAsync();
可过期性信号量(PermitExpirableSemaphore)
基于Redis的Redisson可过期性信号量(PermitExpirableSemaphore
)是在RSemaphore
对象的基础上,为每个信号增加了一个过期时间。每个信号可以通过独立的ID来辨识,释放时只能通过提交这个ID才能释放。
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
String permitId = semaphore.acquire();
// 获取一个信号,有效期只有2秒钟。
String permitId = semaphore.acquire(2, TimeUnit.SECONDS);
// ...
semaphore.release(permitId);
闭锁(CountDownLatch)
基于Redisson的Redisson分布式闭锁(CountDownLatch
)Java对象RCountDownLatch
采用了与java.util.concurrent.CountDownLatch
相似的接口和用法。
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();
// 在其他线程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();
分布式锁详解
在分析前先来个实操案例,这里用的是与springboot结合的依赖,依赖需要对应boot的版本,这里3.17.7对应的是boot 2.7.2的版本
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.17.7</version>
</dependency>
从两个问题入手
- 多个线程抢占锁,后面锁需要等待吗?
- 如果抢占到锁的线程所在的服务停了,锁会不会被释放?
验证一:可重入锁是阻塞的吗?
测试代码,代码的流程就是设置lock-lock
锁,然后加锁,打印线程 ID,等待 10 秒后释放锁,最后返回响应:“test lock ok”。
@RestController
public class RedisController {
@Autowired
private RedissonClient redissonClient;
@GetMapping("test1")
public String TestLock() {
// 1.获取锁,只要锁的名字一样,获取到的锁就是同一把锁。
RLock lock = redissonClient.getLock("lock-lock");
// 2.加锁
lock.lock();
try {
System.out.println("加锁成功,执行后续代码。线程 ID:" + Thread.currentThread().getId());
Thread.sleep(10000);
} catch (Exception e) {
//TODO
} finally {
lock.unlock();
// 3.解锁
System.out.println("Finally,释放锁成功。线程 ID:" + Thread.currentThread().getId());
}
return "test lock ok";
}
}
请求多次接口的测试结果
加锁成功,执行后续代码。线程 ID:73
Finally,释放锁成功。线程 ID:73
加锁成功,执行后续代码。线程 ID:74
Finally,释放锁成功。线程 ID:74
加锁成功,执行后续代码。线程 ID:79
Finally,释放锁成功。线程 ID:79
- 第一步:线程 A 在 0 秒时,抢占到锁,0.1 秒后,开始执行等待 10 s。
- 第二步:线程 B 在 0.1 秒尝试抢占锁,未能抢到锁(被 A 抢占了)。
- 第三步:线程 A 在 10.1 秒后,释放锁。
- 第四步:线程 B 在 10.1 秒后抢占到锁,然后等待 10 秒后释放锁。
由此可以得出结论,Redisson 的可重入锁(lock)是阻塞其他线程的,需要等待其他线程释放的。
验证二:服务停了,锁会释放吗?
如果线程 A 在等待的过程中,服务突然停了,那么锁会释放吗?如果不释放的话,就会成为死锁,阻塞了其他线程获取锁。
我们先来看下线程 A 的获取锁后的,Redis 客户端查询到的结果,如下图所示:
锁的默认过期时间是30S,在停机后,占用的锁会自动释放。
这个就是看门狗的机制。
如果负责储存这个分布式锁的 Redisson 节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗
,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。
如果我们未制定 lock 的超时时间,就使用 30 秒作为看门狗的默认时间。只要占锁成功,就会启动一个定时任务
:每隔 10 秒重新给锁设置过期的时间,过期时间为 30 秒。
当服务器宕机后,因为锁的有效期是 30 秒,所以会在 30 秒内自动解锁。(30秒等于宕机之前的锁占用时间+后续锁占用的时间)。
Redisson原理分析
加锁机制
线程去获取锁,获取成功: 执行lua脚本,保存数据到redis数据库。
线程去获取锁,获取失败: 一直通过while循环尝试获取锁,获取成功后,执行lua脚本,保存数据到redis数据库。
watch dog自动延期机制
它的作用就是 线程1 业务还没有执行完,时间就过了,线程1 还想持有锁的话,就会启动一个watch dog后台线程,不断的延长锁key的生存时间。
可重入加锁机制
Redisson可以实现可重入加锁机制的原因,应该跟两点有关:
- Redis存储锁的数据类型是 Hash类型
- Hash数据类型的key值包含了当前线程信息。
这里表面数据类型是Hash类型,Hash类型相当于我们java的 <key,<key1,value>>
类型,这里key是指 ‘redisson’
它的有效期还有9秒,我们再来看里们的key1值为078e44a3-5f95-4e24-b6aa-80684655a15a:45
它的组成是:
guid + 当前线程的ID。后面的value是就和可重入加锁有关。
分布式锁的核心功能其实就三个:加锁、解锁、设置锁超时
加锁源码分析
RLock是一个接口,具体的同步器需要实现该接口,当我们调用redisson.getLock()
时,程序会初始化一个默认的同步执行器RedissonLock
这里面初始化了几个参数
commandExecutor:异步的Executor执行器,Redisson中所有的命令都是通过…Executor 执行的 ;
id:唯一ID,初始化的时候是用UUID创建的;
internalLockLeaseTime:等待获取锁时间,这里读的是配置类中默认定义的,时间为30秒;
图片里标注了一个方法getLockName
,返回的是 “ID :线程ID” 的字符串,代表的是当前线程持有对应锁的一个标识
Redisson的加锁方法有两个,tryLock和lock,使用上的区别在于tryLock可以设置锁的过期时长leaseTime
和等待时长waitTime
,核心处理的逻辑都差不多,我们先从tryLock讲起。
tryLock
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// 剩余的等待锁的时间
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
// 尝试获取锁,如果没取到锁,则返回锁的剩余超时时间
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// ttl为null,说明可以抢到锁了,返回true
if (ttl == null) {
return true;
}
// 如果waitTime已经超时了,就返回false,代表申请锁失败
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
current = System.currentTimeMillis();
// 订阅分布式锁, 解锁时进行通知,这里就用到了发布-订阅
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
try {
// 阻塞等待锁释放
subscribeFuture.get(time, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.whenComplete((res, ex) -> {
if (ex == null) {
// 等待都超时了,直接取消订阅
unsubscribe(res, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
} catch (ExecutionException e) {
acquireFailed(waitTime, unit, threadId);
return false;
}
try {
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 进入死循环,反复去调用tryAcquire尝试获取锁,跟上面那一段拿锁的逻辑一样
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
代码还是挺长的,不过流程也就两步,要么线程拿到锁返回成功;要么没拿到锁并且等待时间还没过就继续循环拿锁,同时监听锁是否被释放。
拿锁的方法是tryAcquire
,传入的参数分别是锁的持有时间,时间单位以及代表当前线程的ID,跟进代码查看调用栈,它会调到一个叫做tryAcquireAsync
的方法:
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime > 0) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// lock acquired
if (ttlRemaining == null) {
if (leaseTime > 0) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
继续跟,看看tryLockInnerAsync
方法的源码:
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
这里就是底层的调用栈了,直接操作命令,整合成lua脚本后,调用netty的工具类跟redis进行通信,从而实现获取锁的功能。
- 先用
exists key
命令判断是否锁是否被占据了,没有的话就用hset
命令写入,key为锁的名称,field为“客户端唯一ID:线程ID”,value为1; - 锁被占据了,判断是否是当前线程占据的,是的话value值加1;
- 锁不是被当前线程占据,返回锁剩下的过期时长;
用了redis的Hash结构存储数据,如果发现当前线程已经持有锁了,就用hincrby
命令将value值加1,value的值将决定释放锁的时候调用解锁命令的次数,达到实现锁的可重入性效果。
根据上面的命令可以看出,如果线程拿到锁的话,tryLock方法会直接返回true,万事大吉。
拿不到的话,就会返回锁的剩余过期时长,这个时长有什么作用呢?我们回到tryLock方法中死循环的那个地方:
// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
这里有一个针对waitTime和key的剩余过期时间大小的比较,取到二者中比较小的那个值,然后用Java的Semaphore信号量的tryAcquire方法来阻塞线程。
那么Semaphore信号量又是由谁控制呢,何时才能release呢。这里又需要回到上面来看,贴的tryLock代码中还有这一段:
current = System.currentTimeMillis();
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
订阅的逻辑显然是在subscribe
方法里,跟着方法的调用链,它会进入到PublishSubscribe.Java中:
public CompletableFuture<E> subscribe(String entryName, String channelName) {
AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
CompletableFuture<E> newPromise = new CompletableFuture<>();
semaphore.acquire(() -> {
if (newPromise.isDone()) {
semaphore.release();
return;
}
E entry = entries.get(entryName);
if (entry != null) {
entry.acquire();
semaphore.release();
entry.getPromise().whenComplete((r, e) -> {
if (e != null) {
newPromise.completeExceptionally(e);
return;
}
newPromise.complete(r);
});
return;
}
E value = createEntry(newPromise);
value.acquire();
E oldValue = entries.putIfAbsent(entryName, value);
if (oldValue != null) {
oldValue.acquire();
semaphore.release();
oldValue.getPromise().whenComplete((r, e) -> {
if (e != null) {
newPromise.completeExceptionally(e);
return;
}
newPromise.complete(r);
});
return;
}
RedisPubSubListener<Object> listener = createListener(channelName, value);
CompletableFuture<PubSubConnectionEntry> s = service.subscribeNoTimeout(LongCodec.INSTANCE, channelName, semaphore, listener);
newPromise.whenComplete((r, e) -> {
if (e != null) {
s.completeExceptionally(e);
}
});
s.whenComplete((r, e) -> {
if (e != null) {
value.getPromise().completeExceptionally(e);
return;
}
value.getPromise().complete(value);
});
});
return newPromise;
}
这段代码的作用在于将当前线程的threadId添加到一个AsyncSemaphore中,并且设置一个redis的监听器,这个监听器是通过redis的发布、订阅功能实现的。
一旦监听器收到redis发来的消息,就从中获取与当前thread相关的,如果是锁被释放的消息,就立马通过操作Semaphore(也就是调用release方法)来让刚才阻塞的地方释放。
释放后线程继续执行,仍旧是判断是否已经超时。如果还没超时,就进入下一次循环再次去获取锁,拿到就返回true,没有拿到的话就继续流程。
这里说明一下,之所以要循环,是因为锁可能会被多个客户端同时争抢,线程阻塞被释放之后的那一瞬间很可能还是拿不到锁,但是线程的等待时间又还没过,这个时候就需要重新跑循环去拿锁。
这就是tryLock获取锁的整个过程了,画一张流程图的话表示大概是这样:
lock
除了tryLock,一般我们还经常直接调用lock来获取锁,lock的拿锁过程跟tryLock基本是一致的,区别在于lock没有手动设置锁过期时长的参数,该方法的调用链也是跑到tryAcquire
方法来获取锁的。
这段代码做了两件事:
1、预设30秒的过期时长,然后去获取锁
2、开启一个监听器,如果发现拿到锁了,就开启定时任务不断去刷新该锁的过期时长
刷新过期时长的方法是scheduleExpirationRenewal
,贴一下源码吧:
protected void scheduleExpirationRenewal(long threadId) {
RedissonBaseLock.ExpirationEntry entry = new RedissonBaseLock.ExpirationEntry();
RedissonBaseLock.ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
try {
renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}
private void renewExpiration() {
RedissonBaseLock.ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
RedissonBaseLock.ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// reschedule itself
renewExpiration();
} else {
cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
代码的流程比较简单,大概就是开启一个定时任务,每隔internalLockLeaseTime / 3
的时间(这个时间是10秒)就去检测锁是否还被当前线程持有,是的话就重新设置过期时长internalLockLeaseTime
,也就是30秒的时间。
而这些定时任务会存储在一个RedissonBaseLock
.
ExpirationEntry
对象中,如果发现RedissonBaseLock
.
ExpirationEntry
中不存在对应当前线程key的话,定时任务就不会跑,这也是后面解锁中的一步重要操作。
上面这段代码就是Redisson中所谓的”看门狗“程序,用一个异步线程来定时检测并执行的,以防手动解锁之前就过期了。
其他的逻辑就跟tryLock()
基本没什么两样啦
解锁源码分析
有拿锁的方法,自然也就有解锁。Redisson分布式锁解锁的上层调用方法是unlock(),默认不用传任何参数
@Override
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
// Future<Void> future = unlockAsync();
// future.awaitUninterruptibly();
// if (future.isSuccess()) {
// return;
// }
// if (future.cause() instanceof IllegalMonitorStateException) {
// throw (IllegalMonitorStateException)future.cause();
// }
// throw commandExecutor.convertException(future);
}
@Override
public RFuture<Void> unlockAsync(long threadId) {
RFuture<Boolean> future = unlockInnerAsync(threadId);
CompletionStage<Void> f = future.handle((opStatus, e) -> {
// 成功释放锁,取消"看门狗"的续时线程
cancelExpirationRenewal(threadId);
if (e != null) {
throw new CompletionException(e);
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
throw new CompletionException(cause);
}
return null;
});
return new CompletableFutureWrapper<>(f);
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
protected void cancelExpirationRenewal(Long threadId) {
RedissonBaseLock.ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (task == null) {
return;
}
if (threadId != null) {
task.removeThreadId(threadId);
}
if (threadId == null || task.hasNoThreads()) {
Timeout timeout = task.getTimeout();
if (timeout != null) {
timeout.cancel();
}
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
}
}
解锁相关的命令操作在unlockInnerAsync
方法中定义
又是一大串的lua脚本,比起前面加锁那段脚本的命令稍微复杂了点
- ①:若当前线程并没有持有锁,则返回nil。
- ②:当前线程持有锁,则对value-1,拿到-1之后的vlaue。
- ③:value>0,以毫秒为单位返回剩下的过期时间。(保证可重入)
- ④:value<=0,则对key进行删除操作,return 1 (方法返回 true)。然后进行redis-pub指令。
总的来说,这段代码的主要作用是在一个哈希表中对某个字段进行计数,当计数到0时,删除该哈希表,并在另一个频道上发布一个消息。这种模式常用于实现分布式锁或者限流等功能。
当线程完全释放锁后,就会调用cancelExpirationRenewal()
方法取消”看门狗”的续时线程
到这里基本就差不多啦,但是需要注意的是redisson的代码每个版本之间还是有很大差异的,比如之前的源码都是用RFuture
,Runnable
等方式来进行异步的操作,但是现在都改为了是用CompletableFuture
,CompletionStage
,在写法上修改了,但是总体思路没变
这里的测试都是用单机redis测试,集群环境下或许需要做调整
redis分布式锁的高可用
关于Redis分布式锁的高可用问题,大致如下:
在master- slave的集群架构中,就是如果你对某个redis master实例,写入了DISLOCK这种锁key的value,此时会异步复制给对应的master slave实例。
但是,这个过程中一旦发生redis master宕机,主备切换,redis slave变为了redis master。而此时的主从复制没有彻底完成…..
接着就会导致,客户端2来尝试加锁的时候,在新的redis master上完成了加锁,而客户端1也以为自己成功加了锁。
此时就会导致多个客户端对一个分布式锁完成了加锁。
这时系统在业务语义上一定会出现问题,导致脏数据的产生。
所以这个是是redis master-slave架构的主从异步复制导致的redis分布式锁的最大缺陷:
在redis master实例宕机的时候,可能导致多个客户端同时完成加锁。
高可用的RedLock(红锁)原理
RedLock算法思想:
不能只在一个redis实例上创建锁,应该是在多个redis实例上创建锁,n / 2 + 1,必须在大多数redis节点上都成功创建锁,才能算这个整体的RedLock加锁成功,避免说仅仅在一个redis实例上加锁而带来的问题。
这个场景是假设有一个 redis cluster,有 5 个 redis master 实例。然后执行如下步骤获取一把红锁:
- 获取当前时间戳,单位是毫秒;
- 跟上面类似,轮流尝试在每个 master 节点上创建锁,过期时间较短,一般就几十毫秒;
- 尝试在大多数节点上建立一个锁,比如 5 个节点就要求是 3 个节点 n / 2 + 1;
- 客户端计算建立好锁的时间,如果建立锁的时间小于超时时间,就算建立成功了;
- 要是锁建立失败了,那么就依次之前建立过的锁删除;
- 只要别人建立了一把分布式锁,你就得不断轮询去尝试获取锁。
RedLock是基于redis实现的分布式锁,它能够保证以下特性:
- 互斥性:在任何时候,只能有一个客户端能够持有锁;避免死锁:
- 当客户端拿到锁后,即使发生了网络分区或者客户端宕机,也不会发生死锁;(利用key的存活时间)
- 容错性:只要多数节点的redis实例正常运行,就能够对外提供服务,加锁或者释放锁;
以sentinel模式架构为例,如下图所示,有sentinel-1,sentinel-2,sentinel-3总计3个sentinel模式集群,如果要获取分布式锁,那么需要向这3个sentinel集群通过EVAL命令执行LUA脚本,需要3/2+1=2,即至少2个sentinel集群响应成功,才算成功的以Redlock算法获取到分布式锁:
高可用的红锁会导致性能降低
提前说明,使用redis分布式锁,是追求高性能, 在cap理论中,追求的是 ap 而不是cp。
所以,如果追求高可用,建议使用 zookeeper分布式锁。
redis分布式锁可能导致的数据不一致性,建议使用业务补偿的方式去弥补。所以,不太建议使用红锁。
RedissonRedLock实现原理
上面示例中使用redLock.lock()或者tryLock()最终都是执行RedissonRedLock
中方法。
RedissonRedLock
继承自RedissonMultiLock
, 实现了其中的一些方法:
public class RedissonRedLock extends RedissonMultiLock {
public RedissonRedLock(RLock... locks) {
super(locks);
}
/**
* 锁可以失败的次数,锁的数量-锁成功客户端最小的数量
*/
@Override
protected int failedLocksLimit() {
return locks.size() - minLocksAmount(locks);
}
/**
* 锁的数量 / 2 + 1,例如有3个客户端加锁,那么最少需要2个客户端加锁成功
*/
protected int minLocksAmount(final List<RLock> locks) {
return locks.size()/2 + 1;
}
/**
* 计算多个客户端一起加锁的超时时间,每个客户端的等待时间
* remainTime默认为4.5s
*/
@Override
protected long calcLockWaitTime(long remainTime) {
return Math.max(remainTime / locks.size(), 1);
}
@Override
public void unlock() {
unlockInner(locks);
}
}
看到locks.size()/2 + 1
,例如我们有3个客户端实例,那么最少2个实例加锁成功才算分布式锁加锁成功。
RedissonMultiLock
实现原理
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// try {
// return tryLockAsync(waitTime, leaseTime, unit).get();
// } catch (ExecutionException e) {
// throw new IllegalStateException(e);
// }
long newLeaseTime = -1;
if (leaseTime > 0) {
if (waitTime > 0) {
// 如果等待时间设置了,那么将等待时间 * 2
newLeaseTime = unit.toMillis(waitTime)*2;
} else {
newLeaseTime = unit.toMillis(leaseTime);
}
}
long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime > 0) {
remainTime = unit.toMillis(waitTime);
}
// 计算锁的等待时间,RedLock中:如果remainTime=-1,那么lockWaitTime为1
long lockWaitTime = calcLockWaitTime(remainTime);
// RedLock中failedLocksLimit即为n/2 + 1
int failedLocksLimit = failedLocksLimit();
List<RLock> acquiredLocks = new ArrayList<>(locks.size());
// 循环每个redis客户端,去获取锁
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
try {
// 调用tryLock方法去获取锁,如果获取锁成功,则lockAcquired=true
if (waitTime <= 0 && leaseTime <= 0) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException e) {
unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception e) {
lockAcquired = false;
}
// 如果获取锁成功,将锁加入到list集合中
if (lockAcquired) {
acquiredLocks.add(lock);
} else {
// 如果获取锁失败,判断失败次数是否等于失败的限制次数
// 比如,3个redis客户端,最多只能失败1次
// 这里locks.size = 3, 3-x=1,说明只要成功了2次就可以直接break掉循环
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
break;
}
// 如果最大失败次数等于0
if (failedLocksLimit == 0) {
// 释放所有的锁,RedLock加锁失败
unlockInner(acquiredLocks);
if (waitTime <= 0) {
return false;
}
failedLocksLimit = failedLocksLimit();
acquiredLocks.clear();
// 重置迭代器 重试再次获取锁
while (iterator.hasPrevious()) {
iterator.previous();
}
} else {
// 失败的限制次数减一
// 比如3个redis实例,最大的限制次数是1,如果遍历第一个redis实例,失败了,那么failedLocksLimit会减成0
// 如果failedLocksLimit就会走上面的if逻辑,释放所有的锁,然后返回false
failedLocksLimit--;
}
}
if (remainTime > 0) {
remainTime -= System.currentTimeMillis() - time;
time = System.currentTimeMillis();
if (remainTime <= 0) {
unlockInner(acquiredLocks);
return false;
}
}
}
if (leaseTime > 0) {
acquiredLocks.stream()
.map(l -> (RedissonBaseLock) l)
.map(l -> l.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS))
.forEach(f -> f.toCompletableFuture().join());
}
return true;
}
核心代码都已经加了注释,实现原理其实很简单,基于RedLock思想,遍历所有的Redis客户端,然后依次加锁,最后统计成功的次数来判断是否加锁成功。
Redis分段锁
普通Redis分布式锁的性能瓶颈问题
分布式锁一旦加了之后,对同一个商品的下单请求,会导致所有下单操作,都必须对同一个商品key加分布式锁。
假设某个场景,一个商品1分钟6000订单,每秒的 600个下单操作,
假设加锁之后,释放锁之前,查库存 -> 创建订单 -> 扣减库存,每个IO操作100ms,大概300毫秒。
具体如下图:
可以再进行一下优化,将 创建订单 + 扣减库存 并发执行,将两个100ms 减少为一个100ms,这既是空间换时间的思想,大概200毫秒。
将 创建订单 + 扣减库存 批量执行,减少一次IO,也是大概200毫秒。
这个优化方案,有个重要的前提,就是 订单表和库存表在相同的库中,但是,这个前提条件,在数据量大+高并发的场景下,够呛。
package com.crazymaker.springcloud;
import com.crazymaker.springcloud.common.util.ThreadUtil;
import org.junit.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
public class CoCurrentDemo {
/**
* 使用CompletableFuture 和 CountDownLatch 进行并发回调
*/
@Test
public void testMutiCallBack() {
CountDownLatch countDownLatch = new CountDownLatch(10);
//批量异步
ExecutorService executor = ThreadUtil.getIoIntenseTargetThreadPool();
long start = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
long tid = ThreadUtil.getCurThreadId();
try {
System.out.println("线程" + tid + "开始了,模拟一下远程调用");
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return tid;
}, executor);
future.thenAccept((tid) -> {
System.out.println("线程" + tid + "结束了");
countDownLatch.countDown();
});
}
try {
countDownLatch.await();
//输出统计结果
float time = System.currentTimeMillis() - start;
System.out.println("所有任务已经执行完毕");
System.out.println("运行的时长为(ms):" + time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
结果:
线程16开始了,模拟一下远程调用
线程17开始了,模拟一下远程调用
线程15开始了,模拟一下远程调用
线程18开始了,模拟一下远程调用
线程22开始了,模拟一下远程调用
线程19开始了,模拟一下远程调用
线程13开始了,模拟一下远程调用
线程14开始了,模拟一下远程调用
线程21开始了,模拟一下远程调用
线程20开始了,模拟一下远程调用
线程16结束了
线程17结束了
线程18结束了
线程19结束了
线程13结束了
线程22结束了
线程21结束了
线程20结束了
线程15结束了
线程14结束了
所有任务已经执行完毕
运行的时长为(ms):137.0
分段加锁的思想来源
空间换时间, 分段加锁
尤其是 LongAdder 的实现思想,可以用于 Redis分布式锁 作为性能提升的手段,将 Redis分布式锁 优化为 Redis分段锁。
回到前面的场景:
假设一个商品1分钟6000订单,每秒的 600个下单操作,
假设加锁之后,释放锁之前,查库存 -> 创建订单 -> 扣减库存,经过优化,每个IO操作100ms,大概200毫秒,一秒钟5个订单。
如何提高性能呢? 空间换时间
为了达到每秒600个订单,可以将锁分成 600 /5 =120 个段,反过来, 每个段1秒可以操作5次, 120个段,合起来,及时每秒操作600次。
进行抢夺锁的,如果申请到一个具体的段呢?
每一次使用随机算法,随机到一个分段, 如果不行,就轮询下一个分段,具体的流程,大致如下:
案例
@Slf4j
@Data
@AllArgsConstructor
public class JedisMultiSegmentLock implements Lock {
public static final int NO_SEG = -1;
//拿到锁的线程
private Thread thread;
//拿到锁的状态
private volatile boolean isLocked = false;
//段数
private final int segAmount;
public static final int DEFAULT_TIMEOUT = 2000;
public static final Long WAIT_GAT = Long.valueOf(100);
//内部的锁
InnerLock[] innerLocks = null;
//被锁住的分段
int segmentIndexLocked = NO_SEG;
/**
* 默认为2000ms
*/
long expire = 2000L;
int segmentIndex = 0;
public JedisMultiSegmentLock(String lockKey, String requestId, int segAmount) {
this.segAmount = segAmount;
innerLocks = new InnerLock[segAmount];
for (int i = 0; i < this.segAmount; i++) {
//每一个分段,加上一个编号
String innerLockKey = lockKey + ":" + i;
innerLocks[i] = new InnerLock(innerLockKey, requestId);
}
segmentIndex = RandomUtil.randInModLower(this.segAmount);
}
/**
* 获取一个分布式锁 , 超时则返回失败
*
* @return 获锁成功 - true | 获锁失败 - false
*/
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
//本地可重入
if (isLocked && thread == Thread.currentThread()) {
return true;
}
expire = unit != null ? unit.toMillis(time) : DEFAULT_TIMEOUT;
long startMillis = System.currentTimeMillis();
Long millisToWait = expire;
boolean localLocked = false;
int turn = 1;
InnerLock innerLock = innerLocks[segmentIndex];
while (!localLocked) {
localLocked = innerLock.lock(expire);
if (!localLocked) {
millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if (millisToWait > 0L) {
/**
* 还没有超时
*/
ThreadUtil.sleepMilliSeconds(WAIT_GAT);
log.info("睡眠一下,重新开始,turn:{},剩余时间:{}", turn++, millisToWait);
segmentIndex++;
if (segmentIndex >= this.segAmount) {
segmentIndex = 0;
}
innerLock = innerLocks[segmentIndex];
} else {
log.info("抢锁超时");
return false;
}
} else {
segmentIndexLocked = segmentIndex;
isLocked = true;
localLocked = true;
thread = Thread.currentThread();
}
}
return isLocked;
}
/**
* 抢夺锁
*/
@Override
public void lock() {
throw new IllegalStateException(
"方法 'lock' 尚未实现!");
}
//释放锁
@Override
public void unlock() {
if (segmentIndexLocked == NO_SEG) {
return;
}
this.innerLocks[segmentIndexLocked].unlock();
segmentIndexLocked = NO_SEG;
thread = null;
isLocked = false;
}
@Override
public Condition newCondition() {
throw new IllegalStateException(
"方法 'newCondition' 尚未实现!");
}
@Override
public void lockInterruptibly() throws InterruptedException {
throw new IllegalStateException(
"方法 'lockInterruptibly' 尚未实现!");
}
@Override
public boolean tryLock() {
throw new IllegalStateException(
"方法 'tryLock' 尚未实现!");
}
}
@Slf4j
public class InnerLock {
private RedisTemplate redisTemplate;
public static final Long LOCKED = Long.valueOf(1);
public static final Long UNLOCKED = Long.valueOf(1);
public static final int EXPIRE = 2000;
String key;
String requestId; // lockValue 锁的value ,代表线程的uuid
/**
* 默认为2000ms
*/
long expire = 2000L;
private volatile boolean isLocked = false;
private RedisScript lockScript;
private RedisScript unLockScript;
public InnerLock(String lockKey, String requestId) {
this.key = lockKey;
this.requestId = requestId;
lockScript = ScriptHolder.getLockScript();
unLockScript = ScriptHolder.getUnlockScript();
}
/**
* 抢夺锁
*/
public void lock() {
if (null == key) {
return;
}
try {
List<String> redisKeys = new ArrayList<>();
redisKeys.add(key);
redisKeys.add(requestId);
redisKeys.add(String.valueOf(expire));
Long res = (Long) getRedisTemplate().execute(lockScript, redisKeys);
isLocked = false;
} catch (Exception e) {
e.printStackTrace();
throw BusinessException.builder().errMsg("抢锁失败").build();
}
}
/**
* 有返回值的抢夺锁
*
* @param millisToWait
*/
public boolean lock(Long millisToWait) {
if (null == key) {
return false;
}
try {
List<String> redisKeys = new ArrayList<>();
redisKeys.add(key);
redisKeys.add(requestId);
redisKeys.add(String.valueOf(millisToWait));
Long res = (Long) getRedisTemplate().execute(lockScript, redisKeys);
return res != null && res.equals(LOCKED);
} catch (Exception e) {
e.printStackTrace();
throw BusinessException.builder().errMsg("抢锁失败").build();
}
}
//释放锁
public void unlock() {
if (key == null || requestId == null) {
return;
}
try {
List<String> redisKeys = new ArrayList<>();
redisKeys.add(key);
redisKeys.add(requestId);
Long res = (Long) getRedisTemplate().execute(unLockScript, redisKeys);
// log.info("unlock");
// boolean unlocked = res != null && res.equals(UNLOCKED);
} catch (Exception e) {
e.printStackTrace();
throw BusinessException.builder().errMsg("释放锁失败").build();
}
}
private RedisTemplate getRedisTemplate() {
if(null==redisTemplate)
{
redisTemplate= (RedisTemplate) SpringContextUtil.getBean("stringRedisTemplate");
}
return redisTemplate;
}
}