Skip to content

Commit

Permalink
second demo tx execution
Browse files Browse the repository at this point in the history
  • Loading branch information
juped authored and mariari committed Dec 20, 2023
1 parent 62c3518 commit 5a2c40e
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 17 deletions.
90 changes: 77 additions & 13 deletions lib/anoma/node/ordering.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,40 +10,104 @@ defmodule Anoma.Node.Ordering do
end

def init(_opts) do
{:ok, 1}
{:ok, %{}}
end

def handle_call(:execute, _from, state) do
# order the pending transactions
next_order = Anoma.Node.Storage.next_order()
ordered_transactions = order(state, next_order)
number_of_transactions = length(ordered_transactions)
IO.inspect(ordered_transactions, label: "ordered transaction set")
:ok = Anoma.Node.Storage.new_order(ordered_transactions)

for {index, {id, pid}} <- ordered_transactions do

Check warning on line 24 in lib/anoma/node/ordering.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (topic)

variable "id" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 24 in lib/anoma/node/ordering.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (topic)

variable "id" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 24 in lib/anoma/node/ordering.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (topic)

variable "id" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 24 in lib/anoma/node/ordering.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (next)

variable "id" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 24 in lib/anoma/node/ordering.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (next)

variable "id" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 24 in lib/anoma/node/ordering.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (next)

variable "id" is unused (if the variable is not meant to be used, prefix it with an underscore)
IO.inspect(pid, label: "sending write ready to pid")
send(pid, {:write_ready, index})
end

{:reply, {:ok, number_of_transactions}, %{}}
end

def handle_call({:execute_manual, tx_list}, _from, state) do
next_order = Anoma.Node.Storage.next_order()
tx_pid_list = Enum.map(tx_list, fn tx -> spawn_with_random_id(tx) end)

ordered_transactions =
Enum.with_index(tx_pid_list, fn element, index -> {index + next_order, element} end)

number_of_transactions = length(ordered_transactions)
IO.inspect(ordered_transactions, label: "ordered transaction set")
:ok = Anoma.Node.Storage.new_order(ordered_transactions)

for {index, {id, pid}} <- ordered_transactions do

Check warning on line 43 in lib/anoma/node/ordering.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (topic)

variable "id" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 43 in lib/anoma/node/ordering.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (topic)

variable "id" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 43 in lib/anoma/node/ordering.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (topic)

variable "id" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 43 in lib/anoma/node/ordering.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (next)

variable "id" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 43 in lib/anoma/node/ordering.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (next)

variable "id" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 43 in lib/anoma/node/ordering.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (next)

variable "id" is unused (if the variable is not meant to be used, prefix it with an underscore)
IO.inspect(pid, label: "sending write ready to pid")
send(pid, {:write_ready, index})
end

{:reply, {:ok, number_of_transactions}, state}
end

def handle_call(:pending, _from, state) do
{:reply, {:ok, state}, state}
end

def handle_call(_msg, _from, state) do
{:reply, :ok, state}
end

def handle_cast({:tx, tx_code}, state) do
spawn(Anoma.Node.Worker, :run, [state - 1, tx_code])
{:noreply, state + 1}
random_tx_id = random_id()
worker_pid = spawn(Anoma.Node.Worker, :run, [random_tx_id, tx_code])
{:noreply, Map.put(state, random_tx_id, worker_pid)}
end

def handle_cast({:inject, order, tx_code}, state) do
spawn(Anoma.Node.Worker, :run, [order - 1, tx_code])
{:noreply, state}
end
def handle_cast(:reset, state) do
for pid <- Map.values(state) do
Process.exit(pid, :kill)
end

def handle_cast(:reset, _state) do
{:noreply, 1}
{:noreply, %{}}
end

def handle_cast(_msg, state) do
{:noreply, state}
end

def tx(tx_code) do
GenServer.cast(__MODULE__, {:tx, tx_code})
def spawn_with_random_id(tx_code) do
id = random_id()
pid = spawn(Anoma.Node.Worker, :run, [id, tx_code])
{id, pid}
end

def order(tx_map, next_order) do
Map.to_list(tx_map)
|> Enum.shuffle()
|> Enum.with_index(fn element, index -> {index + next_order, element} end)
end

def execute() do
GenServer.call(__MODULE__, :execute)
end

def execute_manual(tx_list) do
GenServer.call(__MODULE__, {:execute_manual, tx_list})
end

def inject_tx(order, tx_code) do
GenServer.cast(__MODULE__, {:inject, order, tx_code})
def pending() do
GenServer.call(__MODULE__, :pending)
end

def tx(tx_code) do
GenServer.cast(__MODULE__, {:tx, tx_code})
end

def reset() do
GenServer.cast(__MODULE__, :reset)
end

# 128-bit random id
def random_id() do
:crypto.strong_rand_bytes(16)
end
end
74 changes: 73 additions & 1 deletion lib/anoma/node/storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,45 @@ defmodule Anoma.Node.Storage do
# idempotent
create_tables()
:mnesia.subscribe({:table, Anoma.Node.Storage.Qualified, :simple})
{:ok, %{}}
{:ok, %{next_order: 1, hash_to_order: %{}, subscribers: %{}}}
end

def handle_call(:state, _from, state) do
{:reply, state, state}
end

