Skip to content

Commit

Permalink
Improvements for concurrent indices (#259)
Browse files Browse the repository at this point in the history
* allow BitFileInMemoryLarge as reentrant
* fix bug in synchronisation of output format in ParallelTRECQuerying
* tests for making a index loaded into memory concurrent
* less synchronization
* BaseCompressingMetaIndex is reentrant under fileinmem
* fix for unit test
* use logger not printlns
  • Loading branch information
cmacdonald authored Jan 13, 2025
1 parent 94ff7b3 commit bd0dfd1
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,11 @@ protected Options getOptions() {
public ParallelTRECQuerying() {
super();
pool = Executors.newFixedThreadPool(NUM_PROC);
if (! (super.printer instanceof NullOutputFormat))
super.printer = new SynchronizedOutputFormat(super.printer);
}

public ParallelTRECQuerying(IndexRef i) {
super(i);
pool = Executors.newFixedThreadPool(NUM_PROC);
if (! (super.printer instanceof NullOutputFormat))
super.printer = new SynchronizedOutputFormat(super.printer);
}

@Deprecated
Expand All @@ -133,8 +129,14 @@ public ParallelTRECQuerying(boolean _queryexpansion) {
pool = Executors.newFixedThreadPool(NUM_PROC);
if (_queryexpansion)
controls.put("qe", "on");
if (! (super.printer instanceof NullOutputFormat))
super.printer = new SynchronizedOutputFormat(super.printer);

}

protected OutputFormat getOutputFormat() {
OutputFormat rtr = super.getOutputFormat();
if (! (rtr instanceof NullOutputFormat))
rtr = new SynchronizedOutputFormat(rtr);
return rtr;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@
*/
package org.terrier.structures.bit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.terrier.compression.bit.*;
import org.terrier.structures.DocumentIndex;
import org.terrier.structures.bit.BitPostingIndex;

public class ConcurrentBitPostingIndexUtilities {

static Logger logger = LoggerFactory.getLogger(ConcurrentBitPostingIndexUtilities.class);
private static final boolean USE_CHANNEL = true;

public static boolean isConcurrent(BitPostingIndex bpi) {
Expand All @@ -40,7 +43,7 @@ public static boolean isConcurrent(BitPostingIndex bpi) {
if (bis instanceof ConcurrentBitFileBuffered)
return true;
if (bis instanceof BitFileInMemoryLarge)
return false;
return true;
if (bis instanceof BitFileInMemory)
return true;
return false;
Expand All @@ -58,8 +61,9 @@ public static void makeConcurrent(BitPostingIndex bpi, DocumentIndex newDoi)
BitFileBuffered theFile = (BitFileBuffered)bis;
BitFileChannel newFile = BitFileChannel.of(theFile);
bpi.file[i] = newFile;
logger.debug("Changing BitFileBuffered to BitFileChannel");
} else if (bis instanceof BitFileInMemoryLarge) {
throw new UnsupportedOperationException("Cannot make BitFileInMemoryLarge thread-safe");
logger.debug("BitFileInMemoryLarge already re-entrant");
}
}
else
Expand All @@ -69,8 +73,9 @@ public static void makeConcurrent(BitPostingIndex bpi, DocumentIndex newDoi)
BitFileBuffered theFile = (BitFileBuffered)bis;
ConcurrentBitFileBuffered newFile = ConcurrentBitFileBuffered.of(theFile);
bpi.file[i] = newFile;
logger.debug("Changing BitFileBuffered to ConcurrentBitFileBuffered");
} else if (bis instanceof BitFileInMemoryLarge) {
throw new UnsupportedOperationException("Cannot make BitFileInMemoryLarge thread-safe");
logger.debug("BitFileInMemoryLarge already re-entrant");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,8 @@ public DocumentIndexEntry getDocumentEntry(int docid) throws IOException {
}

public int getDocumentLength(int docid) throws IOException {
synchronized (parent) {
return parent.getDocumentLength(docid);
}
// we dont synchronize on getDocumentLength() - we assume this is always in memory and reentrant
return parent.getDocumentLength(docid);
}

public int getNumberOfDocuments() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
package org.terrier.structures.concurrent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.terrier.structures.BaseCompressingMetaIndex;
import org.terrier.structures.ConcurrentReadable;
import org.terrier.structures.DocumentIndex;
import org.terrier.structures.FieldDocumentIndex;
Expand All @@ -43,9 +44,10 @@ public class ConcurrentIndexUtils {
public static boolean USE_CONCURRENT_DECODE_METAINDEX = false;
static Logger logger = LoggerFactory.getLogger(ConcurrentIndexUtils.class);
private static final String[] BIT_STRUCTURES = {"inverted", "direct"};
private static final String[] META_STRUCTURES = {"meta"};

public static boolean isConcurrent(Index index) {
String[] structures = new String[]{"document", "lexicon", "meta"};
String[] structures = new String[]{"document", "lexicon"};
for (String s : structures) {
if (! index.hasIndexStructure(s))
continue;
Expand All @@ -55,6 +57,21 @@ public static boolean isConcurrent(Index index) {
return false;
}
}
for (String s : META_STRUCTURES) {
if (! index.hasIndexStructure(s))
continue;
MetaIndex meta = (MetaIndex) index.getIndexStructure(s);
if (meta instanceof BaseCompressingMetaIndex) {
// BaseCompressingMetaIndex already compliant if its loaded in memory
if (! (BaseCompressingMetaIndex.isConcurrent((BaseCompressingMetaIndex)meta))){
logger.debug("Structure " + s + " is a BaseCompressingMetaIndex, but not concurrent readable.");
return false;
}
} else if (! meta.getClass().isAnnotationPresent(ConcurrentReadable.class) ) {
logger.debug("Structure " + s + " is not concurrent readable");
return false;
}
}

for(String s : BIT_STRUCTURES) {
if (! index.hasIndexStructure(s))
Expand Down Expand Up @@ -129,6 +146,13 @@ public static Index makeConcurrentForRetrieval(Index index) {
if (index.hasIndexStructure("meta") && ! index.getMetaIndex().getClass().isAnnotationPresent(ConcurrentReadable.class) )
{
MetaIndex oldmeta = index.getMetaIndex();
if (oldmeta instanceof BaseCompressingMetaIndex) {
// BaseCompressingMetaIndex already compliant if its loaded in memory
if (BaseCompressingMetaIndex.isConcurrent((BaseCompressingMetaIndex)oldmeta)) {
logger.debug("Metaindex already concurrent");
return index;
}
}
logger.debug("Upgrading meta index "+oldmeta.getClass().getName()+" to be concurrent");
//logger.debug(String.valueOf(index.getMetaIndex().getClass().isAnnotationPresent(ConcurrentReadable.class)));
MetaIndex newmeta = new ConcurrentMetaIndex(oldmeta);
Expand All @@ -144,7 +168,7 @@ public static Index makeConcurrentForRetrieval(Index index) {
assert newmeta.getClass().isAnnotationPresent(ConcurrentReadable.class);
IndexUtil.forceStructure(index, "meta", newmeta);
}

// NB: dont add anything else here, as return at line 138
return index;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.terrier.structures.concurrent.ConcurrentIndexUtils;
import org.terrier.structures.ConcurrentReadable;
import org.terrier.structures.Index;
import org.terrier.structures.IndexOnDisk;
import org.terrier.structures.IndexFactory;
import org.terrier.tests.ApplicationSetupBasedTest;

Expand All @@ -56,6 +57,30 @@ public class TestConcurrentIndexLoader extends ApplicationSetupBasedTest {
assertTrue(ConcurrentIndexUtils.isConcurrent(concurrent));
assertTrue(concurrent.getLexicon().getClass().isAnnotationPresent(ConcurrentReadable.class));
}

@Test public void testNewIndex_Mem() throws Exception
{
Index index = IndexTestUtils.makeIndex(new String[]{"doc1", "doc2"}, new String[]{"the quick fox", "and all that stuff"});
IndexOnDisk iod = (IndexOnDisk) index;
iod.setIndexProperty("index.inverted.data-source", "fileinmem");
iod.flush();
//IndexRef ref = iod.getIndexRef();
IndexRef ref = IndexRef.of(iod.getIndexRef().toString());
iod.close();

assertFalse(IndexFactory.isLoaded(ref));
index = IndexFactory.of(ref);
System.out.println(((IndexOnDisk) index).getIndexProperty("index.inverted.data-source", null));
assertFalse(ConcurrentIndexUtils.isConcurrent(index));
System.out.println(ref.toString());

IndexRef concRef = ConcurrentIndexLoader.makeConcurrent(ref);
System.out.println(concRef.toString());
Index concurrent = IndexFactory.of(concRef);
assertNotNull(concurrent);
assertTrue(ConcurrentIndexUtils.isConcurrent(concurrent));
assertTrue(concurrent.getLexicon().getClass().isAnnotationPresent(ConcurrentReadable.class));
}

@Test public void testDirectIndex() throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.terrier.structures.concurrent;
import org.terrier.structures.*;
import org.terrier.tests.BatchEndToEndTest;
import org.terrier.tests.BatchEndToEndTest.BatchEndToEndTestEventHooks;

import static org.junit.Assert.assertEquals;
public class TestShakParallelTRECQueryingMem extends TestShakParallelTRECQuerying {

static class Hook extends BatchEndToEndTestEventHooks
{
public void finishedIndexing(BatchEndToEndTest test) throws Exception
{
IndexOnDisk iod = IndexOnDisk.createIndex();
iod.setIndexProperty("index.inverted.data-source", "fileinmem");
iod.flush();
iod.close();
}

public void checkIndex(BatchEndToEndTest test, Index index) throws Exception
{
IndexOnDisk iod = (IndexOnDisk) index;
assertEquals("fileinmem", iod.getIndexProperty("index.inverted.data-source", null));
}
}

public TestShakParallelTRECQueryingMem() {
super();
this.testHooks.add(new Hook());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,7 @@ public void close() throws IOException
protected final String prefix;

protected final ByteAccessor dataSource;
protected boolean threadSafe = false;
protected Map<Text,IntWritable>[] reverseMetaMaps;
protected FixedSizeWriteableFactory<Text>[] keyFactories;

Expand Down Expand Up @@ -707,6 +708,7 @@ public BaseCompressingMetaIndex(IndexOnDisk index, String structureName)
final DataInputStream di = new DataInputStream(Files.openFileStream(dataFilename));
_dataSource = new RandomDataInputAccessor(new RandomDataInputMemory(di, dataFileLength));
di.close();
threadSafe = true;
} catch (OutOfMemoryError oome) {
logger.warn("OutOfMemoryError: Structure "+ structureName + " reading data file directly from disk");
//logger.debug("Metadata will be read directly from disk");
Expand Down Expand Up @@ -1089,6 +1091,11 @@ else if (loadFormat.equals("mapfileinmem"))
i++;
}
}

public static boolean isConcurrent(BaseCompressingMetaIndex m) {
return m.threadSafe;
}

/**
* main
* @param args
Expand Down

0 comments on commit bd0dfd1

Please sign in to comment.