runtime/thread-queue.scm: Use a mutex to serialize accesses.
authorMatt Birkholz <puck@birchwood-abbey.net>
Thu, 13 Nov 2014 22:59:23 +0000 (15:59 -0700)
committerMatt Birkholz <puck@birchwood-abbey.net>
Fri, 14 Nov 2014 00:53:31 +0000 (17:53 -0700)
Also, use #f events.  The (lambda () unspecific) events accumulate
unnecessarily.

src/runtime/runtime.pkg
src/runtime/thread-queue.scm
src/runtime/thread.scm

index 98107c5ac27465cfddf17bc8e8ab089f6c18da27..072c505d5b4a36367db198a66da9eef94990e7b6 100644 (file)
@@ -5044,6 +5044,7 @@ USA.
          with-create-thread-continuation
          with-thread-events-blocked
          with-thread-mutex-locked
+         with-thread-mutex-unlocked
          with-thread-timer-stopped
          yield-current-thread)
   (export (runtime interrupt-handler)
index 0d67a6276a91a54343ecff8b40d436cf9478283b..a60b8a38111291c52ff102c56270915edbd2ba43 100644 (file)
@@ -37,8 +37,8 @@ USA.
 ;;; an item.  Thread-queue/peek-no-hang returns #F if the queue is
 ;;; empty.
 ;;;
-;;; If multiple threads block on a thread-queue, bad mojo is afoot.
-;;; They are ALL restarted whenever an item becomes available and,
+;;; If multiple threads block dequeuing, bad mojo is afoot(?).  They
+;;; are ALL restarted whenever an item becomes available and,
 ;;; depending on the thread timer interrupts, ANY ONE of them may be
 ;;; able to dequeue the item.
 
@@ -54,10 +54,35 @@ USA.
   element-count
   max-elements
   waiting-queuers
-  waiting-dequeuers)
+  waiting-dequeuers
+  mutex)
 
 (define-guarantee thread-queue "a thread-queue")
 
