Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add schema versioning to flow #504

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Added
- Defined validators for the ``fork`` directive (#480).
- Submission summary now appears in ``FlowProject`` status output, showing the number of queued, running, unknown statuses. (#472, #488).
- Status overview now shows the number of jobs with incomplete operations and totals for the label overviews (#501).
- Add schema version to ensure compatibility and enable migrations in future package versions (#504).

Changed
+++++++
Expand Down
6 changes: 6 additions & 0 deletions flow/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,9 @@ def parse(self, parser):
def err(self, msg, caller):
"""Raise a template error."""
raise jinja2.TemplateError(msg)


class IncompatibleSchemaVersion(RuntimeError):
"""The FlowProject's schema version is incompatible with this version of flow."""
vyasr marked this conversation as resolved.
Show resolved Hide resolved

pass
113 changes: 113 additions & 0 deletions flow/migration/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# Copyright (c) 2021 The Regents of the University of Michigan
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I honestly think it is a terrible idea to duplicate all of this logic here. Is it really that hard to adjust the existing framework to perform the migration?

# All rights reserved.
# This software is licensed under the BSD 3-Clause License.
"""Handle migrations of signac schema versions."""
vyasr marked this conversation as resolved.
Show resolved Hide resolved

import os
import sys
from contextlib import contextmanager

from filelock import FileLock
from packaging import version
from signac.common.config import get_config

from flow.util.config import get_config_value

from ..version import SCHEMA_VERSION, __version__
from .v0_to_v1 import migrate_v0_to_v1

FN_MIGRATION_LOCKFILE = ".FLOW_PROJECT_MIGRATION_LOCK"


MIGRATIONS = {
("0", "1"): migrate_v0_to_v1,
}


def _reload_project_config(project):
    """Refresh ``project``'s in-memory configuration.

    A second project handle is obtained through ``project.get_project`` for
    the project's own root directory, with searching disabled and the flow
    schema version check bypassed. Its ``_config`` is then copied onto
    ``project``.

    Parameters
    ----------
    project
        The project whose ``_config`` attribute is replaced in place.

    """
    reload_options = {"search": False, "_ignore_flow_schema_version": True}
    fresh_project = project.get_project(
        root=project.root_directory(), **reload_options
    )
    project._config = fresh_project._config


def _update_project_config(project, **kwargs):
    """Store ``kwargs`` in the ``flow`` section of the project's config file.

    The candidate files ``signac.rc`` and ``.signacrc`` are tried in that
    order; the first one whose configuration contains a ``project`` key is
    updated, written back, and the project's configuration is reloaded.

    Parameters
    ----------
    project
        The project whose configuration file is updated.
    **kwargs
        Key-value pairs merged into the ``flow`` section.

    Raises
    ------
    RuntimeError
        If neither candidate file contains a ``project`` key.

    """
    config = None
    found_project_config = False
    for candidate in ("signac.rc", ".signacrc"):
        config = get_config(project.fn(candidate))
        if "project" in config:
            found_project_config = True
            break
    if not found_project_config:
        raise RuntimeError("Unable to determine project configuration file.")

    flow_section = config.setdefault("flow", {})
    flow_section.update(kwargs)
    config.write()
    _reload_project_config(project)


@contextmanager
def _lock_for_migration(project):
    """Hold the project's migration file lock for the duration of the context.

    The lock file is named by ``FN_MIGRATION_LOCKFILE`` and resolved through
    ``project.fn``. On exit, whether or not an error occurred, the lock file
    is removed; an already-missing file is ignored.

    Parameters
    ----------
    project
        The project to lock.

    """
    lockfile_path = project.fn(FN_MIGRATION_LOCKFILE)
    migration_lock = FileLock(lockfile_path)
    try:
        with migration_lock:
            yield
    finally:
        # Best-effort cleanup of the lock file itself.
        try:
            os.unlink(migration_lock.lock_file)
        except FileNotFoundError:
            pass


def _collect_migrations(project):
schema_version = version.parse(SCHEMA_VERSION)

def get_config_schema_version():
# TODO: The means of getting schema versions will have to change for
# flow versions in schema versions > 1 that no longer rely on signac's
# configuration file and schema.
return version.parse(get_config_value("schema_version", config=project.config))

if get_config_schema_version() > schema_version:
# Project config schema version is newer and therefore not supported.
raise RuntimeError(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be IncompatibleSchemaVersion? Seems like the messages are very similar.

"The signac-flow schema version used by this project is {}, but flow {} "
vyasr marked this conversation as resolved.
Show resolved Hide resolved
"only supports up to schema version {}. Try updating flow.".format(
vyasr marked this conversation as resolved.
Show resolved Hide resolved
get_config_schema_version(), __version__, SCHEMA_VERSION
)
)

while get_config_schema_version() < schema_version:
for (origin, destination), migration in MIGRATIONS.items():
if version.parse(origin) == get_config_schema_version():
yield (origin, destination), migration
break
else:
raise RuntimeError(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be IncompatibleSchemaVersion? Seems like the messages are very similar.

"The signac-flow schema version used by this project is {}, but flow {} "
vyasr marked this conversation as resolved.
Show resolved Hide resolved
"uses schema version {} and does not know how to migrate.".format(
get_config_schema_version(), __version__, schema_version
)
)


def apply_migrations(project):
    """Apply all pending schema migrations to a project, one step at a time.

    This is a generator. The migration lock is acquired when iteration starts
    and stays held until the generator is exhausted or closed, because each
    step is yielded from inside the lock's context. After every successful
    step the project's configured ``schema_version`` is updated to the step's
    destination version.

    Parameters
    ----------
    project
        The project to migrate.

    Yields
    ------
    tuple
        The ``(origin, destination)`` schema versions of each migration that
        was applied successfully.

    Raises
    ------
    RuntimeError
        If a migration function raises; the original exception is chained as
        the cause. Errors raised while collecting migrations or while updating
        the configuration propagate unchanged.

    """
    with _lock_for_migration(project):
        for (origin, destination), migrate in _collect_migrations(project):
            print(
                f"Applying migration for version {origin} to {destination}... ",
                end="",
                file=sys.stderr,
            )
            # Guard only the migration itself so that unrelated failures
            # (e.g. writing the progress message) are not misreported as a
            # failed migration.
            try:
                migrate(project)
            except Exception as e:
                raise RuntimeError(f"Failed to apply migration {destination}.") from e
            _update_project_config(project, schema_version=destination)
            print("OK", file=sys.stderr)
            yield origin, destination


__all__ = [
"apply_migrations",
]
46 changes: 46 additions & 0 deletions flow/migration/v0_to_v1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright (c) 2021 The Regents of the University of Michigan
# All rights reserved.
# This software is licensed under the BSD 3-Clause License.
"""Migrate from schema version 0 to version 1.

This migration is a null-migration that serves as a template
for future migrations and testing purposes.
"""


def migrate_v0_to_v1(project):
    """Migrate a project from schema version 0 to schema version 1.

    There is nothing to do for this step; the function serves purely as an
    example.
    """
    return None


def migrate_v1_to_v2(project):
"""Migrate from schema version 1 to version 2."""
# TODO: This migration is not yet implemented, but this comment documents
# what this function will need to do. There are a few different scenarios
# that must be accounted for:
# 1. User updates signac before flow. In this case, signac's config API may
# already have changed, and we cannot rely on it. Therefore, we need to add
Comment on lines +21 to +22
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes no sense. flow depends on signac ergo we can rely on its API.

# configobj as an optional requirement for flow rather than relying on
# signac's config API (it's available on PyPI). Alternatively, we could
# bundle configobj like we do in signac, but I don't want to commit to
# that long term so that would at best be a short term solution (say for
# one or two flow releases) and then after that we tell users who
# haven't migrated to just `pip install configobj`. Then, this function
# can rely on the fact that in signac schema version one the config
vyasr marked this conversation as resolved.
Show resolved Hide resolved
# information is stored in signac.rc using the known schema and operate
# accordingly to pull it.
# 2. User updates signac and attempts to migrate before migrating flow
# schema. In this case, `signac migrate` should error and tell the user
# to update flow and run `flow migrate` first. This can be accomplished
Comment on lines +32 to +34
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would flow's schema be affected by a schema change in signac core? signac's API is how we access the config, so any schema change in the core package should be transparent to dependent packages, including flow.

# by having the v1->v2 migration in signac check for the presence of the
# "flow" key in the config and error if it is present. We will
# introduce the flow v1->v2 migration before signac's to ensure that
# this be possible.
vyasr marked this conversation as resolved.
Show resolved Hide resolved
# 3. Users update signac and create a new project, but still use an old
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not possible unless they break their environment.

# version of flow that has errors trying to access signac config
# information. This case should fail fast, and we'll just have to inform
# such users that they need to upgrade flow.
#
# Once a FlowProject has migrated to schema version 2, it is decoupled from
# signac's internal schema and should not have any further problems.
pass
58 changes: 54 additions & 4 deletions flow/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@
import signac
from deprecation import deprecated
from jinja2 import TemplateNotFound as Jinja2TemplateNotFound
from packaging import version
from signac.contrib.filterparse import parse_filter_arg
from tqdm.auto import tqdm

from .aggregates import _aggregator, _get_aggregate_id
from .environment import get_environment
from .errors import (
ConfigKeyError,
IncompatibleSchemaVersion,
NoSchedulerError,
SubmitError,
TemplateError,
Expand All @@ -63,7 +65,7 @@
switch_to_directory,
)
from .util.translate import abbreviate, shorten
from .version import __version__
from .version import SCHEMA_VERSION, __version__

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -1424,9 +1426,20 @@ def hello(job):

"""

def __init__(self, config=None, environment=None, entrypoint=None):
def __init__(
self,
config=None,
environment=None,
entrypoint=None,
_ignore_flow_schema_version=False,
):
super().__init__(config=config)

# This internal constructor parameter exists solely for the purpose of
# allowing reloads of the project during migration.
if not _ignore_flow_schema_version:
self._check_flow_schema_compatibility()

# Associate this class with a compute environment.
self._environment = environment or get_environment()

Expand Down Expand Up @@ -1454,6 +1467,39 @@ def __init__(self, config=None, environment=None, entrypoint=None):
self._group_to_aggregate_store = _bidict()
self._register_groups()

def _check_flow_schema_compatibility(self):
"""Check whether this project's data schema is compatible with this version.

Raises
------
:class:`~flow.errors.IncompatibleSchemaVersion`
If the schema version is incompatible.

"""
schema_version = version.parse(SCHEMA_VERSION)
config_schema_version = version.parse(
flow_config.get_config_value("schema_version", config=self.config)
)
if config_schema_version > schema_version:
# Project config schema version is newer and therefore not supported.
raise IncompatibleSchemaVersion(
"The signac-flow schema version used by this project is '{}', but flow {} "
"only supports up to schema version '{}'. Try updating flow.".format(
Comment on lines +1629 to +1630
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this error message consistent with the one in flow/migration/__init__.py. The terms signac-flow and flow are interchanged (and possibly other differences).

config_schema_version, __version__, schema_version
)
)
elif config_schema_version < schema_version:
raise IncompatibleSchemaVersion(
"The flow schema version used by this project is '{}', but flow {} "
"requires schema version '{}'. Please use '$ flow migrate' to "
"irreversibly migrate this project's schema to the supported "
"version.".format(config_schema_version, __version__, schema_version)
Comment on lines +1636 to +1639
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"The flow schema version used by this project is '{}', but flow {} "
"requires schema version '{}'. Please use '$ flow migrate' to "
"irreversibly migrate this project's schema to the supported "
"version.".format(config_schema_version, __version__, schema_version)
"The signac-flow schema version used by this project is '{}', but signac-flow {} "
"requires schema version '{}'. Please use '$ flow migrate' to "
"irreversibly migrate this project's schema to a supported "
"version.".format(config_schema_version, __version__, schema_version)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that the package will support multiple schema versions concurrently? That is, should this be able to say "requires 3 <= schema version <= 5" or similar?

)
else: # identical and therefore compatible
logger.debug(
f"The project's schema version {config_schema_version} is supported."
)

def _setup_template_environment(self):
"""Set up the jinja2 template environment.

Expand Down Expand Up @@ -2497,7 +2543,9 @@ def print_status(
"eligible_jobs_max_lines"
)

status_parallelization = self.config["flow"]["status_parallelization"]
status_parallelization = flow_config.get_config_value(
"status_parallelization", config=self.config
)

# initialize jinja2 template environment and necessary filters
template_environment = self._template_environment()
Expand Down Expand Up @@ -4349,7 +4397,9 @@ def _main_status(self, args):
# Use small offset to account for overhead with few jobs
delta_t = (time.time() - start - 0.5) / max(length_jobs, 1)
config_key = "status_performance_warn_threshold"
warn_threshold = flow_config.get_config_value(config_key)
warn_threshold = flow_config.get_config_value(
config_key, config=self.config
)
if not args["profile"] and delta_t > warn_threshold >= 0:
print(
"WARNING: "
Expand Down
25 changes: 17 additions & 8 deletions flow/util/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@
# All rights reserved.
# This software is licensed under the BSD 3-Clause License.
"""Contains logic for working with flow related information stored in the signac config."""
from signac.common import config
import signac.common.config

from ..errors import ConfigKeyError

# Monkeypatch the signac config spec to include flow-specific fields.
config.cfg += """
signac.common.config.cfg += """
[flow]
import_packaged_environments = boolean()
status_performance_warn_threshold = float(default=0.2)
show_traceback = boolean()
eligible_jobs_max_lines = int(default=10)
eligible_jobs_max_lines = integer(default=10)
status_parallelization = string(default='none')
schema_version = string(default='1')
"""


Expand All @@ -24,7 +25,7 @@ class _GetConfigValueNoneType:
_GET_CONFIG_VALUE_NONE = _GetConfigValueNoneType()


def require_config_value(key, ns=None, default=_GET_CONFIG_VALUE_NONE):
def require_config_value(key, ns=None, default=_GET_CONFIG_VALUE_NONE, config=None):
"""Request a value from the user's configuration, failing if not available.

Parameters
Expand All @@ -36,6 +37,10 @@ def require_config_value(key, ns=None, default=_GET_CONFIG_VALUE_NONE):
default
A default value in case the key cannot be found
within the user's configuration.
config
The config to pull from. If ``None``, uses
:func:`~signac.common.config.load_config` to acquire one
(Default value = None).

Returns
-------
Expand All @@ -49,11 +54,13 @@ def require_config_value(key, ns=None, default=_GET_CONFIG_VALUE_NONE):
and no default value is provided.

"""
config = config or signac.common.config.load_config()

try:
if ns is None:
return config.load_config()["flow"][key]
return config["flow"][key]
else:
return config.load_config()["flow"][ns][key]
return config["flow"][ns][key]
except KeyError:
if default is _GET_CONFIG_VALUE_NONE:
k = str(key) if ns is None else f"{ns}.{key}"
Expand All @@ -62,7 +69,7 @@ def require_config_value(key, ns=None, default=_GET_CONFIG_VALUE_NONE):
return default


def get_config_value(key, ns=None, default=None):
def get_config_value(key, ns=None, default=None, config=None):
"""Request a value from the user's configuration.

Parameters
Expand All @@ -74,11 +81,13 @@ def get_config_value(key, ns=None, default=None):
default
A default value returned if the key cannot be found within the user
configuration.
config
The config to pull from (Default value = None).

Returns
-------
object
The value if found, None if not found.

"""
return require_config_value(key=key, ns=ns, default=default)
return require_config_value(key=key, ns=ns, default=default, config=config)
4 changes: 3 additions & 1 deletion flow/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@

__version__ = "0.13.0"

__all__ = ["__version__"]
SCHEMA_VERSION = "1"

__all__ = ["__version__", "SCHEMA_VERSION"]
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ jinja2>=2.10
cloudpickle>=1.1.1
deprecation>=2.0.0
tqdm>=4.48.1
filelock~=3.0
packaging>=15.0
2 changes: 2 additions & 0 deletions requirements/requirements-test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ coverage==5.5
pytest-cov==2.11.1
pytest==6.2.2
ruamel.yaml==0.17.2
filelock~=3.0
packaging>=15.0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These requirements are in addition to the core requirements.txt. Only testing requirements should be listed here.

Loading