Add mechanism to "unread" messages back to the head of a message
authorChris Hanson <org/chris-hanson/cph>
Thu, 5 Jan 1995 23:54:48 +0000 (23:54 +0000)
committerChris Hanson <org/chris-hanson/cph>
Thu, 5 Jan 1995 23:54:48 +0000 (23:54 +0000)
queue.  Add mechanism to get the other end of a qid pair.  Eliminate
logic error that was being generated when sending a message to a qid
whose other end had been closed (now we just discard the message).
Reimplement message queues to NOT use the OS/2 queue abstraction --
apparently these queues use quite alot of memory and I recently ran
into a situation in which the memory was exhausted, which caused
Scheme to die.  Now, the memory needed for the queues is taken from
the Scheme process's virtual memory.

v7/src/microcode/os2msg.c

index a74d90d05091ebea5f2d32dbe739a4f49934e9e0..2ddf411cceebdad67ae5c756dd58dd17114ed6cf 100644 (file)
@@ -1,8 +1,8 @@
 /* -*-C-*-
 
-$Id: os2msg.c,v 1.3 1994/12/19 22:31:31 cph Exp $
+$Id: os2msg.c,v 1.4 1995/01/05 23:54:48 cph Exp $
 
-Copyright (c) 1994 Massachusetts Institute of Technology
+Copyright (c) 1994-95 Massachusetts Institute of Technology
 
 This material was developed by the Scheme project at the Massachusetts
 Institute of Technology, Department of Electrical Engineering and
@@ -53,7 +53,6 @@ static void process_interrupt_messages (void);
 typedef struct
 {
   unsigned int allocatedp : 1; /* queue allocated? */
-  unsigned int openp : 1;      /* queue open? */
   qid_t twin;                  /* other end of connection */
   qid_receive_filter_t filter; /* filter for received messages */
   tqueue_t * tqueue;           /* thread queue for reception */
@@ -159,17 +158,13 @@ OS2_qid_openp (qid_t qid)
 void
 OS2_close_qid (qid_t qid)
 {
-  {
-    msg_list_t * elt = (QID_SUBQUEUE_HEAD (qid));
-    while (elt != 0)
-      {
-       msg_list_t * next = (elt -> next);
-       OS_free (elt);
-       elt = next;
-      }
-  }
-  (QID_SUBQUEUE_HEAD (qid)) = 0;
-  (QID_SUBQUEUE_TAIL (qid)) = 0;
+  while ((QID_SUBQUEUE_HEAD (qid)) != 0)
+    {
+      msg_list_t * this = (QID_SUBQUEUE_HEAD (qid));
+      (QID_SUBQUEUE_HEAD (qid)) = (this -> next);
+      OS2_destroy_message (this -> message);
+      OS_free (this);
+    }
   (QID_TQUEUE (qid)) = 0;
   OS2_request_mutex_semaphore (qid_lock);
   {
@@ -183,6 +178,16 @@ OS2_close_qid (qid_t qid)
   (QID_ALLOCATEDP (qid)) = 0;
   OS2_release_mutex_semaphore (qid_lock);
 }
+\f
+qid_t
+OS2_qid_twin (qid_t qid)
+{
+  return (((QID_ALLOCATEDP (qid))
+          && ((QID_TWIN (qid)) != QID_NONE)
+          && (QID_ALLOCATEDP (QID_TWIN (qid))))
+         ? (QID_TWIN (qid))
+         : QID_NONE);
+}
 
 void
 OS2_close_qid_pair (qid_t qid)
@@ -235,12 +240,12 @@ OS2_set_message_type_length (msg_type_t type, msg_length_t length)
 }
 
 msg_t *
-OS2_create_message (msg_type_t type)
+OS2_create_message_1 (msg_type_t type, msg_length_t extra)
 {
   /* Do allocation carefully to prevent infinite loop when signalling
      "out of memory" condition.  */
   msg_t * message =
-    (malloc ((unsigned long) (OS2_message_type_length (type))));
+    (malloc (((unsigned long) (OS2_message_type_length (type))) + extra));
   if (message == 0)
     if ((type == mt_syscall_error)
        && ((SM_SYSCALL_ERROR_CODE (message)) == ERROR_NOT_ENOUGH_MEMORY)
@@ -255,7 +260,7 @@ OS2_create_message (msg_type_t type)
 void
 OS2_destroy_message (msg_t * message)
 {
-  OS_free ((void *) message);
+  OS_free (message);
 }
 \f
 /* Message Transmission and Reception */
@@ -264,11 +269,25 @@ void
 OS2_send_message (qid_t qid, msg_t * message)
 {
   qid_t twin = (QID_TWIN (qid));
-  tqueue_t * tqueue = (QID_TQUEUE (twin));
-  if (tqueue == 0)
-    OS2_logic_error ("Write to closed QID.");
-  (MSG_SENDER (message)) = twin;
-  write_tqueue (tqueue, message);
+  if (twin == QID_NONE)
+    /* Other end of connection has been closed, so discard the
+       message.  We used to signal an error here, but this can happen
+       pretty easily when closing windows or exiting Scheme.  The only
+       way to avoid this is to force synchronization of communicating
+       threads, which can be tricky.  For example, when closing a PM
+       window, it's not obvious when the last message will be
+       generated by the PM thread.  So it's just simpler to ignore
+       messages after the receiver decides it's no longer interested
+       in them.  */
+    OS2_destroy_message (message);
+  else
+    {
+      tqueue_t * tqueue = (QID_TQUEUE (twin));
+      if (tqueue == 0)
+       OS2_logic_error ("Write to unopened QID.");
+      (MSG_SENDER (message)) = twin;
+      write_tqueue (tqueue, message);
+    }
 }
 
 msg_t *
@@ -316,25 +335,6 @@ OS2_message_availablep (qid_t qid, int blockp)
     }
 }
 
