/* -*-C-*-
-$Id: os2cthrd.c,v 1.8 1996/05/09 20:21:20 cph Exp $
+$Id: os2cthrd.c,v 1.9 1997/05/11 06:35:29 cph Exp $
-Copyright (c) 1994-96 Massachusetts Institute of Technology
+Copyright (c) 1994-97 Massachusetts Institute of Technology
This material was developed by the Scheme project at the Massachusetts
Institute of Technology, Department of Electrical Engineering and
static void run_channel_thread (void *);
static void start_readahead_thread (channel_context_t *);
static void send_readahead_ack (qid_t, enum readahead_ack_action);
-static msg_list_t * new_list (void);
static msg_t * new_message (void);
\f
typedef struct
return (action);
}
\f
-readahead_buffer_t *
-OS2_make_readahead_buffer (void)
-{
- readahead_buffer_t * buffer = (OS_malloc (sizeof (readahead_buffer_t)));
- (buffer -> head) = 0;
- (buffer -> tail) = 0;
- return (buffer);
-}
-
-int
-OS2_readahead_buffer_emptyp (readahead_buffer_t * buffer)
-{
- return ((buffer -> head) == 0);
-}
-
void
-OS2_readahead_buffer_insert (readahead_buffer_t * buffer, char c)
+OS2_readahead_buffer_insert (void * buffer, char c)
{
- if ((buffer -> head) == 0)
- {
- msg_list_t * tail = (new_list ());
- (buffer -> head) = tail;
- (buffer -> tail) = tail;
- }
- else if ((SM_READAHEAD_SIZE ((buffer -> tail) -> message))
- == SM_READAHEAD_MAX)
+ msg_t * last = (OS2_msg_fifo_last (buffer));
+ if ((last != 0) && ((SM_READAHEAD_SIZE (last)) < SM_READAHEAD_MAX))
+ ((SM_READAHEAD_DATA (last)) [(SM_READAHEAD_SIZE (last))++]) = c;
+ else
{
- msg_list_t * tail = (new_list ());
- ((buffer -> tail) -> next) = tail;
- (buffer -> tail) = tail;
+ msg_t * message = (new_message ());
+ ((SM_READAHEAD_DATA (message)) [(SM_READAHEAD_SIZE (message))++]) = c;
+ OS2_msg_fifo_insert (buffer, message);
}
- {
- msg_t * message = ((buffer -> tail) -> message);
- ((SM_READAHEAD_DATA (message)) [(SM_READAHEAD_SIZE (message))++]) = c;
- }
-}
-
-static msg_list_t *
-new_list (void)
-{
- msg_list_t * cell = (OS_malloc (sizeof (msg_list_t)));
- (cell -> message) = (new_message ());
- (cell -> next) = 0;
- return (cell);
}
static msg_t *
(SM_READAHEAD_SIZE (message)) = 0;
return (message);
}
-\f
+
char
-OS2_readahead_buffer_rubout (readahead_buffer_t * buffer)
+OS2_readahead_buffer_rubout (void * buffer)
{
- if ((buffer -> head) == 0)
+ msg_t * message = (OS2_msg_fifo_last (buffer));
+ if (message == 0)
OS2_logic_error ("Rubout from empty readahead buffer.");
{
- msg_t * message = ((buffer -> tail) -> message);
char c = ((SM_READAHEAD_DATA (message)) [--(SM_READAHEAD_SIZE (message))]);
if ((SM_READAHEAD_SIZE (message)) == 0)
{
- msg_list_t * tail = (buffer -> tail);
- msg_list_t * prev = (buffer -> head);
- if (prev == tail)
- (buffer -> head) = 0;
- else
- {
- while ((prev -> next) != tail)
- prev = (prev -> next);
- (prev -> next) = 0;
- (buffer -> tail) = prev;
- }
- OS_free (tail);
+ OS2_msg_fifo_remove_last (buffer);
OS2_destroy_message (message);
}
return (c);
}
msg_t *
-OS2_readahead_buffer_read (readahead_buffer_t * buffer)
-{
- msg_list_t * head = (buffer -> head);
- if (head == 0)
- return (new_message ());
- else
- {
- msg_t * message = (head -> message);
- (buffer -> head) = (head -> next);
- OS_free (head);
- return (message);
- }
-}
-
-msg_list_t *
-OS2_readahead_buffer_read_all (readahead_buffer_t * buffer)
+OS2_readahead_buffer_read (void * buffer)
{
- msg_list_t * head = (buffer -> head);
- (buffer -> head) = 0;
- return (head);
+ msg_t * message = (OS2_msg_fifo_remove (buffer));
+ return ((message == 0) ? (new_message ()) : message);
}
/* -*-C-*-
-$Id: os2msg.c,v 1.10 1995/11/03 01:22:09 cph Exp $
+$Id: os2msg.c,v 1.11 1997/05/11 06:35:05 cph Exp $
-Copyright (c) 1994-95 Massachusetts Institute of Technology
+Copyright (c) 1994-97 Massachusetts Institute of Technology
This material was developed by the Scheme project at the Massachusetts
Institute of Technology, Department of Electrical Engineering and
qid_t twin; /* other end of connection */
qid_receive_filter_t filter; /* filter for received messages */
tqueue_t * tqueue; /* thread queue for reception */
- msg_list_t * subqueue_head; /* head of receiving subqueue */
- msg_list_t * subqueue_tail; /* tail of receiving subqueue */
+ void * subqueue; /* receiving subqueue */
} iqid_t;
static iqid_t queue_array [QID_MAX + 1];
#define QID_ALLOCATEDP(q) ((_QID (q)) . allocatedp)
#define QID_TWIN(q) ((_QID (q)) . twin)
#define QID_TQUEUE(q) ((_QID (q)) . tqueue)
-#define QID_SUBQUEUE_HEAD(q) ((_QID (q)) . subqueue_head)
-#define QID_SUBQUEUE_TAIL(q) ((_QID (q)) . subqueue_tail)
+#define QID_SUBQUEUE(q) ((_QID (q)) . subqueue)
#define QID_FILTER(q) ((_QID (q)) . filter)
#define MSG_QUEUE_TYPE(m) 0
(QID_ALLOCATEDP (qid)) = 0;
(QID_TQUEUE (qid)) = 0;
(QID_TWIN (qid)) = QID_NONE;
- (QID_SUBQUEUE_HEAD (qid)) = 0;
- (QID_SUBQUEUE_TAIL (qid)) = 0;
+ (QID_SUBQUEUE (qid)) = 0;
if (qid == QID_MAX)
break;
qid += 1;
(QID_ALLOCATEDP (qid)) = 1;
(QID_TQUEUE (qid)) = 0;
(QID_TWIN (qid)) = QID_NONE;
- (QID_SUBQUEUE_HEAD (qid)) = 0;
- (QID_SUBQUEUE_TAIL (qid)) = 0;
+ (QID_SUBQUEUE (qid)) = (OS2_create_msg_fifo ());
OS2_release_mutex_semaphore (qid_lock);
return (qid);
}
void
OS2_close_qid (qid_t qid)
{
- while ((QID_SUBQUEUE_HEAD (qid)) != 0)
+ while (1)
{
- msg_list_t * this = (QID_SUBQUEUE_HEAD (qid));
- (QID_SUBQUEUE_HEAD (qid)) = (this -> next);
- OS2_destroy_message (this -> message);
- OS_free (this);
+ msg_t * msg = (OS2_msg_fifo_remove (QID_SUBQUEUE (qid)));
+ if (msg == 0)
+ break;
+ OS2_destroy_message (msg);
}
+ OS2_destroy_msg_fifo (QID_SUBQUEUE (qid));
+ (QID_SUBQUEUE (qid)) = 0;
OS2_request_mutex_semaphore (qid_lock);
{
qid_t twin = (QID_TWIN (qid));
generated by the PM thread. So it's just simpler to ignore
messages after the receiver decides it's no longer interested
in them. */
- OS2_destroy_message (message);
+ {
+ OS2_release_mutex_semaphore (qid_lock);
+ OS2_destroy_message (message);
+ }
else
{
tqueue_t * tqueue = (QID_TQUEUE (twin));
+ OS2_release_mutex_semaphore (qid_lock);
if (tqueue == 0)
OS2_logic_error ("Write to unopened QID.");
(MSG_SENDER (message)) = twin;
write_tqueue (tqueue, message);
}
- OS2_release_mutex_semaphore (qid_lock);
}
msg_t *
if (message == 0)
return;
}
- {
- msg_list_t * elt = (OS_malloc (sizeof (msg_list_t)));
- (elt -> message) = message;
- (elt -> next) = 0;
- if ((QID_SUBQUEUE_HEAD (qid)) == 0)
- (QID_SUBQUEUE_HEAD (qid)) = elt;
- else
- ((QID_SUBQUEUE_TAIL (qid)) -> next) = elt;
- (QID_SUBQUEUE_TAIL (qid)) = elt;
- }
+ OS2_msg_fifo_insert ((QID_SUBQUEUE (qid)), message);
}
static msg_t *
read_subqueue (qid_t qid)
{
- msg_list_t * head = (QID_SUBQUEUE_HEAD (qid));
- if (head == 0)
- return (0);
- {
- msg_t * message = (head -> message);
- (QID_SUBQUEUE_HEAD (qid)) = (head -> next);
- OS_free (head);
- return (message);
- }
+ return (OS2_msg_fifo_remove (QID_SUBQUEUE (qid)));
}
void
OS2_unread_message (qid_t qid, msg_t * message)
{
- msg_list_t * head = (QID_SUBQUEUE_HEAD (qid));
- msg_list_t * elt = (OS_malloc (sizeof (msg_list_t)));
- (elt -> message) = message;
- (elt -> next) = head;
- (QID_SUBQUEUE_HEAD (qid)) = elt;
- if (head == 0)
- (QID_SUBQUEUE_TAIL (qid)) = elt;
+ OS2_msg_fifo_insert_front ((QID_SUBQUEUE (qid)), message);
}
static int
subqueue_emptyp (qid_t qid)
{
- return ((QID_SUBQUEUE_HEAD (qid)) == 0);
+ return (OS2_msg_fifo_emptyp (QID_SUBQUEUE (qid)));
}
\f
int
}
}
\f
+/* Uncomment the following definition in order to use OS/2 queues.
+
+ There seems to be some kind of bug when using them, which manifests
+ itself as an access violation while reading from a socket. I don't
+ understand this and have been unable to debug it successfully.
+
+ Since my intention was to find a way to speed up the
+ message-handling mechanism, and there is no noticeable improvement,
+ it probably isn't worth much more effort to find the bug. */
+
+/* #define USE_OS2_QUEUES */
+#ifdef USE_OS2_QUEUES
+
+typedef struct
+{
+ tqueue_type_t type;
+ HQUEUE fifo;
+ HEV event; /* event semaphore */
+} std_tqueue_t;
+#define STD_TQUEUE_FIFO(q) (((std_tqueue_t *) (q)) -> fifo)
+#define STD_TQUEUE_EVENT(q) (((std_tqueue_t *) (q)) -> event)
+
+tqueue_t *
+OS2_make_std_tqueue (void)
+{
+ tqueue_t * tqueue = (OS_malloc (sizeof (std_tqueue_t)));
+ (TQUEUE_TYPE (tqueue)) = tqt_std;
+ (STD_TQUEUE_FIFO (tqueue)) = (OS2_create_queue (QUE_FIFO));
+ (STD_TQUEUE_EVENT (tqueue)) = (OS2_create_event_semaphore (0, 0));
+ return (tqueue);
+}
+
+static msg_t *
+read_std_tqueue_1 (tqueue_t * tqueue, int blockp)
+{
+ ULONG type;
+ ULONG length;
+ PVOID data;
+ return
+ ((OS2_read_queue ((STD_TQUEUE_FIFO (tqueue)),
+ (&type),
+ (&length),
+ (&data),
+ (blockp ? 0 : (STD_TQUEUE_EVENT (tqueue)))))
+ ? data
+ : 0);
+}
+
+void
+OS2_close_std_tqueue (tqueue_t * tqueue)
+{
+ while (1)
+ {
+ msg_t * msg = (read_std_tqueue_1 (tqueue, 0));
+ if (msg == 0)
+ break;
+ OS2_destroy_message (msg);
+ }
+ OS2_close_queue (STD_TQUEUE_FIFO (tqueue));
+ OS2_close_event_semaphore (STD_TQUEUE_EVENT (tqueue));
+ OS_free (tqueue);
+}
+
+static msg_t *
+read_std_tqueue (tqueue_t * tqueue, int blockp)
+{
+ msg_t * message = (read_std_tqueue_1 (tqueue, blockp));
+ if (message)
+ write_subqueue (message);
+ return (message);
+}
+
+static void
+write_std_tqueue (tqueue_t * tqueue, msg_t * message)
+{
+ OS2_write_queue ((STD_TQUEUE_FIFO (tqueue)), 0, 0, message, 0);
+}
+
+#else /* not USE_OS2_QUEUES */
+
typedef struct
{
tqueue_type_t type;
- msg_list_t * head; /* queue */
- msg_list_t * tail;
+ void * fifo;
+ unsigned int n_blocked; /* # of blocked threads */
HMTX mutex; /* mutex semaphore */
HEV event; /* event semaphore */
} std_tqueue_t;
-#define STD_TQUEUE_HEAD(q) (((std_tqueue_t *) (q)) -> head)
-#define STD_TQUEUE_TAIL(q) (((std_tqueue_t *) (q)) -> tail)
+#define STD_TQUEUE_FIFO(q) (((std_tqueue_t *) (q)) -> fifo)
#define STD_TQUEUE_MUTEX(q) (((std_tqueue_t *) (q)) -> mutex)
#define STD_TQUEUE_EVENT(q) (((std_tqueue_t *) (q)) -> event)
+#define STD_TQUEUE_N_BLOCKED(q) (((std_tqueue_t *) (q)) -> n_blocked)
tqueue_t *
OS2_make_std_tqueue (void)
{
tqueue_t * tqueue = (OS_malloc (sizeof (std_tqueue_t)));
(TQUEUE_TYPE (tqueue)) = tqt_std;
- (STD_TQUEUE_HEAD (tqueue)) = 0;
- (STD_TQUEUE_TAIL (tqueue)) = 0;
+ (STD_TQUEUE_FIFO (tqueue)) = (OS2_create_msg_fifo ());
+ (STD_TQUEUE_N_BLOCKED (tqueue)) = 0;
(STD_TQUEUE_MUTEX (tqueue)) = (OS2_create_mutex_semaphore (0, 0));
(STD_TQUEUE_EVENT (tqueue)) = (OS2_create_event_semaphore (0, 0));
return (tqueue);
{
OS2_close_event_semaphore (STD_TQUEUE_EVENT (tqueue));
OS2_close_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
- while ((STD_TQUEUE_HEAD (tqueue)) != 0)
+ while (1)
{
- msg_list_t * this = (STD_TQUEUE_HEAD (tqueue));
- (STD_TQUEUE_HEAD (tqueue)) = (this -> next);
- OS2_destroy_message (this -> message);
- OS_free (this);
+ msg_t * msg = (OS2_msg_fifo_remove (STD_TQUEUE_FIFO (tqueue)));
+ if (msg == 0)
+ break;
+ OS2_destroy_message (msg);
}
OS_free (tqueue);
}
static msg_t *
read_std_tqueue (tqueue_t * tqueue, int blockp)
{
+ OS2_request_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
while (1)
{
- OS2_request_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
- if ((STD_TQUEUE_HEAD (tqueue)) != 0)
+ msg_t * message = (OS2_msg_fifo_remove (STD_TQUEUE_FIFO (tqueue)));
+ if (message != 0)
{
- msg_list_t * element = (STD_TQUEUE_HEAD (tqueue));
- (STD_TQUEUE_HEAD (tqueue)) = (element -> next);
- /* This prevents the 16 bit counter inside the event
- semaphore from overflowing, in the unlikely situation
- that the semaphore is not waited on for a long period. */
- (void) OS2_reset_event_semaphore (STD_TQUEUE_EVENT (tqueue));
OS2_release_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
- {
- msg_t * message = (element -> message);
- OS_free (element);
- write_subqueue (message);
- return (message);
- }
+ write_subqueue (message);
+ return (message);
}
if (!blockp)
{
return (0);
}
(void) OS2_reset_event_semaphore (STD_TQUEUE_EVENT (tqueue));
+ (STD_TQUEUE_N_BLOCKED (tqueue)) += 1;
OS2_release_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
(void) OS2_wait_event_semaphore ((STD_TQUEUE_EVENT (tqueue)), 1);
+ OS2_request_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
+ (STD_TQUEUE_N_BLOCKED (tqueue)) -= 1;
+ /* This prevents the 16 bit counter inside the event
+ semaphore from overflowing. */
+ if ((STD_TQUEUE_N_BLOCKED (tqueue)) == 0)
+ (void) OS2_reset_event_semaphore (STD_TQUEUE_EVENT (tqueue));
/* Don't wait more than once; the caller must be prepared to
call again if a message is required. The reason this is
necessary is that two threads may be waiting on the same
static void
write_std_tqueue (tqueue_t * tqueue, msg_t * message)
{
- msg_list_t * element = (OS_malloc (sizeof (msg_list_t)));
- (element -> message) = message;
- (element -> next) = 0;
OS2_request_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
- if ((STD_TQUEUE_HEAD (tqueue)) == 0)
- (STD_TQUEUE_HEAD (tqueue)) = element;
+ OS2_msg_fifo_insert ((STD_TQUEUE_FIFO (tqueue)), message);
+ if ((STD_TQUEUE_N_BLOCKED (tqueue)) > 0)
+ {
+ (void) OS2_post_event_semaphore (STD_TQUEUE_EVENT (tqueue));
+ OS2_release_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
+ /* Immediately transfer control to the receiver.
+ This should improve responsiveness of the system. */
+ (void) DosSleep (0);
+ }
else
- ((STD_TQUEUE_TAIL (tqueue)) -> next) = element;
- (STD_TQUEUE_TAIL (tqueue)) = element;
- (void) OS2_post_event_semaphore (STD_TQUEUE_EVENT (tqueue));
- OS2_release_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
+ OS2_release_mutex_semaphore (STD_TQUEUE_MUTEX (tqueue));
}
+
+#endif /* not USE_OS2_QUEUES */
\f
static tqueue_t *
make_scm_tqueue (void)
OS2_destroy_message (message);
}
}
+\f
+#define BUFFER_MIN_LENGTH 16
+
+typedef struct
+{
+ unsigned int start;
+ unsigned int end;
+ unsigned int count;
+ unsigned int buffer_length;
+ void ** buffer;
+} msg_fifo_t;
+
+void *
+OS2_create_msg_fifo (void)
+{
+ msg_fifo_t * fifo = (OS_malloc (sizeof (msg_fifo_t)));
+ (fifo -> start) = 0;
+ (fifo -> end) = 0;
+ (fifo -> count) = 0;
+ (fifo -> buffer_length) = BUFFER_MIN_LENGTH;
+ (fifo -> buffer)
+ = (OS_malloc ((fifo -> buffer_length) * (sizeof (void *))));
+ return (fifo);
+}
+
+void
+OS2_destroy_msg_fifo (void * fp)
+{
+ OS_free (((msg_fifo_t *) fp) -> buffer);
+ OS_free (fp);
+}
+
+#define MAYBE_GROW_BUFFER(fifo) \
+{ \
+ if ((fifo -> count) == (fifo -> buffer_length)) \
+ msg_fifo_grow (fifo); \
+}
+
+#define MAYBE_SHRINK_BUFFER(fifo) \
+{ \
+ if (((fifo -> buffer_length) > BUFFER_MIN_LENGTH) \
+ && ((fifo -> count) < ((fifo -> buffer_length) / 4))) \
+ msg_fifo_shrink (fifo); \
+}
+
+static void
+msg_fifo_grow (msg_fifo_t * fifo)
+{
+ (fifo -> buffer_length) *= 2;
+ (fifo -> buffer)
+ = (OS_realloc ((fifo -> buffer),
+ ((fifo -> buffer_length) * (sizeof (void *)))));
+ if ((fifo -> start) > 0)
+ {
+ void ** from = (fifo -> buffer);
+ void ** stop = ((fifo -> buffer) + (fifo -> start));
+ void ** to = ((fifo -> buffer) + (fifo -> count));
+ while (from < stop)
+ (*to++) = (*from++);
+ (fifo -> end) += (fifo -> count);
+ }
+}
+
+static void
+msg_fifo_shrink (msg_fifo_t * fifo)
+{
+ if ((fifo -> start) > (fifo -> end))
+ {
+ void ** from = ((fifo -> buffer) + (fifo -> start));
+ void ** stop = ((fifo -> buffer) + (fifo -> buffer_length));
+ void ** to = (from - ((fifo -> buffer_length) / 2));
+ while (from < stop)
+ (*to++) = (*from++);
+ (fifo -> start) -= ((fifo -> buffer_length) / 2);
+ }
+ else if ((fifo -> end) > ((fifo -> buffer_length) / 2))
+ {
+ void ** from = ((fifo -> buffer) + (fifo -> start));
+ void ** stop = ((fifo -> buffer) + (fifo -> end));
+ void ** to = (fifo -> buffer);
+ while (from < stop)
+ (*to++) = (*from++);
+ (fifo -> start) = 0;
+ (fifo -> end) = (fifo -> count);
+ }
+ (fifo -> buffer_length) /= 2;
+ (fifo -> buffer)
+ = (OS_realloc ((fifo -> buffer),
+ ((fifo -> buffer_length) * (sizeof (void *)))));
+}
+
+void
+OS2_msg_fifo_insert (void * fp, void * element)
+{
+ msg_fifo_t * fifo = fp;
+ MAYBE_GROW_BUFFER (fifo);
+ ((fifo -> buffer) [fifo -> end]) = element;
+ (fifo -> end) += 1;
+ (fifo -> count) += 1;
+ if ((fifo -> end) == (fifo -> buffer_length))
+ (fifo -> end) = 0;
+}
+
+void
+OS2_msg_fifo_insert_front (void * fp, void * element)
+{
+ msg_fifo_t * fifo = fp;
+ MAYBE_GROW_BUFFER (fifo);
+ if ((fifo -> start) == 0)
+ (fifo -> start) = (fifo -> buffer_length);
+ (fifo -> start) -= 1;
+ ((fifo -> buffer) [fifo -> start]) = element;
+ (fifo -> count) += 1;
+}
+
+void *
+OS2_msg_fifo_remove (void * fp)
+{
+ msg_fifo_t * fifo = fp;
+ void * element;
+ if ((fifo -> count) == 0)
+ return (0);
+ element = ((fifo -> buffer) [fifo -> start]);
+ (fifo -> start) += 1;
+ (fifo -> count) -= 1;
+ if ((fifo -> count) == 0)
+ {
+ (fifo -> start) = 0;
+ (fifo -> end) = 0;
+ }
+ else if ((fifo -> start) == (fifo -> buffer_length))
+ (fifo -> start) = 0;
+ MAYBE_SHRINK_BUFFER (fifo);
+ return (element);
+}
+
+void *
+OS2_msg_fifo_remove_last (void * fp)
+{
+ msg_fifo_t * fifo = fp;
+ void * element;
+ if ((fifo -> count) == 0)
+ return (0);
+ if ((fifo -> end) == 0)
+ (fifo -> end) = (fifo -> buffer_length);
+ (fifo -> end) -= 1;
+ element = ((fifo -> buffer) [fifo -> end]);
+ (fifo -> count) -= 1;
+ if ((fifo -> count) == 0)
+ {
+ (fifo -> start) = 0;
+ (fifo -> end) = 0;
+ }
+ MAYBE_SHRINK_BUFFER (fifo);
+ return (element);
+}
+
+void **
+OS2_msg_fifo_remove_all (void * fp)
+{
+ msg_fifo_t * fifo = fp;
+ void ** result = (OS_malloc (((fifo -> count) + 1) * (sizeof (void *))));
+ void ** from = ((fifo -> buffer) + (fifo -> start));
+ void ** stop;
+ void ** to = result;
+ if ((fifo -> start) < (fifo -> end))
+ {
+ stop = ((fifo -> buffer) + (fifo -> end));
+ while (from < stop)
+ (*to++) = (*from++);
+ }
+ else if ((fifo -> count) > 0)
+ {
+ stop = ((fifo -> buffer) + (fifo -> buffer_length));
+ while (from < stop)
+ (*to++) = (*from++);
+ from = (fifo -> buffer);
+ stop = ((fifo -> buffer) + (fifo -> end));
+ while (from < stop)
+ (*to++) = (*from++);
+ }
+ (*to) = 0;
+ (fifo -> start) = 0;
+ (fifo -> end) = 0;
+ (fifo -> count) = 0;
+ if ((fifo -> buffer_length) > BUFFER_MIN_LENGTH)
+ {
+ (fifo -> buffer_length) = (fifo -> buffer_length);
+ (fifo -> buffer)
+ = (OS_realloc ((fifo -> buffer),
+ ((fifo -> buffer_length) * (sizeof (void *)))));
+ }
+ return (result);
+}
+
+int
+OS2_msg_fifo_emptyp (void * fp)
+{
+ return ((((msg_fifo_t *) fp) -> count) == 0);
+}
+
+unsigned int
+OS2_msg_fifo_count (void * fp)
+{
+ return (((msg_fifo_t *) fp) -> count);
+}
+
+void *
+OS2_msg_fifo_last (void * fp)
+{
+ msg_fifo_t * fifo = fp;
+ if ((fifo -> count) == 0)
+ return (0);
+ return ((fifo -> buffer)
+ [(((fifo -> end) == 0) ? (fifo -> buffer_length) : (fifo -> end))
+ - 1]);
+}