feat(k8s): implement wildcard All Namespaces discovery #725
base: main
Changes from 2 commits
@@ -32,6 +32,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.management.remote.JMXServiceURL;
@@ -70,6 +71,7 @@
@ApplicationScoped
public class KubeApiDiscovery implements ResourceEventHandler<Endpoints> {

    private static final String ALL_NAMESPACES = "*";
    private static final String NAMESPACE_QUERY_ADDR = "NS_QUERY";
    private static final String ENDPOINTS_DISCOVERY_ADDR = "ENDPOINTS_DISC";
@@ -108,30 +110,42 @@ public class KubeApiDiscovery implements ResourceEventHandler<Endpoints> {
                @Override
                protected HashMap<String, SharedIndexInformer<Endpoints>> initialize()
                        throws ConcurrentException {
                    // TODO: add support for some wildcard indicating a single Informer for any
                    // namespace that Cryostat has permissions to. This will need some restructuring
                    // of how the namespaces within the discovery tree are mapped.
                    var result = new HashMap<String, SharedIndexInformer<Endpoints>>();
                    kubeConfig
                            .getWatchNamespaces()
                            .forEach(
                                    ns -> {
                                        result.put(
                                                ns,
                                                client.endpoints()
                                                        .inNamespace(ns)
                                                        .inform(
                                                                KubeApiDiscovery.this,
                                                                informerResyncPeriod.toMillis()));
                                        logger.debugv(
                                                "Started Endpoints SharedInformer for namespace"
                                                        + " \"{0}\" with resync period {1}",
                                                ns, informerResyncPeriod);
                                    });
                    if (watchAllNamespaces()) {
                        result.put(
                                ALL_NAMESPACES,
                                client.endpoints()
                                        .inAnyNamespace()
                                        .inform(
                                                KubeApiDiscovery.this,
                                                informerResyncPeriod.toMillis()));
                    } else {
                        kubeConfig
                                .getWatchNamespaces()
                                .forEach(
                                        ns -> {
                                            result.put(
                                                    ns,
                                                    client.endpoints()
                                                            .inNamespace(ns)
                                                            .inform(
                                                                    KubeApiDiscovery.this,
                                                                    informerResyncPeriod
                                                                            .toMillis()));
                                            logger.debugv(
                                                    "Started Endpoints SharedInformer for namespace"
                                                            + " \"{0}\" with resync period {1}",
                                                    ns, informerResyncPeriod);
                                        });
                    }
                    return result;
                }
            };

    private boolean watchAllNamespaces() {
        return kubeConfig.getWatchNamespaces().stream().anyMatch(ns -> ALL_NAMESPACES.equals(ns));
    }

Review comment: nit: make sense to bring this method and
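The wildcard check is just a scan of the configured namespace list, so a single `*` entry anywhere in it switches discovery over to the one all-namespaces informer, regardless of which other namespaces are also listed. A minimal standalone sketch of that behaviour (the class and the way the list is supplied are illustrative, not part of this PR):

```java
import java.util.List;

// Illustrative sketch of the wildcard check, outside the KubeApiDiscovery class.
class WildcardCheckSketch {
    private static final String ALL_NAMESPACES = "*";

    static boolean watchAllNamespaces(List<String> watchNamespaces) {
        // "*" anywhere in the configured list enables all-namespaces discovery.
        return watchNamespaces.stream().anyMatch(ALL_NAMESPACES::equals);
    }

    public static void main(String[] args) {
        System.out.println(watchAllNamespaces(List.of("my-ns")));      // false
        System.out.println(watchAllNamespaces(List.of("my-ns", "*"))); // true
    }
}
```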
    void onStart(@Observes StartupEvent evt) {
        if (!enabled()) {
            return;
@@ -144,18 +158,26 @@ void onStart(@Observes StartupEvent evt) {

        logger.debugv("Starting {0} client", REALM);
        safeGetInformers();
        resyncWorker.scheduleAtFixedRate(
                () -> {
                    try {
                        logger.debug("Resyncing");
                        notify(NamespaceQueryEvent.from(kubeConfig.getWatchNamespaces()));
                    } catch (Exception e) {
                        logger.warn(e);
                    }
                },
                0,
                informerResyncPeriod.toMillis(),
                TimeUnit.MILLISECONDS);
        // TODO we should not need to force manual re-syncs this way - the Informer is already
        // supposed to resync itself.
        if (!watchAllNamespaces()) {
            resyncWorker.scheduleAtFixedRate(
                    () -> {
                        try {
                            logger.debug("Resyncing");
                            notify(
                                    NamespaceQueryEvent.from(
                                            kubeConfig.getWatchNamespaces().stream()
                                                    .filter(ns -> !ALL_NAMESPACES.equals(ns))
                                                    .toList()));
                        } catch (Exception e) {
                            logger.warn(e);
                        }
                    },
                    0,
                    informerResyncPeriod.toMillis(),
                    TimeUnit.MILLISECONDS);
        }
    }

Review comment (on lines +160 to +161): Just curious here: Is it now safe to remove this manual resync? If not, in the case of all namespaces, we should also list namespaces and do resync too?
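On the resync question above, if the manual re-sync does turn out to still be necessary, one possible shape for the all-namespaces case is to derive the namespace list from the informer's cache rather than from configuration. A rough sketch meant to slot into `onStart`, reusing only names already present in this diff (`safeGetInformers`, `NamespaceQueryEvent.from`, `resyncWorker`, `informerResyncPeriod`); whether this is wanted at all is exactly the open question:

```java
// Sketch only: in the ALL_NAMESPACES case, re-query the namespaces currently known
// to the single informer's store instead of the configured namespace list.
resyncWorker.scheduleAtFixedRate(
        () -> {
            try {
                logger.debug("Resyncing");
                var namespaces =
                        safeGetInformers().get(ALL_NAMESPACES).getStore().list().stream()
                                .map(ep -> ep.getMetadata().getNamespace())
                                .distinct()
                                .toList();
                notify(NamespaceQueryEvent.from(namespaces));
            } catch (Exception e) {
                logger.warn(e);
            }
        },
        0,
        informerResyncPeriod.toMillis(),
        TimeUnit.MILLISECONDS);
```

The trade-off with this shape: it only covers namespaces that currently have at least one Endpoints object in the cache, so a namespace that has gone empty would not be re-queried and its stale targets would not be pruned.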

    void onStop(@Observes ShutdownEvent evt) {
@@ -226,6 +248,15 @@ List<TargetTuple> tuplesFromEndpoints(Endpoints endpoints) {
            for (EndpointPort port : subset.getPorts()) {
                for (EndpointAddress addr : subset.getAddresses()) {
                    var ref = addr.getTargetRef();
                    if (ref == null) {
                        logger.debugv(
                                "Endpoints object {0} in {1} with address {2} had a null"
                                        + " targetRef",
                                endpoints.getMetadata().getName(),
                                endpoints.getMetadata().getNamespace(),
                                addr.getIp());
                        continue;
                    }
                    tts.add(
                            new TargetTuple(
                                    ref,
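For context on the new null guard: an `EndpointAddress` with no `targetRef` is a legitimate object, typically seen with manually-managed endpoints (for example, endpoints backing a Service without a selector), which list plain IPs that point at nothing inside the cluster. A small illustrative fragment of that case, using only the Fabric8 model class already imported here (the IP value is made up):

```java
// Hypothetical address of the kind the new guard skips: a plain IP with no targetRef,
// so there is no pod reference from which to build a TargetTuple.
EndpointAddress manual = new EndpointAddress();
manual.setIp("10.0.0.12");
assert manual.getTargetRef() == null;
```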
@@ -295,8 +326,20 @@ public void handleQueryEvent(NamespaceQueryEvent evt) {
                            persistedTargets.add(node.target);
                        }

                        Stream<Endpoints> endpoints;
                        if (watchAllNamespaces()) {
                            endpoints =
                                    safeGetInformers().get(ALL_NAMESPACES).getStore().list().stream()
                                            .filter(
                                                    ep ->
                                                            Objects.equals(
                                                                    ep.getMetadata().getNamespace(),
                                                                    namespace));
                        } else {
                            endpoints = safeGetInformers().get(namespace).getStore().list().stream();
                        }
                        Set<Target> observedTargets =
                                safeGetInformers().get(namespace).getStore().list().stream()
                                endpoints
                                        .map((endpoint) -> getTargetTuplesFrom(endpoint))
                                        .flatMap(List::stream)
                                        .filter((tuple) -> Objects.nonNull(tuple.objRef))
Review comment: Maybe, add a logger.debugv here?
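For reference, that suggestion would amount to something like the following inside the `watchAllNamespaces()` branch of `handleQueryEvent` (a sketch only; the exact message and placement are up to the author):

```java
// Sketch of the suggested debug log: record that the wildcard informer's cache is
// being filtered down to the single namespace currently being queried.
if (watchAllNamespaces()) {
    logger.debugv(
            "Filtering all-namespaces Endpoints store for namespace \"{0}\"", namespace);
    endpoints =
            safeGetInformers().get(ALL_NAMESPACES).getStore().list().stream()
                    .filter(ep -> Objects.equals(ep.getMetadata().getNamespace(), namespace));
} else {
    endpoints = safeGetInformers().get(namespace).getStore().list().stream();
}
```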