diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 556f8a4..3ce0f15 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -26,8 +26,8 @@ jobs: - name: Install Python packages run: | - pip3 install . - pip3 install ".[lint]" + pip install . + pip install ".[lint]" - name: Run isort run: isort --verbose --check --diff modflow_devtools @@ -81,8 +81,8 @@ jobs: - name: Install Python packages working-directory: modflow-devtools run: | - pip3 install . - pip3 install ".[test]" + python3 -m pip install . + python3 -m pip install ".[test]" - name: Run tests working-directory: modflow-devtools diff --git a/DEVELOPER.md b/DEVELOPER.md index 222ecbe..5158eb2 100644 --- a/DEVELOPER.md +++ b/DEVELOPER.md @@ -5,35 +5,30 @@ This document provides guidance to set up a development environment and discusse -- [Developing `modflow-devtools`](#developing-modflow-devtools) - - [Installation](#installation) - - [Testing](#testing) - - [Environment variables](#environment-variables) - - [Running the tests](#running-the-tests) +- [Installation](#installation) +- [Testing](#testing) + - [Running the tests](#running-the-tests) + - [Writing new tests](#writing-new-tests) + - [Temporary directories](#temporary-directories) ## Installation -To get started, first fork and clone this repository. +To get started, first fork and clone this repository. Then install the project and core packages as well as linting and testing dependencies: -## Testing - -This project uses [`pytest`](https://docs.pytest.org/en/latest/) and several plugins. [`PyGithub`](https://github.com/PyGithub/PyGithub) is used to communicate with the GitHub API. - -### Environment variables - -#### `GITHUB_TOKEN` - -Tests require access to the GitHub API — in order to avoid rate limits, the tests attempt to authenticate with an access token. In C, this is the `GITHUB_TOKEN` provided by GitHub Actions. For local development a personal access token must be used. Setting the `GITHUB_TOKEN` variable manually will work, but the recommended approach is to use a `.env` file in your project root (the tests will automatically discover and use any environment variables configured here courtesy of [`pytest-dotenv`](https://github.com/quiqua/pytest-dotenv)). +```shell +pip install . +pip install ".[lint, test]" +``` -#### `MODFLOW6_PATH` +## Testing -By default, ths project's tests look for the `modflow6` repository side-by-side with `modflow-devtools` on the filesystem. The `MODFLOW6_PATH` variable is optional and can be used to configure a different location for the `modflow6` repo. +This repository's tests use [`pytest`](https://docs.pytest.org/en/latest/) and several plugins. ### Running the tests -To run the tests in parallel with verbose output, run from the project root: +Tests should be run from the project root. To run the tests in parallel with verbose output: ```shell pytest -v -n auto @@ -45,12 +40,4 @@ Tests should follow a few conventions for ease of use and maintenance. #### Temporary directories -If tests must write to disk, they should use `pytest`'s built-in `temp_dir` fixture or one of the scoped temporary directory fixtures defined in `conftest.py`. - -#### Using the GitHub API - -To access the GitHub API from a test case, just construct a `Github` object: - -```python -api = Github(environ.get("GITHUB_TOKEN")) -``` \ No newline at end of file +Tests which must write to disk should use `pytest`'s built-in `temp_dir` fixture or one of the scoped temporary directory fixtures defined in `conftest.py` (the latter are part of this package's public API and so are tested in `modflow_devtools/test/test_conftest.py`). diff --git a/README.md b/README.md index 254f8df..396bcee 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,237 @@ -# modflow-devtools -python tools for MODFLOW development +# MODFLOW developer tools + +[![Project Status: WIP – Initial development is in progress, but there has not yet been a stable, usable release suitable for the public.](https://www.repostatus.org/badges/latest/wip.svg)](https://www.repostatus.org/#wip) + +Python tools for MODFLOW and FloPy development and testing. + + + + +- [Requirements](#requirements) +- [Installation](#installation) +- [Usage](#usage) + - [Regression test framework](#regression-test-framework) + - [`MFZipFile` class and usage](#mfzipfile-class-and-usage) + - [Keepable temporary directories](#keepable-temporary-directories) + - [Example model test generation](#example-model-test-generation) + - [Test model fixtures](#test-model-fixtures) + - [Example model fixtures](#example-model-fixtures) + - [Conditionally skipping tests](#conditionally-skipping-tests) + - [Miscellaneous](#miscellaneous) + - [Generating TOCs with `doctoc`](#generating-tocs-with-doctoc) + - [Testing CI workflows with `act`](#testing-ci-workflows-with-act) +- [MODFLOW Resources](#modflow-resources) + + + +## Requirements + +This package requires Python3.7+. Its only dependencies are `numpy` and `pytest`. + +## Installation + +This package is not yet published to PyPI or a Conda channel. To install it please see the [developer documentation](DEVELOPER.md). + +## Usage + +This package contains shared tools for developing and testing MODFLOW 6 and FloPy, including standalone utilities as well as `pytest` fixtures, CLI options, and test parametrizations: + +- a framework for MODFLOW regression test comparisons +- a `ZipFile` child class preserving file attributes +- various `pytest` fixtures and utilities + - keepable temporary directories + - fixtures/hooks to generate tests from example repos + - markers to conditionally skip test cases based on + - operating system + - Python packages installed + - executables available on the path + +To import `pytest` configuration in a project consuming `modflow-devtools`, add the following to the project's top-level `conftest.py` file: + +```python +pytest_plugins = [ "modflow_devtools/fixtures" ] +``` + +Note that `pytest` requires that this `conftest.py` live in your project root. (You can create nested `conftest.py` files to override default behavior if needed.) + +### Regression test framework + +### `MFZipFile` class and usage + +### Keepable temporary directories + +Tests often need to exercise code that reads from and/or writes to disk. The test harness may also need to create test data during setup and clean up the filesystem on teardown. Temporary directories are built into `pytest` via the [`tmp_path`](https://docs.pytest.org/en/latest/how-to/tmp_path.html#the-tmp-path-fixture) and `tmp_path_factory` fixtures. + +Several fixtures are provided in `modflow_devtools/test/conftest.py` to extend the behavior of temporary directories for test functions: + +- `tmpdir` +- `module_tmpdir` +- `class_tmpdir` +- `session_tmpdir` + +These are automatically created before test code runs and lazily removed afterwards, subject to the same [cleanup procedure](https://docs.pytest.org/en/latest/how-to/tmp_path.html#the-default-base-temporary-directory) used by the default `pytest` fixtures. Their purpose is to allow temporary test artifacts to be saved in a user-specified location when `pytest` is invoked with a `--keep` option — this can be useful to debug failing tests. + +```python +from pathlib import Path +import inspect + +def test_tmpdirs(tmpdir, module_tmpdir): + # function-scoped temporary directory + assert isinstance(tmpdir, Path) + assert tmpdir.is_dir() + assert inspect.currentframe().f_code.co_name in tmpdir.stem + + # module-scoped temp dir (accessible to other tests in the script) + assert module_tmpdir.is_dir() + assert "autotest" in module_tmpdir.stem +``` + +Any files written to the temporary directory will be saved to saved to subdirectories of `temp` named according to the test case, class or module. For instance, to store test outputs in a new folder named `temp` relative to the working directory (e.g., `/autotest`), run: + +```shell +pytest --keep temp +``` + +### Example model test generation + +Fixtures are provided to parametrize test functions dynamically from models in the MODFLOW 6 example and test model repositories: + +- [`MODFLOW-USGS/modflow6-examples`](https://github.com/MODFLOW-USGS/modflow6-examples) +- [`MODFLOW-USGS/modflow6-testmodels`](https://github.com/MODFLOW-USGS/modflow6-testmodels) +- [`MODFLOW-USGS/modflow6-largetestmodels`](https://github.com/MODFLOW-USGS/modflow6-largetestmodels) + +These can be requested like any other `pytest` fixture by adding one of the following test function arguments: + +- `test_model_mf5to6` +- `test_model_mf6` +- `large_test_model` +- `example_scenario` + +**Note**: test models for `mf5to6` and `mf6` both live in the `modflow6-testmodels` repository and must be requested separately. + +**Note**: example models must be built with the `ci_build_files.py` script located in `modflow6-examples/etc` before running tests using the `example_scenario` fixture. + +#### Test model fixtures + +The `test_model_mf5to6`, `test_model_mf6` and `large_test_model` fixtures are the `Path` to the directory containing the model's namefile. These can be used straightforwardly, for instance: + +```python +def test_mf5to6_model( + tmpdir: Path, + testmodel_mf5to6: Path): + # load the model + # switch to temp workdir + # run the model + ... +``` + +#### Example model fixtures + +The `example_scenario` fixture is an ordered list of model namefile `Path`s, representing models to be run in the specified order. (Order matters, as some models may depend on the outputs of others.) + +### Conditionally skipping tests + +Several `pytest` markers are provided to conditionally skip tests based on executable availability, Python package environment or operating system. + +To skip tests if one or more executables are not available on the path: + +```python +from shutil import which +from autotest.conftest import requires_exe + +@requires_exe("mf6") +def test_mf6(): + assert which("mf6") + +@requires_exe("mf6", "mp7") +def test_mf6_and_mp7(): + assert which("mf6") + assert which("mp7") +``` + +To skip tests if one or more Python packages are not available: + +```python +from autotest.conftest import requires_pkg + +@requires_pkg("pandas") +def test_needs_pandas(): + import pandas as pd + +@requires_pkg("pandas", "shapefile") +def test_needs_pandas(): + import pandas as pd + from shapefile import Reader +``` + +To mark tests requiring or incompatible with particular operating systems: + +```python +import os +import platform +from autotest.conftest import requires_platform, excludes_platform + +@requires_platform("Windows") +def test_needs_windows(): + assert platform.system() == "Windows" + +@excludes_platform("Darwin", ci_only=True) +def test_breaks_osx_ci(): + if "CI" in os.environ: + assert platform.system() != "Darwin" +``` + +Platforms must be specified as returned by `platform.system()`. + +Both these markers accept a `ci_only` flag, which indicates whether the policy should only apply when the test is running on GitHub Actions CI. + +There is also a `@requires_github` marker, which will skip decorated tests if the GitHub API is unreachable. + +### Miscellaneous + +A few other useful tools for FloPy development include: + +- [`doctoc`](https://www.npmjs.com/package/doctoc): automatically generate table of contents sections for markdown files +- [`act`](https://github.com/nektos/act): test GitHub Actions workflows locally (requires Docker) + +#### Generating TOCs with `doctoc` + +The [`doctoc`](https://www.npmjs.com/package/doctoc) tool can be used to automatically generate table of contents sections for markdown files. `doctoc` is distributed with the [Node Package Manager](https://docs.npmjs.com/cli/v7/configuring-npm/install). With Node installed use `npm install -g doctoc` to install `doctoc` globally. Then just run `doctoc `, e.g.: + +```shell +doctoc DEVELOPER.md +``` + +This will insert HTML comments surrounding an automatically edited region, scanning for headers and creating an appropriately indented TOC tree. Subsequent runs are idempotent, updating if the file has changed or leaving it untouched if not. + +To run `doctoc` for all markdown files in a particular directory (recursive), use `doctoc some/path`. + +#### Testing CI workflows with `act` + +The [`act`](https://github.com/nektos/act) tool uses Docker to run containerized CI workflows in a simulated GitHub Actions environment. [Docker Desktop](https://www.docker.com/products/docker-desktop/) is required for Mac or Windows and [Docker Engine](https://docs.docker.com/engine/) on Linux. + +With Docker installed and running, run `act -l` from the project root to see available CI workflows. To run all workflows and jobs, just run `act`. To run a particular workflow use `-W`: + +```shell +act -W .github/workflows/commit.yml +``` + +To run a particular job within a workflow, add the `-j` option: + +```shell +act -W .github/workflows/commit.yml -j build +``` + +**Note:** GitHub API rate limits are easy to exceed, especially with job matrices. Authenticated GitHub users have a much higher rate limit: use `-s GITHUB_TOKEN=` when invoking `act` to provide a personal access token. Note that this will log your token in shell history — leave the value blank for a prompt to enter it more securely. + +The `-n` flag can be used to execute a dry run, which doesn't run anything, just evaluates workflow, job and step definitions. See the [docs](https://github.com/nektos/act#example-commands) for more. + +**Note:** `act` can only run Linux-based container definitions, so Mac or Windows workflows or matrix OS entries will be skipped. + + +## MODFLOW Resources + ++ [MODFLOW and Related Programs](https://water.usgs.gov/ogw/modflow/) ++ [Online guide for MODFLOW-2000](https://water.usgs.gov/nrp/gwsoftware/modflow2000/Guide/) ++ [Online guide for MODFLOW-2005](https://water.usgs.gov/ogw/modflow/MODFLOW-2005-Guide/) ++ [Online guide for MODFLOW-NWT](https://water.usgs.gov/ogw/modflow-nwt/MODFLOW-NWT-Guide/) \ No newline at end of file diff --git a/conftest.py b/conftest.py new file mode 100644 index 0000000..74bcb56 --- /dev/null +++ b/conftest.py @@ -0,0 +1 @@ +pytest_plugins = ["modflow_devtools.fixtures"] diff --git a/modflow_devtools/__init__.py b/modflow_devtools/__init__.py index 970b2c8..e69de29 100644 --- a/modflow_devtools/__init__.py +++ b/modflow_devtools/__init__.py @@ -1,9 +0,0 @@ -from .config import ( - __author__, - __date__, - __description__, - __email__, - __maintainer__, - __status__, - __version__, -) diff --git a/modflow_devtools/build.py b/modflow_devtools/build.py deleted file mode 100644 index 9ed9b3c..0000000 --- a/modflow_devtools/build.py +++ /dev/null @@ -1,40 +0,0 @@ -import subprocess -import sys -from os import PathLike -from pathlib import Path - -from modflow_devtools.misc import set_dir - - -def meson_build( - prj_path: PathLike = ".", - bld_path: PathLike = "build", - bin_path: PathLike = "bin", - lib_path: PathLike = "bin", - quiet: bool = True, -): - """ - Setup, compile and install with meson. - """ - - prj_path = Path(prj_path).expanduser().absolute() - bld_path = Path(bld_path).expanduser().absolute() - bin_path = Path(bin_path).expanduser().absolute() - lib_path = Path(lib_path).expanduser().absolute() - - with set_dir(prj_path): - cmd = ( - f"meson setup {bld_path} " - + f"--bindir={bin_path} " - + f"--libdir={lib_path} " - + "--prefix=" - + ("%CD%" if sys.platform.lower() == "win32" else "$(pwd)") - ) - if not quiet: - print(f"Running: {cmd}") - subprocess.run(cmd, shell=True, check=True) - - cmd = f"meson install -C {bld_path}" - if not quiet: - print(f"Running: {cmd}") - subprocess.run(cmd, shell=True, check=True) diff --git a/modflow_devtools/config.py b/modflow_devtools/config.py deleted file mode 100644 index f46a0cb..0000000 --- a/modflow_devtools/config.py +++ /dev/null @@ -1,7 +0,0 @@ -__author__ = "Joseph D. Hughes" -__date__ = "March 2, 2022" -__version__ = "0.0.1" -__maintainer__ = "Joseph D. Hughes" -__email__ = "jdhughes@usgs.gov" -__status__ = "Production" -__description__ = """MODFLOW 6 developer tools.""" diff --git a/modflow_devtools/context.py b/modflow_devtools/context.py deleted file mode 100644 index 9e4c0de..0000000 --- a/modflow_devtools/context.py +++ /dev/null @@ -1,572 +0,0 @@ -import json -import os -import shutil -import subprocess -import sys -import tarfile -import timeit -from enum import Enum - -import flopy -from modflow_devtools.build import meson_build -from modflow_devtools.http import get_request -from modflow_devtools.usgsprograms import usgs_program_data -from modflow_devtools.zip import MFZipFile - - -class MFTargetType(Enum): - TEST = 1 - RELEASE = 2 - REGRESSION = 3 - - -class MFTestTargets: - """define test targets for modflow tests""" - - def __init__( - self, - testbin: str = None, - releasebin: str = None, - builtbin: str = None, - use_path: bool = False, - ): - """MFTestTargets init""" - - self._exe_targets = { - "mf6": {"exe": "mf6", "type": MFTargetType.TEST}, - "mf5to6": {"exe": "mf5to6", "type": MFTargetType.TEST}, - "zbud6": {"exe": "zbud6", "type": MFTargetType.TEST}, - "libmf6": {"exe": None, "type": MFTargetType.TEST}, - "mf2005": {"exe": "mf2005dbl", "type": MFTargetType.RELEASE}, - "mfnwt": {"exe": "mfnwtdbl", "type": MFTargetType.RELEASE}, - "mfusg": {"exe": "mfusgdbl", "type": MFTargetType.RELEASE}, - "mflgr": {"exe": "mflgrdbl", "type": MFTargetType.RELEASE}, - "mf2005s": {"exe": "mf2005", "type": MFTargetType.RELEASE}, - "mt3dms": {"exe": "mt3dms", "type": MFTargetType.RELEASE}, - "mf6-regression": {"exe": "mf6", "type": MFTargetType.REGRESSION}, - } - - self._testbin = testbin - self._releasebin = releasebin - self._builtbin = builtbin - self._use_path = use_path - self._target_path_d = None - - def set_targets(self): - """ - set target paths from current bin directories - """ - self._set_targets() - - def target_paths(self): - """ - get the target path dictionary generated by set_targets - """ - return self._target_path_d - - def get_mf6_version(self, version=None): - """ - get version of mf6 entry in _exe_targets - """ - return self._mf6_target_version(target=version) - - def target_exe_d(self): - """ - get the _exe_targets dictionary - """ - return self._exe_targets - - def release_exe_names(self): - """ - get name list of release executables - """ - target_ext, target_so = self._extensions() - return [ - f"{self._exe_targets[t]['exe']}{target_ext}" - for t in self._exe_targets - if self._exe_targets[t]["type"] == MFTargetType.RELEASE - and self._exe_targets[t]["exe"] - ] - - def release_lib_names(self): - """ - get name list of release libs - """ - target_ext, target_so = self._extensions() - return [ - f"{self._exe_targets[t]}{target_so}" - for t in self._exe_targets - if self._exe_targets[t]["type"] == MFTargetType.RELEASE - and self._exe_targets[t]["exe"] is None - ] - - def regression_exe_names(self): - """ - get name list of regression executables - """ - target_ext, target_so = self._extensions() - return [ - f"{self._exe_targets[t]['exe']}{target_ext}" - for t in self._exe_targets - if self._exe_targets[t]["type"] == MFTargetType.REGRESSION - and self._exe_targets[t]["exe"] - ] - - def regression_lib_names(self): - """ - get name list of regression libs - """ - target_ext, target_so = self._extensions() - return [ - f"{self._exe_targets[t]}{target_so}" - for t in self._exe_targets - if self._exe_targets[t]["type"] == MFTargetType.REGRESSION - and self._exe_targets[t]["exe"] is None - ] - - def _target_pth(self, target, target_t=None, is_lib=False): - if target_t == MFTargetType.TEST: - path = self._testbin - elif target_t == MFTargetType.REGRESSION: - path = self._builtbin - elif target_t == MFTargetType.RELEASE: - path = self._releasebin - - if target_t != MFTargetType.TEST and self._use_path: - exe_exists = flopy.which(target) - else: - exe_exists = flopy.which(target, path=path) - - if ( - exe_exists is None - and is_lib - and os.path.isfile(os.path.join(path, target)) - ): - exe_exists = os.path.join(path, target) - - if exe_exists is None: - print(target) - raise Exception( - f"{target} does not exist or is not executable in test context." - ) - - return os.path.abspath(exe_exists) - - def _run_exe(self, argv, ws="."): - buff = [] - proc = subprocess.Popen( - argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=ws - ) - result, error = proc.communicate() - if result is not None: - c = result.decode("utf-8") - c = c.rstrip("\r\n") - # print(f"{c}") - buff.append(c) - - return proc.returncode, buff - - def _mf6_target_version(self, target=None): - exe = self._target_path_d[target] - return_code, buff = self._run_exe((exe, "-v")) - if return_code == 0: - version = buff[0].split()[1] - else: - version = None - return version - - def _set_targets(self): - self._target_path_d = None - target_ext, target_so = self._extensions() - - self._target_path_d = {} - for t in list(self._exe_targets): - is_lib = False - if self._exe_targets[t]["exe"] is None: - name = f"{t}{target_so}" - is_lib = True - else: - name = f"{self._exe_targets[t]['exe']}{target_ext}" - - target = self._target_pth( - name, target_t=self._exe_targets[t]["type"], is_lib=is_lib - ) - self._target_path_d[t] = target - - def _extensions(self): - target_ext = "" - target_so = ".so" - sysinfo = sys.platform.lower() - if sysinfo.lower() == "win32": - target_ext = ".exe" - target_so = ".dll" - elif sysinfo.lower() == "darwin": - target_so = ".dylib" - - return target_ext, target_so - - -class MFTestContext: - """setup test context for modflow tests""" - - def __init__( - self, - testbin: str = None, - use_path: bool = False, - update_exe: bool = False, - ): - """MFTestContext init""" - - self._testbin = os.path.abspath(testbin) - self._releasebin = os.path.abspath( - os.path.join(os.path.dirname(__file__), "bin") - ) - - builtbin = os.path.join(self._releasebin, "rebuilt") - - self._update = update_exe - - self._targets = MFTestTargets( - testbin=testbin, - releasebin=self._releasebin, - builtbin=builtbin, - use_path=use_path, - ) - - self._exe = MFTestExe( - releasebin=self._releasebin, - builtbin=builtbin, - targets=self._targets, - ) - - self._update_context() - - def get_target_dictionary(self): - """ - get target path dictionary - """ - return self._targets.target_paths() - - def get_mf6_version(self, version="mf6"): - """ - get mf6 version - """ - return self._targets.get_mf6_version(version=version) - - def _update_context(self): - if not self._exe.verify_exe() or ( - self._update and not self._exe.releases_current() - ): - self._exe.cleanup() - self._exe.build_mf6_release() - - self._targets.set_targets() - - -class MFTestExe: - """update and/or verify regression executables for test""" - - def __init__( - self, - releasebin: str = None, - builtbin: str = None, - targets: object = None, - ): - """MFTestExe init""" - - self._releasebin = releasebin - self._builtbin = builtbin - self._targets = targets - self._working_dir = os.path.abspath( - os.path.join(os.path.dirname(__file__), "temp") - ) - - def verify_exe(self): - """ - verify downloaded and built exe exist - """ - if not ( - os.path.isdir(self._releasebin) or os.path.isdir(self._builtbin) - ): - return False - - for t in self._targets.release_exe_names(): - if not os.path.isfile(os.path.join(self._releasebin, t)): - return False - - for t in self._targets.release_lib_names(): - if not os.path.isfile(os.path.join(self._releasebin, t)): - return False - - for t in self._targets.regression_exe_names(): - if not os.path.isfile(os.path.join(self._builtbin, t)): - return False - - for t in self._targets.regression_lib_names(): - if not os.path.isfile(os.path.join(self._builtbin, t)): - return False - - return True - - def releases_current(self): - """ - check downloaded versions against local db versions - """ - try: - with open(os.path.join(self._releasebin, "code.json")) as fh: - release_d = json.load(fh) - except: - return False - - program_d = usgs_program_data.get_program_dict() - exe_d = self._targets.target_exe_d() - if release_d and program_d: - for t in exe_d: - if t in release_d: - key = t - elif exe_d[t]["exe"] in release_d: - key = exe_d[t]["exe"] - if ( - key not in release_d - or release_d[key]["version"] != program_d[key]["version"] - ): - return False - - return True - - return False - - def build_mf6_release(self): - """ - download mf6 release source and build exe - """ - self._build_mf6_release() - - def cleanup(self): - """ - remove bins when possible - """ - shutil.rmtree(self._builtbin, ignore_errors=True) - shutil.rmtree(self._releasebin, ignore_errors=True) - - def _create_dirs(self): - pths = [self._releasebin, self._working_dir] - for pth in pths: - print(f"creating... {os.path.abspath(pth)}") - os.makedirs(pth, exist_ok=True) - errmsg = f"could not create... {os.path.abspath(pth)}" - assert os.path.exists(pth), errmsg - - def _build_mf6_release(self): - target_dict = usgs_program_data.get_target("mf6") - - download_and_unzip( - target_dict["url"], - pth=self._working_dir, - verbose=True, - ) - - # update IDEVELOP MODE in the release - srcpth = os.path.join( - self._working_dir, target_dict["dirname"], target_dict["srcdir"] - ) - fpth = os.path.join(srcpth, "Utilities", "version.f90") - with open(fpth) as f: - lines = f.read().splitlines() - assert len(lines) > 0, f"could not update {srcpth}" - - f = open(fpth, "w") - for line in lines: - tag = "IDEVELOPMODE = 0" - if tag in line: - line = line.replace(tag, "IDEVELOPMODE = 1") - f.write(f"{line}\n") - f.close() - - # build release source files with Meson - root_path = os.path.join(self._working_dir, target_dict["dirname"]) - bin_path = os.path.abspath(self._builtbin) - meson_build( - prj_path=os.getcwd(), - bld_path=root_path, - bin_path=bin_path, - lib_path=bin_path, - ) - - -def download_and_unzip( - url, - pth="./", - delete_zip=True, - timeout=30, - max_requests=10, - chunk_size=2048000, - verbose=False, -): - """Download and unzip a zip file from a url. - Parameters - ---------- - url : str - url address for the zip file - pth : str - path where the zip file will be saved (default is the current path) - delete_zip : bool - boolean indicating if the zip file should be deleted after it is - unzipped (default is True) - timeout : int - url request time out length (default is 30 seconds) - max_requests : int - number of url download request attempts (default is 10) - chunk_size : int - maximum url download request chunk size (default is 2048000 bytes) - verbose : bool - boolean indicating if output will be printed to the terminal - Returns - ------- - """ - - # create download directory - if not os.path.exists(pth): - if verbose: - print(f"Creating the directory:\n {pth}") - os.makedirs(pth) - - if verbose: - print(f"Attempting to download the file:\n {url}") - - # define the filename - file_name = os.path.join(pth, url.split("/")[-1]) - - # download the file - success = False - tic = timeit.default_timer() - - # open request - req = get_request( - url, - timeout=timeout, - max_requests=max_requests, - verbose=verbose, - ) - - # get content length, if available - tag = "Content-length" - if tag in req.headers: - file_size = req.headers[tag] - len_file_size = len(file_size) - file_size = int(file_size) - - bfmt = "{:" + f"{len_file_size}" + ",d}" - sbfmt = "{:>" + f"{len(bfmt.format(int(file_size)))}" + "s} bytes" - msg = f" file size: {sbfmt.format(bfmt.format(int(file_size)))}" - if verbose: - print(msg) - else: - file_size = 0.0 - - # download data from url - for idx in range(max_requests): - # print download attempt message - if verbose: - print(f" download attempt: {idx + 1}") - - # connection established - download the file - download_size = 0 - try: - with open(file_name, "wb") as f: - for chunk in req.iter_content(chunk_size=chunk_size): - if chunk: - # increment the counter - download_size += len(chunk) - - # write the chunk - f.write(chunk) - - # write information to the screen - if verbose: - if file_size > 0: - download_percent = float( - download_size - ) / float(file_size) - msg = ( - " downloaded " - + sbfmt.format(bfmt.format(download_size)) - + " of " - + bfmt.format(int(file_size)) - + " bytes" - + f" ({download_percent:10.4%})" - ) - else: - msg = ( - " downloaded " - + sbfmt.format(bfmt.format(download_size)) - + " bytes" - ) - print(msg) - else: - sys.stdout.write("") - sys.stdout.flush() - - success = True - except: - # reestablish request - req = get_request( - url, - timeout=timeout, - max_requests=max_requests, - verbose=verbose, - ) - - # try to download the data again - continue - - # terminate the download attempt loop - if success: - break - - # write the total download time - toc = timeit.default_timer() - tsec = toc - tic - if verbose: - print(f"\ntotal download time: {tsec} seconds") - - if success: - if file_size > 0: - if verbose: - print(f"download speed: {file_size / (1e6 * tsec)} MB/s") - else: - msg = f"could not download...{url}" - raise ConnectionError(msg) - - # Unzip the file, and delete zip file if successful. - if "zip" in os.path.basename(file_name) or "exe" in os.path.basename( - file_name - ): - z = MFZipFile(file_name) - try: - # write a message - if not verbose: - sys.stdout.write("\n") - print(f"uncompressing...'{file_name}'") - - # extract the files - z.extractall(pth) - except: - p = "Could not unzip the file. Stopping." - raise Exception(p) - z.close() - elif "tar" in os.path.basename(file_name): - ar = tarfile.open(file_name) - ar.extractall(path=pth) - ar.close() - - # delete the zipfile - if delete_zip: - if verbose: - print("Deleting the zipfile...") - os.remove(file_name) - - if verbose: - print("Done downloading and extracting...\n") - - return success diff --git a/modflow_devtools/cross_section.py b/modflow_devtools/cross_section.py deleted file mode 100644 index 57a9759..0000000 --- a/modflow_devtools/cross_section.py +++ /dev/null @@ -1,276 +0,0 @@ -import numpy as np - -# power for Manning's hydraulic radius term -mpow = 2.0 / 3.0 - - -def calculate_rectchan_mannings_discharge( - conversion_factor, roughness, slope, width, depth -): - """ - Calculate Manning's discharge for a rectangular channel. - - """ - area = width * depth - return conversion_factor * area * depth**mpow * slope**0.5 / roughness - - -# n-point cross-section functions -def get_wetted_station( - x0, - x1, - h0, - h1, - depth, -): - """Get the wetted length in the x-direction""" - # -- calculate the minimum and maximum depth - hmin = min(h0, h1) - hmax = max(h0, h1) - - # -- if depth is less than or equal to the minimum value the - # station length (xlen) is zero - if depth <= hmin: - x1 = x0 - # -- if depth is between hmin and hmax, station length is less - # than h1 - h0 - elif depth < hmax: - xlen = x1 - x0 - dlen = h1 - h0 - if abs(dlen) > 0.0: - slope = xlen / dlen - else: - slope = 0.0 - if h0 > h1: - dx = (depth - h1) * slope - xt = x1 + dx - xt0 = xt - xt1 = x1 - else: - dx = (depth - h0) * slope - xt = x0 + dx - xt0 = x0 - xt1 = xt - x0 = xt0 - x1 = xt1 - return x0, x1 - - -def get_wetted_perimeter( - x0, - x1, - h0, - h1, - depth, -): - # -- calculate the minimum and maximum depth - hmin = min(h0, h1) - hmax = max(h0, h1) - - # -- calculate the wetted perimeter for the segment - xlen = x1 - x0 - if xlen > 0.0: - if depth > hmax: - dlen = hmax - hmin - else: - dlen = depth - hmin - else: - if depth > hmin: - dlen = min(depth, hmax) - hmin - else: - dlen = 0.0 - return np.sqrt(xlen**2.0 + dlen**2.0) - - -def get_wetted_area(x0, x1, h0, h1, depth): - # -- calculate the minimum and maximum depth - hmin = min(h0, h1) - hmax = max(h0, h1) - - # -- calculate the wetted area for the segment - xlen = x1 - x0 - area = 0.0 - if xlen > 0.0: - # -- add the area above hmax - if depth > hmax: - area = xlen * (depth - hmax) - # -- add the area below zmax - if hmax != hmin and depth > hmin: - area += 0.5 * (depth - hmin) - return area - - -def wetted_area( - x, - h, - depth, - verbose=False, -): - area = 0.0 - if x.shape[0] == 1: - area = x[0] * depth - else: - for idx in range(0, x.shape[0] - 1): - x0, x1 = x[idx], x[idx + 1] - h0, h1 = h[idx], h[idx + 1] - - # get station data - x0, x1 = get_wetted_station(x0, x1, h0, h1, depth) - - # get wetted area - a = get_wetted_area(x0, x1, h0, h1, depth) - area += a - - # write to screen - if verbose: - print( - f"{idx}->{idx + 1} ({x0},{x1}) - " - f"perimeter={x1 - x0} - area={a}" - ) - - return area - - -def wetted_perimeter( - x, - h, - depth, - verbose=False, -): - perimeter = 0.0 - if x.shape[0] == 1: - perimeter = x[0] - else: - for idx in range(0, x.shape[0] - 1): - x0, x1 = x[idx], x[idx + 1] - h0, h1 = h[idx], h[idx + 1] - - # get station data - x0, x1 = get_wetted_station(x0, x1, h0, h1, depth) - - # get wetted perimeter - perimeter += get_wetted_perimeter(x0, x1, h0, h1, depth) - - # write to screen - if verbose: - print(f"{idx}->{idx + 1} ({x0},{x1}) - perimeter={x1 - x0}") - - return perimeter - - -def manningsq( - x, - h, - depth, - roughness=0.01, - slope=0.001, - conv=1.0, -): - if isinstance(roughness, float): - roughness = np.ones(x.shape, dtype=float) * roughness - if x.shape[0] > 1: - q = 0.0 - for i0 in range(x.shape[0] - 1): - i1 = i0 + 1 - perimeter = get_wetted_perimeter(x[i0], x[i1], h[i0], h[i1], depth) - area = get_wetted_area(x[i0], x[i1], h[i0], h[i1], depth) - if perimeter > 0.0: - radius = area / perimeter - q += ( - conv * area * radius**mpow * slope**0.5 / roughness[i0] - ) - else: - perimeter = wetted_perimeter(x, h, depth) - area = wetted_area(x, h, depth) - radius = 0.0 - if perimeter > 0.0: - radius = area / perimeter - q = conv * area * radius**mpow * slope**0.5 / roughness[0] - return q - - -def get_depths( - flows, - x, - h, - roughness=0.01, - slope=0.001, - conv=1.0, - dd=1e-4, - verbose=False, -): - if isinstance(flows, float): - flows = np.array([flows], dtype=float) - if isinstance(roughness, float): - roughness = np.ones(x.shape, dtype=float) * roughness - depths = np.zeros(flows.shape, dtype=float) - for idx, q in enumerate(flows): - depths[idx] = qtodepth( - x, - h, - q, - roughness=roughness, - slope=slope, - conv=conv, - dd=dd, - verbose=False, - ) - - return depths - - -def qtodepth( - x, - h, - q, - roughness=0.01, - slope=0.001, - conv=1.0, - dd=1e-4, - verbose=False, -): - h0 = 0.0 - q0 = manningsq( - x, - h, - h0, - roughness=roughness, - slope=slope, - conv=conv, - ) - r = q0 - q - - iter = 0 - if verbose: - print(f"iteration {iter:>2d} - residual={r}") - while abs(r) > 1e-12: - q1 = manningsq( - x, - h, - h0 + dd, - roughness=roughness, - slope=slope, - conv=conv, - ) - dq = q1 - q0 - if dq != 0.0: - derv = dd / (q1 - q0) - else: - derv = 0.0 - h0 -= derv * r - q0 = manningsq( - x, - h, - h0, - roughness=roughness, - slope=slope, - conv=conv, - ) - r = q0 - q - - iter += 1 - if verbose: - print(f"iteration {iter:>2d} - residual={r}") - if iter > 100: - break - return h0 diff --git a/modflow_devtools/fixtures.py b/modflow_devtools/fixtures.py new file mode 100644 index 0000000..8c12b85 --- /dev/null +++ b/modflow_devtools/fixtures.py @@ -0,0 +1,286 @@ +from collections import OrderedDict +from itertools import groupby +from os import PathLike, environ +from pathlib import Path +from shutil import copytree +from typing import Dict, List, Optional + +import pytest +from modflow_devtools.misc import get_mf6_ftypes, get_models + +# temporary directory fixtures + + +@pytest.fixture(scope="function") +def function_tmpdir(tmpdir_factory, request) -> Path: + node = ( + request.node.name.replace("/", "_") + .replace("\\", "_") + .replace(":", "_") + ) + temp = Path(tmpdir_factory.mktemp(node)) + yield Path(temp) + + keep = request.config.getoption("--keep") + if keep: + copytree(temp, Path(keep) / temp.name) + + keep_failed = request.config.getoption("--keep-failed") + if keep_failed and request.node.rep_call.failed: + copytree(temp, Path(keep_failed) / temp.name) + + +@pytest.fixture(scope="class") +def class_tmpdir(tmpdir_factory, request) -> Path: + assert ( + request.cls is not None + ), "Class-scoped temp dir fixture must be used on class" + temp = Path(tmpdir_factory.mktemp(request.cls.__name__)) + yield temp + + keep = request.config.getoption("--keep") + if keep: + copytree(temp, Path(keep) / temp.name) + + +@pytest.fixture(scope="module") +def module_tmpdir(tmpdir_factory, request) -> Path: + temp = Path(tmpdir_factory.mktemp(request.module.__name__)) + yield temp + + keep = request.config.getoption("--keep") + if keep: + copytree(temp, Path(keep) / temp.name) + + +@pytest.fixture(scope="session") +def session_tmpdir(tmpdir_factory, request) -> Path: + temp = Path(tmpdir_factory.mktemp(request.session.name)) + yield temp + + keep = request.config.getoption("--keep") + if keep: + copytree(temp, Path(keep) / temp.name) + + +# environment-dependent fixtures + + +@pytest.fixture +def repos_path() -> Optional[Path]: + """Path to directory containing test model and example repositories""" + return environ.get("REPOS_PATH", None) + + +# pytest configuration hooks + + +def pytest_addoption(parser): + parser.addoption( + "-K", + "--keep", + action="store", + default=None, + help="Move the contents of temporary test directories to correspondingly named subdirectories at the given " + "location after tests complete. This option can be used to exclude test results from automatic cleanup, " + "e.g. for manual inspection. The provided path is created if it does not already exist. An error is " + "thrown if any matching files already exist.", + ) + + parser.addoption( + "--keep-failed", + action="store", + default=None, + help="Move the contents of temporary test directories to correspondingly named subdirectories at the given " + "location if the test case fails. This option automatically saves the outputs of failed tests in the " + "given location. The path is created if it doesn't already exist. An error is thrown if files with the " + "same names already exist in the given location.", + ) + + parser.addoption( + "-S", + "--smoke", + action="store_true", + default=False, + help="Run only smoke tests (should complete in <1 minute).", + ) + + +def pytest_runtest_setup(item): + # smoke tests are \ {slow U example U regression} + smoke = item.config.getoption("--smoke") + slow = list(item.iter_markers(name="slow")) + example = list(item.iter_markers(name="example")) + regression = list(item.iter_markers(name="regression")) + if smoke and (slow or example or regression): + pytest.skip() + + +@pytest.hookimpl(hookwrapper=True, tryfirst=True) +def pytest_runtest_makereport(item, call): + # this is necessary so temp dir fixtures can + # inspect test results and check for failure + # (see https://doc.pytest.org/en/latest/example/simple.html#making-test-result-information-available-in-fixtures) + + outcome = yield + rep = outcome.get_result() + + # report attribute for each phase (setup, call, teardown) + # we're only interested in result of the function call + setattr(item, "rep_" + rep.when, rep) + + +def pytest_generate_tests(metafunc): + models_selected = metafunc.config.getoption("--model", None) + packages_selected = metafunc.config.getoption("--package", None) + repos_path = environ.get("REPOS_PATH") + + key = "test_model_mf6" + if key in metafunc.fixturenames: + models = ( + get_models( + repos_path / "modflow6-testmodels" / "mf6", + prefix="test", + excluded=["test205_gwtbuy-henrytidal"], + selected=models_selected, + packages=packages_selected, + ) + if repos_path + else [] + ) + metafunc.parametrize(key, models, ids=[m.name for m in models]) + + key = "test_model_mf5to6" + if key in metafunc.fixturenames: + models = ( + get_models( + repos_path / "modflow6-testmodels" / "mf5to6", + prefix="test", + namefile="*.nam", + excluded=["test205_gwtbuy-henrytidal"], + selected=models_selected, + packages=packages_selected, + ) + if repos_path + else [] + ) + metafunc.parametrize(key, models, ids=[m.name for m in models]) + + key = "large_test_model" + if key in metafunc.fixturenames: + models = ( + get_models( + repos_path / "modflow6-largetestmodels", + prefix="test", + namefile="*.nam", + excluded=[], + selected=models_selected, + packages=packages_selected, + ) + if repos_path + else [] + ) + metafunc.parametrize(key, models, ids=[m.name for m in models]) + + key = "example_scenario" + if key in metafunc.fixturenames: + + def example_namfile_is_nested(namfile_path: PathLike) -> bool: + p = Path(namfile_path) + if not p.is_file() or not p.name.endswith(".nam"): + raise ValueError(f"Expected namefile path, got {p}") + + return p.parent.parent.name == "examples " + + def example_name_from_namfile_path(path: PathLike) -> str: + p = Path(path) + if not p.is_file() or not p.name.endswith(".nam"): + raise ValueError(f"Expected namefile path, got {p}") + + return ( + p.parent.parent.name + if example_namfile_is_nested(p) + else p.parent.name + ) + + def group_examples(namefile_paths) -> Dict[str, List[Path]]: + d = OrderedDict() + + for name, paths in groupby( + namefile_paths, key=example_name_from_namfile_path + ): + # sort alphabetically (gwf < gwt) + nfpaths = sorted(list(paths)) + + # skip if no models found + if len(nfpaths) == 0: + continue + + d[name] = nfpaths + + return d + + def get_examples(): + examples_excluded = [ + "ex-gwf-csub-p02c", + "ex-gwt-gwtgwt-mt3dms-p10", + ] + + # find and filter namfiles + namfiles = [ + p + for p in (repos_path / "modflow6-examples" / "examples").rglob( + "mfsim.nam" + ) + ] + namfiles = [ + p + for p in namfiles + if (not any(e in str(p) for e in examples_excluded)) + ] + + # group example scenarios with multiple models + examples = group_examples(namfiles) + + # filter by example name (optional) + if models_selected: + examples = { + name: nfps + for name, nfps in examples.items() + if any(s in name for s in models_selected) + } + + # filter by package (optional) + if packages_selected: + filtered = [] + for name, namefiles in examples.items(): + ftypes = [] + for namefile in namefiles: + ftype = get_mf6_ftypes(namefile, packages_selected) + if ftype not in ftypes: + ftypes += ftype + if len(ftypes) > 0: + ftypes = [item.upper() for item in ftypes] + for pkg in packages_selected: + if pkg in ftypes: + filtered.append(name) + break + examples = { + name: nfps + for name, nfps in examples.items() + if name in filtered + } + + # remove mf6gwf and mf6gwt + examples = { + name: nfps + for name, nfps in examples.items() + if name not in ["mf6gwf", "mf6gwt"] + } + + examples = get_examples() if repos_path else dict() + metafunc.parametrize( + key, + [(name, nfps) for name, nfps in examples.items()], + ids=[name for name, ex in examples.items()], + ) diff --git a/modflow_devtools/framework.py b/modflow_devtools/framework.py index 695f319..a1273e5 100644 --- a/modflow_devtools/framework.py +++ b/modflow_devtools/framework.py @@ -1,773 +1,933 @@ import os import shutil -import sys -import time - -import flopy -import modflow_devtools as devtools -import numpy as np - - -class Framework: - def build(self, build_function, idx, exdir): - """ - Build base and regression MODFLOW 6 models - - Parameters - ---------- - build_function : function - user defined function that builds a base model and optionally - builds a regression model. If a regression model is not built - then None must be returned from the function for the regression - model. - idx : int - counter that corresponds to exdir entry - exdir : str - path to regression model files - """ - base, regression = build_function(idx, exdir) - base.write_simulation() - if regression is not None: - if isinstance(regression, flopy.mf6.MFSimulation): - regression.write_simulation() - else: - regression.write_input() - - def run(self, config, workspace): - """ - Run the MODFLOW 6 test and compare to existing head file or - MODFLOW-2005, MODFLOW-NWT, MODFLOW-USG, or MODFLOW-LGR run. - - Parameters - ---------- - config : TestConfig object - Modflow 6 test configuration object that runs the base and - regression models, compares the results, and tears down the - test if successful. - workspace : Pathlike - The test workspace - """ - config.set_model(workspace, testModel=False) - config.run() - config.compare() - if config.exfunc is not None: - config.exfunc(config) - - -class TestConfig: - # tell pytest this isn't a test class, don't collect it - __test__ = False - - sfmt = "{:25s} - {}" - extdict = { - "hds": "head", - "hed": "head", - "bhd": "head", - "ucn": "concentration", - "cbc": "cell-by-cell", - } - - def __init__( - self, - name, - targets, - exfunc=None, - exe_dict=None, - htol=None, - pdtol=None, - rclose=None, - idxsim=None, - cmp_verbose=True, - require_failure=None, - api_func=None, - mf6_regression=False, - make_comparison=True, - ): - self.targets = targets - - for idx, arg in enumerate(sys.argv): - if arg[2:].lower() in list(self.targets.keys()): - key = arg[2:].lower() - exe0 = self.targets[key] - exe = os.path.join(os.path.dirname(exe0), sys.argv[idx + 1]) - msg = ( - f"replacing {key} executable " - + f'"{self.targets[key]}" with ' - + f'"{exe}".' - ) - print(msg) - self.targets[key] = exe - - if exe_dict is not None: - if not isinstance(exe_dict, dict): - msg = "exe_dict must be a dictionary" - assert False, msg - keys = list(self.targets.keys()) - for key, value in exe_dict.items(): - if key in keys: - exe0 = self.targets[key] - exe = os.path.join(os.path.dirname(exe0), value) - msg = ( - f"replacing {key} executable " - + f'"{self.targets[key]}" with ' - + f'"{exe}".' - ) - print(msg) - self.targets[key] = exe - - msg = self.sfmt.format("Initializing test", name) - print(msg) - self.name = name - self.exfunc = exfunc - self.simpath = None - self.inpt = None - self.outp = None - self.coutp = None - self.api_func = api_func - self.mf6_regression = mf6_regression - self.make_comparison = make_comparison - self.action = None - - # set htol for comparisons - if htol is None: - htol = 0.001 - else: - msg = self.sfmt.format("User specified comparison htol", htol) - print(msg) - - self.htol = htol - # set pdtol for comparisons - if pdtol is None: - pdtol = 0.001 - else: - msg = self.sfmt.format( - "User specified percent difference comparison pdtol", pdtol - ) - print(msg) - - self.pdtol = pdtol - - # set rclose for comparisons - if rclose is None: - rclose = 0.001 +ignore_ext = ( + ".hds", + ".hed", + ".bud", + ".cbb", + ".cbc", + ".ddn", + ".ucn", + ".glo", + ".lst", + ".list", + ".gwv", + ".mv", + ".out", +) + + +def model_setup(namefile, dst, remove_existing=True, extrafiles=None): + """Setup MODFLOW-based model files for autotests. + + Parameters + ---------- + namefile : str + MODFLOW-based model name file. + dst : str + destination path for comparison model or file(s) + remove_existing : bool + boolean indicating if an existing comparision model or file(s) should + be replaced (default is True) + extrafiles : str or list of str + list of extra files to include in the comparision + + Returns + ------- + + """ + # Construct src pth from namefile or lgr file + src = os.path.dirname(namefile) + + # Create the destination folder, if required + create_dir = False + if os.path.exists(dst): + if remove_existing: + print("Removing folder " + dst) + shutil.rmtree(dst) + create_dir = True + else: + create_dir = True + if create_dir: + os.mkdir(dst) + + # determine if a namefile is a lgr control file - get individual + # name files out of the lgr control file + namefiles = [namefile] + ext = os.path.splitext(namefile)[1] + if ".lgr" in ext.lower(): + lines = [line.rstrip("\n") for line in open(namefile)] + for line in lines: + if len(line) < 1: + continue + if line[0] == "#": + continue + t = line.split() + if ".nam" in t[0].lower(): + fpth = os.path.join(src, t[0]) + namefiles.append(fpth) + + # Make list of files to copy + files2copy = [] + for fpth in namefiles: + files2copy.append(os.path.basename(fpth)) + ext = os.path.splitext(fpth)[1] + # copy additional files contained in the name file and + # associated package files + if ext.lower() == ".nam": + fname = os.path.abspath(fpth) + files2copy = files2copy + get_input_files(fname) + + if extrafiles is not None: + if isinstance(extrafiles, str): + extrafiles = [extrafiles] + for fl in extrafiles: + files2copy.append(os.path.basename(fl)) + + # Copy the files + for f in files2copy: + srcf = os.path.join(src, f) + dstf = os.path.join(dst, f) + + # Check to see if dstf is going into a subfolder, and create that + # subfolder if it doesn't exist + sf = os.path.dirname(dstf) + if not os.path.isdir(sf): + os.makedirs(sf) + + # Now copy the file + if os.path.exists(srcf): + print("Copy file '" + srcf + "' -> '" + dstf + "'") + shutil.copy(srcf, dstf) else: - msg = self.sfmt.format( - "User specified percent difference comparison rclose", rclose - ) - print(msg) - - self.rclose = rclose - - # set index for multi-simulation comparisons - self.idxsim = idxsim - - # set compare verbosity - self.cmp_verbose = cmp_verbose - - # set allow failure - self.require_failure = require_failure - - self.success = False - - def __repr__(self): - return self.name - - def set_model(self, pth, testModel=True): - """ - Set paths to MODFLOW 6 model and associated comparison test - """ - # make sure this is a valid path - if not os.path.isdir(pth): - assert False, f"{pth} is not a valid directory" - - self.simpath = pth - - # get MODFLOW 6 output file names - fpth = os.path.join(pth, "mfsim.nam") - mf6inp, mf6outp = devtools.get_mf6_files(fpth) - self.outp = mf6outp - - # determine comparison model - self.setup_comparison(pth, pth, testModel=testModel) - # if self.mf6_regression: - # self.action = "mf6-regression" - # else: - # self.action = devtools.get_mf6_comparison(pth) - if self.action is not None: - if "mf6" in self.action or "mf6-regression" in self.action: - cinp, self.coutp = devtools.get_mf6_files(fpth) - - def setup(self, src, dst): - msg = self.sfmt.format("Setup test", self.name) - self.originpath = src - self.simpath = dst - self.inpt, self.outp = devtools.setup_mf6(src=src, dst=dst) - time.sleep(0.5) - self.setup_comparison(src, dst) - - def setup_comparison(self, src, dst, testModel=True): - - # evaluate if comparison should be made - if not self.make_comparison: - return - - # adjust htol if it is smaller than IMS outer_dvclose - dvclose = self._get_dvclose(dst) - if dvclose is not None: - dvclose *= 5.0 - if self.htol < dvclose: - self.htol = dvclose - - # get rclose to use with budget comparisons - rclose = self._get_rclose(dst) - if rclose is None: - rclose = 0.5 + print(srcf + " does not exist") + + +def setup_comparison(namefile, dst, remove_existing=True): + """Setup a comparison model or comparision file(s) for a MODFLOW-based + model. + + Parameters + ---------- + namefile : str + MODFLOW-based model name file. + dst : str + destination path for comparison model or file(s) + remove_existing : bool + boolean indicating if an existing comparision model or file(s) should + be replaced (default is True) + + + Returns + ------- + + """ + # Construct src pth from namefile + src = os.path.dirname(namefile) + action = None + for root, dirs, files in os.walk(src): + dl = [d.lower() for d in dirs] + if any(".cmp" in s for s in dl): + idx = None + for jdx, d in enumerate(dl): + if ".cmp" in d: + idx = jdx + break + if idx is not None: + if "mf2005.cmp" in dl[idx] or "mf2005" in dl[idx]: + action = dirs[idx] + elif "mfnwt.cmp" in dl[idx] or "mfnwt" in dl[idx]: + action = dirs[idx] + elif "mfusg.cmp" in dl[idx] or "mfusg" in dl[idx]: + action = dirs[idx] + elif "mf6.cmp" in dl[idx] or "mf6" in dl[idx]: + action = dirs[idx] + elif "libmf6.cmp" in dl[idx] or "libmf6" in dl[idx]: + action = dirs[idx] + else: + action = dirs[idx] + break + if action is not None: + dst = os.path.join(dst, f"{action}") + if not os.path.isdir(dst): + try: + os.mkdir(dst) + except: + print("Could not make " + dst) + # clean directory else: - rclose *= 5.0 - self.rclose = rclose - - # Copy comparison simulations if available - if self.mf6_regression: - action = "mf6-regression" - pth = os.path.join(dst, action) - if os.path.isdir(pth): - shutil.rmtree(pth) - shutil.copytree(dst, pth) - elif testModel: - action = devtools.setup_mf6_comparison(src, dst) + print(f"cleaning...{dst}") + for root, dirs, files in os.walk(dst): + for f in files: + tpth = os.path.join(root, f) + print(f" removing...{tpth}") + os.remove(tpth) + for d in dirs: + tdir = os.path.join(root, d) + print(f" removing...{tdir}") + shutil.rmtree(tdir) + # copy files + cmppth = os.path.join(src, action) + files = os.listdir(cmppth) + files2copy = [] + if action.lower() == ".cmp": + for file in files: + if ".cmp" in os.path.splitext(file)[1].lower(): + files2copy.append(os.path.join(cmppth, file)) + for srcf in files2copy: + f = os.path.basename(srcf) + dstf = os.path.join(dst, f) + # Now copy the file + if os.path.exists(srcf): + print("Copy file '" + srcf + "' -> '" + dstf + "'") + shutil.copy(srcf, dstf) + else: + print(srcf + " does not exist") else: - action = devtools.get_mf6_comparison(dst) - - self.action = action - - def run(self): - """ - Run the model and assert if the model terminated successfully - """ - msg = self.sfmt.format("Run test", self.name) - print(msg) - - # Set nam as namefile name without path - nam = None - - # run mf6 models - exe = self.targets["mf6"] - msg = self.sfmt.format("using executable", exe) - print(msg) + for file in files: + if ".nam" in os.path.splitext(file)[1].lower(): + files2copy.append( + os.path.join(cmppth, os.path.basename(file)) + ) + nf = os.path.join(src, action, os.path.basename(file)) + model_setup(nf, dst, remove_existing=remove_existing) + break + + return action + + +def get_input_files(namefile): + """Return a list of all the input files in this model. + + Parameters + ---------- + namefile : str + path to a MODFLOW-based model name file + + Returns + ------- + filelist : list + list of MODFLOW-based model input files + + """ + srcdir = os.path.dirname(namefile) + filelist = [] + fname = os.path.join(srcdir, namefile) + with open(fname, "r") as f: + lines = f.readlines() + + for line in lines: + ll = line.strip().split() + if len(ll) < 2: + continue + if line.strip()[0] in ["#", "!"]: + continue + ext = os.path.splitext(ll[2])[1] + if ext.lower() not in ignore_ext: + if len(ll) > 3: + if "replace" in ll[3].lower(): + continue + filelist.append(ll[2]) + + # Now go through every file and look for other files to copy, + # such as 'OPEN/CLOSE'. If found, then add that file to the + # list of files to copy. + otherfiles = [] + for fname in filelist: + fname = os.path.join(srcdir, fname) try: - success, buff = flopy.run_model( - exe, - nam, - model_ws=self.simpath, - silent=False, - report=True, - ) - msg = self.sfmt.format("MODFLOW 6 run", self.name) - if success: - print(msg) - else: - print(msg) + f = open(fname, "r") + for line in f: + + # Skip invalid lines + ll = line.strip().split() + if len(ll) < 2: + continue + if line.strip()[0] in ["#", "!"]: + continue + + if "OPEN/CLOSE" in line.upper(): + for i, s in enumerate(ll): + if "OPEN/CLOSE" in s.upper(): + stmp = ll[i + 1] + stmp = stmp.replace('"', "") + stmp = stmp.replace("'", "") + otherfiles.append(stmp) + break except: - msg = self.sfmt.format("MODFLOW 6 run", self.name) - print(msg) - success = False - - # set failure based on success and require_failure setting - if self.require_failure is None: - msg = "MODFLOW 6 model did not terminate normally" - if success: - failure = False - else: - failure = True + print(fname + " does not exist") + + filelist = filelist + otherfiles + + return filelist + + +def get_namefiles(pth, exclude=None): + """Search through a path (pth) for all .nam files. + + Parameters + ---------- + pth : str + path to model files + exclude : str or lst + File or list of files to exclude from the search (default is None) + + Returns + ------- + namefiles : lst + List of namefiles with paths + + """ + namefiles = [] + for root, _, files in os.walk(pth): + namefiles += [ + os.path.join(root, file) for file in files if file.endswith(".nam") + ] + if exclude is not None: + if isinstance(exclude, str): + exclude = [exclude] + exclude = [e.lower() for e in exclude] + pop_list = [] + for namefile in namefiles: + for e in exclude: + if e in namefile.lower(): + pop_list.append(namefile) + for e in pop_list: + namefiles.remove(e) + + return namefiles + + +def get_entries_from_namefile(namefile, ftype=None, unit=None, extension=None): + """Get entries from a namefile. Can select using FTYPE, UNIT, or file + extension. + + Parameters + ---------- + namefile : str + path to a MODFLOW-based model name file + ftype : str + package type + unit : int + file unit number + extension : str + file extension + + Returns + ------- + entries : list of tuples + list of tuples containing FTYPE, UNIT, FNAME, STATUS for each + namefile entry that meets a user-specified value. + + """ + entries = [] + f = open(namefile, "r") + for line in f: + if line.strip() == "": + continue + if line[0] == "#": + continue + ll = line.strip().split() + if len(ll) < 3: + continue + status = "UNKNOWN" + if len(ll) > 3: + status = ll[3].upper() + if ftype is not None: + if ftype.upper() == ll[0].upper(): + filename = os.path.join(os.path.split(namefile)[0], ll[2]) + entries.append((filename, ll[0], ll[1], status)) + elif unit is not None: + if int(unit) == int(ll[1]): + filename = os.path.join(os.path.split(namefile)[0], ll[2]) + entries.append((filename, ll[0], ll[1], status)) + elif extension is not None: + filename = os.path.join(os.path.split(namefile)[0], ll[2]) + ext = os.path.splitext(filename)[1] + if len(ext) > 0: + if ext[0] == ".": + ext = ext[1:] + if extension.lower() == ext.lower(): + entries.append((filename, ll[0], ll[1], status)) + f.close() + if len(entries) < 1: + entries.append((None, None, None, None)) + return entries + + +def get_sim_name(namefiles, rootpth=None): + """Get simulation name. + + Parameters + ---------- + namefiles : str or list of strings + path(s) to MODFLOW-based model name files + rootpth : str + optional root directory path (default is None) + + Returns + ------- + simname : list + list of namefiles without the file extension + + """ + if isinstance(namefiles, str): + namefiles = [namefiles] + sim_name = [] + for namefile in namefiles: + t = namefile.split(os.sep) + if rootpth is None: + idx = -1 else: - if self.require_failure: - msg = "MODFLOW 6 model should have failed" - if not success: - failure = False - else: - failure = True - else: - msg = "MODFLOW 6 model should not have failed" - if success: - failure = False - else: - failure = True - - # print end of mfsim.lst to the screen - if failure: - fpth = os.path.join(self.simpath, "mfsim.lst") - msg = self._get_mfsim_listing(fpth) + msg - - # test for failure - assert not failure, msg - - self.nam_cmp = None - if success: - if self.action is not None: - if self.action.lower() == "compare": - msg = self.sfmt.format("Comparison files", self.name) - print(msg) + idx = t.index(os.path.split(rootpth)[1]) + + # build dst with everything after the rootpth and before + # the namefile file name. + dst = "" + if idx < len(t): + for d in t[idx + 1 : -1]: + dst += f"{d}_" + + # add namefile basename without extension + dst += t[-1].replace(".nam", "") + sim_name.append(dst) + + return sim_name + + +def setup_mf6( + src, dst, mfnamefile="mfsim.nam", extrafiles=None, remove_existing=True +): + """Copy all of the MODFLOW 6 input files from the src directory to the dst + directory. + + Parameters + ---------- + src : src + directory path with original MODFLOW 6 input files + dst : str + directory path that original MODFLOW 6 input files will be copied to + mfnamefile : str + optional MODFLOW 6 simulation name file (default is mfsim.nam) + extrafiles : bool + boolean indicating if extra files should be included (default is None) + remove_existing : bool + boolean indicating if existing file in dst should be removed (default + is True) + + Returns + ------- + mf6inp : list + list of MODFLOW 6 input files + mf6outp : list + list of MODFLOW 6 output files + + """ + + # Create the destination folder + create_dir = False + if os.path.exists(dst): + if remove_existing: + print("Removing folder " + dst) + shutil.rmtree(dst) + create_dir = True + else: + create_dir = True + if create_dir: + os.makedirs(dst) + + # Make list of files to copy + fname = os.path.join(src, mfnamefile) + fname = os.path.abspath(fname) + mf6inp, mf6outp = get_mf6_files(fname) + files2copy = [mfnamefile] + mf6inp + + # determine if there are any .ex files + exinp = [] + for f in mf6outp: + ext = os.path.splitext(f)[1] + if ext.lower() == ".hds": + pth = os.path.join(src, f + ".ex") + if os.path.isfile(pth): + exinp.append(f + ".ex") + if len(exinp) > 0: + files2copy += exinp + if extrafiles is not None: + files2copy += extrafiles + + # Copy the files + for f in files2copy: + srcf = os.path.join(src, f) + dstf = os.path.join(dst, f) + + # Check to see if dstf is going into a subfolder, and create that + # subfolder if it doesn't exist + sf = os.path.dirname(dstf) + if not os.path.isdir(sf): + try: + os.mkdir(sf) + except: + print("Could not make " + sf) + + # Now copy the file + if os.path.exists(srcf): + print("Copy file '" + srcf + "' -> '" + dstf + "'") + shutil.copy(srcf, dstf) + else: + print(srcf + " does not exist") + + return mf6inp, mf6outp + + +def get_mf6_comparison(src): + """Determine comparison type for MODFLOW 6 simulation. + + Parameters + ---------- + src : str + directory path to search for comparison types + + Returns + ------- + action : str + comparison type + + """ + action = None + # Possible comparison - the order matters + optcomp = ( + "compare", + ".cmp", + "mf2005", + "mf2005.cmp", + "mfnwt", + "mfnwt.cmp", + "mfusg", + "mfusg.cmp", + "mflgr", + "mflgr.cmp", + "libmf6", + "libmf6.cmp", + "mf6", + "mf6.cmp", + ) + # Construct src pth from namefile + action = None + for _, dirs, _ in os.walk(src): + dl = [d.lower() for d in dirs] + for oc in optcomp: + if any(oc in s for s in dl): + action = oc + break + return action + + +def setup_mf6_comparison(src, dst, remove_existing=True): + """Setup comparision for MODFLOW 6 simulation. + + Parameters + ---------- + src : src + directory path with original MODFLOW 6 input files + dst : str + directory path that original MODFLOW 6 input files will be copied to + remove_existing : bool + boolean indicating if existing file in dst should be removed (default + is True) + + Returns + ------- + action : str + comparison type + + """ + # get the type of comparison to use (compare, mf2005, etc.) + action = get_mf6_comparison(src) + + if action is not None: + dst = os.path.join(dst, f"{action}") + if not os.path.isdir(dst): + try: + os.mkdir(dst) + except: + print("Could not make " + dst) + # clean directory + else: + print(f"cleaning...{dst}") + for root, dirs, files in os.walk(dst): + for f in files: + tpth = os.path.join(root, f) + print(f" removing...{tpth}") + os.remove(tpth) + for d in dirs: + tdir = os.path.join(root, d) + print(f" removing...{tdir}") + shutil.rmtree(tdir) + # copy files + cmppth = os.path.join(src, action) + files = os.listdir(cmppth) + files2copy = [] + if action.lower() == "compare" or action.lower() == ".cmp": + for file in files: + if ".cmp" in os.path.splitext(file)[1].lower(): + files2copy.append(os.path.join(cmppth, file)) + for srcf in files2copy: + f = os.path.basename(srcf) + dstf = os.path.join(dst, f) + # Now copy the file + if os.path.exists(srcf): + print("Copy file '" + srcf + "' -> '" + dstf + "'") + shutil.copy(srcf, dstf) else: - cpth = os.path.join(self.simpath, self.action) - key = self.action.lower().replace(".cmp", "") - exe = os.path.abspath(self.targets[key]) - msg = self.sfmt.format("comparison executable", exe) - print(msg) - if ( - "mf6" in key - or "libmf6" in key - or "mf6-regression" in key - ): - nam = None - else: - npth = devtools.get_namefiles(cpth)[0] - nam = os.path.basename(npth) - self.nam_cmp = nam - try: - if self.api_func is None: - success_cmp, buff = flopy.run_model( - exe, - nam, - model_ws=cpth, - silent=False, - report=True, - ) - else: - success_cmp, buff = self.api_func( - exe, self.idxsim, model_ws=cpth - ) - msg = self.sfmt.format( - "Comparison run", self.name + "/" + key - ) - print(msg) - - # print end of mfsim.lst to the screen - if "mf6" in key: - if not success: - fpth = os.path.join(cpth, "mfsim.lst") - print(self._get_mfsim_listing(fpth)) - - except: - success_cmp = False - msg = self.sfmt.format( - "Comparison run", self.name + "/" + key - ) - print(msg) - - assert success_cmp, "Unsuccessful comparison run" - - def compare(self): - """ - Compare the model results - - """ - self.success = True - - # evaluate if comparison should be made - if not self.make_comparison: - return - - msgall = "" - msg = self.sfmt.format("Comparison test", self.name) - print(msg) - - if self.action is not None: - cpth = os.path.join(self.simpath, self.action) - files_cmp = None - if self.action.lower() == "compare": - files_cmp = [] - files = os.listdir(cpth) + print(srcf + " does not exist") + else: + if "mf6" in action.lower(): for file in files: - files_cmp.append(file) - elif "mf6" in self.action: - fpth = os.path.join(cpth, "mfsim.nam") - cinp, self.coutp = devtools.get_mf6_files(fpth) - - head_extensions = ( - "hds", - "hed", - "bhd", - "ahd", - "bin", - ) - if "mf6-regression" in self.action: - success, msgall = self._compare_heads( - msgall, - extensions=head_extensions, - ) - if not success: - self.success = False - # non-regression runs - for new features + if "mfsim.nam" in file.lower(): + srcf = os.path.join(cmppth, os.path.basename(file)) + files2copy.append(srcf) + srcdir = os.path.join(src, action) + setup_mf6(srcdir, dst, remove_existing=remove_existing) + break else: - files1 = [] - files2 = [] - exfiles = [] - ipos = 0 - for file1 in self.outp: - ext = os.path.splitext(file1)[1][1:] - - if ext.lower() in head_extensions: - - # simulation file - pth = os.path.join(self.simpath, file1) - files1.append(pth) - - # look for an exclusion file - pth = os.path.join(self.simpath, file1 + ".ex") - if os.path.isfile(pth): - exfiles.append(pth) - else: - exfiles.append(None) - - # Check to see if there is a corresponding compare file - if files_cmp is not None: - - if file1 + ".cmp" in files_cmp: - # compare file - idx = files_cmp.index(file1 + ".cmp") - pth = os.path.join(cpth, files_cmp[idx]) - files2.append(pth) - txt = self.sfmt.format( - f"Comparison file {ipos + 1}", - os.path.basename(pth), - ) - print(txt) - else: - if self.coutp is not None: - for file2 in self.coutp: - ext = os.path.splitext(file2)[1][1:] - - if ext.lower() in head_extensions: - # simulation file - pth = os.path.join(cpth, file2) - files2.append(pth) - - else: - files2.append(None) - - if self.nam_cmp is None: - pth = None - else: - pth = os.path.join(cpth, self.nam_cmp) - - for ipos in range(len(files1)): - file1 = files1[ipos] - ext = os.path.splitext(file1)[1][1:].lower() - outfile = os.path.splitext(os.path.basename(file1))[0] - outfile = os.path.join( - self.simpath, outfile + "." + ext + ".cmp.out" - ) - if files2 is None: - file2 = None - else: - file2 = files2[ipos] - - # set exfile - exfile = None - if file2 is None: - if len(exfiles) > 0: - exfile = exfiles[ipos] - if exfile is not None: - txt = self.sfmt.format( - f"Exclusion file {ipos + 1}", - os.path.basename(exfile), - ) - print(txt) - - # make comparison - success_tst = devtools.compare_heads( - None, - pth, - precision="double", - text=self.extdict[ext], - outfile=outfile, - files1=file1, - files2=file2, - htol=self.htol, - difftol=True, - # Change to true to have list of all nodes exceeding htol - verbose=self.cmp_verbose, - exfile=exfile, - ) - msg = self.sfmt.format( - f"{self.extdict[ext]} comparison {ipos + 1}", - self.name, - ) - print(msg) - - if not success_tst: - self.success = False - msgall += msg + " ... FAILED\n" - - # compare concentrations - if "mf6-regression" in self.action: - success, msgall = self._compare_concentrations(msgall) - if not success: - self.success = False - - # compare cbc files - if "mf6-regression" in self.action: - cbc_extensions = ( - "cbc", - "bud", - ) - success, msgall = self._compare_budgets( - msgall, extensions=cbc_extensions - ) - if not success: - self.success = False - - assert self.success, msgall - - def _get_mfsim_listing(self, lst_pth): - """Get the tail of the mfsim.lst listing file""" - msg = "" - ilen = 100 - with open(lst_pth) as fp: - lines = fp.read().splitlines() - msg = "\n" + 79 * "-" + "\n" - if len(lines) > ilen: - i0 = -100 - else: - i0 = 0 - for line in lines[i0:]: - if len(line) > 0: - msg += f"{line}\n" - msg += 79 * "-" + "\n\n" - return msg - - def _get_dvclose(self, dir_pth): - """Get outer_dvclose value from MODFLOW 6 ims file""" - dvclose = None - files = os.listdir(dir_pth) - for file_name in files: - pth = os.path.join(dir_pth, file_name) - if os.path.isfile(pth): - if file_name.lower().endswith(".ims"): - with open(pth) as f: - lines = f.read().splitlines() - for line in lines: - if "outer_dvclose" in line.lower(): - v = float(line.split()[1]) - if dvclose is None: - dvclose = v - else: - if v > dvclose: - dvclose = v + for file in files: + if ".nam" in os.path.splitext(file)[1].lower(): + srcf = os.path.join(cmppth, os.path.basename(file)) + files2copy.append(srcf) + nf = os.path.join(src, action, os.path.basename(file)) + model_setup(nf, dst, remove_existing=remove_existing) + break + + return action + + +def get_mf6_nper(tdisfile): + """Return the number of stress periods in the MODFLOW 6 model. + + Parameters + ---------- + tdisfile : str + path to the TDIS file + + Returns + ------- + nper : int + number of stress periods in the simulation + + """ + with open(tdisfile, "r") as f: + lines = f.readlines() + line = [line for line in lines if "NPER" in line.upper()][0] + nper = line.strip().split()[1] + return nper + + +def get_mf6_mshape(disfile): + """Return the shape of the MODFLOW 6 model. + + Parameters + ---------- + disfile : str + path to a MODFLOW 6 discretization file + + Returns + ------- + mshape : tuple + tuple with the shape of the MODFLOW 6 model. + + """ + with open(disfile, "r") as f: + lines = f.readlines() + + d = {} + for line in lines: + + # Skip over blank and commented lines + ll = line.strip().split() + if len(ll) < 2: + continue + if line.strip()[0] in ["#", "!"]: + continue + + for key in ["NODES", "NCPL", "NLAY", "NROW", "NCOL"]: + if ll[0].upper() in key: + d[key] = int(ll[1]) + + if "NODES" in d: + mshape = (d["NODES"],) + elif "NCPL" in d: + mshape = (d["NLAY"], d["NCPL"]) + elif "NLAY" in d: + mshape = (d["NLAY"], d["NROW"], d["NCOL"]) + else: + print(d) + raise Exception("Could not determine model shape") + return mshape + + +def get_mf6_files(mfnamefile): + """Return a list of all the MODFLOW 6 input and output files in this model. + + Parameters + ---------- + mfnamefile : str + path to the MODFLOW 6 simulation name file + + Returns + ------- + filelist : list + list of MODFLOW 6 input files in a simulation + outplist : list + list of MODFLOW 6 output files in a simulation + + """ + + srcdir = os.path.dirname(mfnamefile) + filelist = [] + outplist = [] + + filekeys = ["TDIS6", "GWF6", "GWT", "GWF6-GWF6", "GWF-GWT", "IMS6"] + namefilekeys = ["GWF6", "GWT"] + namefiles = [] + + with open(mfnamefile) as f: + + # Read line and skip comments + lines = f.readlines() + + for line in lines: + + # Skip over blank and commented lines + ll = line.strip().split() + if len(ll) < 2: + continue + if line.strip()[0] in ["#", "!"]: + continue + + for key in filekeys: + if key in ll[0].upper(): + fname = ll[1] + filelist.append(fname) + + for key in namefilekeys: + if key in ll[0].upper(): + fname = ll[1] + namefiles.append(fname) + + # Go through name files and get files + for namefile in namefiles: + fname = os.path.join(srcdir, namefile) + with open(fname, "r") as f: + lines = f.readlines() + insideblock = False + + for line in lines: + ll = line.upper().strip().split() + if len(ll) < 2: + continue + if ll[0] in "BEGIN" and ll[1] in "PACKAGES": + insideblock = True + continue + if ll[0] in "END" and ll[1] in "PACKAGES": + insideblock = False + + if insideblock: + ll = line.strip().split() + if len(ll) < 2: + continue + if line.strip()[0] in ["#", "!"]: + continue + filelist.append(ll[1]) + + # Recursively go through every file and look for other files to copy, + # such as 'OPEN/CLOSE' and 'TIMESERIESFILE'. If found, then + # add that file to the list of files to copy. + flist = filelist + # olist = outplist + while True: + olist = [] + flist, olist = _get_mf6_external_files(srcdir, olist, flist) + # add to filelist + if len(flist) > 0: + filelist = filelist + flist + # add to outplist + if len(olist) > 0: + outplist = outplist + olist + # terminate loop if no additional files + # if len(flist) < 1 and len(olist) < 1: + if len(flist) < 1: + break + + return filelist, outplist + + +def _get_mf6_external_files(srcdir, outplist, files): + """Get list of external files in a MODFLOW 6 simulation. + + Parameters + ---------- + srcdir : str + path to a directory containing a MODFLOW 6 simulation + outplist : list + list of output files in a MODFLOW 6 simulation + files : list + list of MODFLOW 6 name files + + Returns + ------- + + """ + extfiles = [] + + for fname in files: + fname = os.path.join(srcdir, fname) + try: + f = open(fname, "r") + for line in f: + + # Skip invalid lines + ll = line.strip().split() + if len(ll) < 2: + continue + if line.strip()[0] in ["#", "!"]: + continue + + if "OPEN/CLOSE" in line.upper(): + for i, s in enumerate(ll): + if s.upper() == "OPEN/CLOSE": + stmp = ll[i + 1] + stmp = stmp.replace('"', "") + stmp = stmp.replace("'", "") + extfiles.append(stmp) break - return dvclose + if "TS6" in line.upper(): + for i, s in enumerate(ll): + if s.upper() == "FILEIN": + stmp = ll[i + 1] + stmp = stmp.replace('"', "") + stmp = stmp.replace("'", "") + extfiles.append(stmp) + break - def _get_rclose(self, dir_pth): - """Get inner_rclose value from MODFLOW 6 ims file""" - rclose = None - files = os.listdir(dir_pth) - for file_name in files: - pth = os.path.join(dir_pth, file_name) - if os.path.isfile(pth): - if file_name.lower().endswith(".ims"): - with open(pth) as f: - lines = f.read().splitlines() - for line in lines: - if "inner_rclose" in line.lower(): - v = float(line.split()[1]) - if rclose is None: - rclose = v - else: - if v > rclose: - rclose = v + if "TAS6" in line.upper(): + for i, s in enumerate(ll): + if s.upper() == "FILEIN": + stmp = ll[i + 1] + stmp = stmp.replace('"', "") + stmp = stmp.replace("'", "") + extfiles.append(stmp) break - return rclose - - def _regression_files(self, extensions): - if isinstance(extensions, str): - extensions = [extensions] - files = os.listdir(self.simpath) - files0 = [] - files1 = [] - for file_name in files: - fpth0 = os.path.join(self.simpath, file_name) - if os.path.isfile(fpth0): - for extension in extensions: - if file_name.lower().endswith(extension): - files0.append(fpth0) - fpth1 = os.path.join( - self.simpath, "mf6-regression", file_name - ) - files1.append(fpth1) - break - return files0, files1 - - def _compare_heads(self, msgall, extensions="hds"): - if isinstance(extensions, str): - extensions = [extensions] - success = True - files0, files1 = self._regression_files(extensions) - extension = "hds" - ipos = 0 - for idx, (fpth0, fpth1) in enumerate(zip(files0, files1)): - outfile = os.path.splitext(os.path.basename(fpth0))[0] - outfile = os.path.join( - self.simpath, outfile + f".{extension}.cmp.out" - ) - success_tst = devtools.compare_heads( - None, - None, - precision="double", - htol=self.htol, - text=self.extdict[extension], - outfile=outfile, - files1=fpth0, - files2=fpth1, - verbose=self.cmp_verbose, - ) - msg = self.sfmt.format( - f"{self.extdict[extension]} comparison {ipos + 1}", - f"{self.name} ({os.path.basename(fpth0)})", - ) - ipos += 1 - print(msg) - - if not success_tst: - success = False - msgall += msg + " ... FAILED\n" - - return success, msgall - - def _compare_concentrations(self, msgall, extensions="ucn"): - if isinstance(extensions, str): - extensions = [extensions] - success = True - files0, files1 = self._regression_files(extensions) - extension = "ucn" - ipos = 0 - for idx, (fpth0, fpth1) in enumerate(zip(files0, files1)): - outfile = os.path.splitext(os.path.basename(fpth0))[0] - outfile = os.path.join( - self.simpath, outfile + f".{extension}.cmp.out" - ) - success_tst = devtools.compare_heads( - None, - None, - precision="double", - htol=self.htol, - text=self.extdict[extension], - outfile=outfile, - files1=fpth0, - files2=fpth1, - verbose=self.cmp_verbose, - ) - msg = self.sfmt.format( - f"{self.extdict[extension]} comparison {ipos + 1}", - f"{self.name} ({os.path.basename(fpth0)})", - ) - ipos += 1 - print(msg) - - if not success_tst: - success = False - msgall += msg + " ... FAILED\n" - - return success, msgall - - def _compare_budgets(self, msgall, extensions="cbc"): - if isinstance(extensions, str): - extensions = [extensions] - success = True - files0, files1 = self._regression_files(extensions) - extension = "cbc" - ipos = 0 - for idx, (fpth0, fpth1) in enumerate(zip(files0, files1)): - if os.stat(fpth0).st_size * os.stat(fpth0).st_size == 0: - continue - outfile = os.path.splitext(os.path.basename(fpth0))[0] - outfile = os.path.join( - self.simpath, outfile + f".{extension}.cmp.out" - ) - fcmp = open(outfile, "w") - - # open the files - cbc0 = flopy.utils.CellBudgetFile( - fpth0, precision="double", verbose=self.cmp_verbose - ) - cbc1 = flopy.utils.CellBudgetFile( - fpth1, precision="double", verbose=self.cmp_verbose - ) - - # build list of cbc data to retrieve - avail0 = cbc0.get_unique_record_names() - avail1 = cbc1.get_unique_record_names() - avail0 = [t.decode().strip() for t in avail0] - avail1 = [t.decode().strip() for t in avail1] - - # initialize list for storing totals for each budget term terms - cbc_keys0 = [] - cbc_keys1 = [] - for t in avail0: - t1 = t - if t not in avail1: - # check if RCHA or EVTA is available and use that instead - # should be able to remove this once v6.3.0 is released - if t[:-1] in avail1: - t1 = t[:-1] - else: - raise Exception(f"Could not find {t} in {fpth1}") - cbc_keys0.append(t) - cbc_keys1.append(t1) - - # get list of times and kstpkper - kk = cbc0.get_kstpkper() - times = cbc0.get_times() - - # process data - success_tst = True - for key, key1 in zip(cbc_keys0, cbc_keys1): - for idx, (k, t) in enumerate(zip(kk, times)): - v0 = cbc0.get_data(kstpkper=k, text=key)[0] - v1 = cbc1.get_data(kstpkper=k, text=key1)[0] - if v0.dtype.names is not None: - v0 = v0["q"] - v1 = v1["q"] - # skip empty vectors - if v0.size < 1: - continue - vmin = self.rclose - if vmin < 1e-6: - vmin = 1e-6 - vmin_tol = 5.0 * vmin - idx = (abs(v0) > vmin) & (abs(v1) > vmin) - diff = np.zeros(v0.shape, dtype=v0.dtype) - diff[idx] = abs(v0[idx] - v1[idx]) - diffmax = diff.max() - indices = np.where(diff == diffmax)[0] - if diffmax > vmin_tol: - success_tst = False - msg = ( - f"{os.path.basename(fpth0)} - " - + f"{key:16s} " - + f"difference ({diffmax:10.4g}) " - + f"> {self.pdtol:10.4g} " - + f"at {indices.size} nodes " - + f" [first location ({indices[0] + 1})] " - + f"at time {t} " - ) - fcmp.write(f"{msg}\n") - if self.cmp_verbose: - print(msg) - - msg = self.sfmt.format( - f"{self.extdict[extension]} comparison {ipos + 1}", - f"{self.name} ({os.path.basename(fpth0)})", - ) - ipos += 1 - print(msg) - - fcmp.close() - - if not success_tst: - success = False - msgall += msg + " ... FAILED\n" - - return success, msgall + if "OBS6" in line.upper(): + for i, s in enumerate(ll): + if s.upper() == "FILEIN": + stmp = ll[i + 1] + stmp = stmp.replace('"', "") + stmp = stmp.replace("'", "") + extfiles.append(stmp) + break + + if "EXTERNAL" in line.upper(): + for i, s in enumerate(ll): + if s.upper() == "EXTERNAL": + stmp = ll[i + 1] + stmp = stmp.replace('"', "") + stmp = stmp.replace("'", "") + extfiles.append(stmp) + break + + if "FILE" in line.upper(): + for i, s in enumerate(ll): + if s.upper() == "FILEIN": + stmp = ll[i + 1] + stmp = stmp.replace('"', "") + stmp = stmp.replace("'", "") + extfiles.append(stmp) + break + + if "FILE" in line.upper(): + for i, s in enumerate(ll): + if s.upper() == "FILEOUT": + stmp = ll[i + 1] + stmp = stmp.replace('"', "") + stmp = stmp.replace("'", "") + outplist.append(stmp) + break + + except: + print("could not get a list of external mf6 files") + + return extfiles, outplist + + +def get_mf6_ftypes(namefile, ftypekeys): + """Return a list of FTYPES that are in the name file and in ftypekeys. + + Parameters + ---------- + namefile : str + path to a MODFLOW 6 name file + ftypekeys : list + list of desired FTYPEs + + Returns + ------- + ftypes : list + list of FTYPES that match ftypekeys in namefile + + """ + with open(namefile, "r") as f: + lines = f.readlines() + + ftypes = [] + for line in lines: + + # Skip over blank and commented lines + ll = line.strip().split() + if len(ll) < 2: + continue + if line.strip()[0] in ["#", "!"]: + continue + + for key in ftypekeys: + if ll[0].upper() in key: + ftypes.append(ll[0]) + + return ftypes + + +def get_mf6_blockdata(f, blockstr): + """Return list with all non comments between start and end of block + specified by blockstr. + + Parameters + ---------- + f : file object + open file object + blockstr : str + name of block to search + + Returns + ------- + data : list + list of data in specified block + + """ + data = [] + + # find beginning of block + for line in f: + if line[0] != "#": + t = line.split() + if t[0].lower() == "begin" and t[1].lower() == blockstr.lower(): + break + for line in f: + if line[0] != "#": + t = line.split() + if t[0].lower() == "end" and t[1].lower() == blockstr.lower(): + break + else: + data.append(line.rstrip()) + return data diff --git a/modflow_devtools/http.py b/modflow_devtools/http.py deleted file mode 100644 index 41f244a..0000000 --- a/modflow_devtools/http.py +++ /dev/null @@ -1,74 +0,0 @@ -import time - -import requests - - -def head_request(url, max_requests=10, verbose=False): - """Get the headers from a url - Parameters - ---------- - url : str - url address for the zip file - max_requests : int - number of url download request attempts (default is 10) - verbose : bool - boolean indicating if output will be printed to the terminal - (default is False) - Returns - ------- - header : request header object - request header object for url - """ - for idx in range(max_requests): - if verbose: - msg = f"open request attempt {idx + 1} of {max_requests}" - print(msg) - - response = requests.head(url, allow_redirects=True) - if response.status_code != 200: - if idx < max_requests - 1: - time.sleep(13) - continue - else: - msg = "Cannot open request from:\n" + f" {url}\n\n" - print(msg) - response.raise_for_status() - else: - return response - - -def get_request(url, timeout=1, max_requests=10, verbose=False): - """Make a url request - Parameters - ---------- - url : str - url address for the zip file - verify : bool - boolean indicating if the url request should be verified - (default is True) - timeout : int - url request time out length (default is 1 seconds) - max_requests : int - number of url download request attempts (default is 10) - verbose : bool - boolean indicating if output will be printed to the terminal - (default is False) - Returns - ------- - req : request object - request object for url - """ - for idx in range(max_requests): - if verbose: - msg = f"open request attempt {idx + 1} of {max_requests}" - print(msg) - try: - return requests.get(url, stream=True, timeout=timeout) - except: - if idx < max_requests - 1: - time.sleep(13) - continue - else: - msg = "Cannot open request from:\n" + f" {url}\n\n" - print(msg) - raise requests.HTTPError(msg) diff --git a/modflow_devtools/misc.py b/modflow_devtools/misc.py index 897d943..07fa6fa 100644 --- a/modflow_devtools/misc.py +++ b/modflow_devtools/misc.py @@ -1,8 +1,19 @@ +import importlib +import socket +import sys from contextlib import contextmanager -from os import PathLike, chdir, getcwd +from os import PathLike, chdir, environ, getcwd +from os.path import basename, normpath from pathlib import Path +from platform import system +from shutil import which +from subprocess import PIPE, Popen +from typing import List, Optional +from urllib import request -import numpy as np +import pkg_resources +import pytest +from _warnings import warn @contextmanager @@ -12,308 +23,262 @@ def set_dir(path: PathLike): try: chdir(path) - print(f"Changed to directory: {wrkdir} (previously: {origin})") + print(f"Changed to working directory: {wrkdir} (previously: {origin})") yield finally: chdir(origin) - print(f"Returned to directory: {origin}") + print(f"Returned to previous directory: {origin}") -def get_disu_kwargs(nlay, nrow, ncol, delr, delc, tp, botm): +def run_cmd(*args, verbose=False, **kwargs): + """Run any command, return tuple (stdout, stderr, returncode).""" + args = [str(g) for g in args] + if verbose: + print("running: " + " ".join(args)) + p = Popen(args, stdout=PIPE, stderr=PIPE, **kwargs) + stdout, stderr = p.communicate() + stdout = stdout.decode() + stderr = stderr.decode() + returncode = p.returncode + if verbose: + print(f"stdout:\n{stdout}") + print(f"stderr:\n{stderr}") + print(f"returncode: {returncode}") + return stdout, stderr, returncode + + +def run_py_script(script, *args, verbose=False): + """Run a Python script, return tuple (stdout, stderr, returncode).""" + return run_cmd( + sys.executable, script, *args, verbose=verbose, cwd=Path(script).parent + ) + + +def get_current_branch() -> str: + # check if on GitHub Actions CI + ref = environ.get("GITHUB_REF") + if ref is not None: + return basename(normpath(ref)).lower() + + # otherwise ask git about it + if not which("git"): + raise RuntimeError("'git' required to determine current branch") + stdout, stderr, code = run_cmd("git", "rev-parse", "--abbrev-ref", "HEAD") + if code == 0 and stdout: + return stdout.strip().lower() + raise ValueError(f"Could not determine current branch: {stderr}") + + +def get_mf6_ftypes(namefile_path: PathLike, ftypekeys: List[str]) -> List[str]: """ - Simple utility for creating args needed to construct - a disu package + Return a list of FTYPES that are in the name file and in ftypekeys. + Parameters + ---------- + namefile_path : str + path to a MODFLOW 6 name file + ftypekeys : list + list of desired FTYPEs + Returns + ------- + ftypes : list + list of FTYPES that match ftypekeys in namefile + """ + with open(namefile_path, "r") as f: + lines = f.readlines() + + ftypes = [] + for line in lines: + # Skip over blank and commented lines + ll = line.strip().split() + if len(ll) < 2: + continue + + if ll[0] in ["#", "!"]: + continue + + for key in ftypekeys: + if key.lower() in ll[0].lower(): + ftypes.append(ll[0]) + + return ftypes + + +def get_models( + path: PathLike, + prefix: str = None, + namefile: str = "mfsim.nam", + excluded=None, + selected=None, + packages=None, +) -> List[Path]: + """ + Find models in the given filesystem location. """ - def get_nn(k, i, j): - return k * nrow * ncol + i * ncol + j - - nodes = nlay * nrow * ncol - iac = np.zeros((nodes), dtype=int) - ja = [] - area = np.zeros((nodes), dtype=float) - top = np.zeros((nodes), dtype=float) - bot = np.zeros((nodes), dtype=float) - ihc = [] - cl12 = [] - hwva = [] - for k in range(nlay): - for i in range(nrow): - for j in range(ncol): - # diagonal - n = get_nn(k, i, j) - ja.append(n) - iac[n] += 1 - area[n] = delr[i] * delc[j] - ihc.append(n + 1) - cl12.append(n + 1) - hwva.append(n + 1) - if k == 0: - top[n] = tp - else: - top[n] = botm[k - 1] - bot[n] = botm[k] - # up - if k > 0: - ja.append(get_nn(k - 1, i, j)) - iac[n] += 1 - ihc.append(0) - dz = botm[k - 1] - botm[k] - cl12.append(0.5 * dz) - hwva.append(delr[i] * delc[j]) - # back - if i > 0: - ja.append(get_nn(k, i - 1, j)) - iac[n] += 1 - ihc.append(1) - cl12.append(0.5 * delc[i]) - hwva.append(delr[j]) - # left - if j > 0: - ja.append(get_nn(k, i, j - 1)) - iac[n] += 1 - ihc.append(1) - cl12.append(0.5 * delr[j]) - hwva.append(delc[i]) - # right - if j < ncol - 1: - ja.append(get_nn(k, i, j + 1)) - iac[n] += 1 - ihc.append(1) - cl12.append(0.5 * delr[j]) - hwva.append(delc[i]) - # front - if i < nrow - 1: - ja.append(get_nn(k, i + 1, j)) - iac[n] += 1 - ihc.append(1) - cl12.append(0.5 * delc[i]) - hwva.append(delr[j]) - # bottom - if k < nlay - 1: - ja.append(get_nn(k + 1, i, j)) - iac[n] += 1 - ihc.append(0) - if k == 0: - dz = tp - botm[k] - else: - dz = botm[k - 1] - botm[k] - cl12.append(0.5 * dz) - hwva.append(delr[i] * delc[j]) - ja = np.array(ja, dtype=int) - nja = ja.shape[0] - hwva = np.array(hwva, dtype=float) - kw = {} - kw["nodes"] = nodes - kw["nja"] = nja - kw["nvert"] = None - kw["top"] = top - kw["bot"] = bot - kw["area"] = area - kw["iac"] = iac - kw["ja"] = ja - kw["ihc"] = ihc - kw["cl12"] = cl12 - kw["hwva"] = hwva - return kw - - -def uniform_flow_field(qx, qy, qz, shape, delr=None, delc=None, delv=None): - - nlay, nrow, ncol = shape - - # create spdis array for the uniform flow field - dt = np.dtype( - [ - ("ID1", np.int32), - ("ID2", np.int32), - ("FLOW", np.float64), - ("QX", np.float64), - ("QY", np.float64), - ("QZ", np.float64), + # if path doesn't exist, return empty list + if not Path(path).is_dir(): + return [] + + # find namfiles + namfile_paths = [ + p + for p in Path(path).rglob( + f"{prefix}*/{namefile}" if prefix else namefile + ) + ] + + # remove excluded + namfile_paths = [ + p + for p in namfile_paths + if (not excluded or not any(e in str(p) for e in excluded)) + ] + + # filter by package (optional) + def has_packages(nfp, pkgs): + ftypes = [item.upper() for item in get_mf6_ftypes(nfp, pkgs)] + return len(ftypes) > 0 + + if packages: + namfile_paths = [ + p + for p in namfile_paths + if (has_packages(p, packages) if packages else True) + ] + + # get model dir paths + model_paths = [p.parent for p in namfile_paths] + + # filter by model name (optional) + if selected: + model_paths = [ + model + for model in model_paths + if any(s in model.name for s in selected) ] + + # exclude dev examples on master or release branches + branch = get_current_branch() + if "master" in branch.lower() or "release" in branch.lower(): + model_paths = [ + model for model in model_paths if "_dev" not in model.name.lower() + ] + + return model_paths + + +def is_connected(hostname): + """See https://stackoverflow.com/a/20913928/ to test hostname.""" + try: + host = socket.gethostbyname(hostname) + s = socket.create_connection((host, 80), 2) + s.close() + return True + except Exception: + pass + return False + + +def is_in_ci(): + # if running in GitHub Actions CI, "CI" variable always set to true + # https://docs.github.com/en/actions/learn-github-actions/environment-variables#default-environment-variables + return bool(environ.get("CI", None)) + + +def is_github_rate_limited() -> Optional[bool]: + """ + Determines if a GitHub API rate limit is applied to the current IP. + Note that running this function will consume an API request! + + Returns + ------- + True if rate-limiting is applied, otherwise False (or None if the connection fails). + """ + try: + with request.urlopen( + "https://api.github.com/users/octocat" + ) as response: + remaining = int(response.headers["x-ratelimit-remaining"]) + if remaining < 10: + warn( + f"Only {remaining} GitHub API requests remaining before rate-limiting" + ) + return remaining > 0 + except: + return None + + +_has_exe_cache = {} +_has_pkg_cache = {} + + +def has_exe(exe): + if exe not in _has_exe_cache: + _has_exe_cache[exe] = bool(which(exe)) + return _has_exe_cache[exe] + + +def has_pkg(pkg): + if pkg not in _has_pkg_cache: + + # for some dependencies, package name and import name are different + # (e.g. pyshp/shapefile, mfpymake/pymake, python-dateutil/dateutil) + # pkg_resources expects package name, importlib expects import name + try: + _has_pkg_cache[pkg] = bool(importlib.import_module(pkg)) + except (ImportError, ModuleNotFoundError): + try: + _has_pkg_cache[pkg] = bool(pkg_resources.get_distribution(pkg)) + except pkg_resources.DistributionNotFound: + _has_pkg_cache[pkg] = False + + return _has_pkg_cache[pkg] + + +def requires_exe(*exes): + missing = {exe for exe in exes if not has_exe(exe)} + return pytest.mark.skipif( + missing, + reason=f"missing executable{'s' if len(missing) != 1 else ''}: " + + ", ".join(missing), ) - spdis = np.array( - [(id1, id1, 0.0, qx, qy, qz) for id1 in range(nlay * nrow * ncol)], - dtype=dt, + + +def requires_pkg(*pkgs): + missing = {pkg for pkg in pkgs if not has_pkg(pkg)} + return pytest.mark.skipif( + missing, + reason=f"missing package{'s' if len(missing) != 1 else ''}: " + + ", ".join(missing), ) - # create the flowja array for the uniform flow field (assume top-bot = 1) - flowja = [] - if delr is None: - delr = 1.0 - if delc is None: - delc = 1.0 - if delv is None: - delv = 1.0 - for k in range(nlay): - for i in range(nrow): - for j in range(ncol): - # diagonal - flowja.append(0.0) - # up - if k > 0: - flowja.append(-qz * delr * delc) - # back - if i > 0: - flowja.append(-qy * delr * delv) - # left - if j > 0: - flowja.append(qx * delc * delv) - # right - if j < ncol - 1: - flowja.append(-qx * delc * delv) - # front - if i < nrow - 1: - flowja.append(qy * delr * delv) - # bottom - if k < nlay - 1: - flowja.append(qz * delr * delc) - flowja = np.array(flowja, dtype=np.float64) - return spdis, flowja - - -def write_head( - fbin, - data, - kstp=1, - kper=1, - pertim=1.0, - totim=1.0, - text=" HEAD", - ilay=1, -): - dt = np.dtype( - [ - ("kstp", np.int32), - ("kper", np.int32), - ("pertim", np.float64), - ("totim", np.float64), - ("text", "S16"), - ("ncol", np.int32), - ("nrow", np.int32), - ("ilay", np.int32), - ] + +def requires_platform(platform, ci_only=False): + return pytest.mark.skipif( + system().lower() != platform.lower() + and (is_in_ci() if ci_only else True), + reason=f"only compatible with platform: {platform.lower()}", ) - nrow = data.shape[0] - ncol = data.shape[1] - h = np.array((kstp, kper, pertim, totim, text, ncol, nrow, ilay), dtype=dt) - h.tofile(fbin) - data.tofile(fbin) - return - - -def write_budget( - fbin, - data, - kstp=1, - kper=1, - text=" FLOW-JA-FACE", - imeth=1, - delt=1.0, - pertim=1.0, - totim=1.0, - text1id1=" GWF-1", - text2id1=" GWF-1", - text1id2=" GWF-1", - text2id2=" NPF", -): - dt = np.dtype( - [ - ("kstp", np.int32), - ("kper", np.int32), - ("text", "S16"), - ("ndim1", np.int32), - ("ndim2", np.int32), - ("ndim3", np.int32), - ("imeth", np.int32), - ("delt", np.float64), - ("pertim", np.float64), - ("totim", np.float64), - ] + + +def excludes_platform(platform, ci_only=False): + return pytest.mark.skipif( + system().lower() == platform.lower() + and (is_in_ci() if ci_only else True), + reason=f"not compatible with platform: {platform.lower()}", ) - if imeth == 1: - ndim1 = data.shape[0] - ndim2 = 1 - ndim3 = -1 - h = np.array( - ( - kstp, - kper, - text, - ndim1, - ndim2, - ndim3, - imeth, - delt, - pertim, - totim, - ), - dtype=dt, - ) - h.tofile(fbin) - data.tofile(fbin) - - elif imeth == 6: - ndim1 = 1 - ndim2 = 1 - ndim3 = -1 - h = np.array( - ( - kstp, - kper, - text, - ndim1, - ndim2, - ndim3, - imeth, - delt, - pertim, - totim, - ), - dtype=dt, - ) - h.tofile(fbin) - - # write text1id1, ... - dt = np.dtype( - [ - ("text1id1", "S16"), - ("text1id2", "S16"), - ("text2id1", "S16"), - ("text2id2", "S16"), - ] - ) - h = np.array((text1id1, text1id2, text2id1, text2id2), dtype=dt) - h.tofile(fbin) - - # write ndat (number of floating point columns) - colnames = data.dtype.names - ndat = len(colnames) - 2 - dt = np.dtype([("ndat", np.int32)]) - h = np.array([(ndat,)], dtype=dt) - h.tofile(fbin) - - # write auxiliary column names - naux = ndat - 1 - if naux > 0: - auxtxt = [f"{colname:16}" for colname in colnames[3:]] - auxtxt = tuple(auxtxt) - dt = np.dtype([(colname, "S16") for colname in colnames[3:]]) - h = np.array(auxtxt, dtype=dt) - h.tofile(fbin) - - # write nlist - nlist = data.shape[0] - dt = np.dtype([("nlist", np.int32)]) - h = np.array([(nlist,)], dtype=dt) - h.tofile(fbin) - - # write the data - data.tofile(fbin) - pass - else: - raise Exception(f"unknown method code {imeth}") +def requires_branch(branch): + current = get_current_branch() + return pytest.mark.skipif( + current != branch, reason=f"must run on branch: {branch}" + ) + + +def excludes_branch(branch): + current = get_current_branch() + return pytest.mark.skipif( + current == branch, reason=f"can't run on branch: {branch}" + ) diff --git a/modflow_devtools/setup.py b/modflow_devtools/setup.py deleted file mode 100644 index f075e2a..0000000 --- a/modflow_devtools/setup.py +++ /dev/null @@ -1,983 +0,0 @@ -"""A typical example of using the autotest -functionality for MODFLOW-2005 and comparing the MODFLOW-2005 results to -MODFLOW-2000 results is: - -.. code-block:: python - - import pymake - - # Setup - testpth = "../test/mytest" - nam1 = "model1.nam" - pymake.setup(nam1, testpth) - - # run test models - success, buff = flopy.run_model( - "mf2005", nam1, model_ws=testpth, silent=True - ) - if success: - testpth_reg = os.path.join(testpth, "mf2000") - nam2 = "model2.name" - pymake.setup(nam2, testpth_reg) - success_reg, buff = flopy.run_model( - "mf2000", nam2, model_ws=testpth_reg, silent=True - ) - - # compare results - if success and success_reg: - fpth = os.path.split(os.path.join(testpth, nam1))[0] - outfile1 = os.path.join(fpth, "bud.cmp") - fpth = os.path.split(os.path.join(testpth, nam2))[0] - outfile2 = os.path.join(fpth, "hds.cmp") - success_reg = pymake.compare( - os.path.join(testpth, nam1), - os.path.join(testpth_reg, nam2), - max_cumpd=0.01, - max_incpd=0.01, - htol=0.001, - outfile1=outfile1, - outfile2=outfile2, - ) - - # Clean things up - if success_reg: - pymake.teardown(testpth) - -Note: autotest functionality will likely be removed from pymake in the future -to a dedicated GitHub repository. - -""" -import os -import shutil - -ignore_ext = ( - ".hds", - ".hed", - ".bud", - ".cbb", - ".cbc", - ".ddn", - ".ucn", - ".glo", - ".lst", - ".list", - ".gwv", - ".mv", - ".out", -) - - -def model_setup(namefile, dst, remove_existing=True, extrafiles=None): - """Setup MODFLOW-based model files for autotests. - - Parameters - ---------- - namefile : str - MODFLOW-based model name file. - dst : str - destination path for comparison model or file(s) - remove_existing : bool - boolean indicating if an existing comparision model or file(s) should - be replaced (default is True) - extrafiles : str or list of str - list of extra files to include in the comparision - - Returns - ------- - - """ - # Construct src pth from namefile or lgr file - src = os.path.dirname(namefile) - - # Create the destination folder, if required - create_dir = False - if os.path.exists(dst): - if remove_existing: - print("Removing folder " + dst) - shutil.rmtree(dst) - create_dir = True - else: - create_dir = True - if create_dir: - os.mkdir(dst) - - # determine if a namefile is a lgr control file - get individual - # name files out of the lgr control file - namefiles = [namefile] - ext = os.path.splitext(namefile)[1] - if ".lgr" in ext.lower(): - lines = [line.rstrip("\n") for line in open(namefile)] - for line in lines: - if len(line) < 1: - continue - if line[0] == "#": - continue - t = line.split() - if ".nam" in t[0].lower(): - fpth = os.path.join(src, t[0]) - namefiles.append(fpth) - - # Make list of files to copy - files2copy = [] - for fpth in namefiles: - files2copy.append(os.path.basename(fpth)) - ext = os.path.splitext(fpth)[1] - # copy additional files contained in the name file and - # associated package files - if ext.lower() == ".nam": - fname = os.path.abspath(fpth) - files2copy = files2copy + get_input_files(fname) - - if extrafiles is not None: - if isinstance(extrafiles, str): - extrafiles = [extrafiles] - for fl in extrafiles: - files2copy.append(os.path.basename(fl)) - - # Copy the files - for f in files2copy: - srcf = os.path.join(src, f) - dstf = os.path.join(dst, f) - - # Check to see if dstf is going into a subfolder, and create that - # subfolder if it doesn't exist - sf = os.path.dirname(dstf) - if not os.path.isdir(sf): - os.makedirs(sf) - - # Now copy the file - if os.path.exists(srcf): - print("Copy file '" + srcf + "' -> '" + dstf + "'") - shutil.copy(srcf, dstf) - else: - print(srcf + " does not exist") - - -def setup_comparison(namefile, dst, remove_existing=True): - """Setup a comparison model or comparision file(s) for a MODFLOW-based - model. - - Parameters - ---------- - namefile : str - MODFLOW-based model name file. - dst : str - destination path for comparison model or file(s) - remove_existing : bool - boolean indicating if an existing comparision model or file(s) should - be replaced (default is True) - - - Returns - ------- - - """ - # Construct src pth from namefile - src = os.path.dirname(namefile) - action = None - for root, dirs, files in os.walk(src): - dl = [d.lower() for d in dirs] - if any(".cmp" in s for s in dl): - idx = None - for jdx, d in enumerate(dl): - if ".cmp" in d: - idx = jdx - break - if idx is not None: - if "mf2005.cmp" in dl[idx] or "mf2005" in dl[idx]: - action = dirs[idx] - elif "mfnwt.cmp" in dl[idx] or "mfnwt" in dl[idx]: - action = dirs[idx] - elif "mfusg.cmp" in dl[idx] or "mfusg" in dl[idx]: - action = dirs[idx] - elif "mf6.cmp" in dl[idx] or "mf6" in dl[idx]: - action = dirs[idx] - elif "libmf6.cmp" in dl[idx] or "libmf6" in dl[idx]: - action = dirs[idx] - else: - action = dirs[idx] - break - if action is not None: - dst = os.path.join(dst, f"{action}") - if not os.path.isdir(dst): - try: - os.mkdir(dst) - except: - print("Could not make " + dst) - # clean directory - else: - print(f"cleaning...{dst}") - for root, dirs, files in os.walk(dst): - for f in files: - tpth = os.path.join(root, f) - print(f" removing...{tpth}") - os.remove(tpth) - for d in dirs: - tdir = os.path.join(root, d) - print(f" removing...{tdir}") - shutil.rmtree(tdir) - # copy files - cmppth = os.path.join(src, action) - files = os.listdir(cmppth) - files2copy = [] - if action.lower() == ".cmp": - for file in files: - if ".cmp" in os.path.splitext(file)[1].lower(): - files2copy.append(os.path.join(cmppth, file)) - for srcf in files2copy: - f = os.path.basename(srcf) - dstf = os.path.join(dst, f) - # Now copy the file - if os.path.exists(srcf): - print("Copy file '" + srcf + "' -> '" + dstf + "'") - shutil.copy(srcf, dstf) - else: - print(srcf + " does not exist") - else: - for file in files: - if ".nam" in os.path.splitext(file)[1].lower(): - files2copy.append( - os.path.join(cmppth, os.path.basename(file)) - ) - nf = os.path.join(src, action, os.path.basename(file)) - model_setup(nf, dst, remove_existing=remove_existing) - break - - return action - - -def get_input_files(namefile): - """Return a list of all the input files in this model. - - Parameters - ---------- - namefile : str - path to a MODFLOW-based model name file - - Returns - ------- - filelist : list - list of MODFLOW-based model input files - - """ - srcdir = os.path.dirname(namefile) - filelist = [] - fname = os.path.join(srcdir, namefile) - with open(fname, "r") as f: - lines = f.readlines() - - for line in lines: - ll = line.strip().split() - if len(ll) < 2: - continue - if line.strip()[0] in ["#", "!"]: - continue - ext = os.path.splitext(ll[2])[1] - if ext.lower() not in ignore_ext: - if len(ll) > 3: - if "replace" in ll[3].lower(): - continue - filelist.append(ll[2]) - - # Now go through every file and look for other files to copy, - # such as 'OPEN/CLOSE'. If found, then add that file to the - # list of files to copy. - otherfiles = [] - for fname in filelist: - fname = os.path.join(srcdir, fname) - try: - f = open(fname, "r") - for line in f: - - # Skip invalid lines - ll = line.strip().split() - if len(ll) < 2: - continue - if line.strip()[0] in ["#", "!"]: - continue - - if "OPEN/CLOSE" in line.upper(): - for i, s in enumerate(ll): - if "OPEN/CLOSE" in s.upper(): - stmp = ll[i + 1] - stmp = stmp.replace('"', "") - stmp = stmp.replace("'", "") - otherfiles.append(stmp) - break - except: - print(fname + " does not exist") - - filelist = filelist + otherfiles - - return filelist - - -def get_namefiles(pth, exclude=None): - """Search through a path (pth) for all .nam files. - - Parameters - ---------- - pth : str - path to model files - exclude : str or lst - File or list of files to exclude from the search (default is None) - - Returns - ------- - namefiles : lst - List of namefiles with paths - - """ - namefiles = [] - for root, _, files in os.walk(pth): - namefiles += [ - os.path.join(root, file) for file in files if file.endswith(".nam") - ] - if exclude is not None: - if isinstance(exclude, str): - exclude = [exclude] - exclude = [e.lower() for e in exclude] - pop_list = [] - for namefile in namefiles: - for e in exclude: - if e in namefile.lower(): - pop_list.append(namefile) - for e in pop_list: - namefiles.remove(e) - - return namefiles - - -def get_entries_from_namefile(namefile, ftype=None, unit=None, extension=None): - """Get entries from a namefile. Can select using FTYPE, UNIT, or file - extension. - - Parameters - ---------- - namefile : str - path to a MODFLOW-based model name file - ftype : str - package type - unit : int - file unit number - extension : str - file extension - - Returns - ------- - entries : list of tuples - list of tuples containing FTYPE, UNIT, FNAME, STATUS for each - namefile entry that meets a user-specified value. - - """ - entries = [] - f = open(namefile, "r") - for line in f: - if line.strip() == "": - continue - if line[0] == "#": - continue - ll = line.strip().split() - if len(ll) < 3: - continue - status = "UNKNOWN" - if len(ll) > 3: - status = ll[3].upper() - if ftype is not None: - if ftype.upper() == ll[0].upper(): - filename = os.path.join(os.path.split(namefile)[0], ll[2]) - entries.append((filename, ll[0], ll[1], status)) - elif unit is not None: - if int(unit) == int(ll[1]): - filename = os.path.join(os.path.split(namefile)[0], ll[2]) - entries.append((filename, ll[0], ll[1], status)) - elif extension is not None: - filename = os.path.join(os.path.split(namefile)[0], ll[2]) - ext = os.path.splitext(filename)[1] - if len(ext) > 0: - if ext[0] == ".": - ext = ext[1:] - if extension.lower() == ext.lower(): - entries.append((filename, ll[0], ll[1], status)) - f.close() - if len(entries) < 1: - entries.append((None, None, None, None)) - return entries - - -def get_sim_name(namefiles, rootpth=None): - """Get simulation name. - - Parameters - ---------- - namefiles : str or list of strings - path(s) to MODFLOW-based model name files - rootpth : str - optional root directory path (default is None) - - Returns - ------- - simname : list - list of namefiles without the file extension - - """ - if isinstance(namefiles, str): - namefiles = [namefiles] - sim_name = [] - for namefile in namefiles: - t = namefile.split(os.sep) - if rootpth is None: - idx = -1 - else: - idx = t.index(os.path.split(rootpth)[1]) - - # build dst with everything after the rootpth and before - # the namefile file name. - dst = "" - if idx < len(t): - for d in t[idx + 1 : -1]: - dst += f"{d}_" - - # add namefile basename without extension - dst += t[-1].replace(".nam", "") - sim_name.append(dst) - - return sim_name - - -# modflow 6 readers and copiers -def setup_mf6( - src, dst, mfnamefile="mfsim.nam", extrafiles=None, remove_existing=True -): - """Copy all of the MODFLOW 6 input files from the src directory to the dst - directory. - - Parameters - ---------- - src : src - directory path with original MODFLOW 6 input files - dst : str - directory path that original MODFLOW 6 input files will be copied to - mfnamefile : str - optional MODFLOW 6 simulation name file (default is mfsim.nam) - extrafiles : bool - boolean indicating if extra files should be included (default is None) - remove_existing : bool - boolean indicating if existing file in dst should be removed (default - is True) - - Returns - ------- - mf6inp : list - list of MODFLOW 6 input files - mf6outp : list - list of MODFLOW 6 output files - - """ - - # Create the destination folder - create_dir = False - if os.path.exists(dst): - if remove_existing: - print("Removing folder " + dst) - shutil.rmtree(dst) - create_dir = True - else: - create_dir = True - if create_dir: - os.makedirs(dst) - - # Make list of files to copy - fname = os.path.join(src, mfnamefile) - fname = os.path.abspath(fname) - mf6inp, mf6outp = get_mf6_files(fname) - files2copy = [mfnamefile] + mf6inp - - # determine if there are any .ex files - exinp = [] - for f in mf6outp: - ext = os.path.splitext(f)[1] - if ext.lower() == ".hds": - pth = os.path.join(src, f + ".ex") - if os.path.isfile(pth): - exinp.append(f + ".ex") - if len(exinp) > 0: - files2copy += exinp - if extrafiles is not None: - files2copy += extrafiles - - # Copy the files - for f in files2copy: - srcf = os.path.join(src, f) - dstf = os.path.join(dst, f) - - # Check to see if dstf is going into a subfolder, and create that - # subfolder if it doesn't exist - sf = os.path.dirname(dstf) - if not os.path.isdir(sf): - try: - os.mkdir(sf) - except: - print("Could not make " + sf) - - # Now copy the file - if os.path.exists(srcf): - print("Copy file '" + srcf + "' -> '" + dstf + "'") - shutil.copy(srcf, dstf) - else: - print(srcf + " does not exist") - - return mf6inp, mf6outp - - -def get_mf6_comparison(src): - """Determine comparison type for MODFLOW 6 simulation. - - Parameters - ---------- - src : str - directory path to search for comparison types - - Returns - ------- - action : str - comparison type - - """ - action = None - # Possible comparison - the order matters - optcomp = ( - "compare", - ".cmp", - "mf2005", - "mf2005.cmp", - "mfnwt", - "mfnwt.cmp", - "mfusg", - "mfusg.cmp", - "mflgr", - "mflgr.cmp", - "libmf6", - "libmf6.cmp", - "mf6", - "mf6.cmp", - ) - # Construct src pth from namefile - action = None - for _, dirs, _ in os.walk(src): - dl = [d.lower() for d in dirs] - for oc in optcomp: - if any(oc in s for s in dl): - action = oc - break - return action - - -def setup_mf6_comparison(src, dst, remove_existing=True): - """Setup comparision for MODFLOW 6 simulation. - - Parameters - ---------- - src : src - directory path with original MODFLOW 6 input files - dst : str - directory path that original MODFLOW 6 input files will be copied to - remove_existing : bool - boolean indicating if existing file in dst should be removed (default - is True) - - Returns - ------- - action : str - comparison type - - """ - # get the type of comparison to use (compare, mf2005, etc.) - action = get_mf6_comparison(src) - - if action is not None: - dst = os.path.join(dst, f"{action}") - if not os.path.isdir(dst): - try: - os.mkdir(dst) - except: - print("Could not make " + dst) - # clean directory - else: - print(f"cleaning...{dst}") - for root, dirs, files in os.walk(dst): - for f in files: - tpth = os.path.join(root, f) - print(f" removing...{tpth}") - os.remove(tpth) - for d in dirs: - tdir = os.path.join(root, d) - print(f" removing...{tdir}") - shutil.rmtree(tdir) - # copy files - cmppth = os.path.join(src, action) - files = os.listdir(cmppth) - files2copy = [] - if action.lower() == "compare" or action.lower() == ".cmp": - for file in files: - if ".cmp" in os.path.splitext(file)[1].lower(): - files2copy.append(os.path.join(cmppth, file)) - for srcf in files2copy: - f = os.path.basename(srcf) - dstf = os.path.join(dst, f) - # Now copy the file - if os.path.exists(srcf): - print("Copy file '" + srcf + "' -> '" + dstf + "'") - shutil.copy(srcf, dstf) - else: - print(srcf + " does not exist") - else: - if "mf6" in action.lower(): - for file in files: - if "mfsim.nam" in file.lower(): - srcf = os.path.join(cmppth, os.path.basename(file)) - files2copy.append(srcf) - srcdir = os.path.join(src, action) - setup_mf6(srcdir, dst, remove_existing=remove_existing) - break - else: - for file in files: - if ".nam" in os.path.splitext(file)[1].lower(): - srcf = os.path.join(cmppth, os.path.basename(file)) - files2copy.append(srcf) - nf = os.path.join(src, action, os.path.basename(file)) - model_setup(nf, dst, remove_existing=remove_existing) - break - - return action - - -def get_mf6_nper(tdisfile): - """Return the number of stress periods in the MODFLOW 6 model. - - Parameters - ---------- - tdisfile : str - path to the TDIS file - - Returns - ------- - nper : int - number of stress periods in the simulation - - """ - with open(tdisfile, "r") as f: - lines = f.readlines() - line = [line for line in lines if "NPER" in line.upper()][0] - nper = line.strip().split()[1] - return nper - - -def get_mf6_mshape(disfile): - """Return the shape of the MODFLOW 6 model. - - Parameters - ---------- - disfile : str - path to a MODFLOW 6 discretization file - - Returns - ------- - mshape : tuple - tuple with the shape of the MODFLOW 6 model. - - """ - with open(disfile, "r") as f: - lines = f.readlines() - - d = {} - for line in lines: - - # Skip over blank and commented lines - ll = line.strip().split() - if len(ll) < 2: - continue - if line.strip()[0] in ["#", "!"]: - continue - - for key in ["NODES", "NCPL", "NLAY", "NROW", "NCOL"]: - if ll[0].upper() in key: - d[key] = int(ll[1]) - - if "NODES" in d: - mshape = (d["NODES"],) - elif "NCPL" in d: - mshape = (d["NLAY"], d["NCPL"]) - elif "NLAY" in d: - mshape = (d["NLAY"], d["NROW"], d["NCOL"]) - else: - print(d) - raise Exception("Could not determine model shape") - return mshape - - -def get_mf6_files(mfnamefile): - """Return a list of all the MODFLOW 6 input and output files in this model. - - Parameters - ---------- - mfnamefile : str - path to the MODFLOW 6 simulation name file - - Returns - ------- - filelist : list - list of MODFLOW 6 input files in a simulation - outplist : list - list of MODFLOW 6 output files in a simulation - - """ - - srcdir = os.path.dirname(mfnamefile) - filelist = [] - outplist = [] - - filekeys = ["TDIS6", "GWF6", "GWT", "GWF6-GWF6", "GWF-GWT", "IMS6"] - namefilekeys = ["GWF6", "GWT"] - namefiles = [] - - with open(mfnamefile) as f: - - # Read line and skip comments - lines = f.readlines() - - for line in lines: - - # Skip over blank and commented lines - ll = line.strip().split() - if len(ll) < 2: - continue - if line.strip()[0] in ["#", "!"]: - continue - - for key in filekeys: - if key in ll[0].upper(): - fname = ll[1] - filelist.append(fname) - - for key in namefilekeys: - if key in ll[0].upper(): - fname = ll[1] - namefiles.append(fname) - - # Go through name files and get files - for namefile in namefiles: - fname = os.path.join(srcdir, namefile) - with open(fname, "r") as f: - lines = f.readlines() - insideblock = False - - for line in lines: - ll = line.upper().strip().split() - if len(ll) < 2: - continue - if ll[0] in "BEGIN" and ll[1] in "PACKAGES": - insideblock = True - continue - if ll[0] in "END" and ll[1] in "PACKAGES": - insideblock = False - - if insideblock: - ll = line.strip().split() - if len(ll) < 2: - continue - if line.strip()[0] in ["#", "!"]: - continue - filelist.append(ll[1]) - - # Recursively go through every file and look for other files to copy, - # such as 'OPEN/CLOSE' and 'TIMESERIESFILE'. If found, then - # add that file to the list of files to copy. - flist = filelist - # olist = outplist - while True: - olist = [] - flist, olist = _get_mf6_external_files(srcdir, olist, flist) - # add to filelist - if len(flist) > 0: - filelist = filelist + flist - # add to outplist - if len(olist) > 0: - outplist = outplist + olist - # terminate loop if no additional files - # if len(flist) < 1 and len(olist) < 1: - if len(flist) < 1: - break - - return filelist, outplist - - -def _get_mf6_external_files(srcdir, outplist, files): - """Get list of external files in a MODFLOW 6 simulation. - - Parameters - ---------- - srcdir : str - path to a directory containing a MODFLOW 6 simulation - outplist : list - list of output files in a MODFLOW 6 simulation - files : list - list of MODFLOW 6 name files - - Returns - ------- - - """ - extfiles = [] - - for fname in files: - fname = os.path.join(srcdir, fname) - try: - f = open(fname, "r") - for line in f: - - # Skip invalid lines - ll = line.strip().split() - if len(ll) < 2: - continue - if line.strip()[0] in ["#", "!"]: - continue - - if "OPEN/CLOSE" in line.upper(): - for i, s in enumerate(ll): - if s.upper() == "OPEN/CLOSE": - stmp = ll[i + 1] - stmp = stmp.replace('"', "") - stmp = stmp.replace("'", "") - extfiles.append(stmp) - break - - if "TS6" in line.upper(): - for i, s in enumerate(ll): - if s.upper() == "FILEIN": - stmp = ll[i + 1] - stmp = stmp.replace('"', "") - stmp = stmp.replace("'", "") - extfiles.append(stmp) - break - - if "TAS6" in line.upper(): - for i, s in enumerate(ll): - if s.upper() == "FILEIN": - stmp = ll[i + 1] - stmp = stmp.replace('"', "") - stmp = stmp.replace("'", "") - extfiles.append(stmp) - break - - if "OBS6" in line.upper(): - for i, s in enumerate(ll): - if s.upper() == "FILEIN": - stmp = ll[i + 1] - stmp = stmp.replace('"', "") - stmp = stmp.replace("'", "") - extfiles.append(stmp) - break - - if "EXTERNAL" in line.upper(): - for i, s in enumerate(ll): - if s.upper() == "EXTERNAL": - stmp = ll[i + 1] - stmp = stmp.replace('"', "") - stmp = stmp.replace("'", "") - extfiles.append(stmp) - break - - if "FILE" in line.upper(): - for i, s in enumerate(ll): - if s.upper() == "FILEIN": - stmp = ll[i + 1] - stmp = stmp.replace('"', "") - stmp = stmp.replace("'", "") - extfiles.append(stmp) - break - - if "FILE" in line.upper(): - for i, s in enumerate(ll): - if s.upper() == "FILEOUT": - stmp = ll[i + 1] - stmp = stmp.replace('"', "") - stmp = stmp.replace("'", "") - outplist.append(stmp) - break - - except: - print("could not get a list of external mf6 files") - - return extfiles, outplist - - -def get_mf6_ftypes(namefile, ftypekeys): - """Return a list of FTYPES that are in the name file and in ftypekeys. - - Parameters - ---------- - namefile : str - path to a MODFLOW 6 name file - ftypekeys : list - list of desired FTYPEs - - Returns - ------- - ftypes : list - list of FTYPES that match ftypekeys in namefile - - """ - with open(namefile, "r") as f: - lines = f.readlines() - - ftypes = [] - for line in lines: - - # Skip over blank and commented lines - ll = line.strip().split() - if len(ll) < 2: - continue - if line.strip()[0] in ["#", "!"]: - continue - - for key in ftypekeys: - if ll[0].upper() in key: - ftypes.append(ll[0]) - - return ftypes - - -def get_mf6_blockdata(f, blockstr): - """Return list with all non comments between start and end of block - specified by blockstr. - - Parameters - ---------- - f : file object - open file object - blockstr : str - name of block to search - - Returns - ------- - data : list - list of data in specified block - - """ - data = [] - - # find beginning of block - for line in f: - if line[0] != "#": - t = line.split() - if t[0].lower() == "begin" and t[1].lower() == blockstr.lower(): - break - for line in f: - if line[0] != "#": - t = line.split() - if t[0].lower() == "end" and t[1].lower() == blockstr.lower(): - break - else: - data.append(line.rstrip()) - return data diff --git a/modflow_devtools/test/conftest.py b/modflow_devtools/test/conftest.py deleted file mode 100644 index 29a438f..0000000 --- a/modflow_devtools/test/conftest.py +++ /dev/null @@ -1,101 +0,0 @@ -from os import environ -from pathlib import Path - -import pytest -from github import Github - -proj_root = Path(__file__).parent.parent.parent.parent - - -# keepable temporary directory fixtures for various scopes - - -@pytest.fixture(scope="function") -def tmpdir(tmpdir_factory, request) -> Path: - node = ( - request.node.name.replace("/", "_") - .replace("\\", "_") - .replace(":", "_") - ) - temp = Path(tmpdir_factory.mktemp(node)) - yield Path(temp) - - keep = request.config.getoption("--keep") - if keep: - copytree(temp, Path(keep) / temp.name) - - keep_failed = request.config.getoption("--keep-failed") - if keep_failed and request.node.rep_call.failed: - copytree(temp, Path(keep_failed) / temp.name) - - -@pytest.fixture(scope="class") -def class_tmpdir(tmpdir_factory, request) -> Path: - assert ( - request.cls is not None - ), "Class-scoped temp dir fixture must be used on class" - temp = Path(tmpdir_factory.mktemp(request.cls.__name__)) - yield temp - - keep = request.config.getoption("--keep") - if keep: - copytree(temp, Path(keep) / temp.name) - - -@pytest.fixture(scope="module") -def module_tmpdir(tmpdir_factory, request) -> Path: - temp = Path(tmpdir_factory.mktemp(request.module.__name__)) - yield temp - - keep = request.config.getoption("--keep") - if keep: - copytree(temp, Path(keep) / temp.name) - - -@pytest.fixture(scope="session") -def session_tmpdir(tmpdir_factory, request) -> Path: - temp = Path(tmpdir_factory.mktemp(request.session.name)) - yield temp - - keep = request.config.getoption("--keep") - if keep: - copytree(temp, Path(keep) / temp.name) - - -# misc fixtures - - -@pytest.fixture -def gh_api() -> Github: - return Github(environ.get("GITHUB_TOKEN")) - - -@pytest.fixture -def modflow6_path() -> Path: - return Path(environ.get("MODFLOW6_PATH", proj_root / "modflow6")) - - -# pytest configuration hooks - - -def pytest_addoption(parser): - parser.addoption( - "-K", - "--keep", - action="store", - default=None, - help="Move the contents of temporary test directories to correspondingly named subdirectories at the given " - "location after tests complete. This option can be used to exclude test results from automatic cleanup, " - "e.g. for manual inspection. The provided path is created if it does not already exist. An error is " - "thrown if any matching files already exist.", - ) - - parser.addoption( - "--keep-failed", - action="store", - default=None, - help="Move the contents of temporary test directories to correspondingly named subdirectories at the given " - "location if the test case fails. This option automatically saves the outputs of failed tests in the " - "given location. The path is created if it doesn't already exist. An error is thrown if files with the " - "same names already exist in the given location.", - ) diff --git a/modflow_devtools/test/test_build.py b/modflow_devtools/test/test_build.py deleted file mode 100644 index 7cb1194..0000000 --- a/modflow_devtools/test/test_build.py +++ /dev/null @@ -1,34 +0,0 @@ -import platform - -from modflow_devtools.build import meson_build - -system = platform.system() - - -def test_meson_build(modflow6_path, tmpdir): - bld_path = tmpdir / "builddir" - bin_path = tmpdir / "bin" - lib_path = bin_path - - meson_build(modflow6_path, bld_path, bin_path, bin_path, quiet=False) - - # check build directory was populated - assert (bld_path / "build.ninja").is_file() - assert (bld_path / "src").is_dir() - assert (bld_path / "meson-logs").is_dir() - - # check binaries and libraries were created - ext = ".exe" if system == "Windows" else "" - for exe in ["mf6", "mf5to6", "zbud6"]: - assert (bin_path / f"{exe}{ext}").is_file() - assert ( - bin_path - / ( - "libmf6" - + ( - ".so" - if system == "Linux" - else (".dylib" if system == "Darwin" else "") - ) - ) - ).is_file() diff --git a/modflow_devtools/test/test_conftest.py b/modflow_devtools/test/test_fixtures.py similarity index 51% rename from modflow_devtools/test/test_conftest.py rename to modflow_devtools/test/test_fixtures.py index 3ed7ca2..d447b2b 100644 --- a/modflow_devtools/test/test_conftest.py +++ b/modflow_devtools/test/test_fixtures.py @@ -1,53 +1,55 @@ import inspect +import platform from os import environ from pathlib import Path +from shutil import which import pytest - +from modflow_devtools.misc import ( + excludes_platform, + requires_exe, + requires_pkg, + requires_platform, +) + +system = platform.system() proj_root = Path(__file__).parent.parent.parent.parent -# test environment variables - - -def test_environment(): - assert environ.get("GITHUB_TOKEN") - assert Path(environ.get("MODFLOW6_PATH", proj_root / "modflow6")).is_dir() - - # temporary directory fixtures -def test_tmpdirs(tmpdir, module_tmpdir): +def test_tmpdirs(function_tmpdir, module_tmpdir): # function-scoped temporary directory - assert isinstance(tmpdir, Path) - assert tmpdir.is_dir() - assert inspect.currentframe().f_code.co_name in tmpdir.stem + assert isinstance(function_tmpdir, Path) + assert function_tmpdir.is_dir() + assert inspect.currentframe().f_code.co_name in function_tmpdir.stem # module-scoped temp dir (accessible to other tests in the script) assert module_tmpdir.is_dir() assert "test" in module_tmpdir.stem -def test_function_scoped_tmpdir(tmpdir): - assert isinstance(tmpdir, Path) - assert tmpdir.is_dir() - assert inspect.currentframe().f_code.co_name in tmpdir.stem +def test_function_scoped_tmpdir(function_tmpdir): + assert isinstance(function_tmpdir, Path) + assert function_tmpdir.is_dir() + assert inspect.currentframe().f_code.co_name in function_tmpdir.stem @pytest.mark.parametrize("name", ["noslash", "forward/slash", "back\\slash"]) -def test_function_scoped_tmpdir_slash_in_name(tmpdir, name): - assert isinstance(tmpdir, Path) - assert tmpdir.is_dir() +def test_function_scoped_tmpdir_slash_in_name(function_tmpdir, name): + assert isinstance(function_tmpdir, Path) + assert function_tmpdir.is_dir() # node name might have slashes if test function is parametrized # (e.g., test_function_scoped_tmpdir_slash_in_name[a/slash]) replaced1 = name.replace("/", "_").replace("\\", "_").replace(":", "_") replaced2 = name.replace("/", "_").replace("\\", "__").replace(":", "_") assert ( - f"{inspect.currentframe().f_code.co_name}[{replaced1}]" in tmpdir.stem + f"{inspect.currentframe().f_code.co_name}[{replaced1}]" + in function_tmpdir.stem or f"{inspect.currentframe().f_code.co_name}[{replaced2}]" - in tmpdir.stem + in function_tmpdir.stem ) @@ -77,13 +79,43 @@ def test_session_scoped_tmpdir(session_tmpdir): assert session_tmpdir.is_dir() -# test misc fixtures +# test fixtures to require/exclude executables & platforms + + +@requires_exe("mf6") +def test_mf6(): + assert which("mf6") + + +exes = ["mfusg", "mfnwt"] + + +@requires_exe(*exes) +def test_mfusg_and_mfnwt(): + assert all(which(exe) for exe in exes) + + +@requires_pkg("numpy") +def test_numpy(): + import numpy + + assert numpy is not None + + +@requires_pkg("numpy", "matplotlib") +def test_numpy_and_matplotlib(): + import matplotlib + import numpy + + assert numpy is not None and matplotlib is not None -def test_github_api(gh_api): - assert gh_api.get_user().login +@requires_platform("Windows") +def test_needs_windows(): + assert system == "Windows" -def test_modflow6_path(modflow6_path): - assert modflow6_path.is_dir() - assert (modflow6_path / "version.txt").is_file() +@excludes_platform("Darwin", ci_only=True) +def test_breaks_osx_ci(): + if "CI" in environ: + assert system != "Darwin" diff --git a/modflow_devtools/test/test_framework.py b/modflow_devtools/test/test_framework.py index 45771f7..e69de29 100644 --- a/modflow_devtools/test/test_framework.py +++ b/modflow_devtools/test/test_framework.py @@ -1,6 +0,0 @@ -def test_build(): - pass - - -def test_run(): - pass diff --git a/modflow_devtools/test/test_setup.py b/modflow_devtools/test/test_setup.py deleted file mode 100644 index 9ed9553..0000000 --- a/modflow_devtools/test/test_setup.py +++ /dev/null @@ -1,2 +0,0 @@ -def test_model_setup(): - pass diff --git a/modflow_devtools/test/test_zip.py b/modflow_devtools/test/test_zip.py new file mode 100644 index 0000000..e69de29 diff --git a/setup.cfg b/setup.cfg index eedc767..350d2c9 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = modflow-devtools -version = attr: modflow_devtools.config.__version__ +version = 0.0.1 description = modflow-devtools is a Python package containing tools for MODFLOW development. long_description = file: README.md, LICENSE.md long_description_content_type = text/markdown @@ -42,8 +42,6 @@ packages = find: python_requires = >=3.7 install_requires = numpy - requests - flopy [options.extras_require] lint = @@ -56,7 +54,6 @@ test = %(lint)s coverage flaky - PyGithub pytest pytest-cov pytest-dotenv