Note: many sections start with a line that says tag: <tag-name>
if you do
git checkout <tag-name>
it will take you to the commit where the changes of
that section are all committed, so you can skip them and try the examples or run
the tests. To return to the last commit run git checkout main
.
tag: 01-tools-setup
This tutorial assumes you have the following tools and versions installed:
Erlang/OTP 23.1 Elixir 1.10.4 (and Mix) Git: any recent version is ok
The tutorial was also tested with these versions:
Erlang/OTP 22.3 Elixir 1.9.4
We assume you are running Linux, Mac OS X or WSL on Windows.
The tutorial was tested on Ubuntu 20.04, Mac OS X Catalina and Ubuntu 20.04 for WSL on Windows 10.
For instructions to install Erlang, see https://adoptingerlang.org/docs/development/setup/ For instructions to install Elixir, see https://elixir-lang.org/install.html
If you have no preferences you can use asdf. Note that it requires a C compiler toolchain in order to compile Erlang; see below for instructions to install one on Ubuntu-like systems. On Mac OS X it should work if you have brew installed.
git clone https://github.com/asdf-vm/asdf.git ~/.asdf --branch v0.8.0
. ~/.asdf/asdf.sh
asdf plugin add erlang
asdf plugin add elixir
asdf list all elixir
asdf list all erlang
asdf install erlang 23.1
asdf install elixir 1.10.4-otp-23
Run these when you want to enable them:
asdf local erlang 23.1
asdf local elixir 1.10.4-otp-23
# required: basic tools and libraries needed
# (compiler, curses for the shell, ssl for crypto)
sudo apt-get -y install build-essential m4 libncurses5-dev libssl-dev autoconf unzip
# optional: if you want odbc support (database connectivity)
sudo apt-get install unixodbc-dev
# optional: if you want pdf docs you need apache fop and xslt tools and java (fop is a java project)
sudo apt-get install -y fop xsltproc default-jdk
# optional: if you want to build jinterface you need a JDK
sudo apt-get install -y default-jdk
# optional: if you want wx (desktop GUI modules)
sudo apt-get install -y libwxgtk3.0-dev
tag: 02-project-setup
Install the Riak Core Lite Mix task:
$ mix archive.install hex rcl
Resolving Hex dependencies...
Dependency resolution completed:
New:
rcl 0.1.9
* Getting rcl (Hex package)
All dependencies are up to date
Compiling 2 files (.ex)
Generated rcl app
Generated archive "rcl-0.1.9.ez" with MIX_ENV=prod
* creating /home/mariano/.asdf/installs/elixir/1.10.4-otp-23/.mix/archives/rcl-0.1.9
If you have an existing version installed it will ask you if you want to replace it, say y
:
Found existing entry: /home/mariano/.asdf/installs/elixir/1.10.4-otp-23/.mix/archives/rcl-0.1.9
Are you sure you want to replace it with "rcl-0.1.9.ez"? [Yn]
Create a new project called rkv
, to make it simpler don't change the name of the project unless you want to edit every single snippet :)
mix rcl new rkv
Creating project rkv, module Rkv
* creating README.md
* creating .formatter.exs
* creating .gitignore
* creating mix.exs
* creating lib
* creating lib/rkv.ex
* creating test
* creating test/test_helper.exs
* creating test/rkv_test.exs
Your Mix project was created successfully.
You can use "mix" to compile it, test it, and more:
cd rkv
mix test
Run "mix help" for more commands.
rcl: creating rkv/mix.exs
rcl: creating rkv/lib/rkv.ex
rcl: creating rkv/lib/rkv/application.ex
rcl: creating rkv/lib/rkv/vnode.ex
rcl: creating rkv/lib/rkv/service.ex
rcl: creating rkv/lib/rkv/supervisor.ex
rcl: creating rkv/config/config.exs
rcl: creating rkv/config/dev.exs
rcl: creating rkv/config/test.exs
rcl: creating rkv/config/ct.exs
rcl: creating rkv/config/node1.exs
rcl: creating rkv/config/node2.exs
rcl: creating rkv/config/node3.exs
rcl: creating rkv/rel/env.bat.eex
rcl: creating rkv/rel/env.sh.eex
rcl: creating rkv/rel/vm.args.eex
cd rkv
Get deps and compile:
cd rkv
mix deps.get
mix compile
Start the project and attach iex:
iex --name [email protected] -S mix run
Run this in iex:
Rkv.Service.ping()
If after many logs you see something like this (the last number can be different):
{:pong, 2, :"[email protected]", 159851741583067506678528028578343455274867621888}
Then it works!
tag: 03-ring-size-16
Edit config/config.exs
:
# change
ring_creation_size: 64
# to
ring_creation_size: 16
Note: remove the data
folder if it exists since it has a ring file of size 64:
rm -rf data
tag: 04-kv-ets
Abstract Key Value Store behaviour:
# lib/rkv/kv.ex
defmodule Rkv.KV do
  @moduledoc """
  Behaviour describing a pluggable key-value storage backend.

  A backend is created with `init/1`, which hands back an opaque state that
  must be passed to every other callback.
  """

  @typedoc "Backend-specific state produced by `init/1`."
  @type kv_state :: term()

  @doc "Sets up the backend from a map of options."
  @callback init(opts :: %{atom() => term()}) ::
              {:ok, state :: kv_state()} | {:error, reason :: term()}

  @doc "Stores `value` under `key`."
  @callback put(state :: kv_state(), key :: term(), value :: term()) ::
              :ok | {:error, reason :: term()}

  @doc "Looks up the value stored under `key`."
  @callback get(state :: kv_state(), key :: term()) ::
              {:ok, value :: term()} | {:error, reason :: term()}

  @doc "Removes `key` from the store."
  @callback delete(state :: kv_state(), key :: term()) ::
              :ok | {:error, reason :: term()}
