diff --git a/integration_tests/dataplane/mplsoudp/BUILD b/integration_tests/dataplane/mplsoudp/BUILD new file mode 100644 index 00000000..3dbbe748 --- /dev/null +++ b/integration_tests/dataplane/mplsoudp/BUILD @@ -0,0 +1,27 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_test") + +go_test( + name = "mplsoudp_test", + srcs = ["mplsoverudp_test.go"], + data = [ + "testbed.pb.txt", + ], + deps = [ + "//dataplane/proto/sai", + "//dataplane/saiserver", + "//gnmi/oc", + "//integration_tests/saiutil", + "//internal/attrs", + "//internal/binding", + "//proto/forwarding", + "@com_github_google_go_cmp//cmp", + "@com_github_google_go_cmp//cmp/cmpopts", + "@com_github_google_gopacket//:gopacket", + "@com_github_google_gopacket//layers", + "@com_github_openconfig_ondatra//:ondatra", + "@com_github_openconfig_ondatra//binding", + "@com_github_openconfig_ygot//ygot", + "@org_golang_google_grpc//:grpc", + "@org_golang_google_protobuf//proto", + ], +) diff --git a/integration_tests/dataplane/mplsoudp/mplsoverudp_test.go b/integration_tests/dataplane/mplsoudp/mplsoverudp_test.go new file mode 100644 index 00000000..7b0befcc --- /dev/null +++ b/integration_tests/dataplane/mplsoudp/mplsoverudp_test.go @@ -0,0 +1,262 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mplsoudp + +import ( + "context" + "encoding/binary" + "net" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/google/gopacket" + "github.com/google/gopacket/layers" + "github.com/openconfig/ondatra" + "github.com/openconfig/ygot/ygot" + "google.golang.org/grpc" + "google.golang.org/protobuf/proto" + + "github.com/openconfig/lemming/gnmi/oc" + "github.com/openconfig/lemming/integration_tests/saiutil" + "github.com/openconfig/lemming/internal/attrs" + "github.com/openconfig/lemming/internal/binding" + + obind "github.com/openconfig/ondatra/binding" + + saipb "github.com/openconfig/lemming/dataplane/proto/sai" + "github.com/openconfig/lemming/dataplane/saiserver" + fwdpb "github.com/openconfig/lemming/proto/forwarding" +) + +var pm = &binding.PortMgr{} + +func TestMain(m *testing.M) { + ondatra.RunTests(m, binding.Local(".", binding.WithOverridePortManager(pm))) +} + +func dataplaneConn(t testing.TB, dut *ondatra.DUTDevice) *grpc.ClientConn { + t.Helper() + var lemming interface { + DataplaneConn(ctx context.Context, opts ...grpc.DialOption) (*grpc.ClientConn, error) + } + if err := obind.DUTAs(dut.RawAPIs().BindingDUT(), &lemming); err != nil { + t.Fatalf("failed to get lemming dut: %v", err) + } + conn, err := lemming.DataplaneConn(context.Background()) + if err != nil { + t.Fatal(err) + } + return conn +} + +var ( + dutPort1 = attrs.Attributes{ + Desc: "dutPort1", + MAC: "10:10:10:10:10:10", + } + + dutPort2 = attrs.Attributes{ + Desc: "dutPort2", + MAC: "10:10:10:10:10:11", + } +) + +const neighborMAC = "10:10:10:10:10:12" + +func configureDUT(t testing.TB, dut *ondatra.DUTDevice, hop *oc.NetworkInstance_Afts_NextHop, routePrefix string) { + t.Helper() + conn := dataplaneConn(t, dut) + // Allow all traffic to L3 processing. + mmc := saipb.NewMyMacClient(conn) + _, err := mmc.CreateMyMac(context.Background(), &saipb.CreateMyMacRequest{ + Switch: 1, + Priority: proto.Uint32(1), + MacAddress: []byte{0, 0, 0, 0, 0, 0}, + MacAddressMask: []byte{0, 0, 0, 0, 0, 0}, + }) + if err != nil { + t.Fatal(err) + } + saiutil.CreateRIF(t, dut, dut.Port(t, "port1"), dutPort1.MAC) + outRIF := saiutil.CreateRIF(t, dut, dut.Port(t, "port2"), dutPort2.MAC) + + nhc := saipb.NewNextHopClient(conn) + + nh, err := nhc.CreateNextHop(context.Background(), &saipb.CreateNextHopRequest{ + Switch: 1, + Type: saipb.NextHopType_NEXT_HOP_TYPE_IP.Enum(), + RouterInterfaceId: &outRIF, + Ip: net.ParseIP(*hop.IpAddress), + }) + if err != nil { + t.Fatal(err) + } + + ip := &layers.IPv6{ + Version: 6, + NextHeader: layers.IPProtocolUDP, + SrcIP: net.ParseIP(*hop.EncapHeader[0].UdpV6.SrcIp), + DstIP: net.ParseIP(*hop.EncapHeader[0].UdpV6.DstIp), + HopLimit: *hop.EncapHeader[0].UdpV6.IpTtl, + } + udp := &layers.UDP{ + SrcPort: layers.UDPPort(*hop.EncapHeader[0].UdpV6.SrcUdpPort), + DstPort: layers.UDPPort(*hop.EncapHeader[0].UdpV6.DstUdpPort), + } + mpls := &layers.MPLS{ + Label: uint32(hop.EncapHeader[1].Mpls.MplsLabelStack[0].(oc.UnionUint32)), + } + + buf := gopacket.NewSerializeBuffer() + if err := gopacket.SerializeLayers(buf, gopacket.SerializeOptions{}, ip, udp, mpls); err != nil { + t.Fatalf("failed to serialize headers: %v", err) + } + + acts := []*fwdpb.ActionDesc{{ + ActionType: fwdpb.ActionType_ACTION_TYPE_REPARSE, + Action: &fwdpb.ActionDesc_Reparse{ + Reparse: &fwdpb.ReparseActionDesc{ + HeaderId: fwdpb.PacketHeaderId_PACKET_HEADER_ID_IP6, + FieldIds: []*fwdpb.PacketFieldId{ // Copy all metadata fields. + {Field: &fwdpb.PacketField{FieldNum: fwdpb.PacketFieldNum_PACKET_FIELD_NUM_NEXT_HOP_IP}}, + {Field: &fwdpb.PacketField{FieldNum: fwdpb.PacketFieldNum_PACKET_FIELD_NUM_PACKET_PORT_INPUT}}, + {Field: &fwdpb.PacketField{FieldNum: fwdpb.PacketFieldNum_PACKET_FIELD_NUM_PACKET_PORT_OUTPUT}}, + {Field: &fwdpb.PacketField{FieldNum: fwdpb.PacketFieldNum_PACKET_FIELD_NUM_INPUT_IFACE}}, + {Field: &fwdpb.PacketField{FieldNum: fwdpb.PacketFieldNum_PACKET_FIELD_NUM_OUTPUT_IFACE}}, + {Field: &fwdpb.PacketField{FieldNum: fwdpb.PacketFieldNum_PACKET_FIELD_NUM_TRAP_ID}}, + {Field: &fwdpb.PacketField{FieldNum: fwdpb.PacketFieldNum_PACKET_FIELD_NUM_NEXT_HOP_GROUP_ID}}, + {Field: &fwdpb.PacketField{FieldNum: fwdpb.PacketFieldNum_PACKET_FIELD_NUM_NEXT_HOP_ID}}, + {Field: &fwdpb.PacketField{FieldNum: fwdpb.PacketFieldNum_PACKET_FIELD_NUM_PACKET_VRF}}, + }, + // After the UDP header, the rest of the packet (original packet) will be classified as payload. + Prepend: buf.Bytes(), + }, + }, + }} + actReq := &fwdpb.TableEntryAddRequest{ + ContextId: &fwdpb.ContextId{Id: "lucius"}, + TableId: &fwdpb.TableId{ObjectId: &fwdpb.ObjectId{Id: saiserver.NHActionTable}}, + EntryDesc: &fwdpb.EntryDesc{Entry: &fwdpb.EntryDesc_Exact{ + Exact: &fwdpb.ExactEntryDesc{ + Fields: []*fwdpb.PacketFieldBytes{{ + FieldId: &fwdpb.PacketFieldId{Field: &fwdpb.PacketField{FieldNum: fwdpb.PacketFieldNum_PACKET_FIELD_NUM_NEXT_HOP_ID}}, + Bytes: binary.BigEndian.AppendUint64(nil, nh.GetOid()), + }}, + }, + }}, + Actions: acts, + } + fwd := fwdpb.NewForwardingClient(conn) + if _, err := fwd.TableEntryAdd(context.Background(), actReq); err != nil { + t.Fatal(err) + } + saiutil.CreateRoute(t, dut, routePrefix, nh.GetOid()) + saiutil.CreateNeighbor(t, dut, *hop.IpAddress, neighborMAC, outRIF) +} + +func parseMac(t testing.TB, mac string) net.HardwareAddr { + addr, err := net.ParseMAC(mac) + if err != nil { + t.Fatal(err) + } + return addr +} + +func TestMPLSoverUDP(t *testing.T) { + hop := &oc.NetworkInstance_Afts_NextHop{ + IpAddress: ygot.String("2003::3"), + EncapHeader: map[uint8]*oc.NetworkInstance_Afts_NextHop_EncapHeader{ + 0: { + UdpV6: &oc.NetworkInstance_Afts_NextHop_EncapHeader_UdpV6{ + SrcIp: ygot.String("2003::1"), + DstIp: ygot.String("2003::2"), + SrcUdpPort: ygot.Uint16(60000), + DstUdpPort: ygot.Uint16(60001), + IpTtl: ygot.Uint8(10), + }, + }, + 1: { + Mpls: &oc.NetworkInstance_Afts_NextHop_EncapHeader_Mpls{ + MplsLabelStack: []oc.NetworkInstance_Afts_NextHop_EncapHeader_Mpls_MplsLabelStack_Union{oc.UnionUint32(100)}, + }, + }, + }, + } + + configureDUT(t, ondatra.DUT(t, "dut"), hop, "2003::10/128") + + // Create test packet + eth := &layers.Ethernet{ + SrcMAC: parseMac(t, "00:00:00:00:00:01"), + DstMAC: parseMac(t, dutPort1.MAC), + EthernetType: layers.EthernetTypeIPv6, + } + + ip := &layers.IPv6{ + Version: 6, + SrcIP: net.ParseIP("2003::9"), + DstIP: net.ParseIP("2003::10"), + HopLimit: 255, + } + payload := gopacket.Payload([]byte("hello world")) + + buf := gopacket.NewSerializeBuffer() + if err := gopacket.SerializeLayers(buf, gopacket.SerializeOptions{FixLengths: true}, eth, ip, payload); err != nil { + t.Fatalf("failed to serialize headers: %v", err) + } + + // Send test packet to port1. + p1 := pm.GetPort(ondatra.DUT(t, "dut").Port(t, "port1")) + p1.RXQueue.Write(buf.Bytes()) + + // Received forwarded packet from port2. + p2 := pm.GetPort(ondatra.DUT(t, "dut").Port(t, "port2")) + packet := (<-p2.TXQueue.Receive()).([]byte) + p := gopacket.NewPacket(packet, layers.LayerTypeEthernet, gopacket.Default) + t.Log(p.Dump()) + + wantEth := &layers.Ethernet{ + SrcMAC: parseMac(t, dutPort2.MAC), + DstMAC: parseMac(t, neighborMAC), + EthernetType: layers.EthernetTypeIPv6, + } + + wantIP := &layers.IPv6{ + Version: 6, + SrcIP: net.ParseIP(*hop.EncapHeader[0].UdpV6.SrcIp), + DstIP: net.ParseIP(*hop.EncapHeader[0].UdpV6.DstIp), + NextHeader: layers.IPProtocolUDP, + HopLimit: 9, + Length: 63, + } + wantUDP := &layers.UDP{ + SrcPort: layers.UDPPort(*hop.EncapHeader[0].UdpV6.SrcUdpPort), + DstPort: layers.UDPPort(*hop.EncapHeader[0].UdpV6.DstUdpPort), + } + + wantMPLS := &layers.MPLS{ + Label: 100, + } + buf = gopacket.NewSerializeBuffer() + if err := gopacket.SerializeLayers(buf, gopacket.SerializeOptions{FixLengths: true}, wantMPLS, ip, payload); err != nil { + t.Fatalf("failed to serialize GUE headers: %v", err) + } + wantPayload := gopacket.Payload(buf.Bytes()) + + if d := cmp.Diff(p.Layers(), []gopacket.Layer{wantEth, wantIP, wantUDP, &wantPayload}, cmpopts.IgnoreUnexported(layers.IPv6{}, layers.UDP{}, layers.MPLS{}), cmpopts.IgnoreFields(layers.UDP{}, "BaseLayer"), cmpopts.IgnoreFields(layers.Ethernet{}, "BaseLayer"), cmpopts.IgnoreFields(layers.IPv6{}, "BaseLayer")); d != "" { + t.Fatal(d) + } +} diff --git a/integration_tests/dataplane/mplsoudp/testbed.pb.txt b/integration_tests/dataplane/mplsoudp/testbed.pb.txt new file mode 100644 index 00000000..b3fbc9b7 --- /dev/null +++ b/integration_tests/dataplane/mplsoudp/testbed.pb.txt @@ -0,0 +1,9 @@ +duts { + id: "dut" + ports { + id: "port1" + } + ports { + id: "port2" + } +} diff --git a/integration_tests/saiutil/BUILD b/integration_tests/saiutil/BUILD new file mode 100644 index 00000000..b0158d57 --- /dev/null +++ b/integration_tests/saiutil/BUILD @@ -0,0 +1,15 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "saiutil", + srcs = ["saiutil.go"], + importpath = "github.com/openconfig/lemming/integration_tests/saiutil", + visibility = ["//visibility:public"], + deps = [ + "//dataplane/proto/sai", + "@com_github_openconfig_ondatra//:ondatra", + "@com_github_openconfig_ondatra//binding", + "@org_golang_google_grpc//:grpc", + "@org_golang_google_protobuf//proto", + ], +) diff --git a/integration_tests/saiutil/saiutil.go b/integration_tests/saiutil/saiutil.go new file mode 100644 index 00000000..d9fe15b2 --- /dev/null +++ b/integration_tests/saiutil/saiutil.go @@ -0,0 +1,105 @@ +package saiutil + +import ( + "context" + "net" + "net/netip" + "strconv" + "testing" + + "github.com/openconfig/ondatra" + obind "github.com/openconfig/ondatra/binding" + "google.golang.org/grpc" + "google.golang.org/protobuf/proto" + + saipb "github.com/openconfig/lemming/dataplane/proto/sai" +) + +func dataplaneConn(t testing.TB, dut *ondatra.DUTDevice) *grpc.ClientConn { + t.Helper() + var lemming interface { + DataplaneConn(ctx context.Context, opts ...grpc.DialOption) (*grpc.ClientConn, error) + } + if err := obind.DUTAs(dut.RawAPIs().BindingDUT(), &lemming); err != nil { + t.Fatalf("failed to get lemming dut: %v", err) + } + conn, err := lemming.DataplaneConn(context.Background()) + if err != nil { + t.Fatal(err) + } + return conn +} + +func CreateRIF(t testing.TB, dut *ondatra.DUTDevice, port *ondatra.Port, smac string) uint64 { + t.Helper() + conn := dataplaneConn(t, dut) + ric := saipb.NewRouterInterfaceClient(conn) + port1ID, err := strconv.ParseUint(port.Name(), 10, 64) + if err != nil { + t.Fatal(err) + } + mac, err := net.ParseMAC(smac) + if err != nil { + t.Fatal(err) + } + resp, err := ric.CreateRouterInterface(context.Background(), &saipb.CreateRouterInterfaceRequest{ + Switch: 1, + PortId: proto.Uint64(port1ID), + Type: saipb.RouterInterfaceType_ROUTER_INTERFACE_TYPE_PORT.Enum(), + SrcMacAddress: mac, + }) + if err != nil { + t.Fatal(err) + } + return resp.Oid +} + +func CreateRoute(t testing.TB, dut *ondatra.DUTDevice, prefix string, nexthop uint64) { + t.Helper() + conn := dataplaneConn(t, dut) + rc := saipb.NewRouteClient(conn) + pre, err := netip.ParsePrefix(prefix) + if err != nil { + t.Fatal(err) + } + ip := pre.Addr().AsSlice() + mask := net.CIDRMask(pre.Bits(), pre.Addr().BitLen()) + + _, err = rc.CreateRouteEntry(context.Background(), &saipb.CreateRouteEntryRequest{ + Entry: &saipb.RouteEntry{ + SwitchId: 1, + VrId: 0, + Destination: &saipb.IpPrefix{ + Addr: ip, + Mask: mask, + }, + }, + PacketAction: saipb.PacketAction_PACKET_ACTION_FORWARD.Enum(), + NextHopId: &nexthop, + }) + if err != nil { + t.Fatal(err) + } +} + +func CreateNeighbor(t testing.TB, dut *ondatra.DUTDevice, ip string, dmac string, rif uint64) { + t.Helper() + conn := dataplaneConn(t, dut) + nc := saipb.NewNeighborClient(conn) + mac, err := net.ParseMAC(dmac) + if err != nil { + t.Fatal(err) + } + + _, err = nc.CreateNeighborEntry(context.Background(), &saipb.CreateNeighborEntryRequest{ + Entry: &saipb.NeighborEntry{ + SwitchId: 1, + RifId: rif, + IpAddress: net.ParseIP(ip), + }, + DstMacAddress: mac, + }) + if err != nil { + t.Fatal(err) + } +} diff --git a/internal/binding/local.go b/internal/binding/local.go index 2b7117bd..2afbfadc 100644 --- a/internal/binding/local.go +++ b/internal/binding/local.go @@ -38,6 +38,7 @@ import ( "github.com/openconfig/magna/lwotg" "github.com/openconfig/magna/lwotgtelem" "github.com/openconfig/magna/telemetry/arp" + "github.com/openconfig/ondatra" "k8s.io/klog" "github.com/openconfig/ondatra/binding" @@ -66,18 +67,31 @@ import ( // LocalBind is an Ondatra binding for in-process testbed. Only Lemming and Magna are supported. type LocalBind struct { binding.Binding - portMgr *portMgr + portMgr *PortMgr closers []func() error } +type Option func(lb *LocalBind) + +func WithOverridePortManager(mgr *PortMgr) Option { + return func(lb *LocalBind) { + lb.portMgr = mgr + } +} + // Local is a local (in-process) binding for lemming and magna. -func Local(topoDir string) func() (binding.Binding, error) { +func Local(topoDir string, opts ...Option) func() (binding.Binding, error) { dir, _ := filepath.Abs(topoDir) testbedFile := filepath.Join(dir, "testbed.pb.txt") + lb := &LocalBind{} + for _, opt := range opts { + opt(lb) + } + flag.Set("testbed", testbedFile) return func() (binding.Binding, error) { - return &LocalBind{}, nil + return lb, nil } } @@ -226,14 +240,24 @@ func (lb *LocalBind) Reserve(ctx context.Context, tb *opb.Testbed, _, _ time.Dur ATEs: make(map[string]binding.ATE), } - lb.portMgr = &portMgr{ - ports: map[string]*chanPort{}, - dutLaneToPort: map[string]map[string]string{}, + if lb.portMgr == nil { + lb.portMgr = &PortMgr{} + } + if lb.portMgr.ports == nil { + lb.portMgr.ports = map[string]*ChanPort{} + } + if lb.portMgr.dutLaneToPort == nil { + lb.portMgr.dutLaneToPort = map[string]map[string]string{} } if err := lb.portMgr.createPorts(tb); err != nil { return nil, err } + for _, l := range tb.Links { + if err := lb.portMgr.linkPorts(l.A, l.B); err != nil { + return nil, err + } + } intf.OverrideAccessor(&accessor{}) common.OverrideHandleCreator(lb.portMgr) @@ -266,16 +290,10 @@ func (lb *LocalBind) Reserve(ctx context.Context, tb *opb.Testbed, _, _ time.Dur resv.DUTs[dut.Id] = lemming } - for _, l := range tb.Links { - if err := lb.portMgr.linkPorts(l.A, l.B); err != nil { - return nil, err - } - } - return &resv, nil } -func (lb *LocalBind) createDUT(ctx context.Context, dut *opb.Device, addr string, portMgr *portMgr) (*localLemming, error) { +func (lb *LocalBind) createDUT(ctx context.Context, dut *opb.Device, addr string, portMgr *PortMgr) (*localLemming, error) { dutID := uuid.New().String() l, err := lemming.New(dut.Id, fmt.Sprintf("unix:/tmp/zserv-test%s.api", dutID), @@ -339,7 +357,7 @@ func (lb *LocalBind) createDUT(ctx context.Context, dut *opb.Device, addr string } // TODO: this should probably be a library in magna. -func (lb *LocalBind) createATE(_ context.Context, ate *opb.Device, addr string, portMgr *portMgr) (*localMagna, error) { +func (lb *LocalBind) createATE(_ context.Context, ate *opb.Device, addr string, portMgr *PortMgr) (*localMagna, error) { otgSrv := lwotg.New() telemSrv, err := lwotgtelem.New(context.Background(), ate.Id) if err != nil { @@ -423,8 +441,8 @@ func (lb *LocalBind) Release(context.Context) error { } for name, port := range lb.portMgr.ports { - fmt.Printf("port %s, tx enqueue %d, tx dequeue %d\n", name, port.txQueue.EnqueueCount(), port.txQueue.DequeueCount()) - fmt.Printf("port %s, rx enqueue %d, rx dequeue %d\n", name, port.rxQueue.EnqueueCount(), port.rxQueue.DequeueCount()) + fmt.Printf("port %s, tx enqueue %d, tx dequeue %d\n", name, port.TXQueue.EnqueueCount(), port.TXQueue.DequeueCount()) + fmt.Printf("port %s, rx enqueue %d, rx dequeue %d\n", name, port.RXQueue.EnqueueCount(), port.RXQueue.DequeueCount()) } return nil } @@ -447,25 +465,35 @@ func findAvailableLoopbackIP() (string, func() error, error) { return addr, closer, nil } -func newPort(name string) (*chanPort, error) { +func newPort(name string) (*ChanPort, error) { tx, err := queue.NewUnbounded(name + "_tx") if err != nil { return nil, err } - p := &chanPort{ - txQueue: tx, + rx, err := queue.NewUnbounded(name + "_rx") + if err != nil { + return nil, err + } + p := &ChanPort{ + TXQueue: tx, + RXQueue: rx, } tx.Run() + rx.Run() p.run() return p, nil } -type portMgr struct { - ports map[string]*chanPort +type PortMgr struct { + ports map[string]*ChanPort dutLaneToPort map[string]map[string]string } -func (pm *portMgr) createPorts(tb *opb.Testbed) error { +func (pm *PortMgr) GetPort(p *ondatra.Port) *ChanPort { + return pm.ports[fmt.Sprintf("%s:%s", p.Device().ID(), p.ID())] +} + +func (pm *PortMgr) createPorts(tb *opb.Testbed) error { for _, dut := range tb.Duts { pm.dutLaneToPort[dut.GetId()] = make(map[string]string) for i, port := range dut.Ports { @@ -492,7 +520,7 @@ func (pm *portMgr) createPorts(tb *opb.Testbed) error { } // CreateHandle implements magna's API for creating handles. -func (pm *portMgr) CreateHandle(name string) (common.Port, error) { +func (pm *PortMgr) CreateHandle(name string) (common.Port, error) { port, ok := pm.ports[name] if !ok { return nil, fmt.Errorf("port %v not found", name) @@ -500,29 +528,35 @@ func (pm *portMgr) CreateHandle(name string) (common.Port, error) { return port.newHandle(), nil } -func (pm *portMgr) dutManager(dutID string) *dutManager { +func (pm *PortMgr) dutManager(dutID string) *dutManager { return &dutManager{ mgr: pm, dutID: dutID, } } -func (pm *portMgr) linkPorts(a, b string) error { +func (pm *PortMgr) linkPorts(a, b string) error { aPort := pm.ports[a] bPort := pm.ports[b] if aPort == nil || bPort == nil { return fmt.Errorf("ports do not exist: a %v b %v", a, b) } - aPort.rxQueue = bPort.txQueue - bPort.rxQueue = aPort.txQueue + aPort.mu.Lock() + bPort.mu.Lock() + aPort.RXQueue.Close() + bPort.RXQueue.Close() + aPort.RXQueue = bPort.TXQueue + bPort.RXQueue = aPort.TXQueue + aPort.mu.Unlock() + bPort.mu.Unlock() return nil } // dutManager handles mapping from the SAI API hardware lane to Ondatra port ID. type dutManager struct { - mgr *portMgr + mgr *PortMgr dutID string } @@ -535,22 +569,22 @@ func (dm *dutManager) CreatePort(name string) (fwdcontext.Port, error) { return port.newHandle(), nil } -// chanPort is a fake port implemented using channels. -type chanPort struct { +// ChanPort is a fake port implemented using channels. +type ChanPort struct { mu sync.RWMutex - txQueue *queue.Queue - rxQueue *queue.Queue + TXQueue *queue.Queue + RXQueue *queue.Queue handles []*portHandle } -func (p *chanPort) newHandle() *portHandle { +func (p *ChanPort) newHandle() *portHandle { p.mu.Lock() defer p.mu.Unlock() h := &portHandle{ rx: make(chan []byte, 1024), - tx: p.txQueue, + tx: p.TXQueue, } p.handles = append(p.handles, h) i := len(p.handles) - 1 @@ -563,18 +597,24 @@ func (p *chanPort) newHandle() *portHandle { return h } -func (p *chanPort) run() { +func (p *ChanPort) run() { go func() { for { - if p.rxQueue == nil { + p.mu.RLock() + if p.RXQueue == nil { + p.mu.RUnlock() + continue + } + q := p.RXQueue.Receive() + p.mu.RUnlock() + packet, ok := <-q + if !ok { + time.Sleep(time.Millisecond) continue } - packet := (<-p.rxQueue.Receive()).([]byte) - p.mu.RLock() for _, h := range p.handles { - h.rx <- packet + h.rx <- packet.([]byte) } - p.mu.RUnlock() } }() }