Skip to content
This repository has been archived by the owner on Apr 9, 2020. It is now read-only.

Commit

Permalink
ACCUMULO-375 hybridized ingest to use some bulk and some streaming
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/incubator/accumulo/branches/1.4@1245142 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
Adam Fuchs committed Feb 16, 2012
1 parent 0e1e67d commit ec56d2d
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import java.util.Map.Entry;
import java.util.Set;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.MultiTableBatchWriter;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.ColumnVisibility;
Expand Down Expand Up @@ -112,6 +115,7 @@ public CountAndSet(String entry)
}
}

MultiTableBatchWriter mtbw;

@Override
public void setup(final Context context) {
Expand All @@ -121,6 +125,14 @@ public void setup(final Context context) {
reverseIndexTableName = new Text(tablename + "ReverseIndex");
metadataTableName = new Text(tablename + "Metadata");

try {
mtbw = WikipediaConfiguration.getConnector(conf).createMultiTableBatchWriter(10000000, 1000, 10);
} catch (AccumuloException e) {
throw new RuntimeException(e);
} catch (AccumuloSecurityException e) {
throw new RuntimeException(e);
}

final Text metadataTableNameFinal = metadataTableName;
final Text indexTableNameFinal = indexTableName;
final Text reverseIndexTableNameFinal = reverseIndexTableName;
Expand Down Expand Up @@ -163,7 +175,7 @@ public void output(MutationInfo key, CountAndSet value)
Mutation m = new Mutation(key.row);
m.put(key.colfam, key.colqual, key.cv, key.timestamp, val);
try {
context.write(indexTableNameFinal, m);
mtbw.getBatchWriter(indexTableNameFinal.toString()).addMutation(m);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -189,7 +201,7 @@ public void output(MutationInfo key, CountAndSet value)
Mutation m = new Mutation(key.row);
m.put(key.colfam, key.colqual, key.cv, key.timestamp, val);
try {
context.write(reverseIndexTableNameFinal, m);
mtbw.getBatchWriter(reverseIndexTableNameFinal.toString()).addMutation(m);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -210,7 +222,7 @@ public void output(MutationInfo key, Value value) {
Mutation m = new Mutation(key.row);
m.put(key.colfam, key.colqual, key.cv, key.timestamp, value);
try {
context.write(metadataTableNameFinal, m);
mtbw.getBatchWriter(metadataTableNameFinal.toString()).addMutation(m);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ private void flushLargestTable() throws IOException
if (buffer.size() == 0)
return;

// TODO fix the filename
String file = filenamePrefix + "/" + tablename + "/" + taskID + "_" + (fileCount++) + ".rf";
// TODO get the table configuration for the given table?
FileSKVWriter writer = RFileOperations.getInstance().openWriter(file, fs, conf, acuconf);

// forget locality groups for now, just write everything to the default
Expand Down Expand Up @@ -110,17 +110,18 @@ public void write(Text table, Mutation mutation) throws IOException, Interrupted
{
Key k = new Key(mutation.getRow(),update.getColumnFamily(),update.getColumnQualifier(),update.getColumnVisibility(),update.getTimestamp(),update.isDeleted());
Value v = new Value(update.getValue());
// TODO account for object overhead
mutationSize += k.getSize();
mutationSize += v.getSize();
buffer.put(k, v);
}
size += mutationSize;
long bufferSize = bufferSizes.get(table);

// TODO use a MutableLong instead
bufferSize += mutationSize;
bufferSizes.put(table, bufferSize);

// TODO add object overhead size


while (size >= maxSize) {
flushLargestTable();
}
Expand Down

0 comments on commit ec56d2d

Please sign in to comment.