Two changes to improve inter-thread message-transmission performance:
authorChris Hanson <org/chris-hanson/cph>
Sun, 11 May 1997 06:35:46 +0000 (06:35 +0000)
committerChris Hanson <org/chris-hanson/cph>
Sun, 11 May 1997 06:35:46 +0000 (06:35 +0000)
1. Eliminate "msg_list_t" type and implement a fifo abstraction that
   is powerful enough to support the previous applications of message
   lists.  This eliminates the calls to malloc and free that were
   needed to build the lists, and replaces them by an aggregate,
   amortized cost.

2. When transmitting a message for which a receiving thread is
   waiting, call DosSleep to give up the rest of the transmitting
   thread's time slice, so that the receiving thread will run as soon
   as possible.

v7/src/microcode/os2conio.c
v7/src/microcode/os2cthrd.c
v7/src/microcode/os2cthrd.h
v7/src/microcode/os2msg.c
v7/src/microcode/os2msg.h
v7/src/microcode/os2pmcon.c

index abc5e6cb1a60742e1cacd2f97ff8e729f6e26159..2651a1ae3e93fbcc732608a870163b32e5367114 100644 (file)
@@ -1,8 +1,8 @@
 /* -*-C-*-
 
-$Id: os2conio.c,v 1.8 1995/05/07 05:54:12 cph Exp $
+$Id: os2conio.c,v 1.9 1997/05/11 06:35:46 cph Exp $
 
-Copyright (c) 1994-95 Massachusetts Institute of Technology
+Copyright (c) 1994-97 Massachusetts Institute of Technology
 
 This material was developed by the Scheme project at the Massachusetts
 Institute of Technology, Department of Electrical Engineering and
@@ -91,7 +91,7 @@ static int input_buffered_p;
 static int output_cooked_p;
 static qid_t console_writer_qid;
 static channel_context_t * console_context;
-static readahead_buffer_t * line_buffer;
+static void * line_buffer;
 
 TID OS2_console_tid;
 
@@ -272,17 +272,20 @@ do_rubout (void)
 static void
 finish_line (void)
 {
-  msg_list_t * messages;
+  msg_t ** messages;
+  msg_t ** scan;
   grab_console_lock ();
   messages = (OS2_readahead_buffer_read_all (line_buffer));
   release_console_lock ();
-  while (messages != 0)
+  scan = messages;
+  while (1)
     {
-      msg_list_t * element = messages;
-      messages = (messages -> next);
-      send_readahead (element -> message);
-      OS_free (element);
+      msg_t * msg = (*scan++);
+      if (msg == 0)
+       break;
+      send_readahead (msg);
     }
+  OS_free (messages);
 }
 
 static void
@@ -345,17 +348,20 @@ console_operator (Tchannel channel, chop_t operation,
 static void
 flush_input (void)
 {
-  msg_list_t * messages;
+  msg_t ** messages;
+  msg_t ** scan;
   grab_console_lock ();
   messages = (OS2_readahead_buffer_read_all (line_buffer));
   release_console_lock ();
-  while (messages != 0)
+  scan = messages;
+  while (1)
     {
-      msg_list_t * element = messages;
-      messages = (messages -> next);
-      OS2_destroy_message (element -> message);
-      OS_free (element);
+      msg_t * msg = (*scan++);
+      if (msg == 0)
+       break;
+      OS2_destroy_message (msg);
     }
+  OS_free (messages);
 }
 
 static void
index 94078425a43e485e99b35adddca08bde24e4bc37..d840d819eb4418784af4bf507d768030c80e3e13 100644 (file)
@@ -1,8 +1,8 @@
 /* -*-C-*-
 
-$Id: os2cthrd.c,v 1.8 1996/05/09 20:21:20 cph Exp $
+$Id: os2cthrd.c,v 1.9 1997/05/11 06:35:29 cph Exp $
 
-Copyright (c) 1994-96 Massachusetts Institute of Technology
+Copyright (c) 1994-97 Massachusetts Institute of Technology
 
 This material was developed by the Scheme project at the Massachusetts
 Institute of Technology, Department of Electrical Engineering and
@@ -39,7 +39,6 @@ MIT in each case. */
 static void run_channel_thread (void *);
 static void start_readahead_thread (channel_context_t *);
 static void send_readahead_ack (qid_t, enum readahead_ack_action);
-static msg_list_t * new_list (void);
 static msg_t * new_message (void);
 \f
 typedef struct
@@ -240,50 +239,18 @@ OS2_wait_for_readahead_ack (qid_t qid)
   return (action);
 }
 \f
-readahead_buffer_t *
-OS2_make_readahead_buffer (void)
-{
-  readahead_buffer_t * buffer = (OS_malloc (sizeof (readahead_buffer_t)));
-  (buffer -> head) = 0;
-  (buffer -> tail) = 0;
-  return (buffer);
-}
-
-int
-OS2_readahead_buffer_emptyp (readahead_buffer_t * buffer)
-{
-  return ((buffer -> head) == 0);
-}
-
 void
