Skip to content

Commit

Permalink
Merge pull request #85 from experiandataquality/2.4.0
Browse files Browse the repository at this point in the history
2.4.0
  • Loading branch information
chungkhenhah authored Jun 24, 2021
2 parents 87b3d3e + d5a8b49 commit d43dc40
Show file tree
Hide file tree
Showing 5 changed files with 389 additions and 3 deletions.
41 changes: 41 additions & 0 deletions ExampleSteps/DemoAggregateStep/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
buildscript {
dependencies {
classpath 'com.github.jengelman.gradle.plugins:shadow:5.2.0'
}
}

plugins {
id 'java-library'
id 'com.github.johnrengelman.shadow' version '5.2.0'
}

repositories {
mavenCentral()
maven {
url 'https://raw.githubusercontent.com/experiandataquality/aperture-data-studio-sdk/github-maven-repository/maven'
}
maven {
// TODO: to be removed once SDK 2.4.0 released
url 'https://raw.githubusercontent.com/experiandataquality/aperture-data-studio-sdk/2.4.0-SDK/maven'
}
}

dependencies {
compileOnly(
"com.experian.datastudio:sdkapi:2.4.0",
)
}

/*
Package the step into a jar together with its dependencies.
Relocate the dependencies to ensure dependencies version doesn't conflict with data studio
*/
shadowJar {
//minimize() - DO NOT USE minimize() because classes initialized through reflections or service loader pattern will be excluded
zip64 = true
relocate 'org.eclipse.collections', 'example.org.eclipse.collections'
}

tasks.test.dependsOn tasks.shadowJar
jar.enabled = false
assemble.dependsOn(shadowJar)
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package com.experian.aperture.datastudio.sdk.step.examples;

import com.experian.datastudio.sdk.api.CustomTypeMetadata;
import com.experian.datastudio.sdk.api.CustomTypeMetadataBuilder;
import com.experian.datastudio.sdk.api.step.CustomStepDefinition;
import com.experian.datastudio.sdk.api.step.configuration.StepConfiguration;
import com.experian.datastudio.sdk.api.step.configuration.StepConfigurationBuilder;
import com.experian.datastudio.sdk.api.step.configuration.StepIcon;
import com.experian.datastudio.sdk.api.step.processor.*;

