Change implementation of readahead in input channels that require it.
authorChris Hanson <org/chris-hanson/cph>
Thu, 5 Jan 1995 23:43:32 +0000 (23:43 +0000)
committerChris Hanson <org/chris-hanson/cph>
Thu, 5 Jan 1995 23:43:32 +0000 (23:43 +0000)
Readahead is now buffered in the input queue by means of a new
operation to push a message back onto the head of the queue.  Also,
there is now a mechanism to delay starting the readahead thread until
the first read occurs on the channel -- this is used for input pipes
that are passed to child processes and never read from the Scheme end.
It's important that such pipes not be read from.

v7/src/microcode/os2cthrd.c
v7/src/microcode/os2cthrd.h
v7/src/microcode/os2pipe.c

index 546fc05c77816d07156d727678e135099e62115d..eb9298a24c9ae48597e0462d8f98666c59d5f488 100644 (file)
@@ -1,8 +1,8 @@
 /* -*-C-*-
 
-$Id: os2cthrd.c,v 1.3 1994/12/19 22:30:49 cph Exp $
+$Id: os2cthrd.c,v 1.4 1995/01/05 23:42:42 cph Exp $
 
-Copyright (c) 1994 Massachusetts Institute of Technology
+Copyright (c) 1994-95 Massachusetts Institute of Technology
 
 This material was developed by the Scheme project at the Massachusetts
 Institute of Technology, Department of Electrical Engineering and
@@ -36,9 +36,10 @@ MIT in each case. */
 
 #include "os2.h"
 
+static void start_readahead_thread (channel_context_t *);
 static msg_list_t * new_list (void);
 static msg_t * new_message (void);
-
+\f
 void
 OS2_initialize_channel_thread_messages (void)
 {
@@ -52,9 +53,8 @@ OS2_make_channel_context (void)
   channel_context_t * context = (OS_malloc (sizeof (channel_context_t)));
   OS2_make_qid_pair ((& (CHANNEL_CONTEXT_READER_QID (context))),
                     (& (CHANNEL_CONTEXT_WRITER_QID (context))));
-  (CHANNEL_CONTEXT_READAHEAD (context)) = 0;
-  (CHANNEL_CONTEXT_READAHEAD_INDEX (context)) = 0;
   (CHANNEL_CONTEXT_EOFP (context)) = 0;
+  (CHANNEL_CONTEXT_FIRST_READ_P (context)) = 1;
   return (context);
 }
 
@@ -62,68 +62,94 @@ void
 OS2_channel_thread_close (Tchannel channel)
 {
   channel_context_t * context = (CHANNEL_OPERATOR_CONTEXT (channel));
-  /* Closing handle should force input thread to kill itself.  */
+  /* Closing handle forces input thread to kill itself.  */
   STD_API_CALL (dos_close, (CHANNEL_HANDLE (channel)));
-  if ((CHANNEL_CONTEXT_READAHEAD (context)) != 0)
-    OS2_destroy_message (CHANNEL_CONTEXT_READAHEAD (context));
+  /* If the thread hasn't been read from yet, then it is blocked
+     waiting for the readahead_ack message to wake it up.  In this
+     case, send the message -- the thread should immediately notice
+     that the handle is closed, and kill itself.  */
+  start_readahead_thread (context);
   OS2_close_qid (CHANNEL_CONTEXT_READER_QID (context));
   OS_free (context);
 }
+
+qid_t
+OS2_channel_thread_descriptor (Tchannel channel)
+{
+  channel_context_t * context = (CHANNEL_OPERATOR_CONTEXT (channel));
+  /* Make sure that the readahead thread is started, so that when
+     input arrives it will be registered properly so that the "select"
+     emulation will notice it.  */
+  start_readahead_thread (context);
+  return (CHANNEL_CONTEXT_READER_QID (context));
+}
+
+static void
+start_readahead_thread (channel_context_t * context)
+{
+  /* Wake up the reader thread if this is the first time we are
+     operating on it.  This is necessary because we sometimes don't
+     want to read from the channel at all -- for example, when the
+     channel is the read side of a pipe that is being passed to a
+     child process.  */
+  if (CHANNEL_CONTEXT_FIRST_READ_P (context))
+    {
+      OS2_send_message ((CHANNEL_CONTEXT_READER_QID (context)),
+                       (OS2_make_readahead_ack ()));
+      (CHANNEL_CONTEXT_FIRST_READ_P (context)) = 0;
+    }
+}
 \f