end
In-memory implementation using ETS:
# lib/rkv/kv_ets.ex
defmodule Rkv.KV.ETS do
  @moduledoc """
  In-memory `Rkv.KV` backend that keeps every pair in an ETS `:set` table.
  """
  @behaviour Rkv.KV

  defmodule State do
    @moduledoc false
    defstruct [:table_name, :table_id]
  end

  # Creates a new table per `uid`. The table is not opened with `:named_table`,
  # so every later operation goes through the returned table id.
  def init(%{uid: uid}) do
    label = String.to_atom("kv_ets_#{inspect(uid)}")
    tid = :ets.new(label, [:set, write_concurrency: false, read_concurrency: false])
    {:ok, %State{table_name: label, table_id: tid}}
  end

  # Inserts or overwrites the pair; `:ets.insert/2` is asserted to return `true`.
  def put(state, key, value) do
    pair = {key, value}
    true = :ets.insert(state.table_id, pair)
    :ok
  end

  # Returns `{:ok, value}` for a stored key and `{:error, :not_found}` otherwise.
  def get(state, key) do
    state.table_id
    |> :ets.lookup(key)
    |> case do
      [{_key, found}] -> {:ok, found}
      [] -> {:error, :not_found}
    end
  end

  # Deleting a key that is not present still yields `:ok`.
  def delete(state, key) do
    true = :ets.delete(state.table_id, key)
    :ok
  end
end
Some tests:
# test/rkv_test.exs
defmodule RkvTest do
  use ExUnit.Case
  doctest Rkv

  alias Rkv.KV.ETS

  # Walks a fresh table through miss -> delete of a missing key -> insert ->
  # hit -> delete -> miss. Each step is asserted with a plain pattern match.
  test "KV.ETS get" do
    {:ok, kv} = ETS.init(%{uid: :erlang.unique_integer()})

    {:error, :not_found} = ETS.get(kv, :k1)
    :ok = ETS.delete(kv, :k1)

    :ok = ETS.put(kv, :k2, :v2)
    {:ok, :v2} = ETS.get(kv, :k2)

    :ok = ETS.delete(kv, :k2)
    {:error, :not_found} = ETS.get(kv, :k2)
  end
end
Test it:
mix test
tag: 05-kv-vnode-commands
Build 3 releases with different configurations to run them on the same machine:
MIX_ENV=node1 mix release node1
MIX_ENV=node2 mix release node2
MIX_ENV=node3 mix release node3
On terminal 1 (node1):
./_build/node1/rel/node1/bin/node1 start_iex
On terminal 2 (node2):
./_build/node2/rel/node2/bin/node2 start_iex
On terminal 3 (node3):
./_build/node3/rel/node3/bin/node3 start_iex
Run on node2 and node3:
:riak_core.join('[email protected]')
You should see something like this on node1:
[info] '[email protected]' joined cluster with status 'joining'
[info] '[email protected]' joined cluster with status 'joining'
Run on node1:
:riak_core_claimant.plan()
:riak_core_claimant.commit()
Periodically run this until it stabilizes:
:riak_core_console.member_status([])
You should see something like this on node1:
================================= Membership ==================================
Status Ring Pending Node
-------------------------------------------------------------------------------
valid 37.5% -- '[email protected]'
valid 31.3% -- '[email protected]'
valid 31.3% -- '[email protected]'
-------------------------------------------------------------------------------
Valid:3 / Leaving:0 / Exiting:0 / Joining:0 / Down:0
Periodically run this until it stabilizes:
{:ok, ring} = :riak_core_ring_manager.get_my_ring()
:riak_core_ring.pretty_print(ring, [:legend])
You should see something like this on node1:
==================================== Nodes ====================================
Node a: 6 ( 37.5%) [email protected]
Node b: 5 ( 31.3%) [email protected]
Node c: 5 ( 31.3%) [email protected]
==================================== Ring =====================================
abca|bcab|cabc|abca|
Run these (play with the argument value) and see which node and partition replies
Rkv.Service.ping(3)
Rkv.Service.ping(5)
Rkv.Service.ping(7)
Let's add get
, put
and delete
to Rkv.Service
(lib/rkv/service.ex
):
defmodule Rkv.Service do
  @moduledoc """
  Routes commands to the vnode that owns a key.

  Every function returns whatever the vnode replied to the command.
  """

  # The routing key is derived from `v`, so different values may reach
  # different partitions.
  def ping(v \\ 1), do: send_cmd("ping#{v}", {:ping, v})

  def put(k, v), do: send_cmd(k, {:put, {k, v}})

  def get(k), do: send_cmd(k, {:get, k})

  def delete(k), do: send_cmd(k, {:delete, k})

  # Hashes `{"rkv", k}`, asks for a preference list of size 1 and sends `cmd`
  # synchronously to the single entry returned.
  # NOTE(review): the match below raises MatchError if the preference list
  # does not contain exactly one entry.
  defp send_cmd(k, cmd) do
    [{target, _type}] =
      {"rkv", k}
      |> :riak_core_util.chash_key()
      |> :riak_core_apl.get_primary_apl(1, Rkv.Service)

    :riak_core_vnode_master.sync_command(target, cmd, Rkv.VNode_master)
  end
