From bf4882cb2c9e43e3d24505ed0bd46c33de2ea438 Mon Sep 17 00:00:00 2001 From: PJ Doerner Date: Fri, 18 Oct 2024 12:33:25 -0700 Subject: [PATCH 1/2] Propagate Nexus Operation-Timeout header --- components/nexusoperations/executors.go | 31 ++++++++++++++++--- .../nexusoperations/workflow/commands.go | 9 ++++++ service/matching/matching_engine.go | 3 ++ .../matching/physical_task_queue_manager.go | 13 +++++++- service/matching/task.go | 15 +++++---- 5 files changed, 60 insertions(+), 11 deletions(-) diff --git a/components/nexusoperations/executors.go b/components/nexusoperations/executors.go index 3a01c9427e8..65335a8da95 100644 --- a/components/nexusoperations/executors.go +++ b/components/nexusoperations/executors.go @@ -197,10 +197,31 @@ func (e taskExecutor) executeInvocationTask(ctx context.Context, env hsm.Environ return fmt.Errorf("%w: %w", queues.NewUnprocessableTaskError("failed to generate a callback token"), err) } - callCtx, cancel := context.WithTimeout( - ctx, - e.Config.RequestTimeout(ns.Name().String(), task.EndpointName), - ) + // The following logic handles the operation ScheduleToCloseTimout parameter and Operation-Timeout header. + // The minimum of the resolved operation timeout and the configured request timeout is used for the context timeout + // when making the StartOperation request, according to the following logic: + // (ScheduleToClose set, Operation-Timeout set) -> no changes, use ScheduleToClose + // (ScheduleToClose set, Operation-Timeout unset) -> set Operation-Timeout to ScheduleToClose, use ScheduleToClose + // (ScheduleToClose unset, Operation-Timeout set) -> no changes, use Operation-Timeout + opTimeout := args.scheduleToCloseTimeout + opTimeoutHeader, set := header["Operation-Timeout"] + if !set && args.scheduleToCloseTimeout > 0 { + header["Operation-Timeout"] = args.scheduleToCloseTimeout.String() + } else if set && args.scheduleToCloseTimeout == 0 { + parsedTimeout, parseErr := time.ParseDuration(opTimeoutHeader) + if parseErr != nil { + // ScheduleToCloseTimeout is not required, so do not fail task on parsing error. + e.Logger.Warn(fmt.Sprintf("unable to parse Operation-Timeout header: %v", opTimeoutHeader), tag.Error(parseErr)) + } else { + opTimeout = parsedTimeout + } + } + + callTimeout := e.Config.RequestTimeout(ns.Name().String(), task.EndpointName) + if opTimeout > 0 { + callTimeout = min(callTimeout, opTimeout) + } + callCtx, cancel := context.WithTimeout(ctx, callTimeout) defer cancel() // Make the call and record metrics. @@ -266,6 +287,7 @@ type startArgs struct { requestID string endpointName string endpointID string + scheduleToCloseTimeout time.Duration header map[string]string payload *commonpb.Payload nexusLink nexus.Link @@ -301,6 +323,7 @@ func (e taskExecutor) loadOperationArgs( if err != nil { return nil } + args.scheduleToCloseTimeout = event.GetNexusOperationScheduledEventAttributes().GetScheduleToCloseTimeout().AsDuration() args.payload = event.GetNexusOperationScheduledEventAttributes().GetInput() args.header = event.GetNexusOperationScheduledEventAttributes().GetNexusHeader() args.nexusLink = ConvertLinkWorkflowEventToNexusLink(&commonpb.Link_WorkflowEvent{ diff --git a/components/nexusoperations/workflow/commands.go b/components/nexusoperations/workflow/commands.go index 94410bfa363..ddc4c5745a6 100644 --- a/components/nexusoperations/workflow/commands.go +++ b/components/nexusoperations/workflow/commands.go @@ -36,6 +36,7 @@ import ( historypb "go.temporal.io/api/history/v1" "go.temporal.io/api/serviceerror" commonnexus "go.temporal.io/server/common/nexus" + "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/components/nexusoperations" "go.temporal.io/server/service/history/hsm" "go.temporal.io/server/service/history/workflow" @@ -110,6 +111,14 @@ func (ch *commandHandler) HandleScheduleCommand( } } + if err := timestamp.ValidateProtoDuration(attrs.ScheduleToCloseTimeout); err != nil { + return workflow.FailWorkflowTaskError{ + Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES, + Message: fmt.Sprintf( + "ScheduleNexusOperationCommandAttributes.ScheduleToCloseTimeout is invalid: %v", err), + } + } + if !validator.IsValidPayloadSize(attrs.Input.Size()) { return workflow.FailWorkflowTaskError{ Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES, diff --git a/service/matching/matching_engine.go b/service/matching/matching_engine.go index 41bbd8ba627..8dcd5304101 100644 --- a/service/matching/matching_engine.go +++ b/service/matching/matching_engine.go @@ -1769,6 +1769,9 @@ pollLoop: nexusReq := task.nexus.request.GetRequest() nexusReq.Header[nexus.HeaderRequestTimeout] = time.Until(task.nexus.deadline).String() + if !task.nexus.operationDeadline.IsZero() { + nexusReq.Header["Operation-Timeout"] = time.Until(task.nexus.operationDeadline).String() + } return &matchingservice.PollNexusTaskQueueResponse{ Response: &workflowservice.PollNexusTaskQueueResponse{ diff --git a/service/matching/physical_task_queue_manager.go b/service/matching/physical_task_queue_manager.go index cb9e93cd649..eaacac19e9f 100644 --- a/service/matching/physical_task_queue_manager.go +++ b/service/matching/physical_task_queue_manager.go @@ -423,7 +423,18 @@ func (c *physicalTaskQueueManagerImpl) DispatchNexusTask( request *matchingservice.DispatchNexusTaskRequest, ) (*matchingservice.DispatchNexusTaskResponse, error) { deadline, _ := ctx.Deadline() // If not set by user, our client will set a default. - task := newInternalNexusTask(taskId, deadline, request) + var opDeadline time.Time + opTimeoutHeader, set := request.GetRequest().GetHeader()["Operation-Timeout"] + if set { + opTimeout, err := time.ParseDuration(opTimeoutHeader) + if err != nil { + // Operation-Timeout header is not required so don't fail request on parsing errors. + c.logger.Warn(fmt.Sprintf("unable to parse Operation-Timeout header: %v", opTimeoutHeader), tag.Error(err)) + } else { + opDeadline = time.Now().Add(opTimeout) + } + } + task := newInternalNexusTask(taskId, deadline, opDeadline, request) if !task.isForwarded() { c.tasksAddedInIntervals.incrementTaskCount() } diff --git a/service/matching/task.go b/service/matching/task.go index ea3c3146c65..05f52b37359 100644 --- a/service/matching/task.go +++ b/service/matching/task.go @@ -48,9 +48,10 @@ type ( } // nexusTaskInfo contains the info for a nexus task nexusTaskInfo struct { - taskID string - deadline time.Time - request *matchingservice.DispatchNexusTaskRequest + taskID string + deadline time.Time + operationDeadline time.Time + request *matchingservice.DispatchNexusTaskRequest } // startedTaskInfo contains info for any task received from // another matching host. This type of task is already marked as started @@ -141,13 +142,15 @@ func newInternalQueryTask( func newInternalNexusTask( taskID string, deadline time.Time, + operationDeadline time.Time, request *matchingservice.DispatchNexusTaskRequest, ) *internalTask { return &internalTask{ nexus: &nexusTaskInfo{ - taskID: taskID, - deadline: deadline, - request: request, + taskID: taskID, + deadline: deadline, + operationDeadline: operationDeadline, + request: request, }, forwardInfo: request.GetForwardInfo(), responseC: make(chan error, 1), From 0ba0bf6acb8a86706058c75be26410feed532a86 Mon Sep 17 00:00:00 2001 From: PJ Doerner Date: Fri, 18 Oct 2024 13:18:15 -0700 Subject: [PATCH 2/2] tests --- components/nexusoperations/executors.go | 5 +- components/nexusoperations/executors_test.go | 52 ++++++++++++++++++-- components/nexusoperations/helpers_test.go | 22 +++++---- 3 files changed, 64 insertions(+), 15 deletions(-) diff --git a/components/nexusoperations/executors.go b/components/nexusoperations/executors.go index 65335a8da95..97da5cfc389 100644 --- a/components/nexusoperations/executors.go +++ b/components/nexusoperations/executors.go @@ -157,7 +157,6 @@ func (e taskExecutor) executeInvocationTask(ctx context.Context, env hsm.Environ return err } - header := nexus.Header(args.header) if e.Config.CallbackURLTemplate() == "unset" { return serviceerror.NewInternal(fmt.Sprintf("dynamic config %q is unset", CallbackURLTemplate.Key().String())) } @@ -203,9 +202,13 @@ func (e taskExecutor) executeInvocationTask(ctx context.Context, env hsm.Environ // (ScheduleToClose set, Operation-Timeout set) -> no changes, use ScheduleToClose // (ScheduleToClose set, Operation-Timeout unset) -> set Operation-Timeout to ScheduleToClose, use ScheduleToClose // (ScheduleToClose unset, Operation-Timeout set) -> no changes, use Operation-Timeout + header := nexus.Header(args.header) opTimeout := args.scheduleToCloseTimeout opTimeoutHeader, set := header["Operation-Timeout"] if !set && args.scheduleToCloseTimeout > 0 { + if header == nil { + header = make(nexus.Header, 1) + } header["Operation-Timeout"] = args.scheduleToCloseTimeout.String() } else if set && args.scheduleToCloseTimeout == 0 { parsedTimeout, parseErr := time.ParseDuration(opTimeoutHeader) diff --git a/components/nexusoperations/executors_test.go b/components/nexusoperations/executors_test.go index 71c55819b5e..a194c4e1696 100644 --- a/components/nexusoperations/executors_test.go +++ b/components/nexusoperations/executors_test.go @@ -105,11 +105,13 @@ func TestProcessInvocationTask(t *testing.T) { endpointNotFound bool eventHasNoEndpointID bool operationIsCanceled bool + headers nexus.Header checkStartOperationOptions func(t *testing.T, options nexus.StartOperationOptions) onStartOperation func(ctx context.Context, service, operation string, input *nexus.LazyValue, options nexus.StartOperationOptions) (nexus.HandlerStartOperationResult[any], error) expectedMetricOutcome string checkOutcome func(t *testing.T, op nexusoperations.Operation, events []*historypb.HistoryEvent) requestTimeout time.Duration + schedToCloseTimeout time.Duration destinationDown bool }{ { @@ -167,9 +169,11 @@ func TestProcessInvocationTask(t *testing.T) { }, }, { - name: "sync start", - requestTimeout: time.Hour, - destinationDown: false, + name: "sync start", + requestTimeout: time.Hour, + schedToCloseTimeout: time.Hour, + headers: nexus.Header{"Operation-Timeout": time.Microsecond.String()}, // to test this value is ignored when ScheduleToCloseTimeout is set + destinationDown: false, onStartOperation: func(ctx context.Context, service, operation string, input *nexus.LazyValue, options nexus.StartOperationOptions) (nexus.HandlerStartOperationResult[any], error) { // Also use this test case to check the input and options provided. if service != "service" { @@ -311,8 +315,43 @@ func TestProcessInvocationTask(t *testing.T) { }, }, { - name: "invocation timeout", + name: "invocation timeout by request timeout", requestTimeout: time.Microsecond, + schedToCloseTimeout: time.Hour, + destinationDown: true, + expectedMetricOutcome: "request-timeout", + onStartOperation: func(ctx context.Context, service, operation string, input *nexus.LazyValue, options nexus.StartOperationOptions) (nexus.HandlerStartOperationResult[any], error) { + time.Sleep(time.Millisecond * 100) + return &nexus.HandlerStartOperationResultAsync{OperationID: "op-id"}, nil + }, + checkOutcome: func(t *testing.T, op nexusoperations.Operation, events []*historypb.HistoryEvent) { + require.Equal(t, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, op.State()) + require.NotNil(t, op.LastAttemptFailure.GetApplicationFailureInfo()) + require.Regexp(t, "Post \"http://localhost:\\d+/service/operation\\?callback=http%3A%2F%2Flocalhost%2Fcallback\": context deadline exceeded", op.LastAttemptFailure.Message) + require.Equal(t, 0, len(events)) + }, + }, + { + name: "invocation timeout by ScheduleToCloseTimeout", + requestTimeout: time.Hour, + schedToCloseTimeout: time.Microsecond, + destinationDown: true, + expectedMetricOutcome: "request-timeout", + onStartOperation: func(ctx context.Context, service, operation string, input *nexus.LazyValue, options nexus.StartOperationOptions) (nexus.HandlerStartOperationResult[any], error) { + time.Sleep(time.Millisecond * 100) + return &nexus.HandlerStartOperationResultAsync{OperationID: "op-id"}, nil + }, + checkOutcome: func(t *testing.T, op nexusoperations.Operation, events []*historypb.HistoryEvent) { + require.Equal(t, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, op.State()) + require.NotNil(t, op.LastAttemptFailure.GetApplicationFailureInfo()) + require.Regexp(t, "Post \"http://localhost:\\d+/service/operation\\?callback=http%3A%2F%2Flocalhost%2Fcallback\": context deadline exceeded", op.LastAttemptFailure.Message) + require.Equal(t, 0, len(events)) + }, + }, + { + name: "invocation timeout by Operation-Timeout header", + requestTimeout: time.Hour, + headers: nexus.Header{"Operation-Timeout": time.Microsecond.String()}, destinationDown: true, expectedMetricOutcome: "request-timeout", onStartOperation: func(ctx context.Context, service, operation string, input *nexus.LazyValue, options nexus.StartOperationOptions) (nexus.HandlerStartOperationResult[any], error) { @@ -390,10 +429,13 @@ func TestProcessInvocationTask(t *testing.T) { nexustest.NewNexusServer(t, listenAddr, h) reg := newRegistry(t) - event := mustNewScheduledEvent(time.Now(), time.Hour) + event := mustNewScheduledEvent(time.Now(), tc.schedToCloseTimeout) if tc.eventHasNoEndpointID { event.GetNexusOperationScheduledEventAttributes().EndpointId = "" } + if tc.headers != nil { + event.GetNexusOperationScheduledEventAttributes().NexusHeader = tc.headers + } backend := &hsmtest.NodeBackend{Events: []*historypb.HistoryEvent{event}} node := newOperationNode(t, backend, backend.Events[0]) env := fakeEnv{node} diff --git a/components/nexusoperations/helpers_test.go b/components/nexusoperations/helpers_test.go index 7b68f61c7b9..d7fde7abfa7 100644 --- a/components/nexusoperations/helpers_test.go +++ b/components/nexusoperations/helpers_test.go @@ -97,20 +97,24 @@ func mustNewScheduledEvent(schedTime time.Time, timeout time.Duration) *historyp panic(err) } + attr := &historypb.NexusOperationScheduledEventAttributes{ + EndpointId: "endpoint-id", + Endpoint: "endpoint", + Service: "service", + Operation: "operation", + Input: payload, + RequestId: uuid.NewString(), + } + if timeout > 0 { + attr.ScheduleToCloseTimeout = durationpb.New(timeout) + } + return &historypb.HistoryEvent{ EventType: enumspb.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, EventId: 1, EventTime: timestamppb.New(schedTime), Attributes: &historypb.HistoryEvent_NexusOperationScheduledEventAttributes{ - NexusOperationScheduledEventAttributes: &historypb.NexusOperationScheduledEventAttributes{ - EndpointId: "endpoint-id", - Endpoint: "endpoint", - Service: "service", - Operation: "operation", - Input: payload, - RequestId: uuid.NewString(), - ScheduleToCloseTimeout: durationpb.New(timeout), - }, + NexusOperationScheduledEventAttributes: attr, }, } }