Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hdl: allow driving individual bits of signals from multiple modules or domains. [0.5 backport] #1513

Merged
merged 1 commit into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 90 additions & 83 deletions amaranth/hdl/_dsl.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,82 +19,78 @@
__all__ = ["SyntaxError", "SyntaxWarning", "Module"]


class _Visitor:
def __init__(self):
self.driven_signals = SignalSet()

def visit_stmt(self, stmt):
if isinstance(stmt, _StatementList):
for s in stmt:
self.visit_stmt(s)
elif isinstance(stmt, Assign):
self.visit_lhs(stmt.lhs)
self.visit_rhs(stmt.rhs)
elif isinstance(stmt, Print):
def _check_stmt(stmt):
if isinstance(stmt, _StatementList):
for s in stmt:
_check_stmt(s)
elif isinstance(stmt, Assign):
_check_lhs(stmt.lhs)
_check_rhs(stmt.rhs)
elif isinstance(stmt, Print):
for chunk in stmt.message._chunks:
if not isinstance(chunk, str):
obj, format_spec = chunk
_check_rhs(obj)
elif isinstance(stmt, Property):
_check_rhs(stmt.test)
if stmt.message is not None:
for chunk in stmt.message._chunks:
if not isinstance(chunk, str):
obj, format_spec = chunk
self.visit_rhs(obj)
elif isinstance(stmt, Property):
self.visit_rhs(stmt.test)
if stmt.message is not None:
for chunk in stmt.message._chunks:
if not isinstance(chunk, str):
obj, format_spec = chunk
self.visit_rhs(obj)
elif isinstance(stmt, Switch):
self.visit_rhs(stmt.test)
for _patterns, stmts, _src_loc in stmt.cases:
self.visit_stmt(stmts)
elif isinstance(stmt, _LateBoundStatement):
pass
else:
assert False # :nocov:

def visit_lhs(self, value):
if isinstance(value, Operator) and value.operator in ("u", "s"):
self.visit_lhs(value.operands[0])
elif isinstance(value, (Signal, ClockSignal, ResetSignal)):
self.driven_signals.add(value)
elif isinstance(value, Slice):
self.visit_lhs(value.value)
elif isinstance(value, Part):
self.visit_lhs(value.value)
self.visit_rhs(value.offset)
elif isinstance(value, Concat):
for part in value.parts:
self.visit_lhs(part)
elif isinstance(value, SwitchValue):
self.visit_rhs(value.test)
for _patterns, elem in value.cases:
self.visit_lhs(elem)
elif isinstance(value, MemoryData._Row):
raise ValueError(f"Value {value!r} can only be used in simulator processes")
else:
raise ValueError(f"Value {value!r} cannot be assigned to")

def visit_rhs(self, value):
if isinstance(value, (Const, Signal, ClockSignal, ResetSignal, Initial, AnyValue)):
pass
elif isinstance(value, Operator):
for op in value.operands:
self.visit_rhs(op)
elif isinstance(value, Slice):
self.visit_rhs(value.value)
elif isinstance(value, Part):
self.visit_rhs(value.value)
self.visit_rhs(value.offset)
elif isinstance(value, Concat):
for part in value.parts:
self.visit_rhs(part)
elif isinstance(value, SwitchValue):
self.visit_rhs(value.test)
for _patterns, elem in value.cases:
self.visit_rhs(elem)
elif isinstance(value, MemoryData._Row):
raise ValueError(f"Value {value!r} can only be used in simulator processes")
else:
assert False # :nocov:
_check_rhs(obj)
elif isinstance(stmt, Switch):
_check_rhs(stmt.test)
for _patterns, stmts, _src_loc in stmt.cases:
_check_stmt(stmts)
elif isinstance(stmt, _LateBoundStatement):
pass
else:
assert False # :nocov:

def _check_lhs(value):
if isinstance(value, Operator) and value.operator in ("u", "s"):
_check_lhs(value.operands[0])
elif isinstance(value, (Signal, ClockSignal, ResetSignal)):
pass
elif isinstance(value, Slice):
_check_lhs(value.value)
elif isinstance(value, Part):
_check_lhs(value.value)
_check_rhs(value.offset)
elif isinstance(value, Concat):
for part in value.parts:
_check_lhs(part)
elif isinstance(value, SwitchValue):
_check_rhs(value.test)
for _patterns, elem in value.cases:
_check_lhs(elem)
elif isinstance(value, MemoryData._Row):
raise ValueError(f"Value {value!r} can only be used in simulator processes")
else:
raise ValueError(f"Value {value!r} cannot be assigned to")

def _check_rhs(value):
if isinstance(value, (Const, Signal, ClockSignal, ResetSignal, Initial, AnyValue)):
pass
elif isinstance(value, Operator):
for op in value.operands:
_check_rhs(op)
elif isinstance(value, Slice):
_check_rhs(value.value)
elif isinstance(value, Part):
_check_rhs(value.value)
_check_rhs(value.offset)
elif isinstance(value, Concat):
for part in value.parts:
_check_rhs(part)
elif isinstance(value, SwitchValue):
_check_rhs(value.test)
for _patterns, elem in value.cases:
_check_rhs(elem)
elif isinstance(value, MemoryData._Row):
raise ValueError(f"Value {value!r} can only be used in simulator processes")
else:
assert False # :nocov:


class _ModuleBuilderProxy:
Expand Down Expand Up @@ -632,16 +628,27 @@ def _add_statement(self, assigns, domain, depth):

stmt._MustUse__used = True

visitor = _Visitor()
visitor.visit_stmt(stmt)
for signal in visitor.driven_signals:
if signal not in self._driving:
self._driving[signal] = domain
elif self._driving[signal] != domain:
cd_curr = self._driving[signal]
raise SyntaxError(
f"Driver-driver conflict: trying to drive {signal!r} from d.{domain}, but it is "
f"already driven from d.{cd_curr}")
_check_stmt(stmt)

lhs_masks = LHSMaskCollector()
# This is an opportunistic early check — not much harm skipping it, since
# the whole-design check will be later done in NIR emitter.
if not isinstance(stmt, _LateBoundStatement):
lhs_masks.visit_stmt(stmt)

for sig, mask in lhs_masks.masks():
if sig not in self._driving:
self._driving[sig] = [None] * len(sig)
sig_domain = self._driving[sig]
for bit in range(len(sig)):
if not (mask & (1 << bit)):
continue
if sig_domain[bit] is None:
sig_domain[bit] = domain
if sig_domain[bit] != domain:
raise SyntaxError(
f"Driver-driver conflict: trying to drive {sig!r} bit {bit} from d.{domain}, but it is "
f"already driven from d.{sig_domain[bit]}")

self._statements.setdefault(domain, []).append(stmt)

Expand Down
Loading