forked from couchbase/cbgt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpindex_impl.go
288 lines (237 loc) · 9.69 KB
/
pindex_impl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the
// License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an "AS
// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language
// governing permissions and limitations under the License.
package cbgt
import (
"container/list"
"fmt"
"io"
"github.com/gorilla/mux"
"github.com/rcrowley/go-metrics"
)
// PIndexImpl represents a runtime pindex implementation instance,
// whose runtime type depends on the pindex's type. It is declared as
// the empty interface, so a value of any type can be stored in it.
type PIndexImpl interface{}
// PIndexImplType defines the functions and descriptive metadata that
// every pindex implementation type must register on startup (see
// RegisterPIndexImplType).
type PIndexImplType struct {
// Invoked by the manager to customize the index definition
// during creating or updating indexes (Optional).
Prepare func(indexDef *IndexDef) (*IndexDef, error)
// Invoked by the manager to validate the index definition
// before going ahead with the actual creation (Optional).
Validate func(indexType, indexName, indexParams string) error
// Invoked by the manager on index deletion to clean up
// any stats/resources pertaining to the index before removing
// the index (Optional).
OnDelete func(indexDef *IndexDef)
// Invoked by the manager when it wants to create an index
// partition. The pindex implementation should persist enough
// info into the path subdirectory so that it can reconstitute the
// pindex during restart and Open(). NewPIndexImpl() returns an
// error when this field is nil.
// NOTE(review): the contract of the restart callback is not visible
// here — confirm when implementations are expected to invoke it.
New func(indexType, indexParams, path string, restart func()) (
PIndexImpl, Dest, error)
// Invoked by the manager when it wants a pindex implementation to
// reconstitute and reload a pindex instance back into the
// process, such as when the process has re-started.
// OpenPIndexImpl() returns an error when this field is nil.
Open func(indexType, path string, restart func()) (
PIndexImpl, Dest, error)
// Optional, invoked by the manager when it wants a pindex
// implementation to reconstitute and reload a pindex instance
// back into the process, with the updated index parameter values.
// OpenPIndexImplUsing() returns an error when this field is nil.
OpenUsing func(indexType, path, indexParams string,
restart func()) (PIndexImpl, Dest, error)
// Invoked by the manager when it wants a count of documents from
// an index. The registered Count() function can be nil.
Count func(mgr *Manager, indexName, indexUUID string) (
uint64, error)
// Invoked by the manager when it wants to query an index. The
// registered Query() function can be nil. The request arrives as
// the raw bytes in req and the response is written to res.
Query func(mgr *Manager, indexName, indexUUID string,
req []byte, res io.Writer) error
// Description is used to populate docs, UI, etc, such as index
// type drop-down control in the web admin UI. Format of the
// description string:
//
//     $categoryName/$indexType - short descriptive string
//
// The $categoryName is something like "advanced", or "general".
Description string
// A prototype instance of indexParams JSON that is usable for
// Validate() and New().
StartSample interface{}
// Returns example instances of JSON that are usable for Query()
// requests. These are used to help generate API documentation.
QuerySamples func() []Documentation
// Displayed in docs, web admin UI, etc, and often might be a link
// to even further help.
QueryHelp string
// Invoked during startup to allow pindex implementation to affect
// the REST API with its own endpoint.
// NOTE(review): the set of valid phase values is not visible
// here — confirm against the caller.
InitRouter func(r *mux.Router, phase string, mgr *Manager)
// Optional, additional handlers a pindex implementation may have
// for /api/diag output.
DiagHandlers []DiagHandler
// Optional, allows pindex implementation to add more information
// to the REST /api/managerMeta output.
MetaExtra func(map[string]interface{})
// Optional, allows pindex implementation to specify advanced UI
// implementations and information.
UI map[string]string
// Optional, invoked for checking whether the pindex implementations
// can effect the config changes through a restart of pindexes.
// The returned ResultCode carries the implementation's
// recommendation (for example PINDEXES_RESTART).
AnalyzeIndexDefUpdates func(mgr *Manager, configUpdates *ConfigAnalyzeRequest) ResultCode
// Invoked by the manager when it wants to trigger generic operations
// on the index. The request arrives as the raw bytes in req.
SubmitTaskRequest func(mgr *Manager, indexName,
indexUUID string, req []byte) (*TaskRequestStatus, error)
}
// ConfigAnalyzeRequest wraps up the various configuration
// parameters that the PIndexImplType implementations deal with.
// It is the argument type of PIndexImplType.AnalyzeIndexDefUpdates.
//
// NOTE(review): the Cur/Prev suffixes presumably denote the current
// and the previous version of each value — confirm against the caller
// that populates this struct.
type ConfigAnalyzeRequest struct {
IndexDefnCur *IndexDef
IndexDefnPrev *IndexDef
// NOTE(review): presumably keyed by source partition name — confirm.
SourcePartitionsCur map[string]bool
SourcePartitionsPrev map[string]bool
}
// ResultCode is the string-valued return code through which a pindex
// implementation recommends an operation after it has detected a
// config change.
type ResultCode string

