|#
-;;;; Multiple Threads of Control
+;;;; Multiple Processors of Multiple Threads of Control
;;; package: (runtime thread)
(declare (usual-integrations))
+
+;;; This is set at boot/restore time and allows a host without the SMP
+;;; primitives to run this code.
+(define enable-smp? #f)
\f
(define-structure (thread
(constructor %make-thread ())
(eq? 'DEAD (thread/execution-state thread)))
\f
(define thread-population)
-(define first-running-thread)
-(define last-running-thread)
+(define first-runnable-thread)
+(define last-runnable-thread)
(define next-scheduled-timeout)
(define root-continuation-default)
;; Called early in the cold load to create the first thread.
(set! root-continuation-default (make-fluid #f))
(set! thread-population (make-population))
- (set! first-running-thread #f)
- (set! last-running-thread #f)
+ (set! first-runnable-thread #f)
+ (set! last-runnable-thread #f)
(set! next-scheduled-timeout #f)
(set! timer-records #f)
(set! timer-interval 100)
+ (reset-threads-low!)
(let ((first (%make-thread)))
(set-thread/exit-value! first detached-thread-marker)
(add-to-population!/unsafe thread-population first)
- (set! first-running-thread first)
- (set! last-running-thread first)))
+ (vector-set! current-threads
+ (if enable-smp?
+ ((ucode-primitive smp-id 0))
+ 0)
+ first)))
(define (initialize-high!)
;; Called later in the cold load, when more of the runtime is initialized.
(initialize-error-conditions!)
- (initialize-io-blocking)
- (add-event-receiver! event:after-restore initialize-io-blocking)
+ (reset-threads-high!)
+ (add-event-receiver! event:after-restore reset-threads!)
(add-event-receiver! event:before-exit stop-thread-timer))
(define (make-thread continuation)
(error:wrong-type-argument continuation
"continuation"
with-create-thread-continuation))
- (let-fluid root-continuation-default continuation
- thunk))
+ (let-fluid root-continuation-default continuation thunk))
\f
+(define processor-count)
+(define current-threads #f)
+
+(define-integrable (%id)
+ ;; To avoid task switching between accessing a processor id and
+ ;; using it (e.g. passing it to %current-thread), %id should be
+ ;; called without-interrupts.
+ (if (not (fix:= (get-interrupt-enables) interrupt-mask/gc-ok))
+ (outf-error "\n;%id: WRONG interrupt mask!"))
+ (if enable-smp?
+ ((ucode-primitive smp-id 0))
+ 0))
+
+(define-integrable (%current-thread id)
+ (vector-ref current-threads id))
+
(define (current-thread)
- (or first-running-thread
- (let ((thread (console-thread)))
- (if thread
- (call-with-current-continuation
- (lambda (continuation)
- (let ((condition
- (make-condition condition-type:no-current-thread
- continuation
- 'BOUND-RESTARTS
- '())))
- (signal-thread-event thread
- (lambda ()
- (error condition)))))))
- (run-first-thread))))
-
-(define (call-with-current-thread return? procedure)
- (let ((thread first-running-thread))
- (cond (thread (procedure thread))
- ((not return?) (run-first-thread)))))
+ (without-interrupts (lambda () (%current-thread (%id)))))
(define (console-thread)
(thread-mutex-owner (port/thread-mutex console-i/o-port)))
(define (other-running-threads?)
- (thread/next (current-thread)))
+ (or first-runnable-thread
+ (without-interrupts
+ (lambda ()
+ (let ((id (%id)))
+ (let loop ((i 0))
+ (and (fix:< i processor-count)
+ (or (and (not (fix:= i id))
+ (%current-thread i))
+ (loop (fix:1+ i))))))))))
(define (thread-continuation thread)
(guarantee-thread thread 'THREAD-CONTINUATION)
- (without-interrupts
- (lambda ()
- (and (eq? 'WAITING (thread/execution-state thread))
- (thread/continuation thread)))))
+ (thread/continuation thread))
(define (thread-running thread)
(%thread-running thread)
(define (%thread-running thread)
(set-thread/execution-state! thread 'RUNNING)
- (let ((prev last-running-thread))
+ (let ((prev last-runnable-thread))
(if prev
(set-thread/next! prev thread)
- (set! first-running-thread thread)))
- (set! last-running-thread thread)
+ (set! first-runnable-thread thread)))
+ (set! last-runnable-thread thread)
unspecific)
-(define (thread-not-running thread state)
+(define (thread-not-running id thread state)
+ (if (not (eq? thread (%current-thread id)))
+ (outf-error "\n;thread-not-running: NOT CURRENT"))
(set-thread/execution-state! thread state)
- (let ((thread* (thread/next thread)))
- (set-thread/next! thread #f)
- (set! first-running-thread thread*))
- (run-first-thread))
-
-(define (run-first-thread)
- (if first-running-thread
- (run-thread first-running-thread)
- (begin
- (set! last-running-thread #f)
- (wait-for-io))))
+ (vector-set! current-threads id #f)
+ (run-first-thread id))
+
+(define (run-first-thread id)
+ (if first-runnable-thread
+ (let ((thread first-runnable-thread))
+ (if (%current-thread id)
+ (outf-error "\n;run-first-thread: ALREADY running a thread!"))
+ (set! first-runnable-thread (thread/next thread))
+ (if (not first-runnable-thread)
+ (set! last-runnable-thread #f)
+ (if (not last-runnable-thread)
+ (outf-error "\n;run-first-thread: lost last-runnable!")))
+ (set-thread/next! thread #f)
+ (vector-set! current-threads id thread)
+ (run-thread thread))
+ (wait-for-io)))
\f
(define (run-thread thread)
(let ((continuation (thread/continuation thread))
(fp-env (thread/floating-point-environment thread)))
+ (if (not (continuation? continuation))
+ (outf-error "\n;run-thread: NO CONTINUATION!"))
(set-thread/continuation! thread #f)
(%within-continuation continuation #t
(lambda ()
(begin
(handle-thread-events thread)
(set-thread/block-events?! thread #f)))
- (%maybe-toggle-thread-timer))
+ (%maybe-toggle-thread-timer)
+ (set-interrupt-enables! interrupt-mask/all))
(define (suspend-current-thread)
(without-interrupts %suspend-current-thread))
(define (%suspend-current-thread)
- (call-with-current-thread #f
- (lambda (thread)
- (let ((block-events? (thread/block-events? thread)))
- (set-thread/block-events?! thread #f)
- (maybe-signal-io-thread-events)
- (let ((any-events? (handle-thread-events thread)))
- (set-thread/block-events?! thread block-events?)
- (if any-events?
- (%maybe-toggle-thread-timer)
- (call-with-current-continuation
- (lambda (continuation)
- (set-thread/continuation! thread continuation)
- (maybe-save-thread-float-environment! thread)
- (set-thread/block-events?! thread #f)
- (thread-not-running thread 'WAITING)))))))))
+ (%suspend-thread (%current-thread (%id))))
+
+(define (%suspend-thread thread)
+ (let ((block-events? (thread/block-events? thread)))
+ (set-thread/block-events?! thread #f)
+ (maybe-signal-io-thread-events)
+ (let ((any-events? (handle-thread-events thread)))
+ (set-thread/block-events?! thread block-events?)
+ (if any-events?
+ (%maybe-toggle-thread-timer)
+ (call-with-current-continuation
+ (lambda (continuation)
+ (set-thread/continuation! thread continuation)
+ (maybe-save-thread-float-environment! thread)
+ (set-thread/block-events?! thread #f)
+ (thread-not-running (%id) thread 'WAITING)))))))
(define (stop-current-thread)
(without-interrupts
(lambda ()
- (call-with-current-thread #f
- (lambda (thread)
- (call-with-current-continuation
- (lambda (continuation)
- (set-thread/continuation! thread continuation)
- (maybe-save-thread-float-environment! thread)
- (thread-not-running thread 'STOPPED))))))))
+ (let* ((id (%id))
+ (thread (%current-thread id)))
+ (call-with-current-continuation
+ (lambda (continuation)
+ (set-thread/continuation! thread continuation)
+ (maybe-save-thread-float-environment! thread)
+ (thread-not-running id thread 'STOPPED)))))))
(define (restart-thread thread discard-events? event)
(guarantee-thread thread 'RESTART-THREAD)
;; Preserve the floating-point environment here to guarantee that the
;; thread timer won't raise or clear exceptions (particularly the
;; inexact result exception) that the interrupted thread cares about.
- (let ((fp-env (enter-default-float-environment first-running-thread)))
- (set! next-scheduled-timeout #f)
- (set-interrupt-enables! interrupt-mask/gc-ok)
- (deliver-timer-events)
- (maybe-signal-io-thread-events)
- (let ((thread first-running-thread))
- (cond ((not thread)
+ (let* ((id (%id))
+ (old (%current-thread id)))
+ (let ((fp-env (enter-default-float-environment old)))
+ (set! next-scheduled-timeout #f)
+ (deliver-timer-events)
+ (maybe-signal-io-thread-events)
+ (cond ((and (not first-runnable-thread) (not old))
(%maybe-toggle-thread-timer))
- ((thread/continuation thread)
- (run-thread thread))
- ((not (eq? 'RUNNING-WITHOUT-PREEMPTION
- (thread/execution-state thread)))
- (yield-thread thread fp-env))
- (else
+ ((not old)
+ (run-first-thread id))
+ ((not first-runnable-thread)
+ (restore-float-environment-from-default fp-env)
+ (%resume-current-thread old))
+ ((eq? 'RUNNING-WITHOUT-PREEMPTION (thread/execution-state old))
(restore-float-environment-from-default fp-env)
- (%resume-current-thread thread))))))
+ (%resume-current-thread old))
+ (else
+ (%yield-thread id old fp-env))))))
(define (yield-current-thread)
(without-interrupts
(lambda ()
- (call-with-current-thread #t
- (lambda (thread)
- ;; Allow preemption now, since the current thread has
- ;; volunteered to yield control.
- (set-thread/execution-state! thread 'RUNNING)
- (yield-thread thread))))))
-
-(define (yield-thread thread #!optional fp-env)
- (let ((next (thread/next thread)))
- (if (not next)
- (begin
- (if (not (default-object? fp-env))
- (restore-float-environment-from-default fp-env))
- (%resume-current-thread thread))
- (call-with-current-continuation
- (lambda (continuation)
- (set-thread/continuation! thread continuation)
- (maybe-save-thread-float-environment! thread fp-env)
- (set-thread/next! thread #f)
- (set-thread/next! last-running-thread thread)
- (set! last-running-thread thread)
- (set! first-running-thread next)
- (run-thread next))))))
+ (let* ((id (%id))
+ (thread (%current-thread id)))
+ (if thread
+ (let ((fp-env (enter-default-float-environment thread)))
+ (maybe-signal-io-thread-events)
+ ;; Allow preemption now, since the current thread has
+ ;; volunteered to yield control.
+ (set-thread/execution-state! thread 'RUNNING)
+ (%yield-thread id thread fp-env)))))))
+
+(define (%yield-thread id thread fp-env)
+ (if (not first-runnable-thread)
+ (begin
+ (restore-float-environment-from-default fp-env)
+ (%resume-current-thread thread))
+ (call-with-current-continuation
+ (lambda (continuation)
+ (set-thread/continuation! thread continuation)
+ (maybe-save-thread-float-environment! thread fp-env)
+ (%thread-running thread)
+ (vector-set! current-threads id #F)
+ (run-first-thread id)))))
(define (thread-float-environment thread)
(thread/floating-point-environment thread))
(set-thread/floating-point-environment! thread fp-env))
\f
(define (exit-current-thread value)
- (let ((thread (current-thread)))
- (set-interrupt-enables! interrupt-mask/gc-ok)
- (set-thread/block-events?! thread #t)
- (ring/discard-all (thread/pending-events thread))
- (dynamic-unwind thread)
- (%deregister-io-thread-events thread #t)
- (%discard-thread-timer-records thread)
- (%disassociate-joined-threads thread)
- (%disassociate-thread-mutexes thread)
- (if (eq? no-exit-value-marker (thread/exit-value thread))
- (release-joined-threads thread value))
- (thread-not-running thread 'DEAD)))
+ (without-interrupts
+ (lambda ()
+ (let* ((id (%id))
+ (thread (%current-thread id)))
+ (set-interrupt-enables! interrupt-mask/gc-ok)
+ (set-thread/block-events?! thread #t)
+ (ring/discard-all (thread/pending-events thread))
+ (dynamic-unwind thread)
+ (%deregister-io-thread-events thread #t)
+ (%discard-thread-timer-records thread)
+ (%disassociate-joined-threads thread)
+ (%disassociate-thread-mutexes thread)
+ (if (eq? no-exit-value-marker (thread/exit-value thread))
+ (release-joined-threads thread value))
+ (thread-not-running id thread 'DEAD)))))
(define (join-thread thread event-constructor)
(guarantee-thread thread 'JOIN-THREAD)
prev
next)
-(define (initialize-io-blocking)
- (set! io-registry (and have-select? (make-select-registry)))
- (set! io-registrations #f)
- unspecific)
+(define (reset-threads!)
+ (reset-threads-low!)
+ (reset-threads-high!))
+
+(define (reset-threads-low!)
+ (set! enable-smp?
+ (and ((ucode-primitive get-primitive-address 2) 'SMP-COUNT #f)
+ ((ucode-primitive smp-count 0))))
+ (set! processor-count
+ (if enable-smp? ((ucode-primitive smp-count 0)) 1))
+ (let ((len (and current-threads (vector-length current-threads))))
+ (cond ((not len)
+ (set! current-threads (make-vector processor-count #f)))
+ ((fix:< len processor-count)
+ (set! current-threads (vector-grow current-threads
+ processor-count #f)))
+ (else
+ (if (not (subvector-filled? current-threads 1 len #f))
+ (outf-error "\n;reset-threads restored MULTIPLE threads!"))
+ unspecific))))
+
+(define (reset-threads-high!)
+ (set! io-registry (and ((ucode-primitive have-select? 0))
+ (make-select-registry)))
+ (set! io-registrations #f))
(define (wait-for-io)
(%maybe-toggle-thread-timer #f)
- (let ((result
- (begin
- (set-interrupt-enables! interrupt-mask/all)
- (test-select-registry io-registry #t))))
- (set-interrupt-enables! interrupt-mask/gc-ok)
- (signal-select-result result)
- (if first-running-thread
- (run-thread first-running-thread)
- (wait-for-io))))
+ (let ((result
+ (begin
+ (set-interrupt-enables! interrupt-mask/all)
+ (test-select-registry io-registry #t))))
+ (set-interrupt-enables! interrupt-mask/gc-ok)
+ (signal-select-result result)
+ (if first-runnable-thread
+ (let ((id (%id)))
+ (if (not (thread/continuation first-runnable-thread))
+ (outf-error "\n;wait-for-io: BOGUS runnable"))
+ (run-first-thread id))
+ (wait-for-io))))
\f
(define (signal-select-result result)
(cond ((vector? result)
'#(READ)))))
(define (maybe-signal-io-thread-events)
- (if io-registrations
- (signal-select-result (test-select-registry io-registry #f))))
+ (signal-select-result (test-select-registry io-registry #f)))
(define (block-on-io-descriptor descriptor mode)
(without-interrupts
(registration-1)
(registration-2))
(dynamic-wind
- (lambda ()
- (let ((thread (current-thread)))
- (set! registration-1
- (%register-io-thread-event
- descriptor
- mode
- thread
- (lambda (mode)
- (set! result mode)
- unspecific)
- #f #t))
- (set! registration-2
- (%register-io-thread-event
- 'PROCESS-STATUS-CHANGE
- 'READ
- thread
- (lambda (mode)
- mode
- (set! result 'PROCESS-STATUS-CHANGE)
- unspecific)
- #f #t)))
- (%maybe-toggle-thread-timer))
- (lambda ()
- (%suspend-current-thread)
- result)
- (lambda ()
- (%maybe-deregister-io-thread-event registration-2)
- (%maybe-deregister-io-thread-event registration-1)
- (%maybe-toggle-thread-timer)))))))
+ (lambda ()
+ (let ((thread (current-thread)))
+ (set! registration-1
+ (%register-io-thread-event
+ descriptor
+ mode
+ thread
+ (lambda (mode)
+ (set! result mode)
+ unspecific)
+ #f #t))
+ (set! registration-2
+ (%register-io-thread-event
+ 'PROCESS-STATUS-CHANGE
+ 'READ
+ thread
+ (lambda (mode)
+ mode
+ (set! result 'PROCESS-STATUS-CHANGE)
+ unspecific)
+ #f #t)))
+ (%maybe-toggle-thread-timer))
+ (lambda ()
+ (%suspend-current-thread)
+ result)
+ (lambda ()
+ (%maybe-deregister-io-thread-event registration-2)
+ (%maybe-deregister-io-thread-event registration-1)
+ (%maybe-toggle-thread-timer)))))))
(define (%maybe-deregister-io-thread-event tentry)
;; Ensure that another thread does not unwind our registration.
- (if (eq? (current-thread) (tentry/thread tentry))
+ (if (eq? (%current-thread (%id)) (tentry/thread tentry))
(delete-tentry! tentry)))
\f
(define (permanently-register-io-thread-event descriptor mode thread event)
(define (block-thread-events)
(without-interrupts
(lambda ()
- (let ((thread first-running-thread))
+ (let ((thread (%current-thread (%id))))
(if thread
(let ((result (thread/block-events? thread)))
(set-thread/block-events?! thread #t)
(define (unblock-thread-events)
(without-interrupts
(lambda ()
- (call-with-current-thread #t
- (lambda (thread)
- (handle-thread-events thread)
- (set-thread/block-events?! thread #f))))))
+ (let ((thread (%current-thread (%id))))
+ (handle-thread-events thread)
+ (set-thread/block-events?! thread #f)))))
(define (with-thread-events-blocked thunk)
(let ((interrupt-mask (set-interrupt-enables! interrupt-mask/gc-ok)))
- (let ((thread first-running-thread))
+ (let ((thread (%current-thread (%id))))
(if thread
(let ((block-events? (thread/block-events? thread)))
(set-thread/block-events?! thread #t)
value))
with-thread-events-blocked
block-events?)))
- (let ((thread first-running-thread))
+ (let ((thread (%current-thread (%id))))
(if thread
(set-thread/block-events?! thread block-events?)))
(set-interrupt-enables! interrupt-mask)
(define (get-thread-event-block)
(without-interrupts
(lambda ()
- (let ((thread first-running-thread))
+ (let ((thread (%current-thread (%id))))
(if thread
(thread/block-events? thread)
#f)))))
(define (set-thread-event-block! block?)
(without-interrupts
(lambda ()
- (let ((thread first-running-thread))
+ (let ((thread (%current-thread (%id))))
(if thread
(set-thread/block-events?! thread block?)))
unspecific)))
\f
(define (signal-thread-event thread event)
(guarantee-thread thread 'SIGNAL-THREAD-EVENT)
- (let ((self first-running-thread))
- (if (eq? thread self)
- (let ((block-events? (block-thread-events)))
- (%add-pending-event thread event)
- (if (not block-events?)
- (unblock-thread-events)))
- (without-interrupts
- (lambda ()
- (if (eq? 'DEAD (thread/execution-state thread))
- (signal-thread-dead thread "signal event to"
- signal-thread-event thread event))
- (%signal-thread-event thread event)
- (if (and (not self) first-running-thread)
- (run-thread first-running-thread)
- (%maybe-toggle-thread-timer)))))))
+ (without-interrupts
+ (lambda ()
+ (let ((self (%current-thread (%id))))
+ (if (eq? thread self)
+ (let ((block-events? (block-thread-events)))
+ (%add-pending-event thread event)
+ (if (not block-events?)
+ (unblock-thread-events)))
+ (begin
+ (if (eq? 'DEAD (thread/execution-state thread))
+ (signal-thread-dead thread "signal event to"
+ signal-thread-event thread event))
+ (%signal-thread-event thread event)
+ (if (and (not self) first-runnable-thread)
+ (run-first-thread (%id))
+ (%maybe-toggle-thread-timer))))))))
(define (%signal-thread-event thread event)
(%add-pending-event thread event)
(define (allow-thread-event-delivery)
(without-interrupts
(lambda ()
- (let ((thread first-running-thread))
+ (let ((thread (%current-thread (%id))))
(if thread
(let ((block-events? (thread/block-events? thread)))
(set-thread/block-events?! thread #f)
(if interval
(guarantee-exact-positive-integer interval 'SET-THREAD-TIMER-INTERVAL!))
(without-interrupts
- (lambda ()
- (set! timer-interval interval)
- (%maybe-toggle-thread-timer))))
+ (lambda ()
+ (set! timer-interval interval)
+ (%maybe-toggle-thread-timer))))
(define (start-thread-timer)
(without-interrupts %maybe-toggle-thread-timer))
next-event-time)))))
((and consider-non-timers?
timer-interval
- (or io-registrations
- (let ((current-thread first-running-thread))
- (and current-thread
- (thread/next current-thread)))))
+ (or io-registrations first-runnable-thread))
(start (+ now timer-interval)))
(else
(%stop-thread-timer))))))
(begin
(ring/enqueue (thread-mutex/waiting-threads mutex) thread)
(do () ((eq? thread (thread-mutex/owner mutex)))
- (%suspend-current-thread)))
+ (%suspend-thread thread)))
(set-thread-mutex/owner! mutex thread)))
(define (unlock-thread-mutex mutex)
(without-interrupts
(lambda ()
(and (not (thread-mutex/owner mutex))
- (let ((thread (current-thread)))
+ (let ((thread (%current-thread (%id))))
(set-thread-mutex/owner! mutex thread)
(add-thread-mutex! thread mutex)
#t)))))
(lambda (condition port)
condition
(write-string "No current thread!" port))))
- unspecific)
\ No newline at end of file
+ unspecific)