From 3d26f7318fbcff6f53e0c30f28d8f12f68ee62f1 Mon Sep 17 00:00:00 2001 From: runner Date: Sat, 30 Sep 2023 11:43:24 +0000 Subject: [PATCH 01/10] updating poms for 9.1.11-SNAPSHOT development --- compiler/pom.xml | 2 +- parent-root/pom.xml | 2 +- pom.xml | 2 +- runtime/pom.xml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/compiler/pom.xml b/compiler/pom.xml index 71bdc645c..074ac330a 100644 --- a/compiler/pom.xml +++ b/compiler/pom.xml @@ -19,7 +19,7 @@ Copyright (C) 2018 V12 Technology Ltd. com.fluxtion root-parent-pom - 9.1.10-SNAPSHOT + 9.1.11-SNAPSHOT ../parent-root/pom.xml diff --git a/parent-root/pom.xml b/parent-root/pom.xml index f1fe52a76..58e8af066 100644 --- a/parent-root/pom.xml +++ b/parent-root/pom.xml @@ -21,7 +21,7 @@ 4.0.0 com.fluxtion root-parent-pom - 9.1.10-SNAPSHOT + 9.1.11-SNAPSHOT pom fluxtion :: poms :: parent root diff --git a/pom.xml b/pom.xml index 4e2c07d15..57b9627d0 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ along with this program. If not, see 4.0.0 com.fluxtion fluxtion.master - 9.1.10-SNAPSHOT + 9.1.11-SNAPSHOT pom fluxtion diff --git a/runtime/pom.xml b/runtime/pom.xml index 8b3e64c63..bbb767752 100644 --- a/runtime/pom.xml +++ b/runtime/pom.xml @@ -20,7 +20,7 @@ Copyright (C) 2018 V12 Technology Ltd. com.fluxtion root-parent-pom - 9.1.10-SNAPSHOT + 9.1.11-SNAPSHOT ../parent-root/pom.xml From 76dc51db6f899cb6837432482689e77b71c70e71 Mon Sep 17 00:00:00 2001 From: runner Date: Sat, 30 Sep 2023 11:43:34 +0000 Subject: [PATCH 02/10] updating poms for branch'release/9.1.10' with non-snapshot versions --- compiler/pom.xml | 2 +- parent-root/pom.xml | 2 +- pom.xml | 2 +- runtime/pom.xml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/compiler/pom.xml b/compiler/pom.xml index 71bdc645c..cd7055c08 100644 --- a/compiler/pom.xml +++ b/compiler/pom.xml @@ -19,7 +19,7 @@ Copyright (C) 2018 V12 Technology Ltd. com.fluxtion root-parent-pom - 9.1.10-SNAPSHOT + 9.1.10 ../parent-root/pom.xml diff --git a/parent-root/pom.xml b/parent-root/pom.xml index f1fe52a76..a8d5b0671 100644 --- a/parent-root/pom.xml +++ b/parent-root/pom.xml @@ -21,7 +21,7 @@ 4.0.0 com.fluxtion root-parent-pom - 9.1.10-SNAPSHOT + 9.1.10 pom fluxtion :: poms :: parent root diff --git a/pom.xml b/pom.xml index 4e2c07d15..192a63a23 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ along with this program. If not, see 4.0.0 com.fluxtion fluxtion.master - 9.1.10-SNAPSHOT + 9.1.10 pom fluxtion diff --git a/runtime/pom.xml b/runtime/pom.xml index 8b3e64c63..374436d6d 100644 --- a/runtime/pom.xml +++ b/runtime/pom.xml @@ -20,7 +20,7 @@ Copyright (C) 2018 V12 Technology Ltd. com.fluxtion root-parent-pom - 9.1.10-SNAPSHOT + 9.1.10 ../parent-root/pom.xml From f9e413221866503e1e4a22850951aa9be09958f7 Mon Sep 17 00:00:00 2001 From: runner Date: Sat, 30 Sep 2023 11:47:41 +0000 Subject: [PATCH 03/10] updating develop poms to master versions to avoid merge conflicts --- compiler/pom.xml | 2 +- parent-root/pom.xml | 2 +- pom.xml | 2 +- runtime/pom.xml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/compiler/pom.xml b/compiler/pom.xml index 074ac330a..cd7055c08 100644 --- a/compiler/pom.xml +++ b/compiler/pom.xml @@ -19,7 +19,7 @@ Copyright (C) 2018 V12 Technology Ltd. com.fluxtion root-parent-pom - 9.1.11-SNAPSHOT + 9.1.10 ../parent-root/pom.xml diff --git a/parent-root/pom.xml b/parent-root/pom.xml index 58e8af066..a8d5b0671 100644 --- a/parent-root/pom.xml +++ b/parent-root/pom.xml @@ -21,7 +21,7 @@ 4.0.0 com.fluxtion root-parent-pom - 9.1.11-SNAPSHOT + 9.1.10 pom fluxtion :: poms :: parent root diff --git a/pom.xml b/pom.xml index 57b9627d0..192a63a23 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ along with this program. If not, see 4.0.0 com.fluxtion fluxtion.master - 9.1.11-SNAPSHOT + 9.1.10 pom fluxtion diff --git a/runtime/pom.xml b/runtime/pom.xml index bbb767752..374436d6d 100644 --- a/runtime/pom.xml +++ b/runtime/pom.xml @@ -20,7 +20,7 @@ Copyright (C) 2018 V12 Technology Ltd. com.fluxtion root-parent-pom - 9.1.11-SNAPSHOT + 9.1.10 ../parent-root/pom.xml From 6afec4c53294e9ea0386c4f84fe56fffdc070dee Mon Sep 17 00:00:00 2001 From: runner Date: Sat, 30 Sep 2023 11:47:42 +0000 Subject: [PATCH 04/10] Updating develop poms back to pre merge state --- compiler/pom.xml | 2 +- parent-root/pom.xml | 2 +- pom.xml | 2 +- runtime/pom.xml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/compiler/pom.xml b/compiler/pom.xml index cd7055c08..074ac330a 100644 --- a/compiler/pom.xml +++ b/compiler/pom.xml @@ -19,7 +19,7 @@ Copyright (C) 2018 V12 Technology Ltd. com.fluxtion root-parent-pom - 9.1.10 + 9.1.11-SNAPSHOT ../parent-root/pom.xml diff --git a/parent-root/pom.xml b/parent-root/pom.xml index a8d5b0671..58e8af066 100644 --- a/parent-root/pom.xml +++ b/parent-root/pom.xml @@ -21,7 +21,7 @@ 4.0.0 com.fluxtion root-parent-pom - 9.1.10 + 9.1.11-SNAPSHOT pom fluxtion :: poms :: parent root diff --git a/pom.xml b/pom.xml index 192a63a23..57b9627d0 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ along with this program. If not, see 4.0.0 com.fluxtion fluxtion.master - 9.1.10 + 9.1.11-SNAPSHOT pom fluxtion diff --git a/runtime/pom.xml b/runtime/pom.xml index 374436d6d..bbb767752 100644 --- a/runtime/pom.xml +++ b/runtime/pom.xml @@ -20,7 +20,7 @@ Copyright (C) 2018 V12 Technology Ltd. com.fluxtion root-parent-pom - 9.1.10 + 9.1.11-SNAPSHOT ../parent-root/pom.xml From b97198c72b2d0444407971121496db7adb498c34 Mon Sep 17 00:00:00 2001 From: greg higgins Date: Tue, 24 Oct 2023 08:43:00 +0100 Subject: [PATCH 05/10] docs (#242) Co-authored-by: greg --- .../fluxtion/compiler/FluxtionCompilerConfig.java | 13 +++++++++++++ .../compiler/EventProcessorCompilation.java | 2 +- .../compiler/EventProcessorGenerator.java | 13 ++++++++++--- 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/compiler/src/main/java/com/fluxtion/compiler/FluxtionCompilerConfig.java b/compiler/src/main/java/com/fluxtion/compiler/FluxtionCompilerConfig.java index 42b3ff417..25db8b8ab 100644 --- a/compiler/src/main/java/com/fluxtion/compiler/FluxtionCompilerConfig.java +++ b/compiler/src/main/java/com/fluxtion/compiler/FluxtionCompilerConfig.java @@ -94,6 +94,10 @@ public class FluxtionCompilerConfig { * The if {@link #writeSourceToFile} is false this writer will capture the content of the generation process */ private Writer sourceWriter; + /** + * Flag controlling adding build time to generated source files + */ + private boolean addBuildTime; private transient ClassLoader classLoader; @@ -101,6 +105,7 @@ public FluxtionCompilerConfig() { generateDescription = false; writeSourceToFile = false; compileSource = true; + addBuildTime = false; formatSource = true; templateSep = JAVA_TEMPLATE; classLoader = FluxtionCompilerConfig.class.getClassLoader(); @@ -157,6 +162,14 @@ public void setWriteSourceToFile(boolean writeSourceToFile) { this.writeSourceToFile = writeSourceToFile; } + public boolean isAddBuildTime() { + return addBuildTime; + } + + public void setAddBuildTime(boolean addBuildTime) { + this.addBuildTime = addBuildTime; + } + public void setPackageName(String packageName) { this.packageName = packageName; } diff --git a/compiler/src/main/java/com/fluxtion/compiler/generation/compiler/EventProcessorCompilation.java b/compiler/src/main/java/com/fluxtion/compiler/generation/compiler/EventProcessorCompilation.java index 83a80f261..ae0fb81c7 100644 --- a/compiler/src/main/java/com/fluxtion/compiler/generation/compiler/EventProcessorCompilation.java +++ b/compiler/src/main/java/com/fluxtion/compiler/generation/compiler/EventProcessorCompilation.java @@ -105,7 +105,7 @@ private Class generateSep() throws Exception { } EventProcessorGenerator eventProcessorGenerator = new EventProcessorGenerator(); - eventProcessorGenerator.templateSep(builderConfig, compilerConfig.isGenerateDescription(), writer); + eventProcessorGenerator.templateSep(builderConfig, compilerConfig, writer); GenerationContext generationConfig = GenerationContext.SINGLETON; String fqn = generationConfig.getPackageName() + "." + generationConfig.getSepClassName(); File file = new File(generationConfig.getPackageDirectory(), generationConfig.getSepClassName() + ".java"); diff --git a/compiler/src/main/java/com/fluxtion/compiler/generation/compiler/EventProcessorGenerator.java b/compiler/src/main/java/com/fluxtion/compiler/generation/compiler/EventProcessorGenerator.java index 6bc1ed0c9..5a6f8c2ce 100644 --- a/compiler/src/main/java/com/fluxtion/compiler/generation/compiler/EventProcessorGenerator.java +++ b/compiler/src/main/java/com/fluxtion/compiler/generation/compiler/EventProcessorGenerator.java @@ -18,6 +18,7 @@ package com.fluxtion.compiler.generation.compiler; import com.fluxtion.compiler.EventProcessorConfig; +import com.fluxtion.compiler.FluxtionCompilerConfig; import com.fluxtion.compiler.builder.factory.NodeFactoryLocator; import com.fluxtion.compiler.builder.factory.NodeFactoryRegistration; import com.fluxtion.compiler.generation.GenerationContext; @@ -63,6 +64,7 @@ public class EventProcessorGenerator { private EventProcessorConfig config; private static final Logger LOG = LoggerFactory.getLogger(EventProcessorGenerator.class); private SimpleEventProcessorModel simpleEventProcessorModel; + private FluxtionCompilerConfig compilerConfig; public InMemoryEventProcessor inMemoryProcessor(EventProcessorConfig config, boolean generateDescription) throws Exception { config.buildConfig(); @@ -95,10 +97,11 @@ public InMemoryEventProcessor inMemoryProcessor(EventProcessorConfig config, boo return new InMemoryEventProcessor(simpleEventProcessorModel, config); } - public void templateSep(EventProcessorConfig config, boolean generateDescription, Writer writer) throws Exception { + public void templateSep(EventProcessorConfig config, FluxtionCompilerConfig compilerConfig, Writer writer) throws Exception { ExecutorService execSvc = Executors.newCachedThreadPool(); config.buildConfig(); this.config = config; + this.compilerConfig = compilerConfig; LOG.debug("init velocity"); initVelocity(); LOG.debug("start graph calc"); @@ -117,7 +120,7 @@ public void templateSep(EventProcessorConfig config, boolean generateDescription simpleEventProcessorModel.generateMetaModel(config.isSupportDirtyFiltering()); //TODO add conditionality for different target languages //buildJava output - if (generateDescription) { + if (compilerConfig.isGenerateDescription()) { execSvc.submit(() -> { LOG.debug("start exporting graphML/images"); exportGraphMl(graph); @@ -190,7 +193,11 @@ private void templateJavaOutput(Writer templateWriter) throws Exception { private void addVersionInformation(Context ctx) { ctx.put("generator_version_information", this.getClass().getPackage().getImplementationVersion()); ctx.put("api_version_information", OnEventHandler.class.getPackage().getImplementationVersion()); - ctx.put("build_time", LocalDateTime.now()); + if (compilerConfig.isAddBuildTime()) { + ctx.put("build_time", LocalDateTime.now()); + } else { + ctx.put("build_time", "Not available"); + } } public static void formatSource(File outFile) { From 1194ed38b04ffb5b33ef8c7901b9971a197e0be7 Mon Sep 17 00:00:00 2001 From: runner Date: Tue, 24 Oct 2023 08:20:15 +0000 Subject: [PATCH 06/10] updating poms for 9.1.12-SNAPSHOT development --- compiler/pom.xml | 2 +- parent-root/pom.xml | 2 +- pom.xml | 2 +- runtime/pom.xml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/compiler/pom.xml b/compiler/pom.xml index 074ac330a..3ae00c815 100644 --- a/compiler/pom.xml +++ b/compiler/pom.xml @@ -19,7 +19,7 @@ Copyright (C) 2018 V12 Technology Ltd. com.fluxtion root-parent-pom - 9.1.11-SNAPSHOT + 9.1.12-SNAPSHOT ../parent-root/pom.xml diff --git a/parent-root/pom.xml b/parent-root/pom.xml index 58e8af066..c81a26276 100644 --- a/parent-root/pom.xml +++ b/parent-root/pom.xml @@ -21,7 +21,7 @@ 4.0.0 com.fluxtion root-parent-pom - 9.1.11-SNAPSHOT + 9.1.12-SNAPSHOT pom fluxtion :: poms :: parent root diff --git a/pom.xml b/pom.xml index 57b9627d0..fb72804dc 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ along with this program. If not, see 4.0.0 com.fluxtion fluxtion.master - 9.1.11-SNAPSHOT + 9.1.12-SNAPSHOT pom fluxtion diff --git a/runtime/pom.xml b/runtime/pom.xml index bbb767752..2d2facd4e 100644 --- a/runtime/pom.xml +++ b/runtime/pom.xml @@ -20,7 +20,7 @@ Copyright (C) 2018 V12 Technology Ltd. com.fluxtion root-parent-pom - 9.1.11-SNAPSHOT + 9.1.12-SNAPSHOT ../parent-root/pom.xml From ea972c023c9ec28d5b710d83cf6e0518e310d7e4 Mon Sep 17 00:00:00 2001 From: runner Date: Tue, 24 Oct 2023 08:20:22 +0000 Subject: [PATCH 07/10] updating poms for branch'release/9.1.11' with non-snapshot versions --- compiler/pom.xml | 2 +- parent-root/pom.xml | 2 +- pom.xml | 2 +- runtime/pom.xml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/compiler/pom.xml b/compiler/pom.xml index 074ac330a..e5a92d8aa 100644 --- a/compiler/pom.xml +++ b/compiler/pom.xml @@ -19,7 +19,7 @@ Copyright (C) 2018 V12 Technology Ltd. com.fluxtion root-parent-pom - 9.1.11-SNAPSHOT + 9.1.11 ../parent-root/pom.xml diff --git a/parent-root/pom.xml b/parent-root/pom.xml index 58e8af066..1f6e4efeb 100644 --- a/parent-root/pom.xml +++ b/parent-root/pom.xml @@ -21,7 +21,7 @@ 4.0.0 com.fluxtion root-parent-pom - 9.1.11-SNAPSHOT + 9.1.11 pom fluxtion :: poms :: parent root diff --git a/pom.xml b/pom.xml index 57b9627d0..783f20d16 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ along with this program. If not, see 4.0.0 com.fluxtion fluxtion.master - 9.1.11-SNAPSHOT + 9.1.11 pom fluxtion diff --git a/runtime/pom.xml b/runtime/pom.xml index bbb767752..500a3e2a7 100644 --- a/runtime/pom.xml +++ b/runtime/pom.xml @@ -20,7 +20,7 @@ Copyright (C) 2018 V12 Technology Ltd. com.fluxtion root-parent-pom - 9.1.11-SNAPSHOT + 9.1.11 ../parent-root/pom.xml From 940234aa0daf88cff0a2f991325ec469e7a801d6 Mon Sep 17 00:00:00 2001 From: runner Date: Tue, 24 Oct 2023 08:27:04 +0000 Subject: [PATCH 08/10] updating develop poms to master versions to avoid merge conflicts --- compiler/pom.xml | 2 +- parent-root/pom.xml | 2 +- pom.xml | 2 +- runtime/pom.xml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/compiler/pom.xml b/compiler/pom.xml index 3ae00c815..e5a92d8aa 100644 --- a/compiler/pom.xml +++ b/compiler/pom.xml @@ -19,7 +19,7 @@ Copyright (C) 2018 V12 Technology Ltd. com.fluxtion root-parent-pom - 9.1.12-SNAPSHOT + 9.1.11 ../parent-root/pom.xml diff --git a/parent-root/pom.xml b/parent-root/pom.xml index c81a26276..1f6e4efeb 100644 --- a/parent-root/pom.xml +++ b/parent-root/pom.xml @@ -21,7 +21,7 @@ 4.0.0 com.fluxtion root-parent-pom - 9.1.12-SNAPSHOT + 9.1.11 pom fluxtion :: poms :: parent root diff --git a/pom.xml b/pom.xml index fb72804dc..783f20d16 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ along with this program. If not, see 4.0.0 com.fluxtion fluxtion.master - 9.1.12-SNAPSHOT + 9.1.11 pom fluxtion diff --git a/runtime/pom.xml b/runtime/pom.xml index 2d2facd4e..500a3e2a7 100644 --- a/runtime/pom.xml +++ b/runtime/pom.xml @@ -20,7 +20,7 @@ Copyright (C) 2018 V12 Technology Ltd. com.fluxtion root-parent-pom - 9.1.12-SNAPSHOT + 9.1.11 ../parent-root/pom.xml From 5ebccebd404600088a4a77a64d82da20f6bc8964 Mon Sep 17 00:00:00 2001 From: runner Date: Tue, 24 Oct 2023 08:27:05 +0000 Subject: [PATCH 09/10] Updating develop poms back to pre merge state --- compiler/pom.xml | 2 +- parent-root/pom.xml | 2 +- pom.xml | 2 +- runtime/pom.xml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/compiler/pom.xml b/compiler/pom.xml index e5a92d8aa..3ae00c815 100644 --- a/compiler/pom.xml +++ b/compiler/pom.xml @@ -19,7 +19,7 @@ Copyright (C) 2018 V12 Technology Ltd. com.fluxtion root-parent-pom - 9.1.11 + 9.1.12-SNAPSHOT ../parent-root/pom.xml diff --git a/parent-root/pom.xml b/parent-root/pom.xml index 1f6e4efeb..c81a26276 100644 --- a/parent-root/pom.xml +++ b/parent-root/pom.xml @@ -21,7 +21,7 @@ 4.0.0 com.fluxtion root-parent-pom - 9.1.11 + 9.1.12-SNAPSHOT pom fluxtion :: poms :: parent root diff --git a/pom.xml b/pom.xml index 783f20d16..fb72804dc 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ along with this program. If not, see 4.0.0 com.fluxtion fluxtion.master - 9.1.11 + 9.1.12-SNAPSHOT pom fluxtion diff --git a/runtime/pom.xml b/runtime/pom.xml index 500a3e2a7..2d2facd4e 100644 --- a/runtime/pom.xml +++ b/runtime/pom.xml @@ -20,7 +20,7 @@ Copyright (C) 2018 V12 Technology Ltd. com.fluxtion root-parent-pom - 9.1.11 + 9.1.12-SNAPSHOT ../parent-root/pom.xml From f3d16f1585e30a1c99c4c0d69cde1040d699193f Mon Sep 17 00:00:00 2001 From: command-line Date: Sun, 3 Dec 2023 16:29:34 +0000 Subject: [PATCH 10/10] Adds support in data flow for mapping to sets and lists in FlowBuilder. Compound keys in groupBy are supported with GroupByKey in FlowBuilder. --- .../builder/dataflow/FlowBuilder.java | 168 ++++++++++++++++-- .../builder/dataflow/FlowBuilderBase.java | 4 +- .../serialiser/FieldSerializer.java | 5 + .../dataflow/EventStreamBuildTest.java | 117 ++++++++++-- .../builder/dataflow/GroupByTest.java | 160 +++++++++++++++++ .../builder/dataflow/NestedGroupByTest.java | 4 +- .../function/AggregateToListFlowFunction.java | 2 +- .../function/AggregateToSetFlowFunction.java | 9 +- .../runtime/dataflow/groupby/GroupByKey.java | 109 ++++++++++++ .../runtime/dataflow/helpers/Collectors.java | 21 ++- .../dataflow/helpers/GroupingFactory.java | 2 +- 11 files changed, 561 insertions(+), 40 deletions(-) create mode 100644 runtime/src/main/java/com/fluxtion/runtime/dataflow/groupby/GroupByKey.java diff --git a/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilder.java b/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilder.java index c4598bd95..6989be7b5 100644 --- a/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilder.java +++ b/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilder.java @@ -9,16 +9,9 @@ import com.fluxtion.runtime.dataflow.aggregate.function.TimedSlidingWindow; import com.fluxtion.runtime.dataflow.aggregate.function.TumblingWindow; import com.fluxtion.runtime.dataflow.function.BinaryMapFlowFunction.BinaryMapToRefFlowFunction; -import com.fluxtion.runtime.dataflow.function.FlatMapArrayFlowFunction; -import com.fluxtion.runtime.dataflow.function.FlatMapFlowFunction; -import com.fluxtion.runtime.dataflow.function.LookupFlowFunction; -import com.fluxtion.runtime.dataflow.function.MapFlowFunction; +import com.fluxtion.runtime.dataflow.function.*; import com.fluxtion.runtime.dataflow.function.MapFlowFunction.MapRef2RefFlowFunction; -import com.fluxtion.runtime.dataflow.function.MergeFlowFunction; -import com.fluxtion.runtime.dataflow.groupby.GroupBy; -import com.fluxtion.runtime.dataflow.groupby.GroupByFlowFunctionWrapper; -import com.fluxtion.runtime.dataflow.groupby.GroupByTimedSlidingWindow; -import com.fluxtion.runtime.dataflow.groupby.GroupByTumblingWindow; +import com.fluxtion.runtime.dataflow.groupby.*; import com.fluxtion.runtime.dataflow.helpers.Aggregates; import com.fluxtion.runtime.dataflow.helpers.Collectors; import com.fluxtion.runtime.dataflow.helpers.DefaultValue; @@ -80,6 +73,30 @@ public FlowBuilder map(SerializableFunction mapFunction) { return super.mapBase(mapFunction); } + public FlowBuilder> mapToSet() { + return map(Collectors.toSet()); + } + + public FlowBuilder> mapToSet(SerializableFunction mapFunction) { + return map(mapFunction).map(Collectors.toSet()); + } + + public FlowBuilder> mapToList() { + return map(Collectors.toList()); + } + + public FlowBuilder> mapToList(SerializableFunction mapFunction) { + return map(mapFunction).map(Collectors.toList()); + } + + public FlowBuilder> mapToList(int maxElements) { + return map(Collectors.toList(maxElements)); + } + + public FlowBuilder> mapToList(SerializableFunction mapFunction, int maxElements) { + return map(mapFunction).map(Collectors.toList(maxElements)); + } + public FlowBuilder mapBiFunction(SerializableBiFunction int2IntFunction, FlowBuilder stream2Builder) { @@ -130,6 +147,19 @@ public FlowBuilder flatMapFromArray(SerializableFunction iterable new TimedSlidingWindow<>(eventStream, aggregateFunction, bucketSizeMillis, bucketsPerWindow)); } + /** + * Aggregates a flow using a key function to group by and an aggregating function to process new values for a keyed + * bucket. + * + * @param keyFunction The key function that groups and buckets incoming values + * @param valueFunction The value that is extracted from the incoming stream and applied to the aggregating function + * @param aggregateFunctionSupplier A factory that supplies aggregating functions, each function has its own function instance + * @param Value type extracted from the incoming data flow + * @param The type of the key used to group values + * @param The return type of the aggregating function + * @param The aggregating function type + * @return A GroupByFlowBuilder for the aggregated flow + */ public > GroupByFlowBuilder groupBy(SerializableFunction keyFunction, SerializableFunction valueFunction, @@ -139,33 +169,147 @@ public FlowBuilder flatMapFromArray(SerializableFunction iterable return new GroupByFlowBuilder<>(x); } + /** + * Specialisation of groupBy where the value is the identity of the incoming data flow + * + * @param keyFunction The key function that groups and buckets incoming values + * @param aggregateFunctionSupplier A factory that supplies aggregating functions, each function has its own function instance + * @param The type of the key used to group values + * @param The return type of the aggregating function + * @param The aggregating function type + * @return A GroupByFlowBuilder for the aggregated flow + * @see FlowBuilder#groupBy(SerializableFunction, SerializableFunction, SerializableSupplier) + */ public > GroupByFlowBuilder groupBy(SerializableFunction keyFunction, SerializableSupplier aggregateFunctionSupplier) { return groupBy(keyFunction, Mappers::identity, aggregateFunctionSupplier); } + /** + * Specialisation of groupBy where the output of the groupBy is the last value received for a bucket. The value is + * extracted using the value function + * + * @param keyFunction The key function that groups and buckets incoming values + * @param valueFunction The value that is extracted from the incoming stream and applied to the aggregating function + * @param Value type extracted from the incoming data flow + * @param The type of the key used to group values + * @return A GroupByFlowBuilder for the aggregated flow + * @see FlowBuilder#groupBy(SerializableFunction, SerializableFunction, SerializableSupplier) + */ public GroupByFlowBuilder groupBy( SerializableFunction keyFunction, SerializableFunction valueFunction) { return groupBy(keyFunction, valueFunction, Aggregates.identityFactory()); } + /** + * Specialisation of groupBy where the output of the groupBy is the last value received for a bucket, where + * the value is the identity of the incoming data flow + * + * @param keyFunction The key function that groups and buckets incoming values + * @param The type of the key used to group values + * @return A GroupByFlowBuilder for the aggregated flow + */ public GroupByFlowBuilder groupBy(SerializableFunction keyFunction) { return groupBy(keyFunction, Mappers::identity); } + /** + * Creates a GroupByFlowBuilder using a compound key created by a set of method reference accessors to for the value. + * The value is the last value supplied + * + * @param keyFunction key accessor + * @param keyFunctions multi arg key accessors + * @return GroupByFlowBuilder keyed on properties + */ + @SafeVarargs + public final GroupByFlowBuilder, T> groupByFields( + SerializableFunction keyFunction, + SerializableFunction... keyFunctions) { + return groupBy(GroupByKey.build(keyFunction, keyFunctions)); + } + + /** + * Aggregates a flow using a key to group by and an aggregating function to process new values for a keyed + * bucket. The key is a compound key created by a set of method reference accessors to for the value. + * + * @param aggregateFunctionSupplier A factory that supplies aggregating functions, each function has its own function instance + * @param keyFunction key accessor + * @param keyFunctions multi arg key accessors + * @param The return type of the aggregating function + * @param The aggregating function type + * @return A GroupByFlowBuilder for the aggregated flow + * @see FlowBuilder#groupBy(SerializableFunction, SerializableFunction, SerializableSupplier) + */ + @SafeVarargs + public final > GroupByFlowBuilder, A> groupByFieldsAggregate( + SerializableSupplier aggregateFunctionSupplier, + SerializableFunction keyFunction, + SerializableFunction... keyFunctions) { + return groupBy(GroupByKey.build(keyFunction, keyFunctions), aggregateFunctionSupplier); + } + + /** + * Creates a GroupByFlowBuilder using a compound key created by a set of method reference accessors to for the key + * The value is extracted from the input using the value function + * + * @param valueFunction the value that will be stored in the groupBy + * @param keyFunction key accessor + * @param keyFunctions multi arg key accessors + * @return GroupByFlowBuilder keyed on properties + */ + @SafeVarargs + public final GroupByFlowBuilder, V> groupByFieldsAndGet( + SerializableFunction valueFunction, + SerializableFunction keyFunction, + SerializableFunction... keyFunctions) { + return groupBy(GroupByKey.build(keyFunction, keyFunctions), valueFunction); + } + + /** + * Creates a GroupByFlowBuilder using a compound key created by a set of method reference accessors to for the key + * The value is extracted from the input using the value function and is used as an input to the aggregating function + * + * @param valueFunction the value that will be stored in the groupBy + * @param aggregateFunctionSupplier A factory that supplies aggregating functions, each function has its own function instance + * @param keyFunction key accessor + * @param keyFunctions multi arg key accessors + * @param Value type extracted from the incoming data flow + * @param The return type of the aggregating function + * @param The aggregating function type + * @return A GroupByFlowBuilder for the aggregated flow + * @see FlowBuilder#groupBy(SerializableFunction, SerializableFunction, SerializableSupplier) + */ + @SafeVarargs + public final > GroupByFlowBuilder, A> groupByFieldsGetAndAggregate( + SerializableFunction valueFunction, + SerializableSupplier aggregateFunctionSupplier, + SerializableFunction keyFunction, + SerializableFunction... keyFunctions) { + return groupBy(GroupByKey.build(keyFunction, keyFunctions), valueFunction, aggregateFunctionSupplier); + } + public GroupByFlowBuilder> groupByToList(SerializableFunction keyFunction) { - return groupBy(keyFunction, Mappers::identity, Collectors.toList()); + return groupBy(keyFunction, Mappers::identity, Collectors.listFactory()); + } + + public GroupByFlowBuilder> groupByToList( + SerializableFunction keyFunction, SerializableFunction valueFunction) { + return groupBy(keyFunction, valueFunction, Collectors.listFactory()); } public GroupByFlowBuilder> groupByToSet(SerializableFunction keyFunction) { - return groupBy(keyFunction, Mappers::identity, Collectors.toSet()); + return groupBy(keyFunction, Mappers::identity, Collectors.setFactory()); + } + + public GroupByFlowBuilder> groupByToSet(SerializableFunction keyFunction, SerializableFunction valueFunction) { + return groupBy(keyFunction, valueFunction, Collectors.setFactory()); } public GroupByFlowBuilder> groupByToList( SerializableFunction keyFunction, int maxElementsInList) { - return groupBy(keyFunction, Mappers::identity, Collectors.toList(maxElementsInList)); + return groupBy(keyFunction, Mappers::identity, Collectors.listFactory(maxElementsInList)); } public > GroupByFlowBuilder diff --git a/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilderBase.java b/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilderBase.java index 4b996ef51..382c552e9 100644 --- a/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilderBase.java +++ b/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilderBase.java @@ -129,12 +129,12 @@ public FlowBuilderBase merge(FlowBuilderBase streamToMerge) { public GroupByFlowBuilder> groupByAsList(SerializableFunction keyFunction) { - return groupBy(keyFunction, Mappers::identity, Collectors.toList()); + return groupBy(keyFunction, Mappers::identity, Collectors.listFactory()); } public GroupByFlowBuilder> groupByAsList(SerializableFunction keyFunction, int maxElementsInList) { - return groupBy(keyFunction, Mappers::identity, Collectors.toList(maxElementsInList)); + return groupBy(keyFunction, Mappers::identity, Collectors.listFactory(maxElementsInList)); } public > GroupByFlowBuilder diff --git a/compiler/src/main/java/com/fluxtion/compiler/generation/serialiser/FieldSerializer.java b/compiler/src/main/java/com/fluxtion/compiler/generation/serialiser/FieldSerializer.java index 71a7a03dd..f6f635ea7 100644 --- a/compiler/src/main/java/com/fluxtion/compiler/generation/serialiser/FieldSerializer.java +++ b/compiler/src/main/java/com/fluxtion/compiler/generation/serialiser/FieldSerializer.java @@ -4,6 +4,7 @@ import com.fluxtion.compiler.generation.GenerationContext; import com.fluxtion.compiler.generation.model.Field; import com.fluxtion.compiler.generation.util.ClassUtils; +import com.fluxtion.runtime.dataflow.groupby.GroupByKey; import com.fluxtion.runtime.dataflow.helpers.GroupingFactory; import org.jetbrains.annotations.NotNull; @@ -184,6 +185,10 @@ public String buildTypeDeclaration(Field field, Function, String> class String genericDeclaration = "<" + inputClass + ", " + returnType + ", ?, ?>"; return genericDeclaration; } + if (instance instanceof GroupByKey) { + GroupByKey groupByKey = (GroupByKey) instance; + return "<" + classNameConverter.apply(groupByKey.getValueClass()) + ">"; + } return ""; } diff --git a/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/EventStreamBuildTest.java b/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/EventStreamBuildTest.java index 2559511c0..f0f3d6f80 100644 --- a/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/EventStreamBuildTest.java +++ b/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/EventStreamBuildTest.java @@ -27,13 +27,7 @@ import org.junit.Test; import java.util.AbstractMap.SimpleEntry; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; import java.util.concurrent.atomic.LongAdder; import static com.fluxtion.compiler.builder.dataflow.DataFlow.*; @@ -494,7 +488,7 @@ public void aggregateTest() { @Test public void aggregateToLIstTest() { sep(c -> subscribe(String.class) - .aggregate(Collectors.toList(4)) + .aggregate(Collectors.listFactory(4)) .id("myList")); onEvent("A"); @@ -546,6 +540,111 @@ public void tumblingMap() { assertThat(getStreamed("sum"), is(0)); } + @Test + public void testMapToSet() { + sep(c -> DataFlow.subscribe(String.class).mapToSet().id("set")); + HashSet set = new HashSet<>(); + set.add("test"); + onEvent("test"); + onEvent("test"); + assertThat(getStreamed("set"), is(set)); + onEvent("test2"); + set.add("test2"); + assertThat(getStreamed("set"), is(set)); + onEvent("test"); + assertThat(getStreamed("set"), is(set)); + } + + @Test + public void testMapToSetFromProperty() { + sep(c -> DataFlow.subscribe(GroupByTest.Data.class).mapToSet(GroupByTest.Data::getName).id("set")); + HashSet set = new HashSet<>(); + set.add("test"); + onEvent(new GroupByTest.Data("test", 22)); + onEvent(new GroupByTest.Data("test", 31)); + assertThat(getStreamed("set"), is(set)); + onEvent(new GroupByTest.Data("test2", 2334)); + set.add("test2"); + assertThat(getStreamed("set"), is(set)); + onEvent(new GroupByTest.Data("test", 31)); + assertThat(getStreamed("set"), is(set)); + } + + @Test + public void testMapToList() { + sep(c -> DataFlow.subscribe(String.class).mapToList().id("list")); + List list = new ArrayList<>(); + list.add("test"); + list.add("test"); + onEvent("test"); + onEvent("test"); + assertThat(getStreamed("list"), is(list)); + onEvent("test2"); + list.add("test2"); + assertThat(getStreamed("list"), is(list)); + onEvent("test"); + list.add("test"); + assertThat(getStreamed("list"), is(list)); + } + + @Test + public void testMapToList_MaxElements() { + sep(c -> DataFlow.subscribe(String.class).mapToList(2).id("list")); + List list = new ArrayList<>(); + list.add("test"); + list.add("test"); + onEvent("test"); + onEvent("test"); + assertThat(getStreamed("list"), is(list)); + //deleting + onEvent("test2"); + list.add("test2"); + list.remove(0); + assertThat(getStreamed("list"), is(list)); + //deleting + onEvent("test"); + list.add("test"); + list.remove(0); + assertThat(getStreamed("list"), is(list)); + } + + @Test + public void testMapToListFromProperty() { + sep(c -> DataFlow.subscribe(GroupByTest.Data.class).mapToList(GroupByTest.Data::getName).id("list")); + List list = new ArrayList<>(); + list.add("test"); + list.add("test"); + onEvent(new GroupByTest.Data("test", 22)); + onEvent(new GroupByTest.Data("test", 31)); + assertThat(getStreamed("list"), is(list)); + onEvent(new GroupByTest.Data("test2", 2334)); + list.add("test2"); + assertThat(getStreamed("list"), is(list)); + onEvent(new GroupByTest.Data("test3", 3451)); + list.add("test3"); + assertThat(getStreamed("list"), is(list)); + } + + @Test + public void testMapToListFromProperty_MaxElements() { + sep(c -> DataFlow.subscribe(GroupByTest.Data.class).mapToList(GroupByTest.Data::getName, 2).id("list")); + List list = new ArrayList<>(); + list.add("test1"); + list.add("test2"); + onEvent(new GroupByTest.Data("test1", 22)); + onEvent(new GroupByTest.Data("test2", 31)); + assertThat(getStreamed("list"), is(list)); + //deleting + onEvent(new GroupByTest.Data("test3", 2334)); + list.add("test3"); + list.remove(0); + assertThat(getStreamed("list"), is(list)); + //deleting + onEvent(new GroupByTest.Data("tes4", 3451)); + list.add("tes4"); + list.remove(0); + assertThat(getStreamed("list"), is(list)); + } @Value public static class Person { @@ -554,12 +653,10 @@ public static class Person { String gender; } - public static int doubleInt(int value) { return value * 2; } - @Value public static class MergedType { int value; diff --git a/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/GroupByTest.java b/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/GroupByTest.java index 936831e2a..ff64cc25b 100644 --- a/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/GroupByTest.java +++ b/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/GroupByTest.java @@ -5,13 +5,16 @@ import com.fluxtion.compiler.builder.dataflow.EventStreamBuildTest.MyIntFilter; import com.fluxtion.compiler.generation.util.CompiledAndInterpretedSepTest.SepTestConfig; import com.fluxtion.compiler.generation.util.MultipleSepTargetInProcessTest; +import com.fluxtion.runtime.dataflow.aggregate.AggregateFlowFunction; import com.fluxtion.runtime.dataflow.aggregate.function.primitive.DoubleSumFlowFunction; import com.fluxtion.runtime.dataflow.aggregate.function.primitive.IntSumFlowFunction; import com.fluxtion.runtime.dataflow.groupby.GroupBy; import com.fluxtion.runtime.dataflow.groupby.GroupBy.KeyValue; +import com.fluxtion.runtime.dataflow.groupby.GroupByKey; import com.fluxtion.runtime.dataflow.helpers.Aggregates; import com.fluxtion.runtime.dataflow.helpers.Mappers; import com.fluxtion.runtime.dataflow.helpers.Tuples; +import lombok.Getter; import lombok.Value; import lombok.val; import org.hamcrest.CoreMatchers; @@ -605,6 +608,163 @@ public void maintainModel() { onEvent(new MyEvent(SubSystem.REFERENCE, Change_type.DELETE, "greg-1")); } + @Value + public static class Data3 { + String name; + int value; + int x; + + + } + + @Getter + public static class Data3Aggregate implements AggregateFlowFunction { + int value; + + @Override + public Integer reset() { + return value; + } + + @Override + public Integer get() { + return value; + } + + @Override + public Integer aggregate(Data3 input) { + value += input.getX(); + return get(); + } + } + + @Test + public void groupingKey() { + Map, Data3> expected = new HashMap<>(); + sep(c -> { + subscribe(Data3.class) + .groupByFields(Data3::getName, Data3::getValue) + .map(GroupBy::toMap) + .id("results"); + }); + val keyFactory = GroupByKey.build(Data3::getName, Data3::getValue);//.apply(); + + onEvent(new Data3("A", 10, 1)); + expected.put(keyFactory.apply(new Data3("A", 10, 1)), new Data3("A", 10, 1)); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 data2 = new Data3("A", 10, 2); + onEvent(data2); + expected.put(keyFactory.apply(data2), data2); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 data3 = new Data3("A", 10, 3); + onEvent(data3); + expected.put(keyFactory.apply(data3), data3); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 data4 = new Data3("A", 15, 111); + onEvent(data4); + expected.put(keyFactory.apply(data4), data4); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 dataB1 = new Data3("B", 10, 1); + onEvent(dataB1); + expected.put(keyFactory.apply(dataB1), dataB1); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 dataB2 = new Data3("B", 10, 99); + onEvent(dataB2); + expected.put(keyFactory.apply(dataB2), dataB2); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + } + + @Test + public void aggregateCompoundField() { + Map, Integer> expected = new HashMap<>(); + sep(c -> { + subscribe(Data3.class) + .groupByFieldsAggregate(Data3Aggregate::new, Data3::getName, Data3::getValue) + .map(GroupBy::toMap) + .id("results"); + }); + val keyFactory = GroupByKey.build(Data3::getName, Data3::getValue);//.apply(); + + onEvent(new Data3("A", 10, 1)); + expected.put(keyFactory.apply(new Data3("A", 10, 1)), 1); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 data2 = new Data3("A", 10, 2); + onEvent(data2); + expected.put(keyFactory.apply(data2), 3); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 data3 = new Data3("A", 10, 3); + onEvent(data3); + expected.put(keyFactory.apply(data3), 6); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 data4 = new Data3("A", 15, 111); + onEvent(data4); + expected.put(keyFactory.apply(data4), 111); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 dataB1 = new Data3("B", 10, 1); + onEvent(dataB1); + expected.put(keyFactory.apply(dataB1), 1); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 dataB2 = new Data3("B", 10, 99); + onEvent(dataB2); + expected.put(keyFactory.apply(dataB2), 100); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + } + + @Test + public void aggregateExtractedPropertyCompoundField() { + Map, Integer> expected = new HashMap<>(); + sep(c -> { + subscribe(Data3.class) + .groupByFieldsGetAndAggregate( + Data3::getX, + Aggregates.intSumFactory(), + Data3::getName, Data3::getValue) + .map(GroupBy::toMap) + .id("results"); + }); + val keyFactory = GroupByKey.build(Data3::getName, Data3::getValue);//.apply(); + + onEvent(new Data3("A", 10, 1)); + expected.put(keyFactory.apply(new Data3("A", 10, 1)), 1); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 data2 = new Data3("A", 10, 2); + onEvent(data2); + expected.put(keyFactory.apply(data2), 3); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 data3 = new Data3("A", 10, 3); + onEvent(data3); + expected.put(keyFactory.apply(data3), 6); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 data4 = new Data3("A", 15, 111); + onEvent(data4); + expected.put(keyFactory.apply(data4), 111); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 dataB1 = new Data3("B", 10, 1); + onEvent(dataB1); + expected.put(keyFactory.apply(dataB1), 1); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + + Data3 dataB2 = new Data3("B", 10, 99); + onEvent(dataB2); + expected.put(keyFactory.apply(dataB2), 100); + MatcherAssert.assertThat(getStreamed("results"), is(expected)); + } + + public static MyModel updateItemScalar(MyModel model, MyEvent myEvent) { model.createItem(myEvent.getData()); return model; diff --git a/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/NestedGroupByTest.java b/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/NestedGroupByTest.java index 2dbe54c78..f0148d4a2 100644 --- a/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/NestedGroupByTest.java +++ b/compiler/src/test/java/com/fluxtion/compiler/builder/dataflow/NestedGroupByTest.java @@ -120,7 +120,7 @@ public void nestedGroupByToList_WithHelper() { @Test public void nestedDataFlowGroupBy_toCollector() { sep(c -> { - DataFlow.groupBy(Person::getCountry, Collectors.groupingBy(Person::getGender, Collectors.toList())) + DataFlow.groupBy(Person::getCountry, Collectors.groupingBy(Person::getGender, Collectors.listFactory())) .sink("groupBy"); }); this.addSink("groupBy", this::convertToMapList); @@ -179,7 +179,7 @@ public void nestedGroupByToCollector_List_WithHelper() { subscribe(Person.class) .groupBy( Person::getCountry, - Collectors.groupingBy(Person::getGender, Collectors.toList())) + Collectors.groupingBy(Person::getGender, Collectors.listFactory())) .sink("groupBy"); }); this.addSink("groupBy", this::convertToMapList); diff --git a/runtime/src/main/java/com/fluxtion/runtime/dataflow/aggregate/function/AggregateToListFlowFunction.java b/runtime/src/main/java/com/fluxtion/runtime/dataflow/aggregate/function/AggregateToListFlowFunction.java index 0bd71d06a..8e723cac2 100644 --- a/runtime/src/main/java/com/fluxtion/runtime/dataflow/aggregate/function/AggregateToListFlowFunction.java +++ b/runtime/src/main/java/com/fluxtion/runtime/dataflow/aggregate/function/AggregateToListFlowFunction.java @@ -7,7 +7,7 @@ public class AggregateToListFlowFunction implements AggregateFlowFunction, AggregateToListFlowFunction> { - private final List list = new ArrayList<>(); + private transient final List list = new ArrayList<>(); private final int maxElementCount; diff --git a/runtime/src/main/java/com/fluxtion/runtime/dataflow/aggregate/function/AggregateToSetFlowFunction.java b/runtime/src/main/java/com/fluxtion/runtime/dataflow/aggregate/function/AggregateToSetFlowFunction.java index 8b33fe38e..405a6c825 100644 --- a/runtime/src/main/java/com/fluxtion/runtime/dataflow/aggregate/function/AggregateToSetFlowFunction.java +++ b/runtime/src/main/java/com/fluxtion/runtime/dataflow/aggregate/function/AggregateToSetFlowFunction.java @@ -7,7 +7,7 @@ public class AggregateToSetFlowFunction implements AggregateFlowFunction, AggregateToSetFlowFunction> { - private final Set list = new HashSet<>(); + private transient final Set list = new HashSet<>(); @Override public Set reset() { @@ -36,11 +36,4 @@ public Set aggregate(T input) { return list; } - - public static class AggregateToSetFactory { - - public AggregateToSetFlowFunction newList() { - return new AggregateToSetFlowFunction<>(); - } - } } diff --git a/runtime/src/main/java/com/fluxtion/runtime/dataflow/groupby/GroupByKey.java b/runtime/src/main/java/com/fluxtion/runtime/dataflow/groupby/GroupByKey.java new file mode 100644 index 000000000..e2e307217 --- /dev/null +++ b/runtime/src/main/java/com/fluxtion/runtime/dataflow/groupby/GroupByKey.java @@ -0,0 +1,109 @@ +package com.fluxtion.runtime.dataflow.groupby; + +import com.fluxtion.runtime.partition.LambdaReflection; +import lombok.Getter; +import lombok.ToString; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +/** + * Constructs a compound key for using on group by constructs in a data flow. The key is composed using method references + * of the type to be grouped by. + * + * @param The type of data flow to create a key for + */ +@ToString(of = {"key", "name"}) +public class GroupByKey { + public final List> accessors; + private final transient StringBuilder keyHolder = new StringBuilder(); + @Getter + private final transient Class valueClass; + @Getter + private transient String key; + private transient final String name; + + public GroupByKey(List> accessorsToAdd) { + this.accessors = new ArrayList<>(); + String tmpName = ""; + for (LambdaReflection.SerializableFunction element : accessorsToAdd) { + if (!accessors.contains(element)) { + accessors.add(element); + tmpName += "_" + element.method().getName(); + } + } + valueClass = (Class) accessors.get(0).method().getDeclaringClass(); + name = valueClass.getName() + tmpName; + } + + public GroupByKey(LambdaReflection.SerializableFunction accessor) { + this(Arrays.asList(accessor)); + } + + @SafeVarargs + public GroupByKey(LambdaReflection.SerializableFunction... accessorList) { + this(Arrays.asList(accessorList)); + } + + private GroupByKey(GroupByKey toClone) { + accessors = toClone.accessors; + valueClass = toClone.getValueClass(); + name = toClone.name; + } + + public static LambdaReflection.SerializableFunction> build(LambdaReflection.SerializableFunction accessor) { + return new GroupByKey<>(accessor)::toKey; + } + + @SafeVarargs + public static LambdaReflection.SerializableFunction> build( + LambdaReflection.SerializableFunction accessor, + LambdaReflection.SerializableFunction... accessorList) { + List> accessors = new ArrayList<>(); + accessors.add(accessor); + accessors.addAll(Arrays.asList(accessorList)); + GroupByKey accessorKey = new GroupByKey<>(accessors); + return accessorKey::toKey; + } + + + public boolean keyPresent(LambdaReflection.SerializableFunction keyToCheck) { + return accessors.contains(keyToCheck); + } + + public GroupByKey toKey(T input) { + //TODO add object pooling + GroupByKey cloned = new GroupByKey<>(this); + cloned.keyHolder.setLength(0); + for (int i = 0, accessorsSize = accessors.size(); i < accessorsSize; i++) { + LambdaReflection.SerializableFunction accessor = accessors.get(i); + cloned.keyHolder.append(accessor.apply(input).toString()); + cloned.keyHolder.append("_"); + } + cloned.key = cloned.keyHolder.toString(); + return cloned; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + GroupByKey that = (GroupByKey) o; + + if (!valueClass.equals(that.valueClass)) return false; + if (!Objects.equals(key, that.key)) return false; + return name.equals(that.name); + } + + @Override + public int hashCode() { + int result = valueClass.hashCode(); + result = 31 * result + (key != null ? key.hashCode() : 0); + result = 31 * result + name.hashCode(); + return result; + } + +} diff --git a/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/Collectors.java b/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/Collectors.java index fa2990506..bb8765007 100644 --- a/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/Collectors.java +++ b/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/Collectors.java @@ -10,18 +10,31 @@ import com.fluxtion.runtime.partition.LambdaReflection.SerializableSupplier; import java.util.List; +import java.util.Set; public interface Collectors { - static SerializableSupplier> toList(int maximumElementCount) { + static SerializableFunction> toSet() { + return new AggregateToSetFlowFunction()::aggregate; + } + + static SerializableFunction> toList() { + return new AggregateToListFlowFunction()::aggregate; + } + + static SerializableFunction> toList(int maxElements) { + return new AggregateToListFlowFunction(maxElements)::aggregate; + } + + static SerializableSupplier> listFactory(int maximumElementCount) { return new AggregateToListFactory(maximumElementCount)::newList; } - static SerializableSupplier> toList() { - return toList(-1); + static SerializableSupplier> listFactory() { + return listFactory(-1); } - static SerializableSupplier> toSet() { + static SerializableSupplier> setFactory() { return AggregateToSetFlowFunction::new; } diff --git a/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/GroupingFactory.java b/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/GroupingFactory.java index 464639661..73311bb9e 100644 --- a/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/GroupingFactory.java +++ b/runtime/src/main/java/com/fluxtion/runtime/dataflow/helpers/GroupingFactory.java @@ -31,7 +31,7 @@ public SerializableFunction getKeyFunction() { } public GroupByFlowFunctionWrapper, AggregateToListFlowFunction> groupByToList() { - SerializableSupplier> list = Collectors.toList(); + SerializableSupplier> list = Collectors.listFactory(); return new GroupByFlowFunctionWrapper<>(keyFunction, Mappers::identity, list); }