diff --git a/verification/block/pic/Makefile b/verification/block/pic/Makefile new file mode 100644 index 00000000000..9b9ab29bbe0 --- /dev/null +++ b/verification/block/pic/Makefile @@ -0,0 +1,16 @@ +null := +space := $(null) # +comma := , + +CURDIR := $(abspath $(dir $(lastword $(MAKEFILE_LIST)))) +SRCDIR := $(abspath $(CURDIR)../../../../design) + +TEST_FILES = $(sort $(wildcard test_*.py)) + +MODULE ?= $(subst $(space),$(comma),$(subst .py,,$(TEST_FILES))) +TOPLEVEL = el2_pic_ctrl + +VERILOG_SOURCES = \ + $(SRCDIR)/el2_pic_ctrl.sv + +include $(CURDIR)/../common.mk diff --git a/verification/block/pic/test_clken.py b/verification/block/pic/test_clken.py new file mode 100644 index 00000000000..ce931eb1bb0 --- /dev/null +++ b/verification/block/pic/test_clken.py @@ -0,0 +1,358 @@ +# Copyright (c) 2023 Antmicro +# SPDX-License-Identifier: Apache-2.0 + +from copy import deepcopy + +import cocotb +import pyuvm +from cocotb.result import SimTimeoutError +from cocotb.triggers import Edge, Lock, RisingEdge, Timer, with_timeout +from pyuvm import * +from testbench import BaseEnv, BaseTest, collect_signals + +# ============================================================================= + + +class ClockEnableItem(uvm_sequence_item): + def __init__(self, clk_en, io_clk_en): + super().__init__("ClockEnableItem") + self.clk_en = clk_en + self.io_clk_en = io_clk_en + + +class ClockStateItem(uvm_sequence_item): + def __init__(self, state): + super().__init__("ClockStateItem") + self.state = deepcopy(state) + + +# ============================================================================= + + +class ClkenDriver(uvm_driver): + """ + A driver for clock gating override signals + """ + + SIGNALS = [ + "clk_override", + "io_clk_override", + ] + + def __init__(self, *args, **kwargs): + uut = kwargs["uut"] + del kwargs["uut"] + + super().__init__(*args, **kwargs) + + collect_signals(self.SIGNALS, uut, self) + + async def run_phase(self): + while True: + it = await self.seq_item_port.get_next_item() + + if isinstance(it, ClockEnableItem): + self.clk_override.value = it.clk_en + self.io_clk_override.value = it.io_clk_en + else: + raise RuntimeError("Unknown item '{}'".format(type(it))) + + self.seq_item_port.item_done() + + +class ClkenMonitor(uvm_component): + """ + A monitor for clock gating override signals + """ + + SIGNALS = [ + "clk", + "clk_override", + "io_clk_override", + ] + + def __init__(self, *args, **kwargs): + uut = kwargs["uut"] + del kwargs["uut"] + + super().__init__(*args, **kwargs) + + collect_signals(self.SIGNALS, uut, self) + + self.prev_clk_override = 0 + self.prev_io_clk_override = 0 + + def build_phase(self): + self.ap = uvm_analysis_port("ap", self) + + async def run_phase(self): + while True: + # Sample control signals + await RisingEdge(self.clk) + clk_override = int(self.clk_override.value) + io_clk_override = int(self.io_clk_override.value) + + if ( + self.prev_clk_override != clk_override + or self.prev_io_clk_override != io_clk_override + ): + self.ap.write(ClockEnableItem(clk_override, io_clk_override)) + + self.prev_clk_override = clk_override + self.prev_io_clk_override = io_clk_override + + +# ============================================================================= + + +class ClockMonitor(uvm_component): + """ + A monitor for clock signal activity. + + The monitor spawns one task per clock signal. Each task waits either for a + signal transition or a fixed time equal to twice the expected clock period + (its actually important that the time is greater than half-period) If, + during the waiting time, the task detects any signal transition + (1->0, 0->1), then it marks the signal as an active clock. Otherwise, the + signal is marked as inactive. + + The main task of the monitor periodically samples the state vector reported + by monitoring tasks and sends a message through its analysis port. This is + scheduled to happen periodically every 5 * the expected clock period. The + scheduling is chosen arbitrarily. + """ + + SIGNALS = [ + "pic_raddr_c1_clk", + "pic_data_c1_clk", + "pic_pri_c1_clk", + "pic_int_c1_clk", + "gw_config_c1_clk", + ] + + def __init__(self, *args, **kwargs): + uut = kwargs["uut"] + del kwargs["uut"] + + super().__init__(*args, **kwargs) + + collect_signals(self.SIGNALS, uut, self) + + self.lock = Lock() + self.state = {sig: False for sig in self.SIGNALS} + + def build_phase(self): + self.ap = uvm_analysis_port("ap", self) + + async def run_phase(self): + period = ConfigDB().get(None, "", "TEST_CLK_PERIOD") + + # Start monitoring tasks + for name in self.SIGNALS: + cocotb.start_soon(self.monitor_clock(name)) + + # Periodically sample clock state and send messages + while True: + # Wait + await Timer(period * 5, "ns") + + # Sample state and send item + async with self.lock: + self.ap.write(ClockStateItem(self.state)) + + async def monitor_clock(self, name): + period = ConfigDB().get(None, "", "TEST_CLK_PERIOD") + signal = getattr(self, name) + + # Monitor the clock signal + while True: + # Wait for clock edges with timeout + try: + await with_timeout(Edge(signal), 2.0 * period, "ns") + toggling = True + except SimTimeoutError: + toggling = False + + # Update the state + async with self.lock: + self.state[name] = toggling + + +# ============================================================================= + + +class Scoreboard(uvm_component): + """ + Clock activity scoreboard. + """ + + CLOCKS = [ + "pic_raddr_c1_clk", + "pic_data_c1_clk", + "pic_pri_c1_clk", + "pic_int_c1_clk", + ] + + IO_CLOCKS = [ + # FIXME: "IO" clock gates are instanced along with gateway modules + # inside a generate block. It appears that cocotb does not have access + # to them. + ] + + def __init__(self, name, parent): + super().__init__(name, parent) + + self.passed = None + + def build_phase(self): + self.fifo = uvm_tlm_analysis_fifo("fifo", self) + self.port = uvm_get_port("port", self) + + def connect_phase(self): + self.port.connect(self.fifo.get_export) + + def check_phase(self): + while self.port.can_get(): + # Get an item + got_item, item = self.port.try_get() + assert got_item + + # Got a change in clock override control + if isinstance(item, ClockEnableItem): + # Initially pass + if self.passed is None: + self.passed = True + + # Reject next clock state item + got_it, it = self.port.try_get() + assert got_it and isinstance(it, ClockStateItem) + + # Get next clock state item and process it + got_it, it = self.port.try_get() + assert got_it and isinstance(it, ClockStateItem) + + # Check clocks + for clk in self.CLOCKS: + if it.state[clk] != item.clk_en: + self.passed = False + self.logger.error( + "Clock '{}' is {}toggling".format( + clk, + "not " if item.clk_en else "", + ) + ) + + for clk in self.IO_CLOCKS: + if it.state[clk] != item.io_clk_en: + self.passed = False + self.logger.error( + "IO clock '{}' is {}toggling".format( + clk, + "not " if item.io_clk_en else "", + ) + ) + + def final_phase(self): + if not self.passed: + self.logger.critical("{} reports a failure".format(type(self))) + assert False + + +# ============================================================================= + + +class TestSequence(uvm_sequence): + """ + A sequence which instructs a driver to enable/disable clock gating override + """ + + async def body(self): + period = ConfigDB().get(None, "", "TEST_CLK_PERIOD") + + # Disable overrides + item = ClockEnableItem(0, 0) + await self.start_item(item) + await self.finish_item(item) + + # Wait + await Timer(20 * period, "ns") + + # Enable clock override + item = ClockEnableItem(1, 0) + await self.start_item(item) + await self.finish_item(item) + + # Wait + await Timer(20 * period, "ns") + + # Disable overrides + item = ClockEnableItem(0, 0) + await self.start_item(item) + await self.finish_item(item) + + # Wait + await Timer(20 * period, "ns") + + # Enable clock override + item = ClockEnableItem(0, 1) + await self.start_item(item) + await self.finish_item(item) + + # Wait + await Timer(20 * period, "ns") + + # Disable overrides + item = ClockEnableItem(0, 0) + await self.start_item(item) + await self.finish_item(item) + + # Wait + await Timer(20 * period, "ns") + + +# ============================================================================= + + +class TestEnv(BaseEnv): + """ + Test environment + """ + + def build_phase(self): + super().build_phase() + + # Sequencers + self.clken_seqr = uvm_sequencer("clken_seqr", self) + + # Clock enable driver and monitor + self.clken_drv = ClkenDriver("clken_drv", self, uut=cocotb.top) + self.clken_mon = ClkenMonitor("clken_mon", self, uut=cocotb.top) + + # Clock monitor + self.clk_mon = ClockMonitor("clk_mon", self, uut=cocotb.top) + + # Add scoreboard + self.scoreboard = Scoreboard("scoreboard", self) + + def connect_phase(self): + super().connect_phase() + self.clken_drv.seq_item_port.connect(self.clken_seqr.seq_item_export) + self.clken_mon.ap.connect(self.scoreboard.fifo.analysis_export) + self.clk_mon.ap.connect(self.scoreboard.fifo.analysis_export) + + +@pyuvm.test() +class TestClockEnable(BaseTest): + """ + A test that checks forcing clock gates open + """ + + def __init__(self, name, parent): + super().__init__(name, parent, TestEnv) + + def end_of_elaboration_phase(self): + super().end_of_elaboration_phase() + self.seq = TestSequence.create("stimulus") + + async def run(self): + await self.seq.start(self.env.clken_seqr) diff --git a/verification/block/pic/test_config.py b/verification/block/pic/test_config.py new file mode 100644 index 00000000000..4c763eb4847 --- /dev/null +++ b/verification/block/pic/test_config.py @@ -0,0 +1,163 @@ +# Copyright (c) 2023 Antmicro +# SPDX-License-Identifier: Apache-2.0 +import random + +import pyuvm +from pyuvm import * +from testbench import BaseEnv, BaseTest, BusReadItem, BusWriteItem, RegisterMap + +# ============================================================================= + + +class TestSequence(uvm_sequence): + """ + A sequence of randomized register and content writes followed by randomized + reads of them. + """ + + def __init__(self, name): + super().__init__(name) + self.reg_map = RegisterMap() + + async def body(self): + num_itr = ConfigDB().get(None, "", "TEST_ITERATIONS") + max_pri = ConfigDB().get(None, "", "PIC_NUM_PRIORITIES") + + for i in range(num_itr): + # Issue register writes + names = list(self.reg_map.reg.keys()) + random.shuffle(names) + + written = [] + + for name in names: + reg = self.reg_map.reg[name] + val = None + + if name.startswith("meipl"): + val = random.randint(0, max_pri) + elif name.startswith("meigwctrl"): + val = random.randint(0, 3) # 2-bit combinations + elif name.startswith("meie"): + val = random.randint(0, 1) # 1-bit combinations + + if val is None: + continue + + item = BusWriteItem(reg, val) + await self.start_item(item) + await self.finish_item(item) + + written.append(reg) + + # Issue register reads for the written ones + random.shuffle(written) + + for reg in written: + item = BusReadItem(reg) + await self.start_item(item) + await self.finish_item(item) + + +class Scoreboard(uvm_component): + """ + A scoreboard that keeps track of data written to registers and compares + it with data read afterwards. + """ + + def __init__(self, name, parent): + super().__init__(name, parent) + + self.passed = None + self.reg_map = RegisterMap() + self.reg_content = dict() + + def build_phase(self): + self.fifo = uvm_tlm_analysis_fifo("fifo", self) + self.port = uvm_get_port("port", self) + + def connect_phase(self): + self.port.connect(self.fifo.get_export) + + def check_phase(self): + while self.port.can_get(): + # Get an item + got_item, item = self.port.try_get() + assert got_item + + # Initially pass + if self.passed is None: + self.passed = True + + # Bus write + if isinstance(item, BusWriteItem): + self.reg_content[item.addr] = item.data + + # Bus read + elif isinstance(item, BusReadItem): + # Get register name + reg_name = "0x{:08X}".format(item.addr) + name = self.reg_map.adr.get(item.addr) + if name is not None: + reg_name += " '{}'".format(name) + + # No entry + golden = self.reg_content.get(item.addr) + if golden is None: + self.logger.error("Register {} was not written".format(reg_name)) + self.passed = False + + # Mismatch + elif golden != item.data: + self.logger.error( + "Register {} content mismatch, is 0x{:08X} should be 0x{:08X}".format( + reg_name, item.data, golden + ) + ) + self.passed = False + else: + self.logger.debug( + "Register {} ok, 0x{:08X}".format( + reg_name, + item.data, + ) + ) + + def final_phase(self): + if not self.passed: + self.logger.critical("{} reports a failure".format(type(self))) + assert False + + +# ============================================================================== + + +class TestEnv(BaseEnv): + def build_phase(self): + super().build_phase() + + # Add scoreboard + self.scoreboard = Scoreboard("scoreboard", self) + + def connect_phase(self): + super().connect_phase() + + # Connect monitors + self.reg_mon.ap.connect(self.scoreboard.fifo.analysis_export) + + +@pyuvm.test() +class TestConfig(BaseTest): + """ + A test for PIC configuration register access + """ + + def __init__(self, name, parent): + super().__init__(name, parent, TestEnv) + + def end_of_elaboration_phase(self): + super().end_of_elaboration_phase() + self.seq = TestSequence.create("stimulus") + + async def run(self): + await self.seq.start(self.env.reg_seqr) diff --git a/verification/block/pic/test_prioritization.py b/verification/block/pic/test_prioritization.py new file mode 100644 index 00000000000..f4e3699a6a2 --- /dev/null +++ b/verification/block/pic/test_prioritization.py @@ -0,0 +1,315 @@ +# Copyright (c) 2023 Antmicro +# SPDX-License-Identifier: Apache-2.0 +import random + +import cocotb +import pyuvm +from cocotb.triggers import ClockCycles +from pyuvm import * +from testbench import ( + BaseEnv, + BaseTest, + BusReadItem, + BusWriteItem, + ClaimItem, + IrqItem, + PrioLvlItem, + PriorityPredictor, + PrioThrItem, + RegisterMap, +) + +# ============================================================================= + + +class TestSequence(uvm_sequence): + def __init__(self, name): + super().__init__(name) + + self.reg_seqr = ConfigDB().get(None, "", "REG_SEQR") + self.pri_seqr = ConfigDB().get(None, "", "PRI_SEQR") + self.irq_seqr = ConfigDB().get(None, "", "IRQ_SEQR") + + self.regs = RegisterMap() + + async def body(self): + num_irq = ConfigDB().get(None, "", "PIC_NUM_INTERRUPTS") + max_pri = ConfigDB().get(None, "", "PIC_NUM_PRIORITIES") + num_itr = ConfigDB().get(None, "", "TEST_ITERATIONS") + ena_prob = ConfigDB().get(None, "", "TEST_IRQ_ENA_PROB") + irq_prob = ConfigDB().get(None, "", "TEST_IRQ_REQ_PROB") + + # Basic PIC config + item = BusWriteItem(self.regs.reg["mpiccfg"], random.randint(0, 1)) + await self.reg_seqr.start_item(item) + await self.reg_seqr.finish_item(item) + + for i in range(num_itr): + # Randomize priorities + for i in range(1, num_irq): + reg = self.regs.reg["meipl{}".format(i)] + val = random.randint(0, max_pri) + + item = BusWriteItem(reg, val) + await self.reg_seqr.start_item(item) + await self.reg_seqr.finish_item(item) + + # Randomize enables + for i in range(1, num_irq): + reg = self.regs.reg["meie{}".format(i)] + val = int(random.random() < ena_prob) + + item = BusWriteItem(reg, val) + await self.reg_seqr.start_item(item) + await self.reg_seqr.finish_item(item) + + # Configure gateways + for i in range(1, num_irq): + reg = self.regs.reg["meigwctrl{}".format(i)] + val = 0x2 # Edge, active-high + + item = BusWriteItem(reg, val) + await self.reg_seqr.start_item(item) + await self.reg_seqr.finish_item(item) + + # Randomize current priority and threshold + lvl = random.randint(0, max_pri) + thr = random.randint(0, max_pri) + + item = PrioLvlItem(lvl) + await self.pri_seqr.start_item(item) + await self.pri_seqr.finish_item(item) + + item = PrioThrItem(thr) + await self.pri_seqr.start_item(item) + await self.pri_seqr.finish_item(item) + + # Wait + await ClockCycles(cocotb.top.clk, 4) + + # Randomize IRQ + irqs = 0 + while irqs == 0: + for i in range(1, num_irq): + if random.random() > irq_prob: + irqs |= 1 << i + + item = IrqItem(irqs) + await self.irq_seqr.start_item(item) + await self.irq_seqr.finish_item(item) + + # Wait + await ClockCycles(cocotb.top.clk, 2) + + # Clear IRQ + item = IrqItem(0) + await self.irq_seqr.start_item(item) + await self.irq_seqr.finish_item(item) + + # Wait + await ClockCycles(cocotb.top.clk, 4) + + # Clear pending gateways + for i in range(1, num_irq): + if irqs & (1 << i): + reg = self.regs.reg["meigwclr{}".format(i)] + val = 0 + + item = BusWriteItem(reg, val) + await self.reg_seqr.start_item(item) + await self.reg_seqr.finish_item(item) + + # Wait + await ClockCycles(cocotb.top.clk, 5) + + +# ============================================================================== + + +class Scoreboard(uvm_component): + def __init__(self, name, parent): + super().__init__(name, parent) + + self.passed = None + self.predictor = PriorityPredictor(self.logger) + self.regs = RegisterMap() + + def build_phase(self): + self.fifo = uvm_tlm_analysis_fifo("fifo", self) + self.port = uvm_get_port("port", self) + + def connect_phase(self): + self.port.connect(self.fifo.get_export) + + def check_phase(self): + num_irq = ConfigDB().get(None, "", "PIC_NUM_INTERRUPTS") + max_pri = ConfigDB().get(None, "", "PIC_NUM_PRIORITIES") + + pri_lvl = 0 + pri_thr = 0 + irqs = 0 + + claimid = 0 + mexintpend = 0 + mhwakeup = 0 + + while self.port.can_get(): + _, item = self.port.try_get() + + # Register write + if isinstance(item, BusWriteItem): + # Get the reg name + reg = self.regs.adr.get(item.addr) + if not reg: + self.logger.error("Unknown register address 0x{:08X}".format(item.addr)) + self.passed = False + continue + + if reg.startswith("meipl"): + s = int(reg[5:]) + self.predictor.irqs[s].priority = item.data + + if reg.startswith("meie"): + s = int(reg[4:]) + self.predictor.irqs[s].enabled = bool(item.data) + + if reg == "mpiccfg": + self.predictor.inv_order = bool(item.data) + + # Priority level + elif isinstance(item, PrioLvlItem): + pri_lvl = item.prio + + # Priority threshold + elif isinstance(item, PrioThrItem): + pri_thr = item.prio + + # IRQ + elif isinstance(item, IrqItem): + # Mark triggered interrupts + for i in range(1, num_irq): + if item.irqs & (1 << i): + self.predictor.irqs[i].triggered = True + + # Store requested irqs + if item.irqs != 0: + irqs = item.irqs + + # Interrupt claim + elif isinstance(item, ClaimItem): + claimid = item.claimid + mexintpend = item.mexintpend + mhwakeup = item.mhwakeup + + # Check only if IRQs were requested + if not irqs: + continue + irqs = 0 + + # Initially pass + if self.passed is None: + self.passed = True + + # Predict the claim + pred = self.predictor.predict() + + # Check + if claimid != pred.id: + self.logger.error( + "Interrupt mismatch, is {} should be {}".format(claimid, pred.id) + ) + self.passed = False + + # Check if the interrupt is above the current priority level + # and threshold. Check if it is signaled correctly. + else: + # Predict mexintpend + if self.predictor.inv_order: + pred_mexintpend = ( + pred.id != 0 + and pred.priority != max_pri + and pred.priority < pri_thr + and pred.priority < pri_lvl + ) + else: + pred_mexintpend = ( + pred.id != 0 + and pred.priority != 0 + and pred.priority > pri_thr + and pred.priority > pri_lvl + ) + + # Predict mhwakeup + if self.predictor.inv_order: + pred_mhwakeup = pred.id != 0 and pred.priority == 0 + else: + pred_mhwakeup = pred.id != 0 and pred.priority == max_pri + + # Check + if pred_mexintpend != mexintpend: + self.logger.error( + "Signaling mismatch, mexintpend is {} should be {}. irq {}, meicurpl={}, meipt={}".format( + bool(mexintpend), + bool(pred_mexintpend), + pred.id, + pri_lvl, + pri_thr, + ) + ) + self.passed = False + + if pred_mhwakeup != mhwakeup: + self.logger.error( + "Signaling mismatch, mhwakeup is {} should be {}. irq {}, meicurpl={}, meipt={}".format( + bool(mhwakeup), + bool(pred_mhwakeup), + pred.id, + pri_lvl, + pri_thr, + ) + ) + self.passed = False + + # Clear triggers + for i in range(1, num_irq): + self.predictor.irqs[i].triggered = False + + def final_phase(self): + if not self.passed: + self.logger.critical("{} reports a failure".format(type(self))) + assert False + + +# ============================================================================== + + +class TestEnv(BaseEnv): + def build_phase(self): + super().build_phase() + + # Add scoreboard + self.scoreboard = Scoreboard("scoreboard", self) + + def connect_phase(self): + super().connect_phase() + + # Connect monitors + self.reg_mon.ap.connect(self.scoreboard.fifo.analysis_export) + self.pri_mon.ap.connect(self.scoreboard.fifo.analysis_export) + self.irq_mon.ap.connect(self.scoreboard.fifo.analysis_export) + self.claim_mon.ap.connect(self.scoreboard.fifo.analysis_export) + + +@pyuvm.test() +class TestPrioritization(BaseTest): + """ """ + + def __init__(self, name, parent): + super().__init__(name, parent, TestEnv) + + def end_of_elaboration_phase(self): + super().end_of_elaboration_phase() + self.seq = TestSequence.create("stimulus") + + async def run(self): + await self.seq.start() diff --git a/verification/block/pic/test_reset.py b/verification/block/pic/test_reset.py new file mode 100644 index 00000000000..6a791f3ca90 --- /dev/null +++ b/verification/block/pic/test_reset.py @@ -0,0 +1,28 @@ +# Copyright (c) 2023 Antmicro +# SPDX-License-Identifier: Apache-2.0 + +import cocotb +import pyuvm +from testbench import BaseTest + +# ============================================================================= + + +@pyuvm.test() +class TestReset(BaseTest): + """ + A basic test that resets the DUT + """ + + async def run(self): + # Check state of DUT signals after reset + state = { + "mexintpend": 0, + "mhwakeup": 0, + "pl": 0, + "claimid": 0, + } + + for name, value in state.items(): + signal = getattr(cocotb.top, name) + assert signal.value == value, "{}={}, should be {}".format(name, signal.value, value) diff --git a/verification/block/pic/test_servicing.py b/verification/block/pic/test_servicing.py new file mode 100644 index 00000000000..cc8cb403017 --- /dev/null +++ b/verification/block/pic/test_servicing.py @@ -0,0 +1,297 @@ +# Copyright (c) 2023 Antmicro +# SPDX-License-Identifier: Apache-2.0 +import random + +import cocotb +import pyuvm +from cocotb.triggers import ClockCycles +from pyuvm import * +from testbench import ( + BaseEnv, + BaseTest, + BusReadItem, + BusWriteItem, + ClaimItem, + IrqItem, + PrioLvlItem, + PriorityPredictor, + PrioThrItem, + RegisterMap, +) + +# ============================================================================= + + +class TestSequence(uvm_sequence): + def __init__(self, name): + super().__init__(name) + + self.reg_seqr = ConfigDB().get(None, "", "REG_SEQR") + self.pri_seqr = ConfigDB().get(None, "", "PRI_SEQR") + self.irq_seqr = ConfigDB().get(None, "", "IRQ_SEQR") + + self.regs = RegisterMap() + + async def body(self): + num_irq = ConfigDB().get(None, "", "PIC_NUM_INTERRUPTS") + max_pri = ConfigDB().get(None, "", "PIC_NUM_PRIORITIES") + num_itr = ConfigDB().get(None, "", "TEST_ITERATIONS") + ena_prob = ConfigDB().get(None, "", "TEST_IRQ_ENA_PROB") + + # Basic PIC config + item = BusWriteItem(self.regs.reg["mpiccfg"], 0) + await self.reg_seqr.start_item(item) + await self.reg_seqr.finish_item(item) + + predictor = PriorityPredictor() + enabled_irqs = list() + + for i in range(num_itr): + # Randomize priorities + for i in range(1, num_irq): + reg = self.regs.reg["meipl{}".format(i)] + val = random.randint(0, max_pri) + + predictor.irqs[i].priority = val + + item = BusWriteItem(reg, val) + await self.reg_seqr.start_item(item) + await self.reg_seqr.finish_item(item) + + # Randomize enables + for i in range(1, num_irq): + reg = self.regs.reg["meie{}".format(i)] + val = int(random.random() < ena_prob) + + predictor.irqs[i].enabled = val + + if val: + enabled_irqs.append(i) + + item = BusWriteItem(reg, val) + await self.reg_seqr.start_item(item) + await self.reg_seqr.finish_item(item) + + # Configure gateways + for i in range(1, num_irq): + reg = self.regs.reg["meigwctrl{}".format(i)] + val = 0x2 # Edge, active-high + + item = BusWriteItem(reg, val) + await self.reg_seqr.start_item(item) + await self.reg_seqr.finish_item(item) + + # Set interrupt threshold + item = PrioThrItem(random.randint(0, max_pri)) + await self.pri_seqr.start_item(item) + await self.pri_seqr.finish_item(item) + + # Wait + await ClockCycles(cocotb.top.clk, 4) + + # Request IRQs + irqs = 0 + for i in enabled_irqs: + predictor.irqs[i].triggered = val + irqs |= 1 << i + + item = IrqItem(irqs) + await self.irq_seqr.start_item(item) + await self.irq_seqr.finish_item(item) + + # Wait + await ClockCycles(cocotb.top.clk, 2) + + # Clear IRQ + item = IrqItem(0) + await self.irq_seqr.start_item(item) + await self.irq_seqr.finish_item(item) + + # Wait + await ClockCycles(cocotb.top.clk, 4) + + # Mimic interrupt servicing + for i in range(50): # Limit iterations + # Predict the IRQ to be serviced + irq = predictor.predict() + if irq.id == 0: + break + + # Begin servicing, set meicurpl + item = PrioLvlItem(irq.priority) + await self.pri_seqr.start_item(item) + await self.pri_seqr.finish_item(item) + + # Servicing period + await ClockCycles(cocotb.top.clk, 5) + + # Finish servicing, set meicurpl to 0 + item = PrioLvlItem(0) + await self.pri_seqr.start_item(item) + await self.pri_seqr.finish_item(item) + + # Clear pending + reg = self.regs.reg["meigwclr{}".format(irq.id)] + val = 0 + + item = BusWriteItem(reg, val) + await self.reg_seqr.start_item(item) + await self.reg_seqr.finish_item(item) + + predictor.irqs[irq.id].triggered = False + + await ClockCycles(cocotb.top.clk, 4) + + +# ============================================================================== + + +class Scoreboard(uvm_component): + def __init__(self, name, parent): + super().__init__(name, parent) + + self.passed = None + self.predictor = PriorityPredictor(self.logger) + self.regs = RegisterMap() + + def build_phase(self): + self.fifo = uvm_tlm_analysis_fifo("fifo", self) + self.port = uvm_get_port("port", self) + + def connect_phase(self): + self.port.connect(self.fifo.get_export) + + def check_phase(self): + num_irq = ConfigDB().get(None, "", "PIC_NUM_INTERRUPTS") + + pri_thr = 0 + + irq_order = [] + + while self.port.can_get(): + _, item = self.port.try_get() + + # Register write + if isinstance(item, BusWriteItem): + # Get the reg name + reg = self.regs.adr.get(item.addr) + if not reg: + self.logger.error("Unknown register address 0x{:08X}".format(item.addr)) + self.passed = False + continue + + if reg.startswith("meipl"): + s = int(reg[5:]) + self.predictor.irqs[s].priority = item.data + + if reg.startswith("meie"): + s = int(reg[4:]) + self.predictor.irqs[s].enabled = bool(item.data) + + # Priority threshold + elif isinstance(item, PrioThrItem): + pri_thr = item.prio + + # IRQ + elif isinstance(item, IrqItem): + # Nothing triggered + if not item.irqs: + continue + + # Mark triggered interrupts + for i in range(1, num_irq): + if item.irqs & (1 << i): + self.predictor.irqs[i].triggered = True + + # Predict the order of interrupt servicing + for i in range(50): # Limit iterations + # Predict the IRQ to be serviced + irq = self.predictor.predict() + if irq.id == 0: + break + + irq_order.append(irq.id) + + # Clear pending + self.predictor.irqs[irq.id].triggered = False + + self.logger.debug("Interrupt order: {}".format(irq_order)) + + # Interrupt claim + elif isinstance(item, ClaimItem): + # Not waiting for any interrupt + if not irq_order: + continue + + # Initially pass + if self.passed is None: + self.passed = True + + self.logger.debug( + "Servicing {}, mexintpend={}".format( + item.claimid, + item.mexintpend, + ) + ) + + # check id + if item.claimid != irq_order[0]: + self.logger.error( + "Incorrect interrupt servicing order, claimed {} should be {}".format( + item.claimid, irq_order[0] + ) + ) + self.passed = False + + # mexintpend must be set + if not item.mexintpend and item.claimpl > pri_thr: + self.logger.error("Interrupt not reported to the core") + self.passed = False + + # Remove the serviced id + irq_order = irq_order[1:] + + # Check if all interrupts were services + if irq_order: + self.logger.error("Interrupts {} were not serviced".format(irq_order)) + self.passed = False + + def final_phase(self): + if not self.passed: + self.logger.critical("{} reports a failure".format(type(self))) + assert False + + +# ============================================================================== + + +class TestEnv(BaseEnv): + def build_phase(self): + super().build_phase() + + # Add scoreboard + self.scoreboard = Scoreboard("scoreboard", self) + + def connect_phase(self): + super().connect_phase() + + # Connect monitors + self.reg_mon.ap.connect(self.scoreboard.fifo.analysis_export) + self.pri_mon.ap.connect(self.scoreboard.fifo.analysis_export) + self.irq_mon.ap.connect(self.scoreboard.fifo.analysis_export) + self.claim_mon.ap.connect(self.scoreboard.fifo.analysis_export) + + +@pyuvm.test() +class TestServicing(BaseTest): + """ """ + + def __init__(self, name, parent): + super().__init__(name, parent, TestEnv) + + def end_of_elaboration_phase(self): + super().end_of_elaboration_phase() + self.seq = TestSequence.create("stimulus") + + async def run(self): + await self.seq.start() diff --git a/verification/block/pic/testbench.py b/verification/block/pic/testbench.py new file mode 100644 index 00000000000..e92feb83448 --- /dev/null +++ b/verification/block/pic/testbench.py @@ -0,0 +1,656 @@ +# +# Copyright (c) 2023 Antmicro +# SPDX-License-Identifier: Apache-2.0 + +import os + +import pyuvm +from cocotb.clock import Clock +from cocotb.triggers import ClockCycles, FallingEdge, RisingEdge +from pyuvm import * + +# ============================================================================== + + +class RegisterMap: + """ + Map of PIC memory-mapped registers + """ + + def __init__(self, max_irqs=32, base_addr=0xF00C0000): + self.reg = dict() + self.adr = dict() + + self.add_reg("mpiccfg", base_addr + 0x3000) + + for s in range(1, max_irqs): + name = "meipl{}".format(s) + addr = base_addr + 4 * s + self.add_reg(name, addr) + + for x in range(0, max_irqs // 32): + name = "meip{}".format(x) + addr = base_addr + 0x1000 + 4 * x + self.add_reg(name, addr) + + for s in range(1, max_irqs): + name = "meie{}".format(s) + addr = base_addr + 0x2000 + 4 * s + self.add_reg(name, addr) + + for s in range(1, max_irqs): + name = "meigwctrl{}".format(s) + addr = base_addr + 0x4000 + 4 * s + self.add_reg(name, addr) + + for s in range(1, max_irqs): + name = "meigwclr{}".format(s) + addr = base_addr + 0x5000 + 4 * s + self.add_reg(name, addr) + + def add_reg(self, name, addr): + self.reg[name] = addr + self.adr[addr] = name + + +# ============================================================================== + + +class BusWriteItem(uvm_sequence_item): + """ + A generic data bus write request / response + """ + + def __init__(self, addr, data): + super().__init__("BusWriteItem") + self.addr = addr + self.data = data + + def randomize(self): + pass + + +class BusReadItem(uvm_sequence_item): + """ + A generic data bus read request / response + """ + + def __init__(self, addr, data=None): + super().__init__("BusReadItem") + self.addr = addr + self.data = data + + def randomize(self): + pass + + +class PrioLvlItem(uvm_sequence_item): + def __init__(self, prio): + super().__init__("PrioLvlItem") + self.prio = prio + + +class PrioThrItem(uvm_sequence_item): + def __init__(self, prio): + super().__init__("PrioThrItem") + self.prio = prio + + +class IrqItem(uvm_sequence_item): + def __init__(self, irqs): + super().__init__("IrqItem") + self.irqs = irqs + + +class ClaimItem(uvm_sequence_item): + def __init__(self, claimid, claimpl, mexintpend, mhwakeup): + super().__init__("ClaimItem") + self.claimid = claimid + self.claimpl = claimpl + + self.mexintpend = mexintpend + self.mhwakeup = mhwakeup + + +class WaitItem(uvm_sequence_item): + """ + A generic wait item. Used to instruct a driver to wait N cycles + """ + + def __init__(self, cycles): + super().__init__("WaitItem") + self.cycles = cycles + + def randomize(self): + pass + + +# ============================================================================== + + +def collect_signals(signals, uut, obj): + """ + Collects signal objects from UUT and attaches them to the given object + """ + + for sig in signals: + if hasattr(uut, sig): + s = getattr(uut, sig) + + else: + s = None + logging.error("Module {} does not have a signal '{}'".format(str(uut), sig)) + + setattr(obj, sig, s) + + +# ============================================================================== + + +class RegisterBfm: + """ + A BFM for the PIC configuration (registers) interface. + """ + + SIGNALS = [ + "picm_rden", + "picm_rdaddr", + "picm_rd_data", + "picm_wren", + "picm_wraddr", + "picm_wr_data", + "picm_mken", + ] + + def __init__(self, uut, clk): + # Collect signals + collect_signals(self.SIGNALS, uut, self) + + # Get the clock + obj = getattr(uut, clk) + setattr(self, "picm_clk", obj) + + async def read(self, addr): + """ + Reads a register + """ + + await RisingEdge(self.picm_clk) + + self.picm_rdaddr.value = addr + self.picm_rden.value = 1 + self.picm_mken.value = 0 + + await RisingEdge(self.picm_clk) + + self.picm_rden.value = 0 + + await FallingEdge(self.picm_clk) + + data = self.picm_rd_data.value + + return data + + async def write(self, addr, data): + """ + Writes a register + """ + + await RisingEdge(self.picm_clk) + + self.picm_wraddr.value = addr + self.picm_wr_data.value = data + self.picm_wren.value = 1 + self.picm_mken.value = 0 + + await RisingEdge(self.picm_clk) + + self.picm_wren.value = 0 + + +class RegisterDriver(uvm_driver): + """ + Configuration (register) interface driver + """ + + def __init__(self, *args, **kwargs): + self.bfm = kwargs["bfm"] + del kwargs["bfm"] + super().__init__(*args, **kwargs) + + async def run_phase(self): + while True: + it = await self.seq_item_port.get_next_item() + + if isinstance(it, BusWriteItem): + await self.bfm.write(it.addr, it.data) + elif isinstance(it, BusReadItem): + it.data = await self.bfm.read(it.addr) + elif isinstance(it, WaitItem): + await ClockCycles(self.bfm.picm_clk) + else: + raise RuntimeError("Unknown item '{}'".format(type(it))) + + self.seq_item_port.item_done() + + +class RegisterMonitor(uvm_component): + """ + Configuration (register) interface monitor + """ + + def __init__(self, *args, **kwargs): + self.bfm = kwargs["bfm"] + del kwargs["bfm"] + super().__init__(*args, **kwargs) + + def build_phase(self): + self.ap = uvm_analysis_port("ap", self) + + async def run_phase(self): + while True: + await RisingEdge(self.bfm.picm_clk) + + # Read + if self.bfm.picm_rden.value: + addr = int(self.bfm.picm_rdaddr.value) + + await FallingEdge(self.bfm.picm_clk) + + data = int(self.bfm.picm_rd_data.value) + self.logger.debug("read 0x{:08X} -> 0x{:08X}".format(addr, data)) + self.ap.write(BusReadItem(addr, data)) + + # Write + if self.bfm.picm_wren.value: + addr = int(self.bfm.picm_wraddr.value) + data = int(self.bfm.picm_wr_data.value) + self.logger.debug("write 0x{:08X} <- 0x{:08X}".format(addr, data)) + self.ap.write(BusWriteItem(addr, data)) + + +# ============================================================================== + + +class PrioDriver(uvm_driver): + """ + A driver for priority and priority threshold inputs of the PIC + """ + + SIGNALS = [ + "meicurpl", + "meipt", + ] + + def __init__(self, *args, **kwargs): + uut = kwargs["uut"] + del kwargs["uut"] + + super().__init__(*args, **kwargs) + + collect_signals(self.SIGNALS, uut, self) + + async def run_phase(self): + while True: + it = await self.seq_item_port.get_next_item() + + if isinstance(it, PrioLvlItem): + self.meicurpl.value = it.prio + elif isinstance(it, PrioThrItem): + self.meipt.value = it.prio + else: + raise RuntimeError("Unknown item '{}'".format(type(it))) + + self.seq_item_port.item_done() + + +class PrioMonitor(uvm_component): + """ + A monitor for priority and priority threshold of the PIC + """ + + SIGNALS = [ + "clk", + "meicurpl", + "meipt", + ] + + def __init__(self, *args, **kwargs): + uut = kwargs["uut"] + del kwargs["uut"] + + super().__init__(*args, **kwargs) + + collect_signals(self.SIGNALS, uut, self) + + self.prev_meicurpl = None + self.prev_meipt = None + + def build_phase(self): + self.ap = uvm_analysis_port("ap", self) + + async def run_phase(self): + while True: + # Even though the signals are not registered sample them on + # rising clock edge + await RisingEdge(self.clk) + + # Sample signals + curr_meicurpl = int(self.meicurpl.value) + curr_meipt = int(self.meipt.value) + + # Send an item in case of a change + if self.prev_meicurpl != curr_meicurpl: + self.ap.write(PrioLvlItem(curr_meicurpl)) + if self.prev_meipt != curr_meipt: + self.ap.write(PrioThrItem(curr_meipt)) + + self.prev_meicurpl = curr_meicurpl + self.prev_meipt = curr_meipt + + +# ============================================================================== + + +class IrqDriver(uvm_driver): + """ + A driver for interrupt requests + """ + + SIGNALS = [ + "extintsrc_req", + ] + + def __init__(self, *args, **kwargs): + uut = kwargs["uut"] + del kwargs["uut"] + + super().__init__(*args, **kwargs) + + collect_signals(self.SIGNALS, uut, self) + + async def run_phase(self): + while True: + it = await self.seq_item_port.get_next_item() + + if isinstance(it, IrqItem): + self.extintsrc_req.value = it.irqs + else: + raise RuntimeError("Unknown item '{}'".format(type(it))) + + self.seq_item_port.item_done() + + +class IrqMonitor(uvm_component): + """ + A monitor for interrupt requests + """ + + SIGNALS = [ + "clk", + "extintsrc_req", + ] + + def __init__(self, *args, **kwargs): + uut = kwargs["uut"] + del kwargs["uut"] + + super().__init__(*args, **kwargs) + + collect_signals(self.SIGNALS, uut, self) + + self.prev_irqs = None + + def build_phase(self): + self.ap = uvm_analysis_port("ap", self) + + async def run_phase(self): + while True: + # Sample signals + await RisingEdge(self.clk) + curr_irqs = int(self.extintsrc_req.value) + + # Send an item in case of a change + if self.prev_irqs != curr_irqs: + self.ap.write(IrqItem(curr_irqs)) + + self.prev_irqs = curr_irqs + + +# ============================================================================== + + +class ClaimMonitor(uvm_component): + SIGNALS = [ + "clk", + "extintsrc_req", + "picm_wren", + "claimid", + "pl", + "mexintpend", + "mhwakeup", + ] + + def __init__(self, *args, **kwargs): + uut = kwargs["uut"] + del kwargs["uut"] + + super().__init__(*args, **kwargs) + + collect_signals(self.SIGNALS, uut, self) + + self.prev_irqs = 0 + self.prev_wren = 0 + + def build_phase(self): + self.ap = uvm_analysis_port("ap", self) + + async def run_phase(self): + while True: + # Sample control signals + await RisingEdge(self.clk) + irqs = int(self.extintsrc_req.value) + wren = int(self.picm_wren.value) + + is_wren = wren and not self.prev_wren # Rising edge of wren + is_irqs = irqs & ~self.prev_irqs # Rising edge of any IRQ + + if is_wren or is_irqs: + # Sample signals after a delay to give PIC time to react. + # It was observed that in the simulation that the wait must + # be at least 3 clock cycles long. + await ClockCycles(self.clk, 3) + + claimid = int(self.claimid.value) + claimpl = int(self.pl.value) + mexintpend = int(self.mexintpend.value) + mhwakeup = int(self.mhwakeup.value) + + self.ap.write(ClaimItem(claimid, claimpl, mexintpend, mhwakeup)) + + self.prev_irqs = irqs + self.prev_wren = wren + + +# ============================================================================== + + +class PriorityPredictor: + class Irq: + """ + Interrupt request state + """ + + def __init__(self, n): + self.id = n + self.priority = 0 + self.enabled = False + self.triggered = False + + def __str__(self): + return "id={:3d} en={} pri={:2d} trg={}".format( + self.id, + int(self.enabled), + self.priority, + int(self.triggered), + ) + + def __repr__(self): + return str(self) + + def __init__(self, logger=None): + self.inv_order = False + self.irqs = {i: self.Irq(i) for i in range(1, 32)} + self.logger = logger + + if self.logger is None: + self.logger = uvm_root().logger + + def predict(self): + # Dump IRQs + self.logger.debug("IRQs:") + keys = sorted(list(self.irqs)) + for k in keys: + self.logger.debug(" " + str(self.irqs[k])) + + # Filter only enabled and triggered + irqs = {k: v for k, v in self.irqs.items() if v.enabled and v.triggered} + + # Get the highest priority + pred = None + for irq in irqs.values(): + # Skip priority 0 or 15 + if self.inv_order: + if irq.priority == 15: + continue + else: + if irq.priority == 0: + continue + + # Find max priority and min id + if pred is None: + pred = irq + else: + if self.inv_order: + if irq.priority < pred.priority: + pred = irq + else: + if irq.priority > pred.priority: + pred = irq + + if irq.priority == pred.priority: + if irq.id < pred.id: + pred = irq + + if pred is None: + return self.Irq(0) + + self.logger.debug("pred:") + self.logger.debug(" " + str(pred)) + + return pred + + +# ============================================================================== + + +class BaseEnv(uvm_env): + """ + Base PyUVM test environment + """ + + def build_phase(self): + # Config + ConfigDB().set(None, "*", "PIC_NUM_INTERRUPTS", 32) + ConfigDB().set(None, "*", "PIC_NUM_PRIORITIES", 15) + + ConfigDB().set(None, "*", "TEST_CLK_PERIOD", 1) + ConfigDB().set(None, "*", "TEST_ITERATIONS", 50) + ConfigDB().set(None, "*", "TEST_IRQ_ENA_PROB", 0.75) + ConfigDB().set(None, "*", "TEST_IRQ_REQ_PROB", 0.90) + + # Sequencers + self.reg_seqr = uvm_sequencer("reg_seqr", self) + self.pri_seqr = uvm_sequencer("pri_seqr", self) + self.irq_seqr = uvm_sequencer("irq_seqr", self) + + # Register interface + bfm = RegisterBfm(cocotb.top, "clk") + self.reg_drv = RegisterDriver("reg_drv", self, bfm=bfm) + self.reg_mon = RegisterMonitor("reg_mon", self, bfm=bfm) + + # Current priority and priority threshold interface + self.pri_drv = PrioDriver("pri_drv", self, uut=cocotb.top) + self.pri_mon = PrioMonitor("pri_mon", self, uut=cocotb.top) + + # Interrupt request + self.irq_drv = IrqDriver("irq_drv", self, uut=cocotb.top) + self.irq_mon = IrqMonitor("irq_mon", self, uut=cocotb.top) + + # Interrupt claim monitor + self.claim_mon = ClaimMonitor("claim_mon", self, uut=cocotb.top) + + ConfigDB().set(None, "*", "REG_SEQR", self.reg_seqr) + ConfigDB().set(None, "*", "PRI_SEQR", self.pri_seqr) + ConfigDB().set(None, "*", "IRQ_SEQR", self.irq_seqr) + + def connect_phase(self): + self.reg_drv.seq_item_port.connect(self.reg_seqr.seq_item_export) + self.pri_drv.seq_item_port.connect(self.pri_seqr.seq_item_export) + self.irq_drv.seq_item_port.connect(self.irq_seqr.seq_item_export) + + +# ============================================================================== + + +class BaseTest(uvm_test): + """ + Base test for the module + """ + + def __init__(self, name, parent, env_class=BaseEnv): + super().__init__(name, parent) + self.env_class = env_class + + # Synchronize pyuvm logging level with cocotb logging level. + level = logging.getLevelName(os.environ.get("COCOTB_LOG_LEVEL", "INFO")) + uvm_report_object.set_default_logging_level(level) + + def build_phase(self): + self.env = self.env_class("env", self) + + def start_clock(self, name): + period = ConfigDB().get(None, "", "TEST_CLK_PERIOD") + sig = getattr(cocotb.top, name) + clock = Clock(sig, period, units="ns") + cocotb.start_soon(clock.start(start_high=False)) + + async def do_reset(self): + cocotb.top.rst_l.value = 0 + await ClockCycles(cocotb.top.clk, 2) + await FallingEdge(cocotb.top.clk) + cocotb.top.rst_l.value = 1 + + async def run_phase(self): + self.raise_objection() + + # Start clocks + self.start_clock("clk") + self.start_clock("free_clk") + + # Issue reset + await self.do_reset() + + # Wait some cycles + await ClockCycles(cocotb.top.clk, 2) + + # Run the actual test + await self.run() + + # Wait some cycles + await ClockCycles(cocotb.top.clk, 10) + + self.drop_objection() + + async def run(self): + raise NotImplementedError()