Step by Step Riak Core Lite Key Value Store Project

Rkv

Note: many sections start with a line that says tag: <tag-name>. Running git checkout <tag-name> takes you to the commit where all the changes of that section are committed, so you can skip the edits and try the examples or run the tests. To return to the last commit run git checkout main.

Tools and Versions

tag: 01-tools-setup

This tutorial assumes you have the following tools and versions installed:

- Erlang/OTP 23.1
- Elixir 1.10.4 (and Mix)
- Git: any recent version is ok

The tutorial was also tested with these versions:

- Erlang/OTP 22.3
- Elixir 1.9.4

We assume you are running Linux, Mac OS X or WSL on Windows.

The tutorial was tested on Ubuntu 20.04, Mac OS X Catalina and Ubuntu 20.04 for WSL on Windows 10.

Installation Instructions

For instructions to install Erlang, see https://adoptingerlang.org/docs/development/setup/

For instructions to install Elixir, see https://elixir-lang.org/install.html

If you have no preference you can use asdf. Note that it requires a C compiler toolchain in order to compile Erlang; see "Install Compiler Tools on Ubuntu" below for instructions for Ubuntu-like systems. On Mac OS X it should work if you have brew installed.

git clone https://github.com/asdf-vm/asdf.git ~/.asdf --branch v0.8.0
. ~/.asdf/asdf.sh
asdf plugin add erlang
asdf plugin add elixir

asdf list all elixir
asdf list all erlang

asdf install erlang 23.1
asdf install elixir 1.10.4-otp-23

Run these when you want to enable them:

asdf local erlang 23.1
asdf local elixir 1.10.4-otp-23

Install Compiler Tools on Ubuntu

# required: basic tools and libraries needed
# (compiler, curses for the shell, ssl for crypto)
sudo apt-get -y install build-essential m4 libncurses5-dev libssl-dev autoconf unzip

# optional: if you want odbc support (database connectivity)
sudo apt-get install unixodbc-dev

# optional: if you want pdf docs you need apache fop and xslt tools and java (fop is a java project)
sudo apt-get install -y fop xsltproc default-jdk

# optional: if you want to build jinterface you need a JDK
sudo apt-get install -y default-jdk

# optional: if you want wx (desktop GUI modules)
sudo apt-get install -y libwxgtk3.0-dev

Project Setup

tag: 02-project-setup

Install the Riak Core Lite Mix task:

$ mix archive.install hex rcl
Resolving Hex dependencies...
Dependency resolution completed:
New:
  rcl 0.1.9
* Getting rcl (Hex package)
All dependencies are up to date
Compiling 2 files (.ex)
Generated rcl app
Generated archive "rcl-0.1.9.ez" with MIX_ENV=prod
* creating /home/mariano/.asdf/installs/elixir/1.10.4-otp-23/.mix/archives/rcl-0.1.9

If you have an existing version installed it will ask you if you want to replace it, say y:

Found existing entry: /home/mariano/.asdf/installs/elixir/1.10.4-otp-23/.mix/archives/rcl-0.1.9
Are you sure you want to replace it with "rcl-0.1.9.ez"? [Yn]

Create a new project called rkv. To keep things simple, don't change the name of the project unless you want to edit every single snippet :)

mix rcl new rkv
Creating project rkv, module Rkv
* creating README.md
* creating .formatter.exs
* creating .gitignore
* creating mix.exs
* creating lib
* creating lib/rkv.ex
* creating test
* creating test/test_helper.exs
* creating test/rkv_test.exs

Your Mix project was created successfully.
You can use "mix" to compile it, test it, and more:

    cd rkv
    mix test

Run "mix help" for more commands.
rcl: creating rkv/mix.exs
rcl: creating rkv/lib/rkv.ex
rcl: creating rkv/lib/rkv/application.ex
rcl: creating rkv/lib/rkv/vnode.ex
rcl: creating rkv/lib/rkv/service.ex
rcl: creating rkv/lib/rkv/supervisor.ex
rcl: creating rkv/config/config.exs
rcl: creating rkv/config/dev.exs
rcl: creating rkv/config/test.exs
rcl: creating rkv/config/ct.exs
rcl: creating rkv/config/node1.exs
rcl: creating rkv/config/node2.exs
rcl: creating rkv/config/node3.exs
rcl: creating rkv/rel/env.bat.eex
rcl: creating rkv/rel/env.sh.eex
rcl: creating rkv/rel/vm.args.eex
cd rkv

Smoke Test

Get deps and compile:

cd rkv
mix deps.get

mix compile

Start the project and attach iex:

iex --name [email protected] -S mix run

Run this in iex:

Rkv.Service.ping()

If after many logs you see something like this (the last number can be different):

{:pong, 2, :"[email protected]", 159851741583067506678528028578343455274867621888}

Then it works!

Smaller Ring (for readability)

tag: 03-ring-size-16

Edit config/config.exs:

# change
ring_creation_size: 64

# to
ring_creation_size: 16

Note: remove the data folder if it exists since it has a ring file of size 64:

rm -rf data
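
To see what the ring size controls, the sketch below splits the 160-bit hash space that riak_core's consistent hashing works on into equal partitions and lists where each one starts. RingSketch and partition_indices/1 are hypothetical names used only for illustration; riak_core computes this internally.

```elixir
defmodule RingSketch do
  # Illustrative only: not part of riak_core or the generated project.

  # Top of the 160-bit hash space (2^160).
  @ring_top :erlang.bsl(1, 160)

  # Returns the starting index of each partition for a given ring size.
  def partition_indices(ring_size) do
    increment = div(@ring_top, ring_size)
    for i <- 0..(ring_size - 1), do: i * increment
  end
end

indices = RingSketch.partition_indices(16)
IO.inspect(length(indices))
IO.inspect(Enum.at(indices, 1))
```

The large integers that appear as the last element of the replies in this tutorial are partition indices of this kind.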

A Simple Key Value Store

tag: 04-kv-ets

Abstract Key Value Store behaviour:

# lib/rkv/kv.ex

defmodule Rkv.KV do
  @type kv_state :: term()

  @callback init(opts :: %{atom() => term()}) ::
              {:ok, state :: kv_state()} | {:error, reason :: term()}

  @callback put(state :: kv_state(), key :: term(), value :: term()) ::
              :ok | {:error, reason :: term()}
  @callback get(state :: kv_state(), key :: term()) ::
              {:ok, value :: term()} | {:error, reason :: term()}
  @callback delete(state :: kv_state(), key :: term()) ::
              :ok | {:error, reason :: term()}
end

In-memory implementation using ETS:

# lib/rkv/kv_ets.ex

defmodule Rkv.KV.ETS do
  @behaviour Rkv.KV

  defmodule State do
    defstruct [:table_name, :table_id]
  end

  def init(%{uid: uid}) do
    table_name = String.to_atom("kv_ets_#{inspect(uid)}")
    ets_opts = [:set, {:write_concurrency, false}, {:read_concurrency, false}]
    table_id = :ets.new(table_name, ets_opts)

    {:ok, %State{table_name: table_name, table_id: table_id}}
  end

  def put(state, key, value) do
    true = :ets.insert(state.table_id, {key, value})
    :ok
  end

  def get(state, key) do
    case :ets.lookup(state.table_id, key) do
      [] ->
        {:error, :not_found}

      [{_, value}] ->
        {:ok, value}
    end
  end

  def delete(state, key) do
    true = :ets.delete(state.table_id, key)
    :ok
  end
end
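
The callbacks map directly onto ETS calls. As a quick standalone illustration of the :set table semantics the module relies on (the table name :kv_sketch is arbitrary):

```elixir
# A :set table keeps at most one object per key.
table = :ets.new(:kv_sketch, [:set])

# Inserting the same key twice overwrites the previous value.
true = :ets.insert(table, {:k1, :old})
true = :ets.insert(table, {:k1, :new})
current = :ets.lookup(table, :k1)
IO.inspect(current)

# Deleting a missing key is not an error, and looking it up returns an empty list.
true = :ets.delete(table, :missing)
missing = :ets.lookup(table, :missing)
IO.inspect(missing)
```

This is why delete can return :ok for a key that was never stored, and why get only has to handle an empty list or a single pair.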

Some tests:

# test/rkv_test.exs

defmodule RkvTest do
  use ExUnit.Case
  doctest Rkv

  test "KV.ETS get" do
    {:ok, state} = Rkv.KV.ETS.init(%{uid: :erlang.unique_integer()})

    {:error, :not_found} = Rkv.KV.ETS.get(state, :k1)
    :ok = Rkv.KV.ETS.delete(state, :k1)

    :ok = Rkv.KV.ETS.put(state, :k2, :v2)
    {:ok, :v2} = Rkv.KV.ETS.get(state, :k2)
    :ok = Rkv.KV.ETS.delete(state, :k2)
    {:error, :not_found} = Rkv.KV.ETS.get(state, :k2)
  end
end

Test it:

mix test

Making the Key Value Store Distributed

tag: 05-kv-vnode-commands

Build 3 releases with different configurations to run them on the same machine:

MIX_ENV=node1 mix release node1
MIX_ENV=node2 mix release node2
MIX_ENV=node3 mix release node3

On terminal 1 (node1):

./_build/node1/rel/node1/bin/node1 start_iex

On terminal 2 (node2):

./_build/node2/rel/node2/bin/node2 start_iex

On terminal 3 (node3):

./_build/node3/rel/node3/bin/node3 start_iex

Run on node2 and node3:

:riak_core.join('[email protected]')

You should see something like this on node1:

[info]  '[email protected]' joined cluster with status 'joining'
[info]  '[email protected]' joined cluster with status 'joining'

Run on node1:

:riak_core_claimant.plan()
:riak_core_claimant.commit()

Periodically run this until it stabilizes:

:riak_core_console.member_status([])

You should see something like this on node1:

================================= Membership ==================================
Status     Ring    Pending    Node
-------------------------------------------------------------------------------
valid      37.5%      --      '[email protected]'
valid      31.3%      --      '[email protected]'
valid      31.3%      --      '[email protected]'
-------------------------------------------------------------------------------
Valid:3 / Leaving:0 / Exiting:0 / Joining:0 / Down:0

Periodically run this until it stabilizes:

{:ok, ring} = :riak_core_ring_manager.get_my_ring()
:riak_core_ring.pretty_print(ring, [:legend])

You should see something like this on node1:

==================================== Nodes ====================================
Node a: 6 ( 37.5%) [email protected]
Node b: 5 ( 31.3%) [email protected]
Node c: 5 ( 31.3%) [email protected]
==================================== Ring =====================================
abca|bcab|cabc|abca|
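
The percentages follow from the ring string: with 16 partitions handed out to 3 nodes in turn, one node ends up with 6 partitions and the others with 5. The sketch below reproduces that arithmetic with a plain round-robin assignment. The round-robin rule is an assumption made for illustration and is not riak_core's actual claim algorithm.

```elixir
nodes = [:a, :b, :c]
ring_size = 16

# Assign partition i to node (i mod 3), matching the "abca|bcab|cabc|abca|" pattern.
owners = for i <- 0..(ring_size - 1), do: Enum.at(nodes, rem(i, length(nodes)))

# Count partitions per node.
counts = Enum.reduce(owners, %{}, fn name, acc -> Map.update(acc, name, 1, &(&1 + 1)) end)

# Convert the counts to a percentage of the ring.
shares = for {name, count} <- counts, into: %{}, do: {name, count * 100 / ring_size}

IO.inspect(counts)
IO.inspect(shares)
```

Node a owns 6 of 16 partitions (37.5%), while b and c own 5 each (31.25%, shown rounded as 31.3% in the console output).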

Run these (play with the argument value) and see which node and partition replies:

Rkv.Service.ping(3)
Rkv.Service.ping(5)
Rkv.Service.ping(7)

Let's add get, put and delete to Rkv.Service (lib/rkv/service.ex):

defmodule Rkv.Service do
  def ping(v \\ 1) do
    send_cmd("ping#{v}", {:ping, v})
  end

  def put(k, v) do
    send_cmd(k, {:put, {k, v}})
  end

  def get(k) do
    send_cmd(k, {:get, k})
  end

  def delete(k) do
    send_cmd(k, {:delete, k})
  end

  defp send_cmd(k, cmd) do
    idx = :riak_core_util.chash_key({"rkv", k})
    pref_list = :riak_core_apl.get_primary_apl(idx, 1, Rkv.Service)

    [{index_node, _type}] = pref_list

    :riak_core_vnode_master.sync_command(index_node, cmd, Rkv.VNode_master)
  end
end
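
send_cmd/2 routes each command by hashing {"rkv", k} onto the ring and asking for the primary vnode responsible for that position. The standalone sketch below imitates the idea with SHA-1 over the serialized key and a ring of 16 equal partitions. It is an approximation under stated assumptions: RoutingSketch is a hypothetical module, and the slot rule used here (integer division) is a simplification that may differ from the owner-selection rule riak_core applies.

```elixir
defmodule RoutingSketch do
  # Hypothetical helper for illustration; not riak_core's implementation.
  @ring_top :erlang.bsl(1, 160)

  # Hash a {bucket, key} pair to an integer position in the 160-bit space.
  def position(bucket, key) do
    <<pos::unsigned-integer-size(160)>> =
      :crypto.hash(:sha, :erlang.term_to_binary({bucket, key}))

    pos
  end

  # Map a position to one of `ring_size` equally sized slots (0-based).
  def slot(bucket, key, ring_size) do
    div(position(bucket, key), div(@ring_top, ring_size))
  end
end

for v <- [3, 5, 7] do
  IO.inspect({v, RoutingSketch.slot("rkv", "ping#{v}", 16)})
end
```

The same key always lands on the same slot, which is why a get for a key reaches the vnode that handled the earlier put, and why different ping arguments can be answered by different partitions.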

Implement the commands in lib/rkv/vnode.ex, change init to:

  def init([partition]) do
    kv_mod = Rkv.KV.ETS
    {:ok, state} = kv_mod.init(%{uid: partition})
    {:ok, %{partition: partition, kv_mod: kv_mod, kv_state: state}}
  end

Add the following 3 clauses after :ping:

  def handle_command({:get, k}, _sender, state) do
    result = state.kv_mod.get(state.kv_state, k)
    {:reply, {result, node(), state.partition}, state}
  end

  def handle_command({:put, {k, v}}, _sender, state) do
    result = state.kv_mod.put(state.kv_state, k, v)
    {:reply, {result, node(), state.partition}, state}
  end

  def handle_command({:delete, k}, _sender, state) do
    result = state.kv_mod.delete(state.kv_state, k)
    {:reply, {result, node(), state.partition}, state}
  end

Compile and run:

mix compile
iex --name [email protected] -S mix run

Test the new functions:

Rkv.Service.get(:k1)
{{:error, :not_found}, :"[email protected]", 639406966332270026714112114313373821099470487552}
Rkv.Service.delete(:k1)
{:ok, :"[email protected]", 639406966332270026714112114313373821099470487552}
Rkv.Service.put(:k2, :v2)
{:ok, :"[email protected]", 685078892498860742907977265335757665463718379520}
Rkv.Service.get(:k2)
{{:ok, :v2}, :"[email protected]", 685078892498860742907977265335757665463718379520}
Rkv.Service.delete(:k2)
{:ok, :"[email protected]", 685078892498860742907977265335757665463718379520}
Rkv.Service.get(:k2)
{{:error, :not_found}, :"[email protected]", 685078892498860742907977265335757665463718379520}

External API Module

tag: 06-kv-external-api

Let's wrap Rkv.Service with an external API that doesn't expose internal details such as the node and partition. Those are useful for learning, testing and development, but not for production:

Change lib/rkv.ex to:

defmodule Rkv do
  def ping(v \\ 1) do
    {r, _, _} = Rkv.Service.ping(v)
    r
  end

  def put(k, v) do
    {r, _, _} = Rkv.Service.put(k, v)
    r
  end

  def get(k) do
    {r, _, _} = Rkv.Service.get(k)
    r
  end

  def delete(k) do
    {r, _, _} = Rkv.Service.delete(k)
    r
  end
end

Compile and Run:

mix compile
iex --name [email protected] -S mix run
Rkv.get(:k1)
{:error, :not_found}
Rkv.delete(:k1)
:ok
Rkv.put(:k2, :v2)
:ok
Rkv.get(:k2)
{:ok, :v2}
Rkv.delete(:k2)
:ok
Rkv.get(:k2)
{:error, :not_found}

Quorum Commands

tag: 07-quorum-commands

Add the following functions to lib/rkv/service.ex:

  def put_quorum(k, v, n, w, timeout_ms) do
    quorum_cmd(k, {:put, {k, v}}, n, w, timeout_ms)
  end

  def get_quorum(k, n, w, timeout_ms) do
    quorum_cmd(k, {:get, k}, n, w, timeout_ms)
  end

  def delete_quorum(k, n, w, timeout_ms) do
    quorum_cmd(k, {:delete, k}, n, w, timeout_ms)
  end

  defp quorum_cmd(k, cmd, n, w, timeout_ms) do
    ref = make_ref()
    opts = %{ref: ref, from: self(), w: w, wait_timeout_ms: timeout_ms}
    :riak_core_quorum_statem.quorum_request({"rkv", k}, cmd, n, Rkv.Service, Rkv.VNode_master, opts)

    receive do
      {^ref, res} -> res
    end
  end

Compile and Run:

mix compile
iex --name [email protected] -S mix run

Try the new functions:

Rkv.Service.get_quorum(:k1, 3, 2, 1000)
{:ok,
 %{
   reason: :finished,
   result: [
     {{:error, :not_found}, :"[email protected]",
      822094670998632891489572718402909198556462055424},
     {{:error, :not_found}, :"[email protected]",
      639406966332270026714112114313373821099470487552}
   ]
 }}
Rkv.Service.get_quorum(:k1, 3, 3, 1000)
{:ok,
 %{
   reason: :finished,
   result: [
     {{:error, :not_found}, :"[email protected]",
      730750818665451459101842416358141509827966271488},
     {{:error, :not_found}, :"[email protected]",
      822094670998632891489572718402909198556462055424},
     {{:error, :not_found}, :"[email protected]",
      639406966332270026714112114313373821099470487552}
   ]
 }}
Rkv.Service.put_quorum(:k1, :v1, 3, 3, 1000)
{:ok,
 %{
   reason: :finished,
   result: [
     {:ok, :"[email protected]", 730750818665451459101842416358141509827966271488},
     {:ok, :"[email protected]", 822094670998632891489572718402909198556462055424},
     {:ok, :"[email protected]", 639406966332270026714112114313373821099470487552}
   ]
 }}
Rkv.Service.get_quorum(:k1, 3, 3, 1000)
{:ok,
 %{
   reason: :finished,
   result: [
     {{:ok, :v1}, :"[email protected]",
      730750818665451459101842416358141509827966271488},
     {{:ok, :v1}, :"[email protected]",
      639406966332270026714112114313373821099470487552},
     {{:ok, :v1}, :"[email protected]",
      822094670998632891489572718402909198556462055424}
   ]
 }}
Rkv.Service.delete_quorum(:k1, 3, 3, 1000)
{:ok,
 %{
   reason: :finished,
   result: [
     {:ok, :"[email protected]", 822094670998632891489572718402909198556462055424},
     {:ok, :"[email protected]", 730750818665451459101842416358141509827966271488},
     {:ok, :"[email protected]", 639406966332270026714112114313373821099470487552}
   ]
 }}
Rkv.Service.get_quorum(:k1, 3, 3, 1000)
{:ok,
 %{
   reason: :finished,
   result: [
     {{:error, :not_found}, :"[email protected]",
      822094670998632891489572718402909198556462055424},
     {{:error, :not_found}, :"[email protected]",
      730750818665451459101842416358141509827966271488},
     {{:error, :not_found}, :"[email protected]",
      639406966332270026714112114313373821099470487552}
   ]
 }}
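
Each quorum reply carries a list of {result, node, partition} tuples, and in the outputs above the number of entries equals the w argument. The caller has to decide how to turn them into a single answer. A minimal sketch of one possible policy for reads follows. QuorumSketch.merge_get/1 is a hypothetical helper, and the node names and partition numbers in the sample data are made up.

```elixir
defmodule QuorumSketch do
  # Hypothetical helper: collapse the replies of a quorum read.
  # Returns {:ok, value} when the replicas that have the key agree on its value,
  # {:error, :not_found} when no replica has the key,
  # and {:error, {:conflict, values}} when replicas disagree.
  def merge_get({:ok, %{result: replies}}) do
    values =
      replies
      |> Enum.map(fn {result, _node, _partition} -> result end)
      |> Enum.filter(&match?({:ok, _}, &1))
      |> Enum.map(fn {:ok, v} -> v end)
      |> Enum.uniq()

    case values do
      [] -> {:error, :not_found}
      [value] -> {:ok, value}
      many -> {:error, {:conflict, many}}
    end
  end
end

reply =
  {:ok,
   %{
     reason: :finished,
     result: [
       {{:ok, :v1}, :"node_a@example", 1},
       {{:ok, :v1}, :"node_b@example", 2},
       {{:error, :not_found}, :"node_c@example", 3}
     ]
   }}

merged = QuorumSketch.merge_get(reply)
IO.inspect(merged)
```

Other policies are possible, for example treating a mix of found and not-found replies as a conflict; the right choice depends on how the application wants to handle stale replicas.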

Testing

tag: 08-testing

Add these deps to mix.exs:

      {:ctex, "~> 0.1.0", env: :ct},
      {:rcl_test, "~> 0.2.0", env: :ct}

Fetch new deps:

mix deps.get

Create a folder for the Common Test suites:

mkdir ct

Add our first Common Test suite at ct/rkv_SUITE.exs:

defmodule Rkv_SUITE do
  def all() do
    [:simple_test]
  end

  def init_per_suite(config) do
    config
  end

  def end_per_suite(config) do
    config
  end

  def simple_test(_config) do
    1 = 1
  end
end

Run it:

MIX_ENV=ct mix ct

Test a single node

Change ct/rkv_SUITE.exs to this:

defmodule Rkv_SUITE do
  def all() do
    [:simple_test, :get_not_found]
  end

  def init_per_suite(config) do
    common_config = %{
      app: :rkv,
      build_env: "dev",
      data_dir_name: "rkv-data",
      setup_node_fn: fn (_) -> :ok end,
    }
    nodes_config = %{dev1: %{base_port: 10015}}
    :rcl_test.init_nodes(__MODULE__, config, common_config, nodes_config)
  end

  def end_per_suite(config) do
    config
  end

  def simple_test(_config) do
    1 = 1
  end

  def get_not_found(config) do
    [node] = :test_server.lookup_config(:nodes, config)
    key = :k1
    {:error, :not_found} = :rpc.call(node, Rkv, :get, [key])
  end
end

Run it again:

MIX_ENV=ct mix ct

Test a cluster

Add cluster_join test to all in ct/rkv_SUITE.exs:

  def all() do
    [:simple_test, :get_not_found, :cluster_join]
  end

Add the test implementation at the end of ct/rkv_SUITE.exs:

  def cluster_join(config0) do
    common_config = %{
      app: :rkv,
      build_env: "dev",
      data_dir_name: "rkv-data",
      setup_node_fn: fn (_) -> :ok end,
    }
    nodes_config = %{
      node1: %{base_port: 10115},
      node2: %{base_port: 10215},
      node3: %{base_port: 10315},
    }
    config = :rcl_test.init_nodes(__MODULE__, config0, common_config, nodes_config)
    [node1, node2, node3] = :test_server.lookup_config(:nodes, config)

    :ok = :rcl_test.add_nodes_to_cluster(node1, [node2, node3])

    key = :k1
    val = :v1
    {:error, :not_found} = :rpc.call(node1, Rkv, :get, [key])
    {:error, :not_found} = :rpc.call(node2, Rkv, :get, [key])
    {:error, :not_found} = :rpc.call(node3, Rkv, :get, [key])

    :ok = :rpc.call(node1, Rkv, :put, [key, val])
    {:ok, ^val} = :rpc.call(node2, Rkv, :get, [key])
  end

Run it again:

MIX_ENV=ct mix ct

Benchmarking

tag: 09-benchmarking

Create a new project inside rkv:

mix new rkv_bench --sup
cd rkv_bench

Add the following dependency to rkv_bench/mix.exs:

{:rcl_bench, "~> 0.1.0"}

Add :rcl_bench to :extra_applications in rkv_bench/mix.exs:

extra_applications: [:logger, :rcl_bench],

Fetch deps:

mix deps.get

Create the config folder:

mkdir config

Add the following configuration to rkv_bench/config/config.exs to indicate which module will implement the benchmark driver:

import Config

config :rcl_bench,
  driver_module: RkvBench.Driver

Create the benchmark driver at rkv_bench/lib/rkv_bench/driver.ex:

defmodule RkvBench.Driver do
  @behaviour :rcl_bench_driver

  def new(id) do
    node = :"[email protected]"
    state = %{id: id, node: node, existing: %{}, mod: Rkv}
    {:ok, state}
  end

  def run(:get, keygen, _valuegen, %{node: node, mod: mod} = state) do
    key = keygen.()
    {_, _} = :rpc.call(node, mod, :get, [key])
    {:ok, state}
  end

  def run(:put, keygen, valuegen, %{existing: existing, node: node, mod: mod} = state) do
    key = keygen.()
    value = valuegen.()
    :ok = :rpc.call(node, mod, :put, [key, value])
    {:ok, %{state | existing: Map.put(existing, key, true)}}
  end

  def run(:get_own_puts, _keygen, _valuegen, %{existing: existing} = state)
      when map_size(existing) == 0 do
    {:ok, state}
  end

  def run(:get_own_puts, _keygen, _valuegen, %{existing: existing, node: node, mod: mod} = state) do
    max = Enum.count(existing)
    take = :rand.uniform(max) - 1
    {key, _} = Enum.at(existing, take)
    {:ok, _} = :rpc.call(node, mod, :get, [key])
    {:ok, state}
  end

  def terminate(_, _) do
    :ok
  end

  # config callbacks

  def mode() do
    {:ok, {:rate, :max}}
  end

  # Number of concurrent workers
  def concurrent_workers() do
    {:ok, 2}
  end

  # Test duration (minutes)
  def duration() do
    {:ok, 1}
  end

  # Operations (and associated mix)
  def operations() do
    {:ok, [{:get_own_puts, 3}, {:put, 10}, {:get, 2}]}
  end

  # Base test output directory
  def test_dir() do
    {:ok, "tests"}
  end

  # Key generators
  # {uniform_int, N} - Choose a uniformly distributed integer between 0 and N
  def key_generator() do
    {:ok, {:uniform_int, 100_000}}
  end

  # Value generators
  # {fixed_bin, N} - Fixed size binary blob of N bytes
  def value_generator() do
    {:ok, {:fixed_bin, 100}}
  end

  def random_algorithm() do
    {:ok, :exsss}
  end

  def random_seed() do
    {:ok, {1, 4, 3}}
  end

  def shutdown_on_error() do
    false
  end
end
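
The operations/0 callback lists each operation with a weight. Assuming the weights are relative frequencies (an assumption about how rcl_bench interprets them, not something stated in this tutorial), the mix configured above works out as follows:

```elixir
operations = [{:get_own_puts, 3}, {:put, 10}, {:get, 2}]

# Sum of all weights.
total = operations |> Enum.map(fn {_op, weight} -> weight end) |> Enum.sum()

# Expected share of each operation, as a percentage of all requests.
shares = for {op, weight} <- operations, do: {op, Float.round(weight * 100 / total, 1)}

IO.inspect(total)
IO.inspect(shares)
```

Under that assumption about two thirds of the requests are puts, which keeps the existing map in the driver state populated for the get_own_puts operation.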

Compile:

mix compile

Start one Rkv node (rkv project) with a fixed cookie:

iex --cookie rcl-bench-cookie --name [email protected] -S mix run

Notice that --name [email protected] is used in the driver.

Start a benchmark node (rkv_bench project) with the same cookie:

iex --cookie rcl-bench-cookie --name [email protected] -S mix run

It will run for a minute (configured in the driver) and then log:

[info]  Benchmark finished
[info]  No Errors

It will generate these files (the name of the folder is configured in the driver):

./tests/put_single.csv
./tests/get_single.csv
./tests/get-own-puts_single.csv

Generate graphs:

mkdir benchmark_graphs
# ./scripts/latency.R <op> <csv-path> <image-path>
./scripts/latency.R get rkv_bench/tests/get_single.csv benchmark_graphs/latency_get_single.png
./scripts/latency.R put rkv_bench/tests/put_single.csv benchmark_graphs/latency_put_single.png
./scripts/latency.R get-own-puts rkv_bench/tests/get-own-puts_single.csv benchmark_graphs/latency_get-own-puts_single.png

# ./scripts/throughput.R <op> <csv-path> <image-path>
./scripts/throughput.R get rkv_bench/tests/get_single.csv benchmark_graphs/throughput_get_single.png
./scripts/throughput.R put rkv_bench/tests/put_single.csv benchmark_graphs/throughput_put_single.png
./scripts/throughput.R get-own-puts rkv_bench/tests/get-own-puts_single.csv benchmark_graphs/throughput_get-own-puts_single.png

To install R and the required libraries on Ubuntu:

sudo apt install r-base
R
install.packages("ggplot2")
install.packages("dplyr")
install.packages("scales")
install.packages("lubridate")

Handoff

tag: 10-handoff

source

What is a handoff?

A handoff is a transfer over the network of the keys and associated values from one cluster member to another cluster member. There are four types of handoffs that are supported in riak_core: ownership, hinted, repair, and resize. Of these, the most commonly encountered types are ownership and hinted.

Repairs

A repair handoff happens when your application explicitly calls riak_core_vnode_manager:repair/3 – an example implementation of this can be found in riak_kv_vnode:repair/1. You might use this when your application detects some kind of data error during a periodic integrity sweep – you have to roll your own error detection code; riak_core can’t intuit your application semantics. Be aware that this operation is a big hammer and if there is a lot of data in a vnode, you will pay a significant performance and latency penalty while a repair is on-going between the (physical) nodes involved in the repair operation.

Resize

riak_core splits its hash key space into partitions. The number of partitions is defined by the “ring size”. By default the ring size is 64. (Currently this number must be a power of two.)

When the ring size is changed, riak_core figures out how to move vnode data around your cluster members to conform to the new partitioning, and it uses the resize handoff type to achieve this.

Ownership

An ownership handoff happens when a cluster member joins or leaves the cluster. When a cluster member is added or removed, riak_core reassigns the (physical) nodes responsible for each vnode and it uses the ownership handoff type to move the data from its old home to its new home. (The reassignment activity occurs when the “cluster plan” command is executed and the data transfers begin once the “cluster commit” command is executed.)

Hinted

When the primary vnode for a particular part of the ring is offline, riak_core still accepts operations on it and routes those to a backup partition, or “fallback” as it's sometimes known in the source code. When the primary vnode comes back online, riak_core uses a hinted handoff type to sync the current vnode state from the fallback(s) to the primary. Once the primary is synchronized, operations are routed to the primary once again.

Edit lib/rkv/vnode.ex after @behaviour:

  require Logger
  require Record

  Record.defrecord(
    :fold_req_v2,
    :riak_core_fold_req_v2,
    Record.extract(:riak_core_fold_req_v2, from_lib: "riak_core/include/riak_core_vnode.hrl")
  )

Replace these functions in lib/rkv/vnode.ex:

  def handoff_starting(_dest, state) do
    Logger.debug("handoff_starting #{state.partition}")
    {true, state}
  end

  def handoff_cancelled(state) do
    Logger.debug("handoff_cancelled #{state.partition}")
    {:ok, state}
  end

  def handoff_finished(_dest, state) do
    Logger.debug("handoff_finished #{state.partition}")
    {:ok, state}
  end

  def handle_handoff_command(fold_req_v2() = fold_req, _sender, state) do
    Logger.debug("handoff #{state.partition}")
    foldfun = fold_req_v2(fold_req, :foldfun)
    acc0 = fold_req_v2(fold_req, :acc0)

    acc_final =
      state.kv_mod.reduce(
        state.kv_state,
        fn {k, v}, acc_in ->
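          # inspect/1 avoids a crash when a key or value does not implement String.Chars (e.g. a tuple)
          Logger.debug("handoff #{state.partition}: #{inspect(k)} #{inspect(v)}")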
          foldfun.(k, v, acc_in)
        end,
        acc0
      )

    {:reply, acc_final, state}
  end

  def handle_handoff_command(_request, _sender, state) do
    Logger.debug("Handoff generic request, ignoring #{state.partition}")
    {:noreply, state}
  end

  def is_empty(state) do
    is_empty = state.kv_mod.is_empty(state.kv_state)
    Logger.debug("is_empty #{state.partition}: #{is_empty}")
    {is_empty, state}
  end

  def terminate(reason, state) do
    # reason may be a tuple such as {:shutdown, term}, which cannot be interpolated directly
    Logger.debug("terminate #{state.partition}: #{inspect(reason)}")
    :ok
  end

  def delete(state) do
    Logger.debug("delete #{state.partition}")
    state.kv_mod.dispose(state.kv_state)
    {:ok, state}
  end

  def handle_handoff_data(bin_data, state) do
    {k, v} = :erlang.binary_to_term(bin_data)
    state.kv_mod.put(state.kv_state, k, v)
    # inspect/1 keeps the log call safe for keys and values of any type
    Logger.debug("handle_handoff_data #{state.partition}: #{inspect(k)} #{inspect(v)}")
    {:reply, :ok, state}
  end

  def encode_handoff_item(k, v) do
    Logger.debug("encode_handoff_item #{inspect(k)} #{inspect(v)}")
    :erlang.term_to_binary({k, v})
  end

We need more functions in our KV behaviour to handle handoff. Add the following at the end of lib/rkv/kv.ex:

  @callback is_empty(state :: kv_state()) ::
              boolean()
  @callback dispose(state :: kv_state()) ::
              :ok | {:error, reason :: term()}

  @callback reduce(
              state :: kv_state(),
              fun :: ({term(), term()}, term() -> term()),
              acc0 :: term()
            ) :: term()

Implement the new callbacks at the end of lib/rkv/kv_ets.ex:

  def is_empty(state) do
    :ets.first(state.table_id) == :"$end_of_table"
  end

  def dispose(state) do
    true = :ets.delete(state.table_id)
    :ok
  end

  def reduce(state, fun, acc0) do
    :ets.foldl(fun, acc0, state.table_id)
  end
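
Put together, a handoff folds over every {k, v} pair in the source vnode, encodes each pair as encode_handoff_item/2 does, ships the binary, and the destination decodes and stores it as handle_handoff_data/2 does. The sketch below simulates that flow locally with two ETS tables standing in for the two vnodes. There is no network or riak_core involved, and the table names are arbitrary.

```elixir
source = :ets.new(:handoff_source, [:set])
dest = :ets.new(:handoff_dest, [:set])

# Fill the source "vnode" with a few pairs.
for i <- 1..3, do: :ets.insert(source, {"k#{i}", i})

# Sender side: fold over all pairs and encode each one to a binary.
encoded =
  :ets.foldl(
    fn {k, v}, acc -> [:erlang.term_to_binary({k, v}) | acc] end,
    [],
    source
  )

# Receiver side: decode each binary and store the pair.
for bin <- encoded do
  {k, v} = :erlang.binary_to_term(bin)
  true = :ets.insert(dest, {k, v})
end

received = Enum.sort(:ets.tab2list(dest))
IO.inspect(received)
```

After the loop the destination table holds the same three pairs as the source.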

Clean existing cluster state:

rm -rf data

Rebuild:

MIX_ENV=node1 mix release --overwrite node1
MIX_ENV=node2 mix release --overwrite node2
MIX_ENV=node3 mix release --overwrite node3

Start node1:

./_build/node1/rel/node1/bin/node1 start_iex

Run in node1:

for i <- :lists.seq(1, 100), do: Rkv.Service.put("k#{i}", i)

Start node2 in another terminal:

./_build/node2/rel/node2/bin/node2 start_iex

Run in node2:

for i <- :lists.seq(101, 200), do: Rkv.Service.put("k#{i}", i)

Start node3 in another terminal:

./_build/node3/rel/node3/bin/node3 start_iex

Run in node3:

for i <- :lists.seq(201, 300), do: Rkv.Service.put("k#{i}", i)

Run in node2 and node3:

:riak_core.join('[email protected]')

Run in node1:

:riak_core_claimant.plan()
:riak_core_claimant.commit()

Periodically run this until it stabilizes:

:riak_core_console.member_status([])

You should see something like this on node1:

================================= Membership ==================================
Status     Ring    Pending    Node
-------------------------------------------------------------------------------
valid      37.5%      --      '[email protected]'
valid      31.3%      --      '[email protected]'
valid      31.3%      --      '[email protected]'
-------------------------------------------------------------------------------
Valid:3 / Leaving:0 / Exiting:0 / Joining:0 / Down:0

Periodically run this until it stabilizes:

{:ok, ring} = :riak_core_ring_manager.get_my_ring()
:riak_core_ring.pretty_print(ring, [:legend])

You should see something like this on node1:

==================================== Nodes ====================================
Node a: 6 ( 37.5%) [email protected]
Node b: 5 ( 31.3%) [email protected]
Node c: 5 ( 31.3%) [email protected]
==================================== Ring =====================================
abca|bcab|cabc|abca|

Fetch some key and check which node returns it:

Rkv.Service.get("k23")

Redis Compatible API

tag: 11-redis-api

Add dep to mix.exs:

      {:edis_proto, "~> 0.2.0"},

Update init in lib/rkv/supervisor.ex to start the Redis Listener:

  def init(_args) do
    min_port = Application.get_env(:rkv, :redis_min_port, 6379)
    max_port = Application.get_env(:rkv, :redis_max_port, 6379)
    listener_opts = %{min_port: min_port, max_port: max_port}
    listener_sup = {:edis_listener_sup, {:edis_listener_sup, :start_link, [listener_opts]},
        :permanent, 1000, :supervisor, [:edis_listener_sup]}
    client_opts = %{command_runner_mod: Rkv.Redis.Protocol}
    client_sup = {:edis_client_sup, {:edis_client_sup, :start_link, [client_opts]},
        :permanent, 1000, :supervisor, [:edis_client_sup]}

    children = [
      worker(:riak_core_vnode_master, [Rkv.VNode], id: Rkv.VNode_master_worker),
      listener_sup,
      client_sup
    ]

    Supervisor.init(children, strategy: :one_for_one, max_restarts: 5, max_seconds: 10)
  end

Create a new file at lib/rkv/redis_protocol.ex with the following content:

defmodule Rkv.Redis.Protocol do
  def run_command("SET", [key, val]) do
    :ok = Rkv.put(key, val)
    {:ok, nil}
  end

  def run_command("GET", [key]) do
    case Rkv.get(key) do
      {:ok, v} ->
        {:ok, nil, v}

      {:error, _reason} ->
        {:ok, nil, nil}
    end
  end

  def run_command("DEL", [key]) do
    :ok = Rkv.delete(key)
    {:ok, nil}
  end
end

Compile and run:

mix deps.get
mix compile
iex --name [email protected] -S mix run

Install the Redis tools (on Ubuntu) and try a few commands:

sudo apt install redis-tools
redis-cli get foo
(nil)

redis-cli set foo 42
OK

redis-cli get foo
42

redis-cli del foo
OK

redis-cli get foo
(nil)
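
redis-cli talks to the node using the Redis serialization protocol (RESP), where a command is sent as an array of bulk strings. The sketch below encodes a command by hand to show what arrives at the listener. RespSketch is a hypothetical module for illustration only; in the project the parsing is handled by the edis_proto dependency.

```elixir
defmodule RespSketch do
  # Encode a command as a RESP array of bulk strings:
  # *<count>\r\n followed by $<byte length>\r\n<data>\r\n for each argument.
  def encode(args) do
    parts = for arg <- args, do: ["$", Integer.to_string(byte_size(arg)), "\r\n", arg, "\r\n"]
    IO.iodata_to_binary(["*", Integer.to_string(length(args)), "\r\n", parts])
  end
end

wire = RespSketch.encode(["SET", "foo", "42"])
IO.inspect(wire)
```

Since run_command/2 matches the command name as a binary, the key and value presumably arrive as binaries too, so a key written with redis-cli set foo 42 would be the string "foo" rather than the atom :foo.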

To listen on different ports in node1, node2 and node3, add the following:

config/node1.exs:

config :rkv,
  redis_min_port: 6379,
  redis_max_port: 6379

config/node2.exs:

config :rkv,
  redis_min_port: 6479,
  redis_max_port: 6479

config/node3.exs:

config :rkv,
  redis_min_port: 6579,
  redis_max_port: 6579

Use redis-cli -p 6479 to specify the port of the node you want to send the command to.

Persistent KV Implementation with DETS

tag: 12-persistent-kv-dets

Add a new file at lib/rkv/kv_dets.ex with the following content:

defmodule Rkv.KV.DETS do
  @behaviour Rkv.KV

  defmodule State do
    defstruct [:table_name]
  end

  def init(%{uid: uid}) do

    table_name = String.to_charlist("dets_#{uid}")
    dets_opts = []
    {:ok, ^table_name} = :dets.open_file(table_name, dets_opts)

    {:ok, %State{table_name: table_name}}
  end

  def put(state, key, value) do
    :dets.insert(state.table_name, {key, value})
  end

  def get(state, key) do
    case :dets.lookup(state.table_name, key) do
      [] ->
        {:error, :not_found}

      [{_, value}] ->
        {:ok, value}
    end
  end

  def delete(state, key) do
    :dets.delete(state.table_name, key)
    :ok
  end

  def is_empty(state) do
    :dets.first(state.table_name) == :"$end_of_table"
  end

  def dispose(state) do
    :dets.delete_all_objects(state.table_name)
  end

  def reduce(state, fun, acc0) do
    :dets.foldl(fun, acc0, state.table_name)
  end
end
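
What makes this backend persistent is that DETS stores the table in a file, so the contents survive closing and reopening it. A standalone sketch of that behaviour, using a throwaway file in the system temporary directory (the table name :dets_sketch and the file name are arbitrary):

```elixir
path = Path.join(System.tmp_dir!(), "dets_sketch_#{System.unique_integer([:positive])}")
file = String.to_charlist(path)

# Open (and create) the table, write one pair and close it.
{:ok, :dets_sketch} = :dets.open_file(:dets_sketch, file: file)
:ok = :dets.insert(:dets_sketch, {:k2, :v2})
:ok = :dets.close(:dets_sketch)

# Reopen the same file: the pair is still there.
{:ok, :dets_sketch} = :dets.open_file(:dets_sketch, file: file)
found = :dets.lookup(:dets_sketch, :k2)
:ok = :dets.close(:dets_sketch)
IO.inspect(found)

# Clean up the throwaway file.
File.rm!(path)
```

The module above passes no file option, in which case DETS uses the table name as the file name, so each vnode's data should end up in a file named after its partition in the node's working directory.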

Add some tests to test/rkv_test.exs:

defmodule RkvTest do
  use ExUnit.Case
  doctest Rkv
  alias Rkv.KV

  def reduce_fn(pair, acc_in) do
    [pair | acc_in]
  end

  test "KV.ETS get" do
    {:ok, state} = KV.ETS.init(%{uid: :erlang.unique_integer()})

    true = KV.ETS.is_empty(state)
    [] = KV.ETS.reduce(state, &reduce_fn/2, [])
    {:error, :not_found} = KV.ETS.get(state, :k1)
    :ok = KV.ETS.delete(state, :k1)

    :ok = KV.ETS.put(state, :k2, :v2)
    [{:k2, :v2}] = KV.ETS.reduce(state, &reduce_fn/2, [])
    false = KV.ETS.is_empty(state)
    {:ok, :v2} = KV.ETS.get(state, :k2)
    :ok = KV.ETS.delete(state, :k2)
    {:error, :not_found} = KV.ETS.get(state, :k2)
    :ok = KV.ETS.dispose(state)
  end

  test "KV.DETS get" do
    {:ok, state} = KV.DETS.init(%{uid: :erlang.unique_integer()})

    true = KV.DETS.is_empty(state)
    [] = KV.DETS.reduce(state, &reduce_fn/2, [])
    {:error, :not_found} = KV.DETS.get(state, :k1)
    :ok = KV.DETS.delete(state, :k1)

    :ok = KV.DETS.put(state, :k2, :v2)
    false = KV.DETS.is_empty(state)
    [{:k2, :v2}] = KV.DETS.reduce(state, &reduce_fn/2, [])
    {:ok, :v2} = KV.DETS.get(state, :k2)
    :ok = KV.DETS.delete(state, :k2)
    {:error, :not_found} = KV.DETS.get(state, :k2)
    :ok = KV.DETS.dispose(state)
  end
end

Update lib/rkv/vnode.ex to select the KV backend from configuration, change the first line of the init function:

# ...
  def init([partition]) do
    kv_mod = Application.get_env(:rkv, :kv_mod, Rkv.KV.ETS)
# ...

Add the selected backend to config/config.exs:

# ...
config :rkv,
  redis_min_port: 6379,
  redis_max_port: 6379,
  kv_mod: Rkv.KV.DETS
# ...

Compile, test and run:

mix compile
mix test
iex --name [email protected] -S mix run

Test that it works:

Rkv.get(:k1)
{:error, :not_found}
Rkv.delete(:k1)
:ok
Rkv.put(:k2, :v2)
:ok
Rkv.get(:k2)
{:ok, :v2}
Rkv.delete(:k2)
:ok
Rkv.get(:k2)
{:error, :not_found}

Write a key:

Rkv.put(:k2, :v2)

Stop the node and start it again:

iex --name [email protected] -S mix run
Rkv.get(:k2)

The value should still be there:

{:ok, :v2}