diff --git a/lib/anoma/node/storage.ex b/lib/anoma/node/storage.ex new file mode 100644 index 000000000..a748b864c --- /dev/null +++ b/lib/anoma/node/storage.ex @@ -0,0 +1,22 @@ +defmodule Anoma.Node.Storage do + use Supervisor + + def start_link(init_state) do + Supervisor.start_link(__MODULE__, init_state) + end + + def init(name: name) do + init(name: name, table: %Anoma.Storage{}) + end + + def init(name: name, table: table) do + children = [ + {Anoma.Node.Storage.Communicator, name: name}, + {Anoma.Node.Storage.Ordering, name: name, table: table} + ] + + Supervisor.init(children, strategy: :one_for_all) + end + + def shutdown(supervisor), do: Supervisor.stop(supervisor, :normal) +end diff --git a/lib/anoma/node/storage/communicator.ex b/lib/anoma/node/storage/communicator.ex new file mode 100644 index 000000000..a490fddfb --- /dev/null +++ b/lib/anoma/node/storage/communicator.ex @@ -0,0 +1,85 @@ +defmodule Anoma.Node.Storage.Communicator do + @moduledoc """ + + I am the communicator for the Ordering Node, please read my Ordering + node to know more about how my API works + + """ + + use GenServer + use TypedStruct + alias __MODULE__ + alias Anoma.Node.Storage.Ordering + alias Anoma.Node.Utility + + typedstruct do + field(:primary, atom(), require: true) + field(:subscribers, MapSet.t(GenServer.server()), default: MapSet.new()) + end + + def init(name: name) do + {:ok, %Communicator{primary: name}} + end + + def start_link(arg) do + GenServer.start_link( + __MODULE__, + arg, + Utility.name(arg, &Utility.com_name/1) + ) + end + + ############################################################ + # Public RPC API # + ############################################################ + + @spec state(GenServer.server()) :: Ordering.t() + defdelegate state(ordering), to: Ordering + + @spec next_order(GenServer.server()) :: non_neg_integer() + defdelegate next_order(ordering), to: Ordering + + @spec true_order(GenServer.server(), any()) :: non_neg_integer() | nil + defdelegate true_order(ordering, id), to: Ordering + + @spec new_order(GenServer.server(), Ordering.ordered_transactions()) :: + :error | {:ok, any()} + defdelegate new_order(ordering, ordered), to: Ordering + + @spec new_order( + GenServer.server(), + Ordering.ordered_transactions(), + boolean() + ) :: + :error | {:ok, any()} + defdelegate new_order(ordering, ordered, instrumentation), to: Ordering + + @spec reset(GenServer.server()) :: :ok + defdelegate reset(ordering), to: Ordering + ############################################################ + # Genserver Behavior # + ############################################################ + + # Please give things to the subscribers + + def handle_call(:state, _from, com) do + {:reply, Ordering.state(com.primary), com} + end + + def handle_call(:next_order, _from, com) do + {:reply, Ordering.next_order(com.primary), com} + end + + def handle_call({:true_order, id}, _from, com) do + {:reply, Ordering.true_order(com.primary, id), com} + end + + def handle_call({:new_order, trans, instrumentation}, _from, com) do + {:reply, Ordering.new_order(com.primary, trans, instrumentation), com} + end + + def handle_cast(:reset, state) do + Ordering.reset(state.primary()) + {:noreply, state} + end +end diff --git a/lib/anoma/node/storage/ordering.ex b/lib/anoma/node/storage/ordering.ex new file mode 100644 index 000000000..2c186c28b --- /dev/null +++ b/lib/anoma/node/storage/ordering.ex @@ -0,0 +1,143 @@ +defmodule Anoma.Node.Storage.Ordering do + @moduledoc """ + I am a simple mnesia-backed key-value store in an anoma node. + + Currently we do not have a way of propagating what keys users want + to store, thus we take the following approach to deterministic state + reading: + + 1. We keep a next_order, which represents the next_order a + transaction will have. From here the scry reads specifically + [next_order, :key_space | 0] to get a map of the current keys + saved for this node. + + 2. We keep a hash_to_order to cache the id => order mapping + """ + + use TypedStruct + use GenServer + alias Anoma.Node.Utility + alias Anoma.{Storage, Order} + alias __MODULE__ + + @type ordered_transactions() :: + list(Order.t()) + + @type key() :: any() + + typedstruct do + field(:table, Storage.t(), default: %Anoma.Storage{}) + field(:next_order, non_neg_integer(), default: 1) + field(:hash_to_order, %{key() => non_neg_integer()}, default: %{}) + end + + def init(opts) do + return = %Ordering{table: opts[:table]} + # idempotent + Storage.setup(return.table) + :mnesia.subscribe({:table, return.table.qualified, :simple}) + {:ok, return} + end + + @spec start_link(keyword()) :: :ignore | {:error, any()} | {:ok, pid()} + def start_link(arg) do + GenServer.start_link(__MODULE__, arg, Utility.name(arg)) + end + + ############################################################ + # Public RPC API # + ############################################################ + + @spec state(GenServer.server()) :: t() + def state(ordering) do + GenServer.call(ordering, :state) + end + + @spec next_order(GenServer.server()) :: non_neg_integer() + def next_order(ordering) do + GenServer.call(ordering, :next_order) + end + + @spec true_order(GenServer.server(), any()) :: non_neg_integer() | nil + def true_order(ordering, id) do + GenServer.call(ordering, {:true_order, id}) + end + + @spec new_order(GenServer.server(), ordered_transactions()) :: + :error | {:ok, any()} + def new_order(ordering, ordered_transactions) do + new_order(ordering, ordered_transactions, false) + end + + @spec new_order(GenServer.server(), ordered_transactions(), boolean()) :: + :error | {:ok, any()} + def new_order(ordering, ordered, instrumentation) do + GenServer.call(ordering, {:new_order, ordered, instrumentation}) + end + + @spec reset(GenServer.server()) :: :ok + def reset(ordering) do + GenServer.cast(ordering, :reset) + end + + ############################################################ + # Genserver Behavior # + ############################################################ + + def handle_call(:state, _from, state) do + {:reply, state, state} + end + + def handle_call(:next_order, _from, state) do + {:reply, state.next_order, state} + end + + def handle_call({:true_order, id}, _from, state) do + {:reply, Map.get(state.hash_to_order, id), state} + end + + def handle_call({:new_order, trans, instrumentation}, _from, state) do + {next_order, new_map} = handle_new_order(trans, state, instrumentation) + + {:reply, :ok, %{state | next_order: next_order, hash_to_order: new_map}} + end + + def handle_cast(:reset, state) do + {:noreply, %Ordering{table: state.table}} + end + + ############################################################ + # Genserver Implementation # + ############################################################ + + @spec handle_new_order(ordered_transactions(), t(), boolean()) :: + {non_neg_integer(), %{key() => non_neg_integer()}} + def handle_new_order(ordered_transactions, state, instrumentation) do + num_txs = length(ordered_transactions) + instrument(instrumentation, {:new_tx, num_txs}) + + for order <- ordered_transactions do + instrument(instrumentation, {:ready, Order.pid(order)}) + send(Order.pid(order), {:read_ready, Order.index(order)}) + end + + new_next_order = state.next_order + length(ordered_transactions) + + new_map_elements = + Map.new(ordered_transactions, &{Order.id(&1), Order.index(&1)}) + + new_map = Map.merge(state.hash_to_order, new_map_elements) + {new_next_order, new_map} + end + + ############################################################ + # Instrumentation # + ############################################################ + def instrument(instrument, {:new_tx, num_txs}) do + if instrument, do: IO.inspect(num_txs, label: "new tx count") + end + + def instrument(instrument, {:ready, pid}) do + if instrument, do: IO.inspect(pid, label: "sending read ready to pid") + end +end diff --git a/lib/anoma/order.ex b/lib/anoma/order.ex new file mode 100644 index 000000000..f37ca375e --- /dev/null +++ b/lib/anoma/order.ex @@ -0,0 +1,38 @@ +defmodule Anoma.Order do + @moduledoc """ + I am the Order module, I am the format responsible for making sure + that the given process is told that it can be read or executed at + the given index + """ + use TypedStruct + alias __MODULE__ + + @typedoc """ + I am the Order type. I have information regarding a correct order + and having a proper host to ping to notify when it can do a certain + task. + + ### Fields + - `index` - the ordering index to execute at + - `id` - the identification key path of the requested key + - `pid` - the process identifier to message + """ + typedstruct require: true do + field(:index, non_neg_integer()) + field(:id, any()) + field(:pid, pid()) + end + + def new(index, id, pid) do + %Order{index: index, id: id, pid: pid} + end + + @spec index(t()) :: non_neg_integer() + def index(t), do: t.index + + @spec id(t()) :: any() + def id(t), do: t.id + + @spec pid(t()) :: pid() + def pid(t), do: t.pid +end diff --git a/lib/anoma/partialtx.ex b/lib/anoma/partialtx.ex index 4d24c3719..fc5230e54 100644 --- a/lib/anoma/partialtx.ex +++ b/lib/anoma/partialtx.ex @@ -95,20 +95,18 @@ defmodule Anoma.PartialTx do # Helpers ###################################################################### - @doc """ - I update the resource set with the new resource + # I update the resource set with the new resource + # ### Parameters - ### Parameters + # - resource_set (resource_set()) - the resource set we are updating - - resource_set (resource_set()) - the resource set we are updating + # - new_resource (Resource.t()) - the resource we are adding - - new_resource (Resource.t()) - the resource we are adding + # ### Returns - ### Returns + # the updated resource set - the updated resource set - """ @spec update_resource_set(resources, Resource.t()) :: resources defp update_resource_set(resource_set, new_resource) do denom = Resource.denomination(new_resource) @@ -121,14 +119,6 @@ defmodule Anoma.PartialTx do defp add_quantities(resources) do Enum.reduce(resources, 0, fn x, acc -> x.quantity + acc end) end - - @spec sub_quantities(Resource.t(), Resource.t()) :: Resource.t() - defp sub_quantities(resource_1, resource_2) do - %Resource{ - resource_1 - | quantity: resource_1.quantity - resource_2.quantity - } - end end defimpl Anoma.Intent, for: Anoma.PartialTx do diff --git a/lib/anoma/storage.ex b/lib/anoma/storage.ex new file mode 100644 index 000000000..adebc0922 --- /dev/null +++ b/lib/anoma/storage.ex @@ -0,0 +1,273 @@ +defmodule Anoma.Storage do + @moduledoc """ + I am the Anoma Storage engine, I consist of two parts: + 1. An ordering map which tells of the latest order in the + qualification map + 2. A qualification map which maps from a qualified key to the stored + value + + ## Types + + A good way to view this is that for the `t()`, the fields for what + is stored in mnesia, are simply the order_* and qualified_* values + + type `t Anoma.Storage` to find them all. + + Please also type `t Anoma.Storage.t()` to find out more about the + central type + ## API + The important functions for this API are + - `setup/1` + - `ensure_new/1` + - `get/2` + - `get/3` + - `put/3` + - `put/4` + - `blocking_read/2` + - `blocking_read/3` + + If one wants to query the tables by hand then there are manual + functions, but beware, this is an unintended way of using the API + - `read_order/2` + - `read_at_order/3` + - `write_at_order/4` + + + Please see my testing module `AnomaTest.Storage` to learn more on + how to use me + + ### Snapshots + One can snapshot the keys provided in the code by running the following + + - `snapshot_order/1` + - `put_snapshot/2` + - `in_snapshot/2` + - `get_at_snapshot/2` + """ + + use TypedStruct + + @typedoc """ + I represent the qualified and ordered data of storage + + ## Fields + - `:qualified` - The key value value map into storage + - `:order` - A mapping from keys to the properly qualified keys + """ + typedstruct do + field(:qualified, atom(), default: Anoma.Node.Storage.Qualified) + field(:order, atom(), default: Anoma.Node.Storage.Order) + end + + @type result(res) :: {:atomic, res} | {:aborted, any()} + + @typedoc """ + The key we wish to store, also used for order lookup + """ + @type order_key() :: Noun.t() + @type order_value() :: list({any(), Noun.t(), non_neg_integer()}) + + @typedoc """ + [non_neg_integer(), Noun.t() | non_neg_integer()] + """ + @type qualified_key() :: nonempty_improper_list(any(), non_neg_integer()) + @type qualified_value() :: any() + + @type snapshot() :: {t(), list({order_key(), non_neg_integer()})} + + ############################################################ + # Creation API # + ############################################################ + + @doc """ + I setup storage with the given tables: `t()`. + + I will try to setup all values of storage, even if the first one + fails due to already being setup, we will try the others. + """ + @spec setup(t()) :: :ok | {:aborted, any()} + # If this ever gets big, turn it into a map filter situation + def setup(t) do + case {:mnesia.create_table(t.qualified, attributes: [:key, :value]), + :mnesia.create_table(t.order, attributes: [:key, :order])} do + {{:atomic, :ok}, {:atomic, :ok}} -> :ok + {a, {:atomic, :ok}} -> a + {_a____________, b} -> b + end + end + + @spec remove(t()) :: :ok | {:aborted, any()} + def remove(storage) do + :mnesia.delete_table(storage.qualified) + :mnesia.delete_table(storage.order) + end + + @spec ensure_new(t()) :: :ok | {:aborted, any()} + def ensure_new(storage) do + # We don't care about errors that can happen here + remove(storage) + setup(storage) + end + + ############################################################ + # Operations # + ############################################################ + + @spec get(t(), order_key()) :: :absent | {:ok, qualified_value()} + def get(storage, key) do + get(storage, key, false) + end + + @spec get(t(), order_key(), boolean()) :: :absent | {:ok, qualified_value()} + def get(storage, key, instrumentation) do + with {:atomic, [{_, ^key, order}]} <- read_order(storage, key), + {:atomic, [{_, [^order, ^key | 0], value}]} <- + read_at_order(storage, key, order) do + instrument(instrumentation, {:get_order, order}) + {:ok, value} + else + _ -> :absent + end + end + + @spec put(t(), order_key(), qualified_value()) :: result(:ok) + def put(storage, key, value) do + put(storage, key, value, false) + end + + @spec put(t(), order_key(), qualified_value(), boolean()) :: result(:ok) + def put(storage, key, value, instrumentation) do + with {:atomic, order} <- read_order(storage, key), + new_order = calculate_order(order), + {:atomic, :ok} <- write_at_order(storage, key, value, new_order) do + instrument(instrumentation, {:put_order, new_order}) + {:atomic, :ok} + end + end + + @spec blocking_read(t(), qualified_key()) :: :error | {:ok, any()} + def blocking_read(storage, key) do + blocking_read(storage, key, false) + end + + @spec blocking_read(t(), qualified_key(), boolean()) :: + :error | {:ok, any()} + def blocking_read(storage, key, instrumentation) do + instrument(instrumentation, {:read, key}) + + case key do + [0 | _] -> + :error + + [_ | _] -> + :mnesia.subscribe({:table, storage.qualified, :simple}) + tx = fn -> :mnesia.read(storage.qualified, key) end + {:atomic, result} = :mnesia.transaction(tx) + + case result do + [{_, ^key, value}] -> + {:ok, value} + + [] -> + receive do + {:mnesia_table_event, {:write, {_, ^key, value}, _}} -> + {:ok, value} + end + end + + _ -> + :error + end + end + + ############################################################ + # Snapshots # + ############################################################ + + @spec snapshot_order(t()) :: result(snapshot()) + def snapshot_order(storage) do + :mnesia.transaction(fn -> + snapshot = [{{:"$1", :"$2", :"$3"}, [], [{{:"$2", :"$3"}}]}] + {storage, :mnesia.select(storage.order, snapshot)} + end) + end + + @spec put_snapshot(t(), order_key()) :: result(:ok) + def put_snapshot(storage, key) do + with {:atomic, snapshot} <- snapshot_order(storage) do + put(storage, key, snapshot) + end + end + + @spec in_snapshot(snapshot(), order_key()) :: nil | non_neg_integer() + def in_snapshot({_, snapshot}, key) do + List.keyfind(snapshot, key, 0, {nil, nil}) + |> elem(1) + end + + @spec get_at_snapshot(snapshot(), order_key()) :: + :absent | {:ok, qualified_value()} + def get_at_snapshot(snapshot = {storage, _}, key) do + position = in_snapshot(snapshot, key) + + with {:atomic, [{_, [^position, ^key | 0], value}]} <- + read_at_order(storage, key, position) do + {:ok, value} + else + _ -> :absent + end + end + + ############################################################ + # Queries # + ############################################################ + + @spec read_order(t(), order_key()) :: + result(list({atom(), Noun.t(), non_neg_integer()})) + def read_order(storage, key) do + order_tx = fn -> :mnesia.read(storage.order, key) end + :mnesia.transaction(order_tx) + end + + @spec read_at_order(t(), Noun.t(), non_neg_integer()) :: + result(list({atom(), qualified_key(), qualified_value()})) + def read_at_order(storage, key, order) do + value_tx = fn -> :mnesia.read(storage.qualified, [order, key | 0]) end + :mnesia.transaction(value_tx) + end + + @spec write_at_order(t(), Noun.t(), Noun.t(), non_neg_integer()) :: + result(:ok) + def write_at_order(storage, key, value, order) do + write_tx = fn -> + :mnesia.write({storage.order, key, order}) + + :mnesia.write({storage.qualified, [order, key | 0], value}) + end + + :mnesia.transaction(write_tx) + end + + ############################################################ + # Helpers # + ############################################################ + + @spec calculate_order([{any(), any(), number()}]) :: number() + def calculate_order([{_, _, order}]), do: order + 1 + def calculate_order([]), do: 1 + + ############################################################ + # Instrumentation # + ############################################################ + def instrument(instrument, {:get_order, order}) do + if instrument, do: IO.inspect(order, label: "getting at order") + end + + def instrument(instrument, {:put_order, order}) do + if instrument, do: IO.inspect(order, label: "putting at order") + end + + def instrument(instrument, {:read, key}) do + if instrument, do: IO.inspect(key, label: "regular blocking read key") + end +end diff --git a/test/block_test.exs b/test/block_test.exs index 576e33286..a9968d8b2 100644 --- a/test/block_test.exs +++ b/test/block_test.exs @@ -18,7 +18,7 @@ defmodule AnomaTest.Block do end test "no signature on startup" do - {pub, priv} = :crypto.generate_key(:rsa, {1024, 65537}) + {pub, _priv} = :crypto.generate_key(:rsa, {1024, 65537}) block = Block.create(Base.default(), pub, 0) assert block.signature == nil end diff --git a/test/communicator_test.exs b/test/communicator_test.exs index 700a293a2..2613be7d8 100644 --- a/test/communicator_test.exs +++ b/test/communicator_test.exs @@ -1,12 +1,8 @@ defmodule AnomaTest.Communicator do use ExUnit.Case, async: true - import Anoma.Node.Executor.Communicator - alias Anoma.Node.Executor.Communicator - alias Anoma.Subscriber.Basic - alias Anoma.PartialTx alias Anoma.Node.Executor, as: Node doctest(Anoma.Node.Executor.Communicator) diff --git a/test/intent_test.exs b/test/intent_test.exs index fb253ce5b..1d3902a2b 100644 --- a/test/intent_test.exs +++ b/test/intent_test.exs @@ -2,7 +2,7 @@ defmodule AnomaTest.Intent do use ExUnit.Case, async: true alias Anoma.Node.Intent - alias Anoma.Node.Intent.{Communicator, Pool} + alias Anoma.Node.Intent.Communicator alias Anoma.Resource doctest(Anoma.Node.Intent) @@ -15,7 +15,7 @@ defmodule AnomaTest.Intent do Communicator.new_intent(:intents_add_com, resource) Communicator.subscribe(:intents_add_com, self()) - assert_receive {:"$gen_cast", {:intents, resource_set}} + assert_receive {:"$gen_cast", {:intents, ^resource_set}} Communicator.new_intent(:intents_add_com, resource) @@ -45,8 +45,8 @@ defmodule AnomaTest.Intent do Communicator.new_intent(:intents_signal_com, resource_1) Communicator.new_intent(:intents_signal_com, resource_1) - assert_receive {:"$gen_cast", {:new_intent, resource_1}} - assert_receive {:"$gen_cast", {:new_intent, resource_1}} + assert_receive {:"$gen_cast", {:new_intent, ^resource_1}} + assert_receive {:"$gen_cast", {:new_intent, ^resource_1}} Intent.shutdown(supervisor) end diff --git a/test/logic_test.exs b/test/logic_test.exs index 54992c6c7..b73e03452 100644 --- a/test/logic_test.exs +++ b/test/logic_test.exs @@ -3,7 +3,7 @@ defmodule AnomaTest.Logic do alias Anoma.Logic.{Lt, Branch, Mul, Add, Inc, Neg} - alias Anoma.{Eval, PartialTx} + alias Anoma.Eval doctest(Anoma.Logic) diff --git a/test/nock_test.exs b/test/nock_test.exs index 03e2fe266..6f6ccb586 100644 --- a/test/nock_test.exs +++ b/test/nock_test.exs @@ -2,7 +2,6 @@ defmodule AnomaTest.Nock do use ExUnit.Case, async: true import Nock - import Noun doctest(Nock) diff --git a/test/node/storage_test.exs b/test/node/storage_test.exs new file mode 100644 index 000000000..c97a70dd2 --- /dev/null +++ b/test/node/storage_test.exs @@ -0,0 +1,76 @@ +defmodule AnomaTest.Node.Storage do + use ExUnit.Case + + alias Anoma.Order + alias Anoma.Node.Storage.{Communicator, Ordering} + + doctest(Anoma.Node.Storage) + doctest(Anoma.Node.Storage.Communicator) + doctest(Anoma.Node.Storage.Ordering) + + setup_all do + # base storage testing default + storage = %Anoma.Storage{ + qualified: AnomaTest.Node.Storage.Qualified, + order: AnomaTest.Node.Storage.Order + } + + ordering = :node_storage_com + + unless Process.whereis(ordering) do + Anoma.Node.Storage.start_link(name: :node_storage, table: storage) + end + + [ordering: ordering] + end + + test "reset works", %{ordering: ordering} do + Communicator.new_order(ordering, [Order.new(1, <<3>>, self())]) + Communicator.reset(ordering) + ordering = Communicator.state(ordering) + assert ordering.hash_to_order == %{} + assert ordering.next_order == 1 + end + + test "added to the hash_ordering", %{ordering: ordering} do + Communicator.reset(ordering) + + Communicator.new_order( + ordering, + [Order.new(1, <<3>>, self()), Order.new(1, <<3>>, self())], + false + ) + + ordering = Communicator.state(ordering) + assert ordering.hash_to_order == %{<<3>> => 1} + end + + test "getting proper offsets", %{ordering: ordering} do + Communicator.reset(ordering) + assert 1 == Communicator.next_order(ordering) + + Communicator.new_order(ordering, [ + Order.new(1, <<3>>, self()), + Order.new(2, <<3>>, self()) + ]) + + assert 3 == Communicator.next_order(ordering) + end + + test "receiving read readies", %{ordering: ordering} do + Communicator.new_order(ordering, [ + Order.new(3, <<3>>, self()), + Order.new(4, <<3>>, self()) + ]) + + assert_receive {:read_ready, 3} + assert_receive {:read_ready, 4} + end + + test "we properly cache orders", %{ordering: ordering} do + Communicator.reset(ordering) + assert Communicator.true_order(ordering, <<3>>) == nil + Communicator.new_order(ordering, [Order.new(1, <<3>>, self())]) + assert Communicator.true_order(ordering, <<3>>) == 1 + end +end diff --git a/test/node_test.exs b/test/node_test.exs index 065deefdd..3d2b05842 100644 --- a/test/node_test.exs +++ b/test/node_test.exs @@ -1,9 +1,6 @@ defmodule AnomaTest.Node do use ExUnit.Case, async: true - alias Anoma.Node.Executor.{Communicator, Primary} - alias Anoma.Node.Executor - doctest(Anoma.Node.Executor) test "node works" do diff --git a/test/noun_test.exs b/test/noun_test.exs index e11a8d5f5..36acffbc4 100644 --- a/test/noun_test.exs +++ b/test/noun_test.exs @@ -2,7 +2,6 @@ defmodule AnomaTest.Noun do use ExUnit.Case, async: true import Noun - alias Noun.Format doctest(Noun) doctest(Noun.Format) diff --git a/test/partialtx_test.exs b/test/partialtx_test.exs index 3b78a8290..c0599d4f0 100644 --- a/test/partialtx_test.exs +++ b/test/partialtx_test.exs @@ -1,10 +1,6 @@ defmodule AnomaTest.PartialTx do use ExUnit.Case, async: true - alias Anoma.Node.Communicator - - alias Anoma.Subscriber.Basic - alias Anoma.PartialTx doctest(Anoma.PartialTx) diff --git a/test/storage_test.exs b/test/storage_test.exs new file mode 100644 index 000000000..f81cb259b --- /dev/null +++ b/test/storage_test.exs @@ -0,0 +1,126 @@ +defmodule AnomaTest.Storage do + use ExUnit.Case + + alias Anoma.Storage + + doctest(Anoma.Storage) + + setup_all do + # base storage testing default + storage = %Storage{ + qualified: AnomaTest.Storage.Qualified, + order: AnomaTest.Storage.Order + } + + Storage.ensure_new(storage) + [storage: storage] + end + + describe "Direct API" do + test "Empty Storage is absent", %{storage: storage} do + testing_atom = :QWERTZ_abc + assert Storage.get(storage, testing_atom) == :absent + end + + test "Putting works", %{storage: storage} do + testing_atom = :QWERTZ_putting + assert {:atomic, :ok} = Storage.put(storage, testing_atom, 1) + assert {:ok, 1} = Storage.get(storage, testing_atom) + end + + test "block_reads work", %{storage: storage} do + testing_atom = :QWERTZ_blocking + assert {:atomic, :ok} = Storage.put(storage, testing_atom, 1) + + assert {:ok, block} = + Storage.blocking_read(storage, [1, testing_atom | 0]) + + assert {:ok, current} = Storage.get(storage, testing_atom) + assert current == block + end + + test "block_reads can read the past", %{storage: storage} do + testing_atom = :QWERTZ_blocking_past + assert {:atomic, :ok} = Storage.put(storage, testing_atom, 1) + assert {:atomic, :ok} = Storage.put(storage, testing_atom, 2) + + assert {:ok, bl_1} = + Storage.blocking_read(storage, [1, testing_atom | 0]) + + assert {:ok, bl_2} = + Storage.blocking_read(storage, [2, testing_atom | 0]) + + assert {:ok, curr} = Storage.get(storage, testing_atom) + assert curr == bl_2 + assert bl_1 + 1 == bl_2 + end + + test "blocking_reads must accept position indicators", %{storage: s} do + assert Storage.blocking_read(s, :Centuri_Republic) == :error + end + + test "blocking_reads really do block", %{storage: storage} do + testing_atom = System.unique_integer() + home = self() + + pid = + spawn(fn -> + assert {:ok, value} = + Storage.blocking_read(storage, [1, testing_atom | 0]) + + send(home, {:read, value}) + end) + + assert Process.alive?(pid) == true + Storage.put(storage, testing_atom, 1) + assert_receive {:read, 1}, 100 + assert Process.alive?(pid) == false + end + end + + describe "Querying by hand" do + test "Reading at a known order gives results", %{storage: storage} do + testing_atom = 750_089_999 + Storage.write_at_order(storage, testing_atom, 10, 3) + + assert Storage.read_at_order(storage, testing_atom, 3) == + {:atomic, [{storage.qualified, [3, testing_atom | 0], 10}]} + end + + test "Writing at an order gives us the same testing order", %{ + storage: storage + } do + testing_atom = 999_888_777_666 + Storage.write_at_order(storage, testing_atom, 10, 3) + + assert Storage.read_order(storage, testing_atom) == + {:atomic, [{storage.order, testing_atom, 3}]} + end + end + + describe "Snapshots" do + test "snapshots properly put", %{storage: storage} do + snapshot_storage = :snapshot_super_secret + assert {:atomic, :ok} = Storage.put_snapshot(storage, snapshot_storage) + assert {:ok, _} = Storage.get(storage, snapshot_storage) + end + + test "snapshots properly get the latest", %{storage: storage} do + snapshot_storage = :super_hot + testing_atom = 111_222_333_444_555_666 + Storage.write_at_order(storage, testing_atom, 10, 3) + assert {:atomic, :ok} = Storage.put_snapshot(storage, snapshot_storage) + assert {:ok, snapshot} = Storage.get(storage, snapshot_storage) + assert Storage.in_snapshot(snapshot, testing_atom) == 3 + + assert Storage.get_at_snapshot(snapshot, testing_atom) == + {:ok, 10} + end + + test "missing key gives us nil", %{storage: storage} do + nonsense_atom = :very_good_atom + assert {:atomic, snapshot} = Storage.snapshot_order(storage) + assert Storage.get_at_snapshot(snapshot, nonsense_atom) + end + end +end diff --git a/test/test_helper.exs b/test/test_helper.exs index 869559e70..1bd8c6eba 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1 +1,2 @@ +Anoma.Mnesia.init() ExUnit.start()