Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Health Status of security-auditlog Index in Single Node Clusters #5030

Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,14 @@
package org.opensearch.security.auditlog.sink;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;

import com.google.common.collect.ImmutableMap;

import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.support.WriteRequest.RefreshPolicy;
import org.opensearch.client.Client;
Expand All @@ -30,6 +36,8 @@ public abstract class AbstractInternalOpenSearchSink extends AuditLogSink {
protected final Client clientProvider;
private final ThreadPool threadPool;
private final DocWriteRequest.OpType storeOpType;
private String lastUsedIndexName;
final static Map<String, Object> indexSettings = ImmutableMap.of("index.number_of_shards", 1, "index.auto_expand_replicas", "0-1");

public AbstractInternalOpenSearchSink(
final String name,
Expand All @@ -44,13 +52,29 @@ public AbstractInternalOpenSearchSink(
this.clientProvider = clientProvider;
this.threadPool = threadPool;
this.storeOpType = storeOpType;
this.lastUsedIndexName = null;
}

@Override
public void close() throws IOException {

}

private boolean createIndexIfAbsent(String indexName) {
Ganesh-RB marked this conversation as resolved.
Show resolved Hide resolved
if (Objects.equals(indexName, lastUsedIndexName)) return true;

try {
final CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(indexSettings);
final boolean ok = clientProvider.admin().indices().create(createIndexRequest).actionGet().isAcknowledged();
lastUsedIndexName = indexName;
log.info("Index {} created?: {}", indexName, ok);
return ok;
} catch (ResourceAlreadyExistsException resourceAlreadyExistsException) {
log.info("Index {} already exists", indexName);
Ganesh-RB marked this conversation as resolved.
Show resolved Hide resolved
return true;
}
}

public boolean doStore(final AuditMessage msg, String indexName) {

if (Boolean.parseBoolean(
Expand All @@ -64,6 +88,12 @@ public boolean doStore(final AuditMessage msg, String indexName) {

try (StoredContext ctx = threadPool.getThreadContext().stashContext()) {
try {
boolean ok = createIndexIfAbsent(indexName);
if (!ok) {
log.error("Server not acknowledge for creation of index {}", indexName);
Ganesh-RB marked this conversation as resolved.
Show resolved Hide resolved
return false;
}

final IndexRequestBuilder irb = clientProvider.prepareIndex(indexName)
.setRefreshPolicy(RefreshPolicy.IMMEDIATE)
.setSource(msg.getAsMap());
Expand Down
Loading