Skip to content

Commit

Permalink
Merge pull request #1 from caffinc/development
Browse files Browse the repository at this point in the history
Adding initial working code
  • Loading branch information
SriramKeerthi authored Nov 26, 2016
2 parents 682cb5e + 15d3deb commit 4ae2682
Show file tree
Hide file tree
Showing 28 changed files with 895 additions and 12 deletions.
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Test
script:
- sh -c 'cd jaggr && mvn clean test'
40 changes: 40 additions & 0 deletions jaggr/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.caffinc</groupId>
<artifactId>jaggr</artifactId>
<version>1.0-SNAPSHOT</version>
<name>jaggr - JSON Aggregator</name>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.6.2</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
60 changes: 60 additions & 0 deletions jaggr/src/main/java/com/caffinc/jaggr/core/Aggregation.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.caffinc.jaggr.core;

import com.caffinc.jaggr.core.operations.Operation;
import com.caffinc.jaggr.core.utils.FieldValueExtractor;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Aggregates list of objects based on operations
*
* @author Sriram
* @since 11/26/2016
*/
public class Aggregation {
private String _id;
private String[] idSplit;
private Map<String, Operation> operationMap;

Aggregation(String _id, Map<String, Operation> operationMap) {
this._id = _id;
this.idSplit = _id != null ? _id.split("\\.") : null;
this.operationMap = operationMap;
}

public List<Map<String, Object>> aggregate(List<Map<String, Object>> objects) {
Map<String, Map<String, Object>> workspace = new HashMap<>();
for (Map<String, Object> object : objects) {
String id = "0";
if (_id != null) {
id = String.valueOf(FieldValueExtractor.getValue(idSplit, object));
}
if (!workspace.containsKey(id)) {
Map<String, Object> groupWorkspace = new HashMap<>();
groupWorkspace.put("_id", id);
workspace.put(id, groupWorkspace);
}
Map<String, Object> groupWorkspace = workspace.get(id);
for (Map.Entry<String, Operation> operationEntry : operationMap.entrySet()) {
String field = operationEntry.getKey();
Operation operation = operationEntry.getValue();
Object t0 = groupWorkspace.get(field);
Object t1 = operation.aggregate(t0, object);
groupWorkspace.put(field, t1);
}
}
List<Map<String, Object>> resultList = new ArrayList<>();
for (Map<String, Object> groupWorkspace : workspace.values()) {
for (Map.Entry<String, Operation> operationEntry : operationMap.entrySet()) {
String field = operationEntry.getKey();
Operation operation = operationEntry.getValue();
groupWorkspace.put(field, operation.result(groupWorkspace.get(field)));
}
resultList.add(groupWorkspace);
}
return resultList;
}
}
32 changes: 32 additions & 0 deletions jaggr/src/main/java/com/caffinc/jaggr/core/AggregationBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.caffinc.jaggr.core;


import com.caffinc.jaggr.core.operations.Operation;

import java.util.HashMap;
import java.util.Map;

/**
* Builder for Aggregations
*
* @author Sriram
* @since 11/26/2016
*/
public class AggregationBuilder {
private String _id = null;
private Map<String, Operation> operationMap = new HashMap<>();

public AggregationBuilder setGroupBy(String field) {
_id = field;
return this;
}

public AggregationBuilder addOperation(String field, Operation operation) {
operationMap.put(field, operation);
return this;
}

public Aggregation getAggregation() {
return new Aggregation(_id, operationMap);
}
}
17 changes: 17 additions & 0 deletions jaggr/src/main/java/com/caffinc/jaggr/core/entities/Tuple2.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.caffinc.jaggr.core.entities;

