diff --git a/README.md b/README.md
index d4a48f7..8955b7f 100644
--- a/README.md
+++ b/README.md
@@ -22,10 +22,10 @@ You can install from pypi using `pip install clickhouse-migrations`.
 ### Usage
 
 ```python
-from migration_lib.migrate import migrate
+from clickhouse_migrations.clickhouse_cluster import ClickhouseCluster
 
-migrator = Migrator(db_host, db_user, db_password)
-migrator.migrate(db_name, migrations_home, create_db_if_no_exists)
+cluster = ClickhouseCluster(db_host, db_user, db_password)
+cluster.migrate(db_name, migrations_home, create_db_if_no_exists)
 ```
 
 Parameter | Description | Default
diff --git a/src/clickhouse_migrations/__init__.py b/src/clickhouse_migrations/__init__.py
index 895cc02..42d5c08 100644
--- a/src/clickhouse_migrations/__init__.py
+++ b/src/clickhouse_migrations/__init__.py
@@ -1,4 +1,4 @@
 """
 Simple file-based migrations for clickhouse
 """
-__version__ = "0.1.4"
+__version__ = "0.1.5"
diff --git a/src/clickhouse_migrations/clickhouse_cluster.py b/src/clickhouse_migrations/clickhouse_cluster.py
new file mode 100644
index 0000000..ec9e28b
--- /dev/null
+++ b/src/clickhouse_migrations/clickhouse_cluster.py
@@ -0,0 +1,70 @@
+import logging
+from pathlib import Path
+from typing import List
+
+from clickhouse_driver import Client
+
+from clickhouse_migrations.defaults import DB_HOST, DB_PASSWORD, DB_USER
+from clickhouse_migrations.migrator import Migrator
+from clickhouse_migrations.types import Migration, MigrationStorage
+
+
+class ClickhouseCluster:
+    def __init__(
+        self,
+        db_host: str = DB_HOST,
+        db_user: str = DB_USER,
+        db_password: str = DB_PASSWORD,
+    ):
+        self.db_host = db_host
+        self.db_user = db_user
+        self.db_password = db_password
+
+    def connection(self, db_name: str) -> Client:
+        return Client(
+            self.db_host, user=self.db_user, password=self.db_password, database=db_name
+        )
+
+    def create_db(self, db_name):
+        with self.connection("") as conn:
+            conn.execute(f"CREATE DATABASE IF NOT EXISTS {db_name}")
+
+    def init_schema(self, db_name):
+        with self.connection(db_name) as conn:
+            migrator = Migrator(conn)
+            migrator.init_schema()
+
+    def show_tables(self, db_name):
+        with self.connection(db_name) as conn:
+            result = conn.execute("show tables")
+            return [t[0] for t in result]
+
+    def migrate(
+        self,
+        db_name: str,
+        migration_path: Path,
+        create_db_if_no_exists: bool = True,
+    ):
+        storage = MigrationStorage(migration_path)
+        migrations = storage.migrations()
+
+        return self.apply_migrations(
+            db_name, migrations, create_db_if_no_exists=create_db_if_no_exists
+        )
+
+    def apply_migrations(
+        self,
+        db_name: str,
+        migrations: List[Migration],
+        create_db_if_no_exists: bool = True,
+    ) -> List[Migration]:
+
+        if create_db_if_no_exists:
+            self.create_db(db_name)
+
+        logging.info("Total migrations to apply: %d", len(migrations))
+
+        with self.connection(db_name) as conn:
+            migrator = Migrator(conn)
+            migrator.init_schema()
+            return migrator.apply_migration(migrations)
diff --git a/src/clickhouse_migrations/cmd.py b/src/clickhouse_migrations/cmd.py
index 2dc3f0b..5bda77a 100644
--- a/src/clickhouse_migrations/cmd.py
+++ b/src/clickhouse_migrations/cmd.py
@@ -3,6 +3,7 @@
 from argparse import ArgumentParser
 from pathlib import Path
 
+from clickhouse_migrations.clickhouse_cluster import ClickhouseCluster
 from clickhouse_migrations.defaults import (
     DB_HOST,
     DB_NAME,
@@ -10,7 +11,6 @@
     DB_USER,
     MIGRATIONS_DIR,
 )
-from clickhouse_migrations.migrator import Migrator
 
 
 def get_context(args):
@@ -46,8 +46,8 @@ def get_context(args):
 
 
 def migrate(context) -> int:
-    migrator = Migrator(context.db_host, context.db_user, context.db_password)
-    migrator.migrate(context.db_name, Path(context.migrations_dir))
+    cluster = ClickhouseCluster(context.db_host, context.db_user, context.db_password)
+    cluster.migrate(context.db_name, Path(context.migrations_dir))
 
     return 0
 
diff --git a/src/clickhouse_migrations/migrate.py b/src/clickhouse_migrations/migrate.py
deleted file mode 100644
index 355e956..0000000
--- a/src/clickhouse_migrations/migrate.py
+++ /dev/null
@@ -1,73 +0,0 @@
-import pandas as pd
-
-
-def execute_and_inflate(client, query):
-    result = client.execute(query, with_column_types=True)
-    column_names = [c[0] for c in result[len(result) - 1]]
-    return pd.DataFrame([dict(zip(column_names, d)) for d in result[0]])
-
-
-def migrations_to_apply(client, incoming):
-    current_versions = execute_and_inflate(
-        client,
-        "SELECT version AS version, script AS c_script, md5 as c_md5 from schema_versions",
-    )
-    if current_versions.empty:
-        return incoming
-    if len(incoming) == 0 or len(incoming) < len(current_versions):
-        raise AssertionError(
-            "Migrations have gone missing, "
-            "your code base should not truncate migrations, "
-            "use migrations to correct older migrations"
-        )
-
-    current_versions = current_versions.astype({"version": "int32"})
-    incoming = incoming.astype({"version": "int32"})
-    exec_stat = pd.merge(current_versions, incoming, on="version", how="outer")
-    committed_and_absconded = exec_stat[
-        exec_stat.c_md5.notnull() & exec_stat.md5.isnull()
-    ]
-    if len(committed_and_absconded) > 0:
-        raise AssertionError(
-            "Migrations have gone missing, "
-            "your code base should not truncate migrations, "
-            "use migrations to correct older migrations"
-        )
-
-    index = (
-        exec_stat.c_md5.notnull()
-        & exec_stat.md5.notnull()
-        & ~(exec_stat.md5 == exec_stat.c_md5)
-    )
-    terms_violated = exec_stat[index]
-    if len(terms_violated) > 0:
-        raise AssertionError(
-            "Do not edit migrations once run, "
-            "use migrations to correct older migrations"
-        )
-    return exec_stat[exec_stat.c_md5.isnull()][["version", "script", "md5"]]
-
-
-def apply_migration(client, migrations):
-    if migrations.empty:
-        return
-
-    migrations = migrations.sort_values("version")
-    for _, row in migrations.iterrows():
-        with open(row["script"], "r", encoding="utf-8") as f:
-            migration_script = f.read()
-        client.execute(migration_script)
-        print(
-            f"INSERT INTO schema_versions(version, script, md5) "
-            f"VALUES({row['version']}, '{row['script']}', '{row['md5']}')"
-        )
-        client.execute(
-            "INSERT INTO schema_versions(version, script, md5) VALUES",
-            [
-                {
-                    "version": row["version"],
-                    "script": row["script"],
-                    "md5": row["md5"],
-                }
-            ],
-        )
diff --git a/src/clickhouse_migrations/migrator.py b/src/clickhouse_migrations/migrator.py
index a691480..782cd26 100644
--- a/src/clickhouse_migrations/migrator.py
+++ b/src/clickhouse_migrations/migrator.py
@@ -1,37 +1,18 @@
 import logging
-from pathlib import Path
+from typing import List
 
-import pandas as pd
+import pandas
 from clickhouse_driver import Client
 
-from .defaults import DB_HOST, DB_PASSWORD, DB_USER
-from .migrate import apply_migration, migrations_to_apply
-from .types import MigrationStorage
+from clickhouse_migrations.types import Migration
 
 
 class Migrator:
-    def __init__(
-        self,
-        db_host: str = DB_HOST,
-        db_user: str = DB_USER,
-        db_password: str = DB_PASSWORD,
-    ):
-        self.db_host = db_host
-        self.db_user = db_user
-        self.db_password = db_password
-
-    def connection(self, db_name: str) -> Client:
-        return Client(
-            self.db_host, user=self.db_user, password=self.db_password, database=db_name
-        )
-
-    def create_db(self, db_name):
-        with self.connection("") as conn:
-            conn.execute(f"CREATE DATABASE IF NOT EXISTS {db_name}")
+    def __init__(self, conn: Client):
+        self._conn: Client = conn
 
-    @classmethod
-    def init_schema(cls, conn):
-        conn.execute(
+    def init_schema(self):
+        self._conn.execute(
             "CREATE TABLE IF NOT EXISTS schema_versions ("
             "version UInt32, "
             "md5 String, "
@@ -40,20 +21,76 @@ def init_schema(cls, conn):
             ") ENGINE = MergeTree ORDER BY tuple(created_at)"
         )
 
-    def migrate(
-        self,
-        db_name: str,
-        migration_path: Path,
-        create_db_if_no_exists: bool = True,
-    ):
-        if create_db_if_no_exists:
-            self.create_db(db_name)
+    def execute_and_inflate(self, query) -> pandas.DataFrame:
+        result = self._conn.execute(query, with_column_types=True)
+        column_names = [c[0] for c in result[len(result) - 1]]
+        return pandas.DataFrame([dict(zip(column_names, d)) for d in result[0]])
+
+    def migrations_to_apply(self, migrations: List[Migration]) -> List[Migration]:
+        applied_migrations = self.execute_and_inflate(
+            "SELECT version AS version, script AS c_script, md5 as c_md5 from schema_versions",
+        )
+
+        if applied_migrations.empty:
+            return migrations
+
+        incoming = pandas.DataFrame(migrations)
+        if len(incoming) == 0 or len(incoming) < len(applied_migrations):
+            raise AssertionError(
+                "Migrations have gone missing, "
+                "your code base should not truncate migrations, "
+                "use migrations to correct older migrations"
+            )
+
+        applied_migrations = applied_migrations.astype({"version": "int32"})
+        incoming = incoming.astype({"version": "int32"})
+        exec_stat = pandas.merge(
+            applied_migrations, incoming, on="version", how="outer"
+        )
+        committed_and_absconded = exec_stat[
+            exec_stat.c_md5.notnull() & exec_stat.md5.isnull()
+        ]
+        if len(committed_and_absconded) > 0:
+            raise AssertionError(
+                "Migrations have gone missing, "
+                "your code base should not truncate migrations, "
+                "use migrations to correct older migrations"
+            )
+
+        index = (
+            exec_stat.c_md5.notnull()
+            & exec_stat.md5.notnull()
+            & ~(exec_stat.md5 == exec_stat.c_md5)
+        )
+        terms_violated = exec_stat[index]
+        if len(terms_violated) > 0:
+            raise AssertionError(
+                "Do not edit migrations once run, "
+                "use migrations to correct older migrations"
+            )
+        versions_to_apply = exec_stat[exec_stat.c_md5.isnull()][["version"]]
+        return [m for m in migrations if m.version in versions_to_apply.values]
+
+    def apply_migration(self, migrations: List[Migration]) -> List[Migration]:
+        new_migrations = self.migrations_to_apply(migrations)
+        if not new_migrations:
+            return []
+
+        for migration in new_migrations:
+            logging.info("Execute migration %s", migration)
+            self._conn.execute(migration.script)
 
-        storage = MigrationStorage(migration_path)
-        migrations = storage.migrations()
-        logging.info("Total migrations: %d", len(migrations))
+            logging.info("Migration applied")
 
-        with self.connection(db_name) as conn:
-            self.init_schema(conn)
+            self._conn.execute(
+                "INSERT INTO schema_versions(version, script, md5) VALUES",
+                [
+                    {
+                        "version": migration.version,
+                        "script": migration.script,
+                        "md5": migration.md5,
+                    }
+                ],
+            )
 
-        apply_migration(conn, migrations_to_apply(conn, pd.DataFrame(migrations)))
+        return new_migrations
diff --git a/src/clickhouse_migrations/types.py b/src/clickhouse_migrations/types.py
index 334dd02..9aa6d42 100644
--- a/src/clickhouse_migrations/types.py
+++ b/src/clickhouse_migrations/types.py
@@ -25,7 +25,7 @@ def migrations(self) -> List[Migration]:
         for full_path in self.filenames():
             migration = Migration(
                 version=int(full_path.name.split("_")[0].replace("V", "")),
-                script=str(full_path),
+                script=str(full_path.read_text(encoding="utf8")),
                 md5=hashlib.md5(full_path.read_bytes()).hexdigest(),
             )
 
diff --git a/src/tests/test_clickhouse_migration.py b/src/tests/test_clickhouse_migration.py
index 12a7713..6a8d6a4 100644
--- a/src/tests/test_clickhouse_migration.py
+++ b/src/tests/test_clickhouse_migration.py
@@ -1,24 +1,23 @@
 import tempfile
 from pathlib import Path
 
-import pandas as pd
 import pytest
 
+from clickhouse_migrations.clickhouse_cluster import ClickhouseCluster
 from clickhouse_migrations.cmd import get_context, migrate
-from clickhouse_migrations.migrate import execute_and_inflate, migrations_to_apply
-from clickhouse_migrations.migrator import Migrator
+from clickhouse_migrations.types import Migration
 
 TESTS_DIR = Path(__file__).parent
 
 
 @pytest.fixture
-def migrator():
-    return Migrator("localhost", "default", "")
+def cluster():
+    return ClickhouseCluster("localhost", "default", "")
 
 
 @pytest.fixture(autouse=True)
-def before(migrator):
-    clean_slate(migrator)
+def before(cluster):
+    clean_slate(cluster)
 
 
 def clean_slate(migrator):
@@ -26,90 +25,128 @@ def clean_slate(migrator):
         conn.execute("DROP DATABASE IF EXISTS pytest")
         conn.execute("CREATE DATABASE pytest")
 
-    with migrator.connection("pytest") as conn:
-        migrator.init_schema(conn)
 
+def test_empty_list_of_migrations_ok(cluster):
+    with tempfile.TemporaryDirectory("empty_dir") as d:
+        applied = cluster.migrate("pytest", d)
+
+    assert len(applied) == 0
 
-def test_should_compute_no_migrations_to_run(migrator):
-    with migrator.connection("pytest") as conn:
-        incoming = pd.DataFrame([])
-        results = migrations_to_apply(conn, incoming)
-        assert results.size == 0
 
+def test_deleted_migrations_exception(cluster):
+    cluster.init_schema("pytest")
 
-def test_should_raise_exception_on_deleted_migrations_no_incoming(migrator):
-    incoming = pd.DataFrame([])
-    with migrator.connection("pytest") as conn:
+    with cluster.connection("pytest") as conn:
         conn.execute(
             "INSERT INTO schema_versions(version, script, md5) VALUES",
             [{"version": 1, "script": "location_to_script", "md5": "1234"}],
         )
-        with pytest.raises(AssertionError):
-            migrations_to_apply(conn, incoming)
+
+    with pytest.raises(AssertionError):
+        cluster.apply_migrations("pytest", [])
+
 
-def test_should_raise_exceptions_on_missing_migration(migrator):
-    with migrator.connection("pytest") as conn:
-        incoming = pd.DataFrame(
-            [{"version": 2, "script": "location_to_script", "md5": "12345"}]
-        )
+def test_missing_migration_exception(cluster):
+    cluster.init_schema("pytest")
+
+    with cluster.connection("pytest") as conn:
         conn.execute(
             "INSERT INTO schema_versions(version, script, md5) VALUES",
             [{"version": 1, "script": "location_to_script", "md5": "1234"}],
         )
-        with pytest.raises(AssertionError):
-            migrations_to_apply(conn, incoming)
 
+    migrations = [
+        Migration(version=2, md5="12345", script="location_to_script"),
+    ]
 
-def test_should_raise_exceptions_on_modified_post_committed_migrations(migrator):
-    with migrator.connection("pytest") as conn:
-        incoming = pd.DataFrame(
-            [{"version": 1, "script": "location_to_script", "md5": "12345"}]
-        )
+    with pytest.raises(AssertionError):
+        cluster.apply_migrations("pytest", migrations)
+
+
+def test_modified_committed_migrations_exception(cluster):
+    cluster.init_schema("pytest")
+
+    with cluster.connection("pytest") as conn:
         conn.execute(
             "INSERT INTO schema_versions(version, script, md5) VALUES",
             [{"version": 1, "script": "location_to_script", "md5": "1234"}],
         )
-        with pytest.raises(AssertionError):
-            migrations_to_apply(conn, incoming)
+
+    migrations = [
+        Migration(version=1, md5="12345", script="location_to_script"),
+    ]
+
+    with pytest.raises(AssertionError):
+        cluster.apply_migrations("pytest", migrations)
 
 
-def test_should_return_migrations_to_run(migrator):
-    with migrator.connection("pytest") as conn:
-        incoming = pd.DataFrame(
-            [
-                {"version": 1, "script": "location_to_script", "md5": "1234"},
-                {"version": 2, "script": "location_to_script_2", "md5": "1234"},
-            ]
+def test_apply_new_migration_ok(cluster):
+    cluster.init_schema("pytest")
+
+    with cluster.connection("pytest") as conn:
+        conn.execute(
+            "INSERT INTO schema_versions(version, script, md5) VALUES",
+            [{"version": 1, "script": "SHOW TABLES", "md5": "12345"}],
         )
+
+    migrations = [
+        Migration(version=1, md5="12345", script="SHOW TABLES"),
+        Migration(version=2, md5="12345", script="SHOW TABLES"),
+    ]
+
+    results = cluster.apply_migrations("pytest", migrations)
+    assert len(results) == 1
+    assert results[0] == migrations[-1]
+
+
+def test_apply_two_new_migration_ok(cluster):
+    cluster.init_schema("pytest")
+
+    with cluster.connection("pytest") as conn:
         conn.execute(
             "INSERT INTO schema_versions(version, script, md5) VALUES",
-            [{"version": 1, "script": "location_to_script", "md5": "1234"}],
+            [{"version": 1, "script": "SHOW TABLES", "md5": "111"}],
         )
-        results = migrations_to_apply(conn, incoming)
-        assert len(results) == 1
-        assert results.version.values[0] == 2
+        conn.execute(
+            "INSERT INTO schema_versions(version, script, md5) VALUES",
+            [{"version": 2, "script": "SHOW TABLES", "md5": "222"}],
+        )
+
+    migrations = [
+        Migration(version=1, md5="111", script="SHOW TABLES"),
+        Migration(version=2, md5="222", script="SHOW TABLES"),
+        Migration(version=3, md5="333", script="SHOW TABLES"),
+        Migration(version=4, md5="444", script="SHOW TABLES"),
+        Migration(version=5, md5="444", script="SHOW TABLES"),
+    ]
+
+    results = cluster.apply_migrations("pytest", migrations)
+    assert len(results) == 3
+    assert results[0] == migrations[-3]
+    assert results[1] == migrations[-2]
+    assert results[2] == migrations[-1]
+
+
+def test_should_migrate_empty_database(cluster):
+    cluster.create_db("pytest")
 
-def test_should_migrate_empty_database(migrator):
-    with migrator.connection("pytest") as conn:
-        tables = execute_and_inflate(conn, "show tables")
-        assert len(tables) == 1
-        assert tables.name.values[0] == "schema_versions"
+    tables = cluster.show_tables("pytest")
+    assert len(tables) == 0
 
-        migrator.migrate("pytest", TESTS_DIR / "migrations")
+    cluster.migrate("pytest", TESTS_DIR / "migrations")
 
-        tables = execute_and_inflate(conn, "show tables")
-        assert len(tables) == 2
-        assert tables.name.values[0] == "sample"
-        assert tables.name.values[1] == "schema_versions"
+    tables = cluster.show_tables("pytest")
+    assert len(tables) == 2
+    assert tables[0] == "sample"
+    assert tables[1] == "schema_versions"
 
 
-def test_empty_migrations(migrator):
-    clean_slate(migrator)
+def test_migrations_folder_is_empty_ok(cluster):
+    clean_slate(cluster)
 
     with tempfile.TemporaryDirectory("empty_dir") as d:
-        migrator.migrate("pytest", d)
+        cluster.migrate("pytest", d)
 
 
 def test_main_pass_db_name_ok():
diff --git a/tox.ini b/tox.ini
index d9a33ba..6895848 100644
--- a/tox.ini
+++ b/tox.ini
@@ -11,14 +11,12 @@ skip_missing_interpreters = True
 basepython = py38
 
 [testenv]
-skip_install = False
 deps =
     -r requirements.txt
     -r requirements-test.txt
 commands = py.test src \
     --cov=src/ \
-    --cov-config="{toxinidir}/tox.ini" \
-    --cov-append
+    --cov-config="{toxinidir}/tox.ini"
 
 [testenv:flake8-check]
 deps = flake8==4.0.1
@@ -28,7 +26,8 @@ commands = flake8 --config={toxinidir}/tox.ini src/
 [testenv:pylint]
 deps =
     {[testenv]deps}
     pylint==2.12.2
-commands = pylint --rcfile={toxinidir}/tox.ini src
+commands = pylint src/tests/ --rcfile={toxinidir}/tox.ini
+    pylint src/clickhouse_migrations/ --rcfile={toxinidir}/tox.ini
 
 [testenv:isort]
 changedir = {toxinidir}/src
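For reference, a minimal end-to-end sketch of the API this patch introduces. Only the `ClickhouseCluster` constructor and the `migrate()` signature come from the diff itself; the host, credentials, database name, and migration file names here are illustrative assumptions (the `V<n>_` filename prefix is implied by the version parsing in `types.py`).

```python
from pathlib import Path

from clickhouse_migrations.clickhouse_cluster import ClickhouseCluster

# Placeholder connection details -- substitute your own ClickHouse instance.
cluster = ClickhouseCluster(db_host="localhost", db_user="default", db_password="")

# migrate() discovers migration files in the given directory (e.g. a
# hypothetical V1_create_tables.sql), creates the database if it does not
# exist, applies each new script, records its md5 in schema_versions, and
# returns the list of migrations it actually applied.
applied = cluster.migrate("analytics", Path("migrations"), create_db_if_no_exists=True)
print(f"applied {len(applied)} migration(s)")
```

Re-running the same call is a no-op, since every applied version is already recorded in `schema_versions`; editing an already-applied script changes its md5 and raises `AssertionError`, per `Migrator.migrations_to_apply`.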