Skip to content

Commit d32efe6

Browse files
committed
Release v1.0.0
1 parent 4f3067f commit d32efe6

File tree

6 files changed

+70
-43
lines changed

6 files changed

+70
-43
lines changed

.travis.yml

+1-4
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,12 @@ elixir:
44
sudo: false # to use faster container based build environment
55
notifications:
66
recipients:
7-
- jose.valim@plataformatec.com.br
7+
- jose.valim@dashbit.co
88
otp_release:
99
- 22.0
1010
script:
1111
- if [[ $(elixir --version) = *"1.6"* ]]; then mix format --check-formatted; fi
1212
- mix test
13-
after_script:
14-
- mix deps.get --only docs
15-
- MIX_ENV=docs mix inch.report
1613
matrix:
1714
include:
1815
- elixir: 1.5.1

CHANGELOG.md

+7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
# Changelog
22

3+
## v1.0.0 (2020-02-03)
4+
5+
### Enhancements
6+
7+
* Allow events to be discarded in PartitionDispatcher by returning `:none`
8+
* Raise for unknown partitionis in PartitionDispatcher
9+
310
## v0.14.3 (2019-10-28)
411

512
### Enhancements

lib/gen_stage/dispatchers/partition_dispatcher.ex

+26-29
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ defmodule GenStage.PartitionDispatcher do
2424
and the partition as second. The partition must be one of the partitions
2525
specified in `:partitions` above. The default uses
2626
`fn event -> {event, :erlang.phash2(event, Enum.count(partitions))} end`
27-
on the event to select the partition.
27+
on the event to select the partition. If it returns `:none`, the event
28+
is discarded.
2829
2930
### Examples
3031
@@ -239,30 +240,7 @@ defmodule GenStage.PartitionDispatcher do
239240

240241
@doc false
241242
def dispatch(events, _length, {tag, hash, waiting, pending, partitions, references, infos}) do
242-
{deliver_now, deliver_later, waiting} = split_events(events, waiting, [])
243-
244-
for event <- deliver_now do
245-
{event, partition} =
246-
case hash.(event) do
247-
{event, partition} ->
248-
{event, partition}
249-
250-
other ->
251-
raise "the :hash function should return {event, partition}, got: #{inspect(other)}"
252-
end
253-
254-
case :erlang.get(partition) do
255-
:undefined ->
256-
Logger.error(fn ->
257-
"Unknown partition #{inspect(partition)} computed for GenStage/Flow event " <>
258-
"#{inspect(event)}. The known partitions are #{inspect(Map.keys(partitions))}. " <>
259-
"See the :partitions option to set your own. This event has been discarded."
260-
end)
261-
262-
current ->
263-
Process.put(partition, [event | current])
264-
end
265-
end
243+
{deliver_later, waiting} = split_events(events, waiting, hash, partitions)
266244

267245
partitions =
268246
partitions
@@ -273,11 +251,30 @@ defmodule GenStage.PartitionDispatcher do
273251
{:ok, deliver_later, {tag, hash, waiting, pending, partitions, references, infos}}
274252
end
275253

276-
defp split_events(events, 0, acc), do: {:lists.reverse(acc), events, 0}
277-
defp split_events([], counter, acc), do: {:lists.reverse(acc), [], counter}
254+
defp split_events(events, 0, _hash, _partitions), do: {events, 0}
255+
defp split_events([], counter, _hash, _partitions), do: {[], counter}
256+
257+
defp split_events([event | events], counter, hash, partitions) do
258+
case hash.(event) do
259+
{event, partition} ->
260+
case :erlang.get(partition) do
261+
:undefined ->
262+
raise "unknown partition #{inspect(partition)} computed for GenStage event " <>
263+
"#{inspect(event)}. The known partitions are #{inspect(Map.keys(partitions))}. " <>
264+
"See the :partitions option to set your own. This event has been discarded."
265+
266+
current ->
267+
Process.put(partition, [event | current])
268+
split_events(events, counter - 1, hash, partitions)
269+
end
270+
271+
:none ->
272+
split_events(events, counter, hash, partitions)
278273

279-
defp split_events([event | events], counter, acc),
280-
do: split_events(events, counter - 1, [event | acc])
274+
other ->
275+
raise "the :hash function should return {event, partition}, got: #{inspect(other)}"
276+
end
277+
end
281278

282279
defp dispatch_per_partition([{partition, {pid, ref, demand_or_queue} = value} | rest]) do
283280
case Process.put(partition, []) do

mix.exs

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
defmodule GenStage.Mixfile do
22
use Mix.Project
33

4-
@version "0.14.3"
4+
@version "1.0.0"
55

