Skip to content

Commit

Permalink
Use local temp pods by ip cache (#330)
Browse files Browse the repository at this point in the history
  • Loading branch information
anjmao authored Aug 13, 2024
1 parent 95c9e65 commit f72e121
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 16 deletions.
11 changes: 0 additions & 11 deletions cmd/agent/daemon/state/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,6 @@ func NewController(
panic(err)
}

// IP info cache is used only in netflow pipeline.
// It's safe to use non synced lru since it's accessed form on goroutine.
ipInfoCache, err := freelru.New[netip.Addr, *kubepb.IPInfo](1024, func(k netip.Addr) uint32 {
return uint32(xxhash.Sum64(k.AsSlice()))
})
if err != nil {
panic(err)
}

// Conntrack cache is used only in netflow pipeline.
// It's safe to use non synced lru since it's accessed form on goroutine.
conntrackCacheKey := xxhash.New()
Expand Down Expand Up @@ -139,7 +130,6 @@ func NewController(
mutedNamespaces: map[string]struct{}{},
netflows: map[uint64]*netflowVal{},
dnsCache: dnsCache,
ipInfoCache: ipInfoCache,
podCache: podCache,
conntrackCache: conntrackCache,
processTreeCollector: processTreeCollector,
Expand Down Expand Up @@ -177,7 +167,6 @@ type Controller struct {
kubeClient kubepb.KubeAPIClient
dnsCache *freelru.SyncedLRU[uint64, *freelru.SyncedLRU[netip.Addr, string]]
podCache *freelru.SyncedLRU[string, *kubepb.Pod]
ipInfoCache *freelru.LRU[netip.Addr, *kubepb.IPInfo]
conntrackCache *freelru.LRU[types.AddrTuple, netip.AddrPort]
}

Expand Down
13 changes: 8 additions & 5 deletions cmd/agent/daemon/state/netflow_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ func (c *Controller) enqueueNetflowExport(now time.Time) {
)
}()

podsByIPCache := map[netip.Addr]*kubepb.IPInfo{}

for key, netflow := range c.netflows {
// Flow was exported before and doesn't have new changes. Delete it and continue.
if netflow.exportedAt.After(time.UnixMicro(int64(netflow.event.Context.Ts) / 1000)) {
Expand Down Expand Up @@ -152,6 +154,7 @@ func (c *Controller) enqueueNetflowExport(now time.Time) {
pbNetFlow.Destinations = make([]*castpb.NetflowDestination, 0, len(activeNetflowDests))
for _, dest := range activeNetflowDests {
flowDest := c.toProtoNetflowDest(
podsByIPCache,
netflow.event.Context.CgroupID,
args.Tuple.Src,
dest.addrPort,
Expand Down Expand Up @@ -236,7 +239,7 @@ func (c *Controller) toProtoNetflow(e *types.Event, args *types.NetFlowBaseArgs)
return res
}

func (c *Controller) toProtoNetflowDest(cgroupID uint64, src, dst netip.AddrPort, txBytes, rxBytes, txPackets, rxPackets uint64) *castpb.NetflowDestination {
func (c *Controller) toProtoNetflowDest(podsByIPCache map[netip.Addr]*kubepb.IPInfo, cgroupID uint64, src, dst netip.AddrPort, txBytes, rxBytes, txPackets, rxPackets uint64) *castpb.NetflowDestination {
dns := c.getAddrDnsQuestion(cgroupID, dst.Addr())

if c.clusterInfo.serviceCidr.Contains(dst.Addr()) {
Expand All @@ -256,7 +259,7 @@ func (c *Controller) toProtoNetflowDest(cgroupID uint64, src, dst netip.AddrPort
}

if c.clusterInfo.serviceCidr.Contains(dst.Addr()) || c.clusterInfo.podCidr.Contains(dst.Addr()) {
ipInfo, found := c.getIPInfo(dst.Addr())
ipInfo, found := c.getIPInfo(podsByIPCache, dst.Addr())
if found {
res.PodName = ipInfo.PodName
res.Namespace = ipInfo.Namespace
Expand All @@ -268,8 +271,8 @@ func (c *Controller) toProtoNetflowDest(cgroupID uint64, src, dst netip.AddrPort
return res
}

func (c *Controller) getIPInfo(addr netip.Addr) (*kubepb.IPInfo, bool) {
ipInfo, found := c.ipInfoCache.Get(addr)
func (c *Controller) getIPInfo(podsByIPCache map[netip.Addr]*kubepb.IPInfo, addr netip.Addr) (*kubepb.IPInfo, bool) {
ipInfo, found := podsByIPCache[addr]
if !found {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
Expand All @@ -279,7 +282,7 @@ func (c *Controller) getIPInfo(addr netip.Addr) (*kubepb.IPInfo, bool) {
return nil, false
}
ipInfo = resp.Info
c.ipInfoCache.Add(addr, ipInfo)
podsByIPCache[addr] = ipInfo
}
return ipInfo, true
}
Expand Down

0 comments on commit f72e121

Please sign in to comment.