;;; an item. Thread-queue/peek-no-hang returns #F if the queue is
;;; empty.
;;;
-;;; If multiple threads block on a thread-queue, bad mojo is afoot.
-;;; They are ALL restarted whenever an item becomes available and,
+;;; If multiple threads block dequeuing, bad mojo is afoot(?). They
+;;; are ALL restarted whenever an item becomes available and,
;;; depending on the thread timer interrupts, ANY ONE of them may be
;;; able to dequeue the item.
element-count
max-elements
waiting-queuers
- waiting-dequeuers)
+ waiting-dequeuers
+ mutex)
(define-guarantee thread-queue "a thread-queue")
+(define-syntax %assert
+ (syntax-rules ()
+ ((_ CONDITION)
+ #f)))
+
+#;(define-syntax %assert
+ (syntax-rules ()
+ ((_ CONDITION)
+ (if (not CONDITION)
+ (error "Assertion failed:" 'CONDITION)))))
+
+(define-integrable (%locked? queue)
+ (thread-mutex-owner (%thread-queue/mutex queue)))
+
+(define (with-queue-locked queue thunk)
+ (with-thread-mutex-locked (%thread-queue/mutex queue)
+ (lambda ()
+ (with-thread-events-blocked thunk))))
+
+(define (with-queue-unlocked queue thunk)
+ (with-thread-mutex-unlocked (%thread-queue/mutex queue)
+ ;; suspend-current-thread will unblock (and re-block) thread-events
+ thunk))
+
(define (print-thread-queue queue port)
(write-string " elements:" port)
(write-string (number->string
(let ((max (cond ((default-object? max-size) #f)
((integer? max-size) max-size)
(else (error "Max-size must be an integer:" max-size)))))
- (%make-thread-queue #f #f 0 max '() '())))
+ (%make-thread-queue #f #f 0 max '() '() (make-thread-mutex))))
(define (thread-queue/empty? queue)
(guarantee-thread-queue queue 'thread-queue/empty?)
(%empty? queue))
(define-integrable (%empty? queue)
+ ;;(%assert (%locked? queue)) Seems unnecessary.
(zero? (%thread-queue/element-count queue)))
(define (thread-queue/empty! queue)
(guarantee-thread-queue queue 'thread-queue/empty!)
- (without-interrupts
+ (with-queue-locked queue
(lambda ()
(if (not (%empty? queue))
(begin
(define (thread-queue/queue! queue item)
(guarantee-thread-queue queue 'thread-queue/queue!)
(if (not item) (error "Cannot queue #F:" queue))
- (without-interrupts
+ (with-queue-locked queue
(lambda ()
(do ()
((%queue-no-hang! queue item))
(set-%thread-queue/waiting-queuers!
queue (append! (%thread-queue/waiting-queuers queue)
(list (current-thread))))
- (suspend-current-thread)))))
+ (with-queue-unlocked queue
+ suspend-current-thread)))))
(define (thread-queue/queue-no-hang! queue item)
;; Returns #F when QUEUE is maxed out.
(guarantee-thread-queue queue 'thread-queue/queue-no-hang!)
(if (not item) (error "Cannot queue #F:" queue))
- (without-interrupts
+ (with-queue-locked queue
(lambda ()
(%queue-no-hang! queue item))))
(define (%queue-no-hang! queue item)
+ (%assert (%locked? queue))
(let ((max (%thread-queue/max-elements queue)))
(if max
(if (< (%thread-queue/element-count queue) max)
(declare (integrate-operator when-non-empty-before))
(define (when-non-empty-before time queue operation)
- (without-interrupts
+ (with-queue-locked queue
(lambda ()
(let loop ()
(if (not (%empty? queue))
(set-%thread-queue/waiting-dequeuers!
queue (append! (%thread-queue/waiting-dequeuers queue)
(list (current-thread))))
- (register-timer-event (- time now) (lambda () unspecific))
- (suspend-current-thread)
+ (register-timer-event (- time now) #f)
+ (with-queue-unlocked queue
+ suspend-current-thread)
(loop)))))))))
(define (thread-queue/dequeue! queue)
(guarantee-thread-queue queue 'thread-queue/dequeue!)
- (without-interrupts
+ (with-queue-locked queue
(lambda ()
(do ()
((and (not (%empty? queue))
(set-%thread-queue/waiting-dequeuers!
queue (append! (%thread-queue/waiting-dequeuers queue)
(list (current-thread))))
- (suspend-current-thread)))))
+ (with-queue-unlocked queue
+ suspend-current-thread)))))
(define (thread-queue/peek-no-hang queue msec)
(guarantee-thread-queue queue 'thread-queue/peek-no-hang)
(define (thread-queue/peek queue)
(guarantee-thread-queue queue 'thread-queue/peek)
- (without-interrupts
+ (with-queue-locked queue
(lambda ()
(do ()
((and (not (%empty? queue))
(set-%thread-queue/waiting-dequeuers!
queue (append! (%thread-queue/waiting-dequeuers queue)
(list (current-thread))))
- (suspend-current-thread)))))
+ (with-queue-unlocked queue
+ suspend-current-thread)))))
\f
(define-integrable (%peek queue)
+ (%assert (%locked? queue))
(car (%thread-queue/first-pair queue)))
(define (%queue! queue item)
+ (%assert (%locked? queue))
(let ((last (%thread-queue/last-pair queue))
(new (cons item '())))
(if last (set-cdr! last new))
item)
(define (%dequeue! queue)
+ (%assert (%locked? queue))
(let* ((first (%thread-queue/first-pair queue))
(item (car first)))
(if (eq? first (%thread-queue/last-pair queue))
item))
(define (%resume-queuers queue)
+ (%assert (%locked? queue))
(do ((queuers (%thread-queue/waiting-queuers queue)
(cdr queuers)))
((null? queuers)
unspecific)
- (signal-thread-event (car queuers) (lambda () unspecific)))
+ (signal-thread-event (car queuers) #f))
(set-%thread-queue/waiting-queuers! queue '()))
(define (%resume-dequeuers queue)
+ (%assert (%locked? queue))
(do ((dequeuers (%thread-queue/waiting-dequeuers queue)
(cdr dequeuers)))
((null? dequeuers)
unspecific)
- (signal-thread-event (car dequeuers) (lambda () unspecific)))
+ (signal-thread-event (car dequeuers) #f))
(set-%thread-queue/waiting-dequeuers! queue '()))
(define (thread-queue/push! queue item)
;; Place ITEM at the head of the queue, instead of the end.
(guarantee-thread-queue queue 'thread-queue/push!)
(if (not item) (error "Cannot queue #F:" queue))
- (without-interrupts
+ (with-queue-locked queue
(lambda ()
(let ((max (%thread-queue/max-elements queue)))
(if max
(%push! queue item))))))
(define (%push! queue item)
+ (%assert (%locked? queue))
(let* ((first (%thread-queue/first-pair queue))
(new (cons item first)))
(set-%thread-queue/first-pair! queue new)