Skip to content

Commit

Permalink
MGDSTRM-10301 making the logic conditional on the cluster / dns config
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Jan 11, 2023
1 parent b0e0eef commit e037f95
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.fabric8.kubernetes.client.utils.CachedSingleThreadScheduler;
import io.fabric8.kubernetes.client.utils.Serialization;
import io.fabric8.openshift.api.model.DNS;
import io.fabric8.openshift.api.model.DNSSpec;
import io.fabric8.openshift.api.model.Route;
import io.fabric8.openshift.api.model.operator.v1.ConfigBuilder;
import io.fabric8.openshift.api.model.operator.v1.IngressController;
Expand Down Expand Up @@ -93,6 +95,7 @@
@UnlessBuildProperty(name = "kafka", stringValue = "dev", enableIfMissing = true)
public class IngressControllerManager {

static final String DNS_NAME = "cluster";
private static final String MAX_CONNECTIONS = "maxConnections";
private static final String RELOAD_INTERVAL = "reloadInterval";
private static final String UNSUPPORTED_CONFIG_OVERRIDES = "unsupportedConfigOverrides";
Expand All @@ -115,6 +118,12 @@ public class IngressControllerManager {

protected static final String WORKER_NODE_LABEL = "node-role.kubernetes.io/worker";

/**
* Domain part prefixed to domain reported on IngressController status. The CNAME DNS records
* need to point to a sub-domain on the IngressController domain, so we just add this.
*/
private static final String ROUTER_SUBDOMAIN = "ingresscontroller.";

/**
* Predicate that will return true if the input string looks like a broker resource name.
*/
Expand Down Expand Up @@ -178,6 +187,7 @@ public class IngressControllerManager {
private Set<String> deploymentsToReconcile = new HashSet<>();
private ResourceRequirements azDeploymentResourceRequirements;
private ResourceRequirements defaultDeploymentResourceRequirements;
private ResourceInformer<DNS> dnses;

public Map<String, String> getRouteMatchLabels() {
return routeMatchLabels;
Expand All @@ -188,7 +198,13 @@ public void addToRouteMatchLabels(String key, String value) {
}

public List<ManagedKafkaRoute> getManagedKafkaRoutesFor(ManagedKafka mk) {
String multiZoneRoute = getIngressControllerDomain("kas");
boolean hasPublicDns = Optional.ofNullable(dnses.getByKey(Cache.namespaceKeyFunc(null, DNS_NAME))).map(DNS::getSpec).map(DNSSpec::getPublicZone).isPresent();

return getManagedKafkaRoutesFor(mk, hasPublicDns);
}

List<ManagedKafkaRoute> getManagedKafkaRoutesFor(ManagedKafka mk, boolean hasPublicDns) {
String multiZoneRoute = getIngressControllerDomain("kas", hasPublicDns);
String bootstrapDomain = mk.getSpec().getEndpoint().getBootstrapServerHost();

return Stream.concat(
Expand All @@ -198,7 +214,7 @@ public List<ManagedKafkaRoute> getManagedKafkaRoutesFor(ManagedKafka mk) {
routesFor(mk)
.filter(IS_BROKER_ROUTE)
.map(r -> {
String router = getIngressControllerDomain("kas-" + getZoneForBrokerRoute(r));
String router = getIngressControllerDomain("kas-" + getZoneForBrokerRoute(r), hasPublicDns);
String routePrefix = r.getSpec().getHost().replaceFirst("-" + bootstrapDomain, "");

return new ManagedKafkaRoute(routePrefix, routePrefix, router);
Expand Down Expand Up @@ -315,6 +331,25 @@ public void onDelete(Deployment deployment, boolean deletedFinalStateUnknown) {
});
}

dnses = this.resourceInformerFactory.create(DNS.class,
this.openShiftClient.resources(DNS.class).withName(DNS_NAME),
new ResourceEventHandler<DNS>() {

@Override
public void onAdd(DNS obj) {
reconcileIngressControllers();
}

@Override
public void onUpdate(DNS oldObj, DNS newObj) {
reconcileIngressControllers();
}

@Override
public void onDelete(DNS obj, boolean deletedFinalStateUnknown) {
// do nothing - not expected, just assume we're keeping the current desired state
}});

ready = true;
reconcileIngressControllers();
}
Expand Down Expand Up @@ -503,12 +538,7 @@ IngressController buildIngressController(String name, String domain,
.withNewEndpointPublishingStrategy()
.withType("LoadBalancerService")
.withNewLoadBalancer()
.withScope(Optional.ofNullable(agent)
.map(ManagedKafkaAgent::getSpec)
.map(ManagedKafkaAgentSpec::getNet)
.filter(NetworkConfiguration::isPrivate)
.map(a -> "Internal")
.orElse("External"))
.withScope(isPrivateNetwork(agent)?"Internal":"External")
.withNewProviderParameters()
.withType("AWS")
.withNewAws()
Expand Down Expand Up @@ -665,14 +695,31 @@ private int numReplicasForConnectionDemand(double connectionDemand) {
return (int)Math.ceil(connectionDemand * (peakConnectionPercentage / 100D) / maxIngressConnections);
}

private String getIngressControllerDomain(String ingressControllerName) {
return Optional.ofNullable(informerManager.getLocalService(INGRESS_ROUTER_NAMESPACE, "router-" + ingressControllerName))
.map(Service::getStatus)
.map(ServiceStatus::getLoadBalancer)
.map(LoadBalancerStatus::getIngress)
.flatMap(l -> l.stream().findFirst())
.map(LoadBalancerIngress::getHostname)
.orElse(null);
private String getIngressControllerDomain(String ingressControllerName, boolean hasPublicDns) {
if (!isPrivateNetwork(informerManager.getLocalAgent()) && !hasPublicDns) {
// special case - nothing is creating the dns entries for the ingress controller, so we map to the load balancer
return Optional.ofNullable(informerManager.getLocalService(INGRESS_ROUTER_NAMESPACE, "router-" + ingressControllerName))
.map(Service::getStatus)
.map(ServiceStatus::getLoadBalancer)
.map(LoadBalancerStatus::getIngress)
.flatMap(l -> l.stream().findFirst())
.map(LoadBalancerIngress::getHostname)
.filter(Objects::nonNull)
.orElse("");
}
return ingressControllerInformer.getList().stream()
.filter(ic -> ic.getMetadata().getName().equals(ingressControllerName))
.map(ic -> ROUTER_SUBDOMAIN + (ic.getStatus() != null ? ic.getStatus().getDomain() : ic.getSpec().getDomain()))
.findFirst()
.orElse("");
}

private boolean isPrivateNetwork(ManagedKafkaAgent agent) {
return Optional.ofNullable(agent)
.map(ManagedKafkaAgent::getSpec)
.map(ManagedKafkaAgentSpec::getNet)
.filter(NetworkConfiguration::isPrivate)
.isPresent();
}

private Stream<Route> routesFor(ManagedKafka managedKafka) {
Expand Down
6 changes: 4 additions & 2 deletions operator/src/main/kubernetes/kubernetes.yml
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,10 @@ spec:
env:
- name: QUARKUS_PROFILE
value: prod
- name: INGRESSCONTROLLER_AZ_REPLICA_COUNT
value: "1"
- name: KUBERNETES_MAX_CONCURRENT_REQUESTS
value: 100
- name: KUBERNETES_MAX_CONCURRENT_REQUESTS_PER_HOST
value: 100
ports:
- containerPort: 8080
name: http
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,22 @@ void testGetManagedKafkaRoutesFor() {
assertEquals("broker-2", managedKafkaRoutes.get(4).getName());
assertEquals("broker-2", managedKafkaRoutes.get(4).getPrefix());
assertEquals("kas-zone-broker-2-loadbalancer.provider.com", managedKafkaRoutes.get(4).getRouter());

// once more, but with public dns
// we cannot add a dns to the mock server in fabric8 5.12 as it cannot be looked back up due to a plural error
managedKafkaRoutes = ingressControllerManager.getManagedKafkaRoutesFor(mk, true);
assertEquals(5, managedKafkaRoutes.size());

assertEquals(
managedKafkaRoutes.stream().sorted(Comparator.comparing(ManagedKafkaRoute::getName)).collect(Collectors.toList()),
managedKafkaRoutes,
"Expected list of ManagedKafkaRoutes to be sorted by name");

assertEquals("ingresscontroller.kas.testing.domain.tld", managedKafkaRoutes.get(0).getRouter());
assertEquals("ingresscontroller.kas.testing.domain.tld", managedKafkaRoutes.get(1).getRouter());
assertEquals("ingresscontroller.kas-zone-broker-0.testing.domain.tld", managedKafkaRoutes.get(2).getRouter());
assertEquals("ingresscontroller.kas-zone-broker-1.testing.domain.tld", managedKafkaRoutes.get(3).getRouter());
assertEquals("ingresscontroller.kas-zone-broker-2.testing.domain.tld", managedKafkaRoutes.get(4).getRouter());
}

@Test
Expand Down

0 comments on commit e037f95

Please sign in to comment.