Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snapshotting Storage #97

Merged
merged 9 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions lib/anoma/node/storage.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
defmodule Anoma.Node.Storage do
use Supervisor

def start_link(init_state) do
Supervisor.start_link(__MODULE__, init_state)
end

def init(name: name) do
init(name: name, table: %Anoma.Storage{})
end

def init(name: name, table: table) do
children = [
{Anoma.Node.Storage.Communicator, name: name},
{Anoma.Node.Storage.Ordering, name: name, table: table}
]

Supervisor.init(children, strategy: :one_for_all)
end

def shutdown(supervisor), do: Supervisor.stop(supervisor, :normal)
end
85 changes: 85 additions & 0 deletions lib/anoma/node/storage/communicator.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
defmodule Anoma.Node.Storage.Communicator do
@moduledoc """

I am the communicator for the Ordering Node, please read my Ordering
node to know more about how my API works

"""

use GenServer
use TypedStruct
alias __MODULE__
alias Anoma.Node.Storage.Ordering
alias Anoma.Node.Utility

typedstruct do
field(:primary, atom(), require: true)
field(:subscribers, MapSet.t(GenServer.server()), default: MapSet.new())
end

def init(name: name) do
{:ok, %Communicator{primary: name}}
end

def start_link(arg) do
GenServer.start_link(
__MODULE__,
arg,
Utility.name(arg, &Utility.com_name/1)
)
end

############################################################
# Public RPC API #
############################################################

@spec state(GenServer.server()) :: Ordering.t()
defdelegate state(ordering), to: Ordering

@spec next_order(GenServer.server()) :: non_neg_integer()
defdelegate next_order(ordering), to: Ordering

@spec true_order(GenServer.server(), any()) :: non_neg_integer() | nil
defdelegate true_order(ordering, id), to: Ordering

@spec new_order(GenServer.server(), Ordering.ordered_transactions()) ::
:error | {:ok, any()}
defdelegate new_order(ordering, ordered), to: Ordering

@spec new_order(
GenServer.server(),
Ordering.ordered_transactions(),
boolean()
) ::
:error | {:ok, any()}
defdelegate new_order(ordering, ordered, instrumentation), to: Ordering

@spec reset(GenServer.server()) :: :ok
defdelegate reset(ordering), to: Ordering
############################################################
# Genserver Behavior #
############################################################

# Please give things to the subscribers

def handle_call(:state, _from, com) do
{:reply, Ordering.state(com.primary), com}
end

def handle_call(:next_order, _from, com) do
{:reply, Ordering.next_order(com.primary), com}
end

def handle_call({:true_order, id}, _from, com) do
{:reply, Ordering.true_order(com.primary, id), com}
end

def handle_call({:new_order, trans, instrumentation}, _from, com) do
{:reply, Ordering.new_order(com.primary, trans, instrumentation), com}
end

def handle_cast(:reset, state) do
Ordering.reset(state.primary())
{:noreply, state}
end
end
143 changes: 143 additions & 0 deletions lib/anoma/node/storage/ordering.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
defmodule Anoma.Node.Storage.Ordering do
@moduledoc """
I am a simple mnesia-backed key-value store in an anoma node.

Currently we do not have a way of propagating what keys users want
to store, thus we take the following approach to deterministic state
reading:

1. We keep a next_order, which represents the next_order a
transaction will have. From here the scry reads specifically
[next_order, :key_space | 0] to get a map of the current keys
saved for this node.

2. We keep a hash_to_order to cache the id => order mapping
"""

use TypedStruct
use GenServer
alias Anoma.Node.Utility
alias Anoma.{Storage, Order}
alias __MODULE__

@type ordered_transactions() ::
list(Order.t())

@type key() :: any()

typedstruct do
field(:table, Storage.t(), default: %Anoma.Storage{})
field(:next_order, non_neg_integer(), default: 1)
field(:hash_to_order, %{key() => non_neg_integer()}, default: %{})
end

def init(opts) do
return = %Ordering{table: opts[:table]}
# idempotent
Storage.setup(return.table)
:mnesia.subscribe({:table, return.table.qualified, :simple})
{:ok, return}
end

@spec start_link(keyword()) :: :ignore | {:error, any()} | {:ok, pid()}
def start_link(arg) do
GenServer.start_link(__MODULE__, arg, Utility.name(arg))
end

############################################################
# Public RPC API #
############################################################

@spec state(GenServer.server()) :: t()
def state(ordering) do
GenServer.call(ordering, :state)
end

@spec next_order(GenServer.server()) :: non_neg_integer()
def next_order(ordering) do
GenServer.call(ordering, :next_order)
end

