diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 74db442c..fb612d75 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -8,11 +8,11 @@ different ports to test different features (such as authenticationn). To run normal tests, do this from the root of the project: ```bash -docker-compose up --daemon +docker-compose up --detach mix test ``` -The `--daemon` flag runs the instances as daemons in the background. Give it a +The `--detach` flag runs the instances as daemons in the background. Give it a minute between starting the services and running `mix test.all` since Cassandra takes a while to start. You can check whether the Docker containers are ready with `docker-compose ps`. To stop the services, run `docker-compose stop`. diff --git a/lib/xandra/cluster/control_connection.ex b/lib/xandra/cluster/control_connection.ex index 7b16c24c..16d45f43 100644 --- a/lib/xandra/cluster/control_connection.ex +++ b/lib/xandra/cluster/control_connection.ex @@ -376,7 +376,10 @@ defmodule Xandra.Cluster.ControlConnection do # https://user.cassandra.apache.narkive.com/APRtj5hb/system-peers-and-decommissioned-nodes. peers = for peer_attrs <- peers, - peer = %Host{queried_peer_to_host(peer_attrs) | port: autodiscovered_nodes_port}, + peer = %Host{ + queried_peer_to_host(peer_attrs) + | port: autodiscovered_nodes_port + }, not is_nil(peer.host_id), do: peer @@ -412,12 +415,64 @@ defmodule Xandra.Cluster.ControlConnection do end end - defp queried_peer_to_host(%{"peer" => _} = peer_attrs) do + defp queried_peer_to_host(%{"rpc_address" => rpc_address} = peer_attrs) + when is_tuple(rpc_address) do + {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") + peer_attrs = Map.delete(peer_attrs, "peer") + peer_attrs = Map.put(peer_attrs, "address", address) + queried_peer_to_host(peer_attrs) + end + + defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs) do + {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") + peer_attrs = Map.delete(peer_attrs, "peer") + + peer_attrs = + case :inet.parse_address(address) do + {:ok, valid_address_tuple} -> + Map.put(peer_attrs, "address", valid_address_tuple) + + error -> + Logger.error( + "queried_peer_to_host: error converting address (#{inspect(address)}) to tuple, error: #{inspect(error)}" + ) + + # failed to parse, however, use what was returned in the table, see if + # node_validation will validate it + Map.put(peer_attrs, "address", address) + end + + queried_peer_to_host(peer_attrs) + end + + defp queried_peer_to_host(%{"peer" => peer} = peer_attrs) + when is_tuple(peer) do {address, peer_attrs} = Map.pop!(peer_attrs, "peer") peer_attrs = Map.put(peer_attrs, "address", address) queried_peer_to_host(peer_attrs) end + defp queried_peer_to_host(%{"peer" => _} = peer_attrs) do + {address, peer_attrs} = Map.pop!(peer_attrs, "peer") + + peer_attrs = + case :inet.parse_address(address) do + {:ok, valid_address_tuple} -> + Map.put(peer_attrs, "address", valid_address_tuple) + + error -> + Logger.error( + "queried_peer_to_host: error converting address (#{inspect(address)}) to tuple, error: #{inspect(error)}" + ) + + # failed to parse, however, use what was returned in the table, see if + # node_validation will validate it + Map.put(peer_attrs, "address", address) + end + + queried_peer_to_host(peer_attrs) + end + defp queried_peer_to_host(%{} = peer_attrs) do columns = [ "address", diff --git a/lib/xandra/cluster/pool.ex b/lib/xandra/cluster/pool.ex index aa94ccca..b40eb25d 100644 --- a/lib/xandra/cluster/pool.ex +++ b/lib/xandra/cluster/pool.ex @@ -152,25 +152,24 @@ defmodule Xandra.Cluster.Pool do queue_before_connecting_opts = Keyword.fetch!(cluster_opts, :queue_before_connecting) queue_before_connecting_timeout = Keyword.fetch!(queue_before_connecting_opts, :timeout) - data = - %__MODULE__{ - pool_options: pool_opts, - contact_nodes: Keyword.fetch!(cluster_opts, :nodes), - load_balancing_module: lb_mod, - load_balancing_state: lb_mod.init(lb_opts), - autodiscovered_nodes_port: Keyword.fetch!(cluster_opts, :autodiscovered_nodes_port), - xandra_mod: Keyword.fetch!(cluster_opts, :xandra_module), - control_conn_mod: Keyword.fetch!(cluster_opts, :control_connection_module), - target_pools: Keyword.fetch!(cluster_opts, :target_pools), - name: Keyword.get(cluster_opts, :name), - pool_supervisor: pool_sup, - refresh_topology_interval: Keyword.fetch!(cluster_opts, :refresh_topology_interval), - reqs_before_connecting: %{ - queue: :queue.new(), - max_size: Keyword.fetch!(queue_before_connecting_opts, :buffer_size) - }, - sync_connect_ref: sync_connect_ref_or_nil && {parent, sync_connect_ref_or_nil} - } + data = %__MODULE__{ + pool_options: pool_opts, + contact_nodes: Keyword.fetch!(cluster_opts, :nodes), + load_balancing_module: lb_mod, + load_balancing_state: lb_mod.init(lb_opts), + autodiscovered_nodes_port: Keyword.fetch!(cluster_opts, :autodiscovered_nodes_port), + xandra_mod: Keyword.fetch!(cluster_opts, :xandra_module), + control_conn_mod: Keyword.fetch!(cluster_opts, :control_connection_module), + target_pools: Keyword.fetch!(cluster_opts, :target_pools), + name: Keyword.get(cluster_opts, :name), + pool_supervisor: pool_sup, + refresh_topology_interval: Keyword.fetch!(cluster_opts, :refresh_topology_interval), + reqs_before_connecting: %{ + queue: :queue.new(), + max_size: Keyword.fetch!(queue_before_connecting_opts, :buffer_size) + }, + sync_connect_ref: sync_connect_ref_or_nil && {parent, sync_connect_ref_or_nil} + } actions = [ {:next_event, :internal, :start_control_connection}, @@ -425,8 +424,7 @@ defmodule Xandra.Cluster.Pool do end defp handle_host_added(%__MODULE__{} = data, %Host{} = host) do - data = - update_in(data.load_balancing_state, &data.load_balancing_module.host_added(&1, host)) + data = update_in(data.load_balancing_state, &data.load_balancing_module.host_added(&1, host)) data = update_in( diff --git a/lib/xandra/options_validators.ex b/lib/xandra/options_validators.ex index 99d33b13..3ea866ab 100644 --- a/lib/xandra/options_validators.ex +++ b/lib/xandra/options_validators.ex @@ -62,6 +62,28 @@ defmodule Xandra.OptionsValidators do end end + def validate_node(%Xandra.Cluster.Host{address: address, port: port}) when is_tuple(address) do + case :inet.ntoa(address) do + {:error, :einval} -> + {:error, + "expected valid address tuple, got: address: #{inspect(address)} and port: #{inspect(port)}, with error: :einval"} + + _valid_address -> + {:ok, {address, port}} + end + end + + def validate_node(%Xandra.Cluster.Host{address: address, port: port}) when is_list(address) do + case :inet.parse_address(address) do + {:ok, valid_address} -> + {:ok, {valid_address, port}} + + error -> + {:error, + "expected valid address char list, got: address: #{inspect(address)} and port: #{inspect(port)}, with error: #{inspect(error)}"} + end + end + def validate_node(other) do {:error, "expected node to be a string or a {ip, port} tuple, got: #{inspect(other)}"} end diff --git a/test/xandra/cluster/pool_test.exs b/test/xandra/cluster/pool_test.exs index 3db7bf15..3935def7 100644 --- a/test/xandra/cluster/pool_test.exs +++ b/test/xandra/cluster/pool_test.exs @@ -231,8 +231,7 @@ defmodule Xandra.Cluster.PoolTest do status: :up, pool_pid: pool_pid, host: %Host{address: {127, 0, 0, 1}, port: @port} - } = - get_state(pid).peers[Host.to_peername(host)] + } = get_state(pid).peers[Host.to_peername(host)] assert is_pid(pool_pid) end diff --git a/test/xandra/cluster_test.exs b/test/xandra/cluster_test.exs index 73401486..4e873986 100644 --- a/test/xandra/cluster_test.exs +++ b/test/xandra/cluster_test.exs @@ -156,8 +156,7 @@ defmodule Xandra.ClusterTest do cluster = TestHelper.start_link_supervised!({Xandra.Cluster, opts}) - assert {:ok, %Xandra.Page{}} = - Xandra.Cluster.execute(cluster, "SELECT * FROM system.local") + assert {:ok, %Xandra.Page{}} = Xandra.Cluster.execute(cluster, "SELECT * FROM system.local") assert {:ok, %Xandra.Page{}} = Xandra.Cluster.execute(