-OS2_readahead_buffer_insert (readahead_buffer_t * buffer, char c)
+OS2_readahead_buffer_insert (void * buffer, char c)
 {
-  if ((buffer -> head) == 0)
-    {
-      msg_list_t * tail = (new_list ());
-      (buffer -> head) = tail;
-      (buffer -> tail) = tail;
-    }
-  else if ((SM_READAHEAD_SIZE ((buffer -> tail) -> message))
-          == SM_READAHEAD_MAX)
+  msg_t * last = (OS2_msg_fifo_last (buffer));
+  if ((last != 0) && ((SM_READAHEAD_SIZE (last)) < SM_READAHEAD_MAX))
+    ((SM_READAHEAD_DATA (last)) [(SM_READAHEAD_SIZE (last))++]) = c;
+  else
     {
-      msg_list_t * tail = (new_list ());
-      ((buffer -> tail) -> next) = tail;
-      (buffer -> tail) = tail;
+      msg_t * message = (new_message ());
+      ((SM_READAHEAD_DATA (message)) [(SM_READAHEAD_SIZE (message))++]) = c;
+      OS2_msg_fifo_insert (buffer, message);
     }
-  {
-    msg_t * message = ((buffer -> tail) -> message);
-    ((SM_READAHEAD_DATA (message)) [(SM_READAHEAD_SIZE (message))++]) = c;
-  }
-}
-
-static msg_list_t *
-new_list (void)
-{
-  msg_list_t * cell = (OS_malloc (sizeof (msg_list_t)));
-  (cell -> message) = (new_message ());
-  (cell -> next) = 0;
-  return (cell);
 }
 
 static msg_t *
@@ -293,29 +260,18 @@ new_message (void)
   (SM_READAHEAD_SIZE (message)) = 0;
   return (message);
 }
-\f
+
 char
-OS2_readahead_buffer_rubout (readahead_buffer_t * buffer)
+OS2_readahead_buffer_rubout (void * buffer)
 {
-  if ((buffer -> head) == 0)
+  msg_t * message = (OS2_msg_fifo_last (buffer));
+  if (message == 0)
     OS2_logic_error ("Rubout from empty readahead buffer.");
   {
-    msg_t * message = ((buffer -> tail) -> message);
     char c = ((SM_READAHEAD_DATA (message)) [--(SM_READAHEAD_SIZE (message))]);
     if ((SM_READAHEAD_SIZE (message)) == 0)
       {
-       msg_list_t * tail = (buffer -> tail);
-       msg_list_t * prev = (buffer -> head);
-       if (prev == tail)
-         (buffer -> head) = 0;
-       else
-         {
-           while ((prev -> next) != tail)
-             prev = (prev -> next);
-           (prev -> next) = 0;
-           (buffer -> tail) = prev;
-         }
-       OS_free (tail);
+       OS2_msg_fifo_remove_last (buffer);
        OS2_destroy_message (message);
       }
     return (c);
@@ -323,24 +279,8 @@ OS2_readahead_buffer_rubout (readahead_buffer_t * buffer)
 }
 
 msg_t *
-OS2_readahead_buffer_read (readahead_buffer_t * buffer)
-{
-  msg_list_t * head = (buffer -> head);
-  if (head == 0)
-    return (new_message ());
-  else
-    {
-      msg_t * message = (head -> message);
-      (buffer -> head) = (head -> next);
-      OS_free (head);
-      return (message);
-    }
-}
-
-msg_list_t *
-OS2_readahead_buffer_read_all (readahead_buffer_t * buffer)
+OS2_readahead_buffer_read (void * buffer)
 {
-  msg_list_t * head = (buffer -> head);
-  (buffer -> head) = 0;
-  return (head);
+  msg_t * message = (OS2_msg_fifo_remove (buffer));
+  return ((message == 0) ? (new_message ()) : message);
 }
index ae44deb7c931103a4603d06feaff4d12d48196f1..0f0803b8f2e7bd8bf3fd67d47ec63a90a87969be 100644 (file)
@@ -1,8 +1,8 @@
 /* -*-C-*-
 
-$Id: os2cthrd.h,v 1.5 1996/05/09 20:21:30 cph Exp $
+$Id: os2cthrd.h,v 1.6 1997/05/11 06:35:23 cph Exp $
 
-Copyright (c) 1994-96 Massachusetts Institute of Technology
+Copyright (c) 1994-97 Massachusetts Institute of Technology
 
 This material was developed by the Scheme project at the Massachusetts
 Institute of Technology, Department of Electrical Engineering and
@@ -85,18 +85,15 @@ extern long OS2_channel_thread_read (Tchannel, char *, size_t);
 extern enum readahead_ack_action OS2_wait_for_readahead_ack (qid_t);
 extern void OS2_channel_thread_close (Tchannel);
 
-typedef struct
-{
-  msg_list_t * head;
-  msg_list_t * tail;
-} readahead_buffer_t;
-
-extern readahead_buffer_t * OS2_make_readahead_buffer (void);
-extern int OS2_readahead_buffer_emptyp (readahead_buffer_t *);
-extern void OS2_readahead_buffer_insert (readahead_buffer_t *, char);
-extern char OS2_readahead_buffer_rubout (readahead_buffer_t *);
+#define OS2_make_readahead_buffer OS2_create_msg_fifo
+#define OS2_readahead_buffer_emptyp OS2_msg_fifo_emptyp
+
+extern void OS2_readahead_buffer_insert (void *, char);
+extern char OS2_readahead_buffer_rubout (void *);
 extern msg_t * OS2_make_readahead (void);
-extern msg_t * OS2_readahead_buffer_read (readahead_buffer_t *);
-extern msg_list_t * OS2_readahead_buffer_read_all (readahead_buffer_t *);
+extern msg_t * OS2_readahead_buffer_read (void *);
+
+#define OS2_readahead_buffer_read_all(b)                               \
+  ((msg_t **) (OS2_msg_fifo_remove_all (b)))
 
 #endif /* SCM_OS2CTHRD_H */
index 71e3b50ca0a150e293d9dc7ae6fbf55d3820edce..85ba11276d1e3a47e1de8239ae02092a36510c4f 100644 (file)
@@ -1,8 +1,8 @@
 /* -*-C-*-
 
-$Id: os2msg.c,v 1.10 1995/11/03 01:22:09 cph Exp $
+$Id: os2msg.c,v 1.11 1997/05/11 06:35:05 cph Exp $
 
-Copyright (c) 1994-95 Massachusetts Institute of Technology
+Copyright (c) 1994-97 Massachusetts Institute of Technology
 
 This material was developed by the Scheme project at the Massachusetts
 Institute of Technology, Department of Electrical Engineering and
@@ -58,8 +58,7 @@ typedef struct
   qid_t twin;                  /* other end of connection */
   qid_receive_filter_t filter; /* filter for received messages */
   tqueue_t * tqueue;           /* thread queue for reception */
-  msg_list_t * subqueue_head;  /* head of receiving subqueue */
-  msg_list_t * subqueue_tail;  /* tail of receiving subqueue */
+  void * subqueue;             /* receiving subqueue */
 } iqid_t;
 
 static iqid_t queue_array [QID_MAX + 1];
