- A+
前言
Redis
是一款强大的高性能键值存储数据库,也是目前NOSQL
中最流行比较流行的一款数据库,它在广泛的应用场景中扮演着至关重要的角色,包括但不限于缓存、消息队列、会话存储等。在本文中,我们将介绍如何基于C# Socket
来实现一个简单的Redis客户端类RedisClient
,来演示构建请求和输出的相关通信机制。需要注意的是本文只是着重展示如何基于原生的Socket
方式与Redis Server
进行通信,并不是构建一个强大的Redis开发工具包
。
Redis简介
Redis(Remote Dictionary Server)
是一个内存数据库,它支持了非常丰富的数据结构,包括字符串、列表、集合、散列、有序集合等。Redis 提供了高性能的读写操作,可以用于缓存数据、消息队列、分布式锁、会话管理等多种用途。Redis 通常以键值对的方式存储数据,每个键都与一个值相关联,值的类型可以是字符串、列表、散列等。Redis
不仅提供了丰富的命令集,用于操作存储在数据库中的数据,还提供了Redis serialization protocol (RESP)
协议来解析Redis Server
返回的数据。相关的文档地址如下所示:
- Redis官网地址 https://redis.io/
- Redis官方文档地址 https://redis.io/docs/
- Redis命令文档地址 https://redis.io/commands/
- Redis序列化协议规范文档地址 https://redis.io/docs/reference/protocol-spec/
Redis 命令指南
Redis命令
是与Redis服务器进行通信的主要方式,通俗点就是发送指定格式的指令用于执行各种操作,包括数据存储、检索、修改和删除等。以下是一些日常使用过程中常见的Redis命令及其用途:
-
GET 和 SET 命令
GET key
: 用于获取指定键的值。SET key value
: 用于设置指定键的值.
-
DEL 命令
DEL key
: 用于删除指定键.
-
EXPIRE 和 TTL 命令
EXPIRE key seconds
: 用于为指定键设置过期时间(秒).TTL key
: 用于获取指定键的剩余过期时间(秒).
注意这里的时间单位是秒
-
INCR 和 DECR 命令
INCR key
: 用于递增指定键的值.DECR key
: 用于递减指定键的值.
-
RPUSH 和 LPOP 命令
RPUSH key value
: 用于将值添加到列表的右侧.LPOP key
: 用于从列表的左侧弹出一个值.
-
HSET 和 HGET 命令
HSET key field value
: 用于设置哈希表中指定字段的值.HGET key field
: 用于获取哈希表中指定字段的值.
-
PUBLISH 和 SUBSCRIBE 命令
PUBLISH channel message
: 用于向指定频道发布消息.SUBSCRIBE channel
: 用于订阅指定频道的消息.
当然 Redis 支持的命令远不止这些,它还包括对集合、有序集合、位图、HyperLogLog 等数据结构的操作,以及事务、Lua 脚本执行等高级功能。我们接下来演示的时候也只是展示几个大家比较熟悉的指令,这也是我们学习新知识的时候经常使用的方式,先从最简单最容易的开始入手,循序渐进,这也是微精通
所提倡的方式。
Redis协议(RESP)
Redis Serialization Protocol (RESP)
是 Redis 使用的二进制协议,用于客户端和服务器之间的通信。我们可以通过该协议解析Redis服务器
返回的命令格式,解析我们想要的数据。RESP具有简洁易解析的特点
-
简单字符串协议:
- 格式:
+OKrn
- 第一个字节是"+”,后跟消息内容,以"rn"(回车和换行)结束。
- 示例:
+OKrn
- 格式:
-
批量字符串协议:
- 格式:
$5rnhellorn
- 第一个字节是"$",后跟字符串的字节长度,然后是实际的字符串内容,最后以"rn"结束。
- 示例:
$5rnhellorn
- 格式:
-
整数协议:
- 格式:
:42rn
- 第一个字节是":",后跟整数的文本表示,以"rn"结束。
- 示例:
:42rn
- 格式:
-
数组协议:
- 格式:
*3rn:1rn:2rn:3rn
- 第一个字节是"*",后跟数组中元素的数量,然后是数组中每个元素的 RESP 表示,以"rn"结束。
- 示例:
*3rn:1rn:2rn:3rn
- 格式:
-
错误协议:
- 格式:
-Error messagern
- 第一个字节是"-",后跟错误消息内容,以"rn"结束。
- 示例:
-Error messagern
- 格式:
需要注意的是字符串协议里面的长度不是具体字符的长度,而是对应的
UTF8
对应的字节数组的长度,这一点对于我们解析返回的数据很重要,否则获取数据的时候会影响数据的完整性。
RESP协议
是Redis高效性能的关键之一,它相对比较加单,不需要解析各种头信息等,这使得Redis能够在处理大规模数据和请求时表现出色。了解RESP协议可以帮助您更好地理解Redis客户端类 RedisClient
的内部工作原理。可以理解为它属于一种应用层面的协议,通过给定的数据格式解析出想要的数据,这也对我们在实际编程过程中,解决类似的问题,提供了一个不错的思路。
实现RedisClient
上面我们介绍了一些关于Redis
的基础概念,重点介绍了一下关于Redis
的命令和RESP
,接下来我们就结合上面的理论,基于C# Socket
来简单的模拟一下如何和Redis Server
进行数据交互。主要就是结合Redis命令
和Redis 协议(RESP)
来简单的实现。
通信架子
首先来看一下类的结构
public class RedisClient : IDisposable, IAsyncDisposable { //定义默认端口 private readonly int DefaultPort = 6379; //定义默认地址 private readonly string Host = "localhost"; //心跳间隔,单位为毫秒 private readonly int HeartbeatInterval = 30000; private bool _isConnected; //心跳定时器 private Timer _heartbeatTimer; private Socket _socket; public RedisClient(string host = "localhost", int defaultPort = 6379) { Host = host; DefaultPort = defaultPort; // 初始化心跳定时器 _heartbeatTimer = new Timer(HeartbeatCallback, null, HeartbeatInterval, HeartbeatInterval); } //连接方法 public async Task ConnectAsync(int timeoutMilliseconds = 5000) { _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); var cts = new CancellationTokenSource(timeoutMilliseconds); await _socket.ConnectAsync(Host, DefaultPort, cts.Token); _isConnected = true; } //心跳方法 private async void HeartbeatCallback(object state) { if (_isConnected) { var pingCommand = "PINGrn"; await SendCommandAsync(pingCommand); } } //释放逻辑 public void Dispose() { DisposeAsync().GetAwaiter().GetResult(); } public ValueTask DisposeAsync() { // 停止心跳定时器 _heartbeatTimer.Dispose(); if (_socket != null) { _socket.Shutdown(SocketShutdown.Both); _socket.Close(); } return ValueTask.CompletedTask; } }
上面的类定义了实现的大致通信结构,结构中主要涉及到的是通信相关的功能实现,包含Socket
的初始化信息、默认的连连接信息、心跳方法、释放逻辑等。首先,在构造函数中,指定了默认的Redis端口(6379)、地址(localhost),并初始化了心跳定时器。连接方法ConnectAsync
通过Socket
建立与Redis服务器
的TCP连接。心跳定时器HeartbeatCallback
定期发送PING
命令,确保与服务器的连接保持活动。最后,Dispose方法
用于释放资源,包括停止心跳定时器和关闭Socket
连接,实现了IDisposable
和IAsyncDisposable
接口。这些功能为RedisClient
类提供了基本的连接和资源管理能力。由于我对Socket
编程也不是很熟悉,所以定义的可能不是很完善,有比较熟悉的同学,可以多多指导。
发送和解析
有了这个基础的架子之后,我们可以在里面填写具体的实现逻辑了。首先我们来定义发送Redis
命令和解析RESP
的逻辑
//发送命令 public async Task<string> SendCommandAsync(string command) { // 发送命令的实现 if (!_isConnected) { // 如果连接已断开,可以进行重连 await ConnectAsync(); } //Redis的命令是以rn为结尾的 var request = Encoding.UTF8.GetBytes(command + "rn"); //发送命令 await _socket.SendAsync(new ArraySegment<byte>(request), SocketFlags.None); var response = new StringBuilder(); var remainingData = string.Empty; //初始化响应字符串和剩余数据 byte[] receiveBuffer = ArrayPool<byte>.Shared.Rent(1024); try { while (true) { //读取返回信息 var bytesRead = await _socket.ReceiveAsync(new ArraySegment<byte>(receiveBuffer), SocketFlags.None); //将接收到的数据添加到响应字符串 var responseData = remainingData + Encoding.UTF8.GetString(receiveBuffer, 0, bytesRead); //提取完整的响应并添加到响应字符串中 var completeResponses = ExtractCompleteResponses(ref responseData); foreach (var completeResponse in completeResponses) { response.Append(completeResponse); } remainingData = responseData; //结果为rn读取结束 if (response.ToString().EndsWith("rn")) { break; } } } finally { //释放缓冲区 ArrayPool<byte>.Shared.Return(receiveBuffer); } //返回完整的响应字符串 return response.ToString(); } private List<string> ExtractCompleteResponses(ref string data) { var completeResponses = new List<string>(); while (true) { var index = data.IndexOf("rn"); if (index >= 0) { // 提取一个完整的响应 var completeResponse = data.Substring(0, index + 2); //将完整的响应添加到列表中 completeResponses.Add(completeResponse); data = data.Substring(index + 2); } else { break; } } return completeResponses; } private string ParseResponse(string response) { if (response.StartsWith("$")) { // 处理 Bulk Strings($) var lengthStr = response.Substring(1, response.IndexOf('r') - 1); if (int.TryParse(lengthStr, out int length)) { if (length == -1) { return null!; } string rawRedisData = response.Substring(response.IndexOf('n') + 1); byte[] utf8Bytes = Encoding.UTF8.GetBytes(rawRedisData); string value = Encoding.UTF8.GetString(utf8Bytes, 0, length); return value; } } else if (response.StartsWith("+")) { // 处理 Simple Strings(+) return response.Substring(1, response.Length - 3); } else if (response.StartsWith(":")) { // 处理 Integers(:) var valueStr = response.Substring(1, response.IndexOf('r') - 1); if (int.TryParse(valueStr, out int value)) { return value.ToString(); } } // 如果响应格式不符合预期,抛出异常 throw new InvalidOperationException(response); }
上面逻辑涉及到发送和接收Redis消息的三个方法SendCommandAsync
、ExtractCompleteResponses
、ParseResponse
。虽然上面代码中有注释,但是咱们分别I简单的讲解一下这三个方法
-
SendCommandAsync
该方法主要目的是向 Redis 服务器发送命令并异步接收响应
- 连接检查:首先,检查连接状态 (_isConnected),如果连接已断开,则调用 ConnectAsync 方法进行重连。
- 命令转换:将传入的命令字符串转换为 UTF-8 编码的字节数组,附加回车换行符 ("rn")。
- 接收响应:使用异步循环接收来自服务器的响应。在每次接收之后,将接收到的数据添加到响应字符串中,并提取其中的完整响应。
- 缓冲区管理:为了有效地处理接收到的数据,使用了一个缓冲区 (receiveBuffer),并在方法结束时通过 ArrayPool
.Shared.Return 进行释放。 - 提取完整响应:调用 ExtractCompleteResponses 方法,该方法从响应数据中提取出一个或多个完整的响应,将其从数据中移除,并返回一个列表。
-
ExtractCompleteResponses
该方法主要用于从接收到的数据中提取出一个或多个完整的响应。
- completeResponses 列表:用于存储提取出的完整响应的列表。
- while 循环:循环进行以下操作,直到数据中没有换行符为止。
- 提取完整响应:如果找到换行符,就提取从数据开头到换行符位置的子字符串,包括换行符本身,构成一个完整的响应。
- 添加到列表:将提取出的完整响应添加到 completeResponses 列表中。
-
ParseResponse
该方法主要用于解析从 Redis 服务器接收到的响应字符串。
- 如果响应以 $ 开头,表示这是一个 Bulk String 类型的响应。
- 如果响应以 + 开头,表示这是一个 Simple String 类型的响应。
- 如果响应以 : 开头,表示这是一个 Integer 类型的响应。
简单操作方法
上面有了和Redis通信
的基本方法,也有了解析RESP
协议的基础方法,接下来咱们实现几个简单的Redis操作指令
来展示一下Redis客户端具体是如何工作的,简单的几个方法如下所示
//切换db操作 public async Task SelectAsync(int dbIndex) { var command = $"SELECT {dbIndex}"; await SendCommandAsync(command); } //get操作 public async Task<string> GetAsync(string key) { var command = $"GET {key}"; return ParseResponse(await SendCommandAsync(command)); } //set操作 public async Task<bool> SetAsync(string key, string value, TimeSpan? expiry = null) { var command = $"SET {key} '{value}'"; //判断会否追加过期时间 if (expiry.HasValue) { command += $" EX {expiry.Value.TotalSeconds}"; } var response = ParseResponse(await SendCommandAsync(command)); return response == "OK"; } //支持过期时间的setnx操作 public async Task<bool> SetNxAsync(string key, string value, TimeSpan? expiry = null) { //因为默认的setnx方法不支持添加过期时间,为了保证操作的原子性,使用了lua var command = $"EVAL "if redis.call('SETNX', KEYS[1], ARGV[1]) == 1 then if ARGV[2] then redis.call('EXPIRE', KEYS[1], ARGV[2]) end return true else return false end" 1 {key} '{value}'"; if (expiry.HasValue) { command += $" {expiry.Value.TotalSeconds}"; } var response = ParseResponse(await SendCommandAsync(command)); return response == "1"; } //添加支持函过期时间的list push操作 public async Task<long> ListPushAsync(string key, string value, TimeSpan? expiry = null) { var script = @"local len = redis.call('LPUSH', KEYS[1], ARGV[1]) if tonumber(ARGV[2]) > 0 then redis.call('EXPIRE', KEYS[1], ARGV[2]) end return len"; var keys = new string[] { key }; var args = new string[] { value, (expiry?.TotalSeconds ?? 0).ToString() }; var response = await ExecuteLuaScriptAsync(script, keys, args); return long.Parse(response); } //list pop操作 public async Task<string> ListPopAsync(string key) { var command = $"LPOP {key}"; return ParseResponse(await SendCommandAsync(command)); } //listrange操作 public async Task<List<string>> ListRangeAsync(string key, int start, int end) { var command = $"LRANGE {key} {start} {end}"; var response = await SendCommandAsync(command); if (response.StartsWith("*0rn")) { return new List<string>(); } //由于list range返回了是一个数组,所以单独处理了一下,这里我使用了正则,解析字符串也可以,方法随意 var values = new List<string>(); var pattern = @"$d+rn(.*?)rn"; MatchCollection matches = Regex.Matches(response, pattern); foreach (Match match in matches) { values.Add(match.Groups[1].Value); } return values; } //执行lua脚本的方法 public async Task<string> ExecuteLuaScriptAsync(string script, string[]? keys = null, string[]? args = null) { //去除lua里的换行 script = Regex.Replace(script, @"[rn]", ""); // 构建EVAL命令,将Lua脚本、keys和args发送到Redis服务器 var command = $"EVAL "{script}" { keys?.Length??0 } "; //拼接key和value参数 if (keys != null && keys.Length != 0) { command += string.Join(" ", keys.Select(key => $"{key}")); } if (args != null && args.Length != 0) { command += " " + string.Join(" ", args.Select(arg => $"{arg}")); } return ParseResponse(await SendCommandAsync(command)); } //redis发布操作 public async Task SubscribeAsync(string channel, Action<string, string> handler) { await SendCommandAsync($"SUBSCRIBE {channel}"); while (true) { var response = await SendCommandAsync(string.Empty); string pattern = @"*d+rn$d+rn(.*?)rn$d+rn(.*?)rn$d+rn(.*?)rn"; Match match = Regex.Match(response, pattern); if (match.Success) { string ch = match.Groups[2].Value; string message = match.Groups[3].Value; handler(ch, message); } } } //redis订阅操作 public async Task PublishAsync(string channel, string message) { await SendCommandAsync($"PUBLISH {channel} {message}"); }
上面方法中演示了几个比较常见的操作,很简单,主要是向大家展示Redis
命令是如何发送的,从最简单的GET
、SET
、LIST
、发布订阅
、执行LUA
操作方面着手,如果对Redis命令
比较熟悉的话,操作起来还是比较简单的,这里给大家讲解几个比较有代表的方法
- 首先关于
setnx
方法,由于自带的setnx方法不支持添加过期时间,为了保证操作的原子性,使用了lua脚本的方式 - 自带的
lpush
也就是上面ListPushAsync
方法中封装的操作,自带的也是没办法给定过期时间的,为了保证操作的原子性,我在这里也是用lua进行封装 - 关于执行
lua脚本
的时候的时候需要注意lua脚本的格式EVAL script numkeys [key [key ...]] [arg [arg ...]]
脚本后面紧跟着的长度是key的个数
这个需要注意 - 最后,自行编写命令的时候需要注意
rn
的处理和引号
的转义问题,当然研究的越深,遇到的问题越多
相信大家也看到了,这里我封装的都是几个简单的操作,难度系数不大,因为主要是向大家演示Redis客户端
的发送和接收操作是什么样的,甚至我都是直接返回的字符串,真实使用的时候我们使用都是需要封装序列化和反序列化操作的。
完整代码
上面分别对RedisClient
类中的方法进行了讲解,接下来我把我封装的类完整的给大家贴出来,由于封装的只是几个简单的方法用于演示,所以也只有一个类,代码量也不多,主要是为了方便大家理解,有想试验的同学可以直接拿走
public class RedisClient : IDisposable, IAsyncDisposable { private readonly int DefaultPort = 6379; private readonly string Host = "localhost"; private readonly int HeartbeatInterval = 30000; private bool _isConnected; private Timer _heartbeatTimer; private Socket _socket; public RedisClient(string host = "localhost", int defaultPort = 6379) { Host = host; DefaultPort = defaultPort; _heartbeatTimer = new Timer(HeartbeatCallback, null, HeartbeatInterval, HeartbeatInterval); } public async Task ConnectAsync(int timeoutMilliseconds = 5000) { _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); var cts = new CancellationTokenSource(timeoutMilliseconds); await _socket.ConnectAsync(Host, DefaultPort, cts.Token); _isConnected = true; } public async Task SelectAsync(int dbIndex) { var command = $"SELECT {dbIndex}"; await SendCommandAsync(command); } public async Task<string> GetAsync(string key) { var command = $"GET {key}"; return ParseResponse(await SendCommandAsync(command)); } public async Task<bool> SetAsync(string key, string value, TimeSpan? expiry = null) { var command = $"SET {key} '{value}'"; if (expiry.HasValue) { command += $" EX {expiry.Value.TotalSeconds}"; } var response = ParseResponse(await SendCommandAsync(command)); return response == "OK"; } public async Task<bool> SetNxAsync(string key, string value, TimeSpan? expiry = null) { var command = $"EVAL "if redis.call('SETNX', KEYS[1], ARGV[1]) == 1 then if ARGV[2] then redis.call('EXPIRE', KEYS[1], ARGV[2]) end return true else return false end" 1 {key} '{value}'"; if (expiry.HasValue) { command += $" {expiry.Value.TotalSeconds}"; } var response = ParseResponse(await SendCommandAsync(command)); return response == "1"; } public async Task<long> ListPushAsync(string key, string value, TimeSpan? expiry = null) { var script = @"local len = redis.call('LPUSH', KEYS[1], ARGV[1]) if tonumber(ARGV[2]) > 0 then redis.call('EXPIRE', KEYS[1], ARGV[2]) end return len"; var keys = new string[] { key }; var args = new string[] { value, (expiry?.TotalSeconds ?? 0).ToString() }; var response = await ExecuteLuaScriptAsync(script, keys, args); return long.Parse(response); } public async Task<string> ListPopAsync(string key) { var command = $"LPOP {key}"; return ParseResponse(await SendCommandAsync(command)); } public async Task<long> ListLengthAsync(string key) { var command = $"LLEN {key}"; return long.Parse(ParseResponse(await SendCommandAsync(command))); } public async Task<List<string>> ListRangeAsync(string key, int start, int end) { var command = $"LRANGE {key} {start} {end}"; var response = await SendCommandAsync(command); if (response.StartsWith("*0rn")) { return new List<string>(); } var values = new List<string>(); var pattern = @"$d+rn(.*?)rn"; MatchCollection matches = Regex.Matches(response, pattern); foreach (Match match in matches) { values.Add(match.Groups[1].Value); } return values; } public async Task<string> ExecuteLuaScriptAsync(string script, string[]? keys = null, string[]? args = null) { script = Regex.Replace(script, @"[rn]", ""); var command = $"EVAL "{script}" { keys?.Length??0 } "; if (keys != null && keys.Length != 0) { command += string.Join(" ", keys.Select(key => $"{key}")); } if (args != null && args.Length != 0) { command += " " + string.Join(" ", args.Select(arg => $"{arg}")); } return ParseResponse(await SendCommandAsync(command)); } public async Task SubscribeAsync(string channel, Action<string, string> handler) { await SendCommandAsync($"SUBSCRIBE {channel}"); while (true) { var response = await SendCommandAsync(string.Empty); string pattern = @"*d+rn$d+rn(.*?)rn$d+rn(.*?)rn$d+rn(.*?)rn"; Match match = Regex.Match(response, pattern); if (match.Success) { string ch = match.Groups[2].Value; string message = match.Groups[3].Value; handler(ch, message); } } } public async Task PublishAsync(string channel, string message) { await SendCommandAsync($"PUBLISH {channel} {message}"); } public async Task<string> SendCommandAsync(string command) { if (!_isConnected) { await ConnectAsync(); } var request = Encoding.UTF8.GetBytes(command + "rn"); await _socket.SendAsync(new ArraySegment<byte>(request), SocketFlags.None); var response = new StringBuilder(); var remainingData = string.Empty; byte[] receiveBuffer = ArrayPool<byte>.Shared.Rent(1024); try { while (true) { var bytesRead = await _socket.ReceiveAsync(new ArraySegment<byte>(receiveBuffer), SocketFlags.None); var responseData = remainingData + Encoding.UTF8.GetString(receiveBuffer, 0, bytesRead); var completeResponses = ExtractCompleteResponses(ref responseData); foreach (var completeResponse in completeResponses) { response.Append(completeResponse); } remainingData = responseData; if (response.ToString().EndsWith("rn")) { break; } } } finally { ArrayPool<byte>.Shared.Return(receiveBuffer); } return response.ToString(); } private List<string> ExtractCompleteResponses(ref string data) { var completeResponses = new List<string>(); while (true) { var index = data.IndexOf("rn"); if (index >= 0) { var completeResponse = data.Substring(0, index + 2); completeResponses.Add(completeResponse); data = data.Substring(index + 2); } else { break; } } return completeResponses; } private string ParseResponse(string response) { if (response.StartsWith("$")) { var lengthStr = response.Substring(1, response.IndexOf('r') - 1); if (int.TryParse(lengthStr, out int length)) { if (length == -1) { return null!; } string rawRedisData = response.Substring(response.IndexOf('n') + 1); byte[] utf8Bytes = Encoding.UTF8.GetBytes(rawRedisData); string value = Encoding.UTF8.GetString(utf8Bytes, 0, length); return value; } } else if (response.StartsWith("+")) { return response.Substring(1, response.Length - 3); } else if (response.StartsWith(":")) { var valueStr = response.Substring(1, response.IndexOf('r') - 1); if (int.TryParse(valueStr, out int value)) { return value.ToString(); } } throw new InvalidOperationException(response); } private async void HeartbeatCallback(object state) { if (_isConnected) { var pingCommand = "PINGrn"; await SendCommandAsync(pingCommand); } } public void Dispose() { DisposeAsync().GetAwaiter().GetResult(); } public ValueTask DisposeAsync() { _heartbeatTimer.Dispose(); if (_socket != null) { _socket.Shutdown(SocketShutdown.Both); _socket.Close(); } return ValueTask.CompletedTask; } }
简单使用RedisClient
上面我们封装了RedisClient
类,也讲解了里面实现的几个简单的方法,接下来我们就简单的使用一下它,比较简单直接上代码
GET/SET
GET/SET
是最基础和最简单的指令,没啥可说的直接上代码
using RedisClient redisClient = new RedisClient(); await redisClient.ConnectAsync(); //切换db await redisClient.SelectAsync(3); bool setResult = await redisClient.SetAsync("key:foo", "are you ok,你好吗?", TimeSpan.FromSeconds(120)); string getResult = await redisClient.GetAsync("key:foo"); Console.WriteLine("get key:foo:" + getResult);
SETNX
SETNX
比较常用,很多时候用在做分布式锁的场景,判断资源存不存在的时候经常使用
//第一次setnx返回true bool setNxResult = await redisClient.SetNxAsync("order:lock", "123_lock", TimeSpan.FromSeconds(120)); Console.WriteLine("first setnx order:lock:" + setNxResult); //第一次setnx返回false setNxResult = await redisClient.SetNxAsync("order:lock", "123_lock", TimeSpan.FromSeconds(120)); Console.WriteLine("second setnx aname:foo:" + setNxResult);
PUB/SUB
这里实现的SubscribeAsync
和PublishAsync
需要使用两个RedisClient
实例,因为我上面封装的每个RedisClient
只包含一个Socket
实例所以ReceiveAsync
方法是阻塞的。如果同一个实例的话SubscribeAsync
的时候,在使用PublishAsync
方法的时候会被阻塞,所以演示的时候使用了两个RedisClient
实例
_ = redisClient.SubscribeAsync("order_msg_ch", (ch, msg) => { Console.WriteLine($"接收消息:[{ch}]---[{msg}]"); }); Thread.Sleep(2000); using RedisClient redisClient2 = new RedisClient(); await redisClient2.ConnectAsync(); for (int i = 0; i < 5; i++) { await redisClient2.PublishAsync("order_msg_ch", $"发送消息{i}"); Thread.Sleep(2000); }
ExecuteLuaScriptAsync
动态执行lua的功能还是比较强大的,在之前的项目中,我也使用类似的功能。我们是模拟抢单/完成
的场景,比如业务人员需要自行抢单,每个人最多抢几单,超过阈值则抢单失败,你需要把抢到的完成了才能继续抢单,这种操作就需要借助lua进行操作
//抢单的lua string takeOrderLuaScript = @" local ordersTaken = tonumber(redis.call('GET', KEYS[1]) or '0') if ordersTaken < tonumber(ARGV[1]) then redis.call('INCR', KEYS[1]) return 1 else return 0 end"; //完成你手里的订单操作 string completeOrderLuaScript = @" local ordersTaken = tonumber(redis.call('GET', KEYS[1]) or '0') if ordersTaken > 0 then redis.call('DECR', KEYS[1]) return 1 else return 0 end"; //模拟抢单,最多抢两单 string result = await redisClient.ExecuteLuaScriptAsync(takeOrderLuaScript, new[] { "user:123" }, new[] { "2" }); result = await redisClient.ExecuteLuaScriptAsync(takeOrderLuaScript, new[] { "user:123" }, new[] { "2" }); result = await redisClient.ExecuteLuaScriptAsync(takeOrderLuaScript, new[] { "user:123" }, new[] { "2" }); result = await redisClient.ExecuteLuaScriptAsync(takeOrderLuaScript, new[] { "user:123" }, new[] { "2" }); //完成订单 string anotherResult = await redisClient.ExecuteLuaScriptAsync(completeOrderLuaScript, keys: new[] { "user:123" }); anotherResult = await redisClient.ExecuteLuaScriptAsync(completeOrderLuaScript, keys: new[] { "user:123" }); anotherResult = await redisClient.ExecuteLuaScriptAsync(completeOrderLuaScript, keys: new[] { "user:123" }); anotherResult = await redisClient.ExecuteLuaScriptAsync(completeOrderLuaScript, keys: new[] { "user:123" });
还有一个功能也是我们之前遇到的,就是使用Redis
实现缓存最新的N条消息,旧的则被抛弃,实现这个功能也需要使用Redis的List
结构结合lua的方式
string luaScript = @" local record_key = KEYS[1] local max_records = tonumber(ARGV[1]) local new_record = ARGV[2] local current_count = redis.call('LLEN', record_key) if current_count >= max_records then redis.call('LPOP', record_key) end redis.call('RPUSH', record_key, new_record) "; //这里限制保存最新的50条数据,旧的数据则被抛弃 for (int i = 0; i < 60; i++) { _ = await redisClient.ExecuteLuaScriptAsync(luaScript, keys: new[] { "msg:list" }, new[] { "50", i.ToString() }); }
List
LIST
很多时候会把它当做分布式队列来使用,它提供的操作也比较灵活,咱们这里只是封装了几个最简单的操作,大致的效果如下所示
//lis入队操作 var res = await redisClient.ListPushAsync("list:2", "123", TimeSpan.FromHours(1)); res = await redisClient.ListPushAsync("list:2", "1234", TimeSpan.FromHours(1)); res = await redisClient.ListPushAsync("list:2", "12345", TimeSpan.FromHours(1)); //list出队操作 var str = await redisClient.ListPopAsync("list:2"); //list长度 var length = await redisClient.ListLengthAsync("list:2"); //list range操作 var list = await redisClient.ListRangeAsync("article:list", 0, 10);
总结
本文我们通过理解Redis命令
和RESP协议
来构建了一个简单RedisClient
的实现,方便我们更容易的理解Redis客户端
如何与Redis服务器
进行通信,这个实现也可以作为学习和理解·Redis客户端·的一个很好的例子。当然我们的这个RedisClient
这是了解和学习使用,很多场景我们并没有展示,实际的项目我们还是尽量使用开源的Redis SDK
, .net
中常用的有StackExchange.Redis
、FreeRedis
、csredis
、NewLife.Redis
、Service.Stack.Redis
,其中我经常使用的是StackExchange.Redis
和FreeRedis
整体来说效果还是不错的。总结一下我们文章的主要内容
- 首先我们讲解了
Redis命令
的格式 - 其次我们讲解了
Redis协议(RESP)
的主要格式以及如何解析 - 然后我们基于上面的理论简单的封装了一个
RedisClient
类来演示相关概念 - 最后我们通过几个示例和我用过的两个
lua
来简单的演示RedisClient
类的使用
作为新时代的职场人,我乐在探究自己感兴趣的领域,对未知的事物充满好奇,并渴望深入了解。对于常用的核心技术,我不仅要求自己能够熟练运用,更追求深入理解其实现原理。面对新的技术趋势,我决不会视而不见,而是在熟悉更多常用技术栈的同时,努力深入掌握一些重要的知识。我坚信,学无止境,每一步的进步都带来无比的喜悦与成就感。