Merge pull request #9674 from jhjourdan/memprof_thread_alloc_callback_2

Memprof: provide the guarantee that an allocation callback is always run in the same thread the allocation takes place. Attempt II.
master
Jacques-Henri Jourdan 2020-10-20 10:29:56 +02:00 committed by GitHub
commit 0cb298f5e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 692 additions and 481 deletions

View File

@ -180,6 +180,10 @@ OCaml 4.12.0
(Enguerrand Decorne, KC Sivaramakrishnan, Xavier Leroy, Stephen Dolan,
David Allsopp, Nicolás Ojeda Bär review by Xavier Leroy, Nicolás Ojeda Bär)
* #9674: Memprof: guarantee that an allocation callback is always run
in the same thread the allocation takes place
(Jacques-Henri Jourdan, review by Stephen Dolan)
### Code generation and optimizations:
- #9551: ocamlc no longer loads DLLs at link time to check that

View File

@ -93,7 +93,7 @@ struct caml_thread_struct {
int backtrace_pos; /* Saved Caml_state->backtrace_pos */
backtrace_slot * backtrace_buffer; /* Saved Caml_state->backtrace_buffer */
value backtrace_last_exn; /* Saved Caml_state->backtrace_last_exn (root) */
struct caml_memprof_th_ctx memprof_ctx;
struct caml_memprof_th_ctx *memprof_ctx;
};
typedef struct caml_thread_struct * caml_thread_t;
@ -152,9 +152,7 @@ static void (*prev_scan_roots_hook) (scanning_action);
static void caml_thread_scan_roots(scanning_action action)
{
caml_thread_t th;
th = curr_thread;
caml_thread_t th = curr_thread;
do {
(*action)(th->descr, &th->descr);
(*action)(th->backtrace_last_exn, &th->backtrace_last_exn);
@ -174,6 +172,17 @@ static void caml_thread_scan_roots(scanning_action action)
if (prev_scan_roots_hook != NULL) (*prev_scan_roots_hook)(action);
}
/* Hook for iterating over Memprof's entries arrays */
static void memprof_ctx_iter(th_ctx_action f, void* data)
{
caml_thread_t th = curr_thread;
do {
f(th->memprof_ctx, data);
th = th->next;
} while (th != curr_thread);
}
/* Saving and restoring runtime state in curr_thread */
Caml_inline void caml_thread_save_runtime_state(void)
@ -196,7 +205,7 @@ Caml_inline void caml_thread_save_runtime_state(void)
curr_thread->backtrace_pos = Caml_state->backtrace_pos;
curr_thread->backtrace_buffer = Caml_state->backtrace_buffer;
curr_thread->backtrace_last_exn = Caml_state->backtrace_last_exn;
caml_memprof_save_th_ctx(&curr_thread->memprof_ctx);
caml_memprof_leave_thread();
}
Caml_inline void caml_thread_restore_runtime_state(void)
@ -219,7 +228,7 @@ Caml_inline void caml_thread_restore_runtime_state(void)
Caml_state->backtrace_pos = curr_thread->backtrace_pos;
Caml_state->backtrace_buffer = curr_thread->backtrace_buffer;
Caml_state->backtrace_last_exn = curr_thread->backtrace_last_exn;
caml_memprof_restore_th_ctx(&curr_thread->memprof_ctx);
caml_memprof_enter_thread(curr_thread->memprof_ctx);
}
/* Hooks for caml_enter_blocking_section and caml_leave_blocking_section */
@ -349,7 +358,7 @@ static caml_thread_t caml_thread_new_info(void)
th->backtrace_pos = 0;
th->backtrace_buffer = NULL;
th->backtrace_last_exn = Val_unit;
caml_memprof_init_th_ctx(&th->memprof_ctx);
th->memprof_ctx = caml_memprof_new_th_ctx();
return th;
}
@ -394,20 +403,15 @@ static void caml_thread_remove_info(caml_thread_t th)
static void caml_thread_reinitialize(void)
{
caml_thread_t thr, next;
struct channel * chan;
/* Remove all other threads (now nonexistent)
from the doubly-linked list of threads */
thr = curr_thread->next;
while (thr != curr_thread) {
next = thr->next;
caml_stat_free(thr);
thr = next;
while (curr_thread->next != curr_thread) {
caml_memprof_delete_th_ctx(curr_thread->next->memprof_ctx);
caml_thread_remove_info(curr_thread->next);
}
curr_thread->next = curr_thread;
curr_thread->prev = curr_thread;
all_threads = curr_thread;
/* Reinitialize the master lock machinery,
just in case the fork happened while other threads were doing
caml_leave_blocking_section */
@ -450,6 +454,7 @@ CAMLprim value caml_thread_initialize(value unit) /* ML */
#ifdef NATIVE_CODE
curr_thread->exit_buf = &caml_termination_jmpbuf;
#endif
curr_thread->memprof_ctx = &caml_memprof_main_ctx;
/* The stack-related fields will be filled in at the next
caml_enter_blocking_section */
/* Associate the thread descriptor with the thread */
@ -468,6 +473,7 @@ CAMLprim value caml_thread_initialize(value unit) /* ML */
caml_channel_mutex_unlock_exn = caml_io_mutex_unlock_exn;
prev_stack_usage_hook = caml_stack_usage_hook;
caml_stack_usage_hook = caml_thread_stack_usage;
caml_memprof_th_ctx_iter_hook = memprof_ctx_iter;
/* Set up fork() to reinitialize the thread machinery in the child
(PR#4577) */
st_atfork(caml_thread_reinitialize);
@ -499,7 +505,7 @@ static void caml_thread_stop(void)
below uses accurate information. */
caml_thread_save_runtime_state();
/* Tell memprof that this thread is terminating. */
caml_memprof_stop_th_ctx(&curr_thread->memprof_ctx);
caml_memprof_delete_th_ctx(curr_thread->memprof_ctx);
/* Signal that the thread has terminated */
caml_threadstatus_terminate(Terminated(curr_thread->descr));
/* Remove th from the doubly-linked list of threads and free its info block */

View File

@ -27,20 +27,27 @@ external yield : unit -> unit = "caml_thread_yield"
external self : unit -> t = "caml_thread_self" [@@noalloc]
external id : t -> int = "caml_thread_id" [@@noalloc]
external join : t -> unit = "caml_thread_join"
external exit : unit -> unit = "caml_thread_exit"
external exit_stub : unit -> unit = "caml_thread_exit"
(* For new, make sure the function passed to thread_new never
raises an exception. *)
let[@inline never] check_memprof_cb () = ref ()
let create fn arg =
thread_new
(fun () ->
try
fn arg; ()
fn arg;
ignore (Sys.opaque_identity (check_memprof_cb ()))
with exn ->
flush stdout; flush stderr;
thread_uncaught_exception exn)
let exit () =
ignore (Sys.opaque_identity (check_memprof_cb ()));
exit_stub ()
(* Thread.kill is currently not implemented due to problems with
cleanup handlers on several platforms *)

View File

@ -40,13 +40,15 @@ extern void caml_memprof_do_roots(scanning_action f);
extern void caml_memprof_update_clean_phase(void);
extern void caml_memprof_invert_tracked(void);
struct caml_memprof_th_ctx {
int suspended, callback_running;
};
CAMLextern void caml_memprof_init_th_ctx(struct caml_memprof_th_ctx*);
CAMLextern void caml_memprof_stop_th_ctx(struct caml_memprof_th_ctx*);
CAMLextern void caml_memprof_save_th_ctx(struct caml_memprof_th_ctx*);
CAMLextern void caml_memprof_restore_th_ctx(const struct caml_memprof_th_ctx*);
CAMLextern struct caml_memprof_th_ctx caml_memprof_main_ctx;
CAMLextern struct caml_memprof_th_ctx* caml_memprof_new_th_ctx(void);
CAMLextern void caml_memprof_leave_thread(void);
CAMLextern void caml_memprof_enter_thread(struct caml_memprof_th_ctx*);
CAMLextern void caml_memprof_delete_th_ctx(struct caml_memprof_th_ctx*);
typedef void (*th_ctx_action)(struct caml_memprof_th_ctx*, void*);
extern void (*caml_memprof_th_ctx_iter_hook)(th_ctx_action, void*);
#endif

File diff suppressed because it is too large Load Diff

View File

@ -454,8 +454,7 @@ void caml_empty_minor_heap (void)
extern uintnat caml_instr_alloc_jump;
#endif /*CAML_INSTR*/
/* Do a minor collection or a slice of major collection, call finalisation
functions, etc.
/* Do a minor collection or a slice of major collection, etc.
Leave enough room in the minor heap to allocate at least one object.
Guaranteed not to call any OCaml callback.
*/

View File

@ -28,7 +28,6 @@
#endif
#include "caml/osdeps.h"
#include "caml/startup_aux.h"
#include "caml/memprof.h"
#ifdef _WIN32

View File

@ -494,6 +494,9 @@ module Memprof :
to keep for minor blocks, and ['major] the type of metadata
for major blocks.
When using threads, it is guaranteed that allocation callbacks are
always run in the thread where the allocation takes place.
If an allocation-tracking or promotion-tracking function returns [None],
memprof stops tracking the corresponding value.
*)
@ -528,19 +531,15 @@ module Memprof :
Note that the callback can be postponed slightly after the
actual event. The callstack passed to the callback is always
accurate, but the program state may have evolved.
Calling [Thread.exit] in a callback is currently unsafe and can
result in undefined behavior. *)
accurate, but the program state may have evolved. *)
val stop : unit -> unit
(** Stop the sampling. Fails if sampling is not active.
This function does not allocate memory, but tries to run the
postponed callbacks for already allocated memory blocks (of
course, these callbacks may allocate).
This function does not allocate memory.
All the already tracked blocks are discarded.
All the already tracked blocks are discarded. If there are
pending postponed callbacks, they may be discarded.
Calling [stop] when a callback is running can lead to
callbacks not being called even though some events happened. *)

