This patch is required by gnunet-scheme. It has been submitted upstream at <https://github.com/wingo/fibers/pull/50>, but there has not been any response so far. * gnu/packages/patches/guile-fibers-wait-for-io-readiness.patch: New file. * gnu/packages/guile-xyz.scm (guile-fibers-1.1)[source]{patches}: Add it. * gnu/local.mk (dist_patch_DATA): Add it. Signed-off-by: Ludovic Courtès <ludo@gnu.org>
		
			
				
	
	
		
			346 lines
		
	
	
	
		
			12 KiB
		
	
	
	
		
			Diff
		
	
	
	
	
	
			
		
		
	
	
			346 lines
		
	
	
	
		
			12 KiB
		
	
	
	
		
			Diff
		
	
	
	
	
	
| Scheme-GNUnet requires the new operations 'wait-until-port-readable-operation'
 | |
| and 'wait-until-port-writable-operation' for communicating with services.
 | |
| This patch has been previously submitted at <https://github.com/wingo/fibers/pull/50>,
 | |
| on Sep 16, 2021.  As of Feb 3, 2022, upstream has not responded yet.
 | |
| 
 | |
| diff --git a/Makefile.am b/Makefile.am
 | |
| index e2db57e..0134255 100644
 | |
| --- a/Makefile.am
 | |
| +++ b/Makefile.am
 | |
| @@ -33,6 +33,7 @@ SOURCES = \
 | |
|  	fibers/deque.scm \
 | |
|  	fibers/epoll.scm \
 | |
|  	fibers/interrupts.scm \
 | |
| +	fibers/io-wakeup.scm \
 | |
|  	fibers/nameset.scm \
 | |
|  	fibers/operations.scm \
 | |
|  	fibers/posix-clocks.scm \
 | |
| @@ -67,6 +68,7 @@ TESTS = \
 | |
|  	tests/conditions.scm \
 | |
|  	tests/channels.scm \
 | |
|  	tests/foreign.scm \
 | |
| +	tests/io-wakeup.scm \
 | |
|  	tests/parameters.scm \
 | |
|  	tests/preemption.scm \
 | |
|  	tests/speedup.scm
 | |
| diff --git a/fibers.texi b/fibers.texi
 | |
| index 52f7177..0990c8f 100644
 | |
| --- a/fibers.texi
 | |
| +++ b/fibers.texi
 | |
| @@ -12,6 +12,7 @@ This manual is for Fibers (version @value{VERSION}, updated
 | |
|  @value{UPDATED})
 | |
|  
 | |
|  Copyright 2016-2022 Andy Wingo
 | |
| +Copyright 2021 Maxime Devos
 | |
|  
 | |
|  @quotation
 | |
|  @c For more information, see COPYING.docs in the fibers
 | |
| @@ -453,6 +454,7 @@ of operations for channels and timers, and an internals interface.
 | |
|  * Channels::             Share memory by communicating.
 | |
|  * Timers::               Operations on time.
 | |
|  * Conditions::           Waiting for simple state changes.
 | |
| +* Port Readiness::       Waiting until a port is ready for I/O.
 | |
|  * REPL Commands::        Experimenting with Fibers at the console.
 | |
|  * Schedulers and Tasks:: Fibers are built from lower-level primitives.
 | |
|  @end menu
 | |
| @@ -722,6 +724,28 @@ signalled.  Equivalent to @code{(perform-operation (wait-operation
 | |
|  cvar))}.
 | |
|  @end defun
 | |
|  
 | |
| +@node Port Readiness
 | |
| +@section Port Readiness
 | |
| +
 | |
| +These two operations can be used on file ports to wait until
 | |
| +they are readable or writable.  Spurious wake-ups are possible.
 | |
| +This is complementary to Guile's suspendable ports.
 | |
| +
 | |
| +@example
 | |
| +(use-modules (fibers io-wakeup))
 | |
| +@end example
 | |
| +
 | |
| +@defun wait-until-port-readable-operation port
 | |
| +Make an operation that will succeed with no values when the input
 | |
| +port @var{port} becomes readable.  For passive sockets, this operation
 | |
| +succeeds when a connection becomes available.
 | |
| +@end defun
 | |
| +
 | |
| +@defun wait-until-port-writable-operation port
 | |
| +Make an operation that will succeed with no values when the output
 | |
| +port @var{port} becomes writable.
 | |
| +@end defun
 | |
| +
 | |
|  @node REPL Commands
 | |
|  @section REPL Commands
 | |
|  
 | |
| diff --git a/fibers/io-wakeup.scm b/fibers/io-wakeup.scm
 | |
