Skip to content

Commit

Permalink
demo tx execution
Browse files Browse the repository at this point in the history
  • Loading branch information
juped authored and mariari committed Dec 20, 2023
1 parent 73bfd7d commit 62c3518
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 1 deletion.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,9 @@ anoma-*.tar
*.*~
\#*\#

# Mnesia
Mnesia*/


# VSCode git ignore
.vscode/
6 changes: 6 additions & 0 deletions lib/anoma/mnesia.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ defmodule Anoma.Mnesia do
Anoma.Block.create_table()
end

def attach() do
:mnesia.stop()
:mnesia.start()
:mnesia_rocksdb.register()
end

# TODO Present the table nicely
@doc """
Expand Down
3 changes: 2 additions & 1 deletion lib/anoma/node/executor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ defmodule Anoma.Node.Executor do
def init(name) do
children = [
{Anoma.Node.Executor.Communicator, name: name},
{Anoma.Node.Executor.Primary, init: [], name: name}
{Anoma.Node.Executor.Primary, init: [], name: name},
{DynamicSupervisor, name: Anoma.Node.WorkerPool, strategy: :one_for_one}
]

Supervisor.init(children, strategy: :one_for_one)
Expand Down
49 changes: 49 additions & 0 deletions lib/anoma/node/ordering.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
defmodule Anoma.Node.Ordering do
@moduledoc """
Dummy ordering service.
"""

use GenServer

def start_link() do
GenServer.start_link(__MODULE__, nil, name: __MODULE__)
end

def init(_opts) do
{:ok, 1}
end

def handle_call(_msg, _from, state) do
{:reply, :ok, state}
end

def handle_cast({:tx, tx_code}, state) do
spawn(Anoma.Node.Worker, :run, [state - 1, tx_code])
{:noreply, state + 1}
end

def handle_cast({:inject, order, tx_code}, state) do
spawn(Anoma.Node.Worker, :run, [order - 1, tx_code])
{:noreply, state}
end

def handle_cast(:reset, _state) do
{:noreply, 1}
end

def handle_cast(_msg, state) do
{:noreply, state}
end

def tx(tx_code) do
GenServer.cast(__MODULE__, {:tx, tx_code})
end

def inject_tx(order, tx_code) do
GenServer.cast(__MODULE__, {:inject, order, tx_code})
end

def reset() do
GenServer.cast(__MODULE__, :reset)
end
end
117 changes: 117 additions & 0 deletions lib/anoma/node/storage.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
defmodule Anoma.Node.Storage do
@moduledoc """
I am a simple mnesia-backed key-value store in an anoma node.
"""

@dialyzer :no_improper_lists

@behaviour GenServer

def start_link() do
GenServer.start_link(__MODULE__, nil, name: __MODULE__)
end

def init(_opts) do
{:ok, :rocksdb_copies} = Anoma.Mnesia.attach()
# idempotent
create_tables()
:mnesia.subscribe({:table, Anoma.Node.Storage.Qualified, :simple})
{:ok, %{}}
end

def handle_call({:get, key}, _from, state) do
value = blocking_read(key)
{:reply, value, state}
end

def handle_call(_msg, _from, state) do
{:reply, :ok, state}
end

def handle_cast(_msg, state) do
{:noreply, state}
end

def handle_info(msg, state) do
IO.inspect(msg, label: "message to storage")
{:noreply, state}
end

def blocking_read(key) do
case key do
[0 | _] ->
:error

[_ | _] ->
:mnesia.subscribe({:table, Anoma.Node.Storage.Qualified, :simple})
tx = fn -> :mnesia.read(Anoma.Node.Storage.Qualified, key) end
{:atomic, result} = :mnesia.transaction(tx)

case result do
[{_, ^key, value}] ->
{:ok, value}

[] ->
receive do
{:mnesia_table_event, {:write, {Anoma.Node.Storage.Qualified, ^key, value}, _}} ->
{:ok, value}
end
end

_ ->
:error
end
end

def get(key) do
maybe_order_tx = fn -> :mnesia.read(Anoma.Node.Storage.Order, key) end
{:atomic, maybe_order} = :mnesia.transaction(maybe_order_tx)

case maybe_order do
[{_, ^key, order}] ->
IO.inspect(order, label: "getting at order")
value_tx = fn -> :mnesia.read(Anoma.Node.Storage.Qualified, [order, key | 0]) end
{:atomic, [{_, [^order, ^key | 0], value}]} = :mnesia.transaction(value_tx)
{:ok, value}

[] ->
:absent
end
end

def put(key, value) do
maybe_order_tx = fn -> :mnesia.read(Anoma.Node.Storage.Order, key) end
{:atomic, maybe_order} = :mnesia.transaction(maybe_order_tx)

new_order =
case maybe_order do
[{_, ^key, order}] ->
order + 1

[] ->
1
end

IO.inspect(new_order, label: "putting at order")

write_tx = fn ->
:mnesia.write({Anoma.Node.Storage.Order, key, new_order})
:mnesia.write({Anoma.Node.Storage.Qualified, [new_order, key | 0], value})
end

:mnesia.transaction(write_tx)
end

def create_tables() do
# fully qualified key-value map
:mnesia.create_table(Anoma.Node.Storage.Qualified, attributes: [:key, :value])
# map from keys to qualified keys
:mnesia.create_table(Anoma.Node.Storage.Order, attributes: [:key, :order])
end

def reset() do
:mnesia.delete_table(Anoma.Node.Storage.Qualified)
:mnesia.delete_table(Anoma.Node.Storage.Order)
create_tables()
end
end
24 changes: 24 additions & 0 deletions lib/anoma/node/worker.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
defmodule Anoma.Node.Worker do
@moduledoc """
I am a Nock worker, supporting scry.
"""

@dialyzer :no_improper_lists

import Nock

def run(order, gate) do
IO.inspect(order, label: "worker dispatched after order")
{:ok, ordered_tx} = nock(gate, [10, [6, 1 | order], 0 | 1])
tx_result = nock(ordered_tx, [9, 2, 0 | 1])

case tx_result do
{:ok, [key | value]} ->
Anoma.Node.Storage.put(key, value)
IO.puts("worker succeeded")

_ ->
IO.puts("worker failed!")
end
end
end
52 changes: 52 additions & 0 deletions lib/anoma/tx.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
defmodule Anoma.Tx do
@moduledoc """
Sample transactions.
"""

@dialyzer :no_improper_lists

# %ctr
@counter_name_val 7_500_899
def counter_name do
@counter_name_val
end

# [%ctr 0]
@zero_counter_val [Noun.Format.parse_always("[1 7.500.899 0]"), 0 | Nock.stdlib_core()]
def zero_counter do
@zero_counter_val
end

# [%ctr a]
def set_counter(a) do
[[1, 7_500_899 | a], 0 | Nock.stdlib_core()]
end

# [%ctr .+(.^(/[order]/ctr))]
@increment_counter_val [
Noun.Format.parse_always("[[1 7.500.899] 4 12 [1 0] [0 6] 1 7.500.899 0]"),
0 | Nock.stdlib_core()
]
def increment_counter do
@increment_counter_val
end

# [%ctr (add a .^(/[order]/ctr))]
def add_to_counter(a) do
[
[
[1 | @counter_name_val],
8,
[9, 20, 0 | 7],
9,
2,
10,
[6, [7, [0 | 3], 1 | a], 7, [0 | 3], 12, [1 | 0], [0 | 6], 1, @counter_name_val | 0],
0
| 2
],
0
| Nock.stdlib_core()
]
end
end
16 changes: 16 additions & 0 deletions lib/nock.ex
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,22 @@ defmodule Nock do
}
end

# scry: magically read from storage.
def nock(subject, [12, type_formula | subformula], jettedness) do
try do
# type information ignored for now
{:ok, _type_result} = nock(subject, type_formula, jettedness)
# the sub result is a fully qualified key into storage.
{:ok, sub_result} = nock(subject, subformula, jettedness)
# this blocks until available
IO.inspect(sub_result, label: "scrying key")
{:ok, value} = Anoma.Node.Storage.blocking_read(sub_result)
{:ok, value}
rescue
_ in MatchError -> :error
end
end

# generic case: use naive nock to reduce once.
@spec nock(Noun.t(), Noun.t(), :jetted | :unjetted_once | :unjetted) ::
{:ok, Noun.t()} | :error
Expand Down

0 comments on commit 62c3518

Please sign in to comment.