Implement new-style select() mechanism for OS/2.
authorChris Hanson <org/chris-hanson/cph>
Fri, 25 Apr 2003 05:13:14 +0000 (05:13 +0000)
committerChris Hanson <org/chris-hanson/cph>
Fri, 25 Apr 2003 05:13:14 +0000 (05:13 +0000)
v7/src/microcode/os2io.c
v7/src/microcode/os2msg.c
v7/src/microcode/os2msg.h
v7/src/microcode/pros2io.c

index 52b3e98ace5a8e53c49eeb3a26435bd755917a4e..7b6aefd392137aec936fb8045828bf1005951107 100644 (file)
@@ -1,8 +1,8 @@
 /* -*-C-*-
 
-$Id: os2io.c,v 1.10 2003/02/14 18:28:22 cph Exp $
+$Id: os2io.c,v 1.11 2003/04/25 05:13:02 cph Exp $
 
-Copyright (c) 1994-1999 Massachusetts Institute of Technology
+Copyright 1994,1995,1996,2003 Massachusetts Institute of Technology
 
 This file is part of MIT/GNU Scheme.
 
@@ -24,6 +24,7 @@ USA.
 */
 
 #include "os2.h"
+#include "os2proc.h"
 
 extern void add_reload_cleanup (void (*) (void));
 extern void OS2_initialize_console_channel (Tchannel);
@@ -38,14 +39,14 @@ Tchannel * OS2_channel_pointer_table;
 const int OS_have_select_p = 1;
 
 #ifndef OS2_DEFAULT_MAX_FH
-#define OS2_DEFAULT_MAX_FH 256
+#  define OS2_DEFAULT_MAX_FH 256
 #endif
 
 /* Set this to a larger size than OS2_DEFAULT_MAX_FH, because the
    maximum number of file handles can be increased dynamically by
    calling a primitive.  */
 #ifndef OS2_DEFAULT_CHANNEL_TABLE_SIZE
-#define OS2_DEFAULT_CHANNEL_TABLE_SIZE 1024
+#  define OS2_DEFAULT_CHANNEL_TABLE_SIZE 1024
 #endif
 
 void
@@ -369,3 +370,191 @@ OS_channel_write_string (Tchannel channel, const char * string)
   if ((OS_channel_write (channel, string, length)) != length)
     OS2_error_anonymous ();
 }