+msg_t *
+OS2_make_readahead (void)
+{
+  msg_t * message = (OS2_create_message (mt_readahead));
+  (SM_READAHEAD_INDEX (message)) = 0;
+  return (message);
+}
+
 long
 OS2_channel_thread_read (Tchannel channel, char * buffer, size_t size)
 {
   channel_context_t * context = (CHANNEL_OPERATOR_CONTEXT (channel));
   qid_t qid = (CHANNEL_CONTEXT_READER_QID (context));
   msg_t * message;
-  unsigned int index;
-  unsigned int navail;
-  if (CHANNEL_CONTEXT_EOFP (context))
+  unsigned short index;
+  unsigned short navail;
+  if ((CHANNEL_CONTEXT_EOFP (context)) || (size == 0))
     return (0);
-  message = (CHANNEL_CONTEXT_READAHEAD (context));
-  index = (CHANNEL_CONTEXT_READAHEAD_INDEX (context));
+  start_readahead_thread (context);
+  message = (OS2_receive_message (qid, (!CHANNEL_NONBLOCKING (channel)), 1));
   if (message == 0)
+    return (-1);
+  if (OS2_error_message_p (message))
     {
-      if (CHANNEL_NONBLOCKING (channel))
-       {
-         message = (OS2_receive_message (qid, 0, 1));
-         if (message == 0)
-           {
-             (CHANNEL_CONTEXT_READAHEAD (context)) = 0;
-             return (-1);
-           }
-       }
-      else
-       message = (OS2_receive_message (qid, 1, 1));
-      /* Acknowledge the message so that the readahead thread will
-        know that it is safe to start reading some more.  */
       OS2_send_message (qid, (OS2_make_readahead_ack ()));
-      if (OS2_error_message_p (message))
-       OS2_handle_error_message (message);
-      if ((MSG_TYPE (message)) != mt_readahead)
-       OS2_logic_error ("Illegal message from channel thread.");
-      index = 0;
+      OS2_handle_error_message (message);
     }
-  if ((SM_READAHEAD_SIZE (message)) == 0)
+  if ((MSG_TYPE (message)) != mt_readahead)
+    OS2_logic_error ("Illegal message from channel thread.");
+  index = (SM_READAHEAD_INDEX (message));
+  if (index == 0)
+    OS2_send_message (qid, (OS2_make_readahead_ack ()));
+  navail = ((SM_READAHEAD_SIZE (message)) - index);
+  if (navail == 0)
     {
       OS2_destroy_message (message);
-      (CHANNEL_CONTEXT_READAHEAD (context)) = 0;
       (CHANNEL_CONTEXT_EOFP (context)) = 1;
       return (0);
     }
-  navail = ((SM_READAHEAD_SIZE (message)) - index);
-  if (navail <= size)
+  else if (navail <= size)
     {
-      FASTCOPY ((SM_READAHEAD_DATA (message)), buffer, navail);
+      FASTCOPY (((SM_READAHEAD_DATA (message)) + index), buffer, navail);
       OS2_destroy_message (message);
-      (CHANNEL_CONTEXT_READAHEAD (context)) = 0;
       return (navail);
     }
   else
     {
-      FASTCOPY ((SM_READAHEAD_DATA (message)), buffer, size);
-      (CHANNEL_CONTEXT_READAHEAD (context)) = message;
-      (CHANNEL_CONTEXT_READAHEAD_INDEX (context)) = (index + size);
+      FASTCOPY (((SM_READAHEAD_DATA (message)) + index), buffer, size);
+      (SM_READAHEAD_INDEX (message)) += size;
+      OS2_unread_message (qid, message);
       return (size);
     }
 }
index 417d73a1ea56e0ec1ee6a913dc5f884197cc966c..f59ef09a536b90a4827a86c3efd001d87b26c063 100644 (file)
@@ -1,8 +1,8 @@
 /* -*-C-*-
 
-$Id: os2cthrd.h,v 1.2 1994/12/02 20:41:38 cph Exp $
+$Id: os2cthrd.h,v 1.3 1995/01/05 23:42:50 cph Exp $
 
-Copyright (c) 1994 Massachusetts Institute of Technology
+Copyright (c) 1994-95 Massachusetts Institute of Technology
 
 This material was developed by the Scheme project at the Massachusetts
 Institute of Technology, Department of Electrical Engineering and
@@ -41,28 +41,29 @@ MIT in each case. */
 
 typedef struct
 {
+  TID tid;
   qid_t reader_qid;
   qid_t writer_qid;
-  msg_t * readahead;
-  unsigned int readahead_index;
-  char eofp;
+  unsigned int eofp : 1;
+  unsigned int first_read_p : 1;
 } channel_context_t;
+#define CHANNEL_CONTEXT_TID(c) ((c) -> tid)
 #define CHANNEL_CONTEXT_READER_QID(c) ((c) -> reader_qid)
 #define CHANNEL_CONTEXT_WRITER_QID(c) ((c) -> writer_qid)
-#define CHANNEL_CONTEXT_READAHEAD(c) ((c) -> readahead)
-#define CHANNEL_CONTEXT_READAHEAD_INDEX(c) ((c) -> readahead_index)
 #define CHANNEL_CONTEXT_EOFP(c) ((c) -> eofp)