end
Implement the commands in lib/rkv/vnode.ex
, change init to:
# Starts the vnode with an ETS-backed store, using the partition as the
# store's uid, and keeps the backend module next to its state.
def init([partition]) do
  backend = Rkv.KV.ETS
  {:ok, backend_state} = backend.init(%{uid: partition})

  vnode_state = %{partition: partition, kv_mod: backend, kv_state: backend_state}
  {:ok, vnode_state}
end
Add the following 3 clauses after :ping
:
# Each clause delegates to the configured KV backend and replies with
# `{backend_result, node(), partition}`.
def handle_command({:get, key}, _sender, state) do
  reply = {state.kv_mod.get(state.kv_state, key), node(), state.partition}
  {:reply, reply, state}
end

def handle_command({:put, {key, value}}, _sender, state) do
  reply = {state.kv_mod.put(state.kv_state, key, value), node(), state.partition}
  {:reply, reply, state}
end

def handle_command({:delete, key}, _sender, state) do
  reply = {state.kv_mod.delete(state.kv_state, key), node(), state.partition}
  {:reply, reply, state}
end
Compile and run:
mix compile
iex --name [email protected] -S mix run
Test the new functions:
Rkv.Service.get(:k1)
{{:error, :not_found}, :"[email protected]", 639406966332270026714112114313373821099470487552}
Rkv.Service.delete(:k1)
{:ok, :"[email protected]", 639406966332270026714112114313373821099470487552}
Rkv.Service.put(:k2, :v2)
{:ok, :"[email protected]", 685078892498860742907977265335757665463718379520}
Rkv.Service.get(:k2)
{{:ok, :v2}, :"[email protected]", 685078892498860742907977265335757665463718379520}
Rkv.Service.delete(:k2)
{:ok, :"[email protected]", 685078892498860742907977265335757665463718379520}
Rkv.Service.get(:k2)
{{:error, :not_found}, :"[email protected]", 685078892498860742907977265335757665463718379520}
tag: 06-kv-external-api
Let's wrap Rkv.Service
with an external API that doesn't expose so much
internal state that's only useful for learning, some tests and development but
not much for production:
Change lib/rkv.ex
to:
defmodule Rkv do
  @moduledoc """
  External API for the key-value store.

  Wraps `Rkv.Service` and strips the node and partition from each reply so
  callers only see the result of the operation.
  """

  @doc """
  Pings the vnode selected by `v`.

  Returns `{:pong, n}` when the service replies with
  `{:pong, n, node, partition}`. A three-element reply
  `{result, node, partition}` is also accepted, in which case `result` is
  returned.
  """
  def ping(v \\ 1) do
    # The ping reply carries one more element than the get/put/delete
    # replies, so matching it as a 3-tuple would raise MatchError.
    case Rkv.Service.ping(v) do
      {:pong, n, _node, _partition} -> {:pong, n}
      {result, _node, _partition} -> result
    end
  end

  @doc "Stores `v` under `k` and returns the backend result."
  def put(k, v) do
    {r, _, _} = Rkv.Service.put(k, v)
    r
  end

  @doc "Fetches the value stored under `k` and returns the backend result."
  def get(k) do
    {r, _, _} = Rkv.Service.get(k)
    r
  end

  @doc "Deletes `k` and returns the backend result."
  def delete(k) do
    {r, _, _} = Rkv.Service.delete(k)
    r
  end
end
Compile and Run:
mix compile
iex --name [email protected] -S mix run
Rkv.get(:k1)
{:error, :not_found}
Rkv.delete(:k1)
:ok
Rkv.put(:k2, :v2)
:ok
Rkv.get(:k2)
{:ok, :v2}
Rkv.delete(:k2)
:ok
Rkv.get(:k2)
{:error, :not_found}
tag: 07-quorum-commands
Add the following functions to lib/rkv/service.ex
:
# Quorum variants of put/get/delete: `n` and `w` are forwarded to the quorum
# state machine and `timeout_ms` is passed as its `wait_timeout_ms` option.
def put_quorum(k, v, n, w, timeout_ms), do: quorum_cmd(k, {:put, {k, v}}, n, w, timeout_ms)

def get_quorum(k, n, w, timeout_ms), do: quorum_cmd(k, {:get, k}, n, w, timeout_ms)

def delete_quorum(k, n, w, timeout_ms), do: quorum_cmd(k, {:delete, k}, n, w, timeout_ms)

# Starts a quorum request tagged with a fresh reference and blocks until a
# `{ref, result}` message arrives.
# NOTE(review): the receive has no `after` clause, so this waits forever if
# no reply is ever sent.
defp quorum_cmd(k, cmd, n, w, timeout_ms) do
  reply_tag = make_ref()
  request_opts = %{ref: reply_tag, from: self(), w: w, wait_timeout_ms: timeout_ms}
  bucket_key = {"rkv", k}

  :riak_core_quorum_statem.quorum_request(
    bucket_key,
    cmd,
    n,
    Rkv.Service,
    Rkv.VNode_master,
    request_opts
  )

  receive do
    {^reply_tag, outcome} -> outcome
  end
