From e831817e3392e1c13d6d9b71c1fb9262913a7eae Mon Sep 17 00:00:00 2001 From: Oleg Strokachuk Date: Mon, 3 Jan 2022 22:52:21 +0300 Subject: [PATCH] Multistatement --- README.md | 38 ++++++++++++------- setup.cfg | 3 +- src/clickhouse_migrations/__init__.py | 2 +- .../clickhouse_cluster.py | 9 ++++- src/clickhouse_migrations/cmd.py | 9 ++++- src/clickhouse_migrations/exceptions.py | 7 ++++ src/clickhouse_migrations/migrator.py | 26 ++++++++++--- src/clickhouse_migrations/types.py | 10 +++-- src/tests/complex_migrations/001_init.sql | 2 + src/tests/complex_migrations/002_test2.sql | 5 +++ .../complex_migrations/003_third_test.sql | 8 ++++ .../010_migrations_is_not_in_row.sql | 2 + ...1__create_test.sql => 001_create_test.sql} | 0 .../001_create_test.sql | 5 +++ src/tests/test_clickhouse_migration.py | 25 +++++++++--- 15 files changed, 119 insertions(+), 32 deletions(-) create mode 100644 src/clickhouse_migrations/exceptions.py create mode 100644 src/tests/complex_migrations/001_init.sql create mode 100644 src/tests/complex_migrations/002_test2.sql create mode 100644 src/tests/complex_migrations/003_third_test.sql create mode 100644 src/tests/complex_migrations/010_migrations_is_not_in_row.sql rename src/tests/migrations/{V1__create_test.sql => 001_create_test.sql} (100%) create mode 100644 src/tests/multi_statements_migrations/001_create_test.sql diff --git a/README.md b/README.md index 8955b7f..c5019af 100644 --- a/README.md +++ b/README.md @@ -3,29 +3,39 @@ [![supported versions](https://img.shields.io/pypi/pyversions/clickhouse-migrations.svg)](https://pypi.org/project/clickhouse-migrations/) [![my site](https://img.shields.io/badge/site-my%20blog-yellow.svg)](https://zifter.github.io/) -## Clickhouse Migrator +# Clickhouse Migrations [Clickhouse](https://clickhouse.tech/) is known for its scale to store and fetch large datasets. Development and Maintenance of large-scale db systems many times requires constant changes to the actual DB system. 
Holding off the scripts to migrate these will be painful. -We found there is nothing existing earlier and developed one inspired by, [Flyway](https://flywaydb.org/), [Alembic](https://alembic.sqlalchemy.org/en/latest/) +## Features: +* Supports multi statements - more than one query per migration file. +* Allows running migrations out of the box +* Simple file migrations format: {VERSION}_{name}.sql -This is a python library, which you can execute as a pre-hook using sys python. -Or as a migration framework before deployment/server-startup in your application as required. - -### Installation +## Installation You can install from pypi using `pip install clickhouse-migrations`. -### Usage +## Usage + +### In command line +```bash +clickhouse-migrations --db-host localhost \ + --db-user default \ + --db-password secret \ + --db-name test \ + --migrations-dir ./migrations +``` +### In code ```python from clickhouse_migrations.clickhouse_cluster import ClickhouseCluster cluster = ClickhouseCluster(db_host, db_user, db_password) -cluster.migrate(db_name, migrations_home, create_db_if_no_exists) +cluster.migrate(db_name, migrations_home, create_db_if_no_exists=True, multi_statement=True) ``` Parameter | Description | Default @@ -36,10 +46,12 @@ db_password | ***** | **** db_name| Clickhouse database name | None migrations_home | Path to list of migration files | create_db_if_no_exists | If the `db_name` is not present, enabling this will create the db | True +multi_statement | Allow multiple statements in migration files | True -### Folder and Migration file patterns - -The filenames are pretty similar to how `flyway` keeps it. +### Notes +The Clickhouse driver does not natively support executing multiple statements in a single query. +To allow for multiple statements in a single migration, you can use the multi_statement param. +There are two important caveats: +* This mode splits the migration text into separately-executed statements by a semi-colon ;. 
Thus cannot be used when a statement in the migration contains a string with a semi-colon. +* The queries are not executed in any sort of transaction/batch, meaning you are responsible for fixing partial migrations. -Your first version filename should be prefixed with `V1__` (double underscore) -These migrations are executed one by one, failures in between will stop and not further version files will be executed. diff --git a/setup.cfg b/setup.cfg index 5e4b395..6e6a8bf 100644 --- a/setup.cfg +++ b/setup.cfg @@ -2,7 +2,7 @@ name = clickhouse-migrations version = attr: clickhouse_migrations.__version__ author = Oleg Strokachuk -author_email = zifter.ai+clickhouse_migrations@gmail.com +author_email = zifter.ai+clickhouse-migrations@gmail.com home_page = https://github.com/zifter/clickhouse-migrations description = attr: clickhouse_migrations.__doc__ long_description = file: README.md @@ -13,6 +13,7 @@ platform = any keywords = clickhouse, migrations classifiers = Programming Language :: Python + Programming Language :: Python :: 3.6 Programming Language :: Python :: 3.7 Programming Language :: Python :: 3.8 Programming Language :: Python :: 3.9 diff --git a/src/clickhouse_migrations/__init__.py b/src/clickhouse_migrations/__init__.py index 42d5c08..1959962 100644 --- a/src/clickhouse_migrations/__init__.py +++ b/src/clickhouse_migrations/__init__.py @@ -1,4 +1,4 @@ """ Simple file-based migrations for clickhouse """ -__version__ = "0.1.5" +__version__ = "0.2.0" diff --git a/src/clickhouse_migrations/clickhouse_cluster.py b/src/clickhouse_migrations/clickhouse_cluster.py index ec9e28b..923c46e 100644 --- a/src/clickhouse_migrations/clickhouse_cluster.py +++ b/src/clickhouse_migrations/clickhouse_cluster.py @@ -44,12 +44,16 @@ def migrate( db_name: str, migration_path: Path, create_db_if_no_exists: bool = True, + multi_statement: bool = True, ): storage = MigrationStorage(migration_path) migrations = storage.migrations() return self.apply_migrations( - db_name, 
migrations, create_db_if_no_exists=create_db_if_no_exists + db_name, + migrations, + create_db_if_no_exists=create_db_if_no_exists, + multi_statement=multi_statement, ) def apply_migrations( @@ -57,6 +61,7 @@ def apply_migrations( db_name: str, migrations: List[Migration], create_db_if_no_exists: bool = True, + multi_statement: bool = True, ) -> List[Migration]: if create_db_if_no_exists: @@ -67,4 +72,4 @@ def apply_migrations( with self.connection(db_name) as conn: migrator = Migrator(conn) migrator.init_schema() - return migrator.apply_migration(migrations) + return migrator.apply_migration(migrations, multi_statement) diff --git a/src/clickhouse_migrations/cmd.py b/src/clickhouse_migrations/cmd.py index 5bda77a..e8290e9 100644 --- a/src/clickhouse_migrations/cmd.py +++ b/src/clickhouse_migrations/cmd.py @@ -41,13 +41,20 @@ def get_context(args): default=os.environ.get("MIGRATIONS_DIR", MIGRATIONS_DIR), help="Path to list of migration files", ) + parser.add_argument( + "--multi-statement", + default=os.environ.get("MULTI_STATEMENT", "1"), + help="Allow multiple statements per migration file: 1 to enable, 0 to disable", + ) return parser.parse_args(args) def migrate(context) -> int: cluster = ClickhouseCluster(context.db_host, context.db_user, context.db_password) - cluster.migrate(context.db_name, Path(context.migrations_dir)) + cluster.migrate( + context.db_name, Path(context.migrations_dir), multi_statement=int(context.multi_statement) == 1 + ) return 0 diff --git a/src/clickhouse_migrations/exceptions.py b/src/clickhouse_migrations/exceptions.py new file mode 100644 index 0000000..3634a29 --- /dev/null +++ b/src/clickhouse_migrations/exceptions.py @@ -0,0 +1,7 @@ +class MigrationException(Exception): + pass + + +__all__ = [ + "MigrationException", +] diff --git a/src/clickhouse_migrations/migrator.py b/src/clickhouse_migrations/migrator.py index 782cd26..745efe7 100644 --- a/src/clickhouse_migrations/migrator.py +++ b/src/clickhouse_migrations/migrator.py @@ -4,6 +4,7 @@ import pandas from clickhouse_driver import 
Client +from clickhouse_migrations.exceptions import MigrationException from clickhouse_migrations.types import Migration @@ -36,7 +37,7 @@ def migrations_to_apply(self, migrations: List[Migration]) -> List[Migration]: incoming = pandas.DataFrame(migrations) if len(incoming) == 0 or len(incoming) < len(applied_migrations): - raise AssertionError( + raise MigrationException( "Migrations have gone missing, " "your code base should not truncate migrations, " "use migrations to correct older migrations" @@ -51,7 +52,7 @@ def migrations_to_apply(self, migrations: List[Migration]) -> List[Migration]: exec_stat.c_md5.notnull() & exec_stat.md5.isnull() ] if len(committed_and_absconded) > 0: - raise AssertionError( + raise MigrationException( "Migrations have gone missing, " "your code base should not truncate migrations, " "use migrations to correct older migrations" @@ -64,21 +65,27 @@ def migrations_to_apply(self, migrations: List[Migration]) -> List[Migration]: ) terms_violated = exec_stat[index] if len(terms_violated) > 0: - raise AssertionError( + raise MigrationException( "Do not edit migrations once run, " "use migrations to correct older migrations" ) versions_to_apply = exec_stat[exec_stat.c_md5.isnull()][["version"]] return [m for m in migrations if m.version in versions_to_apply.values] - def apply_migration(self, migrations: List[Migration]) -> List[Migration]: + def apply_migration( + self, migrations: List[Migration], multi_statement + ) -> List[Migration]: new_migrations = self.migrations_to_apply(migrations) if not new_migrations: return [] for migration in new_migrations: logging.info("Execute migration %s", migration) - self._conn.execute(migration.script) + + statements = self.script_to_statements(migration.script, multi_statement) + for statement in statements: + statement = statement.strip() + self._conn.execute(statement) logging.info("Migration applied") @@ -94,3 +101,12 @@ def apply_migration(self, migrations: List[Migration]) -> List[Migration]: ) 
return new_migrations + + @classmethod + def script_to_statements(cls, script: str, multi_statement) -> List[str]: + script = script.strip() + + if multi_statement: + return [m.strip() for m in script.split(";") if m.strip()] + + return [script] diff --git a/src/clickhouse_migrations/types.py b/src/clickhouse_migrations/types.py index 9aa6d42..a3b39be 100644 --- a/src/clickhouse_migrations/types.py +++ b/src/clickhouse_migrations/types.py @@ -12,23 +12,25 @@ def __init__(self, storage_dir: Path): self.storage_dir: Path = storage_dir def filenames(self) -> List[Path]: - migrations: List[Path] = [] + paths: List[Path] = [] for f in os.scandir(self.storage_dir): if f.name.endswith(".sql"): - migrations.append(self.storage_dir / f.name) + paths.append(self.storage_dir / f.name) - return migrations + return paths def migrations(self) -> List[Migration]: migrations: List[Migration] = [] for full_path in self.filenames(): migration = Migration( - version=int(full_path.name.split("_")[0].replace("V", "")), + version=int(full_path.name.split("_")[0]), script=str(full_path.read_text(encoding="utf8")), md5=hashlib.md5(full_path.read_bytes()).hexdigest(), ) migrations.append(migration) + migrations.sort(key=lambda m: m.version) + return migrations diff --git a/src/tests/complex_migrations/001_init.sql b/src/tests/complex_migrations/001_init.sql new file mode 100644 index 0000000..9cf62fa --- /dev/null +++ b/src/tests/complex_migrations/001_init.sql @@ -0,0 +1,2 @@ +CREATE TABLE sample11(id UInt32, name String) ENGINE MergeTree PARTITION BY tuple() +ORDER BY tuple(); diff --git a/src/tests/complex_migrations/002_test2.sql b/src/tests/complex_migrations/002_test2.sql new file mode 100644 index 0000000..ff89af6 --- /dev/null +++ b/src/tests/complex_migrations/002_test2.sql @@ -0,0 +1,5 @@ +CREATE TABLE sample21(id UInt32, name String) ENGINE MergeTree PARTITION BY tuple() +ORDER BY tuple(); + +CREATE TABLE sample22(id UInt32, name String) ENGINE MergeTree PARTITION BY tuple() +ORDER BY tuple(); \ No 
newline at end of file diff --git a/src/tests/complex_migrations/003_third_test.sql b/src/tests/complex_migrations/003_third_test.sql new file mode 100644 index 0000000..c24def6 --- /dev/null +++ b/src/tests/complex_migrations/003_third_test.sql @@ -0,0 +1,8 @@ +CREATE TABLE sample31(id UInt32, name String) ENGINE MergeTree PARTITION BY tuple() +ORDER BY tuple(); + +CREATE TABLE sample32(id UInt32, name String) ENGINE MergeTree PARTITION BY tuple() +ORDER BY tuple(); + +CREATE TABLE sample33(id UInt32, name String) ENGINE MergeTree PARTITION BY tuple() +ORDER BY tuple() \ No newline at end of file diff --git a/src/tests/complex_migrations/010_migrations_is_not_in_row.sql b/src/tests/complex_migrations/010_migrations_is_not_in_row.sql new file mode 100644 index 0000000..e8525f9 --- /dev/null +++ b/src/tests/complex_migrations/010_migrations_is_not_in_row.sql @@ -0,0 +1,2 @@ +CREATE TABLE sample101(id UInt32, name String) ENGINE MergeTree PARTITION BY tuple() +ORDER BY tuple() \ No newline at end of file diff --git a/src/tests/migrations/V1__create_test.sql b/src/tests/migrations/001_create_test.sql similarity index 100% rename from src/tests/migrations/V1__create_test.sql rename to src/tests/migrations/001_create_test.sql diff --git a/src/tests/multi_statements_migrations/001_create_test.sql b/src/tests/multi_statements_migrations/001_create_test.sql new file mode 100644 index 0000000..fa8f238 --- /dev/null +++ b/src/tests/multi_statements_migrations/001_create_test.sql @@ -0,0 +1,5 @@ +CREATE TABLE sample(id UInt32, name String) ENGINE MergeTree PARTITION BY tuple() +ORDER BY tuple(); + +CREATE TABLE sample2(id UInt32, name String) ENGINE MergeTree PARTITION BY tuple() +ORDER BY tuple(); \ No newline at end of file diff --git a/src/tests/test_clickhouse_migration.py b/src/tests/test_clickhouse_migration.py index 6a8d6a4..6a2d254 100644 --- a/src/tests/test_clickhouse_migration.py +++ b/src/tests/test_clickhouse_migration.py @@ -2,9 +2,11 @@ from pathlib import Path 
import pytest +from clickhouse_driver.errors import ServerException from clickhouse_migrations.clickhouse_cluster import ClickhouseCluster from clickhouse_migrations.cmd import get_context, migrate +from clickhouse_migrations.exceptions import MigrationException from clickhouse_migrations.types import Migration TESTS_DIR = Path(__file__).parent @@ -42,7 +44,7 @@ def test_deleted_migrations_exception(cluster): [{"version": 1, "script": "location_to_script", "md5": "1234"}], ) - with pytest.raises(AssertionError): + with pytest.raises(MigrationException): cluster.apply_migrations("pytest", []) @@ -59,7 +61,7 @@ def test_missing_migration_exception(cluster): Migration(version=2, md5="12345", script="location_to_script"), ] - with pytest.raises(AssertionError): + with pytest.raises(MigrationException): cluster.apply_migrations("pytest", migrations) @@ -76,7 +78,7 @@ def test_modified_committed_migrations_exception(cluster): Migration(version=1, md5="12345", script="location_to_script"), ] - with pytest.raises(AssertionError): + with pytest.raises(MigrationException): cluster.apply_migrations("pytest", migrations) @@ -143,12 +145,25 @@ def test_should_migrate_empty_database(cluster): def test_migrations_folder_is_empty_ok(cluster): - clean_slate(cluster) - with tempfile.TemporaryDirectory("empty_dir") as d: cluster.migrate("pytest", d) +def test_multi_statement_migration_by_default_ok(cluster): + cluster.migrate("pytest", TESTS_DIR / "multi_statements_migrations") + + +def test_multi_statement_migration_disabled_ok(cluster): + with pytest.raises(ServerException): + cluster.migrate( + "pytest", TESTS_DIR / "multi_statements_migrations", multi_statement=False + ) + + +def test_complex_ok(cluster): + cluster.migrate("pytest", TESTS_DIR / "complex_migrations") + + def test_main_pass_db_name_ok(): migrate( get_context(