Skip to content

Commit

Permalink
Rewrite Delay module
Browse files Browse the repository at this point in the history
Old implementation had different issues.
Just use mutexes and conditions using C stubs.
Current Ocaml Condition module does not have support for timeout waiting
for conditions, so use C stubs.
This implementation does not require opening a pipe.
This reduces the possibilities of errors calling "wait" to zero.
Mostly of the times it does not require kernel calls.

Signed-off-by: Frediano Ziglio <[email protected]>
  • Loading branch information
freddy77 committed Dec 5, 2024
1 parent 9998658 commit 92114cd
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 61 deletions.
169 changes: 169 additions & 0 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/delay_stubs.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Copyright (C) 2024 Cloud Software Group
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; version 2.1 only. with the special
* exception on linking described in file LICENSE.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*/

#include <caml/memory.h>
#include <caml/threads.h>
#include <caml/custom.h>
#include <caml/unixsupport.h>

#include <stdbool.h>
#include <errno.h>
#include <pthread.h>

typedef struct delay {
pthread_mutex_t mtx;
pthread_cond_t cond;
bool signaled;
} delay;

// Initialize delay
// Returns error number or 0 if success
static int delay_init(delay *d)
{
int err;
pthread_condattr_t cond_attr;

d->signaled = false;

err = pthread_condattr_init(&cond_attr);
if (err)
goto err0;
err = pthread_condattr_setclock(&cond_attr, CLOCK_MONOTONIC);
if (!err)
err = pthread_cond_init(&d->cond, &cond_attr);
if (err)
goto err1;
err = pthread_mutex_init(&d->mtx, NULL);
if (err)
goto err2;
pthread_condattr_destroy(&cond_attr);
return 0;

err2:
pthread_cond_destroy(&d->cond);
err1:
pthread_condattr_destroy(&cond_attr);
err0:
return err;
}

static void delay_destroy(delay *d)
{
pthread_cond_destroy(&d->cond);
pthread_mutex_destroy(&d->mtx);
}

static void delay_signal(delay *d)
{
// there are quite some chances lock is not held
if (pthread_mutex_trylock(&d->mtx) == 0) {
d->signaled = true;
pthread_cond_signal(&d->cond);
pthread_mutex_unlock(&d->mtx);
return;
}

// slow way, release engine
caml_release_runtime_system();
pthread_mutex_lock(&d->mtx);
d->signaled = true;
pthread_cond_signal(&d->cond);
pthread_mutex_unlock(&d->mtx);
caml_acquire_runtime_system();
}

// Wait for deadline or signal.
// Returns error number or 0 if success.
// Error can be ETIMEDOUT.
int delay_wait(delay *d, const struct timespec *deadline)
{
int err;

caml_release_runtime_system();
pthread_mutex_lock(&d->mtx);
do {
if (d->signaled) {
d->signaled = false;
err = 0;
break;
}
err = pthread_cond_timedwait(&d->cond, &d->mtx, deadline);
} while (err == 0);
pthread_mutex_unlock(&d->mtx);
caml_acquire_runtime_system();
return err;
}

#define delay_val(v) (*((delay **)Data_custom_val(v)))

static void delay_finalize(value v_delay)
{
delay *d = delay_val(v_delay);
delay_destroy(d);
caml_stat_free(d);
}

static struct custom_operations delay_ops = {
"xapi.delay",
delay_finalize,
custom_compare_default,
custom_hash_default,
custom_serialize_default,
custom_deserialize_default,
custom_compare_ext_default,
custom_fixed_length_default
};

CAMLprim value caml_xapi_delay_create(value v_unit)
{
CAMLparam1(v_unit);
CAMLlocal1(res);
delay *d;
int err;

d = caml_stat_alloc(sizeof(*d));
err = delay_init(d);
if (err) {
caml_stat_free(d);
unix_error(err, "caml_delay_create", Nothing);
}
res = caml_alloc_custom(&delay_ops, sizeof(delay *), 0, 1);
delay_val(res) = d;
CAMLreturn(res);
}

CAMLprim value caml_xapi_delay_signal(value v_delay)
{
CAMLparam1(v_delay);
delay *d = delay_val(v_delay);
delay_signal(d);
CAMLreturn(Val_unit);
}

