From 339824bfbb45546075b80dfb2fd27995d6f54954 Mon Sep 17 00:00:00 2001 From: Matt Birkholz Date: Sun, 29 Apr 2012 21:04:44 -0700 Subject: [PATCH] Fixed channel-open to unblock threads and update the io-registry... ...using a new procedure: %deregister-io-descriptor. Also tightened up channel-read and channel-write to check, within an atomic section, that the port has not been closed. If a closed channel is left in the io-registry, wait-for-io piles up error levels because test-select-registry returns an "illegal mode". --- src/runtime/io.scm | 71 +++++++++++++++++++++-------------------- src/runtime/runtime.pkg | 2 ++ src/runtime/thread.scm | 28 ++++++++++++++++ 3 files changed, 67 insertions(+), 34 deletions(-) diff --git a/src/runtime/io.scm b/src/runtime/io.scm index a3f97157d..d35c88d01 100644 --- a/src/runtime/io.scm +++ b/src/runtime/io.scm @@ -95,7 +95,9 @@ USA. (without-interrupts (lambda () (if (channel-open? channel) - (remove-from-gc-finalizer! open-channels channel))))) + (begin + (%deregister-io-descriptor (channel-descriptor-for-select channel)) + (remove-from-gc-finalizer! open-channels channel)))))) (define-integrable (channel-open? channel) (if (channel-descriptor channel) #t #f)) @@ -170,22 +172,20 @@ USA. (define (channel-read channel buffer start end) (let loop () - (let ((n (with-thread-events-blocked + (let ((n (without-interrupts (lambda () - (%channel-read channel buffer start end))))) + (if (channel-closed? channel) + 0 + (%channel-read channel buffer start end)))))) (if (eq? n #t) (begin (handle-subprocess-status-change) - (if (channel-closed? channel) - 0 - (loop))) + (if (channel-blocking? channel) + (loop) + #f)) n)))) (define (%channel-read channel buffer start end) - ;; Returns 0 (eof) or a fixnum (the number of octets written into - ;; BUFFER). May also return #f if the channel is not blocking and - ;; there are no octets to read. May also return #t if the operation - ;; was un-blocked by a thread-event, e.g. subprocess status change. (let ((do-read (lambda () ((ucode-primitive channel-read 4) @@ -197,19 +197,30 @@ USA. end)))) (declare (integrate-operator do-read)) (if (and have-select? (not (channel-type=file? channel))) - (let ((do-test - (lambda (k) - (let ((result (test-for-io-on-channel channel 'READ))) - (case result - ((READ HANGUP ERROR) (do-read)) - ((PROCESS-STATUS-CHANGE INTERRUPT) #t) - (else (k))))))) - (if (channel-blocking? channel) - (let loop () (do-test loop)) - (do-test (lambda () #f)))) + (let ((result (test-for-io-on-channel channel 'READ))) + (case result + ((READ HANGUP ERROR) (do-read)) + ((#F) 0) + ((PROCESS-STATUS-CHANGE INTERRUPT) #t) + (else (error "Unexpected test-for-io-on-channel value:" result)))) (do-read)))) (define (channel-write channel buffer start end) + (let loop () + (let ((n (without-interrupts + (lambda () + (if (channel-closed? channel) + 0 + (%channel-write channel buffer start end)))))) + (if (eq? n #t) + (begin + (handle-subprocess-status-change) + (if (channel-blocking? channel) + (loop) + #f)) + n)))) + +(define (%channel-write channel buffer start end) (let ((do-write (lambda () ((ucode-primitive channel-write 4) @@ -221,20 +232,12 @@ USA. end)))) (declare (integrate-operator do-write)) (if (and have-select? (not (channel-type=file? channel))) - (with-thread-events-blocked - (lambda () - (let ((do-test - (lambda (k) - (let ((result (test-for-io-on-channel channel 'WRITE))) - (case result - ((WRITE HANGUP ERROR) (do-write)) - ((PROCESS-STATUS-CHANGE) - (handle-subprocess-status-change) - (if (channel-closed? channel) 0 (k))) - (else (k))))))) - (if (channel-blocking? channel) - (let loop () (do-test loop)) - (do-test (lambda () #f)))))) + (let ((result (test-for-io-on-channel channel 'WRITE))) + (case result + ((WRITE HANGUP ERROR) (do-write)) + ((#F) 0) + ((PROCESS-STATUS-CHANGE INTERRUPT) #t) + (else (error "Unexpected test-for-io-on-channel value:" result)))) (do-write)))) (define (channel-read-block channel buffer start end) diff --git a/src/runtime/runtime.pkg b/src/runtime/runtime.pkg index 15198d565..c0e677c45 100644 --- a/src/runtime/runtime.pkg +++ b/src/runtime/runtime.pkg @@ -3258,6 +3258,8 @@ USA. make-select-registry remove-from-select-registry! test-select-registry) + (import (runtime thread) + %deregister-io-descriptor) (export (runtime directory) directory-channel/descriptor) (initialization (initialize-package!))) diff --git a/src/runtime/thread.scm b/src/runtime/thread.scm index 5013335e5..5dd6f48f5 100644 --- a/src/runtime/thread.scm +++ b/src/runtime/thread.scm @@ -578,6 +578,34 @@ USA. (else (loop (dentry/next dentry))))) (%maybe-toggle-thread-timer)))) + +(define (%deregister-io-descriptor descriptor) + (let dloop ((dentry io-registrations)) + (cond ((not dentry) + unspecific) + ((eqv? descriptor (dentry/descriptor dentry)) + (let tloop ((tentry (dentry/first-tentry dentry))) + (if tentry + (let ((thread (tentry/thread tentry)) + (event (tentry/event tentry))) + (%signal-thread-event thread + (and event + (lambda () (event #f)))) + (tloop (tentry/next tentry))))) + (remove-from-select-registry! io-registry + (dentry/descriptor dentry) + (dentry/mode dentry)) + (let ((prev (dentry/prev dentry)) + (next (dentry/next dentry))) + (if prev + (set-dentry/next! prev next) + (set! io-registrations next)) + (if next + (set-dentry/prev! next prev))) + (dloop (dentry/next dentry))) + (else + (dloop (dentry/next dentry))))) + (%maybe-toggle-thread-timer)) (define (%register-io-thread-event descriptor mode thread event permanent? front?) -- 2.25.1