end
Compile and Run:
mix compile
iex --name [email protected] -S mix run
Try the new functions:
Rkv.Service.get_quorum(:k1, 3, 2, 1000)
{:ok,
%{
reason: :finished,
result: [
{{:error, :not_found}, :"[email protected]",
822094670998632891489572718402909198556462055424},
{{:error, :not_found}, :"[email protected]",
639406966332270026714112114313373821099470487552}
]
}}
Rkv.Service.get_quorum(:k1, 3, 3, 1000)
{:ok,
%{
reason: :finished,
result: [
{{:error, :not_found}, :"[email protected]",
730750818665451459101842416358141509827966271488},
{{:error, :not_found}, :"[email protected]",
822094670998632891489572718402909198556462055424},
{{:error, :not_found}, :"[email protected]",
639406966332270026714112114313373821099470487552}
]
}}
Rkv.Service.put_quorum(:k1, :v1, 3, 3, 1000)
{:ok,
%{
reason: :finished,
result: [
{:ok, :"[email protected]", 730750818665451459101842416358141509827966271488},
{:ok, :"[email protected]", 822094670998632891489572718402909198556462055424},
{:ok, :"[email protected]", 639406966332270026714112114313373821099470487552}
]
}}
Rkv.Service.get_quorum(:k1, 3, 3, 1000)
{:ok,
%{
reason: :finished,
result: [
{{:ok, :v1}, :"[email protected]",
730750818665451459101842416358141509827966271488},
{{:ok, :v1}, :"[email protected]",
639406966332270026714112114313373821099470487552},
{{:ok, :v1}, :"[email protected]",
822094670998632891489572718402909198556462055424}
]
}}
Rkv.Service.delete_quorum(:k1, 3, 3, 1000)
{:ok,
%{
reason: :finished,
result: [
{:ok, :"[email protected]", 822094670998632891489572718402909198556462055424},
{:ok, :"[email protected]", 730750818665451459101842416358141509827966271488},
{:ok, :"[email protected]", 639406966332270026714112114313373821099470487552}
]
}}
Rkv.Service.get_quorum(:k1, 3, 3, 1000)
{:ok,
%{
reason: :finished,
result: [
{{:error, :not_found}, :"[email protected]",
822094670998632891489572718402909198556462055424},
{{:error, :not_found}, :"[email protected]",
730750818665451459101842416358141509827966271488},
{{:error, :not_found}, :"[email protected]",
639406966332270026714112114313373821099470487552}
]
}}
tag: 08-testing
Add these deps to mix.exs
:
{:ctex, "~> 0.1.0", env: :ct},
{:rcl_test, "~> 0.2.0", env: :ct}
Fetch new deps:
mix deps.get
Create a folder for Common Test suite:
mkdir ct
Add our first Common Test suite:
Create a test suite at ct/rkv_SUITE.exs
:
defmodule Rkv_SUITE do
  # Names of the test cases in this suite.
  def all(), do: [:simple_test]

  # Suite setup and teardown pass the config through unchanged.
  def init_per_suite(config), do: config

  def end_per_suite(config), do: config

  # Trivial test case used to check that the suite runs at all.
  def simple_test(_config) do
    1 = 1
  end
end
Run it:
MIX_ENV=ct mix ct
Change ct/rkv_SUITE.exs
to this:
defmodule Rkv_SUITE do
  # Names of the test cases in this suite.
  def all(), do: [:simple_test, :get_not_found]

  # Delegates suite setup to `:rcl_test.init_nodes/4` with a single node
  # entry (`dev1`) and returns whatever it returns.
  def init_per_suite(config) do
    shared = %{
      app: :rkv,
      build_env: "dev",
      data_dir_name: "rkv-data",
      setup_node_fn: fn _node -> :ok end
    }

    nodes = %{dev1: %{base_port: 10015}}
    :rcl_test.init_nodes(__MODULE__, config, shared, nodes)
  end

  def end_per_suite(config), do: config

  # Trivial test case used to check that the suite runs at all.
  def simple_test(_config) do
    1 = 1
  end

  # A key that was never written must be reported as not found by the only
  # node listed under `:nodes` in the config.
  def get_not_found(config) do
    [target] = :test_server.lookup_config(:nodes, config)
    missing_key = :k1
    {:error, :not_found} = :rpc.call(target, Rkv, :get, [missing_key])
  end
end
Run it again:
MIX_ENV=ct mix ct
Add cluster_join
test to all
in ct/rkv_SUITE.exs
:
# Names of the test cases in this suite, now including the cluster test.
def all(), do: [:simple_test, :get_not_found, :cluster_join]
Add the test implementation at the end of ct/rkv_SUITE.exs
:
# Sets up three nodes, joins node2 and node3 to node1, checks the key is
# absent on every node, writes it through node1 and reads it through node2.
def cluster_join(config0) do
  shared = %{
    app: :rkv,
    build_env: "dev",
    data_dir_name: "rkv-data",
    setup_node_fn: fn _node -> :ok end
  }

  ports = %{
    node1: %{base_port: 10115},
    node2: %{base_port: 10215},
    node3: %{base_port: 10315}
  }

  config = :rcl_test.init_nodes(__MODULE__, config0, shared, ports)
  [node1, node2, node3] = :test_server.lookup_config(:nodes, config)
  :ok = :rcl_test.add_nodes_to_cluster(node1, [node2, node3])

  {key, val} = {:k1, :v1}

  Enum.each([node1, node2, node3], fn member ->
    {:error, :not_found} = :rpc.call(member, Rkv, :get, [key])
  end)

  :ok = :rpc.call(node1, Rkv, :put, [key, val])
  {:ok, ^val} = :rpc.call(node2, Rkv, :get, [key])
