diff --git a/changelog.txt b/changelog.txt
index 2f46df066..8ca9aa1a3 100644
--- a/changelog.txt
+++ b/changelog.txt
@@ -17,6 +17,7 @@ Added
 - Add support for aggregation (operations acting on multiple jobs) via ``flow.aggregator`` (#464).
 - Add official support for Andes cluster (#500).
 - Decorator for setting directives while registering operation function ``FlowProject.operation.with_directives`` (#309, #502).
+- Schema version to ensure compatibility and enable migrations in future package versions (#504, #506).
 
 Changed
 +++++++
diff --git a/flow/errors.py b/flow/errors.py
index dcc70e0f3..c7595ed49 100644
--- a/flow/errors.py
+++ b/flow/errors.py
@@ -61,3 +61,9 @@ def parse(self, parser):
     def err(self, msg, caller):
         """Raise a template error."""
         raise jinja2.TemplateError(msg)
+
+
+class IncompatibleSchemaVersion(RuntimeError):
+    """The FlowProject's configuration schema version is incompatible with this version of signac-flow."""  # noqa: E501
+
+    pass
diff --git a/flow/migration/__init__.py b/flow/migration/__init__.py
new file mode 100644
index 000000000..1b334a515
--- /dev/null
+++ b/flow/migration/__init__.py
@@ -0,0 +1,171 @@
+# Copyright (c) 2021 The Regents of the University of Michigan
+# All rights reserved.
+# This software is licensed under the BSD 3-Clause License.
+"""Handle migrations of signac-flow schema versions."""
+
+import os
+import sys
+from contextlib import contextmanager
+
+from filelock import FileLock
+from packaging import version
+from signac.common.config import get_config
+
+from flow.util.config import get_config_value
+
+from ..errors import IncompatibleSchemaVersion
+from ..version import SCHEMA_VERSION, __version__
+from .v0_to_v1 import migrate_v0_to_v1
+
+FN_MIGRATION_LOCKFILE = ".FLOW_PROJECT_MIGRATION_LOCK"
+
+
+# Maps (origin, destination) schema versions to the function that performs
+# the corresponding migration.
+MIGRATIONS = {
+    ("0", "1"): migrate_v0_to_v1,
+}
+
+
+def _reload_project_config(project):
+    """Replace the project's in-memory config with that of a re-opened project.
+
+    The project is re-opened with the flow schema version check disabled so
+    that reloading also works while a migration is in progress.
+    """
+    project_reloaded = project.get_project(
+        root=project.root_directory(), search=False, _ignore_flow_schema_version=True
+    )
+    project._config = project_reloaded._config
+
+
+def _update_project_config(project, **kwargs):
+    """Update the ``flow`` section of the project configuration file.
+
+    The keyword arguments are written into the ``flow`` section of the first
+    of ``signac.rc`` and ``.signacrc`` that contains a ``project`` key. The
+    project's in-memory configuration is reloaded afterwards.
+
+    Raises
+    ------
+    RuntimeError
+        If neither file contains a ``project`` key.
+
+    """
+    for fn in ("signac.rc", ".signacrc"):
+        config = get_config(project.fn(fn))
+        if "project" in config:
+            break
+    else:
+        raise RuntimeError("Unable to determine project configuration file.")
+    config.setdefault("flow", {}).update(kwargs)
+    config.write()
+    _reload_project_config(project)
+
+
+@contextmanager
+def _lock_for_migration(project):
+    """Hold a file lock on ``FN_MIGRATION_LOCKFILE`` while the block executes.
+
+    The lock file is removed on exit, whether or not the block raised.
+    """
+    lock = FileLock(project.fn(FN_MIGRATION_LOCKFILE))
+    try:
+        with lock:
+            yield
+    finally:
+        try:
+            os.unlink(lock.lock_file)
+        except FileNotFoundError:
+            pass  # nothing to clean up
+
+
+def _collect_migrations(project):
+    """Yield the migrations required to reach ``SCHEMA_VERSION``.
+
+    The project's configured schema version is re-read after every yielded
+    migration, so the consumer must update it (see ``apply_migrations``)
+    before requesting the next item.
+
+    Yields
+    ------
+    tuple
+        ``((origin, destination), migration_function)`` for each pending step.
+
+    Raises
+    ------
+    :class:`~flow.errors.IncompatibleSchemaVersion`
+        If the project's schema version is newer than ``SCHEMA_VERSION`` or
+        no registered migration starts at the project's schema version.
+
+    """
+    schema_version = version.parse(SCHEMA_VERSION)
+
+    def get_config_schema_version():
+        # TODO: The means of getting schema versions will have to change for
+        # flow versions in schema versions > 1 that no longer rely on signac's
+        # configuration file and schema.
+        return version.parse(get_config_value("schema_version", config=project.config))
+
+    config_schema_version = get_config_schema_version()
+    if config_schema_version > schema_version:
+        # Project config schema version is newer and therefore not supported.
+        raise IncompatibleSchemaVersion(
+            "The signac-flow configuration schema version used by this project is {}, "
+            "but signac-flow {} only supports up to schema version {}. Try updating "
+            "signac-flow.".format(config_schema_version, __version__, SCHEMA_VERSION)
+        )
+
+    while config_schema_version < schema_version:
+        for (origin, destination), migration in MIGRATIONS.items():
+            if version.parse(origin) == config_schema_version:
+                yield (origin, destination), migration
+                break
+        else:
+            raise IncompatibleSchemaVersion(
+                "The signac-flow configuration schema version used by this project is "
+                "{}, but signac-flow {} uses schema version {} and does not know how "
+                "to migrate.".format(config_schema_version, __version__, schema_version)
+            )
+        # The consumer updates the project config between steps; read it again.
+        config_schema_version = get_config_schema_version()
+
+
+def apply_migrations(project):
+    """Apply migrations to a project.
+
+    This function is a generator: migrations are only applied while it is
+    iterated, e.g. ``list(apply_migrations(project))``. The migration lock is
+    acquired on the first iteration step and held until the generator
+    finishes or is closed.
+
+    Yields
+    ------
+    tuple
+        The ``(origin, destination)`` schema versions of each applied migration.
+
+    Raises
+    ------
+    RuntimeError
+        If a migration function fails; the original exception is chained.
+
+    """
+    with _lock_for_migration(project):
+        for (origin, destination), migrate in _collect_migrations(project):
+            print(
+                f"Applying migration for version {origin} to {destination}... ",
+                end="",
+                file=sys.stderr,
+            )
+            try:
+                migrate(project)
+            except Exception as e:
+                raise RuntimeError(f"Failed to apply migration {destination}.") from e
+            _update_project_config(project, schema_version=destination)
+            print("OK", file=sys.stderr)
+            yield origin, destination
+
+
+__all__ = [
+    "apply_migrations",
+]
diff --git a/flow/migration/v0_to_v1.py b/flow/migration/v0_to_v1.py
new file mode 100644
index 000000000..ff00cf9c4
--- /dev/null
+++ b/flow/migration/v0_to_v1.py
@@ -0,0 +1,46 @@
+# Copyright (c) 2021 The Regents of the University of Michigan
+# All rights reserved.
+# This software is licensed under the BSD 3-Clause License.
+"""Migrate from schema version 0 to version 1.
+
+This migration is a null-migration that serves as a template
+for future migrations and testing purposes.
+"""
+
+
+def migrate_v0_to_v1(project):
+    """Migrate from schema version 0 to version 1."""
+    pass  # nothing to do here, serves purely as an example
+
+
+def migrate_v1_to_v2(project):
+    """Migrate from schema version 1 to version 2."""
+    # TODO: This migration is not yet implemented, but this comment documents
+    # what this function will need to do. There are a few different scenarios
+    # that must be accounted for:
+    # 1. User updates signac before flow. In this case, signac's config API may
+    #    already have changed, and we cannot rely on it. Therefore, we need to add
+    #    configobj as an optional requirement for flow rather than relying on
+    #    signac's config API (it's available on PyPI). Alternatively, we could
+    #    bundle configobj like we do in signac, but I don't want to commit to
+    #    that long term so that would at best be a short term solution (say for
+    #    one or two flow releases) and then after that we tell users who
+    #    haven't migrated to just `pip install configobj`. Then, this function
+    #    can rely on the fact that in signac schema v1 the config information
+    #    is stored in signac.rc using the known schema and operate accordingly
+    #    to pull it.
+    # 2. User updates signac and attempts to migrate before migrating flow
+    #    schema. In this case, `signac migrate` should error and tell the user
+    #    to update flow and run `flow migrate` first. This can be accomplished
+    #    by having the v1->v2 migration in signac check for the presence of the
+    #    "flow" key in the config and error if it is present. We will
+    #    introduce the flow v1->v2 migration before signac's to ensure that
+    #    this is possible.
+    # 3. Users update signac and create a new project, but still use an old
+    #    version of flow that has errors trying to access signac config
+    #    information. This case should fail fast, and we'll just have to inform
+    #    such users that they need to upgrade flow.
+    #
+    # Once a FlowProject has migrated to schema version 2, it is decoupled from
+    # signac's internal schema and should not have any further problems.
+    pass
diff --git a/flow/project.py b/flow/project.py
index 4ad57874a..a031ca3e1 100644
--- a/flow/project.py
+++ b/flow/project.py
@@ -33,6 +33,7 @@
 import signac
 from deprecation import deprecated
 from jinja2 import TemplateNotFound as Jinja2TemplateNotFound
+from packaging import version
 from signac.contrib.filterparse import parse_filter_arg
 from tqdm.auto import tqdm
 
@@ -40,6 +41,7 @@
 from .environment import get_environment
 from .errors import (
     ConfigKeyError,
+    IncompatibleSchemaVersion,
     NoSchedulerError,
     SubmitError,
     TemplateError,
@@ -63,7 +65,7 @@
     switch_to_directory,
 )
 from .util.translate import abbreviate, shorten
-from .version import __version__
+from .version import SCHEMA_VERSION, __version__
 
 logger = logging.getLogger(__name__)
 
@@ -1567,9 +1569,20 @@ def hello(job):
 
     """
 
-    def __init__(self, config=None, environment=None, entrypoint=None):
+    def __init__(
+        self,
+        config=None,
+        environment=None,
+        entrypoint=None,
+        _ignore_flow_schema_version=False,
+    ):
         super().__init__(config=config)
 
+        # This internal constructor parameter exists solely for the purpose of
+        # allowing reloads of the project during migration.
+        if not _ignore_flow_schema_version:
+            self._check_flow_schema_compatibility()
+
         # Associate this class with a compute environment.
         self._environment = environment or get_environment()
 
@@ -1597,6 +1610,39 @@ def __init__(self, config=None, environment=None, entrypoint=None):
         self._group_to_aggregate_store = _bidict()
         self._register_groups()
 
+    def _check_flow_schema_compatibility(self):
+        """Verify that the project's flow schema version matches this package.
+
+        Raises
+        ------
+        :class:`~flow.errors.IncompatibleSchemaVersion`
+            If the configured schema version is newer or older than SCHEMA_VERSION.
+
+        """
+        supported = version.parse(SCHEMA_VERSION)
+        configured = version.parse(
+            flow_config.get_config_value("schema_version", config=self.config)
+        )
+        if configured == supported:
+            logger.debug(f"The project's schema version {configured} is supported.")
+            return
+        versions = (configured, __version__, supported)
+        if configured > supported:
+            # A newer project schema is not supported by this package version.
+            raise IncompatibleSchemaVersion(
+                "The signac-flow schema version used by this project is '{}', but flow {} "
+                "only supports up to schema version '{}'. Try updating flow.".format(
+                    *versions
+                )
+            )
+        # An older project schema has to be migrated explicitly by the user.
+        raise IncompatibleSchemaVersion(
+            "The flow schema version used by this project is '{}', but flow {} "
+            "requires schema version '{}'. Please use '$ flow migrate' to "
+            "irreversibly migrate this project's schema to the supported "
+            "version.".format(*versions)
+        )
+
     def _setup_template_environment(self):
         """Set up the jinja2 template environment.
 
@@ -2642,7 +2688,9 @@ def print_status( "eligible_jobs_max_lines" ) - status_parallelization = self.config["flow"]["status_parallelization"] + status_parallelization = flow_config.get_config_value( + "status_parallelization", config=self.config + ) # initialize jinja2 template environment and necessary filters template_environment = self._template_environment() @@ -4450,7 +4498,9 @@ def _main_status(self, args): # Use small offset to account for overhead with few jobs delta_t = (time.time() - start - 0.5) / max(length_jobs, 1) config_key = "status_performance_warn_threshold" - warn_threshold = flow_config.get_config_value(config_key) + warn_threshold = flow_config.get_config_value( + config_key, config=self.config + ) if not args["profile"] and delta_t > warn_threshold >= 0: print( "WARNING: " diff --git a/flow/util/config.py b/flow/util/config.py index 665ea58f9..12d78f2a3 100644 --- a/flow/util/config.py +++ b/flow/util/config.py @@ -2,18 +2,19 @@ # All rights reserved. # This software is licensed under the BSD 3-Clause License. """Contains logic for working with flow related information stored in the signac config.""" -from signac.common import config +import signac.common.config from ..errors import ConfigKeyError # Monkeypatch the signac config spec to include flow-specific fields. 
-config.cfg += """ +signac.common.config.cfg += """ [flow] import_packaged_environments = boolean() status_performance_warn_threshold = float(default=0.2) show_traceback = boolean() -eligible_jobs_max_lines = int(default=10) +eligible_jobs_max_lines = integer(default=10) status_parallelization = string(default='none') +schema_version = string(default='1') """ @@ -24,7 +25,7 @@ class _GetConfigValueNoneType: _GET_CONFIG_VALUE_NONE = _GetConfigValueNoneType() -def require_config_value(key, ns=None, default=_GET_CONFIG_VALUE_NONE): +def require_config_value(key, ns=None, default=_GET_CONFIG_VALUE_NONE, config=None): """Request a value from the user's configuration, failing if not available. Parameters @@ -36,6 +37,10 @@ def require_config_value(key, ns=None, default=_GET_CONFIG_VALUE_NONE): default A default value in case the key cannot be found within the user's configuration. + config + The config to pull from. If ``None``, uses + :func:`~signac.common.config.load_config` to acquire one + (Default value = None). Returns ------- @@ -49,11 +54,13 @@ def require_config_value(key, ns=None, default=_GET_CONFIG_VALUE_NONE): and no default value is provided. """ + config = config or signac.common.config.load_config() + try: if ns is None: - return config.load_config()["flow"][key] + return config["flow"][key] else: - return config.load_config()["flow"][ns][key] + return config["flow"][ns][key] except KeyError: if default is _GET_CONFIG_VALUE_NONE: k = str(key) if ns is None else f"{ns}.{key}" @@ -62,7 +69,7 @@ def require_config_value(key, ns=None, default=_GET_CONFIG_VALUE_NONE): return default -def get_config_value(key, ns=None, default=None): +def get_config_value(key, ns=None, default=None, config=None): """Request a value from the user's configuration. Parameters @@ -74,6 +81,8 @@ def get_config_value(key, ns=None, default=None): default A default value returned if the key cannot be found within the user configuration. 
+ config + The config to pull from (Default value = None). Returns ------- @@ -81,4 +90,4 @@ def get_config_value(key, ns=None, default=None): The value if found, None if not found. """ - return require_config_value(key=key, ns=ns, default=default) + return require_config_value(key=key, ns=ns, default=default, config=config) diff --git a/flow/version.py b/flow/version.py index a8a9d731c..c9ce2d0d7 100644 --- a/flow/version.py +++ b/flow/version.py @@ -5,4 +5,6 @@ __version__ = "0.14.0" -__all__ = ["__version__"] +SCHEMA_VERSION = "1" + +__all__ = ["__version__", "SCHEMA_VERSION"] diff --git a/requirements.txt b/requirements.txt index 314ee4000..d1f613710 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,5 @@ jinja2>=2.10 cloudpickle>=1.1.1 deprecation>=2.0.0 tqdm>=4.48.1 +filelock>=3.0 +packaging>=15.0 diff --git a/requirements/requirements-test.txt b/requirements/requirements-test.txt index ae6e2b138..927939340 100644 --- a/requirements/requirements-test.txt +++ b/requirements/requirements-test.txt @@ -3,3 +3,5 @@ coverage==5.5 pytest-cov==2.11.1 pytest==6.2.3 ruamel.yaml==0.17.4 +filelock>=3.0 +packaging>=15.0 diff --git a/setup.py b/setup.py index d009df5fb..5659aa2f2 100644 --- a/setup.py +++ b/setup.py @@ -16,6 +16,10 @@ "deprecation>=2.0.0", # Progress bars "tqdm>=4.48.1", + # Platform-independent file locking + "filelock>=3.0", + # Used for version parsing and comparison + "packaging>=15.0", ] description = "Simple workflow management for signac projects." 
diff --git a/tests/test_project.py b/tests/test_project.py
index 820fff08a..b6c57a480 100644
--- a/tests/test_project.py
+++ b/tests/test_project.py
@@ -4,6 +4,7 @@
 import collections.abc
 import datetime
 import inspect
+import io
 import logging
 import os
 import subprocess
@@ -17,6 +18,7 @@
 from itertools import groupby
 from tempfile import TemporaryDirectory
 
+import packaging.version
 import pytest
 import signac
 from define_aggregate_test_project import _AggregateTestProject
@@ -36,9 +38,10 @@
     with_job,
 )
 from flow.environment import ComputeEnvironment
-from flow.errors import DirectivesError, SubmitError
+from flow.errors import DirectivesError, IncompatibleSchemaVersion, SubmitError
 from flow.project import IgnoreConditions, _AggregatesCursor
 from flow.scheduling.base import ClusterJob, JobStatus, Scheduler
+from flow.util.config import get_config_value
 from flow.util.misc import (
     add_cwd_to_environment_pythonpath,
     add_path_to_environment_pythonpath,
@@ -2130,3 +2133,53 @@ def test_invert(self):
         }
         for key, value in expected_results.items():
             assert ~key == value
+
+
+class TestProjectSchema(TestProjectBase):
+    project_class = _TestProject
+
+    def test_project_schema_versions(self):
+        impossibly_high_schema_version = "9999"
+        assert packaging.version.parse(
+            get_config_value("schema_version", config=self.project._config)
+        ) < packaging.version.parse(impossibly_high_schema_version)
+        config = signac.common.config.get_config(self.project.fn("signac.rc"))
+        flow_config = config.setdefault("flow", {})
+        flow_config["schema_version"] = impossibly_high_schema_version
+        config.write()
+        with pytest.raises(IncompatibleSchemaVersion):
+            self.project_class.init_project(
+                name=str(self.project), root=self.project.root_directory()
+            )
+
+    def test_project_schema_version_migration(self):
+        from flow.migration import apply_migrations
+
+        # apply_migrations is a generator and only runs when it is consumed.
+        list(apply_migrations(self.project))
+        self.project._config["flow"]["schema_version"] = "0"
+        assert self.project._config["flow"]["schema_version"] == "0"
+        err = io.StringIO()
+        with redirect_stderr(err):
+            for origin, destination in apply_migrations(self.project):
+                # The migration writes the flow schema version into the "flow"
+                # section of the config, so that is the key to verify.
+                assert self.project._config["flow"]["schema_version"] == destination
+                project = signac.get_project(root=self.project.root_directory())
+                assert project._config["flow"]["schema_version"] == destination
+        assert self.project._config["flow"]["schema_version"] == "1"
+        assert "OK" in err.getvalue()
+        assert "0 to 1" in err.getvalue()
+
+    def test_no_migration(self):
+        # No migrations should be pending for a project that already uses the
+        # current flow schema version.
+        #
+        # Once migrations are implemented:
+        #
+        # 1. Ensure to enable the 'migrate' sub-command within the __main__ module.
+        # 2. Either update or remove this unit test.
+        from flow.migration import _collect_migrations
+
+        migrations = list(_collect_migrations(self.project))
+        assert len(migrations) == 0