Skip to content

Commit

Permalink
address comments and use errorexithandler to avoid duplicate printing
Browse files Browse the repository at this point in the history
  • Loading branch information
shijiesheng committed Sep 25, 2024
1 parent 8cc237f commit cb0fbfa
Show file tree
Hide file tree
Showing 25 changed files with 252 additions and 237 deletions.
6 changes: 3 additions & 3 deletions tools/cli/admin_async_queue_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func AdminGetAsyncWFConfig(c *cli.Context) error {

resp, err := adminClient.GetDomainAsyncWorkflowConfiguraton(ctx, req)
if err != nil {
return ErrorAndPrint("Failed to get async wf queue config", err)
return PrintableError("Failed to get async wf queue config", err)
}

if resp == nil || resp.Configuration == nil {
Expand All @@ -67,7 +67,7 @@ func AdminUpdateAsyncWFConfig(c *cli.Context) error {
var cfg types.AsyncWorkflowConfiguration
err := json.Unmarshal([]byte(asyncWFCfgJSON), &cfg)
if err != nil {
return ErrorAndPrint("Failed to parse async workflow config", err)
return PrintableError("Failed to parse async workflow config", err)
}

ctx, cancel := newContext(c)
Expand All @@ -80,7 +80,7 @@ func AdminUpdateAsyncWFConfig(c *cli.Context) error {

_, err = adminClient.UpdateDomainAsyncWorkflowConfiguraton(ctx, req)
if err != nil {
return ErrorAndPrint("Failed to update async workflow queue config", err)
return PrintableError("Failed to update async workflow queue config", err)
}

fmt.Printf("Successfully updated async workflow queue config for domain %s\n", domainName)
Expand Down
14 changes: 7 additions & 7 deletions tools/cli/admin_cluster_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ var promptFn = prompt
func AdminAddSearchAttribute(c *cli.Context) error {
key := getRequiredOption(c, FlagSearchAttributesKey)
if err := visibility.ValidateSearchAttributeKey(key); err != nil {
return ErrorAndPrint("Invalid search-attribute key.", err)
return PrintableError("Invalid search-attribute key.", err)
}

valType := getRequiredIntOption(c, FlagSearchAttributesType)
if !isValueTypeValid(valType) {
return ErrorAndPrint("Unknown Search Attributes value type.", nil)
return PrintableError("Unknown Search Attributes value type.", nil)
}

// ask user for confirmation
Expand All @@ -66,7 +66,7 @@ func AdminAddSearchAttribute(c *cli.Context) error {

err := adminClient.AddSearchAttribute(ctx, request)
if err != nil {
return ErrorAndPrint("Add search attribute failed.", err)
return PrintableError("Add search attribute failed.", err)
}
fmt.Println("Success. Note that for a multil-node Cadence cluster, DynamicConfig MUST be updated separately to whitelist the new attributes.")
return nil
Expand All @@ -80,7 +80,7 @@ func AdminDescribeCluster(c *cli.Context) error {
defer cancel()
response, err := adminClient.DescribeCluster(ctx)
if err != nil {
return ErrorAndPrint("Operation DescribeCluster failed.", err)
return PrintableError("Operation DescribeCluster failed.", err)
}

prettyPrintJSONObject(response)
Expand All @@ -99,13 +99,13 @@ func AdminRebalanceStart(c *cli.Context) error {
}
input, err := json.Marshal(rbParams)
if err != nil {
return ErrorAndPrint("Failed to serialize params for failover workflow", err)
return PrintableError("Failed to serialize params for failover workflow", err)
}
memo, err := getWorkflowMemo(map[string]interface{}{
common.MemoKeyForOperator: getOperator(),
})
if err != nil {
return ErrorAndPrint("Failed to serialize memo", err)
return PrintableError("Failed to serialize memo", err)
}
request := &types.StartWorkflowExecutionRequest{
Domain: common.SystemLocalDomainName,
Expand All @@ -127,7 +127,7 @@ func AdminRebalanceStart(c *cli.Context) error {

resp, err := client.StartWorkflowExecution(tcCtx, request)
if err != nil {
return ErrorAndPrint("Failed to start failover workflow", err)
return PrintableError("Failed to start failover workflow", err)
}
fmt.Println("Rebalance workflow started")
fmt.Println("wid: " + workflowID)
Expand Down
70 changes: 35 additions & 35 deletions tools/cli/admin_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func AdminShowWorkflow(c *cli.Context) error {
BranchID: &bid,
})
if err != nil {
return ErrorAndPrint("encoding branch token err", err)
return PrintableError("encoding branch token err", err)
}
resp, err := histV2.ReadRawHistoryBranch(ctx, &persistence.ReadHistoryBranchRequest{
BranchToken: branchToken,
Expand All @@ -74,16 +74,16 @@ func AdminShowWorkflow(c *cli.Context) error {
DomainName: domainName,
})
if err != nil {
return ErrorAndPrint("ReadHistoryBranch err", err)
return PrintableError("ReadHistoryBranch err", err)
}

history = resp.HistoryEventBlobs
} else {
return ErrorAndPrint("need to specify TreeID/BranchID/ShardID", nil)
return PrintableError("need to specify TreeID/BranchID/ShardID", nil)
}

if len(history) == 0 {
return ErrorAndPrint("no events", nil)
return PrintableError("no events", nil)
}
allEvents := &shared.History{}
totalSize := 0
Expand All @@ -92,14 +92,14 @@ func AdminShowWorkflow(c *cli.Context) error {
fmt.Printf("======== batch %v, blob len: %v ======\n", idx+1, len(b.Data))
internalHistoryBatch, err := serializer.DeserializeBatchEvents(b)
if err != nil {
return ErrorAndPrint("DeserializeBatchEvents err", err)
return PrintableError("DeserializeBatchEvents err", err)
}
historyBatch := thrift.FromHistoryEventArray(internalHistoryBatch)
allEvents.Events = append(allEvents.Events, historyBatch...)
for _, e := range historyBatch {
jsonstr, err := json.Marshal(e)
if err != nil {
return ErrorAndPrint("json.Marshal err", err)
return PrintableError("json.Marshal err", err)
}
fmt.Println(string(jsonstr))
}
Expand All @@ -109,10 +109,10 @@ func AdminShowWorkflow(c *cli.Context) error {
if outputFileName != "" {
data, err := json.Marshal(allEvents.Events)
if err != nil {
return ErrorAndPrint("Failed to serialize history data.", err)
return PrintableError("Failed to serialize history data.", err)
}
if err := ioutil.WriteFile(outputFileName, data, 0666); err != nil {
return ErrorAndPrint("Failed to export history data file.", err)
return PrintableError("Failed to export history data file.", err)
}
}
return nil
Expand All @@ -132,14 +132,14 @@ func AdminDescribeWorkflow(c *cli.Context) error {
ms := persistence.WorkflowMutableState{}
err := json.Unmarshal([]byte(msStr), &ms)
if err != nil {
return ErrorAndPrint("json.Unmarshal err", err)
return PrintableError("json.Unmarshal err", err)
}
currentBranchToken := ms.ExecutionInfo.BranchToken
if ms.VersionHistories != nil {
// if VersionHistories is set, then all branch infos are stored in VersionHistories
currentVersionHistory, err := ms.VersionHistories.GetCurrentVersionHistory()
if err != nil {
return ErrorAndPrint("ms.VersionHistories.GetCurrentVersionHistory err", err)
return PrintableError("ms.VersionHistories.GetCurrentVersionHistory err", err)
}
currentBranchToken = currentVersionHistory.GetBranchToken()
}
Expand All @@ -148,7 +148,7 @@ func AdminDescribeWorkflow(c *cli.Context) error {
thriftrwEncoder := codec.NewThriftRWEncoder()
err = thriftrwEncoder.Decode(currentBranchToken, &branchInfo)
if err != nil {
return ErrorAndPrint("thriftrwEncoder.Decode err", err)
return PrintableError("thriftrwEncoder.Decode err", err)
}
prettyPrintJSONObject(branchInfo)
if ms.ExecutionInfo.AutoResetPoints != nil {
Expand Down Expand Up @@ -184,7 +184,7 @@ func describeMutableState(c *cli.Context) (*types.AdminDescribeWorkflowExecution
},
)
if err != nil {
return nil, ErrorAndPrint("Get workflow mutableState failed", err)
return nil, PrintableError("Get workflow mutableState failed", err)
}
return resp, nil
}
Expand All @@ -210,7 +210,7 @@ func AdminMaintainCorruptWorkflow(c *cli.Context) error {
defer cancel()
_, err := adminClient.MaintainCorruptWorkflow(ctx, request)
if err != nil {
return ErrorAndPrint("Operation AdminMaintainCorruptWorkflow failed.", err)
return PrintableError("Operation AdminMaintainCorruptWorkflow failed.", err)
}

return err
Expand Down Expand Up @@ -243,7 +243,7 @@ func AdminDeleteWorkflow(c *cli.Context) error {
}
_, err := adminClient.DeleteWorkflow(ctx, request)
if err != nil {
return ErrorAndPrint("Operation AdminMaintainCorruptWorkflow failed.", err)
return PrintableError("Operation AdminMaintainCorruptWorkflow failed.", err)
}
return nil
}
Expand All @@ -256,14 +256,14 @@ func AdminDeleteWorkflow(c *cli.Context) error {
ms := persistence.WorkflowMutableState{}
err = json.Unmarshal([]byte(msStr), &ms)
if err != nil {
return ErrorAndPrint("json.Unmarshal err", err)
return PrintableError("json.Unmarshal err", err)
}
domainID := ms.ExecutionInfo.DomainID

shardID := resp.GetShardID()
shardIDInt, err := strconv.Atoi(shardID)
if err != nil {
return ErrorAndPrint("strconv.Atoi(shardID) err", err)
return PrintableError("strconv.Atoi(shardID) err", err)
}
histV2 := initializeHistoryManager(c)
defer histV2.Close()
Expand All @@ -283,7 +283,7 @@ func AdminDeleteWorkflow(c *cli.Context) error {
for _, branchToken := range branchTokens {
err = thriftrwEncoder.Decode(branchToken, &branchInfo)
if err != nil {
return ErrorAndPrint("thriftrwEncoder.Decode err", err)
return PrintableError("thriftrwEncoder.Decode err", err)
}
fmt.Println("deleting history events for ...")
prettyPrintJSONObject(branchInfo)
Expand All @@ -296,7 +296,7 @@ func AdminDeleteWorkflow(c *cli.Context) error {
if skipError {
fmt.Println("failed to delete history, ", err)
} else {
return ErrorAndPrint("DeleteHistoryBranch err", err)
return PrintableError("DeleteHistoryBranch err", err)
}
}
}
Expand All @@ -313,7 +313,7 @@ func AdminDeleteWorkflow(c *cli.Context) error {
if skipError {
fmt.Println("delete mutableState row failed, ", err)
} else {
return ErrorAndPrint("delete mutableState row failed", err)
return PrintableError("delete mutableState row failed", err)
}
}
fmt.Println("delete mutableState row successfully")
Expand All @@ -329,7 +329,7 @@ func AdminDeleteWorkflow(c *cli.Context) error {
if skipError {
fmt.Println("delete current row failed, ", err)
} else {
return ErrorAndPrint("delete current row failed", err)
return PrintableError("delete current row failed", err)
}
}
fmt.Println("delete current row successfully")
Expand All @@ -341,7 +341,7 @@ func AdminGetDomainIDOrName(c *cli.Context) error {
domainID := c.String(FlagDomainID)
domainName := c.String(FlagDomain)
if len(domainID) == 0 && len(domainName) == 0 {
return ErrorAndPrint("Need either domainName or domainID", nil)
return PrintableError("Need either domainName or domainID", nil)
}

domainManager := initializeDomainManager(c)
Expand All @@ -351,13 +351,13 @@ func AdminGetDomainIDOrName(c *cli.Context) error {
if len(domainID) > 0 {
domain, err := domainManager.GetDomain(ctx, &persistence.GetDomainRequest{ID: domainID})
if err != nil {
return ErrorAndPrint("SelectDomain error", err)
return PrintableError("SelectDomain error", err)
}
fmt.Printf("domainName for domainID %v is %v \n", domainID, domain.Info.Name)
} else {
domain, err := domainManager.GetDomain(ctx, &persistence.GetDomainRequest{Name: domainName})
if err != nil {
return ErrorAndPrint("SelectDomain error", err)
return PrintableError("SelectDomain error", err)
}
fmt.Printf("domainID for domainName %v is %v \n", domain.Info.ID, domainID)
}
Expand All @@ -370,7 +370,7 @@ func AdminGetShardID(c *cli.Context) error {
numberOfShards := c.Int(FlagNumberOfShards)

if numberOfShards <= 0 {
return ErrorAndPrint("numberOfShards is required", nil)
return PrintableError("numberOfShards is required", nil)
}
shardID := common.WorkflowIDToHistoryShard(wid, numberOfShards)
fmt.Printf("ShardID for workflowID: %v is %v \n", wid, shardID)
Expand Down Expand Up @@ -403,7 +403,7 @@ func AdminRemoveTask(c *cli.Context) error {

err := adminClient.RemoveTask(ctx, req)
if err != nil {
return ErrorAndPrint("Remove task has failed", err)
return PrintableError("Remove task has failed", err)
}
return nil
}
Expand All @@ -419,7 +419,7 @@ func AdminDescribeShard(c *cli.Context) error {
getShardReq := &persistence.GetShardRequest{ShardID: sid}
shard, err := shardManager.GetShard(ctx, getShardReq)
if err != nil {
return ErrorAndPrint("Failed to describe shard.", err)
return PrintableError("Failed to describe shard.", err)
}

prettyPrintJSONObject(shard)
Expand All @@ -437,7 +437,7 @@ func AdminSetShardRangeID(c *cli.Context) error {

getShardResp, err := shardManager.GetShard(ctx, &persistence.GetShardRequest{ShardID: sid})
if err != nil {
return ErrorAndPrint("Failed to get shardInfo.", err)
return PrintableError("Failed to get shardInfo.", err)
}

previousRangeID := getShardResp.ShardInfo.RangeID
Expand All @@ -451,7 +451,7 @@ func AdminSetShardRangeID(c *cli.Context) error {
PreviousRangeID: previousRangeID,
ShardInfo: updatedShardInfo,
}); err != nil {
return ErrorAndPrint("Failed to reset shard rangeID.", err)
return PrintableError("Failed to reset shard rangeID.", err)
}

fmt.Printf("Successfully updated rangeID from %v to %v for shard %v.\n", previousRangeID, rid, sid)
Expand All @@ -471,7 +471,7 @@ func AdminCloseShard(c *cli.Context) error {

err := adminClient.CloseShard(ctx, req)
if err != nil {
return ErrorAndPrint("Close shard task has failed", err)
return PrintableError("Close shard task has failed", err)
}
return nil
}
Expand All @@ -495,7 +495,7 @@ func AdminDescribeShardDistribution(c *cli.Context) error {

resp, err := adminClient.DescribeShardDistribution(ctx, req)
if err != nil {
return ErrorAndPrint("Shard list failed", err)
return PrintableError("Shard list failed", err)
}

fmt.Printf("Total Number of Shards: %d \n", resp.NumberOfShards)
Expand Down Expand Up @@ -536,7 +536,7 @@ func AdminDescribeHistoryHost(c *cli.Context) error {
printFully := c.Bool(FlagPrintFullyDetail)

if len(wid) == 0 && !c.IsSet(FlagShardID) && len(addr) == 0 {
return ErrorAndPrint("at least one of them is required to provide to lookup host: workflowID, shardID and host address", nil)
return PrintableError("at least one of them is required to provide to lookup host: workflowID, shardID and host address", nil)
}

ctx, cancel := newContext(c)
Expand All @@ -555,7 +555,7 @@ func AdminDescribeHistoryHost(c *cli.Context) error {

resp, err := adminClient.DescribeHistoryHost(ctx, req)
if err != nil {
return ErrorAndPrint("Describe history host failed", err)
return PrintableError("Describe history host failed", err)
}

if !printFully {
Expand Down Expand Up @@ -584,7 +584,7 @@ func AdminRefreshWorkflowTasks(c *cli.Context) error {
},
})
if err != nil {
return ErrorAndPrint("Refresh workflow task failed", err)
return PrintableError("Refresh workflow task failed", err)
}
fmt.Println("Refresh workflow task succeeded.")
return nil
Expand All @@ -609,7 +609,7 @@ func AdminResetQueue(c *cli.Context) error {

err := adminClient.ResetQueue(ctx, req)
if err != nil {
return ErrorAndPrint("Failed to reset queue", err)
return PrintableError("Failed to reset queue", err)
}
fmt.Println("Reset queue state succeeded")
return nil
Expand All @@ -634,7 +634,7 @@ func AdminDescribeQueue(c *cli.Context) error {

resp, err := adminClient.DescribeQueue(ctx, req)
if err != nil {
return ErrorAndPrint("Failed to describe queue", err)
return PrintableError("Failed to describe queue", err)
}

for _, state := range resp.ProcessingQueueStates {
Expand Down
Loading

0 comments on commit cb0fbfa

Please sign in to comment.