
Commit

Merge pull request #8 from caffinc/development

Mode updates!

SriramKeerthi authored Nov 29, 2016
2 parents 2a43e67 + 98d8175 commit b0c0f83
Showing 12 changed files with 540 additions and 132 deletions.
46 changes: 46 additions & 0 deletions README.md
@@ -5,6 +5,8 @@ Simple JSON Aggregator for Java
![Travis-CI Build Status](https://travis-ci.org/caffinc/jaggr.svg?branch=master)

## Usage

### Adding dependency
jaggr is on Bintray and Maven Central (Soon):

<dependency>
@@ -19,6 +21,7 @@ jaggr is on Bintray and Maven Central (Soon):
<version>0.2.2</version>
</dependency>

### Aggregating documents
Assume the following JSON documents are stored in a file called `raw.json`:

{"_id": 1, "f": "a", "test": {"f": 3}}
@@ -62,6 +65,49 @@ The result of the above aggregation would look as follows:
{"_id": "a", "avg": 2.0, "sum": 10, "min": -1, "max": 5, "count": 5}
{"_id": "b", "avg": 1.0, "sum": 5, "min": 1, "max": 1, "count": 5}

### Aggregating other data sources

Aggregating files or `List`s of JSON documents works for some use cases, but not all data fits this paradigm.

The `jaggr-utils` library provides three utilities for aggregating other sources of data.

#### Aggregating small JSON files in the file system or resources

The `JsonFileReader` class exposes the `readJsonFromFile` and `readJsonFromResource` methods, which read all of the JSON objects from a file or classpath resource into memory for aggregation.

Avoid these methods for large files: the entire contents must fit in memory at once.

List<Map<String, Object>> jsonData = JsonFileReader.readJsonFromFile("afile.json");

List<Map<String, Object>> jsonData = JsonFileReader.readJsonFromResource("aFileInResources.json");

List<Map<String, Object>> result = aggregation.aggregate(jsonData);

#### Aggregating large JSON files or readers

The `JsonStringIterator` class provides constructors to iterate over a JSON file, or over a `Reader` backed by an underlying JSON string source, without loading all the data into memory.

Iterator<Map<String, Object>> iterator = new JsonStringIterator("afile.json");

Iterator<Map<String, Object>> iterator = new JsonStringIterator(new BufferedReader(new FileReader("afile.json")));

List<Map<String, Object>> result = aggregation.aggregate(iterator);

#### Aggregating arbitrary object Iterators

The `JsonIterator` abstract class adapts an `Iterator` of any type into an iterator of JSON documents. This can be used to stream data from arbitrary databases: MongoDB, for example, exposes `Iterable` views of its data, so you could aggregate an entire collection as follows:


Iterator<Map<String, Object>> iterator = new JsonIterator<DBObject>(mongoCollection.find().iterator()) {
@Override
public Map<String, Object> toJson(DBObject element) {
return element.toMap();
}
};

List<Map<String, Object>> result = aggregation.aggregate(iterator);

## Supported Aggregations

`jaggr` provides the following aggregations:
36 changes: 36 additions & 0 deletions jaggr/jaggr-bench/pom.xml
@@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>jaggr-parent</artifactId>
<groupId>com.caffinc</groupId>
<version>0.3.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>jaggr-bench</artifactId>

<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>jaggr</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>jaggr-utils</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>3.2.2</version>
</dependency>
</dependencies>
</project>
193 changes: 193 additions & 0 deletions jaggr/jaggr-bench/src/main/java/com/caffinc/jaggr/bench/Benchmark.java
@@ -0,0 +1,193 @@
package com.caffinc.jaggr.bench;

import com.caffinc.jaggr.core.Aggregation;
import com.caffinc.jaggr.core.AggregationBuilder;
import com.caffinc.jaggr.core.operations.CollectSetOperation;
import com.caffinc.jaggr.utils.JsonIterator;
import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Benchmarks jaggr aggregation against MongoDB's native aggregation pipeline
 * and against an in-memory pass over the same generated collection.
 *
 * @author srira
 * @since 11/28/2016
 */
public class Benchmark {
private static final Logger LOG = LoggerFactory.getLogger(Benchmark.class);
private static final int BATCH_SIZE = 10000;
private static final DBCollection BENCHMARK_COLLECTION =
new MongoClient("localhost").getDB("jaggr").getCollection("benchmark");
private static final Random RANDOM = new Random();
private static final String PLACEHOLDER =
"STRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRING" +
"STRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRING" +
"STRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRINGSTRING";

public static void main(String[] args) throws Exception {
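        // Phases: read the whole collection through a JsonIterator, run a jaggr
        // streaming aggregation over a fresh cursor, run the equivalent native
        // MongoDB $group, re-run jaggr over the in-memory list, and finally
        // aggregate in parallel across 10 threads. Run generateDocuments once
        // first to populate the benchmark collection: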
// generateDocuments(BENCHMARK_COLLECTION, 1000000, getFieldDefinitions());
final Aggregation aggregation = new AggregationBuilder()
.setGroupBy("name")
.addOperation("groupedIds", new CollectSetOperation("_id"))
.getAggregation();
long startTime;
LOG.info("Computing read time");
startTime = System.currentTimeMillis();
Iterator<Map<String, Object>> dbObjectIterator = new JsonIterator<DBObject>(BENCHMARK_COLLECTION.find().iterator()) {
@Override
public Map<String, Object> toJson(DBObject element) {
return element.toMap();
}
};
List<Map<String, Object>> inMemList = new ArrayList<>();
while (dbObjectIterator.hasNext()) {
inMemList.add(dbObjectIterator.next());
}
LOG.info("Read time: {}ms {} docs", (System.currentTimeMillis() - startTime), inMemList.size());

LOG.info("Starting aggregation");
startTime = System.currentTimeMillis();
List<Map<String, Object>> result = aggregation.aggregate(new JsonIterator<DBObject>(BENCHMARK_COLLECTION.find().iterator()) {
@Override
public Map<String, Object> toJson(DBObject element) {
return element.toMap();
}
});
LOG.info("Aggregation time: {}ms {} docs", (System.currentTimeMillis() - startTime), result.size());

startTime = System.currentTimeMillis();
LOG.info("Starting native aggregation");
List<Map<String, Object>> mongoResults = new ArrayList<>();
for (DBObject dbObject : BENCHMARK_COLLECTION.aggregate(
Arrays.asList(
new BasicDBObject("$group", new BasicDBObject("_id", "$name").append("groupedIds", new BasicDBObject("$addToSet", "$_id")))
)
).results()) {
mongoResults.add(dbObject.toMap());
}
LOG.info("Mongo Aggregation time: {}ms {} docs", (System.currentTimeMillis() - startTime), mongoResults.size());


LOG.info("Starting aggregation");
startTime = System.currentTimeMillis();
result = aggregation.aggregate(inMemList);
LOG.info("In-memory Aggregation time: {}ms {} docs", (System.currentTimeMillis() - startTime), result.size());

        // Split the collection into one skip/limit slice per thread; the shared
        // counter tracks total documents consumed across all threads.
        int docCount = inMemList.size();
        int threadCount = 10;
        final int limit = Double.valueOf(Math.ceil(((double) docCount) / threadCount)).intValue();
        final AtomicInteger counter = new AtomicInteger(0);
LOG.info("Starting multithreaded aggregation");
startTime = System.currentTimeMillis();
for (int i = 0; i < threadCount; i++) {
final int batchId = i;
new Thread() {
@Override
public void run() {
runAggregation(counter, aggregation, limit, batchId);
}
}.start();
}
while (counter.get() < docCount) {
Thread.sleep(1);
}
LOG.info("Multi-threaded Aggregation time: {}ms {} docs", (System.currentTimeMillis() - startTime), result.size());

}

private static void runAggregation(final AtomicInteger counter, final Aggregation aggregation, final int limit, final int batchId) {
LOG.info("Starting aggregation");
long startTime = System.currentTimeMillis();
List<Map<String, Object>> result = aggregation.aggregate(new Iterator<Map<String, Object>>() {
private Iterator<DBObject> objectIterator = BENCHMARK_COLLECTION.find().skip(limit * batchId).limit(limit).iterator();

@Override
public boolean hasNext() {
return objectIterator.hasNext();
}

@Override
public Map<String, Object> next() {
                counter.incrementAndGet();
                // Assumes aggregate() checks hasNext() first; yields null once exhausted
                if (objectIterator.hasNext()) {
                    return objectIterator.next().toMap();
                } else {
                    return null;
                }
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}
});
LOG.info("Aggregation time: {}ms {} docs", (System.currentTimeMillis() - startTime), result.size());
}

private static Map<String, String> getFieldDefinitions() {
Map<String, String> fieldDefinitions = new HashMap<>();
fieldDefinitions.put("_id", "$id");
fieldDefinitions.put("name", "$string_10_5");
fieldDefinitions.put("age", "$int_10_99");
fieldDefinitions.put("sex", "$choice_m|f");
fieldDefinitions.put("salary", "$int_100000_1000000");
fieldDefinitions.put("appeal", "$double");
return fieldDefinitions;
}

private static void generateDocuments(
DBCollection benchmarkCollection, int count, Map<String, String> fieldDefinitions) {
List<DBObject> dbObjectList = new ArrayList<>();
for (int i = 0; i < count; i++) {
Map<String, Object> doc = new HashMap<>();
for (Map.Entry<String, String> field : fieldDefinitions.entrySet()) {
doc.put(field.getKey(), generateField(field.getValue(), field.getKey(), i));
}
dbObjectList.add(new BasicDBObject(doc));
if (dbObjectList.size() >= BATCH_SIZE) {
LOG.info("Generated " + (i + 1) + " documents");
benchmarkCollection.insert(dbObjectList);
dbObjectList = new ArrayList<>();
}
}
if (dbObjectList.size() > 0) {
LOG.info("Generated " + count + " documents");
benchmarkCollection.insert(dbObjectList);
}
}

    /**
     * Generates a field value from a small definition DSL: "$id" returns the
     * document index, "$string_L_D" builds a length-L string ending in a random
     * D-digit number, "$int_A_B" draws an int in [A, B), "$choice_x|y|..." picks
     * one of the options, and "$double" draws a double in [0, 1). Any definition
     * not starting with "$" is returned as a literal value.
     */
    private static Object generateField(String fieldDefinition, String field, Integer id) {
if (fieldDefinition == null || !fieldDefinition.startsWith("$")) {
return fieldDefinition;
} else {
String[] definitionParts = fieldDefinition.split("_");
String type = definitionParts[0];
int min;
int max;
switch (type) {
case "$id":
return id;
case "$string":
min = Integer.parseInt(definitionParts[1]);
max = Integer.parseInt(definitionParts[2]);
int minVal = Double.valueOf(Math.pow(10, max - 1)).intValue();
int maxVal = Double.valueOf(Math.pow(10, max)).intValue();
return PLACEHOLDER.substring(0, min - max) + (minVal + RANDOM.nextInt(maxVal - minVal));
case "$int":
min = Integer.parseInt(definitionParts[1]);
max = Integer.parseInt(definitionParts[2]);
return min + RANDOM.nextInt(max - min);
case "$choice":
String[] choices = definitionParts[1].split("\\|");
return choices[RANDOM.nextInt(choices.length)];
case "$double":
return RANDOM.nextDouble();
}
}
return fieldDefinition;
}
}
8 changes: 8 additions & 0 deletions jaggr/jaggr-bench/src/main/resources/log4j.properties
@@ -0,0 +1,8 @@
# Root logger option
log4j.rootLogger=INFO, stdout

# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
2 changes: 1 addition & 1 deletion jaggr/jaggr-utils/pom.xml
@@ -5,7 +5,7 @@
<parent>
<artifactId>jaggr-parent</artifactId>
<groupId>com.caffinc</groupId>
<version>0.2.2</version>
<version>0.3.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

