Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate Nexus Operation-Timeout header #6676

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 31 additions & 5 deletions components/nexusoperations/executors.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ func (e taskExecutor) executeInvocationTask(ctx context.Context, env hsm.Environ
return err
}

header := nexus.Header(args.header)
if e.Config.CallbackURLTemplate() == "unset" {
return serviceerror.NewInternal(fmt.Sprintf("dynamic config %q is unset", CallbackURLTemplate.Key().String()))
}
Expand Down Expand Up @@ -197,10 +196,35 @@ func (e taskExecutor) executeInvocationTask(ctx context.Context, env hsm.Environ
return fmt.Errorf("%w: %w", queues.NewUnprocessableTaskError("failed to generate a callback token"), err)
}

callCtx, cancel := context.WithTimeout(
ctx,
e.Config.RequestTimeout(ns.Name().String(), task.EndpointName),
)
// The following logic handles the operation ScheduleToCloseTimeout parameter and Operation-Timeout header.
// The minimum of the resolved operation timeout and the configured request timeout is used for the context timeout
// when making the StartOperation request, according to the following logic:
// (ScheduleToClose set, Operation-Timeout set) -> no changes, use ScheduleToClose
// (ScheduleToClose set, Operation-Timeout unset) -> set Operation-Timeout to ScheduleToClose, use ScheduleToClose
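// (ScheduleToClose unset, Operation-Timeout set) -> no changes, use Operation-Timeout
// (ScheduleToClose unset, Operation-Timeout unset) -> no changes, use the configured request timeout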
header := nexus.Header(args.header)
opTimeout := args.scheduleToCloseTimeout
opTimeoutHeader, set := header["Operation-Timeout"]
if !set && args.scheduleToCloseTimeout > 0 {
if header == nil {
header = make(nexus.Header, 1)
}
header["Operation-Timeout"] = args.scheduleToCloseTimeout.String()
} else if set && args.scheduleToCloseTimeout == 0 {
parsedTimeout, parseErr := time.ParseDuration(opTimeoutHeader)
if parseErr != nil {
// ScheduleToCloseTimeout is not required, so do not fail task on parsing error.
e.Logger.Warn(fmt.Sprintf("unable to parse Operation-Timeout header: %v", opTimeoutHeader), tag.Error(parseErr))
} else {
opTimeout = parsedTimeout
}
}

callTimeout := e.Config.RequestTimeout(ns.Name().String(), task.EndpointName)
if opTimeout > 0 {
callTimeout = min(callTimeout, opTimeout)
}
callCtx, cancel := context.WithTimeout(ctx, callTimeout)
defer cancel()

// Make the call and record metrics.
Expand Down Expand Up @@ -266,6 +290,7 @@ type startArgs struct {
requestID string
endpointName string
endpointID string
scheduleToCloseTimeout time.Duration
header map[string]string
payload *commonpb.Payload
nexusLink nexus.Link
Expand Down Expand Up @@ -301,6 +326,7 @@ func (e taskExecutor) loadOperationArgs(
if err != nil {
return nil
}
args.scheduleToCloseTimeout = event.GetNexusOperationScheduledEventAttributes().GetScheduleToCloseTimeout().AsDuration()
args.payload = event.GetNexusOperationScheduledEventAttributes().GetInput()
args.header = event.GetNexusOperationScheduledEventAttributes().GetNexusHeader()
args.nexusLink = ConvertLinkWorkflowEventToNexusLink(&commonpb.Link_WorkflowEvent{
Expand Down
52 changes: 47 additions & 5 deletions components/nexusoperations/executors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,13 @@
endpointNotFound bool
eventHasNoEndpointID bool
operationIsCanceled bool
headers nexus.Header
checkStartOperationOptions func(t *testing.T, options nexus.StartOperationOptions)
onStartOperation func(ctx context.Context, service, operation string, input *nexus.LazyValue, options nexus.StartOperationOptions) (nexus.HandlerStartOperationResult[any], error)
expectedMetricOutcome string
checkOutcome func(t *testing.T, op nexusoperations.Operation, events []*historypb.HistoryEvent)
requestTimeout time.Duration
schedToCloseTimeout time.Duration
destinationDown bool
}{
{
Expand Down Expand Up @@ -167,9 +169,11 @@
},
},
{
name: "sync start",
requestTimeout: time.Hour,
destinationDown: false,
name: "sync start",
requestTimeout: time.Hour,
schedToCloseTimeout: time.Hour,
headers: nexus.Header{"Operation-Timeout": time.Microsecond.String()}, // to test this value is ignored when ScheduleToCloseTimeout is set
destinationDown: false,
onStartOperation: func(ctx context.Context, service, operation string, input *nexus.LazyValue, options nexus.StartOperationOptions) (nexus.HandlerStartOperationResult[any], error) {
// Also use this test case to check the input and options provided.
if service != "service" {
Expand Down Expand Up @@ -311,8 +315,43 @@
},
},
{
name: "invocation timeout",
name: "invocation timeout by request timeout",
requestTimeout: time.Microsecond,
schedToCloseTimeout: time.Hour,
destinationDown: true,
expectedMetricOutcome: "request-timeout",
onStartOperation: func(ctx context.Context, service, operation string, input *nexus.LazyValue, options nexus.StartOperationOptions) (nexus.HandlerStartOperationResult[any], error) {
time.Sleep(time.Millisecond * 100)

Check failure on line 324 in components/nexusoperations/executors_test.go

View workflow job for this annotation

GitHub Actions / lint

use of `time.Sleep` forbidden because "Please use require.Eventually or assert.Eventually instead unless you've no other option" (forbidigo)
return &nexus.HandlerStartOperationResultAsync{OperationID: "op-id"}, nil
},
checkOutcome: func(t *testing.T, op nexusoperations.Operation, events []*historypb.HistoryEvent) {
require.Equal(t, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, op.State())
require.NotNil(t, op.LastAttemptFailure.GetApplicationFailureInfo())
require.Regexp(t, "Post \"http://localhost:\\d+/service/operation\\?callback=http%3A%2F%2Flocalhost%2Fcallback\": context deadline exceeded", op.LastAttemptFailure.Message)
require.Equal(t, 0, len(events))
},
},
{
name: "invocation timeout by ScheduleToCloseTimeout",
requestTimeout: time.Hour,
schedToCloseTimeout: time.Microsecond,
destinationDown: true,
expectedMetricOutcome: "request-timeout",
onStartOperation: func(ctx context.Context, service, operation string, input *nexus.LazyValue, options nexus.StartOperationOptions) (nexus.HandlerStartOperationResult[any], error) {
time.Sleep(time.Millisecond * 100)

Check failure on line 341 in components/nexusoperations/executors_test.go

View workflow job for this annotation

GitHub Actions / lint

use of `time.Sleep` forbidden because "Please use require.Eventually or assert.Eventually instead unless you've no other option" (forbidigo)
return &nexus.HandlerStartOperationResultAsync{OperationID: "op-id"}, nil
},
checkOutcome: func(t *testing.T, op nexusoperations.Operation, events []*historypb.HistoryEvent) {
require.Equal(t, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, op.State())
require.NotNil(t, op.LastAttemptFailure.GetApplicationFailureInfo())
require.Regexp(t, "Post \"http://localhost:\\d+/service/operation\\?callback=http%3A%2F%2Flocalhost%2Fcallback\": context deadline exceeded", op.LastAttemptFailure.Message)
require.Equal(t, 0, len(events))
},
},
{
name: "invocation timeout by Operation-Timeout header",
requestTimeout: time.Hour,
headers: nexus.Header{"Operation-Timeout": time.Microsecond.String()},
destinationDown: true,
expectedMetricOutcome: "request-timeout",
onStartOperation: func(ctx context.Context, service, operation string, input *nexus.LazyValue, options nexus.StartOperationOptions) (nexus.HandlerStartOperationResult[any], error) {
Expand Down Expand Up @@ -390,10 +429,13 @@
nexustest.NewNexusServer(t, listenAddr, h)

reg := newRegistry(t)
event := mustNewScheduledEvent(time.Now(), time.Hour)
event := mustNewScheduledEvent(time.Now(), tc.schedToCloseTimeout)
if tc.eventHasNoEndpointID {
event.GetNexusOperationScheduledEventAttributes().EndpointId = ""
}
if tc.headers != nil {
event.GetNexusOperationScheduledEventAttributes().NexusHeader = tc.headers
}
backend := &hsmtest.NodeBackend{Events: []*historypb.HistoryEvent{event}}
node := newOperationNode(t, backend, backend.Events[0])
env := fakeEnv{node}
Expand Down
22 changes: 13 additions & 9 deletions components/nexusoperations/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,24 @@ func mustNewScheduledEvent(schedTime time.Time, timeout time.Duration) *historyp
panic(err)
}

attr := &historypb.NexusOperationScheduledEventAttributes{
EndpointId: "endpoint-id",
Endpoint: "endpoint",
Service: "service",
Operation: "operation",
Input: payload,
RequestId: uuid.NewString(),
}
if timeout > 0 {
attr.ScheduleToCloseTimeout = durationpb.New(timeout)
}

return &historypb.HistoryEvent{
EventType: enumspb.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED,
EventId: 1,
EventTime: timestamppb.New(schedTime),
Attributes: &historypb.HistoryEvent_NexusOperationScheduledEventAttributes{
NexusOperationScheduledEventAttributes: &historypb.NexusOperationScheduledEventAttributes{
EndpointId: "endpoint-id",
Endpoint: "endpoint",
Service: "service",
Operation: "operation",
Input: payload,
RequestId: uuid.NewString(),
ScheduleToCloseTimeout: durationpb.New(timeout),
},
NexusOperationScheduledEventAttributes: attr,
},
}
}
9 changes: 9 additions & 0 deletions components/nexusoperations/workflow/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"
commonnexus "go.temporal.io/server/common/nexus"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/components/nexusoperations"
"go.temporal.io/server/service/history/hsm"
"go.temporal.io/server/service/history/workflow"
Expand Down Expand Up @@ -110,6 +111,14 @@ func (ch *commandHandler) HandleScheduleCommand(
}
}

if err := timestamp.ValidateProtoDuration(attrs.ScheduleToCloseTimeout); err != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not exactly related to this PR but I realized I missed this validation in #6631

return workflow.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf(
"ScheduleNexusOperationCommandAttributes.ScheduleToCloseTimeout is invalid: %v", err),
}
}

if !validator.IsValidPayloadSize(attrs.Input.Size()) {
return workflow.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
Expand Down
3 changes: 3 additions & 0 deletions service/matching/matching_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1769,6 +1769,9 @@ pollLoop:

nexusReq := task.nexus.request.GetRequest()
nexusReq.Header[nexus.HeaderRequestTimeout] = time.Until(task.nexus.deadline).String()
if !task.nexus.operationDeadline.IsZero() {
nexusReq.Header["Operation-Timeout"] = time.Until(task.nexus.operationDeadline).String()
}

return &matchingservice.PollNexusTaskQueueResponse{
Response: &workflowservice.PollNexusTaskQueueResponse{
Expand Down
13 changes: 12 additions & 1 deletion service/matching/physical_task_queue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,18 @@ func (c *physicalTaskQueueManagerImpl) DispatchNexusTask(
request *matchingservice.DispatchNexusTaskRequest,
) (*matchingservice.DispatchNexusTaskResponse, error) {
deadline, _ := ctx.Deadline() // If not set by user, our client will set a default.
task := newInternalNexusTask(taskId, deadline, request)
var opDeadline time.Time
opTimeoutHeader, set := request.GetRequest().GetHeader()["Operation-Timeout"]
if set {
opTimeout, err := time.ParseDuration(opTimeoutHeader)
if err != nil {
// Operation-Timeout header is not required so don't fail request on parsing errors.
c.logger.Warn(fmt.Sprintf("unable to parse Operation-Timeout header: %v", opTimeoutHeader), tag.Error(err))
} else {
opDeadline = time.Now().Add(opTimeout)
}
}
task := newInternalNexusTask(taskId, deadline, opDeadline, request)
if !task.isForwarded() {
c.tasksAddedInIntervals.incrementTaskCount()
}
Expand Down
15 changes: 9 additions & 6 deletions service/matching/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ type (
}
// nexusTaskInfo contains the info for a nexus task
nexusTaskInfo struct {
taskID string
deadline time.Time
request *matchingservice.DispatchNexusTaskRequest
taskID string
deadline time.Time
operationDeadline time.Time
request *matchingservice.DispatchNexusTaskRequest
}
// startedTaskInfo contains info for any task received from
// another matching host. This type of task is already marked as started
Expand Down Expand Up @@ -141,13 +142,15 @@ func newInternalQueryTask(
func newInternalNexusTask(
taskID string,
deadline time.Time,
operationDeadline time.Time,
request *matchingservice.DispatchNexusTaskRequest,
) *internalTask {
return &internalTask{
nexus: &nexusTaskInfo{
taskID: taskID,
deadline: deadline,
request: request,
taskID: taskID,
deadline: deadline,
operationDeadline: operationDeadline,
request: request,
},
forwardInfo: request.GetForwardInfo(),
responseC: make(chan error, 1),
Expand Down
Loading