From f5bb426cd8cf60bc42aa2ea066d38cbd8e75a79d Mon Sep 17 00:00:00 2001 From: Jacqueline McKinney Date: Fri, 18 Aug 2023 10:05:49 -0500 Subject: [PATCH 01/21] use rpc_address --- CONTRIBUTING.md | 4 ++-- lib/xandra/cluster/control_connection.ex | 10 ++++++++-- lib/xandra/transport.ex | 1 + 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 74db442c..fb612d75 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -8,11 +8,11 @@ different ports to test different features (such as authenticationn). To run normal tests, do this from the root of the project: ```bash -docker-compose up --daemon +docker-compose up --detach mix test ``` -The `--daemon` flag runs the instances as daemons in the background. Give it a +The `--detach` flag runs the instances as daemons in the background. Give it a minute between starting the services and running `mix test.all` since Cassandra takes a while to start. You can check whether the Docker containers are ready with `docker-compose ps`. To stop the services, run `docker-compose stop`. diff --git a/lib/xandra/cluster/control_connection.ex b/lib/xandra/cluster/control_connection.ex index 7b16c24c..9b1a579c 100644 --- a/lib/xandra/cluster/control_connection.ex +++ b/lib/xandra/cluster/control_connection.ex @@ -412,12 +412,18 @@ defmodule Xandra.Cluster.ControlConnection do end end - defp queried_peer_to_host(%{"peer" => _} = peer_attrs) do - {address, peer_attrs} = Map.pop!(peer_attrs, "peer") + defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs) do + {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") peer_attrs = Map.put(peer_attrs, "address", address) queried_peer_to_host(peer_attrs) end + # defp queried_peer_to_host(%{"peer" => _} = peer_attrs) do + # {address, peer_attrs} = Map.pop!(peer_attrs, "peer") + # peer_attrs = Map.put(peer_attrs, "address", address) + # queried_peer_to_host(peer_attrs) + # end + defp queried_peer_to_host(%{} = peer_attrs) do columns = [ "address", diff --git a/lib/xandra/transport.ex b/lib/xandra/transport.ex index b8a858db..1dc2cece 100644 --- a/lib/xandra/transport.ex +++ b/lib/xandra/transport.ex @@ -43,6 +43,7 @@ defmodule Xandra.Transport do @spec address_and_port(t()) :: {:ok, {:inet.ip_address(), :inet.port_number()}} | {:error, error_reason} def address_and_port(%__MODULE__{socket: socket} = transport) when not is_nil(socket) do + IO.puts("inet_mod(transport.module).peername(socket) transport.module: #{inspect(transport.module)}, socket #{inspect(socket)}") inet_mod(transport.module).peername(socket) end From 72c2a3d0c340b0ba1ce0086b91c2c48907118357 Mon Sep 17 00:00:00 2001 From: Jacqueline McKinney Date: Mon, 21 Aug 2023 11:34:25 -0500 Subject: [PATCH 02/21] debug validate_node --- lib/xandra/cluster/control_connection.ex | 16 ++++++++-------- lib/xandra/transport.ex | 2 ++ 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/lib/xandra/cluster/control_connection.ex b/lib/xandra/cluster/control_connection.ex index 9b1a579c..04b70975 100644 --- a/lib/xandra/cluster/control_connection.ex +++ b/lib/xandra/cluster/control_connection.ex @@ -412,18 +412,18 @@ defmodule Xandra.Cluster.ControlConnection do end end - defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs) do - {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") - peer_attrs = Map.put(peer_attrs, "address", address) - queried_peer_to_host(peer_attrs) - end - - # defp queried_peer_to_host(%{"peer" => _} = peer_attrs) do - # {address, peer_attrs} = Map.pop!(peer_attrs, "peer") + # defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs) do + # {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") # peer_attrs = Map.put(peer_attrs, "address", address) # queried_peer_to_host(peer_attrs) # end + defp queried_peer_to_host(%{"peer" => _} = peer_attrs) do + {address, peer_attrs} = Map.pop!(peer_attrs, "peer") + peer_attrs = Map.put(peer_attrs, "address", address) + queried_peer_to_host(peer_attrs) + end + defp queried_peer_to_host(%{} = peer_attrs) do columns = [ "address", diff --git a/lib/xandra/transport.ex b/lib/xandra/transport.ex index 1dc2cece..e7d8c5f5 100644 --- a/lib/xandra/transport.ex +++ b/lib/xandra/transport.ex @@ -44,6 +44,8 @@ defmodule Xandra.Transport do {:ok, {:inet.ip_address(), :inet.port_number()}} | {:error, error_reason} def address_and_port(%__MODULE__{socket: socket} = transport) when not is_nil(socket) do IO.puts("inet_mod(transport.module).peername(socket) transport.module: #{inspect(transport.module)}, socket #{inspect(socket)}") + IO.puts("\tinet_mod(transport.module).peername(socket): #{inspect(inet_mod(transport.module).peername(socket))}") + inet_mod(transport.module).peername(socket) end From 66de9b7df86f2bf2431b9cf15512aaa2e95a9f5f Mon Sep 17 00:00:00 2001 From: Jacqueline McKinney Date: Mon, 21 Aug 2023 13:20:30 -0500 Subject: [PATCH 03/21] add option to configure use of rpc_address, node validation for ipv6 --- lib/xandra/cluster.ex | 10 +++++ lib/xandra/cluster/control_connection.ex | 55 +++++++++++++++++------- lib/xandra/cluster/pool.ex | 7 ++- lib/xandra/options_validators.ex | 22 ++++++++++ 4 files changed, 77 insertions(+), 17 deletions(-) diff --git a/lib/xandra/cluster.ex b/lib/xandra/cluster.ex index 6e2b7441..059bfd1d 100644 --- a/lib/xandra/cluster.ex +++ b/lib/xandra/cluster.ex @@ -345,6 +345,16 @@ defmodule Xandra.Cluster do *Available since v0.18.0*. """ ], + use_rpc_address_for_peer_address: [ + type: :boolean, + doc: """ + In the system.peers table use the `rpc_address` column for the + peer/Host address and not the `peer` column + """, + default: false, + required: false + ], + # Internal for testing, not exposed. xandra_module: [type: :atom, default: Xandra, doc: false], diff --git a/lib/xandra/cluster/control_connection.ex b/lib/xandra/cluster/control_connection.ex index 04b70975..d8f6192f 100644 --- a/lib/xandra/cluster/control_connection.ex +++ b/lib/xandra/cluster/control_connection.ex @@ -29,7 +29,8 @@ defmodule Xandra.Cluster.ControlConnection do contact_node: [type: {:tuple, [{:or, [:string, :any]}, :non_neg_integer]}, required: true], connection_options: [type: :keyword_list, required: true], autodiscovered_nodes_port: [type: :non_neg_integer, required: true], - refresh_topology_interval: [type: :timeout, required: true] + refresh_topology_interval: [type: :timeout, required: true], + use_rpc_address_for_peer_address: [type: :boolean, default: false, required: false] ] defstruct [ @@ -52,6 +53,10 @@ defmodule Xandra.Cluster.ControlConnection do # The interval at which to refresh the cluster topology. :refresh_topology_interval, + # In the system.peers table use the `rpc_address` column for the + # peer/Host address and not the `peer` column + :use_rpc_address_for_peer_address, + # The protocol module of the node we're connected to. :protocol_module, @@ -83,6 +88,7 @@ defmodule Xandra.Cluster.ControlConnection do contact_node: contact_node, autodiscovered_nodes_port: Keyword.fetch!(options, :autodiscovered_nodes_port), refresh_topology_interval: Keyword.fetch!(options, :refresh_topology_interval), + use_rpc_address_for_peer_address: Keyword.fetch!(options, :use_rpc_address_for_peer_address), connection_options: connection_opts, transport: transport } @@ -144,7 +150,8 @@ defmodule Xandra.Cluster.ControlConnection do state.autodiscovered_nodes_port, state.protocol_module, state.ip, - state.port + state.port, + state.use_rpc_address_for_peer_address ), :ok <- Transport.setopts(state.transport, active: :once) do state = refresh_topology(state, peers) @@ -213,7 +220,8 @@ defmodule Xandra.Cluster.ControlConnection do state.autodiscovered_nodes_port, state.protocol_module, state.ip, - state.port + state.port, + state.use_rpc_address_for_peer_address ), :ok <- register_to_events(state), :ok <- Transport.setopts(state.transport, active: :once) do @@ -351,7 +359,8 @@ defmodule Xandra.Cluster.ControlConnection do :inet.port_number(), module(), :inet.ip_address(), - :inet.port_number() + :inet.port_number(), + boolean() ) :: {:ok, [Host.t()]} | {:error, :closed | :inet.posix()} def fetch_cluster_topology( @@ -359,13 +368,14 @@ defmodule Xandra.Cluster.ControlConnection do autodiscovered_nodes_port, protocol_module, ip, - port + port, + use_rpc_address ) when is_integer(autodiscovered_nodes_port) and is_atom(protocol_module) do with {:ok, [local_node_info]} <- query(transport, protocol_module, @select_local_query), {:ok, peers} <- query(transport, protocol_module, @select_peers_query) do local_peer = %Host{ - queried_peer_to_host(local_node_info) + queried_peer_to_host(local_node_info, use_rpc_address) | address: ip, port: port } @@ -376,7 +386,7 @@ defmodule Xandra.Cluster.ControlConnection do # https://user.cassandra.apache.narkive.com/APRtj5hb/system-peers-and-decommissioned-nodes. peers = for peer_attrs <- peers, - peer = %Host{queried_peer_to_host(peer_attrs) | port: autodiscovered_nodes_port}, + peer = %Host{queried_peer_to_host(peer_attrs, use_rpc_address) | port: autodiscovered_nodes_port}, not is_nil(peer.host_id), do: peer @@ -412,19 +422,32 @@ defmodule Xandra.Cluster.ControlConnection do end end - # defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs) do - # {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") - # peer_attrs = Map.put(peer_attrs, "address", address) - # queried_peer_to_host(peer_attrs) - # end + defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs, true = use_rpc_address) do + {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") + peer_attrs = Map.delete(peer_attrs, "peer") + peer_attrs = Map.put(peer_attrs, "address", address) + queried_peer_to_host(peer_attrs, use_rpc_address) + end - defp queried_peer_to_host(%{"peer" => _} = peer_attrs) do + defp queried_peer_to_host(%{"peer" => _} = peer_attrs, use_rpc_address) do {address, peer_attrs} = Map.pop!(peer_attrs, "peer") - peer_attrs = Map.put(peer_attrs, "address", address) - queried_peer_to_host(peer_attrs) + peer_attrs = Map.delete(peer_attrs, "rpc_address") + peer_attrs = + case :inet.parse_address(address) do + {:ok, valid_address_tuple} -> + Map.put(peer_attrs, "address", valid_address_tuple) + + error -> + Logger.error("queried_peer_to_host: error converting address (#{inspect(address)}) to tuple, error: #{inspect(error)}") + # failed to parse, however, use what was returned in the table, see if + # node_validation will pass on it + Map.put(peer_attrs, "address", address) + end + + queried_peer_to_host(peer_attrs, use_rpc_address) end - defp queried_peer_to_host(%{} = peer_attrs) do + defp queried_peer_to_host(%{} = peer_attrs, _) do columns = [ "address", "data_center", diff --git a/lib/xandra/cluster/pool.ex b/lib/xandra/cluster/pool.ex index aa94ccca..2fd2a4dc 100644 --- a/lib/xandra/cluster/pool.ex +++ b/lib/xandra/cluster/pool.ex @@ -107,6 +107,10 @@ defmodule Xandra.Cluster.Pool do # The name of the cluster (if present), only used for Telemetry events. :name, + # In the system.peers table use the `rpc_address` column for the + # peer/Host address and not the `peer` column + use_rpc_address_for_peer_address: false, + # A map of peername ({address, port}) to info about that peer. # Each info map is: # %{pool_pid: pid(), pool_ref: ref(), host: Host.t(), status: :up | :down | :connected} @@ -564,7 +568,8 @@ defmodule Xandra.Cluster.Pool do contact_node: {host.address, host.port}, connection_options: data.pool_options, autodiscovered_nodes_port: data.autodiscovered_nodes_port, - refresh_topology_interval: data.refresh_topology_interval + refresh_topology_interval: data.refresh_topology_interval, + use_rpc_address_for_peer_address: data.use_rpc_address_for_peer_address ] case data.control_conn_mod.start_link(control_conn_opts) do diff --git a/lib/xandra/options_validators.ex b/lib/xandra/options_validators.ex index 99d33b13..3ea866ab 100644 --- a/lib/xandra/options_validators.ex +++ b/lib/xandra/options_validators.ex @@ -62,6 +62,28 @@ defmodule Xandra.OptionsValidators do end end + def validate_node(%Xandra.Cluster.Host{address: address, port: port}) when is_tuple(address) do + case :inet.ntoa(address) do + {:error, :einval} -> + {:error, + "expected valid address tuple, got: address: #{inspect(address)} and port: #{inspect(port)}, with error: :einval"} + + _valid_address -> + {:ok, {address, port}} + end + end + + def validate_node(%Xandra.Cluster.Host{address: address, port: port}) when is_list(address) do + case :inet.parse_address(address) do + {:ok, valid_address} -> + {:ok, {valid_address, port}} + + error -> + {:error, + "expected valid address char list, got: address: #{inspect(address)} and port: #{inspect(port)}, with error: #{inspect(error)}"} + end + end + def validate_node(other) do {:error, "expected node to be a string or a {ip, port} tuple, got: #{inspect(other)}"} end From 8b7b5ffc44e52967ad01b9e85804039e625002db Mon Sep 17 00:00:00 2001 From: Jacqueline McKinney Date: Mon, 21 Aug 2023 14:00:31 -0500 Subject: [PATCH 04/21] do not format to string, leave address as Host --- lib/xandra/cluster/control_connection.ex | 24 ++++++++++++++++++++++++ lib/xandra/cluster/pool.ex | 2 +- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/lib/xandra/cluster/control_connection.ex b/lib/xandra/cluster/control_connection.ex index d8f6192f..e32e4d4a 100644 --- a/lib/xandra/cluster/control_connection.ex +++ b/lib/xandra/cluster/control_connection.ex @@ -422,9 +422,33 @@ defmodule Xandra.Cluster.ControlConnection do end end + defp queried_peer_to_host(%{"rpc_address" => rpc_address} = peer_attrs, true = use_rpc_address) when is_tuple(rpc_address) do + {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") + peer_attrs = Map.delete(peer_attrs, "peer") + peer_attrs = Map.put(peer_attrs, "address", address) + queried_peer_to_host(peer_attrs, use_rpc_address) + end + defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs, true = use_rpc_address) do {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") peer_attrs = Map.delete(peer_attrs, "peer") + peer_attrs = + case :inet.parse_address(address) do + {:ok, valid_address_tuple} -> + Map.put(peer_attrs, "address", valid_address_tuple) + + error -> + Logger.error("queried_peer_to_host: error converting address (#{inspect(address)}) to tuple, error: #{inspect(error)}") + # failed to parse, however, use what was returned in the table, see if + # node_validation will pass on it + Map.put(peer_attrs, "address", address) + end + queried_peer_to_host(peer_attrs, use_rpc_address) + end + + defp queried_peer_to_host(%{"peer" => peer} = peer_attrs, use_rpc_address) when is_tuple(peer) do + {address, peer_attrs} = Map.pop!(peer_attrs, "peer") + peer_attrs = Map.delete(peer_attrs, "rpc_address") peer_attrs = Map.put(peer_attrs, "address", address) queried_peer_to_host(peer_attrs, use_rpc_address) end diff --git a/lib/xandra/cluster/pool.ex b/lib/xandra/cluster/pool.ex index 2fd2a4dc..e471fca0 100644 --- a/lib/xandra/cluster/pool.ex +++ b/lib/xandra/cluster/pool.ex @@ -466,7 +466,7 @@ defmodule Xandra.Cluster.Pool do # peer, and it'll only start it once. defp start_pool(%__MODULE__{} = data, %Host{} = host) do conn_options = - Keyword.merge(data.pool_options, nodes: [Host.format_address(host)], cluster_pid: self()) + Keyword.merge(data.pool_options, nodes: [host], cluster_pid: self()) peername = Host.to_peername(host) From 744f0cf9a5d418ddcb905fe6db7814c72ba01886 Mon Sep 17 00:00:00 2001 From: Jacqueline McKinney Date: Mon, 21 Aug 2023 14:51:07 -0500 Subject: [PATCH 05/21] put back call to format_address --- lib/xandra/cluster/pool.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/xandra/cluster/pool.ex b/lib/xandra/cluster/pool.ex index e471fca0..2fd2a4dc 100644 --- a/lib/xandra/cluster/pool.ex +++ b/lib/xandra/cluster/pool.ex @@ -466,7 +466,7 @@ defmodule Xandra.Cluster.Pool do # peer, and it'll only start it once. defp start_pool(%__MODULE__{} = data, %Host{} = host) do conn_options = - Keyword.merge(data.pool_options, nodes: [host], cluster_pid: self()) + Keyword.merge(data.pool_options, nodes: [Host.format_address(host)], cluster_pid: self()) peername = Host.to_peername(host) From d5dd7e3a83163df66849f57cd059d4feeae40f4b Mon Sep 17 00:00:00 2001 From: Jacqueline McKinney Date: Mon, 21 Aug 2023 15:12:19 -0500 Subject: [PATCH 06/21] debugging --- lib/xandra/cluster/control_connection.ex | 75 ++++++++++++++++-------- lib/xandra/cluster/pool.ex | 1 + 2 files changed, 51 insertions(+), 25 deletions(-) diff --git a/lib/xandra/cluster/control_connection.ex b/lib/xandra/cluster/control_connection.ex index e32e4d4a..07d0fdaa 100644 --- a/lib/xandra/cluster/control_connection.ex +++ b/lib/xandra/cluster/control_connection.ex @@ -422,14 +422,14 @@ defmodule Xandra.Cluster.ControlConnection do end end - defp queried_peer_to_host(%{"rpc_address" => rpc_address} = peer_attrs, true = use_rpc_address) when is_tuple(rpc_address) do + defp queried_peer_to_host(%{"rpc_address" => rpc_address} = peer_attrs, use_rpc_address) when is_tuple(rpc_address) do {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") peer_attrs = Map.delete(peer_attrs, "peer") peer_attrs = Map.put(peer_attrs, "address", address) queried_peer_to_host(peer_attrs, use_rpc_address) end - defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs, true = use_rpc_address) do + defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs, use_rpc_address) do {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") peer_attrs = Map.delete(peer_attrs, "peer") peer_attrs = @@ -446,30 +446,55 @@ defmodule Xandra.Cluster.ControlConnection do queried_peer_to_host(peer_attrs, use_rpc_address) end - defp queried_peer_to_host(%{"peer" => peer} = peer_attrs, use_rpc_address) when is_tuple(peer) do - {address, peer_attrs} = Map.pop!(peer_attrs, "peer") - peer_attrs = Map.delete(peer_attrs, "rpc_address") - peer_attrs = Map.put(peer_attrs, "address", address) - queried_peer_to_host(peer_attrs, use_rpc_address) - end - defp queried_peer_to_host(%{"peer" => _} = peer_attrs, use_rpc_address) do - {address, peer_attrs} = Map.pop!(peer_attrs, "peer") - peer_attrs = Map.delete(peer_attrs, "rpc_address") - peer_attrs = - case :inet.parse_address(address) do - {:ok, valid_address_tuple} -> - Map.put(peer_attrs, "address", valid_address_tuple) - - error -> - Logger.error("queried_peer_to_host: error converting address (#{inspect(address)}) to tuple, error: #{inspect(error)}") - # failed to parse, however, use what was returned in the table, see if - # node_validation will pass on it - Map.put(peer_attrs, "address", address) - end - - queried_peer_to_host(peer_attrs, use_rpc_address) - end + # defp queried_peer_to_host(%{"rpc_address" => rpc_address} = peer_attrs, true = use_rpc_address) when is_tuple(rpc_address) do + # {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") + # peer_attrs = Map.delete(peer_attrs, "peer") + # peer_attrs = Map.put(peer_attrs, "address", address) + # queried_peer_to_host(peer_attrs, use_rpc_address) + # end + + # defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs, true = use_rpc_address) do + # {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") + # peer_attrs = Map.delete(peer_attrs, "peer") + # peer_attrs = + # case :inet.parse_address(address) do + # {:ok, valid_address_tuple} -> + # Map.put(peer_attrs, "address", valid_address_tuple) + + # error -> + # Logger.error("queried_peer_to_host: error converting address (#{inspect(address)}) to tuple, error: #{inspect(error)}") + # # failed to parse, however, use what was returned in the table, see if + # # node_validation will pass on it + # Map.put(peer_attrs, "address", address) + # end + # queried_peer_to_host(peer_attrs, use_rpc_address) + # end + + # defp queried_peer_to_host(%{"peer" => peer} = peer_attrs, use_rpc_address) when is_tuple(peer) do + # {address, peer_attrs} = Map.pop!(peer_attrs, "peer") + # peer_attrs = Map.delete(peer_attrs, "rpc_address") + # peer_attrs = Map.put(peer_attrs, "address", address) + # queried_peer_to_host(peer_attrs, use_rpc_address) + # end + + # defp queried_peer_to_host(%{"peer" => _} = peer_attrs, use_rpc_address) do + # {address, peer_attrs} = Map.pop!(peer_attrs, "peer") + # peer_attrs = Map.delete(peer_attrs, "rpc_address") + # peer_attrs = + # case :inet.parse_address(address) do + # {:ok, valid_address_tuple} -> + # Map.put(peer_attrs, "address", valid_address_tuple) + + # error -> + # Logger.error("queried_peer_to_host: error converting address (#{inspect(address)}) to tuple, error: #{inspect(error)}") + # # failed to parse, however, use what was returned in the table, see if + # # node_validation will pass on it + # Map.put(peer_attrs, "address", address) + # end + + # queried_peer_to_host(peer_attrs, use_rpc_address) + # end defp queried_peer_to_host(%{} = peer_attrs, _) do columns = [ diff --git a/lib/xandra/cluster/pool.ex b/lib/xandra/cluster/pool.ex index 2fd2a4dc..1700aa82 100644 --- a/lib/xandra/cluster/pool.ex +++ b/lib/xandra/cluster/pool.ex @@ -467,6 +467,7 @@ defmodule Xandra.Cluster.Pool do defp start_pool(%__MODULE__{} = data, %Host{} = host) do conn_options = Keyword.merge(data.pool_options, nodes: [Host.format_address(host)], cluster_pid: self()) + # Keyword.merge(data.pool_options, nodes: [host], cluster_pid: self()) peername = Host.to_peername(host) From 02ffeb61b653168ca29271b393381da50dfd8c0e Mon Sep 17 00:00:00 2001 From: Jacqueline McKinney Date: Mon, 21 Aug 2023 15:17:26 -0500 Subject: [PATCH 07/21] debug --- lib/xandra/cluster/host.ex | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/xandra/cluster/host.ex b/lib/xandra/cluster/host.ex index e94cc015..a2bc1fc5 100644 --- a/lib/xandra/cluster/host.ex +++ b/lib/xandra/cluster/host.ex @@ -67,6 +67,7 @@ defmodule Xandra.Cluster.Host do String.t() def format_peername({address, port}) do if ip_address?(address) do + IO.puts("DEBUG -- format_peername - input #{inspect({address, port})} return #{:inet.ntoa(address)}:#{port}}") "#{:inet.ntoa(address)}:#{port}" else "#{address}:#{port}" From eb2eb9c9f1f9ff84bc821c5242733d9b2e33e23d Mon Sep 17 00:00:00 2001 From: Jacqueline McKinney Date: Mon, 21 Aug 2023 16:15:17 -0500 Subject: [PATCH 08/21] missed some spots for passing through use_rpc_address_for_peer_address --- lib/xandra/cluster/control_connection.ex | 2 +- lib/xandra/cluster/pool.ex | 3 ++- test/xandra/cluster/pool_test.exs | 3 ++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/xandra/cluster/control_connection.ex b/lib/xandra/cluster/control_connection.ex index 07d0fdaa..7f94a67b 100644 --- a/lib/xandra/cluster/control_connection.ex +++ b/lib/xandra/cluster/control_connection.ex @@ -30,7 +30,7 @@ defmodule Xandra.Cluster.ControlConnection do connection_options: [type: :keyword_list, required: true], autodiscovered_nodes_port: [type: :non_neg_integer, required: true], refresh_topology_interval: [type: :timeout, required: true], - use_rpc_address_for_peer_address: [type: :boolean, default: false, required: false] + use_rpc_address_for_peer_address: [type: :boolean, required: true] ] defstruct [ diff --git a/lib/xandra/cluster/pool.ex b/lib/xandra/cluster/pool.ex index 1700aa82..bf355f87 100644 --- a/lib/xandra/cluster/pool.ex +++ b/lib/xandra/cluster/pool.ex @@ -173,7 +173,8 @@ defmodule Xandra.Cluster.Pool do queue: :queue.new(), max_size: Keyword.fetch!(queue_before_connecting_opts, :buffer_size) }, - sync_connect_ref: sync_connect_ref_or_nil && {parent, sync_connect_ref_or_nil} + sync_connect_ref: sync_connect_ref_or_nil && {parent, sync_connect_ref_or_nil}, + use_rpc_address_for_peer_address: Keyword.fetch!(cluster_opts, :use_rpc_address_for_peer_address) } actions = [ diff --git a/test/xandra/cluster/pool_test.exs b/test/xandra/cluster/pool_test.exs index 3db7bf15..6fb60473 100644 --- a/test/xandra/cluster/pool_test.exs +++ b/test/xandra/cluster/pool_test.exs @@ -26,7 +26,8 @@ defmodule Xandra.Cluster.PoolTest do queue_before_connecting: [ buffer_size: 100, timeout: 5000 - ] + ], + use_rpc_address_for_peer_address: true ] pool_opts = From 65df2175677e933f3f75d78813e6f91096a524cb Mon Sep 17 00:00:00 2001 From: Jacqueline McKinney Date: Mon, 21 Aug 2023 16:28:04 -0500 Subject: [PATCH 09/21] use new queried_peer_to_host with use_rpc_address --- lib/xandra/cluster/control_connection.ex | 96 ++++++++++++------------ 1 file changed, 48 insertions(+), 48 deletions(-) diff --git a/lib/xandra/cluster/control_connection.ex b/lib/xandra/cluster/control_connection.ex index 7f94a67b..6cd4f7fa 100644 --- a/lib/xandra/cluster/control_connection.ex +++ b/lib/xandra/cluster/control_connection.ex @@ -422,39 +422,14 @@ defmodule Xandra.Cluster.ControlConnection do end end - defp queried_peer_to_host(%{"rpc_address" => rpc_address} = peer_attrs, use_rpc_address) when is_tuple(rpc_address) do - {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") - peer_attrs = Map.delete(peer_attrs, "peer") - peer_attrs = Map.put(peer_attrs, "address", address) - queried_peer_to_host(peer_attrs, use_rpc_address) - end - - defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs, use_rpc_address) do - {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") - peer_attrs = Map.delete(peer_attrs, "peer") - peer_attrs = - case :inet.parse_address(address) do - {:ok, valid_address_tuple} -> - Map.put(peer_attrs, "address", valid_address_tuple) - - error -> - Logger.error("queried_peer_to_host: error converting address (#{inspect(address)}) to tuple, error: #{inspect(error)}") - # failed to parse, however, use what was returned in the table, see if - # node_validation will pass on it - Map.put(peer_attrs, "address", address) - end - queried_peer_to_host(peer_attrs, use_rpc_address) - end - - - # defp queried_peer_to_host(%{"rpc_address" => rpc_address} = peer_attrs, true = use_rpc_address) when is_tuple(rpc_address) do + # defp queried_peer_to_host(%{"rpc_address" => rpc_address} = peer_attrs, use_rpc_address) when is_tuple(rpc_address) do # {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") # peer_attrs = Map.delete(peer_attrs, "peer") # peer_attrs = Map.put(peer_attrs, "address", address) # queried_peer_to_host(peer_attrs, use_rpc_address) # end - # defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs, true = use_rpc_address) do + # defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs, use_rpc_address) do # {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") # peer_attrs = Map.delete(peer_attrs, "peer") # peer_attrs = @@ -471,30 +446,55 @@ defmodule Xandra.Cluster.ControlConnection do # queried_peer_to_host(peer_attrs, use_rpc_address) # end - # defp queried_peer_to_host(%{"peer" => peer} = peer_attrs, use_rpc_address) when is_tuple(peer) do - # {address, peer_attrs} = Map.pop!(peer_attrs, "peer") - # peer_attrs = Map.delete(peer_attrs, "rpc_address") - # peer_attrs = Map.put(peer_attrs, "address", address) - # queried_peer_to_host(peer_attrs, use_rpc_address) - # end - # defp queried_peer_to_host(%{"peer" => _} = peer_attrs, use_rpc_address) do - # {address, peer_attrs} = Map.pop!(peer_attrs, "peer") - # peer_attrs = Map.delete(peer_attrs, "rpc_address") - # peer_attrs = - # case :inet.parse_address(address) do - # {:ok, valid_address_tuple} -> - # Map.put(peer_attrs, "address", valid_address_tuple) + defp queried_peer_to_host(%{"rpc_address" => rpc_address} = peer_attrs, true = use_rpc_address) when is_tuple(rpc_address) do + {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") + peer_attrs = Map.delete(peer_attrs, "peer") + peer_attrs = Map.put(peer_attrs, "address", address) + queried_peer_to_host(peer_attrs, use_rpc_address) + end - # error -> - # Logger.error("queried_peer_to_host: error converting address (#{inspect(address)}) to tuple, error: #{inspect(error)}") - # # failed to parse, however, use what was returned in the table, see if - # # node_validation will pass on it - # Map.put(peer_attrs, "address", address) - # end + defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs, true = use_rpc_address) do + {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") + peer_attrs = Map.delete(peer_attrs, "peer") + peer_attrs = + case :inet.parse_address(address) do + {:ok, valid_address_tuple} -> + Map.put(peer_attrs, "address", valid_address_tuple) - # queried_peer_to_host(peer_attrs, use_rpc_address) - # end + error -> + Logger.error("queried_peer_to_host: error converting address (#{inspect(address)}) to tuple, error: #{inspect(error)}") + # failed to parse, however, use what was returned in the table, see if + # node_validation will pass on it + Map.put(peer_attrs, "address", address) + end + queried_peer_to_host(peer_attrs, use_rpc_address) + end + + defp queried_peer_to_host(%{"peer" => peer} = peer_attrs, use_rpc_address) when is_tuple(peer) do + {address, peer_attrs} = Map.pop!(peer_attrs, "peer") + peer_attrs = Map.delete(peer_attrs, "rpc_address") + peer_attrs = Map.put(peer_attrs, "address", address) + queried_peer_to_host(peer_attrs, use_rpc_address) + end + + defp queried_peer_to_host(%{"peer" => _} = peer_attrs, use_rpc_address) do + {address, peer_attrs} = Map.pop!(peer_attrs, "peer") + peer_attrs = Map.delete(peer_attrs, "rpc_address") + peer_attrs = + case :inet.parse_address(address) do + {:ok, valid_address_tuple} -> + Map.put(peer_attrs, "address", valid_address_tuple) + + error -> + Logger.error("queried_peer_to_host: error converting address (#{inspect(address)}) to tuple, error: #{inspect(error)}") + # failed to parse, however, use what was returned in the table, see if + # node_validation will pass on it + Map.put(peer_attrs, "address", address) + end + + queried_peer_to_host(peer_attrs, use_rpc_address) + end defp queried_peer_to_host(%{} = peer_attrs, _) do columns = [ From 403abda8939c42635fae56eb3b2d2db915bf8b72 Mon Sep 17 00:00:00 2001 From: Jacqueline McKinney Date: Mon, 21 Aug 2023 16:42:15 -0500 Subject: [PATCH 10/21] remove commented out functions --- lib/xandra/cluster/control_connection.ex | 25 ------------------------ 1 file changed, 25 deletions(-) diff --git a/lib/xandra/cluster/control_connection.ex b/lib/xandra/cluster/control_connection.ex index 6cd4f7fa..2a5a234b 100644 --- a/lib/xandra/cluster/control_connection.ex +++ b/lib/xandra/cluster/control_connection.ex @@ -422,31 +422,6 @@ defmodule Xandra.Cluster.ControlConnection do end end - # defp queried_peer_to_host(%{"rpc_address" => rpc_address} = peer_attrs, use_rpc_address) when is_tuple(rpc_address) do - # {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") - # peer_attrs = Map.delete(peer_attrs, "peer") - # peer_attrs = Map.put(peer_attrs, "address", address) - # queried_peer_to_host(peer_attrs, use_rpc_address) - # end - - # defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs, use_rpc_address) do - # {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") - # peer_attrs = Map.delete(peer_attrs, "peer") - # peer_attrs = - # case :inet.parse_address(address) do - # {:ok, valid_address_tuple} -> - # Map.put(peer_attrs, "address", valid_address_tuple) - - # error -> - # Logger.error("queried_peer_to_host: error converting address (#{inspect(address)}) to tuple, error: #{inspect(error)}") - # # failed to parse, however, use what was returned in the table, see if - # # node_validation will pass on it - # Map.put(peer_attrs, "address", address) - # end - # queried_peer_to_host(peer_attrs, use_rpc_address) - # end - - defp queried_peer_to_host(%{"rpc_address" => rpc_address} = peer_attrs, true = use_rpc_address) when is_tuple(rpc_address) do {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") peer_attrs = Map.delete(peer_attrs, "peer") From 0cbddafd23060ab2f15795b6243ddaa8a59e8006 Mon Sep 17 00:00:00 2001 From: Jacqueline McKinney Date: Tue, 22 Aug 2023 10:32:19 -0500 Subject: [PATCH 11/21] remove debug logs --- lib/xandra/cluster/host.ex | 1 - lib/xandra/cluster/pool.ex | 1 - lib/xandra/transport.ex | 3 --- 3 files changed, 5 deletions(-) diff --git a/lib/xandra/cluster/host.ex b/lib/xandra/cluster/host.ex index a2bc1fc5..e94cc015 100644 --- a/lib/xandra/cluster/host.ex +++ b/lib/xandra/cluster/host.ex @@ -67,7 +67,6 @@ defmodule Xandra.Cluster.Host do String.t() def format_peername({address, port}) do if ip_address?(address) do - IO.puts("DEBUG -- format_peername - input #{inspect({address, port})} return #{:inet.ntoa(address)}:#{port}}") "#{:inet.ntoa(address)}:#{port}" else "#{address}:#{port}" diff --git a/lib/xandra/cluster/pool.ex b/lib/xandra/cluster/pool.ex index bf355f87..375d87a3 100644 --- a/lib/xandra/cluster/pool.ex +++ b/lib/xandra/cluster/pool.ex @@ -468,7 +468,6 @@ defmodule Xandra.Cluster.Pool do defp start_pool(%__MODULE__{} = data, %Host{} = host) do conn_options = Keyword.merge(data.pool_options, nodes: [Host.format_address(host)], cluster_pid: self()) - # Keyword.merge(data.pool_options, nodes: [host], cluster_pid: self()) peername = Host.to_peername(host) diff --git a/lib/xandra/transport.ex b/lib/xandra/transport.ex index e7d8c5f5..b8a858db 100644 --- a/lib/xandra/transport.ex +++ b/lib/xandra/transport.ex @@ -43,9 +43,6 @@ defmodule Xandra.Transport do @spec address_and_port(t()) :: {:ok, {:inet.ip_address(), :inet.port_number()}} | {:error, error_reason} def address_and_port(%__MODULE__{socket: socket} = transport) when not is_nil(socket) do - IO.puts("inet_mod(transport.module).peername(socket) transport.module: #{inspect(transport.module)}, socket #{inspect(socket)}") - IO.puts("\tinet_mod(transport.module).peername(socket): #{inspect(inet_mod(transport.module).peername(socket))}") - inet_mod(transport.module).peername(socket) end From bb3c92511386a87d399d36a41bd6c1cefb7adb0e Mon Sep 17 00:00:00 2001 From: Jacqueline McKinney Date: Tue, 22 Aug 2023 10:34:42 -0500 Subject: [PATCH 12/21] fix tests --- test/xandra/cluster/control_connection_test.exs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/xandra/cluster/control_connection_test.exs b/test/xandra/cluster/control_connection_test.exs index 26d680fd..3a6e27f5 100644 --- a/test/xandra/cluster/control_connection_test.exs +++ b/test/xandra/cluster/control_connection_test.exs @@ -29,7 +29,8 @@ defmodule Xandra.Cluster.ControlConnectionTest do refresh_topology_interval: 60_000, autodiscovered_nodes_port: @port, connection_options: [protocol_version: @protocol_version], - contact_node: {~c"127.0.0.1", @port} + contact_node: {~c"127.0.0.1", @port}, + use_rpc_address_for_peer_address: false ] %{mirror_ref: mirror_ref, mirror: mirror, start_options: start_options} From da011a93100bba30b216a3180976049ec3156bdb Mon Sep 17 00:00:00 2001 From: Jacqueline McKinney Date: Tue, 22 Aug 2023 10:41:54 -0500 Subject: [PATCH 13/21] formatter --- lib/xandra/cluster.ex | 1 - lib/xandra/cluster/control_connection.ex | 63 +++++++++++++++--------- lib/xandra/cluster/pool.ex | 43 ++++++++-------- lib/xandra/retry_strategy.ex | 2 +- test/support/test_helper.ex | 2 +- test/xandra/cluster/pool_test.exs | 3 +- test/xandra/cluster_test.exs | 3 +- 7 files changed, 64 insertions(+), 53 deletions(-) diff --git a/lib/xandra/cluster.ex b/lib/xandra/cluster.ex index 059bfd1d..ea5bd4e2 100644 --- a/lib/xandra/cluster.ex +++ b/lib/xandra/cluster.ex @@ -355,7 +355,6 @@ defmodule Xandra.Cluster do required: false ], - # Internal for testing, not exposed. xandra_module: [type: :atom, default: Xandra, doc: false], control_connection_module: [type: :atom, default: ControlConnection, doc: false], diff --git a/lib/xandra/cluster/control_connection.ex b/lib/xandra/cluster/control_connection.ex index 2a5a234b..0fbfd895 100644 --- a/lib/xandra/cluster/control_connection.ex +++ b/lib/xandra/cluster/control_connection.ex @@ -88,7 +88,8 @@ defmodule Xandra.Cluster.ControlConnection do contact_node: contact_node, autodiscovered_nodes_port: Keyword.fetch!(options, :autodiscovered_nodes_port), refresh_topology_interval: Keyword.fetch!(options, :refresh_topology_interval), - use_rpc_address_for_peer_address: Keyword.fetch!(options, :use_rpc_address_for_peer_address), + use_rpc_address_for_peer_address: + Keyword.fetch!(options, :use_rpc_address_for_peer_address), connection_options: connection_opts, transport: transport } @@ -386,7 +387,10 @@ defmodule Xandra.Cluster.ControlConnection do # https://user.cassandra.apache.narkive.com/APRtj5hb/system-peers-and-decommissioned-nodes. peers = for peer_attrs <- peers, - peer = %Host{queried_peer_to_host(peer_attrs, use_rpc_address) | port: autodiscovered_nodes_port}, + peer = %Host{ + queried_peer_to_host(peer_attrs, use_rpc_address) + | port: autodiscovered_nodes_port + }, not is_nil(peer.host_id), do: peer @@ -422,7 +426,8 @@ defmodule Xandra.Cluster.ControlConnection do end end - defp queried_peer_to_host(%{"rpc_address" => rpc_address} = peer_attrs, true = use_rpc_address) when is_tuple(rpc_address) do + defp queried_peer_to_host(%{"rpc_address" => rpc_address} = peer_attrs, true = use_rpc_address) + when is_tuple(rpc_address) do {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") peer_attrs = Map.delete(peer_attrs, "peer") peer_attrs = Map.put(peer_attrs, "address", address) @@ -432,21 +437,27 @@ defmodule Xandra.Cluster.ControlConnection do defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs, true = use_rpc_address) do {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") peer_attrs = Map.delete(peer_attrs, "peer") + peer_attrs = - case :inet.parse_address(address) do - {:ok, valid_address_tuple} -> - Map.put(peer_attrs, "address", valid_address_tuple) - - error -> - Logger.error("queried_peer_to_host: error converting address (#{inspect(address)}) to tuple, error: #{inspect(error)}") - # failed to parse, however, use what was returned in the table, see if - # node_validation will pass on it - Map.put(peer_attrs, "address", address) - end + case :inet.parse_address(address) do + {:ok, valid_address_tuple} -> + Map.put(peer_attrs, "address", valid_address_tuple) + + error -> + Logger.error( + "queried_peer_to_host: error converting address (#{inspect(address)}) to tuple, error: #{inspect(error)}" + ) + + # failed to parse, however, use what was returned in the table, see if + # node_validation will validate it + Map.put(peer_attrs, "address", address) + end + queried_peer_to_host(peer_attrs, use_rpc_address) end - defp queried_peer_to_host(%{"peer" => peer} = peer_attrs, use_rpc_address) when is_tuple(peer) do + defp queried_peer_to_host(%{"peer" => peer} = peer_attrs, use_rpc_address) + when is_tuple(peer) do {address, peer_attrs} = Map.pop!(peer_attrs, "peer") peer_attrs = Map.delete(peer_attrs, "rpc_address") peer_attrs = Map.put(peer_attrs, "address", address) @@ -456,17 +467,21 @@ defmodule Xandra.Cluster.ControlConnection do defp queried_peer_to_host(%{"peer" => _} = peer_attrs, use_rpc_address) do {address, peer_attrs} = Map.pop!(peer_attrs, "peer") peer_attrs = Map.delete(peer_attrs, "rpc_address") + peer_attrs = - case :inet.parse_address(address) do - {:ok, valid_address_tuple} -> - Map.put(peer_attrs, "address", valid_address_tuple) - - error -> - Logger.error("queried_peer_to_host: error converting address (#{inspect(address)}) to tuple, error: #{inspect(error)}") - # failed to parse, however, use what was returned in the table, see if - # node_validation will pass on it - Map.put(peer_attrs, "address", address) - end + case :inet.parse_address(address) do + {:ok, valid_address_tuple} -> + Map.put(peer_attrs, "address", valid_address_tuple) + + error -> + Logger.error( + "queried_peer_to_host: error converting address (#{inspect(address)}) to tuple, error: #{inspect(error)}" + ) + + # failed to parse, however, use what was returned in the table, see if + # node_validation will validate it + Map.put(peer_attrs, "address", address) + end queried_peer_to_host(peer_attrs, use_rpc_address) end diff --git a/lib/xandra/cluster/pool.ex b/lib/xandra/cluster/pool.ex index 375d87a3..95bfe21d 100644 --- a/lib/xandra/cluster/pool.ex +++ b/lib/xandra/cluster/pool.ex @@ -156,26 +156,26 @@ defmodule Xandra.Cluster.Pool do queue_before_connecting_opts = Keyword.fetch!(cluster_opts, :queue_before_connecting) queue_before_connecting_timeout = Keyword.fetch!(queue_before_connecting_opts, :timeout) - data = - %__MODULE__{ - pool_options: pool_opts, - contact_nodes: Keyword.fetch!(cluster_opts, :nodes), - load_balancing_module: lb_mod, - load_balancing_state: lb_mod.init(lb_opts), - autodiscovered_nodes_port: Keyword.fetch!(cluster_opts, :autodiscovered_nodes_port), - xandra_mod: Keyword.fetch!(cluster_opts, :xandra_module), - control_conn_mod: Keyword.fetch!(cluster_opts, :control_connection_module), - target_pools: Keyword.fetch!(cluster_opts, :target_pools), - name: Keyword.get(cluster_opts, :name), - pool_supervisor: pool_sup, - refresh_topology_interval: Keyword.fetch!(cluster_opts, :refresh_topology_interval), - reqs_before_connecting: %{ - queue: :queue.new(), - max_size: Keyword.fetch!(queue_before_connecting_opts, :buffer_size) - }, - sync_connect_ref: sync_connect_ref_or_nil && {parent, sync_connect_ref_or_nil}, - use_rpc_address_for_peer_address: Keyword.fetch!(cluster_opts, :use_rpc_address_for_peer_address) - } + data = %__MODULE__{ + pool_options: pool_opts, + contact_nodes: Keyword.fetch!(cluster_opts, :nodes), + load_balancing_module: lb_mod, + load_balancing_state: lb_mod.init(lb_opts), + autodiscovered_nodes_port: Keyword.fetch!(cluster_opts, :autodiscovered_nodes_port), + xandra_mod: Keyword.fetch!(cluster_opts, :xandra_module), + control_conn_mod: Keyword.fetch!(cluster_opts, :control_connection_module), + target_pools: Keyword.fetch!(cluster_opts, :target_pools), + name: Keyword.get(cluster_opts, :name), + pool_supervisor: pool_sup, + refresh_topology_interval: Keyword.fetch!(cluster_opts, :refresh_topology_interval), + reqs_before_connecting: %{ + queue: :queue.new(), + max_size: Keyword.fetch!(queue_before_connecting_opts, :buffer_size) + }, + sync_connect_ref: sync_connect_ref_or_nil && {parent, sync_connect_ref_or_nil}, + use_rpc_address_for_peer_address: + Keyword.fetch!(cluster_opts, :use_rpc_address_for_peer_address) + } actions = [ {:next_event, :internal, :start_control_connection}, @@ -430,8 +430,7 @@ defmodule Xandra.Cluster.Pool do end defp handle_host_added(%__MODULE__{} = data, %Host{} = host) do - data = - update_in(data.load_balancing_state, &data.load_balancing_module.host_added(&1, host)) + data = update_in(data.load_balancing_state, &data.load_balancing_module.host_added(&1, host)) data = update_in( diff --git a/lib/xandra/retry_strategy.ex b/lib/xandra/retry_strategy.ex index f7f17a51..5c468a4f 100644 --- a/lib/xandra/retry_strategy.ex +++ b/lib/xandra/retry_strategy.ex @@ -115,7 +115,7 @@ defmodule Xandra.RetryStrategy do :error | {:retry, new_options :: keyword, new_state :: state} @doc false - @spec run_with_retrying(keyword, (-> result)) :: result when result: var + @spec run_with_retrying(keyword, (() -> result)) :: result when result: var def run_with_retrying(options, fun) do case Keyword.pop(options, :retry_strategy) do {nil, _options} -> fun.() diff --git a/test/support/test_helper.ex b/test/support/test_helper.ex index c0ede81b..37be8cd7 100644 --- a/test/support/test_helper.ex +++ b/test/support/test_helper.ex @@ -34,7 +34,7 @@ defmodule Xandra.TestHelper do end end - @spec wait_for_passing(pos_integer, (-> result)) :: result when result: var + @spec wait_for_passing(pos_integer, (() -> result)) :: result when result: var def wait_for_passing(time_left, fun) def wait_for_passing(time_left, fun) when time_left < 0, do: fun.() diff --git a/test/xandra/cluster/pool_test.exs b/test/xandra/cluster/pool_test.exs index 6fb60473..06865c4b 100644 --- a/test/xandra/cluster/pool_test.exs +++ b/test/xandra/cluster/pool_test.exs @@ -232,8 +232,7 @@ defmodule Xandra.Cluster.PoolTest do status: :up, pool_pid: pool_pid, host: %Host{address: {127, 0, 0, 1}, port: @port} - } = - get_state(pid).peers[Host.to_peername(host)] + } = get_state(pid).peers[Host.to_peername(host)] assert is_pid(pool_pid) end diff --git a/test/xandra/cluster_test.exs b/test/xandra/cluster_test.exs index 73401486..4e873986 100644 --- a/test/xandra/cluster_test.exs +++ b/test/xandra/cluster_test.exs @@ -156,8 +156,7 @@ defmodule Xandra.ClusterTest do cluster = TestHelper.start_link_supervised!({Xandra.Cluster, opts}) - assert {:ok, %Xandra.Page{}} = - Xandra.Cluster.execute(cluster, "SELECT * FROM system.local") + assert {:ok, %Xandra.Page{}} = Xandra.Cluster.execute(cluster, "SELECT * FROM system.local") assert {:ok, %Xandra.Page{}} = Xandra.Cluster.execute( From c563d737b99bcdec195a38408db490fec046a5e8 Mon Sep 17 00:00:00 2001 From: Jacqueline McKinney Date: Tue, 22 Aug 2023 10:44:11 -0500 Subject: [PATCH 14/21] formatter --- lib/xandra/retry_strategy.ex | 2 +- test/support/test_helper.ex | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/xandra/retry_strategy.ex b/lib/xandra/retry_strategy.ex index 5c468a4f..f7f17a51 100644 --- a/lib/xandra/retry_strategy.ex +++ b/lib/xandra/retry_strategy.ex @@ -115,7 +115,7 @@ defmodule Xandra.RetryStrategy do :error | {:retry, new_options :: keyword, new_state :: state} @doc false - @spec run_with_retrying(keyword, (() -> result)) :: result when result: var + @spec run_with_retrying(keyword, (-> result)) :: result when result: var def run_with_retrying(options, fun) do case Keyword.pop(options, :retry_strategy) do {nil, _options} -> fun.() diff --git a/test/support/test_helper.ex b/test/support/test_helper.ex index 37be8cd7..c0ede81b 100644 --- a/test/support/test_helper.ex +++ b/test/support/test_helper.ex @@ -34,7 +34,7 @@ defmodule Xandra.TestHelper do end end - @spec wait_for_passing(pos_integer, (() -> result)) :: result when result: var + @spec wait_for_passing(pos_integer, (-> result)) :: result when result: var def wait_for_passing(time_left, fun) def wait_for_passing(time_left, fun) when time_left < 0, do: fun.() From ce0e7f83c4d67b4328a28b48362036446cd73240 Mon Sep 17 00:00:00 2001 From: Jacqueline McKinney Date: Thu, 31 Aug 2023 16:34:00 -0500 Subject: [PATCH 15/21] additional test to check both true and false for use_rpc_address_for_peer_address configuration --- test/xandra/cluster/pool_test.exs | 88 ++++++++++++++++--------------- 1 file changed, 46 insertions(+), 42 deletions(-) diff --git a/test/xandra/cluster/pool_test.exs b/test/xandra/cluster/pool_test.exs index 06865c4b..0a3bd0c9 100644 --- a/test/xandra/cluster/pool_test.exs +++ b/test/xandra/cluster/pool_test.exs @@ -91,48 +91,52 @@ defmodule Xandra.Cluster.PoolTest do }} end end - - test "establishes a control connection and a pool", - %{cluster_options: cluster_options, pool_options: pool_options} do - telemetry_ref = - :telemetry_test.attach_event_handlers(self(), [ - [:xandra, :cluster, :control_connection, :connected], - [:xandra, :cluster, :pool, :started], - [:xandra, :cluster, :change_event], - [:xandra, :connected] - ]) - - assert {:ok, pid} = start_supervised(spec(cluster_options, pool_options)) - - assert %{ - cluster_pid: ^pid, - host: %Host{address: {127, 0, 0, 1}, port: @port} - } = assert_telemetry(telemetry_ref, [:control_connection, :connected]) - - assert %{ - cluster_pid: ^pid, - host: %Host{address: {127, 0, 0, 1}, port: @port} - } = assert_telemetry(telemetry_ref, [:pool, :started]) - - assert %{ - event_type: :host_added, - cluster_pid: ^pid, - host: %Host{address: {127, 0, 0, 1}, port: @port} - } = assert_telemetry(telemetry_ref, [:change_event]) - - assert_receive {[:xandra, :connected], ^telemetry_ref, %{}, - %{address: ~c"127.0.0.1", port: @port}} - - cluster_state = get_state(pid) - assert map_size(cluster_state.peers) == 1 - - assert %{status: status, pool_pid: pool_pid, host: host} = - cluster_state.peers[{{127, 0, 0, 1}, @port}] - - # Let's avoid race conditions... - assert status in [:up, :connected] - assert is_pid(pool_pid) - assert %Host{address: {127, 0, 0, 1}, port: @port, data_center: "datacenter1"} = host + for use_rpc_address <- [true, false] do + test "establishes a control connection and a pool when use_rpc_address_for_peer_address is #{use_rpc_address}", + %{cluster_options: cluster_options, pool_options: pool_options} do + telemetry_ref = + :telemetry_test.attach_event_handlers(self(), [ + [:xandra, :cluster, :control_connection, :connected], + [:xandra, :cluster, :pool, :started], + [:xandra, :cluster, :change_event], + [:xandra, :connected] + ]) + + use_rpc_address = unquote(use_rpc_address) + cluster_options = Keyword.merge(cluster_options, use_rpc_address_for_peer_address: use_rpc_address) + + assert {:ok, pid} = start_supervised(spec(cluster_options, pool_options)) + + assert %{ + cluster_pid: ^pid, + host: %Host{address: {127, 0, 0, 1}, port: @port} + } = assert_telemetry(telemetry_ref, [:control_connection, :connected]) + + assert %{ + cluster_pid: ^pid, + host: %Host{address: {127, 0, 0, 1}, port: @port} + } = assert_telemetry(telemetry_ref, [:pool, :started]) + + assert %{ + event_type: :host_added, + cluster_pid: ^pid, + host: %Host{address: {127, 0, 0, 1}, port: @port} + } = assert_telemetry(telemetry_ref, [:change_event]) + + assert_receive {[:xandra, :connected], ^telemetry_ref, %{}, + %{address: ~c"127.0.0.1", port: @port}} + + cluster_state = get_state(pid) + assert map_size(cluster_state.peers) == 1 + + assert %{status: status, pool_pid: pool_pid, host: host} = + cluster_state.peers[{{127, 0, 0, 1}, @port}] + + # Let's avoid race conditions... + assert status in [:up, :connected] + assert is_pid(pool_pid) + assert %Host{address: {127, 0, 0, 1}, port: @port, data_center: "datacenter1"} = host + end end @tag :capture_log From 0299f04cfb312161a44eeb6332b70f41f3e0e19e Mon Sep 17 00:00:00 2001 From: Jacqueline McKinney Date: Thu, 31 Aug 2023 16:36:59 -0500 Subject: [PATCH 16/21] formatter --- test/xandra/cluster/pool_test.exs | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/test/xandra/cluster/pool_test.exs b/test/xandra/cluster/pool_test.exs index 0a3bd0c9..73a87c9b 100644 --- a/test/xandra/cluster/pool_test.exs +++ b/test/xandra/cluster/pool_test.exs @@ -91,9 +91,10 @@ defmodule Xandra.Cluster.PoolTest do }} end end + for use_rpc_address <- [true, false] do test "establishes a control connection and a pool when use_rpc_address_for_peer_address is #{use_rpc_address}", - %{cluster_options: cluster_options, pool_options: pool_options} do + %{cluster_options: cluster_options, pool_options: pool_options} do telemetry_ref = :telemetry_test.attach_event_handlers(self(), [ [:xandra, :cluster, :control_connection, :connected], @@ -103,25 +104,27 @@ defmodule Xandra.Cluster.PoolTest do ]) use_rpc_address = unquote(use_rpc_address) - cluster_options = Keyword.merge(cluster_options, use_rpc_address_for_peer_address: use_rpc_address) + + cluster_options = + Keyword.merge(cluster_options, use_rpc_address_for_peer_address: use_rpc_address) assert {:ok, pid} = start_supervised(spec(cluster_options, pool_options)) assert %{ - cluster_pid: ^pid, - host: %Host{address: {127, 0, 0, 1}, port: @port} - } = assert_telemetry(telemetry_ref, [:control_connection, :connected]) + cluster_pid: ^pid, + host: %Host{address: {127, 0, 0, 1}, port: @port} + } = assert_telemetry(telemetry_ref, [:control_connection, :connected]) assert %{ - cluster_pid: ^pid, - host: %Host{address: {127, 0, 0, 1}, port: @port} - } = assert_telemetry(telemetry_ref, [:pool, :started]) + cluster_pid: ^pid, + host: %Host{address: {127, 0, 0, 1}, port: @port} + } = assert_telemetry(telemetry_ref, [:pool, :started]) assert %{ - event_type: :host_added, - cluster_pid: ^pid, - host: %Host{address: {127, 0, 0, 1}, port: @port} - } = assert_telemetry(telemetry_ref, [:change_event]) + event_type: :host_added, + cluster_pid: ^pid, + host: %Host{address: {127, 0, 0, 1}, port: @port} + } = assert_telemetry(telemetry_ref, [:change_event]) assert_receive {[:xandra, :connected], ^telemetry_ref, %{}, %{address: ~c"127.0.0.1", port: @port}} @@ -130,7 +133,7 @@ defmodule Xandra.Cluster.PoolTest do assert map_size(cluster_state.peers) == 1 assert %{status: status, pool_pid: pool_pid, host: host} = - cluster_state.peers[{{127, 0, 0, 1}, @port}] + cluster_state.peers[{{127, 0, 0, 1}, @port}] # Let's avoid race conditions... assert status in [:up, :connected] From 6b1a5ace1c4536309a2776d43377ef99c63686fd Mon Sep 17 00:00:00 2001 From: Jacqueline McKinney Date: Thu, 31 Aug 2023 16:46:42 -0500 Subject: [PATCH 17/21] formatter --- .../cluster/control_connection_test.exs | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/test/xandra/cluster/control_connection_test.exs b/test/xandra/cluster/control_connection_test.exs index 3a6e27f5..dd9418ca 100644 --- a/test/xandra/cluster/control_connection_test.exs +++ b/test/xandra/cluster/control_connection_test.exs @@ -43,6 +43,15 @@ defmodule Xandra.Cluster.ControlConnectionTest do assert %Host{address: {127, 0, 0, 1}, data_center: "datacenter1", rack: "rack1"} = local_peer end + test "reports data upon successful connection with use_rpc_address_for_peer_address = true", + %{mirror_ref: mirror_ref, start_options: start_options} do + start_options = Keyword.merge(start_options, use_rpc_address_for_peer_address: true) + + start_control_connection!(start_options) + assert_receive {^mirror_ref, {:discovered_hosts, [local_peer]}} + assert %Host{address: {127, 0, 0, 1}, data_center: "datacenter1", rack: "rack1"} = local_peer + end + test "fails to start if it can't connect to the contact point node", %{start_options: start_options} do telemetry_event = [:xandra, :cluster, :control_connection, :failed_to_connect] @@ -198,6 +207,25 @@ defmodule Xandra.Cluster.ControlConnectionTest do [%Host{address: {192, 168, 1, 1}}, %Host{address: {192, 168, 1, 2}}]}} end + test "sends :discovered_hosts message when refreshing the cluster topology with use_rpc_address_for_peer_address = true", + %{mirror_ref: mirror_ref, start_options: start_options} do + start_options = Keyword.merge(start_options, use_rpc_address_for_peer_address: true) + + ctrl_conn = start_control_connection!(start_options) + assert_receive {^mirror_ref, {:discovered_hosts, [%Host{address: {127, 0, 0, 1}}]}} + + new_peers = [ + %Host{address: {192, 168, 1, 1}, port: @port, data_center: "datacenter1"}, + %Host{address: {192, 168, 1, 2}, port: @port, data_center: "datacenter2"} + ] + + GenServer.cast(ctrl_conn, {:refresh_topology, new_peers}) + + assert_receive {^mirror_ref, + {:discovered_hosts, + [%Host{address: {192, 168, 1, 1}}, %Host{address: {192, 168, 1, 2}}]}} + end + test "triggers a topology refresh with the :refresh_topology message", %{mirror_ref: mirror_ref, start_options: start_options} do ctrl_conn = start_control_connection!(start_options) From 8340b4d6e20fa45d36a52f79c18893152daae983 Mon Sep 17 00:00:00 2001 From: Jacqueline McKinney Date: Tue, 5 Sep 2023 12:20:21 -0500 Subject: [PATCH 18/21] always use rpc_address if available --- lib/xandra/cluster.ex | 9 -- lib/xandra/cluster/control_connection.ex | 43 ++++----- lib/xandra/cluster/pool.ex | 11 +-- .../cluster/control_connection_test.exs | 31 +------ test/xandra/cluster/pool_test.exs | 92 +++++++++---------- 5 files changed, 61 insertions(+), 125 deletions(-) diff --git a/lib/xandra/cluster.ex b/lib/xandra/cluster.ex index ea5bd4e2..6e2b7441 100644 --- a/lib/xandra/cluster.ex +++ b/lib/xandra/cluster.ex @@ -345,15 +345,6 @@ defmodule Xandra.Cluster do *Available since v0.18.0*. """ ], - use_rpc_address_for_peer_address: [ - type: :boolean, - doc: """ - In the system.peers table use the `rpc_address` column for the - peer/Host address and not the `peer` column - """, - default: false, - required: false - ], # Internal for testing, not exposed. xandra_module: [type: :atom, default: Xandra, doc: false], diff --git a/lib/xandra/cluster/control_connection.ex b/lib/xandra/cluster/control_connection.ex index 0fbfd895..e71f5932 100644 --- a/lib/xandra/cluster/control_connection.ex +++ b/lib/xandra/cluster/control_connection.ex @@ -29,8 +29,7 @@ defmodule Xandra.Cluster.ControlConnection do contact_node: [type: {:tuple, [{:or, [:string, :any]}, :non_neg_integer]}, required: true], connection_options: [type: :keyword_list, required: true], autodiscovered_nodes_port: [type: :non_neg_integer, required: true], - refresh_topology_interval: [type: :timeout, required: true], - use_rpc_address_for_peer_address: [type: :boolean, required: true] + refresh_topology_interval: [type: :timeout, required: true] ] defstruct [ @@ -53,10 +52,6 @@ defmodule Xandra.Cluster.ControlConnection do # The interval at which to refresh the cluster topology. :refresh_topology_interval, - # In the system.peers table use the `rpc_address` column for the - # peer/Host address and not the `peer` column - :use_rpc_address_for_peer_address, - # The protocol module of the node we're connected to. :protocol_module, @@ -88,8 +83,6 @@ defmodule Xandra.Cluster.ControlConnection do contact_node: contact_node, autodiscovered_nodes_port: Keyword.fetch!(options, :autodiscovered_nodes_port), refresh_topology_interval: Keyword.fetch!(options, :refresh_topology_interval), - use_rpc_address_for_peer_address: - Keyword.fetch!(options, :use_rpc_address_for_peer_address), connection_options: connection_opts, transport: transport } @@ -151,8 +144,7 @@ defmodule Xandra.Cluster.ControlConnection do state.autodiscovered_nodes_port, state.protocol_module, state.ip, - state.port, - state.use_rpc_address_for_peer_address + state.port ), :ok <- Transport.setopts(state.transport, active: :once) do state = refresh_topology(state, peers) @@ -221,8 +213,7 @@ defmodule Xandra.Cluster.ControlConnection do state.autodiscovered_nodes_port, state.protocol_module, state.ip, - state.port, - state.use_rpc_address_for_peer_address + state.port ), :ok <- register_to_events(state), :ok <- Transport.setopts(state.transport, active: :once) do @@ -360,8 +351,7 @@ defmodule Xandra.Cluster.ControlConnection do :inet.port_number(), module(), :inet.ip_address(), - :inet.port_number(), - boolean() + :inet.port_number() ) :: {:ok, [Host.t()]} | {:error, :closed | :inet.posix()} def fetch_cluster_topology( @@ -369,14 +359,13 @@ defmodule Xandra.Cluster.ControlConnection do autodiscovered_nodes_port, protocol_module, ip, - port, - use_rpc_address + port ) when is_integer(autodiscovered_nodes_port) and is_atom(protocol_module) do with {:ok, [local_node_info]} <- query(transport, protocol_module, @select_local_query), {:ok, peers} <- query(transport, protocol_module, @select_peers_query) do local_peer = %Host{ - queried_peer_to_host(local_node_info, use_rpc_address) + queried_peer_to_host(local_node_info) | address: ip, port: port } @@ -388,7 +377,7 @@ defmodule Xandra.Cluster.ControlConnection do peers = for peer_attrs <- peers, peer = %Host{ - queried_peer_to_host(peer_attrs, use_rpc_address) + queried_peer_to_host(peer_attrs) | port: autodiscovered_nodes_port }, not is_nil(peer.host_id), @@ -426,15 +415,15 @@ defmodule Xandra.Cluster.ControlConnection do end end - defp queried_peer_to_host(%{"rpc_address" => rpc_address} = peer_attrs, true = use_rpc_address) + defp queried_peer_to_host(%{"rpc_address" => rpc_address} = peer_attrs) when is_tuple(rpc_address) do {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") peer_attrs = Map.delete(peer_attrs, "peer") peer_attrs = Map.put(peer_attrs, "address", address) - queried_peer_to_host(peer_attrs, use_rpc_address) + queried_peer_to_host(peer_attrs) end - defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs, true = use_rpc_address) do + defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs) do {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") peer_attrs = Map.delete(peer_attrs, "peer") @@ -453,18 +442,18 @@ defmodule Xandra.Cluster.ControlConnection do Map.put(peer_attrs, "address", address) end - queried_peer_to_host(peer_attrs, use_rpc_address) + queried_peer_to_host(peer_attrs) end - defp queried_peer_to_host(%{"peer" => peer} = peer_attrs, use_rpc_address) + defp queried_peer_to_host(%{"peer" => peer} = peer_attrs) when is_tuple(peer) do {address, peer_attrs} = Map.pop!(peer_attrs, "peer") peer_attrs = Map.delete(peer_attrs, "rpc_address") peer_attrs = Map.put(peer_attrs, "address", address) - queried_peer_to_host(peer_attrs, use_rpc_address) + queried_peer_to_host(peer_attrs) end - defp queried_peer_to_host(%{"peer" => _} = peer_attrs, use_rpc_address) do + defp queried_peer_to_host(%{"peer" => _} = peer_attrs) do {address, peer_attrs} = Map.pop!(peer_attrs, "peer") peer_attrs = Map.delete(peer_attrs, "rpc_address") @@ -483,10 +472,10 @@ defmodule Xandra.Cluster.ControlConnection do Map.put(peer_attrs, "address", address) end - queried_peer_to_host(peer_attrs, use_rpc_address) + queried_peer_to_host(peer_attrs) end - defp queried_peer_to_host(%{} = peer_attrs, _) do + defp queried_peer_to_host(%{} = peer_attrs) do columns = [ "address", "data_center", diff --git a/lib/xandra/cluster/pool.ex b/lib/xandra/cluster/pool.ex index 95bfe21d..b40eb25d 100644 --- a/lib/xandra/cluster/pool.ex +++ b/lib/xandra/cluster/pool.ex @@ -107,10 +107,6 @@ defmodule Xandra.Cluster.Pool do # The name of the cluster (if present), only used for Telemetry events. :name, - # In the system.peers table use the `rpc_address` column for the - # peer/Host address and not the `peer` column - use_rpc_address_for_peer_address: false, - # A map of peername ({address, port}) to info about that peer. # Each info map is: # %{pool_pid: pid(), pool_ref: ref(), host: Host.t(), status: :up | :down | :connected} @@ -172,9 +168,7 @@ defmodule Xandra.Cluster.Pool do queue: :queue.new(), max_size: Keyword.fetch!(queue_before_connecting_opts, :buffer_size) }, - sync_connect_ref: sync_connect_ref_or_nil && {parent, sync_connect_ref_or_nil}, - use_rpc_address_for_peer_address: - Keyword.fetch!(cluster_opts, :use_rpc_address_for_peer_address) + sync_connect_ref: sync_connect_ref_or_nil && {parent, sync_connect_ref_or_nil} } actions = [ @@ -568,8 +562,7 @@ defmodule Xandra.Cluster.Pool do contact_node: {host.address, host.port}, connection_options: data.pool_options, autodiscovered_nodes_port: data.autodiscovered_nodes_port, - refresh_topology_interval: data.refresh_topology_interval, - use_rpc_address_for_peer_address: data.use_rpc_address_for_peer_address + refresh_topology_interval: data.refresh_topology_interval ] case data.control_conn_mod.start_link(control_conn_opts) do diff --git a/test/xandra/cluster/control_connection_test.exs b/test/xandra/cluster/control_connection_test.exs index dd9418ca..26d680fd 100644 --- a/test/xandra/cluster/control_connection_test.exs +++ b/test/xandra/cluster/control_connection_test.exs @@ -29,8 +29,7 @@ defmodule Xandra.Cluster.ControlConnectionTest do refresh_topology_interval: 60_000, autodiscovered_nodes_port: @port, connection_options: [protocol_version: @protocol_version], - contact_node: {~c"127.0.0.1", @port}, - use_rpc_address_for_peer_address: false + contact_node: {~c"127.0.0.1", @port} ] %{mirror_ref: mirror_ref, mirror: mirror, start_options: start_options} @@ -43,15 +42,6 @@ defmodule Xandra.Cluster.ControlConnectionTest do assert %Host{address: {127, 0, 0, 1}, data_center: "datacenter1", rack: "rack1"} = local_peer end - test "reports data upon successful connection with use_rpc_address_for_peer_address = true", - %{mirror_ref: mirror_ref, start_options: start_options} do - start_options = Keyword.merge(start_options, use_rpc_address_for_peer_address: true) - - start_control_connection!(start_options) - assert_receive {^mirror_ref, {:discovered_hosts, [local_peer]}} - assert %Host{address: {127, 0, 0, 1}, data_center: "datacenter1", rack: "rack1"} = local_peer - end - test "fails to start if it can't connect to the contact point node", %{start_options: start_options} do telemetry_event = [:xandra, :cluster, :control_connection, :failed_to_connect] @@ -207,25 +197,6 @@ defmodule Xandra.Cluster.ControlConnectionTest do [%Host{address: {192, 168, 1, 1}}, %Host{address: {192, 168, 1, 2}}]}} end - test "sends :discovered_hosts message when refreshing the cluster topology with use_rpc_address_for_peer_address = true", - %{mirror_ref: mirror_ref, start_options: start_options} do - start_options = Keyword.merge(start_options, use_rpc_address_for_peer_address: true) - - ctrl_conn = start_control_connection!(start_options) - assert_receive {^mirror_ref, {:discovered_hosts, [%Host{address: {127, 0, 0, 1}}]}} - - new_peers = [ - %Host{address: {192, 168, 1, 1}, port: @port, data_center: "datacenter1"}, - %Host{address: {192, 168, 1, 2}, port: @port, data_center: "datacenter2"} - ] - - GenServer.cast(ctrl_conn, {:refresh_topology, new_peers}) - - assert_receive {^mirror_ref, - {:discovered_hosts, - [%Host{address: {192, 168, 1, 1}}, %Host{address: {192, 168, 1, 2}}]}} - end - test "triggers a topology refresh with the :refresh_topology message", %{mirror_ref: mirror_ref, start_options: start_options} do ctrl_conn = start_control_connection!(start_options) diff --git a/test/xandra/cluster/pool_test.exs b/test/xandra/cluster/pool_test.exs index 73a87c9b..0df0150e 100644 --- a/test/xandra/cluster/pool_test.exs +++ b/test/xandra/cluster/pool_test.exs @@ -26,8 +26,7 @@ defmodule Xandra.Cluster.PoolTest do queue_before_connecting: [ buffer_size: 100, timeout: 5000 - ], - use_rpc_address_for_peer_address: true + ] ] pool_opts = @@ -92,54 +91,47 @@ defmodule Xandra.Cluster.PoolTest do end end - for use_rpc_address <- [true, false] do - test "establishes a control connection and a pool when use_rpc_address_for_peer_address is #{use_rpc_address}", - %{cluster_options: cluster_options, pool_options: pool_options} do - telemetry_ref = - :telemetry_test.attach_event_handlers(self(), [ - [:xandra, :cluster, :control_connection, :connected], - [:xandra, :cluster, :pool, :started], - [:xandra, :cluster, :change_event], - [:xandra, :connected] - ]) - - use_rpc_address = unquote(use_rpc_address) - - cluster_options = - Keyword.merge(cluster_options, use_rpc_address_for_peer_address: use_rpc_address) - - assert {:ok, pid} = start_supervised(spec(cluster_options, pool_options)) - - assert %{ - cluster_pid: ^pid, - host: %Host{address: {127, 0, 0, 1}, port: @port} - } = assert_telemetry(telemetry_ref, [:control_connection, :connected]) - - assert %{ - cluster_pid: ^pid, - host: %Host{address: {127, 0, 0, 1}, port: @port} - } = assert_telemetry(telemetry_ref, [:pool, :started]) - - assert %{ - event_type: :host_added, - cluster_pid: ^pid, - host: %Host{address: {127, 0, 0, 1}, port: @port} - } = assert_telemetry(telemetry_ref, [:change_event]) - - assert_receive {[:xandra, :connected], ^telemetry_ref, %{}, - %{address: ~c"127.0.0.1", port: @port}} - - cluster_state = get_state(pid) - assert map_size(cluster_state.peers) == 1 - - assert %{status: status, pool_pid: pool_pid, host: host} = - cluster_state.peers[{{127, 0, 0, 1}, @port}] - - # Let's avoid race conditions... - assert status in [:up, :connected] - assert is_pid(pool_pid) - assert %Host{address: {127, 0, 0, 1}, port: @port, data_center: "datacenter1"} = host - end + test "establishes a control connection and a pool", + %{cluster_options: cluster_options, pool_options: pool_options} do + telemetry_ref = + :telemetry_test.attach_event_handlers(self(), [ + [:xandra, :cluster, :control_connection, :connected], + [:xandra, :cluster, :pool, :started], + [:xandra, :cluster, :change_event], + [:xandra, :connected] + ]) + + assert {:ok, pid} = start_supervised(spec(cluster_options, pool_options)) + + assert %{ + cluster_pid: ^pid, + host: %Host{address: {127, 0, 0, 1}, port: @port} + } = assert_telemetry(telemetry_ref, [:control_connection, :connected]) + + assert %{ + cluster_pid: ^pid, + host: %Host{address: {127, 0, 0, 1}, port: @port} + } = assert_telemetry(telemetry_ref, [:pool, :started]) + + assert %{ + event_type: :host_added, + cluster_pid: ^pid, + host: %Host{address: {127, 0, 0, 1}, port: @port} + } = assert_telemetry(telemetry_ref, [:change_event]) + + assert_receive {[:xandra, :connected], ^telemetry_ref, %{}, + %{address: ~c"127.0.0.1", port: @port}} + + cluster_state = get_state(pid) + assert map_size(cluster_state.peers) == 1 + + assert %{status: status, pool_pid: pool_pid, host: host} = + cluster_state.peers[{{127, 0, 0, 1}, @port}] + + # Let's avoid race conditions... + assert status in [:up, :connected] + assert is_pid(pool_pid) + assert %Host{address: {127, 0, 0, 1}, port: @port, data_center: "datacenter1"} = host end @tag :capture_log From e3259e1b464c96e834a16e2911ce027e1c2d66f4 Mon Sep 17 00:00:00 2001 From: Jacqueline McKinney Date: Thu, 7 Sep 2023 09:15:30 -0500 Subject: [PATCH 19/21] formatter --- lib/xandra/cluster/control_connection.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/xandra/cluster/control_connection.ex b/lib/xandra/cluster/control_connection.ex index e71f5932..77f1f3b6 100644 --- a/lib/xandra/cluster/control_connection.ex +++ b/lib/xandra/cluster/control_connection.ex @@ -418,6 +418,7 @@ defmodule Xandra.Cluster.ControlConnection do defp queried_peer_to_host(%{"rpc_address" => rpc_address} = peer_attrs) when is_tuple(rpc_address) do {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") + IO.puts("!!! peer_attrs rpc_address tuple #{address} -- type #{inspect(IEx.Helpers.i(address))}") peer_attrs = Map.delete(peer_attrs, "peer") peer_attrs = Map.put(peer_attrs, "address", address) queried_peer_to_host(peer_attrs) @@ -425,6 +426,7 @@ defmodule Xandra.Cluster.ControlConnection do defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs) do {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") + IO.puts("!!!! peer_attrs rpc_address #{address} -- type #{inspect(IEx.Helpers.i(address))}") peer_attrs = Map.delete(peer_attrs, "peer") peer_attrs = @@ -448,14 +450,12 @@ defmodule Xandra.Cluster.ControlConnection do defp queried_peer_to_host(%{"peer" => peer} = peer_attrs) when is_tuple(peer) do {address, peer_attrs} = Map.pop!(peer_attrs, "peer") - peer_attrs = Map.delete(peer_attrs, "rpc_address") peer_attrs = Map.put(peer_attrs, "address", address) queried_peer_to_host(peer_attrs) end defp queried_peer_to_host(%{"peer" => _} = peer_attrs) do {address, peer_attrs} = Map.pop!(peer_attrs, "peer") - peer_attrs = Map.delete(peer_attrs, "rpc_address") peer_attrs = case :inet.parse_address(address) do From 7b5ca9df8c944687a1ab603393ea83a763ff61d5 Mon Sep 17 00:00:00 2001 From: Jacqueline McKinney Date: Thu, 7 Sep 2023 09:44:39 -0500 Subject: [PATCH 20/21] formatter --- lib/xandra/cluster/control_connection.ex | 6 +++++- test/xandra/cluster/pool_test.exs | 24 ++++++++++++------------ 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/lib/xandra/cluster/control_connection.ex b/lib/xandra/cluster/control_connection.ex index 77f1f3b6..96393080 100644 --- a/lib/xandra/cluster/control_connection.ex +++ b/lib/xandra/cluster/control_connection.ex @@ -418,7 +418,11 @@ defmodule Xandra.Cluster.ControlConnection do defp queried_peer_to_host(%{"rpc_address" => rpc_address} = peer_attrs) when is_tuple(rpc_address) do {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") - IO.puts("!!! peer_attrs rpc_address tuple #{address} -- type #{inspect(IEx.Helpers.i(address))}") + + IO.puts( + "!!! peer_attrs rpc_address tuple #{address} -- type #{inspect(IEx.Helpers.i(address))}" + ) + peer_attrs = Map.delete(peer_attrs, "peer") peer_attrs = Map.put(peer_attrs, "address", address) queried_peer_to_host(peer_attrs) diff --git a/test/xandra/cluster/pool_test.exs b/test/xandra/cluster/pool_test.exs index 0df0150e..3935def7 100644 --- a/test/xandra/cluster/pool_test.exs +++ b/test/xandra/cluster/pool_test.exs @@ -92,7 +92,7 @@ defmodule Xandra.Cluster.PoolTest do end test "establishes a control connection and a pool", - %{cluster_options: cluster_options, pool_options: pool_options} do + %{cluster_options: cluster_options, pool_options: pool_options} do telemetry_ref = :telemetry_test.attach_event_handlers(self(), [ [:xandra, :cluster, :control_connection, :connected], @@ -104,20 +104,20 @@ defmodule Xandra.Cluster.PoolTest do assert {:ok, pid} = start_supervised(spec(cluster_options, pool_options)) assert %{ - cluster_pid: ^pid, - host: %Host{address: {127, 0, 0, 1}, port: @port} - } = assert_telemetry(telemetry_ref, [:control_connection, :connected]) + cluster_pid: ^pid, + host: %Host{address: {127, 0, 0, 1}, port: @port} + } = assert_telemetry(telemetry_ref, [:control_connection, :connected]) assert %{ - cluster_pid: ^pid, - host: %Host{address: {127, 0, 0, 1}, port: @port} - } = assert_telemetry(telemetry_ref, [:pool, :started]) + cluster_pid: ^pid, + host: %Host{address: {127, 0, 0, 1}, port: @port} + } = assert_telemetry(telemetry_ref, [:pool, :started]) assert %{ - event_type: :host_added, - cluster_pid: ^pid, - host: %Host{address: {127, 0, 0, 1}, port: @port} - } = assert_telemetry(telemetry_ref, [:change_event]) + event_type: :host_added, + cluster_pid: ^pid, + host: %Host{address: {127, 0, 0, 1}, port: @port} + } = assert_telemetry(telemetry_ref, [:change_event]) assert_receive {[:xandra, :connected], ^telemetry_ref, %{}, %{address: ~c"127.0.0.1", port: @port}} @@ -126,7 +126,7 @@ defmodule Xandra.Cluster.PoolTest do assert map_size(cluster_state.peers) == 1 assert %{status: status, pool_pid: pool_pid, host: host} = - cluster_state.peers[{{127, 0, 0, 1}, @port}] + cluster_state.peers[{{127, 0, 0, 1}, @port}] # Let's avoid race conditions... assert status in [:up, :connected] From 5545567fe28bd2a89bd8739649c2548890da50a3 Mon Sep 17 00:00:00 2001 From: Jacqueline McKinney Date: Thu, 7 Sep 2023 09:49:34 -0500 Subject: [PATCH 21/21] formatter --- lib/xandra/cluster/control_connection.ex | 6 ------ 1 file changed, 6 deletions(-) diff --git a/lib/xandra/cluster/control_connection.ex b/lib/xandra/cluster/control_connection.ex index 96393080..16d45f43 100644 --- a/lib/xandra/cluster/control_connection.ex +++ b/lib/xandra/cluster/control_connection.ex @@ -418,11 +418,6 @@ defmodule Xandra.Cluster.ControlConnection do defp queried_peer_to_host(%{"rpc_address" => rpc_address} = peer_attrs) when is_tuple(rpc_address) do {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") - - IO.puts( - "!!! peer_attrs rpc_address tuple #{address} -- type #{inspect(IEx.Helpers.i(address))}" - ) - peer_attrs = Map.delete(peer_attrs, "peer") peer_attrs = Map.put(peer_attrs, "address", address) queried_peer_to_host(peer_attrs) @@ -430,7 +425,6 @@ defmodule Xandra.Cluster.ControlConnection do defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs) do {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") - IO.puts("!!!! peer_attrs rpc_address #{address} -- type #{inspect(IEx.Helpers.i(address))}") peer_attrs = Map.delete(peer_attrs, "peer") peer_attrs =