Skip to content

Commit

Permalink
fix: duplicate unique constraint issue on job deployment (#256)
Browse files Browse the repository at this point in the history
* fix: duplicate unique constraint issue on job deployment

* chore: fix lint issue in job service test
  • Loading branch information
arinda-arif committed Jul 22, 2024
1 parent 9ef1741 commit 81dd3d0
Show file tree
Hide file tree
Showing 2 changed files with 373 additions and 37 deletions.
66 changes: 38 additions & 28 deletions core/job/service/job_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,18 @@ func (j *JobService) Add(ctx context.Context, jobTenant tenant.Tenant, specs []*
downstreamJobs, err := j.getDownstreamJobs(ctx, jobTenant.ProjectName(), jobs)
me.Append(err)

jobsWithUpstreams, err := j.upstreamResolver.BulkResolve(ctx, jobTenant.ProjectName(), append(addedJobs, downstreamJobs...), logWriter)
var jobsToBeResolved []*job.Job
jobsToBeResolved = append(jobsToBeResolved, addedJobs...)
jobsToBeResolved = append(jobsToBeResolved, downstreamJobs...)
jobsToBeResolved = job.Jobs(jobsToBeResolved).Deduplicate()

jobsWithUpstreams, err := j.upstreamResolver.BulkResolve(ctx, jobTenant.ProjectName(), jobsToBeResolved, logWriter)
me.Append(err)

err = j.upstreamRepo.ReplaceUpstreams(ctx, jobsWithUpstreams)
me.Append(err)

err = j.uploadJobs(ctx, jobTenant, addedJobs, downstreamJobs, nil)
err = j.uploadJobs(ctx, jobTenant, jobsToBeResolved, nil)
me.Append(err)

for _, addedJob := range addedJobs {
Expand Down Expand Up @@ -224,7 +229,7 @@ func (j *JobService) Update(ctx context.Context, jobTenant tenant.Tenant, specs
downstreamUpdatedJobs, err := j.getDownstreamJobs(ctx, jobTenant.ProjectName(), updatedJobs)
me.Append(err)

jobsToBeResolved := []*job.Job{}
var jobsToBeResolved []*job.Job
jobsToBeResolved = append(jobsToBeResolved, updatedJobs...)
jobsToBeResolved = append(jobsToBeResolved, downstreamExistingJobs...)
jobsToBeResolved = append(jobsToBeResolved, downstreamUpdatedJobs...)
Expand All @@ -236,7 +241,7 @@ func (j *JobService) Update(ctx context.Context, jobTenant tenant.Tenant, specs
err = j.upstreamRepo.ReplaceUpstreams(ctx, jobsWithUpstreams)
me.Append(err)

err = j.uploadJobs(ctx, jobTenant, nil, jobsToBeResolved, nil)
err = j.uploadJobs(ctx, jobTenant, jobsToBeResolved, nil)
me.Append(err)

if len(updatedJobs) > 0 {
Expand Down Expand Up @@ -297,20 +302,22 @@ func (j *JobService) Upsert(ctx context.Context, jobTenant tenant.Tenant, specs
var upsertedJobs []*job.Job
upsertedJobs = append(upsertedJobs, addedJobs...)
upsertedJobs = append(upsertedJobs, updatedJobs...)
downstreamToBeResolved := []*job.Job{}
downstreamToBeResolved = append(downstreamToBeResolved, downstreamExistingJobs...)
downstreamToBeResolved = append(downstreamToBeResolved, downstreamUpdatedJobs...)
downstreamToBeResolved = append(downstreamToBeResolved, downstreamAddedJobs...)
downstreamToBeResolved = job.Jobs(downstreamToBeResolved).Deduplicate()

var jobsToBeResolved []*job.Job
jobsToBeResolved = append(jobsToBeResolved, upsertedJobs...)
jobsToBeResolved = append(jobsToBeResolved, downstreamExistingJobs...)
jobsToBeResolved = append(jobsToBeResolved, downstreamUpdatedJobs...)
jobsToBeResolved = append(jobsToBeResolved, downstreamAddedJobs...)
jobsToBeResolved = job.Jobs(jobsToBeResolved).Deduplicate()

if len(upsertedJobs) > 0 {
jobsWithUpstreams, err := j.upstreamResolver.BulkResolve(ctx, jobTenant.ProjectName(), append(upsertedJobs, downstreamToBeResolved...), logWriter)
jobsWithUpstreams, err := j.upstreamResolver.BulkResolve(ctx, jobTenant.ProjectName(), jobsToBeResolved, logWriter)
me.Append(err)

err = j.upstreamRepo.ReplaceUpstreams(ctx, jobsWithUpstreams)
me.Append(err)

err = j.uploadJobs(ctx, jobTenant, addedJobs, append(updatedJobs, downstreamToBeResolved...), nil)
err = j.uploadJobs(ctx, jobTenant, jobsToBeResolved, nil)
me.Append(err)
}

Expand Down Expand Up @@ -399,7 +406,7 @@ func (j *JobService) Delete(ctx context.Context, jobTenant tenant.Tenant, jobNam

raiseJobEventMetric(jobTenant, job.MetricJobEventStateDeleted, 1)

if err := j.uploadJobs(ctx, jobTenant, nil, nil, []job.Name{jobName}); err != nil {
if err := j.uploadJobs(ctx, jobTenant, nil, []job.Name{jobName}); err != nil {
j.logger.Error("error uploading job [%s]: %s", jobName, err)
return downstreamFullNames, err
}
Expand All @@ -422,13 +429,13 @@ func (j *JobService) ChangeNamespace(ctx context.Context, jobTenant, jobNewTenan
return errors.NewError(errors.ErrInternalError, job.EntityJob, errorsMsg)
}

err = j.uploadJobs(ctx, jobTenant, nil, nil, []job.Name{jobName})
err = j.uploadJobs(ctx, jobTenant, nil, []job.Name{jobName})
if err != nil {
errorsMsg := fmt.Sprintf(" unable to remove old job : %s", err.Error())
return errors.NewError(errors.ErrInternalError, job.EntityJob, errorsMsg)
}

err = j.uploadJobs(ctx, jobNewTenant, []*job.Job{newJobSpec}, nil, nil)
err = j.uploadJobs(ctx, jobNewTenant, []*job.Job{newJobSpec}, nil)
if err != nil {
errorsMsg := fmt.Sprintf(" unable to create new job on scheduler : %s", err.Error())
return errors.NewError(errors.ErrInternalError, job.EntityJob, errorsMsg)
Expand Down Expand Up @@ -564,7 +571,7 @@ func (j *JobService) bulkJobCleanup(ctx context.Context, jobTenant tenant.Tenant
me := errors.NewMultiError("bulk Job Cleanup errors")
deletedJobNames, err := j.bulkDelete(ctx, jobTenant, toDelete, logWriter)
me.Append(err)
err = j.uploadJobs(ctx, jobTenant, nil, nil, deletedJobNames)
err = j.uploadJobs(ctx, jobTenant, nil, deletedJobNames)
if err != nil {
me.Append(err)
return errors.Wrap(job.EntityJob, "critical error deleting DAGS from scheduler storage, consider deleting the dags manually and report this incident to administrators, this wont fix on a retry", me.ToErr())
Expand Down Expand Up @@ -686,17 +693,19 @@ func (j *JobService) ReplaceAll(ctx context.Context, jobTenant tenant.Tenant, sp
downstreamAddedJobs, err := j.getDownstreamJobs(ctx, jobTenant.ProjectName(), addedJobs)
me.Append(err)

downstreamToBeResolved := []*job.Job{}
downstreamToBeResolved = append(downstreamToBeResolved, downstreamExistingJobs...)
downstreamToBeResolved = append(downstreamToBeResolved, downstreamUpdatedJobs...)
downstreamToBeResolved = append(downstreamToBeResolved, downstreamAddedJobs...)
downstreamToBeResolved = job.Jobs(downstreamToBeResolved).Deduplicate()
jobsToBeResolved := []*job.Job{}
jobsToBeResolved = append(jobsToBeResolved, updatedJobs...)
jobsToBeResolved = append(jobsToBeResolved, addedJobs...)
jobsToBeResolved = append(jobsToBeResolved, downstreamExistingJobs...)
jobsToBeResolved = append(jobsToBeResolved, downstreamUpdatedJobs...)
jobsToBeResolved = append(jobsToBeResolved, downstreamAddedJobs...)
jobsToBeResolved = job.Jobs(jobsToBeResolved).Deduplicate()

if err := j.resolveAndSaveUpstreams(ctx, jobTenant, logWriter, addedJobs, updatedJobs, downstreamToBeResolved); err != nil {
if err := j.resolveAndSaveUpstreams(ctx, jobTenant, logWriter, jobsToBeResolved); err != nil {
return errors.Wrap(job.EntityJob, "failed resolving job upstreams", err)
}

if err := j.uploadJobs(ctx, jobTenant, addedJobs, append(updatedJobs, downstreamToBeResolved...), nil); err != nil {
if err := j.uploadJobs(ctx, jobTenant, jobsToBeResolved, nil); err != nil {
return errors.Wrap(job.EntityJob, "failed uploading compiled dags", err)
}

Expand All @@ -709,17 +718,18 @@ func (j *JobService) ReplaceAll(ctx context.Context, jobTenant tenant.Tenant, sp
return nil
}

func (j *JobService) uploadJobs(ctx context.Context, jobTenant tenant.Tenant, addedJobs, updatedJobs []*job.Job, deletedJobNames []job.Name) error {
if len(addedJobs) == 0 && len(updatedJobs) == 0 && len(deletedJobNames) == 0 {
func (j *JobService) uploadJobs(ctx context.Context, jobTenant tenant.Tenant, jobsToUpload []*job.Job, deletedJobNames []job.Name) error {
if len(jobsToUpload) == 0 && len(deletedJobNames) == 0 {
j.logger.Warn("no jobs to be uploaded")
return nil
}

var jobNamesToUpload, jobNamesToRemove []string
for _, addedJob := range append(addedJobs, updatedJobs...) {
jobNamesToUpload = append(jobNamesToUpload, addedJob.GetName())
var jobNamesToUpload []string
for _, jobToUpload := range jobsToUpload {
jobNamesToUpload = append(jobNamesToUpload, jobToUpload.GetName())
}

var jobNamesToRemove []string
for _, deletedJobName := range deletedJobNames {
jobNamesToRemove = append(jobNamesToRemove, deletedJobName.String())
}
Expand Down Expand Up @@ -771,7 +781,7 @@ func (j *JobService) Refresh(ctx context.Context, projectName tenant.ProjectName
me.Append(err)

j.logger.Debug("uploading [%d] jobs of project [%s] namespace [%s] to scheduler", len(jobs), projectName, namespaceName)
err = j.uploadJobs(ctx, jobTenant, jobs, nil, nil)
err = j.uploadJobs(ctx, jobTenant, jobs, nil)
me.Append(err)
}

Expand Down
Loading

0 comments on commit 81dd3d0

Please sign in to comment.