Skip to content

Commit

Permalink
sim.pysim: move name extractor functionality to Fragment.
Browse files Browse the repository at this point in the history
At the moment there are two issues with assignment of names in pysim:
1. Names are not deduplicated. It is possible (and frequent) for names
   to be included twice in VCD output.
2. Names are different compared to what is emitted in RTLIL, Verilog,
   or CXXRTL output.

This commit fixes issue (1), and issue (2) will be fixed by the new IR.
  • Loading branch information
whitequark committed Nov 25, 2023
1 parent e7b15e1 commit 79adbed
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 34 deletions.
78 changes: 78 additions & 0 deletions amaranth/hdl/ir.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,84 @@ def prepare(self, ports=None, missing_domain=lambda name: ClockDomain(name)):
fragment._propagate_ports(ports=mapped_ports, all_undef_as_ports=False)
return fragment

def _assign_names_to_signals(self):
"""Assign names to signals used in this fragment.
Returns
-------
SignalDict of Signal to str
A mapping from signals used in this fragment to their local names. Because names are
deduplicated using local information only, the same signal used in a different fragment
may get a different name.
"""

signal_names = SignalDict()
assigned_names = set()

def add_signal_name(signal):
if signal not in signal_names:
if signal.name not in assigned_names:
name = signal.name
else:
name = f"{signal.name}${len(assigned_names)}"
assert name not in assigned_names
signal_names[signal] = name
assigned_names.add(name)

for port in self.ports.keys():
add_signal_name(port)

for domain_name, domain_signals in self.drivers.items():
if domain_name is not None:
domain = self.domains[domain_name]
add_signal_name(domain.clk)
if domain.rst is not None:
add_signal_name(domain.rst)

for statement in self.statements:
for signal in statement._lhs_signals() | statement._rhs_signals():
if not isinstance(signal, (ClockSignal, ResetSignal)):
add_signal_name(signal)

return signal_names

def _assign_names_to_fragments(self, hierarchy=("top",), *, _names=None):
"""Assign names to this fragment and its subfragments.
Subfragments may not necessarily have a name. This method assigns every such subfragment
a name, ``U$<number>``, where ``<number>`` is based on its location in the hierarchy.
Subfragment names may collide with signal names safely in Amaranth, but this may confuse
backends. This method assigns every such subfragment a name, ``<name>$U$<number>``, where
``name`` is its original name, and ``<number>`` is based on its location in the hierarchy.
Arguments
---------
hierarchy : tuple of str
Name of this fragment.
Returns
-------
dict of Fragment to tuple of str
A mapping from this fragment and its subfragments to their full hierarchical names.
"""

if _names is None:
_names = dict()
_names[self] = hierarchy

signal_names = set(self._assign_names_to_signals().values())
for subfragment_index, (subfragment, subfragment_name) in enumerate(self.subfragments):
if subfragment_name is None:
subfragment_name = f"U${subfragment_index}"
elif subfragment_name in signal_names:
subfragment_name = f"{subfragment_name}$U${subfragment_index}"
assert subfragment_name not in signal_names
subfragment._assign_names_to_fragments(hierarchy=(*hierarchy, subfragment_name),
_names=_names)

return _names


class Instance(Fragment):
def __init__(self, type, *args, **kwargs):
Expand Down
42 changes: 8 additions & 34 deletions amaranth/sim/pysim.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,38 +15,6 @@
__all__ = ["PySimEngine"]


class _NameExtractor:
def __init__(self):
self.names = SignalDict()

def __call__(self, fragment, *, hierarchy=("bench", "top",)):
def add_signal_name(signal):
hierarchical_signal_name = (*hierarchy, signal.name)
if signal not in self.names:
self.names[signal] = {hierarchical_signal_name}
else:
self.names[signal].add(hierarchical_signal_name)

for domain_name, domain_signals in fragment.drivers.items():
if domain_name is not None:
domain = fragment.domains[domain_name]
add_signal_name(domain.clk)
if domain.rst is not None:
add_signal_name(domain.rst)

