diff --git a/collector/collector.go b/collector/collector.go index fd0c0e1..f4672af 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -14,6 +14,7 @@ import ( "github.com/sirupsen/logrus" "inet.af/netaddr" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1" ) func New( @@ -96,7 +97,8 @@ func (a *Collector) run() error { return err } - podIPs := make(map[netaddr.IP]struct{}, 0) + // Filter pods for tracking. + ips := make(map[netaddr.IP]struct{}, 0) var filteredPods []*corev1.Pod for _, pod := range pods { podIP := pod.Status.PodIP @@ -110,11 +112,37 @@ func (a *Collector) run() error { if _, found := a.excludeNsMap[pod.Namespace]; found { continue } - podIPs[netaddr.MustParseIP(pod.Status.PodIP)] = struct{}{} + ips[netaddr.MustParseIP(pod.Status.PodIP)] = struct{}{} filteredPods = append(filteredPods, pod) } - conns, err := a.conntracker.ListEntries(conntrack.FilterByIPs(podIPs)) + // Add special host network pod to handle host network traffic. + srcNode, err := a.kubeWatcher.GetNodeByName(a.cfg.NodeName) + if err != nil && !errors.Is(err, kube.ErrNotFound) { + return err + } + if srcNode != nil { + nodeIP := getNodePrivateIP(srcNode) + podIP := nodeIP.String() + if podIP != "" { + hostNetworkPod := &corev1.Pod{ + ObjectMeta: v1.ObjectMeta{ + Name: "host-network", + Namespace: "host-network", + }, + Spec: corev1.PodSpec{ + NodeName: a.cfg.NodeName, + }, + Status: corev1.PodStatus{ + PodIP: podIP, + }, + } + filteredPods = append(filteredPods, hostNetworkPod) + ips[nodeIP] = struct{}{} + } + } + + conns, err := a.conntracker.ListEntries(conntrack.FilterByIPs(ips)) if err != nil { return err } @@ -140,7 +168,6 @@ func (a *Collector) run() error { a.log.Warning("dropping metric event, channel is full") } } - records = append(records, podConns...) } @@ -153,6 +180,7 @@ func (a *Collector) markProcessedEntries(entries []conntrack.Entry) { newCache := make(map[uint64]*conntrack.Entry) for _, e := range entries { e := e + // TODO: Cache key is now calculated 2 times. Precalculate once during conntrack records fetch. newCache[entryKey(&e)] = &e } a.log.Infof("updating conntrack records, old length: %d, new length: %d", len(a.processedEntriesCache), len(newCache)) @@ -346,3 +374,12 @@ func ipType(ip netaddr.IP) string { } return "public" } + +func getNodePrivateIP(n *corev1.Node) netaddr.IP { + for _, addr := range n.Status.Addresses { + if addr.Type == corev1.NodeInternalIP { + return netaddr.MustParseIP(addr.Address) + } + } + return netaddr.IP{} +} diff --git a/collector/collector_test.go b/collector/collector_test.go index e69fa07..1143773 100644 --- a/collector/collector_test.go +++ b/collector/collector_test.go @@ -46,6 +46,15 @@ func TestCollector(t *testing.T) { RxPackets: 3, Proto: 6, }, + { + Src: netaddr.MustParseIPPort("10.10.0.16:40002"), + Dst: netaddr.MustParseIPPort("10.10.0.17:80"), + TxBytes: 101, + TxPackets: 2, + RxBytes: 201, + RxPackets: 3, + Proto: 6, + }, } connTracker := &mockConntrack{ @@ -98,6 +107,14 @@ func TestCollector(t *testing.T) { "topology.kubernetes.io/zone": "us-east1-a", }, }, + Status: corev1.NodeStatus{ + Addresses: []corev1.NodeAddress{ + { + Type: corev1.NodeInternalIP, + Address: "10.10.0.16", + }, + }, + }, }, { ObjectMeta: v1.ObjectMeta{ @@ -138,7 +155,7 @@ func TestCollector(t *testing.T) { <-done // First entry is summed, second is not added as it didn't change - r.Len(metrics, 2) + r.Len(metrics, 3) m1 := metrics[0] r.Equal(PodNetworkMetric{ SrcIP: "10.14.7.12", @@ -180,6 +197,27 @@ func TestCollector(t *testing.T) { Proto: "TCP", TS: 1577840461000, }, m2) + + m3 := metrics[2] + r.Equal(PodNetworkMetric{ + SrcIP: "10.10.0.16", + SrcPod: "host-network", + SrcNamespace: "host-network", + SrcNode: "n1", + SrcZone: "us-east1-a", + DstIP: "10.10.0.17", + DstIPType: "private", + DstPod: "", + DstNamespace: "", + DstNode: "", + DstZone: "", + TxBytes: 101, + TxPackets: 2, + RxBytes: 201, + RxPackets: 3, + Proto: "TCP", + TS: 1577840461000, + }, m3) } func TestGroupPodConns(t *testing.T) { diff --git a/examples/vector/vector-values.yaml b/examples/vector/vector-values.yaml index b58d158..378d008 100644 --- a/examples/vector/vector-values.yaml +++ b/examples/vector/vector-values.yaml @@ -37,6 +37,10 @@ customConfig: tags: src_pod: '{{ "{{ src_pod }}" }}' dst_pod: '{{ "{{ dst_pod }}" }}' + src_node: '{{ "{{ src_node }}" }}' + dst_node: '{{ "{{ dst_node }}" }}' + src_zone: '{{ "{{ src_zone }}" }}' + dst_zone: '{{ "{{ dst_zone }}" }}' src_namespace: '{{ "{{ src_namespace }}" }}' dst_namespace: '{{ "{{ dst_namespace }}" }}' src_ip: '{{ "{{ src_ip }}" }}' @@ -54,6 +58,10 @@ customConfig: tags: src_pod: '{{ "{{ src_pod }}" }}' dst_pod: '{{ "{{ dst_pod }}" }}' + src_node: '{{ "{{ src_node }}" }}' + dst_node: '{{ "{{ dst_node }}" }}' + src_zone: '{{ "{{ src_zone }}" }}' + dst_zone: '{{ "{{ dst_zone }}" }}' src_namespace: '{{ "{{ src_namespace }}" }}' dst_namespace: '{{ "{{ dst_namespace }}" }}' src_ip: '{{ "{{ src_ip }}" }}' diff --git a/kube/watcher.go b/kube/watcher.go index 7cc9723..994eece 100644 --- a/kube/watcher.go +++ b/kube/watcher.go @@ -183,7 +183,9 @@ func transformFunc(obj interface{}) (interface{}, error) { case *corev1.Node: t.SetManagedFields(nil) t.Spec = corev1.NodeSpec{} - t.Status = corev1.NodeStatus{} + t.Status = corev1.NodeStatus{ + Addresses: t.Status.Addresses, + } return t, nil } return nil, fmt.Errorf("unknown type %T", obj)