// PINDEXES_RESTART suggests a reboot of the pindexes.
const PINDEXES_RESTART ResultCode = "request_restart_pindexes"
// PIndexImplTypes is the process-wide registry of pindex type
// backends (implementations), keyed by indexType. Entries are added
// via RegisterPIndexImplType; once process init/startup has finished
// the map should be treated as immutable/read-only.
var PIndexImplTypes = map[string]*PIndexImplType{}
// RegisterPIndexImplType registers an index type into the system by
// storing t in the global PIndexImplTypes registry under indexType.
// An existing entry for the same indexType is overwritten.
//
// The map write is not guarded by any lock in this function, which is
// consistent with the registry being populated only during process
// init/startup.
func RegisterPIndexImplType(indexType string, t *PIndexImplType) {
PIndexImplTypes[indexType] = t
}
// NewPIndexImpl creates an index partition of the given, registered
// index type by delegating to that type's New() callback.
//
// An error is returned when indexType is not registered, is
// registered as nil, or has no New() callback.
func NewPIndexImpl(indexType, indexParams, path string, restart func()) (
	PIndexImpl, Dest, error) {
	// A missing key yields a nil pointer, so a single nil check covers
	// both "not registered" and "registered as nil".
	if implType := PIndexImplTypes[indexType]; implType != nil && implType.New != nil {
		return implType.New(indexType, indexParams, path, restart)
	}

	return nil, nil, fmt.Errorf("pindex_impl: NewPIndexImpl indexType: %s", indexType)
}
// OpenPIndexImpl loads an index partition of the given, registered
// index type from a given path by delegating to that type's Open()
// callback.
//
// An error is returned when indexType is not registered, is
// registered as nil, or has no Open() callback.
func OpenPIndexImpl(indexType, path string, restart func()) (
	PIndexImpl, Dest, error) {
	// A missing key yields a nil pointer, so a single nil check covers
	// both "not registered" and "registered as nil".
	if implType := PIndexImplTypes[indexType]; implType != nil && implType.Open != nil {
		return implType.Open(indexType, path, restart)
	}

	return nil, nil, fmt.Errorf("pindex_impl: OpenPIndexImpl indexType: %s", indexType)
}
// OpenPIndexImplUsing loads an index partition of the given,
// registered index type from a given path with the given indexParams,
// by delegating to that type's OpenUsing() callback.
//
// An error is returned when indexType is not registered, is
// registered as nil, or has no OpenUsing() callback.
func OpenPIndexImplUsing(indexType, path, indexParams string,
	restart func()) (PIndexImpl, Dest, error) {
	// A missing key yields a nil pointer, so a single nil check covers
	// both "not registered" and "registered as nil".
	if implType := PIndexImplTypes[indexType]; implType != nil && implType.OpenUsing != nil {
		return implType.OpenUsing(indexType, path, indexParams, restart)
	}

	return nil, nil, fmt.Errorf("pindex_impl: OpenPIndexImplUsing indexType: %s", indexType)
}
// PIndexImplTypeForIndex looks up, via the Cfg provider, the
// registered PIndexImplType of the index named indexName. It is a
// convenience wrapper around GetIndexDef that drops the IndexDef and
// passes the error through unchanged.
func PIndexImplTypeForIndex(cfg Cfg, indexName string) (*PIndexImplType, error) {
	_, implType, err := GetIndexDef(cfg, indexName)
	return implType, err
}
// GetIndexDef retrieves the IndexDef and the registered PIndexImplType
// for the index named indexName.
//
// An error (with nil for both other results) is returned when:
//   - CfgGetIndexDefs fails or yields nil index definitions,
//   - there is no index definition for indexName, or
//   - no PIndexImplType is registered for the index definition's Type.
func GetIndexDef(cfg Cfg, indexName string) (
	*IndexDef, *PIndexImplType, error) {
	defs, _, err := CfgGetIndexDefs(cfg)
	if defs == nil || err != nil {
		return nil, nil, fmt.Errorf(
			"pindex_impl: could not get indexDefs, indexName: %s, err: %v",
			indexName, err)
	}

	def, found := defs.IndexDefs[indexName]
	if !found || def == nil {
		return nil, nil, fmt.Errorf(
			"pindex_impl: no indexDef, indexName: %s", indexName)
	}

	implType, registered := PIndexImplTypes[def.Type]
	if !registered || implType == nil {
		return nil, nil, fmt.Errorf(
			"pindex_impl: no pindexImplType, indexName: %s, indexDef.Type: %s",
			indexName, def.Type)
	}

	return def, implType, nil
}
// ------------------------------------------------
// QueryCtlParams defines the JSON that includes the "ctl" part of a
// query request. These "ctl" query request parameters are
// independent of any specific pindex type. The struct maps only the
// top-level "ctl" key, onto a QueryCtl value.
type QueryCtlParams struct {
Ctl QueryCtl `json:"ctl"`
}
// QueryCtl defines the JSON parameters that control query execution
// and which are independent of any specific pindex type.
//
// A PartitionSelection value can optionally be specified for performing
// advanced scatter gather operations, recognized options:
//   - ""                : default behavior - active partitions only
//   - "advanced-local"  : local partitions are favored
//   - "advanced-random" : pseudo-random selection from available options
type QueryCtl struct {
// NOTE(review): presumably in milliseconds, going by the name of
// QUERY_CTL_DEFAULT_TIMEOUT_MS — confirm against the query path.
Timeout int64 `json:"timeout"`
// Pointer-typed, so it can be nil.
Consistency *ConsistencyParams `json:"consistency"`
// Left out of the marshalled JSON when empty (omitempty).
PartitionSelection string `json:"partition_selection,omitempty"`
}
// QUERY_CTL_DEFAULT_TIMEOUT_MS is the default query timeout, a typed
// int64 constant with the value 10000.
const QUERY_CTL_DEFAULT_TIMEOUT_MS int64 = 10000
// ------------------------------------------------
// PINDEX_STORE_MAX_ERRORS is the max number of errors that a
// PIndexStoreStats will track. It is a variable rather than a
// constant, so it can be reassigned at runtime.
var PINDEX_STORE_MAX_ERRORS = 40
// PIndexStoreStats provides some common stats/metrics and error
// tracking that some pindex type backends can reuse. WriteJSON
// serializes TimerBatchStore and Errors; it does not emit
// TotalErrorCount.
type PIndexStoreStats struct {
// Passed to WriteTimerJSON when the stats are serialized.
TimerBatchStore metrics.Timer
// May be nil, in which case WriteJSON omits the "Errors" array.
// WriteJSON writes each non-empty string element verbatim, so
// elements are expected to already be JSON text.
// NOTE(review): presumably capped at PINDEX_STORE_MAX_ERRORS by the
// code that appends to it — that code is not visible here.
Errors *list.List // Capped list of string (json).
TotalErrorCount uint64
}
// WriteJSON writes the stats to w as a single JSON object of the form
//
//	{"TimerBatchStore":<timer>,"Errors":[<err>,<err>,...]}
//
// The "TimerBatchStore" value is produced by WriteTimerJSON. The
// "Errors" array is emitted only when d.Errors is non-nil. Each list
// element that is a non-empty string is written verbatim (it is
// expected to already be JSON text); any other element is skipped.
//
// Errors returned by w.Write are not checked, because this method has
// no way to report them to its caller.
func (d *PIndexStoreStats) WriteJSON(w io.Writer) {
	w.Write([]byte(`{"TimerBatchStore":`))
	WriteTimerJSON(w, d.TimerBatchStore)

	if d.Errors != nil {
		w.Write([]byte(`,"Errors":[`))

		// Count only the entries actually written. Counting every list
		// element (as was done previously) emitted a leading comma, and
		// therefore invalid JSON, whenever a skipped element preceded
		// the first written one.
		written := 0
		for e := d.Errors.Front(); e != nil; e = e.Next() {
			j, ok := e.Value.(string)
			if !ok || j == "" {
				continue
			}
			if written > 0 {
				w.Write(JsonComma)
			}
			w.Write([]byte(j))
			written++
		}

		w.Write([]byte(`]`))
	}

	w.Write(JsonCloseBrace)
}
// prefixPIndexStoreStats holds the bytes `{"pindexStoreStats":`, i.e.
// the opening of a JSON object whose "pindexStoreStats" value is still
// to be written.
// NOTE(review): its users are outside this view — presumably it is
// written ahead of PIndexStoreStats.WriteJSON output; confirm.
var prefixPIndexStoreStats = []byte(`{"pindexStoreStats":`)