From b75cd1a5581d00c4e2e7aa7525e5453cb8230a75 Mon Sep 17 00:00:00 2001 From: SriramKeerthi Date: Wed, 30 Nov 2016 01:01:14 -0800 Subject: [PATCH] Adding support for new aggregations, renaming classes --- README.md | 9 +- jaggr/jaggr-bench/pom.xml | 2 +- .../com/caffinc/jaggr/bench/Benchmark.java | 171 +++++++------ jaggr/jaggr-utils/pom.xml | 2 +- ...ingIterator.java => JsonFileIterator.java} | 27 ++- ...{JsonFileReader.java => JsonFileUtil.java} | 4 +- .../com/caffinc/jaggr/utils/JsonIterator.java | 2 +- ...torTest.java => JsonFileIteratorTest.java} | 8 +- ...eReaderTest.java => JsonFileUtilTest.java} | 10 +- jaggr/jaggr/pom.xml | 8 +- .../caffinc/jaggr/core/entities/Tuple3.java | 19 ++ .../core/operations/FirstObjectOperation.java | 37 +++ .../core/operations/LastObjectOperation.java | 36 +++ .../core/operations/StdDevPopOperation.java | 61 +++++ .../jaggr/core/operations/TopNOperation.java | 46 ++++ .../caffinc/jaggr/core/AggregationTest.java | 227 +++++------------- .../jaggr/core/BatchAggregationTest.java | 4 +- jaggr/pom.xml | 2 +- 18 files changed, 401 insertions(+), 274 deletions(-) rename jaggr/jaggr-utils/src/main/java/com/caffinc/jaggr/utils/{JsonStringIterator.java => JsonFileIterator.java} (83%) rename jaggr/jaggr-utils/src/main/java/com/caffinc/jaggr/utils/{JsonFileReader.java => JsonFileUtil.java} (93%) rename jaggr/jaggr-utils/src/test/java/com/caffinc/jaggr/utils/{JsonStringIteratorTest.java => JsonFileIteratorTest.java} (85%) rename jaggr/jaggr-utils/src/test/java/com/caffinc/jaggr/utils/{JsonFileReaderTest.java => JsonFileUtilTest.java} (87%) create mode 100644 jaggr/jaggr/src/main/java/com/caffinc/jaggr/core/entities/Tuple3.java create mode 100644 jaggr/jaggr/src/main/java/com/caffinc/jaggr/core/operations/FirstObjectOperation.java create mode 100644 jaggr/jaggr/src/main/java/com/caffinc/jaggr/core/operations/LastObjectOperation.java create mode 100644 jaggr/jaggr/src/main/java/com/caffinc/jaggr/core/operations/StdDevPopOperation.java create mode 100644 jaggr/jaggr/src/main/java/com/caffinc/jaggr/core/operations/TopNOperation.java diff --git a/README.md b/README.md index 540e663..848af42 100644 --- a/README.md +++ b/README.md @@ -12,13 +12,13 @@ jaggr is on Bintray and Maven Central (Soon): com.caffinc jaggr - 0.4.0 + 0.5.0 com.caffinc jaggr-utils - 0.4.0 + 0.5.0 ### Aggregating documents @@ -162,6 +162,11 @@ However the `getFinalResult()` method must be called just once to get the final 5. Average 6. Collect as List 7. Collect as Set +8. First Object +9. Last Object +10. Standard Deviation (Population) +11. Top N Objects + ## Tests diff --git a/jaggr/jaggr-bench/pom.xml b/jaggr/jaggr-bench/pom.xml index 14e210e..de880c2 100644 --- a/jaggr/jaggr-bench/pom.xml +++ b/jaggr/jaggr-bench/pom.xml @@ -5,7 +5,7 @@ jaggr-parent com.caffinc - 0.4.0 + 0.5.0 4.0.0 diff --git a/jaggr/jaggr-bench/src/main/java/com/caffinc/jaggr/bench/Benchmark.java b/jaggr/jaggr-bench/src/main/java/com/caffinc/jaggr/bench/Benchmark.java index 59ea100..46f8ca1 100644 --- a/jaggr/jaggr-bench/src/main/java/com/caffinc/jaggr/bench/Benchmark.java +++ b/jaggr/jaggr-bench/src/main/java/com/caffinc/jaggr/bench/Benchmark.java @@ -2,17 +2,19 @@ import com.caffinc.jaggr.core.Aggregation; import com.caffinc.jaggr.core.AggregationBuilder; -import com.caffinc.jaggr.core.operations.CollectSetOperation; +import com.caffinc.jaggr.core.operations.AverageOperation; +import com.caffinc.jaggr.core.operations.CountOperation; +import com.caffinc.jaggr.core.operations.StdDevPopOperation; +import com.caffinc.jaggr.core.operations.SumOperation; +import com.caffinc.jaggr.utils.JsonFileIterator; import com.caffinc.jaggr.utils.JsonIterator; -import com.mongodb.BasicDBObject; -import com.mongodb.DBCollection; -import com.mongodb.DBObject; -import com.mongodb.MongoClient; +import com.google.gson.Gson; +import com.mongodb.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.CountDownLatch; /** * @author srira @@ -30,29 +32,35 @@ public class Benchmark { "STRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRING"; public static void main(String[] args) throws Exception { -// generateDocuments(BENCHMARK_COLLECTION, 1000000, getFieldDefinitions()); +// generateDocuments(BENCHMARK_COLLECTION, 10000000, getFieldDefinitions()); final Aggregation aggregation = new AggregationBuilder() - .setGroupBy("name") - .addOperation("groupedIds", new CollectSetOperation("_id")) +// .setGroupBy("name") +// .addOperation("groupedIds", new CollectSetOperation("_id")) + .addOperation("sum", new SumOperation("appeal")) + .addOperation("avg", new AverageOperation("age")) + .addOperation("count", new CountOperation()) + .addOperation("stddev", new StdDevPopOperation("age")) .getAggregation(); long startTime; - LOG.info("Computing read time"); + List> result; + +// LOG.info("Computing read time"); +// startTime = System.currentTimeMillis(); +// Iterator> dbObjectIterator = new JsonIterator(BENCHMARK_COLLECTION.find().iterator()) { +// @Override +// public Map toJson(DBObject element) { +// return element.toMap(); +// } +// }; +// List> inMemList = new ArrayList<>(); +// while (dbObjectIterator.hasNext()) { +// inMemList.add(dbObjectIterator.next()); +// } +// LOG.info("Read time: {}ms {} docs", (System.currentTimeMillis() - startTime), inMemList.size()); + + LOG.info("Starting Mongo Cursor aggregation"); startTime = System.currentTimeMillis(); - Iterator> dbObjectIterator = new JsonIterator(BENCHMARK_COLLECTION.find().iterator()) { - @Override - public Map toJson(DBObject element) { - return element.toMap(); - } - }; - List> inMemList = new ArrayList<>(); - while (dbObjectIterator.hasNext()) { - inMemList.add(dbObjectIterator.next()); - } - LOG.info("Read time: {}ms {} docs", (System.currentTimeMillis() - startTime), inMemList.size()); - - LOG.info("Starting aggregation"); - startTime = System.currentTimeMillis(); - List> result = aggregation.aggregate(new JsonIterator(BENCHMARK_COLLECTION.find().iterator()) { + result = aggregation.aggregate(new JsonIterator(BENCHMARK_COLLECTION.find()) { @Override public Map toJson(DBObject element) { return element.toMap(); @@ -60,69 +68,86 @@ public Map toJson(DBObject element) { }); LOG.info("Aggregation time: {}ms {} docs", (System.currentTimeMillis() - startTime), result.size()); - startTime = System.currentTimeMillis(); + LOG.info("Result: " + new Gson().toJson(result)); + + +// LOG.info("Starting in-memory aggregation"); +// startTime = System.currentTimeMillis(); +// result = aggregation.aggregate(inMemList); +// LOG.info("In-memory Aggregation time: {}ms {} docs", (System.currentTimeMillis() - startTime), result.size()); +// +// LOG.info("Starting multithreaded aggregation"); +// int docCount = (int) BENCHMARK_COLLECTION.count(); +// int threadCount = 10; +// final int limit = Double.valueOf(Math.ceil(((double) docCount) / threadCount)).intValue(); +// final CountDownLatch latch = new CountDownLatch(docCount); +// startTime = System.currentTimeMillis(); +// for (int i = 0; i < threadCount; i++) { +// final int batchId = i; +// new Thread() { +// @Override +// public void run() { +// runAggregation(latch, aggregation, limit, batchId); +// } +// }.start(); +// } +// latch.await(); +// LOG.info("Multi-threaded Aggregation time: {}ms {} docs", (System.currentTimeMillis() - startTime), result.size()); + LOG.info("Starting native aggregation"); + startTime = System.currentTimeMillis(); List> mongoResults = new ArrayList<>(); - for (DBObject dbObject : BENCHMARK_COLLECTION.aggregate( - Arrays.asList( - new BasicDBObject("$group", new BasicDBObject("_id", "$name").append("groupedIds", new BasicDBObject("$addToSet", "$_id"))) - ) - ).results()) { + for (DBObject dbObject : new Iterable() { + @Override + public Iterator iterator() { + return BENCHMARK_COLLECTION.aggregate(Arrays.asList( + new BasicDBObject("$group", + new BasicDBObject("_id", null) +// .append("groupedIds", new BasicDBObject("$addToSet", "$_id")) + .append("sum", new BasicDBObject("$sum", "$appeal")) + .append("avg", new BasicDBObject("$avg", "$age")) + .append("stddev", new BasicDBObject("$stdDevPop", "$age")) + .append("count", new BasicDBObject("$sum", 1))) + + ), AggregationOptions.builder().allowDiskUse(true).build()); + } + }) { mongoResults.add(dbObject.toMap()); } LOG.info("Mongo Aggregation time: {}ms {} docs", (System.currentTimeMillis() - startTime), mongoResults.size()); + LOG.info("Result: " + new Gson().toJson(mongoResults)); - LOG.info("Starting aggregation"); + LOG.info("Aggregating file"); startTime = System.currentTimeMillis(); - result = aggregation.aggregate(inMemList); - LOG.info("In-memory Aggregation time: {}ms {} docs", (System.currentTimeMillis() - startTime), result.size()); - - int docCount = inMemList.size(); - int threadCount = 10; - final int limit = Double.valueOf(Math.ceil(((double) docCount) / threadCount)).intValue(); - final AtomicInteger counter = new AtomicInteger(0); - LOG.info("Starting multithreaded aggregation"); + result = aggregation.aggregate(new JsonFileIterator("C:\\Users\\srira\\Documents\\caffinc\\playarea\\bm.json")); + LOG.info("File Aggregation time: {}ms {} docs", (System.currentTimeMillis() - startTime), result.size()); + + LOG.info("Reading file"); startTime = System.currentTimeMillis(); - for (int i = 0; i < threadCount; i++) { - final int batchId = i; - new Thread() { - @Override - public void run() { - runAggregation(counter, aggregation, limit, batchId); + int count = 0; + for (Map obj : new Iterable>() { + @Override + public Iterator> iterator() { + try { + return new JsonFileIterator("C:\\Users\\srira\\Documents\\caffinc\\playarea\\bm.json"); + } catch (Exception e) { + throw new IllegalArgumentException(e); } - }.start(); - } - while (counter.get() < docCount) { - Thread.sleep(1); + } + }) { + count = Double.valueOf((double) obj.get("_id")).intValue(); } - LOG.info("Multi-threaded Aggregation time: {}ms {} docs", (System.currentTimeMillis() - startTime), result.size()); - + LOG.info("File Read time: {}ms {} docs", (System.currentTimeMillis() - startTime), count); } - private static void runAggregation(final AtomicInteger counter, final Aggregation aggregation, final int limit, final int batchId) { - LOG.info("Starting aggregation"); + private static void runAggregation(final CountDownLatch latch, final Aggregation aggregation, final int limit, final int batchId) { long startTime = System.currentTimeMillis(); - List> result = aggregation.aggregate(new Iterator>() { - private Iterator objectIterator = BENCHMARK_COLLECTION.find().skip(limit * batchId).limit(limit).iterator(); - + List> result = aggregation.aggregate(new JsonIterator(BENCHMARK_COLLECTION.find().skip(limit * batchId).limit(limit)) { @Override - public boolean hasNext() { - return objectIterator.hasNext(); - } - - @Override - public Map next() { - counter.incrementAndGet(); - if (objectIterator.hasNext()) - return objectIterator.next().toMap(); - else - return null; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); + public Map toJson(DBObject element) { + latch.countDown(); + return element.toMap(); } }); LOG.info("Aggregation time: {}ms {} docs", (System.currentTimeMillis() - startTime), result.size()); diff --git a/jaggr/jaggr-utils/pom.xml b/jaggr/jaggr-utils/pom.xml index c821038..23587f8 100644 --- a/jaggr/jaggr-utils/pom.xml +++ b/jaggr/jaggr-utils/pom.xml @@ -5,7 +5,7 @@ jaggr-parent com.caffinc - 0.4.0 + 0.5.0 4.0.0 diff --git a/jaggr/jaggr-utils/src/main/java/com/caffinc/jaggr/utils/JsonStringIterator.java b/jaggr/jaggr-utils/src/main/java/com/caffinc/jaggr/utils/JsonFileIterator.java similarity index 83% rename from jaggr/jaggr-utils/src/main/java/com/caffinc/jaggr/utils/JsonStringIterator.java rename to jaggr/jaggr-utils/src/main/java/com/caffinc/jaggr/utils/JsonFileIterator.java index 7908c9a..84e7f44 100644 --- a/jaggr/jaggr-utils/src/main/java/com/caffinc/jaggr/utils/JsonStringIterator.java +++ b/jaggr/jaggr-utils/src/main/java/com/caffinc/jaggr/utils/JsonFileIterator.java @@ -3,6 +3,9 @@ import com.google.gson.Gson; import java.io.*; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -14,7 +17,7 @@ * @author Sriram * @since 11/27/2016 */ -public class JsonStringIterator implements Iterator>, Closeable { +public class JsonFileIterator implements Iterator>, Closeable { private final BufferedReader bufferedReader; private String cachedLine; private boolean finished = false; @@ -26,8 +29,8 @@ public class JsonStringIterator implements Iterator>, Closea * @param fileName the fileName to read from * @throws IOException thrown if there is a problem accessing the file */ - public JsonStringIterator(final String fileName) throws IOException { - this(new BufferedReader(new FileReader(fileName))); + public JsonFileIterator(final String fileName) throws IOException { + this(Files.newBufferedReader(Paths.get(fileName), Charset.defaultCharset())); } /** @@ -36,7 +39,7 @@ public JsonStringIterator(final String fileName) throws IOException { * @param reader the Reader to read from, not null * @throws IllegalArgumentException if the reader is null */ - public JsonStringIterator(final Reader reader) throws IllegalArgumentException { + public JsonFileIterator(final Reader reader) throws IllegalArgumentException { if (reader == null) { throw new IllegalArgumentException("Reader must not be null"); } @@ -62,15 +65,13 @@ public boolean hasNext() { return false; } else { try { - while (true) { - final String line = bufferedReader.readLine(); - if (line == null) { - finished = true; - return false; - } - cachedLine = line; - return true; + final String line = bufferedReader.readLine(); + if (line == null) { + finished = true; + return false; } + cachedLine = line; + return true; } catch (final IOException ioe) { close(); throw new IllegalStateException(ioe); @@ -126,7 +127,7 @@ public void close() { * @throws UnsupportedOperationException always */ public void remove() { - throw new UnsupportedOperationException("Remove unsupported on JsonStringIterator"); + throw new UnsupportedOperationException("Remove unsupported on JsonFileIterator"); } } diff --git a/jaggr/jaggr-utils/src/main/java/com/caffinc/jaggr/utils/JsonFileReader.java b/jaggr/jaggr-utils/src/main/java/com/caffinc/jaggr/utils/JsonFileUtil.java similarity index 93% rename from jaggr/jaggr-utils/src/main/java/com/caffinc/jaggr/utils/JsonFileReader.java rename to jaggr/jaggr-utils/src/main/java/com/caffinc/jaggr/utils/JsonFileUtil.java index 5d7bcbc..c9b9cc9 100644 --- a/jaggr/jaggr-utils/src/main/java/com/caffinc/jaggr/utils/JsonFileReader.java +++ b/jaggr/jaggr-utils/src/main/java/com/caffinc/jaggr/utils/JsonFileUtil.java @@ -17,7 +17,7 @@ * @author Sriram * @since 11/26/2016 */ -public class JsonFileReader { +public class JsonFileUtil { private static final Gson gson = new Gson(); /** @@ -31,7 +31,7 @@ public static List> readJsonFromResource(final String resour List> jsonList = new ArrayList<>(); String json; try (BufferedReader br = new BufferedReader( - new InputStreamReader(JsonFileReader.class.getClassLoader().getResourceAsStream(resourceName)))) { + new InputStreamReader(JsonFileUtil.class.getClassLoader().getResourceAsStream(resourceName)))) { while ((json = br.readLine()) != null) { jsonList.add(gson.fromJson(json, HashMap.class)); } diff --git a/jaggr/jaggr-utils/src/main/java/com/caffinc/jaggr/utils/JsonIterator.java b/jaggr/jaggr-utils/src/main/java/com/caffinc/jaggr/utils/JsonIterator.java index b4b02a4..40dd2c9 100644 --- a/jaggr/jaggr-utils/src/main/java/com/caffinc/jaggr/utils/JsonIterator.java +++ b/jaggr/jaggr-utils/src/main/java/com/caffinc/jaggr/utils/JsonIterator.java @@ -55,7 +55,7 @@ public Map next() { * @throws UnsupportedOperationException always */ public void remove() { - throw new UnsupportedOperationException("Remove unsupported on JsonStringIterator"); + throw new UnsupportedOperationException("Remove unsupported on JsonFileIterator"); } diff --git a/jaggr/jaggr-utils/src/test/java/com/caffinc/jaggr/utils/JsonStringIteratorTest.java b/jaggr/jaggr-utils/src/test/java/com/caffinc/jaggr/utils/JsonFileIteratorTest.java similarity index 85% rename from jaggr/jaggr-utils/src/test/java/com/caffinc/jaggr/utils/JsonStringIteratorTest.java rename to jaggr/jaggr-utils/src/test/java/com/caffinc/jaggr/utils/JsonFileIteratorTest.java index f99417c..8c18b25 100644 --- a/jaggr/jaggr-utils/src/test/java/com/caffinc/jaggr/utils/JsonStringIteratorTest.java +++ b/jaggr/jaggr-utils/src/test/java/com/caffinc/jaggr/utils/JsonFileIteratorTest.java @@ -12,12 +12,12 @@ import java.util.*; /** - * Tests for the JsonStringIterator + * Tests for the JsonFileIterator * * @author Sriram * @since 11/27/2016 */ -public class JsonStringIteratorTest { +public class JsonFileIteratorTest { private static final String TEMP_DIR = System.getProperty("java.io.tmpdir"); private static final Random RANDOM = new Random(); private static final Gson GSON = new Gson(); @@ -37,9 +37,9 @@ public void testJsonFileIterator() throws Exception { br.write(GSON.toJson(json) + "\n"); } } - try (JsonStringIterator jsonStringIterator = new JsonStringIterator(tempFilePath.toString())) { + try (JsonFileIterator jsonFileIterator = new JsonFileIterator(tempFilePath.toString())) { for (Map expected : expectedData) { - Map actual = jsonStringIterator.next(); + Map actual = jsonFileIterator.next(); Assert.assertEquals("Value should match value written to file", expected, actual); } } diff --git a/jaggr/jaggr-utils/src/test/java/com/caffinc/jaggr/utils/JsonFileReaderTest.java b/jaggr/jaggr-utils/src/test/java/com/caffinc/jaggr/utils/JsonFileUtilTest.java similarity index 87% rename from jaggr/jaggr-utils/src/test/java/com/caffinc/jaggr/utils/JsonFileReaderTest.java rename to jaggr/jaggr-utils/src/test/java/com/caffinc/jaggr/utils/JsonFileUtilTest.java index acba751..a7f8904 100644 --- a/jaggr/jaggr-utils/src/test/java/com/caffinc/jaggr/utils/JsonFileReaderTest.java +++ b/jaggr/jaggr-utils/src/test/java/com/caffinc/jaggr/utils/JsonFileUtilTest.java @@ -12,12 +12,12 @@ import java.util.*; /** - * Tests the JsonFileReader utility + * Tests the JsonFileUtil utility * * @author Sriram * @since 11/27/2016 */ -public class JsonFileReaderTest { +public class JsonFileUtilTest { private static final String TEMP_DIR = System.getProperty("java.io.tmpdir"); private static final Random RANDOM = new Random(); private static final Gson GSON = new Gson(); @@ -32,7 +32,7 @@ public void testGetFileLines() throws Exception { for (int i = 0; i < 10; i++) br.write(i + "\n"); } - lines = JsonFileReader.getFileLines(tempFilePath.toString()); + lines = JsonFileUtil.getFileLines(tempFilePath.toString()); for (int i = 0; i < 10; i++) { Assert.assertEquals("Value should match value written to file", String.valueOf(i), lines.get(i)); } @@ -56,7 +56,7 @@ public void testReadJsonFromFile() throws Exception { br.write(GSON.toJson(json) + "\n"); } } - List> jsonLines = JsonFileReader.readJsonFromFile(tempFilePath.toString()); + List> jsonLines = JsonFileUtil.readJsonFromFile(tempFilePath.toString()); for (int i = 0; i < 10; i++) { Assert.assertEquals("Value should match value written to file", expectedData.get(i), jsonLines.get(i)); } @@ -67,7 +67,7 @@ public void testReadJsonFromFile() throws Exception { @Test public void testReadJsonFromResource() throws Exception { - List> result = JsonFileReader.readJsonFromResource("raw.json"); + List> result = JsonFileUtil.readJsonFromResource("raw.json"); Assert.assertEquals("There must be ten lines in the file", 10, result.size()); for (Map obj : result) { Assert.assertEquals("There must be 9 elements in the object", 9, obj.size()); diff --git a/jaggr/jaggr/pom.xml b/jaggr/jaggr/pom.xml index be5e9a7..f1379d4 100644 --- a/jaggr/jaggr/pom.xml +++ b/jaggr/jaggr/pom.xml @@ -5,7 +5,7 @@ jaggr-parent com.caffinc - 0.4.0 + 0.5.0 4.0.0 @@ -80,5 +80,11 @@ ${project.parent.version} test + + org.apache.commons + commons-math3 + 3.6.1 + test + \ No newline at end of file diff --git a/jaggr/jaggr/src/main/java/com/caffinc/jaggr/core/entities/Tuple3.java b/jaggr/jaggr/src/main/java/com/caffinc/jaggr/core/entities/Tuple3.java new file mode 100644 index 0000000..91c047a --- /dev/null +++ b/jaggr/jaggr/src/main/java/com/caffinc/jaggr/core/entities/Tuple3.java @@ -0,0 +1,19 @@ +package com.caffinc.jaggr.core.entities; + +/** + * Holds a 3-tuple of values + * + * @author Sriram + * @since 11/26/2016 + */ +public class Tuple3 { + public T1 _1; + public T2 _2; + public T3 _3; + + public Tuple3(T1 _1, T2 _2, T3 _3) { + this._1 = _1; + this._2 = _2; + this._3 = _3; + } +} diff --git a/jaggr/jaggr/src/main/java/com/caffinc/jaggr/core/operations/FirstObjectOperation.java b/jaggr/jaggr/src/main/java/com/caffinc/jaggr/core/operations/FirstObjectOperation.java new file mode 100644 index 0000000..fbfe438 --- /dev/null +++ b/jaggr/jaggr/src/main/java/com/caffinc/jaggr/core/operations/FirstObjectOperation.java @@ -0,0 +1,37 @@ +package com.caffinc.jaggr.core.operations; + + +import com.caffinc.jaggr.core.utils.FieldValueExtractor; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Gets the first non-null object in the stream + * + * @author Sriram + * @since 11/26/2016 + */ +public class FirstObjectOperation implements Operation { + private String[] field; + private String unsplitField; + + public FirstObjectOperation(String field) { + this.unsplitField = field; + this.field = field.split("\\."); + } + + @Override + public Object aggregate(Object previousAccumulatedValue, Map object) { + if (previousAccumulatedValue == null) + return FieldValueExtractor.getValue(field, object); + else + return previousAccumulatedValue; + } + + @Override + public Object result(Object accumulatedValue) { + return accumulatedValue; + } +} diff --git a/jaggr/jaggr/src/main/java/com/caffinc/jaggr/core/operations/LastObjectOperation.java b/jaggr/jaggr/src/main/java/com/caffinc/jaggr/core/operations/LastObjectOperation.java new file mode 100644 index 0000000..c963073 --- /dev/null +++ b/jaggr/jaggr/src/main/java/com/caffinc/jaggr/core/operations/LastObjectOperation.java @@ -0,0 +1,36 @@ +package com.caffinc.jaggr.core.operations; + + +import com.caffinc.jaggr.core.utils.FieldValueExtractor; + +import java.util.Map; + +/** + * Gets the last non-null object in the stream + * + * @author Sriram + * @since 11/26/2016 + */ +public class LastObjectOperation implements Operation { + private String[] field; + private String unsplitField; + + public LastObjectOperation(String field) { + this.unsplitField = field; + this.field = field.split("\\."); + } + + @Override + public Object aggregate(Object previousAccumulatedValue, Map object) { + Object value = FieldValueExtractor.getValue(field, object); + if (value != null) + return value; + else + return previousAccumulatedValue; + } + + @Override + public Object result(Object accumulatedValue) { + return accumulatedValue; + } +} diff --git a/jaggr/jaggr/src/main/java/com/caffinc/jaggr/core/operations/StdDevPopOperation.java b/jaggr/jaggr/src/main/java/com/caffinc/jaggr/core/operations/StdDevPopOperation.java new file mode 100644 index 0000000..7b30675 --- /dev/null +++ b/jaggr/jaggr/src/main/java/com/caffinc/jaggr/core/operations/StdDevPopOperation.java @@ -0,0 +1,61 @@ +package com.caffinc.jaggr.core.operations; + +import com.caffinc.jaggr.core.entities.Tuple3; +import com.caffinc.jaggr.core.utils.FieldValueExtractor; + +import java.util.Map; + +/** + * Computes Standard Deviation of the Population using Welford's method + * + * @author Sriram + * @since 11/29/2016 + */ +public class StdDevPopOperation implements Operation { + private String[] field; + private String unsplitField; + + public StdDevPopOperation(String field) { + this.unsplitField = field; + this.field = field.split("\\."); + } + + @Override + public Object aggregate(Object previousAccumulatedValue, Map object) { + Object value = FieldValueExtractor.getValue(field, object); + if (value == null) + return previousAccumulatedValue; + Tuple3 accumulator = previousAccumulatedValue == null + ? new Tuple3<>(0.0d, 0.0d, 0) + : (Tuple3) previousAccumulatedValue; + + Double parsedValue = 0d; + if (value instanceof Double) { + parsedValue = (Double) value; + } else if (value instanceof Float) { + parsedValue += (Float) value; + } else if (value instanceof Long) { + parsedValue += (Long) value; + } else if (value instanceof Integer) { + parsedValue += (Integer) value; + } else { + throw new IllegalArgumentException("Field " + unsplitField + " isn't a Double, Float, Long or Integer"); + } + accumulator._3++; + double tmpM = accumulator._1; + accumulator._1 += (parsedValue - tmpM) / accumulator._3; + accumulator._2 += (parsedValue - tmpM) * (parsedValue - accumulator._1); + return accumulator; + } + + @Override + public Object result(Object accumulatedValue) { + Tuple3 accumulator = accumulatedValue == null + ? null + : (Tuple3) accumulatedValue; + if (accumulator == null || accumulator._3 == 1 || accumulator._2 < 0) + return null; + else + return Math.sqrt(accumulator._2 / (accumulator._3 - 1)); + } +} diff --git a/jaggr/jaggr/src/main/java/com/caffinc/jaggr/core/operations/TopNOperation.java b/jaggr/jaggr/src/main/java/com/caffinc/jaggr/core/operations/TopNOperation.java new file mode 100644 index 0000000..e7e7623 --- /dev/null +++ b/jaggr/jaggr/src/main/java/com/caffinc/jaggr/core/operations/TopNOperation.java @@ -0,0 +1,46 @@ +package com.caffinc.jaggr.core.operations; + +import com.caffinc.jaggr.core.utils.FieldValueExtractor; + +import java.util.*; + +/** + * Computes the Top N objects in the data + * + * @author Sriram + * @since 11/30/2016 + */ +public class TopNOperation implements Operation { + private String[] field; + private String unsplitField; + private int n; + private Comparator comparator; + + public TopNOperation(String field, int n, Comparator comparator) { + this.unsplitField = field; + this.field = field.split("\\."); + this.n = n; + this.comparator = comparator; + } + + @Override + public Object aggregate(Object previousAccumulatedValue, Map object) { + T value = (T) FieldValueExtractor.getValue(field, object); + if (value == null) + return null; + PriorityQueue priorityQueue = previousAccumulatedValue == null ? new PriorityQueue<>(n, comparator) + : (PriorityQueue) previousAccumulatedValue; + priorityQueue.add(value); + if (priorityQueue.size() > n) + priorityQueue.poll(); + return priorityQueue; + } + + @Override + public Object result(Object accumulatedValue) { + if (accumulatedValue == null) + return null; + else + return new ArrayList<>((Collection) accumulatedValue); + } +} diff --git a/jaggr/jaggr/src/test/java/com/caffinc/jaggr/core/AggregationTest.java b/jaggr/jaggr/src/test/java/com/caffinc/jaggr/core/AggregationTest.java index 6da4a50..bd5eaa4 100644 --- a/jaggr/jaggr/src/test/java/com/caffinc/jaggr/core/AggregationTest.java +++ b/jaggr/jaggr/src/test/java/com/caffinc/jaggr/core/AggregationTest.java @@ -2,8 +2,9 @@ import com.caffinc.jaggr.core.operations.*; import com.caffinc.jaggr.core.utils.FieldValueExtractor; -import com.caffinc.jaggr.utils.JsonFileReader; +import com.caffinc.jaggr.utils.JsonFileUtil; import com.google.gson.Gson; +import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation; import org.junit.BeforeClass; import org.junit.Test; @@ -27,7 +28,7 @@ private static T roughen(Object o, Class t) { @BeforeClass public static void setUp() throws Exception { - jsonList = JsonFileReader.readJsonFromResource("raw.json"); + jsonList = JsonFileUtil.readJsonFromResource("raw.json"); } @Test @@ -71,7 +72,7 @@ public void testNestedGrouping() throws Exception { @Test public void testCountOperation() throws Exception { String field = "f"; - Set> expected = new HashSet<>(JsonFileReader.readJsonFromResource("countResult.json")); + Set> expected = new HashSet<>(JsonFileUtil.readJsonFromResource("countResult.json")); Aggregation aggregation = new AggregationBuilder() .setGroupBy(field) @@ -85,7 +86,7 @@ public void testCountOperation() throws Exception { public void testMaxOperation() throws Exception { String field = "f"; String maxField = "test.f"; - Set> expected = new HashSet<>(JsonFileReader.readJsonFromResource("maxResult.json")); + Set> expected = new HashSet<>(JsonFileUtil.readJsonFromResource("maxResult.json")); Aggregation aggregation = new AggregationBuilder() .setGroupBy(field) @@ -99,7 +100,7 @@ public void testMaxOperation() throws Exception { public void testMinOperation() throws Exception { String field = "f"; String minField = "test.f"; - Set> expected = new HashSet<>(JsonFileReader.readJsonFromResource("minResult.json")); + Set> expected = new HashSet<>(JsonFileUtil.readJsonFromResource("minResult.json")); Aggregation aggregation = new AggregationBuilder() .setGroupBy(field) @@ -113,7 +114,7 @@ public void testMinOperation() throws Exception { public void testCollectOperation() throws Exception { String field = "f"; String collectField = "_id"; - Set> expected = new HashSet<>(JsonFileReader.readJsonFromResource("collectResult.json")); + Set> expected = new HashSet<>(JsonFileUtil.readJsonFromResource("collectResult.json")); Aggregation aggregation = new AggregationBuilder() .setGroupBy(field) @@ -128,7 +129,7 @@ public void testCollectSetOperation() throws Exception { String field = "f"; String collectField = "test.f"; Map expectedMap = new HashMap<>(); - for (Map expectedObject : JsonFileReader.readJsonFromResource("collectSetResult.json")) { + for (Map expectedObject : JsonFileUtil.readJsonFromResource("collectSetResult.json")) { expectedMap.put(String.valueOf(expectedObject.get("_id")), new HashSet((List) expectedObject.get("set"))); } @@ -147,7 +148,7 @@ public void testCollectSetOperation() throws Exception { public void testSumOperation() throws Exception { String field = "f"; String sumField = "test.f"; - Set> expected = new HashSet<>(JsonFileReader.readJsonFromResource("sumResult.json")); + Set> expected = new HashSet<>(JsonFileUtil.readJsonFromResource("sumResult.json")); Aggregation aggregation = new AggregationBuilder() .setGroupBy(field) @@ -161,7 +162,7 @@ public void testSumOperation() throws Exception { public void testAverageOperation() throws Exception { String field = "f"; String avgField = "test.f"; - Set> expected = new HashSet<>(JsonFileReader.readJsonFromResource("avgResult.json")); + Set> expected = new HashSet<>(JsonFileUtil.readJsonFromResource("avgResult.json")); Aggregation aggregation = new AggregationBuilder() .setGroupBy(field) @@ -173,7 +174,7 @@ public void testAverageOperation() throws Exception { @Test public void testOperationWithoutGrouping() throws Exception { - Set> expected = new HashSet<>(JsonFileReader.readJsonFromResource("grouplessResult.json")); + Set> expected = new HashSet<>(JsonFileUtil.readJsonFromResource("grouplessResult.json")); Aggregation aggregation = new AggregationBuilder() .addOperation("count", new CountOperation()) @@ -189,7 +190,7 @@ public void testMultiOperation() throws Exception { String sumField = "test.f"; String minField = "test.f"; String maxField = "test.f"; - Set> expected = new HashSet<>(JsonFileReader.readJsonFromResource("multiResult.json")); + Set> expected = new HashSet<>(JsonFileUtil.readJsonFromResource("multiResult.json")); Aggregation aggregation = new AggregationBuilder() .setGroupBy(field) @@ -209,7 +210,7 @@ public void testCollectStrings() throws Exception { String collectField = "f"; Map expectedMap1 = new HashMap<>(); Map expectedMap2 = new HashMap<>(); - for (Map expectedObject : JsonFileReader.readJsonFromResource("collectStringsResult.json")) { + for (Map expectedObject : JsonFileUtil.readJsonFromResource("collectStringsResult.json")) { expectedMap1.put(String.valueOf(expectedObject.get("_id")), new HashSet((List) expectedObject.get("set"))); expectedMap2.put(String.valueOf(expectedObject.get("_id")), expectedObject.get("list")); } @@ -232,7 +233,7 @@ public void testCollectStrings() throws Exception { @Test - public void testIterativeSimpleGrouping() throws Exception { + public void testIterativeAggregation() throws Exception { String field = "f"; Set expectedResult = new HashSet<>(); for (Map obj : jsonList) { @@ -251,183 +252,73 @@ public void testIterativeSimpleGrouping() throws Exception { } @Test - public void testIterativeNestedGrouping() throws Exception { - String field = "test.f"; - Set expectedResult = new HashSet<>(); - for (Map obj : jsonList) { - expectedResult.add(String.valueOf(FieldValueExtractor.getValue(field, obj))); - } - - Set result = new HashSet<>(); - Aggregation aggregation = new AggregationBuilder() - .setGroupBy(field).getAggregation(); - List> resultList = aggregation.aggregate(jsonList.iterator()); - for (Map resultObj : resultList) { - result.add(resultObj.get("_id")); - } - - assertEquals("Grouping by field should match", expectedResult, result); - } - - @Test - public void testIterativeCountOperation() throws Exception { - String field = "f"; - Set> expected = new HashSet<>(JsonFileReader.readJsonFromResource("countResult.json")); - - Aggregation aggregation = new AggregationBuilder() - .setGroupBy(field) - .addOperation("count", new CountOperation()) - .getAggregation(); - Set> result = roughen(aggregation.aggregate(jsonList.iterator()), HashSet.class); - assertEquals("Counts should be as expected", expected, result); - } - - @Test - public void testIterativeMaxOperation() throws Exception { - String field = "f"; - String maxField = "test.f"; - Set> expected = new HashSet<>(JsonFileReader.readJsonFromResource("maxResult.json")); - - Aggregation aggregation = new AggregationBuilder() - .setGroupBy(field) - .addOperation("max", new MaxOperation(maxField)) - .getAggregation(); - Set> result = roughen(aggregation.aggregate(jsonList.iterator()), HashSet.class); - assertEquals("Max should be as expected", expected, result); - } - - @Test - public void testIterativeMinOperation() throws Exception { - String field = "f"; - String minField = "test.f"; - Set> expected = new HashSet<>(JsonFileReader.readJsonFromResource("minResult.json")); + public void testFirstObjectOperation() throws Exception { + List> expected = JsonFileUtil.readJsonFromResource("raw.json"); Aggregation aggregation = new AggregationBuilder() - .setGroupBy(field) - .addOperation("min", new MinOperation(minField)) + .addOperation("first", new FirstObjectOperation("test.f")) .getAggregation(); - Set> result = roughen(aggregation.aggregate(jsonList.iterator()), HashSet.class); - assertEquals("Min should be as expected", expected, result); + List> result = aggregation.aggregate(jsonList); + assertEquals("First object should be as expected", + ((Map) expected.get(0).get("test")).get("f"), + result.get(0).get("first")); } @Test - public void testIterativeCollectOperation() throws Exception { - String field = "f"; - String collectField = "_id"; - Set> expected = new HashSet<>(JsonFileReader.readJsonFromResource("collectResult.json")); + public void testLastObjectOperation() throws Exception { + List> expected = JsonFileUtil.readJsonFromResource("raw.json"); Aggregation aggregation = new AggregationBuilder() - .setGroupBy(field) - .addOperation("list", new CollectOperation(collectField)) + .addOperation("last", new LastObjectOperation("test.f")) .getAggregation(); - Set> result = roughen(aggregation.aggregate(jsonList.iterator()), HashSet.class); - assertEquals("Collected lists should be as expected", expected, result); + List> result = aggregation.aggregate(jsonList); + assertEquals("Last object should be as expected", + ((Map) expected.get(expected.size() - 1).get("test")).get("f"), + result.get(0).get("last")); } @Test - public void testIterativeCollectSetOperation() throws Exception { - String field = "f"; - String collectField = "test.f"; - Map expectedMap = new HashMap<>(); - for (Map expectedObject : JsonFileReader.readJsonFromResource("collectSetResult.json")) { - expectedMap.put(String.valueOf(expectedObject.get("_id")), new HashSet((List) expectedObject.get("set"))); + public void testStdDevOperation() throws Exception { + List> expected = JsonFileUtil.readJsonFromResource("raw.json"); + double[] doubleList = new double[expected.size()]; + int i = 0; + for (Map o : expected) { + doubleList[i++] = (double) ((Map) o.get("test")).get("f"); } - - Aggregation aggregation = new AggregationBuilder() - .setGroupBy(field) - .addOperation("set", new CollectSetOperation(collectField)) - .getAggregation(); - Map resultMap = new HashMap<>(); - for (Map resultObject : (Set>) roughen(aggregation.aggregate(jsonList.iterator()), HashSet.class)) { - resultMap.put(String.valueOf(resultObject.get("_id")), new HashSet((List) resultObject.get("set"))); - } - assertEquals("Collected sets should be as expected", expectedMap, resultMap); - } - - @Test - public void testIterativeSumOperation() throws Exception { - String field = "f"; - String sumField = "test.f"; - Set> expected = new HashSet<>(JsonFileReader.readJsonFromResource("sumResult.json")); - + double stddev = new StandardDeviation().evaluate(doubleList); Aggregation aggregation = new AggregationBuilder() - .setGroupBy(field) - .addOperation("sum", new SumOperation(sumField)) + .addOperation("stddev", new StdDevPopOperation("test.f")) .getAggregation(); - Set> result = roughen(aggregation.aggregate(jsonList.iterator()), HashSet.class); - assertEquals("Sum should be as expected", expected, result); + List> result = aggregation.aggregate(jsonList); + assertEquals("Standard Deviation should be as expected", + stddev, + result.get(0).get("stddev")); } - @Test - public void testIterativeAverageOperation() throws Exception { - String field = "f"; - String avgField = "test.f"; - Set> expected = new HashSet<>(JsonFileReader.readJsonFromResource("avgResult.json")); - - Aggregation aggregation = new AggregationBuilder() - .setGroupBy(field) - .addOperation("avg", new AverageOperation(avgField)) - .getAggregation(); - Set> result = roughen(aggregation.aggregate(jsonList.iterator()), HashSet.class); - assertEquals("Average should be as expected", expected, result); - } - - @Test - public void testIterativeOperationWithoutGrouping() throws Exception { - Set> expected = new HashSet<>(JsonFileReader.readJsonFromResource("grouplessResult.json")); - - Aggregation aggregation = new AggregationBuilder() - .addOperation("count", new CountOperation()) - .getAggregation(); - Set> result = roughen(aggregation.aggregate(jsonList.iterator()), HashSet.class); - assertEquals("Counts without grouping should be as expected", expected, result); - } - - @Test - public void testIterativeMultiOperation() throws Exception { - String field = "f"; - String avgField = "test.f"; - String sumField = "test.f"; - String minField = "test.f"; - String maxField = "test.f"; - Set> expected = new HashSet<>(JsonFileReader.readJsonFromResource("multiResult.json")); - - Aggregation aggregation = new AggregationBuilder() - .setGroupBy(field) - .addOperation("avg", new AverageOperation(avgField)) - .addOperation("sum", new SumOperation(sumField)) - .addOperation("min", new MinOperation(minField)) - .addOperation("max", new MaxOperation(maxField)) - .addOperation("count", new CountOperation()) - .getAggregation(); - Set> result = roughen(aggregation.aggregate(jsonList.iterator()), HashSet.class); - assertEquals("Multiple aggregations result should be as expected", expected, result); - } @Test - public void testIterativeCollectStrings() throws Exception { - String field = "test.f"; - String collectField = "f"; - Map expectedMap1 = new HashMap<>(); - Map expectedMap2 = new HashMap<>(); - for (Map expectedObject : JsonFileReader.readJsonFromResource("collectStringsResult.json")) { - expectedMap1.put(String.valueOf(expectedObject.get("_id")), new HashSet((List) expectedObject.get("set"))); - expectedMap2.put(String.valueOf(expectedObject.get("_id")), expectedObject.get("list")); + public void testTopNOperation() throws Exception { + Comparator comparator = new Comparator() { + @Override + public int compare(Double o1, Double o2) { + return Double.compare(o1, o2); + } + }; + List> expected = JsonFileUtil.readJsonFromResource("raw.json"); + List doubleList = new ArrayList<>(); + for (Map o : expected) { + doubleList.add((double) ((Map) o.get("test")).get("f")); } - + Collections.sort(doubleList, comparator); Aggregation aggregation = new AggregationBuilder() - .setGroupBy(field) - .addOperation("list", new CollectOperation(collectField)) - .addOperation("set", new CollectSetOperation(collectField)) + .addOperation("topn", new TopNOperation<>("test.f", 3, comparator)) .getAggregation(); - Map resultMap1 = new HashMap<>(); - Map resultMap2 = new HashMap<>(); - for (Map resultObject : (Set>) roughen(aggregation.aggregate(jsonList.iterator()), HashSet.class)) { - resultMap1.put(String.valueOf(resultObject.get("_id")), new HashSet((List) resultObject.get("set"))); - resultMap2.put(String.valueOf(resultObject.get("_id")), resultObject.get("list")); + List result = (List) aggregation.aggregate(jsonList).get(0).get("topn"); + Collections.sort(result, comparator); + for (int i = 0; i < result.size(); i++) { + assertEquals("Top N should be as expected", + doubleList.get(doubleList.size() - 1 - i), + result.get(result.size() - 1 - i)); } - - assertEquals("Collect for Strings should work as expected", expectedMap1, resultMap1); - assertEquals("CollectSet for Strings should work as expected", expectedMap2, resultMap2); } } diff --git a/jaggr/jaggr/src/test/java/com/caffinc/jaggr/core/BatchAggregationTest.java b/jaggr/jaggr/src/test/java/com/caffinc/jaggr/core/BatchAggregationTest.java index fbabebd..d3b7019 100644 --- a/jaggr/jaggr/src/test/java/com/caffinc/jaggr/core/BatchAggregationTest.java +++ b/jaggr/jaggr/src/test/java/com/caffinc/jaggr/core/BatchAggregationTest.java @@ -1,7 +1,7 @@ package com.caffinc.jaggr.core; import com.caffinc.jaggr.core.operations.CountOperation; -import com.caffinc.jaggr.utils.JsonFileReader; +import com.caffinc.jaggr.utils.JsonFileUtil; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -21,7 +21,7 @@ public class BatchAggregationTest { @BeforeClass public static void setUp() throws Exception { - jsonData.addAll(JsonFileReader.readJsonFromResource("raw.json")); + jsonData.addAll(JsonFileUtil.readJsonFromResource("raw.json")); } @Test diff --git a/jaggr/pom.xml b/jaggr/pom.xml index 608bbe7..fbd7d61 100644 --- a/jaggr/pom.xml +++ b/jaggr/pom.xml @@ -7,7 +7,7 @@ com.caffinc jaggr-parent pom - 0.4.0 + 0.5.0 jaggr-utils jaggr