-
Notifications
You must be signed in to change notification settings - Fork 534
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: remove wrapperd and launch processes directly #9489
base: main
Are you sure you want to change the base?
Changes from 13 commits
9ef81ed
a29c71a
860c141
d0cd5e5
386d2fb
b636c53
142a445
51c687b
f65c9b2
61dc215
fdaa3a2
3c509e0
4f5a68d
2379f0e
1e12335
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,9 @@ replace ( | |
// forked go-yaml that introduces RawYAML interface, which can be used to populate YAML fields using bytes | ||
// which are then encoded as a valid YAML blocks with proper indentiation | ||
gopkg.in/yaml.v3 => github.com/unix4ever/yaml v0.0.0-20220527175918-f17b0f05cf2c | ||
|
||
// improved error logging | ||
kernel.org/pub/linux/libs/security/libcap/cap => github.com/dsseng/go-libcap/cap v0.0.0-20241015195416-c3ab072bd718 | ||
Comment on lines
+27
to
+29
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. todo remove |
||
) | ||
|
||
// fd-leak related replacements: https://github.com/siderolabs/talos/issues/9412 | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,12 +8,19 @@ import ( | |
"fmt" | ||
"io" | ||
"os" | ||
"os/exec" | ||
"slices" | ||
"strings" | ||
"syscall" | ||
"time" | ||
|
||
"github.com/containerd/cgroups/v3" | ||
"github.com/containerd/cgroups/v3/cgroup1" | ||
"github.com/containerd/cgroups/v3/cgroup2" | ||
"github.com/containerd/containerd/v2/pkg/sys" | ||
"github.com/siderolabs/gen/optional" | ||
"github.com/siderolabs/gen/xslices" | ||
"github.com/siderolabs/go-cmd/pkg/cmd/proc/reaper" | ||
"kernel.org/pub/linux/libs/security/libcap/cap" | ||
|
||
"github.com/siderolabs/talos/internal/app/machined/pkg/system/events" | ||
"github.com/siderolabs/talos/internal/app/machined/pkg/system/runner" | ||
|
@@ -78,27 +85,85 @@ func (p *processRunner) Close() error { | |
} | ||
|
||
type commandWrapper struct { | ||
cmd *exec.Cmd | ||
launcher *cap.Launcher | ||
ctty optional.Optional[int] | ||
stdin uintptr | ||
stdout uintptr | ||
stderr uintptr | ||
afterStart func() | ||
afterTermination func() error | ||
} | ||
|
||
func dropCaps(droppedCapabilities []string, launcher *cap.Launcher) error { | ||
droppedCaps := strings.Join(droppedCapabilities, ",") | ||
|
||
if droppedCaps != "" { | ||
caps := strings.Split(droppedCaps, ",") | ||
dropCaps := xslices.Map(caps, func(c string) cap.Value { | ||
capability, capErr := cap.FromName(c) | ||
if capErr != nil { | ||
fmt.Printf("failed to parse capability: %s", capErr) | ||
} | ||
|
||
return capability | ||
}) | ||
|
||
iab := cap.IABGetProc() | ||
if err := iab.SetVector(cap.Bound, true, dropCaps...); err != nil { | ||
return fmt.Errorf("failed to set capabilities: %w", err) | ||
} | ||
|
||
launcher.SetIAB(iab) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// This callback is run in the thread before executing child process. | ||
func beforeExecCallback(pa *syscall.ProcAttr, data interface{}) error { | ||
wrapper, ok := data.(*commandWrapper) | ||
if !ok { | ||
return fmt.Errorf("failed to get command info") | ||
} | ||
|
||
ctty, cttySet := wrapper.ctty.Get() | ||
if cttySet { | ||
if pa.Sys == nil { | ||
pa.Sys = &syscall.SysProcAttr{} | ||
} | ||
|
||
pa.Sys.Ctty = ctty | ||
pa.Sys.Setsid = true | ||
pa.Sys.Setctty = true | ||
} | ||
|
||
pa.Files = []uintptr{ | ||
wrapper.stdin, | ||
wrapper.stdout, | ||
wrapper.stderr, | ||
} | ||
|
||
// TODO: use pa.Sys.CgroupFD here when we can be sure clone3 is available | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's absolutely do this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So implement 2 different pathways? We use this code in container mode as well, and we cannot really expect containers to be ran on kernel 5.5+ which has clone3 |
||
return nil | ||
} | ||
|
||
//nolint:gocyclo | ||
func (p *processRunner) build() (commandWrapper, error) { | ||
args := []string{ | ||
fmt.Sprintf("-name=%s", p.args.ID), | ||
fmt.Sprintf("-dropped-caps=%s", strings.Join(p.opts.DroppedCapabilities, ",")), | ||
fmt.Sprintf("-cgroup-path=%s", cgroup.Path(p.opts.CgroupPath)), | ||
fmt.Sprintf("-oom-score=%d", p.opts.OOMScoreAdj), | ||
fmt.Sprintf("-uid=%d", p.opts.UID), | ||
} | ||
wrapper := commandWrapper{} | ||
|
||
env := slices.Concat([]string{"PATH=" + constants.PATH}, p.opts.Env, os.Environ()) | ||
launcher := cap.NewLauncher(p.args.ProcessArgs[0], p.args.ProcessArgs, env) | ||
|
||
args = append(args, p.args.ProcessArgs...) | ||
if p.opts.UID > 0 { | ||
launcher.SetUID(int(p.opts.UID)) | ||
} | ||
|
||
cmd := exec.Command("/sbin/wrapperd", args...) | ||
// reduce capabilities and assign them to launcher | ||
if err := dropCaps(p.opts.DroppedCapabilities, launcher); err != nil { | ||
return commandWrapper{}, err | ||
} | ||
|
||
// Set the environment for the service. | ||
cmd.Env = append([]string{fmt.Sprintf("PATH=%s", constants.PATH)}, p.opts.Env...) | ||
launcher.Callback(beforeExecCallback) | ||
|
||
// Setup logging. | ||
w, err := p.opts.LoggingManager.ServiceLog(p.args.ID).Writer() | ||
|
@@ -113,12 +178,40 @@ func (p *processRunner) build() (commandWrapper, error) { | |
writer = w | ||
} | ||
|
||
// As MultiWriter is not a file, we need to create a pipe | ||
// Pipe writer is passed to the child process while we read from the read side | ||
pr, pw, err := os.Pipe() | ||
if err != nil { | ||
return commandWrapper{}, err | ||
} | ||
|
||
go io.Copy(writer, pr) //nolint:errcheck | ||
|
||
// close the writer if we exit early due to an error | ||
closeWriter := true | ||
|
||
closeLogging := func() (e error) { | ||
err := w.Close() | ||
if err != nil { | ||
e = err | ||
} | ||
|
||
err = pr.Close() | ||
if err != nil { | ||
e = err | ||
} | ||
|
||
err = pw.Close() | ||
if err != nil { | ||
e = err | ||
} | ||
|
||
return e | ||
} | ||
|
||
defer func() { | ||
if closeWriter { | ||
w.Close() //nolint:errcheck | ||
closeLogging() //nolint:errcheck | ||
} | ||
}() | ||
|
||
|
@@ -130,7 +223,7 @@ func (p *processRunner) build() (commandWrapper, error) { | |
return commandWrapper{}, err | ||
} | ||
|
||
cmd.Stdin = stdin | ||
wrapper.stdin = stdin.Fd() | ||
|
||
afterStartFuncs = append(afterStartFuncs, func() { | ||
stdin.Close() //nolint:errcheck | ||
|
@@ -143,13 +236,13 @@ func (p *processRunner) build() (commandWrapper, error) { | |
return commandWrapper{}, err | ||
} | ||
|
||
cmd.Stdout = stdout | ||
wrapper.stdout = stdout.Fd() | ||
|
||
afterStartFuncs = append(afterStartFuncs, func() { | ||
stdout.Close() //nolint:errcheck | ||
}) | ||
} else { | ||
cmd.Stdout = writer | ||
wrapper.stdout = pw.Fd() | ||
} | ||
|
||
if p.opts.StderrFile != "" { | ||
|
@@ -158,37 +251,60 @@ func (p *processRunner) build() (commandWrapper, error) { | |
return commandWrapper{}, err | ||
} | ||
|
||
cmd.Stderr = stderr | ||
wrapper.stderr = stderr.Fd() | ||
|
||
afterStartFuncs = append(afterStartFuncs, func() { | ||
stderr.Close() //nolint:errcheck | ||
}) | ||
} else { | ||
cmd.Stderr = writer | ||
wrapper.stderr = pw.Fd() | ||
} | ||
|
||
ctty, cttySet := p.opts.Ctty.Get() | ||
if cttySet { | ||
cmd.SysProcAttr = &syscall.SysProcAttr{ | ||
Setsid: true, | ||
Setctty: true, | ||
Ctty: ctty, | ||
closeWriter = false | ||
|
||
wrapper.launcher = launcher | ||
wrapper.afterStart = func() { | ||
for _, f := range afterStartFuncs { | ||
f() | ||
} | ||
} | ||
wrapper.afterTermination = closeLogging | ||
wrapper.ctty = p.opts.Ctty | ||
|
||
closeWriter = false | ||
return wrapper, nil | ||
} | ||
|
||
return commandWrapper{ | ||
cmd: cmd, | ||
afterStart: func() { | ||
for _, f := range afterStartFuncs { | ||
f() | ||
} | ||
}, | ||
afterTermination: func() error { | ||
return w.Close() | ||
}, | ||
}, nil | ||
// Apply cgroup and OOM score after the process is launched. | ||
func applyProperties(p *processRunner, pid int) error { | ||
path := cgroup.Path(p.opts.CgroupPath) | ||
|
||
if cgroups.Mode() == cgroups.Unified { | ||
cgv2, err := cgroup2.Load(path) | ||
if err != nil { | ||
return fmt.Errorf("failed to load cgroup %s: %w", path, err) | ||
} | ||
|
||
if err := cgv2.AddProc(uint64(pid)); err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we move to cgroupFD? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't we also use this launch method when running in a container (thus not sure we have clone3 available)? Cgroup in clone3 is actually only available since kernel 5.7 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good point, let's skip it for now There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we optionally have two paths? like auto-detect? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unsure whether that's going to be beneficial, but it's possible for sure |
||
return fmt.Errorf("failed to move process %s to cgroup: %w", p, err) | ||
} | ||
} else { | ||
cgv1, err := cgroup1.Load(cgroup1.StaticPath(path)) | ||
if err != nil { | ||
return fmt.Errorf("failed to load cgroup %s: %w", path, err) | ||
} | ||
|
||
if err := cgv1.Add(cgroup1.Process{ | ||
Pid: pid, | ||
}); err != nil { | ||
return fmt.Errorf("failed to move process %s to cgroup: %w", p, err) | ||
} | ||
} | ||
|
||
if err := sys.AdjustOOMScore(pid, p.opts.OOMScoreAdj); err != nil { | ||
return fmt.Errorf("failed to change OOMScoreAdj of process %s to %d: %w", p, p.opts.OOMScoreAdj, err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (p *processRunner) run(eventSink events.Recorder) error { | ||
|
@@ -206,20 +322,29 @@ func (p *processRunner) run(eventSink events.Recorder) error { | |
defer reaper.Stop(notifyCh) | ||
} | ||
|
||
err = cmdWrapper.cmd.Start() | ||
pid, err := cmdWrapper.launcher.Launch(&cmdWrapper) | ||
if err != nil { | ||
return fmt.Errorf("error starting process: %w", err) | ||
} | ||
|
||
if err := applyProperties(p, pid); err != nil { | ||
return err | ||
} | ||
|
||
cmdWrapper.afterStart() | ||
|
||
eventSink(events.StateRunning, "Process %s started with PID %d", p, pid) | ||
|
||
process, err := os.FindProcess(pid) | ||
if err != nil { | ||
return fmt.Errorf("error starting process: %w", err) | ||
return fmt.Errorf("could not find process: %w", err) | ||
} | ||
|
||
eventSink(events.StateRunning, "Process %s started with PID %d", p, cmdWrapper.cmd.Process.Pid) | ||
|
||
waitCh := make(chan error) | ||
|
||
go func() { | ||
waitCh <- reaper.WaitWrapper(usingReaper, notifyCh, cmdWrapper.cmd) | ||
_, err := process.Wait() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should use reaper, don't change this part please There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay, refactoring here to accept raw pid and so: https://github.com/siderolabs/go-cmd/blob/main/pkg/cmd/proc/reaper/wait.go#L13-L38 |
||
waitCh <- err | ||
}() | ||
|
||
select { | ||
|
@@ -231,7 +356,7 @@ func (p *processRunner) run(eventSink events.Recorder) error { | |
eventSink(events.StateStopping, "Sending SIGTERM to %s", p) | ||
|
||
//nolint:errcheck | ||
_ = cmdWrapper.cmd.Process.Signal(syscall.SIGTERM) | ||
_ = process.Signal(syscall.SIGTERM) | ||
} | ||
|
||
select { | ||
|
@@ -243,7 +368,7 @@ func (p *processRunner) run(eventSink events.Recorder) error { | |
eventSink(events.StateStopping, "Sending SIGKILL to %s", p) | ||
|
||
//nolint:errcheck | ||
_ = cmdWrapper.cmd.Process.Signal(syscall.SIGKILL) | ||
_ = process.Signal(syscall.SIGKILL) | ||
} | ||
|
||
// wait for process to terminate | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: remove as this was only debug