Skip to content

Commit

Permalink
Fix migration to work with large number of events in brook view state
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Balser authored and bbalser committed Oct 29, 2019
1 parent c1193bb commit 4ae4bbc
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 18 deletions.
2 changes: 1 addition & 1 deletion lib/brook/storage/redis.ex
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ defmodule Brook.Storage.Redis do

put(instance, __MODULE__, %{namespace: namespace, redix: redix, event_limits: event_limits})

Brook.Storage.Redis.Migration.migrate(instance, redix, namespace)
Brook.Storage.Redis.Migration.migrate(instance, redix, namespace, event_limits)

{:ok, %{namespace: namespace, redix: redix, event_limits: event_limits}}
end
Expand Down
42 changes: 34 additions & 8 deletions lib/brook/storage/redis/migration.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ defmodule Brook.Storage.Redis.Migration do

@expiration 60 * 60 * 24 * 7

def migrate(instance, redix, namespace) do
def migrate(instance, redix, namespace, event_limits) do
regex = ~r/^#{namespace}:(?<collection>[[:alnum:]-_]+):(?<key>[[:alnum:]-_]+)$/

view_state_entries =
keys(redix, "#{namespace}:*")
|> Enum.filter(&Regex.match?(regex, &1))
|> Enum.map(fn key -> Regex.named_captures(regex, key) |> Map.put("redis_key", key) end)
|> Enum.map(&get_old_view_state(redix, &1))
|> Enum.map(&get_old_events(redix, &1))
|> Enum.map(&get_old_events(redix, event_limits, &1))

Enum.each(view_state_entries, &save_view_state(instance, &1))
Enum.each(view_state_entries, &move_old_entry(redix, &1))
Expand All @@ -24,15 +24,37 @@ defmodule Brook.Storage.Redis.Migration do
Map.put(view_state, "value", view_entry.value)
end

defp get_old_events(redix, view_state) do
defp get_old_events(redix, event_limits, view_state) do
events_key = view_state["redis_key"] <> ":events"

old_events =
Redix.command!(redix, ["LRANGE", events_key, 0, -1])
|> Enum.map(&:erlang.binary_to_term/1)
|> Enum.map(&ensure_event_fields/1)
events_by_type =
Stream.iterate(0, fn x -> x + 1000 end)
|> Stream.map(fn start -> Redix.command!(redix, ["LRANGE", events_key, start, start + 999]) end)
|> Stream.take_while(fn list -> length(list) > 0 end)
|> Stream.map(&binary_to_term/1)
|> Stream.map(&ensure_event_fields/1)
|> Enum.reduce(%{}, fn list, acc ->
Enum.reduce(list, acc, fn event, acc2 ->
Map.update(acc2, event.type, [event], fn cur -> cur ++ [event] end)
end)
|> Enum.map(fn {type, list} ->
{type, ensure_event_limit(event_limits, type, list)}
end)
|> Map.new()
end)

Map.put(view_state, "events", old_events)
Map.put(view_state, "events", Map.values(events_by_type) |> List.flatten())
end

defp ensure_event_limit(event_limits, type, list) do
case Map.get(event_limits, type, :no_limit) do
:no_limit -> list
limit -> Enum.drop(list, length(list) - limit)
end
end

defp binary_to_term(list) when is_list(list) do
Enum.map(list, &:erlang.binary_to_term/1)
end

defp save_view_state(instance, view_state) do
Expand All @@ -53,6 +75,10 @@ defmodule Brook.Storage.Redis.Migration do
Redix.pipeline!(redix, commands)
end

defp ensure_event_fields(list) when is_list(list) do
Enum.map(list, &ensure_event_fields/1)
end

defp ensure_event_fields(event) do
event
|> Map.update(:author, "migrated_default", fn author -> author end)
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Brook.MixProject do
def project do
[
app: :brook,
version: "0.4.6",
version: "0.4.7",
elixir: "~> 1.8",
description: description(),
package: package(),
Expand Down
24 changes: 16 additions & 8 deletions test/integration/brook/storage/migration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,23 @@ defmodule Brook.Storage.Redis.MigrationTest do
redis_value = %{key: 1, value: view_state}
Redix.command!(redix, ["SET", "#{@namespace}:#{@collection}:1", :erlang.term_to_binary(redis_value)])

events = [
Brook.Event.new(type: "type1", author: "testing", data: 1, create_ts: 0),
Brook.Event.new(type: "type2", author: "testing", data: 2, create_ts: 1)
]
events =
1..2000
|> Enum.map(fn i ->
type = "type#{rem(i, 2)}"
Brook.Event.new(type: type, author: "testing", data: i, create_ts: i)
end)

events
|> Enum.map(&:erlang.term_to_binary(&1, compressed: 9))
|> Enum.each(&Redix.command!(redix, ["RPUSH", "#{@namespace}:#{@collection}:1:events", &1]))

Migration.migrate(@instance, redix, @namespace) |> IO.inspect(label: "output")
Migration.migrate(@instance, redix, @namespace, %{"type0" => 500})

assert {:ok, view_state} == Redis.get(@instance, @collection, 1)
assert {:ok, events} == Redis.get_events(@instance, @collection, 1)
{:ok, stored_events} = Redis.get_events(@instance, @collection, 1)
assert filter_type(events, "type0") |> Enum.drop(500) == filter_type(stored_events, "type0")
assert filter_type(events, "type1") == filter_type(stored_events, "type1")

assert 0 == Redix.command!(redix, ["EXISTS", "#{@namespace}:#{@collection}:1"])
assert 0 == Redix.command!(redix, ["EXISTS", "#{@namespace}:#{@collection}:1:events"])
Expand All @@ -47,7 +51,7 @@ defmodule Brook.Storage.Redis.MigrationTest do
:erlang.term_to_binary(event, compressed: 9)
])

Migration.migrate(@instance, redix, @namespace)
Migration.migrate(@instance, redix, @namespace, %{})

assert {:ok, [%Brook.Event{author: author}]} = Redis.get_events(@instance, @collection, 3)
assert author == "migrated_default"
Expand All @@ -66,12 +70,16 @@ defmodule Brook.Storage.Redis.MigrationTest do
:erlang.term_to_binary(event, compressed: 9)
])

Migration.migrate(@instance, redix, @namespace)
Migration.migrate(@instance, redix, @namespace, %{})

assert {:ok, [%Brook.Event{create_ts: timestamp}]} = Redis.get_events(@instance, @collection, 4)
assert timestamp == 0
end

defp filter_type(events, type) do
Enum.filter(events, fn e -> e.type == type end)
end

defp start_redis(_context) do
registry_name = Brook.Config.registry(@instance)
{:ok, registry} = Registry.start_link(name: registry_name, keys: :unique)
Expand Down

0 comments on commit 4ae4bbc

Please sign in to comment.