diff --git a/lib/xandra/cluster.ex b/lib/xandra/cluster.ex index ea5bd4e2..6e2b7441 100644 --- a/lib/xandra/cluster.ex +++ b/lib/xandra/cluster.ex @@ -345,15 +345,6 @@ defmodule Xandra.Cluster do *Available since v0.18.0*. """ ], - use_rpc_address_for_peer_address: [ - type: :boolean, - doc: """ - In the system.peers table use the `rpc_address` column for the - peer/Host address and not the `peer` column - """, - default: false, - required: false - ], # Internal for testing, not exposed. xandra_module: [type: :atom, default: Xandra, doc: false], diff --git a/lib/xandra/cluster/control_connection.ex b/lib/xandra/cluster/control_connection.ex index 0fbfd895..e71f5932 100644 --- a/lib/xandra/cluster/control_connection.ex +++ b/lib/xandra/cluster/control_connection.ex @@ -29,8 +29,7 @@ defmodule Xandra.Cluster.ControlConnection do contact_node: [type: {:tuple, [{:or, [:string, :any]}, :non_neg_integer]}, required: true], connection_options: [type: :keyword_list, required: true], autodiscovered_nodes_port: [type: :non_neg_integer, required: true], - refresh_topology_interval: [type: :timeout, required: true], - use_rpc_address_for_peer_address: [type: :boolean, required: true] + refresh_topology_interval: [type: :timeout, required: true] ] defstruct [ @@ -53,10 +52,6 @@ defmodule Xandra.Cluster.ControlConnection do # The interval at which to refresh the cluster topology. :refresh_topology_interval, - # In the system.peers table use the `rpc_address` column for the - # peer/Host address and not the `peer` column - :use_rpc_address_for_peer_address, - # The protocol module of the node we're connected to. :protocol_module, @@ -88,8 +83,6 @@ defmodule Xandra.Cluster.ControlConnection do contact_node: contact_node, autodiscovered_nodes_port: Keyword.fetch!(options, :autodiscovered_nodes_port), refresh_topology_interval: Keyword.fetch!(options, :refresh_topology_interval), - use_rpc_address_for_peer_address: - Keyword.fetch!(options, :use_rpc_address_for_peer_address), connection_options: connection_opts, transport: transport } @@ -151,8 +144,7 @@ defmodule Xandra.Cluster.ControlConnection do state.autodiscovered_nodes_port, state.protocol_module, state.ip, - state.port, - state.use_rpc_address_for_peer_address + state.port ), :ok <- Transport.setopts(state.transport, active: :once) do state = refresh_topology(state, peers) @@ -221,8 +213,7 @@ defmodule Xandra.Cluster.ControlConnection do state.autodiscovered_nodes_port, state.protocol_module, state.ip, - state.port, - state.use_rpc_address_for_peer_address + state.port ), :ok <- register_to_events(state), :ok <- Transport.setopts(state.transport, active: :once) do @@ -360,8 +351,7 @@ defmodule Xandra.Cluster.ControlConnection do :inet.port_number(), module(), :inet.ip_address(), - :inet.port_number(), - boolean() + :inet.port_number() ) :: {:ok, [Host.t()]} | {:error, :closed | :inet.posix()} def fetch_cluster_topology( @@ -369,14 +359,13 @@ defmodule Xandra.Cluster.ControlConnection do autodiscovered_nodes_port, protocol_module, ip, - port, - use_rpc_address + port ) when is_integer(autodiscovered_nodes_port) and is_atom(protocol_module) do with {:ok, [local_node_info]} <- query(transport, protocol_module, @select_local_query), {:ok, peers} <- query(transport, protocol_module, @select_peers_query) do local_peer = %Host{ - queried_peer_to_host(local_node_info, use_rpc_address) + queried_peer_to_host(local_node_info) | address: ip, port: port } @@ -388,7 +377,7 @@ defmodule Xandra.Cluster.ControlConnection do peers = for peer_attrs <- peers, peer = %Host{ - queried_peer_to_host(peer_attrs, use_rpc_address) + queried_peer_to_host(peer_attrs) | port: autodiscovered_nodes_port }, not is_nil(peer.host_id), @@ -426,15 +415,15 @@ defmodule Xandra.Cluster.ControlConnection do end end - defp queried_peer_to_host(%{"rpc_address" => rpc_address} = peer_attrs, true = use_rpc_address) + defp queried_peer_to_host(%{"rpc_address" => rpc_address} = peer_attrs) when is_tuple(rpc_address) do {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") peer_attrs = Map.delete(peer_attrs, "peer") peer_attrs = Map.put(peer_attrs, "address", address) - queried_peer_to_host(peer_attrs, use_rpc_address) + queried_peer_to_host(peer_attrs) end - defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs, true = use_rpc_address) do + defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs) do {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") peer_attrs = Map.delete(peer_attrs, "peer") @@ -453,18 +442,18 @@ defmodule Xandra.Cluster.ControlConnection do Map.put(peer_attrs, "address", address) end - queried_peer_to_host(peer_attrs, use_rpc_address) + queried_peer_to_host(peer_attrs) end - defp queried_peer_to_host(%{"peer" => peer} = peer_attrs, use_rpc_address) + defp queried_peer_to_host(%{"peer" => peer} = peer_attrs) when is_tuple(peer) do {address, peer_attrs} = Map.pop!(peer_attrs, "peer") peer_attrs = Map.delete(peer_attrs, "rpc_address") peer_attrs = Map.put(peer_attrs, "address", address) - queried_peer_to_host(peer_attrs, use_rpc_address) + queried_peer_to_host(peer_attrs) end - defp queried_peer_to_host(%{"peer" => _} = peer_attrs, use_rpc_address) do + defp queried_peer_to_host(%{"peer" => _} = peer_attrs) do {address, peer_attrs} = Map.pop!(peer_attrs, "peer") peer_attrs = Map.delete(peer_attrs, "rpc_address") @@ -483,10 +472,10 @@ defmodule Xandra.Cluster.ControlConnection do Map.put(peer_attrs, "address", address) end - queried_peer_to_host(peer_attrs, use_rpc_address) + queried_peer_to_host(peer_attrs) end - defp queried_peer_to_host(%{} = peer_attrs, _) do + defp queried_peer_to_host(%{} = peer_attrs) do columns = [ "address", "data_center", diff --git a/lib/xandra/cluster/pool.ex b/lib/xandra/cluster/pool.ex index 95bfe21d..b40eb25d 100644 --- a/lib/xandra/cluster/pool.ex +++ b/lib/xandra/cluster/pool.ex @@ -107,10 +107,6 @@ defmodule Xandra.Cluster.Pool do # The name of the cluster (if present), only used for Telemetry events. :name, - # In the system.peers table use the `rpc_address` column for the - # peer/Host address and not the `peer` column - use_rpc_address_for_peer_address: false, - # A map of peername ({address, port}) to info about that peer. # Each info map is: # %{pool_pid: pid(), pool_ref: ref(), host: Host.t(), status: :up | :down | :connected} @@ -172,9 +168,7 @@ defmodule Xandra.Cluster.Pool do queue: :queue.new(), max_size: Keyword.fetch!(queue_before_connecting_opts, :buffer_size) }, - sync_connect_ref: sync_connect_ref_or_nil && {parent, sync_connect_ref_or_nil}, - use_rpc_address_for_peer_address: - Keyword.fetch!(cluster_opts, :use_rpc_address_for_peer_address) + sync_connect_ref: sync_connect_ref_or_nil && {parent, sync_connect_ref_or_nil} } actions = [ @@ -568,8 +562,7 @@ defmodule Xandra.Cluster.Pool do contact_node: {host.address, host.port}, connection_options: data.pool_options, autodiscovered_nodes_port: data.autodiscovered_nodes_port, - refresh_topology_interval: data.refresh_topology_interval, - use_rpc_address_for_peer_address: data.use_rpc_address_for_peer_address + refresh_topology_interval: data.refresh_topology_interval ] case data.control_conn_mod.start_link(control_conn_opts) do diff --git a/test/xandra/cluster/control_connection_test.exs b/test/xandra/cluster/control_connection_test.exs index dd9418ca..26d680fd 100644 --- a/test/xandra/cluster/control_connection_test.exs +++ b/test/xandra/cluster/control_connection_test.exs @@ -29,8 +29,7 @@ defmodule Xandra.Cluster.ControlConnectionTest do refresh_topology_interval: 60_000, autodiscovered_nodes_port: @port, connection_options: [protocol_version: @protocol_version], - contact_node: {~c"127.0.0.1", @port}, - use_rpc_address_for_peer_address: false + contact_node: {~c"127.0.0.1", @port} ] %{mirror_ref: mirror_ref, mirror: mirror, start_options: start_options} @@ -43,15 +42,6 @@ defmodule Xandra.Cluster.ControlConnectionTest do assert %Host{address: {127, 0, 0, 1}, data_center: "datacenter1", rack: "rack1"} = local_peer end - test "reports data upon successful connection with use_rpc_address_for_peer_address = true", - %{mirror_ref: mirror_ref, start_options: start_options} do - start_options = Keyword.merge(start_options, use_rpc_address_for_peer_address: true) - - start_control_connection!(start_options) - assert_receive {^mirror_ref, {:discovered_hosts, [local_peer]}} - assert %Host{address: {127, 0, 0, 1}, data_center: "datacenter1", rack: "rack1"} = local_peer - end - test "fails to start if it can't connect to the contact point node", %{start_options: start_options} do telemetry_event = [:xandra, :cluster, :control_connection, :failed_to_connect] @@ -207,25 +197,6 @@ defmodule Xandra.Cluster.ControlConnectionTest do [%Host{address: {192, 168, 1, 1}}, %Host{address: {192, 168, 1, 2}}]}} end - test "sends :discovered_hosts message when refreshing the cluster topology with use_rpc_address_for_peer_address = true", - %{mirror_ref: mirror_ref, start_options: start_options} do - start_options = Keyword.merge(start_options, use_rpc_address_for_peer_address: true) - - ctrl_conn = start_control_connection!(start_options) - assert_receive {^mirror_ref, {:discovered_hosts, [%Host{address: {127, 0, 0, 1}}]}} - - new_peers = [ - %Host{address: {192, 168, 1, 1}, port: @port, data_center: "datacenter1"}, - %Host{address: {192, 168, 1, 2}, port: @port, data_center: "datacenter2"} - ] - - GenServer.cast(ctrl_conn, {:refresh_topology, new_peers}) - - assert_receive {^mirror_ref, - {:discovered_hosts, - [%Host{address: {192, 168, 1, 1}}, %Host{address: {192, 168, 1, 2}}]}} - end - test "triggers a topology refresh with the :refresh_topology message", %{mirror_ref: mirror_ref, start_options: start_options} do ctrl_conn = start_control_connection!(start_options) diff --git a/test/xandra/cluster/pool_test.exs b/test/xandra/cluster/pool_test.exs index 73a87c9b..0df0150e 100644 --- a/test/xandra/cluster/pool_test.exs +++ b/test/xandra/cluster/pool_test.exs @@ -26,8 +26,7 @@ defmodule Xandra.Cluster.PoolTest do queue_before_connecting: [ buffer_size: 100, timeout: 5000 - ], - use_rpc_address_for_peer_address: true + ] ] pool_opts = @@ -92,54 +91,47 @@ defmodule Xandra.Cluster.PoolTest do end end - for use_rpc_address <- [true, false] do - test "establishes a control connection and a pool when use_rpc_address_for_peer_address is #{use_rpc_address}", - %{cluster_options: cluster_options, pool_options: pool_options} do - telemetry_ref = - :telemetry_test.attach_event_handlers(self(), [ - [:xandra, :cluster, :control_connection, :connected], - [:xandra, :cluster, :pool, :started], - [:xandra, :cluster, :change_event], - [:xandra, :connected] - ]) - - use_rpc_address = unquote(use_rpc_address) - - cluster_options = - Keyword.merge(cluster_options, use_rpc_address_for_peer_address: use_rpc_address) - - assert {:ok, pid} = start_supervised(spec(cluster_options, pool_options)) - - assert %{ - cluster_pid: ^pid, - host: %Host{address: {127, 0, 0, 1}, port: @port} - } = assert_telemetry(telemetry_ref, [:control_connection, :connected]) - - assert %{ - cluster_pid: ^pid, - host: %Host{address: {127, 0, 0, 1}, port: @port} - } = assert_telemetry(telemetry_ref, [:pool, :started]) - - assert %{ - event_type: :host_added, - cluster_pid: ^pid, - host: %Host{address: {127, 0, 0, 1}, port: @port} - } = assert_telemetry(telemetry_ref, [:change_event]) - - assert_receive {[:xandra, :connected], ^telemetry_ref, %{}, - %{address: ~c"127.0.0.1", port: @port}} - - cluster_state = get_state(pid) - assert map_size(cluster_state.peers) == 1 - - assert %{status: status, pool_pid: pool_pid, host: host} = - cluster_state.peers[{{127, 0, 0, 1}, @port}] - - # Let's avoid race conditions... - assert status in [:up, :connected] - assert is_pid(pool_pid) - assert %Host{address: {127, 0, 0, 1}, port: @port, data_center: "datacenter1"} = host - end + test "establishes a control connection and a pool", + %{cluster_options: cluster_options, pool_options: pool_options} do + telemetry_ref = + :telemetry_test.attach_event_handlers(self(), [ + [:xandra, :cluster, :control_connection, :connected], + [:xandra, :cluster, :pool, :started], + [:xandra, :cluster, :change_event], + [:xandra, :connected] + ]) + + assert {:ok, pid} = start_supervised(spec(cluster_options, pool_options)) + + assert %{ + cluster_pid: ^pid, + host: %Host{address: {127, 0, 0, 1}, port: @port} + } = assert_telemetry(telemetry_ref, [:control_connection, :connected]) + + assert %{ + cluster_pid: ^pid, + host: %Host{address: {127, 0, 0, 1}, port: @port} + } = assert_telemetry(telemetry_ref, [:pool, :started]) + + assert %{ + event_type: :host_added, + cluster_pid: ^pid, + host: %Host{address: {127, 0, 0, 1}, port: @port} + } = assert_telemetry(telemetry_ref, [:change_event]) + + assert_receive {[:xandra, :connected], ^telemetry_ref, %{}, + %{address: ~c"127.0.0.1", port: @port}} + + cluster_state = get_state(pid) + assert map_size(cluster_state.peers) == 1 + + assert %{status: status, pool_pid: pool_pid, host: host} = + cluster_state.peers[{{127, 0, 0, 1}, @port}] + + # Let's avoid race conditions... + assert status in [:up, :connected] + assert is_pid(pool_pid) + assert %Host{address: {127, 0, 0, 1}, port: @port, data_center: "datacenter1"} = host end @tag :capture_log