From b6afc16aee895d8fc5a3b6fd9ad92152f8bf7caa Mon Sep 17 00:00:00 2001 From: Chris Hanson Date: Thu, 5 Jan 1995 23:54:48 +0000 Subject: [PATCH] Add mechanism to "unread" messages back to the head of a message queue. Add mechanism to get the other end of a qid pair. Eliminate logic error that was being generated when sending a message to a qid whose other end had been closed (now we just discard the message). Reimplement message queues to NOT use the OS/2 queue abstraction -- apparently these queues use quite alot of memory and I recently ran into a situation in which the memory was exhausted, which caused Scheme to die. Now, the memory needed for the queues is taken from the Scheme process's virtual memory. --- v7/src/microcode/os2msg.c | 208 ++++++++++++++++++++++++-------------- 1 file changed, 132 insertions(+), 76 deletions(-) diff --git a/v7/src/microcode/os2msg.c b/v7/src/microcode/os2msg.c index a74d90d05..2ddf411cc 100644 --- a/v7/src/microcode/os2msg.c +++ b/v7/src/microcode/os2msg.c @@ -1,8 +1,8 @@ /* -*-C-*- -$Id: os2msg.c,v 1.3 1994/12/19 22:31:31 cph Exp $ +$Id: os2msg.c,v 1.4 1995/01/05 23:54:48 cph Exp $ -Copyright (c) 1994 Massachusetts Institute of Technology +Copyright (c) 1994-95 Massachusetts Institute of Technology This material was developed by the Scheme project at the Massachusetts Institute of Technology, Department of Electrical Engineering and @@ -53,7 +53,6 @@ static void process_interrupt_messages (void); typedef struct { unsigned int allocatedp : 1; /* queue allocated? */ - unsigned int openp : 1; /* queue open? */ qid_t twin; /* other end of connection */ qid_receive_filter_t filter; /* filter for received messages */ tqueue_t * tqueue; /* thread queue for reception */ @@ -159,17 +158,13 @@ OS2_qid_openp (qid_t qid) void OS2_close_qid (qid_t qid) { - { - msg_list_t * elt = (QID_SUBQUEUE_HEAD (qid)); - while (elt != 0) - { - msg_list_t * next = (elt -> next); - OS_free (elt); - elt = next; - } - } - (QID_SUBQUEUE_HEAD (qid)) = 0; - (QID_SUBQUEUE_TAIL (qid)) = 0; + while ((QID_SUBQUEUE_HEAD (qid)) != 0) + { + msg_list_t * this = (QID_SUBQUEUE_HEAD (qid)); + (QID_SUBQUEUE_HEAD (qid)) = (this -> next); + OS2_destroy_message (this -> message); + OS_free (this); + } (QID_TQUEUE (qid)) = 0; OS2_request_mutex_semaphore (qid_lock); { @@ -183,6 +178,16 @@ OS2_close_qid (qid_t qid) (QID_ALLOCATEDP (qid)) = 0; OS2_release_mutex_semaphore (qid_lock); } + +qid_t +OS2_qid_twin (qid_t qid) +{ + return (((QID_ALLOCATEDP (qid)) + && ((QID_TWIN (qid)) != QID_NONE) + && (QID_ALLOCATEDP (QID_TWIN (qid)))) + ? (QID_TWIN (qid)) + : QID_NONE); +} void OS2_close_qid_pair (qid_t qid) @@ -235,12 +240,12 @@ OS2_set_message_type_length (msg_type_t type, msg_length_t length) } msg_t * -OS2_create_message (msg_type_t type) +OS2_create_message_1 (msg_type_t type, msg_length_t extra) { /* Do allocation carefully to prevent infinite loop when signalling "out of memory" condition. */ msg_t * message = - (malloc ((unsigned long) (OS2_message_type_length (type)))); + (malloc (((unsigned long) (OS2_message_type_length (type))) + extra)); if (message == 0) if ((type == mt_syscall_error) && ((SM_SYSCALL_ERROR_CODE (message)) == ERROR_NOT_ENOUGH_MEMORY) @@ -255,7 +260,7 @@ OS2_create_message (msg_type_t type) void OS2_destroy_message (msg_t * message) { - OS_free ((void *) message); + OS_free (message); } /* Message Transmission and Reception */ @@ -264,11 +269,25 @@ void OS2_send_message (qid_t qid, msg_t * message) { qid_t twin = (QID_TWIN (qid)); - tqueue_t * tqueue = (QID_TQUEUE (twin)); - if (tqueue == 0) - OS2_logic_error ("Write to closed QID."); - (MSG_SENDER (message)) = twin; - write_tqueue (tqueue, message); + if (twin == QID_NONE) + /* Other end of connection has been closed, so discard the + message. We used to signal an error here, but this can happen + pretty easily when closing windows or exiting Scheme. The only + way to avoid this is to force synchronization of communicating + threads, which can be tricky. For example, when closing a PM + window, it's not obvious when the last message will be + generated by the PM thread. So it's just simpler to ignore + messages after the receiver decides it's no longer interested + in them. */ + OS2_destroy_message (message); + else + { + tqueue_t * tqueue = (QID_TQUEUE (twin)); + if (tqueue == 0) + OS2_logic_error ("Write to unopened QID."); + (MSG_SENDER (message)) = twin; + write_tqueue (tqueue, message); + } } msg_t * @@ -316,25 +335,6 @@ OS2_message_availablep (qid_t qid, int blockp) } } -int -OS2_tqueue_select (tqueue_t * tqueue, int blockp) -{ - while (1) - { - msg_t * message = (read_tqueue (tqueue, blockp)); - if ((TQUEUE_TYPE (tqueue)) == tqt_scm) - { - process_interrupt_messages (); - if (pending_interrupts_p ()) - return (-2); - } - if (message != 0) - return (MSG_SENDER (message)); - if (!blockp) - return (-1); - } -} - msg_t * OS2_wait_for_message (qid_t qid, msg_type_t reply_type) { @@ -365,14 +365,13 @@ write_subqueue (msg_t * message) return; } { - msg_list_t * tail = (QID_SUBQUEUE_TAIL (qid)); - msg_list_t * elt = (OS_malloc (sizeof (struct msg_list_s))); + msg_list_t * elt = (OS_malloc (sizeof (msg_list_t))); (elt -> message) = message; (elt -> next) = 0; - if (tail == 0) + if ((QID_SUBQUEUE_HEAD (qid)) == 0) (QID_SUBQUEUE_HEAD (qid)) = elt; else - (tail -> next) = elt; + ((QID_SUBQUEUE_TAIL (qid)) -> next) = elt; (QID_SUBQUEUE_TAIL (qid)) = elt; } } @@ -386,18 +385,47 @@ read_subqueue (qid_t qid) { msg_t * message = (head -> message); (QID_SUBQUEUE_HEAD (qid)) = (head -> next); - if ((head -> next) == 0) - (QID_SUBQUEUE_TAIL (qid)) = 0; OS_free (head); return (message); } } +void +OS2_unread_message (qid_t qid, msg_t * message) +{ + msg_list_t * head = (QID_SUBQUEUE_HEAD (qid)); + msg_list_t * elt = (OS_malloc (sizeof (msg_list_t))); + (elt -> message) = message; + (elt -> next) = head; + (QID_SUBQUEUE_HEAD (qid)) = elt; + if (head == 0) + (QID_SUBQUEUE_TAIL (qid)) = elt; +} + static int subqueue_emptyp (qid_t qid) { return ((QID_SUBQUEUE_HEAD (qid)) == 0); } + +int +OS2_tqueue_select (tqueue_t * tqueue, int blockp) +{ + while (1) + { + msg_t * message = (read_tqueue (tqueue, blockp)); + if ((TQUEUE_TYPE (tqueue)) == tqt_scm) + { + process_interrupt_messages (); + if (pending_interrupts_p ()) + return (-2); + } + if (message != 0) + return (MSG_SENDER (message)); + if (!blockp) + return (-1); + } +} static msg_t * read_tqueue (tqueue_t * tqueue, int blockp) @@ -433,10 +461,14 @@ write_tqueue (tqueue_t * tqueue, msg_t * message) typedef struct { tqueue_type_t type; - HQUEUE queue; /* queue */ - HEV event; /* associated event semaphore */ + msg_list_t * head; /* queue */ + msg_list_t * tail; + HMTX mutex; /* mutex semaphore */ + HEV event; /* event semaphore */ } std_tqueue_t; -#define STD_TQUEUE_QUEUE(q) (((std_tqueue_t *) (q)) -> queue) +#define STD_TQUEUE_HEAD(q) (((std_tqueue_t *) (q)) -> head) +#define STD_TQUEUE_TAIL(q) (((std_tqueue_t *) (q)) -> tail) +#define STD_TQUEUE_MUTEX(q) (((std_tqueue_t *) (q)) -> mutex) #define STD_TQUEUE_EVENT(q) (((std_tqueue_t *) (q)) -> event) tqueue_t * @@ -444,7 +476,9 @@ OS2_make_std_tqueue (void) { tqueue_t * tqueue = (OS_malloc (sizeof (std_tqueue_t))); (TQUEUE_TYPE (tqueue)) = tqt_std; - (STD_TQUEUE_QUEUE (tqueue)) = (OS2_create_queue (QUE_FIFO)); + (STD_TQUEUE_HEAD (tqueue)) = 0; + (STD_TQUEUE_TAIL (tqueue)) = 0; + (STD_TQUEUE_MUTEX (tqueue)) = (OS2_create_mutex_semaphore (0, 0)); (STD_TQUEUE_EVENT (tqueue)) = (OS2_create_event_semaphore (0, 0)); return (tqueue); } @@ -452,43 +486,65 @@ OS2_make_std_tqueue (void) void OS2_close_std_tqueue (tqueue_t * tqueue) { - OS2_close_queue (STD_TQUEUE_QUEUE (tqueue)); OS2_close_event_semaphore (STD_TQUEUE_EVENT (tqueue)); + OS2_close_event_semaphore (STD_TQUEUE_MUTEX (tqueue)); + while ((STD_TQUEUE_HEAD (tqueue)) != 0) + { + msg_list_t * this = (STD_TQUEUE_HEAD (tqueue)); + (STD_TQUEUE_HEAD (tqueue)) = (this -> next); + OS2_destroy_message (this -> message); + OS_free (this); + } OS_free (tqueue); } static msg_t * read_std_tqueue (tqueue_t * tqueue, int blockp) { - ULONG type; - ULONG length; - PVOID data; - msg_t * message; - const char * s = "Non-message read from message queue."; - - if (!OS2_read_queue ((STD_TQUEUE_QUEUE (tqueue)), - (&type), - (&length), - (&data), - (blockp ? NULLHANDLE : (STD_TQUEUE_EVENT (tqueue))))) - return (0); - if (length < (sizeof (msg_t))) - OS2_logic_error (s); - message = ((msg_t *) data); - if ((type != 0) || (length != (MSG_LENGTH (message)))) - OS2_logic_error (s); - write_subqueue (message); - return (message); + while (1) + { + OS2_request_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue)); + if ((STD_TQUEUE_HEAD (tqueue)) != 0) + { + msg_list_t * element = (STD_TQUEUE_HEAD (tqueue)); + (STD_TQUEUE_HEAD (tqueue)) = (element -> next); + /* This prevents the 16 bit counter inside the event + semaphore from overflowing, in the unlikely situation + that the semaphore is not waited on for a long period. */ + (void) OS2_reset_event_semaphore (STD_TQUEUE_EVENT (tqueue)); + OS2_release_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue)); + { + msg_t * message = (element -> message); + OS_free (element); + write_subqueue (message); + return (message); + } + } + if (!blockp) + { + OS2_release_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue)); + return (0); + } + (void) OS2_reset_event_semaphore (STD_TQUEUE_EVENT (tqueue)); + OS2_release_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue)); + (void) OS2_wait_event_semaphore ((STD_TQUEUE_EVENT (tqueue)), 1); + } } static void write_std_tqueue (tqueue_t * tqueue, msg_t * message) { - OS2_write_queue ((STD_TQUEUE_QUEUE (tqueue)), - 0, - (MSG_LENGTH (message)), - ((PVOID) message), - 0); + msg_list_t * element = (OS_malloc (sizeof (msg_list_t))); + (element -> message) = message; + (element -> next) = 0; + OS2_request_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue)); + if ((STD_TQUEUE_HEAD (tqueue)) == 0) + (STD_TQUEUE_HEAD (tqueue)) = element; + else + ((STD_TQUEUE_TAIL (tqueue)) -> next) = element; + (STD_TQUEUE_TAIL (tqueue)) = element; + (void) OS2_post_event_semaphore (STD_TQUEUE_EVENT (tqueue)); + OS2_release_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue)); } static tqueue_t * -- 2.25.1