Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store_past_p2p_view #1660

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 257 additions & 0 deletions lib/archethic/db/embedded_impl/p2p_view2.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
defmodule Archethic.DB.EmbeddedImpl.P2PViewTwo do
defstruct [
:geo_patch,
:available?,
:avg_availability
]

@type t :: %{
geo_patch: binary(),
available?: boolean(),
avg_availability: float()
}

@archethic_db_p2pview :archethic_db_p2pview

require Logger
use GenServer

@spec start_link(Keyword.t()) :: GenServer.on_start()
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end

def init(_) do
setup_ets_table()
{:ok, %{}}
end

defp setup_ets_table, do: :ets.new(@archethic_db_p2pview, [:ordered_set, :named_table])

@spec set_summary(timestamp :: DateTime.t(), p2p_view :: list(t())) :: :ok
def set_summary(timestamp, p2p_view) when is_list(p2p_view) do
unix_timestamp = DateTime.to_unix(timestamp)
bin_p2p_view = serialize(p2p_view)

GenServer.call(__MODULE__, {:set_summary, unix_timestamp, bin_p2p_view})
end

@spec get_summary(timestamp :: DateTime.t()) :: list(t())
def get_summary(timestamp) do
DateTime.to_unix(timestamp)
|> read_nodes()
|> deserialize()
end

@spec update_node(
changes :: [],
start_timestamp :: DateTime.t(),
index_at_timestamp :: (DateTime.t() -> integer())
) :: :ok
def update_node(changes, start_timestamp, index_at_timestamp) do
unix_start_timestamp = DateTime.to_unix(start_timestamp)
GenServer.call(__MODULE__, {:update_node, changes, unix_start_timestamp, index_at_timestamp})
end

@spec add_node(
bin_node :: t(),
start_timestamp :: DateTime.t(),
index_at_timestamp :: (DateTime.t() -> integer())
) ::
:ok
def add_node(bin_node, start_timestamp, index_at_timestamp) do
unix_start_timestamp = DateTime.to_unix(start_timestamp)
node_bin = serialize_node(bin_node)
GenServer.call(__MODULE__, {:add_node, node_bin, unix_start_timestamp, index_at_timestamp})
end

# GenServer Callbacks
def handle_call({:set_summary, unix_timestamp, serialized_nodes}, _from, state) do
write_nodes(serialized_nodes, unix_timestamp)
{:reply, :ok, state}
end

def handle_call({:update_node, changes, unix_start_timestamp, index_at_timestamp}, _from, state) do
do_update_node(changes, unix_start_timestamp, index_at_timestamp)
{:reply, :ok, state}
end

defp do_update_node(_, :"$end_of_table", _) do
end

defp do_update_node(changes, unix_timestamp, index_at_timestamp) do
node_index = index_at_timestamp.(unix_timestamp)

read_nodes(unix_timestamp)
|> update_bin_node(node_index, changes)
|> write_nodes(unix_timestamp)

do_update_node(changes, :ets.next(@archethic_db_p2pview, unix_timestamp), index_at_timestamp)
end

def handle_call({:add_node, node_bin, unix_start_timestamp, index_at_timestamp}, _from, state) do

Check warning on line 92 in lib/archethic/db/embedded_impl/p2p_view2.ex

View workflow job for this annotation

GitHub Actions / Build and test

clauses with the same name and arity (number of arguments) should be grouped together, "def handle_call/3" was previously defined (lib/archethic/db/embedded_impl/p2p_view2.ex:69)
do_add_node(node_bin, unix_start_timestamp, index_at_timestamp)
{:reply, :ok, state}
end

defp do_add_node(_, :"$end_of_table", _) do
end

defp do_add_node(node_bin, unix_timestamp, index_at_timestamp) do
node_index = index_at_timestamp.(unix_timestamp)

read_nodes(unix_timestamp)
|> insert_bin_node(node_index, node_bin)
|> write_nodes(unix_timestamp)

do_add_node(
node_bin,
:ets.next(@archethic_db_p2pview, unix_timestamp),
index_at_timestamp
)
end

