diff --git a/.github/workflows/commit.yml b/.github/workflows/commit.yml index 07e7b73bcc..b6f7e9af07 100644 --- a/.github/workflows/commit.yml +++ b/.github/workflows/commit.yml @@ -181,10 +181,13 @@ jobs: working-directory: autotest run: | pytest -v -m="not example" -n=auto --cov=flopy --cov-append --cov-report=xml --durations=0 --keep-failed=.failed --dist loadfile - coverage report env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + - name: Report coverage + working-directory: autotest + run: coverage report + - name: Upload failed test outputs uses: actions/upload-artifact@v4 if: failure() diff --git a/autotest/test_codegen.py b/autotest/test_codegen.py new file mode 100644 index 0000000000..0378b7cede --- /dev/null +++ b/autotest/test_codegen.py @@ -0,0 +1,73 @@ +import pytest + +from autotest.conftest import get_project_root_path +from flopy.mf6.utils.codegen.context import get_context_names +from flopy.mf6.utils.codegen.dfn import Dfn +from flopy.mf6.utils.codegen.make import ( + DfnName, + make_all, + make_context, + make_targets, +) + +PROJ_ROOT = get_project_root_path() +MF6_PATH = PROJ_ROOT / "flopy" / "mf6" +TGT_PATH = MF6_PATH / "modflow" +DFN_PATH = MF6_PATH / "data" / "dfn" +DFN_NAMES = [ + dfn.stem + for dfn in DFN_PATH.glob("*.dfn") + if dfn.stem not in ["common", "flopy"] +] + + +@pytest.mark.parametrize("dfn_name", DFN_NAMES) +def test_dfn_load(dfn_name): + dfn_path = DFN_PATH / f"{dfn_name}.dfn" + with open(dfn_path, "r") as f: + dfn = Dfn.load(f, name=DfnName(*dfn_name.split("-"))) + if dfn_name in ["sln-ems", "exg-gwfprt", "exg-gwfgwe", "exg-gwfgwt"]: + assert not any(dfn) + else: + assert any(dfn) + + +@pytest.mark.parametrize( + "dfn, n_vars, n_flat, n_meta", + [("gwf-ic", 2, 2, 0), ("prt-prp", 18, 40, 1)], +) +def test_make_context(dfn, n_vars, n_flat, n_meta): + with open(DFN_PATH / "common.dfn") as f: + commonvars = Dfn.load(f) + + with open(DFN_PATH / f"{dfn}.dfn") as f: + dfn = DfnName(*dfn.split("-")) + definition = Dfn.load(f, name=dfn) + + context_names = get_context_names(dfn) + context_name = context_names[0] + context = make_context(context_name, definition, commonvars) + assert len(context_names) == 1 + assert len(context.variables) == n_vars + assert len(context.definition) == n_flat + assert len(context.definition.metadata) == n_meta + + +@pytest.mark.parametrize("dfn_name", DFN_NAMES) +def test_make_targets(dfn_name, function_tmpdir): + with open(DFN_PATH / "common.dfn") as f: + common = Dfn.load(f) + + with open(DFN_PATH / f"{dfn_name}.dfn", "r") as f: + dfn_name = DfnName(*dfn_name.split("-")) + dfn = Dfn.load(f, name=dfn_name) + + make_targets(dfn, function_tmpdir, commonvars=common) + for ctx_name in get_context_names(dfn_name): + source_path = function_tmpdir / ctx_name.target + assert source_path.is_file() + + +def test_make_all(function_tmpdir): + make_all(DFN_PATH, function_tmpdir, verbose=True) + assert any(function_tmpdir.glob("*.py")) diff --git a/autotest/test_generate_classes.py b/autotest/test_generate_classes.py deleted file mode 100644 index db812aa40b..0000000000 --- a/autotest/test_generate_classes.py +++ /dev/null @@ -1,158 +0,0 @@ -import sys -from os import environ -from pathlib import Path -from pprint import pprint -from typing import Iterable -from warnings import warn - -import pytest -from modflow_devtools.misc import get_current_branch, run_cmd -from virtualenv import cli_run - -branch = get_current_branch() - - -def nonempty(itr: Iterable): - for x in itr: - if x: - yield x - - -def pytest_generate_tests(metafunc): - """ - Test mf6 module code generation on a small, hopefully - fairly representative set of MODFLOW 6 input & output - specification versions, including the develop branch, - the latest official release, and a few older releases - and commits. - - TODO: May make sense to run the full battery of tests - against all of the versions of mf6io flopy guarantees - support for- maybe develop and latest release? Though - some backwards compatibility seems ideal if possible. - This would need changes in GH Actions CI test matrix. - """ - - owner = "MODFLOW-USGS" - repo = "modflow6" - ref = [ - f"{owner}/{repo}/develop", - f"{owner}/{repo}/master", - f"{owner}/{repo}/6.4.1", - f"{owner}/{repo}/4458f9f", - f"{owner}/{repo}/4458f9f7a6244182e6acc2430a6996f9ca2df367", - ] - - # refs provided as env vars override the defaults - ref_env = environ.get("TEST_GENERATE_CLASSES_REF") - if ref_env: - ref = nonempty(ref_env.strip().split(",")) - - # refs given as CLI options override everything - ref_opt = metafunc.config.getoption("--ref") - if ref_opt: - ref = nonempty([o.strip() for o in ref_opt]) - - # drop duplicates - ref = list(dict.fromkeys(ref)) - - # drop and warn refs with invalid format - # i.e. not "owner/repo/branch" - for r in ref: - spl = r.split("/") - if len(spl) != 3 or not all(spl): - warn(f"Skipping invalid ref: {r}") - ref.remove(r) - - key = "ref" - if key in metafunc.fixturenames: - metafunc.parametrize(key, ref, scope="session") - - -@pytest.mark.generation -@pytest.mark.mf6 -@pytest.mark.slow -def test_generate_classes_from_github_refs( - request, project_root_path, ref, worker_id, function_tmpdir -): - # skip if run in parallel with pytest-xdist without --dist loadfile - argv = ( - request.config.workerinput["mainargv"] - if hasattr(request.config, "workerinput") - else [] - ) - if worker_id != "master" and "loadfile" not in argv: - pytest.skip("can't run in parallel") - - # create virtual environment - venv = function_tmpdir / "venv" - python = venv / "bin" / "python" - pip = venv / "bin" / "pip" - cli_run([str(venv)]) - print(f"Using temp venv at {venv} to test class generation from {ref}") - - # install flopy and dependencies - deps = [str(project_root_path), "modflow-devtools"] - for dep in deps: - out, err, ret = run_cmd(str(pip), "install", dep, verbose=True) - assert not ret, out + err - - # get creation time of files - flopy_path = ( - venv - / "lib" - / f"python{sys.version_info.major}.{sys.version_info.minor}" - / "site-packages" - / "flopy" - ) - assert flopy_path.is_dir() - mod_files = list((flopy_path / "mf6" / "modflow").rglob("*")) + list( - (flopy_path / "mf6" / "data" / "dfn").rglob("*") - ) - mod_file_times = [Path(mod_file).stat().st_mtime for mod_file in mod_files] - pprint(mod_files) - - # split ref into owner, repo, ref name - spl = ref.split("/") - owner = spl[0] - repo = spl[1] - ref = spl[2] - - # generate classes - out, err, ret = run_cmd( - str(python), - "-m", - "flopy.mf6.utils.generate_classes", - "--owner", - owner, - "--repo", - repo, - "--ref", - ref, - "--no-backup", - verbose=True, - ) - assert not ret, out + err - - def get_mtime(f): - try: - return Path(f).stat().st_mtime - except: - return 0 # if file not found - - # make sure files were regenerated - modified_files = [ - mod_files[i] - for i, (before, after) in enumerate( - zip( - mod_file_times, - [get_mtime(f) for f in mod_files], - ) - ) - if after > 0 and after > before - ] - assert any(modified_files) - print(f"{len(modified_files)} files were modified:") - pprint(modified_files) - - # todo checkout mf6 and test with dfnpath? test with backups? diff --git a/docs/mf6_dev_guide.md b/docs/mf6_dev_guide.md index 61c364d3de..8f95ee9bbb 100644 --- a/docs/mf6_dev_guide.md +++ b/docs/mf6_dev_guide.md @@ -10,12 +10,17 @@ FPMF6 uses meta-data files located in flopy/mf6/data/dfn to define the model and All meta-data can be accessed from the flopy.mf6.data.mfstructure.MFStructure class. This is a singleton class, meaning only one instance of this class can be created. The class contains a sim_struct attribute (which is a flopy.mf6.data.mfstructure.MFSimulationStructure object) which contains all of the meta-data for all package files. Meta-data is stored in a structured format. MFSimulationStructure contains MFModelStructure and MFInputFileStructure objects, which contain the meta-data for each model type and each "simulation-level" package (tdis, ims, ...). MFModelStructure contains model specific meta-data and a MFInputFileStructure object for each package in that model. MFInputFileStructure contains package specific meta-data and a MFBlockStructure object for each block contained in the package file. MFBlockStructure contains block specific meta-data and a MFDataStructure object for each data structure defined in the block, and MFDataStructure contains data structure specific meta-data and a MFDataItemStructure object for each data item contained in the data structure. Data structures define the structure of data that is naturally grouped together, for example, the data in a numpy recarray. Data item structures define the structure of specific pieces of data, for example, a single column of a numpy recarray. The meta-data defined in these classes provides all the information FloPy needs to read and write MODFLOW 6 package and name files, create the Flopy interface, and check the data for various constraints. - -*** -MFStructure --+ MFSimulationStructure --+ MFModelStructure --+ MFInputFileStructure --+ MFBlockStructure --+ MFDataStructure --+ MFDataItemStructure - -Figure 1: FPMF6 generic data structure classes. Lines connecting classes show a relationship defined between the two connected classes. A "*" next to the class means that the class is a sub-class of the connected class. A "+" next to the class means that the class is contained within the connected class. -*** +```mermaid +classDiagram + MFStructure --* "1" MFSimulationStructure : has + MFSimulationStructure --* "1+" MFModelStructure : has + MFModelStructure --* "1" MFInputFileStructure : has + MFInputFileStructure --* "1+" MFBlockStructure : has + MFBlockStructure --* "1+" MFDataStructure : has + MFDataStructure --* "1+" MFDataItemStructure : has +``` + +Figure 1: Generic data structure hierarchy. Connections show composition relationships. Package and Data Base Classes ----------------------------------------------- @@ -23,25 +28,26 @@ Package and Data Base Classes The package and data classes are related as shown below in figure 2. On the top of the figure 2 is the MFPackage class, which is the base class for all packages. MFPackage contains generic methods for building data objects and reading and writing the package to a file. MFPackage contains a MFInputFileStructure object that defines how the data is structured in the package file. MFPackage also contains a dictionary of blocks (MFBlock). The MFBlock class is a generic class used to represent a block within a package. MFBlock contains a MFBlockStructure object that defines how the data in the block is structured. MFBlock also contains a dictionary of data objects (subclasses of MFData) contained in the block and a list of block headers (MFBlockHeader) for that block. Block headers contain the block's name and optionally data items (eg. iprn). -*** -MFPackage --+ MFBlock --+ MFData - -MFPackage --+ MFInputFileStructure - -MFBlock --+ MFBlockStructure - -MFData --+ MFDataStructure - -MFData --* MFArray --* MFTransientArray - -MFData --* MFList --* MFTransientList - -MFData --* MFScalar --* MFTransientScalar - -MFTransientData --* MFTransientArray, MFTransientList, MFTransientScalar +```mermaid +classDiagram + +MFPackage --* "1+" MFBlock : has +MFBlock --* "1+" MFData : has +MFPackage --* "1" MFInputFileStructure : has +MFBlock --* "1" MFBlockStructure : has +MFData --* "1" MFDataStructure : has +MFData --|> MFArray +MFArray --|> MFTransientArray +MFData --|> MFList +MFList --|> MFTransientList +MFData --|> MFScalar +MFScalar --|> MFTransientScalar +MFTransientData --|> MFTransientArray +MFTransientData --|> MFTransientList +MFTransientData --|> MFTransientScalar +``` Figure 2: FPMF6 package and data classes. Lines connecting classes show a relationship defined between the two connected classes. A "*" next to the class means that the class is a sub-class of the connected class. A "+" next to the class means that the class is contained within the connected class. -*** There are three main types of data, MFList, MFArray, and MFScalar data. All three of these data types are derived from the MFData abstract base class. MFList data is the type of data stored in a spreadsheet with different column headings. For example, the data describing a flow barrier are of type MFList. MFList data is stored in numpy recarrays. MFArray data is data of a single type (eg. all integer values). For example, the model's HK values are of type MFArray. MFArrays are stored in numpy ndarrays. MFScalar data is a single data item. Most MFScalar data are options. All MFData subclasses contain an MFDataStructure object that defines the expected structure and types of the data. diff --git a/flopy/mf6/data/dfn/utl-tas.dfn b/flopy/mf6/data/dfn/utl-tas.dfn index 6316beba5c..81c6fd25bc 100644 --- a/flopy/mf6/data/dfn/utl-tas.dfn +++ b/flopy/mf6/data/dfn/utl-tas.dfn @@ -19,6 +19,7 @@ type keyword shape reader urword optional false +in_record true longname description xxx @@ -29,6 +30,7 @@ shape any1d tagged false reader urword optional false +in_record true longname description Name by which a package references a particular time-array series. The name must be unique among all time-array series used in a package. @@ -48,6 +50,7 @@ type keyword shape reader urword optional false +in_record true longname description xxx @@ -59,6 +62,7 @@ shape tagged false reader urword optional false +in_record true longname description Interpolation method, which is either STEPWISE or LINEAR. @@ -78,6 +82,7 @@ type keyword shape reader urword optional false +in_record true longname description xxx @@ -88,6 +93,7 @@ shape time_series_name tagged false reader urword optional false +in_record true longname description Scale factor, which will multiply all array values in time series. SFAC is an optional attribute; if omitted, SFAC = 1.0. diff --git a/flopy/mf6/data/dfn/utl-ts.dfn b/flopy/mf6/data/dfn/utl-ts.dfn index a7165ea382..cb641256f2 100644 --- a/flopy/mf6/data/dfn/utl-ts.dfn +++ b/flopy/mf6/data/dfn/utl-ts.dfn @@ -20,6 +20,7 @@ type keyword shape reader urword optional false +in_record true longname description xxx @@ -30,6 +31,7 @@ shape any1d tagged false reader urword optional false +in_record true longname description Name by which a package references a particular time-array series. The name must be unique among all time-array series used in a package. @@ -49,6 +51,7 @@ type keyword shape reader urword optional false +in_record true longname description xxx @@ -59,6 +62,7 @@ valid stepwise linear linearend shape time_series_names tagged false reader urword +in_record true optional false longname description Interpolation method, which is either STEPWISE or LINEAR. @@ -108,6 +112,7 @@ name sfacs type keyword shape reader urword +in_record true optional false longname description xxx @@ -119,6 +124,7 @@ shape 1 and package_dim.model_dim[0].model_name is not None and package_dim.model_dim[0].model_name.lower() diff --git a/flopy/mf6/utils/codegen/context.py b/flopy/mf6/utils/codegen/context.py new file mode 100644 index 0000000000..bbd95b457a --- /dev/null +++ b/flopy/mf6/utils/codegen/context.py @@ -0,0 +1,542 @@ +from ast import literal_eval +from dataclasses import dataclass +from keyword import kwlist +from os import PathLike +from typing import ( + Any, + Dict, + Iterator, + List, + NamedTuple, + Optional, + Union, +) + +from flopy.mf6.utils.codegen.dfn import Dfn, DfnName +from flopy.mf6.utils.codegen.ref import Ref, Refs +from flopy.mf6.utils.codegen.render import renderable +from flopy.mf6.utils.codegen.shim import SHIM +from flopy.mf6.utils.codegen.var import Var, VarKind, Vars + +_SCALAR_TYPES = { + "keyword", + "integer", + "double precision", + "string", +} + + +class ContextName(NamedTuple): + """ + Uniquely identifies an input context by its name, which + consists of a <= 3-letter left term and optional right + term also of <= 3 letters. + + Notes + ----- + A single `DefinitionName` may be associated with one or + more `ContextName`s. For instance, a model DFN file will + produce both a NAM package class and also a model class. + + From the `ContextName` several other things are derived, + including: + + - the input context class' name + - a description of the context class + - the name of the source file to write + - the base class the context inherits from + + """ + + l: str + r: Optional[str] + + @property + def title(self) -> str: + """ + The input context's unique title. This is not + identical to `f"{l}{r}` in some cases, but it + remains unique. The title is substituted into + the file name and class name. + """ + + l, r = self + if self == ("sim", "nam"): + return "simulation" + if l is None: + return r + if r is None: + return l + if l == "sim": + return r + if l in ["sln", "exg"]: + return r + return f"{l}{r}" + + @property + def base(self) -> str: + """Base class from which the input context should inherit.""" + _, r = self + if self == ("sim", "nam"): + return "MFSimulationBase" + if r is None: + return "MFModel" + return "MFPackage" + + @property + def target(self) -> str: + """The source file name to generate.""" + return f"mf{self.title}.py" + + @property + def description(self) -> str: + """A description of the input context.""" + l, r = self + title = self.title.title() + if self.base == "MFPackage": + return f"Modflow{title} defines a {r.upper()} package." + elif self.base == "MFModel": + return f"Modflow{title} defines a {l.upper()} model." + elif self.base == "MFSimulationBase": + return """ + MFSimulation is used to load, build, and/or save a MODFLOW 6 simulation. + A MFSimulation object must be created before creating any of the MODFLOW 6 + model objects.""" + + +def get_context_names(dfn_name: DfnName) -> List[ContextName]: + """ + Returns a list of contexts this definition produces. + + Notes + ----- + An input definition may produce one or more input contexts. + + Model definition files produce both a model class context and + a model namefile package context. The same goes for simulation + definition files. All other definition files produce a single + context. + """ + if dfn_name.r == "nam": + if dfn_name.l == "sim": + return [ + ContextName(None, dfn_name.r), # nam pkg + ContextName(*dfn_name), # simulation + ] + else: + return [ + ContextName(*dfn_name), # nam pkg + ContextName(dfn_name.l, None), # model + ] + elif (dfn_name.l, dfn_name.r) in [ + ("gwf", "mvr"), + ("gwf", "gnc"), + ("gwt", "mvt"), + ]: + return [ContextName(*dfn_name), ContextName(None, dfn_name.r)] + return [ContextName(*dfn_name)] + + +@renderable( + # shim for implementation details in the + # generated context classes which aren't + # really concerns of the core framework, + # and may eventually go away + **SHIM +) +@dataclass +class Context: + """ + An input context. Each of these is specified by a definition file + and becomes a generated class. A definition file may specify more + than one input context (e.g. model DFNs yield a model class and a + package class). + + Notes + ----- + A context class minimally consists of a name, a definition, and a + map of variables. The definition and variables are redundant (the + latter are generated from the former) but for now, the definition + is needed. When generated classes no longer reproduce definitions + verbatim, it can be removed. + + The context class may inherit from a base class, and may specify + a parent context within which it can be created (the parent then + becomes the first `__init__` method parameter). + + The context class may reference other contexts via foreign key + relations held by its variables, and may itself be referenced + by other contexts if desired. + + """ + + name: ContextName + definition: Dfn + variables: Vars + base: Optional[type] = None + parent: Optional[str] = None + description: Optional[str] = None + reference: Optional[Ref] = None + references: Optional[Refs] = None + + +def make_context( + name: ContextName, + definition: Dfn, + commonvars: Optional[Dfn] = None, + references: Optional[Refs] = None, +) -> Context: + """ + Extract from an input definition a context descriptor: + a structured representation of an input context class. + + Each input definition yields one or more input contexts. + The `name` parameter selects which context to make. + + A map of common variables may be provided, which can be + referenced in the given context's variable descriptions. + + A map of other definitions may be provided, in which case a + parameter in this context may act as kind of "foreign key", + identifying another context as a subpackage which this one + is related to. + + Notes + ----- + This function does most of the work in the whole module. + A bit of a beast, but convenient to use the outer scope + (including the input definition, etc) in inner functions + without sending around a lot of parameters. And it's not + complicated; we just map a variable specification from a + definition file to a corresponding Python representation. + """ + + _definition = dict(definition) + _commonvars = dict(commonvars or dict()) + _references = dict(references or dict()) + + # is this context referenceable? + reference = Ref.from_dfn(definition) + + # contexts referenced by this one + referenced = dict() + + def _parent() -> Optional[str]: + """ + Get a string parameter name for the context's parent(s), + i.e. context(s) which can own an instance of this one. + + If this context is a subpackage with multiple possible + parent types "x" and "y, this will be of form "x_or_y". + + """ + l, r = definition.name + if (l, r) == ("sim", "nam") and name == ("sim", "nam"): + return None + if l in ["sim", "exg", "sln"] or name.r is None: + return "simulation" + if reference: + if len(reference.parents) > 1: + return "_or_".join(reference.parents) + return reference.parents[0] + return "model" + + def _convert(var: Dict[str, Any], wrap: bool = False) -> Var: + """ + Transform a variable from its original representation in + an input definition to a Python specification appropriate + for generating an input context class. + + Notes + ----- + This involves expanding nested type hierarchies, mapping + types to roughly equivalent Python primitives/composites, + and other shaping. + + The rules for optional variable defaults are as follows: + If a `default_value` is not provided, keywords are `False` + by default, everything else is `None`. + + If `wrap` is true, scalars will be wrapped as records. + This is useful to distinguish among choices in unions. + + Any filepath variable whose name functions as a foreign key + for another context will be given a pointer to the context. + + """ + + _name = var["name"] + _type = var.get("type", None) + block = var.get("block", None) + shape = var.get("shape", None) + shape = None if shape == "" else shape + default = var.get("default", None) + descr = var.get("description", "") + + # if the var is a foreign key, register the referenced context + ref = _references.get(_name, None) + if ref: + referenced[_name] = ref + + def _description(descr: str) -> str: + """ + Make substitutions from common variable definitions, + remove backslashes, TODO: generate/insert citations. + """ + descr = descr.replace("\\", "") + _, replace, tail = descr.strip().partition("REPLACE") + if replace: + key, _, subs = tail.strip().partition(" ") + subs = literal_eval(subs) + cmn_var = _commonvars.get(key, None) + if cmn_var is None: + raise ValueError(f"Common variable not found: {key}") + descr = cmn_var.get("description", "") + if any(subs): + return descr.replace("\\", "").replace( + "{#1}", subs["{#1}"] + ) + return descr + return descr + + def _default(value: str) -> Any: + """ + Try to parse a default value as a literal. + """ + if _type != "string": + try: + return literal_eval(value) + except: + return value + + def _fields(record_name: str) -> Vars: + """Recursively load/convert a record's fields.""" + record = _definition[record_name] + field_names = record["type"].split()[1:] + fields: Dict[str, Var] = { + n: _convert(field, wrap=False) + for n, field in _definition.items() + if n in field_names + } + field_names = list(fields.keys()) + + # if the record represents a file... + if "file" in record_name: + # remove filein/fileout + for term in ["filein", "fileout"]: + if term in field_names: + fields.pop(term) + + # remove leading keyword + keyword = next(iter(fields), None) + if keyword: + fields.pop(keyword) + + # set the type + n = list(fields.keys())[0] + path_field = fields[n] + path_field._type = Union[str, PathLike] + fields[n] = path_field + + # if tagged, remove the leading keyword + elif record.get("tagged", False): + keyword = next(iter(fields), None) + if keyword: + fields.pop(keyword) + + return fields + + def _var() -> Var: + """ + Create the variable. + + Notes + ----- + Goes through all the possible input kinds + from top (composites) to bottom (scalars): + + - list + - union + - record + - array + - scalar + + Creates and returs a variable of the proper + kind. This may be a composite variable, in + which case nested variables are recursively + created as needed to produce the composite. + """ + + children = dict() + + # list input, child is the item type + if _type.startswith("recarray"): + # make sure columns are defined + names = _type.split()[1:] + n_names = len(names) + if n_names < 1: + raise ValueError(f"Missing recarray definition: {_type}") + + # list input can have records or unions as rows. + # lists which have a consistent record type are + # regular, inconsistent record types irregular. + + # regular tabular/columnar data (1 record type) can be + # defined with a nested record (i.e. explicit) or with + # fields directly inside the recarray (implicit). list + # data for unions/keystrings necessarily comes nested. + + is_explicit_record = len(names) == 1 and _definition[names[0]][ + "type" + ].startswith("record") + + def _is_implicit_scalar_record(): + # if the record is defined implicitly and it has + # only scalar fields + types = [ + v["type"] for n, v in _definition.items() if n in names + ] + return all(t in _SCALAR_TYPES for t in types) + + if is_explicit_record: + record_name = names[0] + record_spec = _definition[record_name] + record = _convert(record_spec, wrap=False) + children = {record_name: record} + kind = VarKind.List + elif _is_implicit_scalar_record(): + record_name = _name + fields = _fields(record_name) + record = Var( + name=record_name, + _type=_type.split()[0], + kind=VarKind.Record, + block=block, + children=fields, + description=descr, + ) + children = {record_name: record} + kind = VarKind.List + else: + # implicit complex record (i.e. some fields are records or unions) + fields = { + n: _convert(_definition[n], wrap=False) for n in names + } + first = list(fields.values())[0] + single = len(fields) == 1 + record_name = first.name if single else _name + record = Var( + name=record_name, + _type=_type.split()[0], + kind=VarKind.Record, + block=block, + children=first.children if single else fields, + description=descr, + ) + children = {record_name: record} + kind = VarKind.List + + # union (product), children are record choices + elif _type.startswith("keystring"): + names = _type.split()[1:] + children = { + n: _convert(_definition[n], wrap=True) for n in names + } + kind = VarKind.Union + + # record (sum), children are fields + elif _type.startswith("record"): + children = _fields(_name) + kind = VarKind.Record + + # are we wrapping a var into a record + # as a choice in a union? + elif wrap: + field_name = _name + field = _convert(var, wrap=False) + children = {field_name: field} + kind = VarKind.Record + + # at this point, if it has a shape, it's an array + elif shape is not None: + if _type not in _SCALAR_TYPES: + raise TypeError(f"Unsupported array type: {_type}") + elif _type == "string": + kind = VarKind.List + else: + kind = VarKind.Array + + # finally scalars + else: + kind = VarKind.Scalar + + # create var + return Var( + # if name is a reserved keyword, add a trailing underscore to it. + # convert dashes to underscores since it may become a class attr. + name=(f"{_name}_" if _name in kwlist else _name).replace( + "-", "_" + ), + _type=_type, + kind=kind, + block=block, + description=_description(descr), + default=_default(default), + children=children, + reference=ref, + ) + + return _var() + + def _variables() -> Vars: + """ + Return all input variables for an input context class. + + Notes + ----- + Not all variables become parameters; nested variables + will become components of composite parameters, e.g., + record fields, keystring (union) choices, list items. + + Variables may be added, depending on the context type. + """ + + vars_ = _definition.copy() + vars_ = { + name: _convert(var, wrap=False) + for name, var in vars_.items() + # skip composites, we already inflated + # their parents in the var hierarchy + if not var.get("in_record", False) + } + + # reset var name since we may have altered + # it when creating the variable e.g. to + # avoid a reserved keyword collision + return {v.name: v for v in vars_.values()} + + return Context( + name=name, + definition=definition, + variables=_variables(), + base=name.base, + parent=_parent(), + description=name.description, + reference=reference, + references=referenced, + ) + + +def make_contexts( + definition: Dfn, + commonvars: Optional[Dfn] = None, + references: Optional[Refs] = None, +) -> Iterator[Context]: + """Generate input contexts from the given input definition.""" + for name in get_context_names(definition.name): + yield make_context( + name=name, + definition=definition, + commonvars=commonvars, + references=references, + ) diff --git a/flopy/mf6/utils/codegen/dfn.py b/flopy/mf6/utils/codegen/dfn.py new file mode 100644 index 0000000000..6391a1f27c --- /dev/null +++ b/flopy/mf6/utils/codegen/dfn.py @@ -0,0 +1,111 @@ +from collections import UserList +from dataclasses import dataclass +from typing import Any, Dict, Iterable, List, NamedTuple, Optional, Tuple + + +class DfnName(NamedTuple): + """ + Uniquely identifies an input definition by its name, which + consists of a <= 3-letter left term and an optional right + term, also <= 3 letters. + """ + + l: str + r: str + + +Metadata = List[str] + + +class Dfn(UserList): + """ + An MF6 input definition. + + Notes + ----- + This class is a list rather than a dictionary to + accommodate duplicate variable names. Dictionary + would be nicer; this constraint goes away if the + DFN specifications become nested instead of flat. + + With conversion to a standard format we get this + for free, and we could then drop the custom load. + """ + + name: Optional[DfnName] + metadata: Optional[Metadata] + + def __init__( + self, + variables: Optional[Iterable[Tuple[str, Dict[str, Any]]]] = None, + name: Optional[DfnName] = None, + metadata: Optional[Metadata] = None, + ): + super().__init__(variables) + self.name = name + self.metadata = metadata or [] + + @classmethod + def load(cls, f, name: Optional[DfnName] = None) -> "Dfn": + """ + Load an input definition from a definition file. + """ + + meta = None + vars_ = list() + var = dict() + + for line in f: + # remove whitespace/etc from the line + line = line.strip() + + # record context name and flopy metadata + # attributes, skip all other comment lines + if line.startswith("#"): + _, sep, tail = line.partition("flopy") + if sep == "flopy": + if meta is None: + meta = list() + tail = tail.strip() + if "solution_package" in tail: + tail = tail.split() + tail.pop(1) + meta.append(tail) + continue + _, sep, tail = line.partition("package-type") + if sep == "package-type": + if meta is None: + meta = list + meta.append(f"{sep} {tail.strip()}") + continue + _, sep, tail = line.partition("solution_package") + continue + + # if we hit a newline and the parameter dict + # is nonempty, we've reached the end of its + # block of attributes + if not any(line): + if any(var): + n = var["name"] + vars_.append((n, var)) + var = dict() + continue + + # split the attribute's key and value and + # store it in the parameter dictionary + key, _, value = line.partition(" ") + if key == "default_value": + key = "default" + if value in ["true", "false"]: + value = value == "true" + var[key] = value + + # add the final parameter + if any(var): + n = var["name"] + vars_.append((n, var)) + + return cls(variables=vars_, name=name, metadata=meta) + + +Dfns = Dict[str, Dfn] diff --git a/flopy/mf6/utils/codegen/make.py b/flopy/mf6/utils/codegen/make.py new file mode 100644 index 0000000000..298f68fbe4 --- /dev/null +++ b/flopy/mf6/utils/codegen/make.py @@ -0,0 +1,95 @@ +from pathlib import Path +from typing import ( + Optional, +) +from warnings import warn + +from jinja2 import Environment, FileSystemLoader + +# noqa: F401 +from flopy.mf6.utils.codegen.context import ( + get_context_names, + make_context, + make_contexts, +) +from flopy.mf6.utils.codegen.dfn import Dfn, DfnName, Dfns +from flopy.mf6.utils.codegen.ref import Ref, Refs + +_TEMPLATE_LOADER = FileSystemLoader(Path(__file__).parent / "templates") +_TEMPLATE_ENV = Environment(loader=_TEMPLATE_LOADER) +_TEMPLATE_NAME = "context.py.jinja" +_TEMPLATE = _TEMPLATE_ENV.get_template(_TEMPLATE_NAME) + + +def make_targets( + definition: Dfn, + outdir: Path, + commonvars: Optional[Dfn] = None, + references: Optional[Refs] = None, + verbose: bool = False, +): + """Generate Python source file(s) from the given input definition.""" + + for context in make_contexts( + definition=definition, commonvars=commonvars, references=references + ): + target = outdir / context.name.target + with open(target, "w") as f: + source = _TEMPLATE.render(**context.render()) + f.write(source) + if verbose: + print(f"Wrote {target}") + + +def make_all(dfndir: Path, outdir: Path, verbose: bool = False): + """Generate Python source files from the DFN files in the given location.""" + + # find definition files + paths = [ + p for p in dfndir.glob("*.dfn") if p.stem not in ["common", "flopy"] + ] + + # try to load common variables + common_path = dfndir / "common.dfn" + if not common_path.is_file: + warn("No common input definition file...") + common = None + else: + with open(common_path, "r") as f: + common = Dfn.load(f) + + # load all the input definitions before we generate input + # contexts so we can create foreign key refs between them. + dfns: Dfns = {} + refs: Refs = {} + for p in paths: + name = DfnName(*p.stem.split("-")) + with open(p) as f: + dfn = Dfn.load(f, name=name) + dfns[name] = dfn + ref = Ref.from_dfn(dfn) + if ref: + # key is the name of the file record + # that's the reference's foreign key + refs[ref.key] = ref + + # generate target files + for dfn in dfns.values(): + with open(p) as f: + make_targets( + definition=dfn, + outdir=outdir, + references=refs, + commonvars=common, + verbose=verbose, + ) + + # generate __init__.py file + init_path = outdir / "__init__.py" + with open(init_path, "w") as f: + for dfn in dfns.values(): + for ctx in get_context_names(dfn.name): + prefix = "MF" if ctx.base == "MFSimulationBase" else "Modflow" + f.write( + f"from .mf{ctx.title} import {prefix}{ctx.title.title()}\n" + ) diff --git a/flopy/mf6/utils/codegen/ref.py b/flopy/mf6/utils/codegen/ref.py new file mode 100644 index 0000000000..97a79f75e0 --- /dev/null +++ b/flopy/mf6/utils/codegen/ref.py @@ -0,0 +1,97 @@ +from dataclasses import dataclass +from typing import Dict, List, Optional +from warnings import warn + +from flopy.mf6.utils.codegen.dfn import Dfn + + +@dataclass +class Ref: + """ + A foreign-key-like reference between a file input variable + and another input definition. This allows an input context + to refer to another input context, by including a filepath + variable whose name acts as a foreign key for a different + input context. Extra parameters are added to the referring + context's `__init__` method so a selected "value" variable + defined in the referenced context can be provided directly + instead of the file path (foreign key) variable. + + Parameters + ---------- + key : str + The name of the foreign key file input variable. + val : str + The name of the selected variable in the referenced context. + abbr : str + An abbreviation of the referenced context's name. + param : str + The subpackage parameter name. TODO: explain + parents : List[Union[str, type]] + The subpackage's supported parent types. + """ + + key: str + val: str + abbr: str + param: str + parents: List[str] + description: Optional[str] + + @classmethod + def from_dfn(cls, dfn: Dfn) -> Optional["Ref"]: + if not dfn.metadata: + return None + + lines = { + "subpkg": next( + iter( + m + for m in dfn.metadata + if isinstance(m, str) and m.startswith("subpac") + ), + None, + ), + "parent": next( + iter( + m + for m in dfn.metadata + if isinstance(m, str) and m.startswith("parent") + ), + None, + ), + } + + def _subpkg(): + line = lines["subpkg"] + _, key, abbr, param, val = line.split() + matches = [v for _, v in dfn if v["name"] == val] + if not any(matches): + descr = None + else: + if len(matches) > 1: + warn(f"Multiple matches for referenced variable {val}") + match = matches[0] + descr = match.get("description", None) + + return { + "key": key, + "val": val, + "abbr": abbr, + "param": param, + "description": descr, + } + + def _parents(): + line = lines["parent"] + _, _, _type = line.split() + return [t.lower().replace("mf", "") for t in _type.split("/")] + + return ( + cls(**_subpkg(), parents=_parents()) + if all(v for v in lines.values()) + else None + ) + + +Refs = Dict[str, Ref] diff --git a/flopy/mf6/utils/codegen/render.py b/flopy/mf6/utils/codegen/render.py new file mode 100644 index 0000000000..e61d839b5f --- /dev/null +++ b/flopy/mf6/utils/codegen/render.py @@ -0,0 +1,166 @@ +from dataclasses import asdict +from enum import Enum +from typing import Any, Callable, Dict, Iterable, Optional, Tuple, Union + +Predicate = Callable[[Any], bool] +Transform = Callable[[Any], Dict[str, str]] +Pair = Tuple[str, Any] +Pairs = Iterable[Pair] + + +def _try_get_enum_value(v: Any) -> Any: + return v.value if isinstance(v, Enum) else v + + +def renderable( + maybe_cls=None, + *, + keep_none: Optional[Iterable[str]] = None, + quote_str: Optional[Iterable[str]] = None, + set_pairs: Optional[Iterable[Tuple[Predicate, Pairs]]] = None, + transform: Optional[Iterable[Tuple[Predicate, Transform]]] = None, +): + """ + Decorator for dataclasses which are meant + to be passed into a Jinja template. The + decorator adds a `.render()` method to + the decorated class, which recursively + converts the instance to a dictionary + with (by default) the `asdict()` builtin + `dataclasses` module function, plus a + few modifications to make the instance + easier to work with from the template. + + By default, attributes with value `None` + are dropped before conversion to a `dict`. + To specify that a given attribute should + remain even with a `None` value, use the + `keep_none` parameter. + + When a string value is to become the RHS + of an assignment or an argument-passing + expression, it needs to be wrapped with + quotation marks before insertion into + the template. To indicate an attribute's + value should be wrapped with quotation + marks, use the `quote_str` parameter. + + Arbitrary transformations of the instance + to which the decorator is applied can be + specified with the `transform` parameter, + which accepts a set of predicate/function + pairs; see below for more information on + how to use the transformation mechanism. + + Notes + ----- + This decorator is intended as a convenient + way to modify dataclass instances to make + them more palatable for templates. It also + aims to keep keep edge cases incidental to + the current design of MF6 input framework + cleanly isolated from the reimplementation + of which this code is a part. + + The basic idea behind this decorator is for + the developer to specify conditions in which + a given dataclass instance should be altered, + and a function to make the alteration. These + are provided as a collection of `Predicate`/ + `Transform` pairs. + + Transformations might be for convenience, or + to handle special cases where an object has + some other need for modification. + + Edge cases in the MF6 classes, e.g. the logic + determining the members of generated classes, + can be isolated as rendering transformations. + This allows keeping more general templating + infrastructure free of incidental complexity + while we move toward a leaner core framework. + + Jinja supports attribute- and dictionary- + based access on arbitrary objects but does + not support arbitrary expressions, and has + only a limited set of custom filters; this + can make it awkward to express some things, + which transformations can also remedy. + + Because a transformation function accepts an + instance of a dataclass and converts it to a + dictionary, only one transformation function + (the first predicate to match) is applied. + """ + + quote_str = quote_str or list() + keep_none = keep_none or list() + set_pairs = set_pairs or list() + transform = transform or list() + + def __renderable(cls): + def _render(d: dict) -> dict: + """ + Render the dictionary recursively, + with requested value modifications. + """ + + def _render_val(k, v): + v = _try_get_enum_value(v) + if ( + k in quote_str + and isinstance(v, str) + and v[0] not in ["'", '"'] + ): + v = f"'{v}'" + elif isinstance(v, dict): + v = _render(v) + return v + + return { + k: _render_val(k, v) + for k, v in d.items() + # drop nones except where requested to keep them + if (k in keep_none or v is not None) + } + + def _dict(o): + """ + Convert the dataclass instance to a dictionary, + applying a transformation if applicable and any + extra key/value pairs if provided. + """ + d = dict(o) + for p, t in transform: + if p(o): + d = t(o) + break + + for p, e in set_pairs: + if not p(d): + continue + if e is None: + raise ValueError(f"No value for key: {k}") + for k, v in e: + if callable(v): + v = v(d) + d[k] = v + + return d + + def _dict_factory(o): + return _render(_dict(o)) + + def render(self) -> dict: + """ + Recursively render the dataclass instance. + """ + return _render(asdict(self, dict_factory=_dict_factory)) + + setattr(cls, "render", render) + return cls + + # first arg value depends on the decorator usage: + # class if `@renderable`, `None` if `@renderable()`. + # referenced from https://github.com/python-attrs/attrs/blob/a59c5d7292228dfec5480388b5f6a14ecdf0626c/src/attr/_next_gen.py#L405C4-L406C65 + return __renderable if maybe_cls is None else __renderable(maybe_cls) diff --git a/flopy/mf6/utils/codegen/shim.py b/flopy/mf6/utils/codegen/shim.py new file mode 100644 index 0000000000..e9840cd701 --- /dev/null +++ b/flopy/mf6/utils/codegen/shim.py @@ -0,0 +1,690 @@ +""" +The purpose of this module is to keep special handling +necessary to support the current `flopy.mf6` generated +classes separate from more general templating and code +generation infrastructure. +""" + +import os +from keyword import kwlist +from typing import List, Optional + +from flopy.mf6.utils.codegen.dfn import Metadata +from flopy.mf6.utils.codegen.var import VarKind + + +def _is_ctx(o) -> bool: + """Whether the object is an input context.""" + d = dict(o) + return "name" in d and "base" in d + + +def _is_var(o) -> bool: + """Whether the object is a input context variable.""" + d = dict(o) + return "name" in d and "_type" in d + + +def _is_init_param(o) -> bool: + """Whether the object is an `__init__` method parameter.""" + d = dict(o) + return not d.get("ref", None) + + +def _is_container_init_param(o) -> bool: + """ + Whether the object is a parameter of the corresponding + package container class. This is only relevant for some + subpackage contexts. + """ + return True + + +def _add_exg_params(ctx: dict) -> dict: + """ + Add initializer parameters for an exchange input context. + Exchanges need different parameters than a typical package. + """ + vars_ = ctx["variables"].copy() + vars_ = { + "loading_package": { + "name": "loading_package", + "_type": "bool", + "description": ( + "Do not set this parameter. It is intended for " + "debugging and internal processing purposes only." + ), + "default": False, + "init_param": True, + }, + "exgtype": { + "name": "exgtype", + "_type": "str", + "default": f"{ctx['name'].r[:3].upper()}6-{ctx['name'].r[:3].upper()}6", + "description": "The exchange type.", + "init_param": True, + }, + "exgmnamea": { + "name": "exgmnamea", + "_type": "str", + "description": "The name of the first model in the exchange.", + "default": None, + "init_param": True, + }, + "exgmnameb": { + "name": "exgmnameb", + "_type": "str", + "description": "The name of the second model in the exchange.", + "default": None, + "init_param": True, + }, + **vars_, + "filename": { + "name": "filename", + "_type": "pathlike", + "description": "File name for this package.", + "default": None, + "init_param": True, + }, + "pname": { + "name": "pname", + "_type": "str", + "description": "Package name for this package.", + "default": None, + "init_param": True, + }, + } + + if ctx["references"]: + for key, ref in ctx["references"].items(): + if key not in vars_: + continue + vars_[ref["val"]] = { + "name": ref["val"], + "description": ref.get("description", None), + "reference": ref, + "init_param": True, + "default": None, + "construct_package": ref["abbr"], + "construct_data": ref["val"], + "parameter_name": ref["param"], + } + + ctx["variables"] = vars_ + return ctx + + +def _add_pkg_params(ctx: dict) -> dict: + """Add variables for a package context.""" + vars_ = ctx["variables"].copy() + + if ctx["name"].r == "nam": + init_skip = ["export_netcdf", "nc_filerecord"] + elif ctx["name"] == ("utl", "ts"): + init_skip = ["method", "interpolation_method_single", "sfac"] + else: + init_skip = [] + for k in init_skip: + var = vars_.get(k, None) + if var: + var["init_param"] = False + var["init_skip"] = True + vars_[k] = var + + vars_ = { + "loading_package": { + "name": "loading_package", + "_type": "bool", + "description": ( + "Do not set this variable. It is intended for debugging " + "and internal processing purposes only." + ), + "default": False, + "init_param": True, + }, + **vars_, + "filename": { + "name": "filename", + "_type": "str", + "description": "File name for this package.", + "default": None, + "init_param": True, + }, + "pname": { + "name": "pname", + "_type": "str", + "description": "Package name for this package.", + "default": None, + "init_param": True, + }, + } + + if ctx["name"].l == "utl": + vars_["parent_file"] = { + "name": "parent_file", + "_type": "pathlike", + "description": ( + "Parent package file that references this package. Only needed " + "for utility packages (mfutl*). For example, mfutllaktab package " + "must have a mfgwflak package parent_file." + ), + } + + if ctx["references"]: + for key, ref in ctx["references"].items(): + if key not in vars_: + continue + vars_[key] = { + "name": ref["val"], + "description": ref.get("description", None), + "reference": ref, + "init_param": ctx["name"].r != "nam", + "default": None, + "construct_package": ref["abbr"], + "construct_data": ref["val"], + "parameter_name": ref["param"], + } + + ctx["variables"] = vars_ + return ctx + + +def _add_mdl_params(ctx: dict) -> dict: + """Add variables for a model context.""" + vars_ = ctx["variables"].copy() + init_skip = ["packages", "export_netcdf", "nc_filerecord"] + for k in init_skip: + var = vars_.get(k, None) + if var: + var["init_param"] = False + var["init_skip"] = True + vars_[k] = var + vars_ = { + "modelname": { + "name": "modelname", + "_type": "str", + "description": "The name of the model.", + "default": "model", + "init_param": True, + }, + "model_nam_file": { + "name": "model_nam_file", + "_type": "pathlike", + "default": None, + "description": ( + "The relative path to the model name file from model working folder." + ), + "init_param": True, + }, + "version": { + "name": "version", + "_type": "str", + "description": "The version of modflow", + "default": "mf6", + "init_param": True, + }, + "exe_name": { + "name": "exe_name", + "_type": "str", + "description": "The executable name.", + "default": "mf6", + "init_param": True, + }, + "model_rel_path": { + "name": "model_rel_path", + "_type": "pathlike", + "description": "The model working folder path.", + "default": os.curdir, + "init_param": True, + }, + **vars_, + } + + if ctx["references"]: + for key, ref in ctx["references"].items(): + if key not in vars_: + continue + vars_[key] = { + "name": ref["val"], + "description": ref.get("description", None), + "reference": ref, + "construct_package": ref["abbr"], + "construct_data": ref["val"], + "parameter_name": ref["param"], + } + + ctx["variables"] = vars_ + return ctx + + +def _add_sim_params(ctx: dict) -> dict: + """Add variables for a simulation context.""" + vars_ = ctx["variables"].copy() + init_skip = [ + "tdis6", + "models", + "exchanges", + "mxiter", + "solutiongroup", + ] + for k in init_skip: + var = vars_.get(k, None) + if var: + var["init_param"] = False + var["init_skip"] = True + vars_[k] = var + vars_ = { + "sim_name": { + "name": "sim_name", + "_type": "str", + "default": "sim", + "description": "Name of the simulation.", + "init_param": True, + }, + "version": { + "name": "version", + "_type": "str", + "default": "mf6", + "init_param": True, + }, + "exe_name": { + "name": "exe_name", + "_type": "pathlike", + "default": "mf6", + "init_param": True, + }, + "sim_ws": { + "name": "sim_ws", + "_type": "pathlike", + "default": ".", + "init_param": True, + }, + "verbosity_level": { + "name": "verbosity_level", + "_type": "int", + "default": 1, + "init_param": True, + }, + "write_headers": { + "name": "write_headers", + "_type": "bool", + "default": True, + "init_param": True, + }, + "use_pandas": { + "name": "use_pandas", + "_type": "bool", + "default": True, + "init_param": True, + }, + "lazy_io": { + "name": "lazy_io", + "_type": "bool", + "default": False, + "init_param": True, + }, + **vars_, + } + + if ctx["references"] and ctx["name"] != (None, "nam"): + for key, ref in ctx["references"].items(): + if key not in vars_: + continue + vars_[key] = { + "name": ref["param"], + "description": ref.get("description", None), + "reference": ref, + "init_param": True, + "default": None, + } + + ctx["variables"] = vars_ + return ctx + + +def _add_parent_param(ctx: dict) -> dict: + vars_ = ctx["variables"] + parent = ctx["parent"] + if ctx.get("reference"): + parent = f"parent_{parent}" + ctx["variables"] = { + parent: { + "name": parent, + "_type": str(ctx["parent"]), + "description": f"Parent {parent} that this package is part of.", + "init_param": True, + }, + **vars_, + } + return ctx + + +def _add_init_params(o): + """Add context-specific `__init__()` method parameters.""" + ctx = dict(o) + if ctx["name"].base == "MFSimulationBase": + ctx = _add_sim_params(ctx) + elif ctx["name"].base == "MFModel": + ctx = _add_mdl_params(ctx) + ctx = _add_parent_param(ctx) + elif ctx["name"].base == "MFPackage": + if ctx["name"].l == "exg": + ctx = _add_exg_params(ctx) + else: + ctx = _add_pkg_params(ctx) + ctx = _add_parent_param(ctx) + return ctx + + +def _transform_context(o): + # add vars depending on the + # specific type of context. + # do this as a transform so + # we can control the order + # they appear in `__init__` + # or other method signatures. + return _add_init_params(o) + + +def _var_attrs(ctx: dict) -> str: + """ + Get class attributes for the context. + """ + ctx_name = ctx["name"] + + def _attr(var: dict) -> Optional[str]: + var_name = var["name"] + var_kind = var.get("kind", None) + var_block = var.get("block", None) + + if var_kind is None or var_kind == VarKind.Scalar.value: + return None + + if var_name in ["cvoptions", "output"]: + return None + + if ( + ctx_name.l is not None and ctx_name.r == "nam" + ) and var_name != "packages": + return None + + if var_kind in [ + VarKind.List.value, + VarKind.Record.value, + VarKind.Union.value, + ]: + if not var_block: + raise ValueError("Need block") + args = [f"'{ctx_name.r}'", f"'{var_block}'", f"'{var_name}'"] + if ctx_name.l is not None and ctx_name.l not in [ + "sim", + "sln", + "utl", + "exg", + ]: + args.insert(0, f"'{ctx_name.l}6'") + return f"{var_name} = ListTemplateGenerator(({', '.join(args)}))" + + if var_kind == VarKind.Array.value: + if not var_block: + raise ValueError("Need block") + args = [f"'{ctx_name.r}'", f"'{var_block}'", f"'{var_name}'"] + if ctx_name.l is not None and ctx_name.l not in [ + "sim", + "sln", + "utl", + "exg", + ]: + args.insert(0, f"'{ctx_name.l}6'") + return f"{var_name} = ArrayTemplateGenerator(({', '.join(args)}))" + + return None + + attrs = [_attr(var) for var in ctx["variables"].values()] + return "\n ".join([a for a in attrs if a]) + + +def _init_body(ctx: dict) -> str: + """ + Get the `__init__` method body for the context. + """ + + def _super_call() -> Optional[str]: + """ + Whether to pass the variable to `super().__init__()` + by name in the `__init__` method. + """ + + if ctx["base"] == "MFPackage": + parent = ctx["parent"] + if ctx["reference"]: + parent = f"parent_{parent}" + pkgtyp = ctx["name"].r + args = [ + parent, + f"'{pkgtyp}'", + "filename", + "pname", + "loading_package", + "**kwargs", + ] + elif ctx["base"] == "MFModel": + parent = ctx["parent"] + mdltyp = ctx["name"].l + args = [ + parent, + f"'{mdltyp}6'", + "modelname=modelname", + "model_nam_file=model_nam_file", + "version=version", + "exe_name=exe_name", + "model_rel_path=model_rel_path", + "**kwargs", + ] + elif ctx["base"] == "MFSimulationBase": + args = [ + "sim_name=sim_name", + "version=version", + "exe_name=exe_name", + "sim_ws=sim_ws", + "verbosity_level=verbosity_level", + "write_headers=write_headers", + "lazy_io=lazy_io", + "use_pandas=use_pandas", + ] + + return f"super().__init__({', '.join(args)})" + + def _should_assign(var: dict) -> bool: + """ + Whether to assign arguments to self in the + `__init__` method. if this is false, assume + the template has conditionals for any more + involved initialization needs. + """ + return var["name"] in ["exgtype", "exgmnamea", "exgmnameb"] + + def _should_build(var: dict) -> bool: + """ + Whether to call `build_mfdata()` on the variable. + in the `__init__` method. + """ + if var.get("reference", None): + return False + name = var["name"] + if name in [ + "simulation", + "model", + "package", + "parent_model", + "parent_package", + "loading_package", + "parent_model_or_package", + "exgtype", + "exgmnamea", + "exgmnameb", + "filename", + "pname", + "parent_file", + "modelname", + "model_nam_file", + "version", + "exe_name", + "model_rel_path", + "sim_name", + "sim_ws", + "verbosity_level", + "write_headers", + "use_pandas", + "lazy_io", + "export_netcdf", + "nc_filerecord", + "method", + "interpolation_method_single", + "sfac", + "output", + ]: + return False + return True + + def _body() -> Optional[str]: + if ctx["base"] in ["MFSimulationBase", "MFModel"]: + statements = [] + references = {} + for var in ctx["variables"].values(): + if not var.get("kind", None) or var.get("init_skip", False): + continue + name = var["name"] + if name in kwlist: + name = f"{name}_" + ref = var.get("reference", None) + statements.append(f"self.name_file.{name}.set_data({name})") + statements.append(f"self.{name} = self.name_file.{name}") + if ref and ref["key"] not in references: + references[ref["key"]] = ref + statements.append( + f"self._{ref['param']} = self._create_package('{ref['abbr']}', {ref['param']})" + ) + else: + statements = [] + references = {} + for var in ctx["variables"].values(): + name = var["name"] + ref = var.get("reference", None) + if name in kwlist: + name = f"{name}_" + + if _should_assign(var): + statements.append(f"self.{name} = {name}") + if name == "exgmnameb": + statements.append( + "simulation.register_exchange_file(self)" + ) + elif _should_build(var): + lname = name[:-1] if name.endswith("_") else name + statements.append( + f"self.{'_' if ref else ''}{name} = self.build_mfdata('{lname}', {name if var.get('init_param', True) else 'None'})" + ) + + if ( + ref + and ref["key"] not in references + and ctx["name"].r != "nam" + ): + references[ref["key"]] = ref + statements.append( + f"self._{ref['key']} = self.build_mfdata('{ref['key']}', None)" + ) + statements.append( + f"self._{ref['abbr']}_package = self.build_child_package('{ref['abbr']}', {ref['val']}, '{ref['param']}', self._{ref['key']})" + ) + + return ( + None + if not any(statements) + else "\n".join([" " + s for s in statements]) + ) + + sections = [_super_call(), _body()] + sections = [s for s in sections if s] + return "\n".join(sections) + + +def _dfn(o) -> List[Metadata]: + """ + Get a list of the class' original definition attributes + as a partial, internal reproduction of the DFN contents. + + Notes + ----- + Currently, generated classes have a `.dfn` property that + reproduces the corresponding DFN sans a few attributes. + This represents the DFN in raw form, before adapting to + Python, consolidating nested types, etc. + """ + + ctx = dict(o) + dfn = ctx["definition"] + + def _fmt_var(var: dict) -> List[str]: + exclude = ["longname", "description"] + + def _fmt_name(k, v): + return v.replace("-", "_") if k == "name" else v + + return [ + " ".join([k, str(_fmt_name(k, v))]).strip() + for k, v in var.items() + if k not in exclude + ] + + meta = dfn.metadata or list() + _dfn = [] + for name, var in dfn: + var_ = ctx["variables"].get(name, None) + if var_ and "construct_package" in var_: + var["construct_package"] = var_["construct_package"] + var["construct_data"] = var_["construct_data"] + var["parameter_name"] = var_["parameter_name"] + _dfn.append((name, var)) + return [["header"] + [m for m in meta]] + [_fmt_var(v) for k, v in _dfn] + + +def _qual_base(ctx: dict): + base = ctx["base"] + if base == "MFSimulationBase": + module = "mfsimbase" + elif base == "MFModel": + module = "mfmodel" + else: + module = "mfpackage" + return f"{module}.{base}" + + +SHIM = { + "keep_none": ["default", "block", "metadata"], + "quote_str": ["default"], + "set_pairs": [ + ( + _is_ctx, + [ + ("dfn", _dfn), + ("qual_base", _qual_base), + ("var_attrs", _var_attrs), + ("init_body", _init_body), + ], + ), + ( + _is_var, + [ + ("init_param", _is_init_param), + ("container_init_param", _is_container_init_param), + ], + ), + ], + "transform": [(_is_ctx, _transform_context)], +} +""" +Arguments for `renderable` as applied to `Context` +to support the current `flopy.mf6` input framework. +""" diff --git a/flopy/mf6/utils/codegen/templates/attrs.jinja b/flopy/mf6/utils/codegen/templates/attrs.jinja new file mode 100644 index 0000000000..86d8170006 --- /dev/null +++ b/flopy/mf6/utils/codegen/templates/attrs.jinja @@ -0,0 +1,9 @@ + {%- if base == "MFModel" %} + model_type = "{{ name.title }}" + {%- elif base == "MFPackage" %} + {{ var_attrs }} + package_abbr = "{% if name.l != "sln" and name.l != "sim" and name.l != "exg" and name.l is not none %}{{ name.l }}{% endif %}{{ name.r }}" + _package_type = "{{ name.r }}" + dfn_file_name = "{% if name.l is not none %}{{ name.l }}-{% elif name.l is none %}sim-{% endif %}{{ name.r }}.dfn" + dfn = {{ dfn|pprint|indent(10) }} + {% endif -%} \ No newline at end of file diff --git a/flopy/mf6/utils/codegen/templates/context.py.jinja b/flopy/mf6/utils/codegen/templates/context.py.jinja new file mode 100644 index 0000000000..89fc1140eb --- /dev/null +++ b/flopy/mf6/utils/codegen/templates/context.py.jinja @@ -0,0 +1,28 @@ +# autogenerated file, do not modify +from os import PathLike, curdir +import typing +import numpy as np +from typing import Any, Optional, Tuple, List, Dict, Union, Literal, Iterable +from numpy.typing import NDArray + +from flopy.mf6.data.mfdatautil import ArrayTemplateGenerator, ListTemplateGenerator +from flopy.mf6 import mfpackage +from flopy.mf6 import mfmodel +{# avoid circular import; some pkgs (e.g. mfnam) are used by mfsimbase.py #} +{% if base == "MFSimulationBase" %} +from flopy.mf6 import mfsimbase +{% endif %} + +class {% if base == "MFSimulationBase" %}MF{% else %}Modflow{% endif %}{{ name.title.title() }}({{ qual_base }}): + {% include "docstring.jinja" %} + + {% include "attrs.jinja" %} + + {% include "init.jinja" %} + + {% include "load.jinja" %} + +{# TODO: cleaner way to filter out hpc subpkgs? #} +{% if reference is defined and name.r != "hpc" %} +{% include "package_container.jinja" %} +{% endif %} \ No newline at end of file diff --git a/flopy/mf6/utils/codegen/templates/docstring.jinja b/flopy/mf6/utils/codegen/templates/docstring.jinja new file mode 100644 index 0000000000..488b567d45 --- /dev/null +++ b/flopy/mf6/utils/codegen/templates/docstring.jinja @@ -0,0 +1,12 @@ +""" + {{ description }} + + Parameters + ---------- + {% include "docstring_params.jinja" %} + + Methods + ------- + {% include "docstring_methods.jinja" %} + """ + diff --git a/flopy/mf6/utils/codegen/templates/docstring_methods.jinja b/flopy/mf6/utils/codegen/templates/docstring_methods.jinja new file mode 100644 index 0000000000..41daf5715d --- /dev/null +++ b/flopy/mf6/utils/codegen/templates/docstring_methods.jinja @@ -0,0 +1,13 @@ +{% if base == "MFSimulationBase" %} + load : (sim_name : str, version : string, + exe_name : str or PathLike, sim_ws : str or PathLike, strict : bool, + verbosity_level : int, load_only : list, verify_data : bool, + write_headers : bool, lazy_io : bool, use_pandas : bool, + ) : MFSimulation + a class method that loads a simulation from files +{% elif base == "MFModel" %} + load : (simulation : MFSimulationData, model_name : string, + namfile : string, version : string, exe_name : string, + model_ws : string, strict : boolean) : MFSimulation + a class method that loads a model from files +{% endif %} \ No newline at end of file diff --git a/flopy/mf6/utils/codegen/templates/docstring_params.jinja b/flopy/mf6/utils/codegen/templates/docstring_params.jinja new file mode 100644 index 0000000000..3afb3be69f --- /dev/null +++ b/flopy/mf6/utils/codegen/templates/docstring_params.jinja @@ -0,0 +1,9 @@ +{%- for v in variables.values() recursive %} + {% if loop.depth > 1 %}* {% endif %}{{ v.name }}{% if v._type is defined and v._type is not none %} : {{ v._type }}{% endif %} +{%- if v.description is defined and v.description is not none %} +{{ v.description|wordwrap|indent(4 + (loop.depth * 4), first=True) }} +{%- endif %} +{%- if v.children is defined and v.children is not none -%} +{{ loop(v.children.values())|indent(4) }} +{%- endif %} +{% endfor -%} \ No newline at end of file diff --git a/flopy/mf6/utils/codegen/templates/init.jinja b/flopy/mf6/utils/codegen/templates/init.jinja new file mode 100644 index 0000000000..f51d0fad7c --- /dev/null +++ b/flopy/mf6/utils/codegen/templates/init.jinja @@ -0,0 +1,18 @@ +def __init__( + self, + {%- for var in variables.values() if var.init_param %} + {%- if var.default is defined %} + {{ var.name }}={{ var.default }}, + {%- else %} + {{ var.name }}, + {%- endif -%} + {%- endfor %} + **kwargs, + ): + {{ init_body }} + {% if name.l == "exg" and n == "exgmnameb" -%} + parent.register_exchange_file(self) + {% endif -%} + {% if base == "MFPackage" %} + self._init_complete = True + {% endif %} \ No newline at end of file diff --git a/flopy/mf6/utils/codegen/templates/load.jinja b/flopy/mf6/utils/codegen/templates/load.jinja new file mode 100644 index 0000000000..e36e13c64e --- /dev/null +++ b/flopy/mf6/utils/codegen/templates/load.jinja @@ -0,0 +1,58 @@ +{% if base == "MFSimulationBase" %} + @classmethod + def load( + cls, + sim_name="modflowsim", + version="mf6", + exe_name: Union[str, PathLike] = "mf6", + sim_ws: Union[str, PathLike] = curdir, + strict=True, + verbosity_level=1, + load_only=None, + verify_data=False, + write_headers=True, + lazy_io=False, + use_pandas=True, + ): + return mfsimbase.MFSimulationBase.load( + cls, + sim_name, + version, + exe_name, + sim_ws, + strict, + verbosity_level, + load_only, + verify_data, + write_headers, + lazy_io, + use_pandas, + ) +{% elif base == "MFModel" %} + @classmethod + def load( + cls, + simulation, + structure, + modelname="NewModel", + model_nam_file="modflowtest.nam", + version="mf6", + exe_name="mf6", + strict=True, + model_rel_path=curdir, + load_only=None, + ): + return mfmodel.MFModel.load_base( + cls, + simulation, + structure, + modelname, + model_nam_file, + "{{ name.title }}6", + version, + exe_name, + strict, + model_rel_path, + load_only, + ) +{% endif %} \ No newline at end of file diff --git a/flopy/mf6/utils/codegen/templates/package_container.jinja b/flopy/mf6/utils/codegen/templates/package_container.jinja new file mode 100644 index 0000000000..a8060b1770 --- /dev/null +++ b/flopy/mf6/utils/codegen/templates/package_container.jinja @@ -0,0 +1,64 @@ +class {{ name.title.title() }}Packages(mfpackage.MFChildPackages): + """ + {{ name.title.title() }}Packages is a container class for the Modflow{{ name.title.title() }} class. + + Methods + ------- + initialize + Initializes a new Modflow{{ name.title.title() }} package removing any sibling child + packages attached to the same parent package. See Modflow{{ name.title.title() }} init + documentation for definition of parameters. + append_package + Adds a new Modflow{{ name.title.title() }} package to the container. See Modflow{{ name.title.title() }} + init documentation for definition of parameters. + """ + + package_abbr = "{{ name.title.lower() }}packages" + + def initialize( + self, + {%- for n, var in variables.items() if var.container_init_param and not var.init_skip %} + {%- if var.default is defined %} + {{ n }}={{ var.default }}, + {%- else -%} + {{ n }}, + {% endif -%} + {%- endfor %} + filename=None, + pname=None, + ): + new_package = Modflow{{ name.title.title() }}( + self._cpparent, + {%- for n, var in variables.items() if var.container_init_param and not var.init_skip %} + {{ n }}={{ n }}, + {%- endfor %} + filename=filename, + pname=pname, + child_builder_call=True, + ) + self.init_package(new_package, filename) + +{% if name.r != "obs" %} + def append_package( + self, + {%- for n, var in variables.items() if var.container_init_param and not var.init_skip %} + {%- if var.default is defined %} + {{ n }}={{ var.default }}, + {%- else -%} + {{ n }}, + {% endif -%} + {%- endfor %} + filename=None, + pname=None, + ): + new_package = Modflow{{ name.title.title() }}( + self._cpparent, + {%- for n, var in variables.items() if var.container_init_param and not var.init_skip %} + {{ n }}={{ n }}, + {%- endfor %} + filename=filename, + pname=pname, + child_builder_call=True, + ) + self._append_package(new_package, filename) +{% endif %} \ No newline at end of file diff --git a/flopy/mf6/utils/codegen/var.py b/flopy/mf6/utils/codegen/var.py new file mode 100644 index 0000000000..d90bb57ae6 --- /dev/null +++ b/flopy/mf6/utils/codegen/var.py @@ -0,0 +1,59 @@ +from dataclasses import dataclass +from enum import Enum +from typing import Any, Dict, Optional + +from flopy.mf6.utils.codegen.dfn import Metadata +from flopy.mf6.utils.codegen.ref import Ref + + +class VarKind(Enum): + """ + An input variable's kind. This is an enumeration + of the general shapes of data MODFLOW 6 accepts. + """ + + Array = "array" + Scalar = "scalar" + Record = "record" + Union = "union" + List = "list" + + +@dataclass +class Var: + """An input variable specification.""" + + name: str + _type: str + kind: VarKind + block: Optional[str] + description: Optional[str] + default: Optional[Any] + children: Optional[Dict[str, "Var"]] + metadata: Optional[Metadata] + reference: Optional[Ref] + + def __init__( + self, + name: str, + _type: str, + kind: VarKind, + block: Optional[str] = None, + description: Optional[str] = None, + default: Optional[Any] = None, + children: Optional["Vars"] = None, + metadata: Optional[Metadata] = None, + reference: Optional[Ref] = None, + ): + self.name = name + self._type = _type + self.kind = kind + self.block = block + self.description = description + self.default = default + self.children = children + self.metadata = metadata + self.reference = reference + + +Vars = Dict[str, Var] diff --git a/flopy/mf6/utils/createpackages.py b/flopy/mf6/utils/createpackages.py index e1a57fb094..6d76aa8cf6 100644 --- a/flopy/mf6/utils/createpackages.py +++ b/flopy/mf6/utils/createpackages.py @@ -81,1056 +81,14 @@ """ -import datetime -import os -import textwrap -from enum import Enum +from pathlib import Path -# keep below as absolute imports -from flopy.mf6.data import mfdatautil, mfstructure -from flopy.utils import datautil +from flopy.mf6.utils.codegen.make import make_all - -class PackageLevel(Enum): - sim_level = 0 - model_level = 1 - - -def build_doc_string(param_name, param_type, param_desc, indent): - return f"{indent}{param_name} : {param_type}\n{indent * 2}* {param_desc}" - - -def generator_type(data_type): - if ( - data_type == mfstructure.DataType.scalar_keyword - or data_type == mfstructure.DataType.scalar - ): - # regular scalar - return "ScalarTemplateGenerator" - elif ( - data_type == mfstructure.DataType.scalar_keyword_transient - or data_type == mfstructure.DataType.scalar_transient - ): - # transient scalar - return "ScalarTemplateGenerator" - elif data_type == mfstructure.DataType.array: - # array - return "ArrayTemplateGenerator" - elif data_type == mfstructure.DataType.array_transient: - # transient array - return "ArrayTemplateGenerator" - elif data_type == mfstructure.DataType.list: - # list - return "ListTemplateGenerator" - elif ( - data_type == mfstructure.DataType.list_transient - or data_type == mfstructure.DataType.list_multiple - ): - # transient or multiple list - return "ListTemplateGenerator" - - -def clean_class_string(name): - if len(name) > 0: - clean_string = name.replace(" ", "_") - clean_string = clean_string.replace("-", "_") - version = mfstructure.MFStructure().get_version_string() - # FIX: remove all numbers - if clean_string[-1] == version: - clean_string = clean_string[:-1] - return clean_string - return name - - -def build_dfn_string(dfn_list, header, package_abbr, flopy_dict): - dfn_string = " dfn = [" - line_length = len(dfn_string) - leading_spaces = " " * line_length - first_di = True - - # process header - dfn_string = f'{dfn_string}\n{leading_spaces}["header", ' - for key, value in header.items(): - if key == "multi-package": - dfn_string = f'{dfn_string}\n{leading_spaces} "multi-package", ' - if key == "package-type": - dfn_string = ( - f'{dfn_string}\n{leading_spaces} "package-type ' f'{value}"' - ) - - # process solution packages - if package_abbr in flopy_dict["solution_packages"]: - model_types = '", "'.join( - flopy_dict["solution_packages"][package_abbr] - ) - dfn_string = ( - f"{dfn_string}\n{leading_spaces} " - f'["solution_package", "{model_types}"], ' - ) - dfn_string = f"{dfn_string}],\n{leading_spaces}" - - # process all data items - for data_item in dfn_list: - line_length += 1 - if not first_di: - dfn_string = f"{dfn_string},\n{leading_spaces}" - line_length = len(leading_spaces) - else: - first_di = False - dfn_string = f"{dfn_string}[" - first_line = True - # process each line in a data item - for line in data_item: - line = line.strip() - # do not include the description of longname - if not line.lower().startswith( - "description" - ) and not line.lower().startswith("longname"): - line = line.replace('"', "'") - line_length += len(line) + 4 - if not first_line: - dfn_string = f"{dfn_string}," - if line_length < 77: - # added text fits on the current line - if first_line: - dfn_string = f'{dfn_string}"{line}"' - else: - dfn_string = f'{dfn_string} "{line}"' - else: - # added text does not fit on the current line - line_length = len(line) + len(leading_spaces) + 2 - if line_length > 79: - # added text too long to fit on a single line, wrap - # text as needed - line = f'"{line}"' - lines = textwrap.wrap( - line, - 75 - len(leading_spaces), - drop_whitespace=True, - ) - lines[0] = f"{leading_spaces} {lines[0]}" - line_join = f' "\n{leading_spaces} "' - dfn_string = f"{dfn_string}\n{line_join.join(lines)}" - else: - dfn_string = f'{dfn_string}\n{leading_spaces} "{line}"' - first_line = False - - dfn_string = f"{dfn_string}]" - dfn_string = f"{dfn_string}]" - return dfn_string - - -def create_init_var(clean_ds_name, data_structure_name, init_val=None): - if init_val is None: - init_val = clean_ds_name - - init_var = f" self.{clean_ds_name} = self.build_mfdata(" - leading_spaces = " " * len(init_var) - if len(init_var) + len(data_structure_name) + 2 > 79: - second_line = f'\n "{data_structure_name}",' - if len(second_line) + len(clean_ds_name) + 2 > 79: - init_var = f"{init_var}{second_line}\n {init_val})" - else: - init_var = f"{init_var}{second_line} {init_val})" - else: - init_var = f'{init_var}"{data_structure_name}",' - if len(init_var) + len(clean_ds_name) + 2 > 79: - init_var = f"{init_var}\n{leading_spaces}{init_val})" - else: - init_var = f"{init_var} {init_val})" - return init_var - - -def create_basic_init(clean_ds_name): - return f" self.{clean_ds_name} = {clean_ds_name}\n" - - -def create_property(clean_ds_name): - return f" {clean_ds_name} = property(get_{clean_ds_name}, set_{clean_ds_name})" - - -def format_var_list(base_string, var_list, is_tuple=False): - if is_tuple: - base_string = f"{base_string}(" - extra_chars = 4 - else: - extra_chars = 2 - line_length = len(base_string) - leading_spaces = " " * line_length - # determine if any variable name is too long to fit - for item in var_list: - if line_length + len(item) + extra_chars > 80: - leading_spaces = " " - base_string = f"{base_string}\n{leading_spaces}" - line_length = len(leading_spaces) - break - - for index, item in enumerate(var_list): - if is_tuple: - item = f"'{item}'" - if index == len(var_list) - 1: - next_var_str = item - else: - next_var_str = f"{item}," - line_length += len(item) + extra_chars - if line_length > 80: - base_string = f"{base_string}\n{leading_spaces}{next_var_str}" - else: - if base_string[-1] == ",": - base_string = f"{base_string} " - base_string = f"{base_string}{next_var_str}" - if is_tuple: - return f"{base_string}))" - else: - return f"{base_string})" - - -def create_package_init_var( - parameter_name, package_abbr, data_name, clean_ds_name -): - one_line = ( - f" self._{package_abbr}_package = self.build_child_package(" - ) - one_line_b = f'"{package_abbr}", {parameter_name},' - leading_spaces = " " * len(one_line) - two_line = f'\n{leading_spaces}"{data_name}",' - three_line = f"\n{leading_spaces}self._{clean_ds_name})" - return f"{one_line}{one_line_b}{two_line}{three_line}" - - -def add_var( - init_vars, - class_vars, - options_param_list, - init_param_list, - package_properties, - doc_string, - data_structure_dict, - default_value, - name, - python_name, - description, - path, - data_type, - basic_init=False, - construct_package=None, - construct_data=None, - parameter_name=None, - set_param_list=None, - mf_nam=False, -): - if set_param_list is None: - set_param_list = [] - clean_ds_name = datautil.clean_name(python_name) - if construct_package is None: - # add variable initialization lines - if basic_init: - init_vars.append(create_basic_init(clean_ds_name)) - else: - init_vars.append(create_init_var(clean_ds_name, name)) - # add to parameter list - if default_value is None: - default_value = "None" - init_param_list.append(f"{clean_ds_name}={default_value}") - if path is not None and "options" in path: - options_param_list.append(f"{clean_ds_name}={default_value}") - # add to set parameter list - set_param_list.append(f"{clean_ds_name}={clean_ds_name}") - else: - clean_parameter_name = datautil.clean_name(parameter_name) - # init hidden variable - init_vars.append(create_init_var(f"_{clean_ds_name}", name, "None")) - if mf_nam: - options_param_list.append( - [f"{parameter_name}_data=None", parameter_name] - ) - else: - # init child package - init_vars.append( - create_package_init_var( - clean_parameter_name, - construct_package, - construct_data, - clean_ds_name, - ) - ) - # add to parameter list - init_param_list.append(f"{clean_parameter_name}=None") - # add to set parameter list - set_param_list.append( - f"{clean_parameter_name}={clean_parameter_name}" - ) - - package_properties.append(create_property(clean_ds_name)) - doc_string.add_parameter(description, model_parameter=True) - data_structure_dict[python_name] = 0 - if class_vars is not None: - gen_type = generator_type(data_type) - if gen_type != "ScalarTemplateGenerator": - new_class_var = f" {clean_ds_name} = {gen_type}(" - class_vars.append(format_var_list(new_class_var, path, True)) - return gen_type - return None - - -def build_init_string( - init_string, init_param_list, whitespace=" " -): - line_chars = len(init_string) - for index, param in enumerate(init_param_list): - if isinstance(param, list): - param = param[0] - if index + 1 < len(init_param_list): - line_chars += len(param) + 2 - else: - line_chars += len(param) + 3 - if line_chars > 79: - if len(param) + len(whitespace) + 1 > 79: - # try to break apart at = sign - param_list = param.split("=") - if len(param_list) == 2: - init_string = "{},\n{}{}=\n{}{}".format( - init_string, - whitespace, - param_list[0], - whitespace, - param_list[1], - ) - line_chars = len(param_list[1]) + len(whitespace) + 1 - continue - init_string = f"{init_string},\n{whitespace}{param}" - line_chars = len(param) + len(whitespace) + 1 - else: - init_string = f"{init_string}, {param}" - return f"{init_string}):\n" - - -def build_model_load(model_type): - model_load_c = ( - " Methods\n -------\n" - " load : (simulation : MFSimulationData, model_name : " - "string,\n namfile : string, " - "version : string, exe_name : string,\n model_ws : " - "string, strict : boolean) : MFSimulation\n" - " a class method that loads a model from files" - '\n """' - ) - - model_load = ( - " @classmethod\n def load(cls, simulation, structure, " - "modelname='NewModel',\n " - "model_nam_file='modflowtest.nam', version='mf6',\n" - " exe_name='mf6', strict=True, " - "model_rel_path='.',\n" - " load_only=None):\n " - "return mfmodel.MFModel.load_base(cls, simulation, structure, " - "modelname,\n " - "model_nam_file, '{}6', version,\n" - " exe_name, strict, " - "model_rel_path,\n" - " load_only)" - "\n".format(model_type) - ) - return model_load, model_load_c - - -def build_sim_load(): - sim_load_c = ( - " Methods\n -------\n" - " load : (sim_name : str, version : " - "string,\n exe_name : str or PathLike, " - "sim_ws : str or PathLike, strict : bool,\n verbosity_level : " - "int, load_only : list, verify_data : bool,\n " - "write_headers : bool, lazy_io : bool, use_pandas : bool,\n " - ") : MFSimulation\n" - " a class method that loads a simulation from files" - '\n """' - ) - - sim_load = ( - " @classmethod\n def load(cls, sim_name='modflowsim', " - "version='mf6',\n " - "exe_name: Union[str, os.PathLike] = 'mf6',\n " - "sim_ws: Union[str, os.PathLike] = os.curdir,\n " - "strict=True, verbosity_level=1, load_only=None,\n " - "verify_data=False, write_headers=True,\n " - "lazy_io=False, use_pandas=True):\n " - "return mfsimbase.MFSimulationBase.load(cls, sim_name, version, " - "\n " - "exe_name, sim_ws, strict,\n" - " verbosity_level, " - "load_only,\n " - "verify_data, write_headers, " - "\n lazy_io, use_pandas)" - "\n" - ) - return sim_load, sim_load_c - - -def build_model_init_vars(param_list): - init_var_list = [] - # build set data calls - for param in param_list: - if not isinstance(param, list): - param_parts = param.split("=") - init_var_list.append( - f" self.name_file.{param_parts[0]}.set_data({param_parts[0]})" - ) - init_var_list.append("") - # build attributes - for param in param_list: - if isinstance(param, list): - pkg_name = param[1] - param_parts = param[0].split("=") - init_var_list.append( - f" self.{param_parts[0]} = " - f"self._create_package('{pkg_name}', {param_parts[0]})" - ) - else: - param_parts = param.split("=") - init_var_list.append( - f" self.{param_parts[0]} = self.name_file.{param_parts[0]}" - ) - - return "\n".join(init_var_list) - - -def create_packages(): - indent = " " - init_string_def = " def __init__(self" - - # load JSON file - file_structure = mfstructure.MFStructure(load_from_dfn_files=True) - sim_struct = file_structure.sim_struct - - # assemble package list of buildable packages - package_list = [] - for package in sim_struct.utl_struct_objs.values(): - # add utility packages to list - package_list.append( - ( - package, - PackageLevel.model_level, - "utl", - package.dfn_list, - package.file_type, - package.header, - ) - ) - package_list.append( - ( - sim_struct.name_file_struct_obj, - PackageLevel.sim_level, - "", - sim_struct.name_file_struct_obj.dfn_list, - sim_struct.name_file_struct_obj.file_type, - sim_struct.name_file_struct_obj.header, - ) - ) - for package in sim_struct.package_struct_objs.values(): - # add simulation level package to list - package_list.append( - ( - package, - PackageLevel.sim_level, - "", - package.dfn_list, - package.file_type, - package.header, - ) - ) - for model_key, model in sim_struct.model_struct_objs.items(): - package_list.append( - ( - model.name_file_struct_obj, - PackageLevel.model_level, - model_key, - model.name_file_struct_obj.dfn_list, - model.name_file_struct_obj.file_type, - model.name_file_struct_obj.header, - ) - ) - for package in model.package_struct_objs.values(): - package_list.append( - ( - package, - PackageLevel.model_level, - model_key, - package.dfn_list, - package.file_type, - package.header, - ) - ) - - util_path, tail = os.path.split(os.path.realpath(__file__)) - init_file = open( - os.path.join(util_path, "..", "modflow", "__init__.py"), - "w", - newline="\n", - ) - init_file.write("from .mfsimulation import MFSimulation # isort:skip\n") - - nam_import_string = ( - "from .. import mfmodel\nfrom ..data.mfdatautil " - "import ArrayTemplateGenerator, ListTemplateGenerator" - ) - - # loop through packages list - init_file_imports = [] - flopy_dict = file_structure.flopy_dict - for package in package_list: - data_structure_dict = {} - package_properties = [] - init_vars = [] - init_param_list = [] - options_param_list = [] - set_param_list = [] - class_vars = [] - template_gens = [] - - package_abbr = clean_class_string( - f"{clean_class_string(package[2])}{package[0].file_type}" - ).lower() - dfn_string = build_dfn_string( - package[3], package[5], package_abbr, flopy_dict - ) - package_name = clean_class_string( - "{}{}{}".format( - clean_class_string(package[2]), - package[0].file_prefix, - package[0].file_type, - ) - ).lower() - if package[0].description: - doc_string = mfdatautil.MFDocString(package[0].description) - else: - if package[2]: - package_container_text = f" within a {package[2]} model" - else: - package_container_text = "" - ds = "Modflow{} defines a {} package{}.".format( - package_name.title(), - package[0].file_type, - package_container_text, - ) - if package[0].file_type == "mvr": - # mvr package warning - if package[2]: - ds = ( - "{} This package\n can only be used to move " - "water between packages within a single model." - "\n To move water between models use ModflowMvr" - ".".format(ds) - ) - else: - ds = ( - "{} This package can only be used to move\n " - "water between two different models. To move " - "water between two packages\n in the same " - 'model use the "model level" mover package (ex. ' - "ModflowGwfmvr).".format(ds) - ) - - doc_string = mfdatautil.MFDocString(ds) - - if package[0].dfn_type == mfstructure.DfnType.exch_file: - exgtype = ( - f'"{package_abbr[0:3].upper()}6-{package_abbr[3:].upper()}6"' - ) - - add_var( - init_vars, - None, - options_param_list, - init_param_list, - package_properties, - doc_string, - data_structure_dict, - exgtype, - "exgtype", - "exgtype", - build_doc_string( - "exgtype", - "", - "is the exchange type (GWF-GWF or GWF-GWT).", - indent, - ), - None, - None, - True, - ) - add_var( - init_vars, - None, - options_param_list, - init_param_list, - package_properties, - doc_string, - data_structure_dict, - None, - "exgmnamea", - "exgmnamea", - build_doc_string( - "exgmnamea", - "", - "is the name of the first model that is " - "part of this exchange.", - indent, - ), - None, - None, - True, - ) - add_var( - init_vars, - None, - options_param_list, - init_param_list, - package_properties, - doc_string, - data_structure_dict, - None, - "exgmnameb", - "exgmnameb", - build_doc_string( - "exgmnameb", - "", - "is the name of the second model that is " - "part of this exchange.", - indent, - ), - None, - None, - True, - ) - init_vars.append( - " simulation.register_exchange_file(self)\n" - ) - - # loop through all blocks - for block in package[0].blocks.values(): - for data_structure in block.data_structures.values(): - # only create one property for each unique data structure name - if data_structure.name not in data_structure_dict: - mf_sim = ( - "parent_name_type" in package[0].header - and package[0].header["parent_name_type"][1] - == "MFSimulation" - ) - mf_nam = package[0].file_type == "nam" - if ( - data_structure.construct_package is not None - and not mf_sim - and not mf_nam - ): - c_pkg = data_structure.construct_package - else: - c_pkg = None - tg = add_var( - init_vars, - class_vars, - options_param_list, - init_param_list, - package_properties, - doc_string, - data_structure_dict, - data_structure.default_value, - data_structure.name, - data_structure.python_name, - data_structure.get_doc_string(79, indent, indent), - data_structure.path, - data_structure.get_datatype(), - False, - # c_pkg, - data_structure.construct_package, - data_structure.construct_data, - data_structure.parameter_name, - set_param_list, - mf_nam, - ) - if tg is not None and tg not in template_gens: - template_gens.append(tg) - - import_string = "from .. import mfpackage" - if template_gens: - import_string += "\nfrom ..data.mfdatautil import " - import_string += ", ".join(sorted(template_gens)) - # add extra docstrings for additional variables - doc_string.add_parameter( - " filename : String\n File name for this package." - ) - doc_string.add_parameter( - " pname : String\n Package name for this package." - ) - doc_string.add_parameter( - " parent_file : MFPackage\n " - "Parent package file that references this " - "package. Only needed for\n utility " - "packages (mfutl*). For example, mfutllaktab " - "package must have \n a mfgwflak " - "package parent_file." - ) - - # build package builder class string - init_vars.append(" self._init_complete = True") - init_vars = "\n".join(init_vars) - package_short_name = clean_class_string(package[0].file_type).lower() - class_def_string = "class Modflow{}(mfpackage.MFPackage):\n".format( - package_name.title() - ) - class_def_string = class_def_string.replace("-", "_") - class_var_string = ( - '{}\n package_abbr = "{}"\n _package_type = ' - '"{}"\n dfn_file_name = "{}"' - "\n".format( - "\n".join(class_vars), - package_abbr, - package[4], - package[0].dfn_file_name, - ) - ) - init_string_full = init_string_def - init_string_sim = f"{init_string_def}, simulation" - # add variables to init string - doc_string.add_parameter( - " loading_package : bool\n " - "Do not set this parameter. It is intended " - "for debugging and internal\n " - "processing purposes only.", - beginning_of_list=True, - ) - if "parent_name_type" in package[0].header: - init_var = package[0].header["parent_name_type"][0] - parent_type = package[0].header["parent_name_type"][1] - elif package[1] == PackageLevel.sim_level: - init_var = "simulation" - parent_type = "MFSimulation" - else: - init_var = "model" - parent_type = "MFModel" - doc_string.add_parameter( - f" {init_var} : {parent_type}\n " - f"{init_var.capitalize()} that this package is a part " - "of. Package is automatically\n " - f"added to {init_var} when it is " - "initialized.", - beginning_of_list=True, - ) - init_string_full = ( - f"{init_string_full}, {init_var}, loading_package=False" - ) - init_param_list.append("filename=None") - init_param_list.append("pname=None") - init_param_list.append("**kwargs") - init_string_full = build_init_string(init_string_full, init_param_list) - - # build init code - parent_init_string = " super().__init__(" - spaces = " " * len(parent_init_string) - parent_init_string = ( - '{}{}, "{}", filename, pname,\n{}' - "loading_package, **kwargs)\n\n" - " # set up variables".format( - parent_init_string, init_var, package_short_name, spaces - ) - ) - local_datetime = datetime.datetime.now(datetime.timezone.utc) - comment_string = ( - "# DO NOT MODIFY THIS FILE DIRECTLY. THIS FILE " - "MUST BE CREATED BY\n# mf6/utils/createpackages.py\n" - "# FILE created on {} UTC".format( - local_datetime.strftime("%B %d, %Y %H:%M:%S") - ) - ) - # assemble full package string - package_string = "{}\n{}\n\n\n{}{}\n{}\n{}\n\n{}{}\n{}\n".format( - comment_string, - import_string, - class_def_string, - doc_string.get_doc_string(), - class_var_string, - dfn_string, - init_string_full, - parent_init_string, - init_vars, - ) - - # open new Packages file - pb_file = open( - os.path.join(util_path, "..", "modflow", f"mf{package_name}.py"), - "w", - newline="\n", - ) - pb_file.write(package_string) - if ( - package[0].sub_package - and package_abbr != "utltab" - and ( - "parent_name_type" not in package[0].header - or package[0].header["parent_name_type"][1] != "MFSimulation" - ) - ): - set_param_list.append("filename=filename") - set_param_list.append("pname=pname") - set_param_list.append("child_builder_call=True") - whsp_1 = " " - whsp_2 = " " - - file_prefix = package[0].dfn_file_name[0:3] - chld_doc_string = ( - ' """\n {}Packages is a container ' - "class for the Modflow{} class.\n\n " - "Methods\n ----------" - "\n".format(package_name.title(), package_name.title()) - ) - - # write out child packages class - chld_cls = ( - "\n\nclass {}Packages(mfpackage.MFChildPackage" "s):\n".format( - package_name.title() - ) - ) - chld_var = ( - f" package_abbr = " - f'"{package_name.title().lower()}packages"\n\n' - ) - chld_init = " def initialize(self" - chld_init = build_init_string( - chld_init, init_param_list[:-1], whsp_1 - ) - init_pkg = "\n self.init_package(new_package, filename)" - params_init = ( - " new_package = Modflow" - f"{package_name.title()}(self._cpparent" - ) - params_init = build_init_string( - params_init, set_param_list, whsp_2 - ) - chld_doc_string = ( - "{} initialize\n Initializes a new " - "Modflow{} package removing any sibling " - "child\n packages attached to the same " - "parent package. See Modflow{} init\n " - " documentation for definition of " - "parameters.\n".format( - chld_doc_string, package_name.title(), package_name.title() - ) - ) - - chld_appn = "" - params_appn = "" - append_pkg = "" - if package_abbr != "utlobs": # Hard coded obs no multi-pkg support - chld_appn = "\n\n def append_package(self" - chld_appn = build_init_string( - chld_appn, init_param_list[:-1], whsp_1 - ) - append_pkg = ( - "\n self._append_package(new_package, filename)" - ) - params_appn = ( - " new_package = Modflow" - f"{file_prefix.capitalize()}" - f"{package_short_name}(self._cpparent" - ) - params_appn = build_init_string( - params_appn, set_param_list, whsp_2 - ) - chld_doc_string = ( - "{} append_package\n Adds a " - "new Modflow{}{} package to the container." - " See Modflow{}{}\n init " - "documentation for definition of " - "parameters.\n".format( - chld_doc_string, - file_prefix.capitalize(), - package_short_name, - file_prefix.capitalize(), - package_short_name, - ) - ) - chld_doc_string = f'{chld_doc_string} """\n' - packages_str = "{}{}{}{}{}{}{}{}{}\n".format( - chld_cls, - chld_doc_string, - chld_var, - chld_init, - params_init[:-2], - init_pkg, - chld_appn, - params_appn[:-2], - append_pkg, - ) - pb_file.write(packages_str) - pb_file.close() - - init_file_imports.append( - f"from .mf{package_name} import Modflow{package_name.title()}\n" - ) - - if package[0].dfn_type == mfstructure.DfnType.model_name_file: - # build model file - init_vars = build_model_init_vars(options_param_list) - - options_param_list.insert(0, "model_rel_path='.'") - options_param_list.insert(0, "exe_name='mf6'") - options_param_list.insert(0, "version='mf6'") - options_param_list.insert(0, "model_nam_file=None") - options_param_list.insert(0, "modelname='model'") - options_param_list.append("**kwargs,") - init_string_sim = build_init_string( - init_string_sim, options_param_list - ) - sim_name = clean_class_string(package[2]) - class_def_string = "class Modflow{}(mfmodel.MFModel):\n".format( - sim_name.capitalize() - ) - class_def_string = class_def_string.replace("-", "_") - doc_string.add_parameter( - " sim : MFSimulation\n " - "Simulation that this model is a part " - "of. Model is automatically\n " - "added to simulation when it is " - "initialized.", - beginning_of_list=True, - model_parameter=True, - ) - doc_string.description = ( - f"Modflow{sim_name} defines a {sim_name} model" - ) - class_var_string = f" model_type = '{sim_name}'\n" - mparent_init_string = " super().__init__(" - spaces = " " * len(mparent_init_string) - mparent_init_string = ( - "{}simulation, model_type='{}6',\n{}" - "modelname=modelname,\n{}" - "model_nam_file=model_nam_file,\n{}" - "version=version, exe_name=exe_name,\n{}" - "model_rel_path=model_rel_path,\n{}" - "**kwargs," - ")\n".format( - mparent_init_string, - sim_name, - spaces, - spaces, - spaces, - spaces, - spaces, - ) - ) - load_txt, doc_text = build_model_load(sim_name) - package_string = "{}\n{}\n\n\n{}{}\n{}\n{}\n{}{}\n{}\n\n{}".format( - comment_string, - nam_import_string, - class_def_string, - doc_string.get_doc_string(True), - doc_text, - class_var_string, - init_string_sim, - mparent_init_string, - init_vars, - load_txt, - ) - md_file = open( - os.path.join(util_path, "..", "modflow", f"mf{sim_name}.py"), - "w", - newline="\n", - ) - md_file.write(package_string) - md_file.close() - init_file_imports.append( - f"from .mf{sim_name} import Modflow{sim_name.capitalize()}\n" - ) - elif package[0].dfn_type == mfstructure.DfnType.sim_name_file: - # build simulation file - init_vars = build_model_init_vars(options_param_list) - - options_param_list.insert(0, "lazy_io=False") - options_param_list.insert(0, "use_pandas=True") - options_param_list.insert(0, "write_headers=True") - options_param_list.insert(0, "verbosity_level=1") - options_param_list.insert( - 0, "sim_ws: Union[str, os.PathLike] = " "os.curdir" - ) - options_param_list.insert( - 0, "exe_name: Union[str, os.PathLike] " '= "mf6"' - ) - options_param_list.insert(0, "version='mf6'") - options_param_list.insert(0, "sim_name='sim'") - init_string_sim = " def __init__(self" - init_string_sim = build_init_string( - init_string_sim, options_param_list - ) - class_def_string = ( - "class MFSimulation(mfsimbase." "MFSimulationBase):\n" - ) - doc_string.add_parameter( - " sim_name : str\n" " Name of the simulation", - beginning_of_list=True, - model_parameter=True, - ) - doc_string.description = ( - "MFSimulation is used to load, build, and/or save a MODFLOW " - "6 simulation. \n A MFSimulation object must be created " - "before creating any of the MODFLOW 6 \n model objects." - ) - sparent_init_string = " super().__init__(" - spaces = " " * len(sparent_init_string) - sparent_init_string = ( - "{}sim_name=sim_name,\n{}" - "version=version,\n{}" - "exe_name=exe_name,\n{}" - "sim_ws=sim_ws,\n{}" - "verbosity_level=verbosity_level,\n{}" - "write_headers=write_headers,\n{}" - "lazy_io=lazy_io,\n{}" - "use_pandas=use_pandas,\n{}" - ")\n".format( - sparent_init_string, - spaces, - spaces, - spaces, - spaces, - spaces, - spaces, - spaces, - spaces, - ) - ) - sim_import_string = ( - "import os\n" - "from typing import Union\n" - "from .. import mfsimbase" - ) - - load_txt, doc_text = build_sim_load() - package_string = "{}\n{}\n\n\n{}{}\n{}\n{}{}\n{}\n\n{}".format( - comment_string, - sim_import_string, - class_def_string, - doc_string.get_doc_string(False, True), - doc_text, - init_string_sim, - sparent_init_string, - init_vars, - load_txt, - ) - sim_file = open( - os.path.join(util_path, "..", "modflow", "mfsimulation.py"), - "w", - newline="\n", - ) - sim_file.write(package_string) - sim_file.close() - init_file_imports.append( - "from .mfsimulation import MFSimulation\n" - ) - - # Sort the imports - for line in sorted(init_file_imports, key=lambda x: x.split()[3]): - init_file.write(line) - init_file.close() +_MF6_PATH = Path(__file__).parents[1] +_DFN_PATH = _MF6_PATH / "data" / "dfn" +_TGT_PATH = _MF6_PATH / "modflow" if __name__ == "__main__": - create_packages() + make_all(_DFN_PATH, _TGT_PATH) diff --git a/flopy/mf6/utils/generate_classes.py b/flopy/mf6/utils/generate_classes.py index 32c1d6978c..5f59135171 100644 --- a/flopy/mf6/utils/generate_classes.py +++ b/flopy/mf6/utils/generate_classes.py @@ -2,9 +2,10 @@ import shutil import tempfile import time +from pathlib import Path from warnings import warn -from .createpackages import create_packages +from .createpackages import make_all thisfilepath = os.path.dirname(os.path.abspath(__file__)) flopypth = os.path.join(thisfilepath, "..", "..") @@ -14,6 +15,10 @@ default_owner = "MODFLOW-USGS" default_repo = "modflow6" +_MF6_PATH = Path(__file__).parents[1] +_DFN_PATH = _MF6_PATH / "data" / "dfn" +_TGT_PATH = _MF6_PATH / "modflow" + def delete_files(files, pth, allow_failure=False, exclude=None): if exclude is None: @@ -189,7 +194,7 @@ def generate_classes( delete_mf6_classes() print(" Create mf6 classes using the downloaded definition files.") - create_packages() + make_all(_DFN_PATH, _TGT_PATH) list_files(os.path.join(flopypth, "mf6", "modflow")) diff --git a/pyproject.toml b/pyproject.toml index f18d5cd164..436cbfcf27 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,9 +29,10 @@ classifiers = [ ] requires-python = ">=3.8" dependencies = [ + "Jinja2>=3.0", "numpy>=1.20.3", "matplotlib >=1.4.0", - "pandas >=2.0.0" + "pandas >=2.0.0", ] dynamic = ["version", "readme"]