Skip to content

Commit

Permalink
feat(pkg/process): initial commit, fix dmesg scan on non-root (#26)
Browse files Browse the repository at this point in the history
Signed-off-by: Gyuho Lee <[email protected]>
  • Loading branch information
gyuho authored Aug 23, 2024
1 parent 867570c commit 44175c0
Show file tree
Hide file tree
Showing 9 changed files with 836 additions and 136 deletions.
65 changes: 21 additions & 44 deletions components/diagnose/diagnose.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
Expand All @@ -15,6 +14,7 @@ import (
query_log_filter "github.com/leptonai/gpud/components/query/log/filter"
query_log_tail "github.com/leptonai/gpud/components/query/log/tail"
"github.com/leptonai/gpud/pkg/host"
"github.com/leptonai/gpud/pkg/process"

"sigs.k8s.io/yaml"
)
Expand Down Expand Up @@ -469,25 +469,6 @@ func (o *output) runCommand(ctx context.Context, subDir string, args ...string)
return nil
}

bashFile, err := os.CreateTemp(os.TempDir(), "tmpbash*.bash")
if err != nil {
return err
}
defer os.RemoveAll(bashFile.Name())

if _, err := bashFile.WriteString(bashScriptHeader); err != nil {
return err
}
if _, err := bashFile.WriteString(fmt.Sprintf("%s\n", strings.Join(args, " "))); err != nil {
return err
}
if err := bashFile.Sync(); err != nil {
return err
}
if err := bashFile.Close(); err != nil {
return err
}

fileName := strings.Join(args, "-")
fileName = strings.ReplaceAll(fileName, "*", "_matchall")
fileName = strings.ReplaceAll(fileName, " ", "_")
Expand All @@ -506,31 +487,27 @@ func (o *output) runCommand(ctx context.Context, subDir string, args ...string)
}
defer f.Close()

fmt.Printf("%s running %s\n", inProgress, strings.Join(args, " "))
cmd := exec.CommandContext(ctx, "bash", bashFile.Name())
cmd.Stdout = f
cmd.Stderr = f

if cerr := cmd.Run(); cerr != nil {
o.Results = append(o.Results, CommandResult{
Command: strings.Join(args, " "),
Error: cerr.Error(),
})
p, err := process.New([][]string{args}, process.WithRunAsBashScript(), process.WithOutputFile(f))
if err != nil {
return err
}
if serr := p.Start(ctx); serr != nil {
return serr
}
select {
case <-ctx.Done():
return ctx.Err()
case err := <-p.Wait():
if err != nil {
o.Results = append(o.Results, CommandResult{
Command: strings.Join(args, " "),
Error: err.Error(),
})
}
}
if err := p.Stop(ctx); err != nil {
return err
}

return nil
}

