@@ -306,7 +306,7 @@ def assert_data_persistence(self,
             cursor.execute(f'insert into versioned."{table}" (id, col_int) values (?, ?)', [str(uuid4()), 1])

         # to trigger `alter` stmt bug (https://github.com/crate/crate/pull/17178) that falsely updated the table's
-        # version created setting
+        # version created setting that resulted in OIDs replacing column names in result sets
         cursor.execute('ALTER TABLE doc.parted SET ("refresh_interval" = 900)')

         # older versions had a bug that caused this to fail
@@ -318,7 +318,6 @@ def assert_data_persistence(self,
                 'INSERT INTO doc.parted (id, version, cols) values (?, ?, ?)',
                 args
             )
-            cursor.execute('REFRESH TABLE doc.parted')
             accumulated_dynamic_column_names.append(key)
         cursor.execute('SELECT cols from doc.parted')
         result = cursor.fetchall()
@@ -339,3 +338,224 @@ def assert_nodes(self, conn: Connection, num_nodes: int):
         c = conn.cursor()
         c.execute("select count(*) from sys.nodes")
         self.assertEqual(c.fetchone()[0], num_nodes)
+
+
+class MetaDataCompatibilityTest(NodeProvider, unittest.TestCase):
+
+    CLUSTER_SETTINGS = {
+        'license.enterprise': 'true',
+        'lang.js.enabled': 'true',
+        'cluster.name': gen_id(),
+    }
+
+    SUPPORTED_VERSIONS = (
+        VersionDef('2.3.x', []),
+        VersionDef('3.3.x', []),
+        VersionDef('latest-nightly', [])
+    )
+
+    def test_metadata_compatibility(self):
+        nodes = 3
+
+        cluster = self._new_cluster(
+            self.SUPPORTED_VERSIONS[0].version,
+            nodes,
+            settings=self.CLUSTER_SETTINGS,
+            explicit_discovery=False
+        )
+        cluster.start()
+        with connect(cluster.node().http_url, error_trace=True) as conn:
+            cursor = conn.cursor()
+            cursor.execute('''
+                CREATE USER user_a;
+            ''')
+            cursor.execute('''
+                GRANT ALL PRIVILEGES ON SCHEMA doc TO user_a;
+            ''')
+            cursor.execute('''
+                CREATE FUNCTION fact(LONG)
+                RETURNS LONG
+                LANGUAGE JAVASCRIPT
+                AS 'function fact(a) { return a < 2 ? 0 : a * (a - 1); }';
+            ''')
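+            # note: despite its name, fact(a) computes a * (a - 1) rather than
+            # a factorial, so assert_meta_data below expects fact(100) == 9900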
+        self._process_on_stop()
+
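+        # reuse the node data directories below so each newer version starts
+        # from the metadata written by the previous one (in-place upgrade)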
+        paths = [node._settings['path.data'] for node in cluster.nodes()]
+
+        for version_def in self.SUPPORTED_VERSIONS[1:]:
+            self.assert_meta_data(version_def, nodes, paths)
+
+        # restart with latest version
+        self.assert_meta_data(self.SUPPORTED_VERSIONS[-1], nodes, paths)
+
+    def assert_meta_data(self, version_def, nodes, data_paths=None):
+        cluster = self._new_cluster(
+            version_def.version,
+            nodes,
+            data_paths,
+            self.CLUSTER_SETTINGS,
+            prepare_env(version_def.java_home),
+            explicit_discovery=False
+        )
+        cluster.start()
+        with connect(cluster.node().http_url, error_trace=True) as conn:
+            cursor = conn.cursor()
+            cursor.execute('''
+                SELECT name, superuser
+                FROM sys.users
+                ORDER BY superuser, name;
+            ''')
+            rs = cursor.fetchall()
+            self.assertEqual(['user_a', False], rs[0])
+            self.assertEqual(['crate', True], rs[1])
+            cursor.execute('''
+                SELECT fact(100);
+            ''')
+            self.assertEqual(9900, cursor.fetchone()[0])
+            cursor.execute('''
+                SELECT class, grantee, ident, state, type
+                FROM sys.privileges
+                ORDER BY class, grantee, ident, state, type
+            ''')
+            self.assertEqual([['SCHEMA', 'user_a', 'doc', 'GRANT', 'DDL'],
+                              ['SCHEMA', 'user_a', 'doc', 'GRANT', 'DML'],
+                              ['SCHEMA', 'user_a', 'doc', 'GRANT', 'DQL']],
+                             cursor.fetchall())
+
+        self._process_on_stop()
+
+
+class DefaultTemplateMetaDataCompatibilityTest(NodeProvider, unittest.TestCase):
+    CLUSTER_ID = gen_id()
+
+    CLUSTER_SETTINGS = {
+        'cluster.name': CLUSTER_ID,
+    }
+
+    SUPPORTED_VERSIONS = (
+        VersionDef('3.0.x', []),
+        VersionDef('latest-nightly', [])
+    )
+
+    def test_metadata_compatibility(self):
+        nodes = 3
+
+        cluster = self._new_cluster(self.SUPPORTED_VERSIONS[0].version,
+                                    nodes,
+                                    settings=self.CLUSTER_SETTINGS)
+        cluster.start()
+        with connect(cluster.node().http_url, error_trace=True) as conn:
+            cursor = conn.cursor()
+            cursor.execute("select 1")
+        self._process_on_stop()
+
+        paths = [node._settings['path.data'] for node in cluster.nodes()]
+        for version_def in self.SUPPORTED_VERSIONS[1:]:
+            self.assert_dynamic_string_detection(version_def, nodes, paths)
+
+    def assert_dynamic_string_detection(self, version_def, nodes, data_paths):
+        """Test that dynamic string column detection works as expected.
+
+        If the cluster was initially created/started with a lower CrateDB
+        version, we must ensure that our default template is also upgraded,
+        if needed, because it is persisted in the cluster state. That's why
+        re-creating tables would not help.
+        """
+        self._move_nodes_folder_if_needed(data_paths)
+        cluster = self._new_cluster(
+            version_def.version,
+            nodes,
+            data_paths,
+            self.CLUSTER_SETTINGS,
+            prepare_env(version_def.java_home),
+        )
+        cluster.start()
+        with connect(cluster.node().http_url, error_trace=True) as conn:
+            cursor = conn.cursor()
+            cursor.execute('CREATE TABLE t1 (o object)')
+            cursor.execute('''INSERT INTO t1 (o) VALUES ({"name" = 'foo'})''')
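+            # 'o' is an object column without a predefined schema; this insert
+            # creates o['name'] dynamically, exercising dynamic string detection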
+            self.assertEqual(cursor.rowcount, 1)
+            cursor.execute('REFRESH TABLE t1')
+            cursor.execute("SELECT o['name'], count(*) FROM t1 GROUP BY 1")
+            rs = cursor.fetchall()
+            self.assertEqual(['foo', 1], rs[0])
+            cursor.execute('DROP TABLE t1')
+        self._process_on_stop()
+
+    def _move_nodes_folder_if_needed(self, data_paths):
+        """Eliminates the cluster-id folder inside the data directory."""
+        for path in data_paths:
+            data_path_incl_cluster_id = os.path.join(path, self.CLUSTER_ID)
+            if os.path.exists(data_path_incl_cluster_id):
+                src_path_nodes = os.path.join(data_path_incl_cluster_id, 'nodes')
+                target_path_nodes = os.path.join(self._path_data, 'nodes')
+                shutil.move(src_path_nodes, target_path_nodes)
+                shutil.rmtree(data_path_incl_cluster_id)
+
+
+class SnapshotCompatibilityTest(NodeProvider, unittest.TestCase):
+
+    CREATE_REPOSITORY = '''
+        CREATE REPOSITORY r1 TYPE S3
+        WITH (access_key = 'minio',
+              secret_key = 'miniostorage',
+              bucket = 'backups',
+              endpoint = '127.0.0.1:9000',
+              protocol = 'http')
+    '''
+
+    CREATE_SNAPSHOT_TPT = "CREATE SNAPSHOT r1.s{} ALL WITH (wait_for_completion = true)"
+
+    RESTORE_SNAPSHOT_TPT = "RESTORE SNAPSHOT r1.s{} ALL WITH (wait_for_completion = true)"
+
+    DROP_DOC_TABLE = 'DROP TABLE t1'
+
+    VERSION = ('5.0.x', 'latest-nightly')
+
+    def test_snapshot_compatibility(self):
+        """Test snapshot compatibility when upgrading 5.0.x -> latest-nightly.
+
+        Using Minio as an S3 repository, the first cluster that runs
+        creates the repo and a table, and inserts/selects some data, which
+        is then snapshotted and deleted. The next cluster restores the
+        data from the last snapshot, performs further inserts/selects,
+        then snapshots the data and deletes it.
+        """
+        with MinioServer() as minio:
+            t = threading.Thread(target=minio.run)
+            t.daemon = True
+            t.start()
+            wait_until(lambda: _is_up('127.0.0.1', 9000))
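+            # block until Minio's S3 endpoint accepts connections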
+
+            num_nodes = 3
+            num_docs = 30
+            prev_version = None
+            num_snapshot = 1
+
+            cluster_settings = {
+                'cluster.name': gen_id(),
+            }
+
+            paths = None
+            for version in self.VERSION:
+                cluster = self._new_cluster(version, num_nodes, paths, settings=cluster_settings)
+                paths = [node._settings['path.data'] for node in cluster.nodes()]
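+                # the next iteration reuses these data paths, so the upgraded
+                # cluster starts on the files the previous version wrote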
+                cluster.start()
+                with connect(cluster.node().http_url, error_trace=True) as conn:
+                    c = conn.cursor()
+                    if not prev_version:
+                        c.execute(self.CREATE_REPOSITORY)
+                        c.execute(CREATE_ANALYZER)
+                        c.execute(CREATE_DOC_TABLE)
+                        insert_data(conn, 'doc', 't1', num_docs)
+                    else:
+                        c.execute(self.RESTORE_SNAPSHOT_TPT.format(num_snapshot - 1))
+                    c.execute('SELECT COUNT(*) FROM t1')
+                    rowcount = c.fetchone()[0]
+                    self.assertEqual(rowcount, num_docs)
+                    run_selects(c, version)
+                    c.execute(self.CREATE_SNAPSHOT_TPT.format(num_snapshot))
+                    c.execute(self.DROP_DOC_TABLE)
+                self._process_on_stop()
+                prev_version = version
+                num_snapshot += 1