Skip to content

Commit

Permalink
New MPMC queue using cooperative pointer reversal
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Feb 7, 2025
1 parent 1666b8c commit 3036027
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 192 deletions.
279 changes: 88 additions & 191 deletions lib/picos_aux.mpmcq/picos_aux_mpmcq.ml
Original file line number Diff line number Diff line change
@@ -1,208 +1,105 @@
module Atomic = Multicore_magic.Transparent_atomic

type 'a t = { head : 'a head Atomic.t; tail : 'a tail Atomic.t }
type 'a node = { mutable next : 'a node; counter : int; mutable value : 'a }
type 'a t = { head : 'a node Atomic.t; tail : 'a node Atomic.t }

and ('a, _) tdt =
| Cons : {
counter : int;
value : 'a;
suffix : 'a head;
}
-> ('a, [> `Cons ]) tdt
| Head : { counter : int } -> ('a, [> `Head ]) tdt
| Snoc : {
counter : int;
prefix : 'a tail;
value : 'a;
}
-> ('a, [> `Snoc ]) tdt
| Tail : {
counter : int;
mutable move : ('a, [ `Snoc | `Used ]) tdt;
}
-> ('a, [> `Tail ]) tdt
| Used : ('a, [> `Used ]) tdt
let[@inline] maybe_fix tail =
let mystery = tail.next in
if mystery.counter = tail.counter - 1 then
let prev = mystery in
if prev.next != tail then prev.next <- tail

and 'a head = H : ('a, [< `Cons | `Head ]) tdt -> 'a head [@@unboxed]
and 'a tail = T : ('a, [< `Snoc | `Tail ]) tdt -> 'a tail [@@unboxed]

let create ?padded () =
let head =
Atomic.make (H (Head { counter = 1 })) |> Multicore_magic.copy_as ?padded
in
let tail =
Atomic.make (T (Tail { counter = 0; move = Used }))
|> Multicore_magic.copy_as ?padded
in
Multicore_magic.copy_as ?padded { head; tail }

let rec rev (suffix : (_, [< `Cons ]) tdt) = function
| T (Snoc { counter; prefix; value }) ->
rev (Cons { counter; value; suffix = H suffix }) prefix
| T (Tail _) -> suffix

let rev = function
| (Snoc { counter; prefix; value } : (_, [< `Snoc ]) tdt) ->
rev
(Cons { counter; value; suffix = H (Head { counter = counter + 1 }) })
prefix

let rec push t value backoff = function
| T (Snoc snoc_r) as prefix ->
let after = Snoc { counter = snoc_r.counter + 1; prefix; value } in
if not (Atomic.compare_and_set t.tail prefix (T after)) then
let backoff = Backoff.once backoff in
push t value backoff (Atomic.fenceless_get t.tail)
| T (Tail tail_r) as prefix -> begin
match tail_r.move with
| Used ->
let after = Snoc { counter = tail_r.counter + 1; prefix; value } in
if not (Atomic.compare_and_set t.tail prefix (T after)) then
let backoff = Backoff.once backoff in
push t value backoff (Atomic.fenceless_get t.tail)
| Snoc move_r as move ->
begin
match Atomic.get t.head with
| H (Head head_r as head) when head_r.counter < move_r.counter ->
let after = rev move in
if
Atomic.fenceless_get t.head == H head
&& Atomic.compare_and_set t.head (H head) (H after)
then tail_r.move <- Used
| _ -> tail_r.move <- Used
end;
push t value backoff (Atomic.get t.tail)
end
let rec push t value backoff =
let tail = Atomic.get t.tail in
maybe_fix tail;
let new_tail = { next = tail; value; counter = tail.counter + 1 } in
if Atomic.compare_and_set t.tail tail new_tail then tail.next <- new_tail
else push t value (Backoff.once backoff)

exception Empty

let rec pop t backoff = function
| H (Cons cons_r as cons) ->
if Atomic.compare_and_set t.head (H cons) cons_r.suffix then cons_r.value
else
let backoff = Backoff.once backoff in
pop t backoff (Atomic.fenceless_get t.head)
| H (Head head_r as head) -> begin
match Atomic.get t.tail with
| T (Snoc snoc_r as move) ->
if head_r.counter = snoc_r.counter then
if Atomic.compare_and_set t.tail (T move) snoc_r.prefix then
snoc_r.value
else pop t backoff (Atomic.fenceless_get t.head)
else
let (Tail tail_r as tail : (_, [ `Tail ]) tdt) =
Tail { counter = snoc_r.counter; move }
in
let new_head = Atomic.get t.head in
if new_head != H head then pop t backoff new_head
else if Atomic.compare_and_set t.tail (T move) (T tail) then
let (Cons cons_r) = rev move in
let after = cons_r.suffix in
let new_head = Atomic.get t.head in
if new_head != H head then pop t backoff new_head
else if Atomic.compare_and_set t.head (H head) after then begin
tail_r.move <- Used;
cons_r.value
end
else
let backoff = Backoff.once backoff in
pop t backoff (Atomic.fenceless_get t.head)
else pop t backoff (Atomic.fenceless_get t.head)
| T (Tail tail_r) -> begin
match tail_r.move with
| Used ->
let new_head = Atomic.get t.head in
if new_head != H head then pop t backoff new_head
else raise_notrace Empty
| Snoc move_r as move ->
if head_r.counter < move_r.counter then
let (Cons cons_r) = rev move in
let after = cons_r.suffix in
let new_head = Atomic.get t.head in
if new_head != H head then pop t backoff new_head
else if Atomic.compare_and_set t.head (H head) after then begin
tail_r.move <- Used;
cons_r.value
end
else
let backoff = Backoff.once backoff in
pop t backoff (Atomic.fenceless_get t.head)
else
let new_head = Atomic.get t.head in
if new_head != H head then pop t backoff new_head
else raise_notrace Empty
end
let rec pop_exn t backoff =
let head = Atomic.get t.head in
let next = head.next in
if head.counter + 1 = next.counter then
let value = next.value in
if Atomic.compare_and_set t.head head next then begin
next.value <- Obj.magic ();
value
end

let rec push_head t value backoff =
match Atomic.get t.head with
| H (Cons cons_r) as suffix ->
let after = Cons { counter = cons_r.counter - 1; value; suffix } in
if not (Atomic.compare_and_set t.head suffix (H after)) then
push_head t value (Backoff.once backoff)
| H (Head head_r) as head -> begin
match Atomic.get t.tail with
| T (Snoc snoc_r as move) ->
if Atomic.get t.head != head then push_head t value backoff
else if head_r.counter = snoc_r.counter then begin
let prefix = T (Snoc { snoc_r with value }) in
let after =
Snoc { snoc_r with counter = snoc_r.counter + 1; prefix }
in
if not (Atomic.compare_and_set t.tail (T move) (T after)) then
push_head t value (Backoff.once backoff)
end
else
let tail = Tail { counter = snoc_r.counter; move } in
let backoff =
if Atomic.compare_and_set t.tail (T move) (T tail) then backoff
else Backoff.once backoff
in
push_head t value backoff
| T (Tail tail_r) as prefix -> begin
match tail_r.move with
| Used ->
if Atomic.get t.head == head then begin
let tail =
Snoc { counter = tail_r.counter + 1; value; prefix }
in
if not (Atomic.compare_and_set t.tail prefix (T tail)) then
push_head t value (Backoff.once backoff)
end
else push_head t value backoff
| Snoc move_r as move ->
begin
match Atomic.get t.head with
| H (Head head_r as head) when head_r.counter < move_r.counter
->
let after = rev move in
if
Atomic.fenceless_get t.head == H head
&& Atomic.compare_and_set t.head (H head) (H after)
then tail_r.move <- Used
| _ -> tail_r.move <- Used
end;
push_head t value backoff
end
else pop_exn t (Backoff.once backoff)
else
let tail = Atomic.get t.tail in
if tail == head then raise_notrace Empty
else begin
maybe_fix tail;
pop_exn t Backoff.default
end

let rec length t =
let rec pop_all t backoff =
let head = Atomic.get t.head in
let tail = Atomic.fenceless_get t.tail in
if head != Atomic.get t.head then length t
else
let head_at =
match head with H (Cons r) -> r.counter | H (Head r) -> r.counter
let next = head.next in
if head.counter + 1 = next.counter then begin
let new_sentinel =
{ next = Obj.magic (); value = Obj.magic (); counter = head.counter }
in
let tail_at =
match tail with T (Snoc r) -> r.counter | T (Tail r) -> r.counter
new_sentinel.next <- new_sentinel;
if Atomic.compare_and_set t.head head new_sentinel then begin
(* TODO: not lock-free. *)
let tail = Atomic.exchange t.tail new_sentinel in
maybe_fix tail;
let rec to_seq work tail () =
Seq.Cons
(work.value, if work == tail then Seq.empty else to_seq work.next tail)
in
to_seq head.next tail
end
else pop_all t (Backoff.once backoff)
end
else
let tail = Atomic.get t.tail in
if tail == head then Seq.empty
else begin
maybe_fix tail;
pop_all t Backoff.default
end

let rec push_head t value backoff =
let head = Atomic.get t.head in
let next = head.next in
if head.counter + 1 = next.counter then begin
let new_next = { value; next; counter = head.counter } in
let new_head =
{ value = Obj.magic (); next = new_next; counter = head.counter - 1 }
in
tail_at - head_at + 1
if not (Atomic.compare_and_set t.head head new_head) then
push_head t value (Backoff.once backoff)
end
else
let tail = Atomic.get t.tail in
if tail == head then
let new_tail = { value; next = tail; counter = tail.counter + 1 } in
if Atomic.compare_and_set t.tail tail new_tail then tail.next <- new_tail
else push_head t value (Backoff.once backoff)
else begin
maybe_fix tail;
push_head t value Backoff.default
end

let[@inline] is_empty t = length t == 0
let[@inline] pop_exn t = pop t Backoff.default (Atomic.fenceless_get t.head)
let rec length t =
let tail = Atomic.get t.tail in
let head = Atomic.fenceless_get t.head in
if tail == Atomic.get t.tail then tail.counter - head.counter else length t

let[@inline] push t value =
push t value Backoff.default (Atomic.fenceless_get t.tail)
let create ?padded () =
let sentinel = { next = Obj.magic (); value = Obj.magic (); counter = 0 } in
sentinel.next <- sentinel;
let head = Atomic.make sentinel |> Multicore_magic.copy_as ?padded in
let tail = Atomic.make sentinel |> Multicore_magic.copy_as ?padded in
{ head; tail } |> Multicore_magic.copy_as ?padded

let[@inline] push t value = push t value Backoff.default
let[@inline] pop_exn t = pop_exn t Backoff.default
let[@inline] pop_all t = pop_all t Backoff.default
let[@inline] push_head t value = push_head t value Backoff.default
let[@inline] is_empty t = length t == 0
4 changes: 4 additions & 0 deletions lib/picos_aux.mpmcq/picos_aux_mpmcq.mli
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ val pop_exn : 'a t -> 'a
@raise Empty in case the queue was empty. *)

val pop_all : 'a t -> 'a Seq.t
(** [pop_all queue] removes all values from the [queue] and returns them as a
sequence. *)

val length : 'a t -> int
(** [length queue] returns the length or the number of values in the [queue]. *)

Expand Down
8 changes: 7 additions & 1 deletion test/test_mpmcq.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ let () =
module Spec = struct
include SpecDefaults

type cmd = Push of int | Push_head of int | Pop_opt | Length
type cmd = Push of int | Push_head of int | Pop_opt | Pop_all | Length

let show_cmd = function
| Push x -> "Push " ^ string_of_int x
| Push_head x -> "Push_head " ^ string_of_int x
| Pop_opt -> "Pop_opt"
| Pop_all -> "Pop_all"
| Length -> "Length"

module State = struct
Expand All @@ -34,6 +35,7 @@ module Spec = struct
match h with [] -> s | [ _ ] -> (List.rev t, []) | _ :: h -> (h, t)

let length (h, t) = List.length h + List.length t
let to_list (h, t) = h @ List.rev t
end

type state = State.t
Expand All @@ -44,6 +46,7 @@ module Spec = struct
Gen.int_range 1 1000 |> Gen.map (fun x -> Push x);
Gen.int_range 1 1000 |> Gen.map (fun x -> Push_head x);
Gen.return Pop_opt;
Gen.return Pop_all;
Gen.return Length;
]
|> Gen.oneof |> make ~print:show_cmd
Expand All @@ -56,6 +59,7 @@ module Spec = struct
| Push x -> State.push x s
| Push_head x -> State.push_head x s
| Pop_opt -> State.drop s
| Pop_all -> ([], [])
| Length -> s

let run c d =
Expand All @@ -68,13 +72,15 @@ module Spec = struct
match Queue.pop_exn d with
| v -> Some v
| exception Queue.Empty -> None )
| Pop_all -> Res (seq int, Queue.pop_all d)
| Length -> Res (int, Queue.length d)

let postcond c (s : state) res =
match (c, res) with
| Push _x, Res ((Unit, _), ()) -> true
| Push_head _x, Res ((Unit, _), ()) -> true
| Pop_opt, Res ((Option Int, _), res) -> res = State.peek_opt s
| Pop_all, Res ((Seq Int, _), res) -> List.of_seq res = State.to_list s
| Length, Res ((Int, _), res) -> res = State.length s
| _, _ -> false
end
Expand Down

0 comments on commit 3036027

Please sign in to comment.