Skip to content

Commit f74eee0

Browse files
authored
[FlowExporter] More efficient IP checks (#6960)
The FlowExporter in the Agent queries the NodeRouteController to determine whether the source / destination IPs are Pod IPs (NodeIPAM only). Prior to this change, these checks were expensive, involving an indexer lookup and conversions between different IP formats. The new implementation is about 10x faster, and peforms no memory allocations. The new implementation introduces a new set in the NodeRouteController, dedicated to storing all the PodCIDRs in the cluster. While I considered removing the dependency of the FlowExporter on the NodeRouteController altogether, it would have been a much bigger change. Additionally, in the long term, we could consider removing these checks from the FlowExporter altogether, and pushing the logic to the FlowAggregator. We also make a few additional changes to the FlowExporter: * more consistently ignore connections where the source / destination IP is a gateway IP * classify Pod-to-Service traffic where the destination IP is not a Pod IP as Pod-to-External * log a warning if FlowExporter is enabled alongside AntreaIPAM Signed-off-by: Antonin Bas <[email protected]>
1 parent b3c40b0 commit f74eee0

File tree

6 files changed

+285
-141
lines changed

6 files changed

+285
-141
lines changed

cmd/antrea-agent/options.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,10 @@ func (o *Options) validateAntreaProxyConfig(encapMode config.TrafficEncapModeTyp
275275
}
276276

277277
func (o *Options) validateFlowExporterConfig() error {
278-
if features.DefaultFeatureGate.Enabled(features.FlowExporter) {
278+
if features.DefaultFeatureGate.Enabled(features.FlowExporter) && o.config.FlowExporter.Enable {
279+
if features.DefaultFeatureGate.Enabled(features.AntreaIPAM) {
280+
klog.InfoS("The FlowExporter feature does not support AntreaIPAM Pods")
281+
}
279282
host, port, proto, err := flowexport.ParseFlowCollectorAddr(o.config.FlowExporter.FlowCollectorAddr, defaultFlowCollectorPort, defaultFlowCollectorTransport)
280283
if err != nil {
281284
return err

pkg/agent/controller/noderoute/node_route_controller.go

+84-34
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@ import (
1818
"context"
1919
"fmt"
2020
"net"
21+
"net/netip"
22+
"sync"
2123
"time"
2224

2325
"github.com/containernetworking/plugins/pkg/ip"
2426
corev1 "k8s.io/api/core/v1"
2527
"k8s.io/apimachinery/pkg/labels"
28+
"k8s.io/apimachinery/pkg/util/sets"
2629
"k8s.io/apimachinery/pkg/util/wait"
2730
coreinformers "k8s.io/client-go/informers/core/v1"
2831
corelisters "k8s.io/client-go/listers/core/v1"
@@ -77,7 +80,13 @@ type Controller struct {
7780
// installedNodes records routes and flows installation states of Nodes.
7881
// The key is the host name of the Node, the value is the nodeRouteInfo of the Node.
7982
// A node will be in the map after its flows and routes are installed successfully.
80-
installedNodes cache.Indexer
83+
installedNodes cache.Indexer
84+
// podSubnetsMutex protects access to the podSubnets set.
85+
podSubnetsMutex sync.RWMutex
86+
// podSubnets is a set which stores all known PodCIDRs in the cluster as masked netip.Prefix objects.
87+
podSubnets sets.Set[netip.Prefix]
88+
maskSizeV4 int
89+
maskSizeV6 int
8190
wireGuardClient wireguard.Interface
8291
// ipsecCertificateManager is useful for determining whether the ipsec certificate has been configured
8392
// or not when IPsec is enabled with "cert" mode. The NodeRouteController must wait for the certificate
@@ -124,10 +133,21 @@ func NewNodeRouteController(
124133
},
125134
),
126135
installedNodes: cache.NewIndexer(nodeRouteInfoKeyFunc, cache.Indexers{nodeRouteInfoPodCIDRIndexName: nodeRouteInfoPodCIDRIndexFunc}),
136+
podSubnets: sets.New[netip.Prefix](),
127137
wireGuardClient: wireguardClient,
128138
ipsecCertificateManager: ipsecCertificateManager,
129139
flowRestoreCompleteWait: flowRestoreCompleteWait.Increment(),
130140
}
141+
if nodeConfig.PodIPv4CIDR != nil {
142+
prefix, _ := cidrToPrefix(nodeConfig.PodIPv4CIDR)
143+
controller.podSubnets.Insert(prefix)
144+
controller.maskSizeV4 = prefix.Bits()
145+
}
146+
if nodeConfig.PodIPv6CIDR != nil {
147+
prefix, _ := cidrToPrefix(nodeConfig.PodIPv6CIDR)
148+
controller.podSubnets.Insert(prefix)
149+
controller.maskSizeV6 = prefix.Bits()
150+
}
131151
registration, _ := nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
132152
cache.ResourceEventHandlerDetailedFuncs{
133153
AddFunc: func(cur interface{}, isInInitialList bool) {
@@ -379,7 +399,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
379399
go func() {
380400
// When the initial list of Nodes has been processed, we decrement flowRestoreCompleteWait.
381401
err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), 100*time.Millisecond, true, func(ctx context.Context) (done bool, err error) {
382-
return c.hasProcessedInitialList.HasSynced(), nil
402+
return c.HasSynced(), nil
383403
})
384404
// An error here means the context has been cancelled, which means that the stopCh
385405
// has been closed. While it is still possible for c.hasProcessedInitialList.HasSynced
@@ -395,6 +415,11 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
395415
<-stopCh
396416
}
397417

418+
// HasSynced returns true when the initial list of Nodes has been processed by the controller.
419+
func (c *Controller) HasSynced() bool {
420+
return c.hasProcessedInitialList.HasSynced()
421+
}
422+
398423
// worker is a long-running function that will continually call the processNextWorkItem function in
399424
// order to read and process a message on the workqueue.
400425
func (c *Controller) worker() {
@@ -482,6 +507,12 @@ func (c *Controller) deleteNodeRoute(nodeName string) error {
482507
return fmt.Errorf("failed to uninstall flows to Node %s: %v", nodeName, err)
483508
}
484509
c.installedNodes.Delete(obj)
510+
func() {
511+
subnets, _ := cidrsToPrefixes(nodeRouteInfo.podCIDRs)
512+
c.podSubnetsMutex.Lock()
513+
defer c.podSubnetsMutex.Unlock()
514+
c.podSubnets.Delete(subnets...)
515+
}()
485516

486517
if c.networkConfig.TrafficEncryptionMode == config.TrafficEncryptionModeIPSec {
487518
interfaceConfig, ok := c.interfaceStore.GetNodeTunnelInterface(nodeName)
@@ -575,6 +606,13 @@ func (c *Controller) addNodeRoute(nodeName string, node *corev1.Node) error {
575606
peerNodeIP = peerNodeIPs.IPv6
576607
}
577608

609+
func() {
610+
subnet, _ := cidrToPrefix(peerPodCIDR)
611+
c.podSubnetsMutex.Lock()
612+
defer c.podSubnetsMutex.Unlock()
613+
c.podSubnets.Insert(subnet)
614+
}()
615+
578616
klog.InfoS("Adding route and flow to Node", "Node", nodeName, "podCIDR", podCIDR,
579617
"peerNodeIP", peerNodeIP)
580618
}
@@ -773,40 +811,31 @@ func ParseTunnelInterfaceConfig(
773811
return interfaceConfig
774812
}
775813

776-
func (c *Controller) IPInPodSubnets(ip net.IP) bool {
777-
var ipCIDR *net.IPNet
778-
var curNodeCIDRStr string
779-
if ip.To4() != nil {
780-
var podIPv4CIDRMaskSize int
781-
if c.nodeConfig.PodIPv4CIDR != nil {
782-
curNodeCIDRStr = c.nodeConfig.PodIPv4CIDR.String()
783-
podIPv4CIDRMaskSize, _ = c.nodeConfig.PodIPv4CIDR.Mask.Size()
784-
} else {
785-
return false
786-
}
787-
v4Mask := net.CIDRMask(podIPv4CIDRMaskSize, utilip.V4BitLen)
788-
ipCIDR = &net.IPNet{
789-
IP: ip.Mask(v4Mask),
790-
Mask: v4Mask,
791-
}
792-
814+
func (c *Controller) findPodSubnetForIP(ip netip.Addr) (netip.Prefix, bool) {
815+
var maskSize int
816+
if ip.Is4() {
817+
maskSize = c.maskSizeV4
793818
} else {
794-
var podIPv6CIDRMaskSize int
795-
if c.nodeConfig.PodIPv6CIDR != nil {
796-
curNodeCIDRStr = c.nodeConfig.PodIPv6CIDR.String()
797-
podIPv6CIDRMaskSize, _ = c.nodeConfig.PodIPv6CIDR.Mask.Size()
798-
} else {
799-
return false
800-
}
801-
v6Mask := net.CIDRMask(podIPv6CIDRMaskSize, utilip.V6BitLen)
802-
ipCIDR = &net.IPNet{
803-
IP: ip.Mask(v6Mask),
804-
Mask: v6Mask,
805-
}
819+
maskSize = c.maskSizeV6
806820
}
807-
ipCIDRStr := ipCIDR.String()
808-
nodeInCluster, _ := c.installedNodes.ByIndex(nodeRouteInfoPodCIDRIndexName, ipCIDRStr)
809-
return len(nodeInCluster) > 0 || ipCIDRStr == curNodeCIDRStr
821+
if maskSize == 0 {
822+
return netip.Prefix{}, false
823+
}
824+
prefix, _ := ip.Prefix(maskSize)
825+
c.podSubnetsMutex.RLock()
826+
defer c.podSubnetsMutex.RUnlock()
827+
return prefix, c.podSubnets.Has(prefix)
828+
}
829+
830+
// LookupIPInPodSubnets returns two boolean values. The first one indicates whether the IP can be
831+
// found in a PodCIDR for one of the cluster Nodes. The second one indicates whether the IP is used
832+
// as a gateway IP. The second boolean value can only be true if the first one is true.
833+
func (c *Controller) LookupIPInPodSubnets(ip netip.Addr) (bool, bool) {
834+
prefix, ok := c.findPodSubnetForIP(ip)
835+
if !ok {
836+
return false, false
837+
}
838+
return ok, ip == util.GetGatewayIPForPodPrefix(prefix)
810839
}
811840

812841
// getNodeMAC gets Node's br-int MAC from its annotation. It is only for Windows Noencap mode.
@@ -821,3 +850,24 @@ func getNodeMAC(node *corev1.Node) (net.HardwareAddr, error) {
821850
}
822851
return mac, nil
823852
}
853+
854+
func cidrToPrefix(cidr *net.IPNet) (netip.Prefix, error) {
855+
addr, ok := netip.AddrFromSlice(cidr.IP)
856+
if !ok {
857+
return netip.Prefix{}, fmt.Errorf("invalid IP in CIDR: %v", cidr)
858+
}
859+
size, _ := cidr.Mask.Size()
860+
return addr.Prefix(size)
861+
}
862+
863+
func cidrsToPrefixes(cidrs []*net.IPNet) ([]netip.Prefix, error) {
864+
prefixes := make([]netip.Prefix, len(cidrs))
865+
for idx := range cidrs {
866+
prefix, err := cidrToPrefix(cidrs[idx])
867+
if err != nil {
868+
return nil, err
869+
}
870+
prefixes[idx] = prefix
871+
}
872+
return prefixes, nil
873+
}

0 commit comments

Comments
 (0)