From 47367cdcb7572d49f43a4c0c184b5f676c9adffe Mon Sep 17 00:00:00 2001 From: eric <1048315650@qq.com> Date: Fri, 20 May 2022 14:44:19 +0800 Subject: [PATCH] =?UTF-8?q?feat(=E4=BA=92=E6=96=A5=E9=94=81[mutex]):=20?= =?UTF-8?q?=E4=BA=92=E6=96=A5=E9=94=81=20=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sync/try_lock/try_lock.distributed.go | 77 +++++++++++++++++++++++++++ sync/try_lock/try_locker.go | 45 ++++++++++++++++ 2 files changed, 122 insertions(+) create mode 100644 sync/try_lock/try_lock.distributed.go create mode 100644 sync/try_lock/try_locker.go diff --git a/sync/try_lock/try_lock.distributed.go b/sync/try_lock/try_lock.distributed.go new file mode 100644 index 0000000..33f8771 --- /dev/null +++ b/sync/try_lock/try_lock.distributed.go @@ -0,0 +1,77 @@ +package try_lock + +import ( + "fmt" + "math/rand" + "runtime" + "time" +) + +type distributedTryLocker struct { + cmd CASCommand + + key string + value string + + option *options +} + +type CASCommand interface { + CAS(key, src, dst string) bool +} + +type Option func(*options) + +func WithScheduleFunc(f func()) Option { + return func(o *options) { + o.schedule = f + } +} + +type options struct { + schedule Schedule +} + +type Schedule func() + +func defaultSchedule() { + time.Sleep(time.Millisecond * time.Duration((rand.Intn(20)+1)*10)) + runtime.Gosched() +} + +func NewDistributedTryLocker(cmd CASCommand, key, value string, opts ...Option) TryMutexLocker { + option := &options{ + schedule: defaultSchedule, + } + for _, opt := range opts { + opt(option) + } + return &distributedTryLocker{cmd: cmd, key: key, value: value, option: option} + +} + +func (r *distributedTryLocker) Unlock() { + r.cmd.CAS(r.key, r.value, "") +} + +func (r *distributedTryLocker) TryLock(duration time.Duration) error { + if r.cmd.CAS(r.key, "", r.value) { + return nil + } + if duration > 0 { + timeoutChan := time.After(duration) + for { + select { + case <-timeoutChan: + return fmt.Errorf("%w: key=%s value=%s", errGetLockTimeOut, r.key, r.value) + default: + if r.cmd.CAS(r.key, "", r.value) { + return nil + } + // 执行一次切换调度 + r.option.schedule() + } + } + } + return fmt.Errorf("%w: key=%s value=%s", errGetLockTimeOut, r.key, r.value) +} diff --git a/sync/try_lock/try_locker.go b/sync/try_lock/try_locker.go new file mode 100644 index 0000000..be1682d --- /dev/null +++ b/sync/try_lock/try_locker.go @@ -0,0 +1,45 @@ +package try_lock + +import ( + "errors" + "time" +) + +type TryMutexLocker interface { + Unlock() + TryLock(duration time.Duration) error +} + +type chMutex struct { + ch chan struct{} +} + +func NewChMutex() TryMutexLocker { + return &chMutex{ch: make(chan struct{}, 1)} +} + +var ( + errGetLockTimeOut = errors.New("get lock timeout") +) + +func (c *chMutex) TryLock(duration time.Duration) error { + if duration > 0 { + timeoutChan := time.After(duration) + select { + case <-timeoutChan: + return errGetLockTimeOut + case c.ch <- struct{}{}: + } + } else { + select { + case c.ch <- struct{}{}: + default: + return errGetLockTimeOut + } + } + return nil +} + +func (c *chMutex) Unlock() { + <-c.ch +}