@@ -73,8 +72,7 @@ qid_t OS2_interrupt_qid;
 #define QID_ALLOCATEDP(q) ((_QID (q)) . allocatedp)
 #define QID_TWIN(q) ((_QID (q)) . twin)
 #define QID_TQUEUE(q) ((_QID (q)) . tqueue)
-#define QID_SUBQUEUE_HEAD(q) ((_QID (q)) . subqueue_head)
-#define QID_SUBQUEUE_TAIL(q) ((_QID (q)) . subqueue_tail)
+#define QID_SUBQUEUE(q) ((_QID (q)) . subqueue)
 #define QID_FILTER(q) ((_QID (q)) . filter)
 
 #define MSG_QUEUE_TYPE(m) 0
@@ -90,8 +88,7 @@ OS2_initialize_message_queues (void)
        (QID_ALLOCATEDP (qid)) = 0;
        (QID_TQUEUE (qid)) = 0;
        (QID_TWIN (qid)) = QID_NONE;
-       (QID_SUBQUEUE_HEAD (qid)) = 0;
-       (QID_SUBQUEUE_TAIL (qid)) = 0;
+       (QID_SUBQUEUE (qid)) = 0;
        if (qid == QID_MAX)
          break;
        qid += 1;
@@ -135,8 +132,7 @@ allocate_qid (void)
   (QID_ALLOCATEDP (qid)) = 1;
   (QID_TQUEUE (qid)) = 0;
   (QID_TWIN (qid)) = QID_NONE;
-  (QID_SUBQUEUE_HEAD (qid)) = 0;
-  (QID_SUBQUEUE_TAIL (qid)) = 0;
+  (QID_SUBQUEUE (qid)) = (OS2_create_msg_fifo ());
   OS2_release_mutex_semaphore (qid_lock);
   return (qid);
 }
@@ -160,13 +156,15 @@ OS2_qid_openp (qid_t qid)
 void
 OS2_close_qid (qid_t qid)
 {
-  while ((QID_SUBQUEUE_HEAD (qid)) != 0)
+  while (1)
     {
-      msg_list_t * this = (QID_SUBQUEUE_HEAD (qid));
-      (QID_SUBQUEUE_HEAD (qid)) = (this -> next);
-      OS2_destroy_message (this -> message);
-      OS_free (this);
+      msg_t * msg = (OS2_msg_fifo_remove (QID_SUBQUEUE (qid)));
+      if (msg == 0)
+       break;
+      OS2_destroy_message (msg);
     }
+  OS2_destroy_msg_fifo (QID_SUBQUEUE (qid));
+  (QID_SUBQUEUE (qid)) = 0;
   OS2_request_mutex_semaphore (qid_lock);
   {
     qid_t twin = (QID_TWIN (qid));
@@ -333,16 +331,19 @@ OS2_send_message (qid_t qid, msg_t * message)
        generated by the PM thread.  So it's just simpler to ignore
        messages after the receiver decides it's no longer interested
        in them.  */
-    OS2_destroy_message (message);
+    {
+      OS2_release_mutex_semaphore (qid_lock);
+      OS2_destroy_message (message);
+    }
   else
     {
       tqueue_t * tqueue = (QID_TQUEUE (twin));
+      OS2_release_mutex_semaphore (qid_lock);
       if (tqueue == 0)
        OS2_logic_error ("Write to unopened QID.");
       (MSG_SENDER (message)) = twin;
       write_tqueue (tqueue, message);
     }
-  OS2_release_mutex_semaphore (qid_lock);
 }
 
 msg_t *
@@ -434,48 +435,25 @@ write_subqueue (msg_t * message)
       if (message == 0)
        return;
     }
