/* -*-C-*-
-$Id: os2io.c,v 1.10 2003/02/14 18:28:22 cph Exp $
+$Id: os2io.c,v 1.11 2003/04/25 05:13:02 cph Exp $
-Copyright (c) 1994-1999 Massachusetts Institute of Technology
+Copyright 1994,1995,1996,2003 Massachusetts Institute of Technology
This file is part of MIT/GNU Scheme.
*/
#include "os2.h"
+#include "os2proc.h"
extern void add_reload_cleanup (void (*) (void));
extern void OS2_initialize_console_channel (Tchannel);
const int OS_have_select_p = 1;
#ifndef OS2_DEFAULT_MAX_FH
-#define OS2_DEFAULT_MAX_FH 256
+# define OS2_DEFAULT_MAX_FH 256
#endif
/* Set this to a larger size than OS2_DEFAULT_MAX_FH, because the
maximum number of file handles can be increased dynamically by
calling a primitive. */
#ifndef OS2_DEFAULT_CHANNEL_TABLE_SIZE
-#define OS2_DEFAULT_CHANNEL_TABLE_SIZE 1024
+# define OS2_DEFAULT_CHANNEL_TABLE_SIZE 1024
#endif
void
if ((OS_channel_write (channel, string, length)) != length)
OS2_error_anonymous ();
}
+\f
+struct select_registry_s
+{
+ unsigned int n_qids;
+ unsigned int length;
+ qid_t * qids;
+ unsigned char * qmodes;
+ unsigned char * rmodes;
+};
+
+select_registry_t
+OS_allocate_select_registry (void)
+{
+ struct select_registry_s * r
+ = (OS_malloc (sizeof (struct select_registry_s)));
+ (r -> n_qids) = 0;
+ (r -> length) = 16;
+ (r -> qids) = (OS_malloc ((sizeof (qid_t)) * (r -> length)));
+ (r -> qmodes) = (OS_malloc ((sizeof (unsigned char)) * (r -> length)));
+ (r -> rmodes) = (OS_malloc ((sizeof (unsigned char)) * (r -> length)));
+ return (r);
+}
+
+void
+OS_deallocate_select_registry (select_registry_t registry)
+{
+ struct select_registry_s * r = registry;
+ OS_free (r -> rmodes);
+ OS_free (r -> qmodes);
+ OS_free (r -> qids);
+ OS_free (r);
+}
+
+static void
+resize_select_registry (struct select_registry_s * r, int growp)
+{
+ if (growp)
+ (r -> length) *= 2;
+ else
+ (r -> length) /= 2;
+ (r -> qids)
+ = (OS_realloc ((r -> qids),
+ ((sizeof (qid_t)) * (r -> length))));
+ (r -> qmodes)
+ = (OS_realloc ((r -> qmodes),
+ ((sizeof (unsigned char)) * (r -> length))));
+ (r -> rmodes)
+ = (OS_realloc ((r -> rmodes),
+ ((sizeof (unsigned char)) * (r -> length))));
+}
+
+void
+OS_add_to_select_registry (select_registry_t registry, int fd,
+ unsigned int mode)
+{
+ struct select_registry_s * r = registry;
+ qid_t qid = fd;
+ unsigned int i = 0;
+
+ while (i < (r -> n_qids))
+ {
+ if (((r -> qids) [i]) == qid)
+ {
+ ((r -> qmodes) [i]) |= mode;
+ return;
+ }
+ i += 1;
+ }
+ if (i == (r -> length))
+ resize_select_registry (r, 1);
+ ((r -> qids) [i]) = qid;
+ ((r -> qmodes) [i]) = mode;
+ (r -> n_qids) += 1;
+}
+\f
+void
+OS_remove_from_select_registry (select_registry_t registry, int fd,
+ unsigned int mode)
+{
+ struct select_registry_s * r = registry;
+ qid_t qid = fd;
+ unsigned int i = 0;
+
+ while (1)
+ {
+ if (i == (r -> n_qids))
+ return;
+ if (((r -> qids) [i]) == qid)
+ {
+ ((r -> qmodes) [i]) &=~ mode;
+ if (((r -> qmodes) [i]) == 0)
+ break;
+ else
+ return;
+ }
+ i += 1;
+ }
+ while (i < (r -> n_qids))
+ {
+ ((r -> qids) [i]) = ((r -> qids) [(i + 1)]);
+ ((r -> qmodes) [i]) = ((r -> qmodes) [(i + 1)]);
+ i += 1;
+ }
+ (r -> n_qids) -= 1;
+
+ if (((r -> length) > 16) && ((r -> n_qids) < ((r -> length) / 2)))
+ resize_select_registry (r, 0);
+}
+
+unsigned int
+OS_select_registry_length (select_registry_t registry)
+{
+ struct select_registry_s * r = registry;
+ return (r -> n_qids);
+}
+
+void
+OS_select_registry_result (select_registry_t registry, unsigned int index,
+ int * fd_r, unsigned int * mode_r)
+{
+ struct select_registry_s * r = registry;
+ (*fd_r) = ((r -> qids) [index]);
+ (*mode_r) = ((r -> rmodes) [index]);
+}
+\f
+int
+OS_test_select_descriptor (int fd, int blockp, unsigned int qmode)
+{
+ qid_t qid = fd;
+ unsigned int rmode = (qmode & SELECT_MODE_WRITE);
+ if ((qmode & SELECT_MODE_READ) == 0)
+ return (rmode);
+ switch (OS2_message_availablep (qid, blockp))
+ {
+ case mat_available:
+ return (rmode | SELECT_MODE_READ);
+ case mat_not_available:
+ return (rmode);
+ case mat_interrupt:
+ return
+ ((OS_process_any_status_change ())
+ ? SELECT_PROCESS_STATUS_CHANGE
+ : SELECT_INTERRUPT);
+ default:
+ error_external_return ();
+ return (rmode | SELECT_MODE_ERROR);
+ }
+}
+
+int
+OS_test_select_registry (select_registry_t registry, int blockp)
+{
+ struct select_registry_s * r = registry;
+ unsigned int n_values = 0;
+ int interruptp = 0;
+ unsigned int i;
+
+ while (1)
+ {
+ for (i = 0; (i < (r -> n_qids)); i += 1)
+ {
+ ((r -> rmodes) [i]) = (((r -> qmodes) [i]) & SELECT_MODE_WRITE);
+ if ((((r -> qmodes) [i]) & SELECT_MODE_READ) != 0)
+ switch (OS2_message_availablep (((r -> qids) [i]), 0))
+ {
+ case mat_available:
+ ((r -> rmodes) [i]) |= SELECT_MODE_READ;
+ break;
+ case mat_interrupt:
+ interruptp = 1;
+ break;
+ }
+ if (((r -> rmodes) [i]) != 0)
+ n_values += 1;
+ }
+ if (n_values > 0)
+ return (n_values);
+ if (interruptp)
+ return
+ ((OS_process_any_status_change ())
+ ? SELECT_PROCESS_STATUS_CHANGE
+ : SELECT_INTERRUPT);
+ if (!blockp)
+ return (0);
+ if ((OS2_scheme_tqueue_block ()) == mat_interrupt)
+ interruptp = 1;
+ }
+}
/* -*-C-*-
-$Id: os2msg.c,v 1.16 2003/02/14 18:28:22 cph Exp $
+$Id: os2msg.c,v 1.17 2003/04/25 05:13:06 cph Exp $
-Copyright (c) 1994-2000 Massachusetts Institute of Technology
+Copyright 1994,1995,1997,2000,2003 Massachusetts Institute of Technology
This file is part of MIT/GNU Scheme.
static void write_scm_tqueue (tqueue_t *, msg_t *);
static void process_interrupt_messages (void);
\f
+/*
+
+How this works
+==============
+
+This file describes the inter-thread communications mechanism. The
+naming used here is atrocious. Mea culpa; the code was written in
+1994, while these notes are being written in 2003. I've learned a bit
+in the meantime and can now see how bad the code is.
+
+Every thread has an associated input queue, called its "tqueue".
+(Originally meant to be an abbreviation of "Thread QUEUE".) All
+messages sent to that thread, from any source, are queued there in
+order of transmission.
+
+Two threads that wish to communicate must set up a "channel", which
+consists of a pair of "qid" objects. ("qid" was originally meant to
+be an abbreviation of "Queue IDentifier".) These objects are always
+created in pairs, one for each thread, and after creation each qid is
+associated with one of the two threads. The other half of a qid pair
+is called its "twin". If you are familiar with sockets, a qid is the
+analog of a socket. A qid pair is the analog of a connection.
+
+Also associated with each qid is something called a "subqueue", which
+is simply a secondary queue for messages received from that qid's
+twin.
+
+* Suppose that thread A and thread B share two halves of a qid pair,
+ which we will call QA and QB. Also suppose that the tqueues
+ associated with these threads are called TA and TB.
+
+* If thread A calls OS2_send_message() on QA and a message M, M's
+ "sender" is set to be QB, and M is then queued at the end of TB.
+ Additionally, if any thread is blocked on TB (only B is allowed to
+ block on TB), the event semaphore EB is posted, which wakes up the
+ threads waiting on TB.
+
+* If thread B calls OS2_receive_message() on QB, B first dequeues each
+ the queued messages from TB. Each dequeued message M is queued at
+ the end of the subqueue of the sender of M. For example, if M's
+ sender is QB, then the message is put into QB's subqueue. This
+ process is repeated until all of the messages have been removed from
+ TB and dispatched to the appropriate subqueues.
+
+ The subqueue for QB is then checked; if there are any messages, the
+ first one is dequeued and then returned. Otherwise, B blocks on TB
+ waiting for another message to arrive, by waiting on EB. Eventually
+ the message sent by A arrives in TB, EB is posted, and B wakes up.
+
+ This process continues until a message is received.
+
+Some things to note here: the event semaphores are the primary
+synchronization mechanism for communications. These guarantee that
+messages aren't lost due to timing errors. Additionally, each tqueue
+has an associated mutex semaphore that is used to lock the tqueue in
+critical sections.
+
+*/
+\f
typedef struct
{
unsigned int allocatedp : 1; /* queue allocated? */
OS2_release_mutex_semaphore (QID_LOCK (qid));
return (result);
}
-\f
-int
-OS2_tqueue_select (tqueue_t * tqueue, int blockp)
-{
- msg_t * message = (read_tqueue (tqueue, blockp));
- if ((TQUEUE_TYPE (tqueue)) == tqt_scm)
- {
- process_interrupt_messages ();
- if (pending_interrupts_p ())
- return (-2);
- }
- return ((message != 0) ? (MSG_SENDER (message)) : (-1));
-}
static msg_t *
read_tqueue (tqueue_t * tqueue, int blockp)
return (tqueue);
}
-char OS2_scheme_tqueue_avail_map [QID_MAX + 1];
-
static msg_t *
read_scm_tqueue (tqueue_t * tqueue, int blockp)
{
msg_t * message = (read_std_tqueue (tqueue, blockp));
if (message != 0)
{
- (OS2_scheme_tqueue_avail_map [MSG_SENDER (message)]) = 1;
result = message;
/* At most one message needs to be read in blocking mode. */
blockp = 0;
write_std_tqueue (tqueue, message);
request_attention_interrupt ();
}
-
+\f
void
OS2_handle_attention_interrupt (void)
{
process_interrupt_messages ();
}
+msg_avail_t
+OS2_scheme_tqueue_block (void)
+{
+ int inputp = ((read_tqueue (OS2_scheme_tqueue, 1)) != 0);
+ process_interrupt_messages ();
+ return
+ (inputp
+ ? mat_available
+ : (pending_interrupts_p ())
+ ? mat_interrupt
+ : mat_not_available);
+}
+
static void
process_interrupt_messages (void)
{
/* -*-C-*-
-$Id: os2msg.h,v 1.17 2003/02/14 18:28:22 cph Exp $
+$Id: os2msg.h,v 1.18 2003/04/25 05:13:10 cph Exp $
-Copyright (c) 1994-1999 Massachusetts Institute of Technology
+Copyright 1994,1995,1997,2003 Massachusetts Institute of Technology
This file is part of MIT/GNU Scheme.
extern tqueue_t * OS2_scheme_tqueue;
extern qid_t OS2_interrupt_qid;
-extern char OS2_scheme_tqueue_avail_map [QID_MAX + 1];
extern void OS2_make_qid_pair (qid_t *, qid_t *);
extern void OS2_open_qid (qid_t, tqueue_t *);
extern msg_t * OS2_wait_for_message (qid_t, msg_type_t);
extern msg_t * OS2_message_transaction (qid_t, msg_t *, msg_type_t);
extern void OS2_unread_message (qid_t, msg_t *);
-extern int OS2_tqueue_select (tqueue_t *, int);
+extern msg_avail_t OS2_scheme_tqueue_block (void);
extern tqueue_t * OS2_make_std_tqueue (void);
extern void OS2_close_std_tqueue (tqueue_t *);
/* -*-C-*-
-$Id: pros2io.c,v 1.11 2003/02/14 18:28:23 cph Exp $
+$Id: pros2io.c,v 1.12 2003/04/25 05:13:14 cph Exp $
-Copyright (c) 1994-2000 Massachusetts Institute of Technology
+Copyright 1994,1995,1997,2000,2003 Massachusetts Institute of Technology
This file is part of MIT/GNU Scheme.
PRIMITIVE_HEADER (1);
{
Tchannel channel = (arg_channel (1));
- if (! ((CHANNEL_ABSTRACT_P (channel)) && (CHANNEL_INPUTP (channel))))
- error_bad_range_arg (1);
PRIMITIVE_RETURN
- (LONG_TO_UNSIGNED_FIXNUM (OS2_channel_thread_descriptor (channel)));
+ (LONG_TO_UNSIGNED_FIXNUM
+ ((CHANNEL_ABSTRACT_P (channel))
+ ? (OS2_channel_thread_descriptor (channel))
+ : 0));
}
}
int inputp = 0;
int interruptp = 0;
qid_t qid;
- int n;
- /* This first phase checks the qid subqueues and OS2_scheme_tqueue
- for any previously-queued input. */
- check_for_input:
- for (qid = 0; (qid <= QID_MAX); qid += 1)
+ while (1)
{
- (results [qid]) = 0;
- if ((registry [qid]) != 0)
- switch (OS2_message_availablep (qid, 0))
- {
- case mat_available:
- inputp = 1;
- (results [qid]) = 1;
- break;
- case mat_interrupt:
- interruptp = 1;
- break;
- }
- }
- /* This second phase waits for input if necessary. It does not
- check the subqueues for previously-stored data, so it's
- important that we already did this. Otherwise we could end up
- waiting for input when there was valid input ready. */
- if (blockp)
- while (! (inputp || interruptp))
- {
- for (qid = 0; (qid <= QID_MAX); qid += 1)
- (OS2_scheme_tqueue_avail_map [qid]) = 0;
- n = (OS2_tqueue_select (OS2_scheme_tqueue, blockp));
- if (n == (-1))
- /* If we're unblocked and there's no message in the
- tqueue, go back and check for input again. */
- goto check_for_input;
- if (n < 0)
- interruptp = 1;
- else
- for (qid = 0; (qid <= QID_MAX); qid += 1)
- if (((registry [qid]) != 0)
- && (OS2_scheme_tqueue_avail_map [qid]))
+ for (qid = 0; (qid <= QID_MAX); qid += 1)
+ {
+ (results [qid]) = 0;
+ if ((registry [qid]) != 0)
+ switch (OS2_message_availablep (qid, 0))
{
+ case mat_available:
inputp = 1;
(results [qid]) = 1;
+ break;
+ case mat_interrupt:
+ interruptp = 1;
+ break;
}
- }
+ }
+ if ((!blockp) || inputp || interruptp)
+ break;
+ if ((OS2_scheme_tqueue_block ()) == mat_interrupt)
+ interruptp = 1;
+ }
if (inputp)
PRIMITIVE_RETURN (LONG_TO_UNSIGNED_FIXNUM (0));
else if (!interruptp)