Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add host network tracking #12

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 41 additions & 4 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/sirupsen/logrus"
"inet.af/netaddr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1"
)

func New(
Expand Down Expand Up @@ -96,7 +97,8 @@ func (a *Collector) run() error {
return err
}

podIPs := make(map[netaddr.IP]struct{}, 0)
// Filter pods for tracking.
ips := make(map[netaddr.IP]struct{}, 0)
var filteredPods []*corev1.Pod
for _, pod := range pods {
podIP := pod.Status.PodIP
Expand All @@ -110,11 +112,37 @@ func (a *Collector) run() error {
if _, found := a.excludeNsMap[pod.Namespace]; found {
continue
}
podIPs[netaddr.MustParseIP(pod.Status.PodIP)] = struct{}{}
ips[netaddr.MustParseIP(pod.Status.PodIP)] = struct{}{}
filteredPods = append(filteredPods, pod)
}

conns, err := a.conntracker.ListEntries(conntrack.FilterByIPs(podIPs))
// Add special host network pod to handle host network traffic.
srcNode, err := a.kubeWatcher.GetNodeByName(a.cfg.NodeName)
if err != nil && !errors.Is(err, kube.ErrNotFound) {
return err
}
if srcNode != nil {
nodeIP := getNodePrivateIP(srcNode)
podIP := nodeIP.String()
if podIP != "" {
hostNetworkPod := &corev1.Pod{
ObjectMeta: v1.ObjectMeta{
Name: "host-network",
Namespace: "host-network",
},
Spec: corev1.PodSpec{
NodeName: a.cfg.NodeName,
},
Status: corev1.PodStatus{
PodIP: podIP,
},
}
filteredPods = append(filteredPods, hostNetworkPod)
ips[nodeIP] = struct{}{}
}
}

conns, err := a.conntracker.ListEntries(conntrack.FilterByIPs(ips))
if err != nil {
return err
}
Expand All @@ -140,7 +168,6 @@ func (a *Collector) run() error {
a.log.Warning("dropping metric event, channel is full")
}
}

records = append(records, podConns...)
}

Expand All @@ -153,6 +180,7 @@ func (a *Collector) markProcessedEntries(entries []conntrack.Entry) {
newCache := make(map[uint64]*conntrack.Entry)
for _, e := range entries {
e := e
// TODO: Cache key is now calculated 2 times. Precalculate once during conntrack records fetch.
newCache[entryKey(&e)] = &e
}
a.log.Infof("updating conntrack records, old length: %d, new length: %d", len(a.processedEntriesCache), len(newCache))
Expand Down Expand Up @@ -346,3 +374,12 @@ func ipType(ip netaddr.IP) string {
}
return "public"
}

func getNodePrivateIP(n *corev1.Node) netaddr.IP {
for _, addr := range n.Status.Addresses {
if addr.Type == corev1.NodeInternalIP {
return netaddr.MustParseIP(addr.Address)
}
}
return netaddr.IP{}
}
40 changes: 39 additions & 1 deletion collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ func TestCollector(t *testing.T) {
RxPackets: 3,
Proto: 6,
},
{
Src: netaddr.MustParseIPPort("10.10.0.16:40002"),
Dst: netaddr.MustParseIPPort("10.10.0.17:80"),
TxBytes: 101,
TxPackets: 2,
RxBytes: 201,
RxPackets: 3,
Proto: 6,
},
}

connTracker := &mockConntrack{
Expand Down Expand Up @@ -98,6 +107,14 @@ func TestCollector(t *testing.T) {
"topology.kubernetes.io/zone": "us-east1-a",
},
},
Status: corev1.NodeStatus{
Addresses: []corev1.NodeAddress{
{
Type: corev1.NodeInternalIP,
Address: "10.10.0.16",
},
},
},
},
{
ObjectMeta: v1.ObjectMeta{
Expand Down Expand Up @@ -138,7 +155,7 @@ func TestCollector(t *testing.T) {
<-done

// First entry is summed, second is not added as it didn't change
r.Len(metrics, 2)
r.Len(metrics, 3)
m1 := metrics[0]
r.Equal(PodNetworkMetric{
SrcIP: "10.14.7.12",
Expand Down Expand Up @@ -180,6 +197,27 @@ func TestCollector(t *testing.T) {
Proto: "TCP",
TS: 1577840461000,
}, m2)

m3 := metrics[2]
r.Equal(PodNetworkMetric{
SrcIP: "10.10.0.16",
SrcPod: "host-network",
SrcNamespace: "host-network",
SrcNode: "n1",
SrcZone: "us-east1-a",
DstIP: "10.10.0.17",
DstIPType: "private",
DstPod: "",
DstNamespace: "",
DstNode: "",
DstZone: "",
TxBytes: 101,
TxPackets: 2,
RxBytes: 201,
RxPackets: 3,
Proto: "TCP",
TS: 1577840461000,
}, m3)
}

func TestGroupPodConns(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions examples/vector/vector-values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ customConfig:
tags:
src_pod: '{{ "{{ src_pod }}" }}'
dst_pod: '{{ "{{ dst_pod }}" }}'
src_node: '{{ "{{ src_node }}" }}'
dst_node: '{{ "{{ dst_node }}" }}'
src_zone: '{{ "{{ src_zone }}" }}'
dst_zone: '{{ "{{ dst_zone }}" }}'
src_namespace: '{{ "{{ src_namespace }}" }}'
dst_namespace: '{{ "{{ dst_namespace }}" }}'
src_ip: '{{ "{{ src_ip }}" }}'
Expand All @@ -54,6 +58,10 @@ customConfig:
tags:
src_pod: '{{ "{{ src_pod }}" }}'
dst_pod: '{{ "{{ dst_pod }}" }}'
src_node: '{{ "{{ src_node }}" }}'
dst_node: '{{ "{{ dst_node }}" }}'
src_zone: '{{ "{{ src_zone }}" }}'
dst_zone: '{{ "{{ dst_zone }}" }}'
src_namespace: '{{ "{{ src_namespace }}" }}'
dst_namespace: '{{ "{{ dst_namespace }}" }}'
src_ip: '{{ "{{ src_ip }}" }}'
Expand Down
4 changes: 3 additions & 1 deletion kube/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,9 @@ func transformFunc(obj interface{}) (interface{}, error) {
case *corev1.Node:
t.SetManagedFields(nil)
t.Spec = corev1.NodeSpec{}
t.Status = corev1.NodeStatus{}
t.Status = corev1.NodeStatus{
Addresses: t.Status.Addresses,
}
return t, nil
}
return nil, fmt.Errorf("unknown type %T", obj)
Expand Down