-
Notifications
You must be signed in to change notification settings - Fork 0
/
smart_ooasp.py
533 lines (471 loc) · 20.3 KB
/
smart_ooasp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
# Copyright (c) 2024 Siemens AG Oesterreich
# SPDX-License-Identifier: MIT
import os
import time
from collections import defaultdict
from clingo import Control, Function, Model, Number, parse_term
from clingo.statistics import StatisticsMap
from clingo.symbol import Symbol
from clingraph.clingo_utils import ClingraphContext
from clingraph.graphviz import compute_graphs, render
from clingraph.orm import Factbase
import ooasp.settings as settings
from ooasp.utils import green, red, subtitle, title
SMART_FUNCTIONS = {
"assoc_needs_object": {"type": "cautious", "arity": 6},
"global_ub_gap": {"type": "cautious", "arity": 3},
"global_lb_gap": {"type": "cautious", "arity": 3},
"association_possible": {"type": "brave", "arity": 4},
}
class SmartOOASPSolver:
"""
Basic functionalities for the SmartOOASP solver.
"""
def __init__(
self,
initial_objects=None,
smart_generation_functions=None,
verbose=False,
view=False,
ctl=None,
associations_with_priority=None,
):
"""
Initialize the solver.
Args:
initial_objects (list[str]): List of initial objects in the configuration
(Example: ["rack", "frame", "frame", "elementA", "elementB"]).
smart_generation_functions (list[str]): List of smart generation functions to use for incremental generation.
verbose (bool): Verbose output will show the logs
view (bool): If True, saves the solution as a PNG generated by clingraph
ctl (Control): The control object to use for the solver. This can come from the Application class or True
associations_with_priority (list[str]): List of associations which will be associated with priority (eg. for performance reasons).
"""
self.initial_objects = initial_objects if initial_objects is not None else []
self.smart_generation_functions = (
smart_generation_functions if smart_generation_functions is not None else []
)
self.associations_with_priority = (
associations_with_priority if associations_with_priority is not None else []
)
self.view = view
self.next_id = 1
self.assumptions = set()
self.model = None
self.shown_model = None
self.cautious = None
self.brave = None
self.times = {
"initialization": 0,
"smart_generation": {
"time": 0,
"cautious": 0,
"brave": 0,
"functions": {n: 0 for n in self.smart_generation_functions},
},
"ground": 0,
}
self.objects = defaultdict(int)
def log(*args):
if verbose:
print(*args)
self.log = log
if ctl is not None:
self.ctl = ctl
else:
self.ctl = Control(["--warn=none"])
@property
def stats(self) -> dict:
"""
Returns the statistics of the run.
"""
times = {
"initialization": round(self.times["initialization"], 3),
"smart_generation": {
"time": round(self.times["smart_generation"]["time"], 3),
"cautious": round(self.times["smart_generation"]["cautious"], 3),
"brave": round(self.times["smart_generation"]["brave"], 3),
"functions": {
k: round(v, 3)
for k, v in self.times["smart_generation"]["functions"].items()
},
},
"ground": round(self.times["ground"], 3),
}
considered_conseq = {"cautious": False, "brave": False}
for f in self.smart_generation_functions:
conseq_type = SMART_FUNCTIONS[f]["type"]
if not considered_conseq[conseq_type]:
considered_conseq[conseq_type] = True
times["smart_generation"]["functions"][f] = round(
(
times["smart_generation"]["functions"][f]
- times["smart_generation"][conseq_type]
),
3,
)
results = {
"#objects": self.next_id - 1,
"#objects_added_per_type": self.objects,
"times": times,
}
return results
@property
def assumption_list(self) -> list[Symbol]:
"""
List of assumptions for the solver. All association instances are used as assumptions.
"""
return [(parse_term(a), True) for a in self.assumptions]
def create_initial_objects(self) -> None:
"""
Creates the initial objects based on the input parameter by adding and grounding the objects.
"""
start = time.time()
for o in self.initial_objects:
self.add_object(o)
self.times["initialization"] = time.time() - start
def ground_forced_inclusion(self, o: str) -> None:
"""
Grounds the program corresponding to the inclusion of the object as a fact of ooasp_isa.
We use a different program since adding this as a fact to the right program with .add is not simple.
Args:
o (str): The name of the class of the object to ground.
"""
self.log(f"\t\tGrounding forced inclusion {self.next_id} {o}")
start = time.time()
self.ctl.ground([("include", [Number(self.next_id), Function(o, [])])])
self.times["ground"] += time.time() - start
def ground(self, o: str) -> None:
"""
Grounds the program corresponding to the new object.
It also releases the external for the previous object and assigns the new object as active
to account for the current point in the incremental grounding.
Args:
o (str): The name of the class of the object to ground.
"""
self.log(f"\t\tGrounding {self.next_id} {o}")
start = time.time()
self.ctl.ground([("domain", [Number(self.next_id), Function(o, [])])])
self.times["ground"] += time.time() - start
if self.next_id > 0:
self.ctl.release_external(Function("active", [Number(self.next_id - 1)]))
self.ctl.assign_external(Function("active", [Number(self.next_id)]), True)
def add_object(self, o: str, must_be_used: bool = True) -> None:
"""
Adds a new object to the configuration. This addition includes the user predicate
to know the class for the object that was added and distinguish it in the encodings.
Args:
o (str): The name of the class of the object to ground.
"""
obj_atom = f"ooasp_isa({o},{self.next_id})"
self.log(green(f"\t\tAdding object {obj_atom}"))
self.ctl.add(
"domain", [str(self.next_id), o], f"user({obj_atom})."
) # Needed for symmetry breaking
if must_be_used:
self.ground_forced_inclusion(o) # Used to improve performance
self.assumptions.add(obj_atom)
self.objects[o] += 1
self.ground(o)
self.next_id += 1
self.cautious = None
self.brave = None
def associate(self, association: tuple[str, int, int]) -> None:
"""_summary_
Associates two objects with a given association from the model.
Args:
association (tuple[str, int, int]): The association instance to add. The tuple contains the name of the association and the ids of the two objects to associate.
"""
assoc_atom = (
f"ooasp_associated({association[0]},{association[1]},{association[2]})"
)
self.log(green(f"\t\Associating {assoc_atom}"))
self.assumptions.add(assoc_atom)
self.cautious = None
self.brave = None
def choose_attribute_value(self, attr_data) -> None:
"""
Adds value to an object attribute.
"""
for assumption in self.assumptions:
if f"ooasp_attr_value({attr_data[0]},{attr_data[1]}" in assumption:
self.assumptions.remove(assumption)
attr_atom = (f"ooasp_attr_value({attr_data[0]},{attr_data[1]},{attr_data[2]})")
self.assumptions.add(attr_atom)
self.cautious = None
self.brave = None
def get_cautious(self) -> list[Symbol]:
"""
Obtains and stores the cautious consequences of the current configuration.
Returns:
list[clingo.Symbol]: List of symbols representing the cautious consequences.
"""
if self.cautious is not None:
return self.cautious
start = time.time()
self.ctl.assign_external(Function("check_potential_cv"), False)
self.ctl.configuration.solve.models = "0"
self.ctl.configuration.solve.enum_mode = "cautious"
with self.ctl.solve(
yield_=True,
assumptions=self.assumption_list,
on_statistics=self.on_statistics,
) as hdn:
for model in hdn:
self.cautious = model.symbols(shown=True)
if self.cautious is None:
raise Exception("UNSAT cautious!")
self.ctl.assign_external(Function("check_potential_cv"), True)
self.ctl.configuration.solve.models = "1"
self.ctl.configuration.solve.enum_mode = "auto"
self.times["smart_generation"]["cautious"] += time.time() - start
return self.cautious
def get_brave(self) -> list[Symbol]:
"""
Obtains and stores the brave consequences of the current configuration.
Returns:
list[clingo.Symbol]: List of symbols representing the brave consequences.
"""
if self.brave is not None:
return self.brave
start = time.time()
self.ctl.assign_external(Function("check_potential_cv"), False)
self.ctl.configuration.solve.models = "0"
self.ctl.configuration.solve.enum_mode = "brave"
with self.ctl.solve(
yield_=True,
assumptions=self.assumption_list,
on_statistics=self.on_statistics,
) as hdn:
for model in hdn:
self.brave = model.symbols(shown=True)
if self.brave is None:
raise Exception("UNSAT brave!")
self.ctl.assign_external(Function("check_potential_cv"), True)
self.ctl.configuration.solve.models = "1"
self.ctl.configuration.solve.enum_mode = "auto"
self.times["smart_generation"]["brave"] += time.time() - start
return self.brave
def save_png(
self, directory: str = "./out", suffix: str = "", extra_prg="", name="config"
) -> None:
"""
Saves the configuration as a png using clingraph
Args:
directory (str): The directory to save the png
suffix (str): The suffix to add to the name of the png
"""
if self.model:
config = "\n".join([str(c) for c in self.model])
else:
config = "\n".join([str(c) + "." for c in self.assumptions])
viz_ctl = Control(["--warn=none"])
fbs = []
path = settings.encodings_path.joinpath("viz_config.lp")
viz_ctl.load(str(path))
path = settings.encodings_path.joinpath("ooasp_aux_kb.lp")
viz_ctl.load(str(path))
viz_ctl.load("examples/racks/kb.lp")
viz_ctl.add("base", [], extra_prg)
viz_ctl.add("base", [], config)
viz_ctl.ground([("base", [])], ClingraphContext())
viz_ctl.solve(
on_model=lambda m: fbs.append(
Factbase.from_model(m, default_graph="config")
)
)
graphs = compute_graphs(fbs[0])
f = render(
graphs,
format="png",
name_format=name + suffix,
directory=directory,
view=True,
)
print(green(f"Visualization saved in {f['config']}"))
# ------------------------Smart Generation------------------------
def smart_generation(self) -> bool:
"""
Calls the smart generation functions in the order specified by the user.
It will stop once a function adds objects or associations.
Returns:
bool: True if any of the smart generation functions added objects or associations, False otherwise.
"""
self.log(subtitle("Smart generation"))
initial_assumptions = len(self.assumptions)
for f in self.smart_generation_functions:
start = time.time()
done = getattr(self, f)()
self.times["smart_generation"]["functions"][f] += time.time() - start
if done:
self.log(
f"Smart generation: added {len(self.assumptions) - initial_assumptions} assumptions"
)
return True
return False
def assoc_needs_object(self) -> bool:
"""
This is one of the smart functions that can be chosen for incremental generation.
The appearance of predicate assoc_needs_object(ID1, ASSOC, X, C2, SIDE, new_object)
in the cautious consequences indicates the need to add
at least X objects of type C2 which can be immediately associated to object ID1.
We choose the first appearance of this predicate and add the needed objects
for the selected object and association until the needed number of needed objects is reached.
We order the objects needed to give preference to association specializations
Returns:
bool: True if objects were added, False otherwise
"""
self.log("\t+++++ assoc_needs_object")
added_key = None
added = 0
cautious = self.get_cautious()
assoc_needs_object = [s for s in cautious if s.match("assoc_needs_object", 6)]
assoc_needs_object.sort(
key=lambda x: str(x.arguments[1]) not in self.associations_with_priority
)
for s in assoc_needs_object:
o_id, assoc, needed, c, opt, _ = s.arguments
if added_key is None:
self.log("\t ---> Apply ", s)
added_key = (o_id, assoc)
if added_key != (o_id, assoc):
continue
for _ in range(added, needed.number):
if str(opt) == "1":
a = (str(assoc), o_id, self.next_id)
else:
a = (str(assoc), self.next_id, o_id)
self.add_object(c.name)
self.associate(a)
added += 1
return added > 0
def global_ub_gap(self) -> bool:
"""
This is one of the smart functions that can be chosen for incremental generation.
The appearance of predicate global_ub_gap(C2, N, new_object)
in the cautious consequences indicates the need to add N objects of type C2
Given a target association ASSOC where each C2 can be associated to at most MAX objects of C1
and each C1 has to be associated to exactly one C2.
We count the global number of objects of class C2 and this is not enough to cover all C1 objects.
Therefore, the upper bound of classes C1 was already reached and we need to add N objects of class C2
to fill the gap.
Returns:
bool: True if objects were added, False otherwise
"""
self.log("\t+++++ global_ub_gap")
added_key = None
added = 0
cautious = self.get_cautious()
global_ub = [s for s in cautious if s.match("global_ub", 3)]
for s in global_ub:
c, needed, _ = s.arguments
if added_key is None:
self.log("\t ---> Apply ", s)
added_key = c
if added_key != c:
continue
for _ in range(added, needed.number):
self.add_object(c.name)
added += 1
return added > 0
def global_lb_gap(self) -> bool:
"""
This is one of the smart functions that can be chosen for incremental generation.
The appearance of predicate global_lb_gap(C1, N, new_object)
in the cautious consequences indicates the need to add N objects of type C1
Given a target association ASSOC where each C2 must be associated to at least MIN objects of C1
and each C1 has to be associated to exactly one C2.
We count the global number of objects of class C1 and this is not enough to cover all C2 lower bounds.
Therefore, the lower bound of classes C2 cant be filled and we need to add N objects of class C1.
to fill the gap.
Returns:
bool: True if objects were added, False otherwise
"""
self.log("\t+++++ global_lb_gap")
added_key = None
added = 0
cautious = self.get_cautious()
global_lb = [s for s in cautious if s.match("global_lb", 3)]
for s in global_lb:
c, needed, _ = s.arguments
if added_key is None:
self.log("\t ---> Apply ", s)
added_key = c
if added_key != c:
continue
for _ in range(added, needed.number):
self.add_object(c.name)
added += 1
return added > 0
def association_possible(self) -> bool:
"""
This is one of the smart functions that can be chosen for incremental generation.
The appearance of predicate association_possible(ASSOC, ID1, ID2, new_object)
in the brave consequences indicates the need to add an association between two objects.
We know that ID1 needs at least one object of class C1 and ID2 needs at least one object of class C2
We know that the objects ID1 and ID2 can potentially be associated by ASSOC
We also know that the classes of these objects was set by the user or the smart association.
This makes sure that the association added does not determine the classes of the objects if not previously given.
Associating objects this way restricts completeness of the solving approach by disregarding symmetric solutions.
Returns:
bool: True if associations were added, False otherwise
"""
self.log("\t+++++ association_possible")
brave = self.get_brave()
for s in brave:
if s.match("association_possible", 4):
self.log("\t ---> Apply ", s)
assoc, id1, id2, _ = s.arguments
a = (str(assoc), id1, id2)
self.associate(a)
return True
return False
def load_base(self) -> None:
"""
Loads the base encodings to solve the configuration
"""
encodings_path = os.path.join("ooasp", "encodings", "ooasp.lp")
self.ctl.load(encodings_path)
self.ctl.ground([("base", [])])
def on_model(self, m: Model) -> None:
"""
Callback when a model is found
"""
self.model = [str(s) + "." for s in m.symbols(atoms=True)]
def on_statistics(self, step: StatisticsMap, accu: StatisticsMap) -> None:
"""
Callback when the statistics are updated
"""
update_dict = {"OOASP": self.stats}
accu.update(update_dict)
def smart_complete(self) -> None:
"""
Iterates over the smart generation and solving steps to complete the configuration.
It stops when a solution is found.
"""
done = False
iteration = 0
while not done:
iteration += 1
self.log("\n" + title(f"Next iteration: {self.next_id - 1} objects"))
start = time.time()
things_done = self.smart_generation()
self.times["smart_generation"]["time"] += time.time() - start
if things_done:
continue
self.log(subtitle(f"Solving for size {self.next_id - 1}...", "RED"))
self.ctl.configuration.solve.models = "1"
self.ctl.assign_external(Function("check_potential_cv"), True)
with self.ctl.solve(
assumptions=self.assumption_list,
on_model=self.on_model,
yield_=True,
on_statistics=self.on_statistics,
) as hdl:
if hdl.get().satisfiable:
self.log(green("SAT"))
done = True
else:
self.log(red("UNSAT"))
self.add_object("object")
continue