+\f
+struct select_registry_s
+{
+  unsigned int n_qids;
+  unsigned int length;
+  qid_t * qids;
+  unsigned char * qmodes;
+  unsigned char * rmodes;
+};
+
+select_registry_t
+OS_allocate_select_registry (void)
+{
+  struct select_registry_s * r
+    = (OS_malloc (sizeof (struct select_registry_s)));
+  (r -> n_qids) = 0;
+  (r -> length) = 16;
+  (r -> qids) = (OS_malloc ((sizeof (qid_t)) * (r -> length)));
+  (r -> qmodes) = (OS_malloc ((sizeof (unsigned char)) * (r -> length)));
+  (r -> rmodes) = (OS_malloc ((sizeof (unsigned char)) * (r -> length)));
+  return (r);
+}
+
+void
+OS_deallocate_select_registry (select_registry_t registry)
+{
+  struct select_registry_s * r = registry;
+  OS_free (r -> rmodes);
+  OS_free (r -> qmodes);
+  OS_free (r -> qids);
+  OS_free (r);
+}
+
+static void
+resize_select_registry (struct select_registry_s * r, int growp)
+{
+  if (growp)
+    (r -> length) *= 2;
+  else
+    (r -> length) /= 2;
+  (r -> qids)
+    = (OS_realloc ((r -> qids),
+                  ((sizeof (qid_t)) * (r -> length))));
+  (r -> qmodes)
+    = (OS_realloc ((r -> qmodes),
+                  ((sizeof (unsigned char)) * (r -> length))));
+  (r -> rmodes)
+    = (OS_realloc ((r -> rmodes),
+                  ((sizeof (unsigned char)) * (r -> length))));
+}
+
+void
+OS_add_to_select_registry (select_registry_t registry, int fd,
+                          unsigned int mode)
+{
+  struct select_registry_s * r = registry;
+  qid_t qid = fd;
+  unsigned int i = 0;
+
+  while (i < (r -> n_qids))
+    {
+      if (((r -> qids) [i]) == qid)
+       {
+         ((r -> qmodes) [i]) |= mode;
+         return;
+       }
+      i += 1;
+    }
+  if (i == (r -> length))
+    resize_select_registry (r, 1);
+  ((r -> qids) [i]) = qid;
+  ((r -> qmodes) [i]) = mode;
+  (r -> n_qids) += 1;
+}
+\f
+void
+OS_remove_from_select_registry (select_registry_t registry, int fd,
+                               unsigned int mode)
+{
+  struct select_registry_s * r = registry;
+  qid_t qid = fd;
+  unsigned int i = 0;
+
+  while (1)
+    {
+      if (i == (r -> n_qids))
+       return;
+      if (((r -> qids) [i]) == qid)
+       {
+         ((r -> qmodes) [i]) &=~ mode;
+         if (((r -> qmodes) [i]) == 0)
+           break;
+         else
+           return;
+       }
+      i += 1;
+    }
+  while (i < (r -> n_qids))
+    {
+      ((r -> qids) [i]) = ((r -> qids) [(i + 1)]);
+      ((r -> qmodes) [i]) = ((r -> qmodes) [(i + 1)]);
+      i += 1;
+    }
+  (r -> n_qids) -= 1;
+
+  if (((r -> length) > 16) && ((r -> n_qids) < ((r -> length) / 2)))
+    resize_select_registry (r, 0);
+}
+
+unsigned int
+OS_select_registry_length (select_registry_t registry)
+{
+  struct select_registry_s * r = registry;
+  return (r -> n_qids);
+}
+
+void
+OS_select_registry_result (select_registry_t registry, unsigned int index,
+                          int * fd_r, unsigned int * mode_r)
+{
+  struct select_registry_s * r = registry;
+  (*fd_r) = ((r -> qids) [index]);
+  (*mode_r) = ((r -> rmodes) [index]);
+}
+\f
+int
+OS_test_select_descriptor (int fd, int blockp, unsigned int qmode)
+{
+  qid_t qid = fd;
+  unsigned int rmode = (qmode & SELECT_MODE_WRITE);
+  if ((qmode & SELECT_MODE_READ) == 0)
+    return (rmode);
+  switch (OS2_message_availablep (qid, blockp))
+    {
+    case mat_available:
+      return (rmode | SELECT_MODE_READ);
+    case mat_not_available:
+      return (rmode);
+    case mat_interrupt:
+      return
+       ((OS_process_any_status_change ())
+        ? SELECT_PROCESS_STATUS_CHANGE
+        : SELECT_INTERRUPT);
+    default:
+      error_external_return ();
+      return (rmode | SELECT_MODE_ERROR);
+    }
+}
+
+int
+OS_test_select_registry (select_registry_t registry, int blockp)
+{
+  struct select_registry_s * r = registry;
+  unsigned int n_values = 0;
+  int interruptp = 0;
+  unsigned int i;
+
+  while (1)
+    {
+      for (i = 0; (i < (r -> n_qids)); i += 1)
+       {
+         ((r -> rmodes) [i]) = (((r -> qmodes) [i]) & SELECT_MODE_WRITE);
+         if ((((r -> qmodes) [i]) & SELECT_MODE_READ) != 0)
+           switch (OS2_message_availablep (((r -> qids) [i]), 0))
+             {
+             case mat_available:
+               ((r -> rmodes) [i]) |= SELECT_MODE_READ;
+               break;
+             case mat_interrupt:
+               interruptp = 1;
+               break;
+             }
+         if (((r -> rmodes) [i]) != 0)
+           n_values += 1;
+       }
+      if (n_values > 0)
+       return (n_values);
+      if (interruptp)
+       return
+         ((OS_process_any_status_change ())
+          ? SELECT_PROCESS_STATUS_CHANGE
+          : SELECT_INTERRUPT);
+      if (!blockp)
+       return (0);
+      if ((OS2_scheme_tqueue_block ()) == mat_interrupt)
+       interruptp = 1;
+    }
+}
index c882cf78ac105bf57f711cf9f01ea8b7d68fa56d..676766fb1b6ff643aa5cea1b226f900c10977bd3 100644 (file)
@@ -1,8 +1,8 @@
 /* -*-C-*-
 
-$Id: os2msg.c,v 1.16 2003/02/14 18:28:22 cph Exp $
+$Id: os2msg.c,v 1.17 2003/04/25 05:13:06 cph Exp $
 
-Copyright (c) 1994-2000 Massachusetts Institute of Technology
+Copyright 1994,1995,1997,2000,2003 Massachusetts Institute of Technology
 
 This file is part of MIT/GNU Scheme.
 
@@ -44,6 +44,65 @@ static msg_t * read_scm_tqueue (tqueue_t *, int);
 static void write_scm_tqueue (tqueue_t *, msg_t *);
 static void process_interrupt_messages (void);
 \f
+/*
+
+How this works
+==============
+
+This file describes the inter-thread communications mechanism.  The
+naming used here is atrocious.  Mea culpa; the code was written in
+1994, while these notes are being written in 2003.  I've learned a bit
+in the meantime and can now see how bad the code is.
+
+Every thread has an associated input queue, called its "tqueue".
+(Originally meant to be an abbreviation of "Thread QUEUE".)  All
+messages sent to that thread, from any source, are queued there in
+order of transmission.
+
+Two threads that wish to communicate must set up a "channel", which
+consists of a pair of "qid" objects.  ("qid" was originally meant to
+be an abbreviation of "Queue IDentifier".)  These objects are always
+created in pairs, one for each thread, and after creation each qid is
+associated with one of the two threads.  The other half of a qid pair
+is called its "twin".  If you are familiar with sockets, a qid is the
+analog of a socket.  A qid pair is the analog of a connection.
+
+Also associated with each qid is something called a "subqueue", which
+is simply a secondary queue for messages received from that qid's
+twin.
+
+* Suppose that thread A and thread B share two halves of a qid pair,
+  which we will call QA and QB.  Also suppose that the tqueues
+  associated with these threads are called TA and TB.
+
+* If thread A calls OS2_send_message() on QA and a message M, M's
+  "sender" is set to be QB, and M is then queued at the end of TB.
+  Additionally, if any thread is blocked on TB (only B is allowed to
+  block on TB), the event semaphore EB is posted, which wakes up the
+  threads waiting on TB.
+
+* If thread B calls OS2_receive_message() on QB, B first dequeues each
+  the queued messages from TB.  Each dequeued message M is queued at
+  the end of the subqueue of the sender of M.  For example, if M's
+  sender is QB, then the message is put into QB's subqueue.  This
+  process is repeated until all of the messages have been removed from
+  TB and dispatched to the appropriate subqueues.
+
+  The subqueue for QB is then checked; if there are any messages, the
+  first one is dequeued and then returned.  Otherwise, B blocks on TB
+  waiting for another message to arrive, by waiting on EB.  Eventually
+  the message sent by A arrives in TB, EB is posted, and B wakes up.
+
+  This process continues until a message is received.
+
+Some things to note here: the event semaphores are the primary
+synchronization mechanism for communications.  These guarantee that
+messages aren't lost due to timing errors.  Additionally, each tqueue
+has an associated mutex semaphore that is used to lock the tqueue in
+critical sections.
+
+*/
+\f
 typedef struct
 {
   unsigned int allocatedp : 1; /* queue allocated? */
@@ -462,19 +521,6 @@ subqueue_emptyp (qid_t qid)
   OS2_release_mutex_semaphore (QID_LOCK (qid));
   return (result);
 }
