From a06ce4421efec4ca7e9df3d9fc7f423f81662332 Mon Sep 17 00:00:00 2001 From: Hannes Weichelt Date: Sat, 25 May 2024 19:46:51 +0200 Subject: [PATCH] work in progress: solver decision proagator for gui --- src/clingexplaid/__main__.py | 2 + src/clingexplaid/cli/textual_gui.py | 53 +++++++- src/clingexplaid/propagators/__init__.py | 2 + .../propagator_solver_decisions.py | 125 ++++++++++++++++++ 4 files changed, 175 insertions(+), 7 deletions(-) create mode 100644 src/clingexplaid/propagators/propagator_solver_decisions.py diff --git a/src/clingexplaid/__main__.py b/src/clingexplaid/__main__.py index 7bd17d7..6719b11 100644 --- a/src/clingexplaid/__main__.py +++ b/src/clingexplaid/__main__.py @@ -4,10 +4,12 @@ import sys +import clingo from clingo.application import clingo_main from .cli.clingo_app import ClingoExplaidApp from .cli.textual_gui import textual_main +from .propagators import SolverDecisionPropagator RUN_TEXTUAL_GUI = False diff --git a/src/clingexplaid/cli/textual_gui.py b/src/clingexplaid/cli/textual_gui.py index 8b834c2..3c50c5c 100644 --- a/src/clingexplaid/cli/textual_gui.py +++ b/src/clingexplaid/cli/textual_gui.py @@ -8,6 +8,7 @@ import re from typing import Any, Dict, Iterable, List, Optional, Set, Tuple +import clingo from textual import on from textual.app import App, ComposeResult from textual.containers import HorizontalScroll, Vertical, VerticalScroll @@ -28,6 +29,8 @@ Tree, ) +from ..propagators import SolverDecisionPropagator + ACTIVE_CLASS = "active" @@ -370,6 +373,20 @@ def action_exit(self) -> None: """ self.exit(0) + async def on_model(self, model): + tree = self.query_one(SolverTreeView).solve_tree + tree.root.add(f"MODEL {' '.join([str(a) for a in model])}") + await asyncio.sleep(0.1) + + def on_propagate(self, decisions): + tree = self.query_one(SolverTreeView).solve_tree + decisions_string = " -> ".join([str(d) for d in decisions]) + tree.root.add(decisions_string) + + def on_undo(self): + tree = self.query_one(SolverTreeView).solve_tree + tree.root.add("UNDO") + async def action_solve(self) -> None: """ Action to exit the textual application @@ -378,13 +395,35 @@ async def action_solve(self) -> None: solve_button = self.query_one("#solve-button") tree = tree_view.solve_tree tree.reset(tree.root.label) - tree_view.add_class("loading") - # deactivate solve button - solve_button.disabled = True - await asyncio.sleep(2) - solve_button.disabled = False - tree_view.remove_class("loading") - tree.root.add("TEST") + # tree_view.add_class("loading") + # # deactivate solve button + # solve_button.disabled = True + # await asyncio.sleep(2) + # solve_button.disabled = False + # tree_view.remove_class("loading") + + sdp = SolverDecisionPropagator( + callback_propagate=self.on_propagate, + callback_undo=self.on_undo, + ) + ctl = clingo.Control("0") + ctl.register_propagator(sdp) + ctl.add("base", [], "1{a;b}. x:-not b.") + ctl.ground([("base", [])]) + + exhausted = False + with ctl.solve(yield_=True) as solver_handle: + while not exhausted: + result = solver_handle.get() + if result.satisfiable: + model = solver_handle.model() + if model is None: + break + await self.on_model(model.symbols(atoms=True)) + exhausted = result.exhausted + if not exhausted: + solver_handle.resume() + tree.root.add("END") def flatten_list(ls: Optional[List[List[Any]]]) -> List: diff --git a/src/clingexplaid/propagators/__init__.py b/src/clingexplaid/propagators/__init__.py index a7df86b..b665ff4 100644 --- a/src/clingexplaid/propagators/__init__.py +++ b/src/clingexplaid/propagators/__init__.py @@ -7,10 +7,12 @@ from typing import List from .propagator_decision_order import DecisionOrderPropagator +from .propagator_solver_decisions import SolverDecisionPropagator DecisionLevel = List[int] DecisionLevelList = List[DecisionLevel] __all__ = [ "DecisionOrderPropagator", + "SolverDecisionPropagator", ] diff --git a/src/clingexplaid/propagators/propagator_solver_decisions.py b/src/clingexplaid/propagators/propagator_solver_decisions.py new file mode 100644 index 0000000..dbaa1a2 --- /dev/null +++ b/src/clingexplaid/propagators/propagator_solver_decisions.py @@ -0,0 +1,125 @@ +""" +Propagator Module: Decision Order +""" + +from dataclasses import dataclass +from typing import Callable, Dict, List, Optional, Sequence, Set, Tuple, Union + +import clingo +from clingo import Propagator + +INTERNAL_STRING = "INTERNAL" + + +@dataclass +class Decision: + positive: bool + literal: int + symbol: Optional[clingo.Symbol] + + def __str__(self) -> str: + symbol_string = str(self.symbol) if self.symbol is not None else INTERNAL_STRING + return f"({[" - ", " + "][self.positive]}{symbol_string})" + + +class SolverDecisionPropagator(Propagator): + """ + Propagator for showing the Solver Decisions of clingo + """ + + def __init__( + self, + signatures: Optional[Set[Tuple[str, int]]] = None, + callback_propagate: Optional[Callable] = None, + callback_undo: Optional[Callable] = None, + ): + # pylint: disable=missing-function-docstring + self.literal_symbol_lookup: Dict[int, clingo.Symbol] = {} + self.signatures = signatures if signatures is not None else set() + + self.callback_propagate: Callable = callback_propagate if callback_propagate is not None else lambda x: None + self.callback_undo: Callable = callback_undo if callback_undo is not None else lambda x: None + + def init(self, init: clingo.PropagateInit) -> None: + """ + Method to initialize the Decision Order Propagator. Here the literals are added to the Propagator's watch list. + """ + for atom in init.symbolic_atoms: + program_literal = atom.literal + solver_literal = init.solver_literal(program_literal) + self.literal_symbol_lookup[solver_literal] = atom.symbol + + for atom in init.symbolic_atoms: + if len(self.signatures) > 0 and not any(atom.match(name=s, arity=a) for s, a in self.signatures): + continue + symbolic_atom = init.symbolic_atoms[atom.symbol] + if symbolic_atom is None: + continue # nocoverage + query_program_literal = symbolic_atom.literal + query_solver_literal = init.solver_literal(query_program_literal) + init.add_watch(query_solver_literal) + init.add_watch(-query_solver_literal) + + def propagate(self, control: clingo.PropagateControl, changes: Sequence[int]) -> None: + """ + Propagate method the is called when one the registered literals is propagated by clasp. Here useful information + about the decision progress is recorded to be visualized later. + """ + # pylint: disable=unused-argument + decisions, entailments = self.get_decisions(control.assignment) + + literal_sequence = [] + for d in decisions: + literal_sequence.append(d) + if d in entailments: + literal_sequence.append(list(entailments[d])) + decision_sequence = self.literal_to_decision_sequence(literal_sequence) + + self.callback_propagate(decision_sequence) + + def undo(self, thread_id: int, assignment: clingo.Assignment, changes: Sequence[int]) -> None: + """ + This function is called when one of the solvers decisions is undone. + """ + # pylint: disable=unused-argument + + self.callback_undo() + + def literal_to_decision(self, literal: int) -> Decision: + is_positive = literal >= 0 + symbol = self.literal_symbol_lookup.get(abs(literal)) + return Decision(literal=abs(literal), positive=is_positive, symbol=symbol) + + def literal_to_decision_sequence( + self, literal_sequence: List[Union[int, List[int]]] + ) -> List[Union[Decision, List[Decision]]]: + new_decision_sequence = [] + for element in literal_sequence: + if isinstance(element, int): + new_decision_sequence.append(self.literal_to_decision(element)) + elif isinstance(element, list): + new_decision_sequence.append([self.literal_to_decision(literal) for literal in element]) + return new_decision_sequence + + @staticmethod + def get_decisions(assignment: clingo.Assignment) -> Tuple[List[int], Dict[int, List[int]]]: + """ + Helper function to extract a list of decisions and entailments from a clingo propagator assignment. + """ + level = 0 + decisions = [] + entailments = {} + try: + while True: + decision = assignment.decision(level) + decisions.append(decision) + + trail = assignment.trail + level_offset_start = trail.begin(level) + level_offset_end = trail.end(level) + level_offset_diff = level_offset_end - level_offset_start + if level_offset_diff > 1: + entailments[decision] = trail[(level_offset_start + 1) : level_offset_end] + level += 1 + except RuntimeError: + return decisions, entailments