-  {
-    msg_list_t * elt = (OS_malloc (sizeof (msg_list_t)));
-    (elt -> message) = message;
-    (elt -> next) = 0;
-    if ((QID_SUBQUEUE_HEAD (qid)) == 0)
-      (QID_SUBQUEUE_HEAD (qid)) = elt;
-    else
-      ((QID_SUBQUEUE_TAIL (qid)) -> next) = elt;
-    (QID_SUBQUEUE_TAIL (qid)) = elt;
-  }
+  OS2_msg_fifo_insert ((QID_SUBQUEUE (qid)), message);
 }
 
 static msg_t *
 read_subqueue (qid_t qid)
 {
-  msg_list_t * head = (QID_SUBQUEUE_HEAD (qid));
-  if (head == 0)
-    return (0);
-  {
-    msg_t * message = (head -> message);
-    (QID_SUBQUEUE_HEAD (qid)) = (head -> next);
-    OS_free (head);
-    return (message);
-  }
+  return (OS2_msg_fifo_remove (QID_SUBQUEUE (qid)));
 }
 
 void
 OS2_unread_message (qid_t qid, msg_t * message)
 {
-  msg_list_t * head = (QID_SUBQUEUE_HEAD (qid));
-  msg_list_t * elt = (OS_malloc (sizeof (msg_list_t)));
-  (elt -> message) = message;
-  (elt -> next) = head;
-  (QID_SUBQUEUE_HEAD (qid)) = elt;
-  if (head == 0)
-    (QID_SUBQUEUE_TAIL (qid)) = elt;
+  OS2_msg_fifo_insert_front ((QID_SUBQUEUE (qid)), message);
 }
 
 static int
 subqueue_emptyp (qid_t qid)
 {
-  return ((QID_SUBQUEUE_HEAD (qid)) == 0);
+  return (OS2_msg_fifo_emptyp (QID_SUBQUEUE (qid)));
 }
 \f
 int
@@ -522,26 +500,106 @@ write_tqueue (tqueue_t * tqueue, msg_t * message)
     }
 }
 \f
+/* Uncomment the following definition in order to use OS/2 queues.
+
+   There seems to be some kind of bug when using them, which manifests
+   itself as an access violation while reading from a socket.  I don't
+   understand this and have been unable to debug it successfully.
+
+   Since my intention was to find a way to speed up the
+   message-handling mechanism, and there is no noticeable improvement,
+   it probably isn't worth much more effort to find the bug.  */
+
+/* #define USE_OS2_QUEUES */
+#ifdef USE_OS2_QUEUES
+
+typedef struct
+{
+  tqueue_type_t type;
+  HQUEUE fifo;
+  HEV event;                   /* event semaphore */
+} std_tqueue_t;
+#define STD_TQUEUE_FIFO(q) (((std_tqueue_t *) (q)) -> fifo)
+#define STD_TQUEUE_EVENT(q) (((std_tqueue_t *) (q)) -> event)
+
+tqueue_t *
+OS2_make_std_tqueue (void)
+{
+  tqueue_t * tqueue = (OS_malloc (sizeof (std_tqueue_t)));
+  (TQUEUE_TYPE (tqueue)) = tqt_std;
+  (STD_TQUEUE_FIFO (tqueue)) = (OS2_create_queue (QUE_FIFO));
+  (STD_TQUEUE_EVENT (tqueue)) = (OS2_create_event_semaphore (0, 0));
+  return (tqueue);
+}
+
+static msg_t *
+read_std_tqueue_1 (tqueue_t * tqueue, int blockp)
+{
+  ULONG type;
+  ULONG length;
+  PVOID data;
+  return
+    ((OS2_read_queue ((STD_TQUEUE_FIFO (tqueue)),
+                     (&type),
+                     (&length),
+                     (&data),
+                     (blockp ? 0 : (STD_TQUEUE_EVENT (tqueue)))))
+     ? data
+     : 0);
+}
+
+void
+OS2_close_std_tqueue (tqueue_t * tqueue)
+{
+  while (1)
+    {
+      msg_t * msg = (read_std_tqueue_1 (tqueue, 0));
+      if (msg == 0)
+       break;
+      OS2_destroy_message (msg);
+    }
+  OS2_close_queue (STD_TQUEUE_FIFO (tqueue));
+  OS2_close_event_semaphore (STD_TQUEUE_EVENT (tqueue));
+  OS_free (tqueue);
+}
+
+static msg_t *
+read_std_tqueue (tqueue_t * tqueue, int blockp)
+{
+  msg_t * message = (read_std_tqueue_1 (tqueue, blockp));
+  if (message)
+    write_subqueue (message);
+  return (message);
+}
+
+static void
+write_std_tqueue (tqueue_t * tqueue, msg_t * message)
+{
+  OS2_write_queue ((STD_TQUEUE_FIFO (tqueue)), 0, 0, message, 0);
+}
+
+#else /* not USE_OS2_QUEUES */
+
 typedef struct
 {
   tqueue_type_t type;
-  msg_list_t * head;           /* queue */
-  msg_list_t * tail;
+  void * fifo;
+  unsigned int n_blocked;      /* # of blocked threads */
   HMTX mutex;                  /* mutex semaphore */
   HEV event;                   /* event semaphore */
 } std_tqueue_t;
