diff --git a/bench/bench_two_stack_queue.ml b/bench/bench_two_stack_queue.ml new file mode 100644 index 00000000..779658d8 --- /dev/null +++ b/bench/bench_two_stack_queue.ml @@ -0,0 +1,77 @@ +open Multicore_bench +module Queue = Kcas_data.Two_stack_queue + +let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () = + let t = Queue.create () in + + let op push = if push then Queue.push t 101 else Queue.pop_opt t |> ignore in + + let init _ = + assert (Queue.pop_opt t == None); + Util.generate_push_and_pop_sequence n_msgs + in + let work _ bits = Util.Bits.iter op bits in + + Times.record ~budgetf ~n_domains:1 ~init ~work () + |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain" + +let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2) + ?(n_msgs = 100 * Util.iter_factor) () = + let n_domains = n_adders + n_takers in + + let t = Queue.create () in + + let n_msgs_to_take = Atomic.make n_msgs |> Multicore_magic.copy_as_padded in + let n_msgs_to_add = Atomic.make n_msgs |> Multicore_magic.copy_as_padded in + + let init _ = () in + let work i () = + if i < n_adders then + let rec work () = + let n = Util.alloc n_msgs_to_add in + if 0 < n then begin + for i = 1 to n do + Queue.push t i + done; + work () + end + in + work () + else + let rec work () = + let n = Util.alloc n_msgs_to_take in + if n <> 0 then begin + for _ = 1 to n do + while Option.is_none (Queue.pop_opt t) do + Domain.cpu_relax () + done + done; + work () + end + in + work () + in + let after () = + Atomic.set n_msgs_to_take n_msgs; + Atomic.set n_msgs_to_add n_msgs + in + + let config = + let format role blocking n = + Printf.sprintf "%d %s%s%s" n + (if blocking then "" else "nb ") + role + (if n = 1 then "" else "s") + in + Printf.sprintf "%s, %s" + (format "adder" false n_adders) + (format "taker" false n_takers) + in + Times.record ~budgetf ~n_domains ~init ~work ~after () + |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config + +let run_suite ~budgetf = + run_one_domain ~budgetf () + @ (Util.cross [ 1; 2 ] [ 1; 2 ] + |> List.concat_map @@ fun (n_adders, n_takers) -> + run_one ~budgetf ~n_adders ~n_takers ()) diff --git a/bench/main.ml b/bench/main.ml index 633f822e..fbab1803 100644 --- a/bench/main.ml +++ b/bench/main.ml @@ -11,6 +11,7 @@ let benchmarks = ("Kcas_data Dllist", Bench_dllist.run_suite); ("Kcas_data Hashtbl", Bench_hashtbl.run_suite); ("Kcas_data Mvar", Bench_mvar.run_suite); + ("Kcas_data Two_stack_queue", Bench_two_stack_queue.run_suite); ("Kcas_data Queue", Bench_queue.run_suite); ("Kcas_data Stack", Bench_stack.run_suite); ] diff --git a/dune-project b/dune-project index 189f6f4d..6af525f1 100644 --- a/dune-project +++ b/dune-project @@ -69,9 +69,7 @@ (multicore-magic (>= 2.1.0)) (backoff - (and - (>= 0.1.0) - :with-test)) + (>= 0.1.0)) (domain-local-await (and (>= 1.0.1) diff --git a/kcas_data.opam b/kcas_data.opam index 8d51fb80..c0339aae 100644 --- a/kcas_data.opam +++ b/kcas_data.opam @@ -19,7 +19,7 @@ depends: [ "dune" {>= "3.8"} "kcas" {= version} "multicore-magic" {>= "2.1.0"} - "backoff" {>= "0.1.0" & with-test} + "backoff" {>= "0.1.0"} "domain-local-await" {>= "1.0.1" & with-test} "domain_shims" {>= "0.1.0" & with-test} "multicore-bench" {>= "0.1.0" & with-test} diff --git a/src/kcas/kcas.ml b/src/kcas/kcas.ml index 799e12cc..f6e1dfce 100644 --- a/src/kcas/kcas.ml +++ b/src/kcas/kcas.ml @@ -539,22 +539,15 @@ let rec exchange_no_alloc backoff loc state = end else exchange_no_alloc (Backoff.once backoff) loc state -let[@inline] rec cas_with_state backoff loc before state state_old = +let[@inline] cas_with_state loc before state state_old = before == eval state_old && (before == state.after - || - if Atomic.compare_and_set (as_atomic loc) state_old state then begin - resume_awaiters state_old.awaiters; - true - end - else - (* We must retry, because compare is by value rather than by state. In - other words, we should not fail spuriously due to some other thread - having installed or removed a waiter. - - Fenceless is safe as there was a fence before. *) - cas_with_state (Backoff.once backoff) loc before state - (fenceless_get (as_atomic loc))) + || atomic_get (as_atomic loc) == state_old + && Atomic.compare_and_set (as_atomic loc) state_old state + && begin + resume_awaiters state_old.awaiters; + true + end) let inc x = x + 1 let dec x = x - 1 @@ -607,10 +600,18 @@ module Loc = struct let[@inline] get_mode loc = if (to_loc loc).id < 0 then `Lock_free else `Obstruction_free - let compare_and_set ?(backoff = Backoff.default) loc before after = - let state = new_state after in + let compare_and_set loc before after = let state_old = atomic_get (as_atomic (to_loc loc)) in - cas_with_state backoff (to_loc loc) before state state_old + before == eval state_old + && (before == after + || atomic_get (as_atomic (to_loc loc)) == state_old + && Atomic.compare_and_set + (as_atomic (to_loc loc)) + state_old (new_state after) + && begin + resume_awaiters state_old.awaiters; + true + end) let fenceless_update ?timeoutf ?(backoff = Backoff.default) loc f = let timeout = Timeout.alloc_opt timeoutf in @@ -909,7 +910,7 @@ module Xt = struct (* Fenceless is safe inside transactions as each log update has a fence. *) let state_old = fenceless_get (as_atomic loc) in - if cas_with_state Backoff.default loc before state state_old then + if cas_with_state loc before state state_old then success xt result else commit_once_reuse backoff xt tx end diff --git a/src/kcas/kcas.mli b/src/kcas/kcas.mli index 2061c9ee..dc2a0026 100644 --- a/src/kcas/kcas.mli +++ b/src/kcas/kcas.mli @@ -232,7 +232,7 @@ module Loc : sig conditional load. It is also safe for the given function [f] to raise any other exception to abort the conditional load. *) - val compare_and_set : ?backoff:Backoff.t -> 'a t -> 'a -> 'a -> bool + val compare_and_set : 'a t -> 'a -> 'a -> bool (** [compare_and_set r before after] atomically updates the shared memory location [r] to the [after] value if the current value of [r] is the [before] value. *) diff --git a/src/kcas_data/dune b/src/kcas_data/dune index d0ec8912..6adf9787 100644 --- a/src/kcas_data/dune +++ b/src/kcas_data/dune @@ -3,6 +3,7 @@ (public_name kcas_data) (libraries (re_export kcas) + backoff multicore-magic)) (rule diff --git a/src/kcas_data/kcas_data.ml b/src/kcas_data/kcas_data.ml index 4176883e..8c7b06ca 100644 --- a/src/kcas_data/kcas_data.ml +++ b/src/kcas_data/kcas_data.ml @@ -1,5 +1,6 @@ module Hashtbl = Hashtbl module Queue = Queue +module Two_stack_queue = Two_stack_queue module Stack = Stack module Mvar = Mvar module Promise = Promise diff --git a/src/kcas_data/kcas_data.mli b/src/kcas_data/kcas_data.mli index 6041c600..96aacb96 100644 --- a/src/kcas_data/kcas_data.mli +++ b/src/kcas_data/kcas_data.mli @@ -122,6 +122,7 @@ module Hashtbl = Hashtbl module Queue = Queue +module Two_stack_queue = Two_stack_queue module Stack = Stack (** {1 Communication and synchronization primitives} *) diff --git a/src/kcas_data/two_stack_queue.ml b/src/kcas_data/two_stack_queue.ml new file mode 100644 index 00000000..8f0a70a1 --- /dev/null +++ b/src/kcas_data/two_stack_queue.ml @@ -0,0 +1,140 @@ +open Kcas + +type 'a t = { head : 'a head Loc.t; tail : 'a tail Loc.t } + +and ('a, _) tdt = + | Cons : { + counter : int; + value : 'a; + suffix : 'a head; + } + -> ('a, [> `Cons ]) tdt + | Head : { counter : int } -> ('a, [> `Head ]) tdt + | Snoc : { + counter : int; + prefix : 'a tail; + value : 'a; + } + -> ('a, [> `Snoc ]) tdt + | Tail : { + counter : int; + mutable move : ('a, [ `Snoc ]) tdt; + } + -> ('a, [> `Tail ]) tdt + +and 'a head = H : ('a, [< `Cons | `Head ]) tdt -> 'a head [@@unboxed] +and 'a tail = T : ('a, [< `Snoc | `Tail ]) tdt -> 'a tail [@@unboxed] + +(* *) + +let create () = + let head = Loc.make ~padded:true (H (Head { counter = 1 })) in + let tail = + Loc.make ~padded:true (T (Tail { counter = 0; move = Obj.magic () })) + in + { head; tail } |> Multicore_magic.copy_as_padded + +(* *) + +let rec rev (suffix : (_, [< `Cons ]) tdt) = function + | T (Snoc { counter; prefix; value }) -> + rev (Cons { counter; value; suffix = H suffix }) prefix + | T (Tail _) -> suffix + +let[@inline] rev = function + | (Snoc { counter; prefix; value } : (_, [< `Snoc ]) tdt) -> + rev + (Cons { counter; value; suffix = H (Head { counter = counter + 1 }) }) + prefix + +(* *) + +let rec push backoff t value = + match Loc.fenceless_get t.tail with + | T (Snoc snoc_r) as prefix -> push_with backoff t snoc_r.counter prefix value + | T (Tail tail_r as tail) -> + let move = tail_r.move in + if move != Obj.magic () then begin + let (Snoc move_r) = move in + match Loc.fenceless_get t.head with + | H (Head head_r as head) when head_r.counter < move_r.counter -> + let after = rev move in + if Loc.compare_and_set t.head (H head) (H after) then + tail_r.move <- Obj.magic () + | _ -> () + end; + push_with backoff t tail_r.counter (T tail) value + +and push_with backoff t counter prefix value = + let after = Snoc { counter = counter + 1; prefix; value } in + if not (Loc.compare_and_set t.tail prefix (T after)) then + push (Backoff.once backoff) t value + +let[@inline] push t value = push Backoff.default t value + +(* *) + +exception Empty + +let rec pop backoff t = + match Loc.get t.head with + | H (Cons cons_r) as before -> + let after = cons_r.suffix in + if Loc.compare_and_set t.head before after then cons_r.value + else pop (Backoff.once backoff) t + | H (Head head_r as head) -> begin + match Loc.fenceless_get t.tail with + | T (Snoc snoc_r as move) -> + if head_r.counter = snoc_r.counter then + if Loc.compare_and_set t.tail (T move) snoc_r.prefix then + snoc_r.value + else pop backoff t + else + let tail = Tail { counter = snoc_r.counter; move } in + if + Loc.fenceless_get t.head == H head + && Loc.compare_and_set t.tail (T move) (T tail) + then pop_moving backoff t head move tail + else pop backoff t + | T (Tail tail_r as tail) -> + let move = tail_r.move in + if move == Obj.magic () then pop_emptyish backoff t head + else pop_moving backoff t head move tail + end + +and pop_moving backoff t (Head head_r as head : (_, [< `Head ]) tdt) + (Snoc move_r as move) (Tail tail_r : (_, [< `Tail ]) tdt) = + if head_r.counter < move_r.counter then + match rev move with + | Cons cons_r -> + if Loc.compare_and_set t.head (H head) cons_r.suffix then begin + tail_r.move <- Obj.magic (); + cons_r.value + end + else pop (Backoff.once backoff) t + else pop_emptyish backoff t head + +and pop_emptyish backoff t head = + if Loc.get t.head == H head then raise_notrace Empty else pop backoff t + +let[@inline] pop_opt t = + match pop Backoff.default t with + | value -> Some value + | exception Empty -> None + +let[@inline] pop t = pop Backoff.default t + +(* *) + +let rec length t = + let head = Loc.get t.head in + let tail = Loc.fenceless_get t.tail in + if head != Loc.get t.head then length t + else + let head_at = + match head with H (Cons r) -> r.counter | H (Head r) -> r.counter + in + let tail_at = + match tail with T (Snoc r) -> r.counter | T (Tail r) -> r.counter + in + tail_at - head_at + 1 diff --git a/src/kcas_data/two_stack_queue.mli b/src/kcas_data/two_stack_queue.mli new file mode 100644 index 00000000..910ffbaf --- /dev/null +++ b/src/kcas_data/two_stack_queue.mli @@ -0,0 +1,20 @@ +type !'a t +(** *) + +val create : unit -> 'a t +(** *) + +val push : 'a t -> 'a -> unit +(** *) + +exception Empty +(** *) + +val pop : 'a t -> 'a +(** *) + +val pop_opt : 'a t -> 'a option +(** *) + +val length : 'a t -> int +(** *) diff --git a/test/kcas_data/dune b/test/kcas_data/dune index 45b2c552..d231d255 100644 --- a/test/kcas_data/dune +++ b/test/kcas_data/dune @@ -23,6 +23,7 @@ queue_test_stm stack_test stack_test_stm + two_stack_queue_test_stm xt_test) (libraries alcotest diff --git a/test/kcas_data/two_stack_queue_test_stm.ml b/test/kcas_data/two_stack_queue_test_stm.ml new file mode 100644 index 00000000..0e3033fd --- /dev/null +++ b/test/kcas_data/two_stack_queue_test_stm.ml @@ -0,0 +1,64 @@ +open QCheck +open STM +open Kcas_data + +module Spec = struct + type cmd = Push of int | Take_opt | Length + + let show_cmd = function + | Push x -> "Push " ^ string_of_int x + | Take_opt -> "Take_opt" + | Length -> "Length" + + module State = struct + type t = int list * int list + + let push x (h, t) = if h == [] then ([ x ], []) else (h, x :: t) + let peek_opt (h, _) = match h with x :: _ -> Some x | [] -> None + + let drop ((h, t) as s) = + match h with [] -> s | [ _ ] -> (List.rev t, []) | _ :: h -> (h, t) + + let length (h, t) = List.length h + List.length t + end + + type state = State.t + type sut = int Two_stack_queue.t + + let arb_cmd _s = + [ + Gen.int |> Gen.map (fun x -> Push x); + Gen.return Take_opt; + Gen.return Length; + ] + |> Gen.oneof |> make ~print:show_cmd + + let init_state = ([], []) + let init_sut () = Two_stack_queue.create () + let cleanup _ = () + + let next_state c s = + match c with + | Push x -> State.push x s + | Take_opt -> State.drop s + | Length -> s + + let precond _ _ = true + + let run c d = + match c with + | Push x -> Res (unit, Two_stack_queue.push d x) + | Take_opt -> Res (option int, Two_stack_queue.pop_opt d) + | Length -> Res (int, Two_stack_queue.length d) + + let postcond c (s : state) res = + match (c, res) with + | Push _x, Res ((Unit, _), ()) -> true + | Take_opt, Res ((Option Int, _), res) -> res = State.peek_opt s + | Length, Res ((Int, _), res) -> res = State.length s + | _, _ -> false +end + +let () = + Stm_run.run ~count:1000 ~verbose:true ~name:"Two_stack_queue" (module Spec) + |> exit