From 2e67e53345ec0c1607db9441be47c359be8b9a8b Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Tue, 26 Nov 2024 14:36:32 -0500 Subject: [PATCH 1/5] feat(k8s): implement wildcard All Namespaces discovery --- .../cryostat/discovery/KubeApiDiscovery.java | 52 +++++++++++++++---- 1 file changed, 43 insertions(+), 9 deletions(-) diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java index 685dbd92c..10134cf91 100644 --- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.management.remote.JMXServiceURL; @@ -70,6 +71,7 @@ @ApplicationScoped public class KubeApiDiscovery implements ResourceEventHandler { + private static final String ALL_NAMESPACES = "*"; private static final String NAMESPACE_QUERY_ADDR = "NS_QUERY"; private static final String ENDPOINTS_DISCOVERY_ADDR = "ENDPOINTS_DISC"; @@ -116,13 +118,25 @@ protected HashMap> initialize() .getWatchNamespaces() .forEach( ns -> { - result.put( - ns, - client.endpoints() - .inNamespace(ns) - .inform( - KubeApiDiscovery.this, - informerResyncPeriod.toMillis())); + SharedIndexInformer informer; + if (ALL_NAMESPACES.equals(ns)) { + informer = + client.endpoints() + .inAnyNamespace() + .inform( + KubeApiDiscovery.this, + informerResyncPeriod + .toMillis()); + } else { + informer = + client.endpoints() + .inNamespace(ns) + .inform( + KubeApiDiscovery.this, + informerResyncPeriod + .toMillis()); + } + result.put(ns, informer); logger.debugv( "Started Endpoints SharedInformer for namespace" + " \"{0}\" with resync period {1}", @@ -148,7 +162,11 @@ void onStart(@Observes StartupEvent evt) { () -> { try { logger.debug("Resyncing"); - notify(NamespaceQueryEvent.from(kubeConfig.getWatchNamespaces())); + notify( + NamespaceQueryEvent.from( + kubeConfig.getWatchNamespaces().stream() + .filter(ns -> !ALL_NAMESPACES.equals(ns)) + .toList())); } catch (Exception e) { logger.warn(e); } @@ -226,6 +244,15 @@ List tuplesFromEndpoints(Endpoints endpoints) { for (EndpointPort port : subset.getPorts()) { for (EndpointAddress addr : subset.getAddresses()) { var ref = addr.getTargetRef(); + if (ref == null) { + logger.debugv( + "Endpoints object {0} in {1} with address {2} had a null" + + " targetRef", + endpoints.getMetadata().getName(), + endpoints.getMetadata().getNamespace(), + addr.getIp()); + continue; + } tts.add( new TargetTuple( ref, @@ -295,8 +322,15 @@ public void handleQueryEvent(NamespaceQueryEvent evt) { persistedTargets.add(node.target); } + Stream endpoints; + if (safeGetInformers().containsKey(namespace)) { + endpoints = safeGetInformers().get(namespace).getStore().list().stream(); + } else { + endpoints = + client.endpoints().inNamespace(namespace).list().getItems().stream(); + } Set observedTargets = - safeGetInformers().get(namespace).getStore().list().stream() + endpoints .map((endpoint) -> getTargetTuplesFrom(endpoint)) .flatMap(List::stream) .filter((tuple) -> Objects.nonNull(tuple.objRef)) From 908dedab8c2d5f2ae74d3123e37f059abe885d04 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Tue, 26 Nov 2024 14:51:37 -0500 Subject: [PATCH 2/5] refactor --- .../cryostat/discovery/KubeApiDiscovery.java | 101 ++++++++++-------- 1 file changed, 55 insertions(+), 46 deletions(-) diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java index 10134cf91..d5cd02f2e 100644 --- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java @@ -110,42 +110,42 @@ public class KubeApiDiscovery implements ResourceEventHandler { @Override protected HashMap> initialize() throws ConcurrentException { - // TODO: add support for some wildcard indicating a single Informer for any - // namespace that Cryostat has permissions to. This will need some restructuring - // of how the namespaces within the discovery tree are mapped. var result = new HashMap>(); - kubeConfig - .getWatchNamespaces() - .forEach( - ns -> { - SharedIndexInformer informer; - if (ALL_NAMESPACES.equals(ns)) { - informer = - client.endpoints() - .inAnyNamespace() - .inform( - KubeApiDiscovery.this, - informerResyncPeriod - .toMillis()); - } else { - informer = + if (watchAllNamespaces()) { + result.put( + ALL_NAMESPACES, + client.endpoints() + .inAnyNamespace() + .inform( + KubeApiDiscovery.this, + informerResyncPeriod.toMillis())); + } else { + kubeConfig + .getWatchNamespaces() + .forEach( + ns -> { + result.put( + ns, client.endpoints() .inNamespace(ns) .inform( KubeApiDiscovery.this, informerResyncPeriod - .toMillis()); - } - result.put(ns, informer); - logger.debugv( - "Started Endpoints SharedInformer for namespace" - + " \"{0}\" with resync period {1}", - ns, informerResyncPeriod); - }); + .toMillis())); + logger.debugv( + "Started Endpoints SharedInformer for namespace" + + " \"{0}\" with resync period {1}", + ns, informerResyncPeriod); + }); + } return result; } }; + private boolean watchAllNamespaces() { + return kubeConfig.getWatchNamespaces().stream().anyMatch(ns -> ALL_NAMESPACES.equals(ns)); + } + void onStart(@Observes StartupEvent evt) { if (!enabled()) { return; @@ -158,22 +158,26 @@ void onStart(@Observes StartupEvent evt) { logger.debugv("Starting {0} client", REALM); safeGetInformers(); - resyncWorker.scheduleAtFixedRate( - () -> { - try { - logger.debug("Resyncing"); - notify( - NamespaceQueryEvent.from( - kubeConfig.getWatchNamespaces().stream() - .filter(ns -> !ALL_NAMESPACES.equals(ns)) - .toList())); - } catch (Exception e) { - logger.warn(e); - } - }, - 0, - informerResyncPeriod.toMillis(), - TimeUnit.MILLISECONDS); + // TODO we should not need to force manual re-syncs this way - the Informer is already + // supposed to resync itself. + if (!watchAllNamespaces()) { + resyncWorker.scheduleAtFixedRate( + () -> { + try { + logger.debug("Resyncing"); + notify( + NamespaceQueryEvent.from( + kubeConfig.getWatchNamespaces().stream() + .filter(ns -> !ALL_NAMESPACES.equals(ns)) + .toList())); + } catch (Exception e) { + logger.warn(e); + } + }, + 0, + informerResyncPeriod.toMillis(), + TimeUnit.MILLISECONDS); + } } void onStop(@Observes ShutdownEvent evt) { @@ -323,11 +327,16 @@ public void handleQueryEvent(NamespaceQueryEvent evt) { } Stream endpoints; - if (safeGetInformers().containsKey(namespace)) { - endpoints = safeGetInformers().get(namespace).getStore().list().stream(); - } else { + if (watchAllNamespaces()) { endpoints = - client.endpoints().inNamespace(namespace).list().getItems().stream(); + safeGetInformers().get(ALL_NAMESPACES).getStore().list().stream() + .filter( + ep -> + Objects.equals( + ep.getMetadata().getNamespace(), + namespace)); + } else { + endpoints = safeGetInformers().get(namespace).getStore().list().stream(); } Set observedTargets = endpoints From 621538eca7ef9a7d667de79fb9c2646f49d8e1d6 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Wed, 4 Dec 2024 13:49:24 -0500 Subject: [PATCH 3/5] log --- src/main/java/io/cryostat/discovery/KubeApiDiscovery.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java index d5cd02f2e..42c350129 100644 --- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java @@ -119,6 +119,10 @@ protected HashMap> initialize() .inform( KubeApiDiscovery.this, informerResyncPeriod.toMillis())); + logger.debugv( + "Started Endpoints Informer for All Namespaces with resync period" + + " {0}", + informerResyncPeriod); } else { kubeConfig .getWatchNamespaces() From 41a4d4cfbd118f878bd7e1b2abd1e0984d0ba5fd Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Wed, 4 Dec 2024 13:50:10 -0500 Subject: [PATCH 4/5] forced resync on all namespaces --- .../cryostat/discovery/KubeApiDiscovery.java | 40 +++++++++++-------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java index 42c350129..9c255cac1 100644 --- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java @@ -164,24 +164,30 @@ void onStart(@Observes StartupEvent evt) { safeGetInformers(); // TODO we should not need to force manual re-syncs this way - the Informer is already // supposed to resync itself. - if (!watchAllNamespaces()) { - resyncWorker.scheduleAtFixedRate( - () -> { - try { - logger.debug("Resyncing"); - notify( - NamespaceQueryEvent.from( - kubeConfig.getWatchNamespaces().stream() - .filter(ns -> !ALL_NAMESPACES.equals(ns)) - .toList())); - } catch (Exception e) { - logger.warn(e); + resyncWorker.scheduleAtFixedRate( + () -> { + try { + logger.debug("Resyncing"); + List namespaces; + if (watchAllNamespaces()) { + namespaces = + client.namespaces().list().getItems().stream() + .map(ns -> ns.getMetadata().getName()) + .toList(); + } else { + namespaces = + kubeConfig.getWatchNamespaces().stream() + .filter(ns -> !ALL_NAMESPACES.equals(ns)) + .toList(); } - }, - 0, - informerResyncPeriod.toMillis(), - TimeUnit.MILLISECONDS); - } + notify(NamespaceQueryEvent.from(namespaces)); + } catch (Exception e) { + logger.warn(e); + } + }, + 0, + informerResyncPeriod.toMillis(), + TimeUnit.MILLISECONDS); } void onStop(@Observes ShutdownEvent evt) { From 2169300eb935a27b2684941732a97dd51d74fcf8 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Wed, 4 Dec 2024 13:55:42 -0500 Subject: [PATCH 5/5] refactor --- .../cryostat/discovery/KubeApiDiscovery.java | 48 ++++++++++--------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java index 9c255cac1..848cb3043 100644 --- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java @@ -71,7 +71,6 @@ @ApplicationScoped public class KubeApiDiscovery implements ResourceEventHandler { - private static final String ALL_NAMESPACES = "*"; private static final String NAMESPACE_QUERY_ADDR = "NS_QUERY"; private static final String ENDPOINTS_DISCOVERY_ADDR = "ENDPOINTS_DISC"; @@ -111,9 +110,9 @@ public class KubeApiDiscovery implements ResourceEventHandler { protected HashMap> initialize() throws ConcurrentException { var result = new HashMap>(); - if (watchAllNamespaces()) { + if (kubeConfig.watchAllNamespaces()) { result.put( - ALL_NAMESPACES, + KubeConfig.ALL_NAMESPACES, client.endpoints() .inAnyNamespace() .inform( @@ -146,10 +145,6 @@ protected HashMap> initialize() } }; - private boolean watchAllNamespaces() { - return kubeConfig.getWatchNamespaces().stream().anyMatch(ns -> ALL_NAMESPACES.equals(ns)); - } - void onStart(@Observes StartupEvent evt) { if (!enabled()) { return; @@ -168,19 +163,7 @@ void onStart(@Observes StartupEvent evt) { () -> { try { logger.debug("Resyncing"); - List namespaces; - if (watchAllNamespaces()) { - namespaces = - client.namespaces().list().getItems().stream() - .map(ns -> ns.getMetadata().getName()) - .toList(); - } else { - namespaces = - kubeConfig.getWatchNamespaces().stream() - .filter(ns -> !ALL_NAMESPACES.equals(ns)) - .toList(); - } - notify(NamespaceQueryEvent.from(namespaces)); + notify(NamespaceQueryEvent.from(getWatchNamespaces())); } catch (Exception e) { logger.warn(e); } @@ -190,6 +173,17 @@ void onStart(@Observes StartupEvent evt) { TimeUnit.MILLISECONDS); } + private List getWatchNamespaces() { + if (kubeConfig.watchAllNamespaces()) { + return client.namespaces().list().getItems().stream() + .map(ns -> ns.getMetadata().getName()) + .toList(); + } + return kubeConfig.getWatchNamespaces().stream() + .filter(ns -> !KubeConfig.ALL_NAMESPACES.equals(ns)) + .toList(); + } + void onStop(@Observes ShutdownEvent evt) { if (!(enabled() && available())) { return; @@ -337,9 +331,13 @@ public void handleQueryEvent(NamespaceQueryEvent evt) { } Stream endpoints; - if (watchAllNamespaces()) { + if (kubeConfig.watchAllNamespaces()) { endpoints = - safeGetInformers().get(ALL_NAMESPACES).getStore().list().stream() + safeGetInformers() + .get(KubeConfig.ALL_NAMESPACES) + .getStore() + .list() + .stream() .filter( ep -> Objects.equals( @@ -586,6 +584,7 @@ private Pair queryForNode( @ApplicationScoped static final class KubeConfig { + static final String ALL_NAMESPACES = "*"; private static final String OWN_NAMESPACE = "."; @Inject Logger logger; @@ -600,6 +599,11 @@ static final class KubeConfig { @ConfigProperty(name = "cryostat.discovery.kubernetes.namespace-path") String namespacePath; + boolean watchAllNamespaces() { + return watchNamespaces.orElse(List.of()).stream() + .anyMatch(ns -> ALL_NAMESPACES.equals(ns)); + } + Collection getWatchNamespaces() { return watchNamespaces.orElse(List.of()).stream() .map(