Caller: x-graphics/close-window
Used the display-finalizer's mutex to serialize the callers.
+\f
+
+* Main Loops
+
+The analysis of the subprocess-global-status-tick procedure suggested
+that "How soon other threads observe the new tick actually makes
+little difference." The reason is that the procedure cannot be used
+for its intended purpose as exemplified by several application main
+loops including Edwin's. Thus it is merely advisory: "At some point
+recently, this was the value of the runtime's subprocess status tick."
+
+Subprocess-global-status-tick is used by main loops that are managing
+subprocesses. They use it to quickly check whether any statuses have
+changed since they last polled their subprocesses. Edwin's main loop
+on all display types (x11, os2, win32 and tty) currently suspends in
+block-on-io-descriptor which also wakes edwin-thread when any
+subprocess status changes.
+
+Unfortunately in an SMPing world there is room for a race. The
+edwin-thread needs to check its subprocesses' statuses and suspend,
+atomically, but another processor can asynchronously receive a
+SIGCHLD, collect the child's results, and tick the microcode's global
+subprocess status sync tick BEFORE the edwin thread can suspend,
+winning the race and beating Edwin, who has suspended without
+servicing the subprocess status change.
+
+Without-interrupts was used to avoid the race, but without-interrupts
+is not atomic in an SMPing world. All of these main loops need some
+other way to suspend without missing ticks. If suspend-current-thread
+knew when the loop last looked (the tick current at the START of the
+loop), it could avoid suspending the thread if new changes had
+occurred since then.
+
+The last-tick-I-saw argument comes from the main loop and becomes a
+"subprocess-tick" parameter in test-for-io-on-descriptor, block-on-io-
+descriptor and %suspend-current-thread. %Suspend-current-thread
+checks the tick from the main loop against the runtime's current tick.
+If it is up-to-date the thread is suspended and the processor looks
+for other work. If there is no other work and no io-waiter, the
+processor claims the title and blocks in test-select-registry.
+
+The test-select-registry primitive atomically tests and blocks using
+pselect/ppoll, but another pthread might receive the SIGCHLD signal.
+If the io-waiter does not get the SIGCHLD it will stay blocked in
+test-select-registry. To avoid this, the SIGCHLD handler forwards the
+signal to the io-waiter's pthread. It knows there is a blocked
+io-waiter by consulting a new variable: smp_io_blocked.
+
+So the test-select-registry primitive must first mask SIGCHLD, grab
+the process_table_mutex, and compare ticks. If the ticks do not
+match, it releases the mutex and returns 'process-status-change. If
+the ticks match, it sets smp_io_blocked, releases the mutex, and calls
+pselect/ppoll. The pselect/ppoll unmasks SIGCHLD and blocks
+atomically. If a SIGCHLD arrived after SIGCHLD was first masked, it
+will be delivered then, unblocking the pselect/ppoll.
+
+If a processor gets a SIGCHLD, its handler must grab the process_
+table_mutex, collect the child's status, and either increment the tick
+or forward the SIGCHLD to the smp_io_blocked, then release the mutex.
+This could happen after smp_io_blocked released the mutex and before
+it blocked, but the forwarded SIGCHLD will be unmasked by
+pselect/ppoll. This could also happen after smp_io_blocked has
+unblocked and before it has re-acquired the mutex to clear the
+variable, but no harm is done. The primitive still returns INTERRUPT
+and will be called again.
(define (read-event-1 block?)
(or (os2win-get-event event-descriptor #f)
(let loop ()
- (let ((interrupt-mask (set-interrupt-enables! interrupt-mask/gc-ok)))
+ (let ((tick (subprocess-global-status-tick)))
(cond (inferior-thread-changes?
- (set-interrupt-enables! interrupt-mask)
event:inferior-thread-output)
((process-output-available?)
- (set-interrupt-enables! interrupt-mask)
event:process-output)
((process-status-changes?)
- (set-interrupt-enables! interrupt-mask)
event:process-status)
(else
(let ((flag
(test-for-io-on-descriptor event-descriptor
block?
- 'READ)))
- (set-interrupt-enables! interrupt-mask)
+ 'READ tick)))
(case flag
((#F) #f)
((PROCESS-STATUS-CHANGE) event:process-status)
(find (cdr key-pairs)
possible-pending?))))))))))
(read-more? ; -> #F or #T if some octets were read
- (named-lambda (read-more? block?)
+ (named-lambda (read-more? block? subprocess-tick)
(if block?
(channel-blocking channel)
(channel-nonblocking channel))
- (let ((n (%channel-read channel buffer end input-buffer-size)))
+ (let ((n (%channel-read channel buffer end input-buffer-size
+ subprocess-tick)))
(cond ((not n) #F)
((eq? n #T) #F)
((fix:> n 0)
(named-lambda (match-event block?)
(let loop ()
(or (begin
- (read-more? #f)
+ (read-more? #f #f)
(match-key))
- ;; Atomically poll async event sources and block.
- (let ((mask (set-interrupt-enables! interrupt-mask/gc-ok)))
+ ;; Poll async event sources and block.
+ (let ((tick (subprocess-global-status-tick)))
(cond (inferior-thread-changes?
- (set-interrupt-enables! mask)
(or (->update-event (accept-thread-output))
(loop)))
((process-output-available?)
- (set-interrupt-enables! mask)
(or (->update-event (accept-process-output))
(loop)))
((process-status-changes?)
- (set-interrupt-enables! mask)
(or (->update-event (handle-process-status-changes))
(loop)))
((not have-select?)
- (set-interrupt-enables! mask)
(and block? (loop)))
(incomplete-pending
;; Must busy-wait.
- (set-interrupt-enables! mask)
(loop))
(block?
- (read-more? #t)
- (set-interrupt-enables! mask)
+ (read-more? #t tick)
(loop))
(else
- (set-interrupt-enables! mask)
#f)))))))
(->update-event
(named-lambda (->update-event redisplay?)
(values
(named-lambda (halt-update?)
(or (fix:< start end)
- (read-more? #f)))
+ (read-more? #f #f)))
(named-lambda (peek-no-hang)
(let ((event (->event (match-event #f))))
(if (input-event? event)
(define (read-event-1 block?)
(or (read-event-2)
(let loop ()
- (let ((mask (set-interrupt-enables! interrupt-mask/gc+win32)))
+ (let ((tick (subprocess-global-status-tick)))
(cond (inferior-thread-changes?
- (set-interrupt-enables! mask)
event:inferior-thread-output)
((process-output-available?)
- (set-interrupt-enables! mask)
event:process-output)
((process-status-changes?)
- (set-interrupt-enables! mask)
event:process-status)
(else
(let ((flag
(test-for-io-on-descriptor
;; console-channel-descriptor here
;; means "input from message queue".
- console-channel-descriptor block? 'READ)))
- (set-interrupt-enables! mask)
+ console-channel-descriptor block? 'READ tick)))
(case flag
((#F) #f)
((PROCESS-STATUS-CHANGE) event:process-status)
(define (read-event-1 display block?)
(or (x-display-process-events display 2)
(let loop ()
- (let ((interrupt-mask (set-interrupt-enables! interrupt-mask/gc-ok)))
+ (let ((tick (subprocess-global-status-tick)))
(cond (inferior-thread-changes?
- (set-interrupt-enables! interrupt-mask)
event:inferior-thread-output)
((process-output-available?)
- (set-interrupt-enables! interrupt-mask)
event:process-output)
((process-status-changes?)
- (set-interrupt-enables! interrupt-mask)
event:process-status)
(else
(let ((flag
(test-for-io-on-descriptor
(x-display-descriptor display)
- block?
- 'READ)))
- (set-interrupt-enables! interrupt-mask)
+ block? 'READ tick)))
(case flag
((#F) #f)
((PROCESS-STATUS-CHANGE) event:process-status)
};
extern processor_t *processors;
+extern processor_t *smp_io_blocked;
extern __thread processor_t *self;
extern void setup_processors (int count);
extern void smp_gc_finished (void);
extern void smp_kill_gc (processor_t *);
+extern void smp_kill_subprocess (processor_t *);
extern void smp_kill_timer (processor_t *);
extern void smp_timer_interrupt (void);
/* The current pthread's processor. */
__thread processor_t *self;
+/* The io-waiter's processor when it is blocked in test-select-registry. */
+processor_t *smp_io_blocked = NULL;
+
extern int saved_processor_count;
extern int saved_processor_heap_size;
extern int saved_stack_size;
{
make_processors (count-1);
+ smp_io_blocked = NULL;
self = processors;
assert (self->id == 0);
self->pthread = pthread_self ();
}
else
{
+#ifdef ENABLE_SMP
+ smp_io_blocked = self;
+#endif
n = (UX_ppoll (fds, nfds, NULL, &old));
+#ifdef ENABLE_SMP
+ smp_io_blocked = NULL;
+#endif
}
UX_sigprocmask (SIG_SETMASK, &old, NULL);
}
{
Tprocess process;
LOCK();
+#ifdef ENABLE_SMP
+ if (smp_io_blocked != NULL
+ && smp_io_blocked != self)
+ {
+ smp_kill_subprocess (smp_io_blocked);
+ UNLOCK();
+ return;
+ }
+#endif
process = (find_process (pid));
if (process != NO_PROCESS)
{
pthread_kill (p->pthread, SIGUSR2);
}
+void
+smp_kill_subprocess (processor_t *p)
+{
+ pthread_kill (p->pthread, SIGCHLD);
+}
+
static volatile processor_t *next_timer = NULL;
static pthread_mutex_t nt_mutex = MUTEX_INITIALIZER;
#ifdef ENABLE_DEBUGGING_TOOLS
\f
(define (channel-read channel buffer start end)
(let loop ()
- (let ((n (%channel-read channel buffer start end)))
+ (let ((n (%channel-read channel buffer start end #f)))
(if (eq? n #t)
(if (channel-blocking? channel)
(loop)
#f)
n))))
-(define (%channel-read channel buffer start end)
+(define (%channel-read channel buffer start end subprocess-tick)
(let ((do-read
(lambda ()
((ucode-primitive channel-read 4)
end))))
(declare (integrate-operator do-read))
(if (and have-select? (not (channel-type=file? channel)))
- (let ((result (test-for-io-on-channel channel 'READ)))
+ (let ((result (test-for-io-on-channel channel 'READ
+ #f subprocess-tick)))
(case result
((READ HANGUP ERROR) (do-read))
((#F) #f)
end))))
(declare (integrate-operator do-write))
(if (and have-select? (not (channel-type=file? channel)))
- (let ((result (test-for-io-on-channel channel 'WRITE)))
+ (let ((result (test-for-io-on-channel channel 'WRITE #f #f)))
(case result
((WRITE HANGUP ERROR) (do-write))
((#F) 0)
(set-select-registry-length! registry rl)
rl)))
\f
-(define (test-for-io-on-channel channel mode #!optional block?)
+(define (test-for-io-on-channel channel mode #!optional block? subprocess-tick)
(test-for-io-on-descriptor (channel-descriptor-for-select channel)
(if (default-object? block?)
(channel-blocking? channel)
block?)
- mode))
+ mode
+ (if (default-object? subprocess-tick)
+ #f
+ subprocess-tick)))
(define (channel-has-input? channel)
- (let ((descriptor (channel-descriptor-for-select channel)))
- (let loop ()
- (let ((mode (test-select-descriptor descriptor 'READ)))
- (if (pair? mode)
- (or (eq? (car mode) 'READ)
- (eq? (car mode) 'READ/WRITE))
- (loop))))))
+ (let loop ()
+ (let ((mode (test-select-descriptor (channel-descriptor-for-select channel)
+ 'READ)))
+ (if (pair? mode)
+ (or (eq? (car mode) 'READ)
+ (eq? (car mode) 'READ/WRITE))
+ (loop)))))
(define-integrable (channel-descriptor-for-select channel)
((ucode-primitive channel-descriptor 1) (channel-descriptor channel)))
-(define (test-for-io-on-descriptor descriptor block? mode)
+(define (test-for-io-on-descriptor descriptor block? mode
+ #!optional subprocess-tick)
(or (let ((rmode (test-select-descriptor descriptor mode)))
(if (pair? rmode)
(simplify-select-registry-mode rmode)
rmode))
(and block?
- (block-on-io-descriptor descriptor mode))))
+ (block-on-io-descriptor descriptor mode
+ (if (default-object? subprocess-tick)
+ #f
+ subprocess-tick)))))
(define (test-select-descriptor descriptor mode)
(let ((result
(if (queue-empty? user-event-queue)
(begin
(if (eq? 'READ
- (test-for-io-on-descriptor event-descriptor #t 'READ))
+ (test-for-io-on-descriptor event-descriptor #t 'READ #f))
(read-and-process-event))
(loop))
(dequeue! user-event-queue))))))
make-population/unsafe)
(import (runtime 1d-property)
make-1d-table/unsafe)
+ (import (runtime subprocess)
+ global-status-tick)
(export (runtime interrupt-handler)
thread-timer-interrupt-handler)
(export (runtime primitive-io)
(let ((do-test
(lambda (k)
(let ((result
- (test-for-io-on-channel server-socket
- 'READ
- block?)))
+ (test-for-io-on-channel server-socket 'READ
+ block? #f)))
(case result
((READ)
(open-channel
(%unlock))
(define (suspend-current-thread)
+ (%suspend-current-thread #f))
+
+(define (%suspend-current-thread subprocess-tick)
(without-interrupts
(lambda ()
(let* ((id (%id))
(thread (%current-thread id)))
- (%trace ";"id" suspend-current-thread "thread"\n")
+ (%trace ";"id" %suspend-current-thread "thread"\n")
(%lock)
- (%suspend-thread thread)))))
+ (%suspend-thread thread subprocess-tick)))))
-(define (%suspend-thread thread)
+(define (%suspend-thread thread subprocess-tick)
(%trace ";"(%%id)" %suspend-thread "thread"\n")
(assert-locked '%suspend-thread)
(let ((block-events? (thread/block-events? thread)))
(set-thread/block-events?! thread #f)
(maybe-signal-io-thread-events)
+ (if (and subprocess-tick
+ (not (eq? global-status-tick subprocess-tick)))
+ (signal-early-subprocess-event thread))
(let ((any-events? (handle-thread-events thread)))
(set-thread/block-events?! thread block-events?)
(if any-events?
(%%trace ";"(%%id)" maybe-signal-subprocess-status\n")
(%handle-subprocess-status-change))
-(define (block-on-io-descriptor descriptor mode)
+(define (block-on-io-descriptor descriptor mode subprocess-tick)
(let ((result 'INTERRUPT)
(thread (current-thread)))
(let ((registration-1 (make-tentry
(%maybe-toggle-thread-timer)
(%maybe-wake-io-waiter))))
(lambda ()
- (suspend-current-thread)
+ (%suspend-current-thread subprocess-tick)
result)
(lambda ()
(with-threads-locked
((not (pair? events)))
(%signal-thread-event (caar events) (cdar events)))))))
+(define (signal-early-subprocess-event thread)
+ (assert-locked 'signal-early-subprocess-event)
+ (let ((dentry (let loop ((dentry io-registrations))
+ (and dentry
+ (if (eqv? 'PROCESS-STATUS-CHANGE
+ (dentry/descriptor dentry))
+ dentry
+ (loop (dentry/next dentry)))))))
+ (if dentry
+ (let loop ((tentry (dentry/first-tentry dentry)))
+ (if (eq? thread (tentry/thread tentry))
+ (begin
+ (delete-tentry! tentry)
+ (%signal-thread-event thread (let ((e (tentry/event tentry)))
+ (and e
+ (lambda () (e 'READ))))))
+ (let ((n (tentry/next tentry)))
+ (if n
+ (loop n))))))))
+
(define (delete-tentry! tentry)
(assert-locked 'delete-tentry!)
(let ((dentry (tentry/dentry tentry))
(begin
(ring/enqueue (thread-mutex/waiting-threads mutex) thread)
(do () ((eq? thread (thread-mutex/owner mutex)))
- (%suspend-thread thread)
+ (%suspend-thread thread #f)
(%lock)))
(set-thread-mutex/owner! mutex thread)))
(test-for-io-on-descriptor
(x-display-descriptor (x-display/xd display))
#t
- 'READ))
+ 'READ
+ #f))
(x-display-process-events (x-display/xd display) 1)))))
(if event
(process-event display event))))