runtime/syncproc: Eliminate spinning, and blocking.
authorMatt Birkholz <matt@birchwood-abbey.net>
Tue, 1 Aug 2017 22:21:01 +0000 (15:21 -0700)
committerMatt Birkholz <matt@birchwood-abbey.net>
Tue, 1 Aug 2017 22:21:01 +0000 (15:21 -0700)
The run-shell-command spins when it is copying both stdin and stdout.
E.g.

    (call-with-input-string "Lorem ipsum dolor sit amet\n"
      (lambda (in)
(run-shell-command "sleep 10; cat" 'input in)))

will keep your machine busy for 10 seconds.

When it is not spinning, the procedure blocks for large bufferfuls.
During the evaluation of

    (run-shell-command
     "i=0; while [ $i -lt 5 ]; do echo $i; i=$[$i + 1]; sleep 1; done"
     'redisplay-hook flush-output-port)

you will not see 5 lines of output, one each second, but all 5 lines
at once after 5 seconds, despite the redisplay hook [Linux 4.10.0
glibc 2.24].

This new copying process eliminates the blocking AND the spinning.  It
keeps stdout in nonblocking mode and uses suspend-current-thread to
block.  It handles short writes too.  The ports sourcing/sinking
stdin/stdout are required to block.

src/runtime/runtime.pkg
src/runtime/syncproc.scm

index c5bee3e2bf95a06c27ca82700934b2b1b5721a27..0d2a7249f0dd848d1dd16de19f46bd0650181259 100644 (file)
@@ -4159,6 +4159,8 @@ USA.
     ((load) "syncproc")
     (else))
   (parent (runtime))
+  (import (runtime primitive-io)
+         channel-descriptor-for-select)
   (export ()
          condition-type:subprocess-abnormal-termination
          condition-type:subprocess-signalled
index 366adf4d30430ceda3111fe76b63fde18106af70..379853220fd24fb1aa2d2391365d43c1978ff30d 100644 (file)
@@ -83,14 +83,12 @@ USA.
        (set! process (start-subprocess program arguments directory context)))
      (lambda ()
        (let loop ()
-        (receive (status reason) (synchronous-process-wait process context)
+        (receive (status reason) (synchronous-subprocess-wait process context)
           (case status
             ((EXITED) reason)
             ((SIGNALLED) (error:subprocess-signalled process reason))
             ;++ Give a restart to continue the process and loop?
             ((STOPPED) (error:subprocess-stopped process reason))
-            ;++ Should happen, but there are races that make it happen.
-            ((RUNNING) (loop))
             (else
              (error "Invalid synchronous subprocess status:" status))))))
      (lambda ()
@@ -145,113 +143,171 @@ USA.
                       '(SUBPROCESS REASON)
                       standard-error-handler))
 
-(define (synchronous-process-wait process context)
-  ;; Initialize the subprocess I/O.
-  (let ((port (subprocess-i/o-port process))
-       (line-ending (subprocess-context/line-ending context)))
+(define (synchronous-subprocess-wait process context)
+  (setup-synchronous-io process context)
+  (let ((input (subprocess-context/input context))
+       (output (subprocess-context/output context)))
+    (let ((copy-input (input-copier input process context))
+         (copy-output (output-copier output process context))
+         (thread (current-thread))
+         (input-descriptor (and output (input-channel-descriptor process)))
+         (output-descriptor (and input (output-channel-descriptor process)))
+         ;; These can be any of reading, writing, blocked-read,
+         ;; blocked-write or closed.
+         (input-state 'reading)
+         (output-state 'reading))
+      (let wait ()
+       (let ((event? #f)
+             (input-registration)
+             (output-registration)
+             (status-registration))
+
+         (define (event! mode/status)
+           mode/status
+           (set! event? #t))
+
+         (let copy ()
+           (set! input-state (copy-input))
+           (set! output-state (copy-output))
+           (let ((oport (subprocess-output-port process)))
+             (if (and (eq? input-state 'end)
+                      (output-port-open? oport))
+                 (close-output-port oport)))
+           (if (or (memq input-state '(reading writing))
+                   (memq output-state '(reading writing)))
+               (copy)))
+
+         (dynamic-wind
+          (lambda ()
+            (set! input-registration
+                  (and (eq? output-state 'blocked-read)
+                       (register-io-thread-event
+                        input-descriptor 'read thread event!)))
+            (set! output-registration
+                  (and (eq? input-state 'blocked-write)
+                       (register-io-thread-event
+                        output-descriptor 'write thread event!)))
+            (set! status-registration
+                  (register-subprocess-event process 'running
+                                             thread event!)))
+          (lambda ()
+            (with-thread-events-blocked
+             (lambda ()
+               (if (not event?)
+                   (suspend-current-thread)))))
+          (lambda ()
+            (if (eq? (current-thread) thread)
+                (begin
+                  (if input-registration
+                      (deregister-io-thread-event input-registration))
+                  (if output-registration
+                      (deregister-io-thread-event output-registration))
+                  (deregister-subprocess-event status-registration))))))
+
+       (if (eq? 'running (subprocess-status process))
+           (wait)
+           (let ((in (and output (subprocess-input-port process))))
+             (if (and in (input-port-open? in))
+                 (begin
+                   (set-input-port-blocking-mode! in 'blocking)
+                   (let drain ()
+                     (set! output-state (copy-output))
+                     (cond ((eq? output-state 'reading)
+                            (drain))
+                           ((not (eq? output-state 'end))
+                            (error "could not drain subprocess output"))))))
+             (subprocess-delete process)
+             (values (subprocess-status process)
+                     (subprocess-exit-reason process))))))))
+
+(define (setup-synchronous-io process context)
+  (let ((line-ending (subprocess-context/line-ending context))
+       (input (subprocess-context/input context))
+       (output (subprocess-context/output context)))
     (if line-ending
-       (port/set-line-ending port line-ending)))
-  (let ((redisplay-hook (subprocess-context/redisplay-hook context)))
-    (call-with-input-copier process
-                           (subprocess-context/input context)
-                           (subprocess-context/output context)
-                           (subprocess-context/input-buffer-size context)
-      (lambda (copy-input)
-       (call-with-output-copier process
-                                (subprocess-context/output context)
-                                (subprocess-context/input context)
-                                (subprocess-context/output-buffer-size
-                                 context)
-         (lambda (copy-output)
-           (if copy-input
-               (if copy-output
-                   (begin
-                     (if redisplay-hook (redisplay-hook))
-                     (let loop ()
-                       (copy-input)
-                       (let ((n (copy-output)))
-                         (cond ((not n)
-                                (loop))
-                               ((fix:> n 0)
-                                (if redisplay-hook (redisplay-hook))
-                                (loop))))))
-                   (do ()
-                       ((let ((n (copy-input)))
-                          (and n
-                               (not (fix:> n 0)))))))
-               (if copy-output
-                   (begin
-                     (if redisplay-hook (redisplay-hook))
-                     (do ()
-                         ((= (or (copy-output) 0) 0))
-                       (if redisplay-hook (redisplay-hook)))))))))))
-  (subprocess-wait process)
-  (subprocess-delete process)
-  (values (subprocess-status process)
-         (subprocess-exit-reason process)))
-\f
-(define (call-with-input-copier process process-input nonblock? bsize receiver)
-  (let ((port (subprocess-output-port process)))
-    (let ((output-port/close (textual-port-operation port 'CLOSE-OUTPUT)))
-      (if process-input
-         (handle-broken-pipe process
-           (lambda ()
-             (if nonblock?
-                 (set-output-port-blocking-mode! port 'nonblocking))
-             (receiver
-              (let ((buffer (make-string bsize)))
-                (lambda ()
-                  (with-input-port-blocking-mode process-input 'BLOCKING
-                    (lambda ()
-                      (let ((n (input-port/read-string! process-input buffer)))
-                        (if n
-                            (if (fix:> n 0)
-                                (output-port/write-substring port buffer 0 n)
-                                (output-port/close port)))
-                        n))))))))
-         (begin
-           (output-port/close port)
-           (receiver #f))))))
+       (port/set-line-ending (subprocess-i/o-port process) line-ending))
+    (set-input-port-blocking-mode! (subprocess-input-port process)
+                                  'nonblocking)
+    (set-output-port-blocking-mode! (subprocess-output-port process)
+                                   'nonblocking)
+    (if (and input (eq? (input-port-blocking-mode input) 'nonblocking))
+       (error "subprocess input is non-blocking:" input))
+    (if (and output (eq? (output-port-blocking-mode output) 'nonblocking))
+       (error "subprocess output is non-blocking:" output))
+    (if (not input)
+       (let ((port (subprocess-output-port process)))
+         (if (output-port-open? port)
+             (close-output-port port))))))
+
+(define (input-channel-descriptor process)
+  (channel-descriptor-for-select (subprocess-input-channel process)))
+
+(define (output-channel-descriptor process)
+  (channel-descriptor-for-select (subprocess-output-channel process)))
+
+(define (input-copier input process context)
+  (if input
+      (copier input (subprocess-output-port process) #t
+             (subprocess-context/input-buffer-size context)
+             #f)
+      (named-lambda (null-input-copier)
+       'end)))
 
-(define (handle-broken-pipe process thunk)
-  (call-with-current-continuation
-   (lambda (continuation)
-     (bind-condition-handler (list condition-type:system-call-error)
-        (lambda (condition)
-          (if (and (eq? 'WRITE (system-call-name condition))
-                   (eq? 'BROKEN-PIPE (system-call-error condition)))
-              (continuation (subprocess-wait process))))
-       thunk))))
+(define (output-copier output process context)
+  (copier (subprocess-input-port process) output #f
+         (subprocess-context/output-buffer-size context)
+         (subprocess-context/redisplay-hook context)))
 
-(define system-call-name
-  (condition-accessor condition-type:system-call-error 'SYSTEM-CALL))
+(define (copier in out flush? buffer-size hook)
+  (let ((buffer (make-string buffer-size))
+       (start 0)
+       (end 0))
+    (named-lambda (synchronous-copy)
 
-(define system-call-error
-  (condition-accessor condition-type:system-call-error 'ERROR-TYPE))
+      (define (fill)
+       (assert (and (fix:= start 0) (fix:= end 0)))
+       (if (and out (input-port-open? in))
+           (let ((n (read-string! buffer in)))
+             (cond ((not n)
+                    'blocked-read)
+                   ((fix:= n 0)
+                    (set! end -1)
+                    'end)
+                   ((fix:> n 0)
+                    (set! end n)
+                    'reading)
+                   (else
+                    (error "bogus read:" n))))
+           'closed))
 
-(define (call-with-output-copier process process-output nonblock? bsize
-                                receiver)
-  (let ((port (subprocess-input-port process)))
-    (let ((input-port/open? (textual-port-operation port 'INPUT-OPEN?))
-         (input-port/close (textual-port-operation port 'CLOSE-INPUT)))
-      (if process-output
-         (let ((buffer (make-string bsize)))
-           (let ((copy-output
-                  (lambda ()
-                    (let ((n (input-port/read-string! port buffer)))
-                      (if (and n (fix:> n 0))
-                          (with-output-port-blocking-mode process-output
-                                                          'BLOCKING
-                            (lambda ()
-                              (output-port/write-substring process-output
-                                                           buffer 0 n))))
-                      n))))
-             (if nonblock? (set-input-port-blocking-mode! port 'NONBLOCKING))
-             (let ((status (receiver copy-output)))
-               (if (and nonblock? (input-port/open? port))
-                   (begin
-                     (set-input-port-blocking-mode! port 'BLOCKING)
-                     (do () ((not (fix:> (copy-output) 0))))
-                     (input-port/close port)))
-               status)))
-         (receiver #f)))))
\ No newline at end of file
+      (if (fix:= -1 end)
+         'end
+         (if (fix:< start end)
+             ;; Flush buffer.
+             (if (not out)
+                 (begin                ;discard
+                   (set! start 0)
+                   (set! end 0)
+                   (fill))
+                 (if (output-port-open? out)
+                     (let ((n (output-port/write-substring
+                               out buffer start end)))
+                       (if flush?
+                           (flush-output-port out))
+                       (cond ((not n)
+                              'blocked-write)
+                             ((fix:> n 0)
+                              (if hook (hook))
+                              (let ((start* (fix:+ start n)))
+                                (if (fix:= start* end)
+                                    (begin
+                                      (set! start 0)
+                                      (set! end 0)
+                                      (fill))
+                                    (begin
+                                      (set! start start*)
+                                      'writing))))
+                             (else
+                              (error "bogus write:" n))))
+                     'closed))
+             (fill))))))
\ No newline at end of file