342 lines
9.5 KiB
OCaml
342 lines
9.5 KiB
OCaml
(***********************************************************************)
|
|
(* ocamlbuild *)
|
|
(* *)
|
|
(* Nicolas Pouillard, Berke Durak, projet Gallium, INRIA Rocquencourt *)
|
|
(* *)
|
|
(* Copyright 2007 Institut National de Recherche en Informatique et *)
|
|
(* en Automatique. All rights reserved. This file is distributed *)
|
|
(* under the terms of the Q Public License version 1.0. *)
|
|
(* *)
|
|
(***********************************************************************)
|
|
|
|
(* $Id$ *)
|
|
(* Original author: Berke Durak *)
|
|
(* Executor *)
|
|
|
|
open Unix;;
|
|
|
|
module Exit_codes = struct
|
|
let rc_subcommand_failed = 10
|
|
let rc_subcommand_got_signal = 11
|
|
let rc_io_error = 12
|
|
let rc_exceptional_condition = 13
|
|
end;;
|
|
|
|
type task = (string * (unit -> unit));;
|
|
|
|
type job = {
|
|
job_id : int * int;
|
|
job_command : string;
|
|
job_next : (string * (unit -> unit)) list;
|
|
job_result : bool ref; (* Result of this sequence group *)
|
|
job_stdout : in_channel;
|
|
job_stdin : out_channel;
|
|
job_stderr : in_channel;
|
|
job_buffer : Buffer.t;
|
|
mutable job_dying : bool;
|
|
};;
|
|
|
|
module JS = Set.Make(struct type t = job let compare = compare end);;
|
|
module FDM = Map.Make(struct type t = file_descr let compare = compare end);;
|
|
|
|
let sf = Printf.sprintf;;
|
|
let fp = Printf.fprintf;;
|
|
|
|
(*** print_unix_status *)
|
|
(* FIXME never called *)
|
|
let print_unix_status oc = function
|
|
| WEXITED x -> fp oc "exit %d" x
|
|
| WSIGNALED i -> fp oc "signal %d" i
|
|
| WSTOPPED i -> fp oc "stop %d" i
|
|
;;
|
|
(* ***)
|
|
(*** exit *)
|
|
let exit rc =
|
|
raise (Ocamlbuild_pack.My_std.Exit_with_code rc)
|
|
;;
|
|
(* ***)
|
|
(*** print_job_id *)
|
|
let print_job_id oc (x,y) = fp oc "%d.%d" x y;;
|
|
(* ***)
|
|
(*** output_lines *)
|
|
let output_lines prefix oc buffer =
|
|
let u = Buffer.contents buffer in
|
|
let m = String.length u in
|
|
let output_line i j =
|
|
output_string oc prefix;
|
|
output oc u i (j - i);
|
|
output_char oc '\n'
|
|
in
|
|
let rec loop i =
|
|
if i = m then
|
|
()
|
|
else
|
|
begin
|
|
try
|
|
let j = String.index_from u i '\n' in
|
|
output_line i j;
|
|
loop (j + 1)
|
|
with
|
|
| Not_found ->
|
|
output_line i m
|
|
end
|
|
in
|
|
loop 0
|
|
;;
|
|
(* ***)
|
|
(*** execute *)
|
|
(* XXX: Add test for non reentrancy *)
|
|
let execute
|
|
?(max_jobs=max_int)
|
|
?(ticker=ignore)
|
|
?(period=0.1)
|
|
?(display=(fun f -> f Pervasives.stdout))
|
|
(commands : task list list)
|
|
=
|
|
let batch_id = ref 0 in
|
|
let env = environment () in
|
|
let jobs = ref JS.empty in
|
|
let jobs_active = ref 0 in
|
|
let jobs_to_terminate = Queue.create () in
|
|
let commands_to_execute = Queue.create () in
|
|
let all_ok = ref true in
|
|
let results =
|
|
List.map (fun tasks ->
|
|
let result = ref false in
|
|
Queue.add (tasks, result) commands_to_execute;
|
|
result)
|
|
commands
|
|
in
|
|
let outputs = ref FDM.empty in
|
|
let doi = descr_of_in_channel in
|
|
let doo = descr_of_out_channel in
|
|
(*** compute_fds *)
|
|
let compute_fds =
|
|
let fds = ref ([], [], []) in
|
|
let prev_jobs = ref JS.empty in
|
|
fun () ->
|
|
if not (!prev_jobs == !jobs) then
|
|
begin
|
|
prev_jobs := !jobs;
|
|
fds :=
|
|
JS.fold
|
|
begin fun job (rfds, wfds, xfds) ->
|
|
let ofd = doi job.job_stdout
|
|
and ifd = doo job.job_stdin
|
|
and efd = doi job.job_stderr
|
|
in
|
|
(ofd :: efd :: rfds, wfds, ofd :: ifd :: efd :: xfds)
|
|
end
|
|
!jobs
|
|
([], [], [])
|
|
end;
|
|
!fds
|
|
in
|
|
(* ***)
|
|
(*** add_job *)
|
|
let add_job (cmd, action) rest result id =
|
|
(*display begin fun oc -> fp oc "Job %a is %s\n%!" print_job_id id cmd; end;*)
|
|
action ();
|
|
let (stdout', stdin', stderr') = open_process_full cmd env in
|
|
incr jobs_active;
|
|
set_nonblock (doi stdout');
|
|
set_nonblock (doi stderr');
|
|
let job =
|
|
{ job_id = id;
|
|
job_command = cmd;
|
|
job_next = rest;
|
|
job_result = result;
|
|
job_stdout = stdout';
|
|
job_stdin = stdin';
|
|
job_stderr = stderr';
|
|
job_buffer = Buffer.create 1024;
|
|
job_dying = false }
|
|
in
|
|
outputs := FDM.add (doi stdout') job (FDM.add (doi stderr') job !outputs);
|
|
jobs := JS.add job !jobs;
|
|
in
|
|
(* ***)
|
|
(*** add_some_jobs *)
|
|
let add_some_jobs () =
|
|
let (tasks, result) = Queue.take commands_to_execute in
|
|
match tasks with
|
|
| [] -> result := false
|
|
| task :: rest ->
|
|
let b_id = !batch_id in
|
|
incr batch_id;
|
|
add_job task rest result (b_id, 0)
|
|
in
|
|
(* ***)
|
|
(*** terminate *)
|
|
let terminate ?(continue=true) job =
|
|
if not job.job_dying then
|
|
begin
|
|
job.job_dying <- true;
|
|
Queue.add (job, continue) jobs_to_terminate
|
|
end
|
|
else
|
|
()
|
|
in
|
|
(* ***)
|
|
(*** add_more_jobs_if_possible *)
|
|
let add_more_jobs_if_possible () =
|
|
while !jobs_active < max_jobs && not (Queue.is_empty commands_to_execute) do
|
|
add_some_jobs ()
|
|
done
|
|
in
|
|
(* ***)
|
|
(*** do_read *)
|
|
let do_read =
|
|
let u = String.create 4096 in
|
|
fun ?(loop=false) fd job ->
|
|
(*if job.job_dying then
|
|
()
|
|
else*)
|
|
try
|
|
let rec iteration () =
|
|
let m =
|
|
try
|
|
read fd u 0 (String.length u)
|
|
with
|
|
| Unix.Unix_error(_,_,_) -> 0
|
|
in
|
|
if m = 0 then
|
|
if job.job_dying then
|
|
()
|
|
else
|
|
terminate job
|
|
else
|
|
begin
|
|
Buffer.add_substring job.job_buffer u 0 m;
|
|
if loop then
|
|
iteration ()
|
|
else
|
|
()
|
|
end
|
|
in
|
|
iteration ()
|
|
with
|
|
| x ->
|
|
display
|
|
begin fun oc ->
|
|
fp oc "Exception %s while reading output of command %S\n%!" job.job_command
|
|
(Printexc.to_string x);
|
|
end;
|
|
exit Exit_codes.rc_io_error
|
|
in
|
|
(* ***)
|
|
(*** process_jobs_to_terminate *)
|
|
let process_jobs_to_terminate () =
|
|
while not (Queue.is_empty jobs_to_terminate) do
|
|
ticker ();
|
|
let (job, continue) = Queue.take jobs_to_terminate in
|
|
|
|
(*display begin fun oc -> fp oc "Terminating job %a\n%!" print_job_id job.job_id; end;*)
|
|
|
|
decr jobs_active;
|
|
do_read ~loop:true (doi job.job_stdout) job;
|
|
do_read ~loop:true (doi job.job_stderr) job;
|
|
outputs := FDM.remove (doi job.job_stdout) (FDM.remove (doi job.job_stderr) !outputs);
|
|
jobs := JS.remove job !jobs;
|
|
let status = close_process_full (job.job_stdout, job.job_stdin, job.job_stderr) in
|
|
|
|
let shown = ref false in
|
|
|
|
let show_command () =
|
|
if !shown then
|
|
()
|
|
else
|
|
display
|
|
begin fun oc ->
|
|
shown := true;
|
|
fp oc "+ %s\n" job.job_command;
|
|
output_lines "" oc job.job_buffer
|
|
end
|
|
in
|
|
if Buffer.length job.job_buffer > 0 then show_command ();
|
|
begin
|
|
match status with
|
|
| Unix.WEXITED 0 ->
|
|
begin
|
|
if continue then
|
|
begin
|
|
match job.job_next with
|
|
| [] -> job.job_result := true
|
|
| task :: rest ->
|
|
let (b_id, s_id) = job.job_id in
|
|
add_job task rest job.job_result (b_id, s_id + 1)
|
|
end
|
|
else
|
|
all_ok := false;
|
|
end
|
|
| Unix.WEXITED rc ->
|
|
show_command ();
|
|
display (fun oc -> fp oc "Command exited with code %d.\n" rc);
|
|
all_ok := false;
|
|
exit Exit_codes.rc_subcommand_failed
|
|
| Unix.WSTOPPED s | Unix.WSIGNALED s ->
|
|
show_command ();
|
|
all_ok := false;
|
|
display (fun oc -> fp oc "Command got signal %d.\n" s);
|
|
exit Exit_codes.rc_subcommand_got_signal
|
|
end
|
|
done
|
|
in
|
|
(* ***)
|
|
(*** terminate_all_jobs *)
|
|
let terminate_all_jobs () =
|
|
JS.iter (terminate ~continue:false) !jobs
|
|
in
|
|
(* ***)
|
|
(*** loop *)
|
|
let rec loop () =
|
|
(*display (fun oc -> fp oc "Total %d jobs\n" !jobs_active);*)
|
|
process_jobs_to_terminate ();
|
|
add_more_jobs_if_possible ();
|
|
if JS.is_empty !jobs then
|
|
()
|
|
else
|
|
begin
|
|
let (rfds, wfds, xfds) = compute_fds () in
|
|
ticker ();
|
|
let (chrfds, chwfds, chxfds) = select rfds wfds xfds period in
|
|
List.iter
|
|
begin fun (fdlist, hook) ->
|
|
List.iter
|
|
begin fun fd ->
|
|
try
|
|
let job = FDM.find fd !outputs in
|
|
ticker ();
|
|
hook fd job
|
|
with
|
|
| Not_found -> () (* XXX *)
|
|
end
|
|
fdlist
|
|
end
|
|
[chrfds, do_read ~loop:false;
|
|
chwfds, (fun _ _ -> ());
|
|
chxfds,
|
|
begin fun _ _job ->
|
|
(*display (fun oc -> fp oc "Exceptional condition on command %S\n%!" job.job_command);
|
|
exit Exit_codes.rc_exceptional_condition*)
|
|
() (* FIXME *)
|
|
end];
|
|
loop ()
|
|
end
|
|
in
|
|
try
|
|
loop ();
|
|
None
|
|
with
|
|
| x ->
|
|
begin
|
|
try
|
|
terminate_all_jobs ()
|
|
with
|
|
| x' ->
|
|
display (fun oc -> fp oc "Extra exception %s\n%!" (Printexc.to_string x'))
|
|
end;
|
|
Some(List.map (!) results, x)
|
|
;;
|
|
(* ***)
|