Skip to content

Commit

Permalink
Add support rpc JSON replay.
Browse files Browse the repository at this point in the history
Add realtime replay of captured JSON data.

Example:
```
λ mc support top rpc --json myminio >rpc.json

λ mc support top rpc -in=rpc.json
```
  • Loading branch information
klauspost committed Oct 18, 2024
1 parent 2c4e3ad commit 8fe9844
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 28 deletions.
36 changes: 17 additions & 19 deletions cmd/admin-scanner-status.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"github.com/charmbracelet/bubbles/spinner"
tea "github.com/charmbracelet/bubbletea"
"github.com/charmbracelet/lipgloss"
humanize "github.com/dustin/go-humanize"
"github.com/dustin/go-humanize"
"github.com/fatih/color"
"github.com/minio/cli"
json "github.com/minio/colorjson"
Expand Down Expand Up @@ -64,9 +64,8 @@ var adminScannerInfoFlags = []cli.Flag{
Value: -1,
},
cli.StringFlag{
Name: "in",
Hidden: true,
Usage: "read previously saved json from file and replay",
Name: "in",
Usage: "read previously saved json from file and replay",
},
cli.StringFlag{
Name: "bucket",
Expand Down Expand Up @@ -198,19 +197,6 @@ func mainAdminScannerInfo(ctx *cli.Context) error {

checkAdminScannerInfoSyntax(ctx)

aliasedURL := ctx.Args().Get(0)

// Create a new MinIO Admin Client
client, err := newAdminClient(aliasedURL)
fatalIf(err.Trace(aliasedURL), "Unable to initialize admin client.")

if bucket := ctx.String("bucket"); bucket != "" {
bucketStats, err := client.BucketScanInfo(globalContext, bucket)
fatalIf(probe.NewError(err).Trace(aliasedURL), "Unable to get bucket stats.")
printMsg(bucketScanMsg{Stats: bucketStats})
return nil
}

ui := tea.NewProgram(initScannerMetricsUI(ctx.Int("max-paths")))
ctxt, cancel := context.WithCancel(globalContext)
defer cancel()
Expand All @@ -220,11 +206,11 @@ func mainAdminScannerInfo(ctx *cli.Context) error {
go func() {
if _, e := ui.Run(); e != nil {
cancel()
fatalIf(probe.NewError(e).Trace(aliasedURL), "Unable to fetch scanner metrics")
fatalIf(probe.NewError(e), "Unable to fetch scanner metrics")
}
}()
f, e := os.Open(inFile)
fatalIf(probe.NewError(e).Trace(aliasedURL), "Unable to open input")
fatalIf(probe.NewError(e), "Unable to open input")
sc := bufio.NewReader(f)
var lastTime time.Time
for {
Expand All @@ -250,6 +236,18 @@ func mainAdminScannerInfo(ctx *cli.Context) error {
os.Exit(0)
}

// Create a new MinIO Admin Client
aliasedURL := ctx.Args().Get(0)
client, err := newAdminClient(aliasedURL)
fatalIf(err.Trace(aliasedURL), "Unable to initialize admin client.")

if bucket := ctx.String("bucket"); bucket != "" {
bucketStats, err := client.BucketScanInfo(globalContext, bucket)
fatalIf(probe.NewError(err).Trace(aliasedURL), "Unable to get bucket stats.")
printMsg(bucketScanMsg{Stats: bucketStats})
return nil
}

opts := madmin.MetricsOptions{
Type: madmin.MetricsScanner,
N: ctx.Int("n"),
Expand Down
64 changes: 55 additions & 9 deletions cmd/support-top-rcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
package cmd

import (
"bufio"
"context"
"errors"
"fmt"
"io"
"os"
"sort"
"strings"
"time"
Expand All @@ -29,6 +32,7 @@ import (
tea "github.com/charmbracelet/bubbletea"
"github.com/charmbracelet/lipgloss"
"github.com/minio/cli"
json "github.com/minio/colorjson"
"github.com/minio/madmin-go/v3"
"github.com/minio/mc/pkg/probe"
"github.com/olekukonko/tablewriter"
Expand All @@ -49,6 +53,10 @@ var supportTopRPCFlags = []cli.Flag{
Usage: "number of requests to run before exiting. 0 for endless (default)",
Value: 0,
},
cli.StringFlag{
Name: "in",
Usage: "read previously saved json from file and replay",
},
}

var supportTopRPCCmd = cli.Command{
Expand Down Expand Up @@ -77,6 +85,9 @@ EXAMPLES:

// checkSupportTopNetSyntax - validate all the passed arguments
func checkSupportTopRPCSyntax(ctx *cli.Context) {
if ctx.String("in") != "" {
return
}
if len(ctx.Args()) == 0 || len(ctx.Args()) > 1 {
showCommandHelpAndExit(ctx, 1) // last argument is exit code
}
Expand All @@ -85,6 +96,45 @@ func checkSupportTopRPCSyntax(ctx *cli.Context) {
func mainSupportTopRPC(ctx *cli.Context) error {
checkSupportTopRPCSyntax(ctx)

ui := tea.NewProgram(initTopRPCUI())
ctxt, cancel := context.WithCancel(globalContext)
defer cancel()

// Replay from file.
if inFile := ctx.String("in"); inFile != "" {
go func() {
if _, e := ui.Run(); e != nil {
cancel()
fatalIf(probe.NewError(e), "Unable to fetch scanner metrics")
}
}()
f, e := os.Open(inFile)
fatalIf(probe.NewError(e), "Unable to open input")
sc := bufio.NewReader(f)
var lastTime time.Time
for {
b, e := sc.ReadBytes('\n')
if e == io.EOF {
break
}
var metrics madmin.RealtimeMetrics
e = json.Unmarshal(b, &metrics)
if e != nil || metrics.Aggregated.RPC == nil {
continue
}
delay := metrics.Aggregated.RPC.CollectedAt.Sub(lastTime)
if !lastTime.IsZero() && delay > 0 {
if delay > 3*time.Second {
delay = 3 * time.Second
}
time.Sleep(delay)
}
ui.Send(metrics)
lastTime = metrics.Aggregated.RPC.CollectedAt
}
os.Exit(0)
}

aliasedURL := ctx.Args().Get(0)
alias, _ := url2Alias(aliasedURL)
validateClusterRegistered(alias, false)
Expand All @@ -96,9 +146,6 @@ func mainSupportTopRPC(ctx *cli.Context) error {
return nil
}

ctxt, cancel := context.WithCancel(globalContext)
defer cancel()

// MetricsOptions are options provided to Metrics call.
opts := madmin.MetricsOptions{
Type: madmin.MetricsRPC,
Expand All @@ -116,20 +163,19 @@ func mainSupportTopRPC(ctx *cli.Context) error {
}
return nil
}
p := tea.NewProgram(initTopRPCUI())
go func() {
out := func(m madmin.RealtimeMetrics) {
p.Send(m)
ui.Send(m)
}

e := client.Metrics(ctxt, opts, out)
if e != nil {
fatalIf(probe.NewError(e), "Unable to fetch top net events")
}
p.Quit()
ui.Quit()
}()

if _, e := p.Run(); e != nil {
if _, e := ui.Run(); e != nil {
cancel()
fatalIf(probe.NewError(e).Trace(aliasedURL), "Unable to fetch top net events")
}
Expand Down Expand Up @@ -242,7 +288,7 @@ func (m *topRPCUI) View() string {
fmt.Sprintf(" To %s", host),
fmt.Sprintf("%d", v.Connected),
fmt.Sprintf("%0.1fms", v.LastPingMS),
fmt.Sprintf("%ds ago", time.Since(v.LastPongTime)/time.Second),
fmt.Sprintf("%ds ago", v.CollectedAt.Sub(v.LastPongTime)/time.Second),
fmt.Sprintf("%d", v.OutQueue),
fmt.Sprintf("%d", v.ReconnectCount),
fmt.Sprintf("->%d", v.IncomingStreams),
Expand All @@ -256,7 +302,7 @@ func (m *topRPCUI) View() string {
fmt.Sprintf("From %s", host),
fmt.Sprintf("%d", v.Connected),
fmt.Sprintf("%0.1fms", v.LastPingMS),
fmt.Sprintf("%ds ago", time.Since(v.LastPongTime)/time.Second),
fmt.Sprintf("%ds ago", v.CollectedAt.Sub(v.LastPongTime)/time.Second),
fmt.Sprintf("%d", v.OutQueue),
fmt.Sprintf("%d", v.ReconnectCount),
fmt.Sprintf("->%d", v.IncomingStreams),
Expand Down

0 comments on commit 8fe9844

Please sign in to comment.