/* -*-C-*-
-$Id: os2msg.c,v 1.3 1994/12/19 22:31:31 cph Exp $
+$Id: os2msg.c,v 1.4 1995/01/05 23:54:48 cph Exp $
-Copyright (c) 1994 Massachusetts Institute of Technology
+Copyright (c) 1994-95 Massachusetts Institute of Technology
This material was developed by the Scheme project at the Massachusetts
Institute of Technology, Department of Electrical Engineering and
typedef struct
{
unsigned int allocatedp : 1; /* queue allocated? */
- unsigned int openp : 1; /* queue open? */
qid_t twin; /* other end of connection */
qid_receive_filter_t filter; /* filter for received messages */
tqueue_t * tqueue; /* thread queue for reception */
void
OS2_close_qid (qid_t qid)
{
- {
- msg_list_t * elt = (QID_SUBQUEUE_HEAD (qid));
- while (elt != 0)
- {
- msg_list_t * next = (elt -> next);
- OS_free (elt);
- elt = next;
- }
- }
- (QID_SUBQUEUE_HEAD (qid)) = 0;
- (QID_SUBQUEUE_TAIL (qid)) = 0;
+ while ((QID_SUBQUEUE_HEAD (qid)) != 0)
+ {
+ msg_list_t * this = (QID_SUBQUEUE_HEAD (qid));
+ (QID_SUBQUEUE_HEAD (qid)) = (this -> next);
+ OS2_destroy_message (this -> message);
+ OS_free (this);
+ }
(QID_TQUEUE (qid)) = 0;
OS2_request_mutex_semaphore (qid_lock);
{
(QID_ALLOCATEDP (qid)) = 0;
OS2_release_mutex_semaphore (qid_lock);
}
+\f
+qid_t
+OS2_qid_twin (qid_t qid)
+{
+ return (((QID_ALLOCATEDP (qid))
+ && ((QID_TWIN (qid)) != QID_NONE)
+ && (QID_ALLOCATEDP (QID_TWIN (qid))))
+ ? (QID_TWIN (qid))
+ : QID_NONE);
+}
void
OS2_close_qid_pair (qid_t qid)
}
msg_t *
-OS2_create_message (msg_type_t type)
+OS2_create_message_1 (msg_type_t type, msg_length_t extra)
{
/* Do allocation carefully to prevent infinite loop when signalling
"out of memory" condition. */
msg_t * message =
- (malloc ((unsigned long) (OS2_message_type_length (type))));
+ (malloc (((unsigned long) (OS2_message_type_length (type))) + extra));
if (message == 0)
if ((type == mt_syscall_error)
&& ((SM_SYSCALL_ERROR_CODE (message)) == ERROR_NOT_ENOUGH_MEMORY)
void
OS2_destroy_message (msg_t * message)
{
- OS_free ((void *) message);
+ OS_free (message);
}
\f
/* Message Transmission and Reception */
OS2_send_message (qid_t qid, msg_t * message)
{
qid_t twin = (QID_TWIN (qid));
- tqueue_t * tqueue = (QID_TQUEUE (twin));
- if (tqueue == 0)
- OS2_logic_error ("Write to closed QID.");
- (MSG_SENDER (message)) = twin;
- write_tqueue (tqueue, message);
+ if (twin == QID_NONE)
+ /* Other end of connection has been closed, so discard the
+ message. We used to signal an error here, but this can happen
+ pretty easily when closing windows or exiting Scheme. The only
+ way to avoid this is to force synchronization of communicating
+ threads, which can be tricky. For example, when closing a PM
+ window, it's not obvious when the last message will be
+ generated by the PM thread. So it's just simpler to ignore
+ messages after the receiver decides it's no longer interested
+ in them. */
+ OS2_destroy_message (message);
+ else
+ {
+ tqueue_t * tqueue = (QID_TQUEUE (twin));
+ if (tqueue == 0)
+ OS2_logic_error ("Write to unopened QID.");
+ (MSG_SENDER (message)) = twin;
+ write_tqueue (tqueue, message);
+ }
}
msg_t *
}
}
-int
-OS2_tqueue_select (tqueue_t * tqueue, int blockp)
-{
- while (1)
- {
- msg_t * message = (read_tqueue (tqueue, blockp));
- if ((TQUEUE_TYPE (tqueue)) == tqt_scm)
- {
- process_interrupt_messages ();
- if (pending_interrupts_p ())
- return (-2);
- }
- if (message != 0)
- return (MSG_SENDER (message));
- if (!blockp)
- return (-1);
- }
-}
-
msg_t *
OS2_wait_for_message (qid_t qid, msg_type_t reply_type)
{
return;
}
{
- msg_list_t * tail = (QID_SUBQUEUE_TAIL (qid));
- msg_list_t * elt = (OS_malloc (sizeof (struct msg_list_s)));
+ msg_list_t * elt = (OS_malloc (sizeof (msg_list_t)));
(elt -> message) = message;
(elt -> next) = 0;
- if (tail == 0)
+ if ((QID_SUBQUEUE_HEAD (qid)) == 0)
(QID_SUBQUEUE_HEAD (qid)) = elt;
else
- (tail -> next) = elt;
+ ((QID_SUBQUEUE_TAIL (qid)) -> next) = elt;
(QID_SUBQUEUE_TAIL (qid)) = elt;
}
}
{
msg_t * message = (head -> message);
(QID_SUBQUEUE_HEAD (qid)) = (head -> next);
- if ((head -> next) == 0)
- (QID_SUBQUEUE_TAIL (qid)) = 0;
OS_free (head);
return (message);
}
}
+void
+OS2_unread_message (qid_t qid, msg_t * message)
+{
+ msg_list_t * head = (QID_SUBQUEUE_HEAD (qid));
+ msg_list_t * elt = (OS_malloc (sizeof (msg_list_t)));
+ (elt -> message) = message;
+ (elt -> next) = head;
+ (QID_SUBQUEUE_HEAD (qid)) = elt;
+ if (head == 0)
+ (QID_SUBQUEUE_TAIL (qid)) = elt;
+}
+
static int
subqueue_emptyp (qid_t qid)
{
return ((QID_SUBQUEUE_HEAD (qid)) == 0);
}
+\f
+int
+OS2_tqueue_select (tqueue_t * tqueue, int blockp)
+{
+ while (1)
+ {
+ msg_t * message = (read_tqueue (tqueue, blockp));
+ if ((TQUEUE_TYPE (tqueue)) == tqt_scm)
+ {
+ process_interrupt_messages ();
+ if (pending_interrupts_p ())
+ return (-2);
+ }
+ if (message != 0)
+ return (MSG_SENDER (message));
+ if (!blockp)
+ return (-1);
+ }
+}
static msg_t *
read_tqueue (tqueue_t * tqueue, int blockp)
typedef struct
{
tqueue_type_t type;
- HQUEUE queue; /* queue */
- HEV event; /* associated event semaphore */
+ msg_list_t * head; /* queue */
+ msg_list_t * tail;
+ HMTX mutex; /* mutex semaphore */
+ HEV event; /* event semaphore */
} std_tqueue_t;
-#define STD_TQUEUE_QUEUE(q) (((std_tqueue_t *) (q)) -> queue)
+#define STD_TQUEUE_HEAD(q) (((std_tqueue_t *) (q)) -> head)
+#define STD_TQUEUE_TAIL(q) (((std_tqueue_t *) (q)) -> tail)
+#define STD_TQUEUE_MUTEX(q) (((std_tqueue_t *) (q)) -> mutex)
#define STD_TQUEUE_EVENT(q) (((std_tqueue_t *) (q)) -> event)
tqueue_t *
{
tqueue_t * tqueue = (OS_malloc (sizeof (std_tqueue_t)));
(TQUEUE_TYPE (tqueue)) = tqt_std;
- (STD_TQUEUE_QUEUE (tqueue)) = (OS2_create_queue (QUE_FIFO));
+ (STD_TQUEUE_HEAD (tqueue)) = 0;
+ (STD_TQUEUE_TAIL (tqueue)) = 0;
+ (STD_TQUEUE_MUTEX (tqueue)) = (OS2_create_mutex_semaphore (0, 0));
(STD_TQUEUE_EVENT (tqueue)) = (OS2_create_event_semaphore (0, 0));
return (tqueue);
}
void
OS2_close_std_tqueue (tqueue_t * tqueue)
{
- OS2_close_queue (STD_TQUEUE_QUEUE (tqueue));
OS2_close_event_semaphore (STD_TQUEUE_EVENT (tqueue));
+ OS2_close_event_semaphore (STD_TQUEUE_MUTEX (tqueue));
+ while ((STD_TQUEUE_HEAD (tqueue)) != 0)
+ {
+ msg_list_t * this = (STD_TQUEUE_HEAD (tqueue));
+ (STD_TQUEUE_HEAD (tqueue)) = (this -> next);
+ OS2_destroy_message (this -> message);
+ OS_free (this);
+ }
OS_free (tqueue);
}
static msg_t *
read_std_tqueue (tqueue_t * tqueue, int blockp)
{
- ULONG type;
- ULONG length;
- PVOID data;
- msg_t * message;
- const char * s = "Non-message read from message queue.";
-
- if (!OS2_read_queue ((STD_TQUEUE_QUEUE (tqueue)),
- (&type),
- (&length),
- (&data),
- (blockp ? NULLHANDLE : (STD_TQUEUE_EVENT (tqueue)))))
- return (0);
- if (length < (sizeof (msg_t)))
- OS2_logic_error (s);
- message = ((msg_t *) data);
- if ((type != 0) || (length != (MSG_LENGTH (message))))
- OS2_logic_error (s);
- write_subqueue (message);
- return (message);
+ while (1)
+ {
+ OS2_request_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
+ if ((STD_TQUEUE_HEAD (tqueue)) != 0)
+ {
+ msg_list_t * element = (STD_TQUEUE_HEAD (tqueue));
+ (STD_TQUEUE_HEAD (tqueue)) = (element -> next);
+ /* This prevents the 16 bit counter inside the event
+ semaphore from overflowing, in the unlikely situation
+ that the semaphore is not waited on for a long period. */
+ (void) OS2_reset_event_semaphore (STD_TQUEUE_EVENT (tqueue));
+ OS2_release_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
+ {
+ msg_t * message = (element -> message);
+ OS_free (element);
+ write_subqueue (message);
+ return (message);
+ }
+ }
+ if (!blockp)
+ {
+ OS2_release_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
+ return (0);
+ }
+ (void) OS2_reset_event_semaphore (STD_TQUEUE_EVENT (tqueue));
+ OS2_release_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
+ (void) OS2_wait_event_semaphore ((STD_TQUEUE_EVENT (tqueue)), 1);
+ }
}
static void
write_std_tqueue (tqueue_t * tqueue, msg_t * message)
{
- OS2_write_queue ((STD_TQUEUE_QUEUE (tqueue)),
- 0,
- (MSG_LENGTH (message)),
- ((PVOID) message),
- 0);
+ msg_list_t * element = (OS_malloc (sizeof (msg_list_t)));
+ (element -> message) = message;
+ (element -> next) = 0;
+ OS2_request_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
+ if ((STD_TQUEUE_HEAD (tqueue)) == 0)
+ (STD_TQUEUE_HEAD (tqueue)) = element;
+ else
+ ((STD_TQUEUE_TAIL (tqueue)) -> next) = element;
+ (STD_TQUEUE_TAIL (tqueue)) = element;
+ (void) OS2_post_event_semaphore (STD_TQUEUE_EVENT (tqueue));
+ OS2_release_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
}
\f
static tqueue_t *