Skip to content

Commit

Permalink
Fix bug that will cause concurrency access to search attributes map (#…
Browse files Browse the repository at this point in the history
…6262)

What changed?
This is causing concurrent access to the map and crashed production instances.

Why?

Copy search attributes instead of mutate it.
  • Loading branch information
shijiesheng authored Sep 3, 2024
1 parent 05852ef commit 30d403f
Showing 1 changed file with 14 additions and 10 deletions.
24 changes: 14 additions & 10 deletions service/history/task/transfer_task_executor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (t *transferTaskExecutorBase) recordWorkflowStarted(
numClusters int16,
visibilityMemo *types.Memo,
updateTimeUnixNano int64,
searchAttributes map[string][]byte,
immutableSearchAttributes map[string][]byte,
headers map[string][]byte,
) error {

Expand All @@ -182,6 +182,7 @@ func (t *transferTaskExecutorBase) recordWorkflowStarted(
}

// headers are appended into search attributes if enabled
searchAttributes := copySearchAttributes(immutableSearchAttributes)
if t.config.EnableContextHeaderInVisibility(domainEntry.GetInfo().Name) {
// fail open, if error occurs, just log it; successfully appended headers will be stored
if searchAttributes, err = appendContextHeaderToSearchAttributes(searchAttributes, headers, t.config.ValidSearchAttributes()); err != nil {
Expand Down Expand Up @@ -245,7 +246,7 @@ func (t *transferTaskExecutorBase) upsertWorkflowExecution(
isCron bool,
numClusters int16,
updateTimeUnixNano int64,
searchAttributes map[string][]byte,
immutableSearchAttributes map[string][]byte,
headers map[string][]byte,
) error {

Expand All @@ -258,6 +259,7 @@ func (t *transferTaskExecutorBase) upsertWorkflowExecution(
}

// headers are appended into search attributes if enabled
searchAttributes := copySearchAttributes(immutableSearchAttributes)
if t.config.EnableContextHeaderInVisibility(domain) {
// fail open, if error occurs, just log it; successfully appended headers will be stored
if searchAttributes, err = appendContextHeaderToSearchAttributes(searchAttributes, headers, t.config.ValidSearchAttributes()); err != nil {
Expand Down Expand Up @@ -306,7 +308,7 @@ func (t *transferTaskExecutorBase) recordWorkflowClosed(
isCron bool,
numClusters int16,
updateTimeUnixNano int64,
searchAttributes map[string][]byte,
immutableSearchAttributes map[string][]byte,
headers map[string][]byte,
) error {

Expand Down Expand Up @@ -336,14 +338,16 @@ func (t *transferTaskExecutorBase) recordWorkflowClosed(
archiveVisibility = clusterConfiguredForVisibilityArchival && domainConfiguredForVisibilityArchival
}

if recordWorkflowClose {
// headers are appended into search attributes if enabled
if t.config.EnableContextHeaderInVisibility(domainEntry.GetInfo().Name) {
// fail open, if error occurs, just log it; successfully appended headers will be stored
if searchAttributes, err = appendContextHeaderToSearchAttributes(searchAttributes, headers, t.config.ValidSearchAttributes()); err != nil {
t.logger.Error("failed to add headers to search attributes", tag.Error(err))
}
// headers are appended into search attributes if enabled
searchAttributes := copySearchAttributes(immutableSearchAttributes)
if t.config.EnableContextHeaderInVisibility(domainEntry.GetInfo().Name) {
// fail open, if error occurs, just log it; successfully appended headers will be stored
if searchAttributes, err = appendContextHeaderToSearchAttributes(searchAttributes, headers, t.config.ValidSearchAttributes()); err != nil {
t.logger.Error("failed to add headers to search attributes", tag.Error(err))
}
}

if recordWorkflowClose {
if err := t.visibilityMgr.RecordWorkflowExecutionClosed(ctx, &persistence.RecordWorkflowExecutionClosedRequest{
DomainUUID: domainID,
Domain: domain,
Expand Down

0 comments on commit 30d403f

Please sign in to comment.