Skip to content

Commit

Permalink
feat(cocotbext): add gRPC servicer and server
Browse files Browse the repository at this point in the history
  • Loading branch information
martinspinler committed Jan 9, 2025
1 parent f67f42d commit ee73098
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 163 deletions.
5 changes: 3 additions & 2 deletions python/cocotbext/cocotbext/nfb/ext/grpc/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .servicer import Servicer
from .server import NfbDmaThreadedGrpcServer
from .dma import RAM

__all__ = ['Servicer']
__all__ = ["NfbDmaThreadedGrpcServer", "RAM"]
94 changes: 94 additions & 0 deletions python/cocotbext/cocotbext/nfb/ext/grpc/dma.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import queue
import logging
from threading import Event

import cocotbext.ofm.utils

import nfb.ext.protobuf.v1.dma_pb2 as dma_pb2
import nfb.ext.protobuf.v1.dma_pb2_grpc as dma_pb2_grpc


class DmaRequest():
def __init__(self, rq):
self.rq = rq
self.event = Event()

def set(self, data):
self.rq.data = data
self.event.set()

def wait(self):
self.event.wait()
return self.rq.data


class DmaServicer(dma_pb2_grpc.DmaServicer):
def __init__(self, ram):
self._log = logging.getLogger(__name__)
self._ram = ram
self._bind = False
self._req_queue = None

def RqStream(self, request_iterator, context):
if self._req_queue is not None:
self._log.warning(f"Another context on Dma.RqStream is already active, ignoring current: {context.peer()}")
return

self._log.info(f"Using new Dma.RqStream context: {context.peer()}")

self._bind = True
self._req_queue = queue.Queue(10)
self._ram.connect(self._req_queue)

context.add_callback(self._logout)

while self._bind:
req = self._req_queue.get()
if req is not None:
yield req.rq
resp = next(request_iterator)
if req.rq.type == dma_pb2.DmaOperation.DMA_READ:
req.set(resp.data)
else:
req.set(bytes())

self._req_queue = None
self._ram.close()
self._log.info(f"Closing Dma.RqStream context: {context.peer()}")

def _logout(self):
self._bind = False
if self._req_queue is not None:
self._req_queue.put(None)

def Logout(self, request_iterator, context):
self._logout()


class RAM(cocotbext.ofm.utils.RAM):
def __init__(self):
self._rq = None
self._log = logging.getLogger(__name__)

def connect(self, request_queue):
self._rq = request_queue

def close(self):
self._rq = None

def w(self, addr, data):
if self._rq is None:
self._log.error(f"Dma client for RAM access not connected: write {len(data)}B to {addr:0x}")
return

resp = DmaRequest(dma_pb2.DmaRequest(type=dma_pb2.DmaOperation.DMA_WRITE, addr=addr, nbyte=len(data), data=bytes(data)))
self._rq.put(resp)

def r(self, addr, byte_count):
if self._rq is None:
self._log.error(f"Dma client for RAM access not connected: read {byte_count}B from {addr:0x}")
return list(bytes(byte_count))

resp = DmaRequest(dma_pb2.DmaRequest(type=dma_pb2.DmaOperation.DMA_READ, addr=addr, nbyte=byte_count, data=None))
self._rq.put(resp)
return list(resp.wait())
97 changes: 97 additions & 0 deletions python/cocotbext/cocotbext/nfb/ext/grpc/nfb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import re
import logging
import queue
import cocotb

from cocotb.triggers import Timer

from threading import Event

import nfb.ext.protobuf.v1.nfb_pb2 as nfb_pb
import nfb.ext.protobuf.v1.nfb_pb2_grpc as nfb_pb_grpc


class CompRequest():
def __init__(self, rd, path, offset, nbyte, data=None):
self.rd = rd
self.path = path
self.offset = offset
self.nbyte = nbyte
self.data = data
self.event = Event()

def set(self, data):
self.data = data
self.event.set()

def wait(self):
self.event.wait()
return self.data


