diff --git a/docs/conf.py b/docs/conf.py index d4af59a9c..a2730dc62 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -9,8 +9,8 @@ project = 'pyk' author = 'Runtime Verification, Inc' copyright = '2024, Runtime Verification, Inc' -version = '0.1.627' -release = '0.1.627' +version = '0.1.628' +release = '0.1.628' # -- General configuration --------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration diff --git a/package/version b/package/version index 83ea503f0..1fa102d5a 100644 --- a/package/version +++ b/package/version @@ -1 +1 @@ -0.1.627 +0.1.628 diff --git a/pyproject.toml b/pyproject.toml index 04c16c9a1..853405d80 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "pyk" -version = "0.1.627" +version = "0.1.628" description = "" authors = [ "Runtime Verification, Inc. ", diff --git a/src/pyk/proof/parallel.py b/src/pyk/proof/parallel.py index bb5c1c866..2cf9201d2 100644 --- a/src/pyk/proof/parallel.py +++ b/src/pyk/proof/parallel.py @@ -1,21 +1,27 @@ from __future__ import annotations +import os +import time from abc import ABC, abstractmethod -from concurrent.futures import CancelledError, ProcessPoolExecutor, wait +from dataclasses import dataclass +from multiprocessing import Process, Queue +from queue import Empty + +# from concurrent.futures import CancelledError, ProcessPoolExecutor, wait from typing import TYPE_CHECKING, Any, Generic, TypeVar from pyk.proof.proof import ProofStatus if TYPE_CHECKING: from collections.abc import Iterable, Mapping - from concurrent.futures import Executor, Future P = TypeVar('P', bound='Proof') U = TypeVar('U') +D = TypeVar('D') -class Prover(ABC, Generic[P, U]): +class Prover(ABC, Generic[P, U, D]): """ Should contain all data needed to make progress on a `P` (proof). May be specific to a single `P` (proof) or may be able to handle multiple. @@ -28,7 +34,7 @@ class Prover(ABC, Generic[P, U]): """ @abstractmethod - def steps(self, proof: P) -> Iterable[ProofStep[U]]: + def steps(self, proof: P) -> Iterable[ProofStep[U, D]]: """ Return a list of `ProofStep[U]` which represents all the computation jobs as defined by `ProofStep`, which have not yet been computed and committed, and are available given the current state of `proof`. Note that this is a requirement which is not enforced by the type system. If `steps()` or `commit()` has been called on a proof `proof`, `steps()` may never again be called on `proof`. @@ -66,7 +72,7 @@ def status(self) -> ProofStatus: ... -class ProofStep(ABC, Generic[U]): +class ProofStep(ABC, Generic[U, D]): """ Should be a description of a computation needed to make progress on a `Proof`. Must be hashable. @@ -76,7 +82,7 @@ class ProofStep(ABC, Generic[U]): """ @abstractmethod - def exec(self) -> U: + def exec(self, data: D) -> U: """ Should perform some nontrivial computation given by `self`, which can be done independently of other calls to `exec()`. Allowed to be nondeterministic. @@ -85,59 +91,204 @@ def exec(self) -> U: ... +@dataclass +class ProfilingInfo: + total_commit_time = 0 + total_steps_time = 0 + total_wait_time = 0 + total_time = 0 + + +class SemanticsProcessPool: + _max_workers: int + processes: list[Process] + in_queue: Queue + out_queue: Queue + busy_queue: Queue + pending_jobs: int = 0 + process_data: Mapping[str, Any] + + def __init__(self, max_workers: int): + self._max_workers = max_workers + self.in_queue = Queue() + self.out_queue = Queue() + self.busy_queue = Queue() + self.processes = [] + + def submit(self, proof_id: str, proof_step: ProofStep) -> None: + process_status = {} + + while True: + try: + msg = self.busy_queue.get_nowait() + pid, status = msg + process_status[pid] = status + except Empty: + break + + if not any(process_status.keys()): + self.processes.append(Process(target=self.run_process, args=(self.process_data,))) + + self.in_queue.put((proof_id, proof_step)) + self.pending_jobs += 1 + + def run_process(self) -> None: + while True: + self.busy_queue.put((os.getpid(), True)) + dequeued = self.in_queue.get() + self.busy_queue.put((os.getpid(), False)) + if dequeued == 0: + break + proof_id, proof_step = dequeued + data = self.process_data.get(proof_id) + update = proof_step.exec(data) + self.out_queue.put((proof_id, update)) + + def prove_parallel( proofs: Mapping[str, Proof], provers: Mapping[str, Prover], + process_data: Mapping[str, Any], max_workers: int, -) -> Iterable[Proof]: - pending: dict[Future[Any], str] = {} +) -> tuple[Iterable[Proof], ProfilingInfo]: explored: set[tuple[str, ProofStep]] = set() - def submit(proof_id: str, pool: Executor) -> None: + in_queue: Queue = Queue() + out_queue: Queue = Queue() + + pending_jobs: int = 0 + + profile = ProfilingInfo() + + total_init_time = time.time_ns() + + def run_process(data: Any) -> None: + while True: + dequeued = in_queue.get() + if dequeued == 0: + break + proof_id, proof_step = dequeued + data = process_data.get(proof_id) + update = proof_step.exec(data) + out_queue.put((proof_id, update)) + + def submit(proof_id: str) -> None: proof = proofs[proof_id] prover = provers[proof_id] - for step in prover.steps(proof): # <-- get next steps (represented by e.g. pending nodes, ...) + steps_init_time = time.time_ns() + steps = prover.steps(proof) + profile.total_steps_time += time.time_ns() - steps_init_time + for step in steps: # <-- get next steps (represented by e.g. pending nodes, ...) if (proof_id, step) in explored: continue explored.add((proof_id, step)) - future = pool.submit(step.exec) # <-- schedule steps for execution - pending[future] = proof_id + in_queue.put((proof_id, step)) + nonlocal pending_jobs + pending_jobs += 1 - with ProcessPoolExecutor(max_workers=max_workers) as pool: - for proof_id in proofs: - submit(proof_id, pool) + processes = [Process(target=run_process, args=(process_data,)) for _ in range(max_workers)] + for process in processes: + process.start() - while pending: - done, _ = wait(pending, return_when='FIRST_COMPLETED') - future = done.pop() + for proof_id in proofs.keys(): + submit(proof_id) - proof_id = pending[future] - proof = proofs[proof_id] - prover = provers[proof_id] - try: - update = future.result() - except CancelledError as err: - raise RuntimeError(f'Task was cancelled for proof {proof_id}') from err - except TimeoutError as err: - raise RuntimeError( - f"Future for proof {proof_id} was not finished executing and timed out. This shouldn't happen since this future was already waited on." - ) from err - except Exception as err: - raise RuntimeError('Exception was raised in ProofStep.exec() for proof {proof_id}.') from err - - prover.commit(proof, update) # <-- update the proof (can be in-memory, access disk with locking, ...) - - match proof.status: - # terminate on first failure, yield partial results, etc. - case ProofStatus.FAILED: - ... - case ProofStatus.PENDING: - if not list(prover.steps(proof)): - raise ValueError('Prover violated expectation. status is pending with no further steps.') - case ProofStatus.PASSED: - if list(prover.steps(proof)): - raise ValueError('Prover violated expectation. status is passed with further steps.') - - submit(proof_id, pool) - pending.pop(future) - return proofs.values() + while pending_jobs > 0: + wait_init_time = time.time_ns() + proof_id, update = out_queue.get() + profile.total_wait_time += time.time_ns() - wait_init_time + pending_jobs -= 1 + + proof = proofs[proof_id] + prover = provers[proof_id] + + commit_init_time = time.time_ns() + prover.commit(proof, update) # <-- update the proof (can be in-memory, access disk with locking, ...) + profile.total_commit_time += time.time_ns() - commit_init_time + + match proof.status: + # terminate on first failure, yield partial results, etc. + case ProofStatus.FAILED: + ... + case ProofStatus.PENDING: + steps_init_time = time.time_ns() + if not list(prover.steps(proof)): + raise ValueError('Prover violated expectation. status is pending with no further steps.') + profile.total_steps_time += time.time_ns() - steps_init_time + case ProofStatus.PASSED: + steps_init_time = time.time_ns() + if list(prover.steps(proof)): + raise ValueError('Prover violated expectation. status is passed with further steps.') + profile.total_steps_time += time.time_ns() - steps_init_time + + submit(proof_id) + + for _ in range(max_workers): + in_queue.put(0) + + for process in processes: + process.join() + + profile.total_time = time.time_ns() - total_init_time + + return proofs.values(), profile + + +# def prove_parallel( +# proofs: Mapping[str, Proof], +# provers: Mapping[str, Prover], +# process_data: Mapping[str, Any], +# max_workers: int, +# ) -> Iterable[Proof]: +# pending: dict[Future[Any], str] = {} +# explored: set[tuple[str, ProofStep]] = set() +# +# def submit(proof_id: str, pool: SemanticsProcessPool) -> None: +# proof = proofs[proof_id] +# prover = provers[proof_id] +# for step in prover.steps(proof): # <-- get next steps (represented by e.g. pending nodes, ...) +# if (proof_id, step) in explored: +# continue +# explored.add((proof_id, step)) +# future = pool.submit((proof_id, step)) # <-- schedule steps for execution +# pending[future] = proof_id +# +# pool = SemanticsProcessPool(max_workers=max_workers) +# # with ProcessPoolExecutor(max_workers=max_workers) as pool: +# for proof_id in proofs: +# submit(proof_id, pool) +# +# while pending: +# done, _ = wait(pending, return_when='FIRST_COMPLETED') +# future = done.pop() +# +# proof_id = pending[future] +# proof = proofs[proof_id] +# prover = provers[proof_id] +# try: +# update = future.result() +# except CancelledError as err: +# raise RuntimeError(f'Task was cancelled for proof {proof_id}') from err +# except TimeoutError as err: +# raise RuntimeError( +# f"Future for proof {proof_id} was not finished executing and timed out. This shouldn't happen since this future was already waited on." +# ) from err +# except Exception as err: +# raise RuntimeError('Exception was raised in ProofStep.exec() for proof {proof_id}.') from err +# +# prover.commit(proof, update) # <-- update the proof (can be in-memory, access disk with locking, ...) +# +# match proof.status: +# # terminate on first failure, yield partial results, etc. +# case ProofStatus.FAILED: +# ... +# case ProofStatus.PENDING: +# if not list(prover.steps(proof)): +# raise ValueError('Prover violated expectation. status is pending with no further steps.') +# case ProofStatus.PASSED: +# if list(prover.steps(proof)): +# raise ValueError('Prover violated expectation. status is passed with further steps.') +# +# submit(proof_id, pool) +# pending.pop(future) +# return proofs.values() diff --git a/src/pyk/proof/reachability.py b/src/pyk/proof/reachability.py index a8e2ac018..39e8e5f0b 100644 --- a/src/pyk/proof/reachability.py +++ b/src/pyk/proof/reachability.py @@ -4,15 +4,18 @@ import json import logging import re +import time +from abc import ABC from dataclasses import dataclass from typing import TYPE_CHECKING -from pyk.kore.rpc import LogEntry +import pyk.proof.parallel as parallel +from pyk.kore.rpc import KoreClient, KoreExecLogFormat, LogEntry from ..kast.inner import KInner, Subst from ..kast.manip import flatten_label, ml_pred_to_bool from ..kast.outer import KFlatModule, KImport, KRule -from ..kcfg import KCFG +from ..kcfg import KCFG, KCFGExplore from ..kcfg.exploration import KCFGExploration from ..konvert import kflatmodule_to_kore from ..prelude.kbool import BOOL, TRUE @@ -26,16 +29,21 @@ from pathlib import Path from typing import Any, Final, TypeVar + from pyk.kcfg.semantics import KCFGSemantics + from pyk.utils import BugReport + + from ..cterm import CSubst, CTerm from ..kast.outer import KClaim, KDefinition, KFlatModuleList, KRuleLike - from ..kcfg import KCFGExplore + from ..kcfg.explore import ExtendResult from ..kcfg.kcfg import NodeIdLike + from ..ktool.kprint import KPrint T = TypeVar('T', bound='Proof') _LOGGER: Final = logging.getLogger(__name__) -class APRProof(Proof, KCFGExploration): +class APRProof(Proof, KCFGExploration, parallel.Proof): """APRProof and APRProver implement all-path reachability logic, as introduced by A. Stefanescu and others in their paper 'All-Path Reachability Logic': https://doi.org/10.23638/LMCS-15(2:5)2019 @@ -53,6 +61,7 @@ class APRProof(Proof, KCFGExploration): logs: dict[int, tuple[LogEntry, ...]] circularity: bool failure_info: APRFailureInfo | None + checked_for_subsumption: set[int] def __init__( self, @@ -62,6 +71,7 @@ def __init__( init: NodeIdLike, target: NodeIdLike, logs: dict[int, tuple[LogEntry, ...]], + checked_for_subsumption: set[int] | None = None, bmc_depth: int | None = None, bounded: Iterable[int] | None = None, proof_dir: Path | None = None, @@ -73,6 +83,7 @@ def __init__( Proof.__init__(self, id, proof_dir=proof_dir, subproof_ids=subproof_ids, admitted=admitted) KCFGExploration.__init__(self, kcfg, terminal) + self.checked_for_subsumption = checked_for_subsumption if checked_for_subsumption is not None else set() self.failure_info = None self.init = kcfg._resolve(init) self.target = kcfg._resolve(target) @@ -136,7 +147,7 @@ def is_target(self, node_id: NodeIdLike) -> bool: def is_failing(self, node_id: NodeIdLike) -> bool: return ( self.kcfg.is_leaf(node_id) - and not self.is_explorable(node_id) + and ((node_id in self.checked_for_subsumption) or self.kcfg.is_stuck(node_id)) and not self.is_target(node_id) and not self.is_refuted(node_id) and not self.kcfg.is_vacuous(node_id) @@ -201,6 +212,7 @@ def from_dict(cls: type[APRProof], dct: Mapping[str, Any], proof_dir: Path | Non admitted = dct.get('admitted', False) subproof_ids = dct['subproof_ids'] if 'subproof_ids' in dct else [] node_refutations: dict[int, str] = {} + checked_for_subsumption = set(dct['checked_for_subsumption']) if 'node_refutation' in dct: node_refutations = { kcfg._resolve(int(node_id)): proof_id for node_id, proof_id in dct['node_refutations'].items() @@ -227,6 +239,7 @@ def from_dict(cls: type[APRProof], dct: Mapping[str, Any], proof_dir: Path | Non proof_dir=proof_dir, subproof_ids=subproof_ids, node_refutations=node_refutations, + checked_for_subsumption=checked_for_subsumption, ) @staticmethod @@ -363,6 +376,7 @@ def dict(self) -> dict[str, Any]: dct['bmc_depth'] = self.bmc_depth dct['node_refutations'] = {node_id: proof.id for (node_id, proof) in self.node_refutations.items()} dct['circularity'] = self.circularity + dct['checked_for_subsumption'] = list(self.checked_for_subsumption) logs = {int(k): [l.to_dict() for l in ls] for k, ls in self.logs.items()} dct['logs'] = logs return dct @@ -410,6 +424,7 @@ def read_proof_data(proof_dir: Path, id: str) -> APRProof: terminal = proof_dict['terminal'] logs = {int(k): tuple(LogEntry.from_dict(l) for l in ls) for k, ls in proof_dict['logs'].items()} subproof_ids = proof_dict['subproof_ids'] + checked_for_subsumption = proof_dict['checked_for_subsumption'] node_refutations = { kcfg._resolve(int(node_id)): proof_id for node_id, proof_id in proof_dict['node_refutations'].items() } @@ -428,6 +443,7 @@ def read_proof_data(proof_dir: Path, id: str) -> APRProof: proof_dir=proof_dir, subproof_ids=subproof_ids, node_refutations=node_refutations, + checked_for_subsumption=checked_for_subsumption, ) def write_proof_data(self) -> None: @@ -450,6 +466,7 @@ def write_proof_data(self) -> None: self.kcfg._resolve(node_id): proof.id for (node_id, proof) in self.node_refutations.items() } dct['circularity'] = self.circularity + dct['checked_for_subsumption'] = list(self.checked_for_subsumption) logs = {int(k): [l.to_dict() for l in ls] for k, ls in self.logs.items()} dct['logs'] = logs @@ -635,6 +652,7 @@ def _check_subsume(self, node: KCFG.Node) -> bool: ) return False csubst = self.kcfg_explore.cterm_implies(node.cterm, self.proof.kcfg.node(self.proof.target).cterm) + self.proof.checked_for_subsumption.add(node.id) if csubst is not None: self.proof.kcfg.create_cover(node.id, self.proof.target, csubst=csubst) _LOGGER.info(f'Subsumed into target node {self.proof.id}: {shorten_hashes((node.id, self.proof.target))}') @@ -848,3 +866,371 @@ def print(self) -> list[str]: res_lines.append('') res_lines.append('Join the Runtime Verification Discord server for support: https://discord.gg/CurfmXNtbN') return res_lines + + +@dataclass(frozen=True) +class APRProofResult(ABC): + cterm_implies_time: int + extend_cterm_time: int + + +@dataclass(frozen=True) +class APRProofExtendResult(APRProofResult): + extend_result: ExtendResult + node_id: int + + +@dataclass(frozen=True) +class APRProofSubsumeResult(APRProofResult): + node_id: int + subsume_node_id: int + csubst: CSubst | None + + +class APRProofProcessData: + kprint: KPrint + kcfg_semantics: KCFGSemantics | None + + definition_dir: str | Path + llvm_definition_dir: Path | None + module_name: str + command: str | Iterable[str] | None + smt_timeout: int | None + smt_retry_limit: int | None + smt_tactic: str | None + haskell_log_format: KoreExecLogFormat + haskell_log_entries: Iterable[str] + log_axioms_file: Path | None + + def __init__( + self, + kprint: KPrint, + kcfg_semantics: KCFGSemantics, + definition_dir: str | Path, + module_name: str, + llvm_definition_dir: Path | None = None, + command: str | Iterable[str] | None = None, + smt_timeout: int | None = None, + smt_retry_limit: int | None = None, + smt_tactic: str | None = None, + haskell_log_format: KoreExecLogFormat = KoreExecLogFormat.ONELINE, + haskell_log_entries: Iterable[str] = (), + log_axioms_file: Path | None = None, + ) -> None: + self.kprint = kprint + self.kcfg_semantics = kcfg_semantics + self.definition_dir = definition_dir + self.llvm_definition_dir = llvm_definition_dir + self.module_name = module_name + self.command = command + self.smt_timeout = smt_timeout + self.smt_retry_limit = smt_retry_limit + self.smt_tactic = smt_tactic + self.haskell_log_format = haskell_log_format + self.haskell_log_entries = haskell_log_entries + self.log_axioms_file = log_axioms_file + + +class ParallelAPRProver(parallel.Prover[APRProof, APRProofResult, APRProofProcessData]): + prover: APRProver + kcfg_explore: KCFGExplore + port: int + client: KoreClient + + execute_depth: int | None + cut_point_rules: Iterable[str] + terminal_rules: Iterable[str] + + kprint: KPrint + kcfg_semantics: KCFGSemantics | None + id: str | None + trace_rewrites: bool + + bug_report: BugReport | None + bug_report_id: str | None + + total_cterm_implies_time: int + total_cterm_extend_time: int + + max_iterations: int | None + iterations: int + fail_fast: bool + + counterexample_info: bool + always_check_subsumption: bool + fast_check_subsumption: bool + + _checked_nodes: list[int] + + def __init__( + self, + proof: APRProof, + module_name: str, + definition_dir: str | Path, + execute_depth: int | None, + kprint: KPrint, + kcfg_semantics: KCFGSemantics | None, + port: int, + id: str | None, + trace_rewrites: bool, + cut_point_rules: Iterable[str], + terminal_rules: Iterable[str], + bug_report_id: str | None, + llvm_definition_dir: Path | None = None, + command: str | Iterable[str] | None = None, + bug_report: BugReport | None = None, + smt_timeout: int | None = None, + smt_retry_limit: int | None = None, + smt_tactic: str | None = None, + haskell_log_format: KoreExecLogFormat = KoreExecLogFormat.ONELINE, + haskell_log_entries: Iterable[str] = (), + log_axioms_file: Path | None = None, + max_iterations: int | None = None, + fail_fast: bool = False, + counterexample_info: bool = False, + always_check_subsumption: bool = False, + fast_check_subsumption: bool = False, + ) -> None: + self.execute_depth = execute_depth + self.cut_point_rules = cut_point_rules + self.terminal_rules = terminal_rules + self.kprint = kprint + self.kcfg_semantics = kcfg_semantics + self.id = id + self.trace_rewrites = trace_rewrites + self.port = port + self.bug_report = bug_report + self.bug_report_id = bug_report_id + self.total_cterm_extend_time = 0 + self.total_cterm_implies_time = 0 + self.client = KoreClient( + host='localhost', + port=self.port, + bug_report=self.bug_report, + bug_report_id=self.bug_report_id, + ) + self.kcfg_explore = KCFGExplore( + kprint=self.kprint, + kore_client=self.client, + kcfg_semantics=self.kcfg_semantics, + id=self.id, + trace_rewrites=self.trace_rewrites, + ) + self.prover = APRProver( + proof=proof, + kcfg_explore=self.kcfg_explore, + counterexample_info=counterexample_info, + always_check_subsumption=always_check_subsumption, + fast_check_subsumption=fast_check_subsumption, + ) + self.prover._check_all_terminals() + self.max_iterations = max_iterations + self.iterations = 0 + self.fail_fast = fail_fast + self._checked_nodes = [] + + def __del__(self) -> None: + self.client.close() + + def steps(self, proof: APRProof) -> Iterable[APRProofStep]: + """ + Return a list of `ProofStep[U]` which represents all the computation jobs as defined by `ProofStep`, which have not yet been computed and committed, and are available given the current state of `proof`. Note that this is a requirement which is not enforced by the type system. + If `steps()` or `commit()` has been called on a proof `proof`, `steps()` may never again be called on `proof`. + Must not modify `self` or `proof`. + The output of this function must only change with calls to `self.commit()`. + """ + if self.max_iterations is not None and self.iterations >= self.max_iterations: + return [] + + if self.fail_fast and proof.failed: + _LOGGER.warning( + f'Terminating proof early because fail_fast is set {proof.id}, failing nodes: {[nd.id for nd in proof.failing]}' + ) + return [] + + steps: list[APRProofStep] = [] + target_node = proof.kcfg.node(proof.target) + + for pending_node in proof.pending: + module_name = ( + self.prover.circularities_module_name + if self.prover.nonzero_depth(pending_node) + else self.prover.dependencies_module_name + ) + steps.append( + APRProofStep( + proof_id=proof.id, + cterm=pending_node.cterm, + node_id=pending_node.id, + dependencies_module_name=self.prover.dependencies_module_name, + circularities_module_name=self.prover.circularities_module_name, + target_cterm=target_node.cterm, + target_node_id=target_node.id, + use_module_name=module_name, + port=self.port, + execute_depth=self.execute_depth, + terminal_rules=self.terminal_rules, + cut_point_rules=self.cut_point_rules, + id=self.id, + trace_rewrites=self.trace_rewrites, + bug_report=self.bug_report, + bug_report_id=self.bug_report_id, + is_terminal=(self.kcfg_explore.kcfg_semantics.is_terminal(pending_node.cterm)), + target_is_terminal=(proof.target not in proof._terminal), + main_module_name=self.prover.main_module_name, + circularity=proof.circularity, + depth_is_nonzero=self.prover.nonzero_depth(pending_node), + ) + ) + return steps + + def commit(self, proof: APRProof, update: APRProofResult) -> None: + """ + Should update `proof` according to `update`. + If `steps()` or `commit()` has been called on a proof `proof`, `commit()` may never again be called on `proof`. + Must only be called with an `update` that was returned by `step.execute()` where `step` was returned by `self.steps(proof)`. + Steps for a proof `proof` can have their results submitted any time after they are made available by `self.steps(proof)`, including in any order and multiple times, and the Prover must be able to handle this. + """ + + if self.max_iterations is not None and self.iterations >= self.max_iterations: + return + + if proof.bmc_depth is not None: + if type(update) is APRProofExtendResult: + node = proof.kcfg.node(update.node_id) + if node.id not in self._checked_nodes: + _LOGGER.info(f'Checking bmc depth for node {proof.id}: {node.id}') + self._checked_nodes.append(node.id) + _prior_loops = [ + succ.source.id + for succ in proof.shortest_path_to(node.id) + if self.kcfg_explore.kcfg_semantics.same_loop(succ.source.cterm, node.cterm) + ] + prior_loops: list[NodeIdLike] = [] + for _pl in _prior_loops: + if not ( + proof.kcfg.zero_depth_between(_pl, node.id) + or any(proof.kcfg.zero_depth_between(_pl, pl) for pl in prior_loops) + ): + prior_loops.append(_pl) + _LOGGER.info(f'Prior loop heads for node {proof.id}: {(node.id, prior_loops)}') + if len(prior_loops) > proof.bmc_depth: + proof.add_bounded(node.id) + + self.iterations += 1 + if self.max_iterations is not None and self.iterations >= self.max_iterations: + _LOGGER.warning(f'Reached iteration bound {proof.id}: {self.max_iterations}') + + return + + self.prover._check_all_terminals() + + self.total_cterm_extend_time += update.extend_cterm_time + self.total_cterm_implies_time += update.cterm_implies_time + + # Extend proof as per `update` + if type(update) is APRProofExtendResult: + node = proof.kcfg.node(update.node_id) + self.kcfg_explore.extend_kcfg( + extend_result=update.extend_result, kcfg=proof.kcfg, node=node, logs=proof.logs + ) + elif type(update) is APRProofSubsumeResult: + proof.checked_for_subsumption.add(update.node_id) + if update.csubst is None: + proof._terminal.add(update.node_id) + else: + proof.kcfg.create_cover(update.node_id, proof.target, csubst=update.csubst) + + if proof.failed: + self.prover.save_failure_info() + + proof.write_proof_data() + + self.iterations += 1 + if self.max_iterations is not None and self.iterations >= self.max_iterations: + _LOGGER.warning(f'Reached iteration bound {proof.id}: {self.max_iterations}') + + +@dataclass(frozen=True, eq=True) +class APRProofStep(parallel.ProofStep[APRProofResult, APRProofProcessData]): + proof_id: str + cterm: CTerm + node_id: int + dependencies_module_name: str + circularities_module_name: str + use_module_name: str + target_cterm: CTerm + target_node_id: int + port: int + execute_depth: int | None + cut_point_rules: Iterable[str] + terminal_rules: Iterable[str] + is_terminal: bool + target_is_terminal: bool + + main_module_name: str + + bug_report: BugReport | None + bug_report_id: str | None + + id: str | None + trace_rewrites: bool + + circularity: bool + depth_is_nonzero: bool + + def __hash__(self) -> int: + return hash((self.cterm, self.node_id)) + + def exec(self, data: APRProofProcessData) -> APRProofResult: + """ + Should perform some nontrivial computation given by `self`, which can be done independently of other calls to `exec()`. + Allowed to be nondeterministic. + Able to be called on any `ProofStep` returned by `prover.steps(proof)`. + """ + with KoreClient( + host='localhost', + port=self.port, + bug_report=self.bug_report, + bug_report_id=self.bug_report_id, + ) as client: + kcfg_explore = KCFGExplore( + kprint=data.kprint, + kore_client=client, + kcfg_semantics=data.kcfg_semantics, + id=self.id, + trace_rewrites=self.trace_rewrites, + ) + + cterm_implies_time = 0 + extend_cterm_time = 0 + + if self.is_terminal or self.target_is_terminal: + init_cterm_implies_time = time.time_ns() + csubst = kcfg_explore.cterm_implies(self.cterm, self.target_cterm) + cterm_implies_time = time.time_ns() - init_cterm_implies_time + if csubst is not None or self.is_terminal: + return APRProofSubsumeResult( + node_id=self.node_id, + subsume_node_id=self.target_node_id, + csubst=csubst, + cterm_implies_time=cterm_implies_time, + extend_cterm_time=extend_cterm_time, + ) + + self.circularities_module_name if self.depth_is_nonzero else self.dependencies_module_name + init_extend_cterm_time = time.time_ns() + result = kcfg_explore.extend_cterm( + self.cterm, + module_name=self.use_module_name, + execute_depth=self.execute_depth, + terminal_rules=self.terminal_rules, + cut_point_rules=self.cut_point_rules, + ) + extend_cterm_time = time.time_ns() - init_extend_cterm_time + return APRProofExtendResult( + extend_result=result, + node_id=self.node_id, + cterm_implies_time=cterm_implies_time, + extend_cterm_time=extend_cterm_time, + ) diff --git a/src/pyk/proof/show.py b/src/pyk/proof/show.py index 31a990939..790e110a6 100644 --- a/src/pyk/proof/show.py +++ b/src/pyk/proof/show.py @@ -42,6 +42,8 @@ def node_attrs(self, kcfg: KCFG, node: KCFG.Node) -> list[str]: attrs.append('terminal') if 'stuck' in attrs: attrs.remove('stuck') + if node.id in self.proof.checked_for_subsumption: + attrs.append('checked') if self.proof.is_bounded(node.id): attrs.append('bounded') if 'stuck' in attrs: diff --git a/src/pyk/testing/_kompiler.py b/src/pyk/testing/_kompiler.py index 8eb95a394..248b25107 100644 --- a/src/pyk/testing/_kompiler.py +++ b/src/pyk/testing/_kompiler.py @@ -1,5 +1,6 @@ from __future__ import annotations +import os from abc import ABC, abstractmethod from enum import Enum from typing import ClassVar # noqa: TC003 @@ -192,6 +193,7 @@ def _kore_server( llvm_dir: Path | None, bug_report: BugReport | None, ) -> Iterator[KoreServer]: + os.environ['GHCRTS'] = '-N2' match server_type: case ServerType.LEGACY: assert not llvm_dir diff --git a/src/tests/integration/k-files/imp-simple-spec.k b/src/tests/integration/k-files/imp-simple-spec.k index de91067ea..d1ba40473 100644 --- a/src/tests/integration/k-files/imp-simple-spec.k +++ b/src/tests/integration/k-files/imp-simple-spec.k @@ -49,6 +49,7 @@ endmodule module IMP-SIMPLE-SPEC imports IMP-VERIFICATION imports IMP-SIMPLE-SPEC-DEPENDENCIES + imports K-EQUAL-SYNTAX claim [addition-1]: 3 + 4 => 7 ... @@ -83,6 +84,13 @@ module IMP-SIMPLE-SPEC requires 0 <=Int S ensures 0 while ($n <= $s) { $n = $n + 1 ; } + => . ... + + ($n |-> (0 => (?N))) + ($s |-> S:Int) + + claim [bmc-two-loops-symbolic]: while ($s <= $n) { $n = $n + -1 ; } while ($k <= $m) { $m = $m + -1 ; } $i = 1; @@ -95,6 +103,25 @@ module IMP-SIMPLE-SPEC requires 0 <=Int S andBool 0 <=Int K ensures 0 + if(B:Bool) { + $n = 1; + } + else { + $n = 2; + } + $s = 10; + while (1 <= $s) { + $s = $s + -1; + } + => . + ... + + $s |-> (10 => 0) + $n |-> (0 => (#if B:Bool #then 1 #else 2 #fi)) + + claim [failing-if]: if(_B:Bool) { $n = 1 ; } else { $n = 2 ; } => . ... $n |-> (0 => 1) diff --git a/src/tests/integration/proof/test_imp_parallel.py b/src/tests/integration/proof/test_imp_parallel.py new file mode 100644 index 000000000..4e2894083 --- /dev/null +++ b/src/tests/integration/proof/test_imp_parallel.py @@ -0,0 +1,166 @@ +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING + +import pytest + +from pyk.proof.parallel import prove_parallel +from pyk.proof.proof import ProofStatus +from pyk.proof.reachability import APRProof, APRProofProcessData, ParallelAPRProver +from pyk.testing import KCFGExploreTest, KPrintTest, KProveTest +from pyk.utils import single + +from ..utils import K_FILES +from .test_imp import ImpSemantics + +if TYPE_CHECKING: + from pytest import TempPathFactory + + from pyk.kast.outer import KDefinition + from pyk.kcfg.explore import KCFGExplore + from pyk.kcfg.semantics import KCFGSemantics + from pyk.kore.rpc import KoreServer + from pyk.ktool.kprove import KProve + +PARALLEL_PROVE_TEST_DATA = ( + ('addition-1', ProofStatus.PASSED, False), + ('sum-10', ProofStatus.PASSED, False), + ('dep-fail-1', ProofStatus.PASSED, True), + ('sum-N', ProofStatus.PASSED, True), + ('sum-loop', ProofStatus.PASSED, False), + ('failing-if', ProofStatus.FAILED, False), + ('long-branches', ProofStatus.PASSED, False), +) + + +@pytest.fixture(scope='function') +def proof_dir(tmp_path_factory: TempPathFactory) -> Path: + return tmp_path_factory.mktemp('proofs') + + +class TestImpParallelProve(KCFGExploreTest, KProveTest, KPrintTest): + KOMPILE_MAIN_FILE = K_FILES / 'imp-verification.k' + + def semantics(self, definition: KDefinition) -> KCFGSemantics: + return ImpSemantics(definition) + + @pytest.mark.parametrize( + 'claim_id,expected_status,admit_deps', + PARALLEL_PROVE_TEST_DATA, + ids=[test_id for test_id, *_ in PARALLEL_PROVE_TEST_DATA], + ) + def test_imp_parallel_prove( + self, + claim_id: str, + expected_status: ProofStatus, + admit_deps: bool, + kcfg_explore: KCFGExplore, + kprove: KProve, + proof_dir: Path, + _kore_server: KoreServer, + ) -> None: + spec_file = K_FILES / 'imp-simple-spec.k' + spec_module = 'IMP-SIMPLE-SPEC' + + spec_modules = kprove.get_claim_modules(Path(spec_file), spec_module_name=spec_module) + spec_label = f'{spec_module}.{claim_id}' + proofs = APRProof.from_spec_modules( + kprove.definition, + spec_modules, + spec_labels=[spec_label], + logs={}, + proof_dir=proof_dir, + ) + proof = single([p for p in proofs if p.id == spec_label]) + + if admit_deps: + for subproof in proof.subproofs: + subproof.admit() + subproof.write_proof_data() + + semantics = self.semantics(kprove.definition) + parallel_prover = ParallelAPRProver( + proof=proof, + module_name=kprove.main_module, + definition_dir=kprove.definition_dir, + execute_depth=100, + kprint=kprove, + kcfg_semantics=semantics, + id=claim_id, + trace_rewrites=False, + cut_point_rules=(), + terminal_rules=(), + bug_report=None, + bug_report_id=None, + port=_kore_server.port, + ) + + process_data = APRProofProcessData( + kprint=kprove, + kcfg_semantics=semantics, + definition_dir=kprove.definition_dir, + module_name=kprove.main_module, + ) + + results, _ = prove_parallel( + proofs={'proof1': proof}, + provers={'proof1': parallel_prover}, + max_workers=2, + process_data={'proof1': process_data}, + ) + + assert len(list(results)) == 1 + assert list(results)[0].status == expected_status + + def test_imp_bmc_parallel_prove( + self, + kcfg_explore: KCFGExplore, + kprove: KProve, + proof_dir: Path, + _kore_server: KoreServer, + ) -> None: + claim_id = 'bmc-infinite-loop' + expected_status = ProofStatus.PASSED + + spec_file = K_FILES / 'imp-simple-spec.k' + spec_module = 'IMP-SIMPLE-SPEC' + + spec_label = f'{spec_module}.{claim_id}' + + claim = single(kprove.get_claims(Path(spec_file), spec_module_name=spec_module, claim_labels=[spec_label])) + proof = APRProof.from_claim(kprove.definition, claim, logs={}, bmc_depth=5) + + semantics = self.semantics(kprove.definition) + parallel_prover = ParallelAPRProver( + proof=proof, + module_name=kprove.main_module, + definition_dir=kprove.definition_dir, + execute_depth=100, + kprint=kprove, + kcfg_semantics=semantics, + id=claim_id, + trace_rewrites=False, + cut_point_rules=['IMP.while'], + terminal_rules=(), + bug_report=None, + bug_report_id=None, + port=_kore_server.port, + ) + + process_data = APRProofProcessData( + kprint=kprove, + kcfg_semantics=semantics, + definition_dir=kprove.definition_dir, + module_name=kprove.main_module, + ) + + results, _ = prove_parallel( + proofs={'proof1': proof}, + provers={'proof1': parallel_prover}, + max_workers=2, + process_data={'proof1': process_data}, + ) + + assert len(list(results)) == 1 + assert list(results)[0].status == expected_status diff --git a/src/tests/integration/proof/test_parallel_prove.py b/src/tests/integration/proof/test_parallel_prove.py index 075e5894d..4515cdfcb 100644 --- a/src/tests/integration/proof/test_parallel_prove.py +++ b/src/tests/integration/proof/test_parallel_prove.py @@ -34,15 +34,15 @@ def status(self) -> ProofStatus: @dataclass(frozen=True) -class TreeExploreProofStep(ProofStep[int]): +class TreeExploreProofStep(ProofStep[int, None]): node: int - def exec(self) -> int: + def exec(self, data: None) -> int: time.sleep(1) return self.node -class TreeExploreProver(Prover[TreeExploreProof, int]): +class TreeExploreProver(Prover[TreeExploreProof, int, None]): def __init__(self) -> None: return @@ -93,7 +93,7 @@ def commit(self, proof: TreeExploreProof, update: int) -> None: def test_parallel_prove() -> None: prover = TreeExploreProver() proof = TreeExploreProof(0, 9, SIMPLE_TREE, set()) - results = prove_parallel({'proof1': proof}, {'proof1': prover}, max_workers=2) + results, _ = prove_parallel({'proof1': proof}, {'proof1': prover}, max_workers=2, process_data={}) assert len(list(results)) == 1 assert len(list(prover.steps(proof))) == 0 assert list(results)[0].status == ProofStatus.PASSED @@ -102,7 +102,7 @@ def test_parallel_prove() -> None: def test_parallel_fail() -> None: prover = TreeExploreProver() proof = TreeExploreProof(0, 9, SIMPLE_TREE, {6}) - results = prove_parallel({'proof1': proof}, {'proof1': prover}, max_workers=2) + results, _ = prove_parallel({'proof1': proof}, {'proof1': prover}, max_workers=2, process_data={}) assert len(list(results)) == 1 assert len(list(prover.steps(proof))) == 0 assert list(results)[0].status == ProofStatus.FAILED @@ -112,10 +112,11 @@ def test_parallel_multiple_proofs() -> None: prover = TreeExploreProver() proofs = {f'proof{i}': TreeExploreProof(0, 9, SIMPLE_TREE, set()) for i in range(3)} provers_map = {f'proof{i}': prover for i in range(3)} - results = prove_parallel( + results, _ = prove_parallel( proofs, provers_map, max_workers=4, + process_data={}, ) assert len(list(results)) == 3 for proof in proofs.values():