(without-interrupts
(lambda ()
(if (channel-open? channel)
- (remove-from-gc-finalizer! open-channels channel)))))
+ (begin
+ (%deregister-io-descriptor (channel-descriptor-for-select channel))
+ (remove-from-gc-finalizer! open-channels channel))))))
(define-integrable (channel-open? channel)
(if (channel-descriptor channel) #t #f))
\f
(define (channel-read channel buffer start end)
(let loop ()
- (let ((n (with-thread-events-blocked
+ (let ((n (without-interrupts
(lambda ()
- (%channel-read channel buffer start end)))))
+ (if (channel-closed? channel)
+ 0
+ (%channel-read channel buffer start end))))))
(if (eq? n #t)
(begin
(handle-subprocess-status-change)
- (if (channel-closed? channel)
- 0
- (loop)))
+ (if (channel-blocking? channel)
+ (loop)
+ #f))
n))))
(define (%channel-read channel buffer start end)
- ;; Returns 0 (eof) or a fixnum (the number of octets written into
- ;; BUFFER). May also return #f if the channel is not blocking and
- ;; there are no octets to read. May also return #t if the operation
- ;; was un-blocked by a thread-event, e.g. subprocess status change.
(let ((do-read
(lambda ()
((ucode-primitive channel-read 4)
end))))
(declare (integrate-operator do-read))
(if (and have-select? (not (channel-type=file? channel)))
- (let ((do-test
- (lambda (k)
- (let ((result (test-for-io-on-channel channel 'READ)))
- (case result
- ((READ HANGUP ERROR) (do-read))
- ((PROCESS-STATUS-CHANGE INTERRUPT) #t)
- (else (k)))))))
- (if (channel-blocking? channel)
- (let loop () (do-test loop))
- (do-test (lambda () #f))))
+ (let ((result (test-for-io-on-channel channel 'READ)))
+ (case result
+ ((READ HANGUP ERROR) (do-read))
+ ((#F) 0)
+ ((PROCESS-STATUS-CHANGE INTERRUPT) #t)
+ (else (error "Unexpected test-for-io-on-channel value:" result))))
(do-read))))
(define (channel-write channel buffer start end)
+ (let loop ()
+ (let ((n (without-interrupts
+ (lambda ()
+ (if (channel-closed? channel)
+ 0
+ (%channel-write channel buffer start end))))))
+ (if (eq? n #t)
+ (begin
+ (handle-subprocess-status-change)
+ (if (channel-blocking? channel)
+ (loop)
+ #f))
+ n))))
+
+(define (%channel-write channel buffer start end)
(let ((do-write
(lambda ()
((ucode-primitive channel-write 4)
end))))
(declare (integrate-operator do-write))
(if (and have-select? (not (channel-type=file? channel)))
- (with-thread-events-blocked
- (lambda ()
- (let ((do-test
- (lambda (k)
- (let ((result (test-for-io-on-channel channel 'WRITE)))
- (case result
- ((WRITE HANGUP ERROR) (do-write))
- ((PROCESS-STATUS-CHANGE)
- (handle-subprocess-status-change)
- (if (channel-closed? channel) 0 (k)))
- (else (k)))))))
- (if (channel-blocking? channel)
- (let loop () (do-test loop))
- (do-test (lambda () #f))))))
+ (let ((result (test-for-io-on-channel channel 'WRITE)))
+ (case result
+ ((WRITE HANGUP ERROR) (do-write))
+ ((#F) 0)
+ ((PROCESS-STATUS-CHANGE INTERRUPT) #t)
+ (else (error "Unexpected test-for-io-on-channel value:" result))))
(do-write))))
\f
(define (channel-read-block channel buffer start end)
(else
(loop (dentry/next dentry)))))
(%maybe-toggle-thread-timer))))
+
+(define (%deregister-io-descriptor descriptor)
+ (let dloop ((dentry io-registrations))
+ (cond ((not dentry)
+ unspecific)
+ ((eqv? descriptor (dentry/descriptor dentry))
+ (let tloop ((tentry (dentry/first-tentry dentry)))
+ (if tentry
+ (let ((thread (tentry/thread tentry))
+ (event (tentry/event tentry)))
+ (%signal-thread-event thread
+ (and event
+ (lambda () (event #f))))
+ (tloop (tentry/next tentry)))))
+ (remove-from-select-registry! io-registry
+ (dentry/descriptor dentry)
+ (dentry/mode dentry))
+ (let ((prev (dentry/prev dentry))
+ (next (dentry/next dentry)))
+ (if prev
+ (set-dentry/next! prev next)
+ (set! io-registrations next))
+ (if next
+ (set-dentry/prev! next prev)))
+ (dloop (dentry/next dentry)))
+ (else
+ (dloop (dentry/next dentry)))))
+ (%maybe-toggle-thread-timer))
\f
(define (%register-io-thread-event descriptor mode thread event permanent?
front?)