From e037f95b08cf992f210ae40ffb2256485e46661d Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Wed, 11 Jan 2023 10:02:49 -0500 Subject: [PATCH] MGDSTRM-10301 making the logic conditional on the cluster / dns config --- .../managers/IngressControllerManager.java | 79 +++++++++++++++---- operator/src/main/kubernetes/kubernetes.yml | 6 +- .../IngressControllerManagerTest.java | 16 ++++ 3 files changed, 83 insertions(+), 18 deletions(-) diff --git a/operator/src/main/java/org/bf2/operator/managers/IngressControllerManager.java b/operator/src/main/java/org/bf2/operator/managers/IngressControllerManager.java index 775fca026..ea0929f70 100644 --- a/operator/src/main/java/org/bf2/operator/managers/IngressControllerManager.java +++ b/operator/src/main/java/org/bf2/operator/managers/IngressControllerManager.java @@ -26,6 +26,8 @@ import io.fabric8.kubernetes.client.informers.cache.Cache; import io.fabric8.kubernetes.client.utils.CachedSingleThreadScheduler; import io.fabric8.kubernetes.client.utils.Serialization; +import io.fabric8.openshift.api.model.DNS; +import io.fabric8.openshift.api.model.DNSSpec; import io.fabric8.openshift.api.model.Route; import io.fabric8.openshift.api.model.operator.v1.ConfigBuilder; import io.fabric8.openshift.api.model.operator.v1.IngressController; @@ -93,6 +95,7 @@ @UnlessBuildProperty(name = "kafka", stringValue = "dev", enableIfMissing = true) public class IngressControllerManager { + static final String DNS_NAME = "cluster"; private static final String MAX_CONNECTIONS = "maxConnections"; private static final String RELOAD_INTERVAL = "reloadInterval"; private static final String UNSUPPORTED_CONFIG_OVERRIDES = "unsupportedConfigOverrides"; @@ -115,6 +118,12 @@ public class IngressControllerManager { protected static final String WORKER_NODE_LABEL = "node-role.kubernetes.io/worker"; + /** + * Domain part prefixed to domain reported on IngressController status. The CNAME DNS records + * need to point to a sub-domain on the IngressController domain, so we just add this. + */ + private static final String ROUTER_SUBDOMAIN = "ingresscontroller."; + /** * Predicate that will return true if the input string looks like a broker resource name. */ @@ -178,6 +187,7 @@ public class IngressControllerManager { private Set deploymentsToReconcile = new HashSet<>(); private ResourceRequirements azDeploymentResourceRequirements; private ResourceRequirements defaultDeploymentResourceRequirements; + private ResourceInformer dnses; public Map getRouteMatchLabels() { return routeMatchLabels; @@ -188,7 +198,13 @@ public void addToRouteMatchLabels(String key, String value) { } public List getManagedKafkaRoutesFor(ManagedKafka mk) { - String multiZoneRoute = getIngressControllerDomain("kas"); + boolean hasPublicDns = Optional.ofNullable(dnses.getByKey(Cache.namespaceKeyFunc(null, DNS_NAME))).map(DNS::getSpec).map(DNSSpec::getPublicZone).isPresent(); + + return getManagedKafkaRoutesFor(mk, hasPublicDns); + } + + List getManagedKafkaRoutesFor(ManagedKafka mk, boolean hasPublicDns) { + String multiZoneRoute = getIngressControllerDomain("kas", hasPublicDns); String bootstrapDomain = mk.getSpec().getEndpoint().getBootstrapServerHost(); return Stream.concat( @@ -198,7 +214,7 @@ public List getManagedKafkaRoutesFor(ManagedKafka mk) { routesFor(mk) .filter(IS_BROKER_ROUTE) .map(r -> { - String router = getIngressControllerDomain("kas-" + getZoneForBrokerRoute(r)); + String router = getIngressControllerDomain("kas-" + getZoneForBrokerRoute(r), hasPublicDns); String routePrefix = r.getSpec().getHost().replaceFirst("-" + bootstrapDomain, ""); return new ManagedKafkaRoute(routePrefix, routePrefix, router); @@ -315,6 +331,25 @@ public void onDelete(Deployment deployment, boolean deletedFinalStateUnknown) { }); } + dnses = this.resourceInformerFactory.create(DNS.class, + this.openShiftClient.resources(DNS.class).withName(DNS_NAME), + new ResourceEventHandler() { + + @Override + public void onAdd(DNS obj) { + reconcileIngressControllers(); + } + + @Override + public void onUpdate(DNS oldObj, DNS newObj) { + reconcileIngressControllers(); + } + + @Override + public void onDelete(DNS obj, boolean deletedFinalStateUnknown) { + // do nothing - not expected, just assume we're keeping the current desired state + }}); + ready = true; reconcileIngressControllers(); } @@ -503,12 +538,7 @@ IngressController buildIngressController(String name, String domain, .withNewEndpointPublishingStrategy() .withType("LoadBalancerService") .withNewLoadBalancer() - .withScope(Optional.ofNullable(agent) - .map(ManagedKafkaAgent::getSpec) - .map(ManagedKafkaAgentSpec::getNet) - .filter(NetworkConfiguration::isPrivate) - .map(a -> "Internal") - .orElse("External")) + .withScope(isPrivateNetwork(agent)?"Internal":"External") .withNewProviderParameters() .withType("AWS") .withNewAws() @@ -665,14 +695,31 @@ private int numReplicasForConnectionDemand(double connectionDemand) { return (int)Math.ceil(connectionDemand * (peakConnectionPercentage / 100D) / maxIngressConnections); } - private String getIngressControllerDomain(String ingressControllerName) { - return Optional.ofNullable(informerManager.getLocalService(INGRESS_ROUTER_NAMESPACE, "router-" + ingressControllerName)) - .map(Service::getStatus) - .map(ServiceStatus::getLoadBalancer) - .map(LoadBalancerStatus::getIngress) - .flatMap(l -> l.stream().findFirst()) - .map(LoadBalancerIngress::getHostname) - .orElse(null); + private String getIngressControllerDomain(String ingressControllerName, boolean hasPublicDns) { + if (!isPrivateNetwork(informerManager.getLocalAgent()) && !hasPublicDns) { + // special case - nothing is creating the dns entries for the ingress controller, so we map to the load balancer + return Optional.ofNullable(informerManager.getLocalService(INGRESS_ROUTER_NAMESPACE, "router-" + ingressControllerName)) + .map(Service::getStatus) + .map(ServiceStatus::getLoadBalancer) + .map(LoadBalancerStatus::getIngress) + .flatMap(l -> l.stream().findFirst()) + .map(LoadBalancerIngress::getHostname) + .filter(Objects::nonNull) + .orElse(""); + } + return ingressControllerInformer.getList().stream() + .filter(ic -> ic.getMetadata().getName().equals(ingressControllerName)) + .map(ic -> ROUTER_SUBDOMAIN + (ic.getStatus() != null ? ic.getStatus().getDomain() : ic.getSpec().getDomain())) + .findFirst() + .orElse(""); + } + + private boolean isPrivateNetwork(ManagedKafkaAgent agent) { + return Optional.ofNullable(agent) + .map(ManagedKafkaAgent::getSpec) + .map(ManagedKafkaAgentSpec::getNet) + .filter(NetworkConfiguration::isPrivate) + .isPresent(); } private Stream routesFor(ManagedKafka managedKafka) { diff --git a/operator/src/main/kubernetes/kubernetes.yml b/operator/src/main/kubernetes/kubernetes.yml index 0158fed6e..8b82d179a 100644 --- a/operator/src/main/kubernetes/kubernetes.yml +++ b/operator/src/main/kubernetes/kubernetes.yml @@ -224,8 +224,10 @@ spec: env: - name: QUARKUS_PROFILE value: prod - - name: INGRESSCONTROLLER_AZ_REPLICA_COUNT - value: "1" + - name: KUBERNETES_MAX_CONCURRENT_REQUESTS + value: 100 + - name: KUBERNETES_MAX_CONCURRENT_REQUESTS_PER_HOST + value: 100 ports: - containerPort: 8080 name: http diff --git a/operator/src/test/java/org/bf2/operator/managers/IngressControllerManagerTest.java b/operator/src/test/java/org/bf2/operator/managers/IngressControllerManagerTest.java index 088d83970..b6e8c66c3 100644 --- a/operator/src/test/java/org/bf2/operator/managers/IngressControllerManagerTest.java +++ b/operator/src/test/java/org/bf2/operator/managers/IngressControllerManagerTest.java @@ -389,6 +389,22 @@ void testGetManagedKafkaRoutesFor() { assertEquals("broker-2", managedKafkaRoutes.get(4).getName()); assertEquals("broker-2", managedKafkaRoutes.get(4).getPrefix()); assertEquals("kas-zone-broker-2-loadbalancer.provider.com", managedKafkaRoutes.get(4).getRouter()); + + // once more, but with public dns + // we cannot add a dns to the mock server in fabric8 5.12 as it cannot be looked back up due to a plural error + managedKafkaRoutes = ingressControllerManager.getManagedKafkaRoutesFor(mk, true); + assertEquals(5, managedKafkaRoutes.size()); + + assertEquals( + managedKafkaRoutes.stream().sorted(Comparator.comparing(ManagedKafkaRoute::getName)).collect(Collectors.toList()), + managedKafkaRoutes, + "Expected list of ManagedKafkaRoutes to be sorted by name"); + + assertEquals("ingresscontroller.kas.testing.domain.tld", managedKafkaRoutes.get(0).getRouter()); + assertEquals("ingresscontroller.kas.testing.domain.tld", managedKafkaRoutes.get(1).getRouter()); + assertEquals("ingresscontroller.kas-zone-broker-0.testing.domain.tld", managedKafkaRoutes.get(2).getRouter()); + assertEquals("ingresscontroller.kas-zone-broker-1.testing.domain.tld", managedKafkaRoutes.get(3).getRouter()); + assertEquals("ingresscontroller.kas-zone-broker-2.testing.domain.tld", managedKafkaRoutes.get(4).getRouter()); } @Test