end
Run it again:
MIX_ENV=ct mix ct
tag: 09-benchmarking
Create a new project inside rkv
:
mix new rkv_bench --sup
cd rkv_bench
Add the following dependency to rkv_bench/mix.exs
:
{:rcl_bench, "~> 0.1.0"}
Add :rcl_bench
to :extra_applications
in rkv_bench/mix.exs
:
extra_applications: [:logger, :rcl_bench],
Fetch deps:
mix deps.get
Create config folder
mkdir config
Add the following configuration to rkv_bench/config/config.exs
to indicate which
module will implement the benchmark driver:
import Config
config :rcl_bench,
driver_module: RkvBench.Driver
Create the benchmark driver at rkv_bench/lib/rkv_bench/driver.ex
:
defmodule RkvBench.Driver do
  @moduledoc """
  Benchmark driver that exercises a running Rkv node over `:rpc`.

  Each worker state records the target node, the module to call and the set
  of keys that worker has written so far (`existing`).
  """
  @behaviour :rcl_bench_driver

  # Builds the per-worker state. The target node name is hard-coded and must
  # match the `--name` the Rkv node was started with.
  def new(id) do
    node = :"[email protected]"
    state = %{id: id, node: node, existing: %{}, mod: Rkv}
    {:ok, state}
  end

  # Reads a generated key. Both a hit (`{:ok, _}`) and an error tuple such as
  # a miss count as a completed operation. Any other reply, in particular
  # `{:badrpc, reason}`, raises so that an unreachable node is not recorded
  # as a successful read. The :put and :get_own_puts clauses fail on
  # `{:badrpc, _}` in the same way.
  def run(:get, keygen, _valuegen, %{node: node, mod: mod} = state) do
    key = keygen.()

    case :rpc.call(node, mod, :get, [key]) do
      {:ok, _value} -> {:ok, state}
      {:error, _reason} -> {:ok, state}
    end
  end

  # Writes a generated pair and remembers the key for :get_own_puts.
  def run(:put, keygen, valuegen, %{existing: existing, node: node, mod: mod} = state) do
    key = keygen.()
    value = valuegen.()
    :ok = :rpc.call(node, mod, :put, [key, value])
    {:ok, %{state | existing: Map.put(existing, key, true)}}
  end

  # Nothing has been written yet, so there is nothing to read back.
  def run(:get_own_puts, _keygen, _valuegen, %{existing: existing} = state)
      when map_size(existing) == 0 do
    {:ok, state}
  end

  # Reads back one randomly chosen key that this worker wrote earlier.
  def run(:get_own_puts, _keygen, _valuegen, %{existing: existing, node: node, mod: mod} = state) do
    max = map_size(existing)
    # :rand.uniform/1 returns 1..max; Enum.at/2 is zero-based.
    take = :rand.uniform(max) - 1
    {key, _} = Enum.at(existing, take)
    {:ok, _} = :rpc.call(node, mod, :get, [key])
    {:ok, state}
  end

  def terminate(_, _) do
    :ok
  end

  # config callbacks

  def mode() do
    {:ok, {:rate, :max}}
  end

  # Number of concurrent workers
  def concurrent_workers() do
    {:ok, 2}
  end

  # Test duration (minutes)
  def duration() do
    {:ok, 1}
  end

  # Operations (and associated mix)
  def operations() do
    {:ok, [{:get_own_puts, 3}, {:put, 10}, {:get, 2}]}
  end

  # Base test output directory
  def test_dir() do
    {:ok, "tests"}
  end

  # Key generators
  # {uniform_int, N} - Choose a uniformly distributed integer between 0 and N
  def key_generator() do
    {:ok, {:uniform_int, 100_000}}
  end

  # Value generators
  # {fixed_bin, N} - Fixed size binary blob of N bytes
  def value_generator() do
    {:ok, {:fixed_bin, 100}}
  end

  def random_algorithm() do
    {:ok, :exsss}
  end

  def random_seed() do
    {:ok, {1, 4, 3}}
  end

  def shutdown_on_error() do
    false
  end
