877 lines
26 KiB
C
877 lines
26 KiB
C
/**************************************************************************/
|
|
/* */
|
|
/* OCaml */
|
|
/* */
|
|
/* Xavier Leroy, projet Cristal, INRIA Rocquencourt */
|
|
/* */
|
|
/* Copyright 1996 Institut National de Recherche en Informatique et */
|
|
/* en Automatique. */
|
|
/* */
|
|
/* All rights reserved. This file is distributed under the terms of */
|
|
/* the GNU Lesser General Public License version 2.1, with the */
|
|
/* special exception on linking described in the file LICENSE. */
|
|
/* */
|
|
/**************************************************************************/
|
|
|
|
/* The thread scheduler */
|
|
|
|
#include <string.h>
|
|
#include <stdlib.h>
|
|
#include <stdio.h>
|
|
|
|
#include "caml/alloc.h"
|
|
#include "caml/backtrace.h"
|
|
#include "caml/callback.h"
|
|
#include "caml/config.h"
|
|
#include "caml/fail.h"
|
|
#include "caml/io.h"
|
|
#include "caml/memory.h"
|
|
#include "caml/misc.h"
|
|
#include "caml/mlvalues.h"
|
|
#include "caml/printexc.h"
|
|
#include "caml/roots.h"
|
|
#include "caml/signals.h"
|
|
#include "caml/stacks.h"
|
|
#include "caml/sys.h"
|
|
|
|
#if ! (defined(HAS_SELECT) && \
|
|
defined(HAS_SETITIMER) && \
|
|
defined(HAS_GETTIMEOFDAY) && \
|
|
(defined(HAS_WAITPID) || defined(HAS_WAIT4)))
|
|
#include "Cannot compile libthreads, system calls missing"
|
|
#endif
|
|
|
|
#include <errno.h>
|
|
#include <sys/time.h>
|
|
#include <sys/types.h>
|
|
#include <sys/wait.h>
|
|
#include <sys/stat.h>
|
|
#include <fcntl.h>
|
|
#ifdef HAS_UNISTD
|
|
#include <unistd.h>
|
|
#endif
|
|
#ifdef HAS_SYS_SELECT_H
|
|
#include <sys/select.h>
|
|
#endif
|
|
|
|
#ifndef HAS_WAITPID
|
|
#define waitpid(pid,status,opts) wait4(pid,status,opts,NULL)
|
|
#endif
|
|
|
|
#ifndef O_NONBLOCK
|
|
#define O_NONBLOCK O_NDELAY
|
|
#endif
|
|
|
|
/* Configuration */
|
|
|
|
/* Initial size of stack when a thread is created (4kB) */
|
|
#define Thread_stack_size (Stack_size / 4)
|
|
|
|
/* Max computation time before rescheduling, in microseconds (50ms) */
|
|
#define Thread_timeout 50000
|
|
|
|
/* The thread descriptors */
|
|
|
|
struct caml_thread_struct {
|
|
value ident; /* Unique id (for equality comparisons) */
|
|
struct caml_thread_struct * next; /* Double linking of threads */
|
|
struct caml_thread_struct * prev;
|
|
value * stack_low; /* The execution stack for this thread */
|
|
value * stack_high;
|
|
value * stack_threshold;
|
|
value * sp;
|
|
value * trapsp;
|
|
value backtrace_pos; /* The backtrace info for this thread */
|
|
backtrace_slot * backtrace_buffer;
|
|
value backtrace_last_exn;
|
|
value status; /* RUNNABLE, KILLED. etc (see below) */
|
|
value fd; /* File descriptor on which we're doing read or write */
|
|
value readfds, writefds, exceptfds;
|
|
/* Lists of file descriptors on which we're doing select() */
|
|
value delay; /* Time until which this thread is blocked */
|
|
value joining; /* Thread we're trying to join */
|
|
value waitpid; /* PID of process we're waiting for */
|
|
value retval; /* Value to return when thread resumes */
|
|
};
|
|
|
|
typedef struct caml_thread_struct * caml_thread_t;
|
|
|
|
#define RUNNABLE Val_int(0)
|
|
#define KILLED Val_int(1)
|
|
#define SUSPENDED Val_int(2)
|
|
#define BLOCKED_READ Val_int(4)
|
|
#define BLOCKED_WRITE Val_int(8)
|
|
#define BLOCKED_SELECT Val_int(16)
|
|
#define BLOCKED_DELAY Val_int(32)
|
|
#define BLOCKED_JOIN Val_int(64)
|
|
#define BLOCKED_WAIT Val_int(128)
|
|
|
|
#define RESUMED_WAKEUP Val_int(0)
|
|
#define RESUMED_DELAY Val_int(1)
|
|
#define RESUMED_JOIN Val_int(2)
|
|
#define RESUMED_IO Val_int(3)
|
|
|
|
#define TAG_RESUMED_SELECT 0
|
|
#define TAG_RESUMED_WAIT 1
|
|
|
|
#define NO_FDS Val_unit
|
|
#define NO_DELAY Val_unit
|
|
#define NO_JOINING Val_unit
|
|
#define NO_WAITPID Val_int(0)
|
|
|
|
#define DELAY_INFTY 1E30 /* +infty, for this purpose */
|
|
|
|
/* The thread currently active */
|
|
static caml_thread_t curr_thread = NULL;
|
|
/* Identifier for next thread creation */
|
|
static value next_ident = Val_int(0);
|
|
|
|
#define Assign(dst,src) modify((value *)&(dst), (value)(src))
|
|
|
|
/* Scan the stacks of the other threads */
|
|
|
|
static void (*prev_scan_roots_hook) (scanning_action);
|
|
|
|
static void thread_scan_roots(scanning_action action)
|
|
{
|
|
caml_thread_t th, start;
|
|
|
|
/* Scan all active descriptors */
|
|
start = curr_thread;
|
|
(*action)((value) curr_thread, (value *) &curr_thread);
|
|
/* Don't scan curr_thread->sp, this has already been done.
|
|
Don't scan local roots either, for the same reason. */
|
|
for (th = start->next; th != start; th = th->next) {
|
|
do_local_roots(action, th->sp, th->stack_high, NULL);
|
|
}
|
|
/* Hook */
|
|
if (prev_scan_roots_hook != NULL) (*prev_scan_roots_hook)(action);
|
|
}
|
|
|
|
/* Forward declarations for async I/O handling */
|
|
|
|
static int stdin_initial_status, stdout_initial_status, stderr_initial_status;
|
|
static void thread_restore_std_descr(void);
|
|
|
|
/* Initialize the thread machinery */
|
|
|
|
value thread_initialize(value unit) /* ML */
|
|
{
|
|
/* Protect against repeated initialization (PR#1325) */
|
|
if (curr_thread != NULL) return Val_unit;
|
|
/* Create a descriptor for the current thread */
|
|
curr_thread =
|
|
(caml_thread_t) alloc_shr(sizeof(struct caml_thread_struct)
|
|
/ sizeof(value), 0);
|
|
curr_thread->ident = next_ident;
|
|
next_ident = Val_int(Int_val(next_ident) + 1);
|
|
curr_thread->next = curr_thread;
|
|
curr_thread->prev = curr_thread;
|
|
curr_thread->stack_low = stack_low;
|
|
curr_thread->stack_high = stack_high;
|
|
curr_thread->stack_threshold = stack_threshold;
|
|
curr_thread->sp = extern_sp;
|
|
curr_thread->trapsp = trapsp;
|
|
curr_thread->backtrace_pos = Val_int(backtrace_pos);
|
|
curr_thread->backtrace_buffer = backtrace_buffer;
|
|
caml_initialize (&curr_thread->backtrace_last_exn, backtrace_last_exn);
|
|
curr_thread->status = RUNNABLE;
|
|
curr_thread->fd = Val_int(0);
|
|
curr_thread->readfds = NO_FDS;
|
|
curr_thread->writefds = NO_FDS;
|
|
curr_thread->exceptfds = NO_FDS;
|
|
curr_thread->delay = NO_DELAY;
|
|
curr_thread->joining = NO_JOINING;
|
|
curr_thread->waitpid = NO_WAITPID;
|
|
curr_thread->retval = Val_unit;
|
|
/* Initialize GC */
|
|
prev_scan_roots_hook = scan_roots_hook;
|
|
scan_roots_hook = thread_scan_roots;
|
|
/* Set standard file descriptors to non-blocking mode */
|
|
stdin_initial_status = fcntl(0, F_GETFL);
|
|
stdout_initial_status = fcntl(1, F_GETFL);
|
|
stderr_initial_status = fcntl(2, F_GETFL);
|
|
if (stdin_initial_status != -1)
|
|
fcntl(0, F_SETFL, stdin_initial_status | O_NONBLOCK);
|
|
if (stdout_initial_status != -1)
|
|
fcntl(1, F_SETFL, stdout_initial_status | O_NONBLOCK);
|
|
if (stderr_initial_status != -1)
|
|
fcntl(2, F_SETFL, stderr_initial_status | O_NONBLOCK);
|
|
/* Register an at-exit function to restore the standard file descriptors */
|
|
atexit(thread_restore_std_descr);
|
|
return Val_unit;
|
|
}
|
|
|
|
/* Initialize the interval timer used for preemption */
|
|
|
|
value thread_initialize_preemption(value unit) /* ML */
|
|
{
|
|
struct itimerval timer;
|
|
|
|
timer.it_interval.tv_sec = 0;
|
|
timer.it_interval.tv_usec = Thread_timeout;
|
|
timer.it_value = timer.it_interval;
|
|
setitimer(ITIMER_VIRTUAL, &timer, NULL);
|
|
return Val_unit;
|
|
}
|
|
|
|
/* Create a thread */
|
|
|
|
value thread_new(value clos) /* ML */
|
|
{
|
|
caml_thread_t th;
|
|
/* Allocate the thread and its stack */
|
|
Begin_root(clos);
|
|
th = (caml_thread_t) alloc_shr(sizeof(struct caml_thread_struct)
|
|
/ sizeof(value), 0);
|
|
End_roots();
|
|
th->ident = next_ident;
|
|
next_ident = Val_int(Int_val(next_ident) + 1);
|
|
th->stack_low = (value *) caml_stat_alloc(Thread_stack_size);
|
|
th->stack_high = th->stack_low + Thread_stack_size / sizeof(value);
|
|
th->stack_threshold = th->stack_low + Stack_threshold / sizeof(value);
|
|
th->sp = th->stack_high;
|
|
th->trapsp = th->stack_high;
|
|
/* Set up a return frame that pretends we're applying the function to ().
|
|
This way, the next RETURN instruction will run the function. */
|
|
th->sp -= 5;
|
|
th->sp[0] = Val_unit; /* dummy local to be popped by RETURN 1 */
|
|
th->sp[1] = (value) Code_val(clos);
|
|
th->sp[2] = clos;
|
|
th->sp[3] = Val_long(0); /* no extra args */
|
|
th->sp[4] = Val_unit; /* the () argument */
|
|
/* Fake a C call frame */
|
|
th->sp--;
|
|
th->sp[0] = Val_unit; /* a dummy environment */
|
|
/* Finish initialization of th */
|
|
th->backtrace_pos = Val_int(0);
|
|
th->backtrace_buffer = NULL;
|
|
th->backtrace_last_exn = Val_unit;
|
|
/* The thread is initially runnable */
|
|
th->status = RUNNABLE;
|
|
th->fd = Val_int(0);
|
|
th->readfds = NO_FDS;
|
|
th->writefds = NO_FDS;
|
|
th->exceptfds = NO_FDS;
|
|
th->delay = NO_DELAY;
|
|
th->joining = NO_JOINING;
|
|
th->waitpid = NO_WAITPID;
|
|
th->retval = Val_unit;
|
|
/* Insert thread in doubly linked list of threads */
|
|
th->prev = curr_thread->prev;
|
|
th->next = curr_thread;
|
|
Assign(curr_thread->prev->next, th);
|
|
Assign(curr_thread->prev, th);
|
|
/* Return thread */
|
|
return (value) th;
|
|
}
|
|
|
|
/* Return the thread identifier */
|
|
|
|
value thread_id(value th) /* ML */
|
|
{
|
|
return ((caml_thread_t)th)->ident;
|
|
}
|
|
|
|
/* Return the current time as a floating-point number */
|
|
|
|
static double timeofday(void)
|
|
{
|
|
struct timeval tv;
|
|
gettimeofday(&tv, NULL);
|
|
return (double) tv.tv_sec + (double) tv.tv_usec * 1e-6;
|
|
}
|
|
|
|
/* Find a runnable thread and activate it */
|
|
|
|
#define FOREACH_THREAD(x) x = curr_thread; do { x = x->next;
|
|
#define END_FOREACH(x) } while (x != curr_thread)
|
|
|
|
static value alloc_process_status(int pid, int status);
|
|
static void add_fdlist_to_set(value fdl, fd_set *set);
|
|
static value inter_fdlist_set(value fdl, fd_set *set, int *count);
|
|
static void find_bad_fd(int fd, fd_set *set);
|
|
static void find_bad_fds(value fdl, fd_set *set);
|
|
|
|
static value schedule_thread(void)
|
|
{
|
|
caml_thread_t run_thread, th;
|
|
fd_set readfds, writefds, exceptfds;
|
|
double delay, now;
|
|
int need_select, need_wait;
|
|
|
|
/* Don't allow preemption during a callback */
|
|
if (callback_depth > 1) return curr_thread->retval;
|
|
|
|
/* Save the status of the current thread */
|
|
curr_thread->stack_low = stack_low;
|
|
curr_thread->stack_high = stack_high;
|
|
curr_thread->stack_threshold = stack_threshold;
|
|
curr_thread->sp = extern_sp;
|
|
curr_thread->trapsp = trapsp;
|
|
curr_thread->backtrace_pos = Val_int(backtrace_pos);
|
|
curr_thread->backtrace_buffer = backtrace_buffer;
|
|
caml_modify (&curr_thread->backtrace_last_exn, backtrace_last_exn);
|
|
|
|
try_again:
|
|
/* Find if a thread is runnable.
|
|
Build fdsets and delay for select.
|
|
See if some join or wait operations succeeded. */
|
|
run_thread = NULL;
|
|
FD_ZERO(&readfds);
|
|
FD_ZERO(&writefds);
|
|
FD_ZERO(&exceptfds);
|
|
delay = DELAY_INFTY;
|
|
now = -1.0;
|
|
need_select = 0;
|
|
need_wait = 0;
|
|
|
|
FOREACH_THREAD(th)
|
|
if (th->status <= SUSPENDED) continue;
|
|
|
|
if (th->status & (BLOCKED_READ - 1)) {
|
|
FD_SET(Int_val(th->fd), &readfds);
|
|
need_select = 1;
|
|
}
|
|
if (th->status & (BLOCKED_WRITE - 1)) {
|
|
FD_SET(Int_val(th->fd), &writefds);
|
|
need_select = 1;
|
|
}
|
|
if (th->status & (BLOCKED_SELECT - 1)) {
|
|
add_fdlist_to_set(th->readfds, &readfds);
|
|
add_fdlist_to_set(th->writefds, &writefds);
|
|
add_fdlist_to_set(th->exceptfds, &exceptfds);
|
|
need_select = 1;
|
|
}
|
|
if (th->status & (BLOCKED_DELAY - 1)) {
|
|
double th_delay;
|
|
if (now < 0.0) now = timeofday();
|
|
th_delay = Double_val(th->delay) - now;
|
|
if (th_delay <= 0) {
|
|
th->status = RUNNABLE;
|
|
Assign(th->retval,RESUMED_DELAY);
|
|
} else {
|
|
if (th_delay < delay) delay = th_delay;
|
|
}
|
|
}
|
|
if (th->status & (BLOCKED_JOIN - 1)) {
|
|
if (((caml_thread_t)(th->joining))->status == KILLED) {
|
|
th->status = RUNNABLE;
|
|
Assign(th->retval, RESUMED_JOIN);
|
|
}
|
|
}
|
|
if (th->status & (BLOCKED_WAIT - 1)) {
|
|
int status, pid;
|
|
pid = waitpid(Int_val(th->waitpid), &status, WNOHANG);
|
|
if (pid > 0) {
|
|
th->status = RUNNABLE;
|
|
Assign(th->retval, alloc_process_status(pid, status));
|
|
} else {
|
|
need_wait = 1;
|
|
}
|
|
}
|
|
END_FOREACH(th);
|
|
|
|
/* Find if a thread is runnable. */
|
|
run_thread = NULL;
|
|
FOREACH_THREAD(th)
|
|
if (th->status == RUNNABLE) { run_thread = th; break; }
|
|
END_FOREACH(th);
|
|
|
|
/* Do the select if needed */
|
|
if (need_select || run_thread == NULL) {
|
|
struct timeval delay_tv, * delay_ptr;
|
|
int retcode;
|
|
/* If a thread is blocked on wait, don't block forever */
|
|
if (need_wait && delay > Thread_timeout * 1e-6) {
|
|
delay = Thread_timeout * 1e-6;
|
|
}
|
|
/* Convert delay to a timeval */
|
|
/* If a thread is runnable, just poll */
|
|
if (run_thread != NULL) {
|
|
delay_tv.tv_sec = 0;
|
|
delay_tv.tv_usec = 0;
|
|
delay_ptr = &delay_tv;
|
|
}
|
|
else if (delay != DELAY_INFTY) {
|
|
delay_tv.tv_sec = (unsigned int) delay;
|
|
delay_tv.tv_usec = (delay - (double) delay_tv.tv_sec) * 1E6;
|
|
delay_ptr = &delay_tv;
|
|
}
|
|
else {
|
|
delay_ptr = NULL;
|
|
}
|
|
enter_blocking_section();
|
|
retcode = select(FD_SETSIZE, &readfds, &writefds, &exceptfds, delay_ptr);
|
|
leave_blocking_section();
|
|
if (retcode == -1)
|
|
switch (errno) {
|
|
case EINTR:
|
|
break;
|
|
case EBADF:
|
|
/* One of the descriptors in the sets was closed or is bad.
|
|
Find it using fstat() and wake up the threads waiting on it
|
|
so that they'll get an error when operating on it. */
|
|
FOREACH_THREAD(th)
|
|
if (th->status & (BLOCKED_READ - 1)) {
|
|
find_bad_fd(Int_val(th->fd), &readfds);
|
|
}
|
|
if (th->status & (BLOCKED_WRITE - 1)) {
|
|
find_bad_fd(Int_val(th->fd), &writefds);
|
|
}
|
|
if (th->status & (BLOCKED_SELECT - 1)) {
|
|
find_bad_fds(th->readfds, &readfds);
|
|
find_bad_fds(th->writefds, &writefds);
|
|
find_bad_fds(th->exceptfds, &exceptfds);
|
|
}
|
|
END_FOREACH(th);
|
|
retcode = FD_SETSIZE;
|
|
break;
|
|
default:
|
|
sys_error(NO_ARG);
|
|
}
|
|
if (retcode > 0) {
|
|
/* Some descriptors are ready.
|
|
Mark the corresponding threads runnable. */
|
|
FOREACH_THREAD(th)
|
|
if (retcode <= 0) break;
|
|
if ((th->status & (BLOCKED_READ - 1)) &&
|
|
FD_ISSET(Int_val(th->fd), &readfds)) {
|
|
Assign(th->retval, RESUMED_IO);
|
|
th->status = RUNNABLE;
|
|
if (run_thread == NULL) run_thread = th; /* Found one. */
|
|
/* Wake up only one thread per fd */
|
|
FD_CLR(Int_val(th->fd), &readfds);
|
|
retcode--;
|
|
}
|
|
if ((th->status & (BLOCKED_WRITE - 1)) &&
|
|
FD_ISSET(Int_val(th->fd), &writefds)) {
|
|
Assign(th->retval, RESUMED_IO);
|
|
th->status = RUNNABLE;
|
|
if (run_thread == NULL) run_thread = th; /* Found one. */
|
|
/* Wake up only one thread per fd */
|
|
FD_CLR(Int_val(th->fd), &readfds);
|
|
retcode--;
|
|
}
|
|
if (th->status & (BLOCKED_SELECT - 1)) {
|
|
value r = Val_unit, w = Val_unit, e = Val_unit;
|
|
Begin_roots3(r,w,e)
|
|
r = inter_fdlist_set(th->readfds, &readfds, &retcode);
|
|
w = inter_fdlist_set(th->writefds, &writefds, &retcode);
|
|
e = inter_fdlist_set(th->exceptfds, &exceptfds, &retcode);
|
|
if (r != NO_FDS || w != NO_FDS || e != NO_FDS) {
|
|
value retval = alloc_small(3, TAG_RESUMED_SELECT);
|
|
Field(retval, 0) = r;
|
|
Field(retval, 1) = w;
|
|
Field(retval, 2) = e;
|
|
Assign(th->retval, retval);
|
|
th->status = RUNNABLE;
|
|
if (run_thread == NULL) run_thread = th; /* Found one. */
|
|
}
|
|
End_roots();
|
|
}
|
|
END_FOREACH(th);
|
|
}
|
|
/* If we get here with run_thread still NULL, one of the following
|
|
may have happened:
|
|
- a delay has expired
|
|
- a wait() needs to be polled again
|
|
- the select() failed (e.g. was interrupted)
|
|
In these cases, we go through the loop once more to make the
|
|
corresponding threads runnable. */
|
|
if (run_thread == NULL &&
|
|
(delay != DELAY_INFTY || need_wait || retcode == -1))
|
|
goto try_again;
|
|
}
|
|
|
|
/* If we haven't something to run at that point, we're in big trouble. */
|
|
if (run_thread == NULL) invalid_argument("Thread: deadlock");
|
|
|
|
/* Free everything the thread was waiting on */
|
|
Assign(run_thread->readfds, NO_FDS);
|
|
Assign(run_thread->writefds, NO_FDS);
|
|
Assign(run_thread->exceptfds, NO_FDS);
|
|
Assign(run_thread->delay, NO_DELAY);
|
|
Assign(run_thread->joining, NO_JOINING);
|
|
run_thread->waitpid = NO_WAITPID;
|
|
|
|
/* Activate the thread */
|
|
curr_thread = run_thread;
|
|
stack_low = curr_thread->stack_low;
|
|
stack_high = curr_thread->stack_high;
|
|
stack_threshold = curr_thread->stack_threshold;
|
|
extern_sp = curr_thread->sp;
|
|
trapsp = curr_thread->trapsp;
|
|
backtrace_pos = Int_val(curr_thread->backtrace_pos);
|
|
backtrace_buffer = curr_thread->backtrace_buffer;
|
|
backtrace_last_exn = curr_thread->backtrace_last_exn;
|
|
return curr_thread->retval;
|
|
}
|
|
|
|
/* Since context switching is not allowed in callbacks, a thread that
|
|
blocks during a callback is a deadlock. */
|
|
|
|
static void check_callback(void)
|
|
{
|
|
if (callback_depth > 1)
|
|
caml_fatal_error("Thread: deadlock during callback");
|
|
}
|
|
|
|
/* Reschedule without suspending the current thread */
|
|
|
|
value thread_yield(value unit) /* ML */
|
|
{
|
|
Assert(curr_thread != NULL);
|
|
Assign(curr_thread->retval, Val_unit);
|
|
return schedule_thread();
|
|
}
|
|
|
|
/* Honor an asynchronous request for re-scheduling */
|
|
|
|
static void thread_reschedule(void)
|
|
{
|
|
value accu;
|
|
|
|
Assert(curr_thread != NULL);
|
|
/* Pop accu from event frame, making it look like a C_CALL frame
|
|
followed by a RETURN frame */
|
|
accu = *extern_sp++;
|
|
/* Reschedule */
|
|
Assign(curr_thread->retval, accu);
|
|
accu = schedule_thread();
|
|
/* Push accu below C_CALL frame so that it looks like an event frame */
|
|
*--extern_sp = accu;
|
|
}
|
|
|
|
/* Request a re-scheduling as soon as possible */
|
|
|
|
value thread_request_reschedule(value unit) /* ML */
|
|
{
|
|
async_action_hook = thread_reschedule;
|
|
something_to_do = 1;
|
|
return Val_unit;
|
|
}
|
|
|
|
/* Suspend the current thread */
|
|
|
|
value thread_sleep(value unit) /* ML */
|
|
{
|
|
Assert(curr_thread != NULL);
|
|
check_callback();
|
|
curr_thread->status = SUSPENDED;
|
|
return schedule_thread();
|
|
}
|
|
|
|
/* Suspend the current thread on a read() or write() request */
|
|
|
|
static value thread_wait_rw(int kind, value fd)
|
|
{
|
|
/* Don't do an error if we're not initialized yet
|
|
(we can be called from thread-safe Pervasives before initialization),
|
|
just return immediately. */
|
|
if (curr_thread == NULL) return RESUMED_WAKEUP;
|
|
/* As a special case, if we're in a callback, don't fail but block
|
|
the whole process till I/O is possible */
|
|
if (callback_depth > 1) {
|
|
fd_set fds;
|
|
FD_ZERO(&fds);
|
|
FD_SET(Int_val(fd), &fds);
|
|
switch(kind) {
|
|
case BLOCKED_READ: select(FD_SETSIZE, &fds, NULL, NULL, NULL); break;
|
|
case BLOCKED_WRITE: select(FD_SETSIZE, NULL, &fds, NULL, NULL); break;
|
|
}
|
|
return RESUMED_IO;
|
|
} else {
|
|
curr_thread->fd = fd;
|
|
curr_thread->status = kind;
|
|
return schedule_thread();
|
|
}
|
|
}
|
|
|
|
value thread_wait_read(value fd)
|
|
{
|
|
return thread_wait_rw(BLOCKED_READ, fd);
|
|
}
|
|
|
|
value thread_wait_write(value fd)
|
|
{
|
|
return thread_wait_rw(BLOCKED_WRITE, fd);
|
|
}
|
|
|
|
/* Suspend the current thread on a read() or write() request with timeout */
|
|
|
|
static value thread_wait_timed_rw(int kind, value arg)
|
|
{
|
|
double date;
|
|
|
|
check_callback();
|
|
curr_thread->fd = Field(arg, 0);
|
|
date = timeofday() + Double_val(Field(arg, 1));
|
|
Assign(curr_thread->delay, copy_double(date));
|
|
curr_thread->status = kind | BLOCKED_DELAY;
|
|
return schedule_thread();
|
|
}
|
|
|
|
value thread_wait_timed_read(value arg)
|
|
{
|
|
return thread_wait_timed_rw(BLOCKED_READ, arg);
|
|
}
|
|
|
|
value thread_wait_timed_write(value arg)
|
|
{
|
|
return thread_wait_timed_rw(BLOCKED_WRITE, arg);
|
|
}
|
|
|
|
/* Suspend the current thread on a select() request */
|
|
|
|
value thread_select(value arg) /* ML */
|
|
{
|
|
double date;
|
|
check_callback();
|
|
Assign(curr_thread->readfds, Field(arg, 0));
|
|
Assign(curr_thread->writefds, Field(arg, 1));
|
|
Assign(curr_thread->exceptfds, Field(arg, 2));
|
|
date = Double_val(Field(arg, 3));
|
|
if (date >= 0.0) {
|
|
date += timeofday();
|
|
Assign(curr_thread->delay, copy_double(date));
|
|
curr_thread->status = BLOCKED_SELECT | BLOCKED_DELAY;
|
|
} else {
|
|
curr_thread->status = BLOCKED_SELECT;
|
|
}
|
|
return schedule_thread();
|
|
}
|
|
|
|
/* Primitives to implement suspension on buffered channels */
|
|
|
|
value thread_inchan_ready(value vchan) /* ML */
|
|
{
|
|
struct channel * chan = Channel(vchan);
|
|
return Val_bool(chan->curr < chan->max);
|
|
}
|
|
|
|
value thread_outchan_ready(value vchan, value vsize) /* ML */
|
|
{
|
|
struct channel * chan = Channel(vchan);
|
|
intnat size = Long_val(vsize);
|
|
/* Negative size means we want to flush the buffer entirely */
|
|
if (size < 0) {
|
|
return Val_bool(chan->curr == chan->buff);
|
|
} else {
|
|
int free = chan->end - chan->curr;
|
|
if (chan->curr == chan->buff)
|
|
return Val_bool(size < free);
|
|
else
|
|
return Val_bool(size <= free);
|
|
}
|
|
}
|
|
|
|
/* Suspend the current thread for some time */
|
|
|
|
value thread_delay(value time) /* ML */
|
|
{
|
|
double date = timeofday() + Double_val(time);
|
|
Assert(curr_thread != NULL);
|
|
check_callback();
|
|
curr_thread->status = BLOCKED_DELAY;
|
|
Assign(curr_thread->delay, copy_double(date));
|
|
return schedule_thread();
|
|
}
|
|
|
|
/* Suspend the current thread until another thread terminates */
|
|
|
|
value thread_join(value th) /* ML */
|
|
{
|
|
check_callback();
|
|
Assert(curr_thread != NULL);
|
|
if (((caml_thread_t)th)->status == KILLED) return Val_unit;
|
|
curr_thread->status = BLOCKED_JOIN;
|
|
Assign(curr_thread->joining, th);
|
|
return schedule_thread();
|
|
}
|
|
|
|
/* Suspend the current thread until a Unix process exits */
|
|
|
|
value thread_wait_pid(value pid) /* ML */
|
|
{
|
|
Assert(curr_thread != NULL);
|
|
check_callback();
|
|
curr_thread->status = BLOCKED_WAIT;
|
|
curr_thread->waitpid = pid;
|
|
return schedule_thread();
|
|
}
|
|
|
|
/* Reactivate another thread */
|
|
|
|
value thread_wakeup(value thread) /* ML */
|
|
{
|
|
caml_thread_t th = (caml_thread_t) thread;
|
|
switch (th->status) {
|
|
case SUSPENDED:
|
|
th->status = RUNNABLE;
|
|
Assign(th->retval, RESUMED_WAKEUP);
|
|
break;
|
|
case KILLED:
|
|
failwith("Thread.wakeup: killed thread");
|
|
default:
|
|
failwith("Thread.wakeup: thread not suspended");
|
|
}
|
|
return Val_unit;
|
|
}
|
|
|
|
/* Return the current thread */
|
|
|
|
value thread_self(value unit) /* ML */
|
|
{
|
|
Assert(curr_thread != NULL);
|
|
return (value) curr_thread;
|
|
}
|
|
|
|
/* Kill a thread */
|
|
|
|
value thread_kill(value thread) /* ML */
|
|
{
|
|
value retval = Val_unit;
|
|
caml_thread_t th = (caml_thread_t) thread;
|
|
if (th->status == KILLED) failwith("Thread.kill: killed thread");
|
|
/* Don't paint ourselves in a corner */
|
|
if (th == th->next) failwith("Thread.kill: cannot kill the last thread");
|
|
/* This thread is no longer waiting on anything */
|
|
th->status = KILLED;
|
|
/* If this is the current thread, activate another one */
|
|
if (th == curr_thread) {
|
|
Begin_root(thread);
|
|
retval = schedule_thread();
|
|
th = (caml_thread_t) thread;
|
|
End_roots();
|
|
}
|
|
/* Remove thread from the doubly-linked list */
|
|
Assign(th->prev->next, th->next);
|
|
Assign(th->next->prev, th->prev);
|
|
/* Free its resources */
|
|
stat_free((char *) th->stack_low);
|
|
th->stack_low = NULL;
|
|
th->stack_high = NULL;
|
|
th->stack_threshold = NULL;
|
|
th->sp = NULL;
|
|
th->trapsp = NULL;
|
|
if (th->backtrace_buffer != NULL) {
|
|
free(th->backtrace_buffer);
|
|
th->backtrace_buffer = NULL;
|
|
}
|
|
return retval;
|
|
}
|
|
|
|
/* Print uncaught exception and backtrace */
|
|
|
|
value thread_uncaught_exception(value exn) /* ML */
|
|
{
|
|
char * msg = format_caml_exception(exn);
|
|
fprintf(stderr, "Thread %d killed on uncaught exception %s\n",
|
|
Int_val(curr_thread->ident), msg);
|
|
free(msg);
|
|
if (backtrace_active) print_exception_backtrace();
|
|
fflush(stderr);
|
|
return Val_unit;
|
|
}
|
|
|
|
/* Set a list of file descriptors in a fdset */
|
|
|
|
static void add_fdlist_to_set(value fdl, fd_set *set)
|
|
{
|
|
for (/*nothing*/; fdl != NO_FDS; fdl = Field(fdl, 1)) {
|
|
int fd = Int_val(Field(fdl, 0));
|
|
/* Ignore funky file descriptors, which can cause crashes */
|
|
if (fd >= 0 && fd < FD_SETSIZE) FD_SET(fd, set);
|
|
}
|
|
}
|
|
|
|
/* Build the intersection of a list and a fdset (the list of file descriptors
|
|
which are both in the list and in the fdset). */
|
|
|
|
static value inter_fdlist_set(value fdl, fd_set *set, int *count)
|
|
{
|
|
value res = Val_unit;
|
|
value cons;
|
|
|
|
Begin_roots2(fdl, res);
|
|
for (res = NO_FDS; fdl != NO_FDS; fdl = Field(fdl, 1)) {
|
|
int fd = Int_val(Field(fdl, 0));
|
|
if (FD_ISSET(fd, set)) {
|
|
cons = alloc_small(2, 0);
|
|
Field(cons, 0) = Val_int(fd);
|
|
Field(cons, 1) = res;
|
|
res = cons;
|
|
FD_CLR(fd, set); /* wake up only one thread per fd ready */
|
|
(*count)--;
|
|
}
|
|
}
|
|
End_roots();
|
|
return res;
|
|
}
|
|
|
|
/* Find closed file descriptors in a waiting list and set them to 1 in
|
|
the given fdset */
|
|
|
|
static void find_bad_fd(int fd, fd_set *set)
|
|
{
|
|
struct stat s;
|
|
if (fd >= 0 && fd < FD_SETSIZE && fstat(fd, &s) == -1 && errno == EBADF)
|
|
FD_SET(fd, set);
|
|
}
|
|
|
|
static void find_bad_fds(value fdl, fd_set *set)
|
|
{
|
|
for (/*nothing*/; fdl != NO_FDS; fdl = Field(fdl, 1))
|
|
find_bad_fd(Int_val(Field(fdl, 0)), set);
|
|
}
|
|
|
|
/* Auxiliary function for allocating the result of a waitpid() call */
|
|
|
|
#if !(defined(WIFEXITED) && defined(WEXITSTATUS) && defined(WIFSTOPPED) && \
|
|
defined(WSTOPSIG) && defined(WTERMSIG))
|
|
/* Assume old-style V7 status word */
|
|
#define WIFEXITED(status) (((status) & 0xFF) == 0)
|
|
#define WEXITSTATUS(status) (((status) >> 8) & 0xFF)
|
|
#define WIFSTOPPED(status) (((status) & 0xFF) == 0xFF)
|
|
#define WSTOPSIG(status) (((status) >> 8) & 0xFF)
|
|
#define WTERMSIG(status) ((status) & 0x3F)
|
|
#endif
|
|
|
|
#define TAG_WEXITED 0
|
|
#define TAG_WSIGNALED 1
|
|
#define TAG_WSTOPPED 2
|
|
|
|
static value alloc_process_status(int pid, int status)
|
|
{
|
|
value st, res;
|
|
|
|
if (WIFEXITED(status)) {
|
|
st = alloc_small(1, TAG_WEXITED);
|
|
Field(st, 0) = Val_int(WEXITSTATUS(status));
|
|
}
|
|
else if (WIFSTOPPED(status)) {
|
|
st = alloc_small(1, TAG_WSTOPPED);
|
|
Field(st, 0) = Val_int(WSTOPSIG(status));
|
|
}
|
|
else {
|
|
st = alloc_small(1, TAG_WSIGNALED);
|
|
Field(st, 0) = Val_int(WTERMSIG(status));
|
|
}
|
|
Begin_root(st);
|
|
res = alloc_small(2, TAG_RESUMED_WAIT);
|
|
Field(res, 0) = Val_int(pid);
|
|
Field(res, 1) = st;
|
|
End_roots();
|
|
return res;
|
|
}
|
|
|
|
/* Restore the standard file descriptors to their initial state */
|
|
|
|
static void thread_restore_std_descr(void)
|
|
{
|
|
if (stdin_initial_status != -1) fcntl(0, F_SETFL, stdin_initial_status);
|
|
if (stdout_initial_status != -1) fcntl(1, F_SETFL, stdout_initial_status);
|
|
if (stderr_initial_status != -1) fcntl(2, F_SETFL, stderr_initial_status);
|
|
}
|