From ad146c13bc836ddfb763e19eb78a1ced5c620973 Mon Sep 17 00:00:00 2001 From: Matt Birkholz Date: Sat, 16 Sep 2017 15:06:17 -0700 Subject: [PATCH] runtime/syncproc: Eliminate spinning, and blocking. The run-shell-command spins when it is copying both stdin and stdout. E.g. (call-with-input-string "Lorem ipsum dolor sit amet\n" (lambda (in) (run-shell-command "sleep 10; cat" 'input in))) will keep your machine busy for 10 seconds. When it is not spinning, the procedure blocks for large bufferfuls. During the evaluation of (run-shell-command "i=0; while [ $i -lt 5 ]; do echo $i; i=$[$i + 1]; sleep 1; done" 'redisplay-hook flush-output-port) you will not see 5 lines of output, one each second, but all 5 lines at once after 5 seconds, despite the redisplay hook [Linux 4.10.0 glibc 2.24]. This new copying process eliminates the blocking AND the spinning. It keeps stdout in nonblocking mode and uses suspend-current-thread to block. It handles short writes too. The ports sourcing/sinking stdin/stdout are required to block. --- src/runtime/runtime.pkg | 2 + src/runtime/syncproc.scm | 272 +++++++++++++++++++++++---------------- 2 files changed, 166 insertions(+), 108 deletions(-) diff --git a/src/runtime/runtime.pkg b/src/runtime/runtime.pkg index 90299f3bc..dec7ecb79 100644 --- a/src/runtime/runtime.pkg +++ b/src/runtime/runtime.pkg @@ -4174,6 +4174,8 @@ USA. ((load) "syncproc") (else)) (parent (runtime)) + (import (runtime primitive-io) + channel-descriptor-for-select) (export () condition-type:subprocess-abnormal-termination condition-type:subprocess-signalled diff --git a/src/runtime/syncproc.scm b/src/runtime/syncproc.scm index 366adf4d3..379853220 100644 --- a/src/runtime/syncproc.scm +++ b/src/runtime/syncproc.scm @@ -83,14 +83,12 @@ USA. (set! process (start-subprocess program arguments directory context))) (lambda () (let loop () - (receive (status reason) (synchronous-process-wait process context) + (receive (status reason) (synchronous-subprocess-wait process context) (case status ((EXITED) reason) ((SIGNALLED) (error:subprocess-signalled process reason)) ;++ Give a restart to continue the process and loop? ((STOPPED) (error:subprocess-stopped process reason)) - ;++ Should happen, but there are races that make it happen. - ((RUNNING) (loop)) (else (error "Invalid synchronous subprocess status:" status)))))) (lambda () @@ -145,113 +143,171 @@ USA. '(SUBPROCESS REASON) standard-error-handler)) -(define (synchronous-process-wait process context) - ;; Initialize the subprocess I/O. - (let ((port (subprocess-i/o-port process)) - (line-ending (subprocess-context/line-ending context))) +(define (synchronous-subprocess-wait process context) + (setup-synchronous-io process context) + (let ((input (subprocess-context/input context)) + (output (subprocess-context/output context))) + (let ((copy-input (input-copier input process context)) + (copy-output (output-copier output process context)) + (thread (current-thread)) + (input-descriptor (and output (input-channel-descriptor process))) + (output-descriptor (and input (output-channel-descriptor process))) + ;; These can be any of reading, writing, blocked-read, + ;; blocked-write or closed. + (input-state 'reading) + (output-state 'reading)) + (let wait () + (let ((event? #f) + (input-registration) + (output-registration) + (status-registration)) + + (define (event! mode/status) + mode/status + (set! event? #t)) + + (let copy () + (set! input-state (copy-input)) + (set! output-state (copy-output)) + (let ((oport (subprocess-output-port process))) + (if (and (eq? input-state 'end) + (output-port-open? oport)) + (close-output-port oport))) + (if (or (memq input-state '(reading writing)) + (memq output-state '(reading writing))) + (copy))) + + (dynamic-wind + (lambda () + (set! input-registration + (and (eq? output-state 'blocked-read) + (register-io-thread-event + input-descriptor 'read thread event!))) + (set! output-registration + (and (eq? input-state 'blocked-write) + (register-io-thread-event + output-descriptor 'write thread event!))) + (set! status-registration + (register-subprocess-event process 'running + thread event!))) + (lambda () + (with-thread-events-blocked + (lambda () + (if (not event?) + (suspend-current-thread))))) + (lambda () + (if (eq? (current-thread) thread) + (begin + (if input-registration + (deregister-io-thread-event input-registration)) + (if output-registration + (deregister-io-thread-event output-registration)) + (deregister-subprocess-event status-registration)))))) + + (if (eq? 'running (subprocess-status process)) + (wait) + (let ((in (and output (subprocess-input-port process)))) + (if (and in (input-port-open? in)) + (begin + (set-input-port-blocking-mode! in 'blocking) + (let drain () + (set! output-state (copy-output)) + (cond ((eq? output-state 'reading) + (drain)) + ((not (eq? output-state 'end)) + (error "could not drain subprocess output")))))) + (subprocess-delete process) + (values (subprocess-status process) + (subprocess-exit-reason process)))))))) + +(define (setup-synchronous-io process context) + (let ((line-ending (subprocess-context/line-ending context)) + (input (subprocess-context/input context)) + (output (subprocess-context/output context))) (if line-ending - (port/set-line-ending port line-ending))) - (let ((redisplay-hook (subprocess-context/redisplay-hook context))) - (call-with-input-copier process - (subprocess-context/input context) - (subprocess-context/output context) - (subprocess-context/input-buffer-size context) - (lambda (copy-input) - (call-with-output-copier process - (subprocess-context/output context) - (subprocess-context/input context) - (subprocess-context/output-buffer-size - context) - (lambda (copy-output) - (if copy-input - (if copy-output - (begin - (if redisplay-hook (redisplay-hook)) - (let loop () - (copy-input) - (let ((n (copy-output))) - (cond ((not n) - (loop)) - ((fix:> n 0) - (if redisplay-hook (redisplay-hook)) - (loop)))))) - (do () - ((let ((n (copy-input))) - (and n - (not (fix:> n 0))))))) - (if copy-output - (begin - (if redisplay-hook (redisplay-hook)) - (do () - ((= (or (copy-output) 0) 0)) - (if redisplay-hook (redisplay-hook))))))))))) - (subprocess-wait process) - (subprocess-delete process) - (values (subprocess-status process) - (subprocess-exit-reason process))) - -(define (call-with-input-copier process process-input nonblock? bsize receiver) - (let ((port (subprocess-output-port process))) - (let ((output-port/close (textual-port-operation port 'CLOSE-OUTPUT))) - (if process-input - (handle-broken-pipe process - (lambda () - (if nonblock? - (set-output-port-blocking-mode! port 'nonblocking)) - (receiver - (let ((buffer (make-string bsize))) - (lambda () - (with-input-port-blocking-mode process-input 'BLOCKING - (lambda () - (let ((n (input-port/read-string! process-input buffer))) - (if n - (if (fix:> n 0) - (output-port/write-substring port buffer 0 n) - (output-port/close port))) - n)))))))) - (begin - (output-port/close port) - (receiver #f)))))) + (port/set-line-ending (subprocess-i/o-port process) line-ending)) + (set-input-port-blocking-mode! (subprocess-input-port process) + 'nonblocking) + (set-output-port-blocking-mode! (subprocess-output-port process) + 'nonblocking) + (if (and input (eq? (input-port-blocking-mode input) 'nonblocking)) + (error "subprocess input is non-blocking:" input)) + (if (and output (eq? (output-port-blocking-mode output) 'nonblocking)) + (error "subprocess output is non-blocking:" output)) + (if (not input) + (let ((port (subprocess-output-port process))) + (if (output-port-open? port) + (close-output-port port)))))) + +(define (input-channel-descriptor process) + (channel-descriptor-for-select (subprocess-input-channel process))) + +(define (output-channel-descriptor process) + (channel-descriptor-for-select (subprocess-output-channel process))) + +(define (input-copier input process context) + (if input + (copier input (subprocess-output-port process) #t + (subprocess-context/input-buffer-size context) + #f) + (named-lambda (null-input-copier) + 'end))) -(define (handle-broken-pipe process thunk) - (call-with-current-continuation - (lambda (continuation) - (bind-condition-handler (list condition-type:system-call-error) - (lambda (condition) - (if (and (eq? 'WRITE (system-call-name condition)) - (eq? 'BROKEN-PIPE (system-call-error condition))) - (continuation (subprocess-wait process)))) - thunk)))) +(define (output-copier output process context) + (copier (subprocess-input-port process) output #f + (subprocess-context/output-buffer-size context) + (subprocess-context/redisplay-hook context))) -(define system-call-name - (condition-accessor condition-type:system-call-error 'SYSTEM-CALL)) +(define (copier in out flush? buffer-size hook) + (let ((buffer (make-string buffer-size)) + (start 0) + (end 0)) + (named-lambda (synchronous-copy) -(define system-call-error - (condition-accessor condition-type:system-call-error 'ERROR-TYPE)) + (define (fill) + (assert (and (fix:= start 0) (fix:= end 0))) + (if (and out (input-port-open? in)) + (let ((n (read-string! buffer in))) + (cond ((not n) + 'blocked-read) + ((fix:= n 0) + (set! end -1) + 'end) + ((fix:> n 0) + (set! end n) + 'reading) + (else + (error "bogus read:" n)))) + 'closed)) -(define (call-with-output-copier process process-output nonblock? bsize - receiver) - (let ((port (subprocess-input-port process))) - (let ((input-port/open? (textual-port-operation port 'INPUT-OPEN?)) - (input-port/close (textual-port-operation port 'CLOSE-INPUT))) - (if process-output - (let ((buffer (make-string bsize))) - (let ((copy-output - (lambda () - (let ((n (input-port/read-string! port buffer))) - (if (and n (fix:> n 0)) - (with-output-port-blocking-mode process-output - 'BLOCKING - (lambda () - (output-port/write-substring process-output - buffer 0 n)))) - n)))) - (if nonblock? (set-input-port-blocking-mode! port 'NONBLOCKING)) - (let ((status (receiver copy-output))) - (if (and nonblock? (input-port/open? port)) - (begin - (set-input-port-blocking-mode! port 'BLOCKING) - (do () ((not (fix:> (copy-output) 0)))) - (input-port/close port))) - status))) - (receiver #f))))) \ No newline at end of file + (if (fix:= -1 end) + 'end + (if (fix:< start end) + ;; Flush buffer. + (if (not out) + (begin ;discard + (set! start 0) + (set! end 0) + (fill)) + (if (output-port-open? out) + (let ((n (output-port/write-substring + out buffer start end))) + (if flush? + (flush-output-port out)) + (cond ((not n) + 'blocked-write) + ((fix:> n 0) + (if hook (hook)) + (let ((start* (fix:+ start n))) + (if (fix:= start* end) + (begin + (set! start 0) + (set! end 0) + (fill)) + (begin + (set! start start*) + 'writing)))) + (else + (error "bogus write:" n)))) + 'closed)) + (fill)))))) \ No newline at end of file -- 2.25.1