From d6d51078e6636a396cb872ee71d1600b394990ae Mon Sep 17 00:00:00 2001 From: Matt Birkholz Date: Thu, 13 Nov 2014 15:59:23 -0700 Subject: [PATCH] runtime/thread-queue.scm: Use a mutex to serialize accesses. Also, use #f events. The (lambda () unspecific) events accumulate unnecessarily. --- src/runtime/runtime.pkg | 1 + src/runtime/thread-queue.scm | 73 +++++++++++++++++++++++++++--------- src/runtime/thread.scm | 19 ++++++++++ 3 files changed, 75 insertions(+), 18 deletions(-) diff --git a/src/runtime/runtime.pkg b/src/runtime/runtime.pkg index 98107c5ac..072c505d5 100644 --- a/src/runtime/runtime.pkg +++ b/src/runtime/runtime.pkg @@ -5044,6 +5044,7 @@ USA. with-create-thread-continuation with-thread-events-blocked with-thread-mutex-locked + with-thread-mutex-unlocked with-thread-timer-stopped yield-current-thread) (export (runtime interrupt-handler) diff --git a/src/runtime/thread-queue.scm b/src/runtime/thread-queue.scm index 0d67a6276..a60b8a381 100644 --- a/src/runtime/thread-queue.scm +++ b/src/runtime/thread-queue.scm @@ -37,8 +37,8 @@ USA. ;;; an item. Thread-queue/peek-no-hang returns #F if the queue is ;;; empty. ;;; -;;; If multiple threads block on a thread-queue, bad mojo is afoot. -;;; They are ALL restarted whenever an item becomes available and, +;;; If multiple threads block dequeuing, bad mojo is afoot(?). They +;;; are ALL restarted whenever an item becomes available and, ;;; depending on the thread timer interrupts, ANY ONE of them may be ;;; able to dequeue the item. @@ -54,10 +54,35 @@ USA. element-count max-elements waiting-queuers - waiting-dequeuers) + waiting-dequeuers + mutex) (define-guarantee thread-queue "a thread-queue") +(define-syntax %assert + (syntax-rules () + ((_ CONDITION) + #f))) + +#;(define-syntax %assert + (syntax-rules () + ((_ CONDITION) + (if (not CONDITION) + (error "Assertion failed:" 'CONDITION))))) + +(define-integrable (%locked? queue) + (thread-mutex-owner (%thread-queue/mutex queue))) + +(define (with-queue-locked queue thunk) + (with-thread-mutex-locked (%thread-queue/mutex queue) + (lambda () + (with-thread-events-blocked thunk)))) + +(define (with-queue-unlocked queue thunk) + (with-thread-mutex-unlocked (%thread-queue/mutex queue) + ;; suspend-current-thread will unblock (and re-block) thread-events + thunk)) + (define (print-thread-queue queue port) (write-string " elements:" port) (write-string (number->string @@ -72,18 +97,19 @@ USA. (let ((max (cond ((default-object? max-size) #f) ((integer? max-size) max-size) (else (error "Max-size must be an integer:" max-size))))) - (%make-thread-queue #f #f 0 max '() '()))) + (%make-thread-queue #f #f 0 max '() '() (make-thread-mutex)))) (define (thread-queue/empty? queue) (guarantee-thread-queue queue 'thread-queue/empty?) (%empty? queue)) (define-integrable (%empty? queue) + ;;(%assert (%locked? queue)) Seems unnecessary. (zero? (%thread-queue/element-count queue))) (define (thread-queue/empty! queue) (guarantee-thread-queue queue 'thread-queue/empty!) - (without-interrupts + (with-queue-locked queue (lambda () (if (not (%empty? queue)) (begin @@ -96,24 +122,26 @@ USA. (define (thread-queue/queue! queue item) (guarantee-thread-queue queue 'thread-queue/queue!) (if (not item) (error "Cannot queue #F:" queue)) - (without-interrupts + (with-queue-locked queue (lambda () (do () ((%queue-no-hang! queue item)) (set-%thread-queue/waiting-queuers! queue (append! (%thread-queue/waiting-queuers queue) (list (current-thread)))) - (suspend-current-thread))))) + (with-queue-unlocked queue + suspend-current-thread))))) (define (thread-queue/queue-no-hang! queue item) ;; Returns #F when QUEUE is maxed out. (guarantee-thread-queue queue 'thread-queue/queue-no-hang!) (if (not item) (error "Cannot queue #F:" queue)) - (without-interrupts + (with-queue-locked queue (lambda () (%queue-no-hang! queue item)))) (define (%queue-no-hang! queue item) + (%assert (%locked? queue)) (let ((max (%thread-queue/max-elements queue))) (if max (if (< (%thread-queue/element-count queue) max) @@ -132,7 +160,7 @@ USA. (declare (integrate-operator when-non-empty-before)) (define (when-non-empty-before time queue operation) - (without-interrupts + (with-queue-locked queue (lambda () (let loop () (if (not (%empty? queue)) @@ -144,13 +172,14 @@ USA. (set-%thread-queue/waiting-dequeuers! queue (append! (%thread-queue/waiting-dequeuers queue) (list (current-thread)))) - (register-timer-event (- time now) (lambda () unspecific)) - (suspend-current-thread) + (register-timer-event (- time now) #f) + (with-queue-unlocked queue + suspend-current-thread) (loop))))))))) (define (thread-queue/dequeue! queue) (guarantee-thread-queue queue 'thread-queue/dequeue!) - (without-interrupts + (with-queue-locked queue (lambda () (do () ((and (not (%empty? queue)) @@ -158,7 +187,8 @@ USA. (set-%thread-queue/waiting-dequeuers! queue (append! (%thread-queue/waiting-dequeuers queue) (list (current-thread)))) - (suspend-current-thread))))) + (with-queue-unlocked queue + suspend-current-thread))))) (define (thread-queue/peek-no-hang queue msec) (guarantee-thread-queue queue 'thread-queue/peek-no-hang) @@ -171,7 +201,7 @@ USA. (define (thread-queue/peek queue) (guarantee-thread-queue queue 'thread-queue/peek) - (without-interrupts + (with-queue-locked queue (lambda () (do () ((and (not (%empty? queue)) @@ -179,12 +209,15 @@ USA. (set-%thread-queue/waiting-dequeuers! queue (append! (%thread-queue/waiting-dequeuers queue) (list (current-thread)))) - (suspend-current-thread))))) + (with-queue-unlocked queue + suspend-current-thread))))) (define-integrable (%peek queue) + (%assert (%locked? queue)) (car (%thread-queue/first-pair queue))) (define (%queue! queue item) + (%assert (%locked? queue)) (let ((last (%thread-queue/last-pair queue)) (new (cons item '()))) (if last (set-cdr! last new)) @@ -197,6 +230,7 @@ USA. item) (define (%dequeue! queue) + (%assert (%locked? queue)) (let* ((first (%thread-queue/first-pair queue)) (item (car first))) (if (eq? first (%thread-queue/last-pair queue)) @@ -210,26 +244,28 @@ USA. item)) (define (%resume-queuers queue) + (%assert (%locked? queue)) (do ((queuers (%thread-queue/waiting-queuers queue) (cdr queuers))) ((null? queuers) unspecific) - (signal-thread-event (car queuers) (lambda () unspecific))) + (signal-thread-event (car queuers) #f)) (set-%thread-queue/waiting-queuers! queue '())) (define (%resume-dequeuers queue) + (%assert (%locked? queue)) (do ((dequeuers (%thread-queue/waiting-dequeuers queue) (cdr dequeuers))) ((null? dequeuers) unspecific) - (signal-thread-event (car dequeuers) (lambda () unspecific))) + (signal-thread-event (car dequeuers) #f)) (set-%thread-queue/waiting-dequeuers! queue '())) (define (thread-queue/push! queue item) ;; Place ITEM at the head of the queue, instead of the end. (guarantee-thread-queue queue 'thread-queue/push!) (if (not item) (error "Cannot queue #F:" queue)) - (without-interrupts + (with-queue-locked queue (lambda () (let ((max (%thread-queue/max-elements queue))) (if max @@ -253,6 +289,7 @@ USA. (%push! queue item)))))) (define (%push! queue item) + (%assert (%locked? queue)) (let* ((first (%thread-queue/first-pair queue)) (new (cons item first))) (set-%thread-queue/first-pair! queue new) diff --git a/src/runtime/thread.scm b/src/runtime/thread.scm index d1fe0abd9..549509246 100644 --- a/src/runtime/thread.scm +++ b/src/runtime/thread.scm @@ -1121,6 +1121,25 @@ USA. (if (and grabbed-lock? (eq? (thread-mutex/owner mutex) thread)) (%unlock-thread-mutex mutex thread)))))) +(define (with-thread-mutex-unlocked mutex thunk) + (guarantee-thread-mutex mutex 'WITH-THREAD-MUTEX-UNLOCKED) + (let ((thread (current-thread)) + (released-lock?)) + (dynamic-wind + (lambda () + (let ((owner (thread-mutex/owner mutex))) + (if (not (eq? owner thread)) + (set! released-lock? #f) + (begin + (set! released-lock? #t) + (%unlock-thread-mutex mutex owner))))) + thunk + (lambda () + (if released-lock? + (let ((owner (thread-mutex/owner mutex))) + (if (not (eq? owner thread)) + (%lock-thread-mutex mutex thread owner)))))))) + (define (%disassociate-thread-mutexes thread) (do ((mutexes (thread/mutexes thread) (cdr mutexes))) ((not (pair? mutexes))) -- 2.25.1