diff --git a/policies.go b/policies.go index 8be5bcec3..50e7dd591 100644 --- a/policies.go +++ b/policies.go @@ -948,19 +948,37 @@ func (host selectedHostPoolHost) Mark(err error) { } type dcAwareRR struct { - local string - localHosts cowHostList - remoteHosts cowHostList - lastUsedHostIdx uint64 + local string + localHosts cowHostList + remoteHosts cowHostList + lastUsedHostIdx uint64 + permitDCFailover bool +} + +type dcFailoverDisabledPolicy interface { + setDCFailoverDisabled() +} + +type dcAwarePolicyOption func(p dcFailoverDisabledPolicy) + +func HostPolicyOptionDisableDCFailover(p dcFailoverDisabledPolicy) { + p.setDCFailoverDisabled() } // DCAwareRoundRobinPolicy is a host selection policies which will prioritize and // return hosts which are in the local datacentre before returning hosts in all // other datercentres -func DCAwareRoundRobinPolicy(localDC string) HostSelectionPolicy { - return &dcAwareRR{local: localDC} +func DCAwareRoundRobinPolicy(localDC string, opts ...dcAwarePolicyOption) HostSelectionPolicy { + p := &dcAwareRR{local: localDC, permitDCFailover: true} + for _, opt := range opts { + opt(p) + } + return p } +func (d *dcAwareRR) setDCFailoverDisabled() { + d.permitDCFailover = false +} func (d *dcAwareRR) Init(*Session) {} func (d *dcAwareRR) Reset() {} func (d *dcAwareRR) KeyspaceChanged(KeyspaceUpdateEvent) {} @@ -1035,7 +1053,12 @@ func roundRobbin(shift int, hosts ...[]*HostInfo) NextHost { func (d *dcAwareRR) Pick(q ExecutableQuery) NextHost { nextStartOffset := atomic.AddUint64(&d.lastUsedHostIdx, 1) - return roundRobbin(int(nextStartOffset), d.localHosts.get(), d.remoteHosts.get()) + if d.permitDCFailover { + return roundRobbin(int(nextStartOffset), d.localHosts.get(), d.remoteHosts.get()) + } else { + return roundRobbin(int(nextStartOffset), d.localHosts.get()) + } + } // RackAwareRoundRobinPolicy is a host selection policies which will prioritize and @@ -1047,15 +1070,19 @@ type rackAwareRR struct { // It is accessed atomically and needs to be aligned to 64 bits, so we // keep it first in the struct. Do not move it or add new struct members // before it. - lastUsedHostIdx uint64 - localDC string - localRack string - hosts []cowHostList + lastUsedHostIdx uint64 + localDC string + localRack string + hosts []cowHostList + permitDCFailover bool } -func RackAwareRoundRobinPolicy(localDC string, localRack string) HostSelectionPolicy { - hosts := make([]cowHostList, 3) - return &rackAwareRR{localDC: localDC, localRack: localRack, hosts: hosts} +func RackAwareRoundRobinPolicy(localDC string, localRack string, opts ...dcAwarePolicyOption) HostSelectionPolicy { + p := &rackAwareRR{localDC: localDC, localRack: localRack, hosts: make([]cowHostList, 3), permitDCFailover: true} + for _, opt := range opts { + opt(p) + } + return p } func (d *rackAwareRR) Init(*Session) {} @@ -1067,6 +1094,10 @@ func (d *rackAwareRR) MaxHostTier() uint { return 2 } +func (d *rackAwareRR) setDCFailoverDisabled() { + d.permitDCFailover = false +} + // Experimental, this interface and use may change func (d *rackAwareRR) SetTablets(tablets []*TabletInfo) {} @@ -1101,7 +1132,12 @@ func (d *rackAwareRR) HostDown(host *HostInfo) { d.RemoveHost(host) } func (d *rackAwareRR) Pick(q ExecutableQuery) NextHost { nextStartOffset := atomic.AddUint64(&d.lastUsedHostIdx, 1) - return roundRobbin(int(nextStartOffset), d.hosts[0].get(), d.hosts[1].get(), d.hosts[2].get()) + if d.permitDCFailover { + return roundRobbin(int(nextStartOffset), d.hosts[0].get(), d.hosts[1].get(), d.hosts[2].get()) + } else { + return roundRobbin(int(nextStartOffset), d.hosts[0].get(), d.hosts[1].get()) + } + } // ReadyPolicy defines a policy for when a HostSelectionPolicy can be used. After