def handle_call(:next_order, _from, state) do
{:reply, state.next_order, state}
end

def handle_call({:true_order, id}, _from, state) do
{:reply, Map.get(state.hash_to_order, id), state}
end

def handle_call({:new_order, ordered_transactions}, _from, state) do
num_txs = length(ordered_transactions)
IO.inspect(num_txs, label: "new tx count")

for {index, {id, pid}} <- ordered_transactions do

Check warning on line 38 in lib/anoma/node/storage.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (topic)

variable "id" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 38 in lib/anoma/node/storage.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (topic)

variable "id" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 38 in lib/anoma/node/storage.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (topic)

variable "id" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 38 in lib/anoma/node/storage.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (next)

variable "id" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 38 in lib/anoma/node/storage.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (next)

variable "id" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 38 in lib/anoma/node/storage.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (next)

variable "id" is unused (if the variable is not meant to be used, prefix it with an underscore)
IO.inspect(pid, label: "sending read ready to pid")
send(pid, {:read_ready, index})
end

new_next_order = state.next_order + length(ordered_transactions)
new_map_elements = Map.new(ordered_transactions, fn {index, {id, pid}} -> {id, index} end)

Check warning on line 44 in lib/anoma/node/storage.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (topic)

variable "pid" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 44 in lib/anoma/node/storage.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (topic)

variable "pid" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 44 in lib/anoma/node/storage.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (topic)

variable "pid" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 44 in lib/anoma/node/storage.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (next)

variable "pid" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 44 in lib/anoma/node/storage.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (next)

variable "pid" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 44 in lib/anoma/node/storage.ex

View workflow job for this annotation

GitHub Actions / Tests, dialyzer, format (next)

variable "pid" is unused (if the variable is not meant to be used, prefix it with an underscore)
new_map = Map.merge(state.hash_to_order, new_map_elements)
{:reply, :ok, %{state | next_order: new_next_order, hash_to_order: new_map}}
end

def handle_call({:get, key}, _from, state) do
value = blocking_read(key)
{:reply, value, state}
end

def handle_call(:reset, _from, _state) do
{:reply, :ok, %{next_order: 1, hash_to_order: %{}, subscribers: %{}}}
end

def handle_call(_msg, _from, state) do
{:reply, :ok, state}
end
Expand All @@ -37,7 +68,31 @@ defmodule Anoma.Node.Storage do
{:noreply, state}
end

# translate from ids to true order
def blocking_read_id([id | subkey]) do
maybe_true_order = true_order(id)

case maybe_true_order do
nil ->
IO.inspect({self(), id}, label: "waiting on read ready")

receive do
{:read_ready, true_order} ->
read_order = true_order - 1
full_key = [read_order | subkey]
IO.inspect({self(), id, true_order}, label: "got read ready")
IO.inspect(full_key, label: "getting at key")
blocking_read(full_key)
end

true_order ->
blocking_read([true_order - 1 | subkey])
end
end

def blocking_read(key) do
IO.inspect(key, label: "regular blocking read key")

case key do
[0 | _] ->
:error
Expand Down Expand Up @@ -109,9 +164,26 @@ defmodule Anoma.Node.Storage do
:mnesia.create_table(Anoma.Node.Storage.Order, attributes: [:key, :order])
end

def state() do
GenServer.call(__MODULE__, :state)
end

def next_order() do
GenServer.call(__MODULE__, :next_order)
end

def true_order(id) do
GenServer.call(__MODULE__, {:true_order, id})
end

def new_order(ordered_transactions) do
GenServer.call(__MODULE__, {:new_order, ordered_transactions})
end

def reset() do
:mnesia.delete_table(Anoma.Node.Storage.Qualified)
:mnesia.delete_table(Anoma.Node.Storage.Order)
create_tables()
GenServer.call(__MODULE__, :reset)
end
end
14 changes: 12 additions & 2 deletions lib/anoma/node/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,27 @@ defmodule Anoma.Node.Worker do
import Nock

def run(order, gate) do
IO.inspect(order, label: "worker dispatched after order")
IO.inspect(order, label: "worker dispatched with order id")
{:ok, ordered_tx} = nock(gate, [10, [6, 1 | order], 0 | 1])
tx_result = nock(ordered_tx, [9, 2, 0 | 1])

case tx_result do
{:ok, [key | value]} ->
IO.inspect(tx_result, label: "worker succeeded with output")
# wait to be told to make writes
true_order =
receive do
{:write_ready, order} ->
IO.inspect(self(), label: "got write ready")
order
end

IO.inspect(true_order, label: "worker writing at true order")
Anoma.Node.Storage.put(key, value)
IO.puts("worker succeeded")

_ ->
IO.puts("worker failed!")
IO.inspect(tx_result, label: "failure result")
end
end
end
2 changes: 1 addition & 1 deletion lib/nock.ex
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ defmodule Nock do
{:ok, sub_result} = nock(subject, subformula, jettedness)
# this blocks until available
IO.inspect(sub_result, label: "scrying key")
{:ok, value} = Anoma.Node.Storage.blocking_read(sub_result)
{:ok, value} = Anoma.Node.Storage.blocking_read_id(sub_result)
{:ok, value}
rescue
_ in MatchError -> :error
Expand Down

0 comments on commit 5a2c40e

Please sign in to comment.