Added thread-queues (aka mailboxes).
authorMatt Birkholz <matt@birkholz.chandler.az.us>
Fri, 19 Aug 2011 03:21:13 +0000 (20:21 -0700)
committerMatt Birkholz <matt@birkholz.chandler.az.us>
Fri, 19 Aug 2011 03:21:13 +0000 (20:21 -0700)
src/runtime/runtime.pkg
src/runtime/thread-queue.scm [new file with mode: 0644]

index 0dd1fef85f159e47af5d6c2b4f36455fea4242fc..f86084bc14baa1b2a22e20e228e8c4ba38311114 100644 (file)
@@ -561,6 +561,21 @@ USA.
          queue-map!
          queue->list))
 
+(define-package (runtime thread-queue)
+  (files "thread-queue")
+  (parent (runtime))
+  (export ()
+         make-thread-queue
+         thread-queue/empty?
+         thread-queue/empty!
+         thread-queue/queue!
+         thread-queue/queue-no-hang!
+         thread-queue/push!
+         thread-queue/dequeue!
+         thread-queue/peek-no-hang
+         thread-queue/peek-until
+         thread-queue/peek))
+
 (define-package (runtime simple-file-ops)
   (files "sfile")
   (parent (runtime))
diff --git a/src/runtime/thread-queue.scm b/src/runtime/thread-queue.scm
new file mode 100644 (file)
index 0000000..bf4fc31
--- /dev/null
@@ -0,0 +1,281 @@
+#| -*-Scheme-*-
+
+Copyright (C) 2005, 2009, 2010, 2011  Matthew Birkholz
+
+This file is part of MIT/GNU Scheme.
+
+MIT/GNU Scheme is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; either version 2 of the License, or (at
+your option) any later version.
+
+MIT/GNU Scheme is distributed in the hope that it will be useful, but
+WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with MIT/GNU Scheme; if not, write to the Free Software
+Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301,
+USA.
+
+|#
+
+;;;; Thread-blocking Queues
+;;; package: (runtime thread-queue)
+
+(declare (usual-integrations))
+
+;;; These queues are like the simple queues provided by the (runtime
+;;; simple-queue) package, EXCEPT that they will, when empty, block a
+;;; thread attempting to dequeue an item.  Also, a maximum size can be
+;;; specified so that the queue will, when full, block a thread
+;;; attempting to queue an item.  Note that #F should not be used as
+;;; an item.  Thread-queue/peek-no-hang returns #F if the queue is
+;;; empty.
+;;;
+;;; If multiple threads block on a thread-queue, bad mojo is afoot.
+;;; They are ALL restarted whenever an item becomes available and,
+;;; depending on the thread timer interrupts, ANY ONE of them may be
+;;; able to dequeue the item.
+
+(define-structure (thread-queue (constructor %make-thread-queue)
+                               (conc-name %thread-queue/)
+                               (print-procedure
+                                (standard-unparser-method
+                                 'thread-queue
+                                 (lambda (queue port)
+                                   (print-thread-queue queue port)))))
+  first-pair
+  last-pair
+  element-count
+  max-elements
+  waiting-queuers
+  waiting-dequeuers)
+
+(define-guarantee thread-queue "a thread-queue")
+
+(define (print-thread-queue queue port)
+  (write-string " elements:" port)
+  (write-string (number->string
+                (%thread-queue/element-count queue)) port)
+  (let ((max (%thread-queue/max-elements queue)))
+    (if max
+       (begin
+         (write-string " max:" port)
+         (write-string (number->string max) port)))))
+
+(define (make-thread-queue #!optional max-size)
+  (let ((max (cond ((default-object? max-size) #f)
+                  ((integer? max-size) max-size)
+                  (else (error "Max-size must be an integer:" max-size)))))
+    (%make-thread-queue #f #f 0 max '() '())))
+
+(define (thread-queue/empty? queue)
+  (%empty? queue))
+
+(define-integrable (%empty? queue)
+  (zero? (%thread-queue/element-count queue)))
+
+(define (thread-queue/empty! queue)
+  (without-interrupts
+   (lambda ()
+     (if (not (%empty? queue))
+        (begin
+          (set-%thread-queue/first-pair! queue #f)
+          (set-%thread-queue/last-pair! queue #f)
+          (set-%thread-queue/element-count! queue 0)
+          (%resume-queuers queue)))))
+  unspecific)
+
+(define (thread-queue/queue! queue item)
+  (if (not item) (error "Cannot queue #F:" queue))
+  (without-interrupts
+   (lambda ()
+     (do ()
+        ((%queue-no-hang! queue item))
+       (set-%thread-queue/waiting-queuers!
+       queue (append! (%thread-queue/waiting-queuers queue)
+                      (list (current-thread))))
+       (suspend-current-thread)))))
+
+(define (thread-queue/queue-no-hang! queue item)
+  ;; Returns #F when QUEUE is maxed out.
+  (if (not item) (error "Cannot queue #F:" queue))
+  (without-interrupts
+   (lambda ()
+     (%queue-no-hang! queue item))))
+
+(define (%queue-no-hang! queue item)
+  (let ((max (%thread-queue/max-elements queue)))
+    (if max
+       (if (< (%thread-queue/element-count queue) max)
+           (%queue! queue item)
+           #f)
+       (%queue! queue item))))
+
+(define (thread-queue/dequeue-no-hang queue timeout)
+  (guarantee-thread-queue queue 'thread-queue/dequeue-no-hang)
+  (guarantee-non-negative-fixnum timeout 'thread-queue/dequeue-no-hang)
+  (thread-queue/dequeue-until queue (+ (real-time-clock) timeout)))
+
+(define (thread-queue/dequeue-until queue time)
+  (guarantee-thread-queue queue 'thread-queue/dequeue-until)
+  (guarantee-integer time 'thread-queue/dequeue-until)
+  (when-non-empty-before time queue %dequeue!))
+
+(declare (integrate-operator when-non-empty-before))
+(define (when-non-empty-before time queue operation)
+  (without-interrupts
+   (lambda ()
+     (let loop ()
+       (if (not (%empty? queue))
+          (operation queue)
+          (let ((now (real-time-clock)))
+            (if (<= time now)
+                #f
+                (begin
+                  (register-timer-event (- time now) (lambda () unspecific))
+                  (suspend-current-thread)
+                  (loop)))))))))
+
+(define (thread-queue/dequeue! queue)
+  (without-interrupts
+   (lambda ()
+     (do ()
+        ((and (not (%empty? queue))
+              (%dequeue! queue)))
+       (set-%thread-queue/waiting-dequeuers!
+       queue (append! (%thread-queue/waiting-dequeuers queue)
+                      (list (current-thread))))
+       (suspend-current-thread)))))
+
+(define (thread-queue/peek-no-hang queue timeout)
+  (guarantee-thread-queue queue 'thread-queue/peek-no-hang)
+  (guarantee-non-negative-fixnum timeout 'thread-queue/peek-no-hang)
+  (thread-queue/peek-until queue (+ (real-time-clock) timeout)))
+
+(define (thread-queue/peek-until queue time)
+  (guarantee-thread-queue queue 'thread-queue/peek-until)
+  (guarantee-integer time 'thread-queue/peek-until)
+  (when-non-empty-before time queue %peek))
+
+(define (thread-queue/peek queue)
+  (without-interrupts
+   (lambda ()
+     (do ()
+        ((and (not (%empty? queue))
+              (%peek queue)))
+       (set-%thread-queue/waiting-dequeuers!
+       queue (append! (%thread-queue/waiting-dequeuers queue)
+                      (list (current-thread))))
+       (suspend-current-thread)))))
+\f
+(define-integrable (%peek queue)
+  (car (%thread-queue/first-pair queue)))
+
+(define (%queue! queue item)
+  (let ((last (%thread-queue/last-pair queue))
+       (new (cons item '())))
+    (if last (set-cdr! last new))
+    (set-%thread-queue/last-pair! queue new)
+    (if (not (%thread-queue/first-pair queue))
+       (set-%thread-queue/first-pair! queue new)))
+  (set-%thread-queue/element-count!
+   queue (1+ (%thread-queue/element-count queue)))
+  (%resume-dequeuers queue)
+  item)
+
+(define (%dequeue! queue)
+  (let* ((first (%thread-queue/first-pair queue))
+        (item (car first)))
+    (if (eq? first (%thread-queue/last-pair queue))
+       (begin
+         (set-%thread-queue/first-pair! queue #f)
+         (set-%thread-queue/last-pair! queue #f))
+        (set-%thread-queue/first-pair! queue (cdr first)))
+    (set-%thread-queue/element-count!
+     queue (-1+ (%thread-queue/element-count queue)))
+    (%resume-queuers queue)
+    item))
+
+(define (%resume-queuers queue)
+  (do ((queuers (%thread-queue/waiting-queuers queue)
+               (cdr queuers)))
+      ((null? queuers)
+       unspecific)
+    (signal-thread-event (car queuers) (lambda () unspecific)))
+  (set-%thread-queue/waiting-queuers! queue '()))
+
+(define (%resume-dequeuers queue)
+  (do ((dequeuers (%thread-queue/waiting-dequeuers queue)
+                 (cdr dequeuers)))
+      ((null? dequeuers)
+       unspecific)
+    (signal-thread-event (car dequeuers) (lambda () unspecific)))
+  (set-%thread-queue/waiting-dequeuers! queue '()))
+
+(define (thread-queue/push! queue item)
+  ;; Place ITEM at the head of the queue, instead of the end.
+  (if (not item) (error "Cannot queue #F:" queue))
+  (without-interrupts
+   (lambda ()
+     (let ((max (%thread-queue/max-elements queue)))
+       (if max
+          (if (< (%thread-queue/element-count queue) max)
+              (%push! queue item)
+              (let ((last (%thread-queue/last-pair queue))
+                    (first (%thread-queue/first-pair queue)))
+                (let ((new-last
+                       (let before-last ((list first))
+                         ;; Assume LIST is always a pair, thus that
+                         ;; max > 0, and LAST is in FIRST.
+                         (if (eq? (cdr list) last)
+                             list
+                             (before-last (cdr list))))))
+                  (set-cdr! new-last '())
+                  (set-%thread-queue/last-pair! queue new-last)
+                  (set-car! last item) ;Clobber most recently queued item!
+                  (set-cdr! last first)
+                  (set-%thread-queue/first-pair! queue last))
+                item))
+          (%push! queue item))))))
+
+(define (%push! queue item)
+  (let* ((first (%thread-queue/first-pair queue))
+        (new (cons item first)))
+    (set-%thread-queue/first-pair! queue new)
+    (if (not (%thread-queue/last-pair queue))
+       (set-%thread-queue/last-pair! queue new))
+    (set-%thread-queue/element-count! queue
+                                     (1+ (%thread-queue/element-count queue)))
+    (%resume-dequeuers queue)
+    item))
+
+(define (test)
+  ;; Sets up a "producer" thread that puts the letters of the alphabet
+  ;; into a thread-queue, one each 2-3 seconds.  A "consumer" thread
+  ;; waits on the queue, printing what it reads.
+  (outf-error ";Thread Queue Test\n")
+  (let ((queue (make-thread-queue)))
+    (create-thread
+     #f
+     (lambda ()
+       (outf-error ";    Consumer: "(current-thread)"\n")
+       (let loop ()
+        (outf-error ";    Consumer reads.\n")
+        (let ((item (thread-queue/dequeue! queue)))
+          (outf-error ";    Consumer read "item"\n")
+          (loop)))))
+    (create-thread
+     #f
+     (lambda ()
+       (outf-error ";    Producer: "(current-thread)"\n")
+       (for-each (lambda (item)
+                  (outf-error ";    Producer: sleeping...\n")
+                  (sleep-current-thread 2000)
+                  (outf-error ";    Producer: queuing "item"...\n")
+                  (thread-queue/queue! queue item)
+                  (outf-error ";    Producer: queued "item"\n"))
+                '(#\a #\b #\c #\d #\e))
+       (outf-error ";    Producer done.\n")))))
\ No newline at end of file