Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make cpu stream more resilient #526

Merged
merged 3 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dataplane/forwarding/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
"//dataplane/forwarding/fwdaction",
"//dataplane/forwarding/fwdaction/actions",
"//dataplane/forwarding/fwdport",
"//dataplane/forwarding/fwdport/ports",
"//dataplane/forwarding/fwdtable",
"//dataplane/forwarding/fwdtable/action",
"//dataplane/forwarding/fwdtable/bridge",
Expand Down
1 change: 1 addition & 0 deletions dataplane/forwarding/fwd.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
// essential, and there can be more to come, importing here is more
// beneficial.
_ "github.com/openconfig/lemming/dataplane/forwarding/fwdaction/actions"
_ "github.com/openconfig/lemming/dataplane/forwarding/fwdport/ports"
_ "github.com/openconfig/lemming/dataplane/forwarding/fwdtable/action"
_ "github.com/openconfig/lemming/dataplane/forwarding/fwdtable/bridge"
_ "github.com/openconfig/lemming/dataplane/forwarding/fwdtable/exact"
Expand Down
2 changes: 1 addition & 1 deletion dataplane/forwarding/fwdport/port.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func Input(port Port, packet fwdpacket.Packet, dir fwdpb.PortAction, ctx *fwdcon
port.Increment(fwdpb.CounterId_COUNTER_ID_RX_NON_UCAST_PACKETS, 1)
}