end
Compile:
mix compile
Start one Rkv node (rkv
project) with a fixed cookie:
iex --cookie rcl-bench-cookie --name [email protected] -S mix run
Notice that --name [email protected]
is used in the driver.
Start a benchmark node (rkv_bench
project) with the same cookie:
iex --cookie rcl-bench-cookie --name [email protected] -S mix run
It will run for a minute (configured in the driver) and then log:
[info] Benchmark finished
[info] No Errors
It will generate these files (the name of the folder is configured in the driver):
./tests/put_single.csv
./tests/get_single.csv
./tests/get-own-puts_single.csv
Generate graphs:
mkdir benchmark_graphs
# ./scripts/latency.R <op> <csv-path> <image-path>
./scripts/latency.R get rkv_bench/tests/get_single.csv benchmark_graphs/latency_get_single.png
./scripts/latency.R put rkv_bench/tests/put_single.csv benchmark_graphs/latency_put_single.png
./scripts/latency.R get-own-puts rkv_bench/tests/get-own-puts_single.csv benchmark_graphs/latency_get-own-puts_single.png
# ./scripts/throughput.R <op> <csv-path> <image-path>
./scripts/throughput.R get rkv_bench/tests/get_single.csv benchmark_graphs/throughput_get_single.png
./scripts/throughput.R put rkv_bench/tests/put_single.csv benchmark_graphs/throughput_put_single.png
./scripts/throughput.R get-own-puts rkv_bench/tests/get-own-puts_single.csv benchmark_graphs/throughput_get-own-puts_single.png
To install R and libraries on ubuntu:
sudo apt install r-base
R
install.packages("ggplot2")
install.packages("dplyr")
install.packages("scales")
install.packages("lubridate")
tag: 10-handoff
A handoff is a transfer over the network of the keys and associated values from one cluster member to another cluster member. There are four types of handoffs that are supported in riak_core: ownership, hinted, repair, and resize. Of these, the most commonly encountered types are ownership and hinted.
A repair handoff happens when your application explicitly calls
riak_core_vnode_manager:repair/3
– an example implementation of this can be
found in riak_kv_vnode:repair/1
. You might use this when your application
detects some kind of data error during a periodic integrity sweep – you have to
roll your own error detection code; riak_core can’t intuit your application
semantics. Be aware that this operation is a big hammer and if there is a lot
of data in a vnode, you will pay a significant performance and latency penalty
while a repair is on-going between the (physical) nodes involved in the repair
operation.
riak_core is set up to split its hash key space into partitions. The number of keyspaces is defined internally by the “ring size”. By default the ring size is 64. (Currently this number must be a power of two.)
When you change the ring size, riak_core will figure out how to move vnode data around your cluster members as it conforms to this new partitioning directive and it uses the resize handoff type to achieve this.
An ownership handoff happens when a cluster member joins or leaves the cluster. When a cluster member is added or removed, riak_core reassigns the (physical) nodes responsible for each vnode and it uses the ownership handoff type to move the data from its old home to its new home. (The reassignment activity occurs when the “cluster plan” command is executed and the data transfers begin once the “cluster commit” command is executed.)
When the primary vnode for a particular part of the ring is offline, riak_core still accepts operations on it and routes those to a backup partition or “fallback” as it's sometimes known in the source code. When the primary vnode comes back online, riak_core uses a hinted handoff type to sync the current vnode state from the fallback(s) to the primary. Once the primary is synchronized, operations are routed to the primary once again.
Edit lib/rkv/vnode.ex
after @behaviour
:
# Logger.debug/1 is used by the handoff callbacks, so Logger must be required.
require Logger
# Record supplies the defrecord/extract macros used just below.
require Record
# Defines the `fold_req_v2` record macros from the `riak_core_fold_req_v2`
# record declared in riak_core's riak_core_vnode.hrl header, so fold requests
# can be matched and their fields read by name.
Record.defrecord(
:fold_req_v2,
:riak_core_fold_req_v2,
Record.extract(:riak_core_fold_req_v2, from_lib: "riak_core/include/riak_core_vnode.hrl")
)
Replace these functions in lib/rkv/vnode.ex
:
# Always allows the handoff to start.
def handoff_starting(_dest, state) do
  Logger.debug("handoff_starting #{state.partition}")
  {true, state}
end

def handoff_cancelled(state) do
  Logger.debug("handoff_cancelled #{state.partition}")
  {:ok, state}
end

def handoff_finished(_dest, state) do
  Logger.debug("handoff_finished #{state.partition}")
  {:ok, state}
end

# Fold request: feeds every stored pair through the fold function carried by
# the request and replies with the final accumulator.
def handle_handoff_command(fold_req_v2() = fold_req, _sender, state) do
  Logger.debug("handoff #{state.partition}")
  foldfun = fold_req_v2(fold_req, :foldfun)
  acc0 = fold_req_v2(fold_req, :acc0)

  acc_final =
    state.kv_mod.reduce(
      state.kv_state,
      fn {k, v}, acc_in ->
        # Keys and values are arbitrary terms; inspect/1 keeps the log call
        # from raising on terms that do not implement String.Chars.
        Logger.debug("handoff #{state.partition}: #{inspect(k)} #{inspect(v)}")
        foldfun.(k, v, acc_in)
      end,
      acc0
    )

  {:reply, acc_final, state}
end

# Any other command that arrives during handoff is ignored.
def handle_handoff_command(_request, _sender, state) do
  Logger.debug("Handoff generic request, ignoring #{state.partition}")
  {:noreply, state}
end

def is_empty(state) do
  is_empty = state.kv_mod.is_empty(state.kv_state)
  Logger.debug("is_empty #{state.partition}: #{is_empty}")
  {is_empty, state}
end

def terminate(reason, state) do
  # `reason` may be a tuple or other term that cannot be interpolated directly.
  Logger.debug("terminate #{state.partition}: #{inspect(reason)}")
  :ok
end

# Disposes of the backend's data for this vnode.
def delete(state) do
  Logger.debug("delete #{state.partition}")
  state.kv_mod.dispose(state.kv_state)
  {:ok, state}
end

# Decodes one pair produced by encode_handoff_item/2 and stores it. The
# backend's result (`:ok` or `{:error, reason}`) is used as the reply so a
# failed write is reported instead of being acknowledged as `:ok`.
def handle_handoff_data(bin_data, state) do
  {k, v} = :erlang.binary_to_term(bin_data)
  result = state.kv_mod.put(state.kv_state, k, v)
  Logger.debug("handle_handoff_data #{state.partition}: #{inspect(k)} #{inspect(v)}")
  {:reply, result, state}
end

# Serialises one pair for transfer; the inverse of the decoding done in
# handle_handoff_data/2.
def encode_handoff_item(k, v) do
  Logger.debug("encode_handoff_item #{inspect(k)} #{inspect(v)}")
  :erlang.term_to_binary({k, v})
