Ajout de Thread.wait_pid, wait_timed_read, wait_timed_write.

Plus de fonctions dans threadIO.


git-svn-id: http://caml.inria.fr/svn/ocaml/trunk@766 f963ae5c-01c2-4b8c-9fe0-0dff7051ff02
master
Xavier Leroy 1996-04-29 13:21:09 +00:00
parent 4864fecc59
commit e3659d5b1b
6 changed files with 75 additions and 37 deletions

View File

@ -16,3 +16,5 @@ thread.cmo: thread.cmi
thread.cmx: thread.cmi
threadIO.cmo: thread.cmi threadIO.cmi
threadIO.cmx: thread.cmx threadIO.cmi
threadUnix.cmo: thread.cmi threadUnix.cmi
threadUnix.cmx: thread.cmx threadUnix.cmi

View File

@ -6,7 +6,8 @@ CFLAGS=-I../../byterun -O $(BYTECCCOMPOPTS)
CAMLC=../../boot/cslrun ../../boot/cslc -I ../../boot -I ../unix
C_OBJS=scheduler.o
CAML_OBJS=thread.cmo threadIO.cmo mutex.cmo condition.cmo event.cmo
CAML_OBJS=thread.cmo threadIO.cmo threadUnix.cmo \
mutex.cmo condition.cmo event.cmo
all: libthreads.a threads.cma

View File

