-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathcontract.go
643 lines (562 loc) · 17.1 KB
/
contract.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
package dsunit
import (
"errors"
"fmt"
"github.com/viant/afs/file"
"github.com/viant/afs/url"
"github.com/viant/assertly"
"github.com/viant/dsc"
dsurl "github.com/viant/dsunit/url"
"github.com/viant/toolbox"
"strings"
"sync"
)
// StatusOk is the Status value that marks a response as successful.
const StatusOk = "ok"

// BaseResponse holds the status and message shared by the response types
// in this file, which embed it.
type BaseResponse struct {
	Status  string
	Message string
}

// Error returns nil when Status equals StatusOk; for any other status
// (including the empty one) it returns an error whose text is Message.
func (r BaseResponse) Error() error {
	if r.Status == StatusOk {
		return nil
	}
	return errors.New(r.Message)
}

// SetError marks the response as failed with the text of err.
// A nil err leaves the response untouched.
func (r *BaseResponse) SetError(err error) {
	if err != nil {
		r.Status = "error"
		r.Message = err.Error()
	}
}

// NewBaseResponse creates a response with the supplied status and message.
func NewBaseResponse(status, message string) *BaseResponse {
	response := new(BaseResponse)
	response.Status = status
	response.Message = message
	return response
}

// NewBaseOkResponse creates a response with StatusOk and an empty message.
func NewBaseOkResponse() *BaseResponse {
	return NewBaseResponse(StatusOk, "")
}
// RegisterRequest represents a request to register a datastore, with its
// config supplied either inline (Config) or by location (ConfigURL).
type RegisterRequest struct {
Datastore string `required:"true" description:"datastore name"`
Config *dsc.Config `description:"datastore config"`
ConfigURL string `description:"datastore config URL"`
Tables []*dsc.TableDescriptor `description:"optional table descriptors"`
// PingRequest is embedded; it also declares a Datastore field, which the
// Datastore field above takes precedence over in selectors.
PingRequest `json:",inline" yaml:",inline"`
Ping bool `description:"flag to wait for database get online"`
}
// Init loads Config from ConfigURL when ConfigURL is set, replacing any
// Config already present. It is a no-op when ConfigURL is empty.
func (r *RegisterRequest) Init() (err error) {
	if r.ConfigURL == "" {
		return nil
	}
	r.Config, err = dsc.NewConfigFromURL(r.ConfigURL)
	return err
}
// Validate reports an error when the datastore name is empty or, failing
// that, when no config is set.
func (r *RegisterRequest) Validate() error {
	switch {
	case r.Datastore == "":
		return errors.New("datastore was empty")
	case r.Config == nil:
		return errors.New("config was empty")
	}
	return nil
}
// NewRegisterRequest creates a register request for the named datastore with
// the supplied config and optional table descriptors.
func NewRegisterRequest(datastore string, config *dsc.Config, tables ...*dsc.TableDescriptor) *RegisterRequest {
	request := new(RegisterRequest)
	request.Datastore = datastore
	request.Config = config
	request.Tables = tables
	return request
}
// NewRegisterRequestFromURL creates a register request by decoding the
// resource at URL (normalized with file.Scheme). The request is returned
// together with any decoding error.
func NewRegisterRequestFromURL(URL string) (*RegisterRequest, error) {
	request := new(RegisterRequest)
	err := dsurl.Decode(url.Normalize(URL, file.Scheme), request)
	return request, err
}
// RegisterResponse represents a register response; it carries only the
// embedded status and message.
type RegisterResponse struct {
*BaseResponse
}
// RecreateRequest represents a request to recreate a datastore.
type RecreateRequest struct {
	Datastore      string `required:"true" description:"datastore name to recreate, come database will create the whole schema, other will remove exiting tables and add registered one"`
	AdminDatastore string `description:"database used to run DDL"`
}

