Skip to content

Commit

Permalink
feat(msgbus): 兼容 redis v8 & redis v9
Browse files Browse the repository at this point in the history
  • Loading branch information
Ccheers committed Aug 29, 2024
1 parent 4b32249 commit f137dcf
Show file tree
Hide file tree
Showing 12 changed files with 558 additions and 254 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/philchia/agollo/v4 v4.1.3
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.19.0
github.com/redis/go-redis/v9 v9.6.1
github.com/satori/go.uuid v1.2.0
github.com/shirou/gopsutil v3.21.7+incompatible
github.com/stretchr/testify v1.9.0
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f h1:ZNv7
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
Expand Down Expand Up @@ -123,6 +127,8 @@ github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSz
github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4=
github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package redis
package core

import (
"fmt"
Expand Down
31 changes: 31 additions & 0 deletions xmsgbus/impl/redis/core/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package core

import (
"context"
"fmt"
"time"
)

var (
ErrRPushAndExpire = fmt.Errorf("RPushAndExpire failed")
)

type IRedisClient interface {
SAdd(ctx context.Context, key string, members ...interface{}) error
SMembers(ctx context.Context, key string) ([]string, error)
SRem(ctx context.Context, key string, members ...interface{}) error

Get(ctx context.Context, key string) ([]byte, error)

Set(ctx context.Context, key string, value interface{}, expiration time.Duration) (string, error)
SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) (bool, error)
SetEX(ctx context.Context, key string, value interface{}, expiration time.Duration) error

Keys(ctx context.Context, pattern string) ([]string, error)

Del(ctx context.Context, keys ...string) error

BLPop(ctx context.Context, timeout time.Duration, keys ...string) ([]string, error)

RPushAndExpire(ctx context.Context, key string, value string, ttl time.Duration) error
}
124 changes: 124 additions & 0 deletions xmsgbus/impl/redis/core/msgbus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package core

import (
"context"
"crypto/md5"
"encoding/hex"
"encoding/json"
"fmt"
"runtime/debug"
"strings"
"time"

"github.com/ccheers/xpkg/generic/arrayx"
"github.com/ccheers/xpkg/xmsgbus"
)

type AckData struct {
ListKey string
Data string
}

type MsgBus struct {
client IRedisClient
}

func NewMsgBus(client IRedisClient) xmsgbus.IMsgBus {
x := &MsgBus{client: client}
go func() {
for {
x.monitor(context.Background())
time.Sleep(time.Minute)
}
}()
return x
}

func (x *MsgBus) Push(ctx context.Context, topic string, bs []byte) error {
channels, err := x.ListChannel(ctx, topic)
if err != nil {
return err
}
var errList []error
for _, channel := range channels {
key := msgBusListKey(topic, channel)
err = x.client.RPushAndExpire(ctx, key, string(bs), tenMinute)
if err != nil {
errList = append(errList, err)
}
}
if len(errList) > 0 {
err := fmt.Errorf("publish to %s failed: %v", topic, strings.Join(arrayx.Map(errList, func(err error) string {
return err.Error()
}), ". "))
return err
}
return nil
}

func (x *MsgBus) Pop(ctx context.Context, topic, channel string, blockTimeout time.Duration) ([]byte, func(), error) {
listKey := msgBusListKey(topic, channel)
strs, err := x.client.BLPop(ctx, blockTimeout, listKey)
if err != nil {
return nil, nil, err
}
if len(strs) < 1 {
return nil, nil, xmsgbus.ErrNoData
}
md5Bs := md5.Sum([]byte(strs[1]))
ackKey := msgBusAckKey(time.Now(), hex.EncodeToString(md5Bs[:]))
bs, _ := json.Marshal(AckData{
ListKey: listKey,
Data: strs[1],
})
x.client.Set(ctx, ackKey, bs, time.Minute*3)
return []byte(strs[1]), func() {
x.client.Del(ctx, ackKey)
}, nil
}

func (x *MsgBus) AddChannel(ctx context.Context, topic string, channel string) error {
return x.client.SAdd(ctx, msgBusSetKey(topic), channel)
}

func (x *MsgBus) RemoveChannel(ctx context.Context, topic string, channel string) error {
err := x.client.SRem(ctx, msgBusSetKey(topic), channel)
if err != nil {
return err
}
_ = x.client.Del(ctx, msgBusListKey(topic, channel))
return nil
}

func (x *MsgBus) ListChannel(ctx context.Context, topic string) ([]string, error) {
return x.client.SMembers(ctx, msgBusSetKey(topic))
}

func (x *MsgBus) monitor(ctx context.Context) {
defer func() {
r := recover()
if r != nil {
fmt.Printf("[MsgBus][redis] monitor panic: %v, stack:\n%s\n", r, debug.Stack())
}
}()
ok, _ := x.client.SetNX(ctx, msgBusMonitorKey(), 1, time.Second*55)
if !ok {
return
}

ackKeyPrefix := msgBusAckKeyPrefix(time.Now().Add(-time.Minute * 2))
keys, err := x.client.Keys(ctx, ackKeyPrefix+"*")
if err != nil {
return
}
for _, key := range keys {
bs, err := x.client.Get(ctx, key)
if err != nil {
continue
}
var ackData AckData
_ = json.Unmarshal(bs, &ackData)
x.client.Del(ctx, key)
_ = x.client.RPushAndExpire(ctx, ackData.ListKey, ackData.Data, tenMinute)
}
}
165 changes: 165 additions & 0 deletions xmsgbus/impl/redis/core/msgbus_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package core

