Skip to content

Commit

Permalink
Merge pull request #3 from caffinc/development
Browse files Browse the repository at this point in the history
Adding support for iterators
  • Loading branch information
SriramKeerthi authored Nov 27, 2016
2 parents 44e5d91 + 5ad7d8f commit b62789c
Show file tree
Hide file tree
Showing 5 changed files with 313 additions and 25 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jaggr is on Bintray and Maven Central (Soon):
<dependency>
<groupId>com.caffinc</groupId>
<artifactId>jaggr</artifactId>
<version>0.1</version>
<version>0.2</version>
</dependency>

Assume the following JSON documents are stored in a file called `raw.json`:
Expand Down Expand Up @@ -45,6 +45,10 @@ Aggregation can now be performed using the `aggregate()` method:

List<Map<String, Object>> result = aggregation.aggregate(jsonList);

Aggregation also supports Iterators:

List<Map<String, Object>> result = aggregation.aggregate(jsonList.iterator());

The result of the above aggregation would look as follows:

{"_id": "a", "avg": 2.0, "sum": 10, "min": -1, "max": 5, "count": 5}
Expand Down
2 changes: 1 addition & 1 deletion jaggr/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.caffinc</groupId>
<artifactId>jaggr</artifactId>
<version>0.1</version>
<version>0.2</version>
<name>jaggr</name>
<description>Simple JSON Aggregator for Java</description>
<url>https://github.com/caffinc/jaggr</url>
Expand Down
95 changes: 73 additions & 22 deletions jaggr/src/main/java/com/caffinc/jaggr/core/Aggregation.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@
import com.caffinc.jaggr.core.operations.Operation;
import com.caffinc.jaggr.core.utils.FieldValueExtractor;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;

