ocaml/otherlibs/systhreads/posix.c

509 lines
15 KiB
C

/***********************************************************************/
/* */
/* Caml Special Light */
/* */
/* Xavier Leroy and Damien Doligez, INRIA Rocquencourt */
/* */
/* Copyright 1995 Institut National de Recherche en Informatique et */
/* Automatique. Distributed only by permission. */
/* */
/***********************************************************************/
/* $Id$ */
/* Thread interface for POSIX 1003.1c threads */
#include <string.h>
#include <pthread.h>
#include <signal.h>
#include <sys/time.h>
#include "alloc.h"
#include "callback.h"
#include "fail.h"
#include "memory.h"
#include "misc.h"
#include "mlvalues.h"
#include "roots.h"
#include "signals.h"
#include "stacks.h"
#include "sys.h"
/* Initial size of stack when a thread is created (4 Ko) */
#define Thread_stack_size (Stack_size / 4)
/* Max computation time before rescheduling, in microseconds (50ms) */
#define Thread_timeout 50000
/* The thread descriptors */
struct caml_thread_struct {
pthread_t pthread; /* The Posix thread id */
value ident; /* Unique id */
value terminated; /* Mutex held while the thread is running */
value * stack_low; /* The execution stack for this thread */
value * stack_high;
value * stack_threshold;
value * sp; /* Saved value of extern_sp for this thread */
value * trapsp; /* Saved value of trapsp for this thread */
value * local_roots; /* Saved value of local_roots for this thr. */
value * local_roots_new; /* Saved value of local_roots_new */
struct longjmp_buffer * external_raise; /* Saved external_raise */
struct caml_thread_struct * next; /* Double linking of running threads */
struct caml_thread_struct * prev;
};
typedef struct caml_thread_struct * caml_thread_t;
#define Assign(dst,src) modify((value *)&(dst), (value)(src))
/* The global mutex used to ensure that at most one thread is running
Caml code */
pthread_mutex_t caml_mutex;
/* The key used for storing the thread descriptor in the specific data
of the corresponding Posix thread. */
pthread_key_t thread_descriptor_key;
/* Identifier for next thread creation */
static long thread_next_ident = 0;
/* Forward declarations */
value caml_mutex_new P((value));
value caml_mutex_lock P((value));
value caml_mutex_unlock P((value));
static void caml_pthread_check P((int, char *));
/* Hook for scanning the stacks of the other threads */
static void (*prev_scan_roots_hook) P((scanning_action));
static void caml_thread_scan_roots(action)
scanning_action action;
{
caml_thread_t curr_thread, new_curr_thread, th;
register value * sp;
value * block;
struct caml_roots_block *lr;
long i;
curr_thread = pthread_getspecific(thread_descriptor_key);
/* Scan all thread descriptors */
(*action)((value) curr_thread, (value *) &new_curr_thread);
Assert(curr_thread == new_curr_thread);
/* Scan the stacks, except that of the current thread (already done). */
for (th = curr_thread->next; th != curr_thread; th = th->next) {
for (sp = th->sp; sp < th->stack_high; sp++) {
(*action)(*sp, sp);
}
for (lr = th->local_roots_new; lr != NULL; lr = lr->next) {
for (i = 0; i < lr->len; i++){
sp = lr->roots[i];
(*action)(*sp, sp);
}
}
for (block = th->local_roots; block != NULL; block = (value *) block [1]){
for (sp = block - (long) block [0]; sp < block; sp++){
(*action)(*sp, sp);
}
}
}
/* Hook */
if (prev_scan_roots_hook != NULL) (*prev_scan_roots_hook)(action);
}
/* Hooks for enter_blocking_section and leave_blocking_section */
static void (*prev_enter_blocking_section_hook) ();
static void (*prev_leave_blocking_section_hook) ();
static void caml_thread_enter_blocking_section()
{
caml_thread_t curr_thread;
if (prev_enter_blocking_section_hook != NULL)
(*prev_enter_blocking_section_hook)();
/* Save the stack-related global variables in the thread descriptor
of the current thread */
curr_thread = pthread_getspecific(thread_descriptor_key);
curr_thread->stack_low = stack_low;
curr_thread->stack_high = stack_high;
curr_thread->stack_threshold = stack_threshold;
curr_thread->sp = extern_sp;
curr_thread->trapsp = trapsp;
curr_thread->local_roots = local_roots;
curr_thread->local_roots_new = local_roots_new;
curr_thread->external_raise = external_raise;
/* Release the global mutex */
pthread_mutex_unlock(&caml_mutex);
}
static void caml_thread_leave_blocking_section()
{
caml_thread_t curr_thread;
/* Re-acquire the global mutex */
pthread_mutex_lock(&caml_mutex);
/* Restore the stack-related global variables */
curr_thread = pthread_getspecific(thread_descriptor_key);
stack_low = curr_thread->stack_low;
stack_high = curr_thread->stack_high;
stack_threshold = curr_thread->stack_threshold;
extern_sp = curr_thread->sp;
trapsp = curr_thread->trapsp;
local_roots = curr_thread->local_roots;
local_roots_new = curr_thread->local_roots_new;
external_raise = curr_thread->external_raise;
if (prev_leave_blocking_section_hook != NULL)
(*prev_leave_blocking_section_hook)();
}
/* The "tick" thread fakes a SIGVTALRM signal at regular intervals. */
static void * caml_thread_tick()
{
struct timeval timeout;
while(1) {
/* select() seems to be the most efficient way to suspend the
thread for sub-second intervals */
timeout.tv_sec = 0;
timeout.tv_usec = Thread_timeout;
select(0, NULL, NULL, NULL, &timeout);
/* This signal should never cause a callback, so don't go through
handle_signal(), tweak the global variables directly. */
pending_signal = SIGVTALRM;
something_to_do = 1;
}
}
/* Thread cleanup: remove the descriptor from the list and free
the stack space. */
static void caml_thread_cleanup(th)
caml_thread_t th;
{
/* Signal that the thread has terminated */
caml_mutex_unlock(th->terminated);
/* Remove th from the doubly-linked list of threads */
Assign(th->next->prev, th->prev);
Assign(th->prev->next, th->next);
/* Free the memory resources */
stat_free((char *) th->stack_low);
th->stack_low = NULL;
th->stack_high = NULL;
th->stack_threshold = NULL;
th->sp = NULL;
th->trapsp = NULL;
th->local_roots = NULL;
th->local_roots_new = NULL;
th->external_raise = NULL;
/* Release the main mutex */
pthread_mutex_unlock(&caml_mutex);
}
/* Initialize the thread machinery */
value caml_thread_initialize(unit) /* ML */
value unit;
{
pthread_t tick_pthread;
pthread_attr_t attr;
caml_thread_t th;
value mu = Val_unit;
Begin_root (mu);
/* Initialize the main mutex */
caml_pthread_check(pthread_mutex_init(&caml_mutex, NULL),
"Thread.init");
pthread_mutex_lock(&caml_mutex);
/* Initialize the key */
pthread_key_create(&thread_descriptor_key, NULL);
/* Create and acquire a termination lock for the current thread */
mu = caml_mutex_new(Val_unit);
caml_mutex_lock(mu);
/* Create a descriptor for the current thread */
th = (caml_thread_t)
alloc_shr(sizeof(struct caml_thread_struct) / sizeof(value), 0);
th->pthread = pthread_self();
th->ident = Val_long(thread_next_ident);
th->terminated = mu;
thread_next_ident++;
/* The stack-related fields will be filled in at the next
enter_blocking_section */
th->next = th;
th->prev = th;
/* Associate the thread descriptor with the thread */
pthread_setspecific(thread_descriptor_key, (void *) th);
/* Allow cancellation */
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
/* Set up the hooks */
prev_scan_roots_hook = scan_roots_hook;
scan_roots_hook = caml_thread_scan_roots;
prev_enter_blocking_section_hook = enter_blocking_section_hook;
enter_blocking_section_hook = caml_thread_enter_blocking_section;
prev_leave_blocking_section_hook = leave_blocking_section_hook;
leave_blocking_section_hook = caml_thread_leave_blocking_section;
/* Fork the tick thread */
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
caml_pthread_check(
pthread_create(&tick_pthread, &attr, caml_thread_tick, NULL),
"Thread.init");
pthread_detach(tick_pthread);
End_roots();
return Val_unit;
}
/* Create a thread */
static void * caml_thread_start(th)
caml_thread_t th;
{
value clos;
/* Associate the thread descriptor with the thread */
pthread_setspecific(thread_descriptor_key, (void *) th);
/* Set up termination routine */
pthread_cleanup_push(caml_thread_cleanup, (void *) th);
/* Allow cancellation */
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
/* Acquire the global mutex and set up the stack variables */
leave_blocking_section();
/* Callback the closure */
clos = *extern_sp++;
callback(clos, Val_unit);
/* Cleanup: free the thread resources and release the mutex */
pthread_cleanup_pop(1);
return 0;
}
value caml_thread_new(clos) /* ML */
value clos;
{
pthread_attr_t attr;
caml_thread_t th, curr_thread;
value mu = Val_unit;
Begin_root (mu);
/* Create and acquire the termination lock */
mu = caml_mutex_new(Val_unit);
caml_mutex_lock(mu);
/* Allocate the thread and its stack */
th = (caml_thread_t)
alloc_shr(sizeof(struct caml_thread_struct) / sizeof(value), 0);
th->ident = Val_long(thread_next_ident);
thread_next_ident++;
th->terminated = mu;
th->stack_low = (value *) stat_alloc(Thread_stack_size);
th->stack_high = th->stack_low + Thread_stack_size / sizeof(value);
th->stack_threshold = th->stack_low + Stack_threshold / sizeof(value);
th->sp = th->stack_high;
th->trapsp = th->stack_high;
th->local_roots = NULL;
th->local_roots_new = NULL;
th->external_raise = NULL;
/* Add it to the list of threads */
curr_thread = pthread_getspecific(thread_descriptor_key);
th->next = curr_thread->next;
th->prev = curr_thread;
Assign(curr_thread->next->prev, th);
Assign(curr_thread->next, th);
/* Pass the closure in the newly created stack, so that it will be
preserved by garbage collection */
*--(th->sp) = clos;
/* Fork the new thread */
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
caml_pthread_check(
pthread_create(&th->pthread, &attr, caml_thread_start, (void *) th),
"Thread.new");
End_roots();
return (value) th;
}
/* Return the current thread */
value caml_thread_self(unit) /* ML */
value unit;
{
caml_thread_t curr_thread;
curr_thread = pthread_getspecific(thread_descriptor_key);
if (curr_thread == NULL) invalid_argument("Thread.self: not initialized");
return (value) curr_thread;
}
/* Return the identifier of a thread */
value caml_thread_id(th) /* ML */
caml_thread_t th;
{
return th->ident;
}
/* Allow re-scheduling */
value caml_thread_yield(unit) /* ML */
value unit;
{
enter_blocking_section();
#if defined(HAS_SCHED_YIELD)
sched_yield();
#elif defined(HAS_PTHREAD_YIELD)
pthread_yield();
#endif
leave_blocking_section();
return Val_unit;
}
/* Suspend the current thread until another thread terminates */
value caml_thread_join(th) /* ML */
caml_thread_t th;
{
caml_mutex_lock(th->terminated);
caml_mutex_unlock(th->terminated);
return Val_unit;
}
/* Terminate the current thread */
value caml_thread_exit(unit) /* ML */
value unit;
{
pthread_exit(0);
return Val_unit; /* never reached */
}
/* Kill another thread */
value caml_thread_kill(th) /* ML */
caml_thread_t th;
{
pthread_cancel(th->pthread);
return Val_unit;
}
/* Mutex operations */
#define Mutex_val(v) (*((pthread_mutex_t *)(&Field(v, 1))))
#define Max_mutex_number 1000
static void caml_mutex_finalize(mut)
value mut;
{
pthread_mutex_destroy(&Mutex_val(mut));
}
value caml_mutex_new(unit) /* ML */
value unit;
{
value mut;
mut = alloc_final(1 + sizeof(pthread_mutex_t) / sizeof(value),
caml_mutex_finalize, 1, Max_mutex_number);
caml_pthread_check(pthread_mutex_init(&Mutex_val(mut), NULL), "Mutex.new");
return mut;
}
value caml_mutex_lock(mut) /* ML */
value mut;
{
int retcode;
enter_blocking_section();
retcode = pthread_mutex_lock(&(Mutex_val(mut)));
leave_blocking_section();
caml_pthread_check(retcode, "Mutex.lock");
return Val_unit;
}
value caml_mutex_unlock(mut) /* ML */
value mut;
{
int retcode;
enter_blocking_section();
retcode = pthread_mutex_unlock(&(Mutex_val(mut)));
leave_blocking_section();
caml_pthread_check(retcode, "Mutex.unlock");
return Val_unit;
}
value caml_mutex_try_lock(mut) /* ML */
value mut;
{
int retcode;
retcode = pthread_mutex_trylock(&(Mutex_val(mut)));
return retcode == 0 ? Val_true : Val_false;
}
/* Conditions operations */
#define Condition_val(v) (*((pthread_cond_t *)(&Field(v, 1))))
#define Max_condition_number 1000
static void caml_condition_finalize(cond)
value cond;
{
pthread_cond_destroy(&Condition_val(cond));
}
value caml_condition_new(unit) /* ML */
value unit;
{
value cond;
cond = alloc_final(1 + sizeof(pthread_cond_t) / sizeof(value),
caml_condition_finalize, 1, Max_condition_number);
caml_pthread_check(pthread_cond_init(&Condition_val(cond), NULL),
"Condition.new");
return cond;
}
value caml_condition_wait(cond, mut) /* ML */
value cond, mut;
{
int retcode;
enter_blocking_section();
retcode = pthread_cond_wait(&Condition_val(cond), &Mutex_val(mut));
leave_blocking_section();
caml_pthread_check(retcode, "Condition.wait");
return Val_unit;
}
value caml_condition_signal(cond) /* ML */
value cond;
{
int retcode;
enter_blocking_section();
retcode = pthread_cond_signal(&Condition_val(cond));
leave_blocking_section();
caml_pthread_check(retcode, "Condition.signal");
return Val_unit;
}
value caml_condition_broadcast(cond) /* ML */
value cond;
{
int retcode;
enter_blocking_section();
retcode = pthread_cond_broadcast(&Condition_val(cond));
leave_blocking_section();
caml_pthread_check(retcode, "Condition.broadcast");
return Val_unit;
}
/* Error report */
static void caml_pthread_check(retcode, msg)
int retcode;
char * msg;
{
char * err;
int errlen, msglen;
value str;
if (retcode == 0) return;
err = strerror(retcode);
msglen = strlen(msg);
errlen = strlen(err);
str = alloc_string(msglen + 2 + errlen);
bcopy(msg, &Byte(str, 0), msglen);
bcopy(": ", &Byte(str, msglen), 2);
bcopy(err, &Byte(str, msglen + 2), errlen);
raise_sys_error(str);
}