From 2822aa0a7dabfec13732e2cf644527f502657bf8 Mon Sep 17 00:00:00 2001 From: Daniel Grau Date: Tue, 14 Jan 2025 19:09:25 +0000 Subject: [PATCH 1/3] Make cpu stream more resilient --- dataplane/forwarding/BUILD | 1 + dataplane/forwarding/fwd.go | 1 + dataplane/forwarding/fwdport/port.go | 2 +- dataplane/saiserver/BUILD | 1 - dataplane/saiserver/hostif.go | 30 ++++++++++++++++------------ 5 files changed, 20 insertions(+), 15 deletions(-) diff --git a/dataplane/forwarding/BUILD b/dataplane/forwarding/BUILD index d08366a8..fee2cac9 100644 --- a/dataplane/forwarding/BUILD +++ b/dataplane/forwarding/BUILD @@ -12,6 +12,7 @@ go_library( "//dataplane/forwarding/fwdaction", "//dataplane/forwarding/fwdaction/actions", "//dataplane/forwarding/fwdport", + "//dataplane/forwarding/fwdport/ports", "//dataplane/forwarding/fwdtable", "//dataplane/forwarding/fwdtable/action", "//dataplane/forwarding/fwdtable/bridge", diff --git a/dataplane/forwarding/fwd.go b/dataplane/forwarding/fwd.go index f73014e6..d09d2664 100644 --- a/dataplane/forwarding/fwd.go +++ b/dataplane/forwarding/fwd.go @@ -39,6 +39,7 @@ import ( // essential, and there can be more to come, importing here is more // beneficial. _ "github.com/openconfig/lemming/dataplane/forwarding/fwdaction/actions" + _ "github.com/openconfig/lemming/dataplane/forwarding/fwdport/ports" _ "github.com/openconfig/lemming/dataplane/forwarding/fwdtable/action" _ "github.com/openconfig/lemming/dataplane/forwarding/fwdtable/bridge" _ "github.com/openconfig/lemming/dataplane/forwarding/fwdtable/exact" diff --git a/dataplane/forwarding/fwdport/port.go b/dataplane/forwarding/fwdport/port.go index 724f5df6..d46afe66 100644 --- a/dataplane/forwarding/fwdport/port.go +++ b/dataplane/forwarding/fwdport/port.go @@ -259,7 +259,7 @@ func Input(port Port, packet fwdpacket.Packet, dir fwdpb.PortAction, ctx *fwdcon port.Increment(fwdpb.CounterId_COUNTER_ID_RX_NON_UCAST_PACKETS, 1) } - packet.Log().V(3).Info("input packet", "port", port.ID(), "frame", fwdpacket.IncludeFrameInLog) + packet.Log().V(1).Info("input packet", "port", port.ID(), "frame", fwdpacket.IncludeFrameInLog) state, err := fwdaction.ProcessPacket(packet, port.Actions(dir), port) if err != nil { Increment(port, packet.Length(), fwdpb.CounterId_COUNTER_ID_RX_ERROR_PACKETS, fwdpb.CounterId_COUNTER_ID_RX_ERROR_OCTETS) diff --git a/dataplane/saiserver/BUILD b/dataplane/saiserver/BUILD index d63009a5..0a745423 100644 --- a/dataplane/saiserver/BUILD +++ b/dataplane/saiserver/BUILD @@ -21,7 +21,6 @@ go_library( "//dataplane/dplaneopts", "//dataplane/forwarding", "//dataplane/forwarding/fwdconfig", - "//dataplane/forwarding/fwdport/ports", "//dataplane/forwarding/infra/fwdcontext", "//dataplane/proto/packetio", "//dataplane/proto/sai", diff --git a/dataplane/saiserver/hostif.go b/dataplane/saiserver/hostif.go index f6e5789c..0861d032 100644 --- a/dataplane/saiserver/hostif.go +++ b/dataplane/saiserver/hostif.go @@ -28,7 +28,6 @@ import ( "github.com/openconfig/lemming/dataplane/dplaneopts" "github.com/openconfig/lemming/dataplane/forwarding/fwdconfig" - "github.com/openconfig/lemming/dataplane/forwarding/fwdport/ports" "github.com/openconfig/lemming/dataplane/saiserver/attrmgr" pktiopb "github.com/openconfig/lemming/dataplane/proto/packetio" @@ -384,21 +383,22 @@ func (hostif *hostif) CPUPacketStream(srv pktiopb.PacketIO_CPUPacketStreamServer return err } - fwdCtx.RLock() + // This RPC may be called before the CPU port is ready. In such cases, it will drop the packets until it is. cpuPortID := "" - for _, id := range fwdCtx.Objects.IDs() { - obj, err := fwdCtx.Objects.FindID(&fwdpb.ObjectId{Id: string(id)}) - if err != nil { - fwdCtx.RUnlock() - return err + updateCPUPortID := func() { + if cpuPortID != "" { + return } - if _, ok := obj.(*ports.CPUPort); ok { - cpuPortID = string(obj.ID()) + attrReq := &saipb.GetSwitchAttributeRequest{ + Oid: switchID, + AttrType: []saipb.SwitchAttr{saipb.SwitchAttr_SWITCH_ATTR_CPU_PORT}, } - } - fwdCtx.RUnlock() - if cpuPortID == "" { - return fmt.Errorf("couldn't find cpu port") + resp := &saipb.GetSwitchAttributeResponse{} + if err := hostif.mgr.PopulateAttributes(attrReq, resp); err != nil { + slog.WarnContext(srv.Context(), "failed to failed cpu port", "error", err) + return + } + cpuPortID = fmt.Sprint(resp.GetAttr().GetCpuPort()) } packetCh := make(chan *pktiopb.PacketIn) @@ -429,6 +429,10 @@ func (hostif *hostif) CPUPacketStream(srv pktiopb.PacketIO_CPUPacketStreamServer case <-ctx.Done(): return nil case pkt := <-packetCh: + updateCPUPortID() + if cpuPortID == "" { + continue + } slog.Debug("received packet", "packet", pkt.GetPacket().GetFrame()) acts := []*fwdpb.ActionDesc{fwdconfig.Action(fwdconfig.UpdateAction(fwdpb.UpdateType_UPDATE_TYPE_SET, fwdpb.PacketFieldNum_PACKET_FIELD_NUM_HOST_PORT_ID). From 9750302b764d19c3e3509835094a0fbad1542331 Mon Sep 17 00:00:00 2001 From: Daniel Grau Date: Wed, 15 Jan 2025 21:07:10 +0000 Subject: [PATCH 2/3] Tests --- dataplane/forwarding/fwdport/ports/cpu.go | 17 ++-- dataplane/saiserver/BUILD | 5 + dataplane/saiserver/hostif.go | 3 +- dataplane/saiserver/hostif_test.go | 117 +++++++++++++++++++++- dataplane/saiserver/switch_test.go | 4 +- sysrib/zapi_test.go | 6 +- 6 files changed, 136 insertions(+), 16 deletions(-) diff --git a/dataplane/forwarding/fwdport/ports/cpu.go b/dataplane/forwarding/fwdport/ports/cpu.go index 37583f84..3a643412 100644 --- a/dataplane/forwarding/fwdport/ports/cpu.go +++ b/dataplane/forwarding/fwdport/ports/cpu.go @@ -35,14 +35,12 @@ import ( // A CPUPort is a port that receives from and transmits to the controller. type CPUPort struct { fwdobject.Base - queueID string // CPU queue id - queue *queue.Queue // Queue of packets - input fwdaction.Actions // Actions used to process received packets - output fwdaction.Actions // Actions used to process transmitted packets - ctx *fwdcontext.Context // Forwarding context containing the port - export []*fwdpb.PacketFieldId // List of fields to export when writing the packet + queueID string // CPU queue id + queue *queue.Queue // Queue of packets + input fwdaction.Actions // Actions used to process received packets + output fwdaction.Actions // Actions used to process transmitted packets + ctx *fwdcontext.Context // Forwarding context containing the port desc *fwdpb.PortDesc - remote bool } // Desc returns the port description proto. @@ -52,7 +50,7 @@ func (p *CPUPort) Desc() *fwdpb.PortDesc { // String returns the port as a formatted string. func (p *CPUPort) String() string { - desc := fmt.Sprintf("Type=%v;CPU=%v;%v;;;", fwdpb.PortType_PORT_TYPE_CPU_PORT, p.queueID, p.BaseInfo(), p.queue, p.input, p.output, p.export) + desc := fmt.Sprintf("Type=%v;CPU=%v;%v;;", fwdpb.PortType_PORT_TYPE_CPU_PORT, p.queueID, p.BaseInfo(), p.queue, p.input, p.output) if state, err := p.State(nil); err == nil { desc += fmt.Sprintf(";", state) } @@ -69,7 +67,6 @@ func (p *CPUPort) Cleanup() { p.output.Cleanup() p.input = nil p.output = nil - p.export = nil } // Update updates the actions for the port. @@ -196,8 +193,6 @@ func (*cpuBuilder) Build(pd *fwdpb.PortDesc, ctx *fwdcontext.Context) (fwdport.P p := CPUPort{ ctx: ctx, queueID: cpu.Cpu.GetQueueId(), - export: cpu.Cpu.GetExportFieldIds(), - remote: cpu.Cpu.GetRemotePort(), desc: pd, } var err error diff --git a/dataplane/saiserver/BUILD b/dataplane/saiserver/BUILD index 0a745423..f9676433 100644 --- a/dataplane/saiserver/BUILD +++ b/dataplane/saiserver/BUILD @@ -53,19 +53,24 @@ go_test( embed = [":saiserver"], deps = [ "//dataplane/dplaneopts", + "//dataplane/forwarding/fwdport", "//dataplane/forwarding/infra/fwdcontext", "//dataplane/forwarding/infra/fwdobject", + "//dataplane/forwarding/infra/fwdpacket", "//dataplane/proto/packetio", "//dataplane/proto/sai", "//dataplane/saiserver/attrmgr", "//proto/forwarding", "@com_github_google_go_cmp//cmp", + "@com_github_google_gopacket//:gopacket", + "@com_github_google_gopacket//layers", "@com_github_openconfig_gnmi//errdiff", "@org_golang_google_genproto_googleapis_rpc//status", "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//credentials/insecure", "@org_golang_google_protobuf//proto", + "@org_golang_google_protobuf//reflect/protoreflect", "@org_golang_google_protobuf//testing/protocmp", ], ) diff --git a/dataplane/saiserver/hostif.go b/dataplane/saiserver/hostif.go index 0861d032..8381a61c 100644 --- a/dataplane/saiserver/hostif.go +++ b/dataplane/saiserver/hostif.go @@ -44,7 +44,6 @@ func newHostif(mgr *attrmgr.AttrMgr, dataplane switchDataplaneAPI, s *grpc.Serve remoteHostifs: map[uint64]*pktiopb.HostPortControlMessage{}, opts: opts, } - saipb.RegisterHostifServer(s, hostif) pktiopb.RegisterPacketIOServer(s, hostif) return hostif @@ -401,7 +400,7 @@ func (hostif *hostif) CPUPacketStream(srv pktiopb.PacketIO_CPUPacketStreamServer cpuPortID = fmt.Sprint(resp.GetAttr().GetCpuPort()) } - packetCh := make(chan *pktiopb.PacketIn) + packetCh := make(chan *pktiopb.PacketIn, 1000) ctx, cancel := context.WithCancel(srv.Context()) // Since Recv() is blocking and we want this func to return immediately on cancel. diff --git a/dataplane/saiserver/hostif_test.go b/dataplane/saiserver/hostif_test.go index 3e32f85e..77222747 100644 --- a/dataplane/saiserver/hostif_test.go +++ b/dataplane/saiserver/hostif_test.go @@ -16,22 +16,31 @@ package saiserver import ( "context" + "encoding/binary" + "net" "testing" "time" "github.com/google/go-cmp/cmp" + "github.com/google/gopacket" + "github.com/google/gopacket/layers" "github.com/openconfig/gnmi/errdiff" "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/testing/protocmp" "github.com/openconfig/lemming/dataplane/dplaneopts" + "github.com/openconfig/lemming/dataplane/forwarding/fwdport" "github.com/openconfig/lemming/dataplane/forwarding/infra/fwdcontext" + "github.com/openconfig/lemming/dataplane/forwarding/infra/fwdpacket" + "github.com/openconfig/lemming/dataplane/saiserver/attrmgr" + pktiopb "github.com/openconfig/lemming/dataplane/proto/packetio" saipb "github.com/openconfig/lemming/dataplane/proto/sai" - "github.com/openconfig/lemming/dataplane/saiserver/attrmgr" + fwdpb "github.com/openconfig/lemming/proto/forwarding" ) @@ -209,6 +218,112 @@ func TestRemoveHostif(t *testing.T) { } } +func TestCPUPacketStream(t *testing.T) { + dplane := &fakeSwitchDataplane{ + ctx: fwdcontext.New("test", "test"), + } + p, err := fwdport.New(&fwdpb.PortDesc{ + PortType: fwdpb.PortType_PORT_TYPE_CPU_PORT, + PortId: &fwdpb.PortId{ObjectId: &fwdpb.ObjectId{Id: "2"}}, + Port: &fwdpb.PortDesc_Cpu{}, + }, dplane.ctx) + if err != nil { + t.Fatal(err) + } + + c, mgr, stopFn := newTestHostif(t, dplane) + mgr.StoreAttributes(1, &saipb.SwitchAttribute{ + CpuPort: proto.Uint64(2), + }) + + defer stopFn() + + s, err := c.PacketIOClient.CPUPacketStream(context.Background()) + if err != nil { + t.Fatal(err) + } + if err := s.Send(&pktiopb.PacketIn{}); err != nil { + t.Fatal(err) + } + + t.Run("send", func(t *testing.T) { + if err := s.Send(&pktiopb.PacketIn{ + Msg: &pktiopb.PacketIn_Packet{ + Packet: &pktiopb.Packet{ + Frame: []byte("hello"), + }, + }, + }); err != nil { + t.Fatal(err) + } + time.Sleep(10 * time.Millisecond) + want := [][]byte{[]byte("hello")} + if d := cmp.Diff(dplane.gotPackets, want); d != "" { + t.Errorf("PacketStream() failed: diff(-got,+want)\n:%s", d) + } + }) + t.Run("recv", func(t *testing.T) { + if _, err := p.Write(testPacket(t, uint64(p.NID()))); err != nil { + t.Fatal(err) + } + got, err := s.Recv() + if err != nil { + t.Fatal(err) + } + want := &pktiopb.PacketOut{ + Packet: &pktiopb.Packet{ + HostPort: 1, + InputPort: 2, + }, + } + if d := cmp.Diff(got, want, protocmp.Transform(), protocmp.IgnoreFields(&pktiopb.Packet{}, protoreflect.Name("frame"))); d != "" { + t.Errorf("PacketStream() failed: diff(-got,+want)\n:%s", d) + } + }) +} + +func testPacket(t testing.TB, nid uint64) fwdpacket.Packet { + eth := &layers.Ethernet{ + SrcMAC: parseMac(t, "00:00:00:00:00:01"), + DstMAC: parseMac(t, "00:00:00:00:00:02"), + EthernetType: layers.EthernetTypeIPv6, + } + ip := &layers.IPv6{ + Version: 6, + SrcIP: net.ParseIP("2003::9"), + DstIP: net.ParseIP("2003::10"), + HopLimit: 255, + } + payload := gopacket.Payload([]byte("hello world")) + buf := gopacket.NewSerializeBuffer() + if err := gopacket.SerializeLayers(buf, gopacket.SerializeOptions{FixLengths: true}, eth, ip, payload); err != nil { + t.Fatalf("failed to serialize headers: %v", err) + } + + p, err := fwdpacket.New(fwdpb.PacketHeaderId_PACKET_HEADER_ID_ETHERNET, buf.Bytes()) + if err != nil { + t.Fatal(err) + } + err = p.Update(fwdpacket.NewFieldIDFromNum(fwdpb.PacketFieldNum_PACKET_FIELD_NUM_HOST_PORT_ID, 0), fwdpacket.OpSet, binary.BigEndian.AppendUint64(nil, 1)) + if err != nil { + t.Fatal(err) + } + err = p.Update(fwdpacket.NewFieldIDFromNum(fwdpb.PacketFieldNum_PACKET_FIELD_NUM_PACKET_PORT_INPUT, 0), fwdpacket.OpSet, binary.BigEndian.AppendUint64(nil, nid)) + if err != nil { + t.Fatal(err) + } + + return p +} + +func parseMac(t testing.TB, mac string) net.HardwareAddr { + addr, err := net.ParseMAC(mac) + if err != nil { + t.Fatal(err) + } + return addr +} + type hostifClient struct { saipb.HostifClient pktiopb.PacketIOClient diff --git a/dataplane/saiserver/switch_test.go b/dataplane/saiserver/switch_test.go index acc0d2ee..bdc128bf 100644 --- a/dataplane/saiserver/switch_test.go +++ b/dataplane/saiserver/switch_test.go @@ -222,6 +222,7 @@ type fakeSwitchDataplane struct { gotFlowCounterCreateReqs []*fwdpb.FlowCounterCreateRequest gotFlowCounterQueryReqs []*fwdpb.FlowCounterQueryRequest gotEntryRemoveReqs []*fwdpb.TableEntryRemoveRequest + gotPackets [][]byte portIDToNID map[string]uint64 counterRepliesIdx int flowQueryReplies []*fwdpb.FlowCounterQueryReply @@ -295,7 +296,8 @@ func (f *fakeSwitchDataplane) ObjectNID(context.Context, *fwdpb.ObjectNIDRequest return nil, nil } -func (f *fakeSwitchDataplane) InjectPacket(*fwdpb.ContextId, *fwdpb.PortId, fwdpb.PacketHeaderId, []byte, []*fwdpb.ActionDesc, bool, fwdpb.PortAction) error { +func (f *fakeSwitchDataplane) InjectPacket(_ *fwdpb.ContextId, _ *fwdpb.PortId, _ fwdpb.PacketHeaderId, pkt []byte, _ []*fwdpb.ActionDesc, _ bool, _ fwdpb.PortAction) error { + f.gotPackets = append(f.gotPackets, pkt) return nil } diff --git a/sysrib/zapi_test.go b/sysrib/zapi_test.go index e5126e7a..984e86cb 100644 --- a/sysrib/zapi_test.go +++ b/sysrib/zapi_test.go @@ -739,9 +739,13 @@ func testRouteRedistribution(t *testing.T, routeReadyBeforeDial bool) { t.Fatalf("Got unexpected error during call to SetRoute: %v", err) } } + deadline := time.Now().Add(10 * time.Second) + if routeReadyBeforeDial { + deadline = time.Now().Add(30 * time.Second) + } // TODO: see if large timeout helps flakiness - conn.SetReadDeadline(time.Now().Add(10 * time.Second)) + conn.SetReadDeadline(deadline) m, err := zebra.ReceiveSingleMsg(topicLogger, conn, version, software, "test-client") if tt.inExpectTimeout { if err == nil { From 00e799b7d4e1561dbdf305ea1f8a8db6c9bb63b3 Mon Sep 17 00:00:00 2001 From: Daniel Grau Date: Wed, 15 Jan 2025 22:25:21 +0000 Subject: [PATCH 3/3] feedback --- dataplane/saiserver/hostif_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dataplane/saiserver/hostif_test.go b/dataplane/saiserver/hostif_test.go index 77222747..afa230b0 100644 --- a/dataplane/saiserver/hostif_test.go +++ b/dataplane/saiserver/hostif_test.go @@ -263,7 +263,7 @@ func TestCPUPacketStream(t *testing.T) { } }) t.Run("recv", func(t *testing.T) { - if _, err := p.Write(testPacket(t, uint64(p.NID()))); err != nil { + if _, err := p.Write(createPacket(t, uint64(p.NID()))); err != nil { t.Fatal(err) } got, err := s.Recv() @@ -282,7 +282,8 @@ func TestCPUPacketStream(t *testing.T) { }) } -func testPacket(t testing.TB, nid uint64) fwdpacket.Packet { +func createPacket(t testing.TB, nid uint64) fwdpacket.Packet { + t.Helper() eth := &layers.Ethernet{ SrcMAC: parseMac(t, "00:00:00:00:00:01"), DstMAC: parseMac(t, "00:00:00:00:00:02"),