Skip to content

await ports

Chris Vine edited this page Mar 15, 2022 · 27 revisions

The (a-sync await-ports) module provides procedures for using guile-2.2/3.0's suspendable ports with event loops, and a convenient way of using ports' normal input and output procedures with non-blocking ports.

For this purpose, the 'normal input and output' procedures which may be used with await-read-suspendable! and await-write-suspendable! cover most but not all i/o procedures. Thus, the following are safe to use with non-blocking suspendable ports: read-char, get-char, peek-char, lookahead-char, read-line, get-line, get-u8, lookahead-u8, get-bytevector-n, get-string-all, write-char, put-char, put-u8, put-string, put-bytevector, newline, force-output and flush-output-port. For sockets, the accept and connect procedures are also safe.

Some others are not at present safe to use with suspendable ports, including get-bytevector-all, get-string-n, get-string-n!, write and display. In addition, get-bytevector-n!, get-bytevector-some and get-bytevector-some! are not safe until guile version 2.2.5, and the read procedure is not safe until guile version 3.0.6.

Unfortunately this means that some of the procedures in guile's web module cannot be used with suspendable ports. build-uri, build-request and cognates are fine, as is write-request if no custom header writers are imported, but the read-response-body procedure is not because it invokes get-bytevector-all. This means that (unless streaming) http-get, http-put and so on are unsafe in suspendable code. In addition, guile-gnutls ports are not suspendable. (One answer where only a few gnutls sessions are to be run concurrently is to run each such session in a separate thread using await-task-in-thread!, await-task-in-thread-pool! or await-task-in-event-loop!, and to use synchronous guile-gnutls i/o in the session.)

Including this module will automatically enable suspendable ports. The uninstall-suspendable-ports! procedure should not subsequently be applied, or the procedures in this module (and in the (a-sync event-loop) module) will not work correctly. Any port using these procedures must be made non-blocking using fcntl as follows:

(fcntl *port* F_SETFL (logior O_NONBLOCK
                              (fcntl *port* F_GETFL)))

(await-read-suspendable! await resume [loop] port proc)

This procedure is provided to implement read file watches using guile-2.2/3.0's suspendable ports. 'proc' is a procedure taking a single argument, to which the port will be passed when it is invoked. The purpose of 'proc' is to carry out i/o operations on 'port' using the port's normal read procedures. 'port' must be a suspendable non-blocking port. This procedure will return when 'proc' returns, as if by blocking read operations. However, the event loop will not be blocked by this procedure even if only individual characters or bytes comprising part characters are available for reading at any one time. It is intended to be called within a waitable procedure invoked by a-sync (which supplies the 'await' and 'resume' arguments). 'proc' must not itself explicitly apply 'await' and 'resume' as those are potentially in use by the suspendable port while 'proc' is executing.

