Fix IPv6 support in Xandra.Cluster #328

Merged
4 changes: 2 additions & 2 deletions CONTRIBUTING.md
@@ -8,11 +8,11 @@ different ports to test different features (such as authentication). To run
normal tests, do this from the root of the project:

```bash
-docker-compose up --daemon
+docker-compose up --detach
mix test
```

-The `--daemon` flag runs the instances as daemons in the background. Give it a
+The `--detach` flag runs the instances as daemons in the background. Give it a
minute between starting the services and running `mix test.all` since Cassandra
takes a while to start. You can check whether the Docker containers are ready
with `docker-compose ps`. To stop the services, run `docker-compose stop`.
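
Putting the steps above together, a typical local run looks roughly like this (a sketch of the workflow described in this section; timings depend on your machine):

```bash
# Start the Cassandra/Scylla containers in the background.
docker-compose up --detach

# Check that the containers are up before running the suite.
docker-compose ps

# Run the full test suite, then stop the services when you are done.
mix test.all
docker-compose stop
```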
59 changes: 57 additions & 2 deletions lib/xandra/cluster/control_connection.ex
@@ -376,7 +376,10 @@ defmodule Xandra.Cluster.ControlConnection do
# https://user.cassandra.apache.narkive.com/APRtj5hb/system-peers-and-decommissioned-nodes.
peers =
for peer_attrs <- peers,
-peer = %Host{queried_peer_to_host(peer_attrs) | port: autodiscovered_nodes_port},
+peer = %Host{
+  queried_peer_to_host(peer_attrs)
+  | port: autodiscovered_nodes_port
+},
not is_nil(peer.host_id),
do: peer

@@ -412,12 +415,64 @@
end
end

-defp queried_peer_to_host(%{"peer" => _} = peer_attrs) do
+defp queried_peer_to_host(%{"rpc_address" => rpc_address} = peer_attrs)
+     when is_tuple(rpc_address) do
+  {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address")
+  peer_attrs = Map.delete(peer_attrs, "peer")
+  peer_attrs = Map.put(peer_attrs, "address", address)
+  queried_peer_to_host(peer_attrs)
+end

+defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs) do
+  {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address")
+  peer_attrs = Map.delete(peer_attrs, "peer")

+  peer_attrs =
+    case :inet.parse_address(address) do
+      {:ok, valid_address_tuple} ->
+        Map.put(peer_attrs, "address", valid_address_tuple)

+      error ->
+        Logger.error(
+          "queried_peer_to_host: error converting address (#{inspect(address)}) to tuple, error: #{inspect(error)}"
+        )

+        # Parsing failed; keep the address as returned by the system table and
+        # let node validation decide whether it is usable.
+        Map.put(peer_attrs, "address", address)
+    end

+  queried_peer_to_host(peer_attrs)
+end

+defp queried_peer_to_host(%{"peer" => peer} = peer_attrs)
+     when is_tuple(peer) do
  {address, peer_attrs} = Map.pop!(peer_attrs, "peer")
  peer_attrs = Map.put(peer_attrs, "address", address)
  queried_peer_to_host(peer_attrs)
end

+defp queried_peer_to_host(%{"peer" => _} = peer_attrs) do
+  {address, peer_attrs} = Map.pop!(peer_attrs, "peer")

+  peer_attrs =
+    case :inet.parse_address(address) do
+      {:ok, valid_address_tuple} ->
+        Map.put(peer_attrs, "address", valid_address_tuple)

+      error ->
+        Logger.error(
+          "queried_peer_to_host: error converting address (#{inspect(address)}) to tuple, error: #{inspect(error)}"
+        )

+        # Parsing failed; keep the address as returned by the system table and
+        # let node validation decide whether it is usable.
+        Map.put(peer_attrs, "address", address)
+    end

+  queried_peer_to_host(peer_attrs)
+end

defp queried_peer_to_host(%{} = peer_attrs) do
columns = [
"address",
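For context on the clauses above: the `rpc_address`/`peer` columns can come back either as an already-parsed tuple or as text, and `:inet.parse_address/1` is what converts the textual form into the tuple form used elsewhere. A minimal sketch of its behaviour (the sample addresses below are illustrative, not taken from the diff):

```elixir
# IPv4 and IPv6 literals both parse into address tuples.
:inet.parse_address(~c"10.0.0.5")
#=> {:ok, {10, 0, 0, 5}}

:inet.parse_address(~c"fe80::1")
#=> {:ok, {65152, 0, 0, 0, 0, 0, 0, 1}}

# Anything that is not an IP literal fails with :einval; that is the branch
# that logs an error above and falls back to the raw value from the table.
:inet.parse_address(~c"not-an-address")
#=> {:error, :einval}
```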
40 changes: 19 additions & 21 deletions lib/xandra/cluster/pool.ex
@@ -152,25 +152,24 @@ defmodule Xandra.Cluster.Pool do
queue_before_connecting_opts = Keyword.fetch!(cluster_opts, :queue_before_connecting)
queue_before_connecting_timeout = Keyword.fetch!(queue_before_connecting_opts, :timeout)

-data =
-  %__MODULE__{
-    pool_options: pool_opts,
-    contact_nodes: Keyword.fetch!(cluster_opts, :nodes),
-    load_balancing_module: lb_mod,
-    load_balancing_state: lb_mod.init(lb_opts),
-    autodiscovered_nodes_port: Keyword.fetch!(cluster_opts, :autodiscovered_nodes_port),
-    xandra_mod: Keyword.fetch!(cluster_opts, :xandra_module),
-    control_conn_mod: Keyword.fetch!(cluster_opts, :control_connection_module),
-    target_pools: Keyword.fetch!(cluster_opts, :target_pools),
-    name: Keyword.get(cluster_opts, :name),
-    pool_supervisor: pool_sup,
-    refresh_topology_interval: Keyword.fetch!(cluster_opts, :refresh_topology_interval),
-    reqs_before_connecting: %{
-      queue: :queue.new(),
-      max_size: Keyword.fetch!(queue_before_connecting_opts, :buffer_size)
-    },
-    sync_connect_ref: sync_connect_ref_or_nil && {parent, sync_connect_ref_or_nil}
-  }
+data = %__MODULE__{
+  pool_options: pool_opts,
+  contact_nodes: Keyword.fetch!(cluster_opts, :nodes),
+  load_balancing_module: lb_mod,
+  load_balancing_state: lb_mod.init(lb_opts),
+  autodiscovered_nodes_port: Keyword.fetch!(cluster_opts, :autodiscovered_nodes_port),
+  xandra_mod: Keyword.fetch!(cluster_opts, :xandra_module),
+  control_conn_mod: Keyword.fetch!(cluster_opts, :control_connection_module),
+  target_pools: Keyword.fetch!(cluster_opts, :target_pools),
+  name: Keyword.get(cluster_opts, :name),
+  pool_supervisor: pool_sup,
+  refresh_topology_interval: Keyword.fetch!(cluster_opts, :refresh_topology_interval),
+  reqs_before_connecting: %{
+    queue: :queue.new(),
+    max_size: Keyword.fetch!(queue_before_connecting_opts, :buffer_size)
+  },
+  sync_connect_ref: sync_connect_ref_or_nil && {parent, sync_connect_ref_or_nil}
+}

actions = [
{:next_event, :internal, :start_control_connection},
@@ -425,8 +424,7 @@ end
end

defp handle_host_added(%__MODULE__{} = data, %Host{} = host) do
-data =
-  update_in(data.load_balancing_state, &data.load_balancing_module.host_added(&1, host))
+data = update_in(data.load_balancing_state, &data.load_balancing_module.host_added(&1, host))

data =
update_in(
22 changes: 22 additions & 0 deletions lib/xandra/options_validators.ex
@@ -62,6 +62,28 @@ defmodule Xandra.OptionsValidators do
end
end

def validate_node(%Xandra.Cluster.Host{address: address, port: port}) when is_tuple(address) do
case :inet.ntoa(address) do
{:error, :einval} ->
{:error,
"expected valid address tuple, got: address: #{inspect(address)} and port: #{inspect(port)}, with error: :einval"}

_valid_address ->
{:ok, {address, port}}
end
end

def validate_node(%Xandra.Cluster.Host{address: address, port: port}) when is_list(address) do
case :inet.parse_address(address) do
{:ok, valid_address} ->
{:ok, {valid_address, port}}

error ->
{:error,
"expected valid address char list, got: address: #{inspect(address)} and port: #{inspect(port)}, with error: #{inspect(error)}"}
end
end

def validate_node(other) do
{:error, "expected node to be a string or a {ip, port} tuple, got: #{inspect(other)}"}
end
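For illustration (not part of the diff), the two new `validate_node/1` clauses lean on the same `:inet` functions shown above, which handle IPv4 and IPv6 uniformly; a quick sketch with arbitrary sample values:

```elixir
# A 4-tuple (IPv4) and an 8-tuple (IPv6) are both valid for :inet.ntoa/1,
# so tuple-form addresses on a Host struct now pass validation either way.
:inet.ntoa({127, 0, 0, 1})
#=> ~c"127.0.0.1"

:inet.ntoa({0, 0, 0, 0, 0, 0, 0, 1})
#=> ~c"::1"

# Malformed tuples are rejected, which is the :einval error path above.
:inet.ntoa({1, 2, 3})
#=> {:error, :einval}

# Charlist addresses go through :inet.parse_address/1 instead.
:inet.parse_address(~c"::1")
#=> {:ok, {0, 0, 0, 0, 0, 0, 0, 1}}
```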
3 changes: 1 addition & 2 deletions test/xandra/cluster/pool_test.exs
@@ -231,13 +231,12 @@
status: :up,
pool_pid: pool_pid,
host: %Host{address: {127, 0, 0, 1}, port: @port}
-} =
-  get_state(pid).peers[Host.to_peername(host)]
+} = get_state(pid).peers[Host.to_peername(host)]

assert is_pid(pool_pid)
end

test "multiple :discovered_hosts where hosts are removed",

Check failure on line 239 in test/xandra/cluster/pool_test.exs

View workflow job for this annotation

GitHub Actions / Test (Elixir 1.15.4, OTP 25.3, C* 4.1, Scylla 5.1.6, Native protocol v3)

test handling change events multiple :discovered_hosts where hosts are removed (Xandra.Cluster.PoolTest)

Check failure on line 239 in test/xandra/cluster/pool_test.exs

View workflow job for this annotation

GitHub Actions / Test (Elixir 1.15.4, OTP 25.3, C* 4.1, Scylla 5.1.6, Native protocol v4)

test handling change events multiple :discovered_hosts where hosts are removed (Xandra.Cluster.PoolTest)

Check failure on line 239 in test/xandra/cluster/pool_test.exs

View workflow job for this annotation

GitHub Actions / Test (Elixir 1.15.4, OTP 25.3, C* 3, Scylla 4.6.3, Native protocol v4)

test handling change events multiple :discovered_hosts where hosts are removed (Xandra.Cluster.PoolTest)
%{cluster_options: cluster_opts, pool_options: pool_opts} do
telemetry_ref =
:telemetry_test.attach_event_handlers(self(), [
3 changes: 1 addition & 2 deletions test/xandra/cluster_test.exs
@@ -156,8 +156,7 @@ defmodule Xandra.ClusterTest do

cluster = TestHelper.start_link_supervised!({Xandra.Cluster, opts})

-assert {:ok, %Xandra.Page{}} =
-  Xandra.Cluster.execute(cluster, "SELECT * FROM system.local")
+assert {:ok, %Xandra.Page{}} = Xandra.Cluster.execute(cluster, "SELECT * FROM system.local")

assert {:ok, %Xandra.Page{}} =
Xandra.Cluster.execute(