smp: main loops must use subprocess status tick to avoid suspending.
author Matt Birkholz <puck@birchwood-abbey.net>
Fri, 13 Mar 2015 06:47:47 +0000 (23:47 -0700)
committer Matt Birkholz <puck@birchwood-abbey.net>
Fri, 13 Mar 2015 06:47:47 +0000 (23:47 -0700)
16 files changed:
README.txt
src/edwin/os2term.scm
src/edwin/tterm.scm
src/edwin/win32.scm
src/edwin/xterm.scm
src/microcode/ossmp.h
src/microcode/prossmp.c
src/microcode/uxio.c
src/microcode/uxproc.c
src/microcode/uxsig.c
src/runtime/io.scm
src/runtime/os2graph.scm
src/runtime/runtime.pkg
src/runtime/socket.scm
src/runtime/thread.scm
src/runtime/x11graph.scm

index 42c122ecc11948a5c4d3825da4a5243940ef2049..4ec13ae6caa7981c76a862a4c9db40eefd9f027b 100644 (file)
@@ -1636,3 +1636,68 @@ The hits with accompanying analysis:
        Caller: x-graphics/close-window
 
        Used the display-finalizer's mutex to serialize the callers.
+\f
+
+* Main Loops
+
+The analysis of the subprocess-global-status-tick procedure suggested
+that "How soon other threads observe the new tick actually makes
+little difference."  The reason is that the procedure cannot be used
+for its intended purpose as exemplified by several application main
+loops including Edwin's.  Thus it is merely advisory: "At some point
+recently, this was the value of the runtime's subprocess status tick."
+
+Subprocess-global-status-tick is used by main loops that are managing
+subprocesses.  They use it to quickly check whether any statuses have
+changed since they last polled their subprocesses.  Edwin's main loop
+on all display types (x11, os2, win32 and tty) currently suspends in
+block-on-io-descriptor which also wakes edwin-thread when any
+subprocess status changes.
+
+Unfortunately in an SMPing world there is room for a race.  The
+edwin-thread needs to check its subprocesses' statuses and suspend,
+atomically, but another processor can asynchronously receive a
+SIGCHLD, collect the child's results, and tick the microcode's global
+subprocess status sync tick AFTER the edwin-thread has checked but
+BEFORE it suspends.  The other processor wins the race, and Edwin
+suspends without servicing the subprocess status change.
+
+Without-interrupts was used to avoid the race, but without-interrupts
+is not atomic in an SMPing world.  All of these main loops need some
+other way to suspend without missing ticks.  If suspend-current-thread
+knew when the loop last looked (the tick current at the START of the
+loop), it could avoid suspending the thread if new changes had
+occurred since then.
+
+The last-tick-I-saw argument comes from the main loop and becomes a
+"subprocess-tick" parameter in test-for-io-on-descriptor,
+block-on-io-descriptor and %suspend-current-thread.  The latter checks
+the main loop's tick against the runtime's current tick.  If the
+main loop's tick is up-to-date, the thread is suspended and the
+processor looks for other work.  If there is no other work and no
+io-waiter, it becomes the io-waiter and blocks in test-select-registry.
+
+The test-select-registry primitive atomically tests and blocks using
+pselect/ppoll, but another pthread might receive the SIGCHLD signal.
+If the io-waiter does not get the SIGCHLD it will stay blocked in
+test-select-registry.  To avoid this, the SIGCHLD handler forwards the
+signal to the io-waiter's pthread.  It knows there is a blocked
+io-waiter by consulting a new variable: smp_io_blocked.
+
+So the test-select-registry primitive must first mask SIGCHLD, grab
+the process_table_mutex, and compare ticks.  If the ticks do not
+match, it releases the mutex and returns 'process-status-change.  If
+the ticks match, it sets smp_io_blocked, releases the mutex, and calls
+pselect/ppoll.  The pselect/ppoll unmasks SIGCHLD and blocks
+atomically.  If a SIGCHLD arrived after SIGCHLD was first masked, it
+will be delivered then, unblocking the pselect/ppoll.
+
+If a processor gets a SIGCHLD, its handler must grab the
+process_table_mutex, collect the child's status, and either increment
+the tick or forward the SIGCHLD to the processor in smp_io_blocked,
+then release the mutex.  This could happen after the io-waiter
+released the mutex and before it blocked, but the forwarded SIGCHLD
+stays pending until pselect/ppoll unmasks it.  This could also happen
+after the io-waiter has unblocked and before it has re-acquired the
+mutex to clear the variable, but no harm is done.  The primitive
+still returns INTERRUPT and will be called again.
index a8f6aae8bbecf7e817ff590ce9b7324e539b6637..69c378f6a255f5d9e627f0dd92eafd5e9cf40330 100644 (file)
@@ -719,22 +719,18 @@ USA.
 (define (read-event-1 block?)
   (or (os2win-get-event event-descriptor #f)
       (let loop ()
-       (let ((interrupt-mask (set-interrupt-enables! interrupt-mask/gc-ok)))
+       (let ((tick (subprocess-global-status-tick)))
          (cond (inferior-thread-changes?
-                (set-interrupt-enables! interrupt-mask)
                 event:inferior-thread-output)
                ((process-output-available?)
-                (set-interrupt-enables! interrupt-mask)
                 event:process-output)
                ((process-status-changes?)
-                (set-interrupt-enables! interrupt-mask)
                 event:process-status)
                (else
                 (let ((flag
                        (test-for-io-on-descriptor event-descriptor
                                                   block?
-                                                  'READ)))
-                  (set-interrupt-enables! interrupt-mask)
+                                                  'READ tick)))
                   (case flag
                     ((#F) #f)
                     ((PROCESS-STATUS-CHANGE) event:process-status)
index 84651d1c1785289dc32de5902e4794bf6ae993f6..e2542d9344552695fe77bda6586e15921012f6d2 100644 (file)
@@ -222,11 +222,12 @@ USA.
                                  (find (cdr key-pairs)
                                        possible-pending?))))))))))
         (read-more?                    ; -> #F or #T if some octets were read
-         (named-lambda (read-more? block?)
+         (named-lambda (read-more? block? subprocess-tick)
            (if block?
                (channel-blocking channel)
                (channel-nonblocking channel))
-           (let ((n (%channel-read channel buffer end input-buffer-size)))
+           (let ((n (%channel-read channel buffer end input-buffer-size
+                                   subprocess-tick)))
              (cond ((not n)  #F)
                    ((eq? n #T) #F)
                    ((fix:> n 0)
@@ -239,35 +240,28 @@ USA.
          (named-lambda (match-event block?)
            (let loop ()
              (or (begin
-                   (read-more? #f)
+                   (read-more? #f #f)
                    (match-key))
-                 ;; Atomically poll async event sources and block.
-                 (let ((mask (set-interrupt-enables! interrupt-mask/gc-ok)))
+                 ;; Poll async event sources and block.
+                 (let ((tick (subprocess-global-status-tick)))
                    (cond (inferior-thread-changes?
-                          (set-interrupt-enables! mask)
                           (or (->update-event (accept-thread-output))
                               (loop)))
                          ((process-output-available?)
-                          (set-interrupt-enables! mask)
                           (or (->update-event (accept-process-output))
                               (loop)))
                          ((process-status-changes?)
-                          (set-interrupt-enables! mask)
                           (or (->update-event (handle-process-status-changes))
                               (loop)))
                          ((not have-select?)
-                          (set-interrupt-enables! mask)
                           (and block? (loop)))
                          (incomplete-pending
                           ;; Must busy-wait.
-                          (set-interrupt-enables! mask)
                           (loop))
                          (block?
-                          (read-more? #t)
-                          (set-interrupt-enables! mask)
+                          (read-more? #t tick)
                           (loop))
                          (else
-                          (set-interrupt-enables! mask)
                           #f)))))))
         (->update-event
          (named-lambda (->update-event redisplay?)
@@ -313,7 +307,7 @@ USA.
       (values
        (named-lambda (halt-update?)
         (or (fix:< start end)
-            (read-more? #f)))
+            (read-more? #f #f)))
        (named-lambda (peek-no-hang)
         (let ((event (->event (match-event #f))))
           (if (input-event? event)
index dab0ce567b709bc67cb294c62126a41e579fb150..4d0a132d5b445e9b3236ad016ce42b6e39f16fad 100644 (file)
@@ -449,23 +449,19 @@ USA.
 (define (read-event-1 block?)
   (or (read-event-2)
       (let loop ()
-       (let ((mask (set-interrupt-enables! interrupt-mask/gc+win32)))
+       (let ((tick (subprocess-global-status-tick)))
          (cond (inferior-thread-changes?
-                (set-interrupt-enables! mask)
                 event:inferior-thread-output)
                ((process-output-available?)
-                (set-interrupt-enables! mask)
                 event:process-output)
                ((process-status-changes?)
-                (set-interrupt-enables! mask)
                 event:process-status)
                (else
                 (let ((flag
                        (test-for-io-on-descriptor
                         ;; console-channel-descriptor here
                         ;; means "input from message queue".
-                        console-channel-descriptor block? 'READ)))
-                  (set-interrupt-enables! mask)
+                        console-channel-descriptor block? 'READ tick)))
                   (case flag
                     ((#F) #f)
                     ((PROCESS-STATUS-CHANGE) event:process-status)
index 416ebd8bfca84826820f545be92e518f22c0eec9..44b23ae3e7f331f0c6b8504c892c08416d7d6c84 100644 (file)
@@ -544,23 +544,18 @@ USA.
 (define (read-event-1 display block?)
   (or (x-display-process-events display 2)
       (let loop ()
-       (let ((interrupt-mask (set-interrupt-enables! interrupt-mask/gc-ok)))
+       (let ((tick (subprocess-global-status-tick)))
          (cond (inferior-thread-changes?
-                (set-interrupt-enables! interrupt-mask)
                 event:inferior-thread-output)
                ((process-output-available?)
-                (set-interrupt-enables! interrupt-mask)
                 event:process-output)
                ((process-status-changes?)
-                (set-interrupt-enables! interrupt-mask)
                 event:process-status)
                (else
                 (let ((flag
                        (test-for-io-on-descriptor
                         (x-display-descriptor display)
-                        block?
-                        'READ)))
-                  (set-interrupt-enables! interrupt-mask)
+                        block? 'READ tick)))
                   (case flag
                     ((#F) #f)
                     ((PROCESS-STATUS-CHANGE) event:process-status)
index 47185bbef5ed0b606a05483895b422872b59e3d2..fc630931819e63ee530f33dc6619ea09856ee65e 100644 (file)
@@ -61,6 +61,7 @@ struct processor {
 };
 
 extern processor_t *processors;
+extern processor_t *smp_io_blocked;
 extern __thread processor_t *self;
 
 extern void setup_processors (int count);
@@ -71,6 +72,7 @@ extern bool smp_gc_started (void);
 extern void smp_gc_finished (void);
 
 extern void smp_kill_gc (processor_t *);
+extern void smp_kill_subprocess (processor_t *);
 extern void smp_kill_timer (processor_t *);
 extern void smp_timer_interrupt (void);
 
index 4e70cbf6d5eac5dd462465c59af8afe932d60160..90f426aee6eca8f0fb8c396f3bf4e0ae063d5430 100644 (file)
@@ -59,6 +59,9 @@ static pthread_mutex_t thread_mutex = MUTEX_INITIALIZER;
 /* The current pthread's processor. */
 __thread processor_t *self;
 
+/* The io-waiter's processor when it is blocked in test-select-registry. */
+processor_t *smp_io_blocked = NULL;
+
 extern int saved_processor_count;
 extern int saved_processor_heap_size;
 extern int saved_stack_size;
@@ -199,6 +202,7 @@ setup_processors (int count)
 {
   make_processors (count-1);
 
+  smp_io_blocked = NULL;
   self = processors;
   assert (self->id == 0);
   self->pthread = pthread_self ();
index 3bd0c5dbbd78e1d927e244eb31e4a7bfa416b833..092f299483a2ceb5f7b835e1b0e34e27ca0306d1 100644 (file)
@@ -629,7 +629,13 @@ safe_poll (struct pollfd *fds, nfds_t nfds, int blockp)
        }
       else
        {
+#ifdef ENABLE_SMP
+         smp_io_blocked = self;
+#endif
          n = (UX_ppoll (fds, nfds, NULL, &old));
+#ifdef ENABLE_SMP
+         smp_io_blocked = NULL;
+#endif
        }
       UX_sigprocmask (SIG_SETMASK, &old, NULL);
     }
index 06266a3ac8c976ab1075b926f081718e42c657f1..eb08702f7d288c8ee63bd10e5cd1c8a9cbd40ee4 100644 (file)
@@ -762,6 +762,15 @@ subprocess_death (pid_t pid, int * status)
 {
   Tprocess process;
   LOCK();
+#ifdef ENABLE_SMP
+  if (smp_io_blocked != NULL
+      && smp_io_blocked != self)
+    {
+      smp_kill_subprocess (smp_io_blocked);
+      UNLOCK();
+      return;
+    }
+#endif
   process = (find_process (pid));
   if (process != NO_PROCESS)
     {
index 8f351904a7b79f0ae9d3c56d4d610041c3290590..4d7d806e9bd2df7501f38751dae9ebb6beb035c4 100644 (file)
@@ -520,6 +520,12 @@ smp_kill_gc (processor_t *p)
   pthread_kill (p->pthread, SIGUSR2);
 }
 
+void
+smp_kill_subprocess (processor_t *p)
+{
+  pthread_kill (p->pthread, SIGCHLD);
+}
+
 static volatile processor_t *next_timer = NULL;
 static pthread_mutex_t nt_mutex = MUTEX_INITIALIZER;
 #ifdef ENABLE_DEBUGGING_TOOLS
index 1c0cff227604f455f7d67400267c1c3b70d61cab..3f0720812a3ff22d6590f3c646069d5b323235d5 100644 (file)
@@ -175,14 +175,14 @@ USA.
 \f
 (define (channel-read channel buffer start end)
   (let loop ()
-    (let ((n (%channel-read channel buffer start end)))
+    (let ((n (%channel-read channel buffer start end #f)))
       (if (eq? n #t)
          (if (channel-blocking? channel)
              (loop)
              #f)
          n))))
 
-(define (%channel-read channel buffer start end)
+(define (%channel-read channel buffer start end subprocess-tick)
   (let ((do-read
         (lambda ()
           ((ucode-primitive channel-read 4)
@@ -194,7 +194,8 @@ USA.
            end))))
     (declare (integrate-operator do-read))
     (if (and have-select? (not (channel-type=file? channel)))
-       (let ((result (test-for-io-on-channel channel 'READ)))
+       (let ((result (test-for-io-on-channel channel 'READ
+                                             #f subprocess-tick)))
          (case result
            ((READ HANGUP ERROR) (do-read))
            ((#F) #f)
@@ -223,7 +224,7 @@ USA.
            end))))
     (declare (integrate-operator do-write))
     (if (and have-select? (not (channel-type=file? channel)))
-       (let ((result (test-for-io-on-channel channel 'WRITE)))
+       (let ((result (test-for-io-on-channel channel 'WRITE #f #f)))
          (case result
            ((WRITE HANGUP ERROR) (do-write))
            ((#F) 0)
@@ -528,32 +529,39 @@ USA.
        (set-select-registry-length! registry rl)
        rl)))
 \f
-(define (test-for-io-on-channel channel mode #!optional block?)
+(define (test-for-io-on-channel channel mode #!optional block? subprocess-tick)
   (test-for-io-on-descriptor (channel-descriptor-for-select channel)
                             (if (default-object? block?)
                                 (channel-blocking? channel)
                                 block?)
-                            mode))
+                            mode
+                            (if (default-object? subprocess-tick)
+                                #f
+                                subprocess-tick)))
 
 (define (channel-has-input? channel)
-  (let ((descriptor (channel-descriptor-for-select channel)))
-    (let loop ()
-      (let ((mode (test-select-descriptor descriptor 'READ)))
-       (if (pair? mode)
-           (or (eq? (car mode) 'READ)
-               (eq? (car mode) 'READ/WRITE))
-           (loop))))))
+  (let loop ()
+    (let ((mode (test-select-descriptor (channel-descriptor-for-select channel)
+                                       'READ)))
+      (if (pair? mode)
+         (or (eq? (car mode) 'READ)
+             (eq? (car mode) 'READ/WRITE))
+         (loop)))))
 
 (define-integrable (channel-descriptor-for-select channel)
   ((ucode-primitive channel-descriptor 1) (channel-descriptor channel)))
 
-(define (test-for-io-on-descriptor descriptor block? mode)
+(define (test-for-io-on-descriptor descriptor block? mode
+                                  #!optional subprocess-tick)
   (or (let ((rmode (test-select-descriptor descriptor mode)))
        (if (pair? rmode)
            (simplify-select-registry-mode rmode)
            rmode))
       (and block?
-          (block-on-io-descriptor descriptor mode))))
+          (block-on-io-descriptor descriptor mode
+                                  (if (default-object? subprocess-tick)
+                                      #f
+                                      subprocess-tick)))))
 
 (define (test-select-descriptor descriptor mode)
   (let ((result
index d3292a84069babdf43370209b79cffb3f7d443e6..73f02f107622340cfc90280e0df8f0cefe617010 100644 (file)
@@ -860,7 +860,7 @@ USA.
        (if (queue-empty? user-event-queue)
           (begin
             (if (eq? 'READ
-                     (test-for-io-on-descriptor event-descriptor #t 'READ))
+                     (test-for-io-on-descriptor event-descriptor #t 'READ #f))
                 (read-and-process-event))
             (loop))
           (dequeue! user-event-queue))))))
index 80d6355a71acb0a6926c02d9a79d91ab2e9474bf..7854ff398f3ce141505ff6a30ab97b121e7983be 100644 (file)
@@ -5087,6 +5087,8 @@ USA.
          make-population/unsafe)
   (import (runtime 1d-property)
          make-1d-table/unsafe)
+  (import (runtime subprocess)
+         global-status-tick)
   (export (runtime interrupt-handler)
          thread-timer-interrupt-handler)
   (export (runtime primitive-io)
index c43d05ad70496ce959ce0931b5102decd3a54ad6..e149762b393389194c1accb860271f10bb2f44be 100644 (file)
@@ -70,9 +70,8 @@ USA.
             (let ((do-test
                    (lambda (k)
                      (let ((result
-                            (test-for-io-on-channel server-socket
-                                                    'READ
-                                                    block?)))
+                            (test-for-io-on-channel server-socket 'READ
+                                                    block? #f)))
                        (case result
                          ((READ)
                           (open-channel
index 3301419421b764748a45ba1bacd09bb0f891e574..a5f48846c0a160e79a3fd2a955d92b2e48a4baef 100644 (file)
@@ -361,20 +361,26 @@ USA.
   (%unlock))
 
 (define (suspend-current-thread)
+  (%suspend-current-thread #f))
+
+(define (%suspend-current-thread subprocess-tick)
   (without-interrupts
    (lambda ()
      (let* ((id (%id))
            (thread (%current-thread id)))
-       (%trace ";"id" suspend-current-thread "thread"\n")
+       (%trace ";"id" %suspend-current-thread "thread"\n")
        (%lock)
-       (%suspend-thread thread)))))
+       (%suspend-thread thread subprocess-tick)))))
 
-(define (%suspend-thread thread)
+(define (%suspend-thread thread subprocess-tick)
   (%trace ";"(%%id)" %suspend-thread "thread"\n")
   (assert-locked '%suspend-thread)
   (let ((block-events? (thread/block-events? thread)))
     (set-thread/block-events?! thread #f)
     (maybe-signal-io-thread-events)
+    (if (and subprocess-tick
+            (not (eq? global-status-tick subprocess-tick)))
+       (signal-early-subprocess-event thread))
     (let ((any-events? (handle-thread-events thread)))
       (set-thread/block-events?! thread block-events?)
       (if any-events?
@@ -750,7 +756,7 @@ USA.
   (%%trace ";"(%%id)" maybe-signal-subprocess-status\n")
   (%handle-subprocess-status-change))
 
-(define (block-on-io-descriptor descriptor mode)
+(define (block-on-io-descriptor descriptor mode subprocess-tick)
   (let ((result 'INTERRUPT)
        (thread (current-thread)))
     (let ((registration-1 (make-tentry
@@ -774,7 +780,7 @@ USA.
            (%maybe-toggle-thread-timer)
            (%maybe-wake-io-waiter))))
        (lambda ()
-        (suspend-current-thread)
+        (%suspend-current-thread subprocess-tick)
         result)
        (lambda ()
         (with-threads-locked
@@ -1003,6 +1009,26 @@ USA.
              ((not (pair? events)))
            (%signal-thread-event (caar events) (cdar events)))))))
 
+(define (signal-early-subprocess-event thread)
+  (assert-locked 'signal-early-subprocess-event)
+  (let ((dentry (let loop ((dentry io-registrations))
+                 (and dentry
+                      (if (eqv? 'PROCESS-STATUS-CHANGE
+                                (dentry/descriptor dentry))
+                          dentry
+                          (loop (dentry/next dentry)))))))
+    (if dentry
+       (let loop ((tentry (dentry/first-tentry dentry)))
+         (if (eq? thread (tentry/thread tentry))
+             (begin
+               (delete-tentry! tentry)
+               (%signal-thread-event thread (let ((e (tentry/event tentry)))
+                                              (and e
+                                                   (lambda () (e 'READ))))))
+             (let ((n (tentry/next tentry)))
+               (if n
+                   (loop n))))))))
+
 (define (delete-tentry! tentry)
   (assert-locked 'delete-tentry!)
   (let ((dentry (tentry/dentry tentry))
@@ -1388,7 +1414,7 @@ USA.
       (begin
        (ring/enqueue (thread-mutex/waiting-threads mutex) thread)
        (do () ((eq? thread (thread-mutex/owner mutex)))
-         (%suspend-thread thread)
+         (%suspend-thread thread #f)
          (%lock)))
       (set-thread-mutex/owner! mutex thread)))
 
index 3b87d391718970598e9cbd01d42ba8ed2df4f40d..13c9867d187a221f79e506ad53fb7a3caaa19e61 100644 (file)
@@ -323,7 +323,8 @@ USA.
                       (test-for-io-on-descriptor
                        (x-display-descriptor (x-display/xd display))
                        #t
-                       'READ))
+                       'READ
+                       #f))
                  (x-display-process-events (x-display/xd display) 1)))))
     (if event
        (process-event display event))))