Skip to content

Commit

Permalink
fix: handle replay heartbeat bug (#292)
Browse files Browse the repository at this point in the history
* fix: handle replay heartbeat bug
  • Loading branch information
Mryashbhardwaj authored Oct 29, 2024
1 parent 55a3026 commit d937f02
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 19 deletions.
1 change: 1 addition & 0 deletions core/scheduler/service/replay_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type SchedulerRunGetter interface {
type ReplayRepository interface {
RegisterReplay(ctx context.Context, replay *scheduler.Replay, runs []*scheduler.JobRunStatus) (uuid.UUID, error)
UpdateReplay(ctx context.Context, replayID uuid.UUID, state scheduler.ReplayState, runs []*scheduler.JobRunStatus, message string) error
UpdateReplayHeartbeat(ctx context.Context, replayID uuid.UUID) error
UpdateReplayRuns(ctx context.Context, replayID uuid.UUID, runs []*scheduler.JobRunStatus) error
UpdateReplayStatus(ctx context.Context, replayID uuid.UUID, state scheduler.ReplayState, message string) error
CancelReplayRequest(ctx context.Context, replayID uuid.UUID, message string) error
Expand Down
5 changes: 5 additions & 0 deletions core/scheduler/service/replay_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,11 @@ func (_m *ReplayRepository) AcquireReplayRequest(ctx context.Context, replayID u
return args.Error(0)
}

// UpdateReplayHeartbeat provides a mock function with given fields: ctx, replayID
func (_m *ReplayRepository) UpdateReplayHeartbeat(ctx context.Context, replayID uuid.UUID) error {
	return _m.Called(ctx, replayID).Error(0)
}

// GetReplayJobConfig provides a mock function with given fields: ctx, jobTenant, jobName, scheduledAt
func (_m *ReplayRepository) GetReplayJobConfig(ctx context.Context, jobTenant tenant.Tenant, jobName scheduler.JobName, scheduledAt time.Time) (map[string]string, error) {
ret := _m.Called(ctx, jobTenant, jobName, scheduledAt)
Expand Down
41 changes: 29 additions & 12 deletions core/scheduler/service/replay_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ const (
replaySyncMultiplier = 3
)

// replayWorkerLoopDuration records how long one iteration of the replay
// worker's execution loop took, labeled by replay ID.
// NOTE(review): replayID is a UUID, so this label set is unbounded — each
// replay creates a new Prometheus series; confirm this cardinality is
// acceptable for the metrics backend.
var replayWorkerLoopDuration = promauto.NewGaugeVec(prometheus.GaugeOpts{
	Name: "replay_worker_loop_duration",
	Help: "how long it takes to process a replay loop",
}, []string{"replayID"})

var replayReqLag = promauto.NewGauge(prometheus.GaugeOpts{
Name: "replay_request_lag",
Help: "how old is the oldest unhandled replay request",
Expand Down Expand Up @@ -129,6 +134,7 @@ func (w *ReplayWorker) isReplayCanceled(ctx context.Context, replayID uuid.UUID)
func (w *ReplayWorker) startExecutionLoop(ctx context.Context, replayID uuid.UUID, jobCron *cron.ScheduleSpec) error {
executionLoopCount := 0
for {
loopStartTime := time.Now()
select {
case <-ctx.Done():
w.logger.Error("[ReplayID: %s] deadline encountered...", replayID)
Expand All @@ -147,7 +153,7 @@ func (w *ReplayWorker) startExecutionLoop(ctx context.Context, replayID uuid.UUI
// sync run first
replayWithRun, err := w.replayRepo.GetReplayByID(ctx, replayID)
if err != nil {
w.logger.Error("[ReplayID: %s] unable to get existing runs, err: %s", replayID.String(), err.Error())
w.logger.Error("[ReplayID: %s] unable to get existing runs, err: %s", replayID, err.Error())
return err
}

Expand All @@ -160,26 +166,32 @@ func (w *ReplayWorker) startExecutionLoop(ctx context.Context, replayID uuid.UUI
JobURN: replayWithRun.Replay.JobName().GetJobURN(t),
State: replayWithRun.Replay.State(),
})
w.logger.Info("[ReplayID: %s] replay is externally terminated with status [%s]", replayWithRun.Replay.ID().String(), replayWithRun.Replay.State().String())
w.logger.Info("[ReplayID: %s] replay is externally terminated with status [%s]", replayID, replayWithRun.Replay.State().String())
return nil
}

if executionLoopCount == 1 {
err := w.replayRepo.UpdateReplayStatus(ctx, replayID, scheduler.ReplayStateInProgress, "started handling replay request")
if err != nil {
w.logger.Error("[ReplayID: %s] unable to set replay state in progress", replayID.String(), err)
w.logger.Error("[ReplayID: %s] unable to set replay state in progress", replayID, err)
return err
}
} else {
err := w.replayRepo.UpdateReplayHeartbeat(ctx, replayID)
if err != nil {
w.logger.Error("[ReplayID: %s] unable to update replay heartbeat err:: %s", replayID, err)
return err
}
}

syncedRunStatus, err := w.FetchAndSyncStatus(ctx, replayWithRun, jobCron)
if err != nil {
w.logger.Error("[ReplayID: %s] unable to get incoming runs: %s", replayWithRun.Replay.ID().String(), err)
w.logger.Error("[ReplayID: %s] unable to get incoming runs: %s", replayID, err)
return err
}

if err := w.replayRepo.UpdateReplayRuns(ctx, replayWithRun.Replay.ID(), syncedRunStatus); err != nil {
w.logger.Error("[ReplayID: %s] unable to update replay state to failed: %s", replayWithRun.Replay.ID(), err)
if err := w.replayRepo.UpdateReplayRuns(ctx, replayID, syncedRunStatus); err != nil {
w.logger.Error("[ReplayID: %s] unable to update replay state to failed: %s", replayID, err)
return err
}

Expand Down Expand Up @@ -207,6 +219,7 @@ func (w *ReplayWorker) startExecutionLoop(ctx context.Context, replayID uuid.UUI
w.logger.Info("[ReplayID: %s] replay is externally canceled", replayID.String())
return nil
}

var updatedRuns []*scheduler.JobRunStatus
if replayWithRun.Replay.Config().Parallel {
if err := w.replayRunOnScheduler(ctx, jobCron, replayWithRun.Replay, toBeReplayedRuns...); err != nil {
Expand All @@ -216,7 +229,7 @@ func (w *ReplayWorker) startExecutionLoop(ctx context.Context, replayID uuid.UUI
} else { // sequential should work when there's no in_progress state on existing runs
inProgressRuns := syncedRunStatus.GetSortedRunsByStates([]scheduler.State{scheduler.StateInProgress})
if len(inProgressRuns) > 0 {
w.logger.Info("[ReplayID: %s] %d run is in progress, skip sequential iteration", replayWithRun.Replay.ID(), len(inProgressRuns))
w.logger.Info("[ReplayID: %s] %d run is in progress, skip sequential iteration", replayID, len(inProgressRuns))
continue
}
if err := w.replayRunOnScheduler(ctx, jobCron, replayWithRun.Replay, toBeReplayedRuns[0]); err != nil {
Expand All @@ -226,10 +239,12 @@ func (w *ReplayWorker) startExecutionLoop(ctx context.Context, replayID uuid.UUI
}

// update runs status
if err := w.replayRepo.UpdateReplayRuns(ctx, replayWithRun.Replay.ID(), updatedRuns); err != nil {
w.logger.Error("[ReplayID: %s] unable to update replay runs: %s", replayWithRun.Replay.ID(), err)
if err := w.replayRepo.UpdateReplayRuns(ctx, replayID, updatedRuns); err != nil {
w.logger.Error("[ReplayID: %s] unable to update replay runs: %s", replayID, err)
return err
}

replayWorkerLoopDuration.WithLabelValues(replayID.String()).Set(time.Since(loopStartTime).Seconds())
}
}

Expand Down Expand Up @@ -401,15 +416,17 @@ func (w *ReplayWorker) getRequestsToProcess(ctx context.Context, replays []*sche
if lag.Seconds() > maxLag {
maxLag = lag.Seconds()
}
w.logger.Info(fmt.Sprintf("trying to acquired replay request with ID: %s", replay.ID()))
w.logger.Info(fmt.Sprintf("[ReplayID: %s] trying to acquire replay request", replay.ID()))
err := w.replayRepo.AcquireReplayRequest(ctx, replay.ID(), unhandledClassifierDuration)
if err != nil {
if errors.IsErrorType(err, errors.ErrNotFound) {
continue
}
w.logger.Error("unable to acquire lock on replay request err: %s", err.Error())
w.logger.Error("[ReplayID: %s] unable to acquire lock on replay request err: %s", replay.ID(), err.Error())
// return early with the requests that have been acquired, the errors has been logged and can be retried in next iteration
return requestsToProcess
}
w.logger.Info(fmt.Sprintf("successfully acquired replay request with ID: %s", replay.ID()))
w.logger.Info(fmt.Sprintf("[ReplayID: %s] successfully acquired replay request", replay.ID()))
requestsToProcess = append(requestsToProcess, replay)
}
replayReqLag.Set(maxLag)
Expand Down
11 changes: 11 additions & 0 deletions core/scheduler/service/replay_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func TestReplayWorker(t *testing.T) {
replayRepository.On("UpdateReplayRuns", mock.Anything, replayReq.Replay.ID(), runsPhaseThree).Return(nil).Once()
summaryMsg := "replay is finished with run status: success(1)"
replayRepository.On("UpdateReplay", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateSuccess, runsPhaseThree, summaryMsg).Return(nil).Once()
replayRepository.On("UpdateReplayHeartbeat", mock.Anything, replayReq.Replay.ID()).Return(nil).Once()

alertManager := new(mockAlertManager)
alertManager.On("SendReplayEvent", mock.Anything).Return()
Expand Down Expand Up @@ -188,13 +189,15 @@ func TestReplayWorker(t *testing.T) {
replayRepository.On("UpdateReplayRuns", mock.Anything, replayReq.Replay.ID(), mock.Anything).Return(nil).Once()
sch.On("ClearBatch", mock.Anything, tnnt, jobAName, scheduledTime2.Add(-24*time.Hour), scheduledTime2.Add(-24*time.Hour)).Return(nil).Once()
replayRepository.On("UpdateReplayRuns", mock.Anything, replayReq.Replay.ID(), []*scheduler.JobRunStatus{runsPhase3[1]}).Return(nil).Once()
replayRepository.On("UpdateReplayHeartbeat", mock.Anything, replayReq.Replay.ID()).Return(nil).Once()

// loop 3
replayRepository.On("GetReplayByID", mock.Anything, replayReq.Replay.ID()).Return(replayPhase3, nil).Once()
sch.On("GetJobRuns", mock.Anything, tnnt, mock.Anything, jobCron).Return(schedulerRunsPhase3, nil).Once()
replayRepository.On("UpdateReplayRuns", mock.Anything, replayReq.Replay.ID(), mock.Anything).Return(nil).Once()
summaryMsg := "replay is finished with run status: success(2)"
replayRepository.On("UpdateReplay", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateSuccess, mock.Anything, summaryMsg).Return(nil).Once()
replayRepository.On("UpdateReplayHeartbeat", mock.Anything, replayReq.Replay.ID()).Return(nil).Once()

alertManager := new(mockAlertManager)
alertManager.On("SendReplayEvent", mock.Anything).Return()
Expand Down Expand Up @@ -271,12 +274,14 @@ func TestReplayWorker(t *testing.T) {
replayRepository.On("GetReplayByID", mock.Anything, replayReq.Replay.ID()).Return(replayPhase2, nil).Once()
sch.On("GetJobRuns", mock.Anything, tnnt, mock.Anything, jobCron).Return(schedulerRunsPhase2, nil).Once()
replayRepository.On("UpdateReplayRuns", mock.Anything, replayReq.Replay.ID(), mock.Anything).Return(nil).Once()
replayRepository.On("UpdateReplayHeartbeat", mock.Anything, replayReq.Replay.ID()).Return(nil).Once()

// loop 3
replayRepository.On("GetReplayByID", mock.Anything, replayReq.Replay.ID()).Return(replayPhase3, nil).Once()
sch.On("GetJobRuns", mock.Anything, tnnt, mock.Anything, jobCron).Return(schedulerRunsPhase3, nil).Once()
replayRepository.On("UpdateReplayRuns", mock.Anything, replayReq.Replay.ID(), mock.Anything).Return(nil).Once()
summaryMsg := "replay is finished with run status: success(2)"
replayRepository.On("UpdateReplayHeartbeat", mock.Anything, replayReq.Replay.ID()).Return(nil).Once()
replayRepository.On("UpdateReplay", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateSuccess, mock.Anything, summaryMsg).Return(nil).Once()

alertManager := new(mockAlertManager)
Expand Down Expand Up @@ -356,13 +361,15 @@ func TestReplayWorker(t *testing.T) {
replayRepository.On("UpdateReplayRuns", mock.Anything, replayReq.Replay.ID(), mock.Anything).Return(nil).Once()
sch.On("ClearBatch", mock.Anything, tnnt, jobAName, scheduledTime2.Add(-24*time.Hour), scheduledTime2.Add(-24*time.Hour)).Return(nil).Once()
replayRepository.On("UpdateReplayRuns", mock.Anything, replayReq.Replay.ID(), []*scheduler.JobRunStatus{runsPhase3[1]}).Return(nil).Once()
replayRepository.On("UpdateReplayHeartbeat", mock.Anything, replayReq.Replay.ID()).Return(nil).Once()

// loop 3
replayRepository.On("GetReplayByID", mock.Anything, replayReq.Replay.ID()).Return(replayPhase3, nil).Once()
sch.On("GetJobRuns", mock.Anything, tnnt, mock.Anything, jobCron).Return(schedulerRunsPhase3, nil).Once()
replayRepository.On("UpdateReplayRuns", mock.Anything, replayReq.Replay.ID(), mock.Anything).Return(nil).Once()
summaryMsg := "replay is finished with run status: success(2)"
replayRepository.On("UpdateReplay", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateSuccess, mock.Anything, summaryMsg).Return(nil).Once()
replayRepository.On("UpdateReplayHeartbeat", mock.Anything, replayReq.Replay.ID()).Return(nil).Once()
alertManager := new(mockAlertManager)
alertManager.On("SendReplayEvent", mock.Anything).Return()
defer alertManager.AssertExpectations(t)
Expand Down Expand Up @@ -437,13 +444,15 @@ func TestReplayWorker(t *testing.T) {
replayRepository.On("GetReplayByID", mock.Anything, replayReq.Replay.ID()).Return(replayPhase2, nil).Once()
sch.On("GetJobRuns", mock.Anything, tnnt, mock.Anything, jobCron).Return(schedulerRunsPhase2, nil).Once()
replayRepository.On("UpdateReplayRuns", mock.Anything, replayReq.Replay.ID(), mock.Anything).Return(nil).Once()
replayRepository.On("UpdateReplayHeartbeat", mock.Anything, replayReq.Replay.ID()).Return(nil).Once()

// loop 3
replayRepository.On("GetReplayByID", mock.Anything, replayReq.Replay.ID()).Return(replayPhase3, nil).Once()
sch.On("GetJobRuns", mock.Anything, tnnt, mock.Anything, jobCron).Return(schedulerRunsPhase3, nil).Once()
replayRepository.On("UpdateReplayRuns", mock.Anything, replayReq.Replay.ID(), mock.Anything).Return(nil).Once()
summaryMsg := "replay is finished with run status: success(2)"
replayRepository.On("UpdateReplay", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateSuccess, mock.Anything, summaryMsg).Return(nil).Once()
replayRepository.On("UpdateReplayHeartbeat", mock.Anything, replayReq.Replay.ID()).Return(nil).Once()

alertManager := new(mockAlertManager)
alertManager.On("SendReplayEvent", mock.Anything).Return()
Expand Down Expand Up @@ -753,12 +762,14 @@ func TestReplayWorker(t *testing.T) {
replayRepository.On("UpdateReplayRuns", mock.Anything, replayReq.Replay.ID(), mock.Anything).Return(nil).Once()
sch.On("ClearBatch", mock.Anything, tnnt, jobAName, scheduledTime2.Add(-24*time.Hour), scheduledTime2.Add(-24*time.Hour)).Return(nil).Once()
replayRepository.On("UpdateReplayRuns", mock.Anything, replayReq.Replay.ID(), []*scheduler.JobRunStatus{runsPhase3[1]}).Return(nil).Once()
replayRepository.On("UpdateReplayHeartbeat", mock.Anything, replayReq.Replay.ID()).Return(nil).Once()

// loop 3
replayRepository.On("GetReplayByID", mock.Anything, replayReq.Replay.ID()).Return(replayPhase3, nil).Once()
sch.On("GetJobRuns", mock.Anything, tnnt, mock.Anything, jobCron).Return(schedulerRunsPhase3, nil).Once()
replayRepository.On("UpdateReplayRuns", mock.Anything, replayReq.Replay.ID(), mock.Anything).Return(nil).Once()
replayRepository.On("UpdateReplay", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateFailed, mock.Anything, mock.Anything).Return(nil).Once()
replayRepository.On("UpdateReplayHeartbeat", mock.Anything, replayReq.Replay.ID()).Return(nil).Once()

alertManager := new(mockAlertManager)
alertManager.On("SendReplayEvent", mock.Anything).Return()
Expand Down
16 changes: 9 additions & 7 deletions internal/store/postgres/scheduler/replay_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,19 +379,21 @@ func (r ReplayRepository) CancelReplayRequest(ctx context.Context, id uuid.UUID,
}

// UpdateReplayRuns persists the given run statuses for the replay identified
// by id. Each run row is matched on (replay_id, scheduled_at), and a row is
// only touched when its status actually changed (status<>$1), so updated_at
// is not bumped by no-op writes.
//
// Runs are updated one statement at a time without a transaction: on the
// first failure, rows already updated are kept and the error is returned
// wrapped with job-run entity context.
func (r ReplayRepository) UpdateReplayRuns(ctx context.Context, id uuid.UUID, runs []*scheduler.JobRunStatus) error {
	query := `UPDATE replay_run SET status=$1, updated_at=NOW() WHERE replay_id=$2 AND scheduled_at=$3 AND status<>$1`
	for _, run := range runs {
		_, err := r.db.Exec(ctx, query, run.State, id, run.ScheduledAt)
		if err != nil {
			return errors.Wrap(scheduler.EntityJobRun, "unable to update replay runs", err)
		}
	}
	return nil
}

// UpdateReplayHeartbeat bumps the updated_at timestamp of the replay_request
// row identified by id, signalling that the worker handling this replay is
// still alive.
func (r ReplayRepository) UpdateReplayHeartbeat(ctx context.Context, id uuid.UUID) error {
	query := `UPDATE replay_request SET updated_at = NOW() WHERE id = $1`
	if _, err := r.db.Exec(ctx, query, id); err != nil {
		// fix: error message previously misspelled "heatbeat"
		return errors.Wrap(scheduler.EntityJobRun, "unable to update replay heartbeat", err)
	}
	return nil
}

Expand Down

0 comments on commit d937f02

Please sign in to comment.