Skip to content

Commit

Permalink
Improve detection of incoming traffic
Browse files Browse the repository at this point in the history
  • Loading branch information
omris94 committed Sep 12, 2024
1 parent 2a98e7d commit a8b2978
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 21 deletions.
99 changes: 85 additions & 14 deletions src/mapper/pkg/kubefinder/kubefinder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@ import (
)

const (
podIPIndexField = "ip"
endpointIPPortIndexField = "ipPort"
serviceIPIndexField = "spec.ip"
externalIPIndexField = "spec.externalIPs"
portNumberIndexField = "service.spec.ports.nodePort"
nodeIPIndexField = "node.status.Addresses.ExternalIP"
IstioCanonicalNameLabelKey = "service.istio.io/canonical-name"
apiServerName = "kubernetes"
apiServerNamespace = "default"
podIPIndexField = "ip"
podIPIncludingHostNetworkIndexField = "ipAndHostNetwork"
endpointIPPortIndexField = "ipPort"
serviceIPIndexField = "spec.ip"
externalIPIndexField = "spec.externalIPs"
portNumberIndexField = "service.spec.ports.nodePort"
nodeIPIndexField = "node.status.Addresses.ExternalIP"
IstioCanonicalNameLabelKey = "service.istio.io/canonical-name"
apiServerName = "kubernetes"
apiServerNamespace = "default"
)

