diff --git a/src/phylum/ci/ci_base.py b/src/phylum/ci/ci_base.py index 7bbb0b71..7163119b 100644 --- a/src/phylum/ci/ci_base.py +++ b/src/phylum/ci/ci_base.py @@ -22,9 +22,17 @@ from packaging.version import Version from rich.markdown import Markdown -from phylum.ci.common import DataclassJSONEncoder, JobPolicyEvalResult, LockfileEntries, LockfileEntry, ReturnCode +from phylum.ci.common import ( + DataclassJSONEncoder, + JobPolicyEvalResult, + LockfileEntries, + LockfileEntry, + PackageDescriptor, + Packages, + ReturnCode, +) from phylum.ci.depfile import Depfile, Depfiles, DepfileType, parse_current_depfile -from phylum.ci.git import git_hash_object, git_repo_name +from phylum.ci.git import git_hash_object, git_repo_name, remove_git_worktree from phylum.console import console from phylum.constants import ENVVAR_NAME_TOKEN, MIN_CLI_VER_INSTALLED from phylum.exceptions import PhylumCalledProcessError, pprint_subprocess_error @@ -173,13 +181,13 @@ def _filter_depfiles(self, provided_depfiles: LockfileEntries) -> Depfiles: for provided_depfile in provided_depfiles: # Make sure it exists if not provided_depfile.path.exists(): - LOG.warning("Provided dependency file does not exist: %s", provided_depfile.path) + LOG.warning("Provided dependency file does not exist: %r", provided_depfile) self.returncode = ReturnCode.DEPFILE_FILTER continue # Make sure it is not an empty file if not provided_depfile.path.stat().st_size: - LOG.warning("Provided dependency file is an empty file: %s", provided_depfile.path) + LOG.warning("Provided dependency file is an empty file: %r", provided_depfile) self.returncode = ReturnCode.DEPFILE_FILTER continue @@ -188,14 +196,13 @@ def _filter_depfiles(self, provided_depfiles: LockfileEntries) -> Depfiles: _ = parse_current_depfile(self.cli_path, provided_depfile.type, provided_depfile.path) except subprocess.CalledProcessError as err: pprint_subprocess_error(err) - _path, _type = provided_depfile.path, provided_depfile.type msg = f"""\ - Provided dependency file [code]{_path}[/] failed to parse as - lockfile type [code]{_type}[/]. If this is a manifest, consider + Provided dependency file [code]{provided_depfile!r}[/] failed to parse as + lockfile type [code]{provided_depfile.type}[/]. If this is a manifest, consider supplying lockfile type explicitly in the `.phylum_project` file. For more info, see: https://docs.phylum.io/docs/lockfile_generation - Please report this as a bug if you believe [code]{_path}[/] - is a valid [code]{_type}[/] dependency file.""" + Please report this as a bug if you believe [code]{provided_depfile!r}[/] + is a valid [code]{provided_depfile.type}[/] dependency file.""" LOG.warning(textwrap.dedent(msg), extra=MARKUP) self.returncode = ReturnCode.DEPFILE_FILTER continue @@ -203,23 +210,23 @@ def _filter_depfiles(self, provided_depfiles: LockfileEntries) -> Depfiles: # Classify the file as a manifest or lockfile if provided_depfile in self.potential_manifests and provided_depfile in self.potential_lockfiles: msg = f"""\ - Provided dependency file [code]{provided_depfile.path}[/] is a lockifest. - It will be treated as a manifest. + Provided dependency file [code]{provided_depfile!r}[/] is a [b]lockifest[/]. + It will be treated as a [b]manifest[/]. For more info, see: https://docs.phylum.io/docs/lockfile_generation""" LOG.warning(textwrap.dedent(msg), extra=MARKUP) - depfile = Depfile(provided_depfile, self.cli_path, self.common_ancestor_commit, DepfileType.LOCKIFEST) + depfile = Depfile(provided_depfile, self.cli_path, DepfileType.LOCKIFEST) elif provided_depfile in self.potential_manifests: - LOG.debug("Provided dependency file [code]%s[/] is a manifest", provided_depfile.path, extra=MARKUP) - depfile = Depfile(provided_depfile, self.cli_path, self.common_ancestor_commit, DepfileType.MANIFEST) + LOG.debug("Provided dependency file [code]%r[/] is a [b]manifest[/]", provided_depfile, extra=MARKUP) + depfile = Depfile(provided_depfile, self.cli_path, DepfileType.MANIFEST) elif provided_depfile in self.potential_lockfiles: - LOG.debug("Provided dependency file [code]%s[/] is a lockfile", provided_depfile.path, extra=MARKUP) - depfile = Depfile(provided_depfile, self.cli_path, self.common_ancestor_commit, DepfileType.LOCKFILE) + LOG.debug("Provided dependency file [code]%r[/] is a [b]lockfile[/]", provided_depfile, extra=MARKUP) + depfile = Depfile(provided_depfile, self.cli_path, DepfileType.LOCKFILE) else: msg = f"""\ - Provided dependency file [code]{provided_depfile.path}[/] is an unknown type. - It will be treated as a manifest.""" + Provided dependency file [code]{provided_depfile!r}[/] is an [b]unknown[/] type. + It will be treated as a [b]manifest[/].""" LOG.warning(textwrap.dedent(msg), extra=MARKUP) - depfile = Depfile(provided_depfile, self.cli_path, self.common_ancestor_commit, DepfileType.UNKNOWN) + depfile = Depfile(provided_depfile, self.cli_path, DepfileType.UNKNOWN) depfiles.append(depfile) # Check for the presence of a manifest file @@ -512,15 +519,15 @@ def analyze(self) -> None: if self.all_deps: LOG.info("Considering all current dependencies ...") - base_pkgs = [] - total_packages = {pkg for depfile in self.depfiles for pkg in depfile.current_deps} - LOG.debug("%s unique current dependencies", len(total_packages)) + base_packages = [] + current_packages = {pkg for depfile in self.depfiles for pkg in depfile.deps} + LOG.debug("%s unique current dependencies from %s file(s)", len(current_packages), len(self.depfiles)) else: LOG.info("Only considering newly added dependencies ...") - base_pkgs = sorted({pkg for depfile in self.depfiles for pkg in depfile.base_deps}) + base_packages = self._get_base_packages() with tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8", prefix="base_", suffix=".json") as base_fd: - json.dump(base_pkgs, base_fd, cls=DataclassJSONEncoder) + json.dump(base_packages, base_fd, cls=DataclassJSONEncoder) base_fd.flush() cmd.append(base_fd.name) cmd.extend(f"{depfile.path}:{depfile.type}" for depfile in self.depfiles) @@ -544,6 +551,67 @@ def analyze(self) -> None: self._parse_analysis_result(analysis_result) + def _get_base_packages(self) -> Packages: + """Get the dependencies from the common ancestor commit and return them in sorted order.""" + if not self.common_ancestor_commit: + LOG.info("No common ancestor commit for `%r`. Assuming no base dependencies.", self) + return [] + + with tempfile.TemporaryDirectory(prefix="phylum_") as temp_dir: + cmd = ["git", "worktree", "add", "--detach", temp_dir, self.common_ancestor_commit] + LOG.debug("Adding a git worktree for the base iteration in a temporary directory ...") + LOG.debug("Using command: %s", shlex.join(cmd)) + try: + subprocess.run(cmd, check=True, capture_output=True, text=True) # noqa: S603 + except subprocess.CalledProcessError as err: + pprint_subprocess_error(err) + LOG.error("Due to error, assuming no base dependencies. Please report this as a bug.") + return [] + + temp_dir_path = Path(temp_dir).resolve() + base_packages = set() + for depfile in self.depfiles: + prev_depfile_path = temp_dir_path / depfile.path.relative_to(Path.cwd()) + cmd = [str(self.cli_path), "parse", "--lockfile-type", depfile.type, str(prev_depfile_path)] + LOG.info( + "Parsing [code]%r[/] as previous [code]%s[/] [b]%s[/] ...", + depfile, + depfile.type, + depfile.depfile_type.value, + extra=MARKUP, + ) + if depfile.is_manifest: + LOG.info(" This may take a while") + LOG.debug("Using parse command: %s", shlex.join(cmd)) + try: + parse_result = subprocess.run( + cmd, # noqa: S603 + cwd=temp_dir_path, + check=True, + capture_output=True, + text=True, + ).stdout.strip() + except subprocess.CalledProcessError as err: + pprint_subprocess_error(err) + msg = f"""\ + Due to error, assuming no previous [b]{depfile.depfile_type.value}[/] packages. + Consider supplying lockfile type explicitly in `.phylum_project` file. + For more info, see: https://docs.phylum.io/docs/lockfile_generation + Please report this as a bug if you believe [code]{depfile!r}[/] + is a valid [code]{depfile.type}[/] [b]{depfile.depfile_type.value}[/] at revision + [code]{self.common_ancestor_commit}[/].""" + LOG.warning(textwrap.dedent(msg), extra=MARKUP) + remove_git_worktree(temp_dir_path) + return [] + + parsed_pkgs = json.loads(parse_result) + prev_depfile_packages = [PackageDescriptor(**pkg) for pkg in parsed_pkgs] + base_packages.update(prev_depfile_packages) + + remove_git_worktree(temp_dir_path) + + return sorted(base_packages) + def _parse_analysis_result(self, analysis_result: str) -> None: """Parse the results of a Phylum analysis command output.""" analysis = JobPolicyEvalResult(**json.loads(analysis_result)) diff --git a/src/phylum/ci/ci_precommit.py b/src/phylum/ci/ci_precommit.py index 2f237f97..b494a88e 100644 --- a/src/phylum/ci/ci_precommit.py +++ b/src/phylum/ci/ci_precommit.py @@ -38,7 +38,10 @@ def _check_prerequisites(self) -> None: * The extra unparsed arguments passed to the CLI represent the staged files, no more and no less """ super()._check_prerequisites() + + # "Repeat" these calls from the base class to ensure properties are set before using them self._backup_project_file() + self._find_potential_depfiles() cmd = ["git", "diff", "--cached", "--name-only"] try: diff --git a/src/phylum/ci/depfile.py b/src/phylum/ci/depfile.py index 7464c73b..0357234d 100644 --- a/src/phylum/ci/depfile.py +++ b/src/phylum/ci/depfile.py @@ -12,12 +12,11 @@ import shlex import shutil import subprocess -import tempfile import textwrap from typing import Optional, TypeVar from phylum.ci.common import LockfileEntry, PackageDescriptor, Packages -from phylum.exceptions import PhylumCalledProcessError, pprint_subprocess_error +from phylum.exceptions import PhylumCalledProcessError from phylum.logger import LOG, MARKUP # Starting with Python 3.11, the `typing.Self` type was introduced to do this same thing. @@ -45,18 +44,11 @@ class DepfileType(Enum): class Depfile: """Provide methods for an instance of a dependency file.""" - def __init__( - self, - provided_depfile: LockfileEntry, - cli_path: Path, - common_ancestor_commit: Optional[str], - depfile_type: DepfileType, - ) -> None: + def __init__(self, provided_depfile: LockfileEntry, cli_path: Path, depfile_type: DepfileType) -> None: """Initialize a `Depfile` object.""" self._path = provided_depfile.path.resolve() self._type = provided_depfile.type self.cli_path = cli_path - self._common_ancestor_commit = common_ancestor_commit self._depfile_type = depfile_type self._is_depfile_changed: Optional[bool] = None @@ -103,15 +95,6 @@ def is_depfile_changed(self, value: bool) -> None: """Set the value for whether the dependency file has changed.""" self._is_depfile_changed = value - @property - def common_ancestor_commit(self) -> Optional[str]: - """Get the common ancestor commit. - - It will be returned as a string of the SHA1 sum representing the commit. - When it isn't provided, `None` will be returned. - """ - return self._common_ancestor_commit - @property def depfile_type(self) -> DepfileType: """Get the dependency file type.""" @@ -131,7 +114,7 @@ def is_manifest(self) -> bool: return self.depfile_type in {DepfileType.MANIFEST, DepfileType.LOCKIFEST, DepfileType.UNKNOWN} @cached_property - def current_deps(self) -> Packages: + def deps(self) -> Packages: """Get the dependencies from the current iteration of the dependency file and return them in sorted order.""" try: curr_depfile_packages = parse_current_depfile(self.cli_path, self.type, self.path) @@ -139,74 +122,15 @@ def current_deps(self) -> Packages: if self.is_lockfile: msg = f"""\ Please report this as a bug if you believe [code]{self!r}[/] - is a valid [code]{self.type}[/] lockfile.""" + is a valid [code]{self.type}[/] lockfile.""" else: msg = f"""\ Consider supplying lockfile type explicitly in the `.phylum_project` file. - For more info, see: https://docs.phylum.io/docs/lockfile_generation - Please report this as a bug if you believe [code]{self!r}[/] - is a valid [code]{self.type}[/] manifest file.""" - raise PhylumCalledProcessError(err, textwrap.dedent(msg)) from err - return sorted(set(curr_depfile_packages)) - - @cached_property - def base_deps(self) -> Packages: - """Get the dependencies from the base iteration of the dependency file and return them in sorted order. - - The base iteration is determined by the common ancestor commit. - """ - if not self.common_ancestor_commit: - LOG.info("No common ancestor commit for `%r`. Assuming no base dependencies.", self) - return [] - - with tempfile.TemporaryDirectory(prefix="phylum_") as temp_dir: - cmd = ["git", "worktree", "add", "--detach", temp_dir, self.common_ancestor_commit] - LOG.debug("Adding a git worktree for the base iteration in a temporary directory ...") - LOG.debug("Using command: %s", shlex.join(cmd)) - try: - subprocess.run(cmd, check=True, capture_output=True, text=True) # noqa: S603 - except subprocess.CalledProcessError as err: - pprint_subprocess_error(err) - LOG.error("Due to error, assuming no base dependencies. Please report this as a bug.") - return [] - - temp_dir_path = Path(temp_dir).resolve() - prev_depfile_path = temp_dir_path / self.path.relative_to(Path.cwd()) - cmd = [str(self.cli_path), "parse", "--lockfile-type", self.type, str(prev_depfile_path)] - LOG.info( - "Attempting to parse [code]%s[/] as previous [code]%s[/] %s. This may take a while.", - self.path, - self.type, - self.depfile_type.value, - extra=MARKUP, - ) - LOG.debug("Using parse command: %s", shlex.join(cmd)) - try: - parse_result = subprocess.run( - cmd, # noqa: S603 - cwd=temp_dir_path, - check=True, - capture_output=True, - text=True, - ).stdout.strip() - except subprocess.CalledProcessError as err: - pprint_subprocess_error(err) - msg = f"""\ - Due to error, assuming no previous {self.depfile_type.value} packages. - Consider supplying lockfile type explicitly in `.phylum_project` file. For more info, see: https://docs.phylum.io/docs/lockfile_generation Please report this as a bug if you believe [code]{self!r}[/] - is a valid [code]{self.type}[/] {self.depfile_type.value} at revision - [code]{self.common_ancestor_commit}[/].""" - LOG.warning(textwrap.dedent(msg), extra=MARKUP) - remove_git_worktree(temp_dir_path) - return [] - - remove_git_worktree(temp_dir_path) - - parsed_pkgs = json.loads(parse_result) - prev_depfile_packages = [PackageDescriptor(**pkg) for pkg in parsed_pkgs] - return sorted(set(prev_depfile_packages)) + is a valid [code]{self.type}[/] manifest file.""" + raise PhylumCalledProcessError(err, textwrap.dedent(msg)) from err + return sorted(set(curr_depfile_packages)) # Type alias @@ -231,22 +155,3 @@ def parse_current_depfile(cli_path: Path, lockfile_type: str, depfile_path: Path parsed_pkgs = json.loads(parse_result) curr_depfile_packages = [PackageDescriptor(**pkg) for pkg in parsed_pkgs] return curr_depfile_packages - - -def remove_git_worktree(worktree: Path) -> None: - """Remove a given git worktree. - - If a working tree is deleted without using `git worktree remove`, then its associated administrative files, which - reside in the repository, will eventually be removed automatically. - Ref: https://git-scm.com/docs/git-worktree - - Use this function to remove the worktree now since the default for the `gc.worktreePruneExpire` setting is 3 months. - """ - cmd = ["git", "worktree", "remove", "--force", str(worktree)] - LOG.debug("Removing the git worktree for the base iteration ...") - LOG.debug("Using command: %s", shlex.join(cmd)) - try: - subprocess.run(cmd, check=True, capture_output=True, text=True) # noqa: S603 - except subprocess.CalledProcessError as err: - pprint_subprocess_error(err) - LOG.warning("Unable to remove the git worktree. Try running `git worktree prune` manually.") diff --git a/src/phylum/ci/git.py b/src/phylum/ci/git.py index 1d8a46b5..b89ef319 100644 --- a/src/phylum/ci/git.py +++ b/src/phylum/ci/git.py @@ -214,3 +214,26 @@ def git_repo_name(git_c_path: Optional[Path] = None) -> str: repo_name = full_repo_path.stem return repo_name + + +def remove_git_worktree(worktree: Path, git_c_path: Optional[Path] = None) -> None: + """Remove a given git worktree. + + The optional `git_c_path` is used to tell `git` to run as if it were started in that + path instead of the current working directory, which is the default when not provided. + + If a working tree is deleted without using `git worktree remove`, then its associated + administrative files, which reside in the repository, will eventually be removed automatically. + Ref: https://git-scm.com/docs/git-worktree + + Use this function to remove the worktree now since the default for the `gc.worktreePruneExpire` setting is 3 months. + """ + base_cmd = git_base_cmd(git_c_path=git_c_path) + cmd = [*base_cmd, "worktree", "remove", "--force", str(worktree)] + LOG.debug("Removing the git worktree for the base iteration ...") + LOG.debug("Using command: %s", shlex.join(cmd)) + try: + subprocess.run(cmd, check=True, capture_output=True, text=True) # noqa: S603 + except subprocess.CalledProcessError as err: + pprint_subprocess_error(err) + LOG.warning("Unable to remove the git worktree. Try running `git worktree prune` manually.") diff --git a/tests/unit/test_lockfile_parse.py b/tests/unit/test_lockfile_parse.py index 14bab47f..20803ded 100644 --- a/tests/unit/test_lockfile_parse.py +++ b/tests/unit/test_lockfile_parse.py @@ -33,10 +33,10 @@ def test_current_deps(mock_run): provided_lockfile_type = "cargo" cli_path = Path("dummy_cli_path") lockfile_entry = LockfileEntry(depfile_path, provided_lockfile_type) - depfile = Depfile(lockfile_entry, cli_path, None, DepfileType.LOCKFILE) + depfile = Depfile(lockfile_entry, cli_path, DepfileType.LOCKFILE) # Test the `current_deps` property - packages = depfile.current_deps + packages = depfile.deps expected_cargo_package = PackageDescriptor("quote", "1.0.21", "cargo", "Cargo.lock") expected_npm_package = PackageDescriptor("example", "0.1.0", "npm")