Skip to content

Commit

Permalink
Initial implement a agg intf config
Browse files Browse the repository at this point in the history
  • Loading branch information
DanG100 committed Dec 11, 2023
1 parent 4c22063 commit c6e3def
Show file tree
Hide file tree
Showing 6 changed files with 969 additions and 22 deletions.
228 changes: 207 additions & 21 deletions dataplane/dplanerc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,12 @@ type ocInterface struct {
}

type interfaceData struct {
portID uint64
hostifID uint64
hostifIfIndex int
hostifDevName string
rifID uint64
portID uint64
hostifID uint64
hostifIfIndex int
hostifDevName string
rifID uint64
lagMembershipID uint64
}

type interfaceMap map[ocInterface]*interfaceData
Expand Down Expand Up @@ -92,6 +93,7 @@ type Reconciler struct {
nextHopClient saipb.NextHopClient
fwdClient fwdpb.ForwardingClient
nextHopGroupClient saipb.NextHopGroupClient
lagClient saipb.LagClient
stateMu sync.RWMutex
// state keeps track of the applied state of the device's interfaces so that we do not issue duplicate configuration commands to the device's interfaces.
state map[string]*oc.Interface
Expand Down Expand Up @@ -133,6 +135,7 @@ func New(conn grpc.ClientConnInterface, switchID, cpuPortID uint64, contextID st
nextHopClient: saipb.NewNextHopClient(conn),
nextHopGroupClient: saipb.NewNextHopGroupClient(conn),
fwdClient: fwdpb.NewForwardingClient(conn),
lagClient: saipb.NewLagClient(conn),
}
return r
}
Expand All @@ -155,6 +158,8 @@ func (ni *Reconciler) StartInterface(ctx context.Context, client *ygnmi.Client)
ocpath.Root().InterfaceAny().Subinterface(0).Ipv4().AddressAny().PrefixLength().Config().PathStruct(),
ocpath.Root().InterfaceAny().Subinterface(0).Ipv6().AddressAny().Ip().Config().PathStruct(),
ocpath.Root().InterfaceAny().Subinterface(0).Ipv6().AddressAny().PrefixLength().Config().PathStruct(),
ocpath.Root().InterfaceAny().Aggregation().LagType().Config().PathStruct(),
ocpath.Root().InterfaceAny().Ethernet().AggregateId().Config().PathStruct(),
)
cancelCtx, cancelFn := context.WithCancel(ctx)

Expand All @@ -169,7 +174,6 @@ func (ni *Reconciler) StartInterface(ctx context.Context, client *ygnmi.Client)
}
return ygnmi.Continue
})