+#define CHANNEL_CONTEXT_FIRST_READ_P(c) ((c) -> first_read_p)
 
-typedef struct sm_readahead_s
+typedef struct
 {
   DECLARE_MSG_HEADER_FIELDS;
-  ULONG size;
+  unsigned short size;
+  unsigned short index;
   char data [SM_READAHEAD_MAX];
 } sm_readahead_t;
 #define SM_READAHEAD_SIZE(m) (((sm_readahead_t *) (m)) -> size)
+#define SM_READAHEAD_INDEX(m) (((sm_readahead_t *) (m)) -> index)
 #define SM_READAHEAD_DATA(m) (((sm_readahead_t *) (m)) -> data)
 
-#define OS2_make_readahead() OS2_create_message (mt_readahead)
 #define OS2_make_readahead_ack() OS2_create_message (mt_readahead_ack)
 
 typedef msg_t sm_readahead_ack_t;
@@ -82,6 +83,7 @@ extern readahead_buffer_t * OS2_make_readahead_buffer (void);
 extern int OS2_readahead_buffer_emptyp (readahead_buffer_t *);
 extern void OS2_readahead_buffer_insert (readahead_buffer_t *, char);
 extern char OS2_readahead_buffer_rubout (readahead_buffer_t *);
+extern msg_t * OS2_make_readahead (void);
 extern msg_t * OS2_readahead_buffer_read (readahead_buffer_t *);
 extern msg_list_t * OS2_readahead_buffer_read_all (readahead_buffer_t *);
 
index f2ec5edd67e4db255fb8718c3c24e62def596be7..88e7f8e897f98169f9c8b39e9b0e7decf82b2df2 100644 (file)
@@ -1,8 +1,8 @@
 /* -*-C-*-
 
-$Id: os2pipe.c,v 1.2 1994/12/02 20:41:57 cph Exp $
+$Id: os2pipe.c,v 1.3 1995/01/05 23:43:32 cph Exp $
 
-Copyright (c) 1994 Massachusetts Institute of Technology
+Copyright (c) 1994-95 Massachusetts Institute of Technology
 
 This material was developed by the Scheme project at the Massachusetts
 Institute of Technology, Department of Electrical Engineering and
@@ -53,7 +53,7 @@ OS_make_pipe (Tchannel * readerp, Tchannel * writerp)
   (*writerp) = (OS2_make_channel (hwrite, CHANNEL_WRITE));
   transaction_commit ();
 }
-\f
+
 void
 OS2_initialize_pipe_channel (Tchannel channel)
 {
@@ -62,12 +62,16 @@ OS2_initialize_pipe_channel (Tchannel channel)
       channel_context_t * context = (OS2_make_channel_context ());
       (CHANNEL_OPERATOR_CONTEXT (channel)) = context;
       OS2_open_qid ((CHANNEL_CONTEXT_READER_QID (context)), OS2_scheme_tqueue);
-      (void) OS2_beginthread
-       (input_pipe_thread, (CHANNEL_POINTER (channel)), 0);
+      OS2_open_qid
+       ((CHANNEL_CONTEXT_WRITER_QID (context)), (OS2_make_std_tqueue ()));
+      (CHANNEL_CONTEXT_TID (context))
+       = (OS2_beginthread (input_pipe_thread,
+                           (CHANNEL_POINTER (channel)),
+                           0));
       (CHANNEL_OPERATOR (channel)) = input_pipe_operator;
     }
 }
-
+\f
 static void
 input_pipe_operator (Tchannel channel, chop_t operation,
                     choparg_t arg1, choparg_t arg2, choparg_t arg3)
@@ -95,19 +99,24 @@ input_pipe_thread (void * arg)
   LHANDLE handle = (CHANNEL_HANDLE (channel));
   channel_context_t * context = (CHANNEL_OPERATOR_CONTEXT (channel));
   qid_t qid = (CHANNEL_CONTEXT_WRITER_QID (context));
-  OS2_open_qid (qid, (OS2_make_std_tqueue ()));
   (void) OS2_thread_initialize (qid);
+  /* Wait for first read request before doing anything.  */
+  OS2_wait_for_readahead_ack (qid);
   while (1)
     {
       msg_t * message = (OS2_make_readahead ());
+      ULONG nread;
       APIRET rc
        = (dos_read (handle,
                     (SM_READAHEAD_DATA (message)),
                     (sizeof (SM_READAHEAD_DATA (message))),
-                    (& (SM_READAHEAD_SIZE (message)))));
+                    (& nread)));
       int eofp;
       if (rc == NO_ERROR)
-       eofp = ((SM_READAHEAD_SIZE (message)) == 0);
+       {
+         (SM_READAHEAD_SIZE (message)) = nread;
+         eofp = (nread == 0);
+       }
       else
        {
          OS2_destroy_message (message);