diff --git a/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilder.java b/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilder.java index 6989be7b5..a5ee2eb07 100644 --- a/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilder.java +++ b/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilder.java @@ -218,15 +218,13 @@ public GroupByFlowBuilder groupBy(SerializableFunction keyFuncti * Creates a GroupByFlowBuilder using a compound key created by a set of method reference accessors to for the value. * The value is the last value supplied * - * @param keyFunction key accessor * @param keyFunctions multi arg key accessors * @return GroupByFlowBuilder keyed on properties */ @SafeVarargs public final GroupByFlowBuilder, T> groupByFields( - SerializableFunction keyFunction, SerializableFunction... keyFunctions) { - return groupBy(GroupByKey.build(keyFunction, keyFunctions)); + return groupBy(GroupByKey.build(keyFunctions)); } /** @@ -234,7 +232,6 @@ public final GroupByFlowBuilder, T> groupByFields( * bucket. The key is a compound key created by a set of method reference accessors to for the value. * * @param aggregateFunctionSupplier A factory that supplies aggregating functions, each function has its own function instance - * @param keyFunction key accessor * @param keyFunctions multi arg key accessors * @param The return type of the aggregating function * @param The aggregating function type @@ -244,9 +241,8 @@ public final GroupByFlowBuilder, T> groupByFields( @SafeVarargs public final > GroupByFlowBuilder, A> groupByFieldsAggregate( SerializableSupplier aggregateFunctionSupplier, - SerializableFunction keyFunction, SerializableFunction... keyFunctions) { - return groupBy(GroupByKey.build(keyFunction, keyFunctions), aggregateFunctionSupplier); + return groupBy(GroupByKey.build(keyFunctions), aggregateFunctionSupplier); } /** @@ -254,16 +250,14 @@ public final > GroupByFlowBuilder GroupByFlowBuilder, V> groupByFieldsAndGet( SerializableFunction valueFunction, - SerializableFunction keyFunction, SerializableFunction... keyFunctions) { - return groupBy(GroupByKey.build(keyFunction, keyFunctions), valueFunction); + return groupBy(GroupByKey.build(keyFunctions), valueFunction); } /** @@ -272,7 +266,6 @@ public final GroupByFlowBuilder, V> groupByFieldsAndGet( * * @param valueFunction the value that will be stored in the groupBy * @param aggregateFunctionSupplier A factory that supplies aggregating functions, each function has its own function instance - * @param keyFunction key accessor * @param keyFunctions multi arg key accessors * @param Value type extracted from the incoming data flow * @param The return type of the aggregating function @@ -284,9 +277,8 @@ public final GroupByFlowBuilder, V> groupByFieldsAndGet( public final > GroupByFlowBuilder, A> groupByFieldsGetAndAggregate( SerializableFunction valueFunction, SerializableSupplier aggregateFunctionSupplier, - SerializableFunction keyFunction, SerializableFunction... keyFunctions) { - return groupBy(GroupByKey.build(keyFunction, keyFunctions), valueFunction, aggregateFunctionSupplier); + return groupBy(GroupByKey.build(keyFunctions), valueFunction, aggregateFunctionSupplier); } public GroupByFlowBuilder> groupByToList(SerializableFunction keyFunction) { diff --git a/runtime/src/main/java/com/fluxtion/runtime/dataflow/groupby/GroupByKey.java b/runtime/src/main/java/com/fluxtion/runtime/dataflow/groupby/GroupByKey.java index e2e307217..2210a6e8c 100644 --- a/runtime/src/main/java/com/fluxtion/runtime/dataflow/groupby/GroupByKey.java +++ b/runtime/src/main/java/com/fluxtion/runtime/dataflow/groupby/GroupByKey.java @@ -4,6 +4,7 @@ import lombok.Getter; import lombok.ToString; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -18,20 +19,25 @@ @ToString(of = {"key", "name"}) public class GroupByKey { public final List> accessors; + public final transient List accessorsMethods; private final transient StringBuilder keyHolder = new StringBuilder(); @Getter private final transient Class valueClass; @Getter private transient String key; + @Getter + private transient T keyInstance; private transient final String name; public GroupByKey(List> accessorsToAdd) { this.accessors = new ArrayList<>(); + this.accessorsMethods = new ArrayList<>(); String tmpName = ""; for (LambdaReflection.SerializableFunction element : accessorsToAdd) { if (!accessors.contains(element)) { accessors.add(element); tmpName += "_" + element.method().getName(); + accessorsMethods.add(element.method()); } } valueClass = (Class) accessors.get(0).method().getDeclaringClass(); @@ -49,6 +55,7 @@ public GroupByKey(LambdaReflection.SerializableFunction... accessorList) { private GroupByKey(GroupByKey toClone) { accessors = toClone.accessors; + accessorsMethods = toClone.accessorsMethods; valueClass = toClone.getValueClass(); name = toClone.name; } @@ -59,10 +66,8 @@ public static LambdaReflection.SerializableFunction> build( @SafeVarargs public static LambdaReflection.SerializableFunction> build( - LambdaReflection.SerializableFunction accessor, LambdaReflection.SerializableFunction... accessorList) { List> accessors = new ArrayList<>(); - accessors.add(accessor); accessors.addAll(Arrays.asList(accessorList)); GroupByKey accessorKey = new GroupByKey<>(accessors); return accessorKey::toKey; @@ -70,13 +75,14 @@ public static LambdaReflection.SerializableFunction> build( public boolean keyPresent(LambdaReflection.SerializableFunction keyToCheck) { - return accessors.contains(keyToCheck); + return accessorsMethods.contains(keyToCheck.method()); } public GroupByKey toKey(T input) { //TODO add object pooling GroupByKey cloned = new GroupByKey<>(this); cloned.keyHolder.setLength(0); + cloned.keyInstance = input; for (int i = 0, accessorsSize = accessors.size(); i < accessorsSize; i++) { LambdaReflection.SerializableFunction accessor = accessors.get(i); cloned.keyHolder.append(accessor.apply(input).toString());