-int
-OS2_tqueue_select (tqueue_t * tqueue, int blockp)
-{
-  while (1)
-    {
-      msg_t * message = (read_tqueue (tqueue, blockp));
-      if ((TQUEUE_TYPE (tqueue)) == tqt_scm)
-       {
-         process_interrupt_messages ();
-         if (pending_interrupts_p ())
-           return (-2);
-       }
-      if (message != 0)
-       return (MSG_SENDER (message));
-      if (!blockp)
-       return (-1);
-    }
-}
-
 msg_t *
 OS2_wait_for_message (qid_t qid, msg_type_t reply_type)
 {
@@ -365,14 +365,13 @@ write_subqueue (msg_t * message)
        return;
     }
   {
-    msg_list_t * tail = (QID_SUBQUEUE_TAIL (qid));
-    msg_list_t * elt = (OS_malloc (sizeof (struct msg_list_s)));
+    msg_list_t * elt = (OS_malloc (sizeof (msg_list_t)));
     (elt -> message) = message;
     (elt -> next) = 0;
-    if (tail == 0)
+    if ((QID_SUBQUEUE_HEAD (qid)) == 0)
       (QID_SUBQUEUE_HEAD (qid)) = elt;
     else
-      (tail -> next) = elt;
+      ((QID_SUBQUEUE_TAIL (qid)) -> next) = elt;
     (QID_SUBQUEUE_TAIL (qid)) = elt;
   }
 }
@@ -386,18 +385,47 @@ read_subqueue (qid_t qid)
   {
     msg_t * message = (head -> message);
     (QID_SUBQUEUE_HEAD (qid)) = (head -> next);
-    if ((head -> next) == 0)
-      (QID_SUBQUEUE_TAIL (qid)) = 0;
     OS_free (head);
     return (message);
   }
 }
 
+void
+OS2_unread_message (qid_t qid, msg_t * message)
+{
+  msg_list_t * head = (QID_SUBQUEUE_HEAD (qid));
+  msg_list_t * elt = (OS_malloc (sizeof (msg_list_t)));
+  (elt -> message) = message;
+  (elt -> next) = head;
+  (QID_SUBQUEUE_HEAD (qid)) = elt;
+  if (head == 0)
+    (QID_SUBQUEUE_TAIL (qid)) = elt;
+}
+
 static int
 subqueue_emptyp (qid_t qid)
 {
   return ((QID_SUBQUEUE_HEAD (qid)) == 0);
 }
+\f
+int
+OS2_tqueue_select (tqueue_t * tqueue, int blockp)
+{
+  while (1)
+    {
+      msg_t * message = (read_tqueue (tqueue, blockp));
+      if ((TQUEUE_TYPE (tqueue)) == tqt_scm)
+       {
+         process_interrupt_messages ();
+         if (pending_interrupts_p ())
+           return (-2);
+       }
+      if (message != 0)
+       return (MSG_SENDER (message));
+      if (!blockp)
+       return (-1);
+    }
+}
 
 static msg_t *
 read_tqueue (tqueue_t * tqueue, int blockp)
@@ -433,10 +461,14 @@ write_tqueue (tqueue_t * tqueue, msg_t * message)
 typedef struct
 {
   tqueue_type_t type;
-  HQUEUE queue;                        /* queue */
-  HEV event;                   /* associated event semaphore */
+  msg_list_t * head;           /* queue */
+  msg_list_t * tail;
+  HMTX mutex;                  /* mutex semaphore */
+  HEV event;                   /* event semaphore */
 } std_tqueue_t;
