diff --git a/imports/wasi_snapshot_preview1/poll.go b/imports/wasi_snapshot_preview1/poll.go index 0119b5410fb..a3eca018b30 100644 --- a/imports/wasi_snapshot_preview1/poll.go +++ b/imports/wasi_snapshot_preview1/poll.go @@ -46,7 +46,6 @@ type event struct { eventType byte userData []byte errno wasip1.Errno - outOffset uint32 } func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno { @@ -90,16 +89,16 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno var blockingStdinSubs []*event // The timeout is initialized at max Duration, the loop will find the minimum. var timeout time.Duration = 1<<63 - 1 - // Count of all the clock subscribers that have been already written back to outBuf. - clockEvents := uint32(0) - // Count of all the non-clock subscribers that have been already written back to outBuf. - readySubs := uint32(0) + // Count of all the subscriptions that have been already written back to outBuf. + // nevents*32 returns at all times the offset where the next event should be written: + // this way we ensure that there are no gaps between records. + nevents := uint32(0) // Layout is subscription_u: Union // https://github.com/WebAssembly/WASI/blob/snapshot-01/phases/snapshot/docs.md#subscription_u for i := uint32(0); i < nsubscriptions; i++ { inOffset := i * 48 - outOffset := i * 32 + outOffset := nevents * 32 eventType := inBuf[inOffset+8] // +8 past userdata // +8 past userdata +8 contents_offset @@ -110,12 +109,10 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno eventType: eventType, userData: userData, errno: wasip1.ErrnoSuccess, - outOffset: outOffset, } switch eventType { case wasip1.EventTypeClock: // handle later - clockEvents++ newTimeout, err := processClockEvent(argBuf) if err != 0 { return err @@ -125,7 +122,8 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno timeout = newTimeout } // Ack the clock event to the outBuf. - writeEvent(outBuf, evt) + writeEvent(outBuf[outOffset:], evt) + nevents++ case wasip1.EventTypeFdRead: fd := int32(le.Uint32(argBuf)) if fd < 0 { @@ -133,16 +131,15 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno } if file, ok := fsc.LookupFile(fd); !ok { evt.errno = wasip1.ErrnoBadf - writeEvent(outBuf, evt) - readySubs++ - continue - } else if fd == internalsys.FdStdin && !file.File.IsNonblock() { - // if the fd is Stdin, and it is in non-blocking mode, + writeEvent(outBuf[outOffset:], evt) + nevents++ + } else if fd != internalsys.FdStdin && file.File.IsNonblock() { + writeEvent(outBuf[outOffset:], evt) + nevents++ + } else { + // if the fd is Stdin, and it is in blocking mode, // do not ack yet, append to a slice for delayed evaluation. blockingStdinSubs = append(blockingStdinSubs, evt) - } else { - writeEvent(outBuf, evt) - readySubs++ } case wasip1.EventTypeFdWrite: fd := int32(le.Uint32(argBuf)) @@ -154,47 +151,46 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno } else { evt.errno = wasip1.ErrnoBadf } - readySubs++ - writeEvent(outBuf, evt) + nevents++ + writeEvent(outBuf[outOffset:], evt) default: return sys.EINVAL } } - // If there are subscribers with data ready, we have already written them to outBuf, - // and we don't need to wait for the timeout: clear it. - if readySubs != 0 { - timeout = 0 + sysCtx := mod.(*wasm.ModuleInstance).Sys + if nevents == nsubscriptions { + // We already wrote back all the results. We already wrote this number + // earlier to offset `resultNevents`. + // We only need to observe the timeout (nonzero if there are clock subscriptions) + // and return. + if timeout > 0 { + sysCtx.Nanosleep(int64(timeout)) + } + return 0 } // If there are blocking stdin subscribers, check for data with given timeout. - if len(blockingStdinSubs) > 0 { - stdin, ok := fsc.LookupFile(internalsys.FdStdin) - if !ok { - return sys.EBADF - } - // Wait for the timeout to expire, or for some data to become available on Stdin. - stdinReady, errno := stdin.File.Poll(sys.POLLIN, int32(timeout.Milliseconds())) - if errno != 0 { - return errno - } - if stdinReady { - // stdin has data ready to for reading, write back all the events - for i := range blockingStdinSubs { - readySubs++ - evt := blockingStdinSubs[i] - evt.errno = 0 - writeEvent(outBuf, evt) - } + stdin, ok := fsc.LookupFile(internalsys.FdStdin) + if !ok { + return sys.EBADF + } + // Wait for the timeout to expire, or for some data to become available on Stdin. + + if stdinReady, errno := stdin.File.Poll(sys.POLLIN, int32(timeout.Milliseconds())); errno != 0 { + return errno + } else if stdinReady { + // stdin has data ready to for reading, write back all the events + for i := range blockingStdinSubs { + evt := blockingStdinSubs[i] + evt.errno = 0 + writeEvent(outBuf[nevents*32:], evt) + nevents++ } - } else { - // No subscribers, just wait for the given timeout. - sysCtx := mod.(*wasm.ModuleInstance).Sys - sysCtx.Nanosleep(int64(timeout)) } - if readySubs != nsubscriptions { - if !mod.Memory().WriteUint32Le(resultNevents, readySubs+clockEvents) { + if nevents != nsubscriptions { + if !mod.Memory().WriteUint32Le(resultNevents, nevents) { return sys.EFAULT } } @@ -234,9 +230,9 @@ func processClockEvent(inBuf []byte) (time.Duration, sys.Errno) { // writeEvent writes the event corresponding to the processed subscription. // https://github.com/WebAssembly/WASI/blob/snapshot-01/phases/snapshot/docs.md#-event-struct func writeEvent(outBuf []byte, evt *event) { - copy(outBuf[evt.outOffset:], evt.userData) // userdata - outBuf[evt.outOffset+8] = byte(evt.errno) // uint16, but safe as < 255 - outBuf[evt.outOffset+9] = 0 - le.PutUint32(outBuf[evt.outOffset+10:], uint32(evt.eventType)) + copy(outBuf, evt.userData) // userdata + outBuf[8] = byte(evt.errno) // uint16, but safe as < 255 + outBuf[9] = 0 + le.PutUint32(outBuf[10:], uint32(evt.eventType)) // TODO: When FD events are supported, write outOffset+16 } diff --git a/imports/wasi_snapshot_preview1/poll_test.go b/imports/wasi_snapshot_preview1/poll_test.go index 6db7d6926d8..e077ec65b95 100644 --- a/imports/wasi_snapshot_preview1/poll_test.go +++ b/imports/wasi_snapshot_preview1/poll_test.go @@ -2,6 +2,7 @@ package wasi_snapshot_preview1_test import ( "io/fs" + "os" "strings" "testing" "time" @@ -150,6 +151,12 @@ func Test_pollOneoff_Errors(t *testing.T) { } func Test_pollOneoff_Stdin(t *testing.T) { + w, r, err := os.Pipe() + require.NoError(t, err) + defer w.Close() + defer r.Close() + _, _ = w.Write([]byte("wazero")) + tests := []struct { name string in, out, nsubscriptions, resultNevents uint32 @@ -192,7 +199,6 @@ func Test_pollOneoff_Stdin(t *testing.T) { mem: concat( clockNsSub(20*1000*1000), fdReadSub, - singleton('?'), ), expectedErrno: wasip1.ErrnoSuccess, out: 128, // past in @@ -227,7 +233,6 @@ func Test_pollOneoff_Stdin(t *testing.T) { mem: concat( clockNsSub(20*1000*1000), fdReadSub, - singleton('?'), ), expectedErrno: wasip1.ErrnoSuccess, out: 128, // past in @@ -262,7 +267,6 @@ func Test_pollOneoff_Stdin(t *testing.T) { mem: concat( clockNsSub(20*1000*1000), fdReadSub, - singleton('?'), ), expectedErrno: wasip1.ErrnoSuccess, out: 128, // past in @@ -297,7 +301,6 @@ func Test_pollOneoff_Stdin(t *testing.T) { mem: concat( clockNsSub(20*1000*1000), fdReadSub, - singleton('?'), ), expectedErrno: wasip1.ErrnoSuccess, out: 128, // past in @@ -332,7 +335,6 @@ func Test_pollOneoff_Stdin(t *testing.T) { mem: concat( clockNsSub(20*1000*1000), fdReadSub, - singleton('?'), ), expectedErrno: wasip1.ErrnoSuccess, @@ -357,6 +359,52 @@ func Test_pollOneoff_Stdin(t *testing.T) { expectedLog: ` ==> wasi_snapshot_preview1.poll_oneoff(in=0,out=128,nsubscriptions=2) <== (nevents=1,errno=ESUCCESS) +`, + }, + { + name: "pollable pipe, multiple subs, events returned out of order", + nsubscriptions: 3, + expectedNevents: 3, + mem: concat( + fdReadSub, + clockNsSub(20*1000*1000), + // Illegal file fd with custom user data to recognize it in the event buffer. + fdReadSubFdWithUserData(100, []byte{0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77})), + stdin: &sys.StdinFile{Reader: w}, + expectedErrno: wasip1.ErrnoSuccess, + out: 128, // past in + resultNevents: 512, // past out + expectedMem: []byte{ + // Clock is acknowledged first. + 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // userdata + byte(wasip1.ErrnoSuccess), 0x0, // errno is 16 bit + wasip1.EventTypeClock, 0x0, 0x0, 0x0, // 4 bytes for type enum + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, + + // Then an illegal file with custom user data. + 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, // userdata + byte(wasip1.ErrnoBadf), 0x0, // errno is 16 bit + wasip1.EventTypeFdRead, 0x0, 0x0, 0x0, // 4 bytes for type enum + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, + + // Stdin pipes are delayed to invoke sysfs.poll + // thus, they are written back last. + 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // userdata + byte(wasip1.ErrnoSuccess), 0x0, // errno is 16 bit + wasip1.EventTypeFdRead, 0x0, 0x0, 0x0, // 4 bytes for type enum + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, + + '?', // stopped after encoding + }, + expectedLog: ` +==> wasi_snapshot_preview1.poll_oneoff(in=0,out=128,nsubscriptions=3) +<== (nevents=3,errno=ESUCCESS) `, }, } @@ -420,7 +468,6 @@ func Test_pollOneoff_Zero(t *testing.T) { concat( clockNsSub(20*1000*1000), fdReadSub, - singleton('?'), ), ) @@ -460,7 +507,6 @@ func Test_pollOneoff_Zero(t *testing.T) { concat( clockNsSub(20*1000*1000), fdReadSub, - singleton('?'), ), ) @@ -491,10 +537,6 @@ func Test_pollOneoff_Zero(t *testing.T) { require.Equal(t, uint32(1), nevents) } -func singleton(b byte) []byte { - return []byte{b} -} - func concat(bytes ...[]byte) []byte { var res []byte for i := range bytes { @@ -522,9 +564,26 @@ func fdReadSubFd(fd byte) []byte { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // userdata wasip1.EventTypeFdRead, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, fd, 0x0, 0x0, 0x0, // valid readable FD + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, // pad to 32 bytes } } +func fdReadSubFdWithUserData(fd byte, userdata []byte) []byte { + return concat( + userdata, + []byte{ + wasip1.EventTypeFdRead, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + fd, 0x0, 0x0, 0x0, // valid readable FD + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, // pad to 32 bytes + }) +} + // subscription for an EventTypeFdRead on stdin var fdReadSub = fdReadSubFd(byte(sys.FdStdin)) diff --git a/internal/sysfs/sock_test.go b/internal/sysfs/sock_test.go index e2505925835..bdcb721e970 100644 --- a/internal/sysfs/sock_test.go +++ b/internal/sysfs/sock_test.go @@ -103,3 +103,30 @@ func TestTcpConnFile_Stat(t *testing.T) { _, errno := file.Stat() require.Zero(t, errno, "Stat should not fail") } + +func TestTcpConnFile_SetNonblock(t *testing.T) { + listen, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + defer listen.Close() + + lf := newTCPListenerFile(listen.(*net.TCPListener)) + + tcpAddr, err := net.ResolveTCPAddr("tcp", listen.Addr().String()) + require.NoError(t, err) + tcp, err := net.DialTCP("tcp", nil, tcpAddr) + require.NoError(t, err) + defer tcp.Close() //nolint + + errno := lf.SetNonblock(true) + require.EqualErrno(t, 0, errno) + require.True(t, lf.IsNonblock()) + + conn, errno := lf.Accept() + require.EqualErrno(t, 0, errno) + defer conn.Close() + + file := newTcpConn(tcp) + errno = file.SetNonblock(true) + require.EqualErrno(t, 0, errno) + require.True(t, file.IsNonblock()) +} diff --git a/internal/sysfs/sock_unix.go b/internal/sysfs/sock_unix.go index ef679520917..86db1ef051c 100644 --- a/internal/sysfs/sock_unix.go +++ b/internal/sysfs/sock_unix.go @@ -41,8 +41,9 @@ var _ socketapi.TCPSock = (*tcpListenerFile)(nil) type tcpListenerFile struct { baseSockFile - fd uintptr - addr *net.TCPAddr + fd uintptr + addr *net.TCPAddr + nonblock bool } // Accept implements the same method as documented on socketapi.TCPSock @@ -57,9 +58,14 @@ func (f *tcpListenerFile) Accept() (socketapi.TCPConn, sys.Errno) { // SetNonblock implements the same method as documented on sys.File func (f *tcpListenerFile) SetNonblock(enabled bool) sys.Errno { + f.nonblock = enabled return sys.UnwrapOSError(setNonblock(f.fd, enabled)) } +func (f *tcpListenerFile) IsNonblock() bool { + return f.nonblock +} + // Close implements the same method as documented on sys.File func (f *tcpListenerFile) Close() sys.Errno { return sys.UnwrapOSError(syscall.Close(int(f.fd))) @@ -75,7 +81,8 @@ var _ socketapi.TCPConn = (*tcpConnFile)(nil) type tcpConnFile struct { baseSockFile - fd uintptr + fd uintptr + nonblock bool // closed is true when closed was called. This ensures proper sys.EBADF closed bool @@ -91,9 +98,15 @@ func newTcpConn(tc *net.TCPConn) socketapi.TCPConn { // SetNonblock implements the same method as documented on sys.File func (f *tcpConnFile) SetNonblock(enabled bool) (errno sys.Errno) { + f.nonblock = enabled return sys.UnwrapOSError(setNonblock(f.fd, enabled)) } +// IsNonblock implements the same method as documented on sys.File +func (f *tcpConnFile) IsNonblock() bool { + return f.nonblock +} + // Read implements the same method as documented on sys.File func (f *tcpConnFile) Read(buf []byte) (n int, errno sys.Errno) { n, err := syscall.Read(int(f.fd), buf) diff --git a/internal/sysfs/sock_windows.go b/internal/sysfs/sock_windows.go index 325d739f928..acde78aaba5 100644 --- a/internal/sysfs/sock_windows.go +++ b/internal/sysfs/sock_windows.go @@ -90,6 +90,16 @@ func syscallConnControl(conn syscall.Conn, fn func(fd uintptr) (int, sys.Errno)) return } +func _pollSock(conn syscall.Conn, flag sys.Pflag, timeoutMillis int32) (bool, sys.Errno) { + if flag != sys.POLLIN { + return false, sys.ENOTSUP + } + n, errno := syscallConnControl(conn, func(fd uintptr) (int, sys.Errno) { + return _poll([]pollFd{newPollFd(fd, _POLLIN, 0)}, timeoutMillis) + }) + return n > 0, errno +} + // newTCPListenerFile is a constructor for a socketapi.TCPSock. // // Note: currently the Windows implementation of socketapi.TCPSock @@ -99,9 +109,7 @@ func syscallConnControl(conn syscall.Conn, fn func(fd uintptr) (int, sys.Errno)) // standard library, instead of invoke syscalls/Win32 APIs // because they are sensibly different from Unix's. func newTCPListenerFile(tl *net.TCPListener) socketapi.TCPSock { - w := &winTcpListenerFile{tl: tl} - _ = w.SetNonblock(true) - return w + return &winTcpListenerFile{tl: tl} } var _ socketapi.TCPSock = (*winTcpListenerFile)(nil) @@ -116,14 +124,11 @@ type winTcpListenerFile struct { // Accept implements the same method as documented on socketapi.TCPSock func (f *winTcpListenerFile) Accept() (socketapi.TCPConn, sys.Errno) { - // Ensure we have an incoming connection using winsock_select. - n, errno := syscallConnControl(f.tl, func(fd uintptr) (int, sys.Errno) { - return _poll([]pollFd{newPollFd(fd, _POLLIN, 0)}, 0) - }) - - // Otherwise return immediately. - if n == 0 || errno != 0 { - return nil, sys.EAGAIN + // Ensure we have an incoming connection using winsock_select, otherwise return immediately. + if f.nonblock { + if ready, errno := _pollSock(f.tl, sys.POLLIN, 0); !ready || errno != 0 { + return nil, sys.EAGAIN + } } // Accept normally blocks goroutines, but we