From: Matt Birkholz <puck@birchwood-abbey.net>
Date: Fri, 13 Mar 2015 06:47:47 +0000 (-0700)
Subject: smp: main loops must use subprocess status tick to avoid suspending.
X-Git-Url: https://birchwood-abbey.net/git?a=commitdiff_plain;h=9208c1057f987eea5d2cf61c1ab0802e6f394309;p=mit-scheme.git

smp: main loops must use subprocess status tick to avoid suspending.
---

diff --git a/README.txt b/README.txt
index 42c122ecc..4ec13ae6c 100644
--- a/README.txt
+++ b/README.txt
@@ -1636,3 +1636,68 @@ The hits with accompanying analysis:
 	Caller: x-graphics/close-window
 
 	Used the display-finalizer's mutex to serialize the callers.
+
+
+* Main Loops
+
+The analysis of the subprocess-global-status-tick procedure suggested
+that "How soon other threads observe the new tick actually makes
+little difference."  The reason is that the procedure cannot be used
+for its intended purpose as exemplified by several application main
+loops including Edwin's.  Thus it is merely advisory: "At some point
+recently, this was the value of the runtime's subprocess status tick."
+
+Subprocess-global-status-tick is used by main loops that are managing
+subprocesses.  They use it to quickly check whether any statuses have
+changed since they last polled their subprocesses.  Edwin's main loop
+on all display types (x11, os2, win32 and tty) currently suspends in
+block-on-io-descriptor which also wakes edwin-thread when any
+subprocess status changes.
+
+Unfortunately in an SMPing world there is room for a race.  The
+edwin-thread needs to check its subprocesses' statuses and suspend,
+atomically, but another processor can asynchronously receive a
+SIGCHLD, collect the child's results, and advance the microcode's
+global subprocess status sync tick AFTER the edwin-thread has checked
+but BEFORE it suspends.  Edwin loses the race: it suspends without
+servicing the subprocess status change.
+
+Without-interrupts was used to avoid the race, but without-interrupts
+is not atomic in an SMPing world.  All of these main loops need some
+other way to suspend without missing ticks.  If suspend-current-thread
+knew when the loop last looked (the tick current at the START of the
+loop), it could avoid suspending the thread if new changes had
+occurred since then.
+
+The last-tick-I-saw argument comes from the main loop and becomes a
+"subprocess-tick" parameter in test-for-io-on-descriptor,
+block-on-io-descriptor and %suspend-current-thread, which checks the
+tick from the main loop against the runtime's current tick.  If it is
+up-to-date the thread is suspended and the processor looks for other
+work.  If there is no other work and no io-waiter, the processor
+claims the io-waiter title and blocks in test-select-registry.
+
+The test-select-registry primitive atomically tests and blocks using
+pselect/ppoll, but another pthread might receive the SIGCHLD signal.
+If the io-waiter does not get the SIGCHLD it will stay blocked in
+test-select-registry.  To avoid this, the SIGCHLD handler forwards the
+signal to the io-waiter's pthread.  It knows there is a blocked
+io-waiter by consulting a new variable: smp_io_blocked.
+
+So the test-select-registry primitive must first mask SIGCHLD, grab
+the process_table_mutex, and compare ticks.  If the ticks do not
+match, it releases the mutex and returns 'process-status-change.  If
+the ticks match, it sets smp_io_blocked, releases the mutex, and calls
+pselect/ppoll.  The pselect/ppoll unmasks SIGCHLD and blocks
+atomically.  If a SIGCHLD arrived after SIGCHLD was first masked, it
+will be delivered then, unblocking the pselect/ppoll.
+
+If a processor gets a SIGCHLD, its handler must grab the
+process_table_mutex, collect the child's status, and either increment
+the tick or forward the SIGCHLD to the smp_io_blocked processor, then
+release the mutex.  This could happen after the io-waiter released
+the mutex and before it blocked, but the forwarded SIGCHLD will be
+unmasked by pselect/ppoll.  This could also happen after the io-waiter
+has unblocked and before it has re-acquired the mutex to clear the
+variable, but no harm is done.  The primitive still returns INTERRUPT
+and will be called again.
diff --git a/src/edwin/os2term.scm b/src/edwin/os2term.scm
index a8f6aae8b..69c378f6a 100644
--- a/src/edwin/os2term.scm
+++ b/src/edwin/os2term.scm
@@ -719,22 +719,18 @@ USA.
 (define (read-event-1 block?)
   (or (os2win-get-event event-descriptor #f)
       (let loop ()
-	(let ((interrupt-mask (set-interrupt-enables! interrupt-mask/gc-ok)))
+	(let ((tick (subprocess-global-status-tick)))
 	  (cond (inferior-thread-changes?
-		 (set-interrupt-enables! interrupt-mask)
 		 event:inferior-thread-output)
 		((process-output-available?)
-		 (set-interrupt-enables! interrupt-mask)
 		 event:process-output)
 		((process-status-changes?)
-		 (set-interrupt-enables! interrupt-mask)
 		 event:process-status)
 		(else
 		 (let ((flag
 			(test-for-io-on-descriptor event-descriptor
 						   block?
-						   'READ)))
-		   (set-interrupt-enables! interrupt-mask)
+						   'READ tick)))
 		   (case flag
 		     ((#F) #f)
 		     ((PROCESS-STATUS-CHANGE) event:process-status)
diff --git a/src/edwin/tterm.scm b/src/edwin/tterm.scm
index 84651d1c1..e2542d934 100644
--- a/src/edwin/tterm.scm
+++ b/src/edwin/tterm.scm
@@ -222,11 +222,12 @@ USA.
 				  (find (cdr key-pairs)
 					possible-pending?))))))))))
 	 (read-more?			; -> #F or #T if some octets were read
-	  (named-lambda (read-more? block?)
+	  (named-lambda (read-more? block? subprocess-tick)
 	    (if block?
 		(channel-blocking channel)
 		(channel-nonblocking channel))
-	    (let ((n (%channel-read channel buffer end input-buffer-size)))
+	    (let ((n (%channel-read channel buffer end input-buffer-size
+				    subprocess-tick)))
 	      (cond ((not n)  #F)
 		    ((eq? n #T) #F)
 		    ((fix:> n 0)
@@ -239,35 +240,28 @@ USA.
 	  (named-lambda (match-event block?)
 	    (let loop ()
 	      (or (begin
-		    (read-more? #f)
+		    (read-more? #f #f)
 		    (match-key))
-		  ;; Atomically poll async event sources and block.
-		  (let ((mask (set-interrupt-enables! interrupt-mask/gc-ok)))
+		  ;; Poll async event sources and block.
+		  (let ((tick (subprocess-global-status-tick)))
 		    (cond (inferior-thread-changes?
-			   (set-interrupt-enables! mask)
 			   (or (->update-event (accept-thread-output))
 			       (loop)))
 			  ((process-output-available?)
-			   (set-interrupt-enables! mask)
 			   (or (->update-event (accept-process-output))
 			       (loop)))
 			  ((process-status-changes?)
-			   (set-interrupt-enables! mask)
 			   (or (->update-event (handle-process-status-changes))
 			       (loop)))
 			  ((not have-select?)
-			   (set-interrupt-enables! mask)
 			   (and block? (loop)))
 			  (incomplete-pending
 			   ;; Must busy-wait.
-			   (set-interrupt-enables! mask)
 			   (loop))
 			  (block?
-			   (read-more? #t)
-			   (set-interrupt-enables! mask)
+			   (read-more? #t tick)
 			   (loop))
 			  (else
-			   (set-interrupt-enables! mask)
 			   #f)))))))
 	 (->update-event
 	  (named-lambda (->update-event redisplay?)
@@ -313,7 +307,7 @@ USA.
       (values
        (named-lambda (halt-update?)
 	 (or (fix:< start end)
-	     (read-more? #f)))
+	     (read-more? #f #f)))
        (named-lambda (peek-no-hang)
 	 (let ((event (->event (match-event #f))))
 	   (if (input-event? event)
diff --git a/src/edwin/win32.scm b/src/edwin/win32.scm
index dab0ce567..4d0a132d5 100644
--- a/src/edwin/win32.scm
+++ b/src/edwin/win32.scm
@@ -449,23 +449,19 @@ USA.
 (define (read-event-1 block?)
   (or (read-event-2)
       (let loop ()
-	(let ((mask (set-interrupt-enables! interrupt-mask/gc+win32)))
+	(let ((tick (subprocess-global-status-tick)))
 	  (cond (inferior-thread-changes?
-		 (set-interrupt-enables! mask)
 		 event:inferior-thread-output)
 		((process-output-available?)
-		 (set-interrupt-enables! mask)
 		 event:process-output)
 		((process-status-changes?)
-		 (set-interrupt-enables! mask)
 		 event:process-status)
 		(else
 		 (let ((flag
 			(test-for-io-on-descriptor
 			 ;; console-channel-descriptor here
 			 ;; means "input from message queue".
-			 console-channel-descriptor block? 'READ)))
-		   (set-interrupt-enables! mask)
+			 console-channel-descriptor block? 'READ tick)))
 		   (case flag
 		     ((#F) #f)
 		     ((PROCESS-STATUS-CHANGE) event:process-status)
diff --git a/src/edwin/xterm.scm b/src/edwin/xterm.scm
index 416ebd8bf..44b23ae3e 100644
--- a/src/edwin/xterm.scm
+++ b/src/edwin/xterm.scm
@@ -544,23 +544,18 @@ USA.
 (define (read-event-1 display block?)
   (or (x-display-process-events display 2)
       (let loop ()
-	(let ((interrupt-mask (set-interrupt-enables! interrupt-mask/gc-ok)))
+	(let ((tick (subprocess-global-status-tick)))
 	  (cond (inferior-thread-changes?
-		 (set-interrupt-enables! interrupt-mask)
 		 event:inferior-thread-output)
 		((process-output-available?)
-		 (set-interrupt-enables! interrupt-mask)
 		 event:process-output)
 		((process-status-changes?)
-		 (set-interrupt-enables! interrupt-mask)
 		 event:process-status)
 		(else
 		 (let ((flag
 			(test-for-io-on-descriptor
 			 (x-display-descriptor display)
-			 block?
-			 'READ)))
-		   (set-interrupt-enables! interrupt-mask)
+			 block? 'READ tick)))
 		   (case flag
 		     ((#F) #f)
 		     ((PROCESS-STATUS-CHANGE) event:process-status)
diff --git a/src/microcode/ossmp.h b/src/microcode/ossmp.h
index 47185bbef..fc6309318 100644
--- a/src/microcode/ossmp.h
+++ b/src/microcode/ossmp.h
@@ -61,6 +61,7 @@ struct processor {
 };
 
 extern processor_t *processors;
+extern processor_t *smp_io_blocked;
 extern __thread processor_t *self;
 
 extern void setup_processors (int count);
@@ -71,6 +72,7 @@ extern bool smp_gc_started (void);
 extern void smp_gc_finished (void);
 
 extern void smp_kill_gc (processor_t *);
+extern void smp_kill_subprocess (processor_t *);
 extern void smp_kill_timer (processor_t *);
 extern void smp_timer_interrupt (void);
 
diff --git a/src/microcode/prossmp.c b/src/microcode/prossmp.c
index 4e70cbf6d..90f426aee 100644
--- a/src/microcode/prossmp.c
+++ b/src/microcode/prossmp.c
@@ -59,6 +59,9 @@ static pthread_mutex_t thread_mutex = MUTEX_INITIALIZER;
 /* The current pthread's processor. */
 __thread processor_t *self;
 
+/* The io-waiter's processor when it is blocked in test-select-registry. */
+processor_t *smp_io_blocked = NULL;
+
 extern int saved_processor_count;
 extern int saved_processor_heap_size;
 extern int saved_stack_size;
@@ -199,6 +202,7 @@ setup_processors (int count)
 {
   make_processors (count-1);
 
+  smp_io_blocked = NULL;
   self = processors;
   assert (self->id == 0);
   self->pthread = pthread_self ();
diff --git a/src/microcode/uxio.c b/src/microcode/uxio.c
index 3bd0c5dbb..092f29948 100644
--- a/src/microcode/uxio.c
+++ b/src/microcode/uxio.c
@@ -629,7 +629,13 @@ safe_poll (struct pollfd *fds, nfds_t nfds, int blockp)
 	}
       else
 	{
+#ifdef ENABLE_SMP
+	  smp_io_blocked = self;
+#endif
 	  n = (UX_ppoll (fds, nfds, NULL, &old));
+#ifdef ENABLE_SMP
+	  smp_io_blocked = NULL;
+#endif
 	}
       UX_sigprocmask (SIG_SETMASK, &old, NULL);
     }
diff --git a/src/microcode/uxproc.c b/src/microcode/uxproc.c
index 06266a3ac..eb08702f7 100644
--- a/src/microcode/uxproc.c
+++ b/src/microcode/uxproc.c
@@ -762,6 +762,15 @@ subprocess_death (pid_t pid, int * status)
 {
   Tprocess process;
   LOCK();
+#ifdef ENABLE_SMP
+  if (smp_io_blocked != NULL
+      && smp_io_blocked != self)
+    {
+      smp_kill_subprocess (smp_io_blocked);
+      UNLOCK();
+      return;
+    }
+#endif
   process = (find_process (pid));
   if (process != NO_PROCESS)
     {
diff --git a/src/microcode/uxsig.c b/src/microcode/uxsig.c
index 8f351904a..4d7d806e9 100644
--- a/src/microcode/uxsig.c
+++ b/src/microcode/uxsig.c
@@ -520,6 +520,12 @@ smp_kill_gc (processor_t *p)
   pthread_kill (p->pthread, SIGUSR2);
 }
 
+void
+smp_kill_subprocess (processor_t *p)
+{
+  pthread_kill (p->pthread, SIGCHLD);
+}
+
 static volatile processor_t *next_timer = NULL;
 static pthread_mutex_t nt_mutex = MUTEX_INITIALIZER;
 #ifdef ENABLE_DEBUGGING_TOOLS
diff --git a/src/runtime/io.scm b/src/runtime/io.scm
index 1c0cff227..3f0720812 100644
--- a/src/runtime/io.scm
+++ b/src/runtime/io.scm
@@ -175,14 +175,14 @@ USA.
 
 (define (channel-read channel buffer start end)
   (let loop ()
-    (let ((n (%channel-read channel buffer start end)))
+    (let ((n (%channel-read channel buffer start end #f)))
       (if (eq? n #t)
 	  (if (channel-blocking? channel)
 	      (loop)
 	      #f)
 	  n))))
 
-(define (%channel-read channel buffer start end)
+(define (%channel-read channel buffer start end subprocess-tick)
   (let ((do-read
 	 (lambda ()
 	   ((ucode-primitive channel-read 4)
@@ -194,7 +194,8 @@ USA.
 	    end))))
     (declare (integrate-operator do-read))
     (if (and have-select? (not (channel-type=file? channel)))
-	(let ((result (test-for-io-on-channel channel 'READ)))
+	(let ((result (test-for-io-on-channel channel 'READ
+					      #f subprocess-tick)))
 	  (case result
 	    ((READ HANGUP ERROR) (do-read))
 	    ((#F) #f)
@@ -223,7 +224,7 @@ USA.
 	    end))))
     (declare (integrate-operator do-write))
     (if (and have-select? (not (channel-type=file? channel)))
-	(let ((result (test-for-io-on-channel channel 'WRITE)))
+	(let ((result (test-for-io-on-channel channel 'WRITE #f #f)))
 	  (case result
 	    ((WRITE HANGUP ERROR) (do-write))
 	    ((#F) 0)
@@ -528,32 +529,39 @@ USA.
 	(set-select-registry-length! registry rl)
 	rl)))
 
-(define (test-for-io-on-channel channel mode #!optional block?)
+(define (test-for-io-on-channel channel mode #!optional block? subprocess-tick)
   (test-for-io-on-descriptor (channel-descriptor-for-select channel)
 			     (if (default-object? block?)
 				 (channel-blocking? channel)
 				 block?)
-			     mode))
+			     mode
+			     (if (default-object? subprocess-tick)
+				 #f
+				 subprocess-tick)))
 
 (define (channel-has-input? channel)
-  (let ((descriptor (channel-descriptor-for-select channel)))
-    (let loop ()
-      (let ((mode (test-select-descriptor descriptor 'READ)))
-	(if (pair? mode)
-	    (or (eq? (car mode) 'READ)
-		(eq? (car mode) 'READ/WRITE))
-	    (loop))))))
+  (let loop ()
+    (let ((mode (test-select-descriptor (channel-descriptor-for-select channel)
+					'READ)))
+      (if (pair? mode)
+	  (or (eq? (car mode) 'READ)
+	      (eq? (car mode) 'READ/WRITE))
+	  (loop)))))
 
 (define-integrable (channel-descriptor-for-select channel)
   ((ucode-primitive channel-descriptor 1) (channel-descriptor channel)))
 
-(define (test-for-io-on-descriptor descriptor block? mode)
+(define (test-for-io-on-descriptor descriptor block? mode
+				   #!optional subprocess-tick)
   (or (let ((rmode (test-select-descriptor descriptor mode)))
 	(if (pair? rmode)
 	    (simplify-select-registry-mode rmode)
 	    rmode))
       (and block?
-	   (block-on-io-descriptor descriptor mode))))
+	   (block-on-io-descriptor descriptor mode
+				   (if (default-object? subprocess-tick)
+				       #f
+				       subprocess-tick)))))
 
 (define (test-select-descriptor descriptor mode)
   (let ((result
diff --git a/src/runtime/os2graph.scm b/src/runtime/os2graph.scm
index d3292a840..73f02f107 100644
--- a/src/runtime/os2graph.scm
+++ b/src/runtime/os2graph.scm
@@ -860,7 +860,7 @@ USA.
        (if (queue-empty? user-event-queue)
 	   (begin
 	     (if (eq? 'READ
-		      (test-for-io-on-descriptor event-descriptor #t 'READ))
+		      (test-for-io-on-descriptor event-descriptor #t 'READ #f))
 		 (read-and-process-event))
 	     (loop))
 	   (dequeue! user-event-queue))))))
diff --git a/src/runtime/runtime.pkg b/src/runtime/runtime.pkg
index 80d6355a7..7854ff398 100644
--- a/src/runtime/runtime.pkg
+++ b/src/runtime/runtime.pkg
@@ -5087,6 +5087,8 @@ USA.
 	  make-population/unsafe)
   (import (runtime 1d-property)
 	  make-1d-table/unsafe)
+  (import (runtime subprocess)
+	  global-status-tick)
   (export (runtime interrupt-handler)
 	  thread-timer-interrupt-handler)
   (export (runtime primitive-io)
diff --git a/src/runtime/socket.scm b/src/runtime/socket.scm
index c43d05ad7..e149762b3 100644
--- a/src/runtime/socket.scm
+++ b/src/runtime/socket.scm
@@ -70,9 +70,8 @@ USA.
 	     (let ((do-test
 		    (lambda (k)
 		      (let ((result
-			     (test-for-io-on-channel server-socket
-						     'READ
-						     block?)))
+			     (test-for-io-on-channel server-socket 'READ
+						     block? #f)))
 			(case result
 			  ((READ)
 			   (open-channel
diff --git a/src/runtime/thread.scm b/src/runtime/thread.scm
index 330141942..a5f48846c 100644
--- a/src/runtime/thread.scm
+++ b/src/runtime/thread.scm
@@ -361,20 +361,26 @@ USA.
   (%unlock))
 
 (define (suspend-current-thread)
+  (%suspend-current-thread #f))
+
+(define (%suspend-current-thread subprocess-tick)
   (without-interrupts
    (lambda ()
      (let* ((id (%id))
 	    (thread (%current-thread id)))
-       (%trace ";"id" suspend-current-thread "thread"\n")
+       (%trace ";"id" %suspend-current-thread "thread"\n")
        (%lock)
-       (%suspend-thread thread)))))
+       (%suspend-thread thread subprocess-tick)))))
 
-(define (%suspend-thread thread)
+(define (%suspend-thread thread subprocess-tick)
   (%trace ";"(%%id)" %suspend-thread "thread"\n")
   (assert-locked '%suspend-thread)
   (let ((block-events? (thread/block-events? thread)))
     (set-thread/block-events?! thread #f)
     (maybe-signal-io-thread-events)
+    (if (and subprocess-tick
+	     (not (eq? global-status-tick subprocess-tick)))
+	(signal-early-subprocess-event thread))
     (let ((any-events? (handle-thread-events thread)))
       (set-thread/block-events?! thread block-events?)
       (if any-events?
@@ -750,7 +756,7 @@ USA.
   (%%trace ";"(%%id)" maybe-signal-subprocess-status\n")
   (%handle-subprocess-status-change))
 
-(define (block-on-io-descriptor descriptor mode)
+(define (block-on-io-descriptor descriptor mode subprocess-tick)
   (let ((result 'INTERRUPT)
 	(thread (current-thread)))
     (let ((registration-1 (make-tentry
@@ -774,7 +780,7 @@ USA.
 	    (%maybe-toggle-thread-timer)
 	    (%maybe-wake-io-waiter))))
        (lambda ()
-	 (suspend-current-thread)
+	 (%suspend-current-thread subprocess-tick)
 	 result)
        (lambda ()
 	 (with-threads-locked
@@ -1003,6 +1009,26 @@ USA.
 	      ((not (pair? events)))
 	    (%signal-thread-event (caar events) (cdar events)))))))
 
+(define (signal-early-subprocess-event thread)
+  (assert-locked 'signal-early-subprocess-event)
+  (let ((dentry (let loop ((dentry io-registrations))
+		  (and dentry
+		       (if (eqv? 'PROCESS-STATUS-CHANGE
+				 (dentry/descriptor dentry))
+			   dentry
+			   (loop (dentry/next dentry)))))))
+    (if dentry
+	(let loop ((tentry (dentry/first-tentry dentry)))
+	  (if (eq? thread (tentry/thread tentry))
+	      (begin
+		(delete-tentry! tentry)
+		(%signal-thread-event thread (let ((e (tentry/event tentry)))
+					       (and e
+						    (lambda () (e 'READ))))))
+	      (let ((n (tentry/next tentry)))
+		(if n
+		    (loop n))))))))
+
 (define (delete-tentry! tentry)
   (assert-locked 'delete-tentry!)
   (let ((dentry (tentry/dentry tentry))
@@ -1388,7 +1414,7 @@ USA.
       (begin
 	(ring/enqueue (thread-mutex/waiting-threads mutex) thread)
 	(do () ((eq? thread (thread-mutex/owner mutex)))
-	  (%suspend-thread thread)
+	  (%suspend-thread thread #f)
 	  (%lock)))
       (set-thread-mutex/owner! mutex thread)))
 
diff --git a/src/runtime/x11graph.scm b/src/runtime/x11graph.scm
index 3b87d3917..13c9867d1 100644
--- a/src/runtime/x11graph.scm
+++ b/src/runtime/x11graph.scm
@@ -323,7 +323,8 @@ USA.
 		       (test-for-io-on-descriptor
 			(x-display-descriptor (x-display/xd display))
 			#t
-			'READ))
+			'READ
+			#f))
 		  (x-display-process-events (x-display/xd display) 1)))))
     (if event
 	(process-event display event))))