Skip to content

Commit

Permalink
Merge pull request #11 from caffinc/development
Browse files Browse the repository at this point in the history
Adding support for new aggregations, renaming classes
  • Loading branch information
SriramKeerthi authored Nov 30, 2016
2 parents 0d6de82 + b75cd1a commit 32fe5a2
Show file tree
Hide file tree
Showing 18 changed files with 401 additions and 274 deletions.
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ jaggr is on Bintray and Maven Central (Soon):
<dependency>
<groupId>com.caffinc</groupId>
<artifactId>jaggr</artifactId>
<version>0.4.0</version>
<version>0.5.0</version>
</dependency>

<dependency>
<groupId>com.caffinc</groupId>
<artifactId>jaggr-utils</artifactId>
<version>0.4.0</version>
<version>0.5.0</version>
</dependency>

### Aggregating documents
Expand Down Expand Up @@ -162,6 +162,11 @@ However the `getFinalResult()` method must be called just once to get the final
5. Average
6. Collect as List
7. Collect as Set
8. First Object
9. Last Object
10. Standard Deviation (Population)
11. Top N Objects


## Tests

Expand Down
2 changes: 1 addition & 1 deletion jaggr/jaggr-bench/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>jaggr-parent</artifactId>
<groupId>com.caffinc</groupId>
<version>0.4.0</version>
<version>0.5.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
171 changes: 98 additions & 73 deletions jaggr/jaggr-bench/src/main/java/com/caffinc/jaggr/bench/Benchmark.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@

import com.caffinc.jaggr.core.Aggregation;
import com.caffinc.jaggr.core.AggregationBuilder;
import com.caffinc.jaggr.core.operations.CollectSetOperation;
import com.caffinc.jaggr.core.operations.AverageOperation;
import com.caffinc.jaggr.core.operations.CountOperation;
import com.caffinc.jaggr.core.operations.StdDevPopOperation;
import com.caffinc.jaggr.core.operations.SumOperation;
import com.caffinc.jaggr.utils.JsonFileIterator;
import com.caffinc.jaggr.utils.JsonIterator;
import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.google.gson.Gson;
import com.mongodb.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.CountDownLatch;

