Commit ed12232

Test dynamic column names in resultsets
Test for crate/crate#17580
1 parent e0bf903 commit ed12232
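
For context on the commit title: in CrateDB, inserting an object value with a previously unseen key into a dynamic OBJECT column creates that column on the fly, and the new name should then show up in result sets. The snippet below is a minimal illustrative sketch, not part of this commit; it assumes a local CrateDB node reachable at http://localhost:4200 and the `crate` Python client, and the table and key names (doc.demo, t_demo) are invented for the example.

# Illustrative sketch only (not part of the commit): a dynamic column created
# by an INSERT should be visible in later result sets.
# Assumes: local CrateDB at http://localhost:4200, `pip install crate`.
from crate.client import connect

with connect('http://localhost:4200') as conn:
    cursor = conn.cursor()
    cursor.execute('CREATE TABLE doc.demo (id TEXT PRIMARY KEY, cols OBJECT(DYNAMIC))')
    # Inserting an object with the previously unseen key "t_demo" adds a dynamic column.
    cursor.execute('INSERT INTO doc.demo (id, cols) VALUES (?, ?)', ['1', {'t_demo': True}])
    cursor.execute('REFRESH TABLE doc.demo')
    cursor.execute('SELECT cols FROM doc.demo')
    for (cols,) in cursor.fetchall():
        assert 't_demo' in cols  # the dynamically created name comes back in the result set
    cursor.execute('DROP TABLE doc.demo')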

1 file changed

tests/bwc/test_upgrade.py

+29 −235
@@ -43,7 +43,7 @@
         VersionDef('5.6.x', []),
         VersionDef('5.7.x', []),
         VersionDef('5.8.x', []),
-        VersionDef('5.9.x', []),
+        VersionDef('5.9.11', []),
         VersionDef('5.10.x', []),
     ),
     (
@@ -56,7 +56,7 @@
         VersionDef('5.6.x', []),
         VersionDef('5.7.x', []),
         VersionDef('5.8.x', []),
-        VersionDef('5.9.x', []),
+        VersionDef('5.9.11', []),
         VersionDef('5.10.x', []),
         VersionDef('5.10', []),
         VersionDef('latest-nightly', [])
@@ -180,10 +180,11 @@ class StorageCompatibilityTest(NodeProvider, unittest.TestCase):
     }

     def test_upgrade_paths(self):
+        #path = (VersionDef(version='5.4.x', java_home=[]), VersionDef(version='5.7.x', java_home=[]), VersionDef(version='5.9.11', java_home=[]))
         for path in get_test_paths():
             try:
                 self.setUp()
-                self._test_upgrade_path(path, nodes=3)
+                self._test_upgrade_path(path, nodes=1)
             finally:
                 self.tearDown()

@@ -258,20 +259,22 @@ def _do_upgrade(self,
             assert_busy(lambda: self.assert_green(conn, 'blob', 'b1'))
             self.assertIsNotNone(container.get(digest))

+        accumulated_dynamic_column_names: list[str] = []
         self._process_on_stop()
         for version_def in versions[1:]:
             timestamp = datetime.utcnow().isoformat(timespec='seconds')
             print(f"{timestamp} Upgrade to: {version_def.version}")
-            self.assert_data_persistence(version_def, nodes, digest, paths)
+            self.assert_data_persistence(version_def, nodes, digest, paths, accumulated_dynamic_column_names)
         # restart with latest version
         version_def = versions[-1]
-        self.assert_data_persistence(version_def, nodes, digest, paths)
+        self.assert_data_persistence(version_def, nodes, digest, paths, accumulated_dynamic_column_names)

     def assert_data_persistence(self,
                                 version_def: VersionDef,
                                 nodes: int,
                                 digest: str,
-                                paths: Iterable[str]):
+                                paths: Iterable[str],
+                                accumulated_dynamic_column_names: list[str]):
         env = prepare_env(version_def.java_home)
         version = version_def.version
         cluster = self._new_cluster(version, nodes, data_paths=paths, settings=self.CLUSTER_SETTINGS, env=env)
@@ -303,15 +306,27 @@ def assert_data_persistence(self,
                 cursor.execute(f'select * from versioned."{table}"')
                 cursor.execute(f'insert into versioned."{table}" (id, col_int) values (?, ?)', [str(uuid4()), 1])

+            # to trigger `alter` stmt bug (https://github.com/crate/crate/pull/17178) that falsely updated the table's
+            # version created setting
+            cursor.execute('ALTER TABLE doc.parted SET ("refresh_interval" = 900)')
+
             # older versions had a bug that caused this to fail
-            if version in ('latest-nightly', '3.2'):
-                # Test that partition and dynamic columns can be created
-                obj = {"t_" + version.replace('.', '_'): True}
-                args = (str(uuid4()), version, obj)
-                cursor.execute(
-                    'INSERT INTO doc.parted (id, version, cols) values (?, ?, ?)',
-                    args
-                )
+            # Test that partition and dynamic columns can be created
+            key = "t_" + version.replace('.', '_')
+            obj = {key: True}
+            args = (str(uuid4()), version, obj)
+            cursor.execute(
+                'INSERT INTO doc.parted (id, version, cols) values (?, ?, ?)',
+                args
+            )
+            cursor.execute('REFRESH TABLE doc.parted')
+            accumulated_dynamic_column_names.append(key)
+            cursor.execute('SELECT cols from doc.parted')
+            result = cursor.fetchall()
+            for row in result:
+                if row[0] is not None:
+                    for name in row[0].keys():
+                        self.assertIn(name, accumulated_dynamic_column_names)
         self._process_on_stop()

     def assert_green(self, conn: Connection, schema: str, table_name: str):
@@ -325,224 +340,3 @@ def assert_nodes(self, conn: Connection, num_nodes: int):
         c = conn.cursor()
         c.execute("select count(*) from sys.nodes")
         self.assertEqual(c.fetchone()[0], num_nodes)
-
-
-class MetaDataCompatibilityTest(NodeProvider, unittest.TestCase):
-
-    CLUSTER_SETTINGS = {
-        'license.enterprise': 'true',
-        'lang.js.enabled': 'true',
-        'cluster.name': gen_id(),
-    }
-
-    SUPPORTED_VERSIONS = (
-        VersionDef('2.3.x', []),
-        VersionDef('3.3.x', []),
-        VersionDef('latest-nightly', [])
-    )
-
-    def test_metadata_compatibility(self):
-        nodes = 3
-
-        cluster = self._new_cluster(
-            self.SUPPORTED_VERSIONS[0].version,
-            nodes,
-            settings=self.CLUSTER_SETTINGS,
-            explicit_discovery=False
-        )
-        cluster.start()
-        with connect(cluster.node().http_url, error_trace=True) as conn:
-            cursor = conn.cursor()
-            cursor.execute('''
-                CREATE USER user_a;
-            ''')
-            cursor.execute('''
-                GRANT ALL PRIVILEGES ON SCHEMA doc TO user_a;
-            ''')
-            cursor.execute('''
-                CREATE FUNCTION fact(LONG)
-                RETURNS LONG
-                LANGUAGE JAVASCRIPT
-                AS 'function fact(a) { return a < 2 ? 0 : a * (a - 1); }';
-            ''')
-        self._process_on_stop()
-
-        paths = [node._settings['path.data'] for node in cluster.nodes()]
-
-        for version_def in self.SUPPORTED_VERSIONS[1:]:
-            self.assert_meta_data(version_def, nodes, paths)
-
-        # restart with latest version
-        self.assert_meta_data(self.SUPPORTED_VERSIONS[-1], nodes, paths)
-
-    def assert_meta_data(self, version_def, nodes, data_paths=None):
-        cluster = self._new_cluster(
-            version_def.version,
-            nodes,
-            data_paths,
-            self.CLUSTER_SETTINGS,
-            prepare_env(version_def.java_home),
-            explicit_discovery=False
-        )
-        cluster.start()
-        with connect(cluster.node().http_url, error_trace=True) as conn:
-            cursor = conn.cursor()
-            cursor.execute('''
-                SELECT name, superuser
-                FROM sys.users
-                ORDER BY superuser, name;
-            ''')
-            rs = cursor.fetchall()
-            self.assertEqual(['user_a', False], rs[0])
-            self.assertEqual(['crate', True], rs[1])
-            cursor.execute('''
-                SELECT fact(100);
-            ''')
-            self.assertEqual(9900, cursor.fetchone()[0])
-            cursor.execute('''
-                SELECT class, grantee, ident, state, type
-                FROM sys.privileges
-                ORDER BY class, grantee, ident, state, type
-            ''')
-            self.assertEqual([['SCHEMA', 'user_a', 'doc', 'GRANT', 'DDL'],
-                              ['SCHEMA', 'user_a', 'doc', 'GRANT', 'DML'],
-                              ['SCHEMA', 'user_a', 'doc', 'GRANT', 'DQL']],
-                             cursor.fetchall())
-
-        self._process_on_stop()
-
-
-class DefaultTemplateMetaDataCompatibilityTest(NodeProvider, unittest.TestCase):
-    CLUSTER_ID = gen_id()
-
-    CLUSTER_SETTINGS = {
-        'cluster.name': CLUSTER_ID,
-    }
-
-    SUPPORTED_VERSIONS = (
-        VersionDef('3.0.x', []),
-        VersionDef('latest-nightly', [])
-    )
-
-    def test_metadata_compatibility(self):
-        nodes = 3
-
-        cluster = self._new_cluster(self.SUPPORTED_VERSIONS[0].version,
-                                    nodes,
-                                    settings=self.CLUSTER_SETTINGS)
-        cluster.start()
-        with connect(cluster.node().http_url, error_trace=True) as conn:
-            cursor = conn.cursor()
-            cursor.execute("select 1")
-        self._process_on_stop()
-
-        paths = [node._settings['path.data'] for node in cluster.nodes()]
-        for version_def in self.SUPPORTED_VERSIONS[1:]:
-            self.assert_dynamic_string_detection(version_def, nodes, paths)
-
-    def assert_dynamic_string_detection(self, version_def, nodes, data_paths):
-        """ Test that a dynamic string column detection works as expected.
-
-        If the cluster was initially created/started with a lower CrateDB
-        version, we must ensure that our default template is also upgraded, if
-        needed, because it is persisted in the cluster state. That's why
-        re-creating tables would not help.
-        """
-        self._move_nodes_folder_if_needed(data_paths)
-        cluster = self._new_cluster(
-            version_def.version,
-            nodes,
-            data_paths,
-            self.CLUSTER_SETTINGS,
-            prepare_env(version_def.java_home),
-        )
-        cluster.start()
-        with connect(cluster.node().http_url, error_trace=True) as conn:
-            cursor = conn.cursor()
-            cursor.execute('CREATE TABLE t1 (o object)')
-            cursor.execute('''INSERT INTO t1 (o) VALUES ({"name" = 'foo'})''')
-            self.assertEqual(cursor.rowcount, 1)
-            cursor.execute('REFRESH TABLE t1')
-            cursor.execute("SELECT o['name'], count(*) FROM t1 GROUP BY 1")
-            rs = cursor.fetchall()
-            self.assertEqual(['foo', 1], rs[0])
-            cursor.execute('DROP TABLE t1')
-        self._process_on_stop()
-
-    def _move_nodes_folder_if_needed(self, data_paths):
-        """Eliminates the cluster-id folder inside the data directory."""
-        for path in data_paths:
-            data_path_incl_cluster_id = os.path.join(path, self.CLUSTER_ID)
-            if os.path.exists(data_path_incl_cluster_id):
-                src_path_nodes = os.path.join(data_path_incl_cluster_id, 'nodes')
-                target_path_nodes = os.path.join(self._path_data, 'nodes')
-                shutil.move(src_path_nodes, target_path_nodes)
-                shutil.rmtree(data_path_incl_cluster_id)
-
-
-class SnapshotCompatibilityTest(NodeProvider, unittest.TestCase):
-
-    CREATE_REPOSITORY = '''
-        CREATE REPOSITORY r1 TYPE S3
-        WITH (access_key = 'minio',
-              secret_key = 'miniostorage',
-              bucket='backups',
-              endpoint = '127.0.0.1:9000',
-              protocol = 'http')
-    '''
-
-    CREATE_SNAPSHOT_TPT = "CREATE SNAPSHOT r1.s{} ALL WITH (wait_for_completion = true)"
-
-    RESTORE_SNAPSHOT_TPT = "RESTORE SNAPSHOT r1.s{} ALL WITH (wait_for_completion = true)"
-
-    DROP_DOC_TABLE = 'DROP TABLE t1'
-
-    VERSION = ('5.0.x', 'latest-nightly')
-
-    def test_snapshot_compatibility(self):
-        """Test snapshot compatibility when upgrading 5.0.x -> latest-nightly
-
-        Using Minio as a S3 repository, the first cluster that runs
-        creates the repo, a table and inserts/selects some data, which
-        then is snapshotted and deleted. The next cluster recovers the
-        data from the last snapshot, performs further inserts/selects,
-        to then snapshot the data and delete it.
-        """
-        with MinioServer() as minio:
-            t = threading.Thread(target=minio.run)
-            t.daemon = True
-            t.start()
-            wait_until(lambda: _is_up('127.0.0.1', 9000))
-
-            num_nodes = 3
-            num_docs = 30
-            prev_version = None
-            num_snapshot = 1
-
-            cluster_settings = {
-                'cluster.name': gen_id(),
-            }
-
-            paths = None
-            for version in self.VERSION:
-                cluster = self._new_cluster(version, num_nodes, paths, settings=cluster_settings)
-                paths = [node._settings['path.data'] for node in cluster.nodes()]
-                cluster.start()
-                with connect(cluster.node().http_url, error_trace=True) as conn:
-                    c = conn.cursor()
-                    if not prev_version:
-                        c.execute(self.CREATE_REPOSITORY)
-                        c.execute(CREATE_ANALYZER)
-                        c.execute(CREATE_DOC_TABLE)
-                        insert_data(conn, 'doc', 't1', num_docs)
-                    else:
-                        c.execute(self.RESTORE_SNAPSHOT_TPT.format(num_snapshot - 1))
-                    c.execute('SELECT COUNT(*) FROM t1')
-                    rowcount = c.fetchone()[0]
-                    self.assertEqual(rowcount, num_docs)
-                    run_selects(c, version)
-                    c.execute(self.CREATE_SNAPSHOT_TPT.format(num_snapshot))
-                    c.execute(self.DROP_DOC_TABLE)
-                self._process_on_stop()
-                prev_version = version
-                num_snapshot += 1
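
Distilled from the hunks above: the upgrade test now carries a list of every dynamic column key it has created across all visited versions and, after each upgrade, asserts that any key returned in `cols` is one of those known names. The sketch below is a simplified illustration of that pattern, not part of the commit; it assumes `conn` is an open `crate` client connection to a cluster that already has the `doc.parted` fixture table used by the test, and the helper function name is invented for the example.

# Simplified sketch of the check added above (illustrative helper, not part of
# the commit). Assumes `conn` is an open crate.client connection to a cluster
# with the doc.parted fixture table from the test setup.
from uuid import uuid4


def insert_and_verify_dynamic_columns(conn, version, accumulated_dynamic_column_names):
    cursor = conn.cursor()
    # Each visited version contributes one new dynamic key, e.g. "t_5_9_11".
    key = 't_' + version.replace('.', '_')
    cursor.execute(
        'INSERT INTO doc.parted (id, version, cols) VALUES (?, ?, ?)',
        (str(uuid4()), version, {key: True}),
    )
    cursor.execute('REFRESH TABLE doc.parted')
    accumulated_dynamic_column_names.append(key)

    # Every key reported back in `cols` must have been created by some
    # previously visited version (or the current one).
    cursor.execute('SELECT cols FROM doc.parted')
    for (cols,) in cursor.fetchall():
        if cols is not None:
            assert set(cols.keys()) <= set(accumulated_dynamic_column_names)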
