From 94f091fa3616838665fe4faca01fe0ac2eee0480 Mon Sep 17 00:00:00 2001 From: Cyka <1354250064mdzz@gmail.com> Date: Thu, 25 Jul 2024 23:39:17 +0800 Subject: [PATCH] refactor: experiment lock --- pkg/batch/cmd/server/server.go | 2 +- .../jobs/calculator/experiment_calculate.go | 19 ++-- pkg/batch/jobs/calculator/experiment_lock.go | 58 ++++++++++++ pkg/lock/distributed_lock.go | 16 ++-- pkg/lock/distributed_lock_test.go | 93 ++++++++++++++++--- 5 files changed, 151 insertions(+), 37 deletions(-) create mode 100644 pkg/batch/jobs/calculator/experiment_lock.go diff --git a/pkg/batch/cmd/server/server.go b/pkg/batch/cmd/server/server.go index c95f54357..a6f832739 100644 --- a/pkg/batch/cmd/server/server.go +++ b/pkg/batch/cmd/server/server.go @@ -481,7 +481,7 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L experimentClient, eventCounterClient, mysqlClient, - nonPersistentRedisClient, + calculator.NewExperimentLock(nonPersistentRedisClient), location, jobs.WithTimeout(5*time.Minute), jobs.WithLogger(logger), diff --git a/pkg/batch/jobs/calculator/experiment_calculate.go b/pkg/batch/jobs/calculator/experiment_calculate.go index 02f6a506e..c5228d56a 100644 --- a/pkg/batch/jobs/calculator/experiment_calculate.go +++ b/pkg/batch/jobs/calculator/experiment_calculate.go @@ -30,25 +30,21 @@ import ( "github.com/bucketeer-io/bucketeer/pkg/experimentcalculator/domain" "github.com/bucketeer-io/bucketeer/pkg/experimentcalculator/experimentcalc" "github.com/bucketeer-io/bucketeer/pkg/experimentcalculator/stan" - "github.com/bucketeer-io/bucketeer/pkg/lock" "github.com/bucketeer-io/bucketeer/pkg/log" - redisv3 "github.com/bucketeer-io/bucketeer/pkg/redis/v3" "github.com/bucketeer-io/bucketeer/pkg/storage/v2/mysql" "github.com/bucketeer-io/bucketeer/proto/environment" "github.com/bucketeer-io/bucketeer/proto/experiment" ) const ( - day = 24 * 60 * 60 - lockTTL = time.Minute - lockPrefix = "experiment-calculate:" + day = 24 * 60 * 60 ) type experimentCalculate struct { environmentClient environmentclient.Client experimentClient experimentclient.Client calculator *experimentcalc.ExperimentCalculator - redisClient redisv3.Client + experimentLock *ExperimentLock location *time.Location opts *jobs.Options logger *zap.Logger @@ -60,7 +56,7 @@ func NewExperimentCalculate( experimentClient experimentclient.Client, ecClient ecclient.Client, mysqlClient mysql.Client, - redisClient redisv3.Client, + experimentLock *ExperimentLock, location *time.Location, opts ...jobs.Option, ) jobs.Job { @@ -85,10 +81,10 @@ func NewExperimentCalculate( environmentClient: environmentClient, experimentClient: experimentClient, calculator: calculator, + experimentLock: experimentLock, location: location, opts: dopts, logger: dopts.Logger.Named("experiment-calculate"), - redisClient: redisClient, } } @@ -147,10 +143,7 @@ func (e *experimentCalculate) calculateExperimentWithLock(ctx context.Context, env *environment.EnvironmentV2, experiment *experiment.Experiment, ) error { - lockKey := fmt.Sprintf("%s%s:%s", lockPrefix, env.Id, experiment.Id) - dl := lock.NewDistributedLock(e.redisClient, lockKey, lockTTL) - - locked, err := dl.Lock(ctx) + locked, lockValue, err := e.experimentLock.Lock(ctx, env.Id, experiment.Id) if err != nil { return fmt.Errorf("failed to acquire lock: %w", err) } @@ -163,7 +156,7 @@ func (e *experimentCalculate) calculateExperimentWithLock(ctx context.Context, } defer func() { - unlocked, unlockErr := dl.Unlock(ctx) + unlocked, unlockErr := e.experimentLock.Unlock(ctx, env.Id, experiment.Id, lockValue) if unlockErr != nil { e.logger.Error("Failed to release lock", zap.Error(unlockErr), diff --git a/pkg/batch/jobs/calculator/experiment_lock.go b/pkg/batch/jobs/calculator/experiment_lock.go new file mode 100644 index 000000000..eae359792 --- /dev/null +++ b/pkg/batch/jobs/calculator/experiment_lock.go @@ -0,0 +1,58 @@ +// Copyright 2024 The Bucketeer Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package calculator + +import ( + "context" + "fmt" + "time" + + "github.com/bucketeer-io/bucketeer/pkg/lock" + redisv3 "github.com/bucketeer-io/bucketeer/pkg/redis/v3" +) + +const ( + experimentLockPrefix = "experiment-calculate:" + experimentLockTTL = time.Minute +) + +// ExperimentLock represents a distributed lock for experiments +type ExperimentLock struct { + lock *lock.DistributedLock +} + +// NewExperimentLock creates a new ExperimentLock +func NewExperimentLock(client redisv3.Client) *ExperimentLock { + return &ExperimentLock{ + lock: lock.NewDistributedLock(client, experimentLockTTL), + } +} + +// Lock attempts to acquire the lock for a specific experiment +func (el *ExperimentLock) Lock(ctx context.Context, environmentID, experimentID string) (bool, string, error) { + lockKey := el.getLockKey(environmentID, experimentID) + return el.lock.Lock(ctx, lockKey) +} + +// Unlock releases the lock for a specific experiment +func (el *ExperimentLock) Unlock(ctx context.Context, environmentID, experimentID, value string) (bool, error) { + lockKey := el.getLockKey(environmentID, experimentID) + return el.lock.Unlock(ctx, lockKey, value) +} + +// getLockKey generates the lock key for a specific experiment +func (el *ExperimentLock) getLockKey(environmentID, experimentID string) string { + return fmt.Sprintf("%s%s:%s", experimentLockPrefix, environmentID, experimentID) +} diff --git a/pkg/lock/distributed_lock.go b/pkg/lock/distributed_lock.go index d87f750ed..01d524a1f 100644 --- a/pkg/lock/distributed_lock.go +++ b/pkg/lock/distributed_lock.go @@ -36,29 +36,27 @@ const ( // DistributedLock represents a distributed lock type DistributedLock struct { client redisv3.Client - key string - value string expiration time.Duration } // NewDistributedLock creates a new DistributedLock -func NewDistributedLock(client redisv3.Client, key string, expiration time.Duration) *DistributedLock { +func NewDistributedLock(client redisv3.Client, expiration time.Duration) *DistributedLock { return &DistributedLock{ client: client, - key: key, - value: uuid.New().String(), expiration: expiration, } } // Lock attempts to acquire the lock -func (dl *DistributedLock) Lock(ctx context.Context) (bool, error) { - return dl.client.SetNX(ctx, dl.key, dl.value, dl.expiration) +func (dl *DistributedLock) Lock(ctx context.Context, key string) (bool, string, error) { + value := uuid.New().String() + locked, err := dl.client.SetNX(ctx, key, value, dl.expiration) + return locked, value, err } // Unlock releases the lock -func (dl *DistributedLock) Unlock(ctx context.Context) (bool, error) { - cmd := dl.client.Eval(ctx, unlockScript, []string{dl.key}, dl.value) +func (dl *DistributedLock) Unlock(ctx context.Context, key string, value string) (bool, error) { + cmd := dl.client.Eval(ctx, unlockScript, []string{key}, value) res, err := cmd.Int() if err != nil { return false, err diff --git a/pkg/lock/distributed_lock_test.go b/pkg/lock/distributed_lock_test.go index 3c7a8f373..4819b76bf 100644 --- a/pkg/lock/distributed_lock_test.go +++ b/pkg/lock/distributed_lock_test.go @@ -22,7 +22,7 @@ import ( goredis "github.com/go-redis/redis" "github.com/stretchr/testify/assert" - gomock "go.uber.org/mock/gomock" + "go.uber.org/mock/gomock" "github.com/bucketeer-io/bucketeer/pkg/redis/v3/mock" ) @@ -32,12 +32,10 @@ func TestNewDistributedLock(t *testing.T) { defer ctrl.Finish() mockClient := mock.NewMockClient(ctrl) - lock := NewDistributedLock(mockClient, "test-key", time.Minute) + lock := NewDistributedLock(mockClient, time.Minute) assert.NotNil(t, lock) - assert.Equal(t, "test-key", lock.key) assert.Equal(t, time.Minute, lock.expiration) - assert.NotEmpty(t, lock.value) } func TestDistributedLock_Lock(t *testing.T) { @@ -45,7 +43,7 @@ func TestDistributedLock_Lock(t *testing.T) { defer ctrl.Finish() mockClient := mock.NewMockClient(ctrl) - lock := NewDistributedLock(mockClient, "test-key", time.Minute) + lock := NewDistributedLock(mockClient, time.Minute) ctx := context.Background() @@ -54,9 +52,10 @@ func TestDistributedLock_Lock(t *testing.T) { SetNX(ctx, "test-key", gomock.Any(), time.Minute). Return(true, nil) - acquired, err := lock.Lock(ctx) + acquired, value, err := lock.Lock(ctx, "test-key") assert.True(t, acquired) + assert.NotEmpty(t, value) assert.NoError(t, err) }) @@ -65,11 +64,25 @@ func TestDistributedLock_Lock(t *testing.T) { SetNX(ctx, "test-key", gomock.Any(), time.Minute). Return(false, nil) - acquired, err := lock.Lock(ctx) + acquired, value, err := lock.Lock(ctx, "test-key") assert.False(t, acquired) + assert.NotEmpty(t, value) assert.NoError(t, err) }) + + t.Run("error during lock acquisition", func(t *testing.T) { + mockClient.EXPECT(). + SetNX(ctx, "test-key", gomock.Any(), time.Minute). + Return(false, errors.New("redis error")) + + acquired, value, err := lock.Lock(ctx, "test-key") + + assert.False(t, acquired) + assert.NotEmpty(t, value) + assert.Error(t, err) + assert.Equal(t, "redis error", err.Error()) + }) } func TestDistributedLock_Unlock(t *testing.T) { @@ -77,17 +90,17 @@ func TestDistributedLock_Unlock(t *testing.T) { defer ctrl.Finish() mockClient := mock.NewMockClient(ctrl) - lock := NewDistributedLock(mockClient, "test-key", time.Minute) + lock := NewDistributedLock(mockClient, time.Minute) ctx := context.Background() t.Run("successful unlock", func(t *testing.T) { successCmd := goredis.NewCmdResult(int64(1), nil) mockClient.EXPECT(). - Eval(ctx, unlockScript, []string{"test-key"}, gomock.Any()). + Eval(ctx, unlockScript, []string{"test-key"}, "test-value"). Return(successCmd) - unlocked, err := lock.Unlock(ctx) + unlocked, err := lock.Unlock(ctx, "test-key", "test-value") assert.True(t, unlocked) assert.NoError(t, err) @@ -96,10 +109,10 @@ func TestDistributedLock_Unlock(t *testing.T) { t.Run("unsuccessful unlock", func(t *testing.T) { failCmd := goredis.NewCmdResult(int64(0), nil) mockClient.EXPECT(). - Eval(ctx, unlockScript, []string{"test-key"}, gomock.Any()). + Eval(ctx, unlockScript, []string{"test-key"}, "test-value"). Return(failCmd) - unlocked, err := lock.Unlock(ctx) + unlocked, err := lock.Unlock(ctx, "test-key", "test-value") assert.False(t, unlocked) assert.NoError(t, err) @@ -108,13 +121,65 @@ func TestDistributedLock_Unlock(t *testing.T) { t.Run("error during unlock", func(t *testing.T) { errorCmd := goredis.NewCmdResult(nil, errors.New("eval error")) mockClient.EXPECT(). - Eval(ctx, unlockScript, []string{"test-key"}, gomock.Any()). + Eval(ctx, unlockScript, []string{"test-key"}, "test-value"). Return(errorCmd) - unlocked, err := lock.Unlock(ctx) + unlocked, err := lock.Unlock(ctx, "test-key", "test-value") assert.False(t, unlocked) assert.Error(t, err) assert.Equal(t, "eval error", err.Error()) }) } + +func TestDistributedLock_MultipleKeys(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockClient := mock.NewMockClient(ctrl) + lock := NewDistributedLock(mockClient, time.Minute) + + ctx := context.Background() + + t.Run("lock and unlock multiple keys", func(t *testing.T) { + // Lock key1 + mockClient.EXPECT(). + SetNX(ctx, "key1", gomock.Any(), time.Minute). + Return(true, nil) + + acquired1, value1, err := lock.Lock(ctx, "key1") + assert.True(t, acquired1) + assert.NotEmpty(t, value1) + assert.NoError(t, err) + + // Lock key2 + mockClient.EXPECT(). + SetNX(ctx, "key2", gomock.Any(), time.Minute). + Return(true, nil) + + acquired2, value2, err := lock.Lock(ctx, "key2") + assert.True(t, acquired2) + assert.NotEmpty(t, value2) + assert.NoError(t, err) + + // Unlock key1 + successCmd1 := goredis.NewCmdResult(int64(1), nil) + mockClient.EXPECT(). + Eval(ctx, unlockScript, []string{"key1"}, value1). + Return(successCmd1) + + unlocked1, err := lock.Unlock(ctx, "key1", value1) + assert.True(t, unlocked1) + assert.NoError(t, err) + + // Unlock key2 + successCmd2 := goredis.NewCmdResult(int64(1), nil) + mockClient.EXPECT(). + Eval(ctx, unlockScript, []string{"key2"}, value2). + Return(successCmd2) + + unlocked2, err := lock.Unlock(ctx, "key2", value2) + assert.True(t, unlocked2) + assert.NoError(t, err) + }) +}