一款支持自动流水线和客户端缓存的 Go 语言 Redis 客户端

源自开发者
源自开发者
发布于 2024-08-21 / 62 阅读
0
0

一款支持自动流水线和客户端缓存的 Go 语言 Redis 客户端

Rueidis 是一款高性能的 Go 语言 Redis 客户端,它支持自动流水线操作和服务端辅助客户端缓存等功能。Rueidis 的目标是提供一个简单易用、性能卓越的 Redis 客户端库,以满足 Go 开发者的各种需求。

主要功能

  • 自动流水线操作,提升非阻塞命令的执行效率
  • 服务端辅助客户端缓存,大幅降低延迟和提高吞吐量
  • 支持泛型对象映射和客户端缓存
  • 提供缓存策略模式(Cache-Aside pattern)的实现
  • 支持分布式锁和客户端缓存
  • 提供用于编写测试的 Rueidis 模拟库
  • 集成 OpenTelemetry,方便监控和追踪
  • 支持 Pub/Sub、分片 Pub/Sub 和 Streams
  • 兼容 Redis Cluster、Sentinel、RedisJSON、RedisBloom、RediSearch、RedisTimeseries 等
  • 提供不依赖 Redis Stack 的概率数据结构

快速入门

package main

import (
	"context"
	"github.com/redis/rueidis"
)

func main() {
	client, err := rueidis.NewClient(rueidis.ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	ctx := context.Background()
	// SET key val NX
	err = client.Do(ctx, client.B().Set().Key("key").Value("val").Nx().Build()).Error()
	// HGETALL hm
	hm, err := client.Do(ctx, client.B().Hgetall().Key("hm").Build()).AsStrMap()
}

命令构建器

Rueidis 提供了简单易用的命令构建器,方便开发者构建 Redis 命令。

client.B().Set().Key("key").Value("val").Nx().Build()

流水线操作

自动流水线操作

Rueidis 会自动将并发执行的非阻塞 Redis 命令进行流水线操作,以减少网络往返次数和系统调用次数,从而提高吞吐量。

func BenchmarkPipelining(b *testing.B, client rueidis.Client) {
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			client.Do(context.Background(), client.B().Get().Key("k").Build()).ToString()
		}
	})
}

手动流水线操作

除了自动流水线操作外,Rueidis 还支持手动流水线操作。

cmds := make(rueidis.Commands, 0, 10)
for i := 0; i < 10; i++ {
    cmds = append(cmds, client.B().Set().Key("key").Value("value").Build())
}
for _, resp := range client.DoMulti(ctx, cmds...) {
    if err := resp.Error(); err != nil {
        panic(err)
    }
}

服务端辅助客户端缓存

Rueidis 默认开启了服务端辅助客户端缓存的 opt-in 模式,开发者可以通过 DoCache()DoMultiCache() 方法使用该功能。

client.DoCache(ctx, client.B().Hmget().Key("mk").Field("1", "2").Cache(), time.Minute).ToArray()
client.DoMultiCache(ctx,
    rueidis.CT(client.B().Get().Key("k1").Cache(), 1*time.Minute),
    rueidis.CT(client.B().Get().Key("k2").Cache(), 2*time.Minute))

上下文取消

client.Do()client.DoMulti()client.DoCache()client.DoMultiCache() 方法都支持上下文取消,如果上下文被取消或超时,这些方法会提前返回。

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
client.Do(ctx, client.B().Set().Key("key").Value("val").Nx().Build()).Error() == context.DeadlineExceeded

发布/订阅

Rueidis 提供了 client.Receive() 方法来接收来自 Redis 频道的信息。

err = client.Receive(context.Background(), client.B().Subscribe().Channel("ch1", "ch2").Build(), func(msg rueidis.PubSubMessage) {
    // 处理接收到的信息
})

CAS 事务

Rueidis 支持 CAS 事务,即 WATCH + MULTI + EXEC 操作。

