Skip to content

Commit

Permalink
Persist workflow request ids into Cassandra (#5826)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Apr 11, 2024
1 parent 5e98036 commit 8249589
Show file tree
Hide file tree
Showing 23 changed files with 1,321 additions and 157 deletions.
44 changes: 43 additions & 1 deletion common/persistence/data_manager_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,32 @@ const (
TaskTypeWorkflowBackoffTimer
)

// WorkflowRequestType is the type of workflow request
type WorkflowRequestType int

// Types of workflow requests
const (
WorkflowRequestTypeStart WorkflowRequestType = iota
WorkflowRequestTypeSignal
WorkflowRequestTypeCancel
WorkflowRequestTypeReset
)

// CreateWorkflowRequestMode is the mode of create workflow request
type CreateWorkflowRequestMode int

// Modes of create workflow request
const (
// Fail if data with the same domain_id, workflow_id, request_id exists
// It is used for transactions started by external API requests
// to allow us detecting duplicate requests
CreateWorkflowRequestModeNew CreateWorkflowRequestMode = iota
// Upsert the data without checking duplication
// It is used for transactions started by replication stack to achieve
// eventual consistency
CreateWorkflowRequestModeReplicated
)

// UnknownNumRowsAffected is returned when the number of rows that an API affected cannot be determined
const UnknownNumRowsAffected = -1

Expand Down Expand Up @@ -608,7 +634,8 @@ type (

NewWorkflowSnapshot WorkflowSnapshot

DomainName string
WorkflowRequestMode CreateWorkflowRequestMode
DomainName string
}

// CreateWorkflowExecutionResponse is the response to CreateWorkflowExecutionRequest
Expand Down Expand Up @@ -699,6 +726,8 @@ type (

NewWorkflowSnapshot *WorkflowSnapshot

WorkflowRequestMode CreateWorkflowRequestMode

Encoding common.EncodingType // optional binary encoding type

DomainName string
Expand All @@ -719,6 +748,8 @@ type (
// current workflow
CurrentWorkflowMutation *WorkflowMutation

WorkflowRequestMode CreateWorkflowRequestMode

Encoding common.EncodingType // optional binary encoding type

DomainName string
Expand All @@ -733,6 +764,13 @@ type (
Events []*types.HistoryEvent
}

// WorkflowRequest is used as requestID and it's corresponding failover version container
WorkflowRequest struct {
RequestID string
Version int64
RequestType WorkflowRequestType
}

// WorkflowMutation is used as generic workflow execution state mutation
WorkflowMutation struct {
ExecutionInfo *WorkflowExecutionInfo
Expand All @@ -759,6 +797,8 @@ type (
ReplicationTasks []Task
TimerTasks []Task

WorkflowRequests []*WorkflowRequest

Condition int64
Checksum checksum.Checksum
}
Expand All @@ -781,6 +821,8 @@ type (
ReplicationTasks []Task
TimerTasks []Task

WorkflowRequests []*WorkflowRequest

Condition int64
Checksum checksum.Checksum
}
Expand Down
10 changes: 10 additions & 0 deletions common/persistence/data_store_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ type (
PreviousLastWriteVersion int64

NewWorkflowSnapshot InternalWorkflowSnapshot

WorkflowRequestMode CreateWorkflowRequestMode
}

// InternalGetReplicationTasksResponse is the response to GetReplicationTask
Expand Down Expand Up @@ -424,6 +426,8 @@ type (
UpdateWorkflowMutation InternalWorkflowMutation

NewWorkflowSnapshot *InternalWorkflowSnapshot

WorkflowRequestMode CreateWorkflowRequestMode
}

// InternalConflictResolveWorkflowExecutionRequest is used to reset workflow execution state for Persistence Interface
Expand All @@ -440,6 +444,8 @@ type (

// current workflow
CurrentWorkflowMutation *InternalWorkflowMutation

WorkflowRequestMode CreateWorkflowRequestMode
}

// InternalWorkflowMutation is used as generic workflow execution state mutation for Persistence Interface
Expand Down Expand Up @@ -469,6 +475,8 @@ type (
TimerTasks []Task
ReplicationTasks []Task

WorkflowRequests []*WorkflowRequest

Condition int64

Checksum checksum.Checksum
Expand All @@ -494,6 +502,8 @@ type (
TimerTasks []Task
ReplicationTasks []Task

WorkflowRequests []*WorkflowRequest

Condition int64

Checksum checksum.Checksum
Expand Down
10 changes: 10 additions & 0 deletions common/persistence/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

package persistence

import "fmt"

type (
// TimeoutError is returned when a write operation fails due to a timeout
TimeoutError struct {
Expand Down Expand Up @@ -73,8 +75,16 @@ type (
CloseStatus int
LastWriteVersion int64
}

DuplicateRequestError struct {
RunID string
}
)

func (e *DuplicateRequestError) Error() string {
return fmt.Sprintf("Request has already been applied to runID: %s", e.RunID)
}

func (e *InvalidPersistenceRequestError) Error() string {
return e.Msg
}
Expand Down
10 changes: 10 additions & 0 deletions common/persistence/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,8 @@ func (m *executionManagerImpl) UpdateWorkflowExecution(

UpdateWorkflowMutation: *serializedWorkflowMutation,
NewWorkflowSnapshot: serializedNewWorkflowSnapshot,

WorkflowRequestMode: request.WorkflowRequestMode,
}
msuss := m.statsComputer.computeMutableStateUpdateStats(newRequest)
err = m.persistence.UpdateWorkflowExecution(ctx, newRequest)
Expand Down Expand Up @@ -563,6 +565,8 @@ func (m *executionManagerImpl) ConflictResolveWorkflowExecution(
NewWorkflowSnapshot: serializedNewWorkflowMutation,

CurrentWorkflowMutation: serializedCurrentWorkflowMutation,

WorkflowRequestMode: request.WorkflowRequestMode,
}
msuss := m.statsComputer.computeMutableStateConflictResolveStats(newRequest)
err = m.persistence.ConflictResolveWorkflowExecution(ctx, newRequest)
Expand Down Expand Up @@ -593,6 +597,8 @@ func (m *executionManagerImpl) CreateWorkflowExecution(
PreviousLastWriteVersion: request.PreviousLastWriteVersion,

NewWorkflowSnapshot: *serializedNewWorkflowSnapshot,

WorkflowRequestMode: request.WorkflowRequestMode,
}

msuss := m.statsComputer.computeMutableStateCreateStats(newRequest)
Expand Down Expand Up @@ -675,6 +681,8 @@ func (m *executionManagerImpl) SerializeWorkflowMutation(
ReplicationTasks: input.ReplicationTasks,
TimerTasks: input.TimerTasks,

WorkflowRequests: input.WorkflowRequests,

Condition: input.Condition,
Checksum: input.Checksum,
ChecksumData: checksumData,
Expand Down Expand Up @@ -739,6 +747,8 @@ func (m *executionManagerImpl) SerializeWorkflowSnapshot(
ReplicationTasks: input.ReplicationTasks,
TimerTasks: input.TimerTasks,

WorkflowRequests: input.WorkflowRequests,

Condition: input.Condition,
Checksum: input.Checksum,
ChecksumData: checksumData,
Expand Down
48 changes: 44 additions & 4 deletions common/persistence/nosql/nosql_execution_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ func (d *nosqlExecutionStore) CreateWorkflowExecution(
return nil, err
}

workflowRequestWriteMode, err := getWorkflowRequestWriteMode(request.WorkflowRequestMode)
if err != nil {
return nil, err
}

currentWorkflowWriteReq, err := d.prepareCurrentWorkflowRequestForCreateWorkflowTxn(domainID, workflowID, runID, executionInfo, lastWriteVersion, request)
if err != nil {
return nil, err
Expand All @@ -94,13 +99,21 @@ func (d *nosqlExecutionStore) CreateWorkflowExecution(
return nil, err
}

workflowRequests := d.prepareWorkflowRequestRows(domainID, workflowID, runID, newWorkflow.WorkflowRequests, nil)

shardCondition := &nosqlplugin.ShardCondition{
ShardID: d.shardID,
RangeID: request.RangeID,
}

workflowRequestsWriteRequest := &nosqlplugin.WorkflowRequestsWriteRequest{
Rows: workflowRequests,
WriteMode: workflowRequestWriteMode,
}

err = d.db.InsertWorkflowExecutionWithTasks(
ctx,
workflowRequestsWriteRequest,
currentWorkflowWriteReq, workflowExecutionWriteReq,
transferTasks, crossClusterTasks, replicationTasks, timerTasks,
shardCondition,
Expand Down Expand Up @@ -133,6 +146,10 @@ func (d *nosqlExecutionStore) CreateWorkflowExecution(
CloseStatus: conditionFailureErr.WorkflowExecutionAlreadyExists.CloseStatus,
LastWriteVersion: conditionFailureErr.WorkflowExecutionAlreadyExists.LastWriteVersion,
}
case conditionFailureErr.DuplicateRequest != nil:
return nil, &persistence.DuplicateRequestError{
RunID: conditionFailureErr.DuplicateRequest.RunID,
}
default:
// If ever runs into this branch, there is bug in the code either in here, or in the implementation of nosql plugin
err := fmt.Errorf("unsupported conditionFailureReason error")
Expand Down Expand Up @@ -187,6 +204,10 @@ func (d *nosqlExecutionStore) UpdateWorkflowExecution(
return err
}

workflowRequestWriteMode, err := getWorkflowRequestWriteMode(request.WorkflowRequestMode)
if err != nil {
return err
}
var currentWorkflowWriteReq *nosqlplugin.CurrentWorkflowWriteRequest

switch request.Mode {
Expand Down Expand Up @@ -269,7 +290,7 @@ func (d *nosqlExecutionStore) UpdateWorkflowExecution(
var nosqlCrossClusterTasks []*nosqlplugin.CrossClusterTask
var nosqlReplicationTasks []*nosqlplugin.ReplicationTask
var nosqlTimerTasks []*nosqlplugin.TimerTask
var err error
var workflowRequests []*nosqlplugin.WorkflowRequestRow

// 1. current
mutateExecution, err = d.prepareUpdateWorkflowExecutionRequestWithMapsAndEventBuffer(&updateWorkflow)
Expand All @@ -284,6 +305,7 @@ func (d *nosqlExecutionStore) UpdateWorkflowExecution(
if err != nil {
return err
}
workflowRequests = d.prepareWorkflowRequestRows(domainID, workflowID, runID, updateWorkflow.WorkflowRequests, workflowRequests)

// 2. new
if newWorkflow != nil {
Expand All @@ -300,15 +322,21 @@ func (d *nosqlExecutionStore) UpdateWorkflowExecution(
if err != nil {
return err
}
workflowRequests = d.prepareWorkflowRequestRows(domainID, workflowID, newWorkflow.ExecutionInfo.RunID, newWorkflow.WorkflowRequests, workflowRequests)
}

shardCondition := &nosqlplugin.ShardCondition{
ShardID: d.shardID,
RangeID: request.RangeID,
}

workflowRequestsWriteRequest := &nosqlplugin.WorkflowRequestsWriteRequest{
Rows: workflowRequests,
WriteMode: workflowRequestWriteMode,
}

err = d.db.UpdateWorkflowExecutionWithTasks(
ctx, currentWorkflowWriteReq,
ctx, workflowRequestsWriteRequest, currentWorkflowWriteReq,
mutateExecution, insertExecution, nil, // no workflow to reset here
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
shardCondition)
Expand Down Expand Up @@ -336,6 +364,10 @@ func (d *nosqlExecutionStore) ConflictResolveWorkflowExecution(
return err
}

workflowRequestWriteMode, err := getWorkflowRequestWriteMode(request.WorkflowRequestMode)
if err != nil {
return err
}
var currentWorkflowWriteReq *nosqlplugin.CurrentWorkflowWriteRequest
var prevRunID string

Expand Down Expand Up @@ -393,7 +425,7 @@ func (d *nosqlExecutionStore) ConflictResolveWorkflowExecution(
var nosqlCrossClusterTasks []*nosqlplugin.CrossClusterTask
var nosqlReplicationTasks []*nosqlplugin.ReplicationTask
var nosqlTimerTasks []*nosqlplugin.TimerTask
var err error
var workflowRequests []*nosqlplugin.WorkflowRequestRow

// 1. current
if currentWorkflow != nil {
Expand All @@ -409,6 +441,7 @@ func (d *nosqlExecutionStore) ConflictResolveWorkflowExecution(
if err != nil {
return err
}
workflowRequests = d.prepareWorkflowRequestRows(domainID, workflowID, currentWorkflow.ExecutionInfo.RunID, currentWorkflow.WorkflowRequests, workflowRequests)
}

// 2. reset
Expand All @@ -424,6 +457,7 @@ func (d *nosqlExecutionStore) ConflictResolveWorkflowExecution(
if err != nil {
return err
}
workflowRequests = d.prepareWorkflowRequestRows(domainID, workflowID, resetWorkflow.ExecutionInfo.RunID, resetWorkflow.WorkflowRequests, workflowRequests)

// 3. new
if newWorkflow != nil {
Expand All @@ -440,15 +474,21 @@ func (d *nosqlExecutionStore) ConflictResolveWorkflowExecution(
if err != nil {
return err
}
workflowRequests = d.prepareWorkflowRequestRows(domainID, workflowID, newWorkflow.ExecutionInfo.RunID, newWorkflow.WorkflowRequests, workflowRequests)
}

shardCondition := &nosqlplugin.ShardCondition{
ShardID: d.shardID,
RangeID: request.RangeID,
}

workflowRequestsWriteRequest := &nosqlplugin.WorkflowRequestsWriteRequest{
Rows: workflowRequests,
WriteMode: workflowRequestWriteMode,
}

err = d.db.UpdateWorkflowExecutionWithTasks(
ctx, currentWorkflowWriteReq,
ctx, workflowRequestsWriteRequest, currentWorkflowWriteReq,
mutateExecution, insertExecution, resetExecution,
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
shardCondition)
Expand Down
Loading

0 comments on commit 8249589

Please sign in to comment.