From: Matt Birkholz Date: Fri, 13 Mar 2015 06:47:47 +0000 (-0700) Subject: smp: main loops must use subprocess status tick to avoid suspending. X-Git-Url: https://birchwood-abbey.net/git?a=commitdiff_plain;h=9208c1057f987eea5d2cf61c1ab0802e6f394309;p=mit-scheme.git smp: main loops must use subprocess status tick to avoid suspending. --- diff --git a/README.txt b/README.txt index 42c122ecc..4ec13ae6c 100644 --- a/README.txt +++ b/README.txt @@ -1636,3 +1636,68 @@ The hits with accompanying analysis: Caller: x-graphics/close-window Used the display-finalizer's mutex to serialize the callers. + + +* Main Loops + +The analysis of the subprocess-global-status-tick procedure suggested +that "How soon other threads observe the new tick actually makes +little difference." The reason is that the procedure cannot be used +for its intended purpose as exemplified by several application main +loops including Edwin's. Thus it is merely advisory: "At some point +recently, this was the value of the runtime's subprocess status tick." + +Subprocess-global-status-tick is used by main loops that are managing +subprocesses. They use it to quickly check whether any statuses have +changed since they last polled their subprocesses. Edwin's main loop +on all display types (x11, os2, win32 and tty) currently suspends in +block-on-io-descriptor which also wakes edwin-thread when any +subprocess status changes. + +Unfortunately in an SMPing world there is room for a race. The +edwin-thread needs to check its subprocesses' statuses and suspend, +atomically, but another processor can asynchronously receive a +SIGCHLD, collect the child's results, and tick the microcode's global +subprocess status sync tick BEFORE the edwin thread can suspend, +winning the race and beating Edwin, who has suspended without +servicing the subprocess status change. + +Without-interrupts was used to avoid the race, but without-interrupts +is not atomic in an SMPing world. 
All of these main loops need some +other way to suspend without missing ticks. If suspend-current-thread +knew when the loop last looked (the tick current at the START of the +loop), it could avoid suspending the thread if new changes had +occurred since then. + +The last-tick-I-saw argument comes from the main loop and becomes a +"subprocess-tick" parameter in test-for-io-on-descriptor, +block-on-io-descriptor and %suspend-current-thread. %Suspend-current-thread +checks the tick from the main loop against the runtime's current tick. +If the main loop's tick is up-to-date, the thread is suspended and the processor looks +for other work. If there is no other work and no io-waiter, the +processor claims the io-waiter title and blocks in test-select-registry. + +The test-select-registry primitive atomically tests and blocks using +pselect/ppoll, but another pthread might receive the SIGCHLD signal. +If the io-waiter does not get the SIGCHLD it will stay blocked in +test-select-registry. To avoid this, the SIGCHLD handler forwards the +signal to the io-waiter's pthread. It knows there is a blocked +io-waiter by consulting a new variable: smp_io_blocked. + +So the test-select-registry primitive must first mask SIGCHLD, grab +the process_table_mutex, and compare ticks. If the ticks do not +match, it releases the mutex and returns 'process-status-change. If +the ticks match, it sets smp_io_blocked, releases the mutex, and calls +pselect/ppoll. The pselect/ppoll unmasks SIGCHLD and blocks +atomically. If a SIGCHLD arrived after SIGCHLD was first masked, it +will be delivered then, unblocking the pselect/ppoll. + +If a processor gets a SIGCHLD, its handler must grab the +process_table_mutex, collect the child's status, and either increment the tick +or forward the SIGCHLD to the smp_io_blocked processor, then release the mutex. +This could happen after that processor released the mutex and before +it blocked, but the forwarded SIGCHLD stays pending until +pselect/ppoll unmasks it. 
This could also happen after smp_io_blocked has +unblocked and before it has re-acquired the mutex to clear the +variable, but no harm is done. The primitive still returns INTERRUPT +and will be called again. diff --git a/src/edwin/os2term.scm b/src/edwin/os2term.scm index a8f6aae8b..69c378f6a 100644 --- a/src/edwin/os2term.scm +++ b/src/edwin/os2term.scm @@ -719,22 +719,18 @@ USA. (define (read-event-1 block?) (or (os2win-get-event event-descriptor #f) (let loop () - (let ((interrupt-mask (set-interrupt-enables! interrupt-mask/gc-ok))) + (let ((tick (subprocess-global-status-tick))) (cond (inferior-thread-changes? - (set-interrupt-enables! interrupt-mask) event:inferior-thread-output) ((process-output-available?) - (set-interrupt-enables! interrupt-mask) event:process-output) ((process-status-changes?) - (set-interrupt-enables! interrupt-mask) event:process-status) (else (let ((flag (test-for-io-on-descriptor event-descriptor block? - 'READ))) - (set-interrupt-enables! interrupt-mask) + 'READ tick))) (case flag ((#F) #f) ((PROCESS-STATUS-CHANGE) event:process-status) diff --git a/src/edwin/tterm.scm b/src/edwin/tterm.scm index 84651d1c1..e2542d934 100644 --- a/src/edwin/tterm.scm +++ b/src/edwin/tterm.scm @@ -222,11 +222,12 @@ USA. (find (cdr key-pairs) possible-pending?)))))))))) (read-more? ; -> #F or #T if some octets were read - (named-lambda (read-more? block?) + (named-lambda (read-more? block? subprocess-tick) (if block? (channel-blocking channel) (channel-nonblocking channel)) - (let ((n (%channel-read channel buffer end input-buffer-size))) + (let ((n (%channel-read channel buffer end input-buffer-size + subprocess-tick))) (cond ((not n) #F) ((eq? n #T) #F) ((fix:> n 0) @@ -239,35 +240,28 @@ USA. (named-lambda (match-event block?) (let loop () (or (begin - (read-more? #f) + (read-more? #f #f) (match-key)) - ;; Atomically poll async event sources and block. - (let ((mask (set-interrupt-enables! 
interrupt-mask/gc-ok))) + ;; Poll async event sources and block. + (let ((tick (subprocess-global-status-tick))) (cond (inferior-thread-changes? - (set-interrupt-enables! mask) (or (->update-event (accept-thread-output)) (loop))) ((process-output-available?) - (set-interrupt-enables! mask) (or (->update-event (accept-process-output)) (loop))) ((process-status-changes?) - (set-interrupt-enables! mask) (or (->update-event (handle-process-status-changes)) (loop))) ((not have-select?) - (set-interrupt-enables! mask) (and block? (loop))) (incomplete-pending ;; Must busy-wait. - (set-interrupt-enables! mask) (loop)) (block? - (read-more? #t) - (set-interrupt-enables! mask) + (read-more? #t tick) (loop)) (else - (set-interrupt-enables! mask) #f))))))) (->update-event (named-lambda (->update-event redisplay?) @@ -313,7 +307,7 @@ USA. (values (named-lambda (halt-update?) (or (fix:< start end) - (read-more? #f))) + (read-more? #f #f))) (named-lambda (peek-no-hang) (let ((event (->event (match-event #f)))) (if (input-event? event) diff --git a/src/edwin/win32.scm b/src/edwin/win32.scm index dab0ce567..4d0a132d5 100644 --- a/src/edwin/win32.scm +++ b/src/edwin/win32.scm @@ -449,23 +449,19 @@ USA. (define (read-event-1 block?) (or (read-event-2) (let loop () - (let ((mask (set-interrupt-enables! interrupt-mask/gc+win32))) + (let ((tick (subprocess-global-status-tick))) (cond (inferior-thread-changes? - (set-interrupt-enables! mask) event:inferior-thread-output) ((process-output-available?) - (set-interrupt-enables! mask) event:process-output) ((process-status-changes?) - (set-interrupt-enables! mask) event:process-status) (else (let ((flag (test-for-io-on-descriptor ;; console-channel-descriptor here ;; means "input from message queue". - console-channel-descriptor block? 'READ))) - (set-interrupt-enables! mask) + console-channel-descriptor block? 
'READ tick))) (case flag ((#F) #f) ((PROCESS-STATUS-CHANGE) event:process-status) diff --git a/src/edwin/xterm.scm b/src/edwin/xterm.scm index 416ebd8bf..44b23ae3e 100644 --- a/src/edwin/xterm.scm +++ b/src/edwin/xterm.scm @@ -544,23 +544,18 @@ USA. (define (read-event-1 display block?) (or (x-display-process-events display 2) (let loop () - (let ((interrupt-mask (set-interrupt-enables! interrupt-mask/gc-ok))) + (let ((tick (subprocess-global-status-tick))) (cond (inferior-thread-changes? - (set-interrupt-enables! interrupt-mask) event:inferior-thread-output) ((process-output-available?) - (set-interrupt-enables! interrupt-mask) event:process-output) ((process-status-changes?) - (set-interrupt-enables! interrupt-mask) event:process-status) (else (let ((flag (test-for-io-on-descriptor (x-display-descriptor display) - block? - 'READ))) - (set-interrupt-enables! interrupt-mask) + block? 'READ tick))) (case flag ((#F) #f) ((PROCESS-STATUS-CHANGE) event:process-status) diff --git a/src/microcode/ossmp.h b/src/microcode/ossmp.h index 47185bbef..fc6309318 100644 --- a/src/microcode/ossmp.h +++ b/src/microcode/ossmp.h @@ -61,6 +61,7 @@ struct processor { }; extern processor_t *processors; +extern processor_t *smp_io_blocked; extern __thread processor_t *self; extern void setup_processors (int count); @@ -71,6 +72,7 @@ extern bool smp_gc_started (void); extern void smp_gc_finished (void); extern void smp_kill_gc (processor_t *); +extern void smp_kill_subprocess (processor_t *); extern void smp_kill_timer (processor_t *); extern void smp_timer_interrupt (void); diff --git a/src/microcode/prossmp.c b/src/microcode/prossmp.c index 4e70cbf6d..90f426aee 100644 --- a/src/microcode/prossmp.c +++ b/src/microcode/prossmp.c @@ -59,6 +59,9 @@ static pthread_mutex_t thread_mutex = MUTEX_INITIALIZER; /* The current pthread's processor. */ __thread processor_t *self; +/* The io-waiter's processor when it is blocked in test-select-registry. 
*/ +processor_t *smp_io_blocked = NULL; + extern int saved_processor_count; extern int saved_processor_heap_size; extern int saved_stack_size; @@ -199,6 +202,7 @@ setup_processors (int count) { make_processors (count-1); + smp_io_blocked = NULL; self = processors; assert (self->id == 0); self->pthread = pthread_self (); diff --git a/src/microcode/uxio.c b/src/microcode/uxio.c index 3bd0c5dbb..092f29948 100644 --- a/src/microcode/uxio.c +++ b/src/microcode/uxio.c @@ -629,7 +629,13 @@ safe_poll (struct pollfd *fds, nfds_t nfds, int blockp) } else { +#ifdef ENABLE_SMP + smp_io_blocked = self; +#endif n = (UX_ppoll (fds, nfds, NULL, &old)); +#ifdef ENABLE_SMP + smp_io_blocked = NULL; +#endif } UX_sigprocmask (SIG_SETMASK, &old, NULL); } diff --git a/src/microcode/uxproc.c b/src/microcode/uxproc.c index 06266a3ac..eb08702f7 100644 --- a/src/microcode/uxproc.c +++ b/src/microcode/uxproc.c @@ -762,6 +762,15 @@ subprocess_death (pid_t pid, int * status) { Tprocess process; LOCK(); +#ifdef ENABLE_SMP + if (smp_io_blocked != NULL + && smp_io_blocked != self) + { + smp_kill_subprocess (smp_io_blocked); + UNLOCK(); + return; + } +#endif process = (find_process (pid)); if (process != NO_PROCESS) { diff --git a/src/microcode/uxsig.c b/src/microcode/uxsig.c index 8f351904a..4d7d806e9 100644 --- a/src/microcode/uxsig.c +++ b/src/microcode/uxsig.c @@ -520,6 +520,12 @@ smp_kill_gc (processor_t *p) pthread_kill (p->pthread, SIGUSR2); } +void +smp_kill_subprocess (processor_t *p) +{ + pthread_kill (p->pthread, SIGCHLD); +} + static volatile processor_t *next_timer = NULL; static pthread_mutex_t nt_mutex = MUTEX_INITIALIZER; #ifdef ENABLE_DEBUGGING_TOOLS diff --git a/src/runtime/io.scm b/src/runtime/io.scm index 1c0cff227..3f0720812 100644 --- a/src/runtime/io.scm +++ b/src/runtime/io.scm @@ -175,14 +175,14 @@ USA. 
(define (channel-read channel buffer start end) (let loop () - (let ((n (%channel-read channel buffer start end))) + (let ((n (%channel-read channel buffer start end #f))) (if (eq? n #t) (if (channel-blocking? channel) (loop) #f) n)))) -(define (%channel-read channel buffer start end) +(define (%channel-read channel buffer start end subprocess-tick) (let ((do-read (lambda () ((ucode-primitive channel-read 4) @@ -194,7 +194,8 @@ USA. end)))) (declare (integrate-operator do-read)) (if (and have-select? (not (channel-type=file? channel))) - (let ((result (test-for-io-on-channel channel 'READ))) + (let ((result (test-for-io-on-channel channel 'READ + #f subprocess-tick))) (case result ((READ HANGUP ERROR) (do-read)) ((#F) #f) @@ -223,7 +224,7 @@ USA. end)))) (declare (integrate-operator do-write)) (if (and have-select? (not (channel-type=file? channel))) - (let ((result (test-for-io-on-channel channel 'WRITE))) + (let ((result (test-for-io-on-channel channel 'WRITE #f #f))) (case result ((WRITE HANGUP ERROR) (do-write)) ((#F) 0) @@ -528,32 +529,39 @@ USA. (set-select-registry-length! registry rl) rl))) -(define (test-for-io-on-channel channel mode #!optional block?) +(define (test-for-io-on-channel channel mode #!optional block? subprocess-tick) (test-for-io-on-descriptor (channel-descriptor-for-select channel) (if (default-object? block?) (channel-blocking? channel) block?) - mode)) + mode + (if (default-object? subprocess-tick) + #f + subprocess-tick))) (define (channel-has-input? channel) - (let ((descriptor (channel-descriptor-for-select channel))) - (let loop () - (let ((mode (test-select-descriptor descriptor 'READ))) - (if (pair? mode) - (or (eq? (car mode) 'READ) - (eq? (car mode) 'READ/WRITE)) - (loop)))))) + (let loop () + (let ((mode (test-select-descriptor (channel-descriptor-for-select channel) + 'READ))) + (if (pair? mode) + (or (eq? (car mode) 'READ) + (eq? 
(car mode) 'READ/WRITE)) + (loop))))) (define-integrable (channel-descriptor-for-select channel) ((ucode-primitive channel-descriptor 1) (channel-descriptor channel))) -(define (test-for-io-on-descriptor descriptor block? mode) +(define (test-for-io-on-descriptor descriptor block? mode + #!optional subprocess-tick) (or (let ((rmode (test-select-descriptor descriptor mode))) (if (pair? rmode) (simplify-select-registry-mode rmode) rmode)) (and block? - (block-on-io-descriptor descriptor mode)))) + (block-on-io-descriptor descriptor mode + (if (default-object? subprocess-tick) + #f + subprocess-tick))))) (define (test-select-descriptor descriptor mode) (let ((result diff --git a/src/runtime/os2graph.scm b/src/runtime/os2graph.scm index d3292a840..73f02f107 100644 --- a/src/runtime/os2graph.scm +++ b/src/runtime/os2graph.scm @@ -860,7 +860,7 @@ USA. (if (queue-empty? user-event-queue) (begin (if (eq? 'READ - (test-for-io-on-descriptor event-descriptor #t 'READ)) + (test-for-io-on-descriptor event-descriptor #t 'READ #f)) (read-and-process-event)) (loop)) (dequeue! user-event-queue)))))) diff --git a/src/runtime/runtime.pkg b/src/runtime/runtime.pkg index 80d6355a7..7854ff398 100644 --- a/src/runtime/runtime.pkg +++ b/src/runtime/runtime.pkg @@ -5087,6 +5087,8 @@ USA. make-population/unsafe) (import (runtime 1d-property) make-1d-table/unsafe) + (import (runtime subprocess) + global-status-tick) (export (runtime interrupt-handler) thread-timer-interrupt-handler) (export (runtime primitive-io) diff --git a/src/runtime/socket.scm b/src/runtime/socket.scm index c43d05ad7..e149762b3 100644 --- a/src/runtime/socket.scm +++ b/src/runtime/socket.scm @@ -70,9 +70,8 @@ USA. (let ((do-test (lambda (k) (let ((result - (test-for-io-on-channel server-socket - 'READ - block?))) + (test-for-io-on-channel server-socket 'READ + block? 
#f))) (case result ((READ) (open-channel diff --git a/src/runtime/thread.scm b/src/runtime/thread.scm index 330141942..a5f48846c 100644 --- a/src/runtime/thread.scm +++ b/src/runtime/thread.scm @@ -361,20 +361,26 @@ USA. (%unlock)) (define (suspend-current-thread) + (%suspend-current-thread #f)) + +(define (%suspend-current-thread subprocess-tick) (without-interrupts (lambda () (let* ((id (%id)) (thread (%current-thread id))) - (%trace ";"id" suspend-current-thread "thread"\n") + (%trace ";"id" %suspend-current-thread "thread"\n") (%lock) - (%suspend-thread thread))))) + (%suspend-thread thread subprocess-tick))))) -(define (%suspend-thread thread) +(define (%suspend-thread thread subprocess-tick) (%trace ";"(%%id)" %suspend-thread "thread"\n") (assert-locked '%suspend-thread) (let ((block-events? (thread/block-events? thread))) (set-thread/block-events?! thread #f) (maybe-signal-io-thread-events) + (if (and subprocess-tick + (not (eq? global-status-tick subprocess-tick))) + (signal-early-subprocess-event thread)) (let ((any-events? (handle-thread-events thread))) (set-thread/block-events?! thread block-events?) (if any-events? @@ -750,7 +756,7 @@ USA. (%%trace ";"(%%id)" maybe-signal-subprocess-status\n") (%handle-subprocess-status-change)) -(define (block-on-io-descriptor descriptor mode) +(define (block-on-io-descriptor descriptor mode subprocess-tick) (let ((result 'INTERRUPT) (thread (current-thread))) (let ((registration-1 (make-tentry @@ -774,7 +780,7 @@ USA. (%maybe-toggle-thread-timer) (%maybe-wake-io-waiter)))) (lambda () - (suspend-current-thread) + (%suspend-current-thread subprocess-tick) result) (lambda () (with-threads-locked @@ -1003,6 +1009,26 @@ USA. ((not (pair? events))) (%signal-thread-event (caar events) (cdar events))))))) +(define (signal-early-subprocess-event thread) + (assert-locked 'signal-early-subprocess-event) + (let ((dentry (let loop ((dentry io-registrations)) + (and dentry + (if (eqv? 
'PROCESS-STATUS-CHANGE + (dentry/descriptor dentry)) + dentry + (loop (dentry/next dentry))))))) + (if dentry + (let loop ((tentry (dentry/first-tentry dentry))) + (if (eq? thread (tentry/thread tentry)) + (begin + (delete-tentry! tentry) + (%signal-thread-event thread (let ((e (tentry/event tentry))) + (and e + (lambda () (e 'READ)))))) + (let ((n (tentry/next tentry))) + (if n + (loop n)))))))) + (define (delete-tentry! tentry) (assert-locked 'delete-tentry!) (let ((dentry (tentry/dentry tentry)) @@ -1388,7 +1414,7 @@ USA. (begin (ring/enqueue (thread-mutex/waiting-threads mutex) thread) (do () ((eq? thread (thread-mutex/owner mutex))) - (%suspend-thread thread) + (%suspend-thread thread #f) (%lock))) (set-thread-mutex/owner! mutex thread))) diff --git a/src/runtime/x11graph.scm b/src/runtime/x11graph.scm index 3b87d3917..13c9867d1 100644 --- a/src/runtime/x11graph.scm +++ b/src/runtime/x11graph.scm @@ -323,7 +323,8 @@ USA. (test-for-io-on-descriptor (x-display-descriptor (x-display/xd display)) #t - 'READ)) + 'READ + #f)) (x-display-process-events (x-display/xd display) 1))))) (if event (process-event display event))))