type KubeFinder struct {
Expand Down Expand Up @@ -70,6 +71,24 @@ func (k *KubeFinder) initIndexes(ctx context.Context) error {
return errors.Wrap(err)
}

err = k.mgr.GetCache().IndexField(ctx, &corev1.Pod{}, podIPIncludingHostNetworkIndexField, func(object client.Object) []string {
res := make([]string, 0)
pod := object.(*corev1.Pod)

// TODO: SHOULD I REMOVE IT??
if pod.DeletionTimestamp != nil {
return res
}

for _, ip := range pod.Status.PodIPs {
res = append(res, ip.IP)
}
return res
})
if err != nil {
return errors.Wrap(err)
}

err = k.mgr.GetCache().IndexField(ctx, &corev1.Service{}, serviceIPIndexField, func(object client.Object) []string {
res := make([]string, 0)
svc := object.(*corev1.Service)
Expand Down Expand Up @@ -105,7 +124,9 @@ func (k *KubeFinder) initIndexes(ctx context.Context) error {
if svc.DeletionTimestamp != nil {
return nil
}
if svc.Spec.Type != corev1.ServiceTypeNodePort {

// Node ports are unique regardless of the service type
if svc.Spec.Type != corev1.ServiceTypeNodePort && svc.Spec.Type != corev1.ServiceTypeLoadBalancer {
return nil
}

Expand Down Expand Up @@ -252,22 +273,22 @@ func (k *KubeFinder) ResolveIPToControlPlane(ctx context.Context, ip string) (*c
}

func (k *KubeFinder) ResolveIPToExternalAccessService(ctx context.Context, ip string, port int) (*corev1.Service, bool, error) {
nodePortService, ok, err := k.resolveNodePortService(ctx, ip, port)
nodePortService, ok, err := k.resolveServiceByNodeIPAndPort(ctx, ip, port)
if err != nil {
return nil, false, errors.Wrap(err)
}
if ok {
return nodePortService, true, nil
}

loadBalancerService, ok, err := k.resolveLoadBalancerService(ctx, ip, port)
loadBalancerService, ok, err := k.resolveLoadBalancerServiceByExternalIP(ctx, ip, port)
if err != nil {
return nil, false, errors.Wrap(err)
}
return loadBalancerService, ok, nil
}

func (k *KubeFinder) resolveLoadBalancerService(ctx context.Context, ip string, port int) (*corev1.Service, bool, error) {
func (k *KubeFinder) resolveLoadBalancerServiceByExternalIP(ctx context.Context, ip string, port int) (*corev1.Service, bool, error) {
var services corev1.ServiceList
err := k.client.List(ctx, &services, client.MatchingFields{externalIPIndexField: ip})
if err != nil {
Expand All @@ -288,7 +309,7 @@ func (k *KubeFinder) resolveLoadBalancerService(ctx context.Context, ip string,
return &service, true, nil
}

func (k *KubeFinder) resolveNodePortService(ctx context.Context, ip string, port int) (*corev1.Service, bool, error) {
func (k *KubeFinder) resolveServiceByNodeIPAndPort(ctx context.Context, ip string, port int) (*corev1.Service, bool, error) {
var nodes corev1.NodeList
err := k.client.List(ctx, &nodes, client.MatchingFields{nodeIPIndexField: ip})
if err != nil {
Expand Down Expand Up @@ -442,3 +463,53 @@ func (k *KubeFinder) ResolveOtterizeIdentityForService(ctx context.Context, svc
dstSvcIdentity.KubernetesService = lo.ToPtr(svc.Name)
return dstSvcIdentity, true, nil
}

func (k *KubeFinder) IsSrcIpLocal(ctx context.Context, ip string) (bool, error) {
isNode, err := k.IsNodeIP(ctx, ip)
if err != nil {
return false, errors.Wrap(err)
}
if isNode {
return true, nil
}

isPod, err := k.IsPodIp(ctx, ip)
if err != nil {
return false, errors.Wrap(err)
}
if isPod {
return true, nil
}

_, isControlPlane, err := k.ResolveIPToControlPlane(ctx, ip)
if err != nil {
return false, errors.Wrap(err)
}
if isControlPlane {
return true, nil
}

//TODO: Should we check for services/endpoints as well?
//TODO: Should we check PodCIDR
//TODO: Should we have cache for pod ips and node ips?

return false, nil
}

func (k *KubeFinder) IsPodIp(ctx context.Context, ip string) (bool, error) {
var pods corev1.PodList
err := k.client.List(ctx, &pods, client.MatchingFields{podIPIncludingHostNetworkIndexField: ip})
if err != nil {
return false, errors.Wrap(err)
}
return len(pods.Items) > 0, nil
}

func (k *KubeFinder) IsNodeIP(ctx context.Context, ip string) (bool, error) {
var nodes corev1.NodeList
err := k.client.List(ctx, &nodes, client.MatchingFields{nodeIPIndexField: ip})
if err != nil {
return false, errors.Wrap(err)
}
return len(nodes.Items) > 0, nil
}
17 changes: 10 additions & 7 deletions src/mapper/pkg/resolvers/schema.helpers.resolvers.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,22 +378,25 @@ func (r *Resolver) handleReportTCPCaptureResults(ctx context.Context, results mo
for _, captureItem := range results.Results {
logrus.Debugf("Handling TCP capture result from %s to %s:%d", captureItem.SrcIP, captureItem.Destinations[0].Destination, lo.FromPtr(captureItem.Destinations[0].DestinationPort))

srcSvcIdentity, err := r.discoverInternalSrcIdentity(ctx, captureItem)
if errors.Is(err, kubefinder.ErrNoPodFound) {
isLocal, err := r.kubeFinder.IsSrcIpLocal(ctx, captureItem.SrcIP)
if err != nil {
logrus.WithError(err).WithField("ip", captureItem.SrcIP).Error("could not determine if source IP is local")
continue
}
if !isLocal {
err := r.reportIncomingInternetTraffic(ctx, captureItem.SrcIP, captureItem.Destinations)
if err != nil {
logrus.WithError(err).Error("could not report incoming internet traffic")
continue
}
continue
}

srcSvcIdentity, err := r.discoverInternalSrcIdentity(ctx, captureItem)
if err != nil {
logrus.WithError(err).Debugf("could not discover src identity for '%s'", captureItem.SrcIP)
continue
}

for _, dest := range captureItem.Destinations {
r.handleExternalIncomingTrafficTCPResult(ctx, srcSvcIdentity, dest)
r.handleInternalTrafficTCPResult(ctx, srcSvcIdentity, dest)
}
}
telemetrysender.SendNetworkMapper(telemetriesgql.EventTypeIntentsDiscoveredCapture, len(results.Results))
Expand Down Expand Up @@ -423,7 +426,7 @@ func (r *Resolver) reportIncomingInternetTraffic(ctx context.Context, srcIP stri
return nil
}

func (r *Resolver) handleExternalIncomingTrafficTCPResult(ctx context.Context, srcIdentity model.OtterizeServiceIdentity, dest model.Destination) {
func (r *Resolver) handleInternalTrafficTCPResult(ctx context.Context, srcIdentity model.OtterizeServiceIdentity, dest model.Destination) {
lastSeen := dest.LastSeen
destIdentity, ok, err := r.resolveDestIdentity(ctx, dest, lastSeen)
if err != nil {
Expand Down

0 comments on commit a8b2978

Please sign in to comment.