If an exceptional condition ('excpt) is encountered by the implementation, #f will be returned by this procedure and the read operations to be performed by 'proc' will be abandonned; there is however no guarantee that any exceptional condition that does arise will be encountered by the implementation - the user procedure 'proc' may get there first and deal with it, or it may not. However exceptional conditions are very rare, usually comprising only out-of-band data on a TCP socket, or a pseudoterminal master in packet mode seeing state change in a slave. In the absence of an exceptional condition, the value(s) returned by 'proc' will be returned. Prior to version 0.14, 'proc' could only return a single value. From version 0.14, 'proc' may return any number of values.

The 'loop' argument is optional: this procedure operates on the event loop passed in as an argument, or if none is passed (or #f is passed), on the default event loop.

This procedure must (like the a-sync procedure) be called in the same thread as that in which the event loop runs.

Exceptions (say, from 'proc' because of port or conversion errors) will propagate out of this procedure in the first instance, and if not caught locally will then propagate out of event-loop-run!.

Unlike the await-* procedures in the (a-sync event-loop) module, this procedure will not call 'await' if the read operation(s) in 'proc' can be effected immediately without waiting: instead, after reading this procedure would return straight away without invoking the event loop.

As an example of how to use await-read-suspendable!, here is the implementation of await-getline!:

(define await-getline!
  (case-lambda
    ((await resume port)
     (await-getline! await resume #f port))
    ((await resume loop port)
     (await-read-suspendable! await resume loop port
                              (lambda (p)
                                (read-line p))))))

(await-getline! await resume [loop] port)

This procedure is provided mainly to retain compatibility with the guile-a-sync library for guile-2.0, because it is trivial to implement with await-read-suspendable! (and is implemented by await-read-suspendable!).

It is intended to be called within a waitable procedure invoked by a-sync (which supplies the 'await' and 'resume' arguments), and reads a line of text from a non-blocking suspendable port and returns it (without the terminating '\n' character). The 'loop' argument is optional: this procedure operates on the event loop passed in as an argument, or if none is passed (or #f is passed), on the default event loop. If an exceptional condition ('excpt) is encountered by the implementation, #f will be returned by this procedure and the read operation will be abandonned. See the documentation on the await-read-suspendable! procedure for further particulars about this procedure.

Here is an example of the use of await-getline!:

(set-default-event-loop!) ;; if none has yet been set
(a-sync (lambda (await resume)
          (display "Enter a line of text at the keyboard\n")
          (let ((port (open "/dev/tty" O_RDONLY)))
            (fcntl port F_SETFL (logior O_NONBLOCK
                                (fcntl port F_GETFL)))
            (simple-format #t
                           "The line was: ~A\n"
                           (await-getline! await resume
                                           port)))))
(event-loop-run!)

(await-geteveryline! await resume [loop] port proc)

This procedure is provided mainly to retain compatibility with the guile-a-sync library for guile-2.0, because it is trivial to implement with await-read-suspendable! (and is implemented by await-read-suspendable!).

It is intended to be called within a waitable procedure invoked by a-sync (which supplies the 'await' and 'resume' arguments), and will apply 'proc' to every complete line of text received (without the terminating '\n' character). The watch will not end until end-of-file or an exceptional condition ('excpt) is reached. In the event of that happening, this procedure will end and return an end-of-file object or #f respectively. The 'loop' argument is optional: this procedure operates on the event loop passed in as an argument, or if none is passed (or #f is passed), on the default event loop.

When 'proc' executes, 'await' and 'resume' will still be in use by this procedure, so they may not be reused by 'proc' (even though 'proc' runs in the event loop thread).

See the documentation on the await-read-suspendable! procedure for further particulars about this procedure.

Here is an example of the use of await-geteveryline! (because the keyboard has no end-of-file, use Ctrl-C to exit this code snippet):

(set-default-event-loop!) ;; if none has yet been set
(a-sync (lambda (await resume)
          (display "Enter lines of text at the keyboard, ^C to finish\n")
          (let ((port (open "/dev/tty" O_RDONLY)))
            (fcntl port F_SETFL (logior O_NONBLOCK
                                (fcntl port F_GETFL)))
            (await-geteveryline! await resume
                                 port
                                 (lambda (line)
                                   (simple-format #t
                                                  "The line was: ~A\n"
                                                  line))))))
(event-loop-run!)

(await-getsomelines! await resume [loop] port proc)

This procedure is intended to be called within a waitable procedure invoked by a-sync (which supplies the 'await' and 'resume' arguments), and does the same as await-geteveryline!, except that it provides a second argument to 'proc', namely an escape continuation which can be invoked by 'proc' to cause the procedure to return before end-of-file is reached. Behavior is identical to await-geteveryline! if the continuation is not invoked.

This procedure will apply 'proc' to every complete line of text received (without the terminating '\n' character). The watch will not end until end-of-file or an exceptional condition ('excpt) is reached, which would cause this procedure to end and return an end-of-file object or #f respectively, or until the escape continuation is invoked, in which case the value passed to the escape continuation will be returned. The 'loop' argument is optional: this procedure operates on the event loop passed in as an argument, or if none is passed (or #f is passed), on the default event loop.

When 'proc' executes, 'await' and 'resume' will still be in use by this procedure, so they may not be reused by 'proc' (even though 'proc' runs in the event loop thread).

See the documentation on the await-read-suspendable! procedure for further particulars about this procedure.

Here is an example of the use of await-getsomelines!:

(set-default-event-loop!) ;; if none has yet been set
(a-sync (lambda (await resume)
          (display "Enter lines of text at the keyboard, enter an empty line to finish\n")
          (let ((port (open "/dev/tty" O_RDONLY)))
            (fcntl port F_SETFL (logior O_NONBLOCK
                                (fcntl port F_GETFL)))
            (await-getsomelines! await resume
                                 port
                                 (lambda (line k)
                                   (when (string=? line "")
                                         (k #f))
                                   (simple-format #t
                                                  "The line was: ~A\n"
                                                  line))))))
(event-loop-run!)

(await-getblock! await resume [loop] port size)

This procedure is provided mainly to retain compatibility with the guile-a-sync library for guile-2.0, because it is trivial to implement this kind of functionality with await-read-suspendable! (and is implemented by await-read-suspendable!).

It is intended to be called within a waitable procedure invoked by a-sync (which supplies the 'await' and 'resume' arguments), and reads a block of data, such as a binary record, of size 'size' from a non-blocking suspendable port 'port'. This procedure and will return a pair, normally comprising as its car a bytevector of length 'size' containing the data, and as its cdr the number of bytes received and placed in the bytevector (which will be the same as 'size' unless an end-of-file object was encountered part way through receiving the data). If an exceptional condition ('excpt) is encountered, a pair comprising (#f . #f) will be returned. If an end-of-file object is encountered without any bytes of data, a pair with eof-object as car and #f as cdr will be returned.

The 'loop' argument is optional: this procedure operates on the event loop passed in as an argument, or if none is passed (or #f is passed), on the default event loop.

See the documentation on the await-read-suspendable! procedure for further particulars about this procedure.

This procedure is first available in version 0.6 of this library.


(await-geteveryblock! await resume [loop] port size proc)

This procedure is provided mainly to retain compatibility with the guile-a-sync library for guile-2.0, because it is trivial to implement this kind of functionality with await-read-suspendable! (and is implemented by await-read-suspendable!).

It is intended to be called within a waitable procedure invoked by a-sync (which supplies the 'await' and 'resume' arguments), and will apply 'proc' to any block of data received, such as a binary record. 'proc' should be a procedure taking two arguments, first a bytevector of length 'size' containing the block of data read and second the size of the block of data placed in the bytevector. The value passed as the size of the block of data placed in the bytevector will always be the same as 'size' unless end-of-file has been encountered after receiving only a partial block of data. The watch will not end until end-of-file or an exceptional condition ('excpt) is reached. In the event of that happening, this procedure will end and return an end-of-file object or #f respectively.

For efficiency reasons, this procedure passes its internal bytevector buffer to 'proc' as proc's first argument and, when 'proc' returns, re-uses it. Therefore, if 'proc' stores its first argument for use after 'proc' has returned, it should store it by copying it.

The 'loop' argument is optional: this procedure operates on the event loop passed in as an argument, or if none is passed (or #f is passed), on the default event loop.

When 'proc' executes, 'await' and 'resume' will still be in use by this procedure, so they may not be reused by 'proc' (even though 'proc' runs in the event loop thread).

See the documentation on the await-read-suspendable! procedure for further particulars about this procedure.

This procedure is first available in version 0.6 of this library.


(await-getsomeblocks! await resume [loop] port size proc)

This procedure is intended to be called within a waitable procedure invoked by a-sync (which supplies the 'await' and 'resume' arguments), and does the same as await-geteveryblock!, except that it provides a third argument to 'proc', namely an escape continuation which can be invoked by 'proc' to cause the procedure to return before end-of-file is reached. Behavior is identical to await-geteveryblock! if the continuation is not invoked.

This procedure will apply 'proc' to any block of data received, such as a binary record. 'proc' should be a procedure taking three arguments, first a bytevector of length 'size' containing the block of data read, second the size of the block of data placed in the bytevector and third an escape continuation. The value passed as the size of the block of data placed in the bytevector will always be the same as 'size' unless end-of-file has been encountered after receiving only a partial block of data. The watch will not end until end-of-file or an exceptional condition ('excpt) is reached, which would cause this procedure to end and return an end-of-file object or #f respectively, or until the escape continuation is invoked, in which case the value passed to the escape continuation will be returned.

For efficiency reasons, this procedure passes its internal bytevector buffer to 'proc' as proc's first argument and, when 'proc' returns, re-uses it. Therefore, if 'proc' stores its first argument for use after 'proc' has returned, it should store it by copying it.

The 'loop' argument is optional: this procedure operates on the event loop passed in as an argument, or if none is passed (or #f is passed), on the default event loop.

When 'proc' executes, 'await' and 'resume' will still be in use by this procedure, so they may not be reused by 'proc' (even though 'proc' runs in the event loop thread).

See the documentation on the await-read-suspendable! procedure for further particulars about this procedure.

This procedure is first available in version 0.6 of this library.


(await-write-suspendable! await resume [loop] port proc)

This procedure is provided to implement write file watches using guile-2.2/3.0's suspendable ports. 'proc' is a procedure taking a single argument, to which the port will be passed when it is invoked. The purpose of 'proc' is to carry out i/o operations on 'port' using the port's normal write procedures. 'port' must be a suspendable non-blocking port. This procedure will return when 'proc' returns, as if by blocking write operations. However, the event loop will not be blocked by this procedure even if only individual characters or bytes comprising part characters can be written at any one time. It is intended to be called within a waitable procedure invoked by a-sync (which supplies the 'await' and 'resume' arguments). 'proc' must not itself explicitly apply 'await' and 'resume' as those are potentially in use by the suspendable port while 'proc' is executing.

If an exceptional condition ('excpt) is encountered by the implementation, #f will be returned by this procedure and the write operations to be performed by 'proc' will be abandonned; there is however no guarantee that any exceptional condition that does arise will be encountered by the implementation - the user procedure 'proc' may get there first and deal with it, or it may not. However exceptional conditions on write ports cannot normally occur. In the absence of an exceptional condition, the value(s) returned by 'proc' will be returned. Prior to version 0.14, 'proc' could only return a single value. From version 0.14, 'proc' may return any number of values.

The 'loop' argument is optional: this procedure operates on the event loop passed in as an argument, or if none is passed (or #f is passed), on the default event loop.

This procedure must (like the a-sync procedure) be called in the same thread as that in which the event loop runs.

Exceptions (say, from 'proc' because of port or conversion errors) will propagate out of this procedure in the first instance, and if not caught locally will then propagate out of event-loop-run!.

Unlike the await-* procedures in the (a-sync event-loop) module, this procedure will not call 'await' if the write operation(s) in 'proc' can be effected immediately without waiting: instead, after writing this procedure would return straight away without invoking the event loop.

As an example of how to use await-write-suspendable!, here is the implementation of await-put-string!:

(define await-put-string!
  (case-lambda
    ((await resume port text) (await-put-string! await resume #f port text))
    ((await resume loop port text)
     (await-write-suspendable! await resume loop port
                               (lambda (p)
                                 (put-string p text)
                                 ;; enforce a flush when the current
                                 ;; write-waiter is still in operation
                                 (force-output p)
                                 #t)))))

(await-put-bytevector! await resume [loop] port bv)

This procedure is provided mainly to retain compatibility with the guile-a-sync library for guile-2.0, because it is trivial to implement with await-write-suspendable! (and is implemented by await-write-suspendable!).

It is intended to be called within a waitable procedure invoked by a-sync (which supplies the 'await' and 'resume' arguments), and will write the contents of bytevector 'bv' to 'port'. The 'loop' argument is optional: this procedure operates on the event loop passed in as an argument, or if none is passed (or #f is passed), on the default event loop. If an exceptional condition ('excpt) is encountered by the implementation, #f will be returned by this procedure and the write operation will be abandonned, otherwise #t will be returned. However exceptional conditions on write ports cannot normally occur.

The port will be flushed by this procedure upon conclusion of the writing of the bytevector.

See the documentation on the await-write-suspendable! procedure for further particulars about this procedure.

This procedure is first available in version 0.6 of this library.

As mentioned in relation to the await-write-suspendable! procedure, write exceptions will propagate out of this procedure in the first instance, and if not caught locally (say by placing a catch block immediately around this procedure) will then propagate out of event-loop-run!. So one way of testing for EPIPE is as follows:

(set-default-event-loop!) ;; if none has yet been set
(a-sync (lambda (await resume)
          (catch 'system-error
                 (lambda ()
                   (await-put-bytevector! await resume port bv))
                 (lambda args
                   (if (= (system-error-errno args) EPIPE)
                       (begin
                         ... do something to cater for EPIPE ...)
                       (begin
                         ;; possibly rethrow the exception
                         (apply throw args)))))))
(event-loop-run!)

(await-put-string! await resume [loop] port text)

This procedure is provided mainly to retain compatibility with the guile-a-sync library for guile-2.0, because it is trivial to implement with await-write-suspendable! (and is implemented by await-write-suspendable!).

It is intended to be called within a waitable procedure invoked by a-sync (which supplies the 'await' and 'resume' arguments), and will write the string 'text' to 'port'. The 'loop' argument is optional: this procedure operates on the event loop passed in as an argument, or if none is passed (or #f is passed), on the default event loop. If an exceptional condition ('excpt) is encountered by the implementation, #f will be returned by this procedure and the write operation will be abandonned, otherwise #t will be returned. However exceptional conditions on write ports cannot normally occur.

The port will be flushed by this procedure upon conclusion of the writing of the string.

If CR-LF line endings are to be written when outputting the string, the '\r' character (as well as the '\n' character) must be embedded in the string.

See the documentation on the await-write-suspendable! procedure for further particulars about this procedure.

This procedure is first available in version 0.5 of this library.

As mentioned in relation to the await-write-suspendable! procedure, write exceptions will propagate out of this procedure in the first instance, and if not caught locally (say by placing a catch block immediately around this procedure) will then propagate out of event-loop-run!. So one way of testing for EPIPE is as follows:

(set-default-event-loop!) ;; if none has yet been set
(a-sync (lambda (await resume)
          (catch 'system-error
                 (lambda ()
                   (await-put-string! await resume port "test"))
                 (lambda args
                   (if (= (system-error-errno args) EPIPE)
                       (begin
                         ... do something to cater for EPIPE ...)
                       (begin
                         ;; possibly rethrow the exception
                         (apply throw args)))))))
(event-loop-run!)

An example of the use of this procedure can also be found in the example-socket.scm file in the docs directory.


(await-accept! await resume [loop] sock)

This procedure is provided mainly to retain compatibility with the guile-a-sync library for guile-2.0, because it is trivial to implement with await-read-suspendable! (and is implemented by await-read-suspendable!).

This procedure will start a watch on listening socket 'sock' for a connection. 'sock' must be a non-blocking socket port. This procedure wraps the guile 'accept' procedure and therefore returns a pair, comprising as car a connection socket, and as cdr a socket address object containing particulars of the address of the remote connection. The 'loop' argument is optional: this procedure operates on the event loop passed in as an argument, or if none is passed (or #f is passed), on the default event loop. This procedure is intended to be called within a waitable procedure invoked by a-sync (which supplies the 'await' and 'resume' arguments).

See the documentation on the await-read-suspendable! procedure for further particulars about this procedure.

This procedure is first available in version 0.7 of this library.


(await-connect! await resume [loop] sock . args)

This procedure is provided mainly to retain compatibility with the guile-a-sync library for guile-2.0, because it is trivial to implement with await-write-suspendable! (and is implemented by await-write-suspendable!).

This procedure will connect socket 'sock' to a remote host. Particulars of the remote host are given by 'args' which are the arguments (other than 'sock') taken by guile's 'connect' procedure, which this procedure wraps. 'sock' must be a non-blocking socket port. The 'loop' argument is optional: this procedure operates on the event loop passed in as an argument, or if none is passed (or #f is passed), on the default event loop. This procedure is intended to be called within a waitable procedure invoked by a-sync (which supplies the 'await' and 'resume' arguments).

There are cases where it will not be helpful to use this procedure. Where a connection request is immediately followed by a write to the remote server (say, a get request), the call to 'connect' and to 'put-string' can be combined in a single procedure passed to await-write-suspendable!.

See the documentation on the await-write-suspendable! procedure for further particulars about this procedure.

This procedure is first available in version 0.7 of this library.