Skip to content

Commit

Permalink
Merge pull request #8 from zifter/feature/multistatement
Browse files Browse the repository at this point in the history
Mutlistatement
  • Loading branch information
zifter authored Jan 3, 2022
2 parents 7024dd4 + e831817 commit 66ece0e
Show file tree
Hide file tree
Showing 15 changed files with 119 additions and 32 deletions.
38 changes: 25 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,39 @@
[![supported versions](https://img.shields.io/pypi/pyversions/clickhouse-migrations.svg)](https://pypi.org/project/clickhouse-migrations/)
[![my site](https://img.shields.io/badge/site-my%20blog-yellow.svg)](https://zifter.github.io/)

## Clickhouse Migrator
# Clickhouse Migrations

[Clickhouse](https://clickhouse.tech/) is known for its scale to store and fetch large datasets.

Development and Maintenance of large-scale db systems many times requires constant changes to the actual DB system.
Holding off the scripts to migrate these will be painful.

We found there is nothing existing earlier and developed one inspired by, [Flyway](https://flywaydb.org/), [Alembic](https://alembic.sqlalchemy.org/en/latest/)
## Features:
* Supports multi statements - more than one query per migration file.
* Allow running migrations out-of-box
* Simple file migrations format: {VERSION}_{name}.sql

This is a python library, which you can execute as a pre-hook using sys python.
Or as a migration framework before deployment/server-startup in your application as required.

### Installation
## Installation

You can install from pypi using `pip install clickhouse-migrations`.

### Usage
## Usage

### In command line
```bash
clickhouse-migrations --db-host localhost \
--db-user default \
--db-password secret \
--db-name test \
--migrations-dir ./migrations
```

### In code
```python
from clickhouse_migrations.clickhouse_cluster import ClickhouseCluster

cluster = ClickhouseCluster(db_host, db_user, db_password)
cluster.migrate(db_name, migrations_home, create_db_if_no_exists)
cluster.migrate(db_name, migrations_home, create_db_if_no_exists=True, multi_statement=True)
```

Parameter | Description | Default
Expand All @@ -36,10 +46,12 @@ db_password | ***** | ****
db_name| Clickhouse database name | None
migrations_home | Path to list of migration files | <project_root>
create_db_if_no_exists | If the `db_name` is not present, enabling this will create the db | True
multi_statement | Allow multiple statements in migration files | True

### Folder and Migration file patterns

The filenames are pretty similar to how `flyway` keeps it.
### Notes
The Clickhouse driver does not natively support executing multipe statements in a single query.
To allow for multiple statements in a single migration, you can use the multi_statement param.
There are two important caveats:
* This mode splits the migration text into separately-executed statements by a semi-colon ;. Thus cannot be used when a statement in the migration contains a string with a semi-colon.
* The queries are not executed in any sort of transaction/batch, meaning you are responsible for fixing partial migrations.

Your first version filename should be prefixed with `V1__` (double underscore)
These migrations are executed one by one, failures in between will stop and not further version files will be executed.
3 changes: 2 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = clickhouse-migrations
version = attr: clickhouse_migrations.__version__
author = Oleg Strokachuk
author_email = zifter.ai+clickhouse_migrations@gmail.com
author_email = zifter.ai+clickhouse-migrations@gmail.com
home_page = https://github.com/zifter/clickhouse-migrations
description = attr: clickhouse_migrations.__doc__
long_description = file: README.md
Expand All @@ -13,6 +13,7 @@ platform = any
keywords = clickhouse, migrations
classifiers =
Programming Language :: Python
Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.9
Expand Down
2 changes: 1 addition & 1 deletion src/clickhouse_migrations/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""
Simple file-based migrations for clickhouse
"""
__version__ = "0.1.5"
__version__ = "0.2.0"
9 changes: 7 additions & 2 deletions src/clickhouse_migrations/clickhouse_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,24 @@ def migrate(
db_name: str,
migration_path: Path,
create_db_if_no_exists: bool = True,
multi_statement: bool = True,
):
storage = MigrationStorage(migration_path)
migrations = storage.migrations()

return self.apply_migrations(
db_name, migrations, create_db_if_no_exists=create_db_if_no_exists
db_name,
migrations,
create_db_if_no_exists=create_db_if_no_exists,
multi_statement=multi_statement,
)

def apply_migrations(
self,
db_name: str,
migrations: List[Migration],
create_db_if_no_exists: bool = True,
multi_statement: bool = True,
) -> List[Migration]:

if create_db_if_no_exists:
Expand All @@ -67,4 +72,4 @@ def apply_migrations(
with self.connection(db_name) as conn:
migrator = Migrator(conn)
migrator.init_schema()
return migrator.apply_migration(migrations)
return migrator.apply_migration(migrations, multi_statement)
9 changes: 8 additions & 1 deletion src/clickhouse_migrations/cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,20 @@ def get_context(args):
default=os.environ.get("MIGRATIONS_DIR", MIGRATIONS_DIR),
help="Path to list of migration files",
)
parser.add_argument(
"--multi-statement",
default=os.environ.get("MULTI_STATEMENT", "1"),
help="Path to list of migration files",
)

return parser.parse_args(args)


def migrate(context) -> int:
cluster = ClickhouseCluster(context.db_host, context.db_user, context.db_password)
cluster.migrate(context.db_name, Path(context.migrations_dir))
cluster.migrate(
context.db_name, Path(context.migrations_dir), int(context.multi_statement) == 1
)
return 0


Expand Down
7 changes: 7 additions & 0 deletions src/clickhouse_migrations/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
class MigrationException(Exception):
pass


__all__ = [
"MigrationException",
]
26 changes: 21 additions & 5 deletions src/clickhouse_migrations/migrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pandas
from clickhouse_driver import Client

from clickhouse_migrations.exceptions import MigrationException
from clickhouse_migrations.types import Migration


Expand Down Expand Up @@ -36,7 +37,7 @@ def migrations_to_apply(self, migrations: List[Migration]) -> List[Migration]:

incoming = pandas.DataFrame(migrations)
if len(incoming) == 0 or len(incoming) < len(applied_migrations):
raise AssertionError(
raise MigrationException(
"Migrations have gone missing, "
"your code base should not truncate migrations, "
"use migrations to correct older migrations"
Expand All @@ -51,7 +52,7 @@ def migrations_to_apply(self, migrations: List[Migration]) -> List[Migration]:
exec_stat.c_md5.notnull() & exec_stat.md5.isnull()
]
if len(committed_and_absconded) > 0:
raise AssertionError(
raise MigrationException(
"Migrations have gone missing, "
"your code base should not truncate migrations, "
"use migrations to correct older migrations"
Expand All @@ -64,21 +65,27 @@ def migrations_to_apply(self, migrations: List[Migration]) -> List[Migration]:
)
terms_violated = exec_stat[index]
if len(terms_violated) > 0:
raise AssertionError(
raise MigrationException(
"Do not edit migrations once run, "
"use migrations to correct older migrations"
)
versions_to_apply = exec_stat[exec_stat.c_md5.isnull()][["version"]]
return [m for m in migrations if m.version in versions_to_apply.values]

def apply_migration(self, migrations: List[Migration]) -> List[Migration]:
def apply_migration(
self, migrations: List[Migration], multi_statement
) -> List[Migration]:
new_migrations = self.migrations_to_apply(migrations)
if not new_migrations:
return []

for migration in new_migrations:
logging.info("Execute migration %s", migration)
self._conn.execute(migration.script)

statements = self.script_to_statements(migration.script, multi_statement)
for statement in statements:
statement = statement.strip()
self._conn.execute(statement)

logging.info("Migration applied")

Expand All @@ -94,3 +101,12 @@ def apply_migration(self, migrations: List[Migration]) -> List[Migration]:
)

return new_migrations

@classmethod
def script_to_statements(cls, script: str, multi_statement) -> List[str]:
script = script.strip()

if multi_statement:
return [m.strip() for m in script.split(";") if m]

return [script]
10 changes: 6 additions & 4 deletions src/clickhouse_migrations/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,25 @@ def __init__(self, storage_dir: Path):
self.storage_dir: Path = storage_dir

def filenames(self) -> List[Path]:
migrations: List[Path] = []
l: List[Path] = []
for f in os.scandir(self.storage_dir):
if f.name.endswith(".sql"):
migrations.append(self.storage_dir / f.name)
l.append(self.storage_dir / f.name)

return migrations
return l

def migrations(self) -> List[Migration]:
migrations: List[Migration] = []

for full_path in self.filenames():
migration = Migration(
version=int(full_path.name.split("_")[0].replace("V", "")),
version=int(full_path.name.split("_")[0]),
script=str(full_path.read_text(encoding="utf8")),
md5=hashlib.md5(full_path.read_bytes()).hexdigest(),
)

migrations.append(migration)

migrations.sort(key=lambda m: m.version)

return migrations
2 changes: 2 additions & 0 deletions src/tests/complex_migrations/001_init.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
CREATE TABLE sample11(id UInt32, name String) ENGINE MergeTree PARTITION BY tuple()
ORDER BY tuple();
5 changes: 5 additions & 0 deletions src/tests/complex_migrations/002_test2.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE sample21(id UInt32, name String) ENGINE MergeTree PARTITION BY tuple()
ORDER BY tuple();

CREATE TABLE sample22(id UInt32, name String) ENGINE MergeTree PARTITION BY tuple()
ORDER BY tuple();
8 changes: 8 additions & 0 deletions src/tests/complex_migrations/003_third_test.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
CREATE TABLE sample31(id UInt32, name String) ENGINE MergeTree PARTITION BY tuple()
ORDER BY tuple();

CREATE TABLE sample32(id UInt32, name String) ENGINE MergeTree PARTITION BY tuple()
ORDER BY tuple();

CREATE TABLE sample33(id UInt32, name String) ENGINE MergeTree PARTITION BY tuple()
ORDER BY tuple()
2 changes: 2 additions & 0 deletions src/tests/complex_migrations/010_migrations_is_not_in_row.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
CREATE TABLE sample101(id UInt32, name String) ENGINE MergeTree PARTITION BY tuple()
ORDER BY tuple()
File renamed without changes.
5 changes: 5 additions & 0 deletions src/tests/multi_statements_migrations/001_create_test.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE sample(id UInt32, name String) ENGINE MergeTree PARTITION BY tuple()
ORDER BY tuple();

CREATE TABLE sample2(id UInt32, name String) ENGINE MergeTree PARTITION BY tuple()
ORDER BY tuple();
25 changes: 20 additions & 5 deletions src/tests/test_clickhouse_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
from pathlib import Path

import pytest
from clickhouse_driver.errors import ServerException

from clickhouse_migrations.clickhouse_cluster import ClickhouseCluster
from clickhouse_migrations.cmd import get_context, migrate
from clickhouse_migrations.exceptions import MigrationException
from clickhouse_migrations.types import Migration

TESTS_DIR = Path(__file__).parent
Expand Down Expand Up @@ -42,7 +44,7 @@ def test_deleted_migrations_exception(cluster):
[{"version": 1, "script": "location_to_script", "md5": "1234"}],
)

with pytest.raises(AssertionError):
with pytest.raises(MigrationException):
cluster.apply_migrations("pytest", [])


Expand All @@ -59,7 +61,7 @@ def test_missing_migration_exception(cluster):
Migration(version=2, md5="12345", script="location_to_script"),
]

with pytest.raises(AssertionError):
with pytest.raises(MigrationException):
cluster.apply_migrations("pytest", migrations)


Expand All @@ -76,7 +78,7 @@ def test_modified_committed_migrations_exception(cluster):
Migration(version=1, md5="12345", script="location_to_script"),
]

with pytest.raises(AssertionError):
with pytest.raises(MigrationException):
cluster.apply_migrations("pytest", migrations)


Expand Down Expand Up @@ -143,12 +145,25 @@ def test_should_migrate_empty_database(cluster):


def test_migrations_folder_is_empty_ok(cluster):
clean_slate(cluster)

with tempfile.TemporaryDirectory("empty_dir") as d:
cluster.migrate("pytest", d)


def test_multi_statement_migration_by_default_ok(cluster):
cluster.migrate("pytest", TESTS_DIR / "multi_statements_migrations")


def test_multi_statement_migration_disabled_ok(cluster):
with pytest.raises(ServerException):
cluster.migrate(
"pytest", TESTS_DIR / "multi_statements_migrations", multi_statement=False
)


def test_complex_ok(cluster):
cluster.migrate("pytest", TESTS_DIR / "complex_migrations")


def test_main_pass_db_name_ok():
migrate(
get_context(
Expand Down

0 comments on commit 66ece0e

Please sign in to comment.