@spec true_order(GenServer.server(), any()) :: non_neg_integer() | nil
def true_order(ordering, id) do
GenServer.call(ordering, {:true_order, id})
end

@spec new_order(GenServer.server(), ordered_transactions()) ::
:error | {:ok, any()}
def new_order(ordering, ordered_transactions) do
new_order(ordering, ordered_transactions, false)
end

@spec new_order(GenServer.server(), ordered_transactions(), boolean()) ::
:error | {:ok, any()}
def new_order(ordering, ordered, instrumentation) do
GenServer.call(ordering, {:new_order, ordered, instrumentation})
end

@spec reset(GenServer.server()) :: :ok
def reset(ordering) do
GenServer.cast(ordering, :reset)
end

############################################################
# Genserver Behavior #
############################################################

def handle_call(:state, _from, state) do
{:reply, state, state}
end

def handle_call(:next_order, _from, state) do
{:reply, state.next_order, state}
end

def handle_call({:true_order, id}, _from, state) do
{:reply, Map.get(state.hash_to_order, id), state}
end

def handle_call({:new_order, trans, instrumentation}, _from, state) do
{next_order, new_map} = handle_new_order(trans, state, instrumentation)

{:reply, :ok, %{state | next_order: next_order, hash_to_order: new_map}}
end

def handle_cast(:reset, state) do
{:noreply, %Ordering{table: state.table}}
end

############################################################
# Genserver Implementation #
############################################################

@spec handle_new_order(ordered_transactions(), t(), boolean()) ::
{non_neg_integer(), %{key() => non_neg_integer()}}
def handle_new_order(ordered_transactions, state, instrumentation) do
num_txs = length(ordered_transactions)
instrument(instrumentation, {:new_tx, num_txs})

for order <- ordered_transactions do
instrument(instrumentation, {:ready, Order.pid(order)})
send(Order.pid(order), {:read_ready, Order.index(order)})
end

new_next_order = state.next_order + length(ordered_transactions)

new_map_elements =
Map.new(ordered_transactions, &{Order.id(&1), Order.index(&1)})

new_map = Map.merge(state.hash_to_order, new_map_elements)
{new_next_order, new_map}
end

############################################################
# Instrumentation #
############################################################
def instrument(instrument, {:new_tx, num_txs}) do
if instrument, do: IO.inspect(num_txs, label: "new tx count")
end

def instrument(instrument, {:ready, pid}) do
if instrument, do: IO.inspect(pid, label: "sending read ready to pid")
end
end
38 changes: 38 additions & 0 deletions lib/anoma/order.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
defmodule Anoma.Order do
@moduledoc """
I am the Order module, I am the format responsible for making sure
that the given process is told that it can be read or executed at
the given index
"""
use TypedStruct
alias __MODULE__

@typedoc """
I am the Order type. I have information regarding a correct order
and having a proper host to ping to notify when it can do a certain
task.

### Fields
- `index` - the ordering index to execute at
- `id` - the identification key path of the requested key
- `pid` - the process identifier to message
"""
typedstruct require: true do
field(:index, non_neg_integer())
field(:id, any())
field(:pid, pid())
end

def new(index, id, pid) do
%Order{index: index, id: id, pid: pid}
end

@spec index(t()) :: non_neg_integer()
def index(t), do: t.index

@spec id(t()) :: any()
def id(t), do: t.id

@spec pid(t()) :: pid()
def pid(t), do: t.pid
end
22 changes: 6 additions & 16 deletions lib/anoma/partialtx.ex
Original file line number Diff line number Diff line change
Expand Up @@ -95,20 +95,18 @@ defmodule Anoma.PartialTx do
# Helpers
######################################################################

@doc """
I update the resource set with the new resource
# I update the resource set with the new resource

# ### Parameters

### Parameters
# - resource_set (resource_set()) - the resource set we are updating

- resource_set (resource_set()) - the resource set we are updating
# - new_resource (Resource.t()) - the resource we are adding

- new_resource (Resource.t()) - the resource we are adding
# ### Returns

### Returns
# the updated resource set

the updated resource set
"""
@spec update_resource_set(resources, Resource.t()) :: resources
defp update_resource_set(resource_set, new_resource) do
denom = Resource.denomination(new_resource)
Expand All @@ -121,14 +119,6 @@ defmodule Anoma.PartialTx do
defp add_quantities(resources) do
Enum.reduce(resources, 0, fn x, acc -> x.quantity + acc end)
end

@spec sub_quantities(Resource.t(), Resource.t()) :: Resource.t()
defp sub_quantities(resource_1, resource_2) do
%Resource{
resource_1
| quantity: resource_1.quantity - resource_2.quantity
}
end
end

defimpl Anoma.Intent, for: Anoma.PartialTx do
Expand Down
Loading
Loading