@ -75,7 +75,6 @@ typedef struct thread_struct * thread_t;
#define RESUMED_IO Val_int(1)
#define RESUMED_DELAY Val_int(2)
#define RESUMED_JOIN Val_int(3)
#define RESUMED_WAIT Val_int(4)
#define NO_FD Val_int(0)
#define NO_DELAY Val_unit
@ -215,6 +214,7 @@ static double timeofday()
#define FOREACH_THREAD(x) x = curr_thread; do { x = x->next;
#define END_FOREACH(x) } while (x != curr_thread)
static value alloc_process_status();
static value schedule_thread()
{
@ -241,15 +241,15 @@ try_again:
need_wait = 0;
FOREACH_THREAD(th)
if (th->status & BLOCKED_READ) {
if (th->status & (BLOCKED_READ - 1)) {
FD_SET(Int_val(th->fd), &readfds);
need_select = 1;
}
if (th->status & BLOCKED_WRITE) {
if (th->status & (BLOCKED_WRITE - 1)) {
FD_SET(Int_val(th->fd), &writefds);
need_select = 1;
}
if (th->status & BLOCKED_DELAY) {
if (th->status & (BLOCKED_DELAY - 1)) {
double th_delay = Double_val(th->delay);
if (now < 0.0) now = timeofday();
if (th_delay < now) {
@ -260,19 +260,20 @@ try_again:
if (th_delay < delay) delay = th_delay;
}
}
if (th->status & BLOCKED_JOIN) {
if (th->status & (BLOCKED_JOIN - 1)) {
if (((thread_t)(th->joining))->status == KILLED) {
th->status = RUNNABLE;
Assign(th->joining, NO_JOINING);
th->retval = RESUMED_JOIN;
}
}
if (th->status & BLOCKED_WAIT) {
int status;
if (waitpid(Int_val(th->waitpid), &status, WNOHANG) > 0) {
if (th->status & (BLOCKED_WAIT - 1)) {
int status, pid;
pid = waitpid(Int_val(th->waitpid), &status, WNOHANG);
if (pid > 0) {
th->status = RUNNABLE;
th->waitpid = NO_WAITPID;
th->retval = RESUMED_WAIT;
Assign(th->retval, alloc_process_status(pid, status));
} else {
need_wait = 1;
}
@ -315,27 +316,23 @@ try_again:
/* Some descriptors are ready.
Mark the corresponding threads runnable. */
FOREACH_THREAD(th)
switch (th->status) {
case BLOCKED_READ:
if (FD_ISSET(Int_val(th->fd), &readfds)) {
/* Wake up only one thread per fd. */
FD_CLR(Int_val(th->fd), &readfds);
th->status = RUNNABLE;
th->fd = NO_FD;
th->retval = RESUMED_IO;
if (run_thread == NULL) run_thread = th; /* Found one. */
}
break;
case BLOCKED_WRITE:
if (FD_ISSET(Int_val(th->fd), &writefds)) {
/* Wake up only one thread per fd. */
FD_CLR(Int_val(th->fd), &writefds);
th->status = RUNNABLE;
th->fd = NO_FD;
th->retval = RESUMED_IO;
if (run_thread == NULL) run_thread = th; /* Found one. */
}
break;
if (th->status & (BLOCKED_READ - 1)
&& FD_ISSET(Int_val(th->fd), &readfds)) {
/* Wake up only one thread per fd. */
FD_CLR(Int_val(th->fd), &readfds);
th->status = RUNNABLE;
th->fd = NO_FD;
th->retval = RESUMED_IO;
if (run_thread == NULL) run_thread = th; /* Found one. */
}
if (th->status & (BLOCKED_WRITE - 1)
&& FD_ISSET(Int_val(th->fd), &writefds)) {
/* Wake up only one thread per fd. */
FD_CLR(Int_val(th->fd), &writefds);
th->status = RUNNABLE;
th->fd = NO_FD;
th->retval = RESUMED_IO;
if (run_thread == NULL) run_thread = th; /* Found one. */
}
END_FOREACH(th);
}
@ -481,6 +478,7 @@ value thread_wakeup(thread) /* ML */
case SUSPENDED:
th->status = RUNNABLE;
th->retval = RESUMED_WAKEUP;
break;
case KILLED:
failwith("Thread.wakeup: killed thread");
default:
@ -525,3 +523,38 @@ value thread_kill(thread) /* ML */
return retval;
}
/* Auxiliary function for allocating the result of a waitpid() call */
#ifndef WIFEXITED
#define WIFEXITED(status) ((status) & 0xFF == 0)
#define WEXITSTATUS(status) (((status) >> 8) & 0xFF)
#define WIFSTOPPED(status) ((status) & 0xFF == 0xFF)
#define WSTOPSIG(status) (((status) >> 8) & 0xFF)
#define WTERMSIG(status) ((status) & 0x3F)
#endif
static value alloc_process_status(pid, status)
int pid, status;
{
value st, res;
Push_roots(r, 1);
if (WIFEXITED(status)) {
st = alloc(1, 0);
Field(st, 0) = Val_int(WEXITSTATUS(status));
}
else if (WIFSTOPPED(status)) {
st = alloc(1, 2);
Field(st, 0) = Val_int(WSTOPSIG(status));
}
else {
st = alloc(1, 1);
Field(st, 0) = Val_int(WTERMSIG(status));
}
r[0] = st;
res = alloc_tuple(2);
Field(res, 0) = Val_int(pid);
Field(res, 1) = r[0];
Pop_roots();
return res;
}

View File

@ -22,7 +22,7 @@ type resumption_status =
| Resumed_io
| Resumed_delay
| Resumed_join
| Resumed_wait
| Resumed_wait of int * Unix.process_status
(* It is mucho important that the primitives that reschedule are called
through an ML function call, not directly. That's because when such a
@ -46,7 +46,8 @@ external thread_wait_timed_write
= "thread_wait_timed_write"
external thread_join : t -> unit = "thread_join"
external thread_delay : float -> unit = "thread_delay"
external thread_wait_pid : int -> unit = "thread_wait_pid"
external thread_wait_pid : int -> int * Unix.process_status
= "thread_wait_pid"
external thread_wakeup : t -> unit = "thread_wakeup"
external thread_self : unit -> t = "thread_self"
external thread_kill : t -> unit = "thread_kill"

View File

@ -61,13 +61,14 @@ val wait_timed_write : Unix.file_descr -> float -> bool
the amount of time given as second argument (in seconds).
Return [true] if the file descriptor is ready for input/output
and [false] if the timeout expired. *)
val wait_pid : int -> unit
val wait_pid : int -> int * Unix.process_status
(* [wait_pid p] suspends the execution of the calling thread
until the Unix process specified by the process identifier [p]
terminates. A pid [p] of [-1] means wait for any child.
A pid of [0] means wait for any child in the same process group
as the current process. Negative pid arguments represent
process groups. *)
process groups. Returns the pid of the child caught and
its termination status, as per [Unix.wait]. *)
(*--*)

View File

@ -13,9 +13,9 @@
(* Module [ThreadIO]: thread-compatible input-output operations *)
(* This module reimplements some of the input functions from [Pervasives]
(* This module reimplements some of the functions from [Pervasives]
and [Lexing] so that they only block the calling thread, not all threads
in the program, if data is not immediately available on the input.
in the program, if input or output is not immediately possible.
See the documentation of the [Pervasives] and [Lexing] modules for
precise descriptions of the functions below. *)