Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor persistence manager initialization #6441

Merged
merged 4 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions tools/cli/admin_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func AdminShowWorkflow(c *cli.Context) error {
var history []*persistence.DataBlob
if len(tid) != 0 {
thriftrwEncoder := codec.NewThriftRWEncoder()
histV2, err := initializeHistoryManager(c)
histV2, err := getDeps(c).initializeHistoryManager(c)
if err != nil {
return commoncli.Problem("Error in Admin delete WF: ", err)
}
Expand Down Expand Up @@ -303,12 +303,12 @@ func AdminDeleteWorkflow(c *cli.Context) error {
if err != nil {
return commoncli.Problem("strconv.Atoi(shardID) err", err)
}
histV2, err := initializeHistoryManager(c)
histV2, err := getDeps(c).initializeHistoryManager(c)
defer histV2.Close()
if err != nil {
return commoncli.Problem("Error in Admin delete WF: ", err)
}
exeStore, err := initializeExecutionStore(c, shardIDInt)
exeStore, err := getDeps(c).initializeExecutionStore(c, shardIDInt)
if err != nil {
return commoncli.Problem("Error in Admin delete WF: ", err)
}
Expand Down Expand Up @@ -387,7 +387,7 @@ func AdminGetDomainIDOrName(c *cli.Context) error {
return commoncli.Problem("Need either domainName or domainID", nil)
}

domainManager, err := initializeDomainManager(c)
domainManager, err := getDeps(c).initializeDomainManager(c)
if err != nil {
return commoncli.Problem("Error in Admin delete WF: ", err)
}
Expand Down Expand Up @@ -487,7 +487,7 @@ func AdminDescribeShard(c *cli.Context) error {
if err != nil {
return commoncli.Problem("Error in creating context: ", err)
}
shardManager, err := initializeShardManager(c)
shardManager, err := getDeps(c).initializeShardManager(c)
if err != nil {
return commoncli.Problem("Error in Admin delete WF: ", err)
}
Expand Down Expand Up @@ -516,7 +516,7 @@ func AdminSetShardRangeID(c *cli.Context) error {
if err != nil {
return commoncli.Problem("Error in creating context: ", err)
}
shardManager, err := initializeShardManager(c)
shardManager, err := getDeps(c).initializeShardManager(c)
if err != nil {
return commoncli.Problem("Error in Admin delete WF: ", err)
}
Expand Down
4 changes: 2 additions & 2 deletions tools/cli/admin_db_clean_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,12 @@ func fixExecution(
invariants []executions.InvariantFactory,
execution *store.ScanOutputEntity,
) (invariant.ManagerFixResult, error) {
execManager, err := initializeExecutionStore(c, execution.Execution.(entity.Entity).GetShardID())
execManager, err := getDeps(c).initializeExecutionStore(c, execution.Execution.(entity.Entity).GetShardID())
defer execManager.Close()
if err != nil {
return invariant.ManagerFixResult{}, fmt.Errorf("Error in fix execution: %w", err)
}
historyV2Mgr, err := initializeHistoryManager(c)
historyV2Mgr, err := getDeps(c).initializeHistoryManager(c)
defer historyV2Mgr.Close()
if err != nil {
return invariant.ManagerFixResult{}, fmt.Errorf("Error in fix execution: %w", err)
Expand Down
6 changes: 3 additions & 3 deletions tools/cli/admin_db_scan_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,12 @@ func checkExecution(
invariants []executions.InvariantFactory,
fetcher executions.ExecutionFetcher,
) (interface{}, invariant.ManagerCheckResult, error) {
execManager, err := initializeExecutionStore(c, common.WorkflowIDToHistoryShard(req.WorkflowID, numberOfShards))
execManager, err := getDeps(c).initializeExecutionStore(c, common.WorkflowIDToHistoryShard(req.WorkflowID, numberOfShards))
defer execManager.Close()
if err != nil {
return nil, invariant.ManagerCheckResult{}, fmt.Errorf("Error in execution check: %w", err)
}
historyV2Mgr, err := initializeHistoryManager(c)
historyV2Mgr, err := getDeps(c).initializeHistoryManager(c)
defer historyV2Mgr.Close()
if err != nil {
return nil, invariant.ManagerCheckResult{}, fmt.Errorf("Error in execution check: %w", err)
Expand Down Expand Up @@ -200,7 +200,7 @@ func listExecutionsByShardID(
outputFile *os.File,
) error {

client, err := initializeExecutionStore(c, shardID)
client, err := getDeps(c).initializeExecutionStore(c, shardID)
defer client.Close()
if err != nil {
commoncli.Problem("Error in Admin DB unsupported WF scan: ", err)
Expand Down
3 changes: 2 additions & 1 deletion tools/cli/admin_timers.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ func NewDBLoadCloser(c *cli.Context) (LoadCloser, error) {
if err != nil {
return nil, fmt.Errorf("error in NewDBLoadCloser: failed to get shard ID: %w", err)
}
executionManager, err := initializeExecutionStore(c, shardID)

executionManager, err := getDeps(c).initializeExecutionStore(c, shardID)
if err != nil {
return nil, fmt.Errorf("error in NewDBLoadCloser: failed to initialize execution store: %w", err)
}
Expand Down
4 changes: 3 additions & 1 deletion tools/cli/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewCliApp(cf ClientFactory, opts ...CLIAppOptions) *cli.App {
app.Usage = "A command-line tool for cadence users"
app.Version = version
app.Metadata = map[string]any{
depsKey: &deps{ClientFactory: cf, IOHandler: &defaultIOHandler{app: app}},
depsKey: &deps{ClientFactory: cf, IOHandler: &defaultIOHandler{app: app}, PersistenceManagerFactory: &defaultPersistenceManagerFactory{}},
}
app.Flags = []cli.Flag{
&cli.StringFlag{
Expand Down Expand Up @@ -255,6 +255,7 @@ func getDeps(ctx *cli.Context) cliDeps {
type cliDeps interface {
ClientFactory
IOHandler
PersistenceManagerFactory
}

type IOHandler interface {
Expand Down Expand Up @@ -305,4 +306,5 @@ var _ cliDeps = &deps{}
type deps struct {
ClientFactory
IOHandler
PersistenceManagerFactory
}
48 changes: 30 additions & 18 deletions tools/cli/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination database_mock.go -self_package github.com/uber/cadence/tools/cli

package cli

import (
Expand Down Expand Up @@ -150,20 +152,32 @@ func getDBFlags() []cli.Flag {
}
}

func initializeExecutionStore(c *cli.Context, shardID int) (persistence.ExecutionManager, error) {
factory, err := getPersistenceFactory(c)
type PersistenceManagerFactory interface {
initializeExecutionStore(c *cli.Context, shardID int) (persistence.ExecutionManager, error)
bowenxia marked this conversation as resolved.
Show resolved Hide resolved
initializeHistoryManager(c *cli.Context) (persistence.HistoryManager, error)
initializeShardManager(c *cli.Context) (persistence.ShardManager, error)
initializeDomainManager(c *cli.Context) (persistence.DomainManager, error)
initPersistenceFactory(c *cli.Context) (client.Factory, error)
}

type defaultPersistenceManagerFactory struct {
persistenceFactory client.Factory
}

func (f *defaultPersistenceManagerFactory) initializeExecutionStore(c *cli.Context, shardID int) (persistence.ExecutionManager, error) {
factory, err := f.getPersistenceFactory(c)
if err != nil {
return nil, fmt.Errorf("Failed to get persistence factory: %w", err)
}
historyManager, err := factory.NewExecutionManager(shardID)
executionManager, err := factory.NewExecutionManager(shardID)
if err != nil {
return nil, fmt.Errorf("Failed to initialize history manager %w", err)
}
return historyManager, nil
return executionManager, nil
}

func initializeHistoryManager(c *cli.Context) (persistence.HistoryManager, error) {
factory, err := getPersistenceFactory(c)
func (f *defaultPersistenceManagerFactory) initializeHistoryManager(c *cli.Context) (persistence.HistoryManager, error) {
factory, err := f.getPersistenceFactory(c)
if err != nil {
return nil, fmt.Errorf("Failed to get persistence factory: %w", err)
}
Expand All @@ -174,8 +188,8 @@ func initializeHistoryManager(c *cli.Context) (persistence.HistoryManager, error
return historyManager, nil
}

func initializeShardManager(c *cli.Context) (persistence.ShardManager, error) {
factory, err := getPersistenceFactory(c)
func (f *defaultPersistenceManagerFactory) initializeShardManager(c *cli.Context) (persistence.ShardManager, error) {
factory, err := f.getPersistenceFactory(c)
if err != nil {
return nil, fmt.Errorf("Failed to get persistence factory: %w", err)
}
Expand All @@ -186,8 +200,8 @@ func initializeShardManager(c *cli.Context) (persistence.ShardManager, error) {
return shardManager, nil
}

func initializeDomainManager(c *cli.Context) (persistence.DomainManager, error) {
factory, err := getPersistenceFactory(c)
func (f *defaultPersistenceManagerFactory) initializeDomainManager(c *cli.Context) (persistence.DomainManager, error) {
factory, err := f.getPersistenceFactory(c)
if err != nil {
return nil, fmt.Errorf("Failed to get persistence factory: %w", err)
}
Expand All @@ -198,20 +212,18 @@ func initializeDomainManager(c *cli.Context) (persistence.DomainManager, error)
return domainManager, nil
}

var persistenceFactory client.Factory

func getPersistenceFactory(c *cli.Context) (client.Factory, error) {
func (f *defaultPersistenceManagerFactory) getPersistenceFactory(c *cli.Context) (client.Factory, error) {
var err error
if persistenceFactory == nil {
persistenceFactory, err = initPersistenceFactory(c)
if f.persistenceFactory == nil {
f.persistenceFactory, err = getDeps(c).initPersistenceFactory(c)
if err != nil {
return persistenceFactory, fmt.Errorf("%w", err)
return f.persistenceFactory, fmt.Errorf("%w", err)
}
}
return persistenceFactory, nil
return f.persistenceFactory, nil
}

func initPersistenceFactory(c *cli.Context) (client.Factory, error) {
func (f *defaultPersistenceManagerFactory) initPersistenceFactory(c *cli.Context) (client.Factory, error) {
cfg, err := getDeps(c).ServerConfig(c)

if err != nil {
Expand Down
135 changes: 135 additions & 0 deletions tools/cli/database_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion tools/cli/domain_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func initializeAdminDomainHandler(c *cli.Context) (domain.Handler, error) {
return nil, fmt.Errorf("Error in init admin domain handler: %w", err)
}
clusterMetadata := initializeClusterMetadata(configuration, metricsClient, logger)
metadataMgr, err := initializeDomainManager(c)
metadataMgr, err := getDeps(c).initializeDomainManager(c)
if err != nil {
return nil, fmt.Errorf("Error in init admin domain handler: %w", err)
}
Expand Down
Loading