From b573c333af6218ba95264a1ac0369b152b336bf7 Mon Sep 17 00:00:00 2001 From: Chris Hanson Date: Thu, 5 Jan 1995 23:43:32 +0000 Subject: [PATCH] Change implementation of readahead in input channels that require it. Readahead is now buffered in the input queue by means of a new operation to push a message back onto the head of the queue. Also, there is now a mechanism to delay starting the readahead thread until the first read occurs on the channel -- this is used for input pipes that are passed to child processes and never read from the Scheme end. It's important that such pipes not be read from. --- v7/src/microcode/os2cthrd.c | 106 ++++++++++++++++++++++-------------- v7/src/microcode/os2cthrd.h | 22 ++++---- v7/src/microcode/os2pipe.c | 27 ++++++--- 3 files changed, 96 insertions(+), 59 deletions(-) diff --git a/v7/src/microcode/os2cthrd.c b/v7/src/microcode/os2cthrd.c index 546fc05c7..eb9298a24 100644 --- a/v7/src/microcode/os2cthrd.c +++ b/v7/src/microcode/os2cthrd.c @@ -1,8 +1,8 @@ /* -*-C-*- -$Id: os2cthrd.c,v 1.3 1994/12/19 22:30:49 cph Exp $ +$Id: os2cthrd.c,v 1.4 1995/01/05 23:42:42 cph Exp $ -Copyright (c) 1994 Massachusetts Institute of Technology +Copyright (c) 1994-95 Massachusetts Institute of Technology This material was developed by the Scheme project at the Massachusetts Institute of Technology, Department of Electrical Engineering and @@ -36,9 +36,10 @@ MIT in each case. */ #include "os2.h" +static void start_readahead_thread (channel_context_t *); static msg_list_t * new_list (void); static msg_t * new_message (void); - + void OS2_initialize_channel_thread_messages (void) { @@ -52,9 +53,8 @@ OS2_make_channel_context (void) channel_context_t * context = (OS_malloc (sizeof (channel_context_t))); OS2_make_qid_pair ((& (CHANNEL_CONTEXT_READER_QID (context))), (& (CHANNEL_CONTEXT_WRITER_QID (context)))); - (CHANNEL_CONTEXT_READAHEAD (context)) = 0; - (CHANNEL_CONTEXT_READAHEAD_INDEX (context)) = 0; (CHANNEL_CONTEXT_EOFP (context)) = 0; + (CHANNEL_CONTEXT_FIRST_READ_P (context)) = 1; return (context); } @@ -62,68 +62,94 @@ void OS2_channel_thread_close (Tchannel channel) { channel_context_t * context = (CHANNEL_OPERATOR_CONTEXT (channel)); - /* Closing handle should force input thread to kill itself. */ + /* Closing handle forces input thread to kill itself. */ STD_API_CALL (dos_close, (CHANNEL_HANDLE (channel))); - if ((CHANNEL_CONTEXT_READAHEAD (context)) != 0) - OS2_destroy_message (CHANNEL_CONTEXT_READAHEAD (context)); + /* If the thread hasn't been read from yet, then it is blocked + waiting for the readahead_ack message to wake it up. In this + case, send the message -- the thread should immediately notice + that the handle is closed, and kill itself. */ + start_readahead_thread (context); OS2_close_qid (CHANNEL_CONTEXT_READER_QID (context)); OS_free (context); } + +qid_t +OS2_channel_thread_descriptor (Tchannel channel) +{ + channel_context_t * context = (CHANNEL_OPERATOR_CONTEXT (channel)); + /* Make sure that the readahead thread is started, so that when + input arrives it will be registered properly so that the "select" + emulation will notice it. */ + start_readahead_thread (context); + return (CHANNEL_CONTEXT_READER_QID (context)); +} + +static void +start_readahead_thread (channel_context_t * context) +{ + /* Wake up the reader thread if this is the first time we are + operating on it. This is necessary because we sometimes don't + want to read from the channel at all -- for example, when the + channel is the read side of a pipe that is being passed to a + child process. */ + if (CHANNEL_CONTEXT_FIRST_READ_P (context)) + { + OS2_send_message ((CHANNEL_CONTEXT_READER_QID (context)), + (OS2_make_readahead_ack ())); + (CHANNEL_CONTEXT_FIRST_READ_P (context)) = 0; + } +} +msg_t * +OS2_make_readahead (void) +{ + msg_t * message = (OS2_create_message (mt_readahead)); + (SM_READAHEAD_INDEX (message)) = 0; + return (message); +} + long OS2_channel_thread_read (Tchannel channel, char * buffer, size_t size) { channel_context_t * context = (CHANNEL_OPERATOR_CONTEXT (channel)); qid_t qid = (CHANNEL_CONTEXT_READER_QID (context)); msg_t * message; - unsigned int index; - unsigned int navail; - if (CHANNEL_CONTEXT_EOFP (context)) + unsigned short index; + unsigned short navail; + if ((CHANNEL_CONTEXT_EOFP (context)) || (size == 0)) return (0); - message = (CHANNEL_CONTEXT_READAHEAD (context)); - index = (CHANNEL_CONTEXT_READAHEAD_INDEX (context)); + start_readahead_thread (context); + message = (OS2_receive_message (qid, (!CHANNEL_NONBLOCKING (channel)), 1)); if (message == 0) + return (-1); + if (OS2_error_message_p (message)) { - if (CHANNEL_NONBLOCKING (channel)) - { - message = (OS2_receive_message (qid, 0, 1)); - if (message == 0) - { - (CHANNEL_CONTEXT_READAHEAD (context)) = 0; - return (-1); - } - } - else - message = (OS2_receive_message (qid, 1, 1)); - /* Acknowledge the message so that the readahead thread will - know that it is safe to start reading some more. */ OS2_send_message (qid, (OS2_make_readahead_ack ())); - if (OS2_error_message_p (message)) - OS2_handle_error_message (message); - if ((MSG_TYPE (message)) != mt_readahead) - OS2_logic_error ("Illegal message from channel thread."); - index = 0; + OS2_handle_error_message (message); } - if ((SM_READAHEAD_SIZE (message)) == 0) + if ((MSG_TYPE (message)) != mt_readahead) + OS2_logic_error ("Illegal message from channel thread."); + index = (SM_READAHEAD_INDEX (message)); + if (index == 0) + OS2_send_message (qid, (OS2_make_readahead_ack ())); + navail = ((SM_READAHEAD_SIZE (message)) - index); + if (navail == 0) { OS2_destroy_message (message); - (CHANNEL_CONTEXT_READAHEAD (context)) = 0; (CHANNEL_CONTEXT_EOFP (context)) = 1; return (0); } - navail = ((SM_READAHEAD_SIZE (message)) - index); - if (navail <= size) + else if (navail <= size) { - FASTCOPY ((SM_READAHEAD_DATA (message)), buffer, navail); + FASTCOPY (((SM_READAHEAD_DATA (message)) + index), buffer, navail); OS2_destroy_message (message); - (CHANNEL_CONTEXT_READAHEAD (context)) = 0; return (navail); } else { - FASTCOPY ((SM_READAHEAD_DATA (message)), buffer, size); - (CHANNEL_CONTEXT_READAHEAD (context)) = message; - (CHANNEL_CONTEXT_READAHEAD_INDEX (context)) = (index + size); + FASTCOPY (((SM_READAHEAD_DATA (message)) + index), buffer, size); + (SM_READAHEAD_INDEX (message)) += size; + OS2_unread_message (qid, message); return (size); } } diff --git a/v7/src/microcode/os2cthrd.h b/v7/src/microcode/os2cthrd.h index 417d73a1e..f59ef09a5 100644 --- a/v7/src/microcode/os2cthrd.h +++ b/v7/src/microcode/os2cthrd.h @@ -1,8 +1,8 @@ /* -*-C-*- -$Id: os2cthrd.h,v 1.2 1994/12/02 20:41:38 cph Exp $ +$Id: os2cthrd.h,v 1.3 1995/01/05 23:42:50 cph Exp $ -Copyright (c) 1994 Massachusetts Institute of Technology +Copyright (c) 1994-95 Massachusetts Institute of Technology This material was developed by the Scheme project at the Massachusetts Institute of Technology, Department of Electrical Engineering and @@ -41,28 +41,29 @@ MIT in each case. */ typedef struct { + TID tid; qid_t reader_qid; qid_t writer_qid; - msg_t * readahead; - unsigned int readahead_index; - char eofp; + unsigned int eofp : 1; + unsigned int first_read_p : 1; } channel_context_t; +#define CHANNEL_CONTEXT_TID(c) ((c) -> tid) #define CHANNEL_CONTEXT_READER_QID(c) ((c) -> reader_qid) #define CHANNEL_CONTEXT_WRITER_QID(c) ((c) -> writer_qid) -#define CHANNEL_CONTEXT_READAHEAD(c) ((c) -> readahead) -#define CHANNEL_CONTEXT_READAHEAD_INDEX(c) ((c) -> readahead_index) #define CHANNEL_CONTEXT_EOFP(c) ((c) -> eofp) +#define CHANNEL_CONTEXT_FIRST_READ_P(c) ((c) -> first_read_p) -typedef struct sm_readahead_s +typedef struct { DECLARE_MSG_HEADER_FIELDS; - ULONG size; + unsigned short size; + unsigned short index; char data [SM_READAHEAD_MAX]; } sm_readahead_t; #define SM_READAHEAD_SIZE(m) (((sm_readahead_t *) (m)) -> size) +#define SM_READAHEAD_INDEX(m) (((sm_readahead_t *) (m)) -> index) #define SM_READAHEAD_DATA(m) (((sm_readahead_t *) (m)) -> data) -#define OS2_make_readahead() OS2_create_message (mt_readahead) #define OS2_make_readahead_ack() OS2_create_message (mt_readahead_ack) typedef msg_t sm_readahead_ack_t; @@ -82,6 +83,7 @@ extern readahead_buffer_t * OS2_make_readahead_buffer (void); extern int OS2_readahead_buffer_emptyp (readahead_buffer_t *); extern void OS2_readahead_buffer_insert (readahead_buffer_t *, char); extern char OS2_readahead_buffer_rubout (readahead_buffer_t *); +extern msg_t * OS2_make_readahead (void); extern msg_t * OS2_readahead_buffer_read (readahead_buffer_t *); extern msg_list_t * OS2_readahead_buffer_read_all (readahead_buffer_t *); diff --git a/v7/src/microcode/os2pipe.c b/v7/src/microcode/os2pipe.c index f2ec5edd6..88e7f8e89 100644 --- a/v7/src/microcode/os2pipe.c +++ b/v7/src/microcode/os2pipe.c @@ -1,8 +1,8 @@ /* -*-C-*- -$Id: os2pipe.c,v 1.2 1994/12/02 20:41:57 cph Exp $ +$Id: os2pipe.c,v 1.3 1995/01/05 23:43:32 cph Exp $ -Copyright (c) 1994 Massachusetts Institute of Technology +Copyright (c) 1994-95 Massachusetts Institute of Technology This material was developed by the Scheme project at the Massachusetts Institute of Technology, Department of Electrical Engineering and @@ -53,7 +53,7 @@ OS_make_pipe (Tchannel * readerp, Tchannel * writerp) (*writerp) = (OS2_make_channel (hwrite, CHANNEL_WRITE)); transaction_commit (); } - + void OS2_initialize_pipe_channel (Tchannel channel) { @@ -62,12 +62,16 @@ OS2_initialize_pipe_channel (Tchannel channel) channel_context_t * context = (OS2_make_channel_context ()); (CHANNEL_OPERATOR_CONTEXT (channel)) = context; OS2_open_qid ((CHANNEL_CONTEXT_READER_QID (context)), OS2_scheme_tqueue); - (void) OS2_beginthread - (input_pipe_thread, (CHANNEL_POINTER (channel)), 0); + OS2_open_qid + ((CHANNEL_CONTEXT_WRITER_QID (context)), (OS2_make_std_tqueue ())); + (CHANNEL_CONTEXT_TID (context)) + = (OS2_beginthread (input_pipe_thread, + (CHANNEL_POINTER (channel)), + 0)); (CHANNEL_OPERATOR (channel)) = input_pipe_operator; } } - + static void input_pipe_operator (Tchannel channel, chop_t operation, choparg_t arg1, choparg_t arg2, choparg_t arg3) @@ -95,19 +99,24 @@ input_pipe_thread (void * arg) LHANDLE handle = (CHANNEL_HANDLE (channel)); channel_context_t * context = (CHANNEL_OPERATOR_CONTEXT (channel)); qid_t qid = (CHANNEL_CONTEXT_WRITER_QID (context)); - OS2_open_qid (qid, (OS2_make_std_tqueue ())); (void) OS2_thread_initialize (qid); + /* Wait for first read request before doing anything. */ + OS2_wait_for_readahead_ack (qid); while (1) { msg_t * message = (OS2_make_readahead ()); + ULONG nread; APIRET rc = (dos_read (handle, (SM_READAHEAD_DATA (message)), (sizeof (SM_READAHEAD_DATA (message))), - (& (SM_READAHEAD_SIZE (message))))); + (& nread))); int eofp; if (rc == NO_ERROR) - eofp = ((SM_READAHEAD_SIZE (message)) == 0); + { + (SM_READAHEAD_SIZE (message)) = nread; + eofp = (nread == 0); + } else { OS2_destroy_message (message); -- 2.25.1