(set! process (start-subprocess program arguments directory context)))
(lambda ()
(let loop ()
- (receive (status reason) (synchronous-process-wait process context)
+ (receive (status reason) (synchronous-subprocess-wait process context)
(case status
((EXITED) reason)
((SIGNALLED) (error:subprocess-signalled process reason))
;++ Give a restart to continue the process and loop?
((STOPPED) (error:subprocess-stopped process reason))
- ;++ Should happen, but there are races that make it happen.
- ((RUNNING) (loop))
(else
(error "Invalid synchronous subprocess status:" status))))))
(lambda ()
'(SUBPROCESS REASON)
standard-error-handler))
-(define (synchronous-process-wait process context)
- ;; Initialize the subprocess I/O.
- (let ((port (subprocess-i/o-port process))
- (line-ending (subprocess-context/line-ending context)))
+(define (synchronous-subprocess-wait process context)
+ (setup-synchronous-io process context)
+ (let ((input (subprocess-context/input context))
+ (output (subprocess-context/output context)))
+ (let ((copy-input (input-copier input process context))
+ (copy-output (output-copier output process context))
+ (thread (current-thread))
+ (input-descriptor (and output (input-channel-descriptor process)))
+ (output-descriptor (and input (output-channel-descriptor process)))
+ ;; These can be any of reading, writing, blocked-read,
+ ;; blocked-write or closed.
+ (input-state 'reading)
+ (output-state 'reading))
+ (let wait ()
+ (let ((event? #f)
+ (input-registration)
+ (output-registration)
+ (status-registration))
+
+ (define (event! mode/status)
+ mode/status
+ (set! event? #t))
+
+ (let copy ()
+ (set! input-state (copy-input))
+ (set! output-state (copy-output))
+ (let ((oport (subprocess-output-port process)))
+ (if (and (eq? input-state 'end)
+ (output-port-open? oport))
+ (close-output-port oport)))
+ (if (or (memq input-state '(reading writing))
+ (memq output-state '(reading writing)))
+ (copy)))
+
+ (dynamic-wind
+ (lambda ()
+ (set! input-registration
+ (and (eq? output-state 'blocked-read)
+ (register-io-thread-event
+ input-descriptor 'read thread event!)))
+ (set! output-registration
+ (and (eq? input-state 'blocked-write)
+ (register-io-thread-event
+ output-descriptor 'write thread event!)))
+ (set! status-registration
+ (register-subprocess-event process 'running
+ thread event!)))
+ (lambda ()
+ (with-thread-events-blocked
+ (lambda ()
+ (if (not event?)
+ (suspend-current-thread)))))
+ (lambda ()
+ (if (eq? (current-thread) thread)
+ (begin
+ (if input-registration
+ (deregister-io-thread-event input-registration))
+ (if output-registration
+ (deregister-io-thread-event output-registration))
+ (deregister-subprocess-event status-registration))))))
+
+ (if (eq? 'running (subprocess-status process))
+ (wait)
+ (let ((in (and output (subprocess-input-port process))))
+ (if (and in (input-port-open? in))
+ (begin
+ (set-input-port-blocking-mode! in 'blocking)
+ (let drain ()
+ (set! output-state (copy-output))
+ (cond ((eq? output-state 'reading)
+ (drain))
+ ((not (eq? output-state 'end))
+ (error "could not drain subprocess output"))))))
+ (subprocess-delete process)
+ (values (subprocess-status process)
+ (subprocess-exit-reason process))))))))
+
+(define (setup-synchronous-io process context)
+ (let ((line-ending (subprocess-context/line-ending context))
+ (input (subprocess-context/input context))
+ (output (subprocess-context/output context)))
(if line-ending
- (port/set-line-ending port line-ending)))
- (let ((redisplay-hook (subprocess-context/redisplay-hook context)))
- (call-with-input-copier process
- (subprocess-context/input context)
- (subprocess-context/output context)
- (subprocess-context/input-buffer-size context)
- (lambda (copy-input)
- (call-with-output-copier process
- (subprocess-context/output context)
- (subprocess-context/input context)
- (subprocess-context/output-buffer-size
- context)
- (lambda (copy-output)
- (if copy-input
- (if copy-output
- (begin
- (if redisplay-hook (redisplay-hook))
- (let loop ()
- (copy-input)
- (let ((n (copy-output)))
- (cond ((not n)
- (loop))
- ((fix:> n 0)
- (if redisplay-hook (redisplay-hook))
- (loop))))))
- (do ()
- ((let ((n (copy-input)))
- (and n
- (not (fix:> n 0)))))))
- (if copy-output
- (begin
- (if redisplay-hook (redisplay-hook))
- (do ()
- ((= (or (copy-output) 0) 0))
- (if redisplay-hook (redisplay-hook)))))))))))
- (subprocess-wait process)
- (subprocess-delete process)
- (values (subprocess-status process)
- (subprocess-exit-reason process)))
-\f
-(define (call-with-input-copier process process-input nonblock? bsize receiver)
- (let ((port (subprocess-output-port process)))
- (let ((output-port/close (textual-port-operation port 'CLOSE-OUTPUT)))
- (if process-input
- (handle-broken-pipe process
- (lambda ()
- (if nonblock?
- (set-output-port-blocking-mode! port 'nonblocking))
- (receiver
- (let ((buffer (make-string bsize)))
- (lambda ()
- (with-input-port-blocking-mode process-input 'BLOCKING
- (lambda ()
- (let ((n (input-port/read-string! process-input buffer)))
- (if n
- (if (fix:> n 0)
- (output-port/write-substring port buffer 0 n)
- (output-port/close port)))
- n))))))))
- (begin
- (output-port/close port)
- (receiver #f))))))
+ (port/set-line-ending (subprocess-i/o-port process) line-ending))
+ (set-input-port-blocking-mode! (subprocess-input-port process)
+ 'nonblocking)
+ (set-output-port-blocking-mode! (subprocess-output-port process)
+ 'nonblocking)
+ (if (and input (eq? (input-port-blocking-mode input) 'nonblocking))
+ (error "subprocess input is non-blocking:" input))
+ (if (and output (eq? (output-port-blocking-mode output) 'nonblocking))
+ (error "subprocess output is non-blocking:" output))
+ (if (not input)
+ (let ((port (subprocess-output-port process)))
+ (if (output-port-open? port)
+ (close-output-port port))))))
+
+(define (input-channel-descriptor process)
+ (channel-descriptor-for-select (subprocess-input-channel process)))
+
+(define (output-channel-descriptor process)
+ (channel-descriptor-for-select (subprocess-output-channel process)))
+
+(define (input-copier input process context)
+ (if input
+ (copier input (subprocess-output-port process) #t
+ (subprocess-context/input-buffer-size context)
+ #f)
+ (named-lambda (null-input-copier)
+ 'end)))
-(define (handle-broken-pipe process thunk)
- (call-with-current-continuation
- (lambda (continuation)
- (bind-condition-handler (list condition-type:system-call-error)
- (lambda (condition)
- (if (and (eq? 'WRITE (system-call-name condition))
- (eq? 'BROKEN-PIPE (system-call-error condition)))
- (continuation (subprocess-wait process))))
- thunk))))
+(define (output-copier output process context)
+ (copier (subprocess-input-port process) output #f
+ (subprocess-context/output-buffer-size context)
+ (subprocess-context/redisplay-hook context)))
-(define system-call-name
- (condition-accessor condition-type:system-call-error 'SYSTEM-CALL))
+(define (copier in out flush? buffer-size hook)
+ (let ((buffer (make-string buffer-size))
+ (start 0)
+ (end 0))
+ (named-lambda (synchronous-copy)
-(define system-call-error
- (condition-accessor condition-type:system-call-error 'ERROR-TYPE))
+ (define (fill)
+ (assert (and (fix:= start 0) (fix:= end 0)))
+ (if (and out (input-port-open? in))
+ (let ((n (read-string! buffer in)))
+ (cond ((not n)
+ 'blocked-read)
+ ((fix:= n 0)
+ (set! end -1)
+ 'end)
+ ((fix:> n 0)
+ (set! end n)
+ 'reading)
+ (else
+ (error "bogus read:" n))))
+ 'closed))
-(define (call-with-output-copier process process-output nonblock? bsize
- receiver)
- (let ((port (subprocess-input-port process)))
- (let ((input-port/open? (textual-port-operation port 'INPUT-OPEN?))
- (input-port/close (textual-port-operation port 'CLOSE-INPUT)))
- (if process-output
- (let ((buffer (make-string bsize)))
- (let ((copy-output
- (lambda ()
- (let ((n (input-port/read-string! port buffer)))
- (if (and n (fix:> n 0))
- (with-output-port-blocking-mode process-output
- 'BLOCKING
- (lambda ()
- (output-port/write-substring process-output
- buffer 0 n))))
- n))))
- (if nonblock? (set-input-port-blocking-mode! port 'NONBLOCKING))
- (let ((status (receiver copy-output)))
- (if (and nonblock? (input-port/open? port))
- (begin
- (set-input-port-blocking-mode! port 'BLOCKING)
- (do () ((not (fix:> (copy-output) 0))))
- (input-port/close port)))
- status)))
- (receiver #f)))))
\ No newline at end of file
+ (if (fix:= -1 end)
+ 'end
+ (if (fix:< start end)
+ ;; Flush buffer.
+ (if (not out)
+ (begin ;discard
+ (set! start 0)
+ (set! end 0)
+ (fill))
+ (if (output-port-open? out)
+ (let ((n (output-port/write-substring
+ out buffer start end)))
+ (if flush?
+ (flush-output-port out))
+ (cond ((not n)
+ 'blocked-write)
+ ((fix:> n 0)
+ (if hook (hook))
+ (let ((start* (fix:+ start n)))
+ (if (fix:= start* end)
+ (begin
+ (set! start 0)
+ (set! end 0)
+ (fill))
+ (begin
+ (set! start start*)
+ 'writing))))
+ (else
+ (error "bogus write:" n))))
+ 'closed))
+ (fill))))))
\ No newline at end of file