Skip to content

Commit

Permalink
issue#120: Support for Customizing Private Subnet Recognition in Egre…
Browse files Browse the repository at this point in the history
…ssd Metrics
  • Loading branch information
Barrera, Angel authored and Barrera, Angel committed Jan 24, 2025
1 parent 376e6cb commit 2022ca6
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 21 deletions.
35 changes: 28 additions & 7 deletions cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"errors"
"flag"
"fmt"
"net"
"net/http"
"net/http/pprof"
"os"
"os/signal"
"strings"
"syscall"
"time"

Expand Down Expand Up @@ -52,6 +54,11 @@ var (

// Explicitly allow to set conntrack client mode.
ctMode = flag.String("ct-mode", "", "Explicitly set conntract mode (netfilter,cilium)")

// Flag that specifies a set of custom private cidr ranges
// It could be the case that you use a public cidr in your private network
// and you want to exclude it to be marked as public
privateCIDRs = flag.String("private-cidrs", "", "Comma-separated list of private CIDRs")
)

// These should be set via `go build` during a release.
Expand Down Expand Up @@ -133,14 +140,28 @@ func run(log logrus.FieldLogger) error {
ip2dns = dns.NewIP2DNS(tracer, log)
}

var customPrivateCIDRs []*net.IPNet
// privateCIDRs is a comma separated list of private CIDRs
if *privateCIDRs != "" {
customPrivateCIDRs = make([]*net.IPNet, 0)
for _, cidr := range strings.Split(*privateCIDRs, ",") {
_, ipnet, err := net.ParseCIDR(cidr)
if err != nil {
return fmt.Errorf("parsing custom private CIDRs: %w", err)
}
customPrivateCIDRs = append(customPrivateCIDRs, ipnet)
}
}

cfg := collector.Config{
ReadInterval: *readInterval,
CleanupInterval: *cleanupInterval,
NodeName: os.Getenv("NODE_NAME"),
ExcludeNamespaces: *excludeNamespaces,
GroupPublicIPs: *groupPublicIPs,
SendTrafficDelta: *sendTrafficDelta,
LogEntries: *logEntries,
ReadInterval: *readInterval,
CleanupInterval: *cleanupInterval,
NodeName: os.Getenv("NODE_NAME"),
ExcludeNamespaces: *excludeNamespaces,
GroupPublicIPs: *groupPublicIPs,
SendTrafficDelta: *sendTrafficDelta,
LogEntries: *logEntries,
CustomPrivateCIDRs: customPrivateCIDRs,
}
coll := collector.New(
cfg,
Expand Down
24 changes: 23 additions & 1 deletion cmd/exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"errors"
"flag"
"fmt"
"net"
"net/http"
"net/http/pprof"
"os"
"os/signal"
"strings"
"syscall"
"time"

Expand All @@ -34,6 +36,11 @@ var (
logLevel = flag.String("log-level", logrus.InfoLevel.String(), "Log level")
httpListenPort = flag.Int("http-listen-port", 6060, "HTTP server listen port")
configPath = flag.String("config-path", "/etc/egressd/config/config.yaml", "Path to exporter config path")

// Flag that specifies a set of custom private cidr ranges
// It could be the case that you use a public cidr in your private network
// and you want to exclude it to be marked as public
privateCIDRs = flag.String("private-cidrs", "", "Comma-separated list of private CIDRs")
)

// These should be set via `go build` during a release.
Expand Down Expand Up @@ -113,11 +120,26 @@ func run(log logrus.FieldLogger) error {
nodeByIP: nodeByIPCache,
}

var customPrivateCIDRs []*net.IPNet
// privateCIDRs is a comma separated list of private CIDRs
if *privateCIDRs != "" {
customPrivateCIDRs = make([]*net.IPNet, 0)
for _, cidr := range strings.Split(*privateCIDRs, ",") {
_, ipnet, err := net.ParseCIDR(cidr)
if err != nil {
return fmt.Errorf("parsing custom private CIDRs: %w", err)
}
customPrivateCIDRs = append(customPrivateCIDRs, ipnet)
}
}

cfg, err := config.Load(*configPath)
if err != nil {
return err
}

cfg.CustomPrivateCIDRs = customPrivateCIDRs

var sinksList []sinks.Sink
for name, s := range cfg.Sinks {
if s.HTTPConfig != nil {
Expand All @@ -129,7 +151,7 @@ func run(log logrus.FieldLogger) error {
))
} else if s.PromRemoteWriteConfig != nil {
sinksList = append(sinksList, sinks.NewPromRemoteWriteSink(
log, name, *s.PromRemoteWriteConfig,
log, name, *s.PromRemoteWriteConfig, cfg,
))
}
}
Expand Down
15 changes: 13 additions & 2 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/binary"
"fmt"
"hash/maphash"
"net"
"net/http"
"strings"
"sync"
Expand Down Expand Up @@ -49,6 +50,9 @@ type Config struct {
// or as the constantly growing counter value
SendTrafficDelta bool
LogEntries bool

// CustomPrivateCIDRs is a list of custom private CIDRs that should be considered as private networks.
CustomPrivateCIDRs []*net.IPNet
}

