From 4ec72bd99e9088e16e645beb4c44932e287d6c5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A1ta=20Hodov=C3=A1n?= Date: Mon, 11 Dec 2023 17:08:15 +0100 Subject: [PATCH] Add new mutation and recombination operators (#205) Built upon the newly added tree nodes, five new operators were introduced that are able to manipulate the tree while it still conforms the grammar. Additionally, forcing to use Python 3.11.5 in GitHub actions, since Python 3.11.7 causes an inexplicable failure in multiprocessing. --- .github/workflows/main.yml | 2 +- grammarinator/runtime/rule.py | 8 +- grammarinator/tool/generator.py | 240 ++++++++++++++++++++++++++------ 3 files changed, 204 insertions(+), 46 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index d44a3cc..15a88c7 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -6,7 +6,7 @@ jobs: strategy: matrix: os: [ubuntu-latest, macos-latest, windows-latest] - python-version: [3.7, 3.8, 3.9, '3.10', '3.11', '3.12', 'pypy-3.9'] + python-version: [3.7, 3.8, 3.9, '3.10', '3.11.5', '3.12', 'pypy-3.9'] # FIXME: forcing to use py3.11.5 since py3.11.7 causes an inexplicable failure in multiprocessing. runs-on: ${{ matrix.os }} steps: - run: git config --global core.autocrlf input diff --git a/grammarinator/runtime/rule.py b/grammarinator/runtime/rule.py index a6a573a..fa531b1 100644 --- a/grammarinator/runtime/rule.py +++ b/grammarinator/runtime/rule.py @@ -311,7 +311,7 @@ def __repr__(self): return f'{self.__class__.__name__}({", ".join(parts)})' def _dbg_(self): - return '{name}\n{children}'.format(name=self.name, children=indent('\n'.join(child._dbg_() for child in self.children), '| ')) + return '{name}\n{children}'.format(name=self.name or self.__class__.__name__, children=indent('\n'.join(child._dbg_() for child in self.children), '| ')) class UnparserRule(ParentRule): @@ -412,6 +412,9 @@ def __repr__(self): def __deepcopy__(self, memo): return UnparserRuleQuantifier(idx=deepcopy(self.idx, memo), start=deepcopy(self.start, memo), stop=deepcopy(self.stop, memo), children=[deepcopy(child, memo) for child in self.children]) + def _dbg_(self): + return '{name}:[{idx}]\n{children}'.format(idx=self.idx, name=self.__class__.__name__, children=indent('\n'.join(child._dbg_() for child in self.children), '| ')) + class UnparserRuleQuantified(ParentRule): """ @@ -455,3 +458,6 @@ def __deepcopy__(self, memo): return UnparserRuleAlternative(alt_idx=deepcopy(self.alt_idx, memo), idx=deepcopy(self.idx, memo), children=[deepcopy(child, memo) for child in self.children]) + + def _dbg_(self): + return '{name}:[{alt_idx}/{idx}]\n{children}'.format(name=self.__class__.__name__, alt_idx=self.alt_idx, idx=self.idx, children=indent('\n'.join(child._dbg_() for child in self.children), '| ')) diff --git a/grammarinator/tool/generator.py b/grammarinator/tool/generator.py index 0ce2b6c..0e51c4e 100644 --- a/grammarinator/tool/generator.py +++ b/grammarinator/tool/generator.py @@ -11,10 +11,11 @@ import random from contextlib import nullcontext +from copy import deepcopy from os.path import abspath, dirname from shutil import rmtree -from ..runtime import CooldownModel, DefaultModel, ParentRule, RuleSize, UnlexerRule, UnparserRule +from ..runtime import CooldownModel, DefaultModel, ParentRule, RuleSize, UnlexerRule, UnparserRule, UnparserRuleAlternative, UnparserRuleQuantifier logger = logging.getLogger(__name__) @@ -45,8 +46,10 @@ def __init__(self, generator_class): # Exposing some class variables of the encapsulated generator. # In the generator class, they start with `_` to avoid any kind of # collision with rule names, so they start with `_` here as well. - self._rule_sizes = generator_class._rule_sizes self._immutable_rules = generator_class._immutable_rules + self._rule_sizes = generator_class._rule_sizes + self._alt_sizes = generator_class._alt_sizes + self._quant_sizes = generator_class._quant_sizes def __call__(self, limit=None): """ @@ -228,9 +231,9 @@ def create(self, index): creators.append(self.generate) if self._population: if self._enable_mutation: - creators.append(self.mutate) + creators.extend((self.regenerate_rule, self.delete_quantified, self.replicate_quantified, self.shuffle_quantifieds, self.hoist_rule)) if self._enable_recombination: - creators.append(self.recombine) + creators.extend((self.replace_node, self.insert_quantified)) creator = random.choice(creators) root = creator() @@ -268,22 +271,35 @@ def generate(self, *, rule=None, reserve=None): :rtype: Rule """ reserve = reserve if reserve is not None else RuleSize() - limit = self._limit - reserve - generator = self._generator_factory(limit=limit) - + generator = self._generator_factory(limit=self._limit - reserve) rule = rule or self._rule or generator._default_rule.__name__ - rule_size = generator._rule_sizes.get(rule, None) - - if not rule_size: - logger.warning('The size limits of %r are not known.', rule) - elif rule_size.depth > limit.depth: - raise ValueError(f'{rule!r} cannot be generated within the given depth: {limit.depth} (min needed: {rule_size.depth}).') - elif rule_size.tokens > limit.tokens: - raise ValueError(f'{rule!r} cannot be generated within the given token count: {limit.tokens} (min needed: {rule_size.tokens}).') - return getattr(generator, rule)() def mutate(self): + """ + Dispatcher method for mutation operators: it picks one operator + randomly and creates a new tree with it. + + Supported mutation operators: :meth:`regenerate_rule`, :meth:`delete_quantified`, :meth:`replicate_quantified`, :meth:`shuffle_quantifieds`, :meth:`hoist_rule` + + :return: The root of the mutated tree. + :rtype: Rule + """ + return random.choice((self.regenerate_rule, self.delete_quantified, self.replicate_quantified, self.shuffle_quantifieds, self.hoist_rule))() + + def recombine(self): + """ + Dispatcher method for recombination operators: it picks one operator + randomly and creates a new tree with it. + + Supported recombination operators: :meth:`replace_node`, :meth:`insert_quantified` + + :return: The root of the recombined tree. + :rtype: Rule + """ + return random.choice((self.replace_node, self.insert_quantified))() + + def regenerate_rule(self): """ Mutate a tree at a random position, i.e., discard and re-generate its sub-tree at a randomly selected node. @@ -293,7 +309,15 @@ def mutate(self): """ root, annot = self._select_individual() - options = self._filter_nodes((node for nodes in annot.nodes_by_name.values() for node in nodes), root, annot) + # Filter items from the nodes of the selected tree that can be regenerated + # within the current maximum depth and token limit (except '' and + # immutable nodes). + options = [node for nodes in annot.rules_by_name.values() for node in nodes + if (node.parent is not None + and node.name != '' + and node.name not in self._generator_factory._immutable_rules + and annot.node_levels[node] + self._generator_factory._rule_sizes.get(node.name, RuleSize(0, 0)).depth < self._limit.depth + and annot.token_counts[root] - annot.token_counts[node] + self._generator_factory._rule_sizes.get(node.name, RuleSize(0, 0)).tokens < self._limit.tokens)] if options: mutated_node = random.choice(options) reserve = RuleSize(depth=annot.node_levels[mutated_node], @@ -303,10 +327,9 @@ def mutate(self): # If selection strategy fails, we fallback and discard the whole tree # and generate a brand new one instead. - logger.debug('Could not choose node to mutate.') return self.generate(rule=root.name) - def recombine(self): + def replace_node(self): """ Recombine two trees at random positions where the nodes are compatible with each other (i.e., they share the same node name). One of the trees @@ -318,24 +341,134 @@ def recombine(self): :rtype: Rule """ recipient_root, recipient_annot = self._select_individual() - donor_root, donor_annot = self._select_individual() + _, donor_annot = self._select_individual() - common_types = sorted(set(recipient_annot.nodes_by_name.keys()).intersection(set(donor_annot.nodes_by_name.keys()))) - recipient_options = self._filter_nodes((node for rule_name in common_types for node in recipient_annot.nodes_by_name[rule_name]), recipient_root, recipient_annot) + recipient_lookup = dict(recipient_annot.rules_by_name) + recipient_lookup.update(recipient_annot.quants_by_name) + recipient_lookup.update(recipient_annot.alts_by_name) + + donor_lookup = dict(donor_annot.rules_by_name) + donor_lookup.update(donor_annot.quants_by_name) + donor_lookup.update(donor_annot.alts_by_name) + common_types = sorted((set(recipient_lookup.keys()) - {(x, ) for x in self._generator_factory._immutable_rules} - {('', )}) & set(donor_lookup.keys())) + + recipient_options = [(rule_name, node) for rule_name in common_types for node in recipient_lookup[rule_name] if node.parent] # Shuffle suitable nodes with sample. - for recipient_node in random.sample(recipient_options, k=len(recipient_options)): - donor_options = donor_annot.nodes_by_name[recipient_node.name] + for rule_name, recipient_node in random.sample(recipient_options, k=len(recipient_options)): + donor_options = donor_lookup[rule_name] for donor_node in random.sample(donor_options, k=len(donor_options)): # Make sure that the output tree won't exceed the depth limit. if (recipient_annot.node_levels[recipient_node] + donor_annot.node_depths[donor_node] <= self._limit.depth and recipient_annot.token_counts[recipient_root] - recipient_annot.token_counts[recipient_node] + donor_annot.token_counts[donor_node] < self._limit.tokens): - recipient_node = recipient_node.replace(donor_node) - return recipient_node.root + recipient_node.replace(donor_node) + return recipient_root - # If selection strategy fails, we practically cause the whole donor tree + # If selection strategy fails, we practically cause the whole recipient tree # to be the result of recombination. - logger.debug('Could not find node pairs to recombine.') - return donor_root + return recipient_root + + def insert_quantified(self): + """ + Selects two compatible quantifier nodes from two trees randomly and if + the quantifier node of the recipient tree is not full (the number of + its children is less than the maximum count), then add one new child + to it at a random position from the children of donors quantifier node. + + :return: The root of the extended tree. + :rtype: Rule + """ + recipient_root, recipient_annot = self._select_individual() + _, donor_annot = self._select_individual() + + common_types = sorted(set(recipient_annot.quants_by_name.keys()) & set(donor_annot.quants_by_name.keys())) + recipient_options = [(name, node) for name in common_types for node in recipient_annot.quants_by_name[name] if len(node.children) < node.stop] + for rule_name, recipient_node in random.sample(recipient_options, k=len(recipient_options)): + donor_options = [quantified for quantifier in donor_annot.quants_by_name[rule_name] for quantified in quantifier.children] + for donor_node in random.sample(donor_options, k=len(donor_options)): + # Make sure that the output tree won't exceed the depth and token limits. + if (recipient_annot.node_levels[recipient_node] + donor_annot.node_depths[donor_node] <= self._limit.depth + and recipient_annot.token_counts[recipient_root] + donor_annot.token_counts[donor_node] < self._limit.tokens): + recipient_node.insert_child(random.randint(0, len(recipient_node.children)), donor_node) + return recipient_root + + # If selection strategy fails, we practically cause the whole recipient tree + # to be the result of insertion. + return recipient_root + + def delete_quantified(self): + """ + Removes an optional subtree randomly from a quantifier node. + + :return: The root of the modified tree. + :rtype: Rule + """ + root, annot = self._select_individual() + options = [child for node in annot.quants if len(node.children) > node.start for child in node.children] + if options: + removed_node = random.choice(options) + removed_node.remove() + + # Return with the original root, whether the deletion was successful or not. + return root + + def replicate_quantified(self): + """ + Select a quantified sub-tree randomly, replicate it and insert it again if + the maximum quantification count is not reached yet. + + :return: The root of the modified tree. + :rtype: Rule + """ + root, annot = self._select_individual() + root_options = [node for node in annot.quants if node.stop > len(node.children)] + node_options = [child for root in root_options for child in root.children if + annot.token_counts[root] + annot.token_counts[child] <= self._limit.tokens] + if node_options: + node_to_repeat = random.choice(node_options) + node_to_repeat.parent.insert_child(idx=random.randint(0, len(node_to_repeat.parent.children)), node=deepcopy(node_to_repeat)) + + # Return with the original root, whether the replication was successful or not. + return root + + def shuffle_quantifieds(self): + """ + Select a quantifier node and shuffle its quantified sub-trees. + + :return: The root of the modified tree. + :rtype: Rule + """ + root, annot = self._select_individual() + options = [node for node in annot.quants if len(node.children) > 1] + if options: + node_to_shuffle = random.choice(options) + random.shuffle(node_to_shuffle.children) + + # Return with the original root, whether the shuffling was successful or not. + return root + + def hoist_rule(self): + """ + Select an individual of the population to be mutated and select two + rule nodes from it which share the same rule name and are in + ancestor-descendant relationship making possible for the descendant + to replace its ancestor. + + :return: The root of the hoisted tree. + :rtype: Rule + """ + root, annot = self._select_individual() + options = [node for node in annot.rules + if node.parent + and node.name != '' + and node.name not in self._generator_factory._immutable_rules] + for rule in random.sample(options, k=len(options)): + parent = rule.parent + while parent: + if parent.name == rule.name: + parent.replace(rule) + return root + parent = parent.parent + return root def _select_individual(self): root, annot = self._population.select_individual() @@ -348,29 +481,33 @@ def _add_individual(self, root, path): # superfluous here, but we have no way of knowing that in advance self._population.add_individual(root, Annotations(root), path) - # Filter items from ``nodes`` that can be regenerated within the current - # maximum depth and token limit (except '' and immutable nodes - # and nodes without name). - def _filter_nodes(self, nodes, root, annot): - return [node for node in nodes - if node.parent is not None - and node.name not in self._generator_factory._immutable_rules - and node.name not in [None, ''] - and annot.node_levels[node] + self._generator_factory._rule_sizes.get(node.name, RuleSize(0, 0)).depth < self._limit.depth - and annot.token_counts[root] - annot.token_counts[node] + self._generator_factory._rule_sizes.get(node.name, RuleSize(0, 0)).tokens < self._limit.tokens] - class Annotations: def __init__(self, root): def _annotate(current, level): + nonlocal current_rule_name self.node_levels[current] = level if isinstance(current, (UnlexerRule, UnparserRule)): if current.name: - if current.name not in self.nodes_by_name: - self.nodes_by_name[current.name] = [] - self.nodes_by_name[current.name].append(current) + current_rule_name = (current.name,) + if current_rule_name not in self.rules_by_name: + self.rules_by_name[current_rule_name] = [] + self.rules_by_name[current_rule_name].append(current) + else: + current_rule_name = None + elif current_rule_name: + if isinstance(current, UnparserRuleQuantifier): + node_name = current_rule_name + ('q', current.idx,) + if node_name not in self.quants_by_name: + self.quants_by_name[node_name] = [] + self.quants_by_name[node_name].append(current) + elif isinstance(current, UnparserRuleAlternative): + node_name = current_rule_name + ('a', current.alt_idx,) + if node_name not in self.alts_by_name: + self.alts_by_name[node_name] = [] + self.alts_by_name[node_name].append(current) self.node_depths[current] = 0 self.token_counts[current] = 0 @@ -380,8 +517,23 @@ def _annotate(current, level): self.node_depths[current] = max(self.node_depths[current], self.node_depths[child] + 1) self.token_counts[current] += self.token_counts[child] if isinstance(child, ParentRule) else child.size.tokens + 1 - self.nodes_by_name = {} + current_rule_name = None + self.rules_by_name = {} + self.alts_by_name = {} + self.quants_by_name = {} self.node_levels = {} self.node_depths = {} self.token_counts = {} _annotate(root, 0) + + @property + def rules(self): + return [rule for rules in self.rules_by_name.values() for rule in rules] + + @property + def alts(self): + return [alt for alts in self.alts_by_name.values() for alt in alts] + + @property + def quants(self): + return [quant for quants in self.quants_by_name.values() for quant in quants]