Skip to content

Commit

Permalink
always use rpc_address if available
Browse files Browse the repository at this point in the history
  • Loading branch information
jacquelineIO committed Sep 5, 2023
1 parent 6b1a5ac commit 8340b4d
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 125 deletions.
9 changes: 0 additions & 9 deletions lib/xandra/cluster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -345,15 +345,6 @@ defmodule Xandra.Cluster do
*Available since v0.18.0*.
"""
],
use_rpc_address_for_peer_address: [
type: :boolean,
doc: """
In the system.peers table use the `rpc_address` column for the
peer/Host address and not the `peer` column
""",
default: false,
required: false
],

# Internal for testing, not exposed.
xandra_module: [type: :atom, default: Xandra, doc: false],
Expand Down
43 changes: 16 additions & 27 deletions lib/xandra/cluster/control_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ defmodule Xandra.Cluster.ControlConnection do
contact_node: [type: {:tuple, [{:or, [:string, :any]}, :non_neg_integer]}, required: true],
connection_options: [type: :keyword_list, required: true],
autodiscovered_nodes_port: [type: :non_neg_integer, required: true],
refresh_topology_interval: [type: :timeout, required: true],
use_rpc_address_for_peer_address: [type: :boolean, required: true]
refresh_topology_interval: [type: :timeout, required: true]
]

defstruct [
Expand All @@ -53,10 +52,6 @@ defmodule Xandra.Cluster.ControlConnection do
# The interval at which to refresh the cluster topology.
:refresh_topology_interval,

# In the system.peers table use the `rpc_address` column for the
# peer/Host address and not the `peer` column
:use_rpc_address_for_peer_address,

# The protocol module of the node we're connected to.
:protocol_module,

Expand Down Expand Up @@ -88,8 +83,6 @@ defmodule Xandra.Cluster.ControlConnection do
contact_node: contact_node,
autodiscovered_nodes_port: Keyword.fetch!(options, :autodiscovered_nodes_port),
refresh_topology_interval: Keyword.fetch!(options, :refresh_topology_interval),
use_rpc_address_for_peer_address:
Keyword.fetch!(options, :use_rpc_address_for_peer_address),
connection_options: connection_opts,
transport: transport
}
Expand Down Expand Up @@ -151,8 +144,7 @@ defmodule Xandra.Cluster.ControlConnection do
state.autodiscovered_nodes_port,
state.protocol_module,
state.ip,
state.port,
state.use_rpc_address_for_peer_address
state.port
),
:ok <- Transport.setopts(state.transport, active: :once) do
state = refresh_topology(state, peers)
Expand Down Expand Up @@ -221,8 +213,7 @@ defmodule Xandra.Cluster.ControlConnection do
state.autodiscovered_nodes_port,
state.protocol_module,
state.ip,
state.port,
state.use_rpc_address_for_peer_address
state.port
),
:ok <- register_to_events(state),
:ok <- Transport.setopts(state.transport, active: :once) do
Expand Down Expand Up @@ -360,23 +351,21 @@ defmodule Xandra.Cluster.ControlConnection do
:inet.port_number(),
module(),
:inet.ip_address(),
:inet.port_number(),
boolean()
:inet.port_number()
) ::
{:ok, [Host.t()]} | {:error, :closed | :inet.posix()}
def fetch_cluster_topology(
%Transport{} = transport,
autodiscovered_nodes_port,
protocol_module,
ip,
port,
use_rpc_address
port
)
when is_integer(autodiscovered_nodes_port) and is_atom(protocol_module) do
with {:ok, [local_node_info]} <- query(transport, protocol_module, @select_local_query),
{:ok, peers} <- query(transport, protocol_module, @select_peers_query) do
local_peer = %Host{
queried_peer_to_host(local_node_info, use_rpc_address)
queried_peer_to_host(local_node_info)
| address: ip,
port: port
}
Expand All @@ -388,7 +377,7 @@ defmodule Xandra.Cluster.ControlConnection do
peers =
for peer_attrs <- peers,
peer = %Host{
queried_peer_to_host(peer_attrs, use_rpc_address)
queried_peer_to_host(peer_attrs)
| port: autodiscovered_nodes_port
},
not is_nil(peer.host_id),
Expand Down Expand Up @@ -426,15 +415,15 @@ defmodule Xandra.Cluster.ControlConnection do
end
end

defp queried_peer_to_host(%{"rpc_address" => rpc_address} = peer_attrs, true = use_rpc_address)
defp queried_peer_to_host(%{"rpc_address" => rpc_address} = peer_attrs)
when is_tuple(rpc_address) do
{address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address")
peer_attrs = Map.delete(peer_attrs, "peer")
peer_attrs = Map.put(peer_attrs, "address", address)
queried_peer_to_host(peer_attrs, use_rpc_address)
queried_peer_to_host(peer_attrs)
end

defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs, true = use_rpc_address) do
defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs) do
{address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address")
peer_attrs = Map.delete(peer_attrs, "peer")

Expand All @@ -453,18 +442,18 @@ defmodule Xandra.Cluster.ControlConnection do
Map.put(peer_attrs, "address", address)
end

queried_peer_to_host(peer_attrs, use_rpc_address)
queried_peer_to_host(peer_attrs)
end

defp queried_peer_to_host(%{"peer" => peer} = peer_attrs, use_rpc_address)
defp queried_peer_to_host(%{"peer" => peer} = peer_attrs)
when is_tuple(peer) do
{address, peer_attrs} = Map.pop!(peer_attrs, "peer")
peer_attrs = Map.delete(peer_attrs, "rpc_address")
peer_attrs = Map.put(peer_attrs, "address", address)
queried_peer_to_host(peer_attrs, use_rpc_address)
queried_peer_to_host(peer_attrs)
end

defp queried_peer_to_host(%{"peer" => _} = peer_attrs, use_rpc_address) do
defp queried_peer_to_host(%{"peer" => _} = peer_attrs) do
{address, peer_attrs} = Map.pop!(peer_attrs, "peer")
peer_attrs = Map.delete(peer_attrs, "rpc_address")

Expand All @@ -483,10 +472,10 @@ defmodule Xandra.Cluster.ControlConnection do
Map.put(peer_attrs, "address", address)
end

queried_peer_to_host(peer_attrs, use_rpc_address)
queried_peer_to_host(peer_attrs)
end

defp queried_peer_to_host(%{} = peer_attrs, _) do
defp queried_peer_to_host(%{} = peer_attrs) do
columns = [
"address",
"data_center",
Expand Down
11 changes: 2 additions & 9 deletions lib/xandra/cluster/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,6 @@ defmodule Xandra.Cluster.Pool do
# The name of the cluster (if present), only used for Telemetry events.
:name,

# In the system.peers table use the `rpc_address` column for the
# peer/Host address and not the `peer` column
use_rpc_address_for_peer_address: false,

# A map of peername ({address, port}) to info about that peer.
# Each info map is:
# %{pool_pid: pid(), pool_ref: ref(), host: Host.t(), status: :up | :down | :connected}
Expand Down Expand Up @@ -172,9 +168,7 @@ defmodule Xandra.Cluster.Pool do
queue: :queue.new(),
max_size: Keyword.fetch!(queue_before_connecting_opts, :buffer_size)
},
sync_connect_ref: sync_connect_ref_or_nil && {parent, sync_connect_ref_or_nil},
use_rpc_address_for_peer_address:
Keyword.fetch!(cluster_opts, :use_rpc_address_for_peer_address)
sync_connect_ref: sync_connect_ref_or_nil && {parent, sync_connect_ref_or_nil}
}

actions = [
Expand Down Expand Up @@ -568,8 +562,7 @@ defmodule Xandra.Cluster.Pool do
contact_node: {host.address, host.port},
connection_options: data.pool_options,
autodiscovered_nodes_port: data.autodiscovered_nodes_port,
refresh_topology_interval: data.refresh_topology_interval,
use_rpc_address_for_peer_address: data.use_rpc_address_for_peer_address
refresh_topology_interval: data.refresh_topology_interval
]

case data.control_conn_mod.start_link(control_conn_opts) do
Expand Down
31 changes: 1 addition & 30 deletions test/xandra/cluster/control_connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ defmodule Xandra.Cluster.ControlConnectionTest do
refresh_topology_interval: 60_000,
autodiscovered_nodes_port: @port,
connection_options: [protocol_version: @protocol_version],
contact_node: {~c"127.0.0.1", @port},
use_rpc_address_for_peer_address: false
contact_node: {~c"127.0.0.1", @port}
]

%{mirror_ref: mirror_ref, mirror: mirror, start_options: start_options}
Expand All @@ -43,15 +42,6 @@ defmodule Xandra.Cluster.ControlConnectionTest do
assert %Host{address: {127, 0, 0, 1}, data_center: "datacenter1", rack: "rack1"} = local_peer
end

test "reports data upon successful connection with use_rpc_address_for_peer_address = true",
%{mirror_ref: mirror_ref, start_options: start_options} do
start_options = Keyword.merge(start_options, use_rpc_address_for_peer_address: true)

start_control_connection!(start_options)
assert_receive {^mirror_ref, {:discovered_hosts, [local_peer]}}
assert %Host{address: {127, 0, 0, 1}, data_center: "datacenter1", rack: "rack1"} = local_peer
end

test "fails to start if it can't connect to the contact point node",
%{start_options: start_options} do
telemetry_event = [:xandra, :cluster, :control_connection, :failed_to_connect]
Expand Down Expand Up @@ -207,25 +197,6 @@ defmodule Xandra.Cluster.ControlConnectionTest do
[%Host{address: {192, 168, 1, 1}}, %Host{address: {192, 168, 1, 2}}]}}
end

test "sends :discovered_hosts message when refreshing the cluster topology with use_rpc_address_for_peer_address = true",
%{mirror_ref: mirror_ref, start_options: start_options} do
start_options = Keyword.merge(start_options, use_rpc_address_for_peer_address: true)

ctrl_conn = start_control_connection!(start_options)
assert_receive {^mirror_ref, {:discovered_hosts, [%Host{address: {127, 0, 0, 1}}]}}

new_peers = [
%Host{address: {192, 168, 1, 1}, port: @port, data_center: "datacenter1"},
%Host{address: {192, 168, 1, 2}, port: @port, data_center: "datacenter2"}
]

GenServer.cast(ctrl_conn, {:refresh_topology, new_peers})

assert_receive {^mirror_ref,
{:discovered_hosts,
[%Host{address: {192, 168, 1, 1}}, %Host{address: {192, 168, 1, 2}}]}}
end

test "triggers a topology refresh with the :refresh_topology message",
%{mirror_ref: mirror_ref, start_options: start_options} do
ctrl_conn = start_control_connection!(start_options)
Expand Down
92 changes: 42 additions & 50 deletions test/xandra/cluster/pool_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ defmodule Xandra.Cluster.PoolTest do
queue_before_connecting: [
buffer_size: 100,
timeout: 5000
],
use_rpc_address_for_peer_address: true
]
]

pool_opts =
Expand Down Expand Up @@ -92,54 +91,47 @@ defmodule Xandra.Cluster.PoolTest do
end
end

for use_rpc_address <- [true, false] do
test "establishes a control connection and a pool when use_rpc_address_for_peer_address is #{use_rpc_address}",
%{cluster_options: cluster_options, pool_options: pool_options} do
telemetry_ref =
:telemetry_test.attach_event_handlers(self(), [
[:xandra, :cluster, :control_connection, :connected],
[:xandra, :cluster, :pool, :started],
[:xandra, :cluster, :change_event],
[:xandra, :connected]
])

use_rpc_address = unquote(use_rpc_address)

cluster_options =
Keyword.merge(cluster_options, use_rpc_address_for_peer_address: use_rpc_address)

assert {:ok, pid} = start_supervised(spec(cluster_options, pool_options))

assert %{
cluster_pid: ^pid,
host: %Host{address: {127, 0, 0, 1}, port: @port}
} = assert_telemetry(telemetry_ref, [:control_connection, :connected])

assert %{
cluster_pid: ^pid,
host: %Host{address: {127, 0, 0, 1}, port: @port}
} = assert_telemetry(telemetry_ref, [:pool, :started])

assert %{
event_type: :host_added,
cluster_pid: ^pid,
host: %Host{address: {127, 0, 0, 1}, port: @port}
} = assert_telemetry(telemetry_ref, [:change_event])

assert_receive {[:xandra, :connected], ^telemetry_ref, %{},
%{address: ~c"127.0.0.1", port: @port}}

cluster_state = get_state(pid)
assert map_size(cluster_state.peers) == 1

assert %{status: status, pool_pid: pool_pid, host: host} =
cluster_state.peers[{{127, 0, 0, 1}, @port}]

# Let's avoid race conditions...
assert status in [:up, :connected]
assert is_pid(pool_pid)
assert %Host{address: {127, 0, 0, 1}, port: @port, data_center: "datacenter1"} = host
end
test "establishes a control connection and a pool",

Check failure on line 94 in test/xandra/cluster/pool_test.exs

View workflow job for this annotation

GitHub Actions / Test (Elixir 1.15.4, OTP 25.3, C* 3, Scylla 4.6.3, Native protocol v4)

test startup establishes a control connection and a pool (Xandra.Cluster.PoolTest)
%{cluster_options: cluster_options, pool_options: pool_options} do
telemetry_ref =
:telemetry_test.attach_event_handlers(self(), [
[:xandra, :cluster, :control_connection, :connected],
[:xandra, :cluster, :pool, :started],
[:xandra, :cluster, :change_event],
[:xandra, :connected]
])

assert {:ok, pid} = start_supervised(spec(cluster_options, pool_options))

assert %{
cluster_pid: ^pid,
host: %Host{address: {127, 0, 0, 1}, port: @port}
} = assert_telemetry(telemetry_ref, [:control_connection, :connected])

assert %{
cluster_pid: ^pid,
host: %Host{address: {127, 0, 0, 1}, port: @port}
} = assert_telemetry(telemetry_ref, [:pool, :started])

assert %{
event_type: :host_added,
cluster_pid: ^pid,
host: %Host{address: {127, 0, 0, 1}, port: @port}
} = assert_telemetry(telemetry_ref, [:change_event])

assert_receive {[:xandra, :connected], ^telemetry_ref, %{},
%{address: ~c"127.0.0.1", port: @port}}

cluster_state = get_state(pid)
assert map_size(cluster_state.peers) == 1

assert %{status: status, pool_pid: pool_pid, host: host} =
cluster_state.peers[{{127, 0, 0, 1}, @port}]

# Let's avoid race conditions...
assert status in [:up, :connected]
assert is_pid(pool_pid)
assert %Host{address: {127, 0, 0, 1}, port: @port, data_center: "datacenter1"} = host
end

@tag :capture_log
Expand Down

0 comments on commit 8340b4d

Please sign in to comment.