+(define-syntax %assert
+  (syntax-rules ()
+    ((_ CONDITION)
+     #f)))
+
+#;(define-syntax %assert
+  (syntax-rules ()
+    ((_ CONDITION)
+     (if (not CONDITION)
+        (error "Assertion failed:" 'CONDITION)))))
+
+(define-integrable (%locked? queue)
+  (thread-mutex-owner (%thread-queue/mutex queue)))
+
+(define (with-queue-locked queue thunk)
+  (with-thread-mutex-locked (%thread-queue/mutex queue)
+    (lambda ()
+      (with-thread-events-blocked thunk))))
+
+(define (with-queue-unlocked queue thunk)
+  (with-thread-mutex-unlocked (%thread-queue/mutex queue)
+   ;; suspend-current-thread will unblock (and re-block) thread-events
+   thunk))
+
 (define (print-thread-queue queue port)
   (write-string " elements:" port)
   (write-string (number->string
@@ -72,18 +97,19 @@ USA.
   (let ((max (cond ((default-object? max-size) #f)
                   ((integer? max-size) max-size)
                   (else (error "Max-size must be an integer:" max-size)))))
-    (%make-thread-queue #f #f 0 max '() '())))
+    (%make-thread-queue #f #f 0 max '() '() (make-thread-mutex))))
 
 (define (thread-queue/empty? queue)
   (guarantee-thread-queue queue 'thread-queue/empty?)
   (%empty? queue))
 
 (define-integrable (%empty? queue)
+  ;;(%assert (%locked? queue))  Seems unnecessary.
   (zero? (%thread-queue/element-count queue)))
 
 (define (thread-queue/empty! queue)
   (guarantee-thread-queue queue 'thread-queue/empty!)
-  (without-interrupts
+  (with-queue-locked queue
    (lambda ()
      (if (not (%empty? queue))
         (begin
@@ -96,24 +122,26 @@ USA.
 (define (thread-queue/queue! queue item)
   (guarantee-thread-queue queue 'thread-queue/queue!)
   (if (not item) (error "Cannot queue #F:" queue))
-  (without-interrupts
+  (with-queue-locked queue
    (lambda ()
      (do ()
         ((%queue-no-hang! queue item))
        (set-%thread-queue/waiting-queuers!
        queue (append! (%thread-queue/waiting-queuers queue)
                       (list (current-thread))))
-       (suspend-current-thread)))))
+       (with-queue-unlocked queue
+       suspend-current-thread)))))
 
 (define (thread-queue/queue-no-hang! queue item)
   ;; Returns #F when QUEUE is maxed out.
   (guarantee-thread-queue queue 'thread-queue/queue-no-hang!)
   (if (not item) (error "Cannot queue #F:" queue))
-  (without-interrupts
+  (with-queue-locked queue
    (lambda ()
      (%queue-no-hang! queue item))))
 
 (define (%queue-no-hang! queue item)
+  (%assert (%locked? queue))
   (let ((max (%thread-queue/max-elements queue)))
     (if max
        (if (< (%thread-queue/element-count queue) max)
@@ -132,7 +160,7 @@ USA.
 
 (declare (integrate-operator when-non-empty-before))
 (define (when-non-empty-before time queue operation)
-  (without-interrupts
+  (with-queue-locked queue
    (lambda ()
      (let loop ()
        (if (not (%empty? queue))
@@ -144,13 +172,14 @@ USA.
                   (set-%thread-queue/waiting-dequeuers!
                    queue (append! (%thread-queue/waiting-dequeuers queue)
                                   (list (current-thread))))
-                  (register-timer-event (- time now) (lambda () unspecific))
-                  (suspend-current-thread)
+                  (register-timer-event (- time now) #f)
+                  (with-queue-unlocked queue
+                   suspend-current-thread)
                   (loop)))))))))
 
 (define (thread-queue/dequeue! queue)
   (guarantee-thread-queue queue 'thread-queue/dequeue!)
-  (without-interrupts
+  (with-queue-locked queue
    (lambda ()
      (do ()
         ((and (not (%empty? queue))
@@ -158,7 +187,8 @@ USA.
        (set-%thread-queue/waiting-dequeuers!
        queue (append! (%thread-queue/waiting-dequeuers queue)
                       (list (current-thread))))
-       (suspend-current-thread)))))
+       (with-queue-unlocked queue
+       suspend-current-thread)))))
 
 (define (thread-queue/peek-no-hang queue msec)
   (guarantee-thread-queue queue 'thread-queue/peek-no-hang)
@@ -171,7 +201,7 @@ USA.
 
 (define (thread-queue/peek queue)
   (guarantee-thread-queue queue 'thread-queue/peek)
-  (without-interrupts
+  (with-queue-locked queue
    (lambda ()
      (do ()
         ((and (not (%empty? queue))
@@ -179,12 +209,15 @@ USA.
        (set-%thread-queue/waiting-dequeuers!
        queue (append! (%thread-queue/waiting-dequeuers queue)
                       (list (current-thread))))
-       (suspend-current-thread)))))
+       (with-queue-unlocked queue
+       suspend-current-thread)))))
 \f
 (define-integrable (%peek queue)
+  (%assert (%locked? queue))
   (car (%thread-queue/first-pair queue)))
 
 (define (%queue! queue item)
+  (%assert (%locked? queue))
   (let ((last (%thread-queue/last-pair queue))
        (new (cons item '())))
     (if last (set-cdr! last new))
@@ -197,6 +230,7 @@ USA.
   item)
 
 (define (%dequeue! queue)
+  (%assert (%locked? queue))
   (let* ((first (%thread-queue/first-pair queue))
         (item (car first)))
     (if (eq? first (%thread-queue/last-pair queue))
@@ -210,26 +244,28 @@ USA.
     item))
 
 (define (%resume-queuers queue)
+  (%assert (%locked? queue))
   (do ((queuers (%thread-queue/waiting-queuers queue)
                (cdr queuers)))
       ((null? queuers)
        unspecific)
-    (signal-thread-event (car queuers) (lambda () unspecific)))
+    (signal-thread-event (car queuers) #f))
   (set-%thread-queue/waiting-queuers! queue '()))
 
 (define (%resume-dequeuers queue)
+  (%assert (%locked? queue))
   (do ((dequeuers (%thread-queue/waiting-dequeuers queue)
                  (cdr dequeuers)))
       ((null? dequeuers)
        unspecific)
-    (signal-thread-event (car dequeuers) (lambda () unspecific)))
+    (signal-thread-event (car dequeuers) #f))
   (set-%thread-queue/waiting-dequeuers! queue '()))
 
 (define (thread-queue/push! queue item)
   ;; Place ITEM at the head of the queue, instead of the end.
   (guarantee-thread-queue queue 'thread-queue/push!)
   (if (not item) (error "Cannot queue #F:" queue))
-  (without-interrupts
+  (with-queue-locked queue
    (lambda ()
      (let ((max (%thread-queue/max-elements queue)))
        (if max
@@ -253,6 +289,7 @@ USA.
           (%push! queue item))))))
 
 (define (%push! queue item)
+  (%assert (%locked? queue))
   (let* ((first (%thread-queue/first-pair queue))
         (new (cons item first)))
     (set-%thread-queue/first-pair! queue new)
index d1fe0abd963cf85d5ea478760ad08d6fb01bb42c..549509246d375e1722a80b7eaad96f5327ef7ad3 100644 (file)
@@ -1121,6 +1121,25 @@ USA.
        (if (and grabbed-lock? (eq? (thread-mutex/owner mutex) thread))
           (%unlock-thread-mutex mutex thread))))))
 
+(define (with-thread-mutex-unlocked mutex thunk)
+  (guarantee-thread-mutex mutex 'WITH-THREAD-MUTEX-UNLOCKED)
+  (let ((thread (current-thread))
+       (released-lock?))
+    (dynamic-wind
+     (lambda ()
+       (let ((owner (thread-mutex/owner mutex)))
+        (if (not (eq? owner thread))
+            (set! released-lock? #f)
+            (begin
+              (set! released-lock? #t)
+              (%unlock-thread-mutex mutex owner)))))
+     thunk
+     (lambda ()
+       (if released-lock?
+          (let ((owner (thread-mutex/owner mutex)))
+            (if (not (eq? owner thread))
+                (%lock-thread-mutex mutex thread owner))))))))
+
 (define (%disassociate-thread-mutexes thread)
   (do ((mutexes (thread/mutexes thread) (cdr mutexes)))
       ((not (pair? mutexes)))