/**
* Holds a pair of values
*
* @author Sriram
* @since 11/26/2016
*/
public class Tuple2<T1, T2> {
public T1 _1;
public T2 _2;

public Tuple2(T1 _1, T2 _2) {
this._1 = _1;
this._2 = _2;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.caffinc.jaggr.core.operations;

import com.caffinc.jaggr.core.entities.Tuple2;
import com.caffinc.jaggr.core.utils.FieldValueExtractor;

import java.util.Map;

/**
* Performs averaging aggregation
*
* @author Sriram
* @since 11/26/2016
*/
public class AverageOperation implements Operation {
private String[] field;
private String unsplitField;

public AverageOperation(String field) {
this.unsplitField = field;
this.field = field.split("\\.");
}

@Override
public Object aggregate(Object previousAccumulatedValue, Map<String, Object> object) {
Object value = FieldValueExtractor.getValue(field, object);
if (value == null)
return previousAccumulatedValue;
Tuple2<Double, Integer> accumulator = previousAccumulatedValue == null
? new Tuple2<>(0.0d, 0)
: (Tuple2<Double, Integer>) previousAccumulatedValue;

if (value instanceof Double) {
accumulator._1 = accumulator._1 + (Double) value;
} else if (value instanceof Float) {
accumulator._1 = accumulator._1 + (Float) value;
} else if (value instanceof Long) {
accumulator._1 = accumulator._1 + (Long) value;
} else if (value instanceof Integer) {
accumulator._1 = accumulator._1 + (Integer) value;
} else {
throw new IllegalArgumentException("Field " + unsplitField + " isn't a Double, Float, Long or Integer");
}
accumulator._2++;
return accumulator;
}

@Override
public Object result(Object accumulatedValue) {
Tuple2<Double, Integer> accumulator = accumulatedValue == null
? new Tuple2<>(0.0d, 1)
: (Tuple2<Double, Integer>) accumulatedValue;
return accumulator._1 / accumulator._2;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.caffinc.jaggr.core.operations;


import com.caffinc.jaggr.core.utils.FieldValueExtractor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* Collects all values into a list
*
* @author Sriram
* @since 11/26/2016
*/
public class CollectOperation implements Operation {
private String[] field;
private String unsplitField;

public CollectOperation(String field) {
this.unsplitField = field;
this.field = field.split("\\.");
}

@Override
public Object aggregate(Object previousAccumulatedValue, Map<String, Object> object) {
Object value = FieldValueExtractor.getValue(field, object);
if (value == null)
return previousAccumulatedValue;
List<Object> accumulator = previousAccumulatedValue == null
? new ArrayList<>()
: (List<Object>) previousAccumulatedValue;
accumulator.add(value);
return accumulator;
}

@Override
public Object result(Object accumulatedValue) {
return accumulatedValue;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.caffinc.jaggr.core.operations;


import com.caffinc.jaggr.core.utils.FieldValueExtractor;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* Collects all values into a set
*
* @author Sriram
* @since 11/26/2016
*/
public class CollectSetOperation implements Operation {
private String[] field;
private String unsplitField;

public CollectSetOperation(String field) {
this.unsplitField = field;
this.field = field.split("\\.");
}

@Override
public Object aggregate(Object previousAccumulatedValue, Map<String, Object> object) {
Object value = FieldValueExtractor.getValue(field, object);
if (value == null)
return previousAccumulatedValue;
Set<Object> accumulator = previousAccumulatedValue == null
? new HashSet<>()
: (Set<Object>) previousAccumulatedValue;
accumulator.add(value);
return accumulator;
}

@Override
public Object result(Object accumulatedValue) {
return accumulatedValue;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.caffinc.jaggr.core.operations;

import java.util.Map;

/**
* Performs count aggregation
*
* @author Sriram
* @since 11/26/2016
*/
public class CountOperation implements Operation {
private Integer counterValue;

public CountOperation(Integer counterValue) {
this.counterValue = counterValue;
}

public CountOperation() {
this(1);
}

@Override
public Object aggregate(Object previousAccumulatedValue, Map<String, Object> object) {
return (previousAccumulatedValue == null ? 0 : (Integer) previousAccumulatedValue) + counterValue;
}

@Override
public Object result(Object accumulatedValue) {
return accumulatedValue;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.caffinc.jaggr.core.operations;


import com.caffinc.jaggr.core.utils.FieldValueExtractor;

import java.util.Map;

/**
* Performs maximum value aggregation
*
* @author Sriram
* @since 11/26/2016
*/
public class MaxOperation implements Operation {
private String[] field;
private String unsplitField;
private boolean isDouble = false;

public MaxOperation(String field) {
this.unsplitField = field;
this.field = field.split("\\.");
}

@Override
public Object aggregate(Object previousAccumulatedValue, Map<String, Object> object) {
Object value = FieldValueExtractor.getValue(field, object);
if (value == null)
return previousAccumulatedValue;
Double accumulator = previousAccumulatedValue == null
? Double.MIN_VALUE
: (Double) previousAccumulatedValue;
if (value instanceof Double) {
isDouble = true;
return (accumulator < (Double) value) ? value : accumulator;
}
if (value instanceof Float) {
isDouble = true;
return (accumulator < (Float) value) ? value : accumulator;
}
if (value instanceof Long) {
return (accumulator < (Long) value) ? value : accumulator;
}
if (value instanceof Integer) {
return (accumulator < (Integer) value) ? value : accumulator;
}
throw new IllegalArgumentException("Field " + unsplitField + " isn't a Double, Float, Long or Integer");
}

@Override
public Object result(Object accumulatedValue) {
if (accumulatedValue != null)
return isDouble ? accumulatedValue : ((Double) accumulatedValue).longValue();
else
return null;
}
}
Loading

0 comments on commit 4ae2682

Please sign in to comment.