sepCo
return EventProcessorFactory.interpreted(sepConfig, generateDescription);
}
+ static EventProcessor interpret(Object... nodes) {
+ return interpret(c -> {
+ for (int i = 0; i < nodes.length; i++) {
+ c.addNode(nodes[i]);
+ }
+ });
+ }
+
/**
* Generates and compiles Java source code for a {@link StaticEventProcessor}. The compiled version only requires
* the Fluxtion runtime dependencies to operate and process events.
@@ -236,6 +270,7 @@ static int scanAndCompileFluxtionBuilders(File... files) {
generationCount.increment();
System.out.println(generationCount.intValue() + ": invoking builder " + c.getName());
try {
+
final FluxtionGraphBuilder newInstance = (FluxtionGraphBuilder) c.loadClass().getDeclaredConstructor().newInstance();
compile(newInstance::buildGraph, newInstance::configureGeneration);
} catch (InstantiationException | IllegalAccessException | InvocationTargetException |
@@ -246,4 +281,44 @@ static int scanAndCompileFluxtionBuilders(File... files) {
}
return generationCount.intValue();
}
+
+ /**
+ * Scans the supplied File resources for any classes that implement the {@link FluxtionGraphBuilder} interface
+ * and will generate an {@link EventProcessor} for any located builders.
+ *
+ * Any builder marked with the {@link Disabled} annotation will be ignored
+ *
+ * No compilations are carried out
+ *
+ * @param classLoader the classloader to be used for the generation
+ * @param files The locations to search for {@link FluxtionGraphBuilder} classes
+ * @return The number of processors generated
+ */
+ static int scanAndGenerateFluxtionBuilders(ClassLoader classLoader, File... files) {
+ Objects.requireNonNull(files, "provide valid locations to search for fluxtion builders");
+ System.setProperty(RuntimeConstants.FLUXTION_NO_COMPILE, "true");
+ LongAdder generationCount = new LongAdder();
+ try (ScanResult scanResult = new ClassGraph()
+ .enableAllInfo()
+ .overrideClasspath(files)
+ .scan()) {
+
+ ClassInfoList builderList = scanResult
+ .getClassesImplementing(FluxtionGraphBuilder.class)
+ .exclude(scanResult.getClassesWithAnnotation(Disabled.class.getCanonicalName()));
+
+ builderList.forEach(c -> {
+ generationCount.increment();
+ System.out.println(generationCount.intValue() + ": invoking builder " + c.getName());
+ try {
+ final FluxtionGraphBuilder newInstance = (FluxtionGraphBuilder) classLoader.loadClass(c.getName()).getDeclaredConstructor().newInstance();
+ compile(newInstance::buildGraph, newInstance::configureGeneration);
+ } catch (InstantiationException | IllegalAccessException | InvocationTargetException |
+ NoSuchMethodException | ClassNotFoundException e) {
+ throw new RuntimeException("cannot instantiate FluxtionGraphBuilder", e);
+ }
+ });
+ }
+ return generationCount.intValue();
+ }
}
diff --git a/compiler/src/main/java/com/fluxtion/compiler/FluxtionCompilerConfig.java b/compiler/src/main/java/com/fluxtion/compiler/FluxtionCompilerConfig.java
index 42b3ff417..25db8b8ab 100644
--- a/compiler/src/main/java/com/fluxtion/compiler/FluxtionCompilerConfig.java
+++ b/compiler/src/main/java/com/fluxtion/compiler/FluxtionCompilerConfig.java
@@ -94,6 +94,10 @@ public class FluxtionCompilerConfig {
* The if {@link #writeSourceToFile} is false this writer will capture the content of the generation process
*/
private Writer sourceWriter;
+ /**
+ * Flag controlling adding build time to generated source files
+ */
+ private boolean addBuildTime;
private transient ClassLoader classLoader;
@@ -101,6 +105,7 @@ public FluxtionCompilerConfig() {
generateDescription = false;
writeSourceToFile = false;
compileSource = true;
+ addBuildTime = false;
formatSource = true;
templateSep = JAVA_TEMPLATE;
classLoader = FluxtionCompilerConfig.class.getClassLoader();
@@ -157,6 +162,14 @@ public void setWriteSourceToFile(boolean writeSourceToFile) {
this.writeSourceToFile = writeSourceToFile;
}
+ public boolean isAddBuildTime() {
+ return addBuildTime;
+ }
+
+ public void setAddBuildTime(boolean addBuildTime) {
+ this.addBuildTime = addBuildTime;
+ }
+
public void setPackageName(String packageName) {
this.packageName = packageName;
}
diff --git a/compiler/src/main/java/com/fluxtion/compiler/Sample.java b/compiler/src/main/java/com/fluxtion/compiler/Sample.java
deleted file mode 100644
index bb129d72b..000000000
--- a/compiler/src/main/java/com/fluxtion/compiler/Sample.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package com.fluxtion.compiler;
-
-public class Sample {
-
- public static String foo(Object bar) {
-// return switch(bar) {
-// case Integer i -> "I'm an Integer: " + i;
-// case Long l -> "I'm a Long: " + l;
-// default -> "I'm an object";
-// };
- return "";
- }
-}
diff --git a/compiler/src/main/java/com/fluxtion/compiler/builder/context/InstanceSupplierFactory.java b/compiler/src/main/java/com/fluxtion/compiler/builder/context/InstanceSupplierFactory.java
index bbcef24f1..8f67fa05b 100644
--- a/compiler/src/main/java/com/fluxtion/compiler/builder/context/InstanceSupplierFactory.java
+++ b/compiler/src/main/java/com/fluxtion/compiler/builder/context/InstanceSupplierFactory.java
@@ -3,6 +3,7 @@
import com.fluxtion.compiler.builder.factory.NodeFactory;
import com.fluxtion.compiler.builder.factory.NodeRegistry;
import com.fluxtion.compiler.generation.GenerationContext;
+import com.fluxtion.runtime.EventProcessorContext;
import com.fluxtion.runtime.audit.Auditor;
import com.fluxtion.runtime.node.InstanceSupplier;
import com.fluxtion.runtime.node.InstanceSupplierNode;
@@ -35,10 +36,11 @@ public InstanceSupplier> createNode(Map config, NodeRegistry r
rawType = Object.class;
}
final String typeName = "contextService_" + rawType.getSimpleName() + "_" + instanceName + count++;
+
return new InstanceSupplierNode<>(
hasInstanceQualifier ? rawType.getCanonicalName() + "_" + instanceName : rawType.getCanonicalName(),
true,
- null,
+ registry.findOrCreateNode(EventProcessorContext.class, config, null),
typeName.replace(".", "_"));
}
diff --git a/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/AbstractFlowBuilder.java b/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/AbstractFlowBuilder.java
index 91963f344..ea8bf5f05 100644
--- a/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/AbstractFlowBuilder.java
+++ b/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/AbstractFlowBuilder.java
@@ -16,11 +16,13 @@
import com.fluxtion.runtime.dataflow.function.PushFlowFunction;
import com.fluxtion.runtime.dataflow.helpers.InternalEventDispatcher;
import com.fluxtion.runtime.dataflow.helpers.Peekers;
+import com.fluxtion.runtime.dataflow.helpers.Predicates.PredicateWrapper;
import com.fluxtion.runtime.output.SinkPublisher;
import com.fluxtion.runtime.partition.LambdaReflection;
import com.fluxtion.runtime.partition.LambdaReflection.SerializableBiFunction;
import com.fluxtion.runtime.partition.LambdaReflection.SerializableConsumer;
import com.fluxtion.runtime.partition.LambdaReflection.SerializableFunction;
+import com.fluxtion.runtime.partition.LambdaReflection.SerializableSupplier;
public abstract class AbstractFlowBuilder> {
@@ -68,6 +70,10 @@ public B filter(SerializableFunction filterFunction) {
return connect(new FilterFlowFunction<>(eventStream, filterFunction));
}
+ public B filter(SerializableSupplier filterFunction) {
+ return filter(new PredicateWrapper(filterFunction)::test);
+ }
+
public B filterByProperty(SerializableFunction accessor, SerializableFunction filterFunction) {
return connect(new FilterByPropertyFlowFunction<>(eventStream, accessor, filterFunction));
}
diff --git a/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/DataFlow.java b/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/DataFlow.java
index 3b3913feb..9b4e80a11 100644
--- a/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/DataFlow.java
+++ b/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/DataFlow.java
@@ -9,6 +9,7 @@
import com.fluxtion.runtime.event.Event;
import com.fluxtion.runtime.event.Signal;
import com.fluxtion.runtime.node.DefaultEventHandlerNode;
+import com.fluxtion.runtime.partition.LambdaReflection.SerializableBiFunction;
import com.fluxtion.runtime.partition.LambdaReflection.SerializableFunction;
import com.fluxtion.runtime.partition.LambdaReflection.SerializableSupplier;
@@ -69,132 +70,271 @@ static FlowBuilder subscribe(Class classSubscription, in
);
}
- static GroupByFlowBuilder groupBy(SerializableFunction keyFunction) {
- @SuppressWarnings("unchecked")
- Class classSubscription = (Class) keyFunction.method().getDeclaringClass();
- return subscribe(classSubscription).groupBy(keyFunction);
- }
-
- static > GroupByFlowBuilder groupBy(
- SerializableFunction keyFunction, SerializableSupplier aggregateFunctionSupplier) {
- @SuppressWarnings("unchecked")
- Class classSubscription = (Class) keyFunction.method().getDeclaringClass();
- return subscribe(classSubscription).groupBy(keyFunction, aggregateFunctionSupplier);
- }
-
- //SerializableSupplier aggregateFunctionSupplier
- static GroupByFlowBuilder> groupByToList(SerializableFunction keyFunction) {
- @SuppressWarnings("unchecked")
- Class classSubscription = (Class) keyFunction.method().getDeclaringClass();
- return subscribe(classSubscription).groupByToList(keyFunction);
- }
-
- static GroupByFlowBuilder> groupByToSet(SerializableFunction keyFunction) {
- @SuppressWarnings("unchecked")
- Class classSubscription = (Class) keyFunction.method().getDeclaringClass();
- return subscribe(classSubscription).groupByToSet(keyFunction);
+ /**
+ * Subscribes to an internal node within the processing graph and presents it as an {@link FlowBuilder}
+ * for constructing stream processing logic.
+ *
+ * @param source The node to be wrapped and made head of this stream
+ * @param The type of the node
+ * @return An {@link FlowBuilder} that can used to construct stream processing logic
+ */
+ static FlowBuilder subscribeToNode(T source) {
+ return new FlowBuilder<>(new NodeToFlowFunction<>(source));
}
- static GroupByFlowBuilder groupBy(
- SerializableFunction keyFunction,
- SerializableFunction valueFunction) {
- @SuppressWarnings("unchecked")
- Class classSubscription = (Class) keyFunction.method().getDeclaringClass();
- return subscribe(classSubscription).groupBy(keyFunction, valueFunction);
+ /**
+ * Subscribes to a property on an internal node within the processing graph and presents it as an {@link FlowBuilder}
+ * for constructing stream processing logic. The node will be created and added to the graph
+ *
+ * @param sourceProperty The property accessor
+ * @param The type of the node
+ * @param The type of the property that will be supplied in the stream
+ * @return An {@link FlowBuilder} that can used to construct stream processing logic
+ */
+ static FlowBuilder subscribeToNodeProperty(SerializableFunction sourceProperty) {
+ T source;
+ if (sourceProperty.captured().length == 0) {
+ try {
+ source = (T) sourceProperty.getContainingClass().getDeclaredConstructor().newInstance();
+ } catch (InstantiationException | IllegalAccessException | InvocationTargetException |
+ NoSuchMethodException e) {
+ throw new RuntimeException("no default constructor found for class:"
+ + sourceProperty.getContainingClass()
+ + " either add default constructor or pass in a node instance");
+ }
+ } else {
+ source = (T) sourceProperty.captured()[0];
+ }
+ return subscribeToNode(source).map(sourceProperty);
}
- static > GroupByFlowBuilder
- groupBy(SerializableFunction keyFunction,
- SerializableFunction valueFunction,
- SerializableSupplier aggregateFunctionSupplier) {
- @SuppressWarnings("unchecked")
- Class classSubscription = (Class) keyFunction.method().getDeclaringClass();
- return subscribe(classSubscription).groupBy(keyFunction, valueFunction, aggregateFunctionSupplier);
+ /**
+ * Subscribes to a property on an internal node within the processing graph and presents it as an {@link FlowBuilder}
+ * for constructing stream processing logic.
+ *
+ * @param propertySupplier The property accessor
+ * @param The type of the property that will be supplied in the stream
+ * @return An {@link FlowBuilder} that can used to construct stream processing logic
+ */
+ static FlowBuilder subscribeToNodeProperty(SerializableSupplier propertySupplier) {
+ EventProcessorBuilderService.service().addOrReuse(propertySupplier.captured()[0]);
+ return new FlowBuilder<>(new NodePropertyToFlowFunction<>(propertySupplier));
}
+ /**
+ * See {@link DataFlow#subscribe(Class)} shortcut method to create a stream that subscribes to a filtered
+ * {@link Signal} event. Useful for invoking triggers on flows.
+ *
+ * @param filterId The filter string to apply
+ * @return An {@link FlowBuilder} that can used to construct stream processing logic
+ */
static FlowBuilder subscribeToSignal(String filterId) {
return subscribe(Signal.class, filterId);
}
+ /**
+ * See {@link DataFlow#subscribe(Class)} shortcut method to create a stream that subscribes to a filtered
+ * {@link Signal} event, containing an event of the type specified. Useful for invoking triggers on flows.
+ *
+ * @param filterId The filter string to apply
+ * @param signalType The type of value that is held by the {@link Signal} event
+ * @return An {@link FlowBuilder} that can used to construct stream processing logic
+ */
static FlowBuilder subscribeToSignal(String filterId, Class signalType) {
- //subscribe(Signal.class, filterId);
return subscribe(Signal.class, filterId).map(Signal::getValue);
}
+ /**
+ * See {@link DataFlow#subscribe(Class)} shortcut method to create a stream that subscribes to a filtered
+ * {@link Signal} event, containing an event of the type specified. A default value is provided if the signal
+ * event contains a null value. Useful for invoking triggers on flows.
+ *
+ * @param filterId The filter string to apply
+ * @param signalType The type of value that is held by the {@link Signal} event
+ * @param defaultValue the value to use if the signal event value is null
+ * @return An {@link FlowBuilder} that can used to construct stream processing logic
+ */
static FlowBuilder subscribeToSignal(String filterId, Class signalType, T defaultValue) {
return subscribe(Signal.class, filterId).map(Signal::getValue).defaultValue(defaultValue);
}
+ /**
+ * See {@link DataFlow#subscribe(Class)} shortcut method to create a int stream that subscribes to a filtered
+ * {@link Signal} event. Useful for invoking triggers on flows.
+ *
+ * @param filterId The filter string to apply
+ * @return An {@link IntFlowBuilder} that can used to construct stream processing logic
+ */
static IntFlowBuilder subscribeToIntSignal(String filterId) {
return subscribe(Signal.IntSignal.class, filterId).mapToInt(Signal.IntSignal::getValue);
}
+ /**
+ * See {@link DataFlow#subscribe(Class)} shortcut method to create a int stream that subscribes to a filtered
+ * {@link Signal} event. Useful for invoking triggers on flows. A default value is provided if the signal
+ * event contains a 0 value
+ *
+ * @param filterId The filter string to apply
+ * @param defaultValue to use if the signal event value is 0
+ * @return An {@link IntFlowBuilder} that can used to construct stream processing logic
+ */
static IntFlowBuilder subscribeToIntSignal(String filterId, int defaultValue) {
return subscribe(Signal.IntSignal.class, filterId).mapToInt(Signal.IntSignal::getValue)
.defaultValue(defaultValue);
}
+ /**
+ * See {@link DataFlow#subscribe(Class)} shortcut method to create a double stream that subscribes to a filtered
+ * {@link Signal} event. Useful for invoking triggers on flows.
+ *
+ * @param filterId The filter string to apply
+ * @return An {@link DoubleFlowBuilder} that can used to construct stream processing logic
+ */
static DoubleFlowBuilder subscribeToDoubleSignal(String filterId) {
return subscribe(Signal.DoubleSignal.class, filterId).mapToDouble(Signal.DoubleSignal::getValue);
}
+ /**
+ * See {@link DataFlow#subscribe(Class)} shortcut method to create a double stream that subscribes to a filtered
+ * {@link Signal} event. Useful for invoking triggers on flows. A default value is provided if the signal
+ * event contains a 0 value
+ *
+ * @param filterId The filter string to apply
+ * @param defaultValue to use if the signal event value is 0
+ * @return An {@link DoubleFlowBuilder} that can used to construct stream processing logic
+ */
static DoubleFlowBuilder subscribeToDoubleSignal(String filterId, double defaultValue) {
return subscribe(Signal.DoubleSignal.class, filterId).mapToDouble(Signal.DoubleSignal::getValue)
.defaultValue(defaultValue);
}
+ /**
+ * See {@link DataFlow#subscribe(Class)} shortcut method to create a long stream that subscribes to a filtered
+ * {@link Signal} event. Useful for invoking triggers on flows.
+ *
+ * @param filterId The filter string to apply
+ * @return An {@link LongFlowBuilder} that can used to construct stream processing logic
+ */
static LongFlowBuilder subscribeToLongSignal(String filterId) {
return subscribe(Signal.LongSignal.class, filterId).mapToLong(Signal.LongSignal::getValue);
}
+ /**
+ * See {@link DataFlow#subscribe(Class)} shortcut method to create a long stream that subscribes to a filtered
+ * {@link Signal} event. Useful for invoking triggers on flows. A default value is provided if the signal
+ * event contains a 0 value
+ *
+ * @param filterId The filter string to apply
+ * @param defaultValue to use if the signal event value is 0
+ * @return An {@link LongFlowBuilder} that can used to construct stream processing logic
+ */
static LongFlowBuilder subscribeToLongSignal(String filterId, long defaultValue) {
return subscribe(Signal.LongSignal.class, filterId).mapToLong(Signal.LongSignal::getValue)
.defaultValue(defaultValue);
}
+
/**
- * Subscribes to an internal node within the processing graph and presents it as an {@link FlowBuilder}
- * for constructing stream processing logic.
+ * Merges and maps several {@link FlowFunction}'s into a single event stream of type T
*
- * @param source The node to be wrapped and made head of this stream
- * @param The type of the node
+ * @param builder The builder defining the merge operations
+ * @param The output type of the merged stream
* @return An {@link FlowBuilder} that can used to construct stream processing logic
*/
- static FlowBuilder subscribeToNode(T source) {
- return new FlowBuilder<>(new NodeToFlowFunction<>(source));
+ static FlowBuilder mergeMap(MergeAndMapFlowBuilder builder) {
+ MergeMapFlowFunction build = builder.build();
+ return new FlowBuilder<>(build);
}
- static FlowBuilder subscribeToNodeProperty(SerializableFunction sourceProperty) {
- T source;
- if (sourceProperty.captured().length == 0) {
- try {
- source = (T) sourceProperty.getContainingClass().getDeclaredConstructor().newInstance();
- } catch (InstantiationException | IllegalAccessException | InvocationTargetException |
- NoSuchMethodException e) {
- throw new RuntimeException("no default constructor found for class:"
- + sourceProperty.getContainingClass()
- + " either add default constructor or pass in a node instance");
- }
- } else {
- source = (T) sourceProperty.captured()[0];
- }
- return subscribeToNode(source).map(sourceProperty);
+ /**
+ * Merges two {@link FlowBuilder}'s into a single event stream of type T
+ *
+ * @param streamAToMerge stream A to merge
+ * @param streamBToMerge stream B to merge
+ * @param type of stream A
+ * @param type of stream B
+ * @return An {@link FlowBuilder} that can used to construct stream processing logic
+ */
+ static FlowBuilder merge(FlowBuilder streamAToMerge, FlowBuilder streamBToMerge) {
+ return streamAToMerge.merge(streamBToMerge);
}
- static FlowBuilder subscribeToNodeProperty(SerializableSupplier propertySupplier) {
- EventProcessorBuilderService.service().addOrReuse(propertySupplier.captured()[0]);
- return new FlowBuilder<>(new NodePropertyToFlowFunction<>(propertySupplier));
+ /**
+ * Merges multiple {@link FlowBuilder}'s into a single event stream of type T
+ *
+ * @param streamAToMerge stream A to merge
+ * @param streamBToMerge stream B to merge
+ * @param streamsToMerge streams to merge
+ * @param type of stream A
+ * @param type of stream B
+ * @return An {@link FlowBuilder} that can used to construct stream processing logic
+ */
+ @SuppressWarnings("unchecked")
+ static FlowBuilder merge(
+ FlowBuilder streamAToMerge,
+ FlowBuilder streamBToMerge,
+ FlowBuilder extends T>... streamsToMerge) {
+ return streamAToMerge.merge(streamBToMerge, streamsToMerge);
}
/**
- * Merges and maps several {@link FlowFunction}'s into a single event stream of type T
+ * Applies a mapping bi function to a pair of streams, creating a stream that is the output of the function. The
+ * mapping function will be invoked whenever either stream triggers a notification
*
- * @param builder The builder defining the merge operations
- * @param The output type of the merged stream
+ * @param biFunction The mapping {@link java.util.function.BiFunction}
+ * @param streamArg1 Stream providing the first argument to the mapping function
+ * @param streamArg2 Stream providing the second argument to the mapping function
+ * @param The type of argument 1 stream
+ * @param The type of argument 2 stream
+ * @param The return type of the mapping function
* @return An {@link FlowBuilder} that can used to construct stream processing logic
*/
- static FlowBuilder mergeMap(MergeAndMapFlowBuilder builder) {
- MergeMapFlowFunction build = builder.build();
- return new FlowBuilder<>(build);
+ static FlowBuilder mapBiFunction(SerializableBiFunction biFunction,
+ FlowBuilder streamArg1,
+ FlowBuilder streamArg2) {
+ return streamArg1.mapBiFunction(biFunction, streamArg2);
+ }
+
+ static GroupByFlowBuilder groupBy(SerializableFunction keyFunction) {
+ @SuppressWarnings("unchecked")
+ Class classSubscription = (Class) keyFunction.method().getDeclaringClass();
+ return subscribe(classSubscription).groupBy(keyFunction);
+ }
+
+ static > GroupByFlowBuilder groupBy(
+ SerializableFunction keyFunction, SerializableSupplier aggregateFunctionSupplier) {
+ @SuppressWarnings("unchecked")
+ Class classSubscription = (Class) keyFunction.method().getDeclaringClass();
+ return subscribe(classSubscription).groupBy(keyFunction, aggregateFunctionSupplier);
+ }
+
+ //SerializableSupplier aggregateFunctionSupplier
+ static GroupByFlowBuilder> groupByToList(SerializableFunction keyFunction) {
+ @SuppressWarnings("unchecked")
+ Class classSubscription = (Class) keyFunction.method().getDeclaringClass();
+ return subscribe(classSubscription).groupByToList(keyFunction);
+ }
+
+ static GroupByFlowBuilder> groupByToSet(SerializableFunction keyFunction) {
+ @SuppressWarnings("unchecked")
+ Class classSubscription = (Class) keyFunction.method().getDeclaringClass();
+ return subscribe(classSubscription).groupByToSet(keyFunction);
+ }
+
+ static GroupByFlowBuilder groupBy(
+ SerializableFunction keyFunction,
+ SerializableFunction valueFunction) {
+ @SuppressWarnings("unchecked")
+ Class classSubscription = (Class) keyFunction.method().getDeclaringClass();
+ return subscribe(classSubscription).groupBy(keyFunction, valueFunction);
+ }
+
+ static > GroupByFlowBuilder
+ groupBy(SerializableFunction keyFunction,
+ SerializableFunction valueFunction,
+ SerializableSupplier aggregateFunctionSupplier) {
+ @SuppressWarnings("unchecked")
+ Class classSubscription = (Class) keyFunction.method().getDeclaringClass();
+ return subscribe(classSubscription).groupBy(keyFunction, valueFunction, aggregateFunctionSupplier);
}
}
diff --git a/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilder.java b/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilder.java
index 9f24da887..6989be7b5 100644
--- a/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilder.java
+++ b/compiler/src/main/java/com/fluxtion/compiler/builder/dataflow/FlowBuilder.java
@@ -1,6 +1,7 @@
package com.fluxtion.compiler.builder.dataflow;
import com.fluxtion.runtime.EventProcessorBuilderService;
+import com.fluxtion.runtime.dataflow.FlowFunction;
import com.fluxtion.runtime.dataflow.FlowSupplier;
import com.fluxtion.runtime.dataflow.TriggeredFlowFunction;
import com.fluxtion.runtime.dataflow.aggregate.AggregateFlowFunction;
@@ -8,16 +9,9 @@
import com.fluxtion.runtime.dataflow.aggregate.function.TimedSlidingWindow;
import com.fluxtion.runtime.dataflow.aggregate.function.TumblingWindow;
import com.fluxtion.runtime.dataflow.function.BinaryMapFlowFunction.BinaryMapToRefFlowFunction;
-import com.fluxtion.runtime.dataflow.function.FlatMapArrayFlowFunction;
-import com.fluxtion.runtime.dataflow.function.FlatMapFlowFunction;
-import com.fluxtion.runtime.dataflow.function.LookupFlowFunction;
-import com.fluxtion.runtime.dataflow.function.MapFlowFunction;
+import com.fluxtion.runtime.dataflow.function.*;
import com.fluxtion.runtime.dataflow.function.MapFlowFunction.MapRef2RefFlowFunction;
-import com.fluxtion.runtime.dataflow.function.MergeFlowFunction;
-import com.fluxtion.runtime.dataflow.groupby.GroupBy;
-import com.fluxtion.runtime.dataflow.groupby.GroupByFlowFunctionWrapper;
-import com.fluxtion.runtime.dataflow.groupby.GroupByTimedSlidingWindow;
-import com.fluxtion.runtime.dataflow.groupby.GroupByTumblingWindow;
+import com.fluxtion.runtime.dataflow.groupby.*;
import com.fluxtion.runtime.dataflow.helpers.Aggregates;
import com.fluxtion.runtime.dataflow.helpers.Collectors;
import com.fluxtion.runtime.dataflow.helpers.DefaultValue;
@@ -27,6 +21,7 @@
import com.fluxtion.runtime.partition.LambdaReflection.SerializableFunction;
import com.fluxtion.runtime.partition.LambdaReflection.SerializableSupplier;
+import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@@ -78,6 +73,30 @@ public FlowBuilder map(SerializableFunction mapFunction) {
return super.mapBase(mapFunction);
}
+ public FlowBuilder> mapToSet() {
+ return map(Collectors.toSet());
+ }
+
+ public FlowBuilder> mapToSet(SerializableFunction mapFunction) {
+ return map(mapFunction).map(Collectors.toSet());
+ }
+
+ public FlowBuilder> mapToList() {
+ return map(Collectors.toList());
+ }
+
+ public FlowBuilder> mapToList(SerializableFunction mapFunction) {
+ return map(mapFunction).map(Collectors.toList());
+ }
+
+ public FlowBuilder> mapToList(int maxElements) {
+ return map(Collectors.toList(maxElements));
+ }
+
+ public FlowBuilder> mapToList(SerializableFunction mapFunction, int maxElements) {
+ return map(mapFunction).map(Collectors.toList(maxElements));
+ }
+
public FlowBuilder mapBiFunction(SerializableBiFunction int2IntFunction,
FlowBuilder stream2Builder) {
@@ -92,6 +111,17 @@ public FlowBuilder merge(FlowBuilder extends T> streamToMerge) {
return new FlowBuilder<>(new MergeFlowFunction<>(eventStream, streamToMerge.eventStream));
}
+ @SuppressWarnings("unchecked")
+ public FlowBuilder merge(FlowBuilder extends T> streamToMerge, FlowBuilder extends T>... streamsToMerge) {
+ List> mergeList = new ArrayList<>();
+ mergeList.add(eventStream);
+ mergeList.add(streamToMerge.eventStream);
+ for (FlowBuilder extends T> flowBuilder : streamsToMerge) {
+ mergeList.add(flowBuilder.eventStream);
+ }
+ return new FlowBuilder<>(new MergeFlowFunction<>(mergeList));
+ }
+
public FlowBuilder flatMap(SerializableFunction> iterableFunction) {
return new FlowBuilder<>(new FlatMapFlowFunction<>(eventStream, iterableFunction));
}
@@ -117,6 +147,19 @@ public FlowBuilder flatMapFromArray(SerializableFunction iterable
new TimedSlidingWindow<>(eventStream, aggregateFunction, bucketSizeMillis, bucketsPerWindow));
}
+ /**
+ * Aggregates a flow using a key function to group by and an aggregating function to process new values for a keyed
+ * bucket.
+ *
+ * @param keyFunction The key function that groups and buckets incoming values
+ * @param valueFunction The value that is extracted from the incoming stream and applied to the aggregating function
+ * @param aggregateFunctionSupplier A factory that supplies aggregating functions, each function has its own function instance
+ * @param Value type extracted from the incoming data flow
+ * @param The type of the key used to group values
+ * @param The return type of the aggregating function
+ * @param The aggregating function type
+ * @return A GroupByFlowBuilder for the aggregated flow
+ */
public > GroupByFlowBuilder
groupBy(SerializableFunction keyFunction,
SerializableFunction valueFunction,
@@ -126,33 +169,147 @@ public FlowBuilder flatMapFromArray(SerializableFunction iterable
return new GroupByFlowBuilder<>(x);
}
+ /**
+ * Specialisation of groupBy where the value is the identity of the incoming data flow
+ *
+ * @param keyFunction The key function that groups and buckets incoming values
+ * @param aggregateFunctionSupplier A factory that supplies aggregating functions, each function has its own function instance
+ * @param The type of the key used to group values
+ * @param The return type of the aggregating function
+ * @param The aggregating function type
+ * @return A GroupByFlowBuilder for the aggregated flow
+ * @see FlowBuilder#groupBy(SerializableFunction, SerializableFunction, SerializableSupplier)
+ */
public > GroupByFlowBuilder
groupBy(SerializableFunction keyFunction, SerializableSupplier aggregateFunctionSupplier) {
return groupBy(keyFunction, Mappers::identity, aggregateFunctionSupplier);
}
+ /**
+ * Specialisation of groupBy where the output of the groupBy is the last value received for a bucket. The value is
+ * extracted using the value function
+ *
+ * @param keyFunction The key function that groups and buckets incoming values
+ * @param valueFunction The value that is extracted from the incoming stream and applied to the aggregating function
+ * @param Value type extracted from the incoming data flow
+ * @param The type of the key used to group values
+ * @return A GroupByFlowBuilder for the aggregated flow
+ * @see FlowBuilder#groupBy(SerializableFunction, SerializableFunction, SerializableSupplier)
+ */
public GroupByFlowBuilder groupBy(
SerializableFunction keyFunction,
SerializableFunction valueFunction) {
return groupBy(keyFunction, valueFunction, Aggregates.identityFactory());
}
+ /**
+ * Specialisation of groupBy where the output of the groupBy is the last value received for a bucket, where
+ * the value is the identity of the incoming data flow
+ *
+ * @param keyFunction The key function that groups and buckets incoming values
+ * @param The type of the key used to group values
+ * @return A GroupByFlowBuilder for the aggregated flow
+ */
public GroupByFlowBuilder groupBy(SerializableFunction keyFunction) {
return groupBy(keyFunction, Mappers::identity);
}
+ /**
+ * Creates a GroupByFlowBuilder using a compound key created by a set of method reference accessors to for the value.
+ * The value is the last value supplied
+ *
+ * @param keyFunction key accessor
+ * @param keyFunctions multi arg key accessors
+ * @return GroupByFlowBuilder keyed on properties
+ */
+ @SafeVarargs
+ public final GroupByFlowBuilder, T> groupByFields(
+ SerializableFunction keyFunction,
+ SerializableFunction... keyFunctions) {
+ return groupBy(GroupByKey.build(keyFunction, keyFunctions));
+ }
+
+ /**
+ * Aggregates a flow using a key to group by and an aggregating function to process new values for a keyed
+ * bucket. The key is a compound key created by a set of method reference accessors to for the value.
+ *
+ * @param aggregateFunctionSupplier A factory that supplies aggregating functions, each function has its own function instance
+ * @param keyFunction key accessor
+ * @param keyFunctions multi arg key accessors
+ * @param The return type of the aggregating function
+ * @param The aggregating function type
+ * @return A GroupByFlowBuilder for the aggregated flow
+ * @see FlowBuilder#groupBy(SerializableFunction, SerializableFunction, SerializableSupplier)
+ */
+ @SafeVarargs
+ public final > GroupByFlowBuilder, A> groupByFieldsAggregate(
+ SerializableSupplier aggregateFunctionSupplier,
+ SerializableFunction keyFunction,
+ SerializableFunction... keyFunctions) {
+ return groupBy(GroupByKey.build(keyFunction, keyFunctions), aggregateFunctionSupplier);
+ }
+
+ /**
+ * Creates a GroupByFlowBuilder using a compound key created by a set of method reference accessors to for the key
+ * The value is extracted from the input using the value function
+ *
+ * @param valueFunction the value that will be stored in the groupBy
+ * @param keyFunction key accessor
+ * @param keyFunctions multi arg key accessors
+ * @return GroupByFlowBuilder keyed on properties
+ */
+ @SafeVarargs
+ public final GroupByFlowBuilder, V> groupByFieldsAndGet(
+ SerializableFunction valueFunction,
+ SerializableFunction keyFunction,
+ SerializableFunction... keyFunctions) {
+ return groupBy(GroupByKey.build(keyFunction, keyFunctions), valueFunction);
+ }
+
+ /**
+ * Creates a GroupByFlowBuilder using a compound key created by a set of method reference accessors to for the key
+ * The value is extracted from the input using the value function and is used as an input to the aggregating function
+ *
+ * @param valueFunction the value that will be stored in the groupBy
+ * @param aggregateFunctionSupplier A factory that supplies aggregating functions, each function has its own function instance
+ * @param keyFunction key accessor
+ * @param keyFunctions multi arg key accessors
+ * @param Value type extracted from the incoming data flow
+ * @param The return type of the aggregating function
+ * @param The aggregating function type
+ * @return A GroupByFlowBuilder for the aggregated flow
+ * @see FlowBuilder#groupBy(SerializableFunction, SerializableFunction, SerializableSupplier)
+ */
+ @SafeVarargs
+ public final > GroupByFlowBuilder, A> groupByFieldsGetAndAggregate(
+ SerializableFunction valueFunction,
+ SerializableSupplier aggregateFunctionSupplier,
+ SerializableFunction keyFunction,
+ SerializableFunction... keyFunctions) {
+ return groupBy(GroupByKey.build(keyFunction, keyFunctions), valueFunction, aggregateFunctionSupplier);
+ }
+
public GroupByFlowBuilder> groupByToList(SerializableFunction keyFunction) {
- return groupBy(keyFunction, Mappers::identity, Collectors.toList());
+ return groupBy(keyFunction, Mappers::identity, Collectors.listFactory());
+ }
+
+ public GroupByFlowBuilder> groupByToList(
+ SerializableFunction keyFunction, SerializableFunction valueFunction) {
+ return groupBy(keyFunction, valueFunction, Collectors.listFactory());
}
public GroupByFlowBuilder> groupByToSet(SerializableFunction keyFunction) {
- return groupBy(keyFunction, Mappers::identity, Collectors.toSet());
+ return groupBy(keyFunction, Mappers::identity, Collectors.setFactory());
+ }
+
+ public GroupByFlowBuilder> groupByToSet(SerializableFunction keyFunction, SerializableFunction valueFunction) {
+ return groupBy(keyFunction, valueFunction, Collectors.setFactory());
}
public GroupByFlowBuilder> groupByToList(
SerializableFunction keyFunction,
int maxElementsInList) {
- return groupBy(keyFunction, Mappers::identity, Collectors.toList(maxElementsInList));
+ return groupBy(keyFunction, Mappers::identity, Collectors.listFactory(maxElementsInList));
}
public