// NewRecreateRequest creates a recreate request for datastore, with
// adminDatastore as the database used to run DDL.
func NewRecreateRequest(datastore, adminDatastore string) *RecreateRequest {
	request := new(RecreateRequest)
	request.Datastore = datastore
	request.AdminDatastore = adminDatastore
	return request
}
// NewRecreateRequestFromURL creates a recreate request by decoding the
// resource at URL (normalized with file.Scheme). The request is returned
// together with any decoding error.
func NewRecreateRequestFromURL(URL string) (*RecreateRequest, error) {
	request := new(RecreateRequest)
	err := dsurl.Decode(url.Normalize(URL, file.Scheme), request)
	return request, err
}
// RecreateResponse represents a recreate datastore response; it carries only
// the embedded status and message.
type RecreateResponse struct {
*BaseResponse
}
// RunSQLRequest represents a request to run SQL statements on a datastore.
type RunSQLRequest struct {
	Datastore string `required:"true" description:"registered datastore name"`
	Expand    bool   `description:"substitute $ expression with content of context.state"`
	// SQL holds the statements to run.
	SQL []string
}

// NewRunSQLRequest creates a run SQL request for datastore with the supplied
// statements; Expand is left false.
func NewRunSQLRequest(datastore string, SQL ...string) *RunSQLRequest {
	request := new(RunSQLRequest)
	request.Datastore = datastore
	request.SQL = SQL
	return request
}
// NewRunSQLRequestFromURL creates a run SQL request by decoding the resource
// at URL (normalized with file.Scheme). The request is returned together
// with any decoding error.
func NewRunSQLRequestFromURL(URL string) (*RunSQLRequest, error) {
	request := new(RunSQLRequest)
	err := dsurl.Decode(url.Normalize(URL, file.Scheme), request)
	return request, err
}
// RunSQLResponse represents a run SQL response.
type RunSQLResponse struct {
*BaseResponse
// RowsAffected is a row count; presumably the number of rows changed by
// the executed statements — confirm against the service implementation.
RowsAffected int
}
// RunScriptRequest represents a request to run SQL scripts on a datastore.
type RunScriptRequest struct {
Datastore string `required:"true" description:"registered datastore name"`
Expand bool `description:"substitute $ expression with content of context.state"`
// Scripts lists the script resources to run.
Scripts []*dsurl.Resource
}
// NewRunScriptRequest creates a run script request for datastore with the
// supplied script resources; Expand is left false.
func NewRunScriptRequest(datastore string, scripts ...*dsurl.Resource) *RunScriptRequest {
	request := new(RunScriptRequest)
	request.Datastore = datastore
	request.Scripts = scripts
	return request
}
// NewRunScriptRequestFromURL creates a run script request by decoding the
// resource at URL (normalized with file.Scheme). The request is returned
// together with any decoding error.
func NewRunScriptRequestFromURL(URL string) (*RunScriptRequest, error) {
	request := new(RunScriptRequest)
	err := dsurl.Decode(url.Normalize(URL, file.Scheme), request)
	return request, err
}
// MappingRequest represents a mapping request.
type MappingRequest struct {
Mappings []*Mapping `required:"true" description:"virtual table mapping"`
}
// Init initializes the mappings that need loading. A mapping is processed
// when it has a Resource with a non-empty URL, or when its Name is empty:
// mapping.Init is called and the mapping is then decoded from its URL
// (normalized with file.Scheme). The first error encountered is returned.
//
// NOTE(review): URL is presumably promoted from the embedded Resource; if so,
// a mapping with an empty Name and a nil Resource would dereference nil
// here — confirm against the Mapping definition.
func (r *MappingRequest) Init() (err error) {
	for _, m := range r.Mappings {
		hasLocation := m.Resource != nil && m.URL != ""
		if !hasLocation && m.Name != "" {
			// Already named and nothing to load.
			continue
		}
		if err = m.Init(); err != nil {
			return err
		}
		if err = dsurl.Decode(url.Normalize(m.URL, file.Scheme), m); err != nil {
			return err
		}
	}
	return nil
}
// Validate checks the request: a nil request is accepted, otherwise at least
// one mapping is required and every mapping must have a non-empty Name.
func (r *MappingRequest) Validate() error {
	switch {
	case r == nil:
		return nil
	case len(r.Mappings) == 0:
		return errors.New("dbTypeMappings were empty")
	}
	for i := range r.Mappings {
		if r.Mappings[i].Name == "" {
			return fmt.Errorf("dbTypeMappings[%v].name were empty", i)
		}
	}
	return nil
}
// NewMappingRequest creates a mapping request holding the supplied mappings.
func NewMappingRequest(mappings ...*Mapping) *MappingRequest {
	request := new(MappingRequest)
	request.Mappings = mappings
	return request
}
// NewMappingRequestFromURL creates a mapping request by decoding the resource
// at URL (normalized with file.Scheme). The request is returned together
// with any decoding error.
func NewMappingRequestFromURL(URL string) (*MappingRequest, error) {
	request := new(MappingRequest)
	err := dsurl.Decode(url.Normalize(URL, file.Scheme), request)
	return request, err
}
// MappingResponse represents a mapping response.
type MappingResponse struct {
*BaseResponse
// Tables lists table names; presumably the tables covered by the
// registered mappings — confirm against the service implementation.
Tables []string
}
// InitRequest represents a datastore init request; it aggregates the
// registration, recreation, mapping and run script requests.
type InitRequest struct {
// Datastore is copied by Init into the embedded register and run script
// requests when they leave their own Datastore empty.
Datastore string
Recreate bool
*RegisterRequest
// Admin is a second register request; presumably for the admin datastore
// used to run DDL — confirm against the service implementation.
Admin *RegisterRequest
*MappingRequest
*RunScriptRequest
}
// Init fills in defaults on the embedded requests.
//
// For the register request (when present):
//   - if it has no Config but a ConfigURL, the config is loaded from that URL;
//   - if its Datastore is empty, it inherits r.Datastore and, when a config
//     is available, the config's "dbname" parameter is set to r.Datastore.
//
// For the run script request (when present), an empty Datastore inherits
// r.Datastore.
//
// The config is loaded before the "dbname" parameter is written, and the
// write is skipped when there is still no config. Previously a register
// request with an empty Datastore and a nil Config caused a nil pointer
// dereference; a missing config is now left for Validate to report.
//
// It returns the error from loading the config, if any.
func (r *InitRequest) Init() (err error) {
	if register := r.RegisterRequest; register != nil {
		if register.Config == nil && register.ConfigURL != "" {
			if register.Config, err = dsc.NewConfigFromURL(register.ConfigURL); err != nil {
				return err
			}
		}
		if register.Datastore == "" {
			register.Datastore = r.Datastore
			if config := register.Config; config != nil {
				if len(config.Parameters) == 0 {
					config.Parameters = map[string]interface{}{}
				}
				config.Parameters["dbname"] = r.Datastore
			}
		}
	}
	if script := r.RunScriptRequest; script != nil && script.Datastore == "" {
		script.Datastore = r.Datastore
	}
	return nil
}
// Validate reports an error when the datastore name is empty, the register
// request is missing, or the register request has no config (checked in
// that order).
func (r *InitRequest) Validate() error {
	switch {
	case r.Datastore == "":
		return errors.New("datastore was empty")
	case r.RegisterRequest == nil:
		// The misspelling in this message is kept as is, since callers may match on it.
		return errors.New("register reqeust was empty")
	case r.RegisterRequest.Config == nil:
		return errors.New("register request config was empty")
	}
	return nil
}
// NewInitRequest creates a datastore init request from its parts: the
// datastore name, the recreate flag, the register and admin register
// requests, the mapping request and the run script request.
func NewInitRequest(datastore string, recreate bool, register, admin *RegisterRequest, mapping *MappingRequest, script *RunScriptRequest) *InitRequest {
	request := new(InitRequest)
	request.Datastore = datastore
	request.Recreate = recreate
	request.RegisterRequest = register
	request.Admin = admin
	request.MappingRequest = mapping
	request.RunScriptRequest = script
	return request
}
// NewInitRequestFromURL creates an init request by decoding the resource at
// URL (normalized with file.Scheme). The request is returned together with
// any decoding error.
func NewInitRequestFromURL(URL string) (*InitRequest, error) {
	request := new(InitRequest)
	err := dsurl.Decode(url.Normalize(URL, file.Scheme), request)
	return request, err
}
// InitResponse represents an init datastore response.
type InitResponse struct {
*BaseResponse
// Tables lists table names; presumably the tables available after
// initialization — confirm against the service implementation.
Tables []string
}
// PrepareRequest represents a request to populate a datastore with a data
// resource.
type PrepareRequest struct {
Expand bool `description:"substitute $ expression with content of context.state"`
// Threads is presumably the number of concurrent workers used while
// loading datasets — confirm against the service implementation.
Threads int
*DatasetResource `required:"true" description:"datasets resource"`
}
// Validate reports an error when the dataset resource is missing, or when
// its DatastoreDatasets or Resource is nil (checked in that order).
func (r *PrepareRequest) Validate() error {
	switch {
	case r.DatasetResource == nil:
		return errors.New("dataset resource was empty")
	case r.DatastoreDatasets == nil:
		return errors.New("datastore was empty")
	case r.Resource == nil:
		return errors.New("url was empty")
	}
	return nil
}
// NewPrepareRequest creates a prepare request for the supplied dataset
// resource; Expand and Threads keep their zero values.
func NewPrepareRequest(resource *DatasetResource) *PrepareRequest {
	request := new(PrepareRequest)
	request.DatasetResource = resource
	return request
}
// NewPrepareRequestFromURL creates a prepare request by decoding the resource
// at URL (normalized with file.Scheme). The request is returned together
// with any decoding error.
func NewPrepareRequestFromURL(URL string) (*PrepareRequest, error) {
	request := new(PrepareRequest)
	err := dsurl.Decode(url.Normalize(URL, file.Scheme), request)
	return request, err
}
// ModificationInfo represents modification counters for a single subject.
type ModificationInfo struct {
// Subject identifies what was modified; presumably a table name — confirm.
Subject string
Method string `description:"modification method determined by presence of primary key: load - insert, persist: insert or update"`
// Deleted, Modified and Added are per-subject record counters.
Deleted int
Modified int
Added int
}
// PrepareResponse represents a prepare response.
// It contains a sync.Mutex and therefore must not be copied; pass it by pointer.
type PrepareResponse struct {
*BaseResponse
Expand bool `description:"substitute $ expression with content of context.state"`
Modification map[string]*ModificationInfo `description:"modification info by subject"`
// mux presumably guards concurrent updates of Modification — confirm
// against the code that populates the response.
mux sync.Mutex
}
// ExpectRequest represents a datastore verification request: the embedded
// dataset resource holds the expected data and CheckPolicy selects how it is
// compared.
type ExpectRequest struct {
*DatasetResource
CheckPolicy int `required:"true" description:"0 - FullTableDatasetCheckPolicy, 1 - SnapshotDatasetCheckPolicy"`
}
// Validate reports an error when the dataset resource is missing, or when
// its Resource or DatastoreDatasets is nil (checked in that order).
func (r *ExpectRequest) Validate() error {
	switch {
	case r.DatasetResource == nil:
		return errors.New("dataset resource was empty")
	case r.Resource == nil:
		return errors.New("url was empty")
	case r.DatastoreDatasets == nil:
		return errors.New("datastore was empty")
	}
	return nil
}
// NewExpectRequest creates an expect request with the supplied check policy
// and dataset resource.
func NewExpectRequest(checkPolicy int, resource *DatasetResource) *ExpectRequest {
	request := new(ExpectRequest)
	request.CheckPolicy = checkPolicy
	request.DatasetResource = resource
	return request
}
// NewExpectRequestFromURL creates an expect request by decoding the resource
// at URL (normalized with file.Scheme). The request is returned together
// with any decoding error.
func NewExpectRequestFromURL(URL string) (*ExpectRequest, error) {
	request := new(ExpectRequest)
	err := dsurl.Decode(url.Normalize(URL, file.Scheme), request)
	return request, err
}
// DatasetValidation represents the validation outcome for a single dataset,
// together with the expected and actual values that were compared.
type DatasetValidation struct {
// Dataset names the validated dataset; presumably a table name — confirm.
Dataset string
*assertly.Validation
Expected interface{}
Actual interface{}
}
// ExpectResponse represents a verification response.
type ExpectResponse struct {
*BaseResponse
// Validation holds one entry per validated dataset.
Validation []*DatasetValidation
// PassedCount and FailedCount are presumably totals of passed and failed
// assertions across all datasets — confirm against the service.
PassedCount int
FailedCount int
}
// SequenceRequest represents a request for the sequences of the given
// tables in a datastore.
type SequenceRequest struct {
	Datastore string
	Tables    []string
}

