Introduction de Thread.select, qui subsume wait_{timed,}_{read,write}

git-svn-id: http://caml.inria.fr/svn/ocaml/trunk@1494 f963ae5c-01c2-4b8c-9fe0-0dff7051ff02
master
Xavier Leroy 1997-04-11 13:57:34 +00:00
parent dea956a7d3
commit 8145ec700f
6 changed files with 171 additions and 110 deletions

View File

@ -166,11 +166,11 @@ let stderr = open_descriptor_out 2
(* Non-blocking stuff *)
external thread_wait_read_prim : Unix.file_descr -> unit = "thread_wait_read"
external thread_wait_write_prim : Unix.file_descr -> unit = "thread_wait_write"
external thread_select_prim :
Unix.file_descr list * Unix.file_descr list *
Unix.file_descr list * float -> unit = "thread_select"
let thread_wait_read fd = thread_wait_read_prim fd
let thread_wait_write fd = thread_wait_write_prim fd
let thread_select arg = thread_select_prim arg
external inchan_ready : in_channel -> bool = "thread_inchan_ready"
external outchan_ready : out_channel -> int -> bool = "thread_outchan_ready"
@ -178,9 +178,13 @@ external descr_inchan : in_channel -> Unix.file_descr = "channel_descriptor"
external descr_outchan : out_channel -> Unix.file_descr = "channel_descriptor"
let wait_inchan ic =
if not (inchan_ready ic) then thread_wait_read (descr_inchan ic)
if not (inchan_ready ic) then begin
thread_select([descr_inchan ic], [], [], -1.0); ()
end
let wait_outchan oc len =
if not (outchan_ready oc len) then thread_wait_write (descr_outchan oc)
if not (outchan_ready oc len) then begin
thread_select([], [descr_outchan oc], [], -1.0); ()
end
(* General output functions *)

View File