| new file mode 100644
 | |
| index 0000000..5df03f1
 | |
| --- /dev/null
 | |
| +++ b/fibers/io-wakeup.scm
 | |
| @@ -0,0 +1,93 @@
 | |
| +;; Fibers: cooperative, event-driven user-space threads.
 | |
| +
 | |
| +;;;; Copyright (C) 2016,2021 Free Software Foundation, Inc.
 | |
| +;;;; Copyright (C) 2021 Maxime Devos
 | |
| +;;;;
 | |
| +;;;; This library is free software; you can redistribute it and/or
 | |
| +;;;; modify it under the terms of the GNU Lesser General Public
 | |
| +;;;; License as published by the Free Software Foundation; either
 | |
| +;;;; version 3 of the License, or (at your option) any later version.
 | |
| +;;;;
 | |
| +;;;; This library is distributed in the hope that it will be useful,
 | |
| +;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| +;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 | |
| +;;;; Lesser General Public License for more details.
 | |
| +;;;;
 | |
| +;;;; You should have received a copy of the GNU Lesser General Public
 | |
| +;;;; License along with this library; if not, write to the Free Software
 | |
| +;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 | |
| +;;;;
 | |
| +
 | |
| +(define-module (fibers io-wakeup)
 | |
| +  #:use-module (fibers scheduler)
 | |
| +  #:use-module (fibers operations)
 | |
| +  #:use-module (ice-9 atomic)
 | |
| +  #:use-module (ice-9 match)
 | |
| +  #:use-module (ice-9 threads)
 | |
| +  #:use-module (ice-9 ports internal)
 | |
| +  #:export (wait-until-port-readable-operation
 | |
| +	    wait-until-port-writable-operation))
 | |
| +
 | |
