From 82ec59bbd8f6e80542a9816fe1403c55c7d323ab Mon Sep 17 00:00:00 2001 From: Matt Birkholz Date: Fri, 19 Dec 2014 12:29:29 -0700 Subject: [PATCH] smp: Accommodate multiple processors. Keep the threads running on each processor in the current-threads vector. Change the running list into a runnable list: the threads that are runnable but not currently running on a processor. --- src/runtime/thread.scm | 450 +++++++++++++++++++++++------------------ 1 file changed, 248 insertions(+), 202 deletions(-) diff --git a/src/runtime/thread.scm b/src/runtime/thread.scm index a550f98ae..c4c90de9d 100644 --- a/src/runtime/thread.scm +++ b/src/runtime/thread.scm @@ -24,10 +24,14 @@ USA. |# -;;;; Multiple Threads of Control +;;;; Multiple Processors of Multiple Threads of Control ;;; package: (runtime thread) (declare (usual-integrations)) + +;;; This is set at boot/restore time and allows a host without the SMP +;;; primitives to run this code. +(define enable-smp? #f) (define-structure (thread (constructor %make-thread ()) @@ -94,8 +98,8 @@ USA. (eq? 'DEAD (thread/execution-state thread))) (define thread-population) -(define first-running-thread) -(define last-running-thread) +(define first-runnable-thread) +(define last-runnable-thread) (define next-scheduled-timeout) (define root-continuation-default) @@ -103,22 +107,26 @@ USA. ;; Called early in the cold load to create the first thread. (set! root-continuation-default (make-fluid #f)) (set! thread-population (make-population)) - (set! first-running-thread #f) - (set! last-running-thread #f) + (set! first-runnable-thread #f) + (set! last-runnable-thread #f) (set! next-scheduled-timeout #f) (set! timer-records #f) (set! timer-interval 100) + (reset-threads-low!) (let ((first (%make-thread))) (set-thread/exit-value! first detached-thread-marker) (add-to-population!/unsafe thread-population first) - (set! first-running-thread first) - (set! last-running-thread first))) + (vector-set! current-threads + (if enable-smp? + ((ucode-primitive smp-id 0)) + 0) + first))) (define (initialize-high!) ;; Called later in the cold load, when more of the runtime is initialized. (initialize-error-conditions!) - (initialize-io-blocking) - (add-event-receiver! event:after-restore initialize-io-blocking) + (reset-threads-high!) + (add-event-receiver! event:after-restore reset-threads!) (add-event-receiver! event:before-exit stop-thread-timer)) (define (make-thread continuation) @@ -170,42 +178,44 @@ USA. (error:wrong-type-argument continuation "continuation" with-create-thread-continuation)) - (let-fluid root-continuation-default continuation - thunk)) + (let-fluid root-continuation-default continuation thunk)) +(define processor-count) +(define current-threads #f) + +(define-integrable (%id) + ;; To avoid task switching between accessing a processor id and + ;; using it (e.g. passing it to %current-thread), %id should be + ;; called without-interrupts. + (if (not (fix:= (get-interrupt-enables) interrupt-mask/gc-ok)) + (outf-error "\n;%id: WRONG interrupt mask!")) + (if enable-smp? + ((ucode-primitive smp-id 0)) + 0)) + +(define-integrable (%current-thread id) + (vector-ref current-threads id)) + (define (current-thread) - (or first-running-thread - (let ((thread (console-thread))) - (if thread - (call-with-current-continuation - (lambda (continuation) - (let ((condition - (make-condition condition-type:no-current-thread - continuation - 'BOUND-RESTARTS - '()))) - (signal-thread-event thread - (lambda () - (error condition))))))) - (run-first-thread)))) - -(define (call-with-current-thread return? procedure) - (let ((thread first-running-thread)) - (cond (thread (procedure thread)) - ((not return?) (run-first-thread))))) + (without-interrupts (lambda () (%current-thread (%id))))) (define (console-thread) (thread-mutex-owner (port/thread-mutex console-i/o-port))) (define (other-running-threads?) - (thread/next (current-thread))) + (or first-runnable-thread + (without-interrupts + (lambda () + (let ((id (%id))) + (let loop ((i 0)) + (and (fix:< i processor-count) + (or (and (not (fix:= i id)) + (%current-thread i)) + (loop (fix:1+ i)))))))))) (define (thread-continuation thread) (guarantee-thread thread 'THREAD-CONTINUATION) - (without-interrupts - (lambda () - (and (eq? 'WAITING (thread/execution-state thread)) - (thread/continuation thread))))) + (thread/continuation thread)) (define (thread-running thread) (%thread-running thread) @@ -213,30 +223,40 @@ USA. (define (%thread-running thread) (set-thread/execution-state! thread 'RUNNING) - (let ((prev last-running-thread)) + (let ((prev last-runnable-thread)) (if prev (set-thread/next! prev thread) - (set! first-running-thread thread))) - (set! last-running-thread thread) + (set! first-runnable-thread thread))) + (set! last-runnable-thread thread) unspecific) -(define (thread-not-running thread state) +(define (thread-not-running id thread state) + (if (not (eq? thread (%current-thread id))) + (outf-error "\n;thread-not-running: NOT CURRENT")) (set-thread/execution-state! thread state) - (let ((thread* (thread/next thread))) - (set-thread/next! thread #f) - (set! first-running-thread thread*)) - (run-first-thread)) - -(define (run-first-thread) - (if first-running-thread - (run-thread first-running-thread) - (begin - (set! last-running-thread #f) - (wait-for-io)))) + (vector-set! current-threads id #f) + (run-first-thread id)) + +(define (run-first-thread id) + (if first-runnable-thread + (let ((thread first-runnable-thread)) + (if (%current-thread id) + (outf-error "\n;run-first-thread: ALREADY running a thread!")) + (set! first-runnable-thread (thread/next thread)) + (if (not first-runnable-thread) + (set! last-runnable-thread #f) + (if (not last-runnable-thread) + (outf-error "\n;run-first-thread: lost last-runnable!"))) + (set-thread/next! thread #f) + (vector-set! current-threads id thread) + (run-thread thread)) + (wait-for-io))) (define (run-thread thread) (let ((continuation (thread/continuation thread)) (fp-env (thread/floating-point-environment thread))) + (if (not (continuation? continuation)) + (outf-error "\n;run-thread: NO CONTINUATION!")) (set-thread/continuation! thread #f) (%within-continuation continuation #t (lambda () @@ -248,38 +268,40 @@ USA. (begin (handle-thread-events thread) (set-thread/block-events?! thread #f))) - (%maybe-toggle-thread-timer)) + (%maybe-toggle-thread-timer) + (set-interrupt-enables! interrupt-mask/all)) (define (suspend-current-thread) (without-interrupts %suspend-current-thread)) (define (%suspend-current-thread) - (call-with-current-thread #f - (lambda (thread) - (let ((block-events? (thread/block-events? thread))) - (set-thread/block-events?! thread #f) - (maybe-signal-io-thread-events) - (let ((any-events? (handle-thread-events thread))) - (set-thread/block-events?! thread block-events?) - (if any-events? - (%maybe-toggle-thread-timer) - (call-with-current-continuation - (lambda (continuation) - (set-thread/continuation! thread continuation) - (maybe-save-thread-float-environment! thread) - (set-thread/block-events?! thread #f) - (thread-not-running thread 'WAITING))))))))) + (%suspend-thread (%current-thread (%id)))) + +(define (%suspend-thread thread) + (let ((block-events? (thread/block-events? thread))) + (set-thread/block-events?! thread #f) + (maybe-signal-io-thread-events) + (let ((any-events? (handle-thread-events thread))) + (set-thread/block-events?! thread block-events?) + (if any-events? + (%maybe-toggle-thread-timer) + (call-with-current-continuation + (lambda (continuation) + (set-thread/continuation! thread continuation) + (maybe-save-thread-float-environment! thread) + (set-thread/block-events?! thread #f) + (thread-not-running (%id) thread 'WAITING))))))) (define (stop-current-thread) (without-interrupts (lambda () - (call-with-current-thread #f - (lambda (thread) - (call-with-current-continuation - (lambda (continuation) - (set-thread/continuation! thread continuation) - (maybe-save-thread-float-environment! thread) - (thread-not-running thread 'STOPPED)))))))) + (let* ((id (%id)) + (thread (%current-thread id))) + (call-with-current-continuation + (lambda (continuation) + (set-thread/continuation! thread continuation) + (maybe-save-thread-float-environment! thread) + (thread-not-running id thread 'STOPPED))))))) (define (restart-thread thread discard-events? event) (guarantee-thread thread 'RESTART-THREAD) @@ -306,49 +328,50 @@ USA. ;; Preserve the floating-point environment here to guarantee that the ;; thread timer won't raise or clear exceptions (particularly the ;; inexact result exception) that the interrupted thread cares about. - (let ((fp-env (enter-default-float-environment first-running-thread))) - (set! next-scheduled-timeout #f) - (set-interrupt-enables! interrupt-mask/gc-ok) - (deliver-timer-events) - (maybe-signal-io-thread-events) - (let ((thread first-running-thread)) - (cond ((not thread) + (let* ((id (%id)) + (old (%current-thread id))) + (let ((fp-env (enter-default-float-environment old))) + (set! next-scheduled-timeout #f) + (deliver-timer-events) + (maybe-signal-io-thread-events) + (cond ((and (not first-runnable-thread) (not old)) (%maybe-toggle-thread-timer)) - ((thread/continuation thread) - (run-thread thread)) - ((not (eq? 'RUNNING-WITHOUT-PREEMPTION - (thread/execution-state thread))) - (yield-thread thread fp-env)) - (else + ((not old) + (run-first-thread id)) + ((not first-runnable-thread) + (restore-float-environment-from-default fp-env) + (%resume-current-thread old)) + ((eq? 'RUNNING-WITHOUT-PREEMPTION (thread/execution-state old)) (restore-float-environment-from-default fp-env) - (%resume-current-thread thread)))))) + (%resume-current-thread old)) + (else + (%yield-thread id old fp-env)))))) (define (yield-current-thread) (without-interrupts (lambda () - (call-with-current-thread #t - (lambda (thread) - ;; Allow preemption now, since the current thread has - ;; volunteered to yield control. - (set-thread/execution-state! thread 'RUNNING) - (yield-thread thread)))))) - -(define (yield-thread thread #!optional fp-env) - (let ((next (thread/next thread))) - (if (not next) - (begin - (if (not (default-object? fp-env)) - (restore-float-environment-from-default fp-env)) - (%resume-current-thread thread)) - (call-with-current-continuation - (lambda (continuation) - (set-thread/continuation! thread continuation) - (maybe-save-thread-float-environment! thread fp-env) - (set-thread/next! thread #f) - (set-thread/next! last-running-thread thread) - (set! last-running-thread thread) - (set! first-running-thread next) - (run-thread next)))))) + (let* ((id (%id)) + (thread (%current-thread id))) + (if thread + (let ((fp-env (enter-default-float-environment thread))) + (maybe-signal-io-thread-events) + ;; Allow preemption now, since the current thread has + ;; volunteered to yield control. + (set-thread/execution-state! thread 'RUNNING) + (%yield-thread id thread fp-env))))))) + +(define (%yield-thread id thread fp-env) + (if (not first-runnable-thread) + (begin + (restore-float-environment-from-default fp-env) + (%resume-current-thread thread)) + (call-with-current-continuation + (lambda (continuation) + (set-thread/continuation! thread continuation) + (maybe-save-thread-float-environment! thread fp-env) + (%thread-running thread) + (vector-set! current-threads id #F) + (run-first-thread id))))) (define (thread-float-environment thread) (thread/floating-point-environment thread)) @@ -357,18 +380,21 @@ USA. (set-thread/floating-point-environment! thread fp-env)) (define (exit-current-thread value) - (let ((thread (current-thread))) - (set-interrupt-enables! interrupt-mask/gc-ok) - (set-thread/block-events?! thread #t) - (ring/discard-all (thread/pending-events thread)) - (dynamic-unwind thread) - (%deregister-io-thread-events thread #t) - (%discard-thread-timer-records thread) - (%disassociate-joined-threads thread) - (%disassociate-thread-mutexes thread) - (if (eq? no-exit-value-marker (thread/exit-value thread)) - (release-joined-threads thread value)) - (thread-not-running thread 'DEAD))) + (without-interrupts + (lambda () + (let* ((id (%id)) + (thread (%current-thread id))) + (set-interrupt-enables! interrupt-mask/gc-ok) + (set-thread/block-events?! thread #t) + (ring/discard-all (thread/pending-events thread)) + (dynamic-unwind thread) + (%deregister-io-thread-events thread #t) + (%discard-thread-timer-records thread) + (%disassociate-joined-threads thread) + (%disassociate-thread-mutexes thread) + (if (eq? no-exit-value-marker (thread/exit-value thread)) + (release-joined-threads thread value)) + (thread-not-running id thread 'DEAD))))) (define (join-thread thread event-constructor) (guarantee-thread thread 'JOIN-THREAD) @@ -444,22 +470,46 @@ USA. prev next) -(define (initialize-io-blocking) - (set! io-registry (and have-select? (make-select-registry))) - (set! io-registrations #f) - unspecific) +(define (reset-threads!) + (reset-threads-low!) + (reset-threads-high!)) + +(define (reset-threads-low!) + (set! enable-smp? + (and ((ucode-primitive get-primitive-address 2) 'SMP-COUNT #f) + ((ucode-primitive smp-count 0)))) + (set! processor-count + (if enable-smp? ((ucode-primitive smp-count 0)) 1)) + (let ((len (and current-threads (vector-length current-threads)))) + (cond ((not len) + (set! current-threads (make-vector processor-count #f))) + ((fix:< len processor-count) + (set! current-threads (vector-grow current-threads + processor-count #f))) + (else + (if (not (subvector-filled? current-threads 1 len #f)) + (outf-error "\n;reset-threads restored MULTIPLE threads!")) + unspecific)))) + +(define (reset-threads-high!) + (set! io-registry (and ((ucode-primitive have-select? 0)) + (make-select-registry))) + (set! io-registrations #f)) (define (wait-for-io) (%maybe-toggle-thread-timer #f) - (let ((result - (begin - (set-interrupt-enables! interrupt-mask/all) - (test-select-registry io-registry #t)))) - (set-interrupt-enables! interrupt-mask/gc-ok) - (signal-select-result result) - (if first-running-thread - (run-thread first-running-thread) - (wait-for-io)))) + (let ((result + (begin + (set-interrupt-enables! interrupt-mask/all) + (test-select-registry io-registry #t)))) + (set-interrupt-enables! interrupt-mask/gc-ok) + (signal-select-result result) + (if first-runnable-thread + (let ((id (%id))) + (if (not (thread/continuation first-runnable-thread)) + (outf-error "\n;wait-for-io: BOGUS runnable")) + (run-first-thread id)) + (wait-for-io)))) (define (signal-select-result result) (cond ((vector? result) @@ -472,8 +522,7 @@ USA. '#(READ))))) (define (maybe-signal-io-thread-events) - (if io-registrations - (signal-select-result (test-select-registry io-registry #f)))) + (signal-select-result (test-select-registry io-registry #f))) (define (block-on-io-descriptor descriptor mode) (without-interrupts @@ -482,39 +531,39 @@ USA. (registration-1) (registration-2)) (dynamic-wind - (lambda () - (let ((thread (current-thread))) - (set! registration-1 - (%register-io-thread-event - descriptor - mode - thread - (lambda (mode) - (set! result mode) - unspecific) - #f #t)) - (set! registration-2 - (%register-io-thread-event - 'PROCESS-STATUS-CHANGE - 'READ - thread - (lambda (mode) - mode - (set! result 'PROCESS-STATUS-CHANGE) - unspecific) - #f #t))) - (%maybe-toggle-thread-timer)) - (lambda () - (%suspend-current-thread) - result) - (lambda () - (%maybe-deregister-io-thread-event registration-2) - (%maybe-deregister-io-thread-event registration-1) - (%maybe-toggle-thread-timer))))))) + (lambda () + (let ((thread (current-thread))) + (set! registration-1 + (%register-io-thread-event + descriptor + mode + thread + (lambda (mode) + (set! result mode) + unspecific) + #f #t)) + (set! registration-2 + (%register-io-thread-event + 'PROCESS-STATUS-CHANGE + 'READ + thread + (lambda (mode) + mode + (set! result 'PROCESS-STATUS-CHANGE) + unspecific) + #f #t))) + (%maybe-toggle-thread-timer)) + (lambda () + (%suspend-current-thread) + result) + (lambda () + (%maybe-deregister-io-thread-event registration-2) + (%maybe-deregister-io-thread-event registration-1) + (%maybe-toggle-thread-timer))))))) (define (%maybe-deregister-io-thread-event tentry) ;; Ensure that another thread does not unwind our registration. - (if (eq? (current-thread) (tentry/thread tentry)) + (if (eq? (%current-thread (%id)) (tentry/thread tentry)) (delete-tentry! tentry))) (define (permanently-register-io-thread-event descriptor mode thread event) @@ -745,7 +794,7 @@ USA. (define (block-thread-events) (without-interrupts (lambda () - (let ((thread first-running-thread)) + (let ((thread (%current-thread (%id)))) (if thread (let ((result (thread/block-events? thread))) (set-thread/block-events?! thread #t) @@ -755,14 +804,13 @@ USA. (define (unblock-thread-events) (without-interrupts (lambda () - (call-with-current-thread #t - (lambda (thread) - (handle-thread-events thread) - (set-thread/block-events?! thread #f)))))) + (let ((thread (%current-thread (%id)))) + (handle-thread-events thread) + (set-thread/block-events?! thread #f))))) (define (with-thread-events-blocked thunk) (let ((interrupt-mask (set-interrupt-enables! interrupt-mask/gc-ok))) - (let ((thread first-running-thread)) + (let ((thread (%current-thread (%id)))) (if thread (let ((block-events? (thread/block-events? thread))) (set-thread/block-events?! thread #t) @@ -775,7 +823,7 @@ USA. value)) with-thread-events-blocked block-events?))) - (let ((thread first-running-thread)) + (let ((thread (%current-thread (%id)))) (if thread (set-thread/block-events?! thread block-events?))) (set-interrupt-enables! interrupt-mask) @@ -787,7 +835,7 @@ USA. (define (get-thread-event-block) (without-interrupts (lambda () - (let ((thread first-running-thread)) + (let ((thread (%current-thread (%id)))) (if thread (thread/block-events? thread) #f))))) @@ -795,28 +843,29 @@ USA. (define (set-thread-event-block! block?) (without-interrupts (lambda () - (let ((thread first-running-thread)) + (let ((thread (%current-thread (%id)))) (if thread (set-thread/block-events?! thread block?))) unspecific))) (define (signal-thread-event thread event) (guarantee-thread thread 'SIGNAL-THREAD-EVENT) - (let ((self first-running-thread)) - (if (eq? thread self) - (let ((block-events? (block-thread-events))) - (%add-pending-event thread event) - (if (not block-events?) - (unblock-thread-events))) - (without-interrupts - (lambda () - (if (eq? 'DEAD (thread/execution-state thread)) - (signal-thread-dead thread "signal event to" - signal-thread-event thread event)) - (%signal-thread-event thread event) - (if (and (not self) first-running-thread) - (run-thread first-running-thread) - (%maybe-toggle-thread-timer))))))) + (without-interrupts + (lambda () + (let ((self (%current-thread (%id)))) + (if (eq? thread self) + (let ((block-events? (block-thread-events))) + (%add-pending-event thread event) + (if (not block-events?) + (unblock-thread-events))) + (begin + (if (eq? 'DEAD (thread/execution-state thread)) + (signal-thread-dead thread "signal event to" + signal-thread-event thread event)) + (%signal-thread-event thread event) + (if (and (not self) first-runnable-thread) + (run-first-thread (%id)) + (%maybe-toggle-thread-timer)))))))) (define (%signal-thread-event thread event) (%add-pending-event thread event) @@ -855,7 +904,7 @@ USA. (define (allow-thread-event-delivery) (without-interrupts (lambda () - (let ((thread first-running-thread)) + (let ((thread (%current-thread (%id)))) (if thread (let ((block-events? (thread/block-events? thread))) (set-thread/block-events?! thread #f) @@ -968,9 +1017,9 @@ USA. (if interval (guarantee-exact-positive-integer interval 'SET-THREAD-TIMER-INTERVAL!)) (without-interrupts - (lambda () - (set! timer-interval interval) - (%maybe-toggle-thread-timer)))) + (lambda () + (set! timer-interval interval) + (%maybe-toggle-thread-timer)))) (define (start-thread-timer) (without-interrupts %maybe-toggle-thread-timer)) @@ -1002,10 +1051,7 @@ USA. next-event-time))))) ((and consider-non-timers? timer-interval - (or io-registrations - (let ((current-thread first-running-thread)) - (and current-thread - (thread/next current-thread))))) + (or io-registrations first-runnable-thread)) (start (+ now timer-interval))) (else (%stop-thread-timer)))))) @@ -1063,7 +1109,7 @@ USA. (begin (ring/enqueue (thread-mutex/waiting-threads mutex) thread) (do () ((eq? thread (thread-mutex/owner mutex))) - (%suspend-current-thread))) + (%suspend-thread thread))) (set-thread-mutex/owner! mutex thread))) (define (unlock-thread-mutex mutex) @@ -1091,7 +1137,7 @@ USA. (without-interrupts (lambda () (and (not (thread-mutex/owner mutex)) - (let ((thread (current-thread))) + (let ((thread (%current-thread (%id)))) (set-thread-mutex/owner! mutex thread) (add-thread-mutex! thread mutex) #t))))) @@ -1267,4 +1313,4 @@ USA. (lambda (condition port) condition (write-string "No current thread!" port)))) - unspecific) \ No newline at end of file + unspecific) -- 2.25.1