Skip to content

Commit

Permalink
Refactor persistence manager initialization (#6441)
Browse files Browse the repository at this point in the history
* Refactor persistence manager initialization

* add persistencyManagerFactory mock

* add initPersistenceFactory into persistenceManagerFactory as well

* add persistence factory into defaultPersistenceManagerFactory to avoid global variables
  • Loading branch information
bowenxia authored Oct 29, 2024
1 parent cd6e4f3 commit 2887db9
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 32 deletions.
12 changes: 6 additions & 6 deletions tools/cli/admin_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func AdminShowWorkflow(c *cli.Context) error {
var history []*persistence.DataBlob
if len(tid) != 0 {
thriftrwEncoder := codec.NewThriftRWEncoder()
histV2, err := initializeHistoryManager(c)
histV2, err := getDeps(c).initializeHistoryManager(c)
if err != nil {
return commoncli.Problem("Error in Admin delete WF: ", err)
}
Expand Down Expand Up @@ -303,12 +303,12 @@ func AdminDeleteWorkflow(c *cli.Context) error {
if err != nil {
return commoncli.Problem("strconv.Atoi(shardID) err", err)
}
histV2, err := initializeHistoryManager(c)
histV2, err := getDeps(c).initializeHistoryManager(c)
defer histV2.Close()
if err != nil {
return commoncli.Problem("Error in Admin delete WF: ", err)
}
exeStore, err := initializeExecutionStore(c, shardIDInt)
exeStore, err := getDeps(c).initializeExecutionStore(c, shardIDInt)
if err != nil {
return commoncli.Problem("Error in Admin delete WF: ", err)
}
Expand Down Expand Up @@ -387,7 +387,7 @@ func AdminGetDomainIDOrName(c *cli.Context) error {
return commoncli.Problem("Need either domainName or domainID", nil)
}

domainManager, err := initializeDomainManager(c)
domainManager, err := getDeps(c).initializeDomainManager(c)
if err != nil {
return commoncli.Problem("Error in Admin delete WF: ", err)
}
Expand Down Expand Up @@ -487,7 +487,7 @@ func AdminDescribeShard(c *cli.Context) error {
if err != nil {
return commoncli.Problem("Error in creating context: ", err)
}
shardManager, err := initializeShardManager(c)
shardManager, err := getDeps(c).initializeShardManager(c)
if err != nil {
return commoncli.Problem("Error in Admin delete WF: ", err)
}
Expand Down Expand Up @@ -516,7 +516,7 @@ func AdminSetShardRangeID(c *cli.Context) error {
if err != nil {
return commoncli.Problem("Error in creating context: ", err)
}
shardManager, err := initializeShardManager(c)
shardManager, err := getDeps(c).initializeShardManager(c)
if err != nil {
return commoncli.Problem("Error in Admin delete WF: ", err)
}
Expand Down
4 changes: 2 additions & 2 deletions tools/cli/admin_db_clean_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,12 @@ func fixExecution(
invariants []executions.InvariantFactory,
execution *store.ScanOutputEntity,
) (invariant.ManagerFixResult, error) {
execManager, err := initializeExecutionStore(c, execution.Execution.(entity.Entity).GetShardID())
execManager, err := getDeps(c).initializeExecutionStore(c, execution.Execution.(entity.Entity).GetShardID())
defer execManager.Close()
if err != nil {
return invariant.ManagerFixResult{}, fmt.Errorf("Error in fix execution: %w", err)
}
historyV2Mgr, err := initializeHistoryManager(c)
historyV2Mgr, err := getDeps(c).initializeHistoryManager(c)
defer historyV2Mgr.Close()
if err != nil {
return invariant.ManagerFixResult{}, fmt.Errorf("Error in fix execution: %w", err)
Expand Down
6 changes: 3 additions & 3 deletions tools/cli/admin_db_scan_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,12 @@ func checkExecution(
invariants []executions.InvariantFactory,
fetcher executions.ExecutionFetcher,
) (interface{}, invariant.ManagerCheckResult, error) {
execManager, err := initializeExecutionStore(c, common.WorkflowIDToHistoryShard(req.WorkflowID, numberOfShards))
execManager, err := getDeps(c).initializeExecutionStore(c, common.WorkflowIDToHistoryShard(req.WorkflowID, numberOfShards))
defer execManager.Close()
if err != nil {
return nil, invariant.ManagerCheckResult{}, fmt.Errorf("Error in execution check: %w", err)
}
historyV2Mgr, err := initializeHistoryManager(c)
historyV2Mgr, err := getDeps(c).initializeHistoryManager(c)
defer historyV2Mgr.Close()
if err != nil {
return nil, invariant.ManagerCheckResult{}, fmt.Errorf("Error in execution check: %w", err)
Expand Down Expand Up @@ -200,7 +200,7 @@ func listExecutionsByShardID(
outputFile *os.File,
) error {

client, err := initializeExecutionStore(c, shardID)
client, err := getDeps(c).initializeExecutionStore(c, shardID)
defer client.Close()
if err != nil {
commoncli.Problem("Error in Admin DB unsupported WF scan: ", err)
Expand Down
3 changes: 2 additions & 1 deletion tools/cli/admin_timers.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ func NewDBLoadCloser(c *cli.Context) (LoadCloser, error) {
if err != nil {
return nil, fmt.Errorf("error in NewDBLoadCloser: failed to get shard ID: %w", err)
}
executionManager, err := initializeExecutionStore(c, shardID)

executionManager, err := getDeps(c).initializeExecutionStore(c, shardID)
if err != nil {
return nil, fmt.Errorf("error in NewDBLoadCloser: failed to initialize execution store: %w", err)
}
Expand Down
4 changes: 3 additions & 1 deletion tools/cli/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewCliApp(cf ClientFactory, opts ...CLIAppOptions) *cli.App {
app.Usage = "A command-line tool for cadence users"
app.Version = version
app.Metadata = map[string]any{
depsKey: &deps{ClientFactory: cf, IOHandler: &defaultIOHandler{app: app}},
depsKey: &deps{ClientFactory: cf, IOHandler: &defaultIOHandler{app: app}, PersistenceManagerFactory: &defaultPersistenceManagerFactory{}},
}
app.Flags = []cli.Flag{
&cli.StringFlag{
Expand Down Expand Up @@ -255,6 +255,7 @@ func getDeps(ctx *cli.Context) cliDeps {
type cliDeps interface {
ClientFactory
IOHandler
PersistenceManagerFactory
}

type IOHandler interface {
Expand Down Expand Up @@ -305,4 +306,5 @@ var _ cliDeps = &deps{}
type deps struct {
ClientFactory
IOHandler
PersistenceManagerFactory
}
48 changes: 30 additions & 18 deletions tools/cli/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination database_mock.go -self_package github.com/uber/cadence/tools/cli

package cli

import (
Expand Down Expand Up @@ -150,20 +152,32 @@ func getDBFlags() []cli.Flag {
}
}

func initializeExecutionStore(c *cli.Context, shardID int) (persistence.ExecutionManager, error) {
factory, err := getPersistenceFactory(c)
type PersistenceManagerFactory interface {
initializeExecutionStore(c *cli.Context, shardID int) (persistence.ExecutionManager, error)
initializeHistoryManager(c *cli.Context) (persistence.HistoryManager, error)
initializeShardManager(c *cli.Context) (persistence.ShardManager, error)
initializeDomainManager(c *cli.Context) (persistence.DomainManager, error)
initPersistenceFactory(c *cli.Context) (client.Factory, error)
}

type defaultPersistenceManagerFactory struct {
persistenceFactory client.Factory
}

func (f *defaultPersistenceManagerFactory) initializeExecutionStore(c *cli.Context, shardID int) (persistence.ExecutionManager, error) {
factory, err := f.getPersistenceFactory(c)
if err != nil {
return nil, fmt.Errorf("Failed to get persistence factory: %w", err)
}
historyManager, err := factory.NewExecutionManager(shardID)
executionManager, err := factory.NewExecutionManager(shardID)
if err != nil {
return nil, fmt.Errorf("Failed to initialize history manager %w", err)
}
return historyManager, nil
return executionManager, nil
}

func initializeHistoryManager(c *cli.Context) (persistence.HistoryManager, error) {
factory, err := getPersistenceFactory(c)
func (f *defaultPersistenceManagerFactory) initializeHistoryManager(c *cli.Context) (persistence.HistoryManager, error) {
factory, err := f.getPersistenceFactory(c)
if err != nil {
return nil, fmt.Errorf("Failed to get persistence factory: %w", err)
}
Expand All @@ -174,8 +188,8 @@ func initializeHistoryManager(c *cli.Context) (persistence.HistoryManager, error
return historyManager, nil
}

func initializeShardManager(c *cli.Context) (persistence.ShardManager, error) {
factory, err := getPersistenceFactory(c)
func (f *defaultPersistenceManagerFactory) initializeShardManager(c *cli.Context) (persistence.ShardManager, error) {
factory, err := f.getPersistenceFactory(c)
if err != nil {
return nil, fmt.Errorf("Failed to get persistence factory: %w", err)
}
Expand All @@ -186,8 +200,8 @@ func initializeShardManager(c *cli.Context) (persistence.ShardManager, error) {
return shardManager, nil
}

func initializeDomainManager(c *cli.Context) (persistence.DomainManager, error) {
factory, err := getPersistenceFactory(c)
func (f *defaultPersistenceManagerFactory) initializeDomainManager(c *cli.Context) (persistence.DomainManager, error) {
factory, err := f.getPersistenceFactory(c)
if err != nil {
return nil, fmt.Errorf("Failed to get persistence factory: %w", err)
}
Expand All @@ -198,20 +212,18 @@ func initializeDomainManager(c *cli.Context) (persistence.DomainManager, error)
return domainManager, nil
}

var persistenceFactory client.Factory

func getPersistenceFactory(c *cli.Context) (client.Factory, error) {
func (f *defaultPersistenceManagerFactory) getPersistenceFactory(c *cli.Context) (client.Factory, error) {
var err error
if persistenceFactory == nil {
persistenceFactory, err = initPersistenceFactory(c)
if f.persistenceFactory == nil {
f.persistenceFactory, err = getDeps(c).initPersistenceFactory(c)
if err != nil {
return persistenceFactory, fmt.Errorf("%w", err)
return f.persistenceFactory, fmt.Errorf("%w", err)
}
}
return persistenceFactory, nil
return f.persistenceFactory, nil
}

func initPersistenceFactory(c *cli.Context) (client.Factory, error) {
func (f *defaultPersistenceManagerFactory) initPersistenceFactory(c *cli.Context) (client.Factory, error) {
cfg, err := getDeps(c).ServerConfig(c)

if err != nil {
Expand Down
135 changes: 135 additions & 0 deletions tools/cli/database_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion tools/cli/domain_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func initializeAdminDomainHandler(c *cli.Context) (domain.Handler, error) {
return nil, fmt.Errorf("Error in init admin domain handler: %w", err)
}
clusterMetadata := initializeClusterMetadata(configuration, metricsClient, logger)
metadataMgr, err := initializeDomainManager(c)
metadataMgr, err := getDeps(c).initializeDomainManager(c)
if err != nil {
return nil, fmt.Errorf("Error in init admin domain handler: %w", err)
}
Expand Down

0 comments on commit 2887db9

Please sign in to comment.