| +(define *poll-sched* (make-atomic-box #f))
 | |
| +
 | |
| +(define (poll-sched)
 | |
| +  (or (atomic-box-ref *poll-sched*)
 | |
| +      (let ((sched (make-scheduler)))
 | |
| +        (cond
 | |
| +         ((atomic-box-compare-and-swap! *poll-sched* #f sched))
 | |
| +         (else
 | |
| +          ;; FIXME: Would be nice to clean up this thread at some point.
 | |
| +          (call-with-new-thread
 | |
| +           (lambda ()
 | |
| +             (define (finished?) #f)
 | |
| +             (run-scheduler sched finished?)))
 | |
| +          sched)))))
 | |
| +
 | |
| +;; These procedures are subject to spurious wakeups.
 | |
| +
 | |
| +(define (readable? port)
 | |
| +  "Test if PORT is readable."
 | |
| +  (match (select (vector port) #() #() 0)
 | |
| +    ((#() #() #()) #f)
 | |
| +    ((#(_) #() #()) #t)))
 | |
| +
 | |
| +(define (writable? port)
 | |
| +  "Test if PORT is writable."
 | |
| +  (match (select #() (vector port) #() 0)
 | |
| +    ((#() #() #()) #f)
 | |
| +    ((#() #(_) #()) #t)))
 | |
| +
 | |
| +(define (make-wait-operation ready? schedule-when-ready port port-ready-fd this-procedure)
 | |
| +  (make-base-operation #f
 | |
| +                       (lambda _
 | |
| +                         (and (ready? port) values))
 | |
| +                       (lambda (flag sched resume)
 | |
| +                         (define (commit)
 | |
| +                           (match (atomic-box-compare-and-swap! flag 'W 'S)
 | |
| +                             ('W (resume values))
 | |
| +                             ('C (commit))
 | |
| +                             ('S #f)))
 | |
| +                         (if sched
 | |
| +                             (schedule-when-ready
 | |
| +                              sched (port-ready-fd port) commit)
 | |
| +                             (schedule-task
 | |
| +                              (poll-sched)
 | |
| +                              (lambda ()
 | |
| +                                (perform-operation (this-procedure port))
 | |
| +                                (commit)))))))
 | |
| +
 | |
| +(define (wait-until-port-readable-operation port)
 | |
| +  "Make an operation that will succeed when PORT is readable."
 | |
| +  (unless (input-port? port)
 | |
| +    (error "refusing to wait forever for input on non-input port"))
 | |
| +  (make-wait-operation readable? schedule-task-when-fd-readable port
 | |
| +                       port-read-wait-fd
 | |
| +                       wait-until-port-readable-operation))
 | |
| +
 | |
| +(define (wait-until-port-writable-operation port)
 | |
| +  "Make an operation that will succeed when PORT is writable."
 | |
| +  (unless (output-port? port)
 | |
| +    (error "refusing to wait forever for output on non-output port"))
 | |
| +  (make-wait-operation writable? schedule-task-when-fd-writable port
 | |
| +                       port-write-wait-fd
 | |
| +                       wait-until-port-writable-operation))
 | |
| diff --git a/tests/io-wakeup.scm b/tests/io-wakeup.scm
 | |
| new file mode 100644
 | |
| index 0000000..c14fa81
 | |
| --- /dev/null
 | |
| +++ b/tests/io-wakeup.scm
 | |
| @@ -0,0 +1,167 @@
 | |
| +;; Fibers: cooperative, event-driven user-space threads.
 | |
| +
 | |
| +;;;; Copyright (C) 2016 Free Software Foundation, Inc.
 | |
| +;;;; Copyright (C) 2021 Maxime Devos
 | |
| +;;;;
 | |
| +;;;; This library is free software; you can redistribute it and/or
 | |
| +;;;; modify it under the terms of the GNU Lesser General Public
 | |
| +;;;; License as published by the Free Software Foundation; either
 | |
| +;;;; version 3 of the License, or (at your option) any later version.
 | |
| +;;;;
 | |
| +;;;; This library is distributed in the hope that it will be useful,
 | |
| +;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| +;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 | |
| +;;;; Lesser General Public License for more details.
 | |
| +;;;;
 | |
| +;;;; You should have received a copy of the GNU Lesser General Public
 | |
| +;;;; License along with this library; if not, write to the Free Software
 | |
| +;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 | |
| +;;;;
 | |
| +
 | |
| +(define-module (tests io-wakeup)
 | |
| +  #:use-module (rnrs bytevectors)
 | |
| +  #:use-module (ice-9 control)
 | |
| +  #:use-module (ice-9 suspendable-ports)
 | |
| +  #:use-module (ice-9 binary-ports)
 | |
| +  #:use-module (fibers)
 | |
| +  #:use-module (fibers io-wakeup)
 | |
| +  #:use-module (fibers operations)
 | |
| +  #:use-module (fibers timers))
 | |
| +
 | |
| +(define failed? #f)
 | |
| +
 | |
| +(define-syntax-rule (assert-equal expected actual)
 | |
| +  (let ((x expected))
 | |
| +    (format #t "assert ~s equal to ~s: " 'actual x)
 | |
| +    (force-output)
 | |
| +    (let ((y actual))
 | |
| +      (cond
 | |
| +       ((equal? x y) (format #t "ok\n"))
 | |
| +       (else
 | |
| +        (format #t "no (got ~s)\n" y)
 | |
| +        (set! failed? #t))))))
 | |
| +
 | |
| +(define-syntax-rule (assert-run-fibers-terminates exp)
 | |
| +  (begin
 | |
| +    (format #t "assert run-fibers on ~s terminates: " 'exp)
 | |
| +    (force-output)
 | |
| +    (let ((start (get-internal-real-time)))
 | |
| +      (call-with-values (lambda () (run-fibers (lambda () exp)))
 | |
| +        (lambda vals
 | |
| +          (format #t "ok (~a s)\n" (/ (- (get-internal-real-time) start)
 | |
| +                                      1.0 internal-time-units-per-second))
 | |
| +          (apply values vals))))))
 | |
| +
 | |
| +(define-syntax-rule (assert-run-fibers-returns (expected ...) exp)
 | |
| +  (begin
 | |
| +    (call-with-values (lambda () (assert-run-fibers-terminates exp))
 | |
| +      (lambda run-fiber-return-vals
 | |
| +        (assert-equal '(expected ...) run-fiber-return-vals)))))
 | |
| +
 | |
| +
 | |
| +;; Note that theoretically, on very slow systems, SECONDS might need
 | |
| +;; to be increased.  However, readable/timeout? and writable/timeout?
 | |
| +;; call this 5 times in a loop anyways, so the effective timeout is
 | |
| +;; a fourth of a second, which should be plenty in practice.
 | |
| +(define* (with-timeout op #:key (seconds 0.05) (wrap values))
 | |
| +  (choice-operation op
 | |
| +                    (wrap-operation (sleep-operation seconds) wrap)))
 | |
| +
 | |
| +(define* (readable/timeout? port #:key (allowed-spurious 5))
 | |
| +  "Does waiting for readability time-out?
 | |
| +Allow @var{allowed-spurious} spurious wakeups."
 | |
| +  (or (perform-operation
 | |
| +	(with-timeout
 | |
| +	 (wrap-operation (wait-until-port-readable-operation port)
 | |
| +			 (lambda () #f))
 | |
| +	 #:wrap (lambda () #t)))
 | |
| +      (and (> allowed-spurious 0)
 | |
| +	   (readable/timeout? port #:allowed-spurious
 | |
| +			      (- allowed-spurious 1)))))
 | |
| +
 | |
| +(define* (writable/timeout? port #:key (allowed-spurious 5))
 | |
| +  "Does waiting for writability time-out?
 | |
| +Allow @var{allowed-spurious} spurious wakeups."
 | |
| +  (or (perform-operation
 | |
| +       (with-timeout
 | |
| +	(wrap-operation (wait-until-port-writable-operation port)
 | |
| +			(lambda () #f))
 | |
| +	#:wrap (lambda () #t)))
 | |
| +      (and (> allowed-spurious 0)
 | |
| +	   (writable/timeout? port #:allowed-spurious
 | |
| +			      (- allowed-spurious 1)))))
 | |
| +
 | |
| +;; Tests:
 | |
| +;;  * wait-until-port-readable-operation / wait-until-port-writable-operation
 | |
| +;;    blocks if the port isn't ready for input / output.
 | |
| +;;
 | |
| +;;    This is tested with a pipe (read & write)
 | |
| +;;    and a listening socket (read, or accept in this case).
 | |
| +;;
 | |
| +;;    Due to the possibility of spurious wakeups,
 | |
| +;;    a limited few spurious wakeups are tolerated.
 | |
| +;;
 | |
| +;;  * these operations succeed if the port is ready for input / output.
 | |
| +;;
 | |
| +;;    These are again tested with a pipe and a listening socket
 | |
| +;;
 | |
| +;; Blocking is detected with a small time-out.
 | |
| +
 | |
| +(define (make-listening-socket)
 | |
| +  (let ((server (socket PF_INET SOCK_DGRAM 0)))
 | |
| +    (bind server AF_INET INADDR_LOOPBACK 0)
 | |
| +    server))
 | |
| +
 | |
| +(let ((s (make-listening-socket)))
 | |
| +  (assert-run-fibers-returns (#t)
 | |
| +			     (readable/timeout? s))
 | |
| +  (assert-equal #t (readable/timeout? s))
 | |
| +  (close s))
 | |
| +
 | |
| +(define (set-nonblocking! sock)
 | |
| +  (let ((flags (fcntl sock F_GETFL)))
 | |
| +    (fcntl sock F_SETFL (logior O_NONBLOCK flags))))
 | |
| +
 | |
| +(define-syntax-rule (with-pipes (A B) exp exp* ...)
 | |
| +  (let* ((pipes (pipe))
 | |
| +	 (A (car pipes))
 | |
| +	 (B (cdr pipes)))
 | |
| +    exp exp* ...
 | |
| +    (close A)
 | |
| +    (close B)))
 | |
| +
 | |
| +(with-pipes (A B)
 | |
| +  (setvbuf A 'none)
 | |
| +  (setvbuf B 'none)
 | |
| +  (assert-run-fibers-returns (#t)
 | |
| +			     (readable/timeout? A))
 | |
| +  (assert-equal #t (readable/timeout? A))
 | |
| +
 | |
| +  ;; The buffer is empty, so writability is expected.
 | |
| +  (assert-run-fibers-returns (#f)
 | |
| +			     (writable/timeout? B))
 | |
| +  (assert-equal #f (writable/timeout? B))
 | |
| +
 | |
| +  ;; Fill the buffer
 | |
| +  (set-nonblocking! B)
 | |
| +  (let ((bv (make-bytevector 1024)))
 | |
| +    (let/ec k
 | |
| +      (parameterize ((current-write-waiter k))
 | |
| +	(let loop ()
 | |
| +	  (put-bytevector B bv)
 | |
| +	  (loop)))))
 | |
| +
 | |
| +  ;; As the buffer is full, writable/timeout? should return
 | |
| +  ;; #t.
 | |
| +  (assert-run-fibers-returns (#t)
 | |
| +			     (writable/timeout? B))
 | |
| +  ;; There's plenty to read now, so readable/timeout? should
 | |
| +  ;; return #f.
 | |
| +  (assert-run-fibers-returns (#f)
 | |
| +			     (readable/timeout? A)))
 | |
| +
 | |
| +(exit (if failed? 1 0))
 | |
| +
 | |
| +;; Local Variables:
 | |
| +;; eval: (put 'with-pipes 'scheme-indent-function 1)
 | |
| +;; End:
 |