From a33a37479a026e4a47cef3ad106fc3c4e13c66ad Mon Sep 17 00:00:00 2001 From: mariari Date: Sun, 31 Dec 2023 13:16:20 +0800 Subject: [PATCH 1/8] Create an Order type This serves as the basis for what a real order would expect the format to be. Making this an explicit type makes it easy to refactor the intenral representation without having to go and change every instance of it --- lib/anoma/order.ex | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 lib/anoma/order.ex diff --git a/lib/anoma/order.ex b/lib/anoma/order.ex new file mode 100644 index 000000000..f37ca375e --- /dev/null +++ b/lib/anoma/order.ex @@ -0,0 +1,38 @@ +defmodule Anoma.Order do + @moduledoc """ + I am the Order module, I am the format responsible for making sure + that the given process is told that it can be read or executed at + the given index + """ + use TypedStruct + alias __MODULE__ + + @typedoc """ + I am the Order type. I have information regarding a correct order + and having a proper host to ping to notify when it can do a certain + task. + + ### Fields + - `index` - the ordering index to execute at + - `id` - the identification key path of the requested key + - `pid` - the process identifier to message + """ + typedstruct require: true do + field(:index, non_neg_integer()) + field(:id, any()) + field(:pid, pid()) + end + + def new(index, id, pid) do + %Order{index: index, id: id, pid: pid} + end + + @spec index(t()) :: non_neg_integer() + def index(t), do: t.index + + @spec id(t()) :: any() + def id(t), do: t.id + + @spec pid(t()) :: pid() + def pid(t), do: t.pid +end From c72990e2767d284087f2ea7fa51ec8a53b503268 Mon Sep 17 00:00:00 2001 From: mariari Date: Mon, 1 Jan 2024 17:20:32 +0800 Subject: [PATCH 2/8] Remove various warnings found throughout the project We remove all known warnings, this makes it nice as `mix test` and other commands will not spit any wanrings anymore --- lib/anoma/partialtx.ex | 22 ++++++---------------- test/block_test.exs | 2 +- test/communicator_test.exs | 4 ---- test/intent_test.exs | 8 ++++---- test/logic_test.exs | 2 +- test/nock_test.exs | 1 - test/node_test.exs | 3 --- test/noun_test.exs | 1 - test/partialtx_test.exs | 4 ---- 9 files changed, 12 insertions(+), 35 deletions(-) diff --git a/lib/anoma/partialtx.ex b/lib/anoma/partialtx.ex index 4d24c3719..fc5230e54 100644 --- a/lib/anoma/partialtx.ex +++ b/lib/anoma/partialtx.ex @@ -95,20 +95,18 @@ defmodule Anoma.PartialTx do # Helpers ###################################################################### - @doc """ - I update the resource set with the new resource + # I update the resource set with the new resource + # ### Parameters - ### Parameters + # - resource_set (resource_set()) - the resource set we are updating - - resource_set (resource_set()) - the resource set we are updating + # - new_resource (Resource.t()) - the resource we are adding - - new_resource (Resource.t()) - the resource we are adding + # ### Returns - ### Returns + # the updated resource set - the updated resource set - """ @spec update_resource_set(resources, Resource.t()) :: resources defp update_resource_set(resource_set, new_resource) do denom = Resource.denomination(new_resource) @@ -121,14 +119,6 @@ defmodule Anoma.PartialTx do defp add_quantities(resources) do Enum.reduce(resources, 0, fn x, acc -> x.quantity + acc end) end - - @spec sub_quantities(Resource.t(), Resource.t()) :: Resource.t() - defp sub_quantities(resource_1, resource_2) do - %Resource{ - resource_1 - | quantity: resource_1.quantity - resource_2.quantity - } - end end defimpl Anoma.Intent, for: Anoma.PartialTx do diff --git a/test/block_test.exs b/test/block_test.exs index 576e33286..a9968d8b2 100644 --- a/test/block_test.exs +++ b/test/block_test.exs @@ -18,7 +18,7 @@ defmodule AnomaTest.Block do end test "no signature on startup" do - {pub, priv} = :crypto.generate_key(:rsa, {1024, 65537}) + {pub, _priv} = :crypto.generate_key(:rsa, {1024, 65537}) block = Block.create(Base.default(), pub, 0) assert block.signature == nil end diff --git a/test/communicator_test.exs b/test/communicator_test.exs index 700a293a2..2613be7d8 100644 --- a/test/communicator_test.exs +++ b/test/communicator_test.exs @@ -1,12 +1,8 @@ defmodule AnomaTest.Communicator do use ExUnit.Case, async: true - import Anoma.Node.Executor.Communicator - alias Anoma.Node.Executor.Communicator - alias Anoma.Subscriber.Basic - alias Anoma.PartialTx alias Anoma.Node.Executor, as: Node doctest(Anoma.Node.Executor.Communicator) diff --git a/test/intent_test.exs b/test/intent_test.exs index fb253ce5b..1d3902a2b 100644 --- a/test/intent_test.exs +++ b/test/intent_test.exs @@ -2,7 +2,7 @@ defmodule AnomaTest.Intent do use ExUnit.Case, async: true alias Anoma.Node.Intent - alias Anoma.Node.Intent.{Communicator, Pool} + alias Anoma.Node.Intent.Communicator alias Anoma.Resource doctest(Anoma.Node.Intent) @@ -15,7 +15,7 @@ defmodule AnomaTest.Intent do Communicator.new_intent(:intents_add_com, resource) Communicator.subscribe(:intents_add_com, self()) - assert_receive {:"$gen_cast", {:intents, resource_set}} + assert_receive {:"$gen_cast", {:intents, ^resource_set}} Communicator.new_intent(:intents_add_com, resource) @@ -45,8 +45,8 @@ defmodule AnomaTest.Intent do Communicator.new_intent(:intents_signal_com, resource_1) Communicator.new_intent(:intents_signal_com, resource_1) - assert_receive {:"$gen_cast", {:new_intent, resource_1}} - assert_receive {:"$gen_cast", {:new_intent, resource_1}} + assert_receive {:"$gen_cast", {:new_intent, ^resource_1}} + assert_receive {:"$gen_cast", {:new_intent, ^resource_1}} Intent.shutdown(supervisor) end diff --git a/test/logic_test.exs b/test/logic_test.exs index 54992c6c7..b73e03452 100644 --- a/test/logic_test.exs +++ b/test/logic_test.exs @@ -3,7 +3,7 @@ defmodule AnomaTest.Logic do alias Anoma.Logic.{Lt, Branch, Mul, Add, Inc, Neg} - alias Anoma.{Eval, PartialTx} + alias Anoma.Eval doctest(Anoma.Logic) diff --git a/test/nock_test.exs b/test/nock_test.exs index 03e2fe266..6f6ccb586 100644 --- a/test/nock_test.exs +++ b/test/nock_test.exs @@ -2,7 +2,6 @@ defmodule AnomaTest.Nock do use ExUnit.Case, async: true import Nock - import Noun doctest(Nock) diff --git a/test/node_test.exs b/test/node_test.exs index 065deefdd..3d2b05842 100644 --- a/test/node_test.exs +++ b/test/node_test.exs @@ -1,9 +1,6 @@ defmodule AnomaTest.Node do use ExUnit.Case, async: true - alias Anoma.Node.Executor.{Communicator, Primary} - alias Anoma.Node.Executor - doctest(Anoma.Node.Executor) test "node works" do diff --git a/test/noun_test.exs b/test/noun_test.exs index e11a8d5f5..36acffbc4 100644 --- a/test/noun_test.exs +++ b/test/noun_test.exs @@ -2,7 +2,6 @@ defmodule AnomaTest.Noun do use ExUnit.Case, async: true import Noun - alias Noun.Format doctest(Noun) doctest(Noun.Format) diff --git a/test/partialtx_test.exs b/test/partialtx_test.exs index 3b78a8290..c0599d4f0 100644 --- a/test/partialtx_test.exs +++ b/test/partialtx_test.exs @@ -1,10 +1,6 @@ defmodule AnomaTest.PartialTx do use ExUnit.Case, async: true - alias Anoma.Node.Communicator - - alias Anoma.Subscriber.Basic - alias Anoma.PartialTx doctest(Anoma.PartialTx) From 68b715034787ad4da7b39221fb2f0b594219b5b2 Mon Sep 17 00:00:00 2001 From: mariari Date: Thu, 21 Dec 2023 14:15:23 +0800 Subject: [PATCH 3/8] Make sure the data base starts before running tests This ensures that the db is initialized before we run any tests. This matters in a lot of topics that require the DB to be online --- test/test_helper.exs | 1 + 1 file changed, 1 insertion(+) diff --git a/test/test_helper.exs b/test/test_helper.exs index 869559e70..1bd8c6eba 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1 +1,2 @@ +Anoma.Mnesia.init() ExUnit.start() From ba2f2cafe2f0473f59d07abc834650e9281e471f Mon Sep 17 00:00:00 2001 From: mariari Date: Thu, 21 Dec 2023 13:49:49 +0800 Subject: [PATCH 4/8] Create Storage Module This module consists of two parts: 1. An ordering map which tells of the latest order in the qualification map 2. A qualification map which maps from a qualified key to the stored value Further we provide types for everything so the code can be somewhat grocked by looking at the types --- lib/anoma/storage.ex | 224 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 224 insertions(+) create mode 100644 lib/anoma/storage.ex diff --git a/lib/anoma/storage.ex b/lib/anoma/storage.ex new file mode 100644 index 000000000..a1ec9bab5 --- /dev/null +++ b/lib/anoma/storage.ex @@ -0,0 +1,224 @@ +defmodule Anoma.Storage do + @moduledoc """ + I am the Anoma Storage engine, I consist of two parts: + 1. An ordering map which tells of the latest order in the + qualification map + 2. A qualification map which maps from a qualified key to the stored + value + + ## Types + + A good way to view this is that for the `t()`, the fields for what + is stored in mnesia, are simply the order_* and qualified_* values + + type `t Anoma.Storage` to find them all. + + Please also type `t Anoma.Storage.t()` to find out more about the + central type + ## API + The important functions for this API are + - `setup/1` + - `ensure_new/1` + - `get/2` + - `get/3` + - `put/3` + - `put/4` + - `blocking_read/2` + - `blocking_read/3` + + If one wants to query the tables by hand then there are manual + functions, but beware, this is an unintended way of using the API + - `read_order/2` + - `read_at_order/3` + - `write_at_order/4` + + + Please see my testing module `AnomaTest.Storage` to learn more on + how to use me + """ + use TypedStruct + + @typedoc """ + I represent the qualified and ordered data of storage + + ## Fields + - `:qualified` - The key value value map into storage + - `:order` - A mapping from keys to the properly qualified keys + """ + typedstruct do + field(:qualified, atom(), default: Anoma.Node.Storage.Qualified) + field(:order, atom(), default: Anoma.Node.Storage.Order) + end + + @type result(res) :: {:atomic, res} | {:aborted, any()} + + @typedoc """ + The key we wish to store, also used for order lookup + """ + @type order_key() :: Noun.t() + @type order_value() :: list({any(), Noun.t(), non_neg_integer()}) + + @typedoc """ + [non_neg_integer(), Noun.t() | non_neg_integer()] + """ + @type qualified_key() :: nonempty_improper_list(any(), non_neg_integer()) + @type qualified_value() :: any() + + ############################################################ + # Creation API # + ############################################################ + + @doc """ + I setup storage with the given tables: `t()`. + + I will try to setup all values of storage, even if the first one + fails due to already being setup, we will try the others. + """ + @spec setup(t()) :: :ok | {:aborted, any()} + # If this ever gets big, turn it into a map filter situation + def setup(t) do + case {:mnesia.create_table(t.qualified, attributes: [:key, :value]), + :mnesia.create_table(t.order, attributes: [:key, :order])} do + {{:atomic, :ok}, {:atomic, :ok}} -> :ok + {a, {:atomic, :ok}} -> a + {_a____________, b} -> b + end + end + + @spec remove(t()) :: :ok | {:aborted, any()} + def remove(storage) do + :mnesia.delete_table(storage.qualified) + :mnesia.delete_table(storage.order) + end + + @spec ensure_new(t()) :: :ok | {:aborted, any()} + def ensure_new(storage) do + # We don't care about errors that can happen here + remove(storage) + setup(storage) + end + + ############################################################ + # Operations # + ############################################################ + + @spec get(t(), order_key()) :: :absent | {:ok, qualified_value()} + def get(storage, key) do + get(storage, key, false) + end + + @spec get(t(), order_key(), boolean()) :: :absent | {:ok, qualified_value()} + def get(storage, key, instrumentation) do + with {:atomic, [{_, ^key, order}]} <- read_order(storage, key), + {:atomic, [{_, [^order, ^key | 0], value}]} <- + read_at_order(storage, key, order) do + instrument(instrumentation, {:get_order, order}) + {:ok, value} + else + _ -> :absent + end + end + + @spec put(t(), order_key(), qualified_value()) :: result(:ok) + def put(storage, key, value) do + put(storage, key, value, false) + end + + @spec put(t(), order_key(), qualified_value(), boolean()) :: result(:ok) + def put(storage, key, value, instrumentation) do + with {:atomic, order} <- read_order(storage, key), + new_order = calculate_order(order), + {:atomic, :ok} <- write_at_order(storage, key, value, new_order) do + instrument(instrumentation, {:put_order, new_order}) + {:atomic, :ok} + end + end + + @spec blocking_read(t(), qualified_key()) :: :error | {:ok, any()} + def blocking_read(storage, key) do + blocking_read(storage, key, false) + end + + @spec blocking_read(t(), qualified_key(), boolean()) :: + :error | {:ok, any()} + def blocking_read(storage, key, instrumentation) do + instrument(instrumentation, {:read, key}) + + case key do + [0 | _] -> + :error + + [_ | _] -> + :mnesia.subscribe({:table, storage.qualified, :simple}) + tx = fn -> :mnesia.read(storage.qualified, key) end + {:atomic, result} = :mnesia.transaction(tx) + + case result do + [{_, ^key, value}] -> + {:ok, value} + + [] -> + receive do + {:mnesia_table_event, {:write, {_, ^key, value}, _}} -> + {:ok, value} + end + end + + _ -> + :error + end + end + + ############################################################ + # Queries # + ############################################################ + + @spec read_order(t(), order_key()) :: + result(list({atom(), Noun.t(), non_neg_integer()})) + def read_order(storage, key) do + order_tx = fn -> :mnesia.read(storage.order, key) end + :mnesia.transaction(order_tx) + end + + @spec read_at_order(t(), Noun.t(), non_neg_integer()) :: + result(list({atom(), qualified_key(), qualified_value()})) + def read_at_order(storage, key, order) do + value_tx = fn -> :mnesia.read(storage.qualified, [order, key | 0]) end + :mnesia.transaction(value_tx) + end + + @spec write_at_order(t(), Noun.t(), Noun.t(), non_neg_integer()) :: + result(:ok) + def write_at_order(storage, key, value, order) do + write_tx = fn -> + :mnesia.write({storage.order, key, order}) + + :mnesia.write({storage.qualified, [order, key | 0], value}) + end + + :mnesia.transaction(write_tx) + end + + ############################################################ + # Helpers # + ############################################################ + + @spec calculate_order([{any(), any(), number()}]) :: number() + def calculate_order([{_, _, order}]), do: order + 1 + def calculate_order([]), do: 1 + + ############################################################ + # Instrumentation # + ############################################################ + def instrument(instrument, {:get_order, order}) do + if instrument, do: IO.inspect(order, label: "getting at order") + end + + def instrument(instrument, {:put_order, order}) do + if instrument, do: IO.inspect(order, label: "putting at order") + end + + def instrument(instrument, {:read, key}) do + if instrument, do: IO.inspect(key, label: "regular blocking read key") + end +end From af75dce91053b63f9a6f770ca0fe1a162c551d18 Mon Sep 17 00:00:00 2001 From: mariari Date: Thu, 21 Dec 2023 14:10:29 +0800 Subject: [PATCH 5/8] Create a testing module for the Storage module We test: 1. reads 2. blocking_reads 3. puts 4. blocking_reads with data not in yet --- test/storage_test.exs | 100 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 test/storage_test.exs diff --git a/test/storage_test.exs b/test/storage_test.exs new file mode 100644 index 000000000..1db0de055 --- /dev/null +++ b/test/storage_test.exs @@ -0,0 +1,100 @@ +defmodule AnomaTest.Storage do + use ExUnit.Case + + alias Anoma.Storage + + doctest(Anoma.Storage) + + setup_all do + # base storage testing default + storage = %Storage{ + qualified: AnomaTest.Storage.Qualified, + order: AnomaTest.Storage.Order + } + + Storage.ensure_new(storage) + [storage: storage] + end + + describe "Direct API" do + test "Empty Storage is absent", %{storage: storage} do + testing_atom = :QWERTZ_abc + assert Storage.get(storage, testing_atom) == :absent + end + + test "Putting works", %{storage: storage} do + testing_atom = :QWERTZ_putting + assert {:atomic, :ok} = Storage.put(storage, testing_atom, 1) + assert {:ok, 1} = Storage.get(storage, testing_atom) + end + + test "block_reads work", %{storage: storage} do + testing_atom = :QWERTZ_blocking + assert {:atomic, :ok} = Storage.put(storage, testing_atom, 1) + + assert {:ok, block} = + Storage.blocking_read(storage, [1, testing_atom | 0]) + + assert {:ok, current} = Storage.get(storage, testing_atom) + assert current == block + end + + test "block_reads can read the past", %{storage: storage} do + testing_atom = :QWERTZ_blocking_past + assert {:atomic, :ok} = Storage.put(storage, testing_atom, 1) + assert {:atomic, :ok} = Storage.put(storage, testing_atom, 2) + + assert {:ok, bl_1} = + Storage.blocking_read(storage, [1, testing_atom | 0]) + + assert {:ok, bl_2} = + Storage.blocking_read(storage, [2, testing_atom | 0]) + + assert {:ok, curr} = Storage.get(storage, testing_atom) + assert curr == bl_2 + assert bl_1 + 1 == bl_2 + end + + test "blocking_reads must accept position indicators", %{storage: s} do + assert Storage.blocking_read(s, :Centuri_Republic) == :error + end + + test "blocking_reads really do block", %{storage: storage} do + testing_atom = System.unique_integer() + home = self() + + pid = + spawn(fn -> + assert {:ok, value} = + Storage.blocking_read(storage, [1, testing_atom | 0]) + + send(home, {:read, value}) + end) + + assert Process.alive?(pid) == true + Storage.put(storage, testing_atom, 1) + assert_receive {:read, 1}, 100 + assert Process.alive?(pid) == false + end + end + + describe "Querying by hand" do + test "Reading at a known order gives results", %{storage: storage} do + testing_atom = 750_089_999 + Storage.write_at_order(storage, testing_atom, 10, 3) + + assert Storage.read_at_order(storage, testing_atom, 3) == + {:atomic, [{storage.qualified, [3, testing_atom | 0], 10}]} + end + + test "Writing at an order gives us the same testing order", %{ + storage: storage + } do + testing_atom = 999_888_777_666 + Storage.write_at_order(storage, testing_atom, 10, 3) + + assert Storage.read_order(storage, testing_atom) == + {:atomic, [{storage.order, testing_atom, 3}]} + end + end +end From e5ae75f6826fefb0979aa6338e1243711414d83a Mon Sep 17 00:00:00 2001 From: mariari Date: Thu, 21 Dec 2023 18:00:19 +0800 Subject: [PATCH 6/8] Create a base ordering storage The intended strategy is to hold a current order and cache of ids to orders, to better get timestamps of the keyspace This is highly inefficient but is the best we can do without partitioning the key_space explicitly somehow --- lib/anoma/node/storage.ex | 22 ++++ lib/anoma/node/storage/communicator.ex | 85 +++++++++++++++ lib/anoma/node/storage/ordering.ex | 143 +++++++++++++++++++++++++ 3 files changed, 250 insertions(+) create mode 100644 lib/anoma/node/storage.ex create mode 100644 lib/anoma/node/storage/communicator.ex create mode 100644 lib/anoma/node/storage/ordering.ex diff --git a/lib/anoma/node/storage.ex b/lib/anoma/node/storage.ex new file mode 100644 index 000000000..a748b864c --- /dev/null +++ b/lib/anoma/node/storage.ex @@ -0,0 +1,22 @@ +defmodule Anoma.Node.Storage do + use Supervisor + + def start_link(init_state) do + Supervisor.start_link(__MODULE__, init_state) + end + + def init(name: name) do + init(name: name, table: %Anoma.Storage{}) + end + + def init(name: name, table: table) do + children = [ + {Anoma.Node.Storage.Communicator, name: name}, + {Anoma.Node.Storage.Ordering, name: name, table: table} + ] + + Supervisor.init(children, strategy: :one_for_all) + end + + def shutdown(supervisor), do: Supervisor.stop(supervisor, :normal) +end diff --git a/lib/anoma/node/storage/communicator.ex b/lib/anoma/node/storage/communicator.ex new file mode 100644 index 000000000..a490fddfb --- /dev/null +++ b/lib/anoma/node/storage/communicator.ex @@ -0,0 +1,85 @@ +defmodule Anoma.Node.Storage.Communicator do + @moduledoc """ + + I am the communicator for the Ordering Node, please read my Ordering + node to know more about how my API works + + """ + + use GenServer + use TypedStruct + alias __MODULE__ + alias Anoma.Node.Storage.Ordering + alias Anoma.Node.Utility + + typedstruct do + field(:primary, atom(), require: true) + field(:subscribers, MapSet.t(GenServer.server()), default: MapSet.new()) + end + + def init(name: name) do + {:ok, %Communicator{primary: name}} + end + + def start_link(arg) do + GenServer.start_link( + __MODULE__, + arg, + Utility.name(arg, &Utility.com_name/1) + ) + end + + ############################################################ + # Public RPC API # + ############################################################ + + @spec state(GenServer.server()) :: Ordering.t() + defdelegate state(ordering), to: Ordering + + @spec next_order(GenServer.server()) :: non_neg_integer() + defdelegate next_order(ordering), to: Ordering + + @spec true_order(GenServer.server(), any()) :: non_neg_integer() | nil + defdelegate true_order(ordering, id), to: Ordering + + @spec new_order(GenServer.server(), Ordering.ordered_transactions()) :: + :error | {:ok, any()} + defdelegate new_order(ordering, ordered), to: Ordering + + @spec new_order( + GenServer.server(), + Ordering.ordered_transactions(), + boolean() + ) :: + :error | {:ok, any()} + defdelegate new_order(ordering, ordered, instrumentation), to: Ordering + + @spec reset(GenServer.server()) :: :ok + defdelegate reset(ordering), to: Ordering + ############################################################ + # Genserver Behavior # + ############################################################ + + # Please give things to the subscribers + + def handle_call(:state, _from, com) do + {:reply, Ordering.state(com.primary), com} + end + + def handle_call(:next_order, _from, com) do + {:reply, Ordering.next_order(com.primary), com} + end + + def handle_call({:true_order, id}, _from, com) do + {:reply, Ordering.true_order(com.primary, id), com} + end + + def handle_call({:new_order, trans, instrumentation}, _from, com) do + {:reply, Ordering.new_order(com.primary, trans, instrumentation), com} + end + + def handle_cast(:reset, state) do + Ordering.reset(state.primary()) + {:noreply, state} + end +end diff --git a/lib/anoma/node/storage/ordering.ex b/lib/anoma/node/storage/ordering.ex new file mode 100644 index 000000000..2c186c28b --- /dev/null +++ b/lib/anoma/node/storage/ordering.ex @@ -0,0 +1,143 @@ +defmodule Anoma.Node.Storage.Ordering do + @moduledoc """ + I am a simple mnesia-backed key-value store in an anoma node. + + Currently we do not have a way of propagating what keys users want + to store, thus we take the following approach to deterministic state + reading: + + 1. We keep a next_order, which represents the next_order a + transaction will have. From here the scry reads specifically + [next_order, :key_space | 0] to get a map of the current keys + saved for this node. + + 2. We keep a hash_to_order to cache the id => order mapping + """ + + use TypedStruct + use GenServer + alias Anoma.Node.Utility + alias Anoma.{Storage, Order} + alias __MODULE__ + + @type ordered_transactions() :: + list(Order.t()) + + @type key() :: any() + + typedstruct do + field(:table, Storage.t(), default: %Anoma.Storage{}) + field(:next_order, non_neg_integer(), default: 1) + field(:hash_to_order, %{key() => non_neg_integer()}, default: %{}) + end + + def init(opts) do + return = %Ordering{table: opts[:table]} + # idempotent + Storage.setup(return.table) + :mnesia.subscribe({:table, return.table.qualified, :simple}) + {:ok, return} + end + + @spec start_link(keyword()) :: :ignore | {:error, any()} | {:ok, pid()} + def start_link(arg) do + GenServer.start_link(__MODULE__, arg, Utility.name(arg)) + end + + ############################################################ + # Public RPC API # + ############################################################ + + @spec state(GenServer.server()) :: t() + def state(ordering) do + GenServer.call(ordering, :state) + end + + @spec next_order(GenServer.server()) :: non_neg_integer() + def next_order(ordering) do + GenServer.call(ordering, :next_order) + end + + @spec true_order(GenServer.server(), any()) :: non_neg_integer() | nil + def true_order(ordering, id) do + GenServer.call(ordering, {:true_order, id}) + end + + @spec new_order(GenServer.server(), ordered_transactions()) :: + :error | {:ok, any()} + def new_order(ordering, ordered_transactions) do + new_order(ordering, ordered_transactions, false) + end + + @spec new_order(GenServer.server(), ordered_transactions(), boolean()) :: + :error | {:ok, any()} + def new_order(ordering, ordered, instrumentation) do + GenServer.call(ordering, {:new_order, ordered, instrumentation}) + end + + @spec reset(GenServer.server()) :: :ok + def reset(ordering) do + GenServer.cast(ordering, :reset) + end + + ############################################################ + # Genserver Behavior # + ############################################################ + + def handle_call(:state, _from, state) do + {:reply, state, state} + end + + def handle_call(:next_order, _from, state) do + {:reply, state.next_order, state} + end + + def handle_call({:true_order, id}, _from, state) do + {:reply, Map.get(state.hash_to_order, id), state} + end + + def handle_call({:new_order, trans, instrumentation}, _from, state) do + {next_order, new_map} = handle_new_order(trans, state, instrumentation) + + {:reply, :ok, %{state | next_order: next_order, hash_to_order: new_map}} + end + + def handle_cast(:reset, state) do + {:noreply, %Ordering{table: state.table}} + end + + ############################################################ + # Genserver Implementation # + ############################################################ + + @spec handle_new_order(ordered_transactions(), t(), boolean()) :: + {non_neg_integer(), %{key() => non_neg_integer()}} + def handle_new_order(ordered_transactions, state, instrumentation) do + num_txs = length(ordered_transactions) + instrument(instrumentation, {:new_tx, num_txs}) + + for order <- ordered_transactions do + instrument(instrumentation, {:ready, Order.pid(order)}) + send(Order.pid(order), {:read_ready, Order.index(order)}) + end + + new_next_order = state.next_order + length(ordered_transactions) + + new_map_elements = + Map.new(ordered_transactions, &{Order.id(&1), Order.index(&1)}) + + new_map = Map.merge(state.hash_to_order, new_map_elements) + {new_next_order, new_map} + end + + ############################################################ + # Instrumentation # + ############################################################ + def instrument(instrument, {:new_tx, num_txs}) do + if instrument, do: IO.inspect(num_txs, label: "new tx count") + end + + def instrument(instrument, {:ready, pid}) do + if instrument, do: IO.inspect(pid, label: "sending read ready to pid") + end +end From 6e3b7cc676ce9d250ad47fce2a3ede586d97ea5a Mon Sep 17 00:00:00 2001 From: mariari Date: Fri, 22 Dec 2023 14:24:53 +0800 Subject: [PATCH 7/8] Create tests for the Storage node As we can see by running: mix test --cover we end up testing most of the module, with just a few basic calls, the ordering system is quite simple, and one can just take a number to figure out where they are next in line Percentage | Module -----------|-------------------------- 60.00% | Anoma.Node.Storage 92.86% | Anoma.Node.Storage.Communicator 96.43% | Anoma.Node.Storage.Ordering --- test/node/storage_test.exs | 76 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 test/node/storage_test.exs diff --git a/test/node/storage_test.exs b/test/node/storage_test.exs new file mode 100644 index 000000000..c97a70dd2 --- /dev/null +++ b/test/node/storage_test.exs @@ -0,0 +1,76 @@ +defmodule AnomaTest.Node.Storage do + use ExUnit.Case + + alias Anoma.Order + alias Anoma.Node.Storage.{Communicator, Ordering} + + doctest(Anoma.Node.Storage) + doctest(Anoma.Node.Storage.Communicator) + doctest(Anoma.Node.Storage.Ordering) + + setup_all do + # base storage testing default + storage = %Anoma.Storage{ + qualified: AnomaTest.Node.Storage.Qualified, + order: AnomaTest.Node.Storage.Order + } + + ordering = :node_storage_com + + unless Process.whereis(ordering) do + Anoma.Node.Storage.start_link(name: :node_storage, table: storage) + end + + [ordering: ordering] + end + + test "reset works", %{ordering: ordering} do + Communicator.new_order(ordering, [Order.new(1, <<3>>, self())]) + Communicator.reset(ordering) + ordering = Communicator.state(ordering) + assert ordering.hash_to_order == %{} + assert ordering.next_order == 1 + end + + test "added to the hash_ordering", %{ordering: ordering} do + Communicator.reset(ordering) + + Communicator.new_order( + ordering, + [Order.new(1, <<3>>, self()), Order.new(1, <<3>>, self())], + false + ) + + ordering = Communicator.state(ordering) + assert ordering.hash_to_order == %{<<3>> => 1} + end + + test "getting proper offsets", %{ordering: ordering} do + Communicator.reset(ordering) + assert 1 == Communicator.next_order(ordering) + + Communicator.new_order(ordering, [ + Order.new(1, <<3>>, self()), + Order.new(2, <<3>>, self()) + ]) + + assert 3 == Communicator.next_order(ordering) + end + + test "receiving read readies", %{ordering: ordering} do + Communicator.new_order(ordering, [ + Order.new(3, <<3>>, self()), + Order.new(4, <<3>>, self()) + ]) + + assert_receive {:read_ready, 3} + assert_receive {:read_ready, 4} + end + + test "we properly cache orders", %{ordering: ordering} do + Communicator.reset(ordering) + assert Communicator.true_order(ordering, <<3>>) == nil + Communicator.new_order(ordering, [Order.new(1, <<3>>, self())]) + assert Communicator.true_order(ordering, <<3>>) == 1 + end +end From 2e1272a433960fa19916c90378c8a83c1e1c64f2 Mon Sep 17 00:00:00 2001 From: mariari Date: Fri, 22 Dec 2023 16:58:58 +0800 Subject: [PATCH 8/8] Create snapshot functionality With this, we have a very slow way of effectively slowly snapshotting the database. Very slow, someone should do a better method than what I'm doing here --- lib/anoma/storage.ex | 49 +++++++++++++++++++++++++++++++++++++++++++ test/storage_test.exs | 26 +++++++++++++++++++++++ 2 files changed, 75 insertions(+) diff --git a/lib/anoma/storage.ex b/lib/anoma/storage.ex index a1ec9bab5..adebc0922 100644 --- a/lib/anoma/storage.ex +++ b/lib/anoma/storage.ex @@ -35,7 +35,16 @@ defmodule Anoma.Storage do Please see my testing module `AnomaTest.Storage` to learn more on how to use me + + ### Snapshots + One can snapshot the keys provided in the code by running the following + + - `snapshot_order/1` + - `put_snapshot/2` + - `in_snapshot/2` + - `get_at_snapshot/2` """ + use TypedStruct @typedoc """ @@ -64,6 +73,8 @@ defmodule Anoma.Storage do @type qualified_key() :: nonempty_improper_list(any(), non_neg_integer()) @type qualified_value() :: any() + @type snapshot() :: {t(), list({order_key(), non_neg_integer()})} + ############################################################ # Creation API # ############################################################ @@ -169,6 +180,44 @@ defmodule Anoma.Storage do end end + ############################################################ + # Snapshots # + ############################################################ + + @spec snapshot_order(t()) :: result(snapshot()) + def snapshot_order(storage) do + :mnesia.transaction(fn -> + snapshot = [{{:"$1", :"$2", :"$3"}, [], [{{:"$2", :"$3"}}]}] + {storage, :mnesia.select(storage.order, snapshot)} + end) + end + + @spec put_snapshot(t(), order_key()) :: result(:ok) + def put_snapshot(storage, key) do + with {:atomic, snapshot} <- snapshot_order(storage) do + put(storage, key, snapshot) + end + end + + @spec in_snapshot(snapshot(), order_key()) :: nil | non_neg_integer() + def in_snapshot({_, snapshot}, key) do + List.keyfind(snapshot, key, 0, {nil, nil}) + |> elem(1) + end + + @spec get_at_snapshot(snapshot(), order_key()) :: + :absent | {:ok, qualified_value()} + def get_at_snapshot(snapshot = {storage, _}, key) do + position = in_snapshot(snapshot, key) + + with {:atomic, [{_, [^position, ^key | 0], value}]} <- + read_at_order(storage, key, position) do + {:ok, value} + else + _ -> :absent + end + end + ############################################################ # Queries # ############################################################ diff --git a/test/storage_test.exs b/test/storage_test.exs index 1db0de055..f81cb259b 100644 --- a/test/storage_test.exs +++ b/test/storage_test.exs @@ -97,4 +97,30 @@ defmodule AnomaTest.Storage do {:atomic, [{storage.order, testing_atom, 3}]} end end + + describe "Snapshots" do + test "snapshots properly put", %{storage: storage} do + snapshot_storage = :snapshot_super_secret + assert {:atomic, :ok} = Storage.put_snapshot(storage, snapshot_storage) + assert {:ok, _} = Storage.get(storage, snapshot_storage) + end + + test "snapshots properly get the latest", %{storage: storage} do + snapshot_storage = :super_hot + testing_atom = 111_222_333_444_555_666 + Storage.write_at_order(storage, testing_atom, 10, 3) + assert {:atomic, :ok} = Storage.put_snapshot(storage, snapshot_storage) + assert {:ok, snapshot} = Storage.get(storage, snapshot_storage) + assert Storage.in_snapshot(snapshot, testing_atom) == 3 + + assert Storage.get_at_snapshot(snapshot, testing_atom) == + {:ok, 10} + end + + test "missing key gives us nil", %{storage: storage} do + nonsense_atom = :very_good_atom + assert {:atomic, snapshot} = Storage.snapshot_order(storage) + assert Storage.get_at_snapshot(snapshot, nonsense_atom) + end + end end