66
def project do
77
[
@@ -26,8 +26,7 @@ defmodule GenStage.Mixfile do
2626

2727
defp deps do
2828
[
29-
{:ex_doc, "~> 0.12", only: :docs},
30-
{:inch_ex, ">= 0.4.0", only: :docs}
29+
{:ex_doc, "~> 0.12", only: :docs}
3130
]
3231
end
3332

mix.lock

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
%{
2-
"earmark": {:hex, :earmark, "1.4.2", "3aa0bd23bc4c61cf2f1e5d752d1bb470560a6f8539974f767a38923bb20e1d7f", [:mix], [], "hexpm"},
3-
"ex_doc": {:hex, :ex_doc, "0.21.2", "caca5bc28ed7b3bdc0b662f8afe2bee1eedb5c3cf7b322feeeb7c6ebbde089d6", [:mix], [{:earmark, "~> 1.3.3 or ~> 1.4", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm"},
2+
"earmark": {:hex, :earmark, "1.4.3", "364ca2e9710f6bff494117dbbd53880d84bebb692dafc3a78eb50aa3183f2bfd", [:mix], [], "hexpm"},
3+
"ex_doc": {:hex, :ex_doc, "0.21.3", "857ec876b35a587c5d9148a2512e952e24c24345552259464b98bfbb883c7b42", [:mix], [{:earmark, "~> 1.4", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm"},
44
"inch_ex": {:hex, :inch_ex, "0.5.5", "b63f57e281467bd3456461525fdbc9e158c8edbe603da6e3e4671befde796a3d", [:mix], [{:poison, "~> 1.5 or ~> 2.0 or ~> 3.0", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm"},
55
"makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"},
66
"makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"},
7-
"nimble_parsec": {:hex, :nimble_parsec, "0.5.1", "c90796ecee0289dbb5ad16d3ad06f957b0cd1199769641c961cfe0b97db190e0", [:mix], [], "hexpm"},
7+
"nimble_parsec": {:hex, :nimble_parsec, "0.5.3", "def21c10a9ed70ce22754fdeea0810dafd53c2db3219a0cd54cf5526377af1c6", [:mix], [], "hexpm"},
88
"poison": {:hex, :poison, "3.0.0", "625ebd64d33ae2e65201c2c14d6c85c27cc8b68f2d0dd37828fde9c6920dd131", [:mix], [], "hexpm"},
99
}

test/gen_stage/partition_dispatcher_test.exs

+31-4
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,33 @@ defmodule GenStage.PartitionDispatcherTest do
7676
assert_received {:"$gen_consumer", {_, ^ref}, [5, 7, 9, 11, 13]}
7777
end
7878

79+
test "subscribes, asks and dispatches to partitions or none" do
80+
pid = self()
81+
even_ref = make_ref()
82+
odd_ref = make_ref()
83+
84+
hash_fun = fn event ->
85+
cond do
86+
rem(event, 3) == 0 -> :none
87+
rem(event, 2) == 0 -> {event, :even}
88+
true -> {event, :odd}
89+
end
90+
end
91+
92+
disp = dispatcher(partitions: [:odd, :even], hash: hash_fun)
93+
94+
{:ok, 0, disp} = D.subscribe([partition: :even], {pid, even_ref}, disp)
95+
{:ok, 0, disp} = D.subscribe([partition: :odd], {pid, odd_ref}, disp)
96+
97+
{:ok, 4, disp} = D.ask(4, {pid, even_ref}, disp)
98+
{:ok, 4, disp} = D.ask(4, {pid, odd_ref}, disp)
99+
{:ok, [12], disp} = D.dispatch([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], 12, disp)
100+
101+
assert_received {:"$gen_consumer", {_, ^even_ref}, [2, 4, 8, 10]}
102+
assert_received {:"$gen_consumer", {_, ^odd_ref}, [1, 5, 7, 11]}
103+
assert {0, 0} = waiting_and_pending(disp)
104+
end
105+
79106
test "buffers events before subscription" do
80107
disp = dispatcher(partitions: 2)
81108

@@ -195,17 +222,17 @@ defmodule GenStage.PartitionDispatcherTest do
195222
assert_received :hello
196223
end
197224

198-
test "logs on unknown partition" do
225+
test "raises on unknown partition" do
199226
pid = self()
200227
ref = make_ref()
201228
disp = dispatcher(partitions: [:foo, :bar], hash: &{&1, :oops})
202229

203230
{:ok, 0, disp} = D.subscribe([partition: :foo], {pid, ref}, disp)
204231
{:ok, 3, disp} = D.ask(3, {pid, ref}, disp)
205232

206-
error = ExUnit.CaptureLog.capture_log(fn -> D.dispatch([1, 2, 5], 3, disp) end)
207-
assert error =~ "[error] Unknown partition :oops computed for GenStage/Flow event 1"
208-
assert error =~ "The known partitions are [:bar, :foo]"
233+
assert_raise RuntimeError, ~r/unknown partition :oops/, fn ->
234+
D.dispatch([1, 2, 5], 3, disp)
235+
end
209236
end
210237

211238
test "errors if the :hash function returns a bad value" do

0 commit comments

Comments
 (0)