Skip to content

Commit

Permalink
hdl: allow driving individual bits of signals from multiple modules o…
Browse files Browse the repository at this point in the history
…r domains.

Fixes #1454.
  • Loading branch information
wanda-phi authored and whitequark committed Sep 18, 2024
1 parent ba6a749 commit 91233ef
Show file tree
Hide file tree
Showing 7 changed files with 362 additions and 163 deletions.
173 changes: 90 additions & 83 deletions amaranth/hdl/_dsl.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,82 +19,78 @@
__all__ = ["SyntaxError", "SyntaxWarning", "Module"]


class _Visitor:
def __init__(self):
self.driven_signals = SignalSet()

def visit_stmt(self, stmt):
if isinstance(stmt, _StatementList):
for s in stmt:
self.visit_stmt(s)
elif isinstance(stmt, Assign):
self.visit_lhs(stmt.lhs)
self.visit_rhs(stmt.rhs)
elif isinstance(stmt, Print):
def _check_stmt(stmt):
if isinstance(stmt, _StatementList):
for s in stmt:
_check_stmt(s)
elif isinstance(stmt, Assign):
_check_lhs(stmt.lhs)
_check_rhs(stmt.rhs)
elif isinstance(stmt, Print):
for chunk in stmt.message._chunks:
if not isinstance(chunk, str):
obj, format_spec = chunk
_check_rhs(obj)
elif isinstance(stmt, Property):
_check_rhs(stmt.test)
if stmt.message is not None:
for chunk in stmt.message._chunks:
if not isinstance(chunk, str):
obj, format_spec = chunk
self.visit_rhs(obj)
elif isinstance(stmt, Property):
self.visit_rhs(stmt.test)
if stmt.message is not None:
for chunk in stmt.message._chunks:
if not isinstance(chunk, str):
obj, format_spec = chunk
self.visit_rhs(obj)
elif isinstance(stmt, Switch):
self.visit_rhs(stmt.test)
for _patterns, stmts, _src_loc in stmt.cases:
self.visit_stmt(stmts)
elif isinstance(stmt, _LateBoundStatement):
pass
else:
assert False # :nocov:

def visit_lhs(self, value):
if isinstance(value, Operator) and value.operator in ("u", "s"):
self.visit_lhs(value.operands[0])
elif isinstance(value, (Signal, ClockSignal, ResetSignal)):
self.driven_signals.add(value)
elif isinstance(value, Slice):
self.visit_lhs(value.value)
elif isinstance(value, Part):
self.visit_lhs(value.value)
self.visit_rhs(value.offset)
elif isinstance(value, Concat):
for part in value.parts:
self.visit_lhs(part)
elif isinstance(value, SwitchValue):
self.visit_rhs(value.test)
for _patterns, elem in value.cases:
self.visit_lhs(elem)
elif isinstance(value, MemoryData._Row):
raise ValueError(f"Value {value!r} can only be used in simulator processes")
else:
raise ValueError(f"Value {value!r} cannot be assigned to")

def visit_rhs(self, value):
if isinstance(value, (Const, Signal, ClockSignal, ResetSignal, Initial, AnyValue)):
pass
elif isinstance(value, Operator):
for op in value.operands:
self.visit_rhs(op)
elif isinstance(value, Slice):
self.visit_rhs(value.value)
elif isinstance(value, Part):
self.visit_rhs(value.value)
self.visit_rhs(value.offset)
elif isinstance(value, Concat):
for part in value.parts:
self.visit_rhs(part)
elif isinstance(value, SwitchValue):
self.visit_rhs(value.test)
for _patterns, elem in value.cases:
self.visit_rhs(elem)
elif isinstance(value, MemoryData._Row):
raise ValueError(f"Value {value!r} can only be used in simulator processes")
else:
assert False # :nocov:
_check_rhs(obj)
elif isinstance(stmt, Switch):
_check_rhs(stmt.test)
for _patterns, stmts, _src_loc in stmt.cases:
_check_stmt(stmts)
elif isinstance(stmt, _LateBoundStatement):
pass
else:
assert False # :nocov:

def _check_lhs(value):
if isinstance(value, Operator) and value.operator in ("u", "s"):
_check_lhs(value.operands[0])
elif isinstance(value, (Signal, ClockSignal, ResetSignal)):
pass
elif isinstance(value, Slice):
_check_lhs(value.value)
elif isinstance(value, Part):
_check_lhs(value.value)
_check_rhs(value.offset)
elif isinstance(value, Concat):
for part in value.parts:
_check_lhs(part)
elif isinstance(value, SwitchValue):
_check_rhs(value.test)
for _patterns, elem in value.cases:
_check_lhs(elem)
elif isinstance(value, MemoryData._Row):
raise ValueError(f"Value {value!r} can only be used in simulator processes")
else:
raise ValueError(f"Value {value!r} cannot be assigned to")

def _check_rhs(value):
if isinstance(value, (Const, Signal, ClockSignal, ResetSignal, Initial, AnyValue)):
pass
elif isinstance(value, Operator):
for op in value.operands:
_check_rhs(op)
elif isinstance(value, Slice):
_check_rhs(value.value)
elif isinstance(value, Part):
_check_rhs(value.value)
_check_rhs(value.offset)
elif isinstance(value, Concat):
for part in value.parts:
_check_rhs(part)
elif isinstance(value, SwitchValue):
_check_rhs(value.test)
for _patterns, elem in value.cases:
_check_rhs(elem)
elif isinstance(value, MemoryData._Row):
raise ValueError(f"Value {value!r} can only be used in simulator processes")
else:
assert False # :nocov:


class _ModuleBuilderProxy:
Expand Down Expand Up @@ -632,16 +628,27 @@ def _add_statement(self, assigns, domain, depth):

stmt._MustUse__used = True

visitor = _Visitor()
visitor.visit_stmt(stmt)
for signal in visitor.driven_signals:
if signal not in self._driving:
self._driving[signal] = domain
elif self._driving[signal] != domain:
cd_curr = self._driving[signal]
raise SyntaxError(
f"Driver-driver conflict: trying to drive {signal!r} from d.{domain}, but it is "
f"already driven from d.{cd_curr}")
_check_stmt(stmt)

lhs_masks = LHSMaskCollector()
# This is an opportunistic early check — not much harm skipping it, since
# the whole-design check will be later done in NIR emitter.
if not isinstance(stmt, _LateBoundStatement):
lhs_masks.visit_stmt(stmt)

for sig, mask in lhs_masks.masks():
if sig not in self._driving:
self._driving[sig] = [None] * len(sig)
sig_domain = self._driving[sig]
for bit in range(len(sig)):
if not (mask & (1 << bit)):
continue
if sig_domain[bit] is None:
sig_domain[bit] = domain
if sig_domain[bit] != domain:
raise SyntaxError(
f"Driver-driver conflict: trying to drive {sig!r} bit {bit} from d.{domain}, but it is "
f"already driven from d.{sig_domain[bit]}")

self._statements.setdefault(domain, []).append(stmt)

Expand Down
Loading

0 comments on commit 91233ef

Please sign in to comment.