Skip to content

Commit

Permalink
Support for OIDC, dex installation, and authorization configuration
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Edgar <[email protected]>
  • Loading branch information
MikeEdgar committed Oct 31, 2024
1 parent ba1da23 commit 089a056
Show file tree
Hide file tree
Showing 67 changed files with 2,780 additions and 906 deletions.
4 changes: 4 additions & 0 deletions api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apicurio-registry-avro</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-oidc</artifactId>
</dependency>
<dependency>
<groupId>io.smallrye.common</groupId>
<artifactId>smallrye-common-annotation</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
import org.eclipse.microprofile.openapi.annotations.tags.Tag;

import com.github.streamshub.console.api.model.ConfigEntry;
import com.github.streamshub.console.api.security.Authorized;
import com.github.streamshub.console.api.security.ResourcePrivilege;
import com.github.streamshub.console.api.service.BrokerService;
import com.github.streamshub.console.config.security.Privilege;

@Path("/api/kafkas/{clusterId}/nodes")
@Tag(name = "Kafka Cluster Resources")
Expand All @@ -32,6 +35,8 @@ public class BrokersResource {
@APIResponse(responseCode = "404", ref = "NotFound")
@APIResponse(responseCode = "500", ref = "ServerError")
@APIResponse(responseCode = "504", ref = "ServerTimeout")
@Authorized
@ResourcePrivilege(Privilege.GET)
public CompletionStage<Response> describeConfigs(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
Expand Down
115 changes: 44 additions & 71 deletions api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.github.streamshub.console.api;

import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -46,14 +45,13 @@
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.config.Config;
import org.jboss.logging.Logger;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.streamshub.console.api.security.SaslJaasConfigCredential;
import com.github.streamshub.console.api.support.Holder;
import com.github.streamshub.console.api.support.KafkaContext;
import com.github.streamshub.console.api.support.TrustAllCertificateManager;
Expand All @@ -66,6 +64,7 @@
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.quarkus.security.identity.SecurityIdentity;
import io.strimzi.api.kafka.model.kafka.Kafka;
import io.strimzi.api.kafka.model.kafka.KafkaClusterSpec;
import io.strimzi.api.kafka.model.kafka.KafkaSpec;
Expand Down Expand Up @@ -95,20 +94,11 @@ public class ClientFactory {
public static final String SCRAM_SHA256 = "SCRAM-SHA-256";
public static final String SCRAM_SHA512 = "SCRAM-SHA-512";

private static final String BEARER = "Bearer ";
private static final String STRIMZI_OAUTH_CALLBACK = "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler";
private static final String SASL_OAUTH_CONFIG_TEMPLATE = OAuthBearerLoginModule.class.getName()
+ " required"
+ " oauth.access.token=\"%s\" ;";

private static final String BASIC = "Basic ";
private static final String BASIC_TEMPLATE = "%s required username=\"%%s\" password=\"%%s\" ;";
private static final String SASL_PLAIN_CONFIG_TEMPLATE = BASIC_TEMPLATE.formatted(PlainLoginModule.class.getName());
private static final String SASL_SCRAM_CONFIG_TEMPLATE = BASIC_TEMPLATE.formatted(ScramLoginModule.class.getName());

static final String NO_SUCH_KAFKA_MESSAGE = "Requested Kafka cluster %s does not exist or is not configured";
public static final String NO_SUCH_KAFKA_MESSAGE = "Requested Kafka cluster %s does not exist or is not configured";
private final Function<String, NotFoundException> noSuchKafka =
clusterName -> new NotFoundException(NO_SUCH_KAFKA_MESSAGE.formatted(clusterName));
clusterId -> new NotFoundException(NO_SUCH_KAFKA_MESSAGE.formatted(clusterId));

@Inject
Logger log;
Expand Down Expand Up @@ -168,7 +158,7 @@ Map<String, KafkaContext> produceKafkaContexts(Function<Map<String, Object>, Adm
consoleConfig.getKafka().getClusters()
.stream()
.filter(c -> cachedKafkaResource(c).isEmpty())
.filter(Predicate.not(KafkaClusterConfig::hasNamespace))
//.filter(Predicate.not(KafkaClusterConfig::hasNamespace))
.forEach(clusterConfig -> putKafkaContext(contexts,
clusterConfig,
Optional.empty(),
Expand Down Expand Up @@ -464,6 +454,7 @@ void disposeKafkaContexts(@Disposes Map<String, KafkaContext> contexts) {
@Produces
@RequestScoped
public KafkaContext produceKafkaContext(Map<String, KafkaContext> contexts,
SecurityIdentity identity,
UnaryOperator<Admin> filter,
Function<Map<String, Object>, Admin> adminBuilder) {

Expand All @@ -473,22 +464,29 @@ public KafkaContext produceKafkaContext(Map<String, KafkaContext> contexts,
return KafkaContext.EMPTY;
}

return Optional.ofNullable(contexts.get(clusterId))
.map(ctx -> {
if (ctx.admin() == null) {
/*
* Admin may be null if credentials were not given in the
* configuration. The user must provide the login secrets
* in the request in that case.
*/
var adminConfigs = maybeAuthenticate(ctx, Admin.class);
var admin = adminBuilder.apply(adminConfigs);
return new KafkaContext(ctx, filter.apply(admin));
}
KafkaContext ctx = contexts.get(clusterId);

return ctx;
})
.orElseThrow(() -> noSuchKafka.apply(clusterId));
if (ctx == null) {
throw noSuchKafka.apply(clusterId);
}

if (ctx.admin() == null) {
/*
* Admin may be null if credentials were not given in the
* configuration. The user must provide the login secrets
* in the request in that case.
*
* The identity should already carry the SASL credentials
* at this point (set in ConsoleAuthenticationMechanism),
* so here we will only retrieve them (if applicable) and
* set them in the admin configuration map.
*/
var adminConfigs = maybeAuthenticate(identity, ctx, Admin.class);
var admin = adminBuilder.apply(adminConfigs);
return new KafkaContext(ctx, filter.apply(admin));
}

return ctx;
}

public void disposeKafkaContext(@Disposes KafkaContext context, Map<String, KafkaContext> contexts) {
Expand All @@ -505,8 +503,8 @@ public void disposeKafkaContext(@Disposes KafkaContext context, Map<String, Kafk

@Produces
@RequestScoped
public Consumer<RecordData, RecordData> consumerSupplier(KafkaContext context) {
var configs = maybeAuthenticate(context, Consumer.class);
public Consumer<RecordData, RecordData> consumerSupplier(SecurityIdentity identity, KafkaContext context) {
var configs = maybeAuthenticate(identity, context, Consumer.class);

return new KafkaConsumer<>(
configs,
Expand All @@ -520,8 +518,8 @@ public void disposeConsumer(@Disposes Consumer<RecordData, RecordData> consumer)

@Produces
@RequestScoped
public Producer<RecordData, RecordData> producerSupplier(KafkaContext context) {
var configs = maybeAuthenticate(context, Producer.class);
public Producer<RecordData, RecordData> producerSupplier(SecurityIdentity identity, KafkaContext context) {
var configs = maybeAuthenticate(identity, context, Producer.class);
return new KafkaProducer<>(
configs,
context.schemaRegistryContext().keySerializer(),
Expand All @@ -532,13 +530,13 @@ public void disposeProducer(@Disposes Producer<RecordData, RecordData> producer)
producer.close();
}

Map<String, Object> maybeAuthenticate(KafkaContext context, Class<?> clientType) {
Map<String, Object> maybeAuthenticate(SecurityIdentity identity, KafkaContext context, Class<?> clientType) {
Map<String, Object> configs = context.configs(clientType);

if (configs.containsKey(SaslConfigs.SASL_MECHANISM)
&& !configs.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
configs = new HashMap<>(configs);
configureAuthentication(context.saslMechanism(clientType), configs);
configureAuthentication(identity, context.saslMechanism(clientType), configs);
}

return configs;
Expand Down Expand Up @@ -697,63 +695,38 @@ void logConfig(String clientType, Map<String, Object> config) {
}
}

void configureAuthentication(String saslMechanism, Map<String, Object> configs) {
void configureAuthentication(SecurityIdentity identity, String saslMechanism, Map<String, Object> configs) {
SaslJaasConfigCredential credential = identity.getCredential(SaslJaasConfigCredential.class);

switch (saslMechanism) {
case OAUTHBEARER:
configureOAuthBearer(configs);
configureOAuthBearer(credential, configs);
break;
case PLAIN:
configureBasic(configs, SASL_PLAIN_CONFIG_TEMPLATE);
configureBasic(credential, configs);
break;
case SCRAM_SHA256, SCRAM_SHA512:
configureBasic(configs, SASL_SCRAM_CONFIG_TEMPLATE);
configureBasic(credential, configs);
break;
default:
throw new NotAuthorizedException("Unknown");
}
}

void configureOAuthBearer(Map<String, Object> configs) {
void configureOAuthBearer(SaslJaasConfigCredential credential, Map<String, Object> configs) {
log.trace("SASL/OAUTHBEARER enabled");

configs.putIfAbsent(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, STRIMZI_OAUTH_CALLBACK);
// Do not attempt token refresh ahead of expiration (ExpiringCredentialRefreshingLogin)
// May still cause warnings to be logged when token will expire in less than SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS.
configs.putIfAbsent(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, "0");

String jaasConfig = getAuthorization(BEARER)
.map(SASL_OAUTH_CONFIG_TEMPLATE::formatted)
.orElseThrow(() -> new NotAuthorizedException(BEARER.trim()));

configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
configs.put(SaslConfigs.SASL_JAAS_CONFIG, credential.value());
}

void configureBasic(Map<String, Object> configs, String template) {
void configureBasic(SaslJaasConfigCredential credential, Map<String, Object> configs) {
log.trace("SASL/SCRAM enabled");

String jaasConfig = getBasicAuthentication()
.map(template::formatted)
.orElseThrow(() -> new NotAuthorizedException(BASIC.trim()));

configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
}

Optional<String[]> getBasicAuthentication() {
return getAuthorization(BASIC)
.map(Base64.getDecoder()::decode)
.map(String::new)
.filter(authn -> authn.indexOf(':') >= 0)
.map(authn -> new String[] {
authn.substring(0, authn.indexOf(':')),
authn.substring(authn.indexOf(':') + 1)
})
.filter(userPass -> !userPass[0].isEmpty() && !userPass[1].isEmpty());
}

Optional<String> getAuthorization(String scheme) {
return Optional.ofNullable(headers.getHeaderString(HttpHeaders.AUTHORIZATION))
.filter(header -> header.regionMatches(true, 0, scheme, 0, scheme.length()))
.map(header -> header.substring(scheme.length()));
configs.put(SaslConfigs.SASL_JAAS_CONFIG, credential.value());
}

private static final Pattern BOUNDARY_QUOTES = Pattern.compile("(^[\"'])|([\"']$)");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,14 @@
import com.github.streamshub.console.api.model.ConsumerGroup;
import com.github.streamshub.console.api.model.ConsumerGroupFilterParams;
import com.github.streamshub.console.api.model.ListFetchParams;
import com.github.streamshub.console.api.security.Authorized;
import com.github.streamshub.console.api.security.ResourcePrivilege;
import com.github.streamshub.console.api.service.ConsumerGroupService;
import com.github.streamshub.console.api.support.ErrorCategory;
import com.github.streamshub.console.api.support.FieldFilter;
import com.github.streamshub.console.api.support.ListRequestContext;
import com.github.streamshub.console.api.support.StringEnumeration;
import com.github.streamshub.console.config.security.Privilege;

import io.xlate.validation.constraints.Expression;

Expand All @@ -67,6 +70,8 @@ public class ConsumerGroupsResource {
@APIResponseSchema(ConsumerGroup.ListResponse.class)
@APIResponse(responseCode = "500", ref = "ServerError")
@APIResponse(responseCode = "504", ref = "ServerTimeout")
@Authorized
@ResourcePrivilege(Privilege.LIST)
public CompletionStage<Response> listConsumerGroups(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
Expand Down Expand Up @@ -132,6 +137,8 @@ public CompletionStage<Response> listConsumerGroups(
@APIResponse(responseCode = "404", ref = "NotFound")
@APIResponse(responseCode = "500", ref = "ServerError")
@APIResponse(responseCode = "504", ref = "ServerTimeout")
@Authorized
@ResourcePrivilege(Privilege.GET)
public CompletionStage<Response> describeConsumerGroup(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
Expand Down Expand Up @@ -200,6 +207,8 @@ public CompletionStage<Response> describeConsumerGroup(
node = { "data", "id" },
payload = ErrorCategory.InvalidResource.class,
validationAppliesTo = ConstraintTarget.PARAMETERS)
@Authorized
@ResourcePrivilege(Privilege.UPDATE)
public CompletionStage<Response> patchConsumerGroup(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
Expand Down Expand Up @@ -244,6 +253,8 @@ public CompletionStage<Response> patchConsumerGroup(
@Path("{groupId}")
@DELETE
@APIResponseSchema(responseCode = "204", value = Void.class)
@Authorized
@ResourcePrivilege(Privilege.DELETE)
public CompletionStage<Response> deleteConsumerGroup(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@

import com.github.streamshub.console.api.model.KafkaCluster;
import com.github.streamshub.console.api.model.ListFetchParams;
import com.github.streamshub.console.api.security.Authorized;
import com.github.streamshub.console.api.security.ResourcePrivilege;
import com.github.streamshub.console.api.service.KafkaClusterService;
import com.github.streamshub.console.api.support.ErrorCategory;
import com.github.streamshub.console.api.support.FieldFilter;
import com.github.streamshub.console.api.support.ListRequestContext;
import com.github.streamshub.console.api.support.StringEnumeration;
import com.github.streamshub.console.config.security.Privilege;

import io.xlate.validation.constraints.Expression;

Expand All @@ -63,6 +66,8 @@ public class KafkaClustersResource {
@APIResponseSchema(KafkaCluster.KafkaClusterDataList.class)
@APIResponse(responseCode = "500", ref = "ServerError")
@APIResponse(responseCode = "504", ref = "ServerTimeout")
@Authorized
@ResourcePrivilege(Privilege.LIST)
public Response listClusters(
@QueryParam(KafkaCluster.FIELDS_PARAM)
@DefaultValue(KafkaCluster.Fields.LIST_DEFAULT)
Expand Down Expand Up @@ -121,6 +126,8 @@ public Response listClusters(
@APIResponse(responseCode = "404", ref = "NotFound")
@APIResponse(responseCode = "500", ref = "ServerError")
@APIResponse(responseCode = "504", ref = "ServerTimeout")
@Authorized
@ResourcePrivilege(Privilege.GET)
public CompletionStage<Response> describeCluster(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
Expand Down Expand Up @@ -194,6 +201,8 @@ public CompletionStage<Response> describeCluster(
node = { "data", "id" },
payload = ErrorCategory.InvalidResource.class,
validationAppliesTo = ConstraintTarget.PARAMETERS)
@Authorized
@ResourcePrivilege(Privilege.UPDATE)
public Response patchCluster(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
Expand All @@ -205,7 +214,7 @@ public Response patchCluster(
// Return all fields
requestedFields.accept(Arrays.asList(KafkaCluster.Fields.DESCRIBE_DEFAULT.split(",\\s*")));

var result = clusterService.patchCluster(clusterId, clusterData.getData());
var result = clusterService.patchCluster(clusterData.getData());
var responseEntity = new KafkaCluster.KafkaClusterData(result);

return Response.ok(responseEntity).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@
import com.github.streamshub.console.api.model.KafkaRebalance;
import com.github.streamshub.console.api.model.KafkaRebalanceFilterParams;
import com.github.streamshub.console.api.model.ListFetchParams;
import com.github.streamshub.console.api.security.Authorized;
import com.github.streamshub.console.api.security.ResourcePrivilege;
import com.github.streamshub.console.api.service.KafkaRebalanceService;
import com.github.streamshub.console.api.support.ErrorCategory;
import com.github.streamshub.console.api.support.FieldFilter;
import com.github.streamshub.console.api.support.ListRequestContext;
import com.github.streamshub.console.api.support.StringEnumeration;
import com.github.streamshub.console.config.security.Privilege;

import io.xlate.validation.constraints.Expression;

Expand All @@ -62,6 +65,8 @@ public class KafkaRebalancesResource {
@APIResponseSchema(KafkaRebalance.RebalanceDataList.class)
@APIResponse(responseCode = "500", ref = "ServerError")
@APIResponse(responseCode = "504", ref = "ServerTimeout")
@Authorized
@ResourcePrivilege(Privilege.LIST)
public Response listRebalances(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
Expand Down Expand Up @@ -158,6 +163,8 @@ public Response listRebalances(
node = { "data", "id" },
payload = ErrorCategory.InvalidResource.class,
validationAppliesTo = ConstraintTarget.PARAMETERS)
@Authorized
@ResourcePrivilege(Privilege.UPDATE)
public Response patchRebalance(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
Expand Down
Loading

0 comments on commit 089a056

Please sign in to comment.