Finition des I/O avec verrouillage.

Portage commun POSIX/Windows.


git-svn-id: http://caml.inria.fr/svn/ocaml/trunk@966 f963ae5c-01c2-4b8c-9fe0-0dff7051ff02
master
Xavier Leroy 1996-09-09 12:25:20 +00:00
parent 76d09bd763
commit 5a2e78b436
8 changed files with 699 additions and 89 deletions

View File

@ -0,0 +1,29 @@
posix.o: posix.c ../../byterun/alloc.h ../../byterun/misc.h \
../../byterun/config.h ../../byterun/../config/m.h \
../../byterun/../config/s.h ../../byterun/mlvalues.h \
../../byterun/fail.h ../../byterun/memory.h ../../byterun/gc.h \
../../byterun/major_gc.h ../../byterun/freelist.h \
../../byterun/minor_gc.h ../../byterun/roots.h \
../../byterun/signals.h ../../byterun/stacks.h ../../byterun/sys.h
win32.o: win32.c ../../byterun/alloc.h ../../byterun/misc.h \
../../byterun/config.h ../../byterun/../config/m.h \
../../byterun/../config/s.h ../../byterun/mlvalues.h \
../../byterun/fail.h ../../byterun/memory.h ../../byterun/gc.h \
../../byterun/major_gc.h ../../byterun/freelist.h \
../../byterun/minor_gc.h ../../byterun/roots.h \
../../byterun/signals.h ../../byterun/stacks.h ../../byterun/sys.h
condition.cmi: mutex.cmi
condition.cmo: mutex.cmi condition.cmi
condition.cmx: mutex.cmx condition.cmi
event.cmo: condition.cmi mutex.cmi event.cmi
event.cmx: condition.cmx mutex.cmx event.cmi
iolock.cmo: iolock.cmi
iolock.cmx: iolock.cmi
mutex.cmo: mutex.cmi
mutex.cmx: mutex.cmi
pervasives.cmo: iolock.cmi pervasives.cmi
pervasives.cmx: iolock.cmx pervasives.cmi
thread.cmo: thread.cmi
thread.cmx: thread.cmi
threadUnix.cmo: iolock.cmi thread.cmi threadUnix.cmi
threadUnix.cmx: iolock.cmx thread.cmx threadUnix.cmi

View File

