Skip to content

Commit

Permalink
Allow resetting the finish notification of a runner (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
yahavi authored Dec 23, 2023
1 parent 6d742be commit ef57bd0
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 0 deletions.
19 changes: 19 additions & 0 deletions parallel/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Runner interface {
SetMaxParallel(int)
GetFinishedNotification() chan bool
SetFinishedNotification(bool)
ResetFinishNotificationIfActive()
}

type TaskFunc func(int) error
Expand Down Expand Up @@ -209,6 +210,24 @@ func (r *runner) SetFinishedNotification(toEnable bool) {
r.finishedNotificationEnabled = toEnable
}

// Recreates the finish notification channel.
// This method helps manage a scenario involving two runners: "1" assigns tasks to "2".
// Runner "2" might occasionally encounter periods without assigned tasks.
// As a result, the finish notification for "2" might be triggered.
// To tackle this issue, use the ResetFinishNotificationIfActive after all tasks assigned to "1" have been completed.
func (r *runner) ResetFinishNotificationIfActive() {
r.finishedNotifierLock.Lock()
defer r.finishedNotifierLock.Unlock()

// If no active threads, don't reset
if r.activeThreads.Load() == 0 && r.totalTasksInQueue.Load() == 0 || r.cancel.Load() {
return
}

r.finishedNotifier = make(chan bool, 1)
r.finishedNotifierChannelClosed = false
}

func (r *runner) SetMaxParallel(newVal int) {
if newVal < 1 {
newVal = 1
Expand Down
46 changes: 46 additions & 0 deletions parallel/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,49 @@ func TestMaxParallel(t *testing.T) {
runner.Run()
assert.Equal(t, uint32(capacity), runner.started)
}

func TestResetFinishNotificationIfActive(t *testing.T) {
// Create 2 runners
const capacity = 10
const parallelism = 3
runnerOne := NewRunner(parallelism, capacity, false)
runnerOne.SetFinishedNotification(true)
runnerTwo := NewRunner(parallelism, capacity, false)
runnerTwo.SetFinishedNotification(true)

// Add 10 tasks to runner one. Each task provides tasks to runner two.
for i := 0; i < capacity; i++ {
_, err := runnerOne.AddTask(func(int) error {
time.Sleep(time.Millisecond * 100)
_, err := runnerTwo.AddTask(func(int) error {
time.Sleep(time.Millisecond)
return nil
})
assert.NoError(t, err)
return nil
})
assert.NoError(t, err)
}

// Create a goroutine waiting for the finish notification of the first runner before running "Done".
go func() {
<-runnerOne.GetFinishedNotification()
runnerOne.Done()
}()

// Start running the second runner in a different goroutine to make it non-blocking.
go func() {
runnerTwo.Run()
}()

// Run the first runner. This is a blocking method.
runnerOne.Run()

// Reset runner two's finish notification to ensure we receive it only after all tasks assigned to runner two are completed.
runnerTwo.ResetFinishNotificationIfActive()

// Receive the finish notification and ensure that we have truly completed the task.
<-runnerTwo.GetFinishedNotification()
assert.Zero(t, runnerTwo.ActiveThreads())
runnerTwo.Done()
}

0 comments on commit ef57bd0

Please sign in to comment.