diff --git a/execute.go b/execute.go index 405f8f9..7edbc4a 100644 --- a/execute.go +++ b/execute.go @@ -253,6 +253,22 @@ func executeStep( queryResult = resultObj } + // we need to collect all the dependent steps and execute them at last in this function + // to avoid a race condition, where the result of a dependent request is published to the + // result channel even before the result created in this iteration + type stepArgs struct { + step *QueryPlanStep + insertionPoint []string + } + var dependentSteps []stepArgs + // defer the execution of the dependent steps after the main step has been published + defer func() { + for _, sr := range dependentSteps { + log.Info("Spawn ", sr.insertionPoint) + go executeStep(ctx, plan, sr.step, sr.insertionPoint, resultLock, queryVariables, resultCh, errCh, stepWg) + } + }() + // if there are next steps if len(step.Then) > 0 { log.Debug("Kicking off child queries") @@ -263,19 +279,24 @@ func executeStep( copy(copiedInsertionPoint, insertionPoint) insertPoints, err := executorFindInsertionPoints(resultLock, dependent.InsertionPoint, step.SelectionSet, queryResult, [][]string{copiedInsertionPoint}, step.FragmentDefinitions) if err != nil { + // reset dependent steps - result would be discarded anyways + dependentSteps = nil errCh <- err return } // this dependent needs to fire for every object that the insertion point references for _, insertionPoint := range insertPoints { - log.Info("Spawn ", insertionPoint) - stepWg.Add(1) - go executeStep(ctx, plan, dependent, insertionPoint, resultLock, queryVariables, resultCh, errCh, stepWg) + dependentSteps = append(dependentSteps, stepArgs{ + step: dependent, + insertionPoint: insertionPoint, + }) } } } + // before publishing the current result, tell the wait-group about the dependent steps to wait for + stepWg.Add(len(dependentSteps)) log.Debug("Pushing Result. Insertion point: ", insertionPoint, ". Value: ", queryResult) // send the result to be stitched in with our accumulator resultCh <- &queryExecutionResult{