Rueidis 是一款高性能的 Go 语言 Redis 客户端,它支持自动流水线操作和服务端辅助客户端缓存等功能。Rueidis 的目标是提供一个简单易用、性能卓越的 Redis 客户端库,以满足 Go 开发者的各种需求。
主要功能
- 自动流水线操作,提升非阻塞命令的执行效率
- 服务端辅助客户端缓存,大幅降低延迟和提高吞吐量
- 支持泛型对象映射和客户端缓存
- 提供缓存策略模式(Cache-Aside pattern)的实现
- 支持分布式锁和客户端缓存
- 提供用于编写测试的 Rueidis 模拟库
- 集成 OpenTelemetry,方便监控和追踪
- 支持 Pub/Sub、分片 Pub/Sub 和 Streams
- 兼容 Redis Cluster、Sentinel、RedisJSON、RedisBloom、RediSearch、RedisTimeseries 等
- 提供不依赖 Redis Stack 的概率数据结构
快速入门
package main
import (
"context"
"github.com/redis/rueidis"
)
func main() {
client, err := rueidis.NewClient(rueidis.ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
if err != nil {
panic(err)
}
defer client.Close()
ctx := context.Background()
// SET key val NX
err = client.Do(ctx, client.B().Set().Key("key").Value("val").Nx().Build()).Error()
// HGETALL hm
hm, err := client.Do(ctx, client.B().Hgetall().Key("hm").Build()).AsStrMap()
}
命令构建器
Rueidis 提供了简单易用的命令构建器,方便开发者构建 Redis 命令。
client.B().Set().Key("key").Value("val").Nx().Build()
流水线操作
自动流水线操作
Rueidis 会自动将并发执行的非阻塞 Redis 命令进行流水线操作,以减少网络往返次数和系统调用次数,从而提高吞吐量。
func BenchmarkPipelining(b *testing.B, client rueidis.Client) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
client.Do(context.Background(), client.B().Get().Key("k").Build()).ToString()
}
})
}
手动流水线操作
除了自动流水线操作外,Rueidis 还支持手动流水线操作。
cmds := make(rueidis.Commands, 0, 10)
for i := 0; i < 10; i++ {
cmds = append(cmds, client.B().Set().Key("key").Value("value").Build())
}
for _, resp := range client.DoMulti(ctx, cmds...) {
if err := resp.Error(); err != nil {
panic(err)
}
}
服务端辅助客户端缓存
Rueidis 默认开启了服务端辅助客户端缓存的 opt-in 模式,开发者可以通过 DoCache()
或 DoMultiCache()
方法使用该功能。
client.DoCache(ctx, client.B().Hmget().Key("mk").Field("1", "2").Cache(), time.Minute).ToArray()
client.DoMultiCache(ctx,
rueidis.CT(client.B().Get().Key("k1").Cache(), 1*time.Minute),
rueidis.CT(client.B().Get().Key("k2").Cache(), 2*time.Minute))
上下文取消
client.Do()
、client.DoMulti()
、client.DoCache()
和 client.DoMultiCache()
方法都支持上下文取消,如果上下文被取消或超时,这些方法会提前返回。
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
client.Do(ctx, client.B().Set().Key("key").Value("val").Nx().Build()).Error() == context.DeadlineExceeded
发布/订阅
Rueidis 提供了 client.Receive()
方法来接收来自 Redis 频道的信息。
err = client.Receive(context.Background(), client.B().Subscribe().Channel("ch1", "ch2").Build(), func(msg rueidis.PubSubMessage) {
// 处理接收到的信息
})
CAS 事务
Rueidis 支持 CAS 事务,即 WATCH
+ MULTI
+ EXEC
操作。
client.Dedicated(func(c rueidis.DedicatedClient) error {
c.Do(ctx, c.B().Watch().Key("k1", "k2").Build())
c.Do(ctx, c.B().Mget().Key("k1", "k2").Build())
c.DoMulti(
ctx,
c.B().Multi().Build(),
c.B().Set().Key("k1").Value("1").Build(),
c.B().Set().Key("k2").Value("2").Build(),
c.B().Exec().Build(),
)
return nil
})
Lua 脚本
Rueidis 提供了 NewLuaScript
和 NewLuaScriptReadOnly
方法来创建 Lua 脚本。
script := rueidis.NewLuaScript("return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}")
list, err := script.Exec(ctx, client, []string{"k1", "k2"}, []string{"a1", "a2"}).ToArray()
流式读取
Rueidis 提供了 client.DoStream()
和 client.DoMultiStream()
方法来流式读取 Redis 的响应数据。
s := client.DoMultiStream(ctx, client.B().Get().Key("a{slot1}").Build(), client.B().Get().Key("b{slot1}").Build())
for s.HasNext() {
n, err := s.WriteTo(io.Discard)
if rueidis.IsRedisNil(err) {
// ...
}
}
内存消耗
Rueidis 的每个底层连接都会分配一个环形缓冲区用于流水线操作,缓冲区的大小由 ClientOption.RingScaleEachConn
选项控制,默认值为 10,即每个环形缓冲区的大小为 2^10 字节。
如果 Rueidis 连接过多,可能会占用大量内存。在这种情况下,可以考虑将 ClientOption.RingScaleEachConn
选项的值减小到 8 或 9,但这样做可能会降低吞吐量。
创建 Redis 客户端
可以使用 NewClient
函数创建 Rueidis 客户端,并指定各种选项。
// 连接到单个 Redis 节点
client, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: []string{"127.0.0.1:6379"},
})
// 连接到 Redis 集群
client, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"},
ShuffleInit: true,
})
Redis URL
可以使用 ParseURL
或 MustParseURL
函数解析 Redis URL,并将其转换为 ClientOption
结构体。
// 连接到 Redis 集群
client, err = rueidis.NewClient(rueidis.MustParseURL("redis://127.0.0.1:7001?addr=127.0.0.1:7002&addr=127.0.0.1:7003"))
任意命令
如果需要构建 Rueidis 命令构建器中没有提供的 Redis 命令,可以使用 client.B().Arbitrary()
方法。
// 这将生成 [ANY CMD k1 k2 a1 a2] 命令
client.B().Arbitrary("ANY", "CMD").Keys("k1", "k2").Args("a1", "a2").Build()
处理 JSON、原始字节数组和向量相似度搜索
Rueidis 命令构建器将所有参数都视为 Redis 字符串,这意味着开发者可以将 []byte
直接存储到 Redis 中,而无需进行转换。
client.B().Set().Key("b").Value(rueidis.BinaryString([]byte{...})).Build()
命令响应解析
Rueidis 提供了一系列方法来解析 Redis 命令的响应数据,例如 ToString()
、AsInt64()
、ToArray()
等。
// GET 命令
client.Do(ctx, client.B().Get().Key("k").Build()).ToString()
client.Do(ctx, client.B().Get().Key("k").Build()).AsInt64()
使用 DecodeSliceOfJSON 解析 JSON 数组
DecodeSliceOfJSON
函数可以将 Redis 响应中的 JSON 数组解析为 Go 结构体切片。
type User struct {
Name string `json:"name"`
}
// ...
var users []*User
if err := rueidis.DecodeSliceOfJSON(client.Do(ctx, client.B().Mget().Key("user1", "user2").Build()), &users); err != nil {
return err
}