-#define STD_TQUEUE_HEAD(q) (((std_tqueue_t *) (q)) -> head)
-#define STD_TQUEUE_TAIL(q) (((std_tqueue_t *) (q)) -> tail)
+#define STD_TQUEUE_FIFO(q) (((std_tqueue_t *) (q)) -> fifo)
 #define STD_TQUEUE_MUTEX(q) (((std_tqueue_t *) (q)) -> mutex)
 #define STD_TQUEUE_EVENT(q) (((std_tqueue_t *) (q)) -> event)
+#define STD_TQUEUE_N_BLOCKED(q) (((std_tqueue_t *) (q)) -> n_blocked)
 
 tqueue_t *
 OS2_make_std_tqueue (void)
 {
   tqueue_t * tqueue = (OS_malloc (sizeof (std_tqueue_t)));
   (TQUEUE_TYPE (tqueue)) = tqt_std;
-  (STD_TQUEUE_HEAD (tqueue)) = 0;
-  (STD_TQUEUE_TAIL (tqueue)) = 0;
+  (STD_TQUEUE_FIFO (tqueue)) = (OS2_create_msg_fifo ());
+  (STD_TQUEUE_N_BLOCKED (tqueue)) = 0;
   (STD_TQUEUE_MUTEX (tqueue)) = (OS2_create_mutex_semaphore (0, 0));
   (STD_TQUEUE_EVENT (tqueue)) = (OS2_create_event_semaphore (0, 0));
   return (tqueue);
@@ -552,12 +610,12 @@ OS2_close_std_tqueue (tqueue_t * tqueue)
 {
   OS2_close_event_semaphore (STD_TQUEUE_EVENT (tqueue));
   OS2_close_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
-  while ((STD_TQUEUE_HEAD (tqueue)) != 0)
+  while (1)
     {
-      msg_list_t * this = (STD_TQUEUE_HEAD (tqueue));
-      (STD_TQUEUE_HEAD (tqueue)) = (this -> next);
-      OS2_destroy_message (this -> message);
-      OS_free (this);
+      msg_t * msg = (OS2_msg_fifo_remove (STD_TQUEUE_FIFO (tqueue)));
+      if (msg == 0)
+       break;
+      OS2_destroy_message (msg);
     }
   OS_free (tqueue);
 }
@@ -565,24 +623,15 @@ OS2_close_std_tqueue (tqueue_t * tqueue)
 static msg_t *
 read_std_tqueue (tqueue_t * tqueue, int blockp)
 {
+  OS2_request_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
   while (1)
     {
-      OS2_request_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
-      if ((STD_TQUEUE_HEAD (tqueue)) != 0)
+      msg_t * message = (OS2_msg_fifo_remove (STD_TQUEUE_FIFO (tqueue)));
+      if (message != 0)
        {
-         msg_list_t * element = (STD_TQUEUE_HEAD (tqueue));
-         (STD_TQUEUE_HEAD (tqueue)) = (element -> next);
-         /* This prevents the 16 bit counter inside the event
-            semaphore from overflowing, in the unlikely situation
-            that the semaphore is not waited on for a long period.  */
-         (void) OS2_reset_event_semaphore (STD_TQUEUE_EVENT (tqueue));
          OS2_release_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
-         {
-           msg_t * message = (element -> message);
-           OS_free (element);
-           write_subqueue (message);
-           return (message);
-         }
+         write_subqueue (message);
+         return (message);
        }
       if (!blockp)
        {
@@ -590,8 +639,15 @@ read_std_tqueue (tqueue_t * tqueue, int blockp)
          return (0);
        }
       (void) OS2_reset_event_semaphore (STD_TQUEUE_EVENT (tqueue));
+      (STD_TQUEUE_N_BLOCKED (tqueue)) += 1;
       OS2_release_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
       (void) OS2_wait_event_semaphore ((STD_TQUEUE_EVENT (tqueue)), 1);
+      OS2_request_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
+      (STD_TQUEUE_N_BLOCKED (tqueue)) -= 1;
+      /* This prevents the 16 bit counter inside the event
+        semaphore from overflowing.  */
+      if ((STD_TQUEUE_N_BLOCKED (tqueue)) == 0)
+       (void) OS2_reset_event_semaphore (STD_TQUEUE_EVENT (tqueue));
       /* Don't wait more than once; the caller must be prepared to
         call again if a message is required.  The reason this is
         necessary is that two threads may be waiting on the same
@@ -608,18 +664,21 @@ read_std_tqueue (tqueue_t * tqueue, int blockp)
 static void
 write_std_tqueue (tqueue_t * tqueue, msg_t * message)
 {
-  msg_list_t * element = (OS_malloc (sizeof (msg_list_t)));
-  (element -> message) = message;
-  (element -> next) = 0;
   OS2_request_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
-  if ((STD_TQUEUE_HEAD (tqueue)) == 0)
-    (STD_TQUEUE_HEAD (tqueue)) = element;
+  OS2_msg_fifo_insert ((STD_TQUEUE_FIFO (tqueue)), message);
+  if ((STD_TQUEUE_N_BLOCKED (tqueue)) > 0)
+    {
+      (void) OS2_post_event_semaphore (STD_TQUEUE_EVENT (tqueue));
+      OS2_release_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
+      /* Immediately transfer control to the receiver.
+        This should improve responsiveness of the system.  */
+      (void) DosSleep (0);
+    }
   else
-    ((STD_TQUEUE_TAIL (tqueue)) -> next) = element;
-  (STD_TQUEUE_TAIL (tqueue)) = element;
-  (void) OS2_post_event_semaphore (STD_TQUEUE_EVENT (tqueue));
-  OS2_release_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
+    OS2_release_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
 }
+
+#endif /* not USE_OS2_QUEUES */
 \f
 static tqueue_t *
 make_scm_tqueue (void)
@@ -706,3 +765,220 @@ process_interrupt_messages (void)
       OS2_destroy_message (message);
     }
 }