for statement in fragment.statements:
for signal in statement._lhs_signals() | statement._rhs_signals():
if not isinstance(signal, (ClockSignal, ResetSignal)):
add_signal_name(signal)

for subfragment_index, (subfragment, subfragment_name) in enumerate(fragment.subfragments):
if subfragment_name is None:
subfragment_name = f"U${subfragment_index}"
self(subfragment, hierarchy=(*hierarchy, subfragment_name))

return self.names


class _VCDWriter:
@staticmethod
def decode_to_vcd(signal, value):
Expand All @@ -69,12 +37,18 @@ def __init__(self, fragment, *, vcd_file, gtkw_file=None, traces=()):

self.traces = []

signal_names = _NameExtractor()(fragment)
signal_names = SignalDict()
for subfragment, subfragment_name in \
fragment._assign_names_to_fragments(hierarchy=("bench", "top",)).items():
for signal, signal_name in subfragment._assign_names_to_signals().items():
if signal not in signal_names:
signal_names[signal] = set()
signal_names[signal].add((*subfragment_name, signal_name))

trace_names = SignalDict()
for trace in traces:
if trace not in signal_names:
trace_names[trace] = {('bench', trace.name)}
trace_names[trace] = {("bench", trace.name)}
self.traces.append(trace)

if self.vcd_writer is None:
Expand Down
68 changes: 68 additions & 0 deletions tests/test_hdl_ir.py
Original file line number Diff line number Diff line change
Expand Up @@ -834,3 +834,71 @@ def test_prepare_attrs(self):
self.assertEqual(f.attrs, OrderedDict([
("ATTR", 1),
]))

def test_assign_names_to_signals(self):
i = Signal()
rst = Signal()
o1 = Signal()
o2 = Signal()
o3 = Signal()
i1 = Signal(name="i")

f = Fragment()
f.add_domains(cd_sync := ClockDomain())
f.add_domains(cd_sync_norst := ClockDomain(reset_less=True))
f.add_ports((i, rst), dir="i")
f.add_ports((o1, o2, o3), dir="o")
f.add_statements([o1.eq(0)])
f.add_driver(o1, domain=None)
f.add_statements([o2.eq(i1)])
f.add_driver(o2, domain="sync")
f.add_statements([o3.eq(i1)])
f.add_driver(o3, domain="sync_norst")

names = f._assign_names_to_signals()
self.assertEqual(names, SignalDict([
(i, "i"),
(rst, "rst"),
(o1, "o1"),
(o2, "o2"),
(o3, "o3"),
(cd_sync.clk, "clk"),
(cd_sync.rst, "rst$6"),
(cd_sync_norst.clk, "sync_norst_clk"),
(i1, "i$8"),
]))

def test_assign_names_to_fragments(self):
f = Fragment()
f.add_subfragment(a := Fragment())
f.add_subfragment(b := Fragment(), name="b")

names = f._assign_names_to_fragments()
self.assertEqual(names, {
f: ("top",),
a: ("top", "U$0"),
b: ("top", "b")
})

def test_assign_names_to_fragments_rename_top(self):
f = Fragment()
f.add_subfragment(a := Fragment())
f.add_subfragment(b := Fragment(), name="b")

names = f._assign_names_to_fragments(hierarchy=("bench", "cpu"))
self.assertEqual(names, {
f: ("bench", "cpu",),
a: ("bench", "cpu", "U$0"),
b: ("bench", "cpu", "b")
})

def test_assign_names_to_fragments_collide_with_signal(self):
f = Fragment()
f.add_subfragment(a_f := Fragment(), name="a")
f.add_ports((a_s := Signal(name="a"),), dir="o")

names = f._assign_names_to_fragments()
self.assertEqual(names, {
f: ("top",),
a_f: ("top", "a$U$0")
})

0 comments on commit 79adbed

Please sign in to comment.