Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move PySim name extraction functionality to Fragment and improve it #965

Merged
merged 2 commits into from
Nov 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions amaranth/hdl/ir.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,84 @@ def prepare(self, ports=None, missing_domain=lambda name: ClockDomain(name)):
fragment._propagate_ports(ports=mapped_ports, all_undef_as_ports=False)
return fragment

def _assign_names_to_signals(self):
"""Assign names to signals used in this fragment.

Returns
-------
SignalDict of Signal to str
A mapping from signals used in this fragment to their local names. Because names are
deduplicated using local information only, the same signal used in a different fragment
may get a different name.
"""

signal_names = SignalDict()
assigned_names = set()

def add_signal_name(signal):
if signal not in signal_names:
if signal.name not in assigned_names:
name = signal.name
else:
name = f"{signal.name}${len(assigned_names)}"
assert name not in assigned_names
signal_names[signal] = name
assigned_names.add(name)

for port in self.ports.keys():
add_signal_name(port)

for domain_name, domain_signals in self.drivers.items():
if domain_name is not None:
domain = self.domains[domain_name]
add_signal_name(domain.clk)
if domain.rst is not None:
add_signal_name(domain.rst)

for statement in self.statements:
for signal in statement._lhs_signals() | statement._rhs_signals():
if not isinstance(signal, (ClockSignal, ResetSignal)):
add_signal_name(signal)

return signal_names

def _assign_names_to_fragments(self, hierarchy=("top",), *, _names=None):
"""Assign names to this fragment and its subfragments.

Subfragments may not necessarily have a name. This method assigns every such subfragment
a name, ``U$<number>``, where ``<number>`` is based on its location in the hierarchy.

Subfragment names may collide with signal names safely in Amaranth, but this may confuse
backends. This method assigns every such subfragment a name, ``<name>$U$<number>``, where
``name`` is its original name, and ``<number>`` is based on its location in the hierarchy.

Arguments
---------
hierarchy : tuple of str
Name of this fragment.

Returns
-------
dict of Fragment to tuple of str
A mapping from this fragment and its subfragments to their full hierarchical names.
"""

if _names is None:
_names = dict()
_names[self] = hierarchy

signal_names = set(self._assign_names_to_signals().values())
for subfragment_index, (subfragment, subfragment_name) in enumerate(self.subfragments):
if subfragment_name is None:
subfragment_name = f"U${subfragment_index}"
elif subfragment_name in signal_names:
subfragment_name = f"{subfragment_name}$U${subfragment_index}"
assert subfragment_name not in signal_names
subfragment._assign_names_to_fragments(hierarchy=(*hierarchy, subfragment_name),
_names=_names)

return _names


class Instance(Fragment):
def __init__(self, type, *args, **kwargs):
Expand Down
10 changes: 6 additions & 4 deletions amaranth/sim/_pyrtl.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@

__all__ = ["PyRTLProcess"]


_USE_PATTERN_MATCHING = (sys.version_info >= (3, 10))


class PyRTLProcess(BaseProcess):
__slots__ = ("is_comb", "runnable", "passive", "run")

Expand Down Expand Up @@ -83,11 +85,11 @@ def on_value(self, value):
"simulate in reasonable time"
.format(src, len(value)))

v = super().on_value(value)
if isinstance(v, str) and len(v) > 1000:
code = super().on_value(value)
if isinstance(code, str) and len(code) > 1000:
# Avoid parser stack overflow on older Pythons.
return self.emitter.def_var("intermediate", v)
return v
return self.emitter.def_var("expr_split", code)
return code

def on_ClockSignal(self, value):
raise NotImplementedError # :nocov:
Expand Down
42 changes: 8 additions & 34 deletions amaranth/sim/pysim.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,38 +15,6 @@
__all__ = ["PySimEngine"]


class _NameExtractor:
def __init__(self):
self.names = SignalDict()

def __call__(self, fragment, *, hierarchy=("bench", "top",)):
def add_signal_name(signal):
hierarchical_signal_name = (*hierarchy, signal.name)
if signal not in self.names:
self.names[signal] = {hierarchical_signal_name}
else:
self.names[signal].add(hierarchical_signal_name)