-\f
-int
-OS2_tqueue_select (tqueue_t * tqueue, int blockp)
-{
-  msg_t * message = (read_tqueue (tqueue, blockp));
-  if ((TQUEUE_TYPE (tqueue)) == tqt_scm)
-    {
-      process_interrupt_messages ();
-      if (pending_interrupts_p ())
-       return (-2);
-    }
-  return ((message != 0) ? (MSG_SENDER (message)) : (-1));
-}
 
 static msg_t *
 read_tqueue (tqueue_t * tqueue, int blockp)
@@ -695,8 +741,6 @@ make_scm_tqueue (void)
   return (tqueue);
 }
 
-char OS2_scheme_tqueue_avail_map [QID_MAX + 1];
-
 static msg_t *
 read_scm_tqueue (tqueue_t * tqueue, int blockp)
 {
@@ -721,7 +765,6 @@ read_scm_tqueue (tqueue_t * tqueue, int blockp)
       msg_t * message = (read_std_tqueue (tqueue, blockp));
       if (message != 0)
        {
-         (OS2_scheme_tqueue_avail_map [MSG_SENDER (message)]) = 1;
          result = message;
          /* At most one message needs to be read in blocking mode.  */
          blockp = 0;
@@ -737,7 +780,7 @@ write_scm_tqueue (tqueue_t * tqueue, msg_t * message)
   write_std_tqueue (tqueue, message);
   request_attention_interrupt ();
 }
-
+\f
 void
 OS2_handle_attention_interrupt (void)
 {
@@ -747,6 +790,19 @@ OS2_handle_attention_interrupt (void)
   process_interrupt_messages ();
 }
 
+msg_avail_t
+OS2_scheme_tqueue_block (void)
+{
+  int inputp = ((read_tqueue (OS2_scheme_tqueue, 1)) != 0);
+  process_interrupt_messages ();
+  return
+    (inputp
+     ? mat_available
+     : (pending_interrupts_p ())
+     ? mat_interrupt
+     : mat_not_available);
+}
+
 static void
 process_interrupt_messages (void)
 {
index 139c5dd18dff08e50a9d49f06c53ac9c78cb31fa..4b2555e94cc67b327fa74cf7150d8f263c126dbd 100644 (file)
@@ -1,8 +1,8 @@
 /* -*-C-*-
 
-$Id: os2msg.h,v 1.17 2003/02/14 18:28:22 cph Exp $
+$Id: os2msg.h,v 1.18 2003/04/25 05:13:10 cph Exp $
 
-Copyright (c) 1994-1999 Massachusetts Institute of Technology
+Copyright 1994,1995,1997,2003 Massachusetts Institute of Technology
 
 This file is part of MIT/GNU Scheme.
 
@@ -143,7 +143,6 @@ typedef enum { mat_not_available, mat_available, mat_interrupt } msg_avail_t;
 
 extern tqueue_t * OS2_scheme_tqueue;
 extern qid_t OS2_interrupt_qid;
-extern char OS2_scheme_tqueue_avail_map [QID_MAX + 1];
 
 extern void OS2_make_qid_pair (qid_t *, qid_t *);
 extern void OS2_open_qid (qid_t, tqueue_t *);
@@ -163,7 +162,7 @@ extern msg_avail_t OS2_message_availablep (qid_t, int);
 extern msg_t * OS2_wait_for_message (qid_t, msg_type_t);
 extern msg_t * OS2_message_transaction (qid_t, msg_t *, msg_type_t);
 extern void OS2_unread_message (qid_t, msg_t *);
-extern int OS2_tqueue_select (tqueue_t *, int);
+extern msg_avail_t OS2_scheme_tqueue_block (void);
 extern tqueue_t * OS2_make_std_tqueue (void);
 extern void OS2_close_std_tqueue (tqueue_t *);
 
index 08861ea4e3ccb607468003a660360877a71d6a51..4637a6f0ee99ea9199d2708255b599c28251bfa1 100644 (file)
@@ -1,8 +1,8 @@
 /* -*-C-*-
 
-$Id: pros2io.c,v 1.11 2003/02/14 18:28:23 cph Exp $
+$Id: pros2io.c,v 1.12 2003/04/25 05:13:14 cph Exp $
 
-Copyright (c) 1994-2000 Massachusetts Institute of Technology
+Copyright 1994,1995,1997,2000,2003 Massachusetts Institute of Technology
 
 This file is part of MIT/GNU Scheme.
 
@@ -41,10 +41,11 @@ DEFINE_PRIMITIVE ("CHANNEL-DESCRIPTOR", Prim_channel_descriptor, 1, 1, 0)
   PRIMITIVE_HEADER (1);
   {
     Tchannel channel = (arg_channel (1));
-    if (! ((CHANNEL_ABSTRACT_P (channel)) && (CHANNEL_INPUTP (channel))))
-      error_bad_range_arg (1);
     PRIMITIVE_RETURN
-      (LONG_TO_UNSIGNED_FIXNUM (OS2_channel_thread_descriptor (channel)));
+      (LONG_TO_UNSIGNED_FIXNUM
+       ((CHANNEL_ABSTRACT_P (channel))
+       ? (OS2_channel_thread_descriptor (channel))
+       : 0));
   }
 }
 
@@ -93,51 +94,29 @@ DEFINE_PRIMITIVE ("OS2-SELECT-REGISTRY-TEST", Prim_OS2_select_registry_test, 3,
     int inputp = 0;
     int interruptp = 0;
     qid_t qid;
-    int n;
 
-    /* This first phase checks the qid subqueues and OS2_scheme_tqueue
-       for any previously-queued input.  */
-  check_for_input:
-    for (qid = 0; (qid <= QID_MAX); qid += 1)
+    while (1)
       {
-       (results [qid]) = 0;
-       if ((registry [qid]) != 0)
-         switch (OS2_message_availablep (qid, 0))
-           {
-           case mat_available:
-             inputp = 1;
-             (results [qid]) = 1;
-             break;
-           case mat_interrupt:
-             interruptp = 1;
-             break;
-           }
-      }
-    /* This second phase waits for input if necessary.  It does not
-       check the subqueues for previously-stored data, so it's
-       important that we already did this.  Otherwise we could end up
-       waiting for input when there was valid input ready.  */
-    if (blockp)
-      while (! (inputp || interruptp))
-       {
-         for (qid = 0; (qid <= QID_MAX); qid += 1)
-           (OS2_scheme_tqueue_avail_map [qid]) = 0;
-         n = (OS2_tqueue_select (OS2_scheme_tqueue, blockp));
-         if (n == (-1))
-           /* If we're unblocked and there's no message in the
-              tqueue, go back and check for input again.  */
-           goto check_for_input;
-         if (n < 0)
-           interruptp = 1;
-         else
-           for (qid = 0; (qid <= QID_MAX); qid += 1)
-             if (((registry [qid]) != 0)
-                 && (OS2_scheme_tqueue_avail_map [qid]))
+       for (qid = 0; (qid <= QID_MAX); qid += 1)
+         {
+           (results [qid]) = 0;
+           if ((registry [qid]) != 0)
+             switch (OS2_message_availablep (qid, 0))
                {
+               case mat_available:
                  inputp = 1;
                  (results [qid]) = 1;
+                 break;
+               case mat_interrupt:
+                 interruptp = 1;
+                 break;
                }
-       }
+         }
+       if ((!blockp) || inputp || interruptp)
+         break;
+       if ((OS2_scheme_tqueue_block ()) == mat_interrupt)
+         interruptp = 1;
+      }
     if (inputp)
       PRIMITIVE_RETURN (LONG_TO_UNSIGNED_FIXNUM (0));
     else if (!interruptp)