Skip to content

Commit

Permalink
refactor: experiment lock
Browse files Browse the repository at this point in the history
  • Loading branch information
Ubisoft-potato committed Jul 25, 2024
1 parent 17cbe9b commit 94f091f
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 37 deletions.
2 changes: 1 addition & 1 deletion pkg/batch/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
experimentClient,
eventCounterClient,
mysqlClient,
nonPersistentRedisClient,
calculator.NewExperimentLock(nonPersistentRedisClient),
location,
jobs.WithTimeout(5*time.Minute),
jobs.WithLogger(logger),
Expand Down
19 changes: 6 additions & 13 deletions pkg/batch/jobs/calculator/experiment_calculate.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,21 @@ import (
"github.com/bucketeer-io/bucketeer/pkg/experimentcalculator/domain"
"github.com/bucketeer-io/bucketeer/pkg/experimentcalculator/experimentcalc"
"github.com/bucketeer-io/bucketeer/pkg/experimentcalculator/stan"
"github.com/bucketeer-io/bucketeer/pkg/lock"
"github.com/bucketeer-io/bucketeer/pkg/log"
redisv3 "github.com/bucketeer-io/bucketeer/pkg/redis/v3"
"github.com/bucketeer-io/bucketeer/pkg/storage/v2/mysql"
"github.com/bucketeer-io/bucketeer/proto/environment"
"github.com/bucketeer-io/bucketeer/proto/experiment"
)

const (
day = 24 * 60 * 60
lockTTL = time.Minute
lockPrefix = "experiment-calculate:"
day = 24 * 60 * 60
)