import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class DemoAggregateStep implements CustomStepDefinition {
private static final String INDEX_NAME = "groupindex";
private static final String INPUT_ID = "input0";
private static final String OUTPUT_ID = "output0";
private static final String GROUP_COLUMN_PROP = "groupcolumnproperty";
private static final String AGGREGATE_COLUMN_PROP = "aggregatecolumnproperty";
private static final String AGGREGATE_TYPE_PROP = "aggregatetypeproperty";

private static final String GROUP_COLUMN = "Group";
private static final String AGGREGATE_COLUMN = "Aggregate";

private static final String SUM = "sum";
private static final String AVG = "avg";
private static final String MAX = "max";
private static final String MIN = "min";
private static final String COUNT = "count";
private static final String FIRST = "first";
private static final String LAST = "last";
private static final String PIPE = "pipe";

@Override
public StepConfiguration createConfiguration(StepConfigurationBuilder configurationBuilder) {
return configurationBuilder
.withNodes(stepNodeBuilder -> stepNodeBuilder
.addInputNode(INPUT_ID)
.addOutputNode(OUTPUT_ID)
.build())
.withStepProperties(stepPropertiesBuilder -> stepPropertiesBuilder
.addStepProperty(stepPropertyBuilder -> stepPropertyBuilder
.asColumnChooser(GROUP_COLUMN_PROP)
.forInputNode(INPUT_ID)
.withAllowSelectAll(false)
.withMultipleSelect(false)
.withIsRequired(true)
// .withShouldRebuildIndex(true) step property by default will rebuild index on change (true).
.withLabelSupplier(uiCallbackContext -> "Group Column")
.build())
.addStepProperty(stepPropertyBuilder -> stepPropertyBuilder
.asColumnChooser(AGGREGATE_COLUMN_PROP)
.forInputNode(INPUT_ID)
.withAllowSelectAll(false)
.withMultipleSelect(false)
.withIsRequired(true)
// .withShouldRebuildIndex(true) step property by default will rebuild index on change (true).
.withLabelSupplier(uiCallbackContext -> "Aggregate Column")
.build())
.addStepProperty(stepPropertyBuilder -> stepPropertyBuilder
.asCustomChooser(AGGREGATE_TYPE_PROP)
.withAllowValuesProvider(ctx -> Arrays.asList(SUM, AVG, MAX, MIN, COUNT, FIRST, LAST, PIPE))
// should not rebuild index on property change because technically the group index for all aggregate type is the same
.withShouldRebuildIndex(false)
.withAllowSelectAll(false)
.withMultipleSelect(false)
.withIsRequired(true)
.withLabelSupplier(uiCallbackContext -> "Aggregate type")
.build())
.build())
.withOutputLayouts(outputLayoutBuilder -> outputLayoutBuilder
.forOutputNode(OUTPUT_ID, outputColumnBuilder -> outputColumnBuilder
.addColumn(GROUP_COLUMN)
.addColumn(AGGREGATE_COLUMN)
.build())
.build())
.withIcon(StepIcon.FILTER_NONE)
.build();
}

@Override
public StepProcessor createProcessor(StepProcessorBuilder processorBuilder) {
return processorBuilder
.registerIndex(INDEX_NAME, indexBuilder -> indexBuilder
.indexTypeRows()
.provideIndexValues(ctx -> {
final InputColumn groupColumn = ctx.getColumnFromChooserValues(GROUP_COLUMN_PROP).get(0);
final long rowCount = ctx.getInputContext(INPUT_ID).orElseThrow(IllegalArgumentException::new)
.getRowCount();

final Set<Integer> constructedGroup = new HashSet<>();
final BitSet visitedRows = new BitSet((int) rowCount);
final AtomicInteger outputRowCount = new AtomicInteger();

// the first index's row is just the total aggregate row number
ctx.appendRow(() -> Collections.singletonList(outputRowCount.get()));

for (long i = 0; i < rowCount; i++) {
final int currentGroupRow = (int) i;
final CellValue currentGroup = groupColumn.getValueAt(i);
final int currentGroupHash = currentGroup.hashCode();
if (constructedGroup.contains(currentGroupHash)) {
// this group already fully constructed, we don't have to iterate the rows again
continue;
}

// index structure outputRowIndex + 1 -> [groupValue, aggregateColumnRow1Value, aggregateColumnRow2Value, ...]
ctx.appendRow(() -> {
// This callback is executed lazily as iterator exactly before the index row is written.
final List<Integer> groupRow = new ArrayList<>();
// first index column is always the group values.
groupRow.add(currentGroupRow);

// safe to cast to int here since max rowcount for index type rows is Integer.MAX_VALUE
for (int row = visitedRows.nextClearBit(0); row < rowCount; row++) {
if (visitedRows.get(row) || !groupColumn.getValueAt(row).equals(currentGroup)) {
continue; // this row belong to another group, so skip...
}
groupRow.add(row);
visitedRows.set(row);
}
return groupRow;
});
constructedGroup.add(currentGroupHash);
}
outputRowCount.set(constructedGroup.size());
})
.build())
.forOutputNode(OUTPUT_ID, (ctx, columnManager) -> {
//noinspection unchecked
final String aggregateType = ((List<String>) ctx.getStepPropertyValue(AGGREGATE_TYPE_PROP).orElseThrow(IllegalArgumentException::new)).get(0);

columnManager.onValue(GROUP_COLUMN, row -> {
final List<CellValue> indexRow = ctx.getIndexRowValues(INDEX_NAME, (int) row + 1);
final InputColumn groupColumn = ctx.getColumnFromChooserValues(GROUP_COLUMN_PROP).get(0);
if (indexRow.isEmpty()) {
throw new IllegalStateException("Index row " + row + " must not be empty.");
}
return groupColumn.getValueAt(indexRow.get(0).toLong());
});

columnManager.onValue(AGGREGATE_COLUMN, row -> {
final List<CellValue> indexRow = ctx.getIndexRowValues(INDEX_NAME, (int) row + 1);
final InputColumn aggregateColumn = ctx.getColumnFromChooserValues(AGGREGATE_COLUMN_PROP).get(0);
if (indexRow.size() <= 1) {
throw new IllegalStateException("Index row " + row + " has invalid structure: index columns size <= 1");
}
final List<CellValue> values = indexRow.subList(1, indexRow.size());
switch (aggregateType) {
case SUM:
return values.stream().map(v -> aggregateColumn.getValueAt(v.toLong())).mapToDouble(v -> v.toDouble()).sum();
case AVG:
return values.stream().map(v -> aggregateColumn.getValueAt(v.toLong())).mapToDouble(v -> v.toDouble()).average().orElse(0.0);
case MAX:
return values.stream().map(v -> aggregateColumn.getValueAt(v.toLong())).mapToDouble(v -> v.toDouble()).max().orElse(0.0);
case MIN:
return values.stream().map(v -> aggregateColumn.getValueAt(v.toLong())).mapToDouble(v -> v.toDouble()).min().orElse(0.0);
case COUNT:
return values.size();
case FIRST:
return aggregateColumn.getValueAt(values.get(0).toLong());
case LAST:
return aggregateColumn.getValueAt(values.get(values.size() - 1).toLong());
case PIPE:
return values.stream().map(v -> aggregateColumn.getValueAt(v.toLong()).toString()).collect(Collectors.joining("|"));
default:
throw new IllegalStateException("Unsupported aggregate operation: " + aggregateType);
}
});

final List<CellValue> indexRow = ctx.getIndexRowValues(INDEX_NAME, 0);
if (indexRow.isEmpty()) {
throw new IllegalStateException("Index row " + 0 + " must not be empty.");
}
return indexRow.get(0).toLong();
})
.build();
}

@Override
public CustomTypeMetadata createMetadata(CustomTypeMetadataBuilder metadataBuilder) {
return metadataBuilder
.withName("Example: Aggregate Step")
.withDescription("Step to demonstrate aggregate using index API")
.withMajorVersion(0)
.withMinorVersion(0)
.withPatchVersion(0)
.withDeveloper("Experian")
.withLicense("Apache License Version 2.0")
.build();
}
}
6 changes: 6 additions & 0 deletions ExampleSteps/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ The [IPGeolocation](IPGeolocation/src/main/java/com/experian/aperture/datastudio
- Step Settings (retrieving lang settings from the UI for query)
- Concurrent asynchronous requests (using Java CompletableFuture)

## Demo Aggregate Example Step

The [DemoAggregateStep](DemoAggregateStep/src/main/java/com/experian/aperture/datastudio/sdk/step/examples/DemoAggregateStep.java) perform various aggregates operation on a single group column.

This examples relies on the SDK 2.4.0 preprocessing API.

#### HTTP Requests
The HTTP requests are made using the SDK HTTP libraries/helper classes (i.e. `WebHttpClient`, `WebHttpRequest`, `WebHttpResponse`). First, an HTTP web client (`WebHttpClient`) is set up, and a request (`WebHttpRequest`) is sent through the client using the `sendAsync()` method. This returns a `WebHttpResponse` which contains the location data of the IP address in JSON format.

Expand Down
Loading

0 comments on commit d43dc40

Please sign in to comment.