for domain_name, domain_signals in fragment.drivers.items():
if domain_name is not None:
domain = fragment.domains[domain_name]
add_signal_name(domain.clk)
if domain.rst is not None:
add_signal_name(domain.rst)

for statement in fragment.statements:
for signal in statement._lhs_signals() | statement._rhs_signals():
if not isinstance(signal, (ClockSignal, ResetSignal)):
add_signal_name(signal)

for subfragment_index, (subfragment, subfragment_name) in enumerate(fragment.subfragments):
if subfragment_name is None:
subfragment_name = f"U${subfragment_index}"
self(subfragment, hierarchy=(*hierarchy, subfragment_name))

return self.names


class _VCDWriter:
@staticmethod
def decode_to_vcd(signal, value):
Expand All @@ -69,12 +37,18 @@

self.traces = []

signal_names = _NameExtractor()(fragment)
signal_names = SignalDict()
for subfragment, subfragment_name in \
fragment._assign_names_to_fragments(hierarchy=("bench", "top",)).items():
for signal, signal_name in subfragment._assign_names_to_signals().items():
if signal not in signal_names:
signal_names[signal] = set()
signal_names[signal].add((*subfragment_name, signal_name))

trace_names = SignalDict()
for trace in traces:
if trace not in signal_names:
trace_names[trace] = {('bench', trace.name)}
trace_names[trace] = {("bench", trace.name)}

Check warning on line 51 in amaranth/sim/pysim.py

View check run for this annotation

Codecov / codecov/patch

amaranth/sim/pysim.py#L51

Added line #L51 was not covered by tests
self.traces.append(trace)

if self.vcd_writer is None:
Expand Down
68 changes: 68 additions & 0 deletions tests/test_hdl_ir.py
Original file line number Diff line number Diff line change
Expand Up @@ -834,3 +834,71 @@ def test_prepare_attrs(self):
self.assertEqual(f.attrs, OrderedDict([
("ATTR", 1),
]))

def test_assign_names_to_signals(self):
i = Signal()
rst = Signal()
o1 = Signal()
o2 = Signal()
o3 = Signal()
i1 = Signal(name="i")

f = Fragment()
f.add_domains(cd_sync := ClockDomain())
f.add_domains(cd_sync_norst := ClockDomain(reset_less=True))
f.add_ports((i, rst), dir="i")
f.add_ports((o1, o2, o3), dir="o")
f.add_statements([o1.eq(0)])
f.add_driver(o1, domain=None)
f.add_statements([o2.eq(i1)])
f.add_driver(o2, domain="sync")
f.add_statements([o3.eq(i1)])
f.add_driver(o3, domain="sync_norst")

names = f._assign_names_to_signals()
self.assertEqual(names, SignalDict([
(i, "i"),
(rst, "rst"),
(o1, "o1"),
(o2, "o2"),
(o3, "o3"),
(cd_sync.clk, "clk"),
(cd_sync.rst, "rst$6"),
(cd_sync_norst.clk, "sync_norst_clk"),
(i1, "i$8"),
]))

def test_assign_names_to_fragments(self):
f = Fragment()
f.add_subfragment(a := Fragment())
f.add_subfragment(b := Fragment(), name="b")

names = f._assign_names_to_fragments()
self.assertEqual(names, {
f: ("top",),
a: ("top", "U$0"),
b: ("top", "b")
})

def test_assign_names_to_fragments_rename_top(self):
f = Fragment()
f.add_subfragment(a := Fragment())
f.add_subfragment(b := Fragment(), name="b")

names = f._assign_names_to_fragments(hierarchy=("bench", "cpu"))
self.assertEqual(names, {
f: ("bench", "cpu",),
a: ("bench", "cpu", "U$0"),
b: ("bench", "cpu", "b")
})

def test_assign_names_to_fragments_collide_with_signal(self):
f = Fragment()
f.add_subfragment(a_f := Fragment(), name="a")
f.add_ports((a_s := Signal(name="a"),), dir="o")

names = f._assign_names_to_fragments()
self.assertEqual(names, {
f: ("top",),
a_f: ("top", "a$U$0")
})