From 550016fe9feae77325ae9998e0b4150ed2ebff5f Mon Sep 17 00:00:00 2001 From: Zijian Date: Thu, 3 Oct 2024 18:25:36 +0000 Subject: [PATCH] Introduce new type MatchingPollForActivityTaskResponse --- client/matching/client.go | 2 +- client/matching/client_test.go | 4 +- client/matching/interface.go | 2 +- client/matching/interface_mock.go | 4 +- .../errorinjectors/matching_generated.go | 4 +- client/wrappers/grpc/matching_generated.go | 2 +- client/wrappers/metered/matching_generated.go | 6 +-- .../wrappers/retryable/matching_generated.go | 4 +- client/wrappers/thrift/matching_generated.go | 2 +- client/wrappers/timeout/matching_generated.go | 2 +- common/types/mapper/proto/matching.go | 6 +-- common/types/mapper/proto/matching_test.go | 2 +- common/types/mapper/thrift/matching.go | 51 ++++++++++++++++++- common/types/mapper/thrift/matching_test.go | 25 +++++++++ common/types/matching.go | 20 ++++++++ common/types/testdata/service_matching.go | 2 +- service/frontend/api/handler.go | 22 +++++++- service/frontend/api/handler_test.go | 39 ++++++++++++++ service/matching/handler/engine.go | 12 ++--- service/matching/handler/handler.go | 2 +- service/matching/handler/handler_test.go | 4 +- service/matching/handler/interfaces.go | 4 +- service/matching/handler/interfaces_mock.go | 8 +-- service/matching/tasklist/forwarder_test.go | 2 +- service/matching/tasklist/task.go | 4 +- .../wrappers/thrift/thrift_handler_test.go | 2 +- 26 files changed, 193 insertions(+), 44 deletions(-) diff --git a/client/matching/client.go b/client/matching/client.go index 52c4eec2b80..42d393415d3 100644 --- a/client/matching/client.go +++ b/client/matching/client.go @@ -93,7 +93,7 @@ func (c *clientImpl) PollForActivityTask( ctx context.Context, request *types.MatchingPollForActivityTaskRequest, opts ...yarpc.CallOption, -) (*types.PollForActivityTaskResponse, error) { +) (*types.MatchingPollForActivityTaskResponse, error) { partition := c.loadBalancer.PickReadPartition( request.GetDomainUUID(), *request.PollRequest.GetTaskList(), diff --git a/client/matching/client_test.go b/client/matching/client_test.go index 383a430c9b9..09f6c183730 100644 --- a/client/matching/client_test.go +++ b/client/matching/client_test.go @@ -228,9 +228,9 @@ func TestClient_withResponse(t *testing.T) { mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient) { balancer.EXPECT().PickReadPartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, "").Return(_testPartition) p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil) - c.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.PollForActivityTaskResponse{}, nil) + c.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.MatchingPollForActivityTaskResponse{}, nil) }, - want: &types.PollForActivityTaskResponse{}, + want: &types.MatchingPollForActivityTaskResponse{}, }, { name: "PollForActivityTask - Error in resolving peer", diff --git a/client/matching/interface.go b/client/matching/interface.go index 9093bb6ccac..b5c76451387 100644 --- a/client/matching/interface.go +++ b/client/matching/interface.go @@ -44,7 +44,7 @@ type Client interface { DescribeTaskList(context.Context, *types.MatchingDescribeTaskListRequest, ...yarpc.CallOption) (*types.DescribeTaskListResponse, error) ListTaskListPartitions(context.Context, *types.MatchingListTaskListPartitionsRequest, ...yarpc.CallOption) (*types.ListTaskListPartitionsResponse, error) GetTaskListsByDomain(context.Context, *types.GetTaskListsByDomainRequest, ...yarpc.CallOption) (*types.GetTaskListsByDomainResponse, error) - PollForActivityTask(context.Context, *types.MatchingPollForActivityTaskRequest, ...yarpc.CallOption) (*types.PollForActivityTaskResponse, error) + PollForActivityTask(context.Context, *types.MatchingPollForActivityTaskRequest, ...yarpc.CallOption) (*types.MatchingPollForActivityTaskResponse, error) PollForDecisionTask(context.Context, *types.MatchingPollForDecisionTaskRequest, ...yarpc.CallOption) (*types.MatchingPollForDecisionTaskResponse, error) QueryWorkflow(context.Context, *types.MatchingQueryWorkflowRequest, ...yarpc.CallOption) (*types.QueryWorkflowResponse, error) RespondQueryTaskCompleted(context.Context, *types.MatchingRespondQueryTaskCompletedRequest, ...yarpc.CallOption) error diff --git a/client/matching/interface_mock.go b/client/matching/interface_mock.go index cdaf3474a7d..5887d0e8d5f 100644 --- a/client/matching/interface_mock.go +++ b/client/matching/interface_mock.go @@ -177,14 +177,14 @@ func (mr *MockClientMockRecorder) ListTaskListPartitions(arg0, arg1 interface{}, } // PollForActivityTask mocks base method. -func (m *MockClient) PollForActivityTask(arg0 context.Context, arg1 *types.MatchingPollForActivityTaskRequest, arg2 ...yarpc.CallOption) (*types.PollForActivityTaskResponse, error) { +func (m *MockClient) PollForActivityTask(arg0 context.Context, arg1 *types.MatchingPollForActivityTaskRequest, arg2 ...yarpc.CallOption) (*types.MatchingPollForActivityTaskResponse, error) { m.ctrl.T.Helper() varargs := []interface{}{arg0, arg1} for _, a := range arg2 { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "PollForActivityTask", varargs...) - ret0, _ := ret[0].(*types.PollForActivityTaskResponse) + ret0, _ := ret[0].(*types.MatchingPollForActivityTaskResponse) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/client/wrappers/errorinjectors/matching_generated.go b/client/wrappers/errorinjectors/matching_generated.go index 1f30730e504..161d8229863 100644 --- a/client/wrappers/errorinjectors/matching_generated.go +++ b/client/wrappers/errorinjectors/matching_generated.go @@ -182,11 +182,11 @@ func (c *matchingClient) ListTaskListPartitions(ctx context.Context, mp1 *types. return } -func (c *matchingClient) PollForActivityTask(ctx context.Context, mp1 *types.MatchingPollForActivityTaskRequest, p1 ...yarpc.CallOption) (pp1 *types.PollForActivityTaskResponse, err error) { +func (c *matchingClient) PollForActivityTask(ctx context.Context, mp1 *types.MatchingPollForActivityTaskRequest, p1 ...yarpc.CallOption) (mp2 *types.MatchingPollForActivityTaskResponse, err error) { fakeErr := c.fakeErrFn(c.errorRate) var forwardCall bool if forwardCall = c.forwardCallFn(fakeErr); forwardCall { - pp1, err = c.client.PollForActivityTask(ctx, mp1, p1...) + mp2, err = c.client.PollForActivityTask(ctx, mp1, p1...) } if fakeErr != nil { diff --git a/client/wrappers/grpc/matching_generated.go b/client/wrappers/grpc/matching_generated.go index 0c9a237619a..fc1eed13446 100644 --- a/client/wrappers/grpc/matching_generated.go +++ b/client/wrappers/grpc/matching_generated.go @@ -65,7 +65,7 @@ func (g matchingClient) ListTaskListPartitions(ctx context.Context, mp1 *types.M return proto.ToMatchingListTaskListPartitionsResponse(response), proto.ToError(err) } -func (g matchingClient) PollForActivityTask(ctx context.Context, mp1 *types.MatchingPollForActivityTaskRequest, p1 ...yarpc.CallOption) (pp1 *types.PollForActivityTaskResponse, err error) { +func (g matchingClient) PollForActivityTask(ctx context.Context, mp1 *types.MatchingPollForActivityTaskRequest, p1 ...yarpc.CallOption) (mp2 *types.MatchingPollForActivityTaskResponse, err error) { response, err := g.c.PollForActivityTask(ctx, proto.FromMatchingPollForActivityTaskRequest(mp1), p1...) return proto.ToMatchingPollForActivityTaskResponse(response), proto.ToError(err) } diff --git a/client/wrappers/metered/matching_generated.go b/client/wrappers/metered/matching_generated.go index 99460ab7ba7..55ad7dc8429 100644 --- a/client/wrappers/metered/matching_generated.go +++ b/client/wrappers/metered/matching_generated.go @@ -136,18 +136,18 @@ func (c *matchingClient) ListTaskListPartitions(ctx context.Context, mp1 *types. return lp1, err } -func (c *matchingClient) PollForActivityTask(ctx context.Context, mp1 *types.MatchingPollForActivityTaskRequest, p1 ...yarpc.CallOption) (pp1 *types.PollForActivityTaskResponse, err error) { +func (c *matchingClient) PollForActivityTask(ctx context.Context, mp1 *types.MatchingPollForActivityTaskRequest, p1 ...yarpc.CallOption) (mp2 *types.MatchingPollForActivityTaskResponse, err error) { c.metricsClient.IncCounter(metrics.MatchingClientPollForActivityTaskScope, metrics.CadenceClientRequests) c.emitForwardedFromStats(metrics.MatchingClientPollForActivityTaskScope, mp1) sw := c.metricsClient.StartTimer(metrics.MatchingClientPollForActivityTaskScope, metrics.CadenceClientLatency) - pp1, err = c.client.PollForActivityTask(ctx, mp1, p1...) + mp2, err = c.client.PollForActivityTask(ctx, mp1, p1...) sw.Stop() if err != nil { c.metricsClient.IncCounter(metrics.MatchingClientPollForActivityTaskScope, metrics.CadenceClientFailures) } - return pp1, err + return mp2, err } func (c *matchingClient) PollForDecisionTask(ctx context.Context, mp1 *types.MatchingPollForDecisionTaskRequest, p1 ...yarpc.CallOption) (mp2 *types.MatchingPollForDecisionTaskResponse, err error) { diff --git a/client/wrappers/retryable/matching_generated.go b/client/wrappers/retryable/matching_generated.go index 5af5902bae9..1c9960e2c27 100644 --- a/client/wrappers/retryable/matching_generated.go +++ b/client/wrappers/retryable/matching_generated.go @@ -107,8 +107,8 @@ func (c *matchingClient) ListTaskListPartitions(ctx context.Context, mp1 *types. return resp, err } -func (c *matchingClient) PollForActivityTask(ctx context.Context, mp1 *types.MatchingPollForActivityTaskRequest, p1 ...yarpc.CallOption) (pp1 *types.PollForActivityTaskResponse, err error) { - var resp *types.PollForActivityTaskResponse +func (c *matchingClient) PollForActivityTask(ctx context.Context, mp1 *types.MatchingPollForActivityTaskRequest, p1 ...yarpc.CallOption) (mp2 *types.MatchingPollForActivityTaskResponse, err error) { + var resp *types.MatchingPollForActivityTaskResponse op := func() error { var err error resp, err = c.client.PollForActivityTask(ctx, mp1, p1...) diff --git a/client/wrappers/thrift/matching_generated.go b/client/wrappers/thrift/matching_generated.go index ebe6e76dda1..ad9035f8937 100644 --- a/client/wrappers/thrift/matching_generated.go +++ b/client/wrappers/thrift/matching_generated.go @@ -65,7 +65,7 @@ func (g matchingClient) ListTaskListPartitions(ctx context.Context, mp1 *types.M return thrift.ToMatchingListTaskListPartitionsResponse(response), thrift.ToError(err) } -func (g matchingClient) PollForActivityTask(ctx context.Context, mp1 *types.MatchingPollForActivityTaskRequest, p1 ...yarpc.CallOption) (pp1 *types.PollForActivityTaskResponse, err error) { +func (g matchingClient) PollForActivityTask(ctx context.Context, mp1 *types.MatchingPollForActivityTaskRequest, p1 ...yarpc.CallOption) (mp2 *types.MatchingPollForActivityTaskResponse, err error) { response, err := g.c.PollForActivityTask(ctx, thrift.FromMatchingPollForActivityTaskRequest(mp1), p1...) return thrift.ToMatchingPollForActivityTaskResponse(response), thrift.ToError(err) } diff --git a/client/wrappers/timeout/matching_generated.go b/client/wrappers/timeout/matching_generated.go index 2bdc741e127..bc3e45ff83c 100644 --- a/client/wrappers/timeout/matching_generated.go +++ b/client/wrappers/timeout/matching_generated.go @@ -92,7 +92,7 @@ func (c *matchingClient) ListTaskListPartitions(ctx context.Context, mp1 *types. return c.client.ListTaskListPartitions(ctx, mp1, p1...) } -func (c *matchingClient) PollForActivityTask(ctx context.Context, mp1 *types.MatchingPollForActivityTaskRequest, p1 ...yarpc.CallOption) (pp1 *types.PollForActivityTaskResponse, err error) { +func (c *matchingClient) PollForActivityTask(ctx context.Context, mp1 *types.MatchingPollForActivityTaskRequest, p1 ...yarpc.CallOption) (mp2 *types.MatchingPollForActivityTaskResponse, err error) { ctx, cancel := createContext(ctx, c.longPollTimeout) defer cancel() return c.client.PollForActivityTask(ctx, mp1, p1...) diff --git a/common/types/mapper/proto/matching.go b/common/types/mapper/proto/matching.go index 22e6ddbdc56..784cc5eb54d 100644 --- a/common/types/mapper/proto/matching.go +++ b/common/types/mapper/proto/matching.go @@ -324,7 +324,7 @@ func ToMatchingPollForActivityTaskRequest(t *matchingv1.PollForActivityTaskReque } } -func FromMatchingPollForActivityTaskResponse(t *types.PollForActivityTaskResponse) *matchingv1.PollForActivityTaskResponse { +func FromMatchingPollForActivityTaskResponse(t *types.MatchingPollForActivityTaskResponse) *matchingv1.PollForActivityTaskResponse { if t == nil { return nil } @@ -348,11 +348,11 @@ func FromMatchingPollForActivityTaskResponse(t *types.PollForActivityTaskRespons } } -func ToMatchingPollForActivityTaskResponse(t *matchingv1.PollForActivityTaskResponse) *types.PollForActivityTaskResponse { +func ToMatchingPollForActivityTaskResponse(t *matchingv1.PollForActivityTaskResponse) *types.MatchingPollForActivityTaskResponse { if t == nil { return nil } - return &types.PollForActivityTaskResponse{ + return &types.MatchingPollForActivityTaskResponse{ TaskToken: t.TaskToken, WorkflowExecution: ToWorkflowExecution(t.WorkflowExecution), ActivityID: t.ActivityId, diff --git a/common/types/mapper/proto/matching_test.go b/common/types/mapper/proto/matching_test.go index 4aa54d2e08a..ea47bb1a8e5 100644 --- a/common/types/mapper/proto/matching_test.go +++ b/common/types/mapper/proto/matching_test.go @@ -84,7 +84,7 @@ func TestMatchingPollForActivityTaskRequest(t *testing.T) { } func TestMatchingPollForActivityTaskResponse(t *testing.T) { - for _, item := range []*types.PollForActivityTaskResponse{nil, {}, &testdata.MatchingPollForActivityTaskResponse} { + for _, item := range []*types.MatchingPollForActivityTaskResponse{nil, {}, &testdata.MatchingPollForActivityTaskResponse} { assert.Equal(t, item, ToMatchingPollForActivityTaskResponse(FromMatchingPollForActivityTaskResponse(item))) } } diff --git a/common/types/mapper/thrift/matching.go b/common/types/mapper/thrift/matching.go index a7cf6e9d687..6e87800eb50 100644 --- a/common/types/mapper/thrift/matching.go +++ b/common/types/mapper/thrift/matching.go @@ -22,6 +22,7 @@ package thrift import ( "github.com/uber/cadence/.gen/go/matching" + "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" "github.com/uber/cadence/common/types" ) @@ -35,8 +36,6 @@ var ( ToMatchingGetTaskListsByDomainResponse = ToGetTaskListsByDomainResponse FromMatchingListTaskListPartitionsResponse = FromListTaskListPartitionsResponse ToMatchingListTaskListPartitionsResponse = ToListTaskListPartitionsResponse - FromMatchingPollForActivityTaskResponse = FromPollForActivityTaskResponse - ToMatchingPollForActivityTaskResponse = ToPollForActivityTaskResponse FromMatchingQueryWorkflowResponse = FromQueryWorkflowResponse ToMatchingQueryWorkflowResponse = ToQueryWorkflowResponse ) @@ -324,6 +323,54 @@ func ToMatchingPollForDecisionTaskResponse(t *matching.PollForDecisionTaskRespon } } +func FromMatchingPollForActivityTaskResponse(t *types.MatchingPollForActivityTaskResponse) *shared.PollForActivityTaskResponse { + if t == nil { + return nil + } + return &shared.PollForActivityTaskResponse{ + TaskToken: t.TaskToken, + WorkflowExecution: FromWorkflowExecution(t.WorkflowExecution), + ActivityId: &t.ActivityID, + ActivityType: FromActivityType(t.ActivityType), + Input: t.Input, + ScheduledTimestamp: t.ScheduledTimestamp, + ScheduleToCloseTimeoutSeconds: t.ScheduleToCloseTimeoutSeconds, + StartedTimestamp: t.StartedTimestamp, + StartToCloseTimeoutSeconds: t.StartToCloseTimeoutSeconds, + HeartbeatTimeoutSeconds: t.HeartbeatTimeoutSeconds, + Attempt: &t.Attempt, + ScheduledTimestampOfThisAttempt: t.ScheduledTimestampOfThisAttempt, + HeartbeatDetails: t.HeartbeatDetails, + WorkflowType: FromWorkflowType(t.WorkflowType), + WorkflowDomain: &t.WorkflowDomain, + Header: FromHeader(t.Header), + } +} + +func ToMatchingPollForActivityTaskResponse(t *shared.PollForActivityTaskResponse) *types.MatchingPollForActivityTaskResponse { + if t == nil { + return nil + } + return &types.MatchingPollForActivityTaskResponse{ + TaskToken: t.TaskToken, + WorkflowExecution: ToWorkflowExecution(t.WorkflowExecution), + ActivityID: t.GetActivityId(), + ActivityType: ToActivityType(t.ActivityType), + Input: t.Input, + ScheduledTimestamp: t.ScheduledTimestamp, + ScheduleToCloseTimeoutSeconds: t.ScheduleToCloseTimeoutSeconds, + StartedTimestamp: t.StartedTimestamp, + StartToCloseTimeoutSeconds: t.StartToCloseTimeoutSeconds, + HeartbeatTimeoutSeconds: t.HeartbeatTimeoutSeconds, + Attempt: t.GetAttempt(), + ScheduledTimestampOfThisAttempt: t.ScheduledTimestampOfThisAttempt, + HeartbeatDetails: t.HeartbeatDetails, + WorkflowType: ToWorkflowType(t.WorkflowType), + WorkflowDomain: t.GetWorkflowDomain(), + Header: ToHeader(t.Header), + } +} + // FromMatchingQueryWorkflowRequest converts internal QueryWorkflowRequest type to thrift func FromMatchingQueryWorkflowRequest(t *types.MatchingQueryWorkflowRequest) *matching.QueryWorkflowRequest { if t == nil { diff --git a/common/types/mapper/thrift/matching_test.go b/common/types/mapper/thrift/matching_test.go index 19291d461f1..83564d4fed5 100644 --- a/common/types/mapper/thrift/matching_test.go +++ b/common/types/mapper/thrift/matching_test.go @@ -262,6 +262,31 @@ func TestMatchingPollForDecisionResponse(t *testing.T) { } } +func TestMatchingPollForActivityTaskResponse(t *testing.T) { + testCases := []struct { + desc string + input *types.MatchingPollForActivityTaskResponse + }{ + { + desc: "non-nil input test", + input: &testdata.MatchingPollForActivityTaskResponse, + }, + { + desc: "empty input test", + input: &types.MatchingPollForActivityTaskResponse{}, + }, + { + desc: "nil input test", + input: nil, + }, + } + for _, tc := range testCases { + thriftObj := FromMatchingPollForActivityTaskResponse(tc.input) + roundTripObj := ToMatchingPollForActivityTaskResponse(thriftObj) + assert.Equal(t, tc.input, roundTripObj) + } +} + func TestMatchingQueryWorkflowRequest(t *testing.T) { testCases := []struct { desc string diff --git a/common/types/matching.go b/common/types/matching.go index 3767409a1ea..9d6e016e295 100644 --- a/common/types/matching.go +++ b/common/types/matching.go @@ -480,6 +480,26 @@ func (v *MatchingPollForDecisionTaskResponse) GetTotalHistoryBytes() (o int64) { return } +type MatchingPollForActivityTaskResponse struct { + TaskToken []byte `json:"taskToken,omitempty"` + WorkflowExecution *WorkflowExecution `json:"workflowExecution,omitempty"` + ActivityID string `json:"activityId,omitempty"` + ActivityType *ActivityType `json:"activityType,omitempty"` + Input []byte `json:"input,omitempty"` + ScheduledTimestamp *int64 `json:"scheduledTimestamp,omitempty"` + ScheduleToCloseTimeoutSeconds *int32 `json:"scheduleToCloseTimeoutSeconds,omitempty"` + StartedTimestamp *int64 `json:"startedTimestamp,omitempty"` + StartToCloseTimeoutSeconds *int32 `json:"startToCloseTimeoutSeconds,omitempty"` + HeartbeatTimeoutSeconds *int32 `json:"heartbeatTimeoutSeconds,omitempty"` + Attempt int32 `json:"attempt,omitempty"` + ScheduledTimestampOfThisAttempt *int64 `json:"scheduledTimestampOfThisAttempt,omitempty"` + HeartbeatDetails []byte `json:"heartbeatDetails,omitempty"` + WorkflowType *WorkflowType `json:"workflowType,omitempty"` + WorkflowDomain string `json:"workflowDomain,omitempty"` + Header *Header `json:"header,omitempty"` + BacklogCountHint int64 `json:"backlogCountHint,omitempty"` +} + // MatchingQueryWorkflowRequest is an internal type (TBD...) type MatchingQueryWorkflowRequest struct { DomainUUID string `json:"domainUUID,omitempty"` diff --git a/common/types/testdata/service_matching.go b/common/types/testdata/service_matching.go index 2d9eecbf041..4ab02b16754 100644 --- a/common/types/testdata/service_matching.go +++ b/common/types/testdata/service_matching.go @@ -81,7 +81,7 @@ var ( ForwardedFrom: ForwardedFrom, IsolationGroup: IsolationGroup, } - MatchingPollForActivityTaskResponse = types.PollForActivityTaskResponse{ + MatchingPollForActivityTaskResponse = types.MatchingPollForActivityTaskResponse{ TaskToken: TaskToken, WorkflowExecution: &WorkflowExecution, ActivityID: ActivityID, diff --git a/service/frontend/api/handler.go b/service/frontend/api/handler.go index 1cc90004da5..e817603330e 100644 --- a/service/frontend/api/handler.go +++ b/service/frontend/api/handler.go @@ -554,8 +554,9 @@ func (wh *WorkflowHandler) PollForActivityTask( return &types.PollForActivityTaskResponse{}, nil } pollerID := uuid.New().String() + var matchingResp *types.MatchingPollForActivityTaskResponse op := func() error { - resp, err = wh.GetMatchingClient().PollForActivityTask(ctx, &types.MatchingPollForActivityTaskRequest{ + matchingResp, err = wh.GetMatchingClient().PollForActivityTask(ctx, &types.MatchingPollForActivityTaskRequest{ DomainUUID: domainID, PollerID: pollerID, PollRequest: pollRequest, @@ -581,7 +582,24 @@ func (wh *WorkflowHandler) PollForActivityTask( return nil, err } } - return resp, nil + return &types.PollForActivityTaskResponse{ + TaskToken: matchingResp.TaskToken, + WorkflowExecution: matchingResp.WorkflowExecution, + ActivityID: matchingResp.ActivityID, + ActivityType: matchingResp.ActivityType, + Input: matchingResp.Input, + ScheduledTimestamp: matchingResp.ScheduledTimestamp, + ScheduleToCloseTimeoutSeconds: matchingResp.ScheduleToCloseTimeoutSeconds, + StartedTimestamp: matchingResp.StartedTimestamp, + StartToCloseTimeoutSeconds: matchingResp.StartToCloseTimeoutSeconds, + HeartbeatTimeoutSeconds: matchingResp.HeartbeatTimeoutSeconds, + Attempt: matchingResp.Attempt, + ScheduledTimestampOfThisAttempt: matchingResp.ScheduledTimestampOfThisAttempt, + HeartbeatDetails: matchingResp.HeartbeatDetails, + WorkflowType: matchingResp.WorkflowType, + WorkflowDomain: matchingResp.WorkflowDomain, + Header: matchingResp.Header, + }, nil } // PollForDecisionTask - Poll for a decision task. diff --git a/service/frontend/api/handler_test.go b/service/frontend/api/handler_test.go index cb8affdf408..1931ae5cc8f 100644 --- a/service/frontend/api/handler_test.go +++ b/service/frontend/api/handler_test.go @@ -309,6 +309,45 @@ func (s *workflowHandlerSuite) TestPollForActivityTask_IsolationGroupDrained() { s.Equal(&types.PollForActivityTaskResponse{}, resp) } +func (s *workflowHandlerSuite) TestPollForActivityTask_Success() { + config := s.newConfig(dc.NewInMemoryClient()) + config.EnableTasklistIsolation = dc.GetBoolPropertyFnFilteredByDomain(true) + wh := s.getWorkflowHandler(config) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + isolationGroup := "dca1" + ctx = partition.ContextWithIsolationGroup(ctx, isolationGroup) + + s.mockDomainCache.EXPECT().GetDomainID(s.testDomain).Return(s.testDomainID, nil) + s.mockResource.IsolationGroups.EXPECT().IsDrained(gomock.Any(), s.testDomain, isolationGroup).Return(false, nil).AnyTimes() + s.mockMatchingClient.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any()).Return(&types.MatchingPollForActivityTaskResponse{ + TaskToken: []byte("token"), + WorkflowExecution: &types.WorkflowExecution{ + WorkflowID: "wid", + RunID: "rid", + }, + ActivityID: "1", + Input: []byte(`{"key": "value"}`), + }, nil) + resp, err := wh.PollForActivityTask(ctx, &types.PollForActivityTaskRequest{ + Domain: s.testDomain, + TaskList: &types.TaskList{ + Name: "task-list", + }, + }) + s.NoError(err) + s.Equal(&types.PollForActivityTaskResponse{ + TaskToken: []byte("token"), + WorkflowExecution: &types.WorkflowExecution{ + WorkflowID: "wid", + RunID: "rid", + }, + ActivityID: "1", + Input: []byte(`{"key": "value"}`), + }, resp) +} + func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_RequestIdNotSet() { config := s.newConfig(dc.NewInMemoryClient()) config.UserRPS = dc.GetIntPropertyFn(10) diff --git a/service/matching/handler/engine.go b/service/matching/handler/engine.go index a3a1b0b18c5..c13c75c0761 100644 --- a/service/matching/handler/engine.go +++ b/service/matching/handler/engine.go @@ -111,7 +111,7 @@ var ( // EmptyPollForDecisionTaskResponse is the response when there are no decision tasks to hand out emptyPollForDecisionTaskResponse = &types.MatchingPollForDecisionTaskResponse{} // EmptyPollForActivityTaskResponse is the response when there are no activity tasks to hand out - emptyPollForActivityTaskResponse = &types.PollForActivityTaskResponse{} + emptyPollForActivityTaskResponse = &types.MatchingPollForActivityTaskResponse{} historyServiceOperationRetryPolicy = common.CreateHistoryServiceRetryPolicy() errPumpClosed = errors.New("task list pump closed its channel") @@ -648,7 +648,7 @@ pollLoop: func (e *matchingEngineImpl) PollForActivityTask( hCtx *handlerContext, req *types.MatchingPollForActivityTaskRequest, -) (*types.PollForActivityTaskResponse, error) { +) (*types.MatchingPollForActivityTaskResponse, error) { domainID := req.GetDomainUUID() pollerID := req.GetPollerID() request := req.PollRequest @@ -741,11 +741,11 @@ pollLoop: func (e *matchingEngineImpl) createSyncMatchPollForActivityTaskResponse( task *tasklist.InternalTask, activityTaskDispatchInfo *types.ActivityTaskDispatchInfo, -) *types.PollForActivityTaskResponse { +) *types.MatchingPollForActivityTaskResponse { scheduledEvent := activityTaskDispatchInfo.ScheduledEvent attributes := scheduledEvent.ActivityTaskScheduledEventAttributes - response := &types.PollForActivityTaskResponse{} + response := &types.MatchingPollForActivityTaskResponse{} response.ActivityID = attributes.ActivityID response.ActivityType = attributes.ActivityType response.Header = attributes.Header @@ -1072,7 +1072,7 @@ func (e *matchingEngineImpl) createPollForActivityTaskResponse( task *tasklist.InternalTask, historyResponse *types.RecordActivityTaskStartedResponse, scope metrics.Scope, -) *types.PollForActivityTaskResponse { +) *types.MatchingPollForActivityTaskResponse { scheduledEvent := historyResponse.ScheduledEvent if scheduledEvent.ActivityTaskScheduledEventAttributes == nil { @@ -1086,7 +1086,7 @@ func (e *matchingEngineImpl) createPollForActivityTaskResponse( scope.RecordTimer(metrics.AsyncMatchLatencyPerTaskList, time.Since(task.Event.CreatedTime)) } - response := &types.PollForActivityTaskResponse{} + response := &types.MatchingPollForActivityTaskResponse{} response.ActivityID = attributes.ActivityID response.ActivityType = attributes.ActivityType response.Header = attributes.Header diff --git a/service/matching/handler/handler.go b/service/matching/handler/handler.go index c7ca760ae80..22fa6eb6728 100644 --- a/service/matching/handler/handler.go +++ b/service/matching/handler/handler.go @@ -194,7 +194,7 @@ func (h *handlerImpl) AddDecisionTask( func (h *handlerImpl) PollForActivityTask( ctx context.Context, request *types.MatchingPollForActivityTaskRequest, -) (resp *types.PollForActivityTaskResponse, retError error) { +) (resp *types.MatchingPollForActivityTaskResponse, retError error) { defer func() { log.CapturePanic(recover(), h.logger, &retError) }() domainName := h.domainName(request.GetDomainUUID()) diff --git a/service/matching/handler/handler_test.go b/service/matching/handler/handler_test.go index f689ae73275..aa697c270e5 100644 --- a/service/matching/handler/handler_test.go +++ b/service/matching/handler/handler_test.go @@ -277,7 +277,7 @@ func (s *handlerSuite) TestPollForActivityTask() { setupMocks: func() { s.mockLimiter.EXPECT().Allow().Return(true).Times(1) s.mockEngine.EXPECT().PollForActivityTask(gomock.Any(), &request). - Return(&types.PollForActivityTaskResponse{TaskToken: []byte("task-token")}, nil).Times(1) + Return(&types.MatchingPollForActivityTaskResponse{TaskToken: []byte("task-token")}, nil).Times(1) }, getCtx: func() (context.Context, context.CancelFunc) { ctx, cancel := context.WithDeadline(context.Background(), time.Now()) @@ -331,7 +331,7 @@ func (s *handlerSuite) TestPollForActivityTask() { s.Equal(tc.err, err) } else { s.NoError(err) - s.Equal(&types.PollForActivityTaskResponse{TaskToken: []byte("task-token")}, resp) + s.Equal(&types.MatchingPollForActivityTaskResponse{TaskToken: []byte("task-token")}, resp) } }) } diff --git a/service/matching/handler/interfaces.go b/service/matching/handler/interfaces.go index 21f39f160e0..1535fe92693 100644 --- a/service/matching/handler/interfaces.go +++ b/service/matching/handler/interfaces.go @@ -39,7 +39,7 @@ type ( AddDecisionTask(hCtx *handlerContext, request *types.AddDecisionTaskRequest) (syncMatch bool, err error) AddActivityTask(hCtx *handlerContext, request *types.AddActivityTaskRequest) (syncMatch bool, err error) PollForDecisionTask(hCtx *handlerContext, request *types.MatchingPollForDecisionTaskRequest) (*types.MatchingPollForDecisionTaskResponse, error) - PollForActivityTask(hCtx *handlerContext, request *types.MatchingPollForActivityTaskRequest) (*types.PollForActivityTaskResponse, error) + PollForActivityTask(hCtx *handlerContext, request *types.MatchingPollForActivityTaskRequest) (*types.MatchingPollForActivityTaskResponse, error) QueryWorkflow(hCtx *handlerContext, request *types.MatchingQueryWorkflowRequest) (*types.QueryWorkflowResponse, error) RespondQueryTaskCompleted(hCtx *handlerContext, request *types.MatchingRespondQueryTaskCompletedRequest) error CancelOutstandingPoll(hCtx *handlerContext, request *types.CancelOutstandingPollRequest) error @@ -59,7 +59,7 @@ type ( DescribeTaskList(context.Context, *types.MatchingDescribeTaskListRequest) (*types.DescribeTaskListResponse, error) ListTaskListPartitions(context.Context, *types.MatchingListTaskListPartitionsRequest) (*types.ListTaskListPartitionsResponse, error) GetTaskListsByDomain(context.Context, *types.GetTaskListsByDomainRequest) (*types.GetTaskListsByDomainResponse, error) - PollForActivityTask(context.Context, *types.MatchingPollForActivityTaskRequest) (*types.PollForActivityTaskResponse, error) + PollForActivityTask(context.Context, *types.MatchingPollForActivityTaskRequest) (*types.MatchingPollForActivityTaskResponse, error) PollForDecisionTask(context.Context, *types.MatchingPollForDecisionTaskRequest) (*types.MatchingPollForDecisionTaskResponse, error) QueryWorkflow(context.Context, *types.MatchingQueryWorkflowRequest) (*types.QueryWorkflowResponse, error) RespondQueryTaskCompleted(context.Context, *types.MatchingRespondQueryTaskCompletedRequest) error diff --git a/service/matching/handler/interfaces_mock.go b/service/matching/handler/interfaces_mock.go index 78401a46838..b2101288f56 100644 --- a/service/matching/handler/interfaces_mock.go +++ b/service/matching/handler/interfaces_mock.go @@ -148,10 +148,10 @@ func (mr *MockEngineMockRecorder) ListTaskListPartitions(hCtx, request interface } // PollForActivityTask mocks base method. -func (m *MockEngine) PollForActivityTask(hCtx *handlerContext, request *types.MatchingPollForActivityTaskRequest) (*types.PollForActivityTaskResponse, error) { +func (m *MockEngine) PollForActivityTask(hCtx *handlerContext, request *types.MatchingPollForActivityTaskRequest) (*types.MatchingPollForActivityTaskResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "PollForActivityTask", hCtx, request) - ret0, _ := ret[0].(*types.PollForActivityTaskResponse) + ret0, _ := ret[0].(*types.MatchingPollForActivityTaskResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -356,10 +356,10 @@ func (mr *MockHandlerMockRecorder) ListTaskListPartitions(arg0, arg1 interface{} } // PollForActivityTask mocks base method. -func (m *MockHandler) PollForActivityTask(arg0 context.Context, arg1 *types.MatchingPollForActivityTaskRequest) (*types.PollForActivityTaskResponse, error) { +func (m *MockHandler) PollForActivityTask(arg0 context.Context, arg1 *types.MatchingPollForActivityTaskRequest) (*types.MatchingPollForActivityTaskResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "PollForActivityTask", arg0, arg1) - ret0, _ := ret[0].(*types.PollForActivityTaskResponse) + ret0, _ := ret[0].(*types.MatchingPollForActivityTaskResponse) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/service/matching/tasklist/forwarder_test.go b/service/matching/tasklist/forwarder_test.go index 34cd3da19cd..77af4d36b0d 100644 --- a/service/matching/tasklist/forwarder_test.go +++ b/service/matching/tasklist/forwarder_test.go @@ -235,7 +235,7 @@ func (t *ForwarderTestSuite) TestForwardPollForActivity() { pollerID := uuid.New() ctx := ContextWithPollerID(context.Background(), pollerID) ctx = ContextWithIdentity(ctx, "id1") - resp := &types.PollForActivityTaskResponse{} + resp := &types.MatchingPollForActivityTaskResponse{} var request *types.MatchingPollForActivityTaskRequest t.client.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any()).Do( diff --git a/service/matching/tasklist/task.go b/service/matching/tasklist/task.go index c27c27780d8..379221689eb 100644 --- a/service/matching/tasklist/task.go +++ b/service/matching/tasklist/task.go @@ -41,7 +41,7 @@ type ( // another matching host. This type of task is already marked as started startedTaskInfo struct { decisionTaskInfo *types.MatchingPollForDecisionTaskResponse - activityTaskInfo *types.PollForActivityTaskResponse + activityTaskInfo *types.MatchingPollForActivityTaskResponse } // InternalTask represents an activity, decision, query or started (received from another host). // this struct is more like a union and only one of [ query, event, forwarded ] is @@ -156,7 +156,7 @@ func (task *InternalTask) PollForDecisionResponse() *types.MatchingPollForDecisi // pollForActivityResponse returns the poll response for an activity task that is // already marked as started. This method should only be called when isStarted() is true -func (task *InternalTask) PollForActivityResponse() *types.PollForActivityTaskResponse { +func (task *InternalTask) PollForActivityResponse() *types.MatchingPollForActivityTaskResponse { if task.IsStarted() { return task.started.activityTaskInfo } diff --git a/service/matching/wrappers/thrift/thrift_handler_test.go b/service/matching/wrappers/thrift/thrift_handler_test.go index 6113edc7d65..cdeeadb7768 100644 --- a/service/matching/wrappers/thrift/thrift_handler_test.go +++ b/service/matching/wrappers/thrift/thrift_handler_test.go @@ -78,7 +78,7 @@ func TestThriftHandler(t *testing.T) { assert.Equal(t, expectedErr, err) }) t.Run("PollForActivityTask", func(t *testing.T) { - h.EXPECT().PollForActivityTask(ctx, &types.MatchingPollForActivityTaskRequest{}).Return(&types.PollForActivityTaskResponse{}, internalErr).Times(1) + h.EXPECT().PollForActivityTask(ctx, &types.MatchingPollForActivityTaskRequest{}).Return(&types.MatchingPollForActivityTaskResponse{}, internalErr).Times(1) resp, err := th.PollForActivityTask(ctx, &m.PollForActivityTaskRequest{}) assert.Equal(t, s.PollForActivityTaskResponse{WorkflowDomain: common.StringPtr(""), ActivityId: common.StringPtr(""), Attempt: common.Int32Ptr(0)}, *resp) assert.Equal(t, expectedErr, err)