Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OAK-11290 - Improve logging of publish indexing job (WIP) #1887

Open
wants to merge 4 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,16 +106,16 @@ public IndexImporter(NodeStore nodeStore, File indexDir, IndexEditorProvider ind
AsyncIndexerLock indexerLock, StatisticsProvider statisticsProvider, IndexingReporter indexingReporter) throws IOException {
this.statisticsProvider = statisticsProvider;
this.indexingReporter = indexingReporter;
checkArgument(indexDir.exists() && indexDir.isDirectory(), "Path [%s] does not point " +
"to existing directory", indexDir.getAbsolutePath());
checkArgument(indexDir.exists() && indexDir.isDirectory(),
"Path [%s] does not point to existing directory", indexDir.getAbsolutePath());
this.nodeStore = nodeStore;
this.indexDir = indexDir;
this.indexEditorProvider = indexEditorProvider;
indexerInfo = IndexerInfo.fromDirectory(indexDir);
this.indexerInfo = IndexerInfo.fromDirectory(indexDir);
this.indexerLock = indexerLock;
indexes = indexerInfo.getIndexes();
indexedState = requireNonNull(nodeStore.retrieve(indexerInfo.checkpoint), String.format("Cannot retrieve " +
"checkpointed state [%s]", indexerInfo.checkpoint));
this.indexes = indexerInfo.getIndexes();
this.indexedState = requireNonNull(nodeStore.retrieve(indexerInfo.checkpoint),
"Cannot retrieve checkpointed state [" + indexerInfo.checkpoint + "]");
this.indexDefinitionUpdater = new IndexDefinitionUpdater(new File(indexDir, INDEX_DEFINITIONS_JSON));
this.asyncLaneToIndexMapping = mapIndexesToLanes(indexes);
this.indexPathsToUpdate = new HashSet<>();
Expand All @@ -127,7 +127,7 @@ enum IndexImportState {

public void importIndex() throws IOException, CommitFailedException {
try {
if (indexes.keySet().isEmpty()) {
if (indexes.isEmpty()) {
LOG.warn("No indexes to import (possibly index definitions outside of a oak:index node?)");
}
LOG.info("Proceeding to import {} indexes from {}", indexes.keySet(), indexDir.getAbsolutePath());
Expand Down Expand Up @@ -178,10 +178,9 @@ public void importIndex() throws IOException, CommitFailedException {
mergeWithConcurrentCheck(nodeStore, builder);
});
} catch (CommitFailedException commitFailedException) {
LOG.error("Unable to revert back index lanes for: "
+ indexPathsToUpdate.stream()
.collect(StringBuilder::new, StringBuilder::append, (a, b) -> a.append(",").append(b)),
commitFailedException);
LOG.error("Unable to revert back index lanes for: {}",
indexPathsToUpdate.stream()
.collect(StringBuilder::new, StringBuilder::append, (a, b) -> a.append(",").append(b)), commitFailedException);
throw e;
}
}
Expand Down Expand Up @@ -265,12 +264,12 @@ private void bringAsyncIndexUpToDate(String laneName, List<IndexInfo> indexInfos
boolean success = false;
try {
String checkpoint = getAsync().getString(laneName);
requireNonNull(checkpoint, String.format("No current checkpoint found for lane [%s]", laneName));
requireNonNull(checkpoint, "No current checkpoint found for lane [" + laneName + "]");

//TODO Support case where checkpoint got lost or complete reindexing is done

NodeState after = nodeStore.retrieve(checkpoint);
requireNonNull(after, String.format("No state found for checkpoint [%s] for lane [%s]", checkpoint, laneName));
requireNonNull(after, "No state found for checkpoint [" + checkpoint + "] for lane [" + laneName + "]");
LOG.info("Proceeding to update imported indexes {} to checkpoint [{}] for lane [{}]",
indexInfos, checkpoint, laneName);

Expand All @@ -297,8 +296,8 @@ private void bringAsyncIndexUpToDate(String laneName, List<IndexInfo> indexInfos
updateIndexImporterState(builder, IndexImportState.IMPORT_INDEX_DATA, IndexImportState.BRING_INDEX_UPTODATE, false);
mergeWithConcurrentCheck(nodeStore, builder);
success = true;
LOG.info("Imported index is updated to repository state at checkpoint [{}] for " +
"indexing lane [{}]", checkpoint, laneName);
LOG.info("Imported index is updated to repository state at checkpoint [{}] for indexing lane [{}]",
checkpoint, laneName);
} catch (CommitFailedException e) {
LOG.error("Failed while performing bringIndexUpToDate and updating indexImportState from [{}] to [{}]",
IndexImportState.IMPORT_INDEX_DATA, IndexImportState.BRING_INDEX_UPTODATE);
Expand Down Expand Up @@ -375,7 +374,7 @@ private LockToken interruptCurrentIndexing(String laneName) throws CommitFailedE

private IndexImporterProvider getImporter(String type) {
IndexImporterProvider provider = importers.get(type);
return requireNonNull(provider, String.format("No IndexImporterProvider found for type [%s]", type));
return requireNonNull(provider, "No IndexImporterProvider found for type [" + type + "]");
}

private ListMultimap<String, IndexInfo> mapIndexesToLanes(Map<String, File> indexes) {
Expand All @@ -391,7 +390,7 @@ private ListMultimap<String, IndexInfo> mapIndexesToLanes(Map<String, File> inde
boolean newIndex = !NodeStateUtils.getNode(rootState, indexPath).exists();

String type = indexState.getString(IndexConstants.TYPE_PROPERTY_NAME);
requireNonNull(type, String.format("No 'type' property found for index at path [%s]", indexPath));
requireNonNull(type, "No 'type' property found for index at path [" + indexPath + "]");

String asyncName = getAsyncLaneName(indexPath, indexState);
if (asyncName == null) {
Expand Down Expand Up @@ -448,8 +447,9 @@ private void releaseCheckpoint() throws CommitFailedException {

private void incrementReIndexCount(NodeBuilder definition) {
long count = 0;
if (definition.hasProperty(REINDEX_COUNT)) {
count = definition.getProperty(REINDEX_COUNT).getValue(Type.LONG);
PropertyState reindexCountProp = definition.getProperty(REINDEX_COUNT);
if (reindexCountProp != null) {
count = reindexCountProp.getValue(Type.LONG);
}
definition.setProperty(REINDEX_COUNT, count + 1);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.index;

/**
 * Names of the metrics reported by the indexing jobs.
 * <p>
 * Every name starts with {@link #INDEXER_METRICS_PREFIX}; the metrics of the publish
 * indexing job additionally share {@link #INDEXER_PUBLISH_METRICS_PREFIX}.
 */
public interface IndexerMetrics {

    /** Prefix shared by all indexer metric names. */
    String INDEXER_METRICS_PREFIX = "oak_indexer_";

    /** Metric name for the size of the index data. */
    String METRIC_INDEXING_INDEX_DATA_SIZE = INDEXER_METRICS_PREFIX + "index_data_size";

    /** Prefix shared by the metric names of the publish indexing job. */
    String INDEXER_PUBLISH_METRICS_PREFIX = INDEXER_METRICS_PREFIX + "publish_";

    /** Metric name for the indexing duration, in seconds, of the publish indexing job. */
    String METRIC_INDEXING_PUBLISH_DURATION_SECONDS =
            INDEXER_PUBLISH_METRICS_PREFIX + "indexing_duration_seconds";

    /** Metric name for the number of nodes traversed by the publish indexing job. */
    String METRIC_INDEXING_PUBLISH_NODES_TRAVERSED =
            INDEXER_PUBLISH_METRICS_PREFIX + "nodes_traversed";

    /** Metric name for the number of nodes indexed by the publish indexing job. */
    String METRIC_INDEXING_PUBLISH_NODES_INDEXED =
            INDEXER_PUBLISH_METRICS_PREFIX + "nodes_indexed";
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
Expand All @@ -29,6 +30,7 @@

import org.apache.commons.io.FileUtils;
import org.apache.felix.inventory.Format;

import java.util.function.Predicate;
import java.util.regex.Pattern;

Expand All @@ -54,7 +56,7 @@
import static java.util.Objects.requireNonNull;

public class IndexerSupport {
private final Logger log = LoggerFactory.getLogger(getClass());
private final Logger LOG = LoggerFactory.getLogger(getClass());
/**
* Directory name in output directory under which indexes are
* stored
Expand Down Expand Up @@ -131,7 +133,7 @@ public NodeState retrieveNodeStateForCheckpoint() {
NodeState checkpointedState;
if (HEAD_AS_CHECKPOINT.equals(checkpoint)) {
checkpointedState = indexHelper.getNodeStore().getRoot();
log.warn("Using head state for indexing. Such an index cannot be imported back");
LOG.warn("Using head state for indexing. Such an index cannot be imported back");
} else {
checkpointedState = indexHelper.getNodeStore().retrieve(checkpoint);
requireNonNull(checkpointedState, String.format("Not able to retrieve revision referred via checkpoint [%s]", checkpoint));
Expand Down Expand Up @@ -169,7 +171,7 @@ public void switchIndexLanesAndReindexFlag(NodeStore copyOnWriteStore) throws Co
}

copyOnWriteStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
log.info("Switched the async lane for indexes at {} to {} and marked them for reindex", indexHelper.getIndexPaths(), REINDEX_LANE);
LOG.info("Switched the async lane for indexes at {} to {} and marked them for reindex", indexHelper.getIndexPaths(), REINDEX_LANE);
}

public void postIndexWork(NodeStore copyOnWriteStore) throws CommitFailedException, IOException {
Expand All @@ -187,7 +189,7 @@ protected void switchIndexLanesBack(NodeStore copyOnWriteStore) throws CommitFai
}

copyOnWriteStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
log.info("Switched the async lane for indexes at {} back to there original lanes", indexHelper.getIndexPaths());
LOG.info("Switched the async lane for indexes at {} back to there original lanes", indexHelper.getIndexPaths());
}

public Map<String, String> getCheckpointInfo() {
Expand Down Expand Up @@ -254,4 +256,32 @@ public <T> Predicate<T> getFilterPredicate(Set<IndexDefinition> indexDefinitions
public <T> Predicate<T> getFilterPredicateBasedOnCustomRegex(Pattern pattern, Function<T, String> typeToRepositoryPath) {
return t -> !pattern.matcher(typeToRepositoryPath.apply(t)).find();
}

/**
 * Computes the total size of the generated index data and logs a per-index breakdown.
 * <p>
 * Every immediate sub-directory of the local index directory is measured with
 * {@code FileUtils.sizeOfDirectory}; the log entry lists, for each of them in sorted order,
 * the number of regular files directly inside it and its size.
 * <p>
 * This is a best-effort operation: if the local index directory is missing, cannot be listed,
 * or any error occurs while measuring, a warning is logged and {@code -1} is returned.
 *
 * @return the summed size in bytes of all sub-directories, or {@code -1} if it could not be computed
 * @throws IOException propagated from {@code getLocalIndexDir()}, the only call made outside
 *                     the best-effort guard
 */
public long computeSizeOfGeneratedIndexData() throws IOException {
    File localIndexDir = getLocalIndexDir();
    try {
        if (localIndexDir == null || !localIndexDir.isDirectory()) {
            LOG.warn("Local index directory is invalid, this should not happen: {}", localIndexDir);
            return -1;
        }

        // File.listFiles returns null (not an empty array) when the directory cannot be read.
        File[] directories = localIndexDir.listFiles(File::isDirectory);
        if (directories == null) {
            LOG.warn("Unable to list the contents of the local index directory: {}", localIndexDir);
            return -1;
        }
        // Sort so that the per-index breakdown is logged in a stable order.
        Arrays.sort(directories);

        long totalSize = 0;
        StringBuilder sb = new StringBuilder();
        for (File indexDir : directories) {
            long size = FileUtils.sizeOfDirectory(indexDir);
            File[] files = indexDir.listFiles(File::isFile);
            // A null listing means the directory became unreadable; report no files for it.
            long numberOfFiles = files == null ? 0 : files.length;
            totalSize += size;
            sb.append("\n - ").append(indexDir.getName())
                    .append(": ").append(numberOfFiles).append(" files, ")
                    .append(size)
                    .append(" (").append(FileUtils.byteCountToDisplaySize(size)).append(")");
        }
        LOG.info("Total size of index data generated: {} ({}){}",
                totalSize, FileUtils.byteCountToDisplaySize(totalSize), sb);
        return totalSize;
    } catch (Throwable t) {
        // Size reporting must not fail the caller. The throwable is passed as the trailing
        // argument so that the logging backend also records its stack trace.
        LOG.warn("Error while computing size of generated index data: {}", t.toString(), t);
        return -1;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.jackrabbit.oak.index;

import com.codahale.metrics.MetricRegistry;
import org.apache.jackrabbit.guava.common.base.Stopwatch;
import org.apache.jackrabbit.guava.common.io.Closer;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.plugins.index.*;
Expand All @@ -38,15 +39,24 @@
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

import static java.util.Objects.requireNonNull;
import static org.apache.jackrabbit.oak.index.IndexerMetrics.METRIC_INDEXING_PUBLISH_DURATION_SECONDS;
import static org.apache.jackrabbit.oak.index.IndexerMetrics.METRIC_INDEXING_PUBLISH_NODES_INDEXED;
import static org.apache.jackrabbit.oak.index.IndexerMetrics.METRIC_INDEXING_PUBLISH_NODES_TRAVERSED;
import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.INDEXING_PHASE_LOGGER;

public abstract class OutOfBandIndexerBase implements Closeable, IndexUpdateCallback, NodeTraversalCallback{
public abstract class OutOfBandIndexerBase implements Closeable, IndexUpdateCallback, NodeTraversalCallback {

protected final Closer closer = Closer.create();
private final IndexHelper indexHelper;
private final IndexerSupport indexerSupport;
private final IndexingReporter indexingReporter;
private final StatisticsProvider statisticsProvider;
private NodeStore copyOnWriteStore;
private IndexerSupport indexerSupport;
private long nodesTraversed = 0;
private long nodesIndexed = 0;

/**
* Index lane name which is used for indexing
Expand All @@ -64,18 +74,35 @@ public abstract class OutOfBandIndexerBase implements Closeable, IndexUpdateCall
public OutOfBandIndexerBase(IndexHelper indexHelper, IndexerSupport indexerSupport) {
this.indexHelper = requireNonNull(indexHelper);
this.indexerSupport = requireNonNull(indexerSupport);
this.indexingReporter = indexHelper.getIndexReporter();
this.statisticsProvider = indexHelper.getStatisticsProvider();
}

/**
 * Runs the indexing job against the checkpointed repository state.
 * <p>
 * The checkpointed state obtained from the {@code IndexerSupport} is wrapped in a
 * {@link MemoryNodeStore}, the index lanes are switched and flagged for reindex, the index
 * update is performed and finally the post-index work is executed on that store.
 * <p>
 * The job is bracketed by {@code [TASK:FULL_INDEX_CREATION:START]} and
 * {@code [TASK:FULL_INDEX_CREATION:END]} entries on the indexing phase logger; on success the
 * duration and the traversed/indexed node counters are published as metrics and a timing entry
 * is added to the indexing report. On failure a {@code [TASK:FULL_INDEX_CREATION:FAIL]} entry
 * is logged and the original error is rethrown unchanged.
 *
 * @throws CommitFailedException if thrown by any of the indexing steps
 * @throws IOException           if thrown by any of the indexing steps
 */
public void reindex() throws CommitFailedException, IOException {
    INDEXING_PHASE_LOGGER.info("[TASK:FULL_INDEX_CREATION:START] Starting indexing job");
    Stopwatch indexJobWatch = Stopwatch.createStarted();
    try {
        NodeState checkpointedState = indexerSupport.retrieveNodeStateForCheckpoint();

        copyOnWriteStore = new MemoryNodeStore(checkpointedState);
        NodeState baseState = copyOnWriteStore.getRoot();
        //TODO Check for indexPaths being empty

        indexerSupport.switchIndexLanesAndReindexFlag(copyOnWriteStore);
        preformIndexUpdate(baseState);
        indexerSupport.postIndexWork(copyOnWriteStore);

        long indexingDurationSeconds = indexJobWatch.elapsed(TimeUnit.SECONDS);
        // The END tag uses the same task name as START and FAIL so that every started task
        // has a matching terminal entry in the phase log.
        INDEXING_PHASE_LOGGER.info("[TASK:FULL_INDEX_CREATION:END] Metrics: {}", MetricsFormatter.createMetricsWithDurationOnly(indexingDurationSeconds));
        MetricsUtils.addMetric(statisticsProvider, indexingReporter, METRIC_INDEXING_PUBLISH_DURATION_SECONDS, indexingDurationSeconds);
        MetricsUtils.addMetric(statisticsProvider, indexingReporter, METRIC_INDEXING_PUBLISH_NODES_TRAVERSED, nodesTraversed);
        MetricsUtils.addMetric(statisticsProvider, indexingReporter, METRIC_INDEXING_PUBLISH_NODES_INDEXED, nodesIndexed);
        indexingReporter.addTiming("Build Lucene Index", FormattingUtils.formatToSeconds(indexingDurationSeconds));
    } catch (Throwable t) {
        INDEXING_PHASE_LOGGER.info("[TASK:FULL_INDEX_CREATION:FAIL] Metrics: {}, Error: {}",
                MetricsFormatter.createMetricsWithDurationOnly(indexJobWatch), t.toString());
        throw t;
    }
}

protected File getLocalIndexDir() throws IOException {
Expand All @@ -90,13 +117,13 @@ public void close() throws IOException {
//~---------------------------------------------------< callbacks >

@Override
public void indexUpdate() throws CommitFailedException {

public void indexUpdate() {
nodesIndexed++;
}

@Override
public void traversedNode(NodeTraversalCallback.PathSource pathSource) throws CommitFailedException {

public void traversedNode(NodeTraversalCallback.PathSource pathSource) {
nodesTraversed++;
}

protected void preformIndexUpdate(NodeState baseState) throws IOException, CommitFailedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@
public abstract class DocumentStoreIndexerBase implements Closeable {
public static final String INDEXER_METRICS_PREFIX = "oak_indexer_";
public static final String METRIC_INDEXING_DURATION_SECONDS = INDEXER_METRICS_PREFIX + "indexing_duration_seconds";
public static final String METRIC_MERGE_NODE_STORE_DURATION_SECONDS = INDEXER_METRICS_PREFIX + "merge_node_store_duration_seconds";
public static final String METRIC_FULL_INDEX_CREATION_DURATION_SECONDS = INDEXER_METRICS_PREFIX + "full_index_creation_duration_seconds";

private final Logger log = LoggerFactory.getLogger(getClass());
Expand Down Expand Up @@ -198,7 +197,6 @@ private List<IndexStore> buildFlatFileStoreList(NodeState checkpointedState,
.withNodeStore(nodeStore)
.withMongoDocumentStore(getMongoDocumentStore())
.withMongoClientURI(getMongoClientURI())
.withMongoDatabase(getMongoDatabase())
.withNodeStateEntryTraverserFactory(new MongoNodeStateEntryTraverserFactory(rootDocumentState.getRootRevision(),
nodeStore, getMongoDocumentStore(), traversalLog))
.withCheckpoint(indexerSupport.getCheckpoint())
Expand Down Expand Up @@ -420,21 +418,7 @@ public void reindex() throws CommitFailedException, IOException {
MetricsFormatter.createMetricsWithDurationOnly(indexerWatch), t.toString());
throw t;
}

INDEXING_PHASE_LOGGER.info("[TASK:MERGE_NODE_STORE:START] Starting merge node store");
Stopwatch mergeNodeStoreWatch = Stopwatch.createStarted();
try {
copyOnWriteStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
long mergeNodeStoreDurationSeconds = mergeNodeStoreWatch.elapsed(TimeUnit.SECONDS);
INDEXING_PHASE_LOGGER.info("[TASK:MERGE_NODE_STORE:END] Metrics: {}", MetricsFormatter.createMetricsWithDurationOnly(mergeNodeStoreDurationSeconds));
MetricsUtils.addMetric(statisticsProvider, indexingReporter, METRIC_MERGE_NODE_STORE_DURATION_SECONDS, mergeNodeStoreDurationSeconds);
indexingReporter.addTiming("Merge node store", FormattingUtils.formatToSeconds(mergeNodeStoreDurationSeconds));
} catch (Throwable t) {
INDEXING_PHASE_LOGGER.info("[TASK:MERGE_NODE_STORE:FAIL] Metrics: {}, Error: {}",
MetricsFormatter.createMetricsWithDurationOnly(mergeNodeStoreWatch), t.toString());
throw t;
}

copyOnWriteStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
indexerSupport.postIndexWork(copyOnWriteStore);

long fullIndexCreationDurationSeconds = indexJobWatch.elapsed(TimeUnit.SECONDS);
Expand Down
Loading
Loading