end
We need more functions in our KV behaviour to handle handoff, add the following at the end of lib/rkv/kv.ex
:
# Reports whether the store holds no entries.
# Uses `boolean()`, the documented built-in type, rather than `bool()`.
@callback is_empty(state :: kv_state()) :: boolean()

# Disposes of the data held by the store.
@callback dispose(state :: kv_state()) ::
            :ok | {:error, reason :: term()}

# Folds `fun` over every `{key, value}` pair, starting from `acc0`, and
# returns the final accumulator.
@callback reduce(
            state :: kv_state(),
            fun :: ({term(), term()}, term() -> term()),
            acc0 :: term()
          ) :: term()
Implement the new callbacks at the end of lib/rkv/kv_ets.ex
:
# The table is empty exactly when it has no first key.
def is_empty(state) do
  case :ets.first(state.table_id) do
    :"$end_of_table" -> true
    _first_key -> false
  end
end

# Deletes the whole table.
def dispose(state) do
  table = state.table_id
  true = :ets.delete(table)
  :ok
end

# Folds `fun` over every `{key, value}` object stored in the table.
def reduce(state, fun, acc0) do
  table = state.table_id
  :ets.foldl(fun, acc0, table)
end
Clean existing cluster state:
rm -rf data
Rebuild:
MIX_ENV=node1 mix release --overwrite node1
MIX_ENV=node2 mix release --overwrite node2
MIX_ENV=node3 mix release --overwrite node3
Start node1:
./_build/node1/rel/node1/bin/node1 start_iex
Run in node1:
for i <- :lists.seq(1, 100), do: Rkv.Service.put("k#{i}", i)
Start node2 in another terminal:
./_build/node2/rel/node2/bin/node2 start_iex
Run in node2:
for i <- :lists.seq(101, 200), do: Rkv.Service.put("k#{i}", i)
Start node3 in another terminal:
./_build/node3/rel/node3/bin/node3 start_iex
Run in node3:
for i <- :lists.seq(201, 300), do: Rkv.Service.put("k#{i}", i)
Run in node2 and node3:
:riak_core.join('[email protected]')
Run in node1:
:riak_core_claimant.plan()
:riak_core_claimant.commit()
Periodically run this until it stabilizes:
:riak_core_console.member_status([])
You should see something like this on node1:
================================= Membership ==================================
Status Ring Pending Node
-------------------------------------------------------------------------------
valid 37.5% -- '[email protected]'
valid 31.3% -- '[email protected]'
valid 31.3% -- '[email protected]'
-------------------------------------------------------------------------------
Valid:3 / Leaving:0 / Exiting:0 / Joining:0 / Down:0
Periodically run this until it stabilizes:
{:ok, ring} = :riak_core_ring_manager.get_my_ring()
:riak_core_ring.pretty_print(ring, [:legend])
You should see something like this on node1:
==================================== Nodes ====================================
Node a: 6 ( 37.5%) [email protected]
Node b: 5 ( 31.3%) [email protected]
Node c: 5 ( 31.3%) [email protected]
==================================== Ring =====================================
abca|bcab|cabc|abca|
Fetch some key and check which node returns it:
Rkv.Service.get("k23")
tag: 11-redis-api
Add dep to mix.exs
:
{:edis_proto, "~> 0.2.0"},
Update init in lib/rkv/supervisor.ex
to start the Redis Listener:
# Supervises the vnode master together with the Redis listener and client
# supervisors. The listener's port range is read from the :rkv application
# environment, with both ends defaulting to 6379.
def init(_args) do
  redis_ports = %{
    min_port: Application.get_env(:rkv, :redis_min_port, 6379),
    max_port: Application.get_env(:rkv, :redis_max_port, 6379)
  }

  redis_listener =
    {:edis_listener_sup, {:edis_listener_sup, :start_link, [redis_ports]}, :permanent, 1000,
     :supervisor, [:edis_listener_sup]}

  redis_clients =
    {:edis_client_sup,
     {:edis_client_sup, :start_link, [%{command_runner_mod: Rkv.Redis.Protocol}]}, :permanent,
     1000, :supervisor, [:edis_client_sup]}

  vnode_master = worker(:riak_core_vnode_master, [Rkv.VNode], id: Rkv.VNode_master_worker)

  Supervisor.init(
    [vnode_master, redis_listener, redis_clients],
    strategy: :one_for_one,
    max_restarts: 5,
    max_seconds: 10
  )
end
Create a new file at lib/rkv/redis_protocol.ex
with the following content:
defmodule Rkv.Redis.Protocol do
  @moduledoc """
  Maps the Redis `SET`, `GET` and `DEL` commands onto the `Rkv` API.
  """

  # NOTE(review): only these three commands have a clause; any other command
  # raises FunctionClauseError.

  def run_command("SET", [key, val]) do
    :ok = Rkv.put(key, val)
    {:ok, nil}
  end

  # A lookup error is reported to the client as a nil value.
  def run_command("GET", [key]) do
    value =
      case Rkv.get(key) do
        {:ok, found} -> found
        {:error, _reason} -> nil
      end

    {:ok, nil, value}
  end

  def run_command("DEL", [key]) do
    :ok = Rkv.delete(key)
    {:ok, nil}
  end