@ -44,6 +44,7 @@
#endif
#ifndef FD_ISSET
/* Assume old-style BSD 4.2 fd sets */
typedef int fd_set;
#define FD_SETSIZE (sizeof(int) * 8)
#define FD_SET(fd,fds) (*(fds) |= 1 << (fd))
@ -76,7 +77,8 @@ struct thread_struct {
value * sp;
value * trapsp;
value status; /* RUNNABLE, KILLED. etc (see below) */
value fd; /* File descriptor on which this thread is waiting */
value readfds, writefds, exceptfds;
/* Lists of file descriptors on which we're doing select() */
value delay; /* Time until which this thread is blocked */
value joining; /* Thread we're trying to join */
value waitpid; /* PID of process we're waiting for */
@ -88,18 +90,19 @@ typedef struct thread_struct * thread_t;
#define RUNNABLE Val_int(0)
#define KILLED Val_int(1)
#define SUSPENDED Val_int(2)
#define BLOCKED_READ Val_int(4)
#define BLOCKED_WRITE Val_int(8)
#define BLOCKED_DELAY Val_int(16)
#define BLOCKED_JOIN Val_int(32)
#define BLOCKED_WAIT Val_int(64)
#define BLOCKED_IO Val_int(4)
#define BLOCKED_DELAY Val_int(8)
#define BLOCKED_JOIN Val_int(16)
#define BLOCKED_WAIT Val_int(32)
#define RESUMED_WAKEUP Val_int(0)
#define RESUMED_IO Val_int(1)
#define RESUMED_DELAY Val_int(2)
#define RESUMED_JOIN Val_int(3)
#define RESUMED_DELAY Val_int(1)
#define RESUMED_JOIN Val_int(2)
#define NO_FD Val_int(0)
#define TAG_RESUMED_IO 0
#define TAG_RESUMED_WAIT 1
#define NO_FDS Val_unit
#define NO_DELAY Val_unit
#define NO_JOINING Val_unit
#define NO_WAITPID Val_int(0)
@ -154,7 +157,9 @@ value thread_initialize(unit) /* ML */
curr_thread->sp = extern_sp;
curr_thread->trapsp = trapsp;
curr_thread->status = RUNNABLE;
curr_thread->fd = NO_FD;
curr_thread->readfds = NO_FDS;
curr_thread->writefds = NO_FDS;
curr_thread->exceptfds = NO_FDS;
curr_thread->delay = NO_DELAY;
curr_thread->joining = NO_JOINING;
curr_thread->waitpid = NO_WAITPID;
@ -202,7 +207,9 @@ value thread_new(clos) /* ML */
th->sp[0] = Val_unit; /* a dummy environment */
/* The thread is initially runnable */
th->status = RUNNABLE;
th->fd = NO_FD;
th->readfds = NO_FDS;
th->writefds = NO_FDS;
th->exceptfds = NO_FDS;
th->delay = NO_DELAY;
th->joining = NO_JOINING;
th->waitpid = NO_WAITPID;
@ -237,12 +244,15 @@ static double timeofday()
#define FOREACH_THREAD(x) x = curr_thread; do { x = x->next;
#define END_FOREACH(x) } while (x != curr_thread)
static value alloc_process_status();
static void add_fdlist_to_set();
static value inter_fdlist_set();
static value schedule_thread()
{
thread_t run_thread, th;
fd_set readfds, writefds;
fd_set readfds, writefds, exceptfds;
double delay, now;
int need_select, need_wait;
@ -261,18 +271,17 @@ try_again:
See if some join or wait operations succeeded. */
FD_ZERO(&readfds);
FD_ZERO(&writefds);
FD_ZERO(&exceptfds);
delay = DELAY_INFTY;
now = -1.0;
need_select = 0;
need_wait = 0;
FOREACH_THREAD(th)
if (th->status & (BLOCKED_READ - 1)) {
FD_SET(Int_val(th->fd), &readfds);
need_select = 1;
}
if (th->status & (BLOCKED_WRITE - 1)) {
FD_SET(Int_val(th->fd), &writefds);
if (th->status & (BLOCKED_IO - 1)) {
add_fdlist_to_set(th->readfds, &readfds);
add_fdlist_to_set(th->writefds, &writefds);
add_fdlist_to_set(th->exceptfds, &exceptfds);
need_select = 1;
}
if (th->status & (BLOCKED_DELAY - 1)) {
@ -281,7 +290,6 @@ try_again:
th_delay = Double_val(th->delay) - now;
if (th_delay <= 0) {
th->status = RUNNABLE;
Assign(th->delay, NO_DELAY);
th->retval = RESUMED_DELAY;
} else {
if (th_delay < delay) delay = th_delay;
@ -290,7 +298,6 @@ try_again:
if (th->status & (BLOCKED_JOIN - 1)) {
if (((thread_t)(th->joining))->status == KILLED) {
th->status = RUNNABLE;
Assign(th->joining, NO_JOINING);
th->retval = RESUMED_JOIN;
}
}
@ -299,7 +306,6 @@ try_again:
pid = waitpid(Int_val(th->waitpid), &status, WNOHANG);
if (pid > 0) {
th->status = RUNNABLE;
th->waitpid = NO_WAITPID;
Assign(th->retval, alloc_process_status(pid, status));
} else {
need_wait = 1;
@ -337,29 +343,27 @@ try_again:
delay_ptr = NULL;
}
enter_blocking_section();
retcode = select(FD_SETSIZE, &readfds, &writefds, NULL, delay_ptr);
retcode = select(FD_SETSIZE, &readfds, &writefds, &exceptfds, delay_ptr);
leave_blocking_section();
if (retcode > 0) {
/* Some descriptors are ready.
Mark the corresponding threads runnable. */
FOREACH_THREAD(th)
if (th->status & (BLOCKED_READ - 1)
&& FD_ISSET(Int_val(th->fd), &readfds)) {
/* Wake up only one thread per fd. */
FD_CLR(Int_val(th->fd), &readfds);
th->status = RUNNABLE;
th->fd = NO_FD;
th->retval = RESUMED_IO;
if (run_thread == NULL) run_thread = th; /* Found one. */
}
if (th->status & (BLOCKED_WRITE - 1)
&& FD_ISSET(Int_val(th->fd), &writefds)) {
/* Wake up only one thread per fd. */
FD_CLR(Int_val(th->fd), &writefds);
th->status = RUNNABLE;
th->fd = NO_FD;
th->retval = RESUMED_IO;
if (run_thread == NULL) run_thread = th; /* Found one. */
if (th->status & (BLOCKED_IO - 1)) {
Push_roots(r, 3);
r[0] = inter_fdlist_set(th->readfds, &readfds);
r[1] = inter_fdlist_set(th->writefds, &writefds);
r[2] = inter_fdlist_set(th->exceptfds, &exceptfds);
if (r[0] != NO_FDS || r[1] != NO_FDS || r[2] != NO_FDS) {
value retval = alloc(3, TAG_RESUMED_IO);
Field(retval, 0) = r[0];
Field(retval, 1) = r[1];
Field(retval, 2) = r[2];
Assign(th->retval, retval);
th->status = RUNNABLE;
if (run_thread == NULL) run_thread = th; /* Found one. */
}
Pop_roots();
}
END_FOREACH(th);
}
@ -378,6 +382,14 @@ try_again:
/* If we haven't something to run at that point, we're in big trouble. */
if (run_thread == NULL) invalid_argument("Thread: deadlock");
/* Free everything the thread was waiting on */
Assign(run_thread->readfds, NO_FDS);
Assign(run_thread->writefds, NO_FDS);
Assign(run_thread->exceptfds, NO_FDS);
Assign(run_thread->delay, NO_DELAY);
Assign(run_thread->joining, NO_JOINING);
run_thread->waitpid = NO_WAITPID;
/* Activate the thread */
curr_thread = run_thread;
stack_low = curr_thread->stack_low;
@ -418,25 +430,28 @@ value thread_sleep(unit) /* ML */
return schedule_thread();
}
/* Suspend the current thread on a Unix file descriptor */
/* Suspend the current thread on a select() request */
value thread_wait_read(fd) /* ML */
value fd;
value thread_select(arg) /* ML */
value arg;
{
if (curr_thread == NULL) return Val_unit;
double date;
/* Don't do an error if we're not initialized yet
(we can be called from thread-safe Pervasives before initialization),
just return immediately. */
if (curr_thread == NULL) return RESUMED_WAKEUP;
check_callback();
curr_thread->status = BLOCKED_READ;
curr_thread->fd = fd;
return schedule_thread();
}
value thread_wait_write(fd) /* ML */
value fd;
{
if (curr_thread == NULL) return Val_unit;
check_callback();
curr_thread->status = BLOCKED_WRITE;
curr_thread->fd = fd;
Assign(curr_thread->readfds, Field(arg, 0));
Assign(curr_thread->writefds, Field(arg, 1));
Assign(curr_thread->exceptfds, Field(arg, 2));
date = Double_val(Field(arg, 3));
if (date >= 0.0) {
date += timeofday();
Assign(curr_thread->delay, copy_double(date));
curr_thread->status = BLOCKED_IO | BLOCKED_DELAY;
} else {
curr_thread->status = BLOCKED_IO;
}
return schedule_thread();
}
@ -478,32 +493,6 @@ value thread_delay(time) /* ML */
return schedule_thread();
}
/* Suspend the current thread on a Unix file descriptor, with timeout */
value thread_wait_timed_read(fd_time) /* ML */
value fd_time;
{
double date = timeofday() + Double_val(Field(fd_time, 1));
Assert(curr_thread != NULL);
check_callback();
curr_thread->status = BLOCKED_READ | BLOCKED_DELAY;
curr_thread->fd = Field(fd_time, 0);
Assign(curr_thread->delay, copy_double(date));
return schedule_thread();
}
value thread_wait_timed_write(fd_time) /* ML */
value fd_time;
{
double date = timeofday() + Double_val(Field(fd_time, 1));
Assert(curr_thread != NULL);
check_callback();
curr_thread->status = BLOCKED_WRITE | BLOCKED_DELAY;
curr_thread->fd = Field(fd_time, 0);
Assign(curr_thread->delay, copy_double(date));
return schedule_thread();
}
/* Suspend the current thread until another thread terminates */
value thread_join(th) /* ML */
@ -568,8 +557,6 @@ value thread_kill(thread) /* ML */
if (th == th->next) failwith("Thread.kill: cannot kill the last thread");
/* This thread is no longer waiting on anything */
th->status = KILLED;
Assign(th->delay, NO_DELAY);
Assign(th->joining, NO_JOINING);
/* If this is the current thread, activate another one */
if (th == curr_thread) retval = schedule_thread();
/* Remove thread from the doubly-linked list */
@ -585,10 +572,50 @@ value thread_kill(thread) /* ML */
return retval;
}
/* Set a list of file descriptors in a fdset */
static void add_fdlist_to_set(fdl, set)
value fdl;
fd_set * set;
{
for (/*nothing*/; fdl != NO_FDS; fdl = Field(fdl, 1)) {
FD_SET(Int_val(Field(fdl, 0)), set);
}
}
/* Build the intersection of a list and a fdset (the list of file descriptors
which are both in the list and in the fdset). */
static value inter_fdlist_set(fdl, set)
value fdl;
fd_set * set;
{
value res, cons;
for (res = NO_FDS; fdl != NO_FDS; fdl = Field(fdl, 1)) {
int fd = Int_val(Field(fdl, 0));
if (FD_ISSET(fd, set)) {
Push_roots(r, 2);
r[0] = fdl;
r[1] = res;
cons = alloc(2, 0);
fdl = r[0];
res = r[1];
Pop_roots();
Field(cons, 0) = Val_int(fd);
Field(cons, 1) = res;
res = cons;
FD_CLR(fd, set); /* wake up only one thread per fd ready */
}
}
return res;
}
/* Auxiliary function for allocating the result of a waitpid() call */
#if !(defined(WIFEXITED) && defined(WEXITSTATUS) && defined(WIFSTOPPED) && \
defined(WSTOPSIG) && defined(WTERMSIG))
/* Assume old-style V7 status word */
#define WIFEXITED(status) ((status) & 0xFF == 0)
#define WEXITSTATUS(status) (((status) >> 8) & 0xFF)
#define WIFSTOPPED(status) ((status) & 0xFF == 0xFF)
@ -596,6 +623,10 @@ value thread_kill(thread) /* ML */
#define WTERMSIG(status) ((status) & 0x3F)
#endif
#define TAG_WEXITED 0
#define TAG_WSIGNALED 1
#define TAG_WSTOPPED 2
static value alloc_process_status(pid, status)
int pid, status;
{
@ -603,19 +634,19 @@ static value alloc_process_status(pid, status)
Push_roots(r, 1);
if (WIFEXITED(status)) {
st = alloc(1, 0);
st = alloc(1, TAG_WEXITED);
Field(st, 0) = Val_int(WEXITSTATUS(status));
}
else if (WIFSTOPPED(status)) {
st = alloc(1, 2);
st = alloc(1, TAG_WSTOPPED);
Field(st, 0) = Val_int(WSTOPSIG(status));
}
else {
st = alloc(1, 1);
st = alloc(1, TAG_WSIGNALED);
Field(st, 0) = Val_int(WTERMSIG(status));
}
r[0] = st;
res = alloc_tuple(2);
res = alloc(2, TAG_RESUMED_WAIT);
Field(res, 0) = Val_int(pid);
Field(res, 1) = r[0];
Pop_roots();

View File

@ -19,9 +19,10 @@ let critical_section = ref false
type resumption_status =
Resumed_wakeup
| Resumed_io
| Resumed_delay
| Resumed_join
| Resumed_select of
Unix.file_descr list * Unix.file_descr list * Unix.file_descr list
| Resumed_wait of int * Unix.process_status
(* It is mucho important that the primitives that reschedule are called
@ -38,14 +39,10 @@ external thread_initialize : unit -> unit = "thread_initialize"
external thread_new : (unit -> unit) -> t = "thread_new"
external thread_yield : unit -> unit = "thread_yield"
external thread_sleep : unit -> unit = "thread_sleep"
external thread_wait_read : Unix.file_descr -> unit = "thread_wait_read"
external thread_wait_write : Unix.file_descr -> unit = "thread_wait_write"
external thread_wait_timed_read
: Unix.file_descr * float -> resumption_status (* remeber: 1 arg *)
= "thread_wait_timed_read"
external thread_wait_timed_write
: Unix.file_descr * float -> resumption_status (* remeber: 1 arg *)
= "thread_wait_timed_write"
external thread_select :
Unix.file_descr list * Unix.file_descr list * (* remember: 1 arg *)
Unix.file_descr list * float -> resumption_status
= "thread_select"
external thread_join : t -> unit = "thread_join"
external thread_delay : float -> unit = "thread_delay"
external thread_wait_pid : int -> resumption_status = "thread_wait_pid"
@ -60,8 +57,6 @@ external id : t -> int = "thread_id"
making all other operations atomic. *)
let sleep () = critical_section := false; thread_sleep()
let wait_read fd = thread_wait_read fd
let wait_write fd = thread_wait_write fd
let delay duration = thread_delay duration
let join th = thread_join th
let wakeup pid = thread_wakeup pid
@ -69,12 +64,27 @@ let self () = thread_self()
let kill pid = thread_kill pid
let exit () = thread_kill(thread_self())
let wait_timed_read_aux arg = thread_wait_timed_read arg
let wait_timed_write_aux arg = thread_wait_timed_write arg
let select_aux arg = thread_select arg
let select readfds writefds exceptfds delay =
match select_aux (readfds, writefds, exceptfds, delay) with
Resumed_select(r, w, e) -> (r, w, e)
| _ -> ([], [], [])
let wait_read fd = select_aux([fd], [], [], -1.0); ()
let wait_write fd = select_aux([], [fd], [], -1.0); ()
let wait_timed_read fd delay =
match select_aux([fd], [], [], delay) with
Resumed_select(_, _, _) -> true
| _ -> false
let wait_timed_write fd delay =
match select_aux([], [fd], [], delay) with
Resumed_select(_, _, _) -> true
| _ -> false
let wait_pid_aux pid = thread_wait_pid pid
let wait_timed_read fd d = wait_timed_read_aux (fd, d) = Resumed_io
let wait_timed_write fd d = wait_timed_write_aux (fd, d) = Resumed_io
let wait_pid pid =
match wait_pid_aux pid with
Resumed_wait(pid, status) -> (pid, status)

View File

@ -62,6 +62,14 @@ val wait_timed_write : Unix.file_descr -> float -> bool
the amount of time given as second argument (in seconds).
Return [true] if the file descriptor is ready for input/output
and [false] if the timeout expired. *)
val select :
Unix.file_descr list -> Unix.file_descr list ->
Unix.file_descr list -> float ->
Unix.file_descr list * Unix.file_descr list * Unix.file_descr list
(* Suspend the execution of the calling thead until input/output
becomes possible on the given Unix file descriptors.
The arguments and results have the same meaning as for
[Unix.select]. *)
val wait_pid : int -> int * Unix.process_status
(* [wait_pid p] suspends the execution of the calling thread
until the Unix process specified by the process identifier [p]
@ -70,7 +78,6 @@ val wait_pid : int -> int * Unix.process_status
as the current process. Negative pid arguments represent
process groups. Returns the pid of the child caught and
its termination status, as per [Unix.wait]. *)
(*--*)
(* The following primitives provide the basis for implementing

View File

@ -68,6 +68,8 @@ let timed_write fd buff ofs len timeout =
then Unix.write fd buff ofs len
else raise (Unix_error(ETIMEDOUT, "timed_write", ""))
let select = Thread.select
(*** Interfacing with the standard input/output library *)
external in_channel_of_descr : Unix.file_descr -> in_channel

View File

@ -42,6 +42,13 @@ val timed_write : Unix.file_descr -> string -> int -> int -> float -> int
available for reading or ready for writing after [d] seconds.
The delay [d] is given in the fifth argument, in seconds. *)
(*** Polling *)
val select :
Unix.file_descr list -> Unix.file_descr list ->
Unix.file_descr list -> float ->
Unix.file_descr list * Unix.file_descr list * Unix.file_descr list
(*** Interfacing with the standard input/output library *)
external in_channel_of_descr : Unix.file_descr -> in_channel