Skip to content

Commit

Permalink
KAFKA-10895: Gracefully handle invalid JAAS configs (follow up fix) (a…
Browse files Browse the repository at this point in the history
…pache#9987)

Fixes a regression introduced by apache#9806 in the original fix for KAFKA-10895

It was discovered that if an invalid JAAS config is present on the worker, invoking Configuration::getConfiguration throws an exception. The changes from apache#9806 cause that exception to be thrown during plugin scanning, which causes the worker to fail even if it is not configured to use the basic auth extension at all.

This follow-up handles invalid JAAS configurations more gracefully, and only throws them if the worker is actually configured to use the basic auth extension, at the time that the extension is instantiated and configured.

Two unit tests are added.

Reviewers: Greg Harris <[email protected]>, Konstantine Karantasis <[email protected]>
  • Loading branch information
C0urante authored and kkonstantine committed Feb 3, 2021
1 parent a645c25 commit 8cef4cd
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.kafka.connect.rest.basic.auth.extension;

import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
import org.slf4j.Logger;
Expand All @@ -26,6 +27,7 @@
import javax.security.auth.login.Configuration;
import java.io.IOException;
import java.util.Map;
import java.util.function.Supplier;

/**
* Provides the ability to authenticate incoming BasicAuth credentials using the configured JAAS {@link
Expand Down Expand Up @@ -62,14 +64,38 @@ public class BasicAuthSecurityRestExtension implements ConnectRestExtension {

private static final Logger log = LoggerFactory.getLogger(BasicAuthSecurityRestExtension.class);

private static final Supplier<Configuration> CONFIGURATION = initializeConfiguration(Configuration::getConfiguration);

// Capture the JVM's global JAAS configuration as soon as possible, as it may be altered later
// by connectors, converters, other REST extensions, etc.
private static final Configuration CONFIGURATION = Configuration.getConfiguration();
static Supplier<Configuration> initializeConfiguration(Supplier<Configuration> configurationSupplier) {
try {
Configuration configuration = configurationSupplier.get();
return () -> configuration;
} catch (Exception e) {
// We have to be careful not to throw anything here as this static block gets executed during plugin scanning and any exceptions will
// cause the worker to fail during startup, even if it's not configured to use the basic auth extension.
return () -> {
throw new ConnectException("Failed to retrieve JAAS configuration", e);
};
}
}

private final Supplier<Configuration> configuration;

public BasicAuthSecurityRestExtension() {
this(CONFIGURATION);
}

// For testing
BasicAuthSecurityRestExtension(Supplier<Configuration> configuration) {
this.configuration = configuration;
}

@Override
public void register(ConnectRestExtensionContext restPluginContext) {
log.trace("Registering JAAS basic auth filter");
restPluginContext.configurable().register(new JaasBasicAuthFilter(CONFIGURATION));
restPluginContext.configurable().register(new JaasBasicAuthFilter(configuration.get()));
log.trace("Finished registering JAAS basic auth filter");
}

Expand All @@ -80,7 +106,8 @@ public void close() throws IOException {

@Override
public void configure(Map<String, ?> configs) {

// If we failed to retrieve a JAAS configuration during startup, throw that exception now
configuration.get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.kafka.connect.rest.basic.auth.extension;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
import org.easymock.Capture;
import org.easymock.EasyMock;
Expand All @@ -27,7 +28,15 @@
import javax.security.auth.login.Configuration;
import javax.ws.rs.core.Configurable;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

public class BasicAuthSecurityRestExtensionTest {

Expand Down Expand Up @@ -66,4 +75,44 @@ public void testJaasConfigurationNotOverwritten() {
jaasFilter.getValue().configuration
);
}

@Test
public void testBadJaasConfigInitialization() {
SecurityException jaasConfigurationException = new SecurityException(new IOException("Bad JAAS config is bad"));
Supplier<Configuration> configuration = BasicAuthSecurityRestExtension.initializeConfiguration(() -> {
throw jaasConfigurationException;
});

ConnectException thrownException = assertThrows(ConnectException.class, configuration::get);
assertEquals(jaasConfigurationException, thrownException.getCause());
}

@Test
public void testGoodJaasConfigInitialization() {
AtomicBoolean configurationInitializerEvaluated = new AtomicBoolean(false);
Configuration mockConfiguration = EasyMock.mock(Configuration.class);
Supplier<Configuration> configuration = BasicAuthSecurityRestExtension.initializeConfiguration(() -> {
configurationInitializerEvaluated.set(true);
return mockConfiguration;
});

assertTrue(configurationInitializerEvaluated.get());
assertEquals(mockConfiguration, configuration.get());
}

@Test
public void testBadJaasConfigExtensionSetup() {
SecurityException jaasConfigurationException = new SecurityException(new IOException("Bad JAAS config is bad"));
Supplier<Configuration> configuration = () -> {
throw jaasConfigurationException;
};

BasicAuthSecurityRestExtension extension = new BasicAuthSecurityRestExtension(configuration);

Exception thrownException = assertThrows(Exception.class, () -> extension.configure(Collections.emptyMap()));
assertEquals(jaasConfigurationException, thrownException);

thrownException = assertThrows(Exception.class, () -> extension.register(EasyMock.mock(ConnectRestExtensionContext.class)));
assertEquals(jaasConfigurationException, thrownException);
}
}

0 comments on commit 8cef4cd

Please sign in to comment.