@ -6,10 +6,23 @@ CFLAGS=-I..\..\byterun $(BYTECCCOMPOPTS)
CAMLC=..\..\boot\ocamlrun ..\..\boot\ocamlc -I ..\..\boot -I ..\win32unix
C_OBJS=win32.obj
CAML_OBJS=mutex.cmo condition.cmo threadIO.cmo threadPrintexc.cmo \
thread.cmo event.cmo threadUnix.cmo threadPrintf.cmo
all: libthreads.lib threads.cma
LIB=..\..\stdlib
LIB_OBJS=mutex.cmo iolock.cmo pervasives.cmo \
$(LIB)\list.cmo $(LIB)\string.cmo $(LIB)\char.cmo \
$(LIB)\array.cmo $(LIB)\sys.cmo $(LIB)\hashtbl.cmo $(LIB)\sort.cmo \
$(LIB)\filename.cmo $(LIB)\obj.cmo $(LIB)\lexing.cmo $(LIB)\parsing.cmo \
$(LIB)\set.cmo $(LIB)\map.cmo $(LIB)\stack.cmo $(LIB)\queue.cmo \
$(LIB)\stream.cmo $(LIB)\printf.cmo $(LIB)\format.cmo $(LIB)\arg.cmo \
$(LIB)\printexc.cmo $(LIB)\gc.cmo $(LIB)\digest.cmo $(LIB)\random.cmo \
$(LIB)\oo.cmo $(LIB)\genlex.cmo
THREAD_OBJS=thread.cmo condition.cmo event.cmo threadUnix.cmo
GENFILES=pervasives.mli thread.ml condition.ml
all: libthreads.lib threads.cma stdlib.cma
allopt:
@ -17,21 +30,41 @@ libthreads.lib: $(C_OBJS)
rm -f libthreads.lib
$(MKLIB)libthreads.lib $(C_OBJS)
threads.cma: $(CAML_OBJS)
$(CAMLC) -a -linkall -o threads.cma $(CAML_OBJS)
threads.cma: $(THREAD_OBJS)
$(CAMLC) -a -o threads.cma $(THREAD_OBJS)
stdlib.cma: $(LIB_OBJS)
$(CAMLC) -a -o stdlib.cma $(LIB_OBJS)
pervasives.cmo: pervasives.mli pervasives.cmi pervasives.ml
$(CAMLC) -nopervasives -c pervasives.ml
pervasives.mli: $(LIB)\pervasives.mli
cp $(LIB)/pervasives.mli pervasives.mli
pervasives.cmi: $(LIB)\pervasives.cmi
cp $(LIB)/pervasives.cmi pervasives.cmi
iolock.cmo: iolock.ml
$(CAMLC) -nopervasives -c iolock.ml
iolock.cmi: iolock.mli
$(CAMLC) -nopervasives -c iolock.mli
clean:
rm -f libthreads.lib *.obj *.cm*
rm -f libthreads.a *.o *.cm*
rm -f $(GENFILES)
realclean:
install:
cp libthreads.lib $(LIBDIR)/libthreads.lib
cp *.cmi threads.cma $(LIBDIR)
if not exist $(LIBDIR)\threads mkdir $(LIBDIR)\threads
cp thread.cmi mutex.cmi condition.cmi event.cmi threadUnix.cmi threads.cma stdlib.cma $(LIBDIR)/threads
installopt:
.SUFFIXES: .ml .mli .cmo .cmi .cmx
.SUFFIXES: .ml .mli .mlp .cmo .cmi .cmx
.mli.cmi:
$(CAMLC) -c $(COMPFLAGS) $<
@ -42,14 +75,16 @@ installopt:
.ml.cmx:
$(CAMLOPT) -c $(COMPFLAGS) $<
.mlp.ml:
@rm -f $*.ml
$(CPP) $(CPPFLAGS) $*.mlp > $*.ml
@attrib +r $*.ml
thread.ml: thread.mlp
condition.ml: condition.mlp
depend:
# gcc -MM -I../../byterun *.c > .depend
..\..\boot\ocamlrun ../../tools/ocamldep *.mli *.ml >> .depend
threadstubs.obj: threadstubs.c
$(CC) $(CFLAGS) -c threadstubs.c
win.obj: win.c
$(CC) $(CFLAGS) -c win.c
include .depend

View File

@ -11,6 +11,8 @@
(* $Id$ *)
#ifdef WIN32
type t = { mut: Mutex.t; mutable waiting: Thread.t list }
let create () =
@ -45,3 +47,12 @@ let broadcast cond =
Mutex.unlock cond.mut;
List.iter wakeup w
#else
type t
external create: unit -> t = "caml_condition_new"
external wait: t -> Mutex.t -> unit = "caml_condition_wait"
external signal: t -> unit = "caml_condition_signal"
external broadcast: t -> unit = "caml_condition_broadcast"
#endif

View File