-#define STD_TQUEUE_QUEUE(q) (((std_tqueue_t *) (q)) -> queue)
+#define STD_TQUEUE_HEAD(q) (((std_tqueue_t *) (q)) -> head)
+#define STD_TQUEUE_TAIL(q) (((std_tqueue_t *) (q)) -> tail)
+#define STD_TQUEUE_MUTEX(q) (((std_tqueue_t *) (q)) -> mutex)
 #define STD_TQUEUE_EVENT(q) (((std_tqueue_t *) (q)) -> event)
 
 tqueue_t *
@@ -444,7 +476,9 @@ OS2_make_std_tqueue (void)
 {
   tqueue_t * tqueue = (OS_malloc (sizeof (std_tqueue_t)));
   (TQUEUE_TYPE (tqueue)) = tqt_std;
-  (STD_TQUEUE_QUEUE (tqueue)) = (OS2_create_queue (QUE_FIFO));
+  (STD_TQUEUE_HEAD (tqueue)) = 0;
+  (STD_TQUEUE_TAIL (tqueue)) = 0;
+  (STD_TQUEUE_MUTEX (tqueue)) = (OS2_create_mutex_semaphore (0, 0));
   (STD_TQUEUE_EVENT (tqueue)) = (OS2_create_event_semaphore (0, 0));
   return (tqueue);
 }
@@ -452,43 +486,65 @@ OS2_make_std_tqueue (void)
 void
 OS2_close_std_tqueue (tqueue_t * tqueue)
 {
-  OS2_close_queue (STD_TQUEUE_QUEUE (tqueue));
   OS2_close_event_semaphore (STD_TQUEUE_EVENT (tqueue));
+  OS2_close_event_semaphore (STD_TQUEUE_MUTEX (tqueue));
+  while ((STD_TQUEUE_HEAD (tqueue)) != 0)
+    {
+      msg_list_t * this = (STD_TQUEUE_HEAD (tqueue));
+      (STD_TQUEUE_HEAD (tqueue)) = (this -> next);
+      OS2_destroy_message (this -> message);
+      OS_free (this);
+    }
   OS_free (tqueue);
 }
 
 static msg_t *
 read_std_tqueue (tqueue_t * tqueue, int blockp)
 {
-  ULONG type;
-  ULONG length;
-  PVOID data;
-  msg_t * message;
-  const char * s = "Non-message read from message queue.";
-
-  if (!OS2_read_queue ((STD_TQUEUE_QUEUE (tqueue)),
-                      (&type),
-                      (&length),
-                      (&data),
-                      (blockp ? NULLHANDLE : (STD_TQUEUE_EVENT (tqueue)))))
-    return (0);
-  if (length < (sizeof (msg_t)))
-    OS2_logic_error (s);
-  message = ((msg_t *) data);
-  if ((type != 0) || (length != (MSG_LENGTH (message))))
-    OS2_logic_error (s);
-  write_subqueue (message);
-  return (message);
+  while (1)
+    {
+      OS2_request_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
+      if ((STD_TQUEUE_HEAD (tqueue)) != 0)
+       {
+         msg_list_t * element = (STD_TQUEUE_HEAD (tqueue));
+         (STD_TQUEUE_HEAD (tqueue)) = (element -> next);
+         /* This prevents the 16 bit counter inside the event
+            semaphore from overflowing, in the unlikely situation
+            that the semaphore is not waited on for a long period.  */
+         (void) OS2_reset_event_semaphore (STD_TQUEUE_EVENT (tqueue));
+         OS2_release_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
+         {
+           msg_t * message = (element -> message);
+           OS_free (element);
+           write_subqueue (message);
+           return (message);
+         }
+       }
+      if (!blockp)
+       {
+         OS2_release_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
+         return (0);
+       }
+      (void) OS2_reset_event_semaphore (STD_TQUEUE_EVENT (tqueue));
+      OS2_release_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
+      (void) OS2_wait_event_semaphore ((STD_TQUEUE_EVENT (tqueue)), 1);
+    }
 }
 
 static void
 write_std_tqueue (tqueue_t * tqueue, msg_t * message)
 {
-  OS2_write_queue ((STD_TQUEUE_QUEUE (tqueue)),
-                  0,
-                  (MSG_LENGTH (message)),
-                  ((PVOID) message),
-                  0);
+  msg_list_t * element = (OS_malloc (sizeof (msg_list_t)));
+  (element -> message) = message;
+  (element -> next) = 0;
+  OS2_request_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
+  if ((STD_TQUEUE_HEAD (tqueue)) == 0)
+    (STD_TQUEUE_HEAD (tqueue)) = element;
+  else
+    ((STD_TQUEUE_TAIL (tqueue)) -> next) = element;
+  (STD_TQUEUE_TAIL (tqueue)) = element;
+  (void) OS2_post_event_semaphore (STD_TQUEUE_EVENT (tqueue));
+  OS2_release_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
 }
 \f
 static tqueue_t *