diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 7f1ef7a..21c0c15 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -10,6 +10,10 @@ jobs: env: ETCD_LISTEN_CLIENT_URLS: http://0.0.0.0:2379 ETCD_ADVERTISE_CLIENT_URLS: http://0.0.0.0:2379 + redis: + image: docker.io/redis:latest + ports: + - 6379:6379 strategy: matrix: go-version: [1.21.x, 1.22.x] @@ -28,9 +32,9 @@ jobs: run: "go vet ./..." - name: Install Staticcheck - uses: dominikh/staticcheck-action@v1.2.0 + uses: dominikh/staticcheck-action@v1 with: - version: "2022.1.1" + version: "latest" install-go: false cache-key: ${{ matrix.go-version }} - name: Staticcheck diff --git a/go.mod b/go.mod index 79dc6fb..6e2abfa 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,6 @@ module github.com/ccheers/xpkg -go 1.21 - -toolchain go1.21.4 +go 1.22 require ( github.com/BurntSushi/toml v0.4.1 diff --git a/stat/metric/rolling_policy_test.go b/stat/metric/rolling_policy_test.go index 2792605..32f4700 100644 --- a/stat/metric/rolling_policy_test.go +++ b/stat/metric/rolling_policy_test.go @@ -2,7 +2,6 @@ package metric import ( "fmt" - "math/rand" "testing" "time" ) @@ -33,8 +32,6 @@ func Handler(t *testing.T, table []map[string][]int) { } func TestRollingPolicy_Add(t *testing.T) { - rand.Seed(time.Now().Unix()) - // test add after 400ms and 601ms relative to the policy created time policy := GetRollingPolicy() time.Sleep(400 * time.Millisecond) diff --git a/testing/internal/cmd/testgen/mockgen/parser.go b/testing/internal/cmd/testgen/mockgen/parser.go index db86a14..f836e62 100644 --- a/testing/internal/cmd/testgen/mockgen/parser.go +++ b/testing/internal/cmd/testgen/mockgen/parser.go @@ -234,6 +234,7 @@ func (p *fileParser) parsePackage(path string) (*fileParser, error) { srcDir: p.srcDir, } + //lint:ignore SA1019 跳过检查 var pkgs map[string]*ast.Package if imp, err := build.Import(path, newP.srcDir, build.FindOnly); err != nil { return nil, err diff --git a/xmsgbus/impl/memory/msgbus.go b/xmsgbus/impl/memory/msgbus.go index dd18951..09e89ae 100644 --- a/xmsgbus/impl/memory/msgbus.go +++ b/xmsgbus/impl/memory/msgbus.go @@ -35,7 +35,7 @@ func (x *MsgBus) Push(ctx context.Context, topic string, bs []byte) error { return nil } -func (x *MsgBus) Pop(ctx context.Context, topic, channel string, blockTimeout time.Duration) ([]byte, error) { +func (x *MsgBus) Pop(ctx context.Context, topic, channel string, blockTimeout time.Duration) ([]byte, func(), error) { x.mu.Lock() if x.topicSet[topic] == nil { x.topicSet[topic] = make(map[string]chan []byte) @@ -48,18 +48,18 @@ func (x *MsgBus) Pop(ctx context.Context, topic, channel string, blockTimeout ti if blockTimeout > 0 { select { case <-ctx.Done(): - return nil, ctx.Err() + return nil, nil, ctx.Err() case <-time.After(blockTimeout): - return nil, xmsgbus.ErrPopTimeout + return nil, nil, xmsgbus.ErrPopTimeout case bs := <-ch: - return bs, nil + return bs, func() {}, nil } } select { case <-ctx.Done(): - return nil, ctx.Err() + return nil, nil, ctx.Err() case bs := <-ch: - return bs, nil + return bs, func() {}, nil } } diff --git a/xmsgbus/impl/redis/consts.go b/xmsgbus/impl/redis/consts.go index d69d02a..60492dc 100644 --- a/xmsgbus/impl/redis/consts.go +++ b/xmsgbus/impl/redis/consts.go @@ -1,6 +1,9 @@ package redis -import "time" +import ( + "fmt" + "time" +) // msgBusSetKey 集合 key func msgBusSetKey(topic string) string { @@ -12,6 +15,24 @@ func msgBusListKey(topic string, channel string) string { return "hmsgbus:list:v3:" + topic + ":" + channel } +// Ack key +func msgBusAckKeyPrefix(tm time.Time) string { + const ( + format = "2006-01-02-15-04" + ) + return fmt.Sprintf("hmsgbus:hash:v3:ack:%s:", tm.Format(format)) +} + +// Ack key +func msgBusAckKey(tm time.Time, key string) string { + return msgBusAckKeyPrefix(tm) + key +} + +// Ack key +func msgBusMonitorKey() string { + return "hmsgbus:nx:v3:monitor" +} + const ( tenMinute = time.Minute * 10 ) diff --git a/xmsgbus/impl/redis/msgbus.go b/xmsgbus/impl/redis/msgbus.go index c45da52..9803ec6 100644 --- a/xmsgbus/impl/redis/msgbus.go +++ b/xmsgbus/impl/redis/msgbus.go @@ -2,7 +2,11 @@ package redis import ( "context" + "crypto/md5" + "encoding/hex" + "encoding/json" "fmt" + "runtime/debug" "strings" "time" @@ -11,12 +15,24 @@ import ( "github.com/go-redis/redis/v8" ) +type AckData struct { + ListKey string + Data string +} + type MsgBus struct { client *redis.Client } func NewMsgBus(client *redis.Client) xmsgbus.IMsgBus { - return &MsgBus{client: client} + x := &MsgBus{client: client} + go func() { + for { + x.monitor(context.Background()) + time.Sleep(time.Minute) + } + }() + return x } func (x *MsgBus) Push(ctx context.Context, topic string, bs []byte) error { @@ -42,15 +58,25 @@ func (x *MsgBus) Push(ctx context.Context, topic string, bs []byte) error { return nil } -func (x *MsgBus) Pop(ctx context.Context, topic, channel string, blockTimeout time.Duration) ([]byte, error) { - strs, err := x.client.BLPop(ctx, blockTimeout, msgBusListKey(topic, channel)).Result() +func (x *MsgBus) Pop(ctx context.Context, topic, channel string, blockTimeout time.Duration) ([]byte, func(), error) { + listKey := msgBusListKey(topic, channel) + strs, err := x.client.BLPop(ctx, blockTimeout, listKey).Result() if err != nil { - return nil, err + return nil, nil, err } if len(strs) < 1 { - return nil, xmsgbus.ErrNoData + return nil, nil, xmsgbus.ErrNoData } - return []byte(strs[1]), nil + md5Bs := md5.Sum([]byte(strs[1])) + ackKey := msgBusAckKey(time.Now(), hex.EncodeToString(md5Bs[:])) + bs, _ := json.Marshal(AckData{ + ListKey: listKey, + Data: strs[1], + }) + x.client.Set(ctx, ackKey, bs, time.Minute*3) + return []byte(strs[1]), func() { + x.client.Del(ctx, ackKey) + }, nil } func (x *MsgBus) AddChannel(ctx context.Context, topic string, channel string) error { @@ -69,3 +95,32 @@ func (x *MsgBus) RemoveChannel(ctx context.Context, topic string, channel string func (x *MsgBus) ListChannel(ctx context.Context, topic string) ([]string, error) { return x.client.SMembers(ctx, msgBusSetKey(topic)).Result() } + +func (x *MsgBus) monitor(ctx context.Context) { + defer func() { + r := recover() + if r != nil { + fmt.Printf("[MsgBus][redis] monitor panic: %v, stack:\n%s\n", r, debug.Stack()) + } + }() + ok := x.client.SetNX(ctx, msgBusMonitorKey(), 1, time.Second*55).Val() + if !ok { + return + } + + ackKeyPrefix := msgBusAckKeyPrefix(time.Now().Add(-time.Minute * 2)) + keys, err := x.client.Keys(ctx, ackKeyPrefix+"*").Result() + if err != nil { + return + } + for _, key := range keys { + bs, err := x.client.Get(ctx, key).Bytes() + if err != nil { + continue + } + var ackData AckData + _ = json.Unmarshal(bs, &ackData) + x.client.Del(ctx, key) + x.client.RPush(ctx, ackData.ListKey, ackData.Data) + } +} diff --git a/xmsgbus/impl/redis/msgbus_test.go b/xmsgbus/impl/redis/msgbus_test.go new file mode 100644 index 0000000..bf54e99 --- /dev/null +++ b/xmsgbus/impl/redis/msgbus_test.go @@ -0,0 +1,82 @@ +package redis + +import ( + "context" + "crypto/md5" + "encoding/hex" + "errors" + "reflect" + "testing" + "time" + + "github.com/go-redis/redis/v8" +) + +func TestMsgBus_Pop(t *testing.T) { + ctx := context.TODO() + client := redis.NewClient(&redis.Options{ + Addr: "127.0.0.1:6379", + DB: 0, + }) + msg := []byte("test") + type fields struct { + client *redis.Client + } + type args struct { + ctx context.Context + topic string + channel string + blockTimeout time.Duration + } + tests := []struct { + name string + fields fields + args args + want []byte + want1 func() + wantErr bool + }{ + { + name: "1", + fields: fields{ + client: client, + }, + args: args{ + ctx: ctx, + topic: "test", + channel: "test2", + blockTimeout: 0, + }, + want: msg, + want1: nil, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + x := &MsgBus{ + client: tt.fields.client, + } + _ = x.AddChannel(ctx, tt.args.topic, tt.args.channel) + _ = x.Push(ctx, tt.args.topic, msg) + got, ack, err := x.Pop(tt.args.ctx, tt.args.topic, tt.args.channel, tt.args.blockTimeout) + if (err != nil) != tt.wantErr { + t.Errorf("Pop() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("Pop() got = %v, want %v", got, tt.want) + } + // 如果没有 ack 则 有数据在 + + md5Bs := md5.Sum(msg) + ackKey := msgBusAckKey(time.Now(), hex.EncodeToString(md5Bs[:])) + bs := x.client.Get(ctx, ackKey).Val() + t.Logf("ack key: %s, ack value: %s", ackKey, bs) + ack() + if !errors.Is(x.client.Get(ctx, ackKey).Err(), redis.Nil) { + t.Errorf("ack failed, ack key: %s", ackKey) + } + }) + } +} diff --git a/xmsgbus/msgbus.go b/xmsgbus/msgbus.go index 8253d7a..6851c20 100644 --- a/xmsgbus/msgbus.go +++ b/xmsgbus/msgbus.go @@ -21,7 +21,7 @@ type IMsgBus interface { Push(ctx context.Context, topic string, bs []byte) error // Pop 以阻塞的方式获取数据 // blockTimeout 为 0 则永久阻塞 直到 context 退出 或 数据到达 - Pop(ctx context.Context, topic, channel string, blockTimeout time.Duration) ([]byte, error) + Pop(ctx context.Context, topic, channel string, blockTimeout time.Duration) (data []byte, ackFn func(), err error) // AddChannel 为 topic 添加 channel AddChannel(ctx context.Context, topic string, channel string) error // RemoveChannel 删除 Channel, channel 下的数据也应该被删除 diff --git a/xmsgbus/subscriber.go b/xmsgbus/subscriber.go index 7d8cd9b..9571498 100644 --- a/xmsgbus/subscriber.go +++ b/xmsgbus/subscriber.go @@ -112,10 +112,13 @@ func (x *Subscriber[T]) Handle(ctx context.Context) (err error) { // 一次 handle 监听不超过 [30,45) 秒 timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(30+rand.Intn(15))) defer cancel() - bs, err := x.msgBus.Pop(timeoutCtx, x.topic, x.channel, 0) + + bs, ack, err := x.msgBus.Pop(timeoutCtx, x.topic, x.channel, 0) if err != nil { return err } + defer ack() + var dst Event err = json.Unmarshal(bs, &dst) if err != nil {