Skip to content

Commit

Permalink
fix: do not pass context to run replay execute goroutine (#287)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryashbhardwaj authored Oct 23, 2024
1 parent 14e1c30 commit 069827c
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 23 deletions.
4 changes: 2 additions & 2 deletions core/scheduler/service/replay_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type ReplayValidator interface {
}

type ReplayExecutor interface {
Execute(ctx context.Context, replayID uuid.UUID, jobTenant tenant.Tenant, jobName scheduler.JobName)
Execute(replayID uuid.UUID, jobTenant tenant.Tenant, jobName scheduler.JobName)
FetchAndSyncStatus(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) (scheduler.JobRunStatusList, error)
FetchRunsWithDetails(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec) (scheduler.JobRunDetailsList, error)
CancelReplayRunsOnScheduler(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec, runs []*scheduler.JobRunWithDetails) []*scheduler.JobRunStatus
Expand Down Expand Up @@ -123,7 +123,7 @@ func (r *ReplayService) CreateReplay(ctx context.Context, t tenant.Tenant, jobNa
State: scheduler.ReplayStateCreated,
})

go r.executor.Execute(context.Background(), replayID, replayReq.Tenant(), jobName)
go r.executor.Execute(replayID, replayReq.Tenant(), jobName) //nolint:contextcheck

return replayID, nil
}
Expand Down
8 changes: 4 additions & 4 deletions core/scheduler/service/replay_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestReplayService(t *testing.T) {
jobRepository.On("GetJob", ctx, projName, jobName).Return(&job, nil)
replayValidator.On("Validate", ctx, replayReq, jobCron).Return(nil)
replayRepository.On("RegisterReplay", ctx, replayReq, replayRuns).Return(replayID, nil)
replayWorker.On("Execute", ctx, replayID, tnnt, jobName).Return().Maybe()
replayWorker.On("Execute", replayID, tnnt, jobName).Return().Maybe()

alertManager := new(mockAlertManager)
alertManager.On("SendReplayEvent", mock.Anything).Return()
Expand Down Expand Up @@ -151,7 +151,7 @@ func TestReplayService(t *testing.T) {
jobRepository.On("GetJob", ctx, projName, jobName).Return(&job, nil)
replayValidator.On("Validate", ctx, replayReq, jobCron).Return(nil)
replayRepository.On("RegisterReplay", ctx, replayReq, replayRuns).Return(replayID, nil)
replayWorker.On("Execute", ctx, replayID, tnnt, jobName).Return().Maybe()
replayWorker.On("Execute", replayID, tnnt, jobName).Return().Maybe()

alertManager := new(mockAlertManager)
alertManager.On("SendReplayEvent", mock.Anything).Return()
Expand Down Expand Up @@ -866,8 +866,8 @@ type ReplayExecutor struct {
}

// Execute provides a mock function with given fields: ctx, replayRequest
func (_m *ReplayExecutor) Execute(ctx context.Context, replayID uuid.UUID, jobTenant tenant.Tenant, jobName scheduler.JobName) {
_m.Called(ctx, replayID, jobTenant, jobName)
func (_m *ReplayExecutor) Execute(replayID uuid.UUID, jobTenant tenant.Tenant, jobName scheduler.JobName) {
_m.Called(replayID, jobTenant, jobName)
}

func (_m *ReplayExecutor) FetchAndSyncStatus(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) (scheduler.JobRunStatusList, error) {
Expand Down
6 changes: 3 additions & 3 deletions core/scheduler/service/replay_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ func NewReplayWorker(logger log.Logger, replayRepository ReplayRepository, jobRe
}
}

func (w *ReplayWorker) Execute(ctxBack context.Context, replayID uuid.UUID, jobTenant tenant.Tenant, jobName scheduler.JobName) {
ctx, cancelFn := context.WithTimeout(ctxBack, time.Minute*time.Duration(w.config.ReplayTimeoutInMinutes))
func (w *ReplayWorker) Execute(replayID uuid.UUID, jobTenant tenant.Tenant, jobName scheduler.JobName) {
ctx, cancelFn := context.WithTimeout(context.Background(), time.Minute*time.Duration(w.config.ReplayTimeoutInMinutes))
defer cancelFn()

w.logger.Info("[ReplayID: %s] starting to execute replay", replayID)
Expand Down Expand Up @@ -384,7 +384,7 @@ func (w *ReplayWorker) ScanReplayRequest(ctx context.Context) {
}
requestsToProcess := w.getRequestsToProcess(ctx, replays)
for _, req := range requestsToProcess {
go w.Execute(ctx, req.ID(), req.Tenant(), req.JobName())
go w.Execute(req.ID(), req.Tenant(), req.JobName()) //nolint:contextcheck
}
}
}
Expand Down
27 changes: 13 additions & 14 deletions core/scheduler/service/replay_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
func TestReplayWorker(t *testing.T) {
logger := log.NewNoop()
now := time.Now()
ctx := context.Background()
replayServerConfig := config.ReplayConfig{
ExecutionIntervalInSeconds: 1,
ReplayTimeoutInMinutes: 5,
Expand Down Expand Up @@ -117,7 +116,7 @@ func TestReplayWorker(t *testing.T) {
defer alertManager.AssertExpectations(t)

worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager)
worker.Execute(ctx, replayID, tnnt, jobAName)
worker.Execute(replayID, tnnt, jobAName)
})
t.Run("should able to process sequential replay request with multiple run", func(t *testing.T) {
replayRepository := new(ReplayRepository)
Expand Down Expand Up @@ -202,7 +201,7 @@ func TestReplayWorker(t *testing.T) {
defer alertManager.AssertExpectations(t)

worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager)
worker.Execute(ctx, replayID, tnnt, jobAName)
worker.Execute(replayID, tnnt, jobAName)
})
t.Run("should able to process parallel replay request", func(t *testing.T) {
replayRepository := new(ReplayRepository)
Expand Down Expand Up @@ -285,7 +284,7 @@ func TestReplayWorker(t *testing.T) {
defer alertManager.AssertExpectations(t)

worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager)
worker.Execute(ctx, replayID, tnnt, jobAName)
worker.Execute(replayID, tnnt, jobAName)
})

t.Run("should able to process replay request with sequential mode and creating non existing runs", func(t *testing.T) {
Expand Down Expand Up @@ -368,7 +367,7 @@ func TestReplayWorker(t *testing.T) {
alertManager.On("SendReplayEvent", mock.Anything).Return()
defer alertManager.AssertExpectations(t)
worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager)
worker.Execute(ctx, replayID, tnnt, jobAName)
worker.Execute(replayID, tnnt, jobAName)
})
t.Run("should able to process replay request with parallel mode and creating non existing runs", func(t *testing.T) {
replayRepository := new(ReplayRepository)
Expand Down Expand Up @@ -451,7 +450,7 @@ func TestReplayWorker(t *testing.T) {
defer alertManager.AssertExpectations(t)

worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager)
worker.Execute(ctx, replayID, tnnt, jobAName)
worker.Execute(replayID, tnnt, jobAName)
})

t.Run("should able to update replay state as failed if unable to get job details", func(t *testing.T) {
Expand Down Expand Up @@ -480,7 +479,7 @@ func TestReplayWorker(t *testing.T) {
errorMsgToStore := "internal error for entity replay: unable to get job details for jobName: job-a, project: proj: internal error"
replayRepository.On("UpdateReplayStatus", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateFailed, errorMsgToStore).Return(nil).Once()
worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, nil)
worker.Execute(ctx, replayID, tnnt, jobAName)
worker.Execute(replayID, tnnt, jobAName)
})
t.Run("should able to update replay state as failed if unable to get replay by id", func(t *testing.T) {
replayRepository := new(ReplayRepository)
Expand Down Expand Up @@ -514,7 +513,7 @@ func TestReplayWorker(t *testing.T) {
alertManager.On("SendReplayEvent", mock.Anything).Return()
defer alertManager.AssertExpectations(t)
worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager)
worker.Execute(ctx, replayID, tnnt, jobAName)
worker.Execute(replayID, tnnt, jobAName)
})
t.Run("should able to update replay state as failed if unable to fetch job runs", func(t *testing.T) {
replayRepository := new(ReplayRepository)
Expand Down Expand Up @@ -549,7 +548,7 @@ func TestReplayWorker(t *testing.T) {
alertManager.On("SendReplayEvent", mock.Anything).Return()
defer alertManager.AssertExpectations(t)
worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager)
worker.Execute(ctx, replayID, tnnt, jobAName)
worker.Execute(replayID, tnnt, jobAName)
})
t.Run("should able to update replay state as failed if unable to update replay once it is synced", func(t *testing.T) {
replayRepository := new(ReplayRepository)
Expand Down Expand Up @@ -591,7 +590,7 @@ func TestReplayWorker(t *testing.T) {
defer alertManager.AssertExpectations(t)

worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager)
worker.Execute(ctx, replayID, tnnt, jobAName)
worker.Execute(replayID, tnnt, jobAName)
})
t.Run("should able to update replay state as failed if unable to do clear batch of runs", func(t *testing.T) {
replayRepository := new(ReplayRepository)
Expand Down Expand Up @@ -635,7 +634,7 @@ func TestReplayWorker(t *testing.T) {
alertManager.On("SendReplayEvent", mock.Anything).Return()
defer alertManager.AssertExpectations(t)
worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager)
worker.Execute(ctx, replayID, tnnt, jobAName)
worker.Execute(replayID, tnnt, jobAName)
})
t.Run("should able to update replay state as failed if unable to create missing run", func(t *testing.T) {
replayRepository := new(ReplayRepository)
Expand Down Expand Up @@ -681,7 +680,7 @@ func TestReplayWorker(t *testing.T) {
defer alertManager.AssertExpectations(t)

worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager)
worker.Execute(ctx, replayID, tnnt, jobAName)
worker.Execute(replayID, tnnt, jobAName)
})

t.Run("should able to still process replay if some of the runs are in failed state", func(t *testing.T) {
Expand Down Expand Up @@ -765,7 +764,7 @@ func TestReplayWorker(t *testing.T) {
alertManager.On("SendReplayEvent", mock.Anything).Return()
defer alertManager.AssertExpectations(t)
worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager)
worker.Execute(ctx, replayID, tnnt, jobAName)
worker.Execute(replayID, tnnt, jobAName)
})

t.Run("should able to stop replay process if it is being terminated externally", func(t *testing.T) {
Expand Down Expand Up @@ -797,7 +796,7 @@ func TestReplayWorker(t *testing.T) {
defer alertManager.AssertExpectations(t)

worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager)
worker.Execute(ctx, replayID, tnnt, jobAName)
worker.Execute(replayID, tnnt, jobAName)
})
})
}
Expand Down

0 comments on commit 069827c

Please sign in to comment.