# TODO decliner avec enregistrement sur fichier
@spec read_nodes(unix_timestamp :: integer()) :: binary()
defp read_nodes(unix_timestamp) do
index = :ets.prev(@archethic_db_p2pview, unix_timestamp + 1)

{_, data} =
:ets.lookup(@archethic_db_p2pview, index)
|> Enum.at(0)

data
end

# TODO decliner avec enregistrement sur fichier
@spec write_nodes(nodes :: binary(), unix_timestamp :: integer()) :: :ok
defp write_nodes(nodes, unix_timestamp) do
:ets.insert(
@archethic_db_p2pview,
{unix_timestamp, nodes}
)

:ok
end

@bin_node_byte_size 5

@spec serialize(nodes :: list(t()), acc :: binary()) :: binary()
defp serialize(nodes, acc \\ <<>>)

defp serialize([], acc) do
acc
end

defp serialize([bin_node | rest], acc) do
node_bin = serialize_node(bin_node)

serialize(
rest,
acc <> node_bin
)
end

defp serialize_node(%__MODULE__{
geo_patch: geo_patch,
available?: available?,
avg_availability: avg_availability
}) do
available_bin = if available?, do: 1, else: 0
avg_availability_int = round(avg_availability * 100)

<<geo_patch::binary-size(3), available_bin, avg_availability_int::8>>
end

@spec deserialize(rest :: binary(), acc :: list(t())) :: list(t())
defp deserialize(rest, acc \\ [])

defp deserialize(<<>>, acc) do
acc |> Enum.reverse()
end

defp deserialize(
<<node_bin::binary-size(@bin_node_byte_size), rest::binary>>,
acc
) do
node = deserialize_node(node_bin)

deserialize(rest, [
node
| acc
])
end

@spec deserialize_node(bin_node :: binary()) :: t()
defp deserialize_node(<<geo_patch::binary-size(3), available_bin, avg_availability_int::8>>) do
available? = available_bin == 1
avg_availability = avg_availability_int / 100

%__MODULE__{
geo_patch: geo_patch,
available?: available?,
avg_availability: avg_availability
}
end

# Helper functions for bin_node manipulation in binary form

@spec get_bin_node(bin_p2p_view :: binary(), index :: integer()) :: binary()
defp get_bin_node(
bin_p2p_view,
index
) do
:erlang.binary_part(bin_p2p_view, @bin_node_byte_size * index, @bin_node_byte_size)
end

@spec insert_bin_node(bin_p2p_view :: binary(), index :: integer(), bin_node :: binary()) ::
binary()
defp insert_bin_node(
bin_p2p_view,
index,
bin_node
) do
prefix_size = @bin_node_byte_size * index

:erlang.binary_part(bin_p2p_view, 0, prefix_size) <>
bin_node <>
:erlang.binary_part(
bin_p2p_view,
prefix_size,
:erlang.byte_size(bin_p2p_view) - prefix_size
)
end

@spec update_bin_node(bin_p2p_view :: binary(), index :: integer(), changes :: []) ::
binary()
defp update_bin_node(
bin_p2p_view,
index,
changes
) do
updated_node =
get_bin_node(bin_p2p_view, index)
|> deserialize_node()
|> apply_changes_to_node(changes)
|> serialize_node()

prefix_size = @bin_node_byte_size * index
suffix_size = :erlang.byte_size(bin_p2p_view) - @bin_node_byte_size * (index + 1)

:erlang.binary_part(bin_p2p_view, 0, prefix_size) <>
updated_node <>
:erlang.binary_part(
bin_p2p_view,
prefix_size,
suffix_size
)
end

@spec apply_changes_to_node(bin_node :: t(), changes :: []) :: t()
defp apply_changes_to_node(bin_node, changes) do
Map.merge(
bin_node,
Map.new(changes)
)
end
end
1 change: 1 addition & 0 deletions lib/archethic/p2p/mem_table.ex
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ defmodule Archethic.P2P.MemTable do
@doc """
List the P2P nodes
"""
# TODO add date en parametre. retourner tout les noeuds ou enrollment_date < date
@spec list_nodes() :: list(Node.t())
def list_nodes do
:ets.foldl(
Expand Down
Loading
Loading