/* -*-C-*-
-$Id: os2cthrd.c,v 1.3 1994/12/19 22:30:49 cph Exp $
+$Id: os2cthrd.c,v 1.4 1995/01/05 23:42:42 cph Exp $
-Copyright (c) 1994 Massachusetts Institute of Technology
+Copyright (c) 1994-95 Massachusetts Institute of Technology
This material was developed by the Scheme project at the Massachusetts
Institute of Technology, Department of Electrical Engineering and
#include "os2.h"
+static void start_readahead_thread (channel_context_t *);
static msg_list_t * new_list (void);
static msg_t * new_message (void);
-
+\f
void
OS2_initialize_channel_thread_messages (void)
{
channel_context_t * context = (OS_malloc (sizeof (channel_context_t)));
OS2_make_qid_pair ((& (CHANNEL_CONTEXT_READER_QID (context))),
(& (CHANNEL_CONTEXT_WRITER_QID (context))));
- (CHANNEL_CONTEXT_READAHEAD (context)) = 0;
- (CHANNEL_CONTEXT_READAHEAD_INDEX (context)) = 0;
(CHANNEL_CONTEXT_EOFP (context)) = 0;
+ (CHANNEL_CONTEXT_FIRST_READ_P (context)) = 1;
return (context);
}
OS2_channel_thread_close (Tchannel channel)
{
channel_context_t * context = (CHANNEL_OPERATOR_CONTEXT (channel));
- /* Closing handle should force input thread to kill itself. */
+ /* Closing handle forces input thread to kill itself. */
STD_API_CALL (dos_close, (CHANNEL_HANDLE (channel)));
- if ((CHANNEL_CONTEXT_READAHEAD (context)) != 0)
- OS2_destroy_message (CHANNEL_CONTEXT_READAHEAD (context));
+ /* If the thread hasn't been read from yet, then it is blocked
+ waiting for the readahead_ack message to wake it up. In this
+ case, send the message -- the thread should immediately notice
+ that the handle is closed, and kill itself. */
+ start_readahead_thread (context);
OS2_close_qid (CHANNEL_CONTEXT_READER_QID (context));
OS_free (context);
}
+
+qid_t
+OS2_channel_thread_descriptor (Tchannel channel)
+{
+ channel_context_t * context = (CHANNEL_OPERATOR_CONTEXT (channel));
+ /* Make sure that the readahead thread is started, so that when
+ input arrives it will be registered properly so that the "select"
+ emulation will notice it. */
+ start_readahead_thread (context);
+ return (CHANNEL_CONTEXT_READER_QID (context));
+}
+
+static void
+start_readahead_thread (channel_context_t * context)
+{
+ /* Wake up the reader thread if this is the first time we are
+ operating on it. This is necessary because we sometimes don't
+ want to read from the channel at all -- for example, when the
+ channel is the read side of a pipe that is being passed to a
+ child process. */
+ if (CHANNEL_CONTEXT_FIRST_READ_P (context))
+ {
+ OS2_send_message ((CHANNEL_CONTEXT_READER_QID (context)),
+ (OS2_make_readahead_ack ()));
+ (CHANNEL_CONTEXT_FIRST_READ_P (context)) = 0;
+ }
+}
\f
+msg_t *
+OS2_make_readahead (void)
+{
+ msg_t * message = (OS2_create_message (mt_readahead));
+ (SM_READAHEAD_INDEX (message)) = 0;
+ return (message);
+}
+
long
OS2_channel_thread_read (Tchannel channel, char * buffer, size_t size)
{
channel_context_t * context = (CHANNEL_OPERATOR_CONTEXT (channel));
qid_t qid = (CHANNEL_CONTEXT_READER_QID (context));
msg_t * message;
- unsigned int index;
- unsigned int navail;
- if (CHANNEL_CONTEXT_EOFP (context))
+ unsigned short index;
+ unsigned short navail;
+ if ((CHANNEL_CONTEXT_EOFP (context)) || (size == 0))
return (0);
- message = (CHANNEL_CONTEXT_READAHEAD (context));
- index = (CHANNEL_CONTEXT_READAHEAD_INDEX (context));
+ start_readahead_thread (context);
+ message = (OS2_receive_message (qid, (!CHANNEL_NONBLOCKING (channel)), 1));
if (message == 0)
+ return (-1);
+ if (OS2_error_message_p (message))
{
- if (CHANNEL_NONBLOCKING (channel))
- {
- message = (OS2_receive_message (qid, 0, 1));
- if (message == 0)
- {
- (CHANNEL_CONTEXT_READAHEAD (context)) = 0;
- return (-1);
- }
- }
- else
- message = (OS2_receive_message (qid, 1, 1));
- /* Acknowledge the message so that the readahead thread will
- know that it is safe to start reading some more. */
OS2_send_message (qid, (OS2_make_readahead_ack ()));
- if (OS2_error_message_p (message))
- OS2_handle_error_message (message);
- if ((MSG_TYPE (message)) != mt_readahead)
- OS2_logic_error ("Illegal message from channel thread.");
- index = 0;
+ OS2_handle_error_message (message);
}
- if ((SM_READAHEAD_SIZE (message)) == 0)
+ if ((MSG_TYPE (message)) != mt_readahead)
+ OS2_logic_error ("Illegal message from channel thread.");
+ index = (SM_READAHEAD_INDEX (message));
+ if (index == 0)
+ OS2_send_message (qid, (OS2_make_readahead_ack ()));
+ navail = ((SM_READAHEAD_SIZE (message)) - index);
+ if (navail == 0)
{
OS2_destroy_message (message);
- (CHANNEL_CONTEXT_READAHEAD (context)) = 0;
(CHANNEL_CONTEXT_EOFP (context)) = 1;
return (0);
}
- navail = ((SM_READAHEAD_SIZE (message)) - index);
- if (navail <= size)
+ else if (navail <= size)
{
- FASTCOPY ((SM_READAHEAD_DATA (message)), buffer, navail);
+ FASTCOPY (((SM_READAHEAD_DATA (message)) + index), buffer, navail);
OS2_destroy_message (message);
- (CHANNEL_CONTEXT_READAHEAD (context)) = 0;
return (navail);
}
else
{
- FASTCOPY ((SM_READAHEAD_DATA (message)), buffer, size);
- (CHANNEL_CONTEXT_READAHEAD (context)) = message;
- (CHANNEL_CONTEXT_READAHEAD_INDEX (context)) = (index + size);
+ FASTCOPY (((SM_READAHEAD_DATA (message)) + index), buffer, size);
+ (SM_READAHEAD_INDEX (message)) += size;
+ OS2_unread_message (qid, message);
return (size);
}
}
/* -*-C-*-
-$Id: os2cthrd.h,v 1.2 1994/12/02 20:41:38 cph Exp $
+$Id: os2cthrd.h,v 1.3 1995/01/05 23:42:50 cph Exp $
-Copyright (c) 1994 Massachusetts Institute of Technology
+Copyright (c) 1994-95 Massachusetts Institute of Technology
This material was developed by the Scheme project at the Massachusetts
Institute of Technology, Department of Electrical Engineering and
typedef struct
{
+ TID tid;
qid_t reader_qid;
qid_t writer_qid;
- msg_t * readahead;
- unsigned int readahead_index;
- char eofp;
+ unsigned int eofp : 1;
+ unsigned int first_read_p : 1;
} channel_context_t;
+#define CHANNEL_CONTEXT_TID(c) ((c) -> tid)
#define CHANNEL_CONTEXT_READER_QID(c) ((c) -> reader_qid)
#define CHANNEL_CONTEXT_WRITER_QID(c) ((c) -> writer_qid)
-#define CHANNEL_CONTEXT_READAHEAD(c) ((c) -> readahead)
-#define CHANNEL_CONTEXT_READAHEAD_INDEX(c) ((c) -> readahead_index)
#define CHANNEL_CONTEXT_EOFP(c) ((c) -> eofp)
+#define CHANNEL_CONTEXT_FIRST_READ_P(c) ((c) -> first_read_p)
-typedef struct sm_readahead_s
+typedef struct
{
DECLARE_MSG_HEADER_FIELDS;
- ULONG size;
+ unsigned short size;
+ unsigned short index;
char data [SM_READAHEAD_MAX];
} sm_readahead_t;
#define SM_READAHEAD_SIZE(m) (((sm_readahead_t *) (m)) -> size)
+#define SM_READAHEAD_INDEX(m) (((sm_readahead_t *) (m)) -> index)
#define SM_READAHEAD_DATA(m) (((sm_readahead_t *) (m)) -> data)
-#define OS2_make_readahead() OS2_create_message (mt_readahead)
#define OS2_make_readahead_ack() OS2_create_message (mt_readahead_ack)
typedef msg_t sm_readahead_ack_t;
extern int OS2_readahead_buffer_emptyp (readahead_buffer_t *);
extern void OS2_readahead_buffer_insert (readahead_buffer_t *, char);
extern char OS2_readahead_buffer_rubout (readahead_buffer_t *);
+extern msg_t * OS2_make_readahead (void);
extern msg_t * OS2_readahead_buffer_read (readahead_buffer_t *);
extern msg_list_t * OS2_readahead_buffer_read_all (readahead_buffer_t *);
/* -*-C-*-
-$Id: os2pipe.c,v 1.2 1994/12/02 20:41:57 cph Exp $
+$Id: os2pipe.c,v 1.3 1995/01/05 23:43:32 cph Exp $
-Copyright (c) 1994 Massachusetts Institute of Technology
+Copyright (c) 1994-95 Massachusetts Institute of Technology
This material was developed by the Scheme project at the Massachusetts
Institute of Technology, Department of Electrical Engineering and
(*writerp) = (OS2_make_channel (hwrite, CHANNEL_WRITE));
transaction_commit ();
}
-\f
+
void
OS2_initialize_pipe_channel (Tchannel channel)
{
channel_context_t * context = (OS2_make_channel_context ());
(CHANNEL_OPERATOR_CONTEXT (channel)) = context;
OS2_open_qid ((CHANNEL_CONTEXT_READER_QID (context)), OS2_scheme_tqueue);
- (void) OS2_beginthread
- (input_pipe_thread, (CHANNEL_POINTER (channel)), 0);
+ OS2_open_qid
+ ((CHANNEL_CONTEXT_WRITER_QID (context)), (OS2_make_std_tqueue ()));
+ (CHANNEL_CONTEXT_TID (context))
+ = (OS2_beginthread (input_pipe_thread,
+ (CHANNEL_POINTER (channel)),
+ 0));
(CHANNEL_OPERATOR (channel)) = input_pipe_operator;
}
}
-
+\f
static void
input_pipe_operator (Tchannel channel, chop_t operation,
choparg_t arg1, choparg_t arg2, choparg_t arg3)
LHANDLE handle = (CHANNEL_HANDLE (channel));
channel_context_t * context = (CHANNEL_OPERATOR_CONTEXT (channel));
qid_t qid = (CHANNEL_CONTEXT_WRITER_QID (context));
- OS2_open_qid (qid, (OS2_make_std_tqueue ()));
(void) OS2_thread_initialize (qid);
+ /* Wait for first read request before doing anything. */
+ OS2_wait_for_readahead_ack (qid);
while (1)
{
msg_t * message = (OS2_make_readahead ());
+ ULONG nread;
APIRET rc
= (dos_read (handle,
(SM_READAHEAD_DATA (message)),
(sizeof (SM_READAHEAD_DATA (message))),
- (& (SM_READAHEAD_SIZE (message)))));
+ (& nread)));
int eofp;
if (rc == NO_ERROR)
- eofp = ((SM_READAHEAD_SIZE (message)) == 0);
+ {
+ (SM_READAHEAD_SIZE (message)) = nread;
+ eofp = (nread == 0);
+ }
else
{
OS2_destroy_message (message);