client.Dedicated(func(c rueidis.DedicatedClient) error {
    c.Do(ctx, c.B().Watch().Key("k1", "k2").Build())
    c.Do(ctx, c.B().Mget().Key("k1", "k2").Build())
    c.DoMulti(
        ctx,
        c.B().Multi().Build(),
        c.B().Set().Key("k1").Value("1").Build(),
        c.B().Set().Key("k2").Value("2").Build(),
        c.B().Exec().Build(),
    )
    return nil
})

Lua 脚本

Rueidis 提供了 NewLuaScriptNewLuaScriptReadOnly 方法来创建 Lua 脚本。

script := rueidis.NewLuaScript("return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}")
list, err := script.Exec(ctx, client, []string{"k1", "k2"}, []string{"a1", "a2"}).ToArray()

流式读取

Rueidis 提供了 client.DoStream()client.DoMultiStream() 方法来流式读取 Redis 的响应数据。

s := client.DoMultiStream(ctx, client.B().Get().Key("a{slot1}").Build(), client.B().Get().Key("b{slot1}").Build())
for s.HasNext() {
    n, err := s.WriteTo(io.Discard)
    if rueidis.IsRedisNil(err) {
        // ...
    }
}

内存消耗

Rueidis 的每个底层连接都会分配一个环形缓冲区用于流水线操作,缓冲区的大小由 ClientOption.RingScaleEachConn 选项控制,默认值为 10,即每个环形缓冲区的大小为 2^10 字节。

如果 Rueidis 连接过多,可能会占用大量内存。在这种情况下,可以考虑将 ClientOption.RingScaleEachConn 选项的值减小到 8 或 9,但这样做可能会降低吞吐量。

创建 Redis 客户端

可以使用 NewClient 函数创建 Rueidis 客户端,并指定各种选项。

// 连接到单个 Redis 节点
client, err := rueidis.NewClient(rueidis.ClientOption{
    InitAddress: []string{"127.0.0.1:6379"},
})

// 连接到 Redis 集群
client, err := rueidis.NewClient(rueidis.ClientOption{
    InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"},
    ShuffleInit: true,
})

Redis URL

可以使用 ParseURLMustParseURL 函数解析 Redis URL,并将其转换为 ClientOption 结构体。

// 连接到 Redis 集群
client, err = rueidis.NewClient(rueidis.MustParseURL("redis://127.0.0.1:7001?addr=127.0.0.1:7002&addr=127.0.0.1:7003"))

任意命令

如果需要构建 Rueidis 命令构建器中没有提供的 Redis 命令,可以使用 client.B().Arbitrary() 方法。

// 这将生成 [ANY CMD k1 k2 a1 a2] 命令
client.B().Arbitrary("ANY", "CMD").Keys("k1", "k2").Args("a1", "a2").Build()

处理 JSON、原始字节数组和向量相似度搜索

Rueidis 命令构建器将所有参数都视为 Redis 字符串,这意味着开发者可以将 []byte 直接存储到 Redis 中,而无需进行转换。

client.B().Set().Key("b").Value(rueidis.BinaryString([]byte{...})).Build()

命令响应解析

Rueidis 提供了一系列方法来解析 Redis 命令的响应数据,例如 ToString()AsInt64()ToArray() 等。

// GET 命令
client.Do(ctx, client.B().Get().Key("k").Build()).ToString()
client.Do(ctx, client.B().Get().Key("k").Build()).AsInt64()

使用 DecodeSliceOfJSON 解析 JSON 数组

DecodeSliceOfJSON 函数可以将 Redis 响应中的 JSON 数组解析为 Go 结构体切片。

type User struct {
	Name string `json:"name"`
}

// ...

var users []*User
if err := rueidis.DecodeSliceOfJSON(client.Do(ctx, client.B().Mget().Key("user1", "user2").Build()), &users); err != nil {
	return err
}

评论