From c2f9cb02a4e6705d381579ad1cd572e283e5068b Mon Sep 17 00:00:00 2001 From: MangaBoba Date: Fri, 15 Sep 2023 22:03:11 +0300 Subject: [PATCH] fix parallel execution, fix modules compatibility --- gefest/core/geometry/datastructs/polygon.py | 14 +- gefest/core/geometry/domain.py | 30 ++-- gefest/core/geometry/geometry.py | 2 +- gefest/core/geometry/geometry_2d.py | 37 ++++- gefest/core/geometry/utils.py | 58 +++++-- gefest/core/opt/adapters/structure.py | 10 +- gefest/core/opt/gen_design.py | 83 ---------- gefest/core/opt/operators/mutations.py | 108 ++++++++++--- gefest/core/opt/operators/selections.py | 2 +- gefest/core/opt/strategies/crossover.py | 62 +++----- gefest/core/opt/strategies/mutation.py | 54 ++----- gefest/core/opt/strategies/strategy.py | 8 +- gefest/core/utils/__init__.py | 2 +- gefest/core/utils/functions.py | 21 ++- gefest/core/utils/mp_manager.py | 108 ------------- gefest/core/utils/parallel_manager.py | 45 ++++++ gefest/core/viz/struct_vizualizer.py | 47 ++++-- refactored.py | 167 ++++++++++---------- 18 files changed, 414 insertions(+), 444 deletions(-) delete mode 100644 gefest/core/opt/gen_design.py delete mode 100644 gefest/core/utils/mp_manager.py create mode 100644 gefest/core/utils/parallel_manager.py diff --git a/gefest/core/geometry/datastructs/polygon.py b/gefest/core/geometry/datastructs/polygon.py index 52b368520..11af66a8b 100644 --- a/gefest/core/geometry/datastructs/polygon.py +++ b/gefest/core/geometry/datastructs/polygon.py @@ -9,13 +9,13 @@ class PolyID(Enum): - TEMP = "tmp" - CONSTR = "constraint" - FIXED_AREA = "fixed_area" - FIXED_POLY = "fixed_poly" - PROH_AREA = "prohibited_area" - PROH_TARG = "prohibited_target" - PROH_POLY = "prohibited_poly" + TEMP = 'tmp' + CONSTR = 'constraint' + FIXED_AREA = 'fixed_area' + FIXED_POLY = 'fixed_poly' + PROH_AREA = 'prohibited_area' + PROH_TARG = 'prohibited_target' + PROH_POLY = 'prohibited_poly' @dataclass diff --git a/gefest/core/geometry/domain.py b/gefest/core/geometry/domain.py index 0c9494533..777d19977 100644 --- a/gefest/core/geometry/domain.py +++ b/gefest/core/geometry/domain.py @@ -2,7 +2,7 @@ from typing import Optional, Union import yaml -from pydantic import Field, computed_field, field_validator, model_validator +from pydantic import Field, computed_field, field_validator from pydantic.dataclasses import dataclass from pydantic_yaml import parse_yaml_raw_as @@ -13,12 +13,14 @@ @dataclass class Domain: allowed_area: Union[Polygon, list[list[float]]] - name: str = "main" + name: str = 'main' min_poly_num: int = 2 max_poly_num: int = 4 min_points_num: int = 20 max_points_num: int = 50 - prohibited_area: Optional[Structure] = None + polygon_side: int = 0.05 + min_dist_from_boundary: float = 1.0 + prohibited_area: Optional[Structure] = Field(default=Structure([])) fixed_points: Optional[Union[Polygon, list[list[float]]]] = Field(default_factory=list) geometry: Optional[Geometry2D] = Geometry2D() @@ -27,7 +29,7 @@ def parse_raw(self, cfg_file: Path): with open(cfg_file) as f: cfg = yaml.safe_load(f) - if "domain" not in cfg: + if 'domain' not in cfg: raise AttributeError("No 'domain' section {cfg_file} config file.") return parse_yaml_raw_as(Domain, yaml.dump(cfg['domain'])) @@ -44,22 +46,30 @@ def __contains__(self, point: Point): def __post_init__(self): if self.min_poly_num > self.max_poly_num: - raise ValueError("Invalid points number interval.") + raise ValueError('Invalid points number interval.') if self.min_points_num > self.max_points_num: - raise ValueError("Invalid points number interval.") + raise ValueError('Invalid points number interval.') - @field_validator("fixed_points") + @field_validator('fixed_points') def parse_allowed_area(cls, data: Union[Polygon, list[list[float]]]): if isinstance(data, Polygon): return data return Polygon([Point(*coords) for coords in data]) - @field_validator("allowed_area") + @field_validator('allowed_area') def parse_allowed_area(cls, data: Union[Polygon, list[list[float]]]): if data is None or len(data) <= 2: - raise ValueError("Not enough points for allowed_area.") + raise ValueError('Not enough points for allowed_area.') return Polygon([Point(*coords) for coords in data]) + @computed_field + def dist_between_polygons(self) -> float: + return max(self.max_x - self.min_x, self.max_y - self.min_y) / 35 + + @computed_field + def dist_between_points(self) -> float: + return self.dist_between_polygons * 15 * self.polygon_side + @computed_field def min_x(self) -> int: return min(p.x for p in self.allowed_area) @@ -86,4 +96,4 @@ def len_y(self) -> int: @computed_field def bound_poly(self) -> Polygon: - return self.allowed_area \ No newline at end of file + return self.allowed_area diff --git a/gefest/core/geometry/geometry.py b/gefest/core/geometry/geometry.py index 9b6f56909..837dbdb61 100644 --- a/gefest/core/geometry/geometry.py +++ b/gefest/core/geometry/geometry.py @@ -32,7 +32,7 @@ def get_square(self, polygon: Polygon): pass @abstractmethod - def is_contain_point(self, poly: Polygon, point: "Point"): + def is_contain_point(self, poly: Polygon, point: Point): pass @abstractmethod diff --git a/gefest/core/geometry/geometry_2d.py b/gefest/core/geometry/geometry_2d.py index a56609e35..d7c560fb8 100755 --- a/gefest/core/geometry/geometry_2d.py +++ b/gefest/core/geometry/geometry_2d.py @@ -4,11 +4,13 @@ import numpy as np from pydantic import BaseModel from pydantic.dataclasses import dataclass -from shapely import affinity +from shapely import affinity, get_parts +from shapely.affinity import scale from shapely.geometry import LineString, MultiLineString from shapely.geometry import Point as GeomPoint from shapely.geometry import Polygon as GeomPolygon -from shapely.ops import nearest_points +from shapely.geometry import mapping +from shapely.ops import nearest_points, split from gefest.core.geometry import Point, Polygon, Structure @@ -23,6 +25,7 @@ class Geometry2D(Geometry): (first Point is equal to the last one), otherwise ``False``. Default value is ``True`` """ + is_closed: bool = True def get_length(self, polygon: Polygon): @@ -94,6 +97,19 @@ def resize_poly( return rescaled_poly + def rotate_point( + self, + point: Point, + origin: Point, + angle: float, + ) -> Polygon: + rotated = affinity.rotate( + GeomPoint(point.x, point.y), + angle, + GeomPoint(origin.x, origin.y), + ) + return Point(rotated.x, rotated.y) + def rotate_poly( self, poly: Polygon, @@ -264,6 +280,23 @@ def _pt_to_geom(self, pt: Point) -> GeomPoint: """ return GeomPoint(pt.x, pt.y) + def split_polygon(self, poly, line: tuple[Point, Point], scale_factor=1000): + poly = GeomPolygon([(p.x, p.y) for p in poly]) + line = LineString( + [ + (line[0].x, line[0].y), + (line[1].x, line[1].y), + ], + ) + line = scale( + line, + scale_factor, + scale_factor, + ) + parts = get_parts(split(poly, line)).tolist() + parts = list(map(lambda p: list(mapping(p)['coordinates'][0][:-1]), parts)) + return parts + def min_distance(self, obj_1, obj_2) -> float: """Smallest distance between two objects Args: diff --git a/gefest/core/geometry/utils.py b/gefest/core/geometry/utils.py index 3044c60b0..7e6457439 100644 --- a/gefest/core/geometry/utils.py +++ b/gefest/core/geometry/utils.py @@ -4,6 +4,7 @@ from typing import Optional import numpy as np +from loguru import logger from gefest.core.geometry import Point, Polygon, Structure from gefest.core.geometry.geometry_2d import Geometry2D @@ -11,7 +12,7 @@ from .domain import Domain -def get_random_structure(domain: Domain) -> Structure: +def get_random_structure(domain: Domain, **kwargs) -> Structure: # Creating structure with random number of polygons structure = Structure(polygons=[]) @@ -24,10 +25,6 @@ def get_random_structure(domain: Domain) -> Structure: structure.polygons.append(polygon) else: continue - - for poly in structure: - print(poly[0], poly[-1]) - return structure @@ -82,21 +79,45 @@ def get_random_point(polygon: Polygon, structure: Structure, domain: Domain) -> def create_poly(centroid: Point, sigma: int, domain: Domain, geometry: Geometry2D) -> Polygon: # Creating polygon in the neighborhood of the centroid # sigma defines neighborhood - num_points = randint( - domain.min_points_num, domain.max_points_num, + domain.min_points_num, + domain.max_points_num, ) # Number of points in a polygon points = [] - for _ in range(num_points): - point = create_polygon_point(centroid, sigma) # point in polygon - while not in_bound(point, domain): # checking if a point is in domain + cntr = 0 + while len(points) < num_points: + cntr += 1 + + point = create_polygon_point(centroid, sigma) + while not in_bound(point, domain): point = create_polygon_point(centroid, sigma) points.append(point) - if domain.geometry.is_closed: - points.append(points[0]) - - poly = geometry.get_convex(Polygon(points=points)) # avoid self intersection in polygon - + ind = len(points) - 1 + if ind > 0: + if ( + np.linalg.norm( + np.array(points[ind].coords[:2]) - np.array(points[ind - 1].coords[:2]), ord=1 + ) + < domain.dist_between_points + ): + del points[ind] + if len(points) == num_points: + if ( + np.linalg.norm( + np.array(points[-1].coords[:2]) - np.array(points[0].coords[:2]), ord=1 + ) + < domain.dist_between_points + ): + del points[-1] + if len(points) == num_points: + if domain.geometry.is_closed: + points.append(points[0]) + poly = geometry.get_convex(Polygon(points=points)) + points = poly.points + if cntr > 100 and len(points) > 4: + break + + # logger.info(f'Create poly finish, {cntr} iterations.') return poly @@ -117,7 +138,9 @@ def create_area(domain: Domain, structure: Structure, geometry: Geometry2D) -> ( """ centroid = create_random_point(domain) min_dist = distance( - centroid, structure, geometry, + centroid, + structure, + geometry, ) # Distance to the nearest polygon in the structure max_attempts = 20 while min_dist < 2.5 * sigma: @@ -149,7 +172,8 @@ def create_random_point(domain: Domain) -> Point: def create_polygon_point(centroid: Point, sigma: int) -> Point: # Creating polygon point inside the neighborhood defined by the centroid point = Point( - np.random.normal(centroid.x, sigma, 1)[0], np.random.normal(centroid.y, sigma, 1)[0], + np.random.normal(centroid.x, sigma, 1)[0], + np.random.normal(centroid.y, sigma, 1)[0], ) return point diff --git a/gefest/core/opt/adapters/structure.py b/gefest/core/opt/adapters/structure.py index 3bd8d0b23..e96bc9200 100644 --- a/gefest/core/opt/adapters/structure.py +++ b/gefest/core/opt/adapters/structure.py @@ -16,11 +16,11 @@ def __init__(self): def _point_to_node(self, point): # Prepare content for nodes if isinstance(point, OptNode): - self._log.warn("Unexpected: OptNode found in adapter instead" "Point.") + self._log.warn('Unexpected: OptNode found in adapter instead' 'Point.') else: - content = {"name": f"pt_{point.x}_{point.y}", "params": {}} + content = {'name': f'pt_{point.x}_{point.y}', 'params': {}} node = OptNode(content=content) - node.content["params"] = {"x": point.x, "y": point.y} + node.content['params'] = {'x': point.x, 'y': point.y} return node def _adapt(self, adaptee: Structure): @@ -39,7 +39,9 @@ def _adapt(self, adaptee: Structure): return graph def _restore( - self, opt_graph: OptGraph, metadata: Optional[Dict[str, Any]] = None, + self, + opt_graph: OptGraph, + metadata: Optional[Dict[str, Any]] = None, ) -> Structure: """Convert OptGraph class into Structure class""" structure = [] diff --git a/gefest/core/opt/gen_design.py b/gefest/core/opt/gen_design.py deleted file mode 100644 index c2201b320..000000000 --- a/gefest/core/opt/gen_design.py +++ /dev/null @@ -1,83 +0,0 @@ -import os -import pickle -import shutil -from pathlib import Path - -from tqdm import tqdm - - -def design(n_steps: int, pop_size: int, estimator, sampler, optimizer, extra=False): - """ - Generative design procedure - :param n_steps: (Int) number of generative design steps - :param pop_size: (Int) number of samples in population - :param estimator: (Object) estimator with .estimate() method - :param sampler: (Object) sampler with .sample() method - :param optimizer: (Object) optimizer with .optimize() method - :param extra: (Bool) flag for extra sampling - :return: (List[Structure]) designed samples - """ - - def _save_res(performance, samples): - """ - Saving results in pickle format - :param performance: (List), performance of samples - :param samples: (List), samples to save - :return: None - """ - with open(Path(path, f"performance_{i}.pickle"), "wb") as handle: - pickle.dump(performance, handle, protocol=pickle.HIGHEST_PROTOCOL) - - with open(Path(path, f"population_{i}.pickle"), "wb") as handle: - pickle.dump(samples, handle, protocol=pickle.HIGHEST_PROTOCOL) - - return - - def _remain_best(performance, samples): - """ - From current population we remain best only - :param performance: (List), performance of samples - :param samples: (List), samples to save - :return: (Tuple), performance and samples - """ - # Combination of performance and samples - perf_samples = list(zip(performance, samples)) - - # Sorting with respect to performance - sorted_pop = sorted(perf_samples, key=lambda x: x[0])[:pop_size] - - performance = [x[0] for x in sorted_pop] - samples = [x[1] for x in sorted_pop] - - return performance, samples - - path = "HistoryFiles" - - if os.path.exists(path): - shutil.rmtree(path) - os.makedirs(path) - - samples = sampler.sample(n_samples=pop_size) - - for i in tqdm(range(n_steps)): - performance = estimator.estimate(population=samples) - - # Choose best and save the results - performance, samples = _remain_best(performance, samples) - print(f"\nBest performance is {performance[0]}") - - _save_res(performance, samples) - - if optimizer: - samples = optimizer.step(population=samples, performance=performance, n_step=i) - - # Extra sampling if necessary - # or if optimizer is missing - if not optimizer or extra: - if not optimizer: - samples = sampler.sample(n_samples=pop_size) - else: - extra_samples = sampler.sample(n_samples=pop_size) - samples = samples + extra_samples - - return samples diff --git a/gefest/core/opt/operators/mutations.py b/gefest/core/opt/operators/mutations.py index 4371e4472..acb90d6da 100644 --- a/gefest/core/opt/operators/mutations.py +++ b/gefest/core/opt/operators/mutations.py @@ -1,8 +1,5 @@ import copy import random -from copy import deepcopy -from enum import Enum -from functools import partial from typing import Callable import numpy as np @@ -19,8 +16,20 @@ def mutate_structure( mutations: list[Callable], mutation_chance: float, mutations_probs: list[int], -): - +) -> Structure: + """Apply mutation for polygons in structure. + + Args: + structure (Structure): _description_ + domain (Domain): _description_ + mutations (list[Callable]): _description_ + mutation_chance (float): _description_ + mutations_probs (list[int]): _description_ + + Returns: + Structure: Mutated structure. It is not guaranteed + that the resulting structure will be valid, dont + """ new_structure = copy.deepcopy(structure) for _ in enumerate(range(len(new_structure))): @@ -33,6 +42,8 @@ def mutate_structure( ) new_structure = chosen_mutation[0](new_structure, domain, idx_) + return new_structure + def rotate_poly(new_structure: Structure, domain: Domain, idx_: int = None) -> Structure: angle = float(np.random.randint(-120, 120)) @@ -81,37 +92,88 @@ def resize_poly( return new_structure +from math import cos, pi, sin, sqrt + + +def random_polar(rscale, dx, dy): + theta = random.random() * 2 * pi + r = random.random() * rscale + return (r * cos(theta)) + dx, (r * sin(theta)) + dy + + def pos_change_point_mutation( new_structure: Structure, domain: Domain, idx_: int = None, ) -> Structure: - mutate_point_idx = int(np.random.randint(0, len(new_structure[idx_]))) - # Neighborhood to reposition - eps_x = round(domain.len_x / 10) - eps_y = round(domain.len_y / 10) - structure = copy.deepcopy(new_structure) - # Displacement in the neighborhood - displacement_x = random.randint(-eps_x, eps_x) - displacement_y = random.randint(-eps_y, eps_y) - - x_new = structure.polygons[idx_].points[mutate_point_idx].x + displacement_x - y_new = structure.polygons[idx_].points[mutate_point_idx].y + displacement_y - - i = 20 # Number of attempts to change the position of the point + mutate_point_idx = int(np.random.randint(0, len(structure[idx_]))) + if mutate_point_idx == len(structure[idx_]) - 1: + neighbour_left = mutate_point_idx - 1 + neighbour_right = 0 + elif mutate_point_idx == 0: + neighbour_left = len(new_structure[idx_]) - 1 + neighbour_right = 1 + else: + neighbour_left = mutate_point_idx - 1 + neighbour_right = mutate_point_idx + 1 + + x1 = structure[idx_][neighbour_left].x + y1 = structure[idx_][neighbour_left].y + x2 = structure[idx_][neighbour_right].x + y2 = structure[idx_][neighbour_right].y + base_x = structure[idx_][mutate_point_idx].x + base_y = structure[idx_][mutate_point_idx].y + + dx, dy = (x1 - x2) / 2, (y1 - y2) / 2 + d = sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2) + + delta_point = random_polar((d / 2), dx, dy) + x_new, y_new = base_x + delta_point[0], base_y + delta_point[1] + + i = 20 while Point(x_new, y_new) not in domain: - x_new = structure.polygons[idx_].points[mutate_point_idx].x + displacement_x - y_new = structure.polygons[idx_].points[mutate_point_idx].y + displacement_y + delta_point = random_polar((d / 2), dx, dy) + x_new, y_new = base_x + delta_point[0], base_y + delta_point[1] i -= 1 if i == 0: - return new_structure + structure[idx_][mutate_point_idx].x += delta_point[0] + structure[idx_][mutate_point_idx].y += delta_point[1] + return structure + + structure[idx_][mutate_point_idx].x += delta_point[0] + structure[idx_][mutate_point_idx].y += delta_point[1] + + # return structure + + # ''' + # mutate_point_idx = int(np.random.randint(0, len(new_structure[idx_]))) + # # Neighborhood to reposition + # eps_x = round(domain.len_x / 10) + # eps_y = round(domain.len_y / 10) + + # structure = copy.deepcopy(new_structure) + + # # Displacement in the neighborhood + # displacement_x = random.randint(-eps_x, eps_x) + # displacement_y = random.randint(-eps_y, eps_y) + + # x_new = structure.polygons[idx_].points[mutate_point_idx].x + displacement_x + # y_new = structure.polygons[idx_].points[mutate_point_idx].y + displacement_y - structure.polygons[idx_].points[mutate_point_idx].x = x_new - structure.polygons[idx_].points[mutate_point_idx].y = y_new + # i = 20 # Number of attempts to change the position of the point + # while Point(x_new, y_new) not in domain: + # x_new = structure.polygons[idx_].points[mutate_point_idx].x + displacement_x + # y_new = structure.polygons[idx_].points[mutate_point_idx].y + displacement_y + # i -= 1 + # if i == 0: + # return new_structure + # structure.polygons[idx_].points[mutate_point_idx].x = x_new + # structure.polygons[idx_].points[mutate_point_idx].y = y_new + # ''' # from gefest.core.viz.struct_vizualizer import StructVizualizer # from matplotlib import pyplot as plt diff --git a/gefest/core/opt/operators/selections.py b/gefest/core/opt/operators/selections.py index b47219b3e..2e20a779c 100644 --- a/gefest/core/opt/operators/selections.py +++ b/gefest/core/opt/operators/selections.py @@ -36,7 +36,7 @@ def tournament_selection( pop_size: int, fraction: float = 0.1, ) -> list[Structure]: - """ Selects the best ones from provided population. + """Selects the best ones from provided population. Args: pop (list[Structure]): population diff --git a/gefest/core/opt/strategies/crossover.py b/gefest/core/opt/strategies/crossover.py index a9de594ef..ad28e2a36 100644 --- a/gefest/core/opt/strategies/crossover.py +++ b/gefest/core/opt/strategies/crossover.py @@ -1,17 +1,18 @@ import copy +from functools import partial from typing import Callable import numpy as np from gefest.core.geometry import Structure -from gefest.core.utils import WorkerData, where +from gefest.core.utils import chain, where +from gefest.core.utils.parallel_manager import BaseParallelDispatcher from .strategy import Strategy class CrossoverStrategy(Strategy): def __init__(self, opt_params): - super().__init__(opt_params.workers_manager) self.prob = opt_params.crossover_prob self.crossovers = opt_params.crossovers @@ -19,57 +20,32 @@ def __init__(self, opt_params): self.postprocess: Callable = opt_params.postprocessor self.parent_pairs_selector: Callable = opt_params.pair_selector self.sampler: Callable = opt_params.sampler - self.attempts = 3 + self.postprocess_attempts = opt_params.postprocess_attempts + self._pm = BaseParallelDispatcher(opt_params.n_jobs) def __call__(self, pop: list[Structure]) -> list[Structure]: return self.crossover(pop=pop) def crossover(self, pop: list[Structure]): - pairs_to_crossover = copy.deepcopy(self.parent_pairs_selector(pop)) - new_generation = np.full(len(pairs_to_crossover), None) - chosen_crossovers = np.random.choice( + + chosen_crossover = np.random.choice( a=self.crossovers, - size=len(pairs_to_crossover), + size=1, p=self.each_prob, - ) + )[0] + pairs = copy.deepcopy(self.parent_pairs_selector(pop)) - chosen_crossovers = [(cm, self.postprocess) for cm in chosen_crossovers] - children, _ = self._mp( - [ - WorkerData(funcs, idx, args) - for funcs, idx, args in zip( - chosen_crossovers, - range(len(pairs_to_crossover)), - pairs_to_crossover, - ) - ], + new_generation = self._pm.exec_parallel( + func=chain(chosen_crossover, partial(self.postprocess, attempts=3)), + arguments=pairs, + use=True, ) - succes_crossover_ids = where(children, lambda ind: ind != None) - for idx in succes_crossover_ids: - new_generation[idx] = children[idx] + idx_failed = where(new_generation, lambda ind: ind is None) + if len(idx_failed) > 0: + generated = self.sampler(len(idx_failed)) + for enum_id, idx in enumerate(idx_failed): + new_generation[idx] = generated[enum_id] - for _ in range(self.attempts): - failed_idx = where(new_generation, lambda ind: ind == None) - if len(failed_idx) > 0: - children, _ = self._mp( - [ - WorkerData(funcs, idx, args) - for funcs, idx, args in zip( - [chosen_crossovers[idx] for idx in failed_idx], - failed_idx, - [pairs_to_crossover[idx] for idx in failed_idx], - ) - ], - ) - succes_crossover_ids = where(children, lambda ind: ind != None) - for idx in succes_crossover_ids: - new_generation[idx] = children[idx] - else: - break - failed_idx = where(new_generation, lambda ind: ind == None) - if len(failed_idx) > 0: - new_generation[failed_idx] = self.sampler(len(failed_idx)) pop.extend(new_generation) - return pop diff --git a/gefest/core/opt/strategies/mutation.py b/gefest/core/opt/strategies/mutation.py index 816265f1a..ba6fdff98 100644 --- a/gefest/core/opt/strategies/mutation.py +++ b/gefest/core/opt/strategies/mutation.py @@ -4,28 +4,29 @@ from gefest.core.geometry import Structure from gefest.core.opt.operators.mutations import mutate_structure -from gefest.core.utils import WorkerData, where +from gefest.core.utils import chain, where +from gefest.core.utils.parallel_manager import BaseParallelDispatcher from .strategy import Strategy class MutationStrategy(Strategy): def __init__(self, opt_params): - super().__init__(opt_params.workers_manager) + self.domain = opt_params.domain self.mutation_prob = opt_params.mutation_prob self.mutations = opt_params.mutations self.each_prob = opt_params.mutation_each_prob self.postprocess: Callable = opt_params.postprocessor self.sampler = opt_params.sampler - self.postprocess_attempts = 3 + self.postprocess_attempts = opt_params.postprocess_attempts + self._pm = BaseParallelDispatcher(opt_params.n_jobs) def __call__(self, pop: list[Structure]) -> list[Structure]: return self.mutate(pop=pop) def mutate(self, pop: list[Structure]): - mutated_pop = copy.deepcopy(pop) mutator = partial( mutate_structure, domain=self.domain, @@ -33,45 +34,18 @@ def mutate(self, pop: list[Structure]): mutation_chance=self.mutation_prob, mutations_probs=self.each_prob, ) - chosen_mutations = [(mutator, self.postprocess) for _ in range(len(pop))] + pop_ = copy.deepcopy(pop) - mutated, _ = self._mp( - [ - WorkerData(funcs, idx, args) - for funcs, idx, args in zip( - chosen_mutations, - range(len(pop)), - pop, - ) - ], + mutated_pop = self._pm.exec_parallel( + func=chain(mutator, partial(self.postprocess, attempts=3)), + arguments=pop_, + use=True, ) - succes_mutated_ids = where(mutated, lambda ind: ind != None) - for idx in succes_mutated_ids: - mutated_pop[idx] = mutated[idx] - - for _ in range(self.postprocess_attempts): - failed_idx = where(mutated, lambda ind: ind == None) - if len(failed_idx) > 0: - mutated, _ = self._mp( - [ - WorkerData(funcs, idx, args) - for funcs, idx, args in zip( - [(self.postprocess,) for idx in failed_idx], - failed_idx, - [mutated_pop[idx] for idx in failed_idx], - ) - ], - ) - - succes_mutated_ids = where(mutated, lambda ind: ind != None) - for idx in succes_mutated_ids: - mutated_pop[idx] = mutated[idx] - - failed_idx = where(mutated, lambda ind: ind == None) - if len(failed_idx) > 0: - generated = self.sampler(len(failed_idx)) - for enum_id, idx in enumerate(failed_idx): + idx_failed = where(mutated_pop, lambda ind: ind is None) + if len(idx_failed) > 0: + generated = self.sampler(len(idx_failed)) + for enum_id, idx in enumerate(idx_failed): mutated_pop[idx] = generated[enum_id] return mutated_pop diff --git a/gefest/core/opt/strategies/strategy.py b/gefest/core/opt/strategies/strategy.py index 5b9146723..9d4227809 100644 --- a/gefest/core/opt/strategies/strategy.py +++ b/gefest/core/opt/strategies/strategy.py @@ -2,16 +2,10 @@ from typing import Any from gefest.core.geometry import Structure -from gefest.core.utils import WorkersManager class Strategy(metaclass=ABCMeta): - """Abstract class for algorithm steps. - Provides shared multiprocessing pool. - """ - - def __init__(self, wm: WorkersManager): - self._mp: WorkersManager = wm + """Abstract class for algorithm steps.""" @abstractmethod def __call__( diff --git a/gefest/core/utils/__init__.py b/gefest/core/utils/__init__.py index e1faf2c27..d9e3867e1 100644 --- a/gefest/core/utils/__init__.py +++ b/gefest/core/utils/__init__.py @@ -1,2 +1,2 @@ from .functions import chain, project_root, where -from .mp_manager import WorkerData, WorkersManager +from .parallel_manager import BaseParallelDispatcher diff --git a/gefest/core/utils/functions.py b/gefest/core/utils/functions.py index adbe2de3a..6538fd1f4 100644 --- a/gefest/core/utils/functions.py +++ b/gefest/core/utils/functions.py @@ -8,14 +8,19 @@ def project_root() -> Path: return Path(__file__).parent.parent.parent -# def chain(*funcs): -# def chained_call(args): -# return reduce(lambda r, f: f(r), funcs, args) -# return chained_call - - -def where(array: list[Any], condition: Callable[[Callable], bool]) -> list[int]: - return [idx for idx, ind in enumerate(array) if condition(ind)] +def where( + sequence: list[Any], + mask_rule: Callable[[Callable], bool], +) -> list[int]: + """Finds indexes of values in a sequence satisfying the mask_rule. + Args: + sequence (list[Any]): list of values + mask_rule (Callable[[Callable], bool]): rule for selecting values + (e.g. lambda val: val is not None) + Returns: + list[int]: _description_ + """ + return [idx for idx, ind in enumerate(sequence) if mask_rule(ind)] def chain(*funcs): diff --git a/gefest/core/utils/mp_manager.py b/gefest/core/utils/mp_manager.py deleted file mode 100644 index 20f87f0a1..000000000 --- a/gefest/core/utils/mp_manager.py +++ /dev/null @@ -1,108 +0,0 @@ -import multiprocessing -from typing import Any, Callable, Optional - -from loguru import logger -from pydantic.dataclasses import dataclass - -from gefest.core.utils import chain - - -@dataclass -class WorkerData: - funcs: list[Callable] - idx: int - initial_args: Optional[Any] = None - - -class WorkersManager: - def __init__( - self, - num_workers: Optional[int] = None, - worker_func: Optional[Callable] = None, - ) -> None: - self.worker_func = worker_func if worker_func else WorkersManager.worker - self.num_workers = num_workers if num_workers else multiprocessing.cpu_count() - self.buffer_size = self.num_workers - self.pool = multiprocessing.Pool(processes=self.num_workers) - self.mpm = multiprocessing.Manager() - self.processing_queue = self.mpm.Queue() - self.result_queue = self.mpm.Queue() - self.__out__queue_expected_size = 0 - self.workers = [ - self.pool.apply_async( - self.worker_func, - (self.processing_queue, self.result_queue), - ) - for _ in range(self.num_workers) - ] - - def __del__(self, *args): - self.pool.close() - self.pool.terminate() - - def get_queue_size(self) -> int: - return self.__out__queue_expected_size - - def _queue_pop(self, args) -> None: - self.__out__queue_expected_size += 1 - self.processing_queue.put(args) - - @logger.catch - def _queue_get(self): - self.__out__queue_expected_size -= 1 - return self.result_queue.get() - - @staticmethod - @logger.catch - def worker( - processing_queue: multiprocessing.Queue, - result_queue: multiprocessing.Queue, - ) -> None: - """Executes given func with it params. - Args: - processing_queue: multiprocessing.Queue - input queue - result_queue: multiprocessing.Queue - output queue - Processing_queue provides data of the form: - tuple(list[Callable], int, Optional[Any]) with - funcs, idx, funcs arguments - """ - while True: - data: WorkerData = processing_queue.get() - if data.initial_args is None: - output = chain(*data.funcs)() - else: - output = chain(*data.funcs)(data.initial_args) - - result_queue.put((output, data.idx)) - - def __call__(self, workers_data: list[WorkerData]): - return self.multiprocess(workers_data) - - def multiprocess(self, workers_data: list[WorkerData]): - """Executes provided functions with args in parallel. - Args: - args: tuple, contains some func, (e.g. mutation, crossover operators), - its arguments and index - Returns: - results: [list, list], contains provided functions outputs and indices they sorted by - """ - res_ids_pairs = [] - - for idx, operand in enumerate(workers_data): - - self._queue_pop(operand) - if idx < self.buffer_size: - continue - - else: - res_ids_pairs.append(self._queue_get()) - - for _ in range(self.get_queue_size()): - res_ids_pairs.append(self._queue_get()) - - if len(res_ids_pairs) == 0: - print('err') - res_ids_pairs = sorted(res_ids_pairs, key=lambda x: x[1]) - res, ids = list(zip(*res_ids_pairs)) - - return list(res), list(ids) diff --git a/gefest/core/utils/parallel_manager.py b/gefest/core/utils/parallel_manager.py new file mode 100644 index 000000000..d02b581a8 --- /dev/null +++ b/gefest/core/utils/parallel_manager.py @@ -0,0 +1,45 @@ +from typing import Any, Callable + +from joblib import Parallel, cpu_count, delayed +from loguru import logger + + +class BaseParallelDispatcher: + """Provides interface for parallel execution.""" + + def __init__(self, n_jobs: int = -1): + self.n_jobs = self._determine_n_jobs(n_jobs) + + # def __call__(self, *args, **kwargs) -> list[Any]: + # return self.exec_parallel(*args, **kwargs) + + def _determine_n_jobs(self, n_jobs: int = -1): + if n_jobs > cpu_count() or n_jobs == -1: + n_jobs = cpu_count() + return n_jobs + + @logger.catch + def exec_parallel( + self, + func: Callable, + arguments: list[Any], + use: bool = True, + ) -> list[Any]: + """Executes provided function in parallel. + + Args: + func (Callable): Function to execute. + arguments (list[Any]): Each element in list is arguments for separate func call. + use (bool, optional): If True, each arg in args will be used as func argument, + otherwise func will be called without arguments len(args) times. Defaults to True. + + Returns: + list[Any] + """ + + parallel = Parallel(n_jobs=self.n_jobs, verbose=0, pre_dispatch='2*n_jobs') + if use: + result = parallel(delayed(func)(arg) for arg in arguments) + else: + result = parallel(delayed(func)() for _ in arguments) + return result diff --git a/gefest/core/viz/struct_vizualizer.py b/gefest/core/viz/struct_vizualizer.py index 985a6a861..f9c912fee 100644 --- a/gefest/core/viz/struct_vizualizer.py +++ b/gefest/core/viz/struct_vizualizer.py @@ -1,5 +1,7 @@ import matplotlib.pyplot as plt +import moviepy.editor as mp from matplotlib.lines import Line2D +from moviepy.video.io.bindings import mplfig_to_npimage from gefest.core.geometry import Structure from gefest.core.geometry.domain import Domain @@ -17,7 +19,7 @@ class StructVizualizer: def __init__(self, domain: Domain): self.domain = domain - def plot_structure(self, structs: list[Structure], infos, linestyles="-"): + def plot_structure(self, structs: list[Structure], infos=None, linestyles='-'): """The method displays the given list[obj:`Structure`] Args: structs: the list[obj:`Structure`] for displaying @@ -35,21 +37,25 @@ def plot_structure(self, structs: list[Structure], infos, linestyles="-"): |viz_struct| .. |viz_struct| image::https://ibb.co/fN7XCXh """ - + if not isinstance(structs, list): + structs = [structs] + if not isinstance(infos, list): + infos = [infos] + fig = plt.figure() for struct, linestyle in zip(structs, linestyles): - for poly in struct.polygons: - self.plot_poly(poly, linestyle) - boundary = self.domain.bound_poly x = [pt.x for pt in boundary.points] y = [pt.y for pt in boundary.points] + plt.plot(x, y, 'k') - plt.plot(x, y) + for poly in struct.polygons: + self.plot_poly(poly, linestyle) lines = [ - Line2D([0], [0], color="black", linewidth=3, linestyle=style) for style in linestyles + Line2D([0], [0], color='black', linewidth=3, linestyle=style) for style in linestyles ] plt.legend(lines, infos, loc=2) + return fig def plot_poly(self, poly, linestyle): """The method displays the given :obj:`Polygon` @@ -66,7 +72,30 @@ def plot_poly(self, poly, linestyle): """ x_ = [pt.x for pt in poly.points] y_ = [pt.y for pt in poly.points] + + plt.plot(x_, y_, linestyle=linestyle) for i, p in enumerate(zip(x_, y_)): plt.plot(p[0], p[1], marker='${}$'.format(i), color='black') - - plt.plot(x_, y_,linestyle=linestyle) + + +class GIFMaker(StructVizualizer): + def __init__(self, domain) -> None: + super().__init__(domain=domain) + self.frames = [] + self.counter = 0 + + def create_frame(self, structure, infos): + fig = self.plot_structure(structure, infos) + numpy_fig = mplfig_to_npimage(fig) + self.frames.append(numpy_fig) + plt.close() + + def make_gif(self, gifname, duration=1500, loop=-1): + + # imgs = [Image.fromarray(img) for img in self.frames] + # imgs[0].save(f"./{gifname}.fig", save_all=True, append_images=imgs[1:], duration=duration, loop=0) + # clip = mp.VideoFileClip("mygif.gif") + clip = mp.ImageSequenceClip(self.frames, durations=[1500] * len(self.frames), fps=0.66) + clip.write_videofile(f'./{gifname}.mp4') + self.frames = [] + self.counter = 0 diff --git a/refactored.py b/refactored.py index 5bfd56d33..0703336ab 100644 --- a/refactored.py +++ b/refactored.py @@ -1,5 +1,5 @@ from functools import partial -from typing import Callable, Optional +from typing import Callable, Optional, Union import numpy as np from loguru import logger @@ -7,20 +7,17 @@ from pydantic.dataclasses import dataclass from tqdm import tqdm -from gefest.core.algs.geom.validation import * -from gefest.core.algs.postproc.resolve_errors import * +from gefest.core.algs.postproc.resolve_errors import Rules, PolygonRule, StructureRule from gefest.core.geometry import Structure from gefest.core.geometry.domain import Domain from gefest.core.geometry.utils import * -from gefest.core.opt.operators.crossovers import panmixis, structure_level_crossover +from gefest.core.opt.operators.crossovers import panmixis, structure_level_crossover, polygon_level_crossover from gefest.core.opt.operators.mutations import * -from gefest.core.opt.operators.selections import tournament_selection +from gefest.core.opt.operators.selections import tournament_selection, roulette_selection from gefest.core.opt.strategies.crossover import CrossoverStrategy from gefest.core.opt.strategies.mutation import MutationStrategy from gefest.core.opt.strategies.strategy import Strategy from gefest.core.structure.prohibited import create_prohibited -from gefest.core.utils.mp_manager import WorkersManager -from gefest.core.viz.struct_vizualizer import StructVizualizer from gefest.tools.optimizers.optimizer import Optimizer from gefest.tools.samplers.standard.standard import StandardSampler @@ -40,12 +37,12 @@ class OptimizationParams: sampler: Callable estimator: Callable postprocessor: Callable - postprocess_rules: dict[str, list[Callable, Callable]] + postprocess_rules: list[Union[PolygonRule, StructureRule]] mutation_prob: float = 0.6 crossover_prob: float = 0.6 mutation_each_prob: Optional[list[float]] = None crossover_each_prob: Optional[list[float]] = None - workers_manager: Optional[object] = None + n_jobs: Optional[int] = -1 def __post_init__(self): self.crossovers = [ @@ -55,52 +52,53 @@ def __post_init__(self): self.postprocessor = partial( self.postprocessor, domain=self.domain, - rule_fix_pairs=self.postprocess_rules, + rules=self.postprocess_rules, ) self.sampler = self.sampler(opt_params=self) -@logger.catch -def main(opt_params): +# def main(opt_params): - optimizer = BaseGA(opt_params) +# optimizer = BaseGA(opt_params) - optimizer.optimize(1) +# optimizer.optimize(15) - from gefest.core.viz.struct_vizualizer import StructVizualizer +# from gefest.core.viz.struct_vizualizer import StructVizualizer - plt.figure(figsize=(7, 7)) - visualiser = StructVizualizer(domain) +# plt.figure(figsize=(7, 7)) +# visualiser = StructVizualizer(domain) - info = { - 'spend_time': 1, - 'fitness': optimizer._pop[0].fitness, - 'type': 'prediction', - } - visualiser.plot_structure( - [optimizer._pop[0]], [info], ['-'], - ) +# info = { +# 'spend_time': 1, +# 'fitness': optimizer._pop[0].fitness, +# 'type': 'prediction', +# } +# visualiser.plot_structure( +# [optimizer._pop[0]], [info], ['-'], +# ) - plt.show(block=True) +# plt.show(block=True) -from contextlib import contextmanager +# from contextlib import contextmanager -@contextmanager -def configuration(subprocess_holder): - try: - # load config from yaml - yield subprocess_holder - except Exception: - raise - finally: - # add simulators safe exit - subprocess_holder.workers_manager.pool.close() - subprocess_holder.workers_manager.pool.terminate() +# @contextmanager +# def configuration(subprocess_holder): +# try: +# # load config from yaml +# yield subprocess_holder +# except Exception: +# raise +# finally: +# # add simulators safe exit +# subprocess_holder.parallel_manager.pool.close() +# subprocess_holder.parallel_manager.pool.terminate() if __name__ == '__main__': - logger.add('somefile.log', enqueue=True) + # logger.add('somefile.log', enqueue=True) + + logger.disable('__main__') class BaseGA(Optimizer): def __init__( self, @@ -114,16 +112,18 @@ def __init__( self.estimator: Callable[[list[Structure]], list[Structure]] = opt_params.estimator self.selector: Callable = opt_params.selector self.pop_size = opt_params.pop_size - self._pop: list[Structure] = self.sampler(self.opt_params.pop_size) self.domain = self.opt_params.domain + self._pop: list[Structure] = self.sampler(self.opt_params.pop_size) + self._pop = self.estimator(self._pop) def optimize(self, n_steps: int) -> list[Structure]: for _ in tqdm(range(n_steps)): - self._pop = self.estimator(pop=self._pop) self._pop = self.selector(self._pop, self.opt_params.pop_size) self._pop = self.crossover(self._pop) self._pop = self.mutation(self._pop) + self._pop = self.estimator(self._pop) + [print(x.fitness) for x in self._pop] self._pop = sorted(self._pop, key=lambda x: x.fitness) return self._pop @@ -178,12 +178,17 @@ def area_length_ratio(pop: list[Structure], domain: Domain): min_points_num=6, is_closed=True, ) + print(domain.dist_between_points) + print(domain.dist_between_polygons) + print(domain.min_dist_from_boundary) from pathlib import Path from gefest.core.geometry import PolyID from gefest.tools import Estimator from gefest.tools.estimators.simulators.swan.swan_interface import Swan + + # root_path = Path(__file__).parent.parent.parent.parent # path_sim = ( # 'F:/Git_Repositories/gefest_fork/GEFEST/gefest/tools/estimators/simulators/swan/swan_model/' @@ -223,7 +228,9 @@ def area_length_ratio(pop: list[Structure], domain: Domain): # estimator = ComsolFitness(simulator=comsol, domain=domain) opt_params = OptimizationParams( - crossovers=[partial(structure_level_crossover, domain=domain)], + crossovers=[ + partial(polygon_level_crossover, domain=domain), + partial(structure_level_crossover, domain=domain)], mutations=[ rotate_poly, resize_poly, @@ -237,50 +244,50 @@ def area_length_ratio(pop: list[Structure], domain: Domain): crossover_strategy=CrossoverStrategy, mutation_prob=0.6, crossover_prob=0.6, - crossover_each_prob=[1], - mutation_each_prob=[0.125, 0.125, 0.35, 0.05, 0.1, 0.05, 0.2], + crossover_each_prob=[0.0, 1.0], + mutation_each_prob=[0.125, 0.125, 0.35, 0.05, 0.01, 0.01, 0.33], n_steps=5, - pop_size=50, + pop_size=25, selector=tournament_selection, pair_selector=panmixis, - postprocess_attempts=3, + postprocess_attempts=5, domain=domain, postprocessor=postprocess, - postprocess_rules={ - 'unclosed': [ - unclosed_poly, - correct_unclosed_poly, - ], - 'self_intersect': [ - self_intersection, - correct_self_intersection, - ], - 'wrong_points': [ - out_of_bound, - correct_wrong_point, - ], - }, + postprocess_rules=[ + + Rules.not_closed_polygon.value, + Rules.not_self_intersects.value, + Rules.not_out_of_bounds.value, + Rules.not_too_close_points.value, + Rules.not_too_close_polygons.value, + ], sampler=StandardSampler, estimator=partial(area_length_ratio, domain=domain), - workers_manager=WorkersManager(), + n_jobs=-1, ) - with configuration(opt_params): - optimizer = BaseGA(opt_params) - - optimizer.optimize(100) - - from gefest.core.viz.struct_vizualizer import StructVizualizer - plt.figure(figsize=(7, 7)) - visualiser = StructVizualizer(domain) - - info = { - 'spend_time': 1, - 'fitness': optimizer._pop[0].fitness, - 'type': 'prediction', - } - visualiser.plot_structure( - [optimizer._pop[0]], [info], ['-'], - ) - - plt.show(block=True) + # with configuration(opt_params): + optimizer = BaseGA(opt_params) + logger.disable('Standard') + import cProfile + optimizer.optimize(25) + + from gefest.core.viz.struct_vizualizer import GIFMaker + gm = GIFMaker(domain=domain) + for s in optimizer._pop: + gm.create_frame(s, {'Fitness': s.fitness}) + gm.make_gif('test', 500, ) + # from gefest.core.viz.struct_vizualizer import StructVizualizer + # plt.figure(figsize=(7, 7)) + # visualiser = StructVizualizer(domain) + + # info = { + # 'spend_time': 1, + # 'fitness': optimizer._pop[0].fitness, + # 'type': 'prediction', + # } + # visualiser.plot_structure( + # [optimizer._pop[0]], [info], ['-'], + # ) + + # plt.show(block=True)