View File

@ -12,4 +12,4 @@ Raised by primitive operation at Callstack.f0 in file "callstack.ml", line 11, c
Called from Callstack.f1 in file "callstack.ml", line 12, characters 27-32
Called from Callstack.f2 in file "callstack.ml", line 13, characters 27-32
Called from Callstack.f3 in file "callstack.ml", line 14, characters 27-32
Called from Thread.create.(fun) in file "thread.ml", line 39, characters 8-14
Called from Thread.create.(fun) in file "thread.ml", line 41, characters 8-14

View File

@ -6,8 +6,7 @@ include systhreads
*)
let cnt = ref 0
let alloc_num = ref 0
let alloc_tot = 100000
let alloc_thread = 50000
let (rd1, wr1) = Unix.pipe ()
let (rd2, wr2) = Unix.pipe ()
@ -15,20 +14,26 @@ let (rd2, wr2) = Unix.pipe ()
let main_thread = Thread.self ()
let cb_main = ref 0 and cb_other = ref 0
let stopped = ref false
let minor_alloc_callback _ =
let alloc_callback alloc =
if !stopped then
None
else begin
let do_stop = !cb_main + !cb_other >= alloc_tot in
if do_stop then stopped := true;
let t = Thread.self () in
if t == main_thread then begin
assert (alloc.Gc.Memprof.size < 10 || alloc.Gc.Memprof.size mod 2 = 0);
let do_stop = !cb_main >= alloc_thread in
if do_stop then stopped := true;
incr cb_main;
assert (Unix.write wr2 (Bytes.make 1 'a') 0 1 = 1);
if not do_stop then
assert (Unix.read rd1 (Bytes.make 1 'a') 0 1 = 1)
end else begin
assert (alloc.Gc.Memprof.size < 10 || alloc.Gc.Memprof.size mod 2 = 1);
let do_stop = !cb_other >= alloc_thread in
if do_stop then stopped := true;
incr cb_other;
assert (Unix.write wr1 (Bytes.make 1 'a') 0 1 = 1);
if not do_stop then
assert (Unix.read rd2 (Bytes.make 1 'a') 0 1 = 1)
@ -39,31 +44,34 @@ let minor_alloc_callback _ =
let mut = Mutex.create ()
let () = Mutex.lock mut
let rec go () =
let rec go alloc_num tid =
Mutex.lock mut;
Mutex.unlock mut;
if !alloc_num < alloc_tot then begin
alloc_num := !alloc_num + 1;
Sys.opaque_identity (Bytes.make (Random.int 300) 'a') |> ignore;
go ()
if alloc_num < alloc_thread then begin
let len = 2 * (Random.int 200 + 1) + tid in
Sys.opaque_identity (Array.make len 0) |> ignore;
go (alloc_num + 1) tid
end else begin
cnt := !cnt + 1;
if !cnt < 2 then begin
Gc.minor (); (* check for callbacks *)
Thread.yield ();
go ()
go alloc_num tid
end else begin
Gc.minor () (* check for callbacks *)
end
end
let () =
let t = Thread.create go () in
let t = Thread.create (fun () -> go 0 1) () in
Gc.Memprof.(start ~callstack_size:10 ~sampling_rate:1.
{ null_tracker with alloc_minor = minor_alloc_callback; });
{ null_tracker with
alloc_minor = alloc_callback;
alloc_major = alloc_callback });
Mutex.unlock mut;
go ();
go 0 0;
Thread.join t;
Gc.Memprof.stop ();
assert (abs (!cb_main - !cb_other) <= 1);
assert (!cb_main + !cb_other >= alloc_tot)
assert (!cb_main >= alloc_thread);
assert (!cb_other >= alloc_thread);
assert (abs (!cb_main - !cb_other) <= 1)