type experimentCalculate struct {
environmentClient environmentclient.Client
experimentClient experimentclient.Client
calculator *experimentcalc.ExperimentCalculator
redisClient redisv3.Client
experimentLock *ExperimentLock
location *time.Location
opts *jobs.Options
logger *zap.Logger
Expand All @@ -60,7 +56,7 @@ func NewExperimentCalculate(
experimentClient experimentclient.Client,
ecClient ecclient.Client,
mysqlClient mysql.Client,
redisClient redisv3.Client,
experimentLock *ExperimentLock,
location *time.Location,
opts ...jobs.Option,
) jobs.Job {
Expand All @@ -85,10 +81,10 @@ func NewExperimentCalculate(
environmentClient: environmentClient,
experimentClient: experimentClient,
calculator: calculator,
experimentLock: experimentLock,
location: location,
opts: dopts,
logger: dopts.Logger.Named("experiment-calculate"),
redisClient: redisClient,
}
}

Expand Down Expand Up @@ -147,10 +143,7 @@ func (e *experimentCalculate) calculateExperimentWithLock(ctx context.Context,
env *environment.EnvironmentV2,
experiment *experiment.Experiment,
) error {
lockKey := fmt.Sprintf("%s%s:%s", lockPrefix, env.Id, experiment.Id)
dl := lock.NewDistributedLock(e.redisClient, lockKey, lockTTL)

locked, err := dl.Lock(ctx)
locked, lockValue, err := e.experimentLock.Lock(ctx, env.Id, experiment.Id)
if err != nil {
return fmt.Errorf("failed to acquire lock: %w", err)
}
Expand All @@ -163,7 +156,7 @@ func (e *experimentCalculate) calculateExperimentWithLock(ctx context.Context,
}

defer func() {
unlocked, unlockErr := dl.Unlock(ctx)
unlocked, unlockErr := e.experimentLock.Unlock(ctx, env.Id, experiment.Id, lockValue)
if unlockErr != nil {
e.logger.Error("Failed to release lock",
zap.Error(unlockErr),
Expand Down
58 changes: 58 additions & 0 deletions pkg/batch/jobs/calculator/experiment_lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2024 The Bucketeer Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package calculator

import (
"context"
"fmt"
"time"

"github.com/bucketeer-io/bucketeer/pkg/lock"
redisv3 "github.com/bucketeer-io/bucketeer/pkg/redis/v3"
)

const (
experimentLockPrefix = "experiment-calculate:"
experimentLockTTL = time.Minute
)

// ExperimentLock represents a distributed lock for experiments
type ExperimentLock struct {
lock *lock.DistributedLock
}

// NewExperimentLock creates a new ExperimentLock
func NewExperimentLock(client redisv3.Client) *ExperimentLock {
return &ExperimentLock{
lock: lock.NewDistributedLock(client, experimentLockTTL),
}
}

// Lock attempts to acquire the lock for a specific experiment
func (el *ExperimentLock) Lock(ctx context.Context, environmentID, experimentID string) (bool, string, error) {
lockKey := el.getLockKey(environmentID, experimentID)
return el.lock.Lock(ctx, lockKey)
}

// Unlock releases the lock for a specific experiment
func (el *ExperimentLock) Unlock(ctx context.Context, environmentID, experimentID, value string) (bool, error) {
lockKey := el.getLockKey(environmentID, experimentID)
return el.lock.Unlock(ctx, lockKey, value)
}

// getLockKey generates the lock key for a specific experiment
func (el *ExperimentLock) getLockKey(environmentID, experimentID string) string {
return fmt.Sprintf("%s%s:%s", experimentLockPrefix, environmentID, experimentID)
}
16 changes: 7 additions & 9 deletions pkg/lock/distributed_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,29 +36,27 @@ const (
// DistributedLock represents a distributed lock
type DistributedLock struct {
client redisv3.Client
key string
value string
expiration time.Duration
}

// NewDistributedLock creates a new DistributedLock
func NewDistributedLock(client redisv3.Client, key string, expiration time.Duration) *DistributedLock {
func NewDistributedLock(client redisv3.Client, expiration time.Duration) *DistributedLock {
return &DistributedLock{
client: client,
key: key,
value: uuid.New().String(),
expiration: expiration,
}
}

// Lock attempts to acquire the lock
func (dl *DistributedLock) Lock(ctx context.Context) (bool, error) {
return dl.client.SetNX(ctx, dl.key, dl.value, dl.expiration)
func (dl *DistributedLock) Lock(ctx context.Context, key string) (bool, string, error) {
value := uuid.New().String()
locked, err := dl.client.SetNX(ctx, key, value, dl.expiration)
return locked, value, err
}

// Unlock releases the lock
func (dl *DistributedLock) Unlock(ctx context.Context) (bool, error) {
cmd := dl.client.Eval(ctx, unlockScript, []string{dl.key}, dl.value)
func (dl *DistributedLock) Unlock(ctx context.Context, key string, value string) (bool, error) {
cmd := dl.client.Eval(ctx, unlockScript, []string{key}, value)
res, err := cmd.Int()
if err != nil {
return false, err
Expand Down
93 changes: 79 additions & 14 deletions pkg/lock/distributed_lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

goredis "github.com/go-redis/redis"
"github.com/stretchr/testify/assert"
gomock "go.uber.org/mock/gomock"
"go.uber.org/mock/gomock"

"github.com/bucketeer-io/bucketeer/pkg/redis/v3/mock"
)
Expand All @@ -32,20 +32,18 @@ func TestNewDistributedLock(t *testing.T) {
defer ctrl.Finish()

mockClient := mock.NewMockClient(ctrl)
lock := NewDistributedLock(mockClient, "test-key", time.Minute)
lock := NewDistributedLock(mockClient, time.Minute)

assert.NotNil(t, lock)
assert.Equal(t, "test-key", lock.key)
assert.Equal(t, time.Minute, lock.expiration)
assert.NotEmpty(t, lock.value)
}

func TestDistributedLock_Lock(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockClient := mock.NewMockClient(ctrl)
lock := NewDistributedLock(mockClient, "test-key", time.Minute)
lock := NewDistributedLock(mockClient, time.Minute)

ctx := context.Background()

Expand All @@ -54,9 +52,10 @@ func TestDistributedLock_Lock(t *testing.T) {
SetNX(ctx, "test-key", gomock.Any(), time.Minute).
Return(true, nil)

acquired, err := lock.Lock(ctx)
acquired, value, err := lock.Lock(ctx, "test-key")

assert.True(t, acquired)
assert.NotEmpty(t, value)
assert.NoError(t, err)
})

Expand All @@ -65,29 +64,43 @@ func TestDistributedLock_Lock(t *testing.T) {
SetNX(ctx, "test-key", gomock.Any(), time.Minute).
Return(false, nil)

acquired, err := lock.Lock(ctx)
acquired, value, err := lock.Lock(ctx, "test-key")

assert.False(t, acquired)
assert.NotEmpty(t, value)
assert.NoError(t, err)
})

t.Run("error during lock acquisition", func(t *testing.T) {
mockClient.EXPECT().
SetNX(ctx, "test-key", gomock.Any(), time.Minute).
Return(false, errors.New("redis error"))

acquired, value, err := lock.Lock(ctx, "test-key")

assert.False(t, acquired)
assert.NotEmpty(t, value)
assert.Error(t, err)
assert.Equal(t, "redis error", err.Error())
})
}

func TestDistributedLock_Unlock(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockClient := mock.NewMockClient(ctrl)
lock := NewDistributedLock(mockClient, "test-key", time.Minute)
lock := NewDistributedLock(mockClient, time.Minute)

ctx := context.Background()

t.Run("successful unlock", func(t *testing.T) {
successCmd := goredis.NewCmdResult(int64(1), nil)
mockClient.EXPECT().
Eval(ctx, unlockScript, []string{"test-key"}, gomock.Any()).
Eval(ctx, unlockScript, []string{"test-key"}, "test-value").
Return(successCmd)

unlocked, err := lock.Unlock(ctx)
unlocked, err := lock.Unlock(ctx, "test-key", "test-value")

assert.True(t, unlocked)
assert.NoError(t, err)
Expand All @@ -96,10 +109,10 @@ func TestDistributedLock_Unlock(t *testing.T) {
t.Run("unsuccessful unlock", func(t *testing.T) {
failCmd := goredis.NewCmdResult(int64(0), nil)
mockClient.EXPECT().
Eval(ctx, unlockScript, []string{"test-key"}, gomock.Any()).
Eval(ctx, unlockScript, []string{"test-key"}, "test-value").
Return(failCmd)

unlocked, err := lock.Unlock(ctx)
unlocked, err := lock.Unlock(ctx, "test-key", "test-value")

assert.False(t, unlocked)
assert.NoError(t, err)
Expand All @@ -108,13 +121,65 @@ func TestDistributedLock_Unlock(t *testing.T) {
t.Run("error during unlock", func(t *testing.T) {
errorCmd := goredis.NewCmdResult(nil, errors.New("eval error"))
mockClient.EXPECT().
Eval(ctx, unlockScript, []string{"test-key"}, gomock.Any()).
Eval(ctx, unlockScript, []string{"test-key"}, "test-value").
Return(errorCmd)

unlocked, err := lock.Unlock(ctx)
unlocked, err := lock.Unlock(ctx, "test-key", "test-value")

assert.False(t, unlocked)
assert.Error(t, err)
assert.Equal(t, "eval error", err.Error())
})
}

func TestDistributedLock_MultipleKeys(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockClient := mock.NewMockClient(ctrl)
lock := NewDistributedLock(mockClient, time.Minute)

ctx := context.Background()

t.Run("lock and unlock multiple keys", func(t *testing.T) {
// Lock key1
mockClient.EXPECT().
SetNX(ctx, "key1", gomock.Any(), time.Minute).
Return(true, nil)

acquired1, value1, err := lock.Lock(ctx, "key1")
assert.True(t, acquired1)
assert.NotEmpty(t, value1)
assert.NoError(t, err)

// Lock key2
mockClient.EXPECT().
SetNX(ctx, "key2", gomock.Any(), time.Minute).
Return(true, nil)

acquired2, value2, err := lock.Lock(ctx, "key2")
assert.True(t, acquired2)
assert.NotEmpty(t, value2)
assert.NoError(t, err)

// Unlock key1
successCmd1 := goredis.NewCmdResult(int64(1), nil)
mockClient.EXPECT().
Eval(ctx, unlockScript, []string{"key1"}, value1).
Return(successCmd1)

unlocked1, err := lock.Unlock(ctx, "key1", value1)
assert.True(t, unlocked1)
assert.NoError(t, err)

// Unlock key2
successCmd2 := goredis.NewCmdResult(int64(1), nil)
mockClient.EXPECT().
Eval(ctx, unlockScript, []string{"key2"}, value2).
Return(successCmd2)

unlocked2, err := lock.Unlock(ctx, "key2", value2)
assert.True(t, unlocked2)
assert.NoError(t, err)
})
}

0 comments on commit 94f091f

Please sign in to comment.