CAMLprim value caml_xapi_delay_wait(value v_delay, value v_deadline)
{
CAMLparam2(v_delay, v_deadline);
delay *d = delay_val(v_delay);
uint64_t deadline = (uint64_t) Int64_val(v_deadline);
struct timespec ts = {
deadline / 1000000000u,
deadline % 1000000000u
};

int err = delay_wait(d, &ts);
if (err != 0 && err != ETIMEDOUT)
unix_error(err, "caml_delay_wait", Nothing);

CAMLreturn(err ? Val_true : Val_false);
}
5 changes: 5 additions & 0 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@
(name xapi_stdext_threads)
(modules :standard \ ipq scheduler threadext_test ipq_test)
(libraries
mtime
mtime.clock.os
threads.posix
unix
xapi-stdext-unix
xapi-stdext-pervasives)
(foreign_stubs
(language c)
(names delay_stubs))
)

(library
Expand Down
76 changes: 17 additions & 59 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml
Original file line number Diff line number Diff line change
Expand Up @@ -55,69 +55,27 @@ let thread_iter f xs =
match thread_iter_all_exns f xs with [] -> () | (_, e) :: _ -> raise e

module Delay = struct
(* Concrete type is the ends of a pipe *)
type t = {
(* A pipe is used to wake up a thread blocked in wait: *)
mutable pipe_in: Unix.file_descr option
; (* Indicates that a signal arrived before a wait: *)
mutable signalled: bool
; m: M.t
}
type t

let make () = {pipe_in= None; signalled= false; m= M.create ()}
external make : unit -> t = "caml_xapi_delay_create"

exception Pre_signalled
external signal : t -> unit = "caml_xapi_delay_signal"

let wait (x : t) (seconds : float) =
let to_close = ref [] in
let close' fd =
if List.mem fd !to_close then Unix.close fd ;
to_close := List.filter (fun x -> fd <> x) !to_close
in
finally
(fun () ->
try
let pipe_out =
Mutex.execute x.m (fun () ->
if x.signalled then (
x.signalled <- false ;
raise Pre_signalled
) ;
let pipe_out, pipe_in = Unix.pipe () in
(* these will be unconditionally closed on exit *)
to_close := [pipe_out; pipe_in] ;
x.pipe_in <- Some pipe_in ;
x.signalled <- false ;
pipe_out
)
in
let open Xapi_stdext_unix.Unixext in
(* flush the single byte from the pipe *)
try
let (_ : string) =
time_limited_single_read pipe_out 1 ~max_wait:seconds
in
false
with Timeout -> true
(* return true if we waited the full length of time, false if we were woken *)
with Pre_signalled -> false
)
(fun () ->
Mutex.execute x.m (fun () ->
x.pipe_in <- None ;
List.iter close' !to_close
)
)
external wait : t -> int64 -> bool = "caml_xapi_delay_wait"

let signal (x : t) =
Mutex.execute x.m (fun () ->
match x.pipe_in with
| Some fd ->
ignore (Unix.write fd (Bytes.of_string "X") 0 1)
| None ->
x.signalled <- true
(* If the wait hasn't happened yet then store up the signal *)
)
let wait d t =
if t <= 0. then
true
else
match Mtime.Span.of_float_ns (t *. 1e9) with
| Some span ->
let now = Mtime_clock.now () in
let deadline =
Mtime.add_span now span |> Option.value ~default:Mtime.max_stamp
in
wait d (Mtime.to_uint64_ns deadline)
| None ->
invalid_arg "Time specified too big"
end

let wait_timed_read fd timeout =
Expand Down
6 changes: 4 additions & 2 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.mli
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ module Delay : sig
val wait : t -> float -> bool
(** Blocks the calling thread for a given period of time with the option of
returning early if someone calls 'signal'. Returns true if the full time
period elapsed and false if signalled. Note that multple 'signals' are
coalesced; 'signals' sent before 'wait' is called are not lost. *)
period elapsed and false if signalled. Note that multiple 'signals' are
coalesced; 'signals' sent before 'wait' is called are not lost.
Only one thread should call 'wait' for a given 'Delay', attempts
to call from multiple thread is an undefined behaviour. *)

val signal : t -> unit
(** Sends a signal to a waiting thread. See 'wait' *)
Expand Down

0 comments on commit 92114cd

Please sign in to comment.