From 29bb1bf8b79b4cfce07f7393817f9991296fb8c1 Mon Sep 17 00:00:00 2001 From: Chris Hanson Date: Fri, 25 Apr 2003 05:13:14 +0000 Subject: [PATCH] Implement new-style select() mechanism for OS/2. --- v7/src/microcode/os2io.c | 197 ++++++++++++++++++++++++++++++++++++- v7/src/microcode/os2msg.c | 94 ++++++++++++++---- v7/src/microcode/os2msg.h | 7 +- v7/src/microcode/pros2io.c | 67 +++++-------- 4 files changed, 294 insertions(+), 71 deletions(-) diff --git a/v7/src/microcode/os2io.c b/v7/src/microcode/os2io.c index 52b3e98ac..7b6aefd39 100644 --- a/v7/src/microcode/os2io.c +++ b/v7/src/microcode/os2io.c @@ -1,8 +1,8 @@ /* -*-C-*- -$Id: os2io.c,v 1.10 2003/02/14 18:28:22 cph Exp $ +$Id: os2io.c,v 1.11 2003/04/25 05:13:02 cph Exp $ -Copyright (c) 1994-1999 Massachusetts Institute of Technology +Copyright 1994,1995,1996,2003 Massachusetts Institute of Technology This file is part of MIT/GNU Scheme. @@ -24,6 +24,7 @@ USA. */ #include "os2.h" +#include "os2proc.h" extern void add_reload_cleanup (void (*) (void)); extern void OS2_initialize_console_channel (Tchannel); @@ -38,14 +39,14 @@ Tchannel * OS2_channel_pointer_table; const int OS_have_select_p = 1; #ifndef OS2_DEFAULT_MAX_FH -#define OS2_DEFAULT_MAX_FH 256 +# define OS2_DEFAULT_MAX_FH 256 #endif /* Set this to a larger size than OS2_DEFAULT_MAX_FH, because the maximum number of file handles can be increased dynamically by calling a primitive. */ #ifndef OS2_DEFAULT_CHANNEL_TABLE_SIZE -#define OS2_DEFAULT_CHANNEL_TABLE_SIZE 1024 +# define OS2_DEFAULT_CHANNEL_TABLE_SIZE 1024 #endif void @@ -369,3 +370,191 @@ OS_channel_write_string (Tchannel channel, const char * string) if ((OS_channel_write (channel, string, length)) != length) OS2_error_anonymous (); } + +struct select_registry_s +{ + unsigned int n_qids; + unsigned int length; + qid_t * qids; + unsigned char * qmodes; + unsigned char * rmodes; +}; + +select_registry_t +OS_allocate_select_registry (void) +{ + struct select_registry_s * r + = (OS_malloc (sizeof (struct select_registry_s))); + (r -> n_qids) = 0; + (r -> length) = 16; + (r -> qids) = (OS_malloc ((sizeof (qid_t)) * (r -> length))); + (r -> qmodes) = (OS_malloc ((sizeof (unsigned char)) * (r -> length))); + (r -> rmodes) = (OS_malloc ((sizeof (unsigned char)) * (r -> length))); + return (r); +} + +void +OS_deallocate_select_registry (select_registry_t registry) +{ + struct select_registry_s * r = registry; + OS_free (r -> rmodes); + OS_free (r -> qmodes); + OS_free (r -> qids); + OS_free (r); +} + +static void +resize_select_registry (struct select_registry_s * r, int growp) +{ + if (growp) + (r -> length) *= 2; + else + (r -> length) /= 2; + (r -> qids) + = (OS_realloc ((r -> qids), + ((sizeof (qid_t)) * (r -> length)))); + (r -> qmodes) + = (OS_realloc ((r -> qmodes), + ((sizeof (unsigned char)) * (r -> length)))); + (r -> rmodes) + = (OS_realloc ((r -> rmodes), + ((sizeof (unsigned char)) * (r -> length)))); +} + +void +OS_add_to_select_registry (select_registry_t registry, int fd, + unsigned int mode) +{ + struct select_registry_s * r = registry; + qid_t qid = fd; + unsigned int i = 0; + + while (i < (r -> n_qids)) + { + if (((r -> qids) [i]) == qid) + { + ((r -> qmodes) [i]) |= mode; + return; + } + i += 1; + } + if (i == (r -> length)) + resize_select_registry (r, 1); + ((r -> qids) [i]) = qid; + ((r -> qmodes) [i]) = mode; + (r -> n_qids) += 1; +} + +void +OS_remove_from_select_registry (select_registry_t registry, int fd, + unsigned int mode) +{ + struct select_registry_s * r = registry; + qid_t qid = fd; + unsigned int i = 0; + + while (1) + { + if (i == (r -> n_qids)) + return; + if (((r -> qids) [i]) == qid) + { + ((r -> qmodes) [i]) &=~ mode; + if (((r -> qmodes) [i]) == 0) + break; + else + return; + } + i += 1; + } + while (i < (r -> n_qids)) + { + ((r -> qids) [i]) = ((r -> qids) [(i + 1)]); + ((r -> qmodes) [i]) = ((r -> qmodes) [(i + 1)]); + i += 1; + } + (r -> n_qids) -= 1; + + if (((r -> length) > 16) && ((r -> n_qids) < ((r -> length) / 2))) + resize_select_registry (r, 0); +} + +unsigned int +OS_select_registry_length (select_registry_t registry) +{ + struct select_registry_s * r = registry; + return (r -> n_qids); +} + +void +OS_select_registry_result (select_registry_t registry, unsigned int index, + int * fd_r, unsigned int * mode_r) +{ + struct select_registry_s * r = registry; + (*fd_r) = ((r -> qids) [index]); + (*mode_r) = ((r -> rmodes) [index]); +} + +int +OS_test_select_descriptor (int fd, int blockp, unsigned int qmode) +{ + qid_t qid = fd; + unsigned int rmode = (qmode & SELECT_MODE_WRITE); + if ((qmode & SELECT_MODE_READ) == 0) + return (rmode); + switch (OS2_message_availablep (qid, blockp)) + { + case mat_available: + return (rmode | SELECT_MODE_READ); + case mat_not_available: + return (rmode); + case mat_interrupt: + return + ((OS_process_any_status_change ()) + ? SELECT_PROCESS_STATUS_CHANGE + : SELECT_INTERRUPT); + default: + error_external_return (); + return (rmode | SELECT_MODE_ERROR); + } +} + +int +OS_test_select_registry (select_registry_t registry, int blockp) +{ + struct select_registry_s * r = registry; + unsigned int n_values = 0; + int interruptp = 0; + unsigned int i; + + while (1) + { + for (i = 0; (i < (r -> n_qids)); i += 1) + { + ((r -> rmodes) [i]) = (((r -> qmodes) [i]) & SELECT_MODE_WRITE); + if ((((r -> qmodes) [i]) & SELECT_MODE_READ) != 0) + switch (OS2_message_availablep (((r -> qids) [i]), 0)) + { + case mat_available: + ((r -> rmodes) [i]) |= SELECT_MODE_READ; + break; + case mat_interrupt: + interruptp = 1; + break; + } + if (((r -> rmodes) [i]) != 0) + n_values += 1; + } + if (n_values > 0) + return (n_values); + if (interruptp) + return + ((OS_process_any_status_change ()) + ? SELECT_PROCESS_STATUS_CHANGE + : SELECT_INTERRUPT); + if (!blockp) + return (0); + if ((OS2_scheme_tqueue_block ()) == mat_interrupt) + interruptp = 1; + } +} diff --git a/v7/src/microcode/os2msg.c b/v7/src/microcode/os2msg.c index c882cf78a..676766fb1 100644 --- a/v7/src/microcode/os2msg.c +++ b/v7/src/microcode/os2msg.c @@ -1,8 +1,8 @@ /* -*-C-*- -$Id: os2msg.c,v 1.16 2003/02/14 18:28:22 cph Exp $ +$Id: os2msg.c,v 1.17 2003/04/25 05:13:06 cph Exp $ -Copyright (c) 1994-2000 Massachusetts Institute of Technology +Copyright 1994,1995,1997,2000,2003 Massachusetts Institute of Technology This file is part of MIT/GNU Scheme. @@ -44,6 +44,65 @@ static msg_t * read_scm_tqueue (tqueue_t *, int); static void write_scm_tqueue (tqueue_t *, msg_t *); static void process_interrupt_messages (void); +/* + +How this works +============== + +This file describes the inter-thread communications mechanism. The +naming used here is atrocious. Mea culpa; the code was written in +1994, while these notes are being written in 2003. I've learned a bit +in the meantime and can now see how bad the code is. + +Every thread has an associated input queue, called its "tqueue". +(Originally meant to be an abbreviation of "Thread QUEUE".) All +messages sent to that thread, from any source, are queued there in +order of transmission. + +Two threads that wish to communicate must set up a "channel", which +consists of a pair of "qid" objects. ("qid" was originally meant to +be an abbreviation of "Queue IDentifier".) These objects are always +created in pairs, one for each thread, and after creation each qid is +associated with one of the two threads. The other half of a qid pair +is called its "twin". If you are familiar with sockets, a qid is the +analog of a socket. A qid pair is the analog of a connection. + +Also associated with each qid is something called a "subqueue", which +is simply a secondary queue for messages received from that qid's +twin. + +* Suppose that thread A and thread B share two halves of a qid pair, + which we will call QA and QB. Also suppose that the tqueues + associated with these threads are called TA and TB. + +* If thread A calls OS2_send_message() on QA and a message M, M's + "sender" is set to be QB, and M is then queued at the end of TB. + Additionally, if any thread is blocked on TB (only B is allowed to + block on TB), the event semaphore EB is posted, which wakes up the + threads waiting on TB. + +* If thread B calls OS2_receive_message() on QB, B first dequeues each + the queued messages from TB. Each dequeued message M is queued at + the end of the subqueue of the sender of M. For example, if M's + sender is QB, then the message is put into QB's subqueue. This + process is repeated until all of the messages have been removed from + TB and dispatched to the appropriate subqueues. + + The subqueue for QB is then checked; if there are any messages, the + first one is dequeued and then returned. Otherwise, B blocks on TB + waiting for another message to arrive, by waiting on EB. Eventually + the message sent by A arrives in TB, EB is posted, and B wakes up. + + This process continues until a message is received. + +Some things to note here: the event semaphores are the primary +synchronization mechanism for communications. These guarantee that +messages aren't lost due to timing errors. Additionally, each tqueue +has an associated mutex semaphore that is used to lock the tqueue in +critical sections. + +*/ + typedef struct { unsigned int allocatedp : 1; /* queue allocated? */ @@ -462,19 +521,6 @@ subqueue_emptyp (qid_t qid) OS2_release_mutex_semaphore (QID_LOCK (qid)); return (result); } - -int -OS2_tqueue_select (tqueue_t * tqueue, int blockp) -{ - msg_t * message = (read_tqueue (tqueue, blockp)); - if ((TQUEUE_TYPE (tqueue)) == tqt_scm) - { - process_interrupt_messages (); - if (pending_interrupts_p ()) - return (-2); - } - return ((message != 0) ? (MSG_SENDER (message)) : (-1)); -} static msg_t * read_tqueue (tqueue_t * tqueue, int blockp) @@ -695,8 +741,6 @@ make_scm_tqueue (void) return (tqueue); } -char OS2_scheme_tqueue_avail_map [QID_MAX + 1]; - static msg_t * read_scm_tqueue (tqueue_t * tqueue, int blockp) { @@ -721,7 +765,6 @@ read_scm_tqueue (tqueue_t * tqueue, int blockp) msg_t * message = (read_std_tqueue (tqueue, blockp)); if (message != 0) { - (OS2_scheme_tqueue_avail_map [MSG_SENDER (message)]) = 1; result = message; /* At most one message needs to be read in blocking mode. */ blockp = 0; @@ -737,7 +780,7 @@ write_scm_tqueue (tqueue_t * tqueue, msg_t * message) write_std_tqueue (tqueue, message); request_attention_interrupt (); } - + void OS2_handle_attention_interrupt (void) { @@ -747,6 +790,19 @@ OS2_handle_attention_interrupt (void) process_interrupt_messages (); } +msg_avail_t +OS2_scheme_tqueue_block (void) +{ + int inputp = ((read_tqueue (OS2_scheme_tqueue, 1)) != 0); + process_interrupt_messages (); + return + (inputp + ? mat_available + : (pending_interrupts_p ()) + ? mat_interrupt + : mat_not_available); +} + static void process_interrupt_messages (void) { diff --git a/v7/src/microcode/os2msg.h b/v7/src/microcode/os2msg.h index 139c5dd18..4b2555e94 100644 --- a/v7/src/microcode/os2msg.h +++ b/v7/src/microcode/os2msg.h @@ -1,8 +1,8 @@ /* -*-C-*- -$Id: os2msg.h,v 1.17 2003/02/14 18:28:22 cph Exp $ +$Id: os2msg.h,v 1.18 2003/04/25 05:13:10 cph Exp $ -Copyright (c) 1994-1999 Massachusetts Institute of Technology +Copyright 1994,1995,1997,2003 Massachusetts Institute of Technology This file is part of MIT/GNU Scheme. @@ -143,7 +143,6 @@ typedef enum { mat_not_available, mat_available, mat_interrupt } msg_avail_t; extern tqueue_t * OS2_scheme_tqueue; extern qid_t OS2_interrupt_qid; -extern char OS2_scheme_tqueue_avail_map [QID_MAX + 1]; extern void OS2_make_qid_pair (qid_t *, qid_t *); extern void OS2_open_qid (qid_t, tqueue_t *); @@ -163,7 +162,7 @@ extern msg_avail_t OS2_message_availablep (qid_t, int); extern msg_t * OS2_wait_for_message (qid_t, msg_type_t); extern msg_t * OS2_message_transaction (qid_t, msg_t *, msg_type_t); extern void OS2_unread_message (qid_t, msg_t *); -extern int OS2_tqueue_select (tqueue_t *, int); +extern msg_avail_t OS2_scheme_tqueue_block (void); extern tqueue_t * OS2_make_std_tqueue (void); extern void OS2_close_std_tqueue (tqueue_t *); diff --git a/v7/src/microcode/pros2io.c b/v7/src/microcode/pros2io.c index 08861ea4e..4637a6f0e 100644 --- a/v7/src/microcode/pros2io.c +++ b/v7/src/microcode/pros2io.c @@ -1,8 +1,8 @@ /* -*-C-*- -$Id: pros2io.c,v 1.11 2003/02/14 18:28:23 cph Exp $ +$Id: pros2io.c,v 1.12 2003/04/25 05:13:14 cph Exp $ -Copyright (c) 1994-2000 Massachusetts Institute of Technology +Copyright 1994,1995,1997,2000,2003 Massachusetts Institute of Technology This file is part of MIT/GNU Scheme. @@ -41,10 +41,11 @@ DEFINE_PRIMITIVE ("CHANNEL-DESCRIPTOR", Prim_channel_descriptor, 1, 1, 0) PRIMITIVE_HEADER (1); { Tchannel channel = (arg_channel (1)); - if (! ((CHANNEL_ABSTRACT_P (channel)) && (CHANNEL_INPUTP (channel)))) - error_bad_range_arg (1); PRIMITIVE_RETURN - (LONG_TO_UNSIGNED_FIXNUM (OS2_channel_thread_descriptor (channel))); + (LONG_TO_UNSIGNED_FIXNUM + ((CHANNEL_ABSTRACT_P (channel)) + ? (OS2_channel_thread_descriptor (channel)) + : 0)); } } @@ -93,51 +94,29 @@ DEFINE_PRIMITIVE ("OS2-SELECT-REGISTRY-TEST", Prim_OS2_select_registry_test, 3, int inputp = 0; int interruptp = 0; qid_t qid; - int n; - /* This first phase checks the qid subqueues and OS2_scheme_tqueue - for any previously-queued input. */ - check_for_input: - for (qid = 0; (qid <= QID_MAX); qid += 1) + while (1) { - (results [qid]) = 0; - if ((registry [qid]) != 0) - switch (OS2_message_availablep (qid, 0)) - { - case mat_available: - inputp = 1; - (results [qid]) = 1; - break; - case mat_interrupt: - interruptp = 1; - break; - } - } - /* This second phase waits for input if necessary. It does not - check the subqueues for previously-stored data, so it's - important that we already did this. Otherwise we could end up - waiting for input when there was valid input ready. */ - if (blockp) - while (! (inputp || interruptp)) - { - for (qid = 0; (qid <= QID_MAX); qid += 1) - (OS2_scheme_tqueue_avail_map [qid]) = 0; - n = (OS2_tqueue_select (OS2_scheme_tqueue, blockp)); - if (n == (-1)) - /* If we're unblocked and there's no message in the - tqueue, go back and check for input again. */ - goto check_for_input; - if (n < 0) - interruptp = 1; - else - for (qid = 0; (qid <= QID_MAX); qid += 1) - if (((registry [qid]) != 0) - && (OS2_scheme_tqueue_avail_map [qid])) + for (qid = 0; (qid <= QID_MAX); qid += 1) + { + (results [qid]) = 0; + if ((registry [qid]) != 0) + switch (OS2_message_availablep (qid, 0)) { + case mat_available: inputp = 1; (results [qid]) = 1; + break; + case mat_interrupt: + interruptp = 1; + break; } - } + } + if ((!blockp) || inputp || interruptp) + break; + if ((OS2_scheme_tqueue_block ()) == mat_interrupt) + interruptp = 1; + } if (inputp) PRIMITIVE_RETURN (LONG_TO_UNSIGNED_FIXNUM (0)); else if (!interruptp) -- 2.25.1