Skip to content

Commit

Permalink
Add new mutation and recombination operators (#205)
Browse files Browse the repository at this point in the history
Built upon the newly added tree nodes, five new operators were
introduced that are able to manipulate the tree while it
still conforms the grammar.

Additionally, forcing to use Python 3.11.5 in GitHub actions,
since Python 3.11.7 causes an inexplicable failure in multiprocessing.
  • Loading branch information
renatahodovan authored Dec 11, 2023
1 parent f48f3d6 commit 4ec72bd
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 46 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
python-version: [3.7, 3.8, 3.9, '3.10', '3.11', '3.12', 'pypy-3.9']
python-version: [3.7, 3.8, 3.9, '3.10', '3.11.5', '3.12', 'pypy-3.9'] # FIXME: forcing to use py3.11.5 since py3.11.7 causes an inexplicable failure in multiprocessing.
runs-on: ${{ matrix.os }}
steps:
- run: git config --global core.autocrlf input
Expand Down
8 changes: 7 additions & 1 deletion grammarinator/runtime/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ def __repr__(self):
return f'{self.__class__.__name__}({", ".join(parts)})'

def _dbg_(self):
return '{name}\n{children}'.format(name=self.name, children=indent('\n'.join(child._dbg_() for child in self.children), '| '))
return '{name}\n{children}'.format(name=self.name or self.__class__.__name__, children=indent('\n'.join(child._dbg_() for child in self.children), '| '))


class UnparserRule(ParentRule):
Expand Down Expand Up @@ -412,6 +412,9 @@ def __repr__(self):
def __deepcopy__(self, memo):
return UnparserRuleQuantifier(idx=deepcopy(self.idx, memo), start=deepcopy(self.start, memo), stop=deepcopy(self.stop, memo), children=[deepcopy(child, memo) for child in self.children])

def _dbg_(self):
return '{name}:[{idx}]\n{children}'.format(idx=self.idx, name=self.__class__.__name__, children=indent('\n'.join(child._dbg_() for child in self.children), '| '))


class UnparserRuleQuantified(ParentRule):
"""
Expand Down Expand Up @@ -455,3 +458,6 @@ def __deepcopy__(self, memo):
return UnparserRuleAlternative(alt_idx=deepcopy(self.alt_idx, memo),
idx=deepcopy(self.idx, memo),
children=[deepcopy(child, memo) for child in self.children])

def _dbg_(self):
return '{name}:[{alt_idx}/{idx}]\n{children}'.format(name=self.__class__.__name__, alt_idx=self.alt_idx, idx=self.idx, children=indent('\n'.join(child._dbg_() for child in self.children), '| '))
240 changes: 196 additions & 44 deletions grammarinator/tool/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
import random

from contextlib import nullcontext
from copy import deepcopy
from os.path import abspath, dirname
from shutil import rmtree

from ..runtime import CooldownModel, DefaultModel, ParentRule, RuleSize, UnlexerRule, UnparserRule
from ..runtime import CooldownModel, DefaultModel, ParentRule, RuleSize, UnlexerRule, UnparserRule, UnparserRuleAlternative, UnparserRuleQuantifier

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -45,8 +46,10 @@ def __init__(self, generator_class):
# Exposing some class variables of the encapsulated generator.
# In the generator class, they start with `_` to avoid any kind of
# collision with rule names, so they start with `_` here as well.
self._rule_sizes = generator_class._rule_sizes
self._immutable_rules = generator_class._immutable_rules
self._rule_sizes = generator_class._rule_sizes
self._alt_sizes = generator_class._alt_sizes
self._quant_sizes = generator_class._quant_sizes

def __call__(self, limit=None):
"""
Expand Down Expand Up @@ -228,9 +231,9 @@ def create(self, index):
creators.append(self.generate)
if self._population:
if self._enable_mutation:
creators.append(self.mutate)
creators.extend((self.regenerate_rule, self.delete_quantified, self.replicate_quantified, self.shuffle_quantifieds, self.hoist_rule))
if self._enable_recombination:
creators.append(self.recombine)
creators.extend((self.replace_node, self.insert_quantified))

creator = random.choice(creators)
root = creator()
Expand Down Expand Up @@ -268,22 +271,35 @@ def generate(self, *, rule=None, reserve=None):
:rtype: Rule
"""
reserve = reserve if reserve is not None else RuleSize()
limit = self._limit - reserve
generator = self._generator_factory(limit=limit)

generator = self._generator_factory(limit=self._limit - reserve)
rule = rule or self._rule or generator._default_rule.__name__
rule_size = generator._rule_sizes.get(rule, None)

if not rule_size:
logger.warning('The size limits of %r are not known.', rule)
elif rule_size.depth > limit.depth:
raise ValueError(f'{rule!r} cannot be generated within the given depth: {limit.depth} (min needed: {rule_size.depth}).')
elif rule_size.tokens > limit.tokens:
raise ValueError(f'{rule!r} cannot be generated within the given token count: {limit.tokens} (min needed: {rule_size.tokens}).')

return getattr(generator, rule)()

def mutate(self):
"""
Dispatcher method for mutation operators: it picks one operator
randomly and creates a new tree with it.
Supported mutation operators: :meth:`regenerate_rule`, :meth:`delete_quantified`, :meth:`replicate_quantified`, :meth:`shuffle_quantifieds`, :meth:`hoist_rule`
:return: The root of the mutated tree.
:rtype: Rule
"""
return random.choice((self.regenerate_rule, self.delete_quantified, self.replicate_quantified, self.shuffle_quantifieds, self.hoist_rule))()

def recombine(self):
"""
Dispatcher method for recombination operators: it picks one operator
randomly and creates a new tree with it.
Supported recombination operators: :meth:`replace_node`, :meth:`insert_quantified`
:return: The root of the recombined tree.
:rtype: Rule
"""
return random.choice((self.replace_node, self.insert_quantified))()

def regenerate_rule(self):
"""
Mutate a tree at a random position, i.e., discard and re-generate its
sub-tree at a randomly selected node.
Expand All @@ -293,7 +309,15 @@ def mutate(self):
"""
root, annot = self._select_individual()

options = self._filter_nodes((node for nodes in annot.nodes_by_name.values() for node in nodes), root, annot)
# Filter items from the nodes of the selected tree that can be regenerated
# within the current maximum depth and token limit (except '<INVALID>' and
# immutable nodes).
options = [node for nodes in annot.rules_by_name.values() for node in nodes
if (node.parent is not None
and node.name != '<INVALID>'
and node.name not in self._generator_factory._immutable_rules
and annot.node_levels[node] + self._generator_factory._rule_sizes.get(node.name, RuleSize(0, 0)).depth < self._limit.depth
and annot.token_counts[root] - annot.token_counts[node] + self._generator_factory._rule_sizes.get(node.name, RuleSize(0, 0)).tokens < self._limit.tokens)]
if options:
mutated_node = random.choice(options)
reserve = RuleSize(depth=annot.node_levels[mutated_node],
Expand All @@ -303,10 +327,9 @@ def mutate(self):

# If selection strategy fails, we fallback and discard the whole tree
# and generate a brand new one instead.
logger.debug('Could not choose node to mutate.')
return self.generate(rule=root.name)

def recombine(self):
def replace_node(self):
"""
Recombine two trees at random positions where the nodes are compatible
with each other (i.e., they share the same node name). One of the trees
Expand All @@ -318,24 +341,134 @@ def recombine(self):
:rtype: Rule
"""
recipient_root, recipient_annot = self._select_individual()
donor_root, donor_annot = self._select_individual()
_, donor_annot = self._select_individual()

common_types = sorted(set(recipient_annot.nodes_by_name.keys()).intersection(set(donor_annot.nodes_by_name.keys())))
recipient_options = self._filter_nodes((node for rule_name in common_types for node in recipient_annot.nodes_by_name[rule_name]), recipient_root, recipient_annot)
recipient_lookup = dict(recipient_annot.rules_by_name)
recipient_lookup.update(recipient_annot.quants_by_name)
recipient_lookup.update(recipient_annot.alts_by_name)

donor_lookup = dict(donor_annot.rules_by_name)
donor_lookup.update(donor_annot.quants_by_name)
donor_lookup.update(donor_annot.alts_by_name)
common_types = sorted((set(recipient_lookup.keys()) - {(x, ) for x in self._generator_factory._immutable_rules} - {('<INVALID>', )}) & set(donor_lookup.keys()))

recipient_options = [(rule_name, node) for rule_name in common_types for node in recipient_lookup[rule_name] if node.parent]
# Shuffle suitable nodes with sample.
for recipient_node in random.sample(recipient_options, k=len(recipient_options)):
donor_options = donor_annot.nodes_by_name[recipient_node.name]
for rule_name, recipient_node in random.sample(recipient_options, k=len(recipient_options)):
donor_options = donor_lookup[rule_name]
for donor_node in random.sample(donor_options, k=len(donor_options)):
# Make sure that the output tree won't exceed the depth limit.
if (recipient_annot.node_levels[recipient_node] + donor_annot.node_depths[donor_node] <= self._limit.depth
and recipient_annot.token_counts[recipient_root] - recipient_annot.token_counts[recipient_node] + donor_annot.token_counts[donor_node] < self._limit.tokens):
recipient_node = recipient_node.replace(donor_node)
return recipient_node.root
recipient_node.replace(donor_node)
return recipient_root

# If selection strategy fails, we practically cause the whole donor tree
# If selection strategy fails, we practically cause the whole recipient tree
# to be the result of recombination.
logger.debug('Could not find node pairs to recombine.')
return donor_root
return recipient_root

def insert_quantified(self):
"""
Selects two compatible quantifier nodes from two trees randomly and if
the quantifier node of the recipient tree is not full (the number of
its children is less than the maximum count), then add one new child
to it at a random position from the children of donors quantifier node.
:return: The root of the extended tree.
:rtype: Rule
"""
recipient_root, recipient_annot = self._select_individual()
_, donor_annot = self._select_individual()

common_types = sorted(set(recipient_annot.quants_by_name.keys()) & set(donor_annot.quants_by_name.keys()))
recipient_options = [(name, node) for name in common_types for node in recipient_annot.quants_by_name[name] if len(node.children) < node.stop]
for rule_name, recipient_node in random.sample(recipient_options, k=len(recipient_options)):
donor_options = [quantified for quantifier in donor_annot.quants_by_name[rule_name] for quantified in quantifier.children]
for donor_node in random.sample(donor_options, k=len(donor_options)):
# Make sure that the output tree won't exceed the depth and token limits.
if (recipient_annot.node_levels[recipient_node] + donor_annot.node_depths[donor_node] <= self._limit.depth
and recipient_annot.token_counts[recipient_root] + donor_annot.token_counts[donor_node] < self._limit.tokens):
recipient_node.insert_child(random.randint(0, len(recipient_node.children)), donor_node)
return recipient_root

# If selection strategy fails, we practically cause the whole recipient tree
# to be the result of insertion.
return recipient_root

def delete_quantified(self):
"""
Removes an optional subtree randomly from a quantifier node.
:return: The root of the modified tree.
:rtype: Rule
"""
root, annot = self._select_individual()
options = [child for node in annot.quants if len(node.children) > node.start for child in node.children]
if options:
removed_node = random.choice(options)
removed_node.remove()

# Return with the original root, whether the deletion was successful or not.
return root

def replicate_quantified(self):
"""
Select a quantified sub-tree randomly, replicate it and insert it again if
the maximum quantification count is not reached yet.
:return: The root of the modified tree.
:rtype: Rule
"""
root, annot = self._select_individual()
root_options = [node for node in annot.quants if node.stop > len(node.children)]
node_options = [child for root in root_options for child in root.children if
annot.token_counts[root] + annot.token_counts[child] <= self._limit.tokens]
if node_options:
node_to_repeat = random.choice(node_options)
node_to_repeat.parent.insert_child(idx=random.randint(0, len(node_to_repeat.parent.children)), node=deepcopy(node_to_repeat))

# Return with the original root, whether the replication was successful or not.
return root

def shuffle_quantifieds(self):
"""
Select a quantifier node and shuffle its quantified sub-trees.
:return: The root of the modified tree.
:rtype: Rule
"""
root, annot = self._select_individual()
options = [node for node in annot.quants if len(node.children) > 1]
if options:
node_to_shuffle = random.choice(options)
random.shuffle(node_to_shuffle.children)

# Return with the original root, whether the shuffling was successful or not.
return root

def hoist_rule(self):
"""
Select an individual of the population to be mutated and select two
rule nodes from it which share the same rule name and are in
ancestor-descendant relationship making possible for the descendant
to replace its ancestor.
:return: The root of the hoisted tree.
:rtype: Rule
"""
root, annot = self._select_individual()
options = [node for node in annot.rules
if node.parent
and node.name != '<INVALID>'
and node.name not in self._generator_factory._immutable_rules]
for rule in random.sample(options, k=len(options)):
parent = rule.parent
while parent:
if parent.name == rule.name:
parent.replace(rule)
return root
parent = parent.parent
return root

def _select_individual(self):
root, annot = self._population.select_individual()
Expand All @@ -348,29 +481,33 @@ def _add_individual(self, root, path):
# superfluous here, but we have no way of knowing that in advance
self._population.add_individual(root, Annotations(root), path)

# Filter items from ``nodes`` that can be regenerated within the current
# maximum depth and token limit (except '<INVALID>' and immutable nodes
# and nodes without name).
def _filter_nodes(self, nodes, root, annot):
return [node for node in nodes
if node.parent is not None
and node.name not in self._generator_factory._immutable_rules
and node.name not in [None, '<INVALID>']
and annot.node_levels[node] + self._generator_factory._rule_sizes.get(node.name, RuleSize(0, 0)).depth < self._limit.depth
and annot.token_counts[root] - annot.token_counts[node] + self._generator_factory._rule_sizes.get(node.name, RuleSize(0, 0)).tokens < self._limit.tokens]


class Annotations:

def __init__(self, root):
def _annotate(current, level):
nonlocal current_rule_name
self.node_levels[current] = level

if isinstance(current, (UnlexerRule, UnparserRule)):
if current.name:
if current.name not in self.nodes_by_name:
self.nodes_by_name[current.name] = []
self.nodes_by_name[current.name].append(current)
current_rule_name = (current.name,)
if current_rule_name not in self.rules_by_name:
self.rules_by_name[current_rule_name] = []
self.rules_by_name[current_rule_name].append(current)
else:
current_rule_name = None
elif current_rule_name:
if isinstance(current, UnparserRuleQuantifier):
node_name = current_rule_name + ('q', current.idx,)
if node_name not in self.quants_by_name:
self.quants_by_name[node_name] = []
self.quants_by_name[node_name].append(current)
elif isinstance(current, UnparserRuleAlternative):
node_name = current_rule_name + ('a', current.alt_idx,)
if node_name not in self.alts_by_name:
self.alts_by_name[node_name] = []
self.alts_by_name[node_name].append(current)

self.node_depths[current] = 0
self.token_counts[current] = 0
Expand All @@ -380,8 +517,23 @@ def _annotate(current, level):
self.node_depths[current] = max(self.node_depths[current], self.node_depths[child] + 1)
self.token_counts[current] += self.token_counts[child] if isinstance(child, ParentRule) else child.size.tokens + 1

self.nodes_by_name = {}
current_rule_name = None
self.rules_by_name = {}
self.alts_by_name = {}
self.quants_by_name = {}
self.node_levels = {}
self.node_depths = {}
self.token_counts = {}
_annotate(root, 0)

@property
def rules(self):
return [rule for rules in self.rules_by_name.values() for rule in rules]

@property
def alts(self):
return [alt for alts in self.alts_by_name.values() for alt in alts]

@property
def quants(self):
return [quant for quants in self.quants_by_name.values() for quant in quants]

0 comments on commit 4ec72bd

Please sign in to comment.