Skip to content

Commit

Permalink
avoid race condition described in #141 (#142)
Browse files Browse the repository at this point in the history
  • Loading branch information
sGy1980de authored Jul 5, 2021
1 parent 5eee34e commit 6a08012
Showing 1 changed file with 24 additions and 3 deletions.
27 changes: 24 additions & 3 deletions execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,22 @@ func executeStep(
queryResult = resultObj
}

// we need to collect all the dependent steps and execute them at last in this function
// to avoid a race condition, where the result of a dependent request is published to the
// result channel even before the result created in this iteration
type stepArgs struct {
step *QueryPlanStep
insertionPoint []string
}
var dependentSteps []stepArgs
// defer the execution of the dependent steps after the main step has been published
defer func() {
for _, sr := range dependentSteps {
log.Info("Spawn ", sr.insertionPoint)
go executeStep(ctx, plan, sr.step, sr.insertionPoint, resultLock, queryVariables, resultCh, errCh, stepWg)
}
}()

// if there are next steps
if len(step.Then) > 0 {
log.Debug("Kicking off child queries")
Expand All @@ -263,19 +279,24 @@ func executeStep(
copy(copiedInsertionPoint, insertionPoint)
insertPoints, err := executorFindInsertionPoints(resultLock, dependent.InsertionPoint, step.SelectionSet, queryResult, [][]string{copiedInsertionPoint}, step.FragmentDefinitions)
if err != nil {
// reset dependent steps - result would be discarded anyways
dependentSteps = nil
errCh <- err
return
}

// this dependent needs to fire for every object that the insertion point references
for _, insertionPoint := range insertPoints {
log.Info("Spawn ", insertionPoint)
stepWg.Add(1)
go executeStep(ctx, plan, dependent, insertionPoint, resultLock, queryVariables, resultCh, errCh, stepWg)
dependentSteps = append(dependentSteps, stepArgs{
step: dependent,
insertionPoint: insertionPoint,
})
}
}
}

// before publishing the current result, tell the wait-group about the dependent steps to wait for
stepWg.Add(len(dependentSteps))
log.Debug("Pushing Result. Insertion point: ", insertionPoint, ". Value: ", queryResult)
// send the result to be stitched in with our accumulator
resultCh <- &queryExecutionResult{
Expand Down

0 comments on commit 6a08012

Please sign in to comment.