Persist failover history in DomainInfo data (#6139)
**What changed?**
Added functionality to persist recent failover event data in the DomainInfo whenever a valid failover is executed.
`FailoverEvent` contains the failover `eventTime`, `fromCluster`, `toCluster`, and `failoverType` ("Force"/"Grace") information.
`FailoverHistory` is the key in the `DomainInfo` `data` map. Its value is a JSON-serialized slice of `FailoverEvent`s, with the maximum size defined by `dynamicconfig.FrontendFailoverHistoryMaxSize` (default 5, filterable by domain).
`FailoverHistory` always keeps the n most recent `FailoverEvent`s, sorted by descending timestamp.
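For illustration, a minimal self-contained sketch of the stored format; the struct mirrors the `FailoverEvent` added in `common/domain/handler.go`, while the timestamp and cluster names are hypothetical:

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// FailoverEvent mirrors the struct added in common/domain/handler.go.
type FailoverEvent struct {
	EventTime    time.Time `json:"eventTime"`
	FromCluster  string    `json:"fromCluster"`
	ToCluster    string    `json:"toCluster"`
	FailoverType string    `json:"failoverType"`
}

func main() {
	// One force failover from "cluster0" to "cluster1" (hypothetical names).
	history := []FailoverEvent{{
		EventTime:    time.Date(2024, time.June, 21, 12, 0, 0, 0, time.UTC),
		FromCluster:  "cluster0",
		ToCluster:    "cluster1",
		FailoverType: "Force",
	}}
	b, _ := json.Marshal(history)
	// This is the string kept in the DomainInfo data map under the "FailoverHistory" key:
	// [{"eventTime":"2024-06-21T12:00:00Z","fromCluster":"cluster0","toCluster":"cluster1","failoverType":"Force"}]
	fmt.Println(string(b))
}
```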

**Why?**
Persists failover information, improving failover visibility for clients and the Cadence team.

**How did you test it?**
Unit tests and integration tests. Also tested locally by triggering failovers in a multi-cluster Cadence setup with replication enabled.

**Potential risks**
The change does not affect the logic of `UpdateDomain`; it only adds the failover info to the `DomainInfo` data. The main risk is introducing something that could cause the code to panic. Errors while adding the `FailoverHistory` are logged as warnings and do not interrupt the `UpdateDomain` action.

**Release notes**

**Documentation Changes**
fimanishi authored Jun 21, 2024
1 parent a438ee4 commit 1e300fc
Showing 9 changed files with 347 additions and 122 deletions.
23 changes: 23 additions & 0 deletions common/constants.go
@@ -166,6 +166,8 @@ const (
DomainDataKeyForManagedFailover = "IsManagedByCadence"
// DomainDataKeyForPreferredCluster is the key of DomainData for domain rebalance
DomainDataKeyForPreferredCluster = "PreferredCluster"
// DomainDataKeyForFailoverHistory is the key of DomainData for failover history
DomainDataKeyForFailoverHistory = "FailoverHistory"
// DomainDataKeyForReadGroups stores which groups have read permission of the domain API
DomainDataKeyForReadGroups = "READ_GROUPS"
// DomainDataKeyForWriteGroups stores which groups have write permission of the domain API
@@ -269,3 +271,24 @@ const (
// WorkflowIDRateLimitReason is the reason set in ServiceBusyError when workflow ID rate limit is exceeded
WorkflowIDRateLimitReason = "external-workflow-id-rate-limit"
)

type (
// FailoverType is the enum for representing different failover types
FailoverType int
)

const (
// FailoverTypeForce is a force failover
FailoverTypeForce FailoverType = iota + 1
// FailoverTypeGrace is a graceful failover
FailoverTypeGrace
)

func (v FailoverType) String() string {
switch v {
case FailoverTypeForce:
return "Force"
case FailoverTypeGrace:
return "Grace"
default:
return "Unknown"
}
}
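As a quick usage note, the stringer above maps the enum values to the labels persisted in the history; a minimal sketch, assuming the typed constants shown in this diff:

```go
package main

import (
	"fmt"

	"github.com/uber/cadence/common"
)

func main() {
	fmt.Println(common.FailoverTypeForce.String()) // Force
	fmt.Println(common.FailoverTypeGrace.String()) // Grace
	fmt.Println(common.FailoverType(0).String())   // Unknown
}
```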
57 changes: 57 additions & 0 deletions common/domain/handler.go
@@ -24,6 +24,7 @@ package domain

import (
"context"
"encoding/json"
"fmt"
"regexp"
"time"
@@ -101,6 +102,20 @@
RequiredDomainDataKeys dynamicconfig.MapPropertyFn
MaxBadBinaryCount dynamicconfig.IntPropertyFnWithDomainFilter
FailoverCoolDown dynamicconfig.DurationPropertyFnWithDomainFilter
FailoverHistoryMaxSize dynamicconfig.IntPropertyFnWithDomainFilter
}

// FailoverEvent is the failover information to be stored for each failover event in domain data
FailoverEvent struct {
EventTime time.Time `json:"eventTime"`
FromCluster string `json:"fromCluster"`
ToCluster string `json:"toCluster"`
FailoverType string `json:"failoverType"`
}

// FailoverHistory is the history of failovers for a domain limited by the FailoverHistoryMaxSize config
FailoverHistory struct {
FailoverEvents []FailoverEvent
}
)

@@ -496,9 +511,14 @@ func (d *handlerImpl) UpdateDomain(
if configurationChanged {
configVersion++
}

if activeClusterChanged && isGlobalDomain {
var failoverType common.FailoverType = common.FailoverTypeGrace

// Force failover cleans graceful failover state
if updateRequest.FailoverTimeoutInSeconds == nil {
failoverType = common.FailoverTypeForce

// force failover cleanup graceful failover state
gracefulFailoverEndTime = nil
previousFailoverVersion = common.InitialPreviousFailoverVersion
@@ -508,6 +528,11 @@
failoverVersion,
updateRequest.Name,
)
err = updateFailoverHistory(info, d.config, now, currentActiveCluster, *updateRequest.ActiveClusterName, failoverType)
if err != nil {
d.logger.Warn("failed to update failover history", tag.Error(err))
}

failoverNotificationVersion = notificationVersion
}
lastUpdatedTime = now
@@ -1262,3 +1287,35 @@ func createUpdateRequest(
NotificationVersion: notificationVersion,
}
}

func updateFailoverHistory(
info *persistence.DomainInfo,
config Config,
eventTime time.Time,
fromCluster string,
toCluster string,
failoverType common.FailoverType,
) error {
data := info.Data
if info.Data == nil {
data = make(map[string]string)
}

newFailoverEvent := FailoverEvent{EventTime: eventTime, FromCluster: fromCluster, ToCluster: toCluster, FailoverType: failoverType.String()}

var failoverHistory []FailoverEvent
// Ignore the unmarshal error: missing or corrupt history data is treated as an empty history.
_ = json.Unmarshal([]byte(data[common.DomainDataKeyForFailoverHistory]), &failoverHistory)

// Prepend the new event so the history stays sorted by descending timestamp.
failoverHistory = append([]FailoverEvent{newFailoverEvent}, failoverHistory...)

// Truncate the history to the max size
failoverHistoryJSON, err := json.Marshal(failoverHistory[:common.MinInt(config.FailoverHistoryMaxSize(info.Name), len(failoverHistory))])
if err != nil {
return err
}
data[common.DomainDataKeyForFailoverHistory] = string(failoverHistoryJSON)

info.Data = data

return nil
}
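To make the retention behavior concrete, here is a self-contained sketch of the prepend-and-truncate logic in `updateFailoverHistory`, using a hypothetical max size of 2; the real code reads the limit from dynamic config and round-trips the slice through JSON in the domain data map:

```go
package main

import "fmt"

type FailoverEvent struct {
	FromCluster, ToCluster, FailoverType string
}

// truncateHistory mirrors the core of updateFailoverHistory: the newest
// event is prepended, then the slice is capped at maxSize, so only the
// most recent events survive, newest first.
func truncateHistory(history []FailoverEvent, newest FailoverEvent, maxSize int) []FailoverEvent {
	history = append([]FailoverEvent{newest}, history...)
	if len(history) > maxSize {
		history = history[:maxSize]
	}
	return history
}

func main() {
	var history []FailoverEvent
	for _, e := range []FailoverEvent{
		{"c0", "c1", "Force"},
		{"c1", "c0", "Grace"},
		{"c0", "c1", "Force"},
	} {
		history = truncateHistory(history, e, 2)
	}
	fmt.Println(history) // the two most recent events, newest first: [{c0 c1 Force} {c1 c0 Grace}]
}
```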
31 changes: 20 additions & 11 deletions common/domain/handler_MasterCluster_test.go
@@ -22,6 +22,7 @@ package domain

import (
"context"
"encoding/json"
"log"
"os"
"testing"
@@ -50,13 +51,14 @@
domainHandlerGlobalDomainEnabledPrimaryClusterSuite struct {
*persistencetests.TestBase

- minRetentionDays int
- maxBadBinaryCount int
- domainManager persistence.DomainManager
- mockProducer *mocks.KafkaProducer
- mockDomainReplicator Replicator
- archivalMetadata archiver.ArchivalMetadata
- mockArchiverProvider *provider.MockArchiverProvider
+ minRetentionDays int
+ maxBadBinaryCount int
+ failoverHistoryMaxSize int
+ domainManager persistence.DomainManager
+ mockProducer *mocks.KafkaProducer
+ mockDomainReplicator Replicator
+ archivalMetadata archiver.ArchivalMetadata
+ mockArchiverProvider *provider.MockArchiverProvider

handler *handlerImpl
}
@@ -89,6 +91,7 @@ func (s *domainHandlerGlobalDomainEnabledPrimaryClusterSuite) SetupTest() {
dcCollection := dc.NewCollection(dc.NewNopClient(), logger)
s.minRetentionDays = 1
s.maxBadBinaryCount = 10
s.failoverHistoryMaxSize = 5
s.domainManager = s.TestBase.DomainManager
s.mockProducer = &mocks.KafkaProducer{}
s.mockDomainReplicator = NewDomainReplicator(s.mockProducer, logger)
@@ -102,9 +105,10 @@
)
s.mockArchiverProvider = &provider.MockArchiverProvider{}
domainConfig := Config{
- MinRetentionDays: dc.GetIntPropertyFn(s.minRetentionDays),
- MaxBadBinaryCount: dc.GetIntPropertyFilteredByDomain(s.maxBadBinaryCount),
- FailoverCoolDown: dc.GetDurationPropertyFnFilteredByDomain(0 * time.Second),
+ MinRetentionDays: dc.GetIntPropertyFn(s.minRetentionDays),
+ MaxBadBinaryCount: dc.GetIntPropertyFilteredByDomain(s.maxBadBinaryCount),
+ FailoverCoolDown: dc.GetDurationPropertyFnFilteredByDomain(0 * time.Second),
+ FailoverHistoryMaxSize: dc.GetIntPropertyFilteredByDomain(s.failoverHistoryMaxSize),
}
s.handler = NewHandler(
domainConfig,
@@ -114,7 +118,7 @@
s.mockDomainReplicator,
s.archivalMetadata,
s.mockArchiverProvider,
- clock.NewRealTimeSource(),
+ clock.NewMockedTimeSource(),
).(*handlerImpl)
}

@@ -820,6 +824,11 @@ func (s *domainHandlerGlobalDomainEnabledPrimaryClusterSuite) TestUpdateGetDomai
})
s.Nil(err)

var failoverHistory []FailoverEvent
failoverHistory = append(failoverHistory, FailoverEvent{EventTime: s.handler.timeSource.Now(), FromCluster: prevActiveClusterName, ToCluster: nextActiveClusterName, FailoverType: common.FailoverType(common.FailoverTypeForce).String()})
failoverHistoryJSON, _ := json.Marshal(failoverHistory)
data[common.DomainDataKeyForFailoverHistory] = string(failoverHistoryJSON)

fnTest := func(info *types.DomainInfo, config *types.DomainConfiguration,
replicationConfig *types.DomainReplicationConfiguration, isGlobalDomain bool, failoverVersion int64) {
s.NotEmpty(info.GetUUID())
31 changes: 20 additions & 11 deletions common/domain/handler_NotMasterCluster_test.go
@@ -22,6 +22,7 @@ package domain

import (
"context"
"encoding/json"
"log"
"os"
"testing"
@@ -50,13 +51,14 @@
domainHandlerGlobalDomainEnabledNotPrimaryClusterSuite struct {
*persistencetests.TestBase

- minRetentionDays int
- maxBadBinaryCount int
- domainManager persistence.DomainManager
- mockProducer *mocks.KafkaProducer
- mockDomainReplicator Replicator
- archivalMetadata archiver.ArchivalMetadata
- mockArchiverProvider *provider.MockArchiverProvider
+ minRetentionDays int
+ maxBadBinaryCount int
+ failoverHistoryMaxSize int
+ domainManager persistence.DomainManager
+ mockProducer *mocks.KafkaProducer
+ mockDomainReplicator Replicator
+ archivalMetadata archiver.ArchivalMetadata
+ mockArchiverProvider *provider.MockArchiverProvider

handler *handlerImpl
}
@@ -89,6 +91,7 @@ func (s *domainHandlerGlobalDomainEnabledNotPrimaryClusterSuite) SetupTest() {
dcCollection := dc.NewCollection(dc.NewNopClient(), logger)
s.minRetentionDays = 1
s.maxBadBinaryCount = 10
s.failoverHistoryMaxSize = 5
s.domainManager = s.TestBase.DomainManager
s.mockProducer = &mocks.KafkaProducer{}
s.mockDomainReplicator = NewDomainReplicator(s.mockProducer, logger)
@@ -102,9 +105,10 @@
)
s.mockArchiverProvider = &provider.MockArchiverProvider{}
domainConfig := Config{
- MinRetentionDays: dc.GetIntPropertyFn(s.minRetentionDays),
- MaxBadBinaryCount: dc.GetIntPropertyFilteredByDomain(s.maxBadBinaryCount),
- FailoverCoolDown: dc.GetDurationPropertyFnFilteredByDomain(0 * time.Second),
+ MinRetentionDays: dc.GetIntPropertyFn(s.minRetentionDays),
+ MaxBadBinaryCount: dc.GetIntPropertyFilteredByDomain(s.maxBadBinaryCount),
+ FailoverCoolDown: dc.GetDurationPropertyFnFilteredByDomain(0 * time.Second),
+ FailoverHistoryMaxSize: dc.GetIntPropertyFilteredByDomain(s.failoverHistoryMaxSize),
}
s.handler = NewHandler(
domainConfig,
@@ -114,7 +118,7 @@
s.mockDomainReplicator,
s.archivalMetadata,
s.mockArchiverProvider,
- clock.NewRealTimeSource(),
+ clock.NewMockedTimeSource(),
).(*handlerImpl)
}

@@ -683,6 +687,11 @@ func (s *domainHandlerGlobalDomainEnabledNotPrimaryClusterSuite) TestUpdateGetDo
})
s.Nil(err)

var failoverHistory []FailoverEvent
failoverHistory = append(failoverHistory, FailoverEvent{EventTime: s.handler.timeSource.Now(), FromCluster: prevActiveClusterName, ToCluster: nextActiveClusterName, FailoverType: common.FailoverType(common.FailoverTypeForce).String()})
failoverHistoryJSON, _ := json.Marshal(failoverHistory)
data[common.DomainDataKeyForFailoverHistory] = string(failoverHistoryJSON)

fnTest := func(info *types.DomainInfo, config *types.DomainConfiguration,
replicationConfig *types.DomainReplicationConfiguration, isGlobalDomain bool, failoverVersion int64) {
s.NotEmpty(info.GetUUID())
23 changes: 13 additions & 10 deletions common/domain/handler_integration_test.go
@@ -54,13 +54,14 @@
domainHandlerCommonSuite struct {
*persistencetests.TestBase

- minRetentionDays int
- maxBadBinaryCount int
- domainManager persistence.DomainManager
- mockProducer *mocks.KafkaProducer
- mockDomainReplicator Replicator
- archivalMetadata archiver.ArchivalMetadata
- mockArchiverProvider *provider.MockArchiverProvider
+ minRetentionDays int
+ maxBadBinaryCount int
+ failoverHistoryMaxSize int
+ domainManager persistence.DomainManager
+ mockProducer *mocks.KafkaProducer
+ mockDomainReplicator Replicator
+ archivalMetadata archiver.ArchivalMetadata
+ mockArchiverProvider *provider.MockArchiverProvider

handler *handlerImpl
}
@@ -95,6 +96,7 @@ func (s *domainHandlerCommonSuite) SetupTest() {
dcCollection := dc.NewCollection(dc.NewNopClient(), logger)
s.minRetentionDays = 1
s.maxBadBinaryCount = 10
s.failoverHistoryMaxSize = 5
s.domainManager = s.TestBase.DomainManager
s.mockProducer = &mocks.KafkaProducer{}
s.mockDomainReplicator = NewDomainReplicator(s.mockProducer, logger)
@@ -108,9 +110,10 @@
)
s.mockArchiverProvider = &provider.MockArchiverProvider{}
domainConfig := Config{
- MinRetentionDays: dc.GetIntPropertyFn(s.minRetentionDays),
- MaxBadBinaryCount: dc.GetIntPropertyFilteredByDomain(s.maxBadBinaryCount),
- FailoverCoolDown: dc.GetDurationPropertyFnFilteredByDomain(0 * time.Second),
+ MinRetentionDays: dc.GetIntPropertyFn(s.minRetentionDays),
+ MaxBadBinaryCount: dc.GetIntPropertyFilteredByDomain(s.maxBadBinaryCount),
+ FailoverCoolDown: dc.GetDurationPropertyFnFilteredByDomain(0 * time.Second),
+ FailoverHistoryMaxSize: dc.GetIntPropertyFilteredByDomain(s.failoverHistoryMaxSize),
}
s.handler = NewHandler(
domainConfig,