linkDoneCh := make(chan struct{})
linkUpdateCh := make(chan netlink.LinkUpdate)
addrDoneCh := make(chan struct{})
Expand Down Expand Up @@ -262,8 +266,12 @@ func (ni *Reconciler) startCounterUpdates(ctx context.Context) {
}
ni.stateMu.RUnlock()
for _, intfName := range intfNames {
intf := ni.ocInterfaceData[intfName]
if intf == nil {
continue
}
stats, err := ni.portClient.GetPortStats(ctx, &saipb.GetPortStatsRequest{
Oid: ni.ocInterfaceData[intfName].portID,
Oid: intf.portID,
CounterIds: []saipb.PortStat{
saipb.PortStat_PORT_STAT_IF_IN_UCAST_PKTS,
saipb.PortStat_PORT_STAT_IF_IN_NON_UCAST_PKTS,
Expand All @@ -287,17 +295,188 @@ func (ni *Reconciler) startCounterUpdates(ctx context.Context) {
}()
}

func (ni *Reconciler) createLAG(ctx context.Context, intf ocInterface, lagType oc.E_IfAggregate_AggregationType) error {
bond := netlink.NewLinkBond(netlink.NewLinkAttrs())
bond.Name = intf.name
bond.Mode = netlink.BOND_MODE_BALANCE_XOR
if err := netlink.LinkAdd(bond); err != nil {
return fmt.Errorf("failed to create kernel lag interface: %v", err)
}
lagResp, err := ni.lagClient.CreateLag(ctx, &saipb.CreateLagRequest{
Switch: ni.switchID,
})
if err != nil {
return fmt.Errorf("failed to create router interface %q: %v", intf.name, err)
}
l, err := netlink.LinkByName(intf.name)
if err != nil {
return fmt.Errorf("failed to get bond intf %q: %v", intf.name, err)
}
rifResp, err := ni.ifaceClient.CreateRouterInterface(ctx, &saipb.CreateRouterInterfaceRequest{
Switch: ni.switchID,
Type: saipb.RouterInterfaceType_ROUTER_INTERFACE_TYPE_PORT.Enum(),
PortId: &lagResp.Oid,
VirtualRouterId: proto.Uint64(0),
SrcMacAddress: l.Attrs().HardwareAddr,
})
if err != nil {
return fmt.Errorf("failed to create router interface %q: %v", intf.name, err)
}
aggData := &interfaceData{
portID: lagResp.Oid,
rifID: rifResp.Oid,
hostifIfIndex: bond.Index,
hostifDevName: intf.name,
}

ni.getOrCreateInterface(intf.name).GetOrCreateEthernet().SetHwMacAddress(l.Attrs().HardwareAddr.String())
ni.getOrCreateInterface(intf.name).GetOrCreateEthernet().SetMacAddress(l.Attrs().HardwareAddr.String())
ni.getOrCreateInterface(intf.name).GetOrCreateAggregation().SetLagType(lagType)

sb := &ygnmi.SetBatch{}
gnmiclient.BatchUpdate(sb, ocpath.Root().Interface(intf.name).Ethernet().HwMacAddress().State(), l.Attrs().HardwareAddr.String())
gnmiclient.BatchUpdate(sb, ocpath.Root().Interface(intf.name).Ethernet().MacAddress().State(), l.Attrs().HardwareAddr.String())
gnmiclient.BatchUpdate(sb, ocpath.Root().Interface(intf.name).Aggregation().LagType().State(), lagType)
gnmiclient.BatchUpdate(sb, ocpath.Root().Interface(intf.name).AdminStatus().State(), oc.Interface_AdminStatus_UP)
gnmiclient.BatchUpdate(sb, ocpath.Root().Interface(intf.name).OperStatus().State(), oc.Interface_OperStatus_UP)
gnmiclient.BatchUpdate(sb, ocpath.Root().Interface(intf.name).Type().State(), oc.IETFInterfaces_InterfaceType_ieee8023adLag)
if _, err := sb.Set(ctx, ni.c); err != nil {
return fmt.Errorf("failed to update agg state: %v", err)
}

ni.ocInterfaceData[intf] = aggData
return nil
}

func (ni *Reconciler) addLAGMember(ctx context.Context, intf ocInterface, memberData *interfaceData, aggID string) error {
agg, ok := ni.ocInterfaceData[ocInterface{name: aggID}]
if !ok {
return fmt.Errorf("unknown aggregate id %q", aggID)
}
bondLink, err := netlink.LinkByIndex(agg.hostifIfIndex)
if err != nil {
return fmt.Errorf("failed to find bond link: %v", err)
}
memberLink, err := netlink.LinkByIndex(memberData.hostifIfIndex)
if err != nil {
return fmt.Errorf("failed to find member link: %v", err)
}

// Can only add links to a bond interface when it's down.
if memberLink.Attrs().OperState != netlink.OperDown {
log.Infof("aggregate link %v oper status %v, setting to down", intf.name, bondLink.Attrs().OperState)
if err := netlink.LinkSetDown(memberLink); err != nil {
log.Warningf("failed to set link %v down: %v", intf.name, err)
}
defer func() {
if err := netlink.LinkSetUp(memberLink); err != nil {
log.Warningf("failed to set link %v up: %v", intf.name, err)
}
}()
}

if err := netlink.LinkSetMaster(memberLink, bondLink); err != nil {
return fmt.Errorf("failed to add bond member: %v", err)
}
resp, err := ni.lagClient.CreateLagMember(ctx, &saipb.CreateLagMemberRequest{
Switch: ni.switchID,
LagId: proto.Uint64(agg.portID),
PortId: proto.Uint64(memberData.portID),
})
if err != nil {
return fmt.Errorf("failed to create lag member: %v", err)
}
sb := &ygnmi.SetBatch{}
gnmiclient.BatchUpdate(sb, ocpath.Root().Interface(intf.name).Ethernet().AggregateId().State(), aggID)
if _, err := sb.Set(ctx, ni.c); err != nil {
return fmt.Errorf("failed to update agg state: %v", err)
}
ni.getOrCreateInterface(intf.name).GetOrCreateEthernet().AggregateId = &aggID
memberData.lagMembershipID = resp.Oid
return nil
}

func (ni *Reconciler) removeLAGMember(ctx context.Context, intf ocInterface, memberData *interfaceData) error {
memberLink, err := netlink.LinkByIndex(memberData.hostifIfIndex)
if err != nil {
return fmt.Errorf("failed to find member link: %v", err)
}
if err := netlink.LinkSetNoMaster(memberLink); err != nil {
return fmt.Errorf("failed to remove bond: %v", err)
}
_, err = ni.lagClient.RemoveLagMember(ctx, &saipb.RemoveLagMemberRequest{
Oid: memberData.lagMembershipID,
})
if err != nil {
return fmt.Errorf("failed to remove lag member: %v", err)
}
sb := &ygnmi.SetBatch{}
ni.getOrCreateInterface(intf.name).GetOrCreateEthernet().AggregateId = nil
gnmiclient.BatchDelete(sb, ocpath.Root().Interface(intf.name).Ethernet().AggregateId().State())
if _, err := sb.Set(ctx, ni.c); err != nil {
return fmt.Errorf("failed to update agg state: %v", err)
}
memberData.lagMembershipID = 0
return nil
}

func (ni *Reconciler) setMinLinks(intf ocInterface, data *interfaceData, minLink uint16) error {
link, err := netlink.LinkByIndex(data.hostifIfIndex)
if err != nil {
return fmt.Errorf("failed to find link: %v", err)
}
bond, ok := link.(*netlink.Bond)
if !ok {
return fmt.Errorf("link %s is not a bond interface", intf.name)
}
bond.MinLinks = int(minLink)
if err := netlink.LinkModify(bond); err != nil {
return fmt.Errorf("failed to modify link %s: %v", intf.name, err)
}
ni.getOrCreateInterface(intf.name).GetOrCreateAggregation().SetMinLinks(minLink)
return nil
}

// reconcile compares the interface config with state and modifies state to match config.
func (ni *Reconciler) reconcile(ctx context.Context, config *oc.Interface) {
ni.stateMu.RLock()
defer ni.stateMu.RUnlock()

intf := ocInterface{name: config.GetName(), subintf: 0}
data := ni.ocInterfaceData[intf]
state := ni.getOrCreateInterface(config.GetName())
data, ok := ni.ocInterfaceData[intf]

// Create a new lag interface
if !ok && config.GetAggregation().GetLagType() != oc.IfAggregate_AggregationType_UNSET {
log.Infof("creating new lag interface: %v", intf.name)
if err := ni.createLAG(ctx, intf, config.GetAggregation().GetLagType()); err != nil {
log.Warningf("failed to create lag: %v", err)
}
}
if data == nil {
return
}
state := ni.getOrCreateInterface(config.GetName())
if config.GetOrCreateEthernet().GetAggregateId() != state.GetOrCreateEthernet().GetAggregateId() {
log.Infof("reconciling lag member intf %v: config agg id %v, state agg id %v", intf.name, config.GetEthernet().GetAggregateId(), state.GetEthernet().GetAggregateId())
if data.lagMembershipID != 0 {
log.Infof("intf %v has existing lab membership lag membership id %d", intf.name, data.lagMembershipID)
if err := ni.removeLAGMember(ctx, intf, data); err != nil {
log.Warningf("intf %v failed to remove lag member: %v", intf.name, err)
}
}
if config.GetEthernet().GetAggregateId() != "" {
log.Infof("intf %v adding to agg id %v ", intf.name, config.GetEthernet().GetAggregateId())
if err := ni.addLAGMember(ctx, intf, data, config.GetEthernet().GetAggregateId()); err != nil {
log.Warningf("intf %v failed to add lag member %v: %v", intf.name, config.GetEthernet().GetAggregateId(), err)
}
}
}

if config.GetOrCreateAggregation().GetMinLinks() != state.GetOrCreateAggregation().GetMinLinks() {
if err := ni.setMinLinks(intf, data, config.GetOrCreateAggregation().GetMinLinks()); err != nil {
log.Warningf("failed to set min links: %v", err)
}
}

if config.GetOrCreateEthernet().MacAddress != nil {
if config.GetEthernet().GetMacAddress() != state.GetEthernet().GetMacAddress() {
Expand All @@ -320,14 +499,16 @@ func (ni *Reconciler) reconcile(ctx context.Context, config *oc.Interface) {
if config.GetOrCreateSubinterface(intf.subintf).Enabled != nil {
if state.GetOrCreateSubinterface(intf.subintf).Enabled == nil || config.GetSubinterface(intf.subintf).GetEnabled() != state.GetSubinterface(intf.subintf).GetEnabled() {
log.V(1).Infof("setting interface %s enabled %t", data.hostifDevName, config.GetSubinterface(intf.subintf).GetEnabled())
_, err := ni.hostifClient.SetHostifAttribute(ctx, &saipb.SetHostifAttributeRequest{
Oid: data.hostifID,
OperStatus: proto.Bool(config.GetSubinterface(0).GetEnabled()),
})
if err != nil {
log.Warningf("Failed to set state address of hostif: %v", err)
if data.hostifID != 0 {
_, err := ni.hostifClient.SetHostifAttribute(ctx, &saipb.SetHostifAttributeRequest{
Oid: data.hostifID,
OperStatus: proto.Bool(config.GetSubinterface(0).GetEnabled()),
})
if err != nil {
log.Warningf("Failed to set state address of hostif: %v", err)
}
}
_, err = ni.portClient.SetPortAttribute(ctx, &saipb.SetPortAttributeRequest{
_, err := ni.portClient.SetPortAttribute(ctx, &saipb.SetPortAttributeRequest{
Oid: data.portID,
AdminState: proto.Bool(config.GetSubinterface(0).GetEnabled()),
})
Expand Down Expand Up @@ -726,13 +907,18 @@ func (ni *Reconciler) setupPorts(ctx context.Context) error {
return fmt.Errorf("failed to update MAC address for interface %q: %w", i.Name, err)
}
data.rifID = rifResp.Oid

ni.getOrCreateInterface(i.Name).GetOrCreateEthernet().SetHwMacAddress(tap.HardwareAddr.String())
ni.getOrCreateInterface(i.Name).GetOrCreateEthernet().SetMacAddress(tap.HardwareAddr.String())
if _, err := gnmiclient.Update(ctx, ni.c, ocpath.Root().Interface(i.Name).Ethernet().HwMacAddress().State(), tap.HardwareAddr.String()); err != nil {
return fmt.Errorf("failed to set hw addr of interface %q: %v", tap.Name, err)
}
if _, err := gnmiclient.Update(ctx, ni.c, ocpath.Root().Interface(i.Name).Ethernet().MacAddress().State(), tap.HardwareAddr.String()); err != nil {
return fmt.Errorf("failed to set hw addr of interface %q: %v", tap.Name, err)

sb := &ygnmi.SetBatch{}
gnmiclient.BatchUpdate(sb, ocpath.Root().Interface(i.Name).Ethernet().HwMacAddress().State(), tap.HardwareAddr.String())
gnmiclient.BatchUpdate(sb, ocpath.Root().Interface(i.Name).Ethernet().MacAddress().State(), tap.HardwareAddr.String())
gnmiclient.BatchUpdate(sb, ocpath.Root().Interface(i.Name).OperStatus().State(), oc.Interface_OperStatus_UP)
gnmiclient.BatchUpdate(sb, ocpath.Root().Interface(i.Name).AdminStatus().State(), oc.Interface_AdminStatus_UP)
gnmiclient.BatchUpdate(sb, ocpath.Root().Interface(i.Name).Subinterface(0).AdminStatus().State(), oc.Interface_AdminStatus_UP)
if _, err := sb.Set(ctx, ni.c); err != nil {
log.Warningf("failed to set link status: %v", err)
}
ni.ocInterfaceData[ocIntf] = data
}
Expand Down
2 changes: 1 addition & 1 deletion dataplane/saiserver/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ func (sw *saiSwitch) PortStateChangeNotification(_ *saipb.PortStateChangeNotific
continue
}
oType := sw.mgr.GetType(ed.GetPort().GetPortId().GetObjectId().GetId())
if oType != saipb.ObjectType_OBJECT_TYPE_PORT {
if oType != saipb.ObjectType_OBJECT_TYPE_PORT && oType != saipb.ObjectType_OBJECT_TYPE_LAG && oType != saipb.ObjectType_OBJECT_TYPE_BRIDGE_PORT {
log.Infof("skipping port state event for type %v", oType)
continue
}
Expand Down
22 changes: 22 additions & 0 deletions integration_tests/onedut_oneotg_tests/aggregate/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "aggregate_test",
srcs = ["aggregate_test.go"],
data = ["testbed.pb.txt","topology.pb.txt"],
tags = [
"exclusive",
],
deps = [
"//internal/attrs",
"//internal/binding",
"@com_github_open_traffic_generator_snappi_gosnappi//:gosnappi",
"@com_github_openconfig_ondatra//:ondatra",
"@com_github_openconfig_ondatra//gnmi",
"@com_github_openconfig_ondatra//gnmi/oc",
"@com_github_openconfig_ondatra//gnmi/otg",
"@com_github_openconfig_ondatra//netutil",
"@com_github_openconfig_ygnmi//ygnmi",
"@com_github_openconfig_ygot//ygot",
],
)
Loading

0 comments on commit c6e3def

Please sign in to comment.