// bashScriptHeader is the preamble written at the top of every generated
// bash script. It enables strict error handling before any command runs.
//
// The literal starts directly with the shebang (no leading newline) so that
// "#!/bin/bash" is the first line of the script and is honored if the file
// is ever executed directly rather than passed to bash as an argument.
const bashScriptHeader = `#!/bin/bash
# do not mask errors in a pipeline
set -o pipefail
# treat unset variables as an error
set -o nounset
# exit script whenever it errs
set -o errexit
`
5 changes: 3 additions & 2 deletions components/dmesg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ func DefaultConfig() Config {
BufferSize: query_log_config.DefaultBufferSize,

Commands: [][]string{
{"dmesg", "--ctime", "--nopager", "--buffer-size", "163920", "-w"},
{"dmesg", "--ctime", "--nopager", "--buffer-size", "163920", "-W"},
// run last commands as fallback, in case dmesg flag only works in some machines
{"dmesg --ctime --nopager --buffer-size 163920 -w || true"},
{"dmesg --ctime --nopager --buffer-size 163920 -W"},
},

Scan: &query_log_config.Scan{
Expand Down
37 changes: 0 additions & 37 deletions components/query/log/tail/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package tail

import (
"errors"
"io"
"strings"
"time"

query_log_filter "github.com/leptonai/gpud/components/query/log/filter"
Expand Down Expand Up @@ -77,41 +75,6 @@ func WithCommands(commands [][]string) OpOption {
}
}

// bashScriptHeader is the preamble written at the top of every generated
// bash script. It enables strict error handling before any command runs.
//
// The literal starts directly with the shebang (no leading newline) so that
// "#!/bin/bash" is the first line of the script and is honored if the file
// is ever executed directly rather than passed to bash as an argument.
const bashScriptHeader = `#!/bin/bash
# do not mask errors in a pipeline
set -o pipefail
# treat unset variables as an error
set -o nounset
# exit script whenever it errs
set -o errexit
`

// writeCommands renders op.commands as a bash script into w.
//
// The output begins with bashScriptHeader, followed by one line per command
// whose arguments are joined with single spaces. Every command except the
// last one is suffixed with " || true" so that its failure does not abort the
// script and the following command can act as a fallback.
//
// It returns the first error reported by w, leaving any bytes already written
// in place.
func (op *Op) writeCommands(w io.Writer) error {
	// emit writes a single string fragment to w and reports only the error.
	emit := func(fragment string) error {
		_, werr := w.Write([]byte(fragment))
		return werr
	}

	if err := emit(bashScriptHeader); err != nil {
		return err
	}

	lastIdx := len(op.commands) - 1
	for idx, cmdArgs := range op.commands {
		if err := emit(strings.Join(cmdArgs, " ")); err != nil {
			return err
		}
		// run last commands as fallback, in case dmesg flag only works in some machines
		if idx != lastIdx {
			if err := emit(" || true"); err != nil {
				return err
			}
		}
		if err := emit("\n"); err != nil {
			return err
		}
	}
	return nil
}

// Sets the number of lines to tail.
// If not set, defaults to 100.
func WithLinesToTail(n int) OpOption {
Expand Down
25 changes: 15 additions & 10 deletions components/query/log/tail/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@ package tail
import (
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"strings"

"github.com/leptonai/gpud/log"
"github.com/leptonai/gpud/pkg/process"
)

// Scan scans the file or commands output from the end of the file
Expand Down Expand Up @@ -37,20 +35,27 @@ func Scan(ctx context.Context, opts ...OpOption) (int, error) {
file = f.Name()

log.Logger.Debugw("writing commands to file to scan", "commands", op.commands)
if err := op.writeCommands(f); err != nil {
return 0, err
}
out, err := exec.CommandContext(ctx, "bash", file).Output()
p, err := process.New(op.commands, process.WithRunAsBashScript(), process.WithOutputFile(f))
if err != nil {
log.Logger.Debugw("failed to execute command", "error", err, "command", strings.Join(op.commands[0], " "), "output", string(out))
return 0, fmt.Errorf("failed to execute command: %w (%s)", err, strings.Join(op.commands[0], " "))
return 0, err
}
if _, err := f.Write(out); err != nil {
if err := p.Start(ctx); err != nil {
return 0, err
}
select {
case <-ctx.Done():
return 0, ctx.Err()
case err := <-p.Wait():
if err != nil {
return 0, err
}
}
if err := f.Sync(); err != nil {
return 0, err
}
if err := p.Stop(ctx); err != nil {
return 0, err
}
}

f, err := os.Open(file)
Expand Down
59 changes: 18 additions & 41 deletions components/query/log/tail/streamer_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package tail
import (
"bufio"
"context"
"os"
"os/exec"
"fmt"
"time"

query_log_filter "github.com/leptonai/gpud/components/query/log/filter"
"github.com/leptonai/gpud/log"
"github.com/leptonai/gpud/pkg/process"

"github.com/nxadm/tail"
)
Expand All @@ -21,38 +21,29 @@ func NewFromCommand(ctx context.Context, commands [][]string, opts ...OpOption)
return nil, err
}

f, err := os.CreateTemp(os.TempDir(), "streamer-from-command*.txt")
p, err := process.New(op.commands, process.WithRunAsBashScript())
if err != nil {
return nil, err
}
file := f.Name()

if err := op.writeCommands(f); err != nil {
return nil, err
}

cmd := exec.CommandContext(ctx, "bash", file)

stdout, err := cmd.StdoutPipe()
if err != nil {
if err := p.Start(ctx); err != nil {
return nil, err
}
stdoutScanner := bufio.NewScanner(stdout)

stderr, err := cmd.StderrPipe()
if err != nil {
return nil, err
select {
case <-ctx.Done():
return nil, ctx.Err()
case err := <-p.Wait():
return nil, fmt.Errorf("command exited unexpectedly: %w", err)
case <-time.After(50 * time.Millisecond):
}
stderrScanner := bufio.NewScanner(stderr)

if err := cmd.Start(); err != nil {
return nil, err
}
stdoutScanner := bufio.NewScanner(p.StdoutReader())
stderrScanner := bufio.NewScanner(p.StderrReader())

streamer := &commandStreamer{
op: op,
ctx: ctx,
cmd: cmd,
proc: p,
lineC: make(chan Line, 200),
}
go streamer.pollLoops(stdoutScanner)
Expand All @@ -67,7 +58,7 @@ var _ Streamer = (*commandStreamer)(nil)
type commandStreamer struct {
op *Op
ctx context.Context
cmd *exec.Cmd
proc process.Process
lineC chan Line
}

Expand Down Expand Up @@ -127,29 +118,15 @@ func (sr *commandStreamer) pollLoops(scanner *bufio.Scanner) {
MatchedFilter: matchedFilter,
}:
default:
log.Logger.Debugw("channel is full -- dropped output", "cmd", sr.cmd.String())
log.Logger.Debugw("channel is full -- dropped output", "pid", sr.proc.PID())
}
}
}

func (sr *commandStreamer) waitCommand() {
defer close(sr.lineC)

if err := sr.cmd.Wait(); err != nil {
if exitErr, ok := err.(*exec.ExitError); ok {
if exitErr.ExitCode() == -1 {
if sr.ctx.Err() != nil {
log.Logger.Debugw("command was terminated (exit code -1) by the root context cancellation", "cmd", sr.cmd.String(), "contextError", sr.ctx.Err())
} else {
log.Logger.Warnw("command was terminated (exit code -1) for unknown reasons", "cmd", sr.cmd.String())
}
} else {
log.Logger.Warnw("command exited with non-zero status", "error", err, "cmd", sr.cmd.String(), "exitCode", exitErr.ExitCode())
}
} else {
log.Logger.Warnw("error waiting for command to finish", "error", err, "cmd", sr.cmd.String())
}
} else {
log.Logger.Debugw("command completed successfully")
select {
case <-sr.ctx.Done():
case <-sr.proc.Wait():
}
}
10 changes: 8 additions & 2 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ var (
)

func DefaultConfig(ctx context.Context) (*Config, error) {
asRoot := stdos.Geteuid() == 0 // running as root

cfg := &Config{
APIVersion: DefaultAPIVersion,

Expand Down Expand Up @@ -163,8 +165,12 @@ func DefaultConfig(ctx context.Context) (*Config, error) {

if runtime.GOOS == "linux" {
if dmesg.DmesgExists() {
log.Logger.Debugw("auto-detected dmesg -- configuring dmesg component")
cfg.Components[dmesg.Name] = dmesg.DefaultConfig()
if asRoot {
log.Logger.Debugw("auto-detected dmesg -- configuring dmesg component")
cfg.Components[dmesg.Name] = dmesg.DefaultConfig()
} else {
log.Logger.Debugw("auto-detected dmesg but not running as root -- skipping")
}
}
} else {
log.Logger.Debugw("auto-detect dmesg not supported -- skipping", "os", runtime.GOOS)
Expand Down
Loading

0 comments on commit 44175c0

Please sign in to comment.