import (
"context"
"crypto/md5"
"encoding/hex"
"errors"
"reflect"
"testing"
"time"

"github.com/go-redis/redis/v8"
)

func TestMsgBus_Pop(t *testing.T) {
ctx := context.TODO()
client := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
DB: 0,
})
msg := []byte("test")
type fields struct {
client IRedisClient
}
type args struct {
ctx context.Context
topic string
channel string
blockTimeout time.Duration
}
tests := []struct {
name string
fields fields
args args
want []byte
want1 func()
wantErr bool
}{
{
name: "1",
fields: fields{
client: NewRedisClientImplV8(client),
},
args: args{
ctx: ctx,
topic: "test",
channel: "test2",
blockTimeout: time.Second,
},
want: msg,
want1: nil,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
x := &MsgBus{
client: tt.fields.client,
}
_ = x.AddChannel(ctx, tt.args.topic, tt.args.channel)
for i := 0; i < 10; i++ {
_ = x.Push(ctx, tt.args.topic, msg)
got, ack, err := x.Pop(tt.args.ctx, tt.args.topic, tt.args.channel, tt.args.blockTimeout)
if (err != nil) != tt.wantErr {
t.Errorf("Pop() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("Pop() got = %v, want %v", got, tt.want)
}
// 如果没有 ack 则 有数据在
md5Bs := md5.Sum(msg)
ackKey := msgBusAckKey(time.Now(), hex.EncodeToString(md5Bs[:]))
bs, _ := x.client.Get(ctx, ackKey)
t.Logf("ack key: %s, ack value: %s", ackKey, bs)
ack()
_, err = x.client.Get(ctx, ackKey)
if !errors.Is(err, redis.Nil) {
t.Errorf("ack failed, ack key: %s", ackKey)
}
}
})
}
}

type RedisClientImplV8 struct {
client *redis.Client
}

func NewRedisClientImplV8(client *redis.Client) IRedisClient {
return &RedisClientImplV8{client: client}
}

func (x *RedisClientImplV8) SAdd(ctx context.Context, key string, members ...interface{}) error {
return x.client.SAdd(ctx, key, members...).Err()
}

func (x *RedisClientImplV8) SMembers(ctx context.Context, key string) ([]string, error) {
return x.client.SMembers(ctx, key).Result()
}

func (x *RedisClientImplV8) SRem(ctx context.Context, key string, members ...interface{}) error {
return x.client.SRem(ctx, key, members...).Err()
}

func (x *RedisClientImplV8) Get(ctx context.Context, key string) ([]byte, error) {
return x.client.Get(ctx, key).Bytes()
}

func (x *RedisClientImplV8) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) (string, error) {
return x.client.Set(ctx, key, value, expiration).Result()
}

func (x *RedisClientImplV8) SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) (bool, error) {
return x.client.SetNX(ctx, key, value, expiration).Result()
}

func (x *RedisClientImplV8) SetEX(ctx context.Context, key string, value interface{}, expiration time.Duration) error {
return x.client.SetEX(ctx, key, value, expiration).Err()
}

func (x *RedisClientImplV8) Keys(ctx context.Context, pattern string) ([]string, error) {
return x.client.Keys(ctx, pattern).Result()
}

func (x *RedisClientImplV8) Del(ctx context.Context, keys ...string) error {
return x.client.Del(ctx, keys...).Err()
}

func (x *RedisClientImplV8) BLPop(ctx context.Context, timeout time.Duration, keys ...string) ([]string, error) {
return x.client.BLPop(ctx, timeout, keys...).Result()
}

func (x *RedisClientImplV8) RPushAndExpire(ctx context.Context, key string, value string, ttl time.Duration) error {
return x.rpushAndExpire(ctx, key, value, ttl)
}

const luaScript = `
local key = KEYS[1]
local value = ARGV[1]
local expiration = tonumber(ARGV[2])
local result = redis.call('RPUSH', key, value)
if result > 0 then
redis.call('EXPIRE', key, expiration)
return result
else
return 0 -- 表示操作失败
end
`

var (
rpushAndExpireScript = redis.NewScript(luaScript)
)

func (x *RedisClientImplV8) rpushAndExpire(ctx context.Context, key string, value string, ttl time.Duration) error {
result, err := rpushAndExpireScript.Run(ctx, x.client, []string{key}, value, int(ttl.Seconds())).Int()
if err != nil {
return err
}
if result == 0 {
return ErrRPushAndExpire
}
return nil
}
32 changes: 32 additions & 0 deletions xmsgbus/impl/redis/core/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package core

import (
"context"
"strings"
"time"

"github.com/ccheers/xpkg/xmsgbus"
)

type SharedStorage struct {
client IRedisClient
}

func NewSharedStorage(client IRedisClient) xmsgbus.ISharedStorage {
return &SharedStorage{client: client}
}

func (x *SharedStorage) SetEx(ctx context.Context, key string, value interface{}, ttl time.Duration) error {
return x.client.SetEX(ctx, key, value, ttl)
}

func (x *SharedStorage) Keys(ctx context.Context, prefix string) ([]string, error) {
if !strings.HasSuffix(prefix, "*") {
prefix += "*"
}
return x.client.Keys(ctx, prefix)
}

func (x *SharedStorage) Del(ctx context.Context, key string) error {
return x.client.Del(ctx, key)
}
Loading

0 comments on commit f137dcf

Please sign in to comment.