Skip to content

Commit

Permalink
Improve Health Status of security-auditlog Index in Single Node Clust…
Browse files Browse the repository at this point in the history
…ers (#5030)

Signed-off-by: Ganesh Bombatkar <[email protected]>
Signed-off-by: Craig Perkins <[email protected]>
Co-authored-by: Ganesh Bombatkar <[email protected]>
Co-authored-by: Craig Perkins <[email protected]>
  • Loading branch information
3 people authored Feb 4, 2025
1 parent 3c62a83 commit dbfecca
Show file tree
Hide file tree
Showing 13 changed files with 178 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/
package org.opensearch.security;

import java.io.IOException;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.opensearch.test.framework.AuditCompliance;
import org.opensearch.test.framework.AuditConfiguration;
import org.opensearch.test.framework.AuditFilters;
import org.opensearch.test.framework.cluster.ClusterManager;
import org.opensearch.test.framework.cluster.LocalCluster;
import org.opensearch.test.framework.cluster.TestRestClient;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.opensearch.test.framework.TestSecurityConfig.AuthcDomain.AUTHC_HTTPBASIC_INTERNAL;
import static org.opensearch.test.framework.TestSecurityConfig.User.USER_ADMIN;

@RunWith(com.carrotsearch.randomizedtesting.RandomizedRunner.class)
@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
public class InternalAuditLogTest {

private static final Logger log = LogManager.getLogger(InternalAuditLogTest.class);

@ClassRule
public static final LocalCluster cluster = new LocalCluster.Builder().clusterManager(ClusterManager.SINGLENODE)
.anonymousAuth(false)
.authc(AUTHC_HTTPBASIC_INTERNAL)
.users(USER_ADMIN)
.internalAudit(
new AuditConfiguration(true).compliance(new AuditCompliance().enabled(true))
.filters(new AuditFilters().enabledRest(true).enabledTransport(true))
)
.build();

@Test
public void testAuditLogShouldBeGreenInSingleNodeCluster() throws IOException {
try (TestRestClient client = cluster.getRestClient(USER_ADMIN)) {
client.get(""); // demo request for insuring audit-log index is created beforehand
TestRestClient.HttpResponse indicesResponse = client.get("_cat/indices");

assertThat(indicesResponse.getBody(), containsString("security-auditlog"));
assertThat(indicesResponse.getBody(), containsString("green"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,18 @@ public Builder audit(AuditConfiguration auditConfiguration) {
return this;
}

public Builder internalAudit(AuditConfiguration auditConfiguration) {
if (auditConfiguration != null) {
testSecurityConfig.audit(auditConfiguration);
}
if (auditConfiguration.isEnabled()) {
nodeOverrideSettingsBuilder.put("plugins.security.audit.type", "internal_opensearch");
} else {
nodeOverrideSettingsBuilder.put("plugins.security.audit.type", "noop");
}
return this;
}

public List<TestSecurityConfig.User> getUsers() {
return testSecurityConfig.getUsers();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public AuditLogImpl(
) {
super(settings, threadPool, resolver, clusterService, environment);
this.settings = settings;
this.messageRouter = new AuditMessageRouter(settings, clientProvider, threadPool, configPath);
this.messageRouter = new AuditMessageRouter(settings, clientProvider, threadPool, configPath, clusterService);
this.messageRouterEnabled = this.messageRouter.isEnabled();

log.info("Message routing enabled: {}", this.messageRouterEnabled);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.logging.log4j.Logger;

import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.security.auditlog.config.ThreadPoolConfig;
import org.opensearch.security.auditlog.impl.AuditCategory;
Expand All @@ -43,9 +44,15 @@ public class AuditMessageRouter {
final SinkProvider sinkProvider;
final AsyncStoragePool storagePool;

public AuditMessageRouter(final Settings settings, final Client clientProvider, ThreadPool threadPool, final Path configPath) {
public AuditMessageRouter(
final Settings settings,
final Client clientProvider,
ThreadPool threadPool,
final Path configPath,
final ClusterService clusterService
) {
this(
new SinkProvider(settings, clientProvider, threadPool, configPath),
new SinkProvider(settings, clientProvider, threadPool, configPath, clusterService),
new AsyncStoragePool(ThreadPoolConfig.getConfig(settings))
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@
package org.opensearch.security.auditlog.sink;

import java.io.IOException;
import java.util.Map;

import com.google.common.collect.ImmutableMap;

import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.support.WriteRequest.RefreshPolicy;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ThreadContext.StoredContext;
Expand All @@ -29,7 +33,9 @@ public abstract class AbstractInternalOpenSearchSink extends AuditLogSink {

protected final Client clientProvider;
private final ThreadPool threadPool;
protected final ClusterService clusterService;
private final DocWriteRequest.OpType storeOpType;
final static Map<String, Object> indexSettings = ImmutableMap.of("index.number_of_shards", 1, "index.auto_expand_replicas", "0-1");

public AbstractInternalOpenSearchSink(
final String name,
Expand All @@ -38,19 +44,23 @@ public AbstractInternalOpenSearchSink(
final Client clientProvider,
ThreadPool threadPool,
AuditLogSink fallbackSink,
DocWriteRequest.OpType storeOpType
DocWriteRequest.OpType storeOpType,
ClusterService clusterService
) {
super(name, settings, settingsPrefix, fallbackSink);
this.clientProvider = clientProvider;
this.threadPool = threadPool;
this.storeOpType = storeOpType;
this.clusterService = clusterService;
}

@Override
public void close() throws IOException {

}

protected abstract boolean createIndexIfAbsent(String indexName);

public boolean doStore(final AuditMessage msg, String indexName) {

if (Boolean.parseBoolean(
Expand All @@ -64,6 +74,12 @@ public boolean doStore(final AuditMessage msg, String indexName) {

try (StoredContext ctx = threadPool.getThreadContext().stashContext()) {
try {
boolean ok = createIndexIfAbsent(indexName);
if (!ok) {
log.error("Failed to create index {}", indexName);
return false;
}

final IndexRequestBuilder irb = clientProvider.prepareIndex(indexName)
.setRefreshPolicy(RefreshPolicy.IMMEDIATE)
.setSource(msg.getAsMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.cluster.metadata.ComposableIndexTemplate;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.cluster.metadata.Template;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.security.auditlog.impl.AuditMessage;
import org.opensearch.security.support.ConfigConstants;
Expand All @@ -43,9 +44,10 @@ public InternalOpenSearchDataStreamSink(
final Path configPath,
final Client clientProvider,
ThreadPool threadPool,
AuditLogSink fallbackSink
AuditLogSink fallbackSink,
ClusterService clusterService
) {
super(name, settings, settingsPrefix, clientProvider, threadPool, fallbackSink, DocWriteRequest.OpType.CREATE);
super(name, settings, settingsPrefix, clientProvider, threadPool, fallbackSink, DocWriteRequest.OpType.CREATE, clusterService);
Settings sinkSettings = getSinkSettings(settingsPrefix);

this.dataStreamName = sinkSettings.get(ConfigConstants.SECURITY_AUDIT_OPENSEARCH_DATASTREAM_NAME, "opensearch-security-auditlog");
Expand Down Expand Up @@ -132,6 +134,12 @@ private boolean initDataStream() {
return this.dataStreamInitialized;
}

@Override
public boolean createIndexIfAbsent(String indexName) {
// datastream is initialized in initDataStream
return true;
}

@Override
public void close() throws IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
import java.io.IOException;
import java.nio.file.Path;

import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.security.auditlog.impl.AuditMessage;
import org.opensearch.security.support.ConfigConstants;
Expand All @@ -36,9 +39,10 @@ public InternalOpenSearchSink(
final Path configPath,
final Client clientProvider,
ThreadPool threadPool,
AuditLogSink fallbackSink
AuditLogSink fallbackSink,
ClusterService clusterService
) {
super(name, settings, settingsPrefix, clientProvider, threadPool, fallbackSink, null);
super(name, settings, settingsPrefix, clientProvider, threadPool, fallbackSink, null, clusterService);

Settings sinkSettings = getSinkSettings(settingsPrefix);
this.index = sinkSettings.get(ConfigConstants.SECURITY_AUDIT_OPENSEARCH_INDEX, "'security-auditlog-'YYYY.MM.dd");
Expand All @@ -54,6 +58,23 @@ public InternalOpenSearchSink(
}
}

@Override
public boolean createIndexIfAbsent(String indexName) {
if (clusterService.state().metadata().hasIndex(indexName)) {
return true;
}

try {
final CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(indexSettings);
final boolean ok = clientProvider.admin().indices().create(createIndexRequest).actionGet().isAcknowledged();
log.info("Index {} created?: {}", indexName, ok);
return ok;
} catch (ResourceAlreadyExistsException resourceAlreadyExistsException) {
log.info("Index {} already exists", indexName);
return true;
}
}

@Override
public void close() throws IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.logging.log4j.Logger;

import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.security.dlic.rest.support.Utils;
import org.opensearch.security.support.ConfigConstants;
Expand All @@ -34,21 +35,35 @@ public class SinkProvider {
private final ThreadPool threadPool;
private final Path configPath;
private final Settings settings;
private final ClusterService clusterService;
final Map<String, AuditLogSink> allSinks = new HashMap<>();
AuditLogSink defaultSink;
AuditLogSink fallbackSink;

public SinkProvider(final Settings settings, final Client clientProvider, ThreadPool threadPool, final Path configPath) {
public SinkProvider(
final Settings settings,
final Client clientProvider,
ThreadPool threadPool,
final Path configPath,
final ClusterService clusterService
) {
this.settings = settings;
this.clientProvider = clientProvider;
this.threadPool = threadPool;
this.configPath = configPath;
this.clusterService = clusterService;

// fall back sink, make sure we don't lose messages
String fallbackConfigPrefix = ConfigConstants.SECURITY_AUDIT_CONFIG_ENDPOINTS + "." + FALLBACKSINK_NAME;
Settings fallbackSinkSettings = settings.getAsSettings(fallbackConfigPrefix);
if (!fallbackSinkSettings.isEmpty()) {
this.fallbackSink = createSink(FALLBACKSINK_NAME, fallbackSinkSettings.get("type"), settings, fallbackConfigPrefix + ".config");
this.fallbackSink = createSink(
FALLBACKSINK_NAME,
fallbackSinkSettings.get("type"),
settings,
fallbackConfigPrefix + ".config",
clusterService
);
}

// make sure we always have a fallback to write to
Expand All @@ -63,7 +78,8 @@ public SinkProvider(final Settings settings, final Client clientProvider, Thread
DEFAULTSINK_NAME,
settings.get(ConfigConstants.SECURITY_AUDIT_TYPE_DEFAULT),
settings,
ConfigConstants.SECURITY_AUDIT_CONFIG_DEFAULT
ConfigConstants.SECURITY_AUDIT_CONFIG_DEFAULT,
clusterService
);
if (defaultSink == null) {
log.error("Default endpoint could not be created, auditlog will not work properly.");
Expand Down Expand Up @@ -92,7 +108,8 @@ public SinkProvider(final Settings settings, final Client clientProvider, Thread
sinkName,
type,
this.settings,
ConfigConstants.SECURITY_AUDIT_CONFIG_ENDPOINTS + "." + sinkName + ".config"
ConfigConstants.SECURITY_AUDIT_CONFIG_ENDPOINTS + "." + sinkName + ".config",
clusterService
);
if (sink == null) {
log.error("Endpoint '{}' could not be created, check log file for further information.", sinkName);
Expand Down Expand Up @@ -128,12 +145,27 @@ protected void close(AuditLogSink sink) {
}
}

private final AuditLogSink createSink(final String name, final String type, final Settings settings, final String settingsPrefix) {
private final AuditLogSink createSink(
final String name,
final String type,
final Settings settings,
final String settingsPrefix,
final ClusterService clusterService
) {
AuditLogSink sink = null;
if (type != null) {
switch (type.toLowerCase()) {
case "internal_opensearch":
sink = new InternalOpenSearchSink(name, settings, settingsPrefix, configPath, clientProvider, threadPool, fallbackSink);
sink = new InternalOpenSearchSink(
name,
settings,
settingsPrefix,
configPath,
clientProvider,
threadPool,
fallbackSink,
clusterService
);
break;
case "internal_opensearch_data_stream":
sink = new InternalOpenSearchDataStreamSink(
Expand All @@ -143,7 +175,8 @@ private final AuditLogSink createSink(final String name, final String type, fina
configPath,
clientProvider,
threadPool,
fallbackSink
fallbackSink,
clusterService
);
break;
case "external_opensearch":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ protected void validateJson(final String json) throws Exception { // this functi
}

protected AuditMessageRouter createMessageRouterComplianceEnabled(Settings settings) {
AuditMessageRouter router = new AuditMessageRouter(settings, null, null, null);
AuditMessageRouter router = new AuditMessageRouter(settings, null, null, null, null);
router.enableRoutes(settings);
return router;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void testNoDefaultSink() throws Exception {
)
)
.build();
AuditMessageRouter router = new AuditMessageRouter(settings, null, null, null);
AuditMessageRouter router = new AuditMessageRouter(settings, null, null, null, null);
// no default sink, audit log not enabled
assertThat(router.isEnabled(), is(false));
assertThat(router.defaultSink, is(nullValue()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void testKafka() throws Exception {
consumer.subscribe(Arrays.asList("compliance"));

Settings settings = settingsBuilder.put("path.home", ".").build();
SinkProvider provider = new SinkProvider(settings, null, null, null);
SinkProvider provider = new SinkProvider(settings, null, null, null, null);
AuditLogSink sink = provider.getDefaultSink();
try {
assertThat(sink.getClass(), is(KafkaSink.class));
Expand Down
Loading

0 comments on commit dbfecca

Please sign in to comment.