wasi: use File.Poll for all blocking FDs in poll_oneoff #1606

Closed
wants to merge 6 commits into from
Changes from 1 commit
75 changes: 37 additions & 38 deletions RATIONALE.md
@@ -1376,58 +1376,54 @@ as socket handles.
### Clock Subscriptions

As detailed above in [sys.Nanosleep](#sysnanosleep), `poll_oneoff` handles
relative clock subscriptions. In our implementation we use `sys.Nanosleep()`
for this purpose in most cases, except when polling for interactive input
from `os.Stdin` (see more details below).
relative clock subscriptions. In our implementation we use `sys.Nanosleep()`.

### FdRead and FdWrite Subscriptions

When subscribing a file descriptor (except `Stdin`) for reads or writes,
the implementation will generally return immediately with success, unless
the file descriptor is unknown. The file descriptor is not checked further
for new incoming data. Any timeout is cancelled, and the API call is able
to return, unless there are subscriptions to `Stdin`: these are handled
separately.
to return, unless there are subscriptions to blocking file descriptors:
these are handled separately.

### FdRead and FdWrite Subscription to Stdin
### FdRead and FdWrite Subscription to Blocking File Descriptors

Subscribing `Stdin` for reads (writes make no sense and cause an error)
requires extra care: wazero allows configuring a custom reader for `Stdin`.
Subscribing a file descriptor for reads requires extra care:
wazero allows plugging in an entire custom virtual file system,
and it also allows configuring custom readers and writers for standard I/O
descriptors.

In general, if a custom reader is found, the behavior will be the same
as for regular file descriptors: data is assumed to be present and
a success is written back to the result buffer.
In general, if the file reports to be in non-blocking mode,
the behavior will be the same as for regular file descriptors:
data is assumed to be present and a success is written back to the result buffer.

However, if the reader is detected to read from `os.Stdin`,
a special code path is followed, invoking `sysfs.poll()`.
However, if the file is reported to be in blocking mode (the default),
the `fsapi.File.Poll()` method is invoked.

`sysfs.poll()` is a wrapper for `poll(2)` on POSIX systems,
and it is emulated on Windows.
For regular files, stdin, pipes and sockets, `fsapi.File.Poll()`
is a wrapper for `poll(2)` on POSIX systems, and it is emulated on Windows.
Virtual file systems may provide their own custom implementation.
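
To illustrate the last point, here is a minimal sketch of how a virtual file might provide its own `Poll`. The method mirrors the `Poll(flag, timeoutMillis) (ready, errno)` shape shown in this change, but `Pflag`, `Errno`, `POLLIN`, `ENOTSUP` and `memFile` are local stand-ins invented for the example, not wazero's actual types or values.

```go
package main

import (
	"fmt"
	"sync"
)

// Pflag and Errno are local stand-ins for the types in the Poll signature
// shown in this change; the real definitions live in wazero.
type (
	Pflag uint32
	Errno uint32
)

const (
	POLLIN  Pflag = 1 // stand-in value, chosen for this sketch
	ENOTSUP Errno = 1 // stand-in value, chosen for this sketch
)

// memFile is a hypothetical in-memory file whose readiness depends only on
// whether unread bytes are buffered.
type memFile struct {
	mu  sync.Mutex
	buf []byte
}

// Write appends data, which makes the file readable.
func (f *memFile) Write(p []byte) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.buf = append(f.buf, p...)
}

// Poll reports readiness without blocking; timeoutMillis is ignored in
// this sketch because the answer is always known immediately.
func (f *memFile) Poll(flag Pflag, timeoutMillis int32) (ready bool, errno Errno) {
	if flag != POLLIN {
		return false, ENOTSUP
	}
	f.mu.Lock()
	defer f.mu.Unlock()
	return len(f.buf) > 0, 0
}

func main() {
	f := &memFile{}
	ready, _ := f.Poll(POLLIN, 0)
	fmt.Println("ready before write:", ready)
	f.Write([]byte("hello"))
	ready, _ = f.Poll(POLLIN, 0)
	fmt.Println("ready after write:", ready)
}
```

Because such a file never blocks, a zero-timeout check is all a caller needs; a real implementation would also have to decide how to honor a nonzero timeout.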

### Poll on POSIX

On POSIX systems, `poll(2)` waits for incoming data on a file
descriptor, blocking until either data becomes available or the timeout
expires.

Usage of `sysfs.poll()` is currently reserved for standard input, because

1. it is really only necessary to handle interactive input: otherwise,
there is no way in Go to peek from Standard Input without actually
reading (and thus consuming) from it;

2. if `Stdin` is connected to a pipe, it is ok in most cases to return
with success immediately;

3. `sysfs.poll()` is currently a blocking call, irrespective of goroutines,
because the underlying syscall is; thus, it is better to limit its usage.
Usage of `sysfs.poll()` is reserved for blocking I/O. In particular,
it is used most often with pipes (such as `os.Stdin`) and TCP sockets.

So, if the subscription is for `os.Stdin` and the handle is detected
to correspond to an interactive session, then `sysfs.poll()` will be
invoked with the `Stdin` handle *and* the timeout.
invoked with the `Stdin` file descriptor.

This also means that in this specific case, the timeout is uninterruptible,
unless data becomes available on `Stdin` itself.
In order to avoid a blocking call, the underlying `sysfs.poll()` call
is repeatedly invoked with a 0 timeout at given intervals (currently 100 ms),
until the given timeout expires.

The timeout and the tick both honor the settings for `sys.Nanosleep()`.
This also implies that `sys.Nanosleep()` has to be properly configured.
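
The zero-timeout polling described above can be sketched as follows. This is only an illustration under stated assumptions, not the code in this change: `pollUntil`, `pollOnce` and `fakeClock` are hypothetical names, and the sleep function is injected so that both the tick and the overall timeout follow the same configurable clock.

```go
package main

import (
	"fmt"
	"time"
)

// fakeClock is a hypothetical stand-in for a configurable sleep function:
// it records the requested durations instead of actually sleeping.
type fakeClock struct {
	Slept time.Duration
}

// Sleep adds d to the total time "slept".
func (c *fakeClock) Sleep(d time.Duration) { c.Slept += d }

// pollUntil calls pollOnce (a non-blocking readiness check) until it
// reports ready, returns an error, or the timeout has been slept away.
// Elapsed time is the sum of the durations passed to sleep, so the tick
// and the timeout both follow the injected sleep function.
func pollUntil(pollOnce func() (bool, error), sleep func(time.Duration), timeout, interval time.Duration) (bool, error) {
	if interval <= 0 {
		interval = 100 * time.Millisecond // fall back to the interval quoted above
	}
	for remaining := timeout; ; {
		ready, err := pollOnce()
		if err != nil || ready {
			return ready, err
		}
		if remaining <= 0 {
			// The last check already happened after the final sleep.
			return false, nil
		}
		step := interval
		if remaining < step {
			step = remaining
		}
		sleep(step)
		remaining -= step
	}
}

func main() {
	clock := &fakeClock{}
	calls := 0
	// Pretend data shows up on the third check.
	ready, err := pollUntil(func() (bool, error) {
		calls++
		return calls == 3, nil
	}, clock.Sleep, time.Second, 100*time.Millisecond)
	fmt.Println(ready, err, calls, clock.Slept)
}
```

Each check returns immediately, so no single call holds the OS thread for the full timeout; the cost is that readiness may be noticed up to one interval late.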

### Select on Windows

@@ -1457,15 +1453,18 @@ which plays nicely with the rest of the Go runtime.

### Impact of blocking

Because this is a blocking syscall, it will also block the carrier thread of
the goroutine, preventing any means to support context cancellation directly.

There are ways to obviate this issue. We outline here one idea, that is however
not currently implemented. A common approach to support context cancellation is
to add a signal file descriptor to the set, e.g. the read-end of a pipe or an
eventfd on Linux. When the context is canceled, we may unblock a Select call by
writing to the fd, causing it to return immediately. This, however, requires
a bit of housekeeping to hide the "special" FD from the end-user.
Because this is a blocking syscall, invoking it with a nonzero timeout will also
block the carrier thread of the goroutine, preventing any means
to support context cancellation directly.

We obviate this by invoking `poll` with a 0 timeout repeatedly,
Review comment (Contributor): I don't fully understand this. Why not poll with a 100 ms timeout instead of polling with a zero timeout and sleeping? Are you suggesting that the poll implementation blocks too long even with a 100 ms timeout? If so, the paragraph above may need to clarify this.

Reply (Contributor Author): Yes. With a 100 ms timeout, the syscall blocks for 100 ms, which means it also blocks the underlying OS thread. I will add a clarification.

at given intervals (currently, 100 ms). We outline here another idea:
a common approach to support context cancellation is to add a signal
file descriptor to the set, e.g. the read-end of a pipe or an
eventfd on Linux. When the context is canceled, we may unblock a Select
call by writing to the fd, causing it to return immediately.
This, however, requires a bit of housekeeping to hide the "special" FD
from the end-user.

[poll_oneoff]: https://github.com/WebAssembly/wasi-poll#why-is-the-function-called-poll_oneoff
[async-io-windows]: https://tinyclouds.org/iocp_links
115 changes: 80 additions & 35 deletions imports/wasi_snapshot_preview1/poll.go
@@ -42,13 +42,18 @@ var pollOneoff = newHostFunc(
"in", "out", "nsubscriptions", "result.nevents",
)

type event struct {
type pollEvent struct {
eventType byte
userData []byte
errno wasip1.Errno
outOffset uint32
}

type filePollEvent struct {
f *internalsys.FileEntry
e *pollEvent
}

func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno {
in := uint32(params[0])
out := uint32(params[1])
@@ -86,8 +91,8 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno

// Extract FS context, used in the body of the for loop for FS access.
fsc := mod.(*wasm.ModuleInstance).Sys.FS()
// Slice of events that are processed out of the loop (blocking stdin subscribers).
var blockingStdinSubs []*event
// Slice of events that are processed out of the loop (blocking subscribers).
var blockingSubs []*filePollEvent
// The timeout is initialized at max Duration, the loop will find the minimum.
var timeout time.Duration = 1<<63 - 1
// Count of all the clock subscribers that have been already written back to outBuf.
@@ -106,7 +111,7 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno
argBuf := inBuf[inOffset+8+8:]
userData := inBuf[inOffset : inOffset+8]

evt := &event{
evt := &pollEvent{
eventType: eventType,
userData: userData,
errno: wasip1.ErrnoSuccess,
@@ -135,11 +140,11 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno
evt.errno = wasip1.ErrnoBadf
writeEvent(outBuf, evt)
readySubs++
continue
} else if fd == internalsys.FdStdin && !file.File.IsNonblock() {
// if the fd is Stdin, and it is in blocking mode,
// do not ack yet, append to a slice for delayed evaluation.
blockingStdinSubs = append(blockingStdinSubs, evt)
} else if !file.File.IsNonblock() {
// If the fd is blocking, do not ack yet,
// append to a slice for delayed evaluation.
fe := &filePollEvent{f: file, e: evt}
blockingSubs = append(blockingSubs, fe)
} else {
writeEvent(outBuf, evt)
readySubs++
@@ -161,44 +166,34 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno
}
}

// If there are subscribers with data ready, we have already written them to outBuf,
// and we don't need to wait for the timeout: clear it.
if readySubs != 0 {
timeout = 0
}
sysCtx := mod.(*wasm.ModuleInstance).Sys
Review comment (Contributor Author): The section below has been reordered for clarity.


// If there are blocking stdin subscribers, check for data with given timeout.
if len(blockingStdinSubs) > 0 {
stdin, ok := fsc.LookupFile(internalsys.FdStdin)
if !ok {
return sys.EBADF
}
// Wait for the timeout to expire, or for some data to become available on Stdin.
stdinReady, errno := stdin.File.Poll(sys.POLLIN, int32(timeout.Milliseconds()))
// There are no blocking subscribers, we just wait for the given timeout.
evacchi marked this conversation as resolved.
if len(blockingSubs) == 0 {
sysCtx.Nanosleep(int64(timeout))
} else {
// If there are blocking subscribers, check the fds using poll.
n, errno := pollFileEventsOnce(blockingSubs, outBuf)
if errno != 0 {
return errno
}
if stdinReady {
// stdin has data ready for reading, write back all the events
for i := range blockingStdinSubs {
readySubs++
evt := blockingStdinSubs[i]
evt.errno = 0
writeEvent(outBuf, evt)
readySubs += n

// If there are any subscribers ready, including those we checked earlier,
// we don't need to poll further; otherwise, poll until the given timeout.
if readySubs == 0 {
readySubs, errno = pollFileEventsUntil(sysCtx, timeout, blockingSubs, outBuf)
if errno != 0 {
return errno
}
}
} else {
// No subscribers, just wait for the given timeout.
sysCtx := mod.(*wasm.ModuleInstance).Sys
sysCtx.Nanosleep(int64(timeout))
}

if readySubs != nsubscriptions {
if !mod.Memory().WriteUint32Le(resultNevents, readySubs+clockEvents) {
return sys.EFAULT
}
}

return 0
}

@@ -233,10 +228,60 @@ func processClockEvent(inBuf []byte) (time.Duration, sys.Errno) {

// writeEvent writes the event corresponding to the processed subscription.
// https://github.com/WebAssembly/WASI/blob/snapshot-01/phases/snapshot/docs.md#-event-struct
func writeEvent(outBuf []byte, evt *event) {
func writeEvent(outBuf []byte, evt *pollEvent) {
copy(outBuf[evt.outOffset:], evt.userData) // userdata
outBuf[evt.outOffset+8] = byte(evt.errno) // uint16, but safe as < 255
outBuf[evt.outOffset+9] = 0
le.PutUint32(outBuf[evt.outOffset+10:], uint32(evt.eventType))
// TODO: When FD events are supported, write outOffset+16
}
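
The offsets used by `writeEvent` above (user data at +0, errno at +8, event type at +10, FD data reserved at +16) can be exercised in isolation with the sketch below. It is an illustration rather than the code in this change: `putEvent` and `eventSize` are hypothetical names, the 32-byte record size is an assumption taken from the snapshot-01 event struct linked above, and the errno is written as a full little-endian `uint16` instead of two separate bytes.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// eventSize is the assumed size in bytes of one event record.
const eventSize = 32

// putEvent writes one event record into out at byte offset off:
// user data (8 bytes) at +0, errno (uint16) at +8, event type (1 byte)
// at +10. Bytes from +16 onward are left untouched, as in writeEvent.
func putEvent(out []byte, off uint32, userData uint64, errno uint16, eventType byte) {
	binary.LittleEndian.PutUint64(out[off:], userData)
	binary.LittleEndian.PutUint16(out[off+8:], errno)
	out[off+10] = eventType
}

func main() {
	out := make([]byte, 2*eventSize)
	// Write into the second slot; the values are arbitrary examples.
	putEvent(out, eventSize, 0x0102030405060708, 8, 1)
	fmt.Printf("% x\n", out[eventSize:eventSize+16])
}
```

Writing the errno as a `uint16` removes the need for the "safe as < 255" caveat, at the cost of one extra call.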

// closeChAfter closes a channel after the given timeout.
// It is similar to time.After but it uses sysCtx.Nanosleep.
func closeChAfter(sysCtx *internalsys.Context, timeout time.Duration, timeoutCh chan struct{}) {
sysCtx.Nanosleep(int64(timeout))
close(timeoutCh)
}

// pollFileEventsOnce invokes Poll on each sys.FileEntry in the given slice
// and writes back the result to outBuf for each file reported "ready";
// i.e., when Poll() returns true, and no error.
func pollFileEventsOnce(evts []*filePollEvent, outBuf []byte) (n uint32, errno sys.Errno) {
// For simplicity, we assume that there are no multiple subscriptions for the same file.
for _, e := range evts {
isReady, errno := e.f.File.Poll(sys.POLLIN, 0)
if errno != 0 {
return 0, errno
}
if isReady {
e.e.errno = 0
writeEvent(outBuf, e.e)
n++
}
}
return
}

// pollFileEventsUntil repeatedly invokes pollFileEventsOnce until the given timeout is reached.
// The poll interval is currently fixed at 100 millis.
func pollFileEventsUntil(sysCtx *internalsys.Context, timeout time.Duration, blockingSubs []*filePollEvent, outBuf []byte) (n uint32, errno sys.Errno) {
timeoutCh := make(chan struct{}, 1)
go closeChAfter(sysCtx, timeout, timeoutCh)

pollInterval := 100 * time.Millisecond
ticker := time.NewTicker(pollInterval)
Review comment (Contributor): FYI, in closeChAfter we intentionally use the context clock, but this ticker uses the real one.

Reply (Contributor Author): Whoops!

defer ticker.Stop()

for {
select {
case <-timeoutCh:
// Give one last chance before returning.
return pollFileEventsOnce(blockingSubs, outBuf)
case <-ticker.C:
n, errno = pollFileEventsOnce(blockingSubs, outBuf)
if errno != 0 || n > 0 {
return
}
}
}
}
6 changes: 3 additions & 3 deletions imports/wasi_snapshot_preview1/sock_test.go
@@ -47,7 +47,7 @@ func Test_sockAccept(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ctx := experimentalsock.WithConfig(testCtx, experimentalsock.NewConfig().WithTCPListener("127.0.0.1", 0))

mod, r, log := requireProxyModuleWithContext(ctx, t, wazero.NewModuleConfig())
mod, r, log := requireProxyModuleWithContext(ctx, t, wazero.NewModuleConfig().WithSysNanosleep())
Review comment (Contributor Author): poll_oneoff now respects SysNanosleep, so it has to be configured properly.

defer r.Close(testCtx)

// Dial the socket so that a call to accept doesn't hang.
@@ -100,7 +100,7 @@ func Test_sockShutdown(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ctx := experimentalsock.WithConfig(testCtx, experimentalsock.NewConfig().WithTCPListener("127.0.0.1", 0))

mod, r, log := requireProxyModuleWithContext(ctx, t, wazero.NewModuleConfig())
mod, r, log := requireProxyModuleWithContext(ctx, t, wazero.NewModuleConfig().WithSysNanosleep())
defer r.Close(testCtx)

// Dial the socket so that a call to accept doesn't hang.
@@ -326,7 +326,7 @@ func Test_sockSend(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ctx := experimentalsock.WithConfig(testCtx, experimentalsock.NewConfig().WithTCPListener("127.0.0.1", 0))

mod, r, log := requireProxyModuleWithContext(ctx, t, wazero.NewModuleConfig())
mod, r, log := requireProxyModuleWithContext(ctx, t, wazero.NewModuleConfig().WithSysNanosleep())
defer r.Close(testCtx)

// Dial the socket so that a call to accept doesn't hang.
9 changes: 5 additions & 4 deletions imports/wasi_snapshot_preview1/wasi_stdlib_test.go
@@ -374,12 +374,12 @@ func Test_Poll(t *testing.T) {
tc := tt
t.Run(tc.name, func(t *testing.T) {
start := time.Now()
console := compileAndRunWithPreStart(t, testCtx, wazero.NewModuleConfig().WithArgs(tc.args...), wasmZigCc,
console := compileAndRunWithPreStart(t, testCtx, wazero.NewModuleConfig().WithSysNanosleep().WithArgs(tc.args...), wasmZigCc,
func(t *testing.T, mod api.Module) {
setStdin(t, mod, tc.stdin)
})
elapsed := time.Since(start)
require.True(t, elapsed >= tc.expectedTimeout)
require.True(t, elapsed >= tc.expectedTimeout, "Elapsed %d < expected %d", elapsed, tc.expectedTimeout)
require.Equal(t, tc.expectedOutput+"\n", console)
})
}
@@ -398,7 +398,8 @@ func Test_Sleep(t *testing.T) {
moduleConfig := wazero.NewModuleConfig().WithArgs("wasi", "sleepmillis", "100").WithSysNanosleep()
start := time.Now()
console := compileAndRun(t, testCtx, moduleConfig, wasmZigCc)
require.True(t, time.Since(start) >= 100*time.Millisecond)
elapsed := time.Since(start)
require.True(t, elapsed >= 100*time.Millisecond, "elapsed %d ns < 100 ms", elapsed)
require.Equal(t, "OK\n", console)
}

@@ -455,7 +456,7 @@ func Test_Sock(t *testing.T) {
func testSock(t *testing.T, bin []byte) {
sockCfg := experimentalsock.NewConfig().WithTCPListener("127.0.0.1", 0)
ctx := experimentalsock.WithConfig(testCtx, sockCfg)
moduleConfig := wazero.NewModuleConfig().WithArgs("wasi", "sock")
moduleConfig := wazero.NewModuleConfig().WithArgs("wasi", "sock").WithSysNanosleep()
tcpAddrCh := make(chan *net.TCPAddr, 1)
ch := make(chan string, 1)
go func() {
10 changes: 10 additions & 0 deletions internal/sysfs/sock_unix.go
@@ -55,6 +55,11 @@ func (f *tcpListenerFile) Accept() (socketapi.TCPConn, sys.Errno) {
return &tcpConnFile{fd: uintptr(nfd)}, 0
}

// Poll implements the same method as documented on sys.File
func (f *tcpListenerFile) Poll(flag sys.Pflag, timeoutMillis int32) (ready bool, errno sys.Errno) {
return poll(f.fd, flag, timeoutMillis)
}

// SetNonblock implements the same method as documented on sys.File
func (f *tcpListenerFile) SetNonblock(enabled bool) sys.Errno {
return sys.UnwrapOSError(setNonblock(f.fd, enabled))
@@ -94,6 +99,11 @@ func (f *tcpConnFile) SetNonblock(enabled bool) (errno sys.Errno) {
return sys.UnwrapOSError(setNonblock(f.fd, enabled))
}

// Poll implements the same method as documented on sys.File
func (f *tcpConnFile) Poll(flag sys.Pflag, timeoutMillis int32) (ready bool, errno sys.Errno) {
return poll(f.fd, flag, timeoutMillis)
}

// Read implements the same method as documented on sys.File
func (f *tcpConnFile) Read(buf []byte) (n int, errno sys.Errno) {
n, err := syscall.Read(int(f.fd), buf)