-
Notifications
You must be signed in to change notification settings - Fork 37
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add schema versioning to flow #504
Changes from all commits
c1c516e
e30306f
d712ddf
593911c
b9b61fb
9a8899a
6ff3f14
7b7aaea
49f2e9c
7eb9707
b50db14
6e3c86a
36e3d50
0e70680
0a6a21b
b5f935e
7b0f4c1
d0740c6
e432420
c4c9ebc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
# Copyright (c) 2021 The Regents of the University of Michigan | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I honestly think it is a terrible idea to duplicate all of this logic here. Is it really that hard to adjust the existing framework to perform the migration? |
||
# All rights reserved. | ||
# This software is licensed under the BSD 3-Clause License. | ||
"""Handle migrations of signac-flow schema versions.""" | ||
|
||
import os | ||
import sys | ||
from contextlib import contextmanager | ||
|
||
from filelock import FileLock | ||
from packaging import version | ||
from signac.common.config import get_config | ||
|
||
from flow.util.config import get_config_value | ||
|
||
from ..version import SCHEMA_VERSION, __version__ | ||
from .v0_to_v1 import migrate_v0_to_v1 | ||
|
||
FN_MIGRATION_LOCKFILE = ".FLOW_PROJECT_MIGRATION_LOCK" | ||
|
||
|
||
MIGRATIONS = { | ||
("0", "1"): migrate_v0_to_v1, | ||
} | ||
|
||
|
||
def _reload_project_config(project): | ||
project_reloaded = project.get_project( | ||
root=project.root_directory(), search=False, _ignore_flow_schema_version=True | ||
) | ||
project._config = project_reloaded._config | ||
|
||
|
||
def _update_project_config(project, **kwargs): | ||
"""Update the project configuration.""" | ||
for fn in ("signac.rc", ".signacrc"): | ||
config = get_config(project.fn(fn)) | ||
if "project" in config: | ||
break | ||
else: | ||
raise RuntimeError("Unable to determine project configuration file.") | ||
config.setdefault("flow", {}).update(kwargs) | ||
config.write() | ||
_reload_project_config(project) | ||
|
||
|
||
@contextmanager | ||
def _lock_for_migration(project): | ||
lock = FileLock(project.fn(FN_MIGRATION_LOCKFILE)) | ||
try: | ||
with lock: | ||
yield | ||
finally: | ||
try: | ||
os.unlink(lock.lock_file) | ||
except FileNotFoundError: | ||
pass | ||
|
||
|
||
def _collect_migrations(project): | ||
schema_version = version.parse(SCHEMA_VERSION) | ||
|
||
def get_config_schema_version(): | ||
# TODO: The means of getting schema versions will have to change for | ||
# flow versions in schema versions > 1 that no longer rely on signac's | ||
# configuration file and schema. | ||
return version.parse(get_config_value("schema_version", config=project.config)) | ||
|
||
if get_config_schema_version() > schema_version: | ||
# Project config schema version is newer and therefore not supported. | ||
raise RuntimeError( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be |
||
"The signac-flow configuration schema version used by this project is {}, " | ||
"but signac-flow {} only supports up to schema version {}. Try updating " | ||
"signac-flow.".format( | ||
get_config_schema_version(), __version__, SCHEMA_VERSION | ||
) | ||
) | ||
|
||
while get_config_schema_version() < schema_version: | ||
for (origin, destination), migration in MIGRATIONS.items(): | ||
if version.parse(origin) == get_config_schema_version(): | ||
yield (origin, destination), migration | ||
break | ||
else: | ||
raise RuntimeError( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be |
||
"The signac-flow configuration schema version used by this project is " | ||
"{}, but signac-flow {} uses schema version {} and does not know how " | ||
"to migrate.".format( | ||
get_config_schema_version(), __version__, schema_version | ||
) | ||
) | ||
|
||
|
||
def apply_migrations(project): | ||
"""Apply migrations to a project.""" | ||
with _lock_for_migration(project): | ||
for (origin, destination), migrate in _collect_migrations(project): | ||
try: | ||
print( | ||
f"Applying migration for version {origin} to {destination}... ", | ||
end="", | ||
file=sys.stderr, | ||
) | ||
migrate(project) | ||
except Exception as e: | ||
raise RuntimeError(f"Failed to apply migration {destination}.") from e | ||
else: | ||
_update_project_config(project, schema_version=destination) | ||
print("OK", file=sys.stderr) | ||
yield origin, destination | ||
|
||
|
||
__all__ = [ | ||
"apply_migrations", | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
# Copyright (c) 2021 The Regents of the University of Michigan | ||
# All rights reserved. | ||
# This software is licensed under the BSD 3-Clause License. | ||
"""Migrate from schema version 0 to version 1. | ||
|
||
This migration is a null-migration that serves as a template | ||
for future migrations and testing purposes. | ||
""" | ||
|
||
|
||
def migrate_v0_to_v1(project): | ||
"""Migrate from schema version 0 to version 1.""" | ||
pass # nothing to do here, serves purely as an example | ||
|
||
|
||
def migrate_v1_to_v2(project): | ||
"""Migrate from schema version 1 to version 2.""" | ||
# TODO: This migration is not yet implemented, but this comment documents | ||
# what this function will need to do. There are a few different scenarios | ||
# that must be accounted for: | ||
# 1. User updates signac before flow. In this case, signac's config API may | ||
# already have changed, and we cannot rely on it. Therefore, we need to add | ||
Comment on lines
+21
to
+22
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. That makes no sense. flow depends on signac; ergo, we can rely on its API. |
||
# configobj as an optional requirement for flow rather than relying on | ||
# signac's config API (it's available on PyPI). Alternatively, we could | ||
# bundle configobj like we do in signac, but I don't want to commit to | ||
# that long term so that would at best be a short term solution (say for | ||
# one or two flow releases) and then after that we tell users who | ||
# haven't migrated to just `pip install configobj`. Then, this function | ||
# can rely on the fact that in signac schema v1 the config information | ||
# is stored in signac.rc using the known schema and operate accordingly | ||
# to pull it. | ||
# 2. User updates signac and attempts to migrate before migrating flow | ||
# schema. In this case, `signac migrate` should error and tell the user | ||
# to update flow and run `flow migrate` first. This can be accomplished | ||
Comment on lines
+32
to
+34
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why would flow's schema be affected by a schema change of signac core? signac's API is how we access the config; any schema change in the core package should be transparent to dependent packages, including flow. |
||
# by having the v1->v2 migration in signac check for the presence of the | ||
# "flow" key in the config and error if it is present. We will | ||
# introduce the flow v1->v2 migration before signac's to ensure that | ||
# this is possible. | ||
# 3. Users update signac and create a new project, but still use an old | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's not possible unless they break their environment. |
||
# version of flow that has errors trying to access signac config | ||
# information. This case should fail fast, and we'll just have to inform | ||
# such users that they need to upgrade flow. | ||
# | ||
# Once a FlowProject has migrated to schema version 2, it is decoupled from | ||
# signac's internal schema and should not have any further problems. | ||
pass |
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -33,13 +33,15 @@ | |||||||||||||||||
import signac | ||||||||||||||||||
from deprecation import deprecated | ||||||||||||||||||
from jinja2 import TemplateNotFound as Jinja2TemplateNotFound | ||||||||||||||||||
from packaging import version | ||||||||||||||||||
from signac.contrib.filterparse import parse_filter_arg | ||||||||||||||||||
from tqdm.auto import tqdm | ||||||||||||||||||
|
||||||||||||||||||
from .aggregates import _AggregateStore, aggregator, get_aggregate_id | ||||||||||||||||||
from .environment import get_environment | ||||||||||||||||||
from .errors import ( | ||||||||||||||||||
ConfigKeyError, | ||||||||||||||||||
IncompatibleSchemaVersion, | ||||||||||||||||||
NoSchedulerError, | ||||||||||||||||||
SubmitError, | ||||||||||||||||||
TemplateError, | ||||||||||||||||||
|
@@ -63,7 +65,7 @@ | |||||||||||||||||
switch_to_directory, | ||||||||||||||||||
) | ||||||||||||||||||
from .util.translate import abbreviate, shorten | ||||||||||||||||||
from .version import __version__ | ||||||||||||||||||
from .version import SCHEMA_VERSION, __version__ | ||||||||||||||||||
|
||||||||||||||||||
logger = logging.getLogger(__name__) | ||||||||||||||||||
|
||||||||||||||||||
|
@@ -1567,9 +1569,20 @@ def hello(job): | |||||||||||||||||
|
||||||||||||||||||
""" | ||||||||||||||||||
|
||||||||||||||||||
def __init__(self, config=None, environment=None, entrypoint=None): | ||||||||||||||||||
def __init__( | ||||||||||||||||||
self, | ||||||||||||||||||
config=None, | ||||||||||||||||||
environment=None, | ||||||||||||||||||
entrypoint=None, | ||||||||||||||||||
_ignore_flow_schema_version=False, | ||||||||||||||||||
): | ||||||||||||||||||
super().__init__(config=config) | ||||||||||||||||||
|
||||||||||||||||||
# This internal constructor parameter exists solely for the purpose of | ||||||||||||||||||
# allowing reloads of the project during migration. | ||||||||||||||||||
if not _ignore_flow_schema_version: | ||||||||||||||||||
self._check_flow_schema_compatibility() | ||||||||||||||||||
|
||||||||||||||||||
# Associate this class with a compute environment. | ||||||||||||||||||
self._environment = environment or get_environment() | ||||||||||||||||||
|
||||||||||||||||||
|
@@ -1597,6 +1610,39 @@ def __init__(self, config=None, environment=None, entrypoint=None): | |||||||||||||||||
self._group_to_aggregate_store = _bidict() | ||||||||||||||||||
self._register_groups() | ||||||||||||||||||
|
||||||||||||||||||
def _check_flow_schema_compatibility(self): | ||||||||||||||||||
"""Check whether this project's data schema is compatible with this version. | ||||||||||||||||||
|
||||||||||||||||||
Raises | ||||||||||||||||||
------ | ||||||||||||||||||
:class:`~flow.errors.IncompatibleSchemaVersion` | ||||||||||||||||||
If the schema version is incompatible. | ||||||||||||||||||
|
||||||||||||||||||
""" | ||||||||||||||||||
schema_version = version.parse(SCHEMA_VERSION) | ||||||||||||||||||
config_schema_version = version.parse( | ||||||||||||||||||
flow_config.get_config_value("schema_version", config=self.config) | ||||||||||||||||||
) | ||||||||||||||||||
if config_schema_version > schema_version: | ||||||||||||||||||
# Project config schema version is newer and therefore not supported. | ||||||||||||||||||
raise IncompatibleSchemaVersion( | ||||||||||||||||||
"The signac-flow schema version used by this project is '{}', but flow {} " | ||||||||||||||||||
"only supports up to schema version '{}'. Try updating flow.".format( | ||||||||||||||||||
Comment on lines
+1629
to
+1630
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make this error message consistent with the one in |
||||||||||||||||||
config_schema_version, __version__, schema_version | ||||||||||||||||||
) | ||||||||||||||||||
) | ||||||||||||||||||
elif config_schema_version < schema_version: | ||||||||||||||||||
raise IncompatibleSchemaVersion( | ||||||||||||||||||
"The flow schema version used by this project is '{}', but flow {} " | ||||||||||||||||||
"requires schema version '{}'. Please use '$ flow migrate' to " | ||||||||||||||||||
"irreversibly migrate this project's schema to the supported " | ||||||||||||||||||
"version.".format(config_schema_version, __version__, schema_version) | ||||||||||||||||||
Comment on lines
+1636
to
+1639
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it possible that the package will support multiple schema versions concurrently? That is, should this be able to say "requires 3 <= schema version <= 5" or similar? |
||||||||||||||||||
) | ||||||||||||||||||
else: # identical and therefore compatible | ||||||||||||||||||
logger.debug( | ||||||||||||||||||
f"The project's schema version {config_schema_version} is supported." | ||||||||||||||||||
) | ||||||||||||||||||
|
||||||||||||||||||
def _setup_template_environment(self): | ||||||||||||||||||
"""Set up the jinja2 template environment. | ||||||||||||||||||
|
||||||||||||||||||
|
@@ -2642,7 +2688,9 @@ def print_status( | |||||||||||||||||
"eligible_jobs_max_lines" | ||||||||||||||||||
) | ||||||||||||||||||
|
||||||||||||||||||
status_parallelization = self.config["flow"]["status_parallelization"] | ||||||||||||||||||
status_parallelization = flow_config.get_config_value( | ||||||||||||||||||
"status_parallelization", config=self.config | ||||||||||||||||||
) | ||||||||||||||||||
|
||||||||||||||||||
# initialize jinja2 template environment and necessary filters | ||||||||||||||||||
template_environment = self._template_environment() | ||||||||||||||||||
|
@@ -4450,7 +4498,9 @@ def _main_status(self, args): | |||||||||||||||||
# Use small offset to account for overhead with few jobs | ||||||||||||||||||
delta_t = (time.time() - start - 0.5) / max(length_jobs, 1) | ||||||||||||||||||
config_key = "status_performance_warn_threshold" | ||||||||||||||||||
warn_threshold = flow_config.get_config_value(config_key) | ||||||||||||||||||
warn_threshold = flow_config.get_config_value( | ||||||||||||||||||
config_key, config=self.config | ||||||||||||||||||
) | ||||||||||||||||||
if not args["profile"] and delta_t > warn_threshold >= 0: | ||||||||||||||||||
print( | ||||||||||||||||||
"WARNING: " | ||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,4 +5,6 @@ | |
|
||
__version__ = "0.14.0" | ||
|
||
__all__ = ["__version__"] | ||
SCHEMA_VERSION = "1" | ||
|
||
__all__ = ["__version__", "SCHEMA_VERSION"] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,3 +3,5 @@ jinja2>=2.10 | |
cloudpickle>=1.1.1 | ||
deprecation>=2.0.0 | ||
tqdm>=4.48.1 | ||
filelock>=3.0 | ||
packaging>=15.0 |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,3 +3,5 @@ coverage==5.5 | |
pytest-cov==2.11.1 | ||
pytest==6.2.3 | ||
ruamel.yaml==0.17.4 | ||
filelock>=3.0 | ||
packaging>=15.0 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These requirements are in addition to the core |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shorter docstring, to align with other error docstrings.