Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make packetio its own RPC service #390

Merged
merged 3 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dataplane/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//dataplane/dplaneopts",
"//dataplane/proto/packetio",
"//dataplane/proto/sai",
"//dataplane/saiserver",
"//dataplane/saiserver/attrmgr",
"//dataplane/standalone/pkthandler/pktiohandler",
"//gnmi/oc",
"//gnmi/reconciler",
"//proto/forwarding",
"@com_github_openconfig_gnmi//proto/gnmi",
"@com_github_openconfig_ygnmi//ygnmi",
"@org_golang_google_grpc//:go_default_library",
Expand Down
3 changes: 0 additions & 3 deletions dataplane/forwarding/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ go_library(
deps = [
"//dataplane/forwarding/fwdaction",
"//dataplane/forwarding/fwdaction/actions",
"//dataplane/forwarding/fwdconfig",
"//dataplane/forwarding/fwdport",
"//dataplane/forwarding/fwdport/ports",
"//dataplane/forwarding/fwdtable",
"//dataplane/forwarding/fwdtable/action",
"//dataplane/forwarding/fwdtable/bridge",
Expand All @@ -38,6 +36,5 @@ go_library(
"//dataplane/forwarding/protocol/udp",
"//proto/forwarding",
"@com_github_golang_glog//:glog",
"@org_golang_google_grpc//status",
],
)
117 changes: 2 additions & 115 deletions dataplane/forwarding/fwd.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,9 @@ import (
"sync"

log "github.com/golang/glog"
"google.golang.org/grpc/status"

"github.com/openconfig/lemming/dataplane/forwarding/fwdaction"
"github.com/openconfig/lemming/dataplane/forwarding/fwdconfig"
"github.com/openconfig/lemming/dataplane/forwarding/fwdport"
"github.com/openconfig/lemming/dataplane/forwarding/fwdport/ports"
"github.com/openconfig/lemming/dataplane/forwarding/fwdtable"
"github.com/openconfig/lemming/dataplane/forwarding/infra/deadlock"
"github.com/openconfig/lemming/dataplane/forwarding/infra/fwdattribute"
Expand Down Expand Up @@ -744,7 +741,7 @@ func (e *Server) FlowCounterQuery(_ context.Context, request *fwdpb.FlowCounterQ
return reply, nil
}

func (e *Server) injectPacket(contextID *fwdpb.ContextId, id *fwdpb.PortId, hid fwdpb.PacketHeaderId, frame []byte, preActions []*fwdpb.ActionDesc, debug bool, dir fwdpb.PortAction) error {
func (e *Server) InjectPacket(contextID *fwdpb.ContextId, id *fwdpb.PortId, hid fwdpb.PacketHeaderId, frame []byte, preActions []*fwdpb.ActionDesc, debug bool, dir fwdpb.PortAction) error {
timer := deadlock.NewTimer(deadlock.Timeout, fmt.Sprintf("Processing packet"))
defer timer.Stop()

Expand Down Expand Up @@ -821,7 +818,7 @@ func (e *Server) PacketInject(srv fwdpb.Forwarding_PacketInjectServer) error {
if err != nil {
return err
}
err = e.injectPacket(request.GetContextId(), request.GetPortId(), request.GetStartHeader(),
err = e.InjectPacket(request.GetContextId(), request.GetPortId(), request.GetStartHeader(),
request.GetBytes(), request.GetPreprocesses(), request.GetDebug(), request.GetAction())
if err != nil {
return err
Expand Down Expand Up @@ -939,113 +936,3 @@ func (e *Server) ObjectNID(_ context.Context, request *fwdpb.ObjectNIDRequest) (
}
return &fwdpb.ObjectNIDReply{Nid: uint64(obj.NID())}, nil
}

func (e *Server) CPUPacketStream(srv fwdpb.Forwarding_CPUPacketStreamServer) error {
init, err := srv.Recv()
if err != nil {
return err
}
fwdCtx, err := e.FindContext(init.GetContextId())
if err != nil {
return fmt.Errorf("failed to get context, err %v", err)
}

fwdCtx.RLock()
cpuPortID := ""
for _, id := range fwdCtx.Objects.IDs() {
obj, err := fwdCtx.Objects.FindID(&fwdpb.ObjectId{Id: string(id)})
if err != nil {
fwdCtx.RUnlock()
return err
}
if _, ok := obj.(*ports.CPUPort); ok {
cpuPortID = string(obj.ID())
}
}
fwdCtx.RUnlock()
if cpuPortID == "" {
return fmt.Errorf("couldn't find cpu port")
}

packetCh := make(chan *fwdpb.PacketIn)
ctx, cancel := context.WithCancel(srv.Context())

// Since Recv() is blocking and we want this func to return immediately on cancel.
// Run the Recv in a seperate goroutine.
go func() {
for {
pkt, err := srv.Recv()
if err != nil {
continue
}
packetCh <- pkt
}
}()

fn := func(po *fwdpb.PacketOut) error {
return srv.Send(po)
}

fwdCtx.Lock()
fwdCtx.SetCPUPortSink(fn, cancel)
fwdCtx.Unlock()

for {
select {
case <-ctx.Done():
return nil
case pkt := <-packetCh:
acts := []*fwdpb.ActionDesc{fwdconfig.Action(fwdconfig.UpdateAction(fwdpb.UpdateType_UPDATE_TYPE_SET, fwdpb.PacketFieldNum_PACKET_FIELD_NUM_HOST_PORT_ID).
WithUint64Value(pkt.GetPacket().GetHostPort())).Build()}
err = e.injectPacket(init.GetContextId(), &fwdpb.PortId{ObjectId: &fwdpb.ObjectId{Id: cpuPortID}}, fwdpb.PacketHeaderId_PACKET_HEADER_ID_ETHERNET,
pkt.GetPacket().GetFrame(), acts, true, fwdpb.PortAction_PORT_ACTION_INPUT)
if err != nil {
log.Warningf("inject err: %v", err)
continue
}
}
}
}

func (e *Server) HostPortControl(srv fwdpb.Forwarding_HostPortControlServer) error {
init, err := srv.Recv()
if err != nil {
return err
}
fwdCtx, err := e.FindContext(init.GetContextId())
if err != nil {
return fmt.Errorf("failed to get context, err %v", err)
}
reqCh := make(chan *fwdpb.HostPortControlMessage)
respCh := make(chan *fwdpb.HostPortControlRequest)
ctx, cancel := context.WithCancel(srv.Context())
defer close(respCh)

fn := func(msg *fwdpb.HostPortControlMessage) error {
reqCh <- msg
resp := <-respCh
return status.FromProto(resp.GetStatus()).Err()
}
fwdCtx.Lock()
fwdCtx.SetPortControl(fn, cancel)
fwdCtx.Unlock()
log.Info("initialized host port control channel")

for {
select {
case <-ctx.Done():
return nil
case req := <-reqCh:
if err := srv.Send(req); err != nil {
return err
}
log.Info("sent message to client: %+v", req)
resp, err := srv.Recv()
if err != nil {
return err
}
respCh <- resp
log.Info("received message from client: %+v", req)
}
}
}
1 change: 1 addition & 0 deletions dataplane/forwarding/fwdport/ports/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"//dataplane/forwarding/infra/fwdpacket",
"//dataplane/forwarding/util/hash/crc16",
"//dataplane/forwarding/util/queue",
"//dataplane/proto/packetio",
"//internal/debug",
"//proto/forwarding",
"@com_github_golang_glog//:glog",
Expand Down
19 changes: 15 additions & 4 deletions dataplane/forwarding/fwdport/ports/cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package ports
import (
"encoding/binary"
"fmt"
"strconv"

log "github.com/golang/glog"

Expand All @@ -27,6 +28,7 @@ import (
"github.com/openconfig/lemming/dataplane/forwarding/infra/fwdobject"
"github.com/openconfig/lemming/dataplane/forwarding/infra/fwdpacket"
"github.com/openconfig/lemming/dataplane/forwarding/util/queue"
pktiopb "github.com/openconfig/lemming/dataplane/proto/packetio"
fwdpb "github.com/openconfig/lemming/proto/forwarding"
)

Expand Down Expand Up @@ -201,10 +203,19 @@ func (p *CPUPort) puntRemotePort(v any) {
return
}

response := &fwdpb.PacketOut{
Packet: &fwdpb.Packet{
InputPort: ingressPID,
OutputPort: egressPID,
ingressID, err := strconv.Atoi(ingressPID.GetObjectId().GetId())
if err != nil {
return
}
egressID, err := strconv.Atoi(egressPID.GetObjectId().GetId())
if err != nil {
return
}

response := &pktiopb.PacketOut{
Packet: &pktiopb.Packet{
InputPort: uint64(ingressID),
OutputPort: uint64(egressID),
HostPort: binary.BigEndian.Uint64(hostPort),
Frame: packet.Frame(),
},
Expand Down
1 change: 1 addition & 0 deletions dataplane/forwarding/infra/fwdcontext/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"//dataplane/forwarding/infra/fwdattribute",
"//dataplane/forwarding/infra/fwdobject",
"//dataplane/forwarding/util/queue",
"//dataplane/proto/packetio",
"//proto/forwarding",
"@com_github_golang_glog//:glog",
"@com_github_google_gopacket//:gopacket",
Expand Down
23 changes: 2 additions & 21 deletions dataplane/forwarding/infra/fwdcontext/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/openconfig/lemming/dataplane/forwarding/infra/fwdattribute"
"github.com/openconfig/lemming/dataplane/forwarding/infra/fwdobject"
"github.com/openconfig/lemming/dataplane/forwarding/util/queue"
pktiopb "github.com/openconfig/lemming/dataplane/proto/packetio"
fwdpb "github.com/openconfig/lemming/proto/forwarding"
)

Expand Down Expand Up @@ -77,14 +78,10 @@ type Context struct {

// FakePortManager is the implementation of the port creator for the Fake port type.
FakePortManager FakePortManager
portCtl PortControl
portCtlDone func()
cpuPortSink CPUPortSink
cpuPortSinkDone func()
}

type PortControl func(*fwdpb.HostPortControlMessage) error

// New creates a new forwarding context with the specified id and fwd engine
// name. The id identifies the forwarding context in an forwarding engine
// instance, and the instance identifies the forwarding engine instance in the
Expand Down Expand Up @@ -175,7 +172,7 @@ func (ctx *Context) Notify(event *fwdpb.EventDesc) error {
return nq.Write(event)
}

type CPUPortSink func(*fwdpb.PacketOut) error
type CPUPortSink func(*pktiopb.PacketOut) error

// SetPacketSink sets the packet sink service for the context. If the packet
// sink service is not set to nil, packets are dropped.
Expand All @@ -190,18 +187,6 @@ func (ctx *Context) PacketSink() PacketCallback {
return ctx.packets
}

// SetPacketSink sets the port control service for the context
func (ctx *Context) SetPortControl(fn PortControl, doneFn func()) error {
ctx.portCtl = fn
ctx.portCtlDone = doneFn
return nil
}

// PortControl returns a handler to port control service
func (ctx *Context) PortControl() PortControl {
return ctx.portCtl
}

// SetCPUPortSink sets the port control service for the context
func (ctx *Context) SetCPUPortSink(fn CPUPortSink, doneFn func()) error {
ctx.cpuPortSink = fn
Expand All @@ -225,11 +210,7 @@ func (ctx *Context) Cleanup(ch chan bool, isPort func(*fwdpb.ObjectId) bool) {
if ctx.cpuPortSinkDone != nil {
ctx.cpuPortSinkDone()
}
if ctx.portCtlDone != nil {
ctx.portCtlDone()
}
ctx.SetCPUPortSink(nil, nil)
ctx.SetPortControl(nil, nil)

ids := ctx.Objects.IDs()

Expand Down
26 changes: 26 additions & 0 deletions dataplane/proto/packetio/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
load("@rules_proto//proto:defs.bzl", "proto_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")

proto_library(
name = "packetio_proto",
srcs = ["packetio.proto"],
visibility = ["//visibility:public"],
deps = ["@googleapis//google/rpc:status_proto"],
)

go_proto_library(
name = "packetio_go_proto",
compilers = ["@io_bazel_rules_go//proto:go_grpc"],
importpath = "github.com/openconfig/lemming/dataplane/proto/packetio",
proto = ":packetio_proto",
visibility = ["//visibility:public"],
deps = ["@org_golang_google_genproto_googleapis_rpc//status"],
)

go_library(
name = "packetio",
embed = [":packetio_go_proto"],
importpath = "github.com/openconfig/lemming/dataplane/proto/packetio",
visibility = ["//visibility:public"],
)
Loading
Loading