type podsWatcher interface {
Expand Down Expand Up @@ -255,7 +259,7 @@ func (c *Collector) collect() error {
continue
}

if c.cfg.GroupPublicIPs && !isPrivateNetwork(conn.Dst.IP()) {
if c.cfg.GroupPublicIPs && !IsPrivateNetwork(conn.Dst.IP(), c.cfg.CustomPrivateCIDRs...) {
conn.Dst = netaddr.IPPortFrom(netaddr.IPv4(0, 0, 0, 0), 0)
}

Expand Down Expand Up @@ -385,7 +389,14 @@ func conntrackEntryKey(conn *conntrack.Entry) uint64 {
return res
}

func isPrivateNetwork(ip netaddr.IP) bool {
func IsPrivateNetwork(ip netaddr.IP, customCIDRs ...*net.IPNet) bool {
// Check if the IP is in the custom CIDRs
for _, cidr := range customCIDRs {
if cidr.Contains(ip.IPAddr().IP) {
return true
}
}

return ip.IsPrivate() ||
ip.IsLoopback() ||
ip.IsMulticast() ||
Expand Down
4 changes: 4 additions & 0 deletions exporter/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"errors"
"net"
"os"
"time"

Expand All @@ -16,6 +17,9 @@ type Config struct {
CollectorsConcurrentFetchCount int `envconfig:"COLLECTORS_CONCURRENT_FETCH_COUNT" yaml:"collectorsConcurrentFetchCount"`
CollectorFetchTimeout time.Duration `envconfig:"COLLECTOR_FETCH_TIMEOUT" yaml:"collectorFetchTimeout"`
Sinks map[string]Sink `yaml:"sinks"`

// CustomPrivateCIDRs is a list of custom private CIDRs that should be considered as private networks.
CustomPrivateCIDRs []*net.IPNet
}

type SinkType string
Expand Down
3 changes: 3 additions & 0 deletions exporter/config/config_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"net"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -51,6 +52,7 @@ sinks:
r := require.New(t)
expectedCfg := newTestConfig()
expectedCfg.PodNamespace = "from-env"
expectedCfg.CustomPrivateCIDRs = []*net.IPNet{}
r.NoError(os.Setenv("POD_NAMESPACE", expectedCfg.PodNamespace))

cfgBytes, err := yaml.Marshal(expectedCfg)
Expand All @@ -71,6 +73,7 @@ func newTestConfig() Config {
ExportInterval: 60 * time.Second,
CollectorsConcurrentFetchCount: 20,
CollectorFetchTimeout: 3 * time.Second,
CustomPrivateCIDRs: nil,
Sinks: map[string]Sink{
"castai": {
HTTPConfig: &SinkHTTPConfig{
Expand Down
3 changes: 2 additions & 1 deletion exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"

"github.com/castai/egressd/collector"
"github.com/castai/egressd/dns"
"github.com/castai/egressd/exporter/config"
"github.com/castai/egressd/exporter/sinks"
Expand Down Expand Up @@ -225,7 +226,7 @@ func (e *Exporter) buildPodNetworkMetric(conn *pb.RawNetworkMetric) (*pb.PodNetw
}

// Try to find destination pod and node info.
if dstIP.IsPrivate() {
if collector.IsPrivateNetwork(dstIP, e.cfg.CustomPrivateCIDRs...) {
dstIPStr := dstIP.String()
// First try finding destination pod by ip.
dstPod, err := e.kubeWatcher.GetPodByIP(dstIPStr)
Expand Down
28 changes: 18 additions & 10 deletions exporter/sinks/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,23 @@ import (
"strings"
"time"

"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"inet.af/netaddr"

"github.com/castai/egressd/collector"
"github.com/castai/egressd/exporter/config"
"github.com/castai/egressd/pb"
"github.com/castai/promwrite"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"inet.af/netaddr"
)

type promWriter interface {
Write(ctx context.Context, req *promwrite.WriteRequest, options ...promwrite.WriteOption) (*promwrite.WriteResponse, error)
}

func NewPromRemoteWriteSink(log logrus.FieldLogger, sinkName string, cfg config.SinkPromRemoteWriteConfig) Sink {
func NewPromRemoteWriteSink(log logrus.FieldLogger, sinkName string, cfg config.SinkPromRemoteWriteConfig, exporterConfig config.Config) Sink {

return &PromRemoteWriteSink{
exporterConfig: exporterConfig,
log: log.WithFields(map[string]interface{}{
"sink_type": "prom_remote_write",
"sink_name": sinkName,
Expand All @@ -38,10 +39,11 @@ func timeGetter() time.Time {
}

type PromRemoteWriteSink struct {
cfg config.SinkPromRemoteWriteConfig
log logrus.FieldLogger
client promWriter
timeGetter func() time.Time
exporterConfig config.Config
cfg config.SinkPromRemoteWriteConfig
log logrus.FieldLogger
client promWriter
timeGetter func() time.Time
}

func (s *PromRemoteWriteSink) Push(ctx context.Context, batch *pb.PodNetworkMetricBatch) error {
Expand Down Expand Up @@ -70,7 +72,13 @@ func (s *PromRemoteWriteSink) pushMetric(ctx context.Context, batch *pb.PodNetwo
for _, m := range batch.Items {
dstIP, _ := netaddr.ParseIP(m.DstIp)
dstIPType := "public"
if dstIP.IsPrivate() {
s.log.Debug("dstIP: ", dstIP)
// Print the different CustomPrivateCIDRs
for _, cidr := range s.exporterConfig.CustomPrivateCIDRs {
s.log.Debug("cidr: ", cidr)
}
if collector.IsPrivateNetwork(dstIP, s.exporterConfig.CustomPrivateCIDRs...) {
s.log.Debug("dstIP is private")
dstIPType = "private"
}
// Initial labels, sorted by label name asc.
Expand Down

0 comments on commit 2022ca6

Please sign in to comment.