From 6a0f5f499ea52f2fe79333e863602197a37225fc Mon Sep 17 00:00:00 2001 From: SriramKeerthi Date: Mon, 28 Nov 2016 23:48:31 -0800 Subject: [PATCH 1/3] Adding JsonIterator to convert between Iterator types, starting benchmarking module --- jaggr/jaggr-bench/pom.xml | 31 +++ .../com/caffinc/jaggr/bench/Benchmark.java | 221 ++++++++++++++++++ .../src/main/resources/log4j.properties | 8 + .../com/caffinc/jaggr/utils/JsonIterator.java | 109 ++------- .../jaggr/utils/JsonStringIterator.java | 132 +++++++++++ .../caffinc/jaggr/utils/JsonIteratorTest.java | 61 ++--- .../jaggr/utils/JsonStringIteratorTest.java | 50 ++++ .../com/caffinc/jaggr/core/Aggregation.java | 16 +- jaggr/pom.xml | 15 +- 9 files changed, 514 insertions(+), 129 deletions(-) create mode 100644 jaggr/jaggr-bench/pom.xml create mode 100644 jaggr/jaggr-bench/src/main/java/com/caffinc/jaggr/bench/Benchmark.java create mode 100644 jaggr/jaggr-bench/src/main/resources/log4j.properties create mode 100644 jaggr/jaggr-utils/src/main/java/com/caffinc/jaggr/utils/JsonStringIterator.java create mode 100644 jaggr/jaggr-utils/src/test/java/com/caffinc/jaggr/utils/JsonStringIteratorTest.java diff --git a/jaggr/jaggr-bench/pom.xml b/jaggr/jaggr-bench/pom.xml new file mode 100644 index 0000000..26097b6 --- /dev/null +++ b/jaggr/jaggr-bench/pom.xml @@ -0,0 +1,31 @@ + + + + jaggr-parent + com.caffinc + 0.2.2 + + 4.0.0 + + jaggr-bench + + + + org.slf4j + slf4j-log4j12 + 1.7.21 + + + com.caffinc + jaggr + 0.2.2 + + + org.mongodb + mongo-java-driver + 3.2.2 + + + \ No newline at end of file diff --git a/jaggr/jaggr-bench/src/main/java/com/caffinc/jaggr/bench/Benchmark.java b/jaggr/jaggr-bench/src/main/java/com/caffinc/jaggr/bench/Benchmark.java new file mode 100644 index 0000000..64e3842 --- /dev/null +++ b/jaggr/jaggr-bench/src/main/java/com/caffinc/jaggr/bench/Benchmark.java @@ -0,0 +1,221 @@ +package com.caffinc.jaggr.bench; + +import com.caffinc.jaggr.core.Aggregation; +import com.caffinc.jaggr.core.AggregationBuilder; 
+import com.caffinc.jaggr.core.operations.CollectSetOperation; +import com.mongodb.BasicDBObject; +import com.mongodb.DBCollection; +import com.mongodb.DBObject; +import com.mongodb.MongoClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author srira + * @since 11/28/2016 + */ +public class Benchmark { + private static final Logger LOG = LoggerFactory.getLogger(Benchmark.class); + private static final int BATCH_SIZE = 10000; + private static final DBCollection BENCHMARK_COLLECTION = + new MongoClient("localhost").getDB("jaggr").getCollection("benchmark"); + private static final Random RANDOM = new Random(); + private static final String PLACEHOLDER = + "STRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRING" + + "STRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRING" + + "STRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRING"; + + public static void main(String[] args) throws Exception { +// generateDocuments(BENCHMARK_COLLECTION, 1000000, getFieldDefinitions()); + final Aggregation aggregation = new AggregationBuilder() + .setGroupBy("name") + .addOperation("groupedIds", new CollectSetOperation("_id")) + .getAggregation(); + long startTime; + LOG.info("Computing read time"); + startTime = System.currentTimeMillis(); + Iterator> dbObjectIterator = new Iterator>() { + private Iterator objectIterator = BENCHMARK_COLLECTION.find().iterator(); + + @Override + public boolean hasNext() { + return objectIterator.hasNext(); + } + + @Override + public Map next() { + if (objectIterator.hasNext()) + return objectIterator.next().toMap(); + else + return null; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + List> inMemList = new ArrayList<>(); + while (dbObjectIterator.hasNext()) { + 
inMemList.add(dbObjectIterator.next()); + } + LOG.info("Read time: {}ms {} docs", (System.currentTimeMillis() - startTime), inMemList.size()); + + LOG.info("Starting aggregation"); + startTime = System.currentTimeMillis(); + List> result = aggregation.aggregate(new Iterator>() { + private Iterator objectIterator = BENCHMARK_COLLECTION.find().iterator(); + + @Override + public boolean hasNext() { + return objectIterator.hasNext(); + } + + @Override + public Map next() { + if (objectIterator.hasNext()) + return objectIterator.next().toMap(); + else + return null; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }); + LOG.info("Aggregation time: {}ms {} docs", (System.currentTimeMillis() - startTime), result.size()); + + startTime = System.currentTimeMillis(); + LOG.info("Starting native aggregation"); + List> mongoResults = new ArrayList<>(); + for (DBObject dbObject : BENCHMARK_COLLECTION.aggregate( + Arrays.asList( + new BasicDBObject("$group", new BasicDBObject("_id", "$name").append("groupedIds", new BasicDBObject("$addToSet", "$_id"))) + ) + ).results()) { + mongoResults.add(dbObject.toMap()); + } + LOG.info("Mongo Aggregation time: {}ms {} docs", (System.currentTimeMillis() - startTime), mongoResults.size()); + + + LOG.info("Starting aggregation"); + startTime = System.currentTimeMillis(); + result = aggregation.aggregate(inMemList); + LOG.info("In-memory Aggregation time: {}ms {} docs", (System.currentTimeMillis() - startTime), result.size()); + + int docCount = inMemList.size(); + int threadCount = 10; + final int limit = Double.valueOf(Math.ceil(((double) docCount) / threadCount)).intValue(); + final AtomicInteger counter = new AtomicInteger(0); + LOG.info("Starting multithreaded aggregation"); + startTime = System.currentTimeMillis(); + for (int i = 0; i < threadCount; i++) { + final int batchId = i; + new Thread() { + @Override + public void run() { + runAggregation(counter, aggregation, limit, batchId); + } + 
}.start(); + } + while(counter.get() < docCount) { + Thread.sleep(1); + } + LOG.info("Multi-threaded Aggregation time: {}ms {} docs", (System.currentTimeMillis() - startTime), result.size()); + } + + private static void runAggregation(final AtomicInteger counter, final Aggregation aggregation, final int limit, final int batchId) { + LOG.info("Starting aggregation"); + long startTime = System.currentTimeMillis(); + List> result = aggregation.aggregate(new Iterator>() { + private Iterator objectIterator = BENCHMARK_COLLECTION.find().skip(limit * batchId).limit(limit).iterator(); + + @Override + public boolean hasNext() { + return objectIterator.hasNext(); + } + + @Override + public Map next() { + counter.incrementAndGet(); + if (objectIterator.hasNext()) + return objectIterator.next().toMap(); + else + return null; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }); + LOG.info("Aggregation time: {}ms {} docs", (System.currentTimeMillis() - startTime), result.size()); + } + + private static Map getFieldDefinitions() { + Map fieldDefinitions = new HashMap<>(); + fieldDefinitions.put("_id", "$id"); + fieldDefinitions.put("name", "$string_10_5"); + fieldDefinitions.put("age", "$int_10_99"); + fieldDefinitions.put("sex", "$choice_m|f"); + fieldDefinitions.put("salary", "$int_100000_1000000"); + fieldDefinitions.put("appeal", "$double"); + return fieldDefinitions; + } + + private static void generateDocuments( + DBCollection benchmarkCollection, int count, Map fieldDefinitions) { + List dbObjectList = new ArrayList<>(); + for (int i = 0; i < count; i++) { + Map doc = new HashMap<>(); + for (Map.Entry field : fieldDefinitions.entrySet()) { + doc.put(field.getKey(), generateField(field.getValue(), field.getKey(), i)); + } + dbObjectList.add(new BasicDBObject(doc)); + if (dbObjectList.size() >= BATCH_SIZE) { + LOG.info("Generated " + (i + 1) + " documents"); + benchmarkCollection.insert(dbObjectList); + dbObjectList = new 
ArrayList<>(); + } + } + if (dbObjectList.size() > 0) { + LOG.info("Generated " + count + " documents"); + benchmarkCollection.insert(dbObjectList); + } + } + + private static Object generateField(String fieldDefinition, String field, Integer id) { + if (fieldDefinition == null || !fieldDefinition.startsWith("$")) { + return fieldDefinition; + } else { + String[] definitionParts = fieldDefinition.split("_"); + String type = definitionParts[0]; + int min; + int max; + switch (type) { + case "$id": + return id; + case "$string": + min = Integer.parseInt(definitionParts[1]); + max = Integer.parseInt(definitionParts[2]); + int minVal = Double.valueOf(Math.pow(10, max - 1)).intValue(); + int maxVal = Double.valueOf(Math.pow(10, max)).intValue(); + return PLACEHOLDER.substring(0, min - max) + (minVal + RANDOM.nextInt(maxVal - minVal)); + case "$int": + min = Integer.parseInt(definitionParts[1]); + max = Integer.parseInt(definitionParts[2]); + return min + RANDOM.nextInt(max - min); + case "$choice": + String[] choices = definitionParts[1].split("\\|"); + return choices[RANDOM.nextInt(choices.length)]; + case "$double": + return RANDOM.nextDouble(); + } + } + return fieldDefinition; + } +} diff --git a/jaggr/jaggr-bench/src/main/resources/log4j.properties b/jaggr/jaggr-bench/src/main/resources/log4j.properties new file mode 100644 index 0000000..1da8230 --- /dev/null +++ b/jaggr/jaggr-bench/src/main/resources/log4j.properties @@ -0,0 +1,8 @@ +# Root logger option +log4j.rootLogger=INFO, stdout + +# Direct log messages to stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n diff --git a/jaggr/jaggr-utils/src/main/java/com/caffinc/jaggr/utils/JsonIterator.java b/jaggr/jaggr-utils/src/main/java/com/caffinc/jaggr/utils/JsonIterator.java index ae533e8..b4b02a4 100644 --- 
a/jaggr/jaggr-utils/src/main/java/com/caffinc/jaggr/utils/JsonIterator.java +++ b/jaggr/jaggr-utils/src/main/java/com/caffinc/jaggr/utils/JsonIterator.java @@ -1,123 +1,52 @@ package com.caffinc.jaggr.utils; -import com.google.gson.Gson; - -import java.io.*; -import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.NoSuchElementException; /** - * Iterates a JSON file + * Converts any Iterator into a JSON Iterator for jaggr * * @author Sriram - * @since 11/27/2016 + * @since 11/28/2016 */ -public class JsonIterator implements Iterator>, Closeable { - private final BufferedReader bufferedReader; - private String cachedLine; - private boolean finished = false; - private Gson gson = new Gson(); - - /** - * Constructs an iterator of the lines for a fileName. - * - * @param fileName the fileName to read from - * @throws IOException thrown if there is a problem accessing the file - */ - public JsonIterator(final String fileName) throws IOException { - this(new BufferedReader(new FileReader(fileName))); - } +public abstract class JsonIterator implements Iterator> { + private Iterator iterator; /** - * Constructs an iterator of the lines for a Reader. + * Constructs an iterator wrapper of the objects in the passed Iterator. * - * @param reader the Reader to read from, not null + * @param iterator the underlying Iterator to read from, not null * @throws IllegalArgumentException if the reader is null */ - public JsonIterator(final Reader reader) throws IllegalArgumentException { - if (reader == null) { - throw new IllegalArgumentException("Reader must not be null"); - } - if (reader instanceof BufferedReader) { - bufferedReader = (BufferedReader) reader; - } else { - bufferedReader = new BufferedReader(reader); - } + public JsonIterator(Iterator iterator) { + if (iterator == null) + throw new IllegalArgumentException("Iterator must not be null"); + this.iterator = iterator; } /** - * Indicates whether the Reader has more lines. 
- * If there is an IOException then {@link #close()} will - * be called on this instance. + * Indicates whether the underlying Iterator has more objects * - * @return {@code true} if the Reader has more lines - * @throws IllegalStateException if an IO exception occurs + * @return {@code true} if the Iterator has more objects */ + @Override public boolean hasNext() { - if (cachedLine != null) { - return true; - } else if (finished) { - return false; - } else { - try { - while (true) { - final String line = bufferedReader.readLine(); - if (line == null) { - finished = true; - return false; - } - cachedLine = line; - return true; - } - } catch (final IOException ioe) { - close(); - throw new IllegalStateException(ioe); - } - } + return iterator.hasNext(); } /** - * Returns the next object in the file or wrapped Reader. + * Returns the next object in the wrapped Iterator. * * @return the next JSON object from the input * @throws NoSuchElementException if there is no object to return */ + @Override public Map next() { - return nextObject(); - } - - /** - * Returns the next object in the file or wrapped Reader. - * - * @return the next JSON object from the input - * @throws NoSuchElementException if there is no object to return - */ - public Map nextObject() { if (!hasNext()) { throw new NoSuchElementException("No more objects"); } - final String currentLine = cachedLine; - cachedLine = null; - return gson.fromJson(currentLine, HashMap.class); - } - - /** - * Closes the underlying Reader quietly. - * This method is useful if you only want to process the first few - * lines of a larger file. If you do not close the iterator - * then the Reader remains open. - * This method can safely be called multiple times. 
- */ - public void close() { - finished = true; - try { - bufferedReader.close(); - } catch (final IOException ioe) { - // ignore - } - cachedLine = null; + return toJson(iterator.next()); } /** @@ -126,7 +55,9 @@ public void close() { * @throws UnsupportedOperationException always */ public void remove() { - throw new UnsupportedOperationException("Remove unsupported on JsonIterator"); + throw new UnsupportedOperationException("Remove unsupported on JsonStringIterator"); } + + public abstract Map toJson(T element); } diff --git a/jaggr/jaggr-utils/src/main/java/com/caffinc/jaggr/utils/JsonStringIterator.java b/jaggr/jaggr-utils/src/main/java/com/caffinc/jaggr/utils/JsonStringIterator.java new file mode 100644 index 0000000..7908c9a --- /dev/null +++ b/jaggr/jaggr-utils/src/main/java/com/caffinc/jaggr/utils/JsonStringIterator.java @@ -0,0 +1,132 @@ +package com.caffinc.jaggr.utils; + +import com.google.gson.Gson; + +import java.io.*; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; + +/** + * Iterates a JSON file + * + * @author Sriram + * @since 11/27/2016 + */ +public class JsonStringIterator implements Iterator>, Closeable { + private final BufferedReader bufferedReader; + private String cachedLine; + private boolean finished = false; + private Gson gson = new Gson(); + + /** + * Constructs an iterator of the lines for a fileName. + * + * @param fileName the fileName to read from + * @throws IOException thrown if there is a problem accessing the file + */ + public JsonStringIterator(final String fileName) throws IOException { + this(new BufferedReader(new FileReader(fileName))); + } + + /** + * Constructs an iterator of the lines for a Reader. 
+ * + * @param reader the Reader to read from, not null + * @throws IllegalArgumentException if the reader is null + */ + public JsonStringIterator(final Reader reader) throws IllegalArgumentException { + if (reader == null) { + throw new IllegalArgumentException("Reader must not be null"); + } + if (reader instanceof BufferedReader) { + bufferedReader = (BufferedReader) reader; + } else { + bufferedReader = new BufferedReader(reader); + } + } + + /** + * Indicates whether the Reader has more lines. + * If there is an IOException then {@link #close()} will + * be called on this instance. + * + * @return {@code true} if the Reader has more lines + * @throws IllegalStateException if an IO exception occurs + */ + public boolean hasNext() { + if (cachedLine != null) { + return true; + } else if (finished) { + return false; + } else { + try { + while (true) { + final String line = bufferedReader.readLine(); + if (line == null) { + finished = true; + return false; + } + cachedLine = line; + return true; + } + } catch (final IOException ioe) { + close(); + throw new IllegalStateException(ioe); + } + } + } + + /** + * Returns the next object in the file or wrapped Reader. + * + * @return the next JSON object from the input + * @throws NoSuchElementException if there is no object to return + */ + public Map next() { + return nextObject(); + } + + /** + * Returns the next object in the file or wrapped Reader. + * + * @return the next JSON object from the input + * @throws NoSuchElementException if there is no object to return + */ + public Map nextObject() { + if (!hasNext()) { + throw new NoSuchElementException("No more objects"); + } + final String currentLine = cachedLine; + cachedLine = null; + return gson.fromJson(currentLine, HashMap.class); + } + + /** + * Closes the underlying Reader quietly. + * This method is useful if you only want to process the first few + * lines of a larger file. If you do not close the iterator + * then the Reader remains open. 
+ * This method can safely be called multiple times. + */ + public void close() { + finished = true; + try { + bufferedReader.close(); + } catch (final IOException ioe) { + // ignore + } + cachedLine = null; + } + + /** + * Unsupported. + * + * @throws UnsupportedOperationException always + */ + public void remove() { + throw new UnsupportedOperationException("Remove unsupported on JsonStringIterator"); + } + +} diff --git a/jaggr/jaggr-utils/src/test/java/com/caffinc/jaggr/utils/JsonIteratorTest.java b/jaggr/jaggr-utils/src/test/java/com/caffinc/jaggr/utils/JsonIteratorTest.java index 42d48de..511d412 100644 --- a/jaggr/jaggr-utils/src/test/java/com/caffinc/jaggr/utils/JsonIteratorTest.java +++ b/jaggr/jaggr-utils/src/test/java/com/caffinc/jaggr/utils/JsonIteratorTest.java @@ -4,47 +4,48 @@ import org.junit.Assert; import org.junit.Test; -import java.io.BufferedWriter; -import java.io.FileWriter; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; import java.util.*; /** * Tests for the JsonIterator * * @author Sriram - * @since 11/27/2016 + * @since 11/28/2016 */ public class JsonIteratorTest { - private static final String TEMP_DIR = System.getProperty("java.io.tmpdir"); - private static final Random RANDOM = new Random(); - private static final Gson GSON = new Gson(); - @Test - public void testJsonFileIterator() throws Exception { - Path tempFilePath = Paths.get(TEMP_DIR, "jsontest" + RANDOM.nextInt() + ".json"); - try { - List> expectedData = new ArrayList<>(); - try (BufferedWriter br = new BufferedWriter(new FileWriter(tempFilePath.toFile())) - ) { - for (int i = 0; i < 10; i++) { - Map json = new HashMap<>(); - json.put("_id", (double) i); - json.put("val", RANDOM.nextDouble()); - expectedData.add(json); - br.write(GSON.toJson(json) + "\n"); - } + public void testIterator() { + List stringList = Arrays.asList( + "{\"i\": 0}", + "{\"i\": 1}", + "{\"i\": 2}", + "{\"i\": 3}", + "{\"i\": 4}", + "{\"i\": 5}", + "{\"i\": 6}", + 
"{\"i\": 7}", + "{\"i\": 8}", + "{\"i\": 9}" + ); + final JsonIterator jsonIterator = new JsonIterator(stringList.iterator()) { + private Gson gson = new Gson(); + + @Override + public Map toJson(String element) { + return gson.fromJson(element, HashMap.class); } - try (JsonIterator jsonIterator = new JsonIterator(tempFilePath.toString())) { - for (Map expected : expectedData) { - Map actual = jsonIterator.next(); - Assert.assertEquals("Value should match value written to file", expected, actual); - } + }; + Iterable> iterable = new Iterable>() { + @Override + public Iterator> iterator() { + return jsonIterator; } - } finally { - Files.delete(tempFilePath); + }; + int size = 0; + for (Map obj : iterable) { + Assert.assertEquals("Value must match the input", (double) size, obj.get("i")); + size++; } + Assert.assertEquals("Number of values should match input", stringList.size(), size); } } diff --git a/jaggr/jaggr-utils/src/test/java/com/caffinc/jaggr/utils/JsonStringIteratorTest.java b/jaggr/jaggr-utils/src/test/java/com/caffinc/jaggr/utils/JsonStringIteratorTest.java new file mode 100644 index 0000000..f99417c --- /dev/null +++ b/jaggr/jaggr-utils/src/test/java/com/caffinc/jaggr/utils/JsonStringIteratorTest.java @@ -0,0 +1,50 @@ +package com.caffinc.jaggr.utils; + +import com.google.gson.Gson; +import org.junit.Assert; +import org.junit.Test; + +import java.io.BufferedWriter; +import java.io.FileWriter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.*; + +/** + * Tests for the JsonStringIterator + * + * @author Sriram + * @since 11/27/2016 + */ +public class JsonStringIteratorTest { + private static final String TEMP_DIR = System.getProperty("java.io.tmpdir"); + private static final Random RANDOM = new Random(); + private static final Gson GSON = new Gson(); + + @Test + public void testJsonFileIterator() throws Exception { + Path tempFilePath = Paths.get(TEMP_DIR, "jsontest" + RANDOM.nextInt() + ".json"); + 
try { + List> expectedData = new ArrayList<>(); + try (BufferedWriter br = new BufferedWriter(new FileWriter(tempFilePath.toFile())) + ) { + for (int i = 0; i < 10; i++) { + Map json = new HashMap<>(); + json.put("_id", (double) i); + json.put("val", RANDOM.nextDouble()); + expectedData.add(json); + br.write(GSON.toJson(json) + "\n"); + } + } + try (JsonStringIterator jsonStringIterator = new JsonStringIterator(tempFilePath.toString())) { + for (Map expected : expectedData) { + Map actual = jsonStringIterator.next(); + Assert.assertEquals("Value should match value written to file", expected, actual); + } + } + } finally { + Files.delete(tempFilePath); + } + } +} diff --git a/jaggr/jaggr/src/main/java/com/caffinc/jaggr/core/Aggregation.java b/jaggr/jaggr/src/main/java/com/caffinc/jaggr/core/Aggregation.java index f61cbe6..7e23695 100644 --- a/jaggr/jaggr/src/main/java/com/caffinc/jaggr/core/Aggregation.java +++ b/jaggr/jaggr/src/main/java/com/caffinc/jaggr/core/Aggregation.java @@ -28,15 +28,13 @@ public class Aggregation { * @param objectIterator JSON Object iterator * @return aggregation result */ - public List> aggregate(Iterator> objectIterator) { - Map> workspace = new HashMap<>(); - if (objectIterator != null) { - while (objectIterator.hasNext()) { - Map object = objectIterator.next(); - aggregate(object, workspace); + public List> aggregate(final Iterator> objectIterator) { + return aggregate(new Iterable>() { + @Override + public Iterator> iterator() { + return objectIterator; } - } - return computeResults(workspace); + }); } /** @@ -45,7 +43,7 @@ public List> aggregate(Iterator> objectI * @param objectList Iterable list of JSON Objects * @return aggregation result */ - public List> aggregate(Iterable> objectList) { + public List> aggregate(final Iterable> objectList) { Map> workspace = new HashMap<>(); if (objectList != null) { for (Map object : objectList) { diff --git a/jaggr/pom.xml b/jaggr/pom.xml index 109c0c0..80934c1 100644 --- a/jaggr/pom.xml +++ 
b/jaggr/pom.xml @@ -11,6 +11,7 @@ jaggr-utils jaggr + jaggr-bench jaggr Parent Simple JSON Aggregator for Java @@ -42,7 +43,19 @@ 2.6.2 UTF-8 - + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.3 + + 1.7 + 1.7 + + + + junit From 04d8cdb29de88a572e8a999569c846f4c1086104 Mon Sep 17 00:00:00 2001 From: SriramKeerthi Date: Mon, 28 Nov 2016 23:50:08 -0800 Subject: [PATCH 2/3] Updating pom version --- jaggr/jaggr-bench/pom.xml | 4 ++-- jaggr/jaggr-utils/pom.xml | 2 +- jaggr/jaggr/pom.xml | 2 +- jaggr/pom.xml | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/jaggr/jaggr-bench/pom.xml b/jaggr/jaggr-bench/pom.xml index 26097b6..5f70da6 100644 --- a/jaggr/jaggr-bench/pom.xml +++ b/jaggr/jaggr-bench/pom.xml @@ -5,7 +5,7 @@ jaggr-parent com.caffinc - 0.2.2 + 0.3.0 4.0.0 @@ -20,7 +20,7 @@ com.caffinc jaggr - 0.2.2 + 0.3.0 org.mongodb diff --git a/jaggr/jaggr-utils/pom.xml b/jaggr/jaggr-utils/pom.xml index 4287100..5f92905 100644 --- a/jaggr/jaggr-utils/pom.xml +++ b/jaggr/jaggr-utils/pom.xml @@ -5,7 +5,7 @@ jaggr-parent com.caffinc - 0.2.2 + 0.3.0 4.0.0 diff --git a/jaggr/jaggr/pom.xml b/jaggr/jaggr/pom.xml index 87ec8b4..4717875 100644 --- a/jaggr/jaggr/pom.xml +++ b/jaggr/jaggr/pom.xml @@ -5,7 +5,7 @@ jaggr-parent com.caffinc - 0.2.2 + 0.3.0 4.0.0 diff --git a/jaggr/pom.xml b/jaggr/pom.xml index 80934c1..bbaffca 100644 --- a/jaggr/pom.xml +++ b/jaggr/pom.xml @@ -7,7 +7,7 @@ com.caffinc jaggr-parent pom - 0.2.2 + 0.3.0 jaggr-utils jaggr From 98d81755396f2542b8757c84043e5c600edfc7d3 Mon Sep 17 00:00:00 2001 From: SriramKeerthi Date: Tue, 29 Nov 2016 00:29:21 -0800 Subject: [PATCH 3/3] Updating benchmark code and README.md --- README.md | 46 +++++++++++++++++++ jaggr/jaggr-bench/pom.xml | 9 +++- .../com/caffinc/jaggr/bench/Benchmark.java | 46 ++++--------------- 3 files changed, 62 insertions(+), 39 deletions(-) diff --git a/README.md b/README.md index a4ce625..bcbbc31 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,8 @@ Simple JSON Aggregator for 
Java ![Travis-CI Build Status](https://travis-ci.org/caffinc/jaggr.svg?branch=master) ## Usage + +### Adding dependency jaggr is on Bintray and Maven Central (Soon): @@ -19,6 +21,7 @@ jaggr is on Bintray and Maven Central (Soon): 0.2.2 +### Aggregating documents Assume the following JSON documents are stored in a file called `raw.json`: {"_id": 1, "f": "a", "test": {"f": 3}} @@ -62,6 +65,49 @@ The result of the above aggregation would look as follows: {"_id": "a", "avg": 2.0, "sum": 10, "min": -1, "max": 5, "count": 5} {"_id": "b", "avg": 1.0, "sum": 5, "min": 1, "max": 1, "count": 5} +### Aggregating other data sources + +While aggregating files or Lists of JSON documents might be good for some use cases, not all data fits this paradigm. + +There are three utilities in the `jaggr-utils` library which can be used to aggregate other sources of data. + +#### Aggregating small JSON files in the file system or resources + +The `JsonFileReader` class exposes the `readJsonFromFile` and `readJsonFromResource` methods which can be used to read in all the JSON objects from the file into memory for aggregation. + +It is generally not a good idea to read large files this way, because every JSON object in the file is held in memory at once. + + List> jsonData = JsonFileReader.readJsonFromFile("afile.json"); + + List> jsonData = JsonFileReader.readJsonFromResource("aFileInResources.json"); + + List> result = aggregation.aggregate(jsonData); + +#### Aggregating large JSON files or readers + +The `JsonStringIterator` class provides constructors to iterate through a JSON file or a `Reader` object pointing to an underlying JSON String source without loading all the data into memory. 
+ + Iterator> iterator = new JsonStringIterator("afile.json"); + + Iterator> iterator = new JsonStringIterator(new BufferedReader(new FileReader("afile.json"))); + + List> result = aggregation.aggregate(iterator); + +#### Aggregate arbitrary object Iterators + +The `JsonIterator` abstract class provides a way to convert an `Iterator` from any type to JSON. This can be used to iterate through data coming from arbitrary databases. For example, `MongoDB` provides `Iterable` interfaces to the data. You could aggregate an entire collection as follows: + + + Iterator> iterator = new JsonIterator(mongoCollection.find().iterator()) { + @Override + public Map toJson(DBObject element) { + return element.toMap(); + } + }; + + List> result = aggregation.aggregate(iterator); + + ## Supported Aggregations `jaggr` provides the following aggregations: diff --git a/jaggr/jaggr-bench/pom.xml b/jaggr/jaggr-bench/pom.xml index 5f70da6..5844490 100644 --- a/jaggr/jaggr-bench/pom.xml +++ b/jaggr/jaggr-bench/pom.xml @@ -18,9 +18,14 @@ 1.7.21 - com.caffinc + ${project.groupId} jaggr - 0.3.0 + ${project.version} + + + ${project.groupId} + jaggr-utils + ${project.version} org.mongodb diff --git a/jaggr/jaggr-bench/src/main/java/com/caffinc/jaggr/bench/Benchmark.java b/jaggr/jaggr-bench/src/main/java/com/caffinc/jaggr/bench/Benchmark.java index 64e3842..59ea100 100644 --- a/jaggr/jaggr-bench/src/main/java/com/caffinc/jaggr/bench/Benchmark.java +++ b/jaggr/jaggr-bench/src/main/java/com/caffinc/jaggr/bench/Benchmark.java @@ -3,6 +3,7 @@ import com.caffinc.jaggr.core.Aggregation; import com.caffinc.jaggr.core.AggregationBuilder; import com.caffinc.jaggr.core.operations.CollectSetOperation; +import com.caffinc.jaggr.utils.JsonIterator; import com.mongodb.BasicDBObject; import com.mongodb.DBCollection; import com.mongodb.DBObject; @@ -37,25 +38,10 @@ public static void main(String[] args) throws Exception { long startTime; LOG.info("Computing read time"); startTime = System.currentTimeMillis(); - 
Iterator> dbObjectIterator = new Iterator>() { - private Iterator objectIterator = BENCHMARK_COLLECTION.find().iterator(); - + Iterator> dbObjectIterator = new JsonIterator(BENCHMARK_COLLECTION.find().iterator()) { @Override - public boolean hasNext() { - return objectIterator.hasNext(); - } - - @Override - public Map next() { - if (objectIterator.hasNext()) - return objectIterator.next().toMap(); - else - return null; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); + public Map toJson(DBObject element) { + return element.toMap(); } }; List> inMemList = new ArrayList<>(); @@ -66,25 +52,10 @@ public void remove() { LOG.info("Starting aggregation"); startTime = System.currentTimeMillis(); - List> result = aggregation.aggregate(new Iterator>() { - private Iterator objectIterator = BENCHMARK_COLLECTION.find().iterator(); - + List> result = aggregation.aggregate(new JsonIterator(BENCHMARK_COLLECTION.find().iterator()) { @Override - public boolean hasNext() { - return objectIterator.hasNext(); - } - - @Override - public Map next() { - if (objectIterator.hasNext()) - return objectIterator.next().toMap(); - else - return null; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); + public Map toJson(DBObject element) { + return element.toMap(); } }); LOG.info("Aggregation time: {}ms {} docs", (System.currentTimeMillis() - startTime), result.size()); @@ -122,10 +93,11 @@ public void run() { } }.start(); } - while(counter.get() < docCount) { + while (counter.get() < docCount) { Thread.sleep(1); } LOG.info("Multi-threaded Aggregation time: {}ms {} docs", (System.currentTimeMillis() - startTime), result.size()); + } private static void runAggregation(final AtomicInteger counter, final Aggregation aggregation, final int limit, final int batchId) {