+\f
+#define BUFFER_MIN_LENGTH 16
+
+typedef struct
+{
+  unsigned int start;
+  unsigned int end;
+  unsigned int count;
+  unsigned int buffer_length;
+  void ** buffer;
+} msg_fifo_t;
+
+void *
+OS2_create_msg_fifo (void)
+{
+  msg_fifo_t * fifo = (OS_malloc (sizeof (msg_fifo_t)));
+  (fifo -> start) = 0;
+  (fifo -> end) = 0;
+  (fifo -> count) = 0;
+  (fifo -> buffer_length) = BUFFER_MIN_LENGTH;
+  (fifo -> buffer)
+    = (OS_malloc ((fifo -> buffer_length) * (sizeof (void *))));
+  return (fifo);
+}
+
+void
+OS2_destroy_msg_fifo (void * fp)
+{
+  OS_free (((msg_fifo_t *) fp) -> buffer);
+  OS_free (fp);
+}
+
+#define MAYBE_GROW_BUFFER(fifo)                                                \
+{                                                                      \
+  if ((fifo -> count) == (fifo -> buffer_length))                      \
+    msg_fifo_grow (fifo);                                              \
+}
+
+#define MAYBE_SHRINK_BUFFER(fifo)                                      \
+{                                                                      \
+  if (((fifo -> buffer_length) > BUFFER_MIN_LENGTH)                    \
+      && ((fifo -> count) < ((fifo -> buffer_length) / 4)))            \
+    msg_fifo_shrink (fifo);                                            \
+}
+
+static void
+msg_fifo_grow (msg_fifo_t * fifo)
+{
+  (fifo -> buffer_length) *= 2;
+  (fifo -> buffer)
+    = (OS_realloc ((fifo -> buffer),
+                  ((fifo -> buffer_length) * (sizeof (void *)))));
+  if ((fifo -> start) > 0)
+    {
+      void ** from = (fifo -> buffer);
+      void ** stop = ((fifo -> buffer) + (fifo -> start));
+      void ** to = ((fifo -> buffer) + (fifo -> count));
+      while (from < stop)
+       (*to++) = (*from++);
+      (fifo -> end) += (fifo -> count);
+    }
+}
+
+static void
+msg_fifo_shrink (msg_fifo_t * fifo)
+{
+  if ((fifo -> start) > (fifo -> end))
+    {
+      void ** from = ((fifo -> buffer) + (fifo -> start));
+      void ** stop = ((fifo -> buffer) + (fifo -> buffer_length));
+      void ** to = (from - ((fifo -> buffer_length) / 2));
+      while (from < stop)
+       (*to++) = (*from++);
+      (fifo -> start) -= ((fifo -> buffer_length) / 2);
+    }
+  else if ((fifo -> end) > ((fifo -> buffer_length) / 2))
+    {
+      void ** from = ((fifo -> buffer) + (fifo -> start));
+      void ** stop = ((fifo -> buffer) + (fifo -> end));
+      void ** to = (fifo -> buffer);
+      while (from < stop)
+       (*to++) = (*from++);
+      (fifo -> start) = 0;
+      (fifo -> end) = (fifo -> count);
+    }
+  (fifo -> buffer_length) /= 2;
+  (fifo -> buffer)
+    = (OS_realloc ((fifo -> buffer),
+                  ((fifo -> buffer_length) * (sizeof (void *)))));
+}
+
+void
+OS2_msg_fifo_insert (void * fp, void * element)
+{
+  msg_fifo_t * fifo = fp;
+  MAYBE_GROW_BUFFER (fifo);
+  ((fifo -> buffer) [fifo -> end]) = element;
+  (fifo -> end) += 1;
+  (fifo -> count) += 1;
+  if ((fifo -> end) == (fifo -> buffer_length))
+    (fifo -> end) = 0;
+}
+
+void
+OS2_msg_fifo_insert_front (void * fp, void * element)
+{
+  msg_fifo_t * fifo = fp;
+  MAYBE_GROW_BUFFER (fifo);
+  if ((fifo -> start) == 0)
+    (fifo -> start) = (fifo -> buffer_length);
+  (fifo -> start) -= 1;
+  ((fifo -> buffer) [fifo -> start]) = element;
+  (fifo -> count) += 1;
+}
+
+void *
+OS2_msg_fifo_remove (void * fp)
+{
+  msg_fifo_t * fifo = fp;
+  void * element;
+  if ((fifo -> count) == 0)
+    return (0);
+  element = ((fifo -> buffer) [fifo -> start]);
+  (fifo -> start) += 1;
+  (fifo -> count) -= 1;
+  if ((fifo -> count) == 0)
+    {
+      (fifo -> start) = 0;
+      (fifo -> end) = 0;
+    }
+  else if ((fifo -> start) == (fifo -> buffer_length))
+    (fifo -> start) = 0;
+  MAYBE_SHRINK_BUFFER (fifo);
+  return (element);
+}
+
+void *
+OS2_msg_fifo_remove_last (void * fp)
+{
+  msg_fifo_t * fifo = fp;
+  void * element;
+  if ((fifo -> count) == 0)
+    return (0);
+  if ((fifo -> end) == 0)
+    (fifo -> end) = (fifo -> buffer_length);
+  (fifo -> end) -= 1;
+  element = ((fifo -> buffer) [fifo -> end]);
+  (fifo -> count) -= 1;
+  if ((fifo -> count) == 0)
+    {
+      (fifo -> start) = 0;
+      (fifo -> end) = 0;
+    }
+  MAYBE_SHRINK_BUFFER (fifo);
+  return (element);
+}
+
+void **
+OS2_msg_fifo_remove_all (void * fp)
+{
+  msg_fifo_t * fifo = fp;
+  void ** result = (OS_malloc (((fifo -> count) + 1) * (sizeof (void *))));
+  void ** from = ((fifo -> buffer) + (fifo -> start));
+  void ** stop;
+  void ** to = result;
+  if ((fifo -> start) < (fifo -> end))
+    {
+      stop = ((fifo -> buffer) + (fifo -> end));
+      while (from < stop)
+       (*to++) = (*from++);
+    }
+  else if ((fifo -> count) > 0)
+    {
+      stop = ((fifo -> buffer) + (fifo -> buffer_length));
+      while (from < stop)
+       (*to++) = (*from++);
+      from = (fifo -> buffer);
+      stop = ((fifo -> buffer) + (fifo -> end));
+      while (from < stop)
+       (*to++) = (*from++);
+    }
+  (*to) = 0;
+  (fifo -> start) = 0;
+  (fifo -> end) = 0;
+  (fifo -> count) = 0;
+  if ((fifo -> buffer_length) > BUFFER_MIN_LENGTH)
+    {
+      (fifo -> buffer_length) = (fifo -> buffer_length);
+      (fifo -> buffer)
+       = (OS_realloc ((fifo -> buffer),
+                      ((fifo -> buffer_length) * (sizeof (void *)))));
+    }
+  return (result);
+}
+
+int
+OS2_msg_fifo_emptyp (void * fp)
+{
+  return ((((msg_fifo_t *) fp) -> count) == 0);
+}
+
+unsigned int
+OS2_msg_fifo_count (void * fp)
+{
+  return (((msg_fifo_t *) fp) -> count);
+}
+
+void *
+OS2_msg_fifo_last (void * fp)
+{
+  msg_fifo_t * fifo = fp;
+  if ((fifo -> count) == 0)
+    return (0);
+  return ((fifo -> buffer)
+         [(((fifo -> end) == 0) ? (fifo -> buffer_length) : (fifo -> end))
+          - 1]);
+}
index 402c16b65d86a4980f99992f9bfb5d2a4de5684f..d83dd616ba052113fff23a152e21635c76cad21e 100644 (file)
@@ -1,8 +1,8 @@
 /* -*-C-*-
 
-$Id: os2msg.h,v 1.13 1995/11/03 01:29:04 cph Exp $
+$Id: os2msg.h,v 1.14 1997/05/11 06:35:16 cph Exp $
 
-Copyright (c) 1994-95 Massachusetts Institute of Technology
+Copyright (c) 1994-97 Massachusetts Institute of Technology
 
 This material was developed by the Scheme project at the Massachusetts
 Institute of Technology, Department of Electrical Engineering and
@@ -175,6 +175,17 @@ extern void OS2_unread_message (qid_t, msg_t *);
 extern int OS2_tqueue_select (tqueue_t *, int);
 extern tqueue_t * OS2_make_std_tqueue (void);
 extern void OS2_close_std_tqueue (tqueue_t *);
+
+extern void * OS2_create_msg_fifo (void);
+void OS2_destroy_msg_fifo (void *);
+extern void OS2_msg_fifo_insert (void *, void *);
+extern void OS2_msg_fifo_insert_front (void *, void *);
+extern void * OS2_msg_fifo_remove (void *);
+extern void * OS2_msg_fifo_remove_last (void *);
+extern void ** OS2_msg_fifo_remove_all (void *);
+extern int OS2_msg_fifo_emptyp (void *);
+extern unsigned int OS2_msg_fifo_count (void *);
+extern void * OS2_msg_fifo_last (void *);
 \f
 #define MSG_LENGTH(m) (OS2_message_type_length (MSG_TYPE (m)))
 
@@ -183,12 +194,6 @@ extern void OS2_close_std_tqueue (tqueue_t *);
 
 #define OS2_create_message(type) OS2_create_message_1 ((type), 0)
 
-typedef struct msg_list_s
-{
-  msg_t * message;
-  struct msg_list_s * next;
-} msg_list_t;
-
 typedef struct
 {
   DECLARE_MSG_HEADER_FIELDS;
index 5b76aff9d041399fbe668e8ca516e01e59b0b898..82d76503a46554feb82be145043258c73e778f0d 100644 (file)
@@ -1,6 +1,6 @@
 /* -*-C-*-
 
-$Id: os2pmcon.c,v 1.22 1997/01/01 10:10:34 cph Exp $
+$Id: os2pmcon.c,v 1.23 1997/05/11 06:35:37 cph Exp $
 
 Copyright (c) 1994-97 Massachusetts Institute of Technology
 
@@ -48,7 +48,6 @@ static unsigned short cy2y (unsigned short, int);
 static unsigned short x2cx (short, int);
 static unsigned short y2cy (short, int);
 static void process_events (int);
-static void enqueue_pending_event (msg_t *);
 static void initialize_marked_region (short, short);
 static void update_marked_region (short, short);
 static void unmark_marked_region (void);
@@ -98,8 +97,7 @@ static unsigned short readahead_repeat;
 static char readahead_char;
 static const char * readahead_insert;
 static const char * readahead_insert_scan;
-static msg_list_t * pending_events_head;
-static msg_list_t * pending_events_tail;
+static void * pending_events;
 static tqueue_t * console_tqueue;
 static qid_t console_event_qid;
 static qid_t console_pm_qid;
@@ -153,7 +151,7 @@ OS2_initialize_pm_console (void)
     = (WinQuerySysPointer (HWND_DESKTOP, SPTR_TEXT, FALSE));
   readahead_repeat = 0;
   readahead_insert = 0;
-  pending_events_head = 0;
+  pending_events = (OS2_create_msg_fifo ());
   console_tqueue = (OS2_make_std_tqueue ());
   {
     qid_t remote;
@@ -336,7 +334,7 @@ process_events (int blockp)
              case WM_CHAR:
              case WM_CLOSE:
              postpone_event:
-               enqueue_pending_event (message);
+               OS2_msg_fifo_insert (pending_events, message);
                message = 0;
                if (blockp)
                  return;
@@ -455,19 +453,6 @@ process_events (int blockp)
        }
     }
 }
-
-static void
-enqueue_pending_event (msg_t * message)
-{
-  msg_list_t * element = (OS_malloc (sizeof (msg_list_t)));
-  (element -> message) = message;
-  (element -> next) = 0;
-  if (pending_events_head == 0)
-    pending_events_head = element;
-  else
-    (pending_events_tail -> next) = element;
-  pending_events_tail = element;
-}
 \f
 static void
 initialize_marked_region (short x, short y)
@@ -909,16 +894,13 @@ OS2_pm_console_getch (void)
   if ((readahead_repeat == 0) && (readahead_insert == 0))
     while (1)
       {
-       process_events (pending_events_head == 0);
+       process_events (OS2_msg_fifo_emptyp (pending_events));
        {
-         msg_list_t * element = pending_events_head;
-         msg_t * message = (element -> message);
+         msg_t * message = (OS2_msg_fifo_remove (pending_events));
          ULONG msg = (SM_PM_EVENT_MSG (message));
          MPARAM mp1 = (SM_PM_EVENT_MP1 (message));
          MPARAM mp2 = (SM_PM_EVENT_MP2 (message));
-         pending_events_head = (element -> next);
          OS2_destroy_message (message);
-         OS_free (element);
          switch (msg)
            {
            case WM_CHAR: