Skip to content

Commit

Permalink
node-operator: fix data race in executor (#2326)
Browse files Browse the repository at this point in the history
  • Loading branch information
elchead authored Sep 11, 2023
1 parent 92726da commit b3bb486
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 76 deletions.
35 changes: 21 additions & 14 deletions operators/constellation-node-operator/internal/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ type Controller interface {
Reconcile(ctx context.Context) (Result, error)
}

// Executor is a task executor / scheduler.
// taskExecutor is a task executor / scheduler.
// It will call the reconcile method of the given controller with a regular interval
// or when triggered externally.
type Executor struct {
type taskExecutor struct {
running atomic.Bool

// controller is the controller to be reconciled.
Expand All @@ -55,21 +55,31 @@ type Executor struct {
}

// New creates a new Executor.
func New(controller Controller, cfg Config) *Executor {
func New(controller Controller, cfg Config) Executor {
cfg.applyDefaults()
return &Executor{
return &taskExecutor{
controller: controller,
pollingFrequency: cfg.PollingFrequency,
rateLimiter: cfg.RateLimiter,
externalTrigger: make(chan struct{}, 1),
stop: make(chan struct{}, 1),
}
}

// Executor is a task executor / scheduler.
type Executor interface {
Start(ctx context.Context) StopWaitFn
Running() bool
Trigger()
}

// StopWaitFn is a function that can be called to stop the executor and wait for it to stop.
type StopWaitFn func()

// Start starts the executor in a separate go routine.
// Call Stop to stop the executor.
func (e *Executor) Start(ctx context.Context) StopWaitFn {
// IMPORTANT: The executor can only be started once.
func (e *taskExecutor) Start(ctx context.Context) StopWaitFn {
wg := &sync.WaitGroup{}
logr := log.FromContext(ctx)
stopWait := func() {
Expand All @@ -83,9 +93,6 @@ func (e *Executor) Start(ctx context.Context) StopWaitFn {
if !e.running.CompareAndSwap(false, true) {
return stopWait
}

e.externalTrigger = make(chan struct{}, 1)
e.stop = make(chan struct{}, 1)
// execute is used by the go routines below to communicate
// that a reconciliation should happen
execute := make(chan struct{}, 1)
Expand All @@ -99,6 +106,9 @@ func (e *Executor) Start(ctx context.Context) StopWaitFn {
// timer routine is responsible for triggering the reconciliation after the timer expires
// or when triggered externally
go func() {
defer func() {
e.running.Store(false)
}()
defer wg.Done()
defer close(execute)
defer logr.Info("Timer stopped")
Expand All @@ -119,9 +129,6 @@ func (e *Executor) Start(ctx context.Context) StopWaitFn {

// executor routine is responsible for executing the reconciliation
go func() {
defer func() {
e.running.Store(false)
}()
defer wg.Done()
defer close(nextScheduledReconcile)
defer logr.Info("Executor stopped")
Expand Down Expand Up @@ -157,7 +164,7 @@ func (e *Executor) Start(ctx context.Context) StopWaitFn {

// Stop stops the executor.
// It does not block until the executor is stopped.
func (e *Executor) Stop() {
func (e *taskExecutor) Stop() {
select {
case e.stop <- struct{}{}:
default:
Expand All @@ -167,13 +174,13 @@ func (e *Executor) Stop() {

// Running returns true if the executor is running.
// When the executor is stopped, it is not running anymore.
func (e *Executor) Running() bool {
func (e *taskExecutor) Running() bool {
return e.running.Load()
}

// Trigger triggers a reconciliation.
// If a reconciliation is already pending, this call is a no-op.
func (e *Executor) Trigger() {
func (e *taskExecutor) Trigger() {
select {
case e.externalTrigger <- struct{}{}:
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,40 +20,14 @@ func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}

func TestNew(t *testing.T) {
testCases := map[string]struct {
config Config
wantPollingFrequency time.Duration
}{
"applies default polling frequency": {
config: Config{},
wantPollingFrequency: defaultPollingFrequency,
},
"custom polling frequency": {
config: Config{
PollingFrequency: time.Hour,
},
wantPollingFrequency: time.Hour,
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
exec := New(nil, tc.config)
assert.Equal(tc.wantPollingFrequency, exec.pollingFrequency)
})
}
}

func TestStartTriggersImmediateReconciliation(t *testing.T) {
assert := assert.New(t)
ctrl := newStubController(Result{}, nil)
exec := Executor{
controller: ctrl,
pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
rateLimiter: &stubRateLimiter{}, // no rate limiting
cfg := Config{
PollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
RateLimiter: &stubRateLimiter{}, // no rate limiting
}
exec := New(ctrl, cfg)
// on start, the executor should trigger a reconciliation
stopAndWait := exec.Start(context.Background())
<-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called
Expand All @@ -68,11 +42,11 @@ func TestStartTriggersImmediateReconciliation(t *testing.T) {
func TestStartMultipleTimesIsCoalesced(t *testing.T) {
assert := assert.New(t)
ctrl := newStubController(Result{}, nil)
exec := Executor{
controller: ctrl,
pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
rateLimiter: &stubRateLimiter{}, // no rate limiting
cfg := Config{
PollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
RateLimiter: &stubRateLimiter{}, // no rate limiting
}
exec := New(ctrl, cfg)
// start once
stopAndWait := exec.Start(context.Background())
// start again multiple times
Expand All @@ -93,11 +67,11 @@ func TestErrorTriggersImmediateReconciliation(t *testing.T) {
assert := assert.New(t)
// returning an error should trigger a reconciliation immediately
ctrl := newStubController(Result{}, errors.New("reconciler error"))
exec := Executor{
controller: ctrl,
pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
rateLimiter: &stubRateLimiter{}, // no rate limiting
cfg := Config{
PollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
RateLimiter: &stubRateLimiter{}, // no rate limiting
}
exec := New(ctrl, cfg)
stopAndWait := exec.Start(context.Background())
for i := 0; i < 10; i++ {
<-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called
Expand All @@ -115,13 +89,13 @@ func TestErrorTriggersRateLimiting(t *testing.T) {
assert := assert.New(t)
// returning an error should trigger a reconciliation immediately
ctrl := newStubController(Result{}, errors.New("reconciler error"))
exec := Executor{
controller: ctrl,
pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
rateLimiter: &stubRateLimiter{
cfg := Config{
PollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
RateLimiter: &stubRateLimiter{
whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
},
}
exec := New(ctrl, cfg)
stopAndWait := exec.Start(context.Background())
<-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called once to trigger rate limiting
ctrl.stop <- struct{}{}
Expand All @@ -139,13 +113,13 @@ func TestRequeueAfterResultRequeueInterval(t *testing.T) {
Requeue: true,
RequeueAfter: time.Microsecond,
}, nil)
exec := Executor{
controller: ctrl,
pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
rateLimiter: &stubRateLimiter{
cfg := Config{
PollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
RateLimiter: &stubRateLimiter{
whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
},
}
exec := New(ctrl, cfg)
stopAndWait := exec.Start(context.Background())
for i := 0; i < 10; i++ {
<-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called
Expand All @@ -162,13 +136,13 @@ func TestRequeueAfterResultRequeueInterval(t *testing.T) {
func TestExternalTrigger(t *testing.T) {
assert := assert.New(t)
ctrl := newStubController(Result{}, nil)
exec := Executor{
controller: ctrl,
pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
rateLimiter: &stubRateLimiter{
cfg := Config{
PollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
RateLimiter: &stubRateLimiter{
whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
},
}
exec := New(ctrl, cfg)
stopAndWait := exec.Start(context.Background())
<-ctrl.waitUntilReconciled // initial trigger
for i := 0; i < 10; i++ {
Expand All @@ -186,13 +160,13 @@ func TestExternalTrigger(t *testing.T) {
func TestSimultaneousExternalTriggers(t *testing.T) {
assert := assert.New(t)
ctrl := newStubController(Result{}, nil)
exec := Executor{
controller: ctrl,
pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
rateLimiter: &stubRateLimiter{
cfg := Config{
PollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
RateLimiter: &stubRateLimiter{
whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
},
}
exec := New(ctrl, cfg)
stopAndWait := exec.Start(context.Background())
<-ctrl.waitUntilReconciled // initial trigger
for i := 0; i < 100; i++ {
Expand All @@ -212,11 +186,11 @@ func TestContextCancel(t *testing.T) {
assert := assert.New(t)
ctx, cancel := context.WithCancel(context.Background())
ctrl := newStubController(Result{}, nil)
exec := Executor{
controller: ctrl,
pollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
rateLimiter: &stubRateLimiter{}, // no rate limiting
cfg := Config{
PollingFrequency: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
RateLimiter: &stubRateLimiter{}, // no rate limiting
}
exec := New(ctrl, cfg)
_ = exec.Start(ctx) // no need to explicitly stop the executor, it will stop when the context is canceled
<-ctrl.waitUntilReconciled // initial trigger

Expand All @@ -238,13 +212,13 @@ func TestContextCancel(t *testing.T) {
func TestRequeueAfterPollingFrequency(t *testing.T) {
assert := assert.New(t)
ctrl := newStubController(Result{}, nil)
exec := Executor{
controller: ctrl,
pollingFrequency: time.Microsecond, // basically no delay
rateLimiter: &stubRateLimiter{
cfg := Config{
PollingFrequency: time.Microsecond, // basically no delay
RateLimiter: &stubRateLimiter{
whenRes: time.Hour * 24 * 365, // 1 year. Should be high enough to not trigger the timer in the test.
},
}
exec := New(ctrl, cfg)
stopAndWait := exec.Start(context.Background())
for i := 0; i < 10; i++ {
<-ctrl.waitUntilReconciled // makes sure to wait until reconcile was called
Expand Down

0 comments on commit b3bb486

Please sign in to comment.