Skip to content

Commit

Permalink
feat(xmsgbus): 支持至少消费一次的 ack 机制
Browse files Browse the repository at this point in the history
  • Loading branch information
Ccheers committed Aug 28, 2024
1 parent 10d54f5 commit 537bf75
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 15 deletions.
12 changes: 6 additions & 6 deletions xmsgbus/impl/memory/msgbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (x *MsgBus) Push(ctx context.Context, topic string, bs []byte) error {
return nil
}

func (x *MsgBus) Pop(ctx context.Context, topic, channel string, blockTimeout time.Duration) ([]byte, error) {
func (x *MsgBus) Pop(ctx context.Context, topic, channel string, blockTimeout time.Duration) ([]byte, func(), error) {
x.mu.Lock()
if x.topicSet[topic] == nil {
x.topicSet[topic] = make(map[string]chan []byte)
Expand All @@ -48,18 +48,18 @@ func (x *MsgBus) Pop(ctx context.Context, topic, channel string, blockTimeout ti
if blockTimeout > 0 {
select {
case <-ctx.Done():
return nil, ctx.Err()
return nil, nil, ctx.Err()
case <-time.After(blockTimeout):
return nil, xmsgbus.ErrPopTimeout
return nil, nil, xmsgbus.ErrPopTimeout
case bs := <-ch:
return bs, nil
return bs, func() {}, nil
}
}
select {
case <-ctx.Done():
return nil, ctx.Err()
return nil, nil, ctx.Err()
case bs := <-ch:
return bs, nil
return bs, func() {}, nil
}
}

Expand Down
23 changes: 22 additions & 1 deletion xmsgbus/impl/redis/consts.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package redis

import "time"
import (
"fmt"
"time"
)

// msgBusSetKey 集合 key
func msgBusSetKey(topic string) string {
Expand All @@ -12,6 +15,24 @@ func msgBusListKey(topic string, channel string) string {
return "hmsgbus:list:v3:" + topic + ":" + channel
}

// Ack key
func msgBusAckKeyPrefix(tm time.Time) string {
const (
format = "2006-01-02-15-04"
)
return fmt.Sprintf("hmsgbus:hash:v3:ack:%s:", tm.Format(format))
}

// Ack key
func msgBusAckKey(tm time.Time, key string) string {
return msgBusAckKeyPrefix(tm) + key
}

// Ack key
func msgBusMonitorKey() string {
return "hmsgbus:nx:v3:monitor"
}

const (
tenMinute = time.Minute * 10
)
67 changes: 61 additions & 6 deletions xmsgbus/impl/redis/msgbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ package redis

import (
"context"
"crypto/md5"
"encoding/hex"
"encoding/json"
"fmt"
"runtime/debug"
"strings"
"time"

Expand All @@ -11,12 +15,24 @@ import (
"github.com/go-redis/redis/v8"
)

type AckData struct {
ListKey string
Data string
}

type MsgBus struct {
client *redis.Client
}

func NewMsgBus(client *redis.Client) xmsgbus.IMsgBus {
return &MsgBus{client: client}
x := &MsgBus{client: client}
go func() {
for {
x.monitor(context.Background())
time.Sleep(time.Minute)
}
}()
return x
}

func (x *MsgBus) Push(ctx context.Context, topic string, bs []byte) error {
Expand All @@ -42,15 +58,25 @@ func (x *MsgBus) Push(ctx context.Context, topic string, bs []byte) error {
return nil
}

func (x *MsgBus) Pop(ctx context.Context, topic, channel string, blockTimeout time.Duration) ([]byte, error) {
strs, err := x.client.BLPop(ctx, blockTimeout, msgBusListKey(topic, channel)).Result()
func (x *MsgBus) Pop(ctx context.Context, topic, channel string, blockTimeout time.Duration) ([]byte, func(), error) {
listKey := msgBusListKey(topic, channel)
strs, err := x.client.BLPop(ctx, blockTimeout, listKey).Result()
if err != nil {
return nil, err
return nil, nil, err
}
if len(strs) < 1 {
return nil, xmsgbus.ErrNoData
return nil, nil, xmsgbus.ErrNoData
}
return []byte(strs[1]), nil
md5Bs := md5.Sum([]byte(strs[1]))
ackKey := msgBusAckKey(time.Now(), hex.EncodeToString(md5Bs[:]))
bs, _ := json.Marshal(AckData{
ListKey: listKey,
Data: strs[1],
})
x.client.Set(ctx, ackKey, bs, time.Minute*3)
return []byte(strs[1]), func() {
x.client.Del(ctx, ackKey)
}, nil
}

func (x *MsgBus) AddChannel(ctx context.Context, topic string, channel string) error {
Expand All @@ -69,3 +95,32 @@ func (x *MsgBus) RemoveChannel(ctx context.Context, topic string, channel string
func (x *MsgBus) ListChannel(ctx context.Context, topic string) ([]string, error) {
return x.client.SMembers(ctx, msgBusSetKey(topic)).Result()
}

func (x *MsgBus) monitor(ctx context.Context) {
defer func() {
r := recover()
if r != nil {
fmt.Printf("[MsgBus][redis] monitor panic: %v, stack:\n%s\n", r, debug.Stack())
}
}()
ok := x.client.SetNX(ctx, msgBusMonitorKey(), 1, time.Second*55).Val()
if !ok {
return
}

ackKeyPrefix := msgBusAckKeyPrefix(time.Now().Add(-time.Minute * 2))
keys, err := x.client.Keys(ctx, ackKeyPrefix+"*").Result()
if err != nil {
return
}
for _, key := range keys {
bs, err := x.client.Get(ctx, key).Bytes()
if err != nil {
continue
}
var ackData AckData
_ = json.Unmarshal(bs, &ackData)
x.client.Del(ctx, key)
x.client.RPush(ctx, ackData.ListKey, ackData.Data)
}
}
82 changes: 82 additions & 0 deletions xmsgbus/impl/redis/msgbus_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package redis

import (
"context"
"crypto/md5"
"encoding/hex"
"errors"
"reflect"
"testing"
"time"

"github.com/go-redis/redis/v8"
)

func TestMsgBus_Pop(t *testing.T) {
ctx := context.TODO()
client := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
DB: 0,
})
msg := []byte("test")
type fields struct {
client *redis.Client
}
type args struct {
ctx context.Context
topic string
channel string
blockTimeout time.Duration
}
tests := []struct {
name string
fields fields
args args
want []byte
want1 func()
wantErr bool
}{
{
name: "1",
fields: fields{
client: client,
},
args: args{
ctx: ctx,
topic: "test",
channel: "test2",
blockTimeout: 0,
},
want: msg,
want1: nil,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
x := &MsgBus{
client: tt.fields.client,
}
_ = x.AddChannel(ctx, tt.args.topic, tt.args.channel)
_ = x.Push(ctx, tt.args.topic, msg)
got, ack, err := x.Pop(tt.args.ctx, tt.args.topic, tt.args.channel, tt.args.blockTimeout)
if (err != nil) != tt.wantErr {
t.Errorf("Pop() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("Pop() got = %v, want %v", got, tt.want)
}
// 如果没有 ack 则 有数据在

md5Bs := md5.Sum(msg)
ackKey := msgBusAckKey(time.Now(), hex.EncodeToString(md5Bs[:]))
bs := x.client.Get(ctx, ackKey).Val()
t.Logf("ack key: %s, ack value: %s", ackKey, bs)
ack()
if !errors.Is(x.client.Get(ctx, ackKey).Err(), redis.Nil) {
t.Errorf("ack failed, ack key: %s", ackKey)
}
})
}
}
2 changes: 1 addition & 1 deletion xmsgbus/msgbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type IMsgBus interface {
Push(ctx context.Context, topic string, bs []byte) error
// Pop 以阻塞的方式获取数据
// blockTimeout 为 0 则永久阻塞 直到 context 退出 或 数据到达
Pop(ctx context.Context, topic, channel string, blockTimeout time.Duration) ([]byte, error)
Pop(ctx context.Context, topic, channel string, blockTimeout time.Duration) (data []byte, ackFn func(), err error)
// AddChannel 为 topic 添加 channel
AddChannel(ctx context.Context, topic string, channel string) error
// RemoveChannel 删除 Channel, channel 下的数据也应该被删除
Expand Down
5 changes: 4 additions & 1 deletion xmsgbus/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,13 @@ func (x *Subscriber[T]) Handle(ctx context.Context) (err error) {
// 一次 handle 监听不超过 [30,45) 秒
timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(30+rand.Intn(15)))
defer cancel()
bs, err := x.msgBus.Pop(timeoutCtx, x.topic, x.channel, 0)

bs, ack, err := x.msgBus.Pop(timeoutCtx, x.topic, x.channel, 0)
if err != nil {
return err
}
defer ack()

var dst Event
err = json.Unmarshal(bs, &dst)
if err != nil {
Expand Down

0 comments on commit 537bf75

Please sign in to comment.