end
Compile and run:
mix deps.get
mix compile
iex --name [email protected] -S mix run
Install redis tools, in ubuntu:
sudo apt install redis-tools
redis-cli get foo
(nil)
redis-cli set foo 42
OK
redis-cli get foo
42
redis-cli del foo
OK
redis-cli get foo
(nil)
To listen in different ports in node1, node2 and node3 add the following:
config/node1.exs
:
config :rkv,
redis_min_port: 6379,
redis_max_port: 6379
config/node2.exs
:
config :rkv,
redis_min_port: 6479,
redis_max_port: 6479
config/node3.exs
:
config :rkv,
redis_min_port: 6579,
redis_max_port: 6579
Use redis-cli -p 6479
to specify the port of the node you want to send the command to.
tag: 12-persistent-kv-dets
Add a new file at lib/rkv/kv_dets.ex
with the following content:
defmodule Rkv.KV.DETS do
  @moduledoc """
  Disk-backed `Rkv.KV` backend built on DETS.

  Each store is a DETS table named `dets_<uid>`. Every callback returns the
  result of the underlying `:dets` call where the behaviour allows it, so
  storage errors reach the caller.
  """
  @behaviour Rkv.KV

  defmodule State do
    @moduledoc false
    defstruct [:table_name]
  end

  # Opens (creating if needed) the table for `uid` with default options.
  # No :file option is given, so DETS derives the file name from the table
  # name, which is why the name is built as a charlist.
  def init(%{uid: uid}) do
    table_name = String.to_charlist("dets_#{uid}")
    dets_opts = []
    {:ok, ^table_name} = :dets.open_file(table_name, dets_opts)
    {:ok, %State{table_name: table_name}}
  end

  # Returns `:ok` or `{:error, reason}` straight from `:dets.insert/2`.
  def put(state, key, value) do
    :dets.insert(state.table_name, {key, value})
  end

  def get(state, key) do
    case :dets.lookup(state.table_name, key) do
      [] ->
        {:error, :not_found}

      [{_, value}] ->
        {:ok, value}
    end
  end

  # Returns `:ok` or `{:error, reason}` from `:dets.delete/2` rather than
  # unconditionally `:ok`, matching how put/3 and dispose/1 report errors.
  def delete(state, key) do
    :dets.delete(state.table_name, key)
  end

  def is_empty(state) do
    :dets.first(state.table_name) == :"$end_of_table"
  end

  # Removes every object; the table itself stays open.
  def dispose(state) do
    :dets.delete_all_objects(state.table_name)
  end

  def reduce(state, fun, acc0) do
    :dets.foldl(fun, acc0, state.table_name)
  end
end
Add some tests to test/rkv_test.exs
:
defmodule RkvTest do
  use ExUnit.Case
  doctest Rkv

  alias Rkv.KV

  # Fold function used with reduce/3: collects every pair into a list.
  def reduce_fn(pair, acc_in) do
    [pair | acc_in]
  end

  # Both tests run the same scenario against a backend. Expectations use
  # `assert` so that a failure reports the expected and actual values instead
  # of a bare MatchError.

  test "KV.ETS get" do
    assert {:ok, state} = KV.ETS.init(%{uid: :erlang.unique_integer()})

    assert KV.ETS.is_empty(state) == true
    assert KV.ETS.reduce(state, &reduce_fn/2, []) == []

    assert KV.ETS.get(state, :k1) == {:error, :not_found}
    assert KV.ETS.delete(state, :k1) == :ok

    assert KV.ETS.put(state, :k2, :v2) == :ok
    assert KV.ETS.reduce(state, &reduce_fn/2, []) == [{:k2, :v2}]
    assert KV.ETS.is_empty(state) == false
    assert KV.ETS.get(state, :k2) == {:ok, :v2}

    assert KV.ETS.delete(state, :k2) == :ok
    assert KV.ETS.get(state, :k2) == {:error, :not_found}

    assert KV.ETS.dispose(state) == :ok
  end

  test "KV.DETS get" do
    assert {:ok, state} = KV.DETS.init(%{uid: :erlang.unique_integer()})

    assert KV.DETS.is_empty(state) == true
    assert KV.DETS.reduce(state, &reduce_fn/2, []) == []

    assert KV.DETS.get(state, :k1) == {:error, :not_found}
    assert KV.DETS.delete(state, :k1) == :ok

    assert KV.DETS.put(state, :k2, :v2) == :ok
    assert KV.DETS.is_empty(state) == false
    assert KV.DETS.reduce(state, &reduce_fn/2, []) == [{:k2, :v2}]
    assert KV.DETS.get(state, :k2) == {:ok, :v2}

    assert KV.DETS.delete(state, :k2) == :ok
    assert KV.DETS.get(state, :k2) == {:error, :not_found}

    assert KV.DETS.dispose(state) == :ok
  end
end
Update lib/rkv/vnode.ex
to select the KV backend from configuration,
change the first line of the init function:
# ...
def init([partition]) do
kv_mod = Application.get_env(:rkv, :kv_mod, Rkv.KV.ETS)
# ...
Add the selected backend to config/config.exs
:
# ...
config :rkv,
redis_min_port: 6379,
redis_max_port: 6379,
kv_mod: Rkv.KV.DETS
# ...
Compile, test and run:
mix compile
mix test
iex --name [email protected] -S mix run
Test that it works:
Rkv.get(:k1)
{:error, :not_found}
Rkv.delete(:k1)
:ok
Rkv.put(:k2, :v2)
:ok
Rkv.get(:k2)
{:ok, :v2}
Rkv.delete(:k2)
:ok
Rkv.get(:k2)
{:error, :not_found}
Write a key:
Rkv.put(:k2, :v2)
Stop the node and start it again:
iex --name [email protected] -S mix run
Rkv.get(:k2)
The value should still be there:
{:ok, :v2}