Skip to content

Commit

Permalink
Create a base ordering storage
Browse files Browse the repository at this point in the history
The intended strategy is to hold a current order and cache of ids to
orders, to better get timestamps of the keyspace

This is highly inefficient but is the best we can do without
partitioning the key_space explicitly somehow
  • Loading branch information
mariari committed Dec 21, 2023
1 parent 11ccf90 commit 082eb38
Show file tree
Hide file tree
Showing 3 changed files with 242 additions and 0 deletions.
22 changes: 22 additions & 0 deletions lib/anoma/node/storage.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
defmodule Anoma.Node.Storage do
use Supervisor

def start_link(init_state) do
Supervisor.start_link(__MODULE__, init_state)
end

def init(name) do
init(name, %Anoma.Storage{})
end

def init(name, table) do
children = [
{Anoma.Node.Storage.Communicator, name: name, table: table},
{Anoma.Node.Storage.Ordering, name: name, table: table}
]

Supervisor.init(children, strategy: :one_for_one)
end

def shutdown(supervisor), do: Supervisor.stop(supervisor, :normal)
end
86 changes: 86 additions & 0 deletions lib/anoma/node/storage/communicator.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
defmodule Anoma.Node.Storage.Communicator do
@moduledoc """
"""

use GenServer
use TypedStruct
alias __MODULE__
alias Anoma.Node.Storage.Ordering
alias Anoma.Node.Utility

typedstruct do
field(:primary, atom(), require: true)
field(:subscribers, MapSet.t(GenServer.server()), default: MapSet.new())
end

def init(name: name) do
{:ok, %Communicator{primary: name}}
end

def start_link(arg) do
GenServer.start_link(
__MODULE__,
arg,
Utility.name(arg, &Utility.com_name/1)
)
end

############################################################
# Public RPC API #
############################################################

@spec state(GenServer.server()) :: Ordering.t()
defdelegate state(ordering), to: Ordering

@spec next_order(GenServer.server()) :: non_neg_integer()
defdelegate next_order(ordering), to: Ordering

@spec true_order(GenServer.server(), any()) :: non_neg_integer()
defdelegate true_order(ordering, id), to: Ordering

@spec new_order(GenServer.server(), Ordering.ordered_transactions()) ::
:error | {:ok, any()}
defdelegate new_order(ordering, ordered), to: Ordering

@spec new_order(
GenServer.server(),
Ordering.ordered_transactions(),
boolean()
) ::
:error | {:ok, any()}
defdelegate new_order(ordering, ordered, instrumentation), to: Ordering

############################################################
# Genserver Behavior #
############################################################

# Please give things to the subscribers

def handle_call(:state, _from, com) do
{:reply, Ordering.state(com.primary), com}
end

def handle_call(:next_order, _from, com) do
{:reply, Ordering.next_order(com.primary), com}
end

def handle_call({:true_order, id}, _from, com) do
{:reply, Ordering.true_order(com.primary, id), com}
end

def handle_call({:new_order, trans, instrumentation}, _from, com) do
{:reply, Ordering.new_order(com.primary, trans, instrumentation), com}
end

############################################################
# Genserver Implementation #
############################################################

# # make this more interesting later
# @spec broadcast_transactions(t(), Enumerable.t(PartialTx.t())) ::
# Primary.response()
# defp broadcast_transactions(agent, trans) do
# Ordering.new_transactions(agent.primary, trans)
# end
end
134 changes: 134 additions & 0 deletions lib/anoma/node/storage/ordering.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
defmodule Anoma.Node.Storage.Ordering do
@moduledoc """
I am a simple mnesia-backed key-value store in an anoma node.
Currently we do not have a way of propagating what keys users want
to store, thus we take the following approach to deterministic state
reading:
1. We keep a next_order, which represents the next_order a
transaction will have. From here the scry reads specifically
[next_order, :key_space | 0] to get a map of the current keys
saved for this node.
2. We keep a hash_to_order to cache the id => order mapping
"""