/**
* Aggregates list of objects based on operations
* Aggregates lists or iterators of JSON objects based on aggregation operations
*
* @author Sriram
* @since 11/26/2016
Expand All @@ -25,29 +22,83 @@ public class Aggregation {
this.operationMap = operationMap;
}

public List<Map<String, Object>> aggregate(List<Map<String, Object>> objects) {
/**
* Aggregates over an Iterator of JSON Objects
*
* @param objectIterator JSON Object iterator
* @return aggregation result
*/
public List<Map<String, Object>> aggregate(Iterator<Map<String, Object>> objectIterator) {
Map<String, Map<String, Object>> workspace = new HashMap<>();
for (Map<String, Object> object : objects) {
String id = "0";
if (_id != null) {
id = String.valueOf(FieldValueExtractor.getValue(idSplit, object));
if (objectIterator != null) {
while (objectIterator.hasNext()) {
Map<String, Object> object = objectIterator.next();
aggregate(object, workspace);
}
if (!workspace.containsKey(id)) {
Map<String, Object> groupWorkspace = new HashMap<>();
groupWorkspace.put("_id", id);
workspace.put(id, groupWorkspace);
}
Map<String, Object> groupWorkspace = workspace.get(id);
for (Map.Entry<String, Operation> operationEntry : operationMap.entrySet()) {
String field = operationEntry.getKey();
Operation operation = operationEntry.getValue();
Object t0 = groupWorkspace.get(field);
Object t1 = operation.aggregate(t0, object);
groupWorkspace.put(field, t1);
}
return computeResults(workspace);
}

/**
 * Runs the configured aggregation over every JSON object in a list.
 *
 * @param objectList JSON objects to aggregate; a {@code null} list is handled like an empty one
 * @return aggregation result
 */
public List<Map<String, Object>> aggregate(List<Map<String, Object>> objectList) {
    final Map<String, Map<String, Object>> workspace = new HashMap<>();
    if (objectList == null) {
        // Nothing to fold in, so finalize the untouched workspace
        return computeResults(workspace);
    }
    for (Map<String, Object> document : objectList) {
        aggregate(document, workspace);
    }
    return computeResults(workspace);
}

/**
 * Folds one object into the intermediate state of the group it belongs to
 *
 * @param object    Object to perform aggregations on
 * @param workspace Intermediate aggregation results, keyed by group ID
 */
private void aggregate(Map<String, Object> object, Map<String, Map<String, Object>> workspace) {
    // Every object lands in the single group "0" unless a group-by field is configured
    final String groupId = (_id == null)
            ? "0"
            : String.valueOf(FieldValueExtractor.getValue(idSplit, object));

    // Fetch the group's workspace, creating and registering it the first time the ID is seen
    Map<String, Object> group = workspace.get(groupId);
    if (group == null) {
        group = new HashMap<>();
        group.put("_id", groupId);
        workspace.put(groupId, group);
    }

    for (Map.Entry<String, Operation> entry : operationMap.entrySet()) {
        final String field = entry.getKey();
        final Operation operation = entry.getValue();
        // Pass the value accumulated so far for this field and store whatever the operation returns
        final Object accumulated = group.get(field);
        group.put(field, operation.aggregate(accumulated, object));
    }
}

/**
* Computes final results from the workspace
*
* @param workspace Workspace holding intermediate results
* @return Final aggregation result
*/
private List<Map<String, Object>> computeResults(Map<String, Map<String, Object>> workspace) {
List<Map<String, Object>> resultList = new ArrayList<>();
// Loop through all the values in the workspace map
for (Map<String, Object> groupWorkspace : workspace.values()) {
// Perform final result computation
for (Map.Entry<String, Operation> operationEntry : operationMap.entrySet()) {
String field = operationEntry.getKey();
Operation operation = operationEntry.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
*/
public class AggregationBuilderTest {
private static final Gson gson = new Gson();
private static List<Map<String, Object>> jsonList = new ArrayList<>();
private static List<Map<String, Object>> jsonList;

private static <T> T roughen(Object o, Class<T> t) {
return gson.fromJson(gson.toJson(o), t);
Expand Down Expand Up @@ -202,4 +202,232 @@ public void testMultiOperation() throws Exception {
Set<Map<String, Object>> result = roughen(aggregation.aggregate(jsonList), HashSet.class);
assertEquals("Multiple aggregations result should be as expected", expected, result);
}

@Test
public void testCollectStrings() throws Exception {
    String field = "test.f";
    String collectField = "f";
    // "set" values are wrapped in HashSets so element order is ignored; "list" values are compared directly
    Map<String, Object> expectedSets = new HashMap<>();
    Map<String, Object> expectedLists = new HashMap<>();
    for (Map<String, Object> expectedObject : JsonFileReader.readJsonFromResource("collectStringsResult.json")) {
        String id = String.valueOf(expectedObject.get("_id"));
        expectedSets.put(id, new HashSet<Object>((List<?>) expectedObject.get("set")));
        expectedLists.put(id, expectedObject.get("list"));
    }

    Aggregation aggregation = new AggregationBuilder()
            .setGroupBy(field)
            .addOperation("list", new CollectOperation(collectField))
            .addOperation("set", new CollectSetOperation(collectField))
            .getAggregation();
    Map<String, Object> resultSets = new HashMap<>();
    Map<String, Object> resultLists = new HashMap<>();
    Set<Map<String, Object>> roughResult = roughen(aggregation.aggregate(jsonList), HashSet.class);
    for (Map<String, Object> resultObject : roughResult) {
        String id = String.valueOf(resultObject.get("_id"));
        resultSets.put(id, new HashSet<Object>((List<?>) resultObject.get("set")));
        resultLists.put(id, resultObject.get("list"));
    }

    // Each message names the operation whose output is being compared
    assertEquals("CollectSet for Strings should work as expected", expectedSets, resultSets);
    assertEquals("Collect for Strings should work as expected", expectedLists, resultLists);
}


@Test
public void testIterativeSimpleGrouping() throws Exception {
    String groupField = "f";

    // Every distinct stringified value of the field is expected to appear as a group ID
    Set<Object> expectedIds = new HashSet<>();
    for (Map<String, Object> document : jsonList) {
        expectedIds.add(String.valueOf(document.get(groupField)));
    }

    Aggregation aggregation = new AggregationBuilder()
            .setGroupBy(groupField)
            .getAggregation();
    Set<Object> actualIds = new HashSet<>();
    for (Map<String, Object> group : aggregation.aggregate(jsonList.iterator())) {
        actualIds.add(group.get("_id"));
    }

    assertEquals("Grouping by ID should match", expectedIds, actualIds);
}

@Test
public void testIterativeNestedGrouping() throws Exception {
    String nestedField = "test.f";

    // Expected group IDs are the stringified values extracted from the dotted field path
    Set<Object> expectedIds = new HashSet<>();
    for (Map<String, Object> document : jsonList) {
        Object extracted = FieldValueExtractor.getValue(nestedField, document);
        expectedIds.add(String.valueOf(extracted));
    }

    Aggregation aggregation = new AggregationBuilder()
            .setGroupBy(nestedField)
            .getAggregation();
    Set<Object> actualIds = new HashSet<>();
    for (Map<String, Object> group : aggregation.aggregate(jsonList.iterator())) {
        actualIds.add(group.get("_id"));
    }

    assertEquals("Grouping by field should match", expectedIds, actualIds);
}

@Test
public void testIterativeCountOperation() throws Exception {
    Set<Map<String, Object>> expected = new HashSet<>(JsonFileReader.readJsonFromResource("countResult.json"));

    // Group on "f" with a CountOperation stored under "count", feeding documents through an iterator
    AggregationBuilder builder = new AggregationBuilder();
    builder.setGroupBy("f");
    builder.addOperation("count", new CountOperation());
    Aggregation aggregation = builder.getAggregation();

    Set<Map<String, Object>> actual = roughen(aggregation.aggregate(jsonList.iterator()), HashSet.class);
    assertEquals("Counts should be as expected", expected, actual);
}

@Test
public void testIterativeMaxOperation() throws Exception {
    Set<Map<String, Object>> expected = new HashSet<>(JsonFileReader.readJsonFromResource("maxResult.json"));

    // Group on "f" with a MaxOperation over "test.f" stored under "max"
    Operation maxOfNested = new MaxOperation("test.f");
    Aggregation aggregation = new AggregationBuilder()
            .setGroupBy("f")
            .addOperation("max", maxOfNested)
            .getAggregation();

    Set<Map<String, Object>> actual = roughen(aggregation.aggregate(jsonList.iterator()), HashSet.class);
    assertEquals("Max should be as expected", expected, actual);
}

@Test
public void testIterativeMinOperation() throws Exception {
    Set<Map<String, Object>> expected = new HashSet<>(JsonFileReader.readJsonFromResource("minResult.json"));

    // Group on "f" with a MinOperation over "test.f" stored under "min"
    Operation minOfNested = new MinOperation("test.f");
    Aggregation aggregation = new AggregationBuilder()
            .setGroupBy("f")
            .addOperation("min", minOfNested)
            .getAggregation();

    Set<Map<String, Object>> actual = roughen(aggregation.aggregate(jsonList.iterator()), HashSet.class);
    assertEquals("Min should be as expected", expected, actual);
}

@Test
public void testIterativeCollectOperation() throws Exception {
    Set<Map<String, Object>> expected = new HashSet<>(JsonFileReader.readJsonFromResource("collectResult.json"));

    // Group on "f" with a CollectOperation over "_id" stored under "list"
    Operation collectIds = new CollectOperation("_id");
    Aggregation aggregation = new AggregationBuilder()
            .setGroupBy("f")
            .addOperation("list", collectIds)
            .getAggregation();

    Set<Map<String, Object>> actual = roughen(aggregation.aggregate(jsonList.iterator()), HashSet.class);
    assertEquals("Collected lists should be as expected", expected, actual);
}

@Test
public void testIterativeCollectSetOperation() throws Exception {
    String field = "f";
    String collectField = "test.f";
    // "set" values are wrapped in HashSets on both sides so element order cannot affect the comparison
    Map<String, Object> expectedMap = new HashMap<>();
    for (Map<String, Object> expectedObject : JsonFileReader.readJsonFromResource("collectSetResult.json")) {
        expectedMap.put(
                String.valueOf(expectedObject.get("_id")),
                new HashSet<Object>((List<?>) expectedObject.get("set")));
    }

    Aggregation aggregation = new AggregationBuilder()
            .setGroupBy(field)
            .addOperation("set", new CollectSetOperation(collectField))
            .getAggregation();
    Map<String, Object> resultMap = new HashMap<>();
    Set<Map<String, Object>> roughResult = roughen(aggregation.aggregate(jsonList.iterator()), HashSet.class);
    for (Map<String, Object> resultObject : roughResult) {
        resultMap.put(
                String.valueOf(resultObject.get("_id")),
                new HashSet<Object>((List<?>) resultObject.get("set")));
    }
    assertEquals("Collected sets should be as expected", expectedMap, resultMap);
}

@Test
public void testIterativeSumOperation() throws Exception {
    Set<Map<String, Object>> expected = new HashSet<>(JsonFileReader.readJsonFromResource("sumResult.json"));

    // Group on "f" with a SumOperation over "test.f" stored under "sum"
    Operation sumOfNested = new SumOperation("test.f");
    Aggregation aggregation = new AggregationBuilder()
            .setGroupBy("f")
            .addOperation("sum", sumOfNested)
            .getAggregation();

    Set<Map<String, Object>> actual = roughen(aggregation.aggregate(jsonList.iterator()), HashSet.class);
    assertEquals("Sum should be as expected", expected, actual);
}

@Test
public void testIterativeAverageOperation() throws Exception {
    Set<Map<String, Object>> expected = new HashSet<>(JsonFileReader.readJsonFromResource("avgResult.json"));

    // Group on "f" with an AverageOperation over "test.f" stored under "avg"
    Operation averageOfNested = new AverageOperation("test.f");
    Aggregation aggregation = new AggregationBuilder()
            .setGroupBy("f")
            .addOperation("avg", averageOfNested)
            .getAggregation();

    Set<Map<String, Object>> actual = roughen(aggregation.aggregate(jsonList.iterator()), HashSet.class);
    assertEquals("Average should be as expected", expected, actual);
}

@Test
public void testIterativeOperationWithoutGrouping() throws Exception {
    // No group-by field is set here; only a CountOperation stored under "count"
    AggregationBuilder builder = new AggregationBuilder();
    Aggregation aggregation = builder
            .addOperation("count", new CountOperation())
            .getAggregation();

    Set<Map<String, Object>> expected = new HashSet<>(JsonFileReader.readJsonFromResource("grouplessResult.json"));
    Set<Map<String, Object>> actual = roughen(aggregation.aggregate(jsonList.iterator()), HashSet.class);
    assertEquals("Counts without grouping should be as expected", expected, actual);
}

@Test
public void testIterativeMultiOperation() throws Exception {
    String groupField = "f";
    // All four numeric operations read the same nested field
    String valueField = "test.f";
    Set<Map<String, Object>> expected = new HashSet<>(JsonFileReader.readJsonFromResource("multiResult.json"));

    AggregationBuilder builder = new AggregationBuilder().setGroupBy(groupField);
    builder.addOperation("avg", new AverageOperation(valueField));
    builder.addOperation("sum", new SumOperation(valueField));
    builder.addOperation("min", new MinOperation(valueField));
    builder.addOperation("max", new MaxOperation(valueField));
    builder.addOperation("count", new CountOperation());
    Aggregation aggregation = builder.getAggregation();

    Set<Map<String, Object>> actual = roughen(aggregation.aggregate(jsonList.iterator()), HashSet.class);
    assertEquals("Multiple aggregations result should be as expected", expected, actual);
}

@Test
public void testIterativeCollectStrings() throws Exception {
    String field = "test.f";
    String collectField = "f";
    // "set" values are wrapped in HashSets so element order is ignored; "list" values are compared directly
    Map<String, Object> expectedSets = new HashMap<>();
    Map<String, Object> expectedLists = new HashMap<>();
    for (Map<String, Object> expectedObject : JsonFileReader.readJsonFromResource("collectStringsResult.json")) {
        String id = String.valueOf(expectedObject.get("_id"));
        expectedSets.put(id, new HashSet<Object>((List<?>) expectedObject.get("set")));
        expectedLists.put(id, expectedObject.get("list"));
    }

    Aggregation aggregation = new AggregationBuilder()
            .setGroupBy(field)
            .addOperation("list", new CollectOperation(collectField))
            .addOperation("set", new CollectSetOperation(collectField))
            .getAggregation();
    Map<String, Object> resultSets = new HashMap<>();
    Map<String, Object> resultLists = new HashMap<>();
    Set<Map<String, Object>> roughResult = roughen(aggregation.aggregate(jsonList.iterator()), HashSet.class);
    for (Map<String, Object> resultObject : roughResult) {
        String id = String.valueOf(resultObject.get("_id"));
        resultSets.put(id, new HashSet<Object>((List<?>) resultObject.get("set")));
        resultLists.put(id, resultObject.get("list"));
    }

    // Each message names the operation whose output is being compared
    assertEquals("CollectSet for Strings should work as expected", expectedSets, resultSets);
    assertEquals("Collect for Strings should work as expected", expectedLists, resultLists);
}
}
5 changes: 5 additions & 0 deletions jaggr/src/test/resources/collectStringsResult.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{"_id": "1.0", "set": ["a", "b"], "list": ["a", "b", "b", "b", "b", "b"]}
{"_id": "-1.0", "set": ["a"], "list": ["a"]}
{"_id": "2.0", "set": ["a"], "list": ["a"]}
{"_id": "3.0", "set": ["a"], "list": ["a"]}
{"_id": "5.0", "set": ["a"], "list": ["a"]}

0 comments on commit b62789c

Please sign in to comment.