packet.Log().V(3).Info("input packet", "port", port.ID(), "frame", fwdpacket.IncludeFrameInLog)
packet.Log().V(1).Info("input packet", "port", port.ID(), "frame", fwdpacket.IncludeFrameInLog)
state, err := fwdaction.ProcessPacket(packet, port.Actions(dir), port)
if err != nil {
Increment(port, packet.Length(), fwdpb.CounterId_COUNTER_ID_RX_ERROR_PACKETS, fwdpb.CounterId_COUNTER_ID_RX_ERROR_OCTETS)
Expand Down
17 changes: 6 additions & 11 deletions dataplane/forwarding/fwdport/ports/cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,12 @@ import (
// A CPUPort is a port that receives from and transmits to the controller.
type CPUPort struct {
fwdobject.Base
queueID string // CPU queue id
queue *queue.Queue // Queue of packets
input fwdaction.Actions // Actions used to process received packets
output fwdaction.Actions // Actions used to process transmitted packets
ctx *fwdcontext.Context // Forwarding context containing the port
export []*fwdpb.PacketFieldId // List of fields to export when writing the packet
queueID string // CPU queue id
queue *queue.Queue // Queue of packets
input fwdaction.Actions // Actions used to process received packets
output fwdaction.Actions // Actions used to process transmitted packets
ctx *fwdcontext.Context // Forwarding context containing the port
desc *fwdpb.PortDesc
remote bool
}

// Desc returns the port description proto.
Expand All @@ -52,7 +50,7 @@ func (p *CPUPort) Desc() *fwdpb.PortDesc {

// String returns the port as a formatted string.
func (p *CPUPort) String() string {
desc := fmt.Sprintf("Type=%v;CPU=%v;%v;<Queue=%v><Input=%v>;<Output=%v>;<Export=%v>", fwdpb.PortType_PORT_TYPE_CPU_PORT, p.queueID, p.BaseInfo(), p.queue, p.input, p.output, p.export)
desc := fmt.Sprintf("Type=%v;CPU=%v;%v;<Queue=%v><Input=%v>;<Output=%v>", fwdpb.PortType_PORT_TYPE_CPU_PORT, p.queueID, p.BaseInfo(), p.queue, p.input, p.output)
if state, err := p.State(nil); err == nil {
desc += fmt.Sprintf("<State=%v>;", state)
}
Expand All @@ -69,7 +67,6 @@ func (p *CPUPort) Cleanup() {
p.output.Cleanup()
p.input = nil
p.output = nil
p.export = nil
}

// Update updates the actions for the port.
Expand Down Expand Up @@ -196,8 +193,6 @@ func (*cpuBuilder) Build(pd *fwdpb.PortDesc, ctx *fwdcontext.Context) (fwdport.P
p := CPUPort{
ctx: ctx,
queueID: cpu.Cpu.GetQueueId(),
export: cpu.Cpu.GetExportFieldIds(),
remote: cpu.Cpu.GetRemotePort(),
desc: pd,
}
var err error
Expand Down
6 changes: 5 additions & 1 deletion dataplane/saiserver/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ go_library(
"//dataplane/dplaneopts",
"//dataplane/forwarding",
"//dataplane/forwarding/fwdconfig",
"//dataplane/forwarding/fwdport/ports",
"//dataplane/forwarding/infra/fwdcontext",
"//dataplane/proto/packetio",
"//dataplane/proto/sai",
Expand Down Expand Up @@ -54,19 +53,24 @@ go_test(
embed = [":saiserver"],
deps = [
"//dataplane/dplaneopts",
"//dataplane/forwarding/fwdport",
"//dataplane/forwarding/infra/fwdcontext",
"//dataplane/forwarding/infra/fwdobject",
"//dataplane/forwarding/infra/fwdpacket",
"//dataplane/proto/packetio",
"//dataplane/proto/sai",
"//dataplane/saiserver/attrmgr",
"//proto/forwarding",
"@com_github_google_go_cmp//cmp",
"@com_github_google_gopacket//:gopacket",
"@com_github_google_gopacket//layers",
"@com_github_openconfig_gnmi//errdiff",
"@org_golang_google_genproto_googleapis_rpc//status",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//credentials/insecure",
"@org_golang_google_protobuf//proto",
"@org_golang_google_protobuf//reflect/protoreflect",
"@org_golang_google_protobuf//testing/protocmp",
],
)
33 changes: 18 additions & 15 deletions dataplane/saiserver/hostif.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (

"github.com/openconfig/lemming/dataplane/dplaneopts"
"github.com/openconfig/lemming/dataplane/forwarding/fwdconfig"
"github.com/openconfig/lemming/dataplane/forwarding/fwdport/ports"
"github.com/openconfig/lemming/dataplane/saiserver/attrmgr"

pktiopb "github.com/openconfig/lemming/dataplane/proto/packetio"
Expand All @@ -45,7 +44,6 @@ func newHostif(mgr *attrmgr.AttrMgr, dataplane switchDataplaneAPI, s *grpc.Serve
remoteHostifs: map[uint64]*pktiopb.HostPortControlMessage{},
opts: opts,
}

saipb.RegisterHostifServer(s, hostif)
pktiopb.RegisterPacketIOServer(s, hostif)
return hostif
Expand Down Expand Up @@ -384,24 +382,25 @@ func (hostif *hostif) CPUPacketStream(srv pktiopb.PacketIO_CPUPacketStreamServer
return err
}

fwdCtx.RLock()
// This RPC may be called before the CPU port is ready. In such cases, it will drop the packets until it is.
cpuPortID := ""
for _, id := range fwdCtx.Objects.IDs() {
obj, err := fwdCtx.Objects.FindID(&fwdpb.ObjectId{Id: string(id)})
if err != nil {
fwdCtx.RUnlock()
return err
updateCPUPortID := func() {
if cpuPortID != "" {
return
}
if _, ok := obj.(*ports.CPUPort); ok {
cpuPortID = string(obj.ID())
attrReq := &saipb.GetSwitchAttributeRequest{
Oid: switchID,
AttrType: []saipb.SwitchAttr{saipb.SwitchAttr_SWITCH_ATTR_CPU_PORT},
}
}
fwdCtx.RUnlock()
if cpuPortID == "" {
return fmt.Errorf("couldn't find cpu port")
resp := &saipb.GetSwitchAttributeResponse{}
if err := hostif.mgr.PopulateAttributes(attrReq, resp); err != nil {
slog.WarnContext(srv.Context(), "failed to failed cpu port", "error", err)
return
}
cpuPortID = fmt.Sprint(resp.GetAttr().GetCpuPort())
}

packetCh := make(chan *pktiopb.PacketIn)
packetCh := make(chan *pktiopb.PacketIn, 1000)
ctx, cancel := context.WithCancel(srv.Context())

// Since Recv() is blocking and we want this func to return immediately on cancel.
Expand Down Expand Up @@ -429,6 +428,10 @@ func (hostif *hostif) CPUPacketStream(srv pktiopb.PacketIO_CPUPacketStreamServer
case <-ctx.Done():
return nil
case pkt := <-packetCh:
updateCPUPortID()
if cpuPortID == "" {
continue
}
slog.Debug("received packet", "packet", pkt.GetPacket().GetFrame())

acts := []*fwdpb.ActionDesc{fwdconfig.Action(fwdconfig.UpdateAction(fwdpb.UpdateType_UPDATE_TYPE_SET, fwdpb.PacketFieldNum_PACKET_FIELD_NUM_HOST_PORT_ID).
Expand Down
117 changes: 116 additions & 1 deletion dataplane/saiserver/hostif_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,31 @@ package saiserver

import (
"context"
"encoding/binary"
"net"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/openconfig/gnmi/errdiff"
"google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/testing/protocmp"

"github.com/openconfig/lemming/dataplane/dplaneopts"
"github.com/openconfig/lemming/dataplane/forwarding/fwdport"
"github.com/openconfig/lemming/dataplane/forwarding/infra/fwdcontext"
"github.com/openconfig/lemming/dataplane/forwarding/infra/fwdpacket"
"github.com/openconfig/lemming/dataplane/saiserver/attrmgr"

pktiopb "github.com/openconfig/lemming/dataplane/proto/packetio"
saipb "github.com/openconfig/lemming/dataplane/proto/sai"
"github.com/openconfig/lemming/dataplane/saiserver/attrmgr"

fwdpb "github.com/openconfig/lemming/proto/forwarding"
)

Expand Down Expand Up @@ -209,6 +218,112 @@ func TestRemoveHostif(t *testing.T) {
}
}

func TestCPUPacketStream(t *testing.T) {
dplane := &fakeSwitchDataplane{
ctx: fwdcontext.New("test", "test"),
}
p, err := fwdport.New(&fwdpb.PortDesc{
PortType: fwdpb.PortType_PORT_TYPE_CPU_PORT,
PortId: &fwdpb.PortId{ObjectId: &fwdpb.ObjectId{Id: "2"}},
Port: &fwdpb.PortDesc_Cpu{},
}, dplane.ctx)
if err != nil {
t.Fatal(err)
}

c, mgr, stopFn := newTestHostif(t, dplane)
mgr.StoreAttributes(1, &saipb.SwitchAttribute{
CpuPort: proto.Uint64(2),
})

defer stopFn()

s, err := c.PacketIOClient.CPUPacketStream(context.Background())
if err != nil {
t.Fatal(err)
}
if err := s.Send(&pktiopb.PacketIn{}); err != nil {
t.Fatal(err)
}

t.Run("send", func(t *testing.T) {
if err := s.Send(&pktiopb.PacketIn{
Msg: &pktiopb.PacketIn_Packet{
Packet: &pktiopb.Packet{
Frame: []byte("hello"),
},
},
}); err != nil {
t.Fatal(err)
}
time.Sleep(10 * time.Millisecond)
want := [][]byte{[]byte("hello")}
if d := cmp.Diff(dplane.gotPackets, want); d != "" {
t.Errorf("PacketStream() failed: diff(-got,+want)\n:%s", d)
}
})
t.Run("recv", func(t *testing.T) {
if _, err := p.Write(testPacket(t, uint64(p.NID()))); err != nil {
t.Fatal(err)
}
got, err := s.Recv()
if err != nil {
t.Fatal(err)
}
want := &pktiopb.PacketOut{
Packet: &pktiopb.Packet{
HostPort: 1,
InputPort: 2,
},
}
if d := cmp.Diff(got, want, protocmp.Transform(), protocmp.IgnoreFields(&pktiopb.Packet{}, protoreflect.Name("frame"))); d != "" {
t.Errorf("PacketStream() failed: diff(-got,+want)\n:%s", d)
}
})
}

func testPacket(t testing.TB, nid uint64) fwdpacket.Packet {
DanG100 marked this conversation as resolved.
Show resolved Hide resolved
eth := &layers.Ethernet{
SrcMAC: parseMac(t, "00:00:00:00:00:01"),
DstMAC: parseMac(t, "00:00:00:00:00:02"),
EthernetType: layers.EthernetTypeIPv6,
}
ip := &layers.IPv6{
Version: 6,
SrcIP: net.ParseIP("2003::9"),
DstIP: net.ParseIP("2003::10"),
HopLimit: 255,
}
payload := gopacket.Payload([]byte("hello world"))
buf := gopacket.NewSerializeBuffer()
if err := gopacket.SerializeLayers(buf, gopacket.SerializeOptions{FixLengths: true}, eth, ip, payload); err != nil {
t.Fatalf("failed to serialize headers: %v", err)
}

p, err := fwdpacket.New(fwdpb.PacketHeaderId_PACKET_HEADER_ID_ETHERNET, buf.Bytes())
if err != nil {
t.Fatal(err)
}
err = p.Update(fwdpacket.NewFieldIDFromNum(fwdpb.PacketFieldNum_PACKET_FIELD_NUM_HOST_PORT_ID, 0), fwdpacket.OpSet, binary.BigEndian.AppendUint64(nil, 1))
if err != nil {
t.Fatal(err)
}
err = p.Update(fwdpacket.NewFieldIDFromNum(fwdpb.PacketFieldNum_PACKET_FIELD_NUM_PACKET_PORT_INPUT, 0), fwdpacket.OpSet, binary.BigEndian.AppendUint64(nil, nid))
if err != nil {
t.Fatal(err)
}

return p
}

func parseMac(t testing.TB, mac string) net.HardwareAddr {
addr, err := net.ParseMAC(mac)
if err != nil {
t.Fatal(err)
}
return addr
}

type hostifClient struct {
saipb.HostifClient
pktiopb.PacketIOClient
Expand Down
4 changes: 3 additions & 1 deletion dataplane/saiserver/switch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ type fakeSwitchDataplane struct {
gotFlowCounterCreateReqs []*fwdpb.FlowCounterCreateRequest
gotFlowCounterQueryReqs []*fwdpb.FlowCounterQueryRequest
gotEntryRemoveReqs []*fwdpb.TableEntryRemoveRequest
gotPackets [][]byte
portIDToNID map[string]uint64
counterRepliesIdx int
flowQueryReplies []*fwdpb.FlowCounterQueryReply
Expand Down Expand Up @@ -295,7 +296,8 @@ func (f *fakeSwitchDataplane) ObjectNID(context.Context, *fwdpb.ObjectNIDRequest
return nil, nil
}

func (f *fakeSwitchDataplane) InjectPacket(*fwdpb.ContextId, *fwdpb.PortId, fwdpb.PacketHeaderId, []byte, []*fwdpb.ActionDesc, bool, fwdpb.PortAction) error {
func (f *fakeSwitchDataplane) InjectPacket(_ *fwdpb.ContextId, _ *fwdpb.PortId, _ fwdpb.PacketHeaderId, pkt []byte, _ []*fwdpb.ActionDesc, _ bool, _ fwdpb.PortAction) error {
f.gotPackets = append(f.gotPackets, pkt)
return nil
}

Expand Down
6 changes: 5 additions & 1 deletion sysrib/zapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,9 +739,13 @@ func testRouteRedistribution(t *testing.T, routeReadyBeforeDial bool) {
t.Fatalf("Got unexpected error during call to SetRoute: %v", err)
}
}
deadline := time.Now().Add(10 * time.Second)
if routeReadyBeforeDial {
deadline = time.Now().Add(30 * time.Second)
}

// TODO: see if large timeout helps flakiness
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
conn.SetReadDeadline(deadline)
m, err := zebra.ReceiveSingleMsg(topicLogger, conn, version, software, "test-client")
if tt.inExpectTimeout {
if err == nil {
Expand Down
Loading