/**
* @author srira
Expand All @@ -30,99 +32,122 @@ public class Benchmark {
"STRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRING";

public static void main(String[] args) throws Exception {
// generateDocuments(BENCHMARK_COLLECTION, 1000000, getFieldDefinitions());
// generateDocuments(BENCHMARK_COLLECTION, 10000000, getFieldDefinitions());
final Aggregation aggregation = new AggregationBuilder()
.setGroupBy("name")
.addOperation("groupedIds", new CollectSetOperation("_id"))
// .setGroupBy("name")
// .addOperation("groupedIds", new CollectSetOperation("_id"))
.addOperation("sum", new SumOperation("appeal"))
.addOperation("avg", new AverageOperation("age"))
.addOperation("count", new CountOperation())
.addOperation("stddev", new StdDevPopOperation("age"))
.getAggregation();
long startTime;
LOG.info("Computing read time");
List<Map<String, Object>> result;

// LOG.info("Computing read time");
// startTime = System.currentTimeMillis();
// Iterator<Map<String, Object>> dbObjectIterator = new JsonIterator<DBObject>(BENCHMARK_COLLECTION.find().iterator()) {
// @Override
// public Map<String, Object> toJson(DBObject element) {
// return element.toMap();
// }
// };
// List<Map<String, Object>> inMemList = new ArrayList<>();
// while (dbObjectIterator.hasNext()) {
// inMemList.add(dbObjectIterator.next());
// }
// LOG.info("Read time: {}ms {} docs", (System.currentTimeMillis() - startTime), inMemList.size());

LOG.info("Starting Mongo Cursor aggregation");
startTime = System.currentTimeMillis();
Iterator<Map<String, Object>> dbObjectIterator = new JsonIterator<DBObject>(BENCHMARK_COLLECTION.find().iterator()) {
@Override
public Map<String, Object> toJson(DBObject element) {
return element.toMap();
}
};
List<Map<String, Object>> inMemList = new ArrayList<>();
while (dbObjectIterator.hasNext()) {
inMemList.add(dbObjectIterator.next());
}
LOG.info("Read time: {}ms {} docs", (System.currentTimeMillis() - startTime), inMemList.size());

LOG.info("Starting aggregation");
startTime = System.currentTimeMillis();
List<Map<String, Object>> result = aggregation.aggregate(new JsonIterator<DBObject>(BENCHMARK_COLLECTION.find().iterator()) {
result = aggregation.aggregate(new JsonIterator<DBObject>(BENCHMARK_COLLECTION.find()) {
@Override
public Map<String, Object> toJson(DBObject element) {
return element.toMap();
}
});
LOG.info("Aggregation time: {}ms {} docs", (System.currentTimeMillis() - startTime), result.size());

startTime = System.currentTimeMillis();
LOG.info("Result: " + new Gson().toJson(result));


// LOG.info("Starting in-memory aggregation");
// startTime = System.currentTimeMillis();
// result = aggregation.aggregate(inMemList);
// LOG.info("In-memory Aggregation time: {}ms {} docs", (System.currentTimeMillis() - startTime), result.size());
//
// LOG.info("Starting multithreaded aggregation");
// int docCount = (int) BENCHMARK_COLLECTION.count();
// int threadCount = 10;
// final int limit = Double.valueOf(Math.ceil(((double) docCount) / threadCount)).intValue();
// final CountDownLatch latch = new CountDownLatch(docCount);
// startTime = System.currentTimeMillis();
// for (int i = 0; i < threadCount; i++) {
// final int batchId = i;
// new Thread() {
// @Override
// public void run() {
// runAggregation(latch, aggregation, limit, batchId);
// }
// }.start();
// }
// latch.await();
// LOG.info("Multi-threaded Aggregation time: {}ms {} docs", (System.currentTimeMillis() - startTime), result.size());

LOG.info("Starting native aggregation");
startTime = System.currentTimeMillis();
List<Map<String, Object>> mongoResults = new ArrayList<>();
for (DBObject dbObject : BENCHMARK_COLLECTION.aggregate(
Arrays.asList(
new BasicDBObject("$group", new BasicDBObject("_id", "$name").append("groupedIds", new BasicDBObject("$addToSet", "$_id")))
)
).results()) {
for (DBObject dbObject : new Iterable<DBObject>() {
@Override
public Iterator<DBObject> iterator() {
return BENCHMARK_COLLECTION.aggregate(Arrays.asList(
new BasicDBObject("$group",
new BasicDBObject("_id", null)
// .append("groupedIds", new BasicDBObject("$addToSet", "$_id"))
.append("sum", new BasicDBObject("$sum", "$appeal"))
.append("avg", new BasicDBObject("$avg", "$age"))
.append("stddev", new BasicDBObject("$stdDevPop", "$age"))
.append("count", new BasicDBObject("$sum", 1)))

), AggregationOptions.builder().allowDiskUse(true).build());
}
}) {
mongoResults.add(dbObject.toMap());
}
LOG.info("Mongo Aggregation time: {}ms {} docs", (System.currentTimeMillis() - startTime), mongoResults.size());

LOG.info("Result: " + new Gson().toJson(mongoResults));

LOG.info("Starting aggregation");
LOG.info("Aggregating file");
startTime = System.currentTimeMillis();
result = aggregation.aggregate(inMemList);
LOG.info("In-memory Aggregation time: {}ms {} docs", (System.currentTimeMillis() - startTime), result.size());

int docCount = inMemList.size();
int threadCount = 10;
final int limit = Double.valueOf(Math.ceil(((double) docCount) / threadCount)).intValue();
final AtomicInteger counter = new AtomicInteger(0);
LOG.info("Starting multithreaded aggregation");
result = aggregation.aggregate(new JsonFileIterator("C:\\Users\\srira\\Documents\\caffinc\\playarea\\bm.json"));
LOG.info("File Aggregation time: {}ms {} docs", (System.currentTimeMillis() - startTime), result.size());

LOG.info("Reading file");
startTime = System.currentTimeMillis();
for (int i = 0; i < threadCount; i++) {
final int batchId = i;
new Thread() {
@Override
public void run() {
runAggregation(counter, aggregation, limit, batchId);
int count = 0;
for (Map<String, Object> obj : new Iterable<Map<String, Object>>() {
@Override
public Iterator<Map<String, Object>> iterator() {
try {
return new JsonFileIterator("C:\\Users\\srira\\Documents\\caffinc\\playarea\\bm.json");
} catch (Exception e) {
throw new IllegalArgumentException(e);
}
}.start();
}
while (counter.get() < docCount) {
Thread.sleep(1);
}
}) {
count = Double.valueOf((double) obj.get("_id")).intValue();
}
LOG.info("Multi-threaded Aggregation time: {}ms {} docs", (System.currentTimeMillis() - startTime), result.size());

LOG.info("File Read time: {}ms {} docs", (System.currentTimeMillis() - startTime), count);
}

private static void runAggregation(final AtomicInteger counter, final Aggregation aggregation, final int limit, final int batchId) {
LOG.info("Starting aggregation");
private static void runAggregation(final CountDownLatch latch, final Aggregation aggregation, final int limit, final int batchId) {
long startTime = System.currentTimeMillis();
List<Map<String, Object>> result = aggregation.aggregate(new Iterator<Map<String, Object>>() {
private Iterator<DBObject> objectIterator = BENCHMARK_COLLECTION.find().skip(limit * batchId).limit(limit).iterator();

List<Map<String, Object>> result = aggregation.aggregate(new JsonIterator<DBObject>(BENCHMARK_COLLECTION.find().skip(limit * batchId).limit(limit)) {
@Override
public boolean hasNext() {
return objectIterator.hasNext();
}

@Override
public Map<String, Object> next() {
counter.incrementAndGet();
if (objectIterator.hasNext())
return objectIterator.next().toMap();
else
return null;
}

@Override
public void remove() {
throw new UnsupportedOperationException();
public Map<String, Object> toJson(DBObject element) {
latch.countDown();
return element.toMap();
}
});
LOG.info("Aggregation time: {}ms {} docs", (System.currentTimeMillis() - startTime), result.size());
Expand Down
2 changes: 1 addition & 1 deletion jaggr/jaggr-utils/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>jaggr-parent</artifactId>
<groupId>com.caffinc</groupId>
<version>0.4.0</version>
<version>0.5.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import com.google.gson.Gson;

import java.io.*;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
Expand All @@ -14,7 +17,7 @@
* @author Sriram
* @since 11/27/2016
*/
public class JsonStringIterator implements Iterator<Map<String, Object>>, Closeable {
public class JsonFileIterator implements Iterator<Map<String, Object>>, Closeable {
private final BufferedReader bufferedReader;
private String cachedLine;
private boolean finished = false;
Expand All @@ -26,8 +29,8 @@ public class JsonStringIterator implements Iterator<Map<String, Object>>, Closea
* @param fileName the <code>fileName</code> to read from
* @throws IOException thrown if there is a problem accessing the file
*/
public JsonStringIterator(final String fileName) throws IOException {
this(new BufferedReader(new FileReader(fileName)));
public JsonFileIterator(final String fileName) throws IOException {
this(Files.newBufferedReader(Paths.get(fileName), Charset.defaultCharset()));
}

/**
Expand All @@ -36,7 +39,7 @@ public JsonStringIterator(final String fileName) throws IOException {
* @param reader the <code>Reader</code> to read from, not null
* @throws IllegalArgumentException if the reader is null
*/
public JsonStringIterator(final Reader reader) throws IllegalArgumentException {
public JsonFileIterator(final Reader reader) throws IllegalArgumentException {
if (reader == null) {
throw new IllegalArgumentException("Reader must not be null");
}
Expand All @@ -62,15 +65,13 @@ public boolean hasNext() {
return false;
} else {
try {
while (true) {
final String line = bufferedReader.readLine();
if (line == null) {
finished = true;
return false;
}
cachedLine = line;
return true;
final String line = bufferedReader.readLine();
if (line == null) {
finished = true;
return false;
}
cachedLine = line;
return true;
} catch (final IOException ioe) {
close();
throw new IllegalStateException(ioe);
Expand Down Expand Up @@ -126,7 +127,7 @@ public void close() {
* @throws UnsupportedOperationException always
*/
public void remove() {
throw new UnsupportedOperationException("Remove unsupported on JsonStringIterator");
throw new UnsupportedOperationException("Remove unsupported on JsonFileIterator");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* @author Sriram
* @since 11/26/2016
*/
public class JsonFileReader {
public class JsonFileUtil {
private static final Gson gson = new Gson();

/**
Expand All @@ -31,7 +31,7 @@ public static List<Map<String, Object>> readJsonFromResource(final String resour
List<Map<String, Object>> jsonList = new ArrayList<>();
String json;
try (BufferedReader br = new BufferedReader(
new InputStreamReader(JsonFileReader.class.getClassLoader().getResourceAsStream(resourceName)))) {
new InputStreamReader(JsonFileUtil.class.getClassLoader().getResourceAsStream(resourceName)))) {
while ((json = br.readLine()) != null) {
jsonList.add(gson.fromJson(json, HashMap.class));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public Map<String, Object> next() {
* @throws UnsupportedOperationException always
*/
public void remove() {
throw new UnsupportedOperationException("Remove unsupported on JsonStringIterator");
throw new UnsupportedOperationException("Remove unsupported on JsonFileIterator");
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
import java.util.*;

/**
* Tests for the JsonStringIterator
* Tests for the JsonFileIterator
*
* @author Sriram
* @since 11/27/2016
*/
public class JsonStringIteratorTest {
public class JsonFileIteratorTest {
private static final String TEMP_DIR = System.getProperty("java.io.tmpdir");
private static final Random RANDOM = new Random();
private static final Gson GSON = new Gson();
Expand All @@ -37,9 +37,9 @@ public void testJsonFileIterator() throws Exception {
br.write(GSON.toJson(json) + "\n");
}
}
try (JsonStringIterator jsonStringIterator = new JsonStringIterator(tempFilePath.toString())) {
try (JsonFileIterator jsonFileIterator = new JsonFileIterator(tempFilePath.toString())) {
for (Map<String, Object> expected : expectedData) {
Map<String, Object> actual = jsonStringIterator.next();
Map<String, Object> actual = jsonFileIterator.next();
Assert.assertEquals("Value should match value written to file", expected, actual);
}
}
Expand Down
Loading

0 comments on commit 32fe5a2

Please sign in to comment.