Skip to content

Commit

Permalink
Add option to control if remote replicas should be used
Browse files Browse the repository at this point in the history
Previously the driver would always failover to use replicas
in the remote datacenters if there was no replicas available
in local_dc when using DC/RackAwareRoundRobinPolicy. There was
no control over it. This is not how the other driver handle it.

This commit adds a option to dc/rackAwareRR to control if dc
failover is permitted. And it changes the default to not use
remote datacenter.
  • Loading branch information
sylwiaszunejko committed Jul 2, 2024
1 parent e853e9b commit ac748fa
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 24 deletions.
70 changes: 55 additions & 15 deletions policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -948,19 +948,37 @@ func (host selectedHostPoolHost) Mark(err error) {
}

type dcAwareRR struct {
local string
localHosts cowHostList
remoteHosts cowHostList
lastUsedHostIdx uint64
local string
localHosts cowHostList
remoteHosts cowHostList
lastUsedHostIdx uint64
permitDCFailover bool
}

type dcFailoverEnabledPolicy interface {
setDCFailoverEnabled()
}

type dcAwarePolicyOption func(p dcFailoverEnabledPolicy)

func HostPolicyOptionEnableDCFailover(p dcFailoverEnabledPolicy) {
p.setDCFailoverEnabled()
}

// DCAwareRoundRobinPolicy is a host selection policies which will prioritize and
// return hosts which are in the local datacentre before returning hosts in all
// other datercentres
func DCAwareRoundRobinPolicy(localDC string) HostSelectionPolicy {
return &dcAwareRR{local: localDC}
func DCAwareRoundRobinPolicy(localDC string, opts ...dcAwarePolicyOption) HostSelectionPolicy {
p := &dcAwareRR{local: localDC, permitDCFailover: false}
for _, opt := range opts {
opt(p)
}
return p
}

func (d *dcAwareRR) setDCFailoverEnabled() {
d.permitDCFailover = true
}
func (d *dcAwareRR) Init(*Session) {}
func (d *dcAwareRR) Reset() {}
func (d *dcAwareRR) KeyspaceChanged(KeyspaceUpdateEvent) {}
Expand Down Expand Up @@ -1035,7 +1053,12 @@ func roundRobbin(shift int, hosts ...[]*HostInfo) NextHost {

func (d *dcAwareRR) Pick(q ExecutableQuery) NextHost {
nextStartOffset := atomic.AddUint64(&d.lastUsedHostIdx, 1)
return roundRobbin(int(nextStartOffset), d.localHosts.get(), d.remoteHosts.get())
if d.permitDCFailover {
return roundRobbin(int(nextStartOffset), d.localHosts.get(), d.remoteHosts.get())
} else {
return roundRobbin(int(nextStartOffset), d.localHosts.get())
}

}

// RackAwareRoundRobinPolicy is a host selection policies which will prioritize and
Expand All @@ -1047,15 +1070,19 @@ type rackAwareRR struct {
// It is accessed atomically and needs to be aligned to 64 bits, so we
// keep it first in the struct. Do not move it or add new struct members
// before it.
lastUsedHostIdx uint64
localDC string
localRack string
hosts []cowHostList
lastUsedHostIdx uint64
localDC string
localRack string
hosts []cowHostList
permitDCFailover bool
}

func RackAwareRoundRobinPolicy(localDC string, localRack string) HostSelectionPolicy {
hosts := make([]cowHostList, 3)
return &rackAwareRR{localDC: localDC, localRack: localRack, hosts: hosts}
func RackAwareRoundRobinPolicy(localDC string, localRack string, opts ...dcAwarePolicyOption) HostSelectionPolicy {
p := &rackAwareRR{localDC: localDC, localRack: localRack, hosts: make([]cowHostList, 3)}
for _, opt := range opts {
opt(p)
}
return p
}

func (d *rackAwareRR) Init(*Session) {}
Expand All @@ -1067,8 +1094,16 @@ func (d *rackAwareRR) MaxHostTier() uint {
return 2
}

func (d *rackAwareRR) setDCFailoverEnabled() {
d.permitDCFailover = true
}

// Experimental, this interface and use may change
func (d *rackAwareRR) SetTablets(tablets []*TabletInfo) {}
func (d *rackAwareRR) PermitDCFailover() HostSelectionPolicy {
d.permitDCFailover = true
return d
}

func (d *rackAwareRR) HostTier(host *HostInfo) uint {
if host.DataCenter() == d.localDC {
Expand Down Expand Up @@ -1101,7 +1136,12 @@ func (d *rackAwareRR) HostDown(host *HostInfo) { d.RemoveHost(host) }

func (d *rackAwareRR) Pick(q ExecutableQuery) NextHost {
nextStartOffset := atomic.AddUint64(&d.lastUsedHostIdx, 1)
return roundRobbin(int(nextStartOffset), d.hosts[0].get(), d.hosts[1].get(), d.hosts[2].get())
if d.permitDCFailover {
return roundRobbin(int(nextStartOffset), d.hosts[0].get(), d.hosts[1].get(), d.hosts[2].get())
} else {
return roundRobbin(int(nextStartOffset), d.hosts[0].get(), d.hosts[1].get())
}

}

// ReadyPolicy defines a policy for when a HostSelectionPolicy can be used. After
Expand Down
18 changes: 9 additions & 9 deletions policies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ func expectNoMoreHosts(t *testing.T, iter NextHost) {
}

func TestHostPolicy_DCAwareRR(t *testing.T) {
p := DCAwareRoundRobinPolicy("local")
p := DCAwareRoundRobinPolicy("local", HostPolicyOptionEnableDCFailover)

hosts := [...]*HostInfo{
{hostId: "0", connectAddress: net.ParseIP("10.0.0.1"), dataCenter: "local"},
Expand Down Expand Up @@ -606,7 +606,7 @@ func TestHostPolicy_DCAwareRR(t *testing.T) {
// with {"class": "NetworkTopologyStrategy", "a": 1, "b": 1, "c": 1} replication.
func TestHostPolicy_TokenAware(t *testing.T) {
const keyspace = "myKeyspace"
policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local"))
policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local", HostPolicyOptionEnableDCFailover))
policyInternal := policy.(*tokenAwareHostPolicy)
policyInternal.getKeyspaceName = func() string { return keyspace }
policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
Expand Down Expand Up @@ -708,7 +708,7 @@ func TestHostPolicy_TokenAware(t *testing.T) {
// with {"class": "NetworkTopologyStrategy", "a": 2, "b": 2, "c": 2} replication.
func TestHostPolicy_TokenAware_NetworkStrategy(t *testing.T) {
const keyspace = "myKeyspace"
policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local"), NonLocalReplicasFallback())
policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local", HostPolicyOptionEnableDCFailover), NonLocalReplicasFallback())
policyInternal := policy.(*tokenAwareHostPolicy)
policyInternal.getKeyspaceName = func() string { return keyspace }
policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
Expand Down Expand Up @@ -797,7 +797,7 @@ func TestHostPolicy_TokenAware_NetworkStrategy(t *testing.T) {
}

func TestHostPolicy_RackAwareRR(t *testing.T) {
p := RackAwareRoundRobinPolicy("local", "b")
p := RackAwareRoundRobinPolicy("local", "b", HostPolicyOptionEnableDCFailover)

hosts := [...]*HostInfo{
{hostId: "0", connectAddress: net.ParseIP("10.0.0.1"), dataCenter: "local", rack: "a"},
Expand Down Expand Up @@ -829,8 +829,8 @@ func TestHostPolicy_RackAwareRR(t *testing.T) {
// DC & Rack aware round-robin host selection policy fallback
func TestHostPolicy_TokenAware_RackAware(t *testing.T) {
const keyspace = "myKeyspace"
policy := TokenAwareHostPolicy(RackAwareRoundRobinPolicy("local", "b"))
policyWithFallback := TokenAwareHostPolicy(RackAwareRoundRobinPolicy("local", "b"), NonLocalReplicasFallback())
policy := TokenAwareHostPolicy(RackAwareRoundRobinPolicy("local", "b", HostPolicyOptionEnableDCFailover))
policyWithFallback := TokenAwareHostPolicy(RackAwareRoundRobinPolicy("local", "b", HostPolicyOptionEnableDCFailover), NonLocalReplicasFallback())

policyInternal := policy.(*tokenAwareHostPolicy)
policyInternal.getKeyspaceName = func() string { return keyspace }
Expand Down Expand Up @@ -960,7 +960,7 @@ func TestHostPolicy_TokenAware_RackAware(t *testing.T) {
}

func TestHostPolicy_TokenAware_Issue1274(t *testing.T) {
policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local"))
policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local", HostPolicyOptionEnableDCFailover))
policyInternal := policy.(*tokenAwareHostPolicy)
policyInternal.getKeyspaceName = func() string { return "myKeyspace" }
policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
Expand Down Expand Up @@ -1038,7 +1038,7 @@ func TestHostPolicy_TokenAware_Issue1274(t *testing.T) {

func TestTokenAwarePolicyReset(t *testing.T) {
policy := TokenAwareHostPolicy(
RackAwareRoundRobinPolicy("local", "b"),
RackAwareRoundRobinPolicy("local", "b", HostPolicyOptionEnableDCFailover),
NonLocalReplicasFallback(),
)
policyInternal := policy.(*tokenAwareHostPolicy)
Expand Down Expand Up @@ -1083,7 +1083,7 @@ func TestTokenAwarePolicyReset(t *testing.T) {

func TestTokenAwarePolicyResetInSessionClose(t *testing.T) {
policy := TokenAwareHostPolicy(
RackAwareRoundRobinPolicy("local", "b"),
RackAwareRoundRobinPolicy("local", "b", HostPolicyOptionEnableDCFailover),
NonLocalReplicasFallback(),
)
policyInternal := policy.(*tokenAwareHostPolicy)
Expand Down

0 comments on commit ac748fa

Please sign in to comment.