// NewSequenceRequest creates a sequence request for datastore and the
// supplied tables.
func NewSequenceRequest(datastore string, tables ...string) *SequenceRequest {
	request := new(SequenceRequest)
	request.Datastore = datastore
	request.Tables = tables
	return request
}
// SequenceResponse represents a get sequences response.
type SequenceResponse struct {
*BaseResponse
// Sequences maps a name to a sequence value; presumably keyed by table
// name — confirm against the service implementation.
Sequences map[string]int
}
// QueryRequest represents a request to run a SQL query on a datastore,
// optionally validating the result against Expect.
type QueryRequest struct {
	Datastore   string
	SQL         string
	IgnoreError bool
	Expect      []map[string]interface{} `description:"if specified validation would take place"`
}

// NewQueryRequest creates a query request for datastore and SQL;
// IgnoreError and Expect keep their zero values.
func NewQueryRequest(datastore, SQL string) *QueryRequest {
	request := new(QueryRequest)
	request.Datastore = datastore
	request.SQL = SQL
	return request
}
// QueryResponse represents a query response: the fetched records plus an
// embedded validation result.
type QueryResponse struct {
*BaseResponse
Records Records
// Validation is presumably populated only when the request specified
// Expect — confirm against the service implementation.
*assertly.Validation
}
// FreezeRequest represents a request to create a dataset from a datastore
// for the provided SQL and destination URL. The remaining fields control how
// records are transformed before being written; see each field's description tag.
type (
FreezeRequest struct {
Datastore string `description:"registered datastore i.e. db1"`
SQL string `description:"dataset SQL soruce"`
DestURL string `description:"represent dataset destination"`
OmitEmpty bool `description:"flag to skip empty attributes"`
Ignore []string `description:"path to ignore i.e. request.postbody"`
Replace map[string]string `description:"key of path with corresponding replacement value"`
LocationTimezone string `description:"convert time to specified timezone i.e UTC"`
Override map[string]string `description:"overrides column with supplied values"`
ASCII []string `description:"column values to be ascii sanitized"`
RelativeDate []string `description:"transform date to date expr"`
Obfuscation []Obfuscation `description:"obfuscation rules"`
Reset bool `description:"add extra empty record to truncate before inserting"`
// TimeFormat is converted into TimeLayout by Init when TimeLayout is empty.
TimeFormat string `description:"java/ios based time format"`
TimeLayout string `description:"golang based time layout"`
}
)
// Init derives TimeLayout from TimeFormat via toolbox.DateFormatToLayout
// when TimeFormat is set and no TimeLayout was supplied. It always returns nil.
func (r *FreezeRequest) Init() error {
	if r.TimeFormat == "" || r.TimeLayout != "" {
		return nil
	}
	r.TimeLayout = toolbox.DateFormatToLayout(r.TimeFormat)
	return nil
}
// FreezeResponse represents a freeze response.
type FreezeResponse struct {
*BaseResponse
// Count is presumably the number of records written — confirm against
// the service implementation.
Count int
// DestURL is the dataset destination.
DestURL string
}
// DumpRequest represents a request to create a database schema dump.
type DumpRequest struct {
Datastore string `description:"registered datastore i.e. db1"`
Tables []string `description:"tables, all if empty"`
DestURL string `description:"represent dataset destination"`
Target string `description:"target vendor, use only if different than source"`
/*
The mapping URL content should be a map from source to destination data
types (all data types should be upper case), i.e.
{
"INT": "BIGINT",
"INTEGER": "BIGINT",
"NUMERIC": "DECIMAL(7,2)",
"FLOAT": "DECIMAL(7,2)",
"VARCHAR": "VARCHAR(255)",
"STRING": "VARCHAR(255)",
"CHAR": "VARCHAR(255)",
"DATE": "DATE",
"TIMESTAMP": "TIMESTAMP",
},
*/
MappingURL string `description:"if target driver is used - you can provide data type mapping"`
}
// DumpResponse represents a dump response.
type DumpResponse struct {
*BaseResponse
// Count is presumably the number of dumped tables — confirm against the
// service implementation.
Count int
// DestURL is the dump destination.
DestURL string
}
// DatastoreSQL pairs a datastore name with a SQL statement; CompareRequest
// uses it to describe each of its two sources.
type DatastoreSQL struct {
Datastore string
SQL string
}
// CompareRequest represents a request to compare the data sets produced by
// two datastore SQL sources.
type CompareRequest struct {
Source1 *DatastoreSQL
Source2 *DatastoreSQL
// Directives holds assertly directives; Init adds the strict map check
// directive when absent and ApplyDirective copies them into a record.
Directives map[string]interface{}
Ignore []string // columns to ignore
OmitEmpty bool
MaxRowDiscrepancy int // max discrepant rows
}
// Init makes sure Directives is a usable map and defaults the assertly
// strict map check directive to true unless the caller already set it.
// It always returns nil.
func (r *CompareRequest) Init() error {
	if len(r.Directives) == 0 {
		r.Directives = map[string]interface{}{}
	}
	if _, present := r.Directives[assertly.StrictMapCheckDirective]; present {
		return nil
	}
	r.Directives[assertly.StrictMapCheckDirective] = true
	return nil
}
// IndexBy returns the value of the assertly index-by directive as a list of
// strings. It returns nil when the directive is not set. A slice value is
// copied element by element with toolbox.CopySliceElements; any other value
// is converted with toolbox.AsString and split on commas (without trimming).
func (r CompareRequest) IndexBy() []string {
	// A lookup on a nil or empty map simply reports the key as absent.
	indexBy, found := r.Directives[assertly.IndexByDirective]
	if !found {
		return nil
	}
	if !toolbox.IsSlice(indexBy) {
		return strings.Split(toolbox.AsString(indexBy), ",")
	}
	columns := make([]string, 0)
	toolbox.CopySliceElements(indexBy, &columns)
	return columns
}
// ApplyDirective copies every directive into record, overwriting entries
// with the same key. With no directives the record is left untouched.
func (r *CompareRequest) ApplyDirective(record map[string]interface{}) {
	for name := range r.Directives {
		record[name] = r.Directives[name]
	}
}
// CompareResponse represents a compare response.
type CompareResponse struct {
*BaseResponse
// Dataset1Count and Dataset2Count are presumably the row counts of the
// first and second source — confirm against the service implementation.
Dataset1Count int
Dataset2Count int
MatchedRows int
*assertly.Validation
}
// NewDumpRequestFromURL creates a dump request by decoding the resource at
// URL (normalized with file.Scheme). The request is returned together with
// any decoding error.
func NewDumpRequestFromURL(URL string) (*DumpRequest, error) {
	request := new(DumpRequest)
	err := dsurl.Decode(url.Normalize(URL, file.Scheme), request)
	return request, err
}
// PingRequest represents a ping request.
type PingRequest struct {
Datastore string
// TimeoutMs is a timeout; the name indicates milliseconds.
TimeoutMs int
}
// PingResponse represents a ping response; it carries only the embedded
// status and message.
type PingResponse struct {
*BaseResponse
}
// SchemaTarget identifies one side of a schema check: a datastore plus an
// optional target vendor and data type mapping.
type SchemaTarget struct {
Datastore string `description:"datastore"`
Target string `description:"target vendor, use only if different than source"`
MappingURL string `description:"if target driver is used - you can provide data type mapping"`
}
// CheckSchemaRequest represents a schema check request between a source and
// a destination schema target.
type CheckSchemaRequest struct {
Source *SchemaTarget
Dest *SchemaTarget
// Tables presumably restricts the check to the listed tables — confirm
// how an empty list is treated.
Tables []string
CheckNullables bool
CheckPrimaryKeys bool
}
// SchemaTableCheck holds the validation result for a single table.
type SchemaTableCheck struct {
Table string
*assertly.Validation
}
// CheckSchemaResponse represents a schema check response: per-table checks
// plus an embedded validation result.
type CheckSchemaResponse struct {
*BaseResponse
Tables []*SchemaTableCheck
// Validation is presumably the aggregate over all table checks — confirm
// against the service implementation.
*assertly.Validation
}
// NewCheckSchemaResponse creates a schema check response with an ok status,
// an empty (non-nil) table list and a fresh validation.
func NewCheckSchemaResponse() *CheckSchemaResponse {
	response := new(CheckSchemaResponse)
	response.BaseResponse = NewBaseOkResponse()
	response.Tables = make([]*SchemaTableCheck, 0)
	response.Validation = assertly.NewValidation()
	return response
}