diff --git a/otherlibs/threads/.depend b/otherlibs/threads/.depend index 803c7d4bd..e5018a01b 100644 --- a/otherlibs/threads/.depend +++ b/otherlibs/threads/.depend @@ -16,3 +16,5 @@ thread.cmo: thread.cmi thread.cmx: thread.cmi threadIO.cmo: thread.cmi threadIO.cmi threadIO.cmx: thread.cmx threadIO.cmi +threadUnix.cmo: thread.cmi threadUnix.cmi +threadUnix.cmx: thread.cmx threadUnix.cmi diff --git a/otherlibs/threads/Makefile b/otherlibs/threads/Makefile index c0fad43ec..7d4026eaf 100644 --- a/otherlibs/threads/Makefile +++ b/otherlibs/threads/Makefile @@ -6,7 +6,8 @@ CFLAGS=-I../../byterun -O $(BYTECCCOMPOPTS) CAMLC=../../boot/cslrun ../../boot/cslc -I ../../boot -I ../unix C_OBJS=scheduler.o -CAML_OBJS=thread.cmo threadIO.cmo mutex.cmo condition.cmo event.cmo +CAML_OBJS=thread.cmo threadIO.cmo threadUnix.cmo \ + mutex.cmo condition.cmo event.cmo all: libthreads.a threads.cma diff --git a/otherlibs/threads/scheduler.c b/otherlibs/threads/scheduler.c index 9269d42bf..ba01f9137 100644 --- a/otherlibs/threads/scheduler.c +++ b/otherlibs/threads/scheduler.c @@ -75,7 +75,6 @@ typedef struct thread_struct * thread_t; #define RESUMED_IO Val_int(1) #define RESUMED_DELAY Val_int(2) #define RESUMED_JOIN Val_int(3) -#define RESUMED_WAIT Val_int(4) #define NO_FD Val_int(0) #define NO_DELAY Val_unit @@ -215,6 +214,7 @@ static double timeofday() #define FOREACH_THREAD(x) x = curr_thread; do { x = x->next; #define END_FOREACH(x) } while (x != curr_thread) +static value alloc_process_status(); static value schedule_thread() { @@ -241,15 +241,15 @@ try_again: need_wait = 0; FOREACH_THREAD(th) - if (th->status & BLOCKED_READ) { + if (th->status & (BLOCKED_READ - 1)) { FD_SET(Int_val(th->fd), &readfds); need_select = 1; } - if (th->status & BLOCKED_WRITE) { + if (th->status & (BLOCKED_WRITE - 1)) { FD_SET(Int_val(th->fd), &writefds); need_select = 1; } - if (th->status & BLOCKED_DELAY) { + if (th->status & (BLOCKED_DELAY - 1)) { double th_delay = Double_val(th->delay); if (now < 0.0) now = timeofday(); if (th_delay < now) { @@ -260,19 +260,20 @@ try_again: if (th_delay < delay) delay = th_delay; } } - if (th->status & BLOCKED_JOIN) { + if (th->status & (BLOCKED_JOIN - 1)) { if (((thread_t)(th->joining))->status == KILLED) { th->status = RUNNABLE; Assign(th->joining, NO_JOINING); th->retval = RESUMED_JOIN; } } - if (th->status & BLOCKED_WAIT) { - int status; - if (waitpid(Int_val(th->waitpid), &status, WNOHANG) > 0) { + if (th->status & (BLOCKED_WAIT - 1)) { + int status, pid; + pid = waitpid(Int_val(th->waitpid), &status, WNOHANG); + if (pid > 0) { th->status = RUNNABLE; th->waitpid = NO_WAITPID; - th->retval = RESUMED_WAIT; + Assign(th->retval, alloc_process_status(pid, status)); } else { need_wait = 1; } @@ -315,27 +316,23 @@ try_again: /* Some descriptors are ready. Mark the corresponding threads runnable. */ FOREACH_THREAD(th) - switch (th->status) { - case BLOCKED_READ: - if (FD_ISSET(Int_val(th->fd), &readfds)) { - /* Wake up only one thread per fd. */ - FD_CLR(Int_val(th->fd), &readfds); - th->status = RUNNABLE; - th->fd = NO_FD; - th->retval = RESUMED_IO; - if (run_thread == NULL) run_thread = th; /* Found one. */ - } - break; - case BLOCKED_WRITE: - if (FD_ISSET(Int_val(th->fd), &writefds)) { - /* Wake up only one thread per fd. */ - FD_CLR(Int_val(th->fd), &writefds); - th->status = RUNNABLE; - th->fd = NO_FD; - th->retval = RESUMED_IO; - if (run_thread == NULL) run_thread = th; /* Found one. */ - } - break; + if (th->status & (BLOCKED_READ - 1) + && FD_ISSET(Int_val(th->fd), &readfds)) { + /* Wake up only one thread per fd. */ + FD_CLR(Int_val(th->fd), &readfds); + th->status = RUNNABLE; + th->fd = NO_FD; + th->retval = RESUMED_IO; + if (run_thread == NULL) run_thread = th; /* Found one. */ + } + if (th->status & (BLOCKED_WRITE - 1) + && FD_ISSET(Int_val(th->fd), &writefds)) { + /* Wake up only one thread per fd. */ + FD_CLR(Int_val(th->fd), &writefds); + th->status = RUNNABLE; + th->fd = NO_FD; + th->retval = RESUMED_IO; + if (run_thread == NULL) run_thread = th; /* Found one. */ } END_FOREACH(th); } @@ -481,6 +478,7 @@ value thread_wakeup(thread) /* ML */ case SUSPENDED: th->status = RUNNABLE; th->retval = RESUMED_WAKEUP; + break; case KILLED: failwith("Thread.wakeup: killed thread"); default: @@ -525,3 +523,38 @@ value thread_kill(thread) /* ML */ return retval; } +/* Auxiliary function for allocating the result of a waitpid() call */ + +#ifndef WIFEXITED +#define WIFEXITED(status) ((status) & 0xFF == 0) +#define WEXITSTATUS(status) (((status) >> 8) & 0xFF) +#define WIFSTOPPED(status) ((status) & 0xFF == 0xFF) +#define WSTOPSIG(status) (((status) >> 8) & 0xFF) +#define WTERMSIG(status) ((status) & 0x3F) +#endif + +static value alloc_process_status(pid, status) + int pid, status; +{ + value st, res; + Push_roots(r, 1); + + if (WIFEXITED(status)) { + st = alloc(1, 0); + Field(st, 0) = Val_int(WEXITSTATUS(status)); + } + else if (WIFSTOPPED(status)) { + st = alloc(1, 2); + Field(st, 0) = Val_int(WSTOPSIG(status)); + } + else { + st = alloc(1, 1); + Field(st, 0) = Val_int(WTERMSIG(status)); + } + r[0] = st; + res = alloc_tuple(2); + Field(res, 0) = Val_int(pid); + Field(res, 1) = r[0]; + Pop_roots(); + return res; +} diff --git a/otherlibs/threads/thread.ml b/otherlibs/threads/thread.ml index c8606130b..eef49bc86 100644 --- a/otherlibs/threads/thread.ml +++ b/otherlibs/threads/thread.ml @@ -22,7 +22,7 @@ type resumption_status = | Resumed_io | Resumed_delay | Resumed_join - | Resumed_wait + | Resumed_wait of int * Unix.process_status (* It is mucho important that the primitives that reschedule are called through an ML function call, not directly. That's because when such a @@ -46,7 +46,8 @@ external thread_wait_timed_write = "thread_wait_timed_write" external thread_join : t -> unit = "thread_join" external thread_delay : float -> unit = "thread_delay" -external thread_wait_pid : int -> unit = "thread_wait_pid" +external thread_wait_pid : int -> int * Unix.process_status + = "thread_wait_pid" external thread_wakeup : t -> unit = "thread_wakeup" external thread_self : unit -> t = "thread_self" external thread_kill : t -> unit = "thread_kill" diff --git a/otherlibs/threads/thread.mli b/otherlibs/threads/thread.mli index 6207e41e4..bf2bf32c5 100644 --- a/otherlibs/threads/thread.mli +++ b/otherlibs/threads/thread.mli @@ -61,13 +61,14 @@ val wait_timed_write : Unix.file_descr -> float -> bool the amount of time given as second argument (in seconds). Return [true] if the file descriptor is ready for input/output and [false] if the timeout expired. *) -val wait_pid : int -> unit +val wait_pid : int -> int * Unix.process_status (* [wait_pid p] suspends the execution of the calling thread until the Unix process specified by the process identifier [p] terminates. A pid [p] of [-1] means wait for any child. A pid of [0] means wait for any child in the same process group as the current process. Negative pid arguments represent - process groups. *) + process groups. Returns the pid of the child caught and + its termination status, as per [Unix.wait]. *) (*--*) diff --git a/otherlibs/threads/threadIO.mli b/otherlibs/threads/threadIO.mli index ba4f71f76..b1ce30104 100644 --- a/otherlibs/threads/threadIO.mli +++ b/otherlibs/threads/threadIO.mli @@ -13,9 +13,9 @@ (* Module [ThreadIO]: thread-compatible input-output operations *) -(* This module reimplements some of the input functions from [Pervasives] +(* This module reimplements some of the functions from [Pervasives] and [Lexing] so that they only block the calling thread, not all threads - in the program, if data is not immediately available on the input. + in the program, if input or output is not immediately possible. See the documentation of the [Pervasives] and [Lexing] modules for precise descriptions of the functions below. *)