View File

@ -16,6 +16,11 @@ let alloc_tracker on_alloc =
its uncaught exception handler. *)
let _ = Printexc.record_backtrace false
let () =
start ~callstack_size:10 ~sampling_rate:1.
(alloc_tracker (fun _ -> stop ()));
ignore (Sys.opaque_identity (Array.make 200 0))
let _ =
start ~callstack_size:10 ~sampling_rate:1.
(alloc_tracker (fun _ -> failwith "callback failed"));

View File

@ -32,5 +32,6 @@ let () =
ignore (Sys.opaque_identity (alloc_stub ()));
assert(not !callback_done);
callback_ok := true;
stop ();
assert(!callback_done)
ignore (Sys.opaque_identity (ref ()));
assert(!callback_done);
stop ()

View File

@ -0,0 +1,76 @@
(* TEST
* hassysthreads
include systhreads
** bytecode
** native
*)
let t2_begin = Atomic.make false
let t2_promoting = Atomic.make false
let t2_finish_promote = Atomic.make false
let t2_done = Atomic.make false
let t2_quit = Atomic.make false
let await a =
while not (Atomic.get a) do Thread.yield () done
let set a =
Atomic.set a true
(* no-alloc printing to stdout *)
let say msg =
Unix.write Unix.stdout (Bytes.unsafe_of_string msg) 0 (String.length msg) |> ignore
let static_ref = ref 0
let global = ref static_ref
let thread_fn () =
await t2_begin;
say "T2: alloc\n";
let r = ref 0 in
global := r;
say "T2: minor GC\n";
Gc.minor ();
global := static_ref;
say "T2: done\n";
set t2_done;
await t2_quit
let big = ref [| |]
let fill_big () = big := Array.make 1000 42
[@@inline never] (* Prevent flambda to move the allocated array in a global
root (see #9978). *)
let empty_big () = big := [| |]
[@@inline never]
let () =
let th = Thread.create thread_fn () in
Gc.Memprof.(start ~sampling_rate:1.
{ null_tracker with
alloc_minor = (fun _ ->
say " minor alloc\n";
Some ());
alloc_major = (fun _ ->
say " major alloc\n";
Some "major block\n");
promote = (fun () ->
say " promoting...\n";
set t2_promoting;
await t2_finish_promote;
say " ...done promoting\n";
Some "promoted block\n");
dealloc_major = (fun msg ->
say " major dealloc: "; say msg) });
say "T1: alloc\n";
fill_big ();
set t2_begin;
await t2_promoting;
say "T1: major GC\n";
empty_big ();
Gc.full_major ();
set t2_finish_promote;
await t2_done;
say "T1: major GC\n";
Gc.full_major ();
say "T1: done\n";
Gc.Memprof.stop ();
set t2_quit;
Thread.join th

View File

@ -0,0 +1,13 @@
T1: alloc
major alloc
T2: alloc
minor alloc
T2: minor GC
promoting...
T1: major GC
major dealloc: major block
...done promoting
T2: done
T1: major GC
major dealloc: promoted block
T1: done

View File

@ -1,18 +1,26 @@
(* TEST
modules = "thread_exit_in_callback_stub.c"
exit_status = "42"
* hassysthreads
include systhreads
** bytecode
** native
*)
(* We cannot tell Ocamltest that this program is supposed to stop with
a fatal error. Instead, we install a fatal error hook and call exit(42) *)
external install_fatal_error_hook : unit -> unit = "install_fatal_error_hook"
let _ =
let main_thread = Thread.id (Thread.self ()) in
Gc.Memprof.(start ~callstack_size:10 ~sampling_rate:1.
{ null_tracker with alloc_minor = fun _ ->
if Thread.id (Thread.self ()) <> main_thread then
Thread.exit ();
None });
let t = Thread.create (fun () ->
ignore (Sys.opaque_identity (ref 1));
assert false) ()
in
Thread.join t;
Gc.Memprof.stop ()
let _ =
install_fatal_error_hook ();
Gc.Memprof.(start ~callstack_size:10 ~sampling_rate:1.
{ null_tracker with alloc_minor = fun _ -> Thread.exit (); None });
ignore (Sys.opaque_identity (ref 1))
ignore (Sys.opaque_identity (ref 1));
assert false

View File

@ -1 +0,0 @@
Fatal error hook: Thread.exit called from a memprof callback.

View File

@ -1,16 +0,0 @@
#include <stdio.h>
#include "caml/misc.h"
#include "caml/mlvalues.h"
void fatal_error_hook_exit_3 (char *msg, va_list args) {
fprintf(stderr, "Fatal error hook: ");
vfprintf(stderr, msg, args);
fprintf(stderr, "\n");
exit(42);
}
value install_fatal_error_hook (value unit) {
caml_fatal_error_hook = fatal_error_hook_exit_3;
return Val_unit;
}