class NfbServicer(nfb_pb_grpc.NfbServicer):
def __init__(self, dev):
self._log = logging.getLogger(__name__)
self._dev = dev
self._events = queue.Queue()
self._requests = queue.Queue(10)

cocotb.start_soon(self._mi_req_thread())

async def _mi_req_thread(self):
timer = Timer(10, units='ns')

while True:
while self._requests.empty():
await timer

req = self._requests.get(False)

mi, base = self._comp_addr(req.path)
addr = req.offset + base
req_type = "Read" if req.rd else "Write"
self._log.debug(f"{req_type:<5}: size: {req.nbyte:>4}, offset: {hex(req.offset):>8}, base: {hex(base):>10} {req.path}")

if req.rd:
data = await mi.read(addr, req.nbyte)
else:
data = await mi.write(addr, req.data)

req.set(data)

def _comp_addr(self, path):
node = self._dev.nfb.fdt.get_node(path)
base = node.get_property("reg")[0]

p = node.parent
while p:
compatible = p.get_property("compatible")
if compatible and compatible.value == "netcope,bus,mi":
m = re.search(r'PCI(?P<pci>\d+),BAR(?P<bar>\d+)', p.get_property("resource").value)
pci, _ = int(m.group('pci')), int(m.group('bar'))
mi = self._dev.mi[pci]
break
p = p.parent

return mi, base

def resp_force(self):
while not self._events.empty():
self._events.get(False).set()

def GetFdt(self, req, context):
return nfb_pb.FdtResponse(fdt=bytes(self._dev.nfb.fdt.to_dtb()))

def ReadComp(self, req, context):
req = CompRequest(True, req.path, req.offset, req.nbyte)
self._requests.put(req)
self._events.put(req.event)
data = req.wait()
self._events.get(False)
return nfb_pb.ReadCompResponse(status=0, data=data)

def WriteComp(self, req, context):
req = CompRequest(False, req.path, req.offset, req.nbyte, req.data)
self._requests.put(req)
#_ = req.wait()
return nfb_pb.WriteCompResponse(status=0)
58 changes: 58 additions & 0 deletions python/cocotbext/cocotbext/nfb/ext/grpc/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import time
import logging
import socket

import grpc
import cocotb

from concurrent import futures
from threading import Thread

from .nfb import NfbServicer
from .dma import DmaServicer

import nfb.ext.protobuf.v1.nfb_pb2_grpc as nfb_pb_grpc
import nfb.ext.protobuf.v1.dma_pb2_grpc as dma_pb_grpc


class NfbDmaThreadedGrpcServer:
def __init__(self, ram, dev, addr="127.0.0.1", port=50051):
super().__init__()
self._log = logging.getLogger(__name__)

self._port = port

self._mi_reciver = NfbServicer(dev)
self._dma_reciver = DmaServicer(ram)

self._server = grpc.server(futures.ThreadPoolExecutor())
self._server.add_insecure_port(f"{addr}:{port}")
nfb_pb_grpc.add_NfbServicer_to_server(self._mi_reciver, self._server)
dma_pb_grpc.add_DmaServicer_to_server(self._dma_reciver, self._server)

self._thread = Thread(target=self._run)
self._thread_terminate = False

def _run(self):
self._server.start()

while not cocotb.regression_manager._tearing_down and not self._thread_terminate:
time.sleep(0.1)

self._dma_reciver._logout()
self._mi_reciver.resp_force()
self._server.stop(2.0)
self._server.wait_for_termination()

def start(self):
self._thread.start()
self._log.info(f"gRPC server started, listening on {self._port}. Device string: libnfb-ext-grpc.so:grpc+dma_vas:{socket.gethostname()}:{self._port}")

def close(self):
self._thread_terminate = True

def __enter__(self):
self.start()

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
74 changes: 0 additions & 74 deletions python/cocotbext/cocotbext/nfb/ext/grpc/servicer.py

This file was deleted.

Loading

0 comments on commit ee73098

Please sign in to comment.