From ac748facac025624f2a9ee83c1daa220b08f345e Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Tue, 2 Jul 2024 07:55:49 +0200 Subject: [PATCH] Add option to control if remote replicas should be used Previously the driver would always failover to use replicas in the remote datacenters if there was no replicas available in local_dc when using DC/RackAwareRoundRobinPolicy. There was no control over it. This is not how the other driver handle it. This commit adds a option to dc/rackAwareRR to control if dc failover is permitted. And it changes the default to not use remote datacenter. --- policies.go | 70 +++++++++++++++++++++++++++++++++++++----------- policies_test.go | 18 ++++++------- 2 files changed, 64 insertions(+), 24 deletions(-) diff --git a/policies.go b/policies.go index 8be5bcec3..d73af4085 100644 --- a/policies.go +++ b/policies.go @@ -948,19 +948,37 @@ func (host selectedHostPoolHost) Mark(err error) { } type dcAwareRR struct { - local string - localHosts cowHostList - remoteHosts cowHostList - lastUsedHostIdx uint64 + local string + localHosts cowHostList + remoteHosts cowHostList + lastUsedHostIdx uint64 + permitDCFailover bool +} + +type dcFailoverEnabledPolicy interface { + setDCFailoverEnabled() +} + +type dcAwarePolicyOption func(p dcFailoverEnabledPolicy) + +func HostPolicyOptionEnableDCFailover(p dcFailoverEnabledPolicy) { + p.setDCFailoverEnabled() } // DCAwareRoundRobinPolicy is a host selection policies which will prioritize and // return hosts which are in the local datacentre before returning hosts in all // other datercentres -func DCAwareRoundRobinPolicy(localDC string) HostSelectionPolicy { - return &dcAwareRR{local: localDC} +func DCAwareRoundRobinPolicy(localDC string, opts ...dcAwarePolicyOption) HostSelectionPolicy { + p := &dcAwareRR{local: localDC, permitDCFailover: false} + for _, opt := range opts { + opt(p) + } + return p } +func (d *dcAwareRR) setDCFailoverEnabled() { + d.permitDCFailover = true +} func (d *dcAwareRR) Init(*Session) {} func (d *dcAwareRR) Reset() {} func (d *dcAwareRR) KeyspaceChanged(KeyspaceUpdateEvent) {} @@ -1035,7 +1053,12 @@ func roundRobbin(shift int, hosts ...[]*HostInfo) NextHost { func (d *dcAwareRR) Pick(q ExecutableQuery) NextHost { nextStartOffset := atomic.AddUint64(&d.lastUsedHostIdx, 1) - return roundRobbin(int(nextStartOffset), d.localHosts.get(), d.remoteHosts.get()) + if d.permitDCFailover { + return roundRobbin(int(nextStartOffset), d.localHosts.get(), d.remoteHosts.get()) + } else { + return roundRobbin(int(nextStartOffset), d.localHosts.get()) + } + } // RackAwareRoundRobinPolicy is a host selection policies which will prioritize and @@ -1047,15 +1070,19 @@ type rackAwareRR struct { // It is accessed atomically and needs to be aligned to 64 bits, so we // keep it first in the struct. Do not move it or add new struct members // before it. - lastUsedHostIdx uint64 - localDC string - localRack string - hosts []cowHostList + lastUsedHostIdx uint64 + localDC string + localRack string + hosts []cowHostList + permitDCFailover bool } -func RackAwareRoundRobinPolicy(localDC string, localRack string) HostSelectionPolicy { - hosts := make([]cowHostList, 3) - return &rackAwareRR{localDC: localDC, localRack: localRack, hosts: hosts} +func RackAwareRoundRobinPolicy(localDC string, localRack string, opts ...dcAwarePolicyOption) HostSelectionPolicy { + p := &rackAwareRR{localDC: localDC, localRack: localRack, hosts: make([]cowHostList, 3)} + for _, opt := range opts { + opt(p) + } + return p } func (d *rackAwareRR) Init(*Session) {} @@ -1067,8 +1094,16 @@ func (d *rackAwareRR) MaxHostTier() uint { return 2 } +func (d *rackAwareRR) setDCFailoverEnabled() { + d.permitDCFailover = true +} + // Experimental, this interface and use may change func (d *rackAwareRR) SetTablets(tablets []*TabletInfo) {} +func (d *rackAwareRR) PermitDCFailover() HostSelectionPolicy { + d.permitDCFailover = true + return d +} func (d *rackAwareRR) HostTier(host *HostInfo) uint { if host.DataCenter() == d.localDC { @@ -1101,7 +1136,12 @@ func (d *rackAwareRR) HostDown(host *HostInfo) { d.RemoveHost(host) } func (d *rackAwareRR) Pick(q ExecutableQuery) NextHost { nextStartOffset := atomic.AddUint64(&d.lastUsedHostIdx, 1) - return roundRobbin(int(nextStartOffset), d.hosts[0].get(), d.hosts[1].get(), d.hosts[2].get()) + if d.permitDCFailover { + return roundRobbin(int(nextStartOffset), d.hosts[0].get(), d.hosts[1].get(), d.hosts[2].get()) + } else { + return roundRobbin(int(nextStartOffset), d.hosts[0].get(), d.hosts[1].get()) + } + } // ReadyPolicy defines a policy for when a HostSelectionPolicy can be used. After diff --git a/policies_test.go b/policies_test.go index 60f043b19..0695d409d 100644 --- a/policies_test.go +++ b/policies_test.go @@ -556,7 +556,7 @@ func expectNoMoreHosts(t *testing.T, iter NextHost) { } func TestHostPolicy_DCAwareRR(t *testing.T) { - p := DCAwareRoundRobinPolicy("local") + p := DCAwareRoundRobinPolicy("local", HostPolicyOptionEnableDCFailover) hosts := [...]*HostInfo{ {hostId: "0", connectAddress: net.ParseIP("10.0.0.1"), dataCenter: "local"}, @@ -606,7 +606,7 @@ func TestHostPolicy_DCAwareRR(t *testing.T) { // with {"class": "NetworkTopologyStrategy", "a": 1, "b": 1, "c": 1} replication. func TestHostPolicy_TokenAware(t *testing.T) { const keyspace = "myKeyspace" - policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local")) + policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local", HostPolicyOptionEnableDCFailover)) policyInternal := policy.(*tokenAwareHostPolicy) policyInternal.getKeyspaceName = func() string { return keyspace } policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) { @@ -708,7 +708,7 @@ func TestHostPolicy_TokenAware(t *testing.T) { // with {"class": "NetworkTopologyStrategy", "a": 2, "b": 2, "c": 2} replication. func TestHostPolicy_TokenAware_NetworkStrategy(t *testing.T) { const keyspace = "myKeyspace" - policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local"), NonLocalReplicasFallback()) + policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local", HostPolicyOptionEnableDCFailover), NonLocalReplicasFallback()) policyInternal := policy.(*tokenAwareHostPolicy) policyInternal.getKeyspaceName = func() string { return keyspace } policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) { @@ -797,7 +797,7 @@ func TestHostPolicy_TokenAware_NetworkStrategy(t *testing.T) { } func TestHostPolicy_RackAwareRR(t *testing.T) { - p := RackAwareRoundRobinPolicy("local", "b") + p := RackAwareRoundRobinPolicy("local", "b", HostPolicyOptionEnableDCFailover) hosts := [...]*HostInfo{ {hostId: "0", connectAddress: net.ParseIP("10.0.0.1"), dataCenter: "local", rack: "a"}, @@ -829,8 +829,8 @@ func TestHostPolicy_RackAwareRR(t *testing.T) { // DC & Rack aware round-robin host selection policy fallback func TestHostPolicy_TokenAware_RackAware(t *testing.T) { const keyspace = "myKeyspace" - policy := TokenAwareHostPolicy(RackAwareRoundRobinPolicy("local", "b")) - policyWithFallback := TokenAwareHostPolicy(RackAwareRoundRobinPolicy("local", "b"), NonLocalReplicasFallback()) + policy := TokenAwareHostPolicy(RackAwareRoundRobinPolicy("local", "b", HostPolicyOptionEnableDCFailover)) + policyWithFallback := TokenAwareHostPolicy(RackAwareRoundRobinPolicy("local", "b", HostPolicyOptionEnableDCFailover), NonLocalReplicasFallback()) policyInternal := policy.(*tokenAwareHostPolicy) policyInternal.getKeyspaceName = func() string { return keyspace } @@ -960,7 +960,7 @@ func TestHostPolicy_TokenAware_RackAware(t *testing.T) { } func TestHostPolicy_TokenAware_Issue1274(t *testing.T) { - policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local")) + policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local", HostPolicyOptionEnableDCFailover)) policyInternal := policy.(*tokenAwareHostPolicy) policyInternal.getKeyspaceName = func() string { return "myKeyspace" } policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) { @@ -1038,7 +1038,7 @@ func TestHostPolicy_TokenAware_Issue1274(t *testing.T) { func TestTokenAwarePolicyReset(t *testing.T) { policy := TokenAwareHostPolicy( - RackAwareRoundRobinPolicy("local", "b"), + RackAwareRoundRobinPolicy("local", "b", HostPolicyOptionEnableDCFailover), NonLocalReplicasFallback(), ) policyInternal := policy.(*tokenAwareHostPolicy) @@ -1083,7 +1083,7 @@ func TestTokenAwarePolicyReset(t *testing.T) { func TestTokenAwarePolicyResetInSessionClose(t *testing.T) { policy := TokenAwareHostPolicy( - RackAwareRoundRobinPolicy("local", "b"), + RackAwareRoundRobinPolicy("local", "b", HostPolicyOptionEnableDCFailover), NonLocalReplicasFallback(), ) policyInternal := policy.(*tokenAwareHostPolicy)