use TypedStruct
use GenServer
alias Anoma.Node.Utility
alias Anoma.Storage
alias __MODULE__

@type ordered_transactions() ::
list({non_neg_integer(), {binary(), pid()}})

@type key() :: any()

typedstruct do
field(:table, Storage.t(), default: %Anoma.Storage{})
field(:next_order, non_neg_integer(), default: 1)
field(:hash_to_order, %{key() => non_neg_integer()}, default: %{})
end

def init(opts) do
return = %Ordering{table: opts[:table]}
# idempotent
Storage.setup(return.table)
:mnesia.subscribe({:table, return.table.qualified, :simple})
{:ok, return}
end

@spec start_link(keyword()) :: :ignore | {:error, any()} | {:ok, pid()}
def start_link(arg) do
GenServer.start_link(__MODULE__, arg, Utility.name(arg))
end

############################################################
# Public RPC API #
############################################################

@spec state(GenServer.server()) :: t()
def state(ordering) do
GenServer.call(ordering, :state)
end

@spec next_order(GenServer.server()) :: non_neg_integer()
def next_order(ordering) do
GenServer.call(ordering, :next_order)
end

@spec true_order(GenServer.server(), any()) :: non_neg_integer()
def true_order(ordering, id) do
GenServer.call(ordering, {:true_order, id})
end

@spec new_order(GenServer.server(), ordered_transactions()) ::
:error | {:ok, any()}
def new_order(ordering, ordered_transactions) do
new_order(ordering, ordered_transactions, false)
end

@spec new_order(GenServer.server(), ordered_transactions(), boolean()) ::
:error | {:ok, any()}
def new_order(ordering, ordered, instrumentation) do
GenServer.call(ordering, {:new_order, ordered, instrumentation})
end

############################################################
# Genserver Behavior #
############################################################

def handle_call(:state, _from, state) do
{:reply, state, state}
end

def handle_call(:next_order, _from, state) do
{:reply, state.next_order, state}
end

def handle_call({:true_order, id}, _from, state) do
{:reply, Map.get(state.hash_to_order, id), state}
end

def handle_call({:new_order, trans, instrumentation}, _from, state) do
{next_order, new_map} = handle_new_order(trans, state, instrumentation)

{:reply, :ok, %{state | next_order: next_order, hash_to_order: new_map}}
end

############################################################
# Genserver Implementation #
############################################################

@spec handle_new_order(ordered_transactions(), t(), boolean()) ::
{non_neg_integer(), %{key() => non_neg_integer()}}
def handle_new_order(ordered_transactions, state, instrumentation) do
num_txs = length(ordered_transactions)
instrument(instrumentation, {:new_tx, num_txs})

for {index, {_id, pid}} <- ordered_transactions do
instrument(instrumentation, {:ready, pid})
send(pid, {:read_ready, index})
end

new_next_order = state.next_order + length(ordered_transactions)

new_map_elements =
Map.new(ordered_transactions, fn {index, {id, pid}} -> {id, index} end)

Check warning on line 118 in lib/anoma/node/storage/ordering.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (topic)

variable "pid" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 118 in lib/anoma/node/storage/ordering.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (topic)

variable "pid" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 118 in lib/anoma/node/storage/ordering.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (topic)

variable "pid" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 118 in lib/anoma/node/storage/ordering.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (next)

variable "pid" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 118 in lib/anoma/node/storage/ordering.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (next)

variable "pid" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 118 in lib/anoma/node/storage/ordering.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (next)

variable "pid" is unused (if the variable is not meant to be used, prefix it with an underscore)

new_map = Map.merge(state.hash_to_order, new_map_elements)
{new_next_order, new_map}
end

############################################################
# Instrumentation #
############################################################
def instrument(instrument, {:new_tx, num_txs}) do
if instrument, do: IO.inspect(num_txs, label: "new tx count")
end

def instrument(instrument, {:ready, pid}) do
if instrument, do: IO.inspect(pid, label: "sending read ready to pid")
end
end

0 comments on commit 082eb38

Please sign in to comment.