@ -11,24 +11,63 @@
(* $Id$ *)
let master_lock = Mutex.create()
(* Since this file is linked before Pervasives and all other library
modules, we can't use any of the standard library functions. *)
let iolocks = (Hashtbl.create 11 : (Obj.t, Mutex.t) Hashtbl.t)
external raise : exn -> 'a = "%raise"
external (mod) : int -> int -> int = "%modint"
external (==) : 'a -> 'a -> bool = "%eq"
external array_create: int -> 'a -> 'a array = "make_vect"
external array_get: 'a array -> int -> 'a = "%array_unsafe_get"
external array_set: 'a array -> int -> 'a -> unit = "%array_unsafe_set"
type obj
external obj_repr: 'a -> obj = "%identity"
external hash_param : int -> int -> 'a -> int = "hash_univ_param" "noalloc"
type mutex
external mutex_create: unit -> mutex = "caml_mutex_new"
external mutex_lock: mutex -> unit = "caml_mutex_lock"
external mutex_unlock: mutex -> unit = "caml_mutex_unlock"
let master_lock = mutex_create()
type bucketlist =
Empty
| Cons of obj * mutex * bucketlist
let iolocks = array_create 27 Empty
let hash channel = (hash_param 10 10 channel) mod 27
let add channel =
let m = Mutex.create() in
Mutex.lock master_lock;
Hashtbl.add iolocks (Obj.repr channel) m;
Mutex.unlock master_lock;
let m = mutex_create() in
mutex_lock master_lock;
let h = hash channel in
array_set iolocks h (Cons(obj_repr channel, m, array_get iolocks h));
mutex_unlock master_lock;
channel
let rec remove_from_bucket ch = function
Empty -> Empty
| Cons(k, m, rem) ->
if ch == k then rem else Cons(k, m, remove_from_bucket ch rem)
let remove channel =
Mutex.lock master_lock;
Hashtbl.remove iolocks (Obj.repr channel);
Mutex.unlock master_lock
mutex_lock master_lock;
let h = hash channel in
array_set iolocks h
(remove_from_bucket (obj_repr channel) (array_get iolocks h));
mutex_unlock master_lock
let rec find_in_bucket ch = function
Empty ->
raise(Invalid_argument "Pervasives: channel closed")
| Cons(k, m, rem) ->
if ch == k then m else find_in_bucket ch rem
let find channel =
try
Hashtbl.find iolocks (Obj.repr channel)
with Not_found ->
invalid_arg "Pervasives: channel closed or not correctly opened"
find_in_bucket (obj_repr channel) (array_get iolocks (hash channel))
let lock m = mutex_lock m
let unlock m = mutex_unlock m

View File

@ -11,6 +11,13 @@
(* $Id$ *)
value add : 'a -> 'a
value remove : 'a -> unit
value find : 'a -> Mutex.t
(* Locking on I/O channels. *)
type mutex
val add : 'a -> 'a
val remove : 'a -> unit
val find : 'a -> mutex
val lock : mutex -> unit
val unlock : mutex -> unit

View File

@ -157,33 +157,33 @@ let stderr = Iolock.add(open_descriptor_out 2)
let wrap1 fn chan =
let m = Iolock.find chan in
Mutex.lock m;
Iolock.lock m;
try
let res = fn chan in
Mutex.unlock m;
Iolock.unlock m;
res
with x ->
Mutex.unlock m; raise x
Iolock.unlock m; raise x
let wrap2 fn chan arg =
let m = Iolock.find chan in
Mutex.lock m;
Iolock.lock m;
try
let res = fn chan arg in
Mutex.unlock m;
Iolock.unlock m;
res
with x ->
Mutex.unlock m; raise x
Iolock.unlock m; raise x
let wrap4 fn chan arg1 arg2 arg3 =
let m = Iolock.find chan in
Mutex.lock m;
Iolock.lock m;
try
let res = fn chan arg1 arg2 arg3 in
Mutex.unlock m;
Iolock.unlock m;
res
with x ->
Mutex.unlock m; raise x
Iolock.unlock m; raise x
(* General output functions *)
@ -203,14 +203,15 @@ let open_out name =
let open_out_bin name =
open_out_gen [Open_wronly; Open_creat; Open_trunc; Open_binary] 0o666 name
external flush : out_channel -> unit = "flush"
let flush = wrap1 flush
external flush_unlocked : out_channel -> unit = "flush"
let flush = wrap1 flush_unlocked
external unsafe_output : out_channel -> string -> int -> int -> unit = "output"
let unsafe_output = wrap4 unsafe_output
external unsafe_output_unlocked : out_channel -> string -> int -> int -> unit
= "output"
let unsafe_output = wrap4 unsafe_output_unlocked
external output_char : out_channel -> char -> unit = "output_char"
let output_char = wrap2 output_char
external output_char_unlocked : out_channel -> char -> unit = "output_char"
let output_char = wrap2 output_char_unlocked
let output_string oc s =
unsafe_output oc s 0 (string_length s)
@ -220,27 +221,27 @@ let output oc s ofs len =
then invalid_arg "output"
else unsafe_output oc s ofs len
external output_byte : out_channel -> int -> unit = "output_char"
let output_byte = wrap2 output_byte
external output_byte_unlocked : out_channel -> int -> unit = "output_char"
let output_byte = wrap2 output_byte_unlocked
external output_binary_int : out_channel -> int -> unit = "output_int"
let output_binary_int = wrap2 output_binary_int
external output_binary_int_unlocked : out_channel -> int -> unit = "output_int"
let output_binary_int = wrap2 output_binary_int_unlocked
external output_value : out_channel -> 'a -> unit = "output_value"
let output_value oc v = wrap2 output_value oc v
external output_value_unlocked : out_channel -> 'a -> unit = "output_value"
let output_value oc v = wrap2 output_value_unlocked oc v
external seek_out : out_channel -> int -> unit = "seek_out"
let seek_out = wrap2 seek_out
external seek_out_unlocked : out_channel -> int -> unit = "seek_out"
let seek_out = wrap2 seek_out_unlocked
external pos_out : out_channel -> int = "pos_out"
let pos_out = wrap1 pos_out
external pos_out_unlocked : out_channel -> int = "pos_out"
let pos_out = wrap1 pos_out_unlocked
external out_channel_length : out_channel -> int = "channel_size"
let out_channel_length = wrap1 out_channel_length
external out_channel_length_unlocked : out_channel -> int = "channel_size"
let out_channel_length = wrap1 out_channel_length_unlocked
external close_out_channel : out_channel -> unit = "close_channel"
let close_out_channel = wrap1 close_out_channel
let close_out oc = flush oc; close_out_channel oc; Iolock.remove oc
external close_out_channel_unlocked : out_channel -> unit = "close_channel"
let close_out oc =
flush oc; wrap1 close_out_channel_unlocked oc; Iolock.remove oc
(* General input functions *)
@ -253,11 +254,12 @@ let open_in name =
let open_in_bin name =
open_in_gen [Open_rdonly; Open_binary] 0 name
external input_char : in_channel -> char = "input_char"
let input_char = wrap1 input_char
external input_char_unlocked : in_channel -> char = "input_char"
let input_char = wrap1 input_char_unlocked
external unsafe_input : in_channel -> string -> int -> int -> int = "input"
let unsafe_input = wrap4 unsafe_input
external unsafe_input_unlocked : in_channel -> string -> int -> int -> int
= "input"
let unsafe_input = wrap4 unsafe_input_unlocked
let input ic s ofs len =
if ofs < 0 or ofs + len > string_length s
@ -279,46 +281,46 @@ let really_input ic s ofs len =
external input_scan_line : in_channel -> int = "input_scan_line"
let rec input_line chan =
let rec input_line_unlocked chan =
let n = input_scan_line chan in
if n = 0 then (* n = 0: we are at EOF *)
raise End_of_file
else if n > 0 then begin (* n > 0: newline found in buffer *)
let res = string_create (n-1) in
unsafe_input chan res 0 (n-1);
input_char chan; (* skip the newline *)
unsafe_input_unlocked chan res 0 (n-1);
input_char_unlocked chan; (* skip the newline *)
res
end else begin (* n < 0: newline not found *)
let beg = string_create (-n) in
unsafe_input chan beg 0 (-n);
unsafe_input_unlocked chan beg 0 (-n);
try
beg ^ input_line chan
beg ^ input_line_unlocked chan
with End_of_file ->
beg
end
let input_line = wrap1 input_line
let input_line = wrap1 input_line_unlocked
external input_byte : in_channel -> int = "input_char"
let input_byte = wrap1 input_byte
external input_byte_unlocked : in_channel -> int = "input_char"
let input_byte = wrap1 input_byte_unlocked
external input_binary_int : in_channel -> int = "input_int"
let input_binary_int = wrap1 input_binary_int
external input_binary_int_unlocked : in_channel -> int = "input_int"
let input_binary_int = wrap1 input_binary_int_unlocked
external input_value : in_channel -> 'a = "input_value"
let input_value ic = wrap1 input_value ic
external input_value_unlocked : in_channel -> 'a = "input_value"
let input_value ic = wrap1 input_value_unlocked ic
external seek_in : in_channel -> int -> unit = "seek_in"
let seek_in = wrap2 seek_in
external seek_in_unlocked : in_channel -> int -> unit = "seek_in"
let seek_in = wrap2 seek_in_unlocked
external pos_in : in_channel -> int = "pos_in"
let pos_in = wrap1 pos_in
external pos_in_unlocked : in_channel -> int = "pos_in"
let pos_in = wrap1 pos_in_unlocked
external in_channel_length : in_channel -> int = "channel_size"
let in_channel_length = wrap1 in_channel_length
external in_channel_length_unlocked : in_channel -> int = "channel_size"
let in_channel_length = wrap1 in_channel_length_unlocked
external close_in : in_channel -> unit = "close_channel"
let close_in ic = wrap1 close_in ic; Iolock.remove ic
external close_in_unlocked : in_channel -> unit = "close_channel"
let close_in ic = wrap1 close_in_unlocked ic; Iolock.remove ic
(* Output functions on standard output *)

View File

@ -0,0 +1,467 @@
/***********************************************************************/
/* */
/* Caml Special Light */
/* */
/* Xavier Leroy and Damien Doligez, INRIA Rocquencourt */
/* */
/* Copyright 1995 Institut National de Recherche en Informatique et */
/* Automatique. Distributed only by permission. */
/* */
/***********************************************************************/
/* $Id$ */
/* Thread interface for POSIX 1003.1c threads */
#include <pthread.h>
#include <signal.h>
#include <sys/time.h>
#include "alloc.h"
#include "fail.h"
#include "memory.h"
#include "misc.h"
#include "mlvalues.h"
#include "roots.h"
#include "signals.h"
#include "stacks.h"
#include "sys.h"
/* Initial size of stack when a thread is created (4 Ko) */
#define Thread_stack_size (Stack_size / 4)
/* Max computation time before rescheduling, in microseconds (50ms) */
#define Thread_timeout 50000
/* The thread descriptors */
struct caml_thread_struct {
pthread_t pthread; /* The Posix thread id */
value ident; /* Unique id */
value terminated; /* Mutex held while the thread is running */
value * stack_low; /* The execution stack for this thread */
value * stack_high;
value * stack_threshold;
value * sp; /* Saved value of extern_sp for this thread */
value * trapsp; /* Saved value of trapsp for this thread */
value * local_roots; /* Saved value of local_roots for this thr. */
struct longjmp_buffer * external_raise; /* Saved external_raise */
struct caml_thread_struct * next; /* Double linking of running threads */
struct caml_thread_struct * prev;
};
typedef struct caml_thread_struct * caml_thread_t;
#define Assign(dst,src) modify((value *)&(dst), (value)(src))
/* The global mutex used to ensure that at most one thread is running
Caml code */
pthread_mutex_t caml_mutex;
/* The key used for storing the thread descriptor in the specific data
of the corresponding Posix thread. */
pthread_key_t thread_descriptor_key;
/* Identifier for next thread creation */
static long thread_next_ident = 0;
/* Forward declarations */
value caml_mutex_new P((value));
value caml_mutex_lock P((value));
value caml_mutex_unlock P((value));
/* Hook for scanning the stacks of the other threads */
static void (*prev_scan_roots_hook) P((scanning_action));
static void caml_thread_scan_roots(action)
scanning_action action;
{
caml_thread_t curr_thread, new_curr_thread, th;
register value * sp;
value * block;
curr_thread = pthread_getspecific(thread_descriptor_key);
/* Scan all thread descriptors */
(*action)((value) curr_thread, (value *) &new_curr_thread);
Assert(curr_thread == new_curr_thread);
/* Scan the stacks, except that of the current thread (already done). */
for (th = curr_thread->next; th != curr_thread; th = th->next) {
for (sp = th->sp; sp < th->stack_high; sp++) {
(*action)(*sp, sp);
}
for (block = th->local_roots; block != NULL; block = (value *) block [1]){
for (sp = block - (long) block [0]; sp < block; sp++){
(*action)(*sp, sp);
}
}
}
/* Hook */
if (prev_scan_roots_hook != NULL) (*prev_scan_roots_hook)(action);
}
/* Hooks for enter_blocking_section and leave_blocking_section */
static void (*prev_enter_blocking_section_hook) ();
static void (*prev_leave_blocking_section_hook) ();
static void caml_thread_enter_blocking_section()
{
caml_thread_t curr_thread;
if (prev_enter_blocking_section_hook != NULL)
(*prev_enter_blocking_section_hook)();
/* Save the stack-related global variables in the thread descriptor
of the current thread */
curr_thread = pthread_getspecific(thread_descriptor_key);
curr_thread->stack_low = stack_low;
curr_thread->stack_high = stack_high;
curr_thread->stack_threshold = stack_threshold;
curr_thread->sp = extern_sp;
curr_thread->trapsp = trapsp;
curr_thread->local_roots = local_roots;
curr_thread->external_raise = external_raise;
/* Release the global mutex */
pthread_mutex_unlock(&caml_mutex);
}
static void caml_thread_leave_blocking_section()
{
caml_thread_t curr_thread;
/* Re-acquire the global mutex */
pthread_mutex_lock(&caml_mutex);
/* Restore the stack-related global variables */
curr_thread = pthread_getspecific(thread_descriptor_key);
stack_low = curr_thread->stack_low;
stack_high = curr_thread->stack_high;
stack_threshold = curr_thread->stack_threshold;
extern_sp = curr_thread->sp;
trapsp = curr_thread->trapsp;
local_roots = curr_thread->local_roots;
external_raise = curr_thread->external_raise;
if (prev_leave_blocking_section_hook != NULL)
(*prev_leave_blocking_section_hook)();
}
/* The "tick" thread fakes a SIGVTALRM signal at regular intervals. */
static void * caml_thread_tick()
{
struct timeval timeout;
while(1) {
/* select() seems to be the most efficient way to suspend the
thread for sub-second intervals */
timeout.tv_sec = 0;
timeout.tv_usec = Thread_timeout;
select(0, NULL, NULL, NULL, &timeout);
/* This signal should never cause a callback, so don't go through
handle_signal(), tweak the global variables directly. */
pending_signal = SIGVTALRM;
something_to_do = 1;
}
}
/* Thread cleanup: remove the descriptor from the list and free
the stack space. */
static void caml_thread_cleanup(th)
caml_thread_t th;
{
/* Signal that the thread has terminated */
caml_mutex_unlock(th->terminated);
/* Remove th from the doubly-linked list of threads */
Assign(th->next->prev, th->prev);
Assign(th->prev->next, th->next);
/* Free the memory resources */
stat_free((char *) th->stack_low);
th->stack_low = NULL;
th->stack_high = NULL;
th->stack_threshold = NULL;
th->sp = NULL;
th->trapsp = NULL;
th->local_roots = NULL;
th->external_raise = NULL;
/* Release the main mutex */
pthread_mutex_unlock(&caml_mutex);
}
/* Initialize the thread machinery */
value caml_thread_initialize(unit) /* ML */
value unit;
{
pthread_t tick_pthread;
pthread_attr_t attr;
caml_thread_t th;
Push_roots(r, 1);
/* Initialize the main mutex */
if (pthread_mutex_init(&caml_mutex, NULL) != 0) sys_error("Thread.init");
pthread_mutex_lock(&caml_mutex);
/* Initialize the key */
pthread_key_create(&thread_descriptor_key, NULL);
/* Create and acquire a termination lock for the current thread */
r[0] = caml_mutex_new(Val_unit);
caml_mutex_lock(r[0]);
/* Create a descriptor for the current thread */
th = (caml_thread_t)
alloc_shr(sizeof(struct caml_thread_struct) / sizeof(value), 0);
th->pthread = pthread_self();
th->ident = Val_long(thread_next_ident);
th->terminated = r[0];
thread_next_ident++;
/* The stack-related fields will be filled in at the next
enter_blocking_section */
th->next = th;
th->prev = th;
/* Associate the thread descriptor with the thread */
pthread_setspecific(thread_descriptor_key, (void *) th);
/* Allow cancellation */
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
/* Set up the hooks */
prev_scan_roots_hook = scan_roots_hook;
scan_roots_hook = caml_thread_scan_roots;
prev_enter_blocking_section_hook = enter_blocking_section_hook;
enter_blocking_section_hook = caml_thread_enter_blocking_section;
prev_leave_blocking_section_hook = leave_blocking_section_hook;
leave_blocking_section_hook = caml_thread_leave_blocking_section;
/* Fork the tick thread */
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
if (pthread_create(&tick_pthread, &attr, caml_thread_tick, NULL) != 0)
sys_error("Thread.init");
pthread_detach(tick_pthread);
Pop_roots();
return Val_unit;
}
/* Create a thread */
static void * caml_thread_start(th)
caml_thread_t th;
{
value clos;
/* Associate the thread descriptor with the thread */
pthread_setspecific(thread_descriptor_key, (void *) th);
/* Set up termination routine */
pthread_cleanup_push(caml_thread_cleanup, (void *) th);
/* Allow cancellation */
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
/* Acquire the global mutex and set up the stack variables */
leave_blocking_section();
/* Callback the closure */
clos = *extern_sp++;
callback(clos, Val_unit);
/* Cleanup: free the thread resources and release the mutex */
pthread_cleanup_pop(1);
return 0;
}
value caml_thread_new(clos) /* ML */
value clos;
{
pthread_attr_t attr;
caml_thread_t th, curr_thread;
Push_roots(r, 1);
/* Create and acquire the termination lock */
r[0] = caml_mutex_new(Val_unit);
caml_mutex_lock(r[0]);
/* Allocate the thread and its stack */
th = (caml_thread_t)
alloc_shr(sizeof(struct caml_thread_struct) / sizeof(value), 0);
th->ident = Val_long(thread_next_ident);
thread_next_ident++;
th->terminated = r[0];
th->stack_low = (value *) stat_alloc(Thread_stack_size);
th->stack_high = th->stack_low + Thread_stack_size / sizeof(value);
th->stack_threshold = th->stack_low + Stack_threshold / sizeof(value);
th->sp = th->stack_high;
th->trapsp = th->stack_high;
th->local_roots = NULL;
th->external_raise = NULL;
/* Add it to the list of threads */
curr_thread = pthread_getspecific(thread_descriptor_key);
th->next = curr_thread->next;
th->prev = curr_thread;
Assign(curr_thread->next->prev, th);
Assign(curr_thread->next, th);
/* Pass the closure in the newly created stack, so that it will be
preserved by garbage collection */
*--(th->sp) = clos;
/* Fork the new thread */
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
if (pthread_create(&th->pthread, &attr, caml_thread_start, (void *) th) != 0)
sys_error("Thread.new");
Pop_roots();
return (value) th;
}
/* Return the current thread */
value caml_thread_self(unit) /* ML */
value unit;
{
caml_thread_t curr_thread;
curr_thread = pthread_getspecific(thread_descriptor_key);
return (value) curr_thread;
}
/* Return the identifier of a thread */
value caml_thread_id(th) /* ML */
caml_thread_t th;
{
return th->ident;
}
/* Allow re-scheduling */
value caml_thread_yield(unit) /* ML */
value unit;
{
enter_blocking_section();
#if defined(HAS_SCHED_YIELD)
sched_yield();
#elif defined(HAS_PTHREAD_YIELD)
pthread_yield();
#endif
leave_blocking_section();
return Val_unit;
}
/* Suspend the current thread until another thread terminates */
value caml_thread_join(th) /* ML */
caml_thread_t th;
{
caml_mutex_lock(th->terminated);
caml_mutex_unlock(th->terminated);
return Val_unit;
}
/* Terminate the current thread */
value caml_thread_exit(unit) /* ML */
value unit;
{
pthread_exit(0);
return Val_unit; /* never reached */
}
/* Kill another thread */
value caml_thread_kill(th) /* ML */
caml_thread_t th;
{
pthread_cancel(th->pthread);
return Val_unit;
}
/* Mutex operations */
#define Mutex_val(v) (*((pthread_mutex_t *)(&Field(v, 1))))
#define Max_mutex_number 1000
static void caml_mutex_finalize(mut)
value mut;
{
pthread_mutex_destroy(&Mutex_val(mut));
}
value caml_mutex_new(unit) /* ML */
value unit;
{
value mut;
mut = alloc_final(1 + sizeof(pthread_mutex_t) / sizeof(value),
caml_mutex_finalize, 1, Max_mutex_number);
if (pthread_mutex_init(&Mutex_val(mut), NULL) != 0)
sys_error("Mutex.new");
return mut;
}
value caml_mutex_lock(mut) /* ML */
value mut;
{
int retcode;
enter_blocking_section();
retcode = pthread_mutex_lock(&(Mutex_val(mut)));
leave_blocking_section();
if (retcode != 0) sys_error("Mutex.lock");
return Val_unit;
}
value caml_mutex_unlock(mut) /* ML */
value mut;
{
int retcode;
enter_blocking_section();
retcode = pthread_mutex_unlock(&(Mutex_val(mut)));
leave_blocking_section();
if (retcode != 0) sys_error("Mutex.unlock");
return Val_unit;
}
value caml_mutex_try_lock(mut) /* ML */
value mut;
{
int retcode;
retcode = pthread_mutex_trylock(&(Mutex_val(mut)));
return retcode == 0 ? Val_true : Val_false;
}
/* Conditions operations */
#define Condition_val(v) (*((pthread_cond_t *)(&Field(v, 1))))
#define Max_condition_number 1000
static void caml_condition_finalize(cond)
value cond;
{
pthread_cond_destroy(&Condition_val(cond));
}
value caml_condition_new(unit) /* ML */
value unit;
{
value cond;
cond = alloc_final(1 + sizeof(pthread_cond_t) / sizeof(value),
caml_condition_finalize, 1, Max_condition_number);
if (pthread_cond_init(&Condition_val(cond), NULL) != 0)
sys_error("Condition.new");
return cond;
}
value caml_condition_wait(cond, mut) /* ML */
value cond, mut;
{
int retcode;
enter_blocking_section();
retcode = pthread_cond_wait(&Condition_val(cond), &Mutex_val(mut));
leave_blocking_section();
if (retcode != 0) sys_error("Condition.wait");
return Val_unit;
}
value caml_condition_signal(cond) /* ML */
value cond;
{
int retcode;
enter_blocking_section();
retcode = pthread_cond_signal(&Condition_val(cond));
leave_blocking_section();
if (retcode != 0) sys_error("Condition.signal");
return Val_unit;
}
value caml_condition_broadcast(cond) /* ML */
value cond;
{
int retcode;
enter_blocking_section();
retcode = pthread_cond_broadcast(&Condition_val(cond));
leave_blocking_section();
if (retcode != 0) sys_error("Condition.broadcast");
return Val_unit;
}

View File

@ -13,8 +13,6 @@
(* User-level threads *)
open ThreadIO
type t
external thread_initialize : unit -> unit = "caml_thread_initialize"
@ -26,7 +24,6 @@ external id : t -> int = "caml_thread_id"
external exit : unit -> unit = "caml_thread_exit"
external join : t -> unit = "caml_thread_join"
external detach : t -> unit = "caml_thread_detach"
external delay: float -> unit = "caml_thread_delay"
external kill : t -> unit = "caml_thread_kill"
(* For new, make sure the function passed to thread_new never
@ -36,7 +33,7 @@ let create fn arg =
thread_new
(fun () ->
try
ThreadPrintexc.print fn arg; exit()
Printexc.print fn arg; exit()
with x ->
flush stdout; flush stderr; exit())
@ -46,13 +43,36 @@ let preempt signal = yield()
(* Initialization of the scheduler *)
#ifdef WIN32
#define PREEMPT_SIGNAL 1
#else
#define PREEMPT_SIGNAL Sys.sigvtalrm
#endif
let _ =
Sys.signal 1 (Sys.Signal_handle preempt);
Sys.signal PREEMPT_SIGNAL (Sys.Signal_handle preempt);
thread_initialize()
(* Wait functions *)
#ifdef WIN32
external delay: float -> unit = "caml_thread_delay"
#else
let delay time = Unix.select [] [] [] time; ()
#endif
let wait_read fd = ()
let wait_write fd = ()
#ifdef WIN32
let wait_timed_read fd delay = true
let wait_timed_write fd delay = true
#else
let wait_timed_read fd d =
match Unix.select [fd] [] [] d with ([], _, _) -> false | (_, _, _) -> true
let wait_timed_write fd d =
match Unix.select [] [fd] [] d with (_, [], _) -> false | (_, _, _) -> true
#endif
let wait_pid p = Unix.waitpid [] p