forked from apache/cassandra-gocql-driver
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cluster.go
147 lines (127 loc) · 5.73 KB
/
cluster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// Copyright (c) 2012 The gocql Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package gocql
import (
"errors"
"sync"
"time"
"github.com/gocql/gocql/internal/lru"
)
// defaultMaxPreparedStmts is the value NewCluster assigns to
// ClusterConfig.MaxPreparedStmts.
const defaultMaxPreparedStmts = 1000
// stmtsLRU is the package-global prepared statement LRU. Its lru field starts
// out nil; initStmtsLRU creates the cache on first use and resizes it afterwards.
var stmtsLRU preparedLRU
// preparedLRU is the prepared statement cache.
type preparedLRU struct {
// Embedded so that holders of a preparedLRU can Lock/Unlock it directly.
// None of the functions visible in this file acquire it themselves.
sync.Mutex
// lru is the underlying cache. initStmtsLRU creates it when it is nil.
lru *lru.Cache
}
// Max sets the maximum number of entries the cache may hold to max, first
// evicting the oldest entries while the cache holds more than max of them.
//
// Max does not acquire the embedded mutex; it is not concurrency safe and the
// caller is responsible for any locking. p.lru must be non-nil.
func (p *preparedLRU) Max(max int) {
	for {
		size := p.lru.Len()
		// The size == 0 check guards against a negative max: an empty cache
		// has nothing left to evict, so "size > max" alone would never become
		// false and the loop would not terminate.
		if size <= max || size == 0 {
			break
		}
		p.lru.RemoveOldest()
	}
	p.lru.MaxEntries = max
}
// initStmtsLRU makes sure the package-global prepared statement cache exists
// and is limited to max entries: the first call creates the cache, later calls
// resize the existing one through Max. It takes no lock on stmtsLRU itself.
func initStmtsLRU(max int) {
	if stmtsLRU.lru == nil {
		// First use: allocate the cache with the requested capacity.
		stmtsLRU.lru = lru.New(max)
		return
	}
	stmtsLRU.Max(max)
}
// DiscoveryConfig holds the settings for periodic node discovery. To enable
// periodic node discovery, set DiscoverHosts in ClusterConfig.
type DiscoveryConfig struct {
// If not empty, filters all discovered hosts to a single data centre (default: "")
DcFilter string
// If not empty, filters all discovered hosts to a single rack (default: "")
RackFilter string
// The interval at which to check for new hosts (default: 30s).
// NOTE(review): NewCluster leaves this field at zero; the 30s default is
// presumably applied by the discovery code, which is not visible here.
Sleep time.Duration
}
// PoolConfig configures the connection pool used by the driver. It defaults to
// using a round robin host selection policy and a round robin connection
// selection policy for each host; buildPool substitutes those defaults for any
// field left nil.
type PoolConfig struct {
// HostSelectionPolicy sets the policy for selecting which host to use for a
// given query (default: RoundRobinHostPolicy())
HostSelectionPolicy HostSelectionPolicy
// ConnSelectionPolicy sets the policy factory for selecting a connection to use for
// each host for a query (default: RoundRobinConnPolicy())
ConnSelectionPolicy func() ConnSelectionPolicy
}
// buildPool creates the connection pool for session from this configuration.
// A nil HostSelectionPolicy falls back to RoundRobinHostPolicy() and a nil
// ConnSelectionPolicy falls back to RoundRobinConnPolicy(). The result of
// newPolicyConnPool is returned unchanged.
func (p PoolConfig) buildPool(session *Session) (*policyConnPool, error) {
	hostPolicy, connPolicy := p.HostSelectionPolicy, p.ConnSelectionPolicy

	if hostPolicy == nil {
		hostPolicy = RoundRobinHostPolicy()
	}
	if connPolicy == nil {
		connPolicy = RoundRobinConnPolicy()
	}

	return newPolicyConnPool(session, hostPolicy, connPolicy)
}
// ClusterConfig is a struct to configure the default cluster implementation
// of gocql. It has a variety of attributes that can be used to modify the
// behavior to fit the most common use cases. Applications that require a
// different setup must implement their own cluster.
//
// The defaults quoted below are the values set by NewCluster; a ClusterConfig
// built as a bare struct literal starts from Go zero values instead.
type ClusterConfig struct {
Hosts []string // addresses for the initial connections
CQLVersion string // CQL version (default: 3.0.0)
ProtoVersion int // version of the native protocol (default: 2)
Timeout time.Duration // connection timeout (default: 600ms)
Port int // port (default: 9042)
Keyspace string // initial keyspace (optional)
NumConns int // number of connections per host (default: 2)
Consistency Consistency // default consistency level (default: Quorum)
Compressor Compressor // compression algorithm (default: nil)
Authenticator Authenticator // authenticator (default: nil)
RetryPolicy RetryPolicy // Default retry policy to use for queries (default: unset; NewCluster leaves the zero value)
SocketKeepalive time.Duration // The keepalive period to use, enabled if > 0 (default: 0)
DiscoverHosts bool // If set, gocql will attempt to automatically discover other members of the Cassandra cluster (default: false)
MaxPreparedStmts int // Sets the maximum cache size for prepared statements globally for gocql (default: 1000)
MaxRoutingKeyInfo int // Sets the maximum cache size for query info about statements for each session (default: 1000)
PageSize int // Default page size to use for created sessions (default: 5000)
SerialConsistency SerialConsistency // Sets the consistency for the serial part of queries, values can be either SERIAL or LOCAL_SERIAL (default: unset)
// Discovery holds the settings for periodic node discovery; see
// DiscoverHosts and DiscoveryConfig. NewCluster leaves it at its zero value.
Discovery DiscoveryConfig
// SslOpts is left nil by NewCluster.
// NOTE(review): presumably a non-nil value enables SSL/TLS for connections;
// SslOptions is declared outside this file, so confirm there.
SslOpts *SslOptions
DefaultTimestamp bool // Sends a client side timestamp for all requests which overrides the timestamp at which it arrives at the server. (default: true, only enabled for protocol 3 and above)
// PoolConfig configures the underlying connection pool, allowing the
// configuration of host selection and connection selection policies.
PoolConfig PoolConfig
// The maximum amount of time to wait for schema agreement in a cluster after
// receiving a schema change frame. (default: 60s)
MaxWaitSchemaAgreement time.Duration
// internal config for testing
disableControlConn bool
}
// NewCluster returns a new ClusterConfig for the default cluster
// implementation, seeded with the given hosts and the package defaults.
// Fields not listed in the literal keep their zero values. The hosts slice is
// stored as passed, without copying.
func NewCluster(hosts ...string) *ClusterConfig {
	return &ClusterConfig{
		// Connection settings.
		Hosts:        hosts,
		Port:         9042,
		NumConns:     2,
		Timeout:      600 * time.Millisecond,
		CQLVersion:   "3.0.0",
		ProtoVersion: 2,

		// Query defaults.
		Consistency:      Quorum,
		PageSize:         5000,
		DefaultTimestamp: true,

		// Cache sizes.
		MaxPreparedStmts:  defaultMaxPreparedStmts,
		MaxRoutingKeyInfo: 1000,

		// Cluster behaviour.
		DiscoverHosts:          false,
		MaxWaitSchemaAgreement: 60 * time.Second,
	}
}
// CreateSession hands a copy of this config to NewSession and returns the
// resulting session, which can be used to interact with the database, together
// with any error NewSession reports. The copy is shallow: slice and pointer
// fields still refer to the same underlying data as cfg.
func (cfg *ClusterConfig) CreateSession() (*Session, error) {
	snapshot := *cfg
	return NewSession(snapshot)
}
// Sentinel errors exported by the package.
// NOTE(review): the code that returns them is not in this file; presumably
// they come from session creation — confirm against NewSession.
var (
// ErrNoHosts reports that no hosts were provided.
ErrNoHosts = errors.New("no hosts provided")
// ErrNoConnectionsStarted reports that no connections were made when
// creating the session.
ErrNoConnectionsStarted = errors.New("no connections were made when creating the session")
// ErrHostQueryFailed reports that the host list could not be populated.
ErrHostQueryFailed = errors.New("unable to populate Hosts")
)