Skip to content

Commit

Permalink
Release 9.6.1 (#336)
Browse files Browse the repository at this point in the history
* updating poms for 9.5.3-SNAPSHOT development

* updating poms for branch'release/9.5.2' with non-snapshot versions

* updating develop poms to master versions to avoid merge conflicts

* Updating develop poms back to pre merge state

* simplify api for using MultiJoinBuilder

* simplify api for using merge and Map. Merge and map now supports merging to an instance, previously was only a factory

* add correct @SafeVarargs suppression

* code tidy

* move next release version to 9.6.0

* Feature/multi push function (#333)

* initial work on pushing multiple data flows to a single node instance method with multiple parameters. Each flow maps to a method argument

* remove unused BiPushFunctionOLD.java

* remove debug from tests

* support 3 and 4 argument push helper in DataFlow

* Added test checking triggering of child node to a multi arg push method on parent

---------

Co-authored-by: greg <[email protected]>

* updating poms for 9.6.0 branch with snapshot versions

* updating poms for 9.6.1-SNAPSHOT development

* updating poms for branch'release/9.6.0' with non-snapshot versions

* updating develop poms to master versions to avoid merge conflicts

* Updating develop poms back to pre merge state

* adds default value as a member of MapFlowFunction and BinaryMapFlowFunction. Removes the need for some DefaultValue nodes, reduces size of nodes in the graph (#335)

---------

Co-authored-by: runner <runner@fv-az1501-311>
Co-authored-by: greg <[email protected]>
Co-authored-by: runner <runner@fv-az1456-655.m124l1r4sqdexegkwpsehxmp0h.phxx.internal.cloudapp.net>
  • Loading branch information
4 people authored Jan 12, 2025
1 parent 69672c3 commit 401851f
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 47 deletions.
5 changes: 2 additions & 3 deletions compiler/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@
~ along with this program. If not, see
~ <http://www.mongodb.com/licensing/server-side-public-license>.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.fluxtion</groupId>
<artifactId>root-parent-pom</artifactId>
<version>9.6.0-SNAPSHOT</version>
<version>9.6.1-SNAPSHOT</version>
<relativePath>../parent-root/pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -465,10 +465,11 @@ static <T, K> GroupByFlowBuilder<K, T> groupBy(SerializableFunction<T, K> keyFun
}

static <T, K, V> GroupByFlowBuilder<K, V> groupByFromMap(SerializableFunction<T, Map<K, V>> mapSupplier) {
TriggeredFlowFunction<GroupBy<K, V>> triggered = new MapRef2RefFlowFunction<>(
MapRef2RefFlowFunction<Map<K, V>, GroupBy<K, V>, TriggeredFlowFunction<Map<K, V>>> triggered = new MapRef2RefFlowFunction<>(
subscribe(mapSupplier).eventStream,
new GroupByHashMap<K, V>()::fromMap
);
triggered.defaultValue(new GroupBy.EmptyGroupBy<>());
return new GroupByFlowBuilder<>(triggered);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ public <R> FlowBuilder<R> flatMapFromArray(SerializableFunction<T, R[]> iterable
SerializableFunction<T, V> valueFunction,
SerializableSupplier<F> aggregateFunctionSupplier) {
MapFlowFunction<T, GroupBy<K1, A>, TriggeredFlowFunction<T>> x = new MapRef2RefFlowFunction<>(eventStream,
new GroupByFlowFunctionWrapper<>(keyFunction, valueFunction, aggregateFunctionSupplier)::aggregate);
new GroupByFlowFunctionWrapper<>(keyFunction, valueFunction, aggregateFunctionSupplier)::aggregate)
.defaultValue(GroupBy.emptyCollection());
return new GroupByFlowBuilder<>(x);
}

Expand Down Expand Up @@ -437,25 +438,11 @@ public <I, Z extends FlowBuilder<I>> Z mapOnNotify(I target) {
/*
Done:
================
co-group joining multiple aggregates into a single row/object
outer joins
innerjoin
groupBy - sliding window
add peek to primitive streams
stateful support for functions
Use transient reference in any stream that has an instance function reference. Remove anchor
add standard Binary and Map functions for primitives, sum, max, min, add, multiply etc.
add standard predicates for primitives
windowing sliding
windowing tumbling
De-dupe filter
mapOnNotify
id for eventStream
flatmap
groupBy
groupBy - tumbling window
More tests
merge
optional:
================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ public GroupByFlowBuilder<K, V> deleteByKey(FlowBuilder<Collection<K>> supplierO
new BinaryMapToRefFlowFunction<>(
eventStream,
supplierOfIdsToDelete.defaultValue(Collections::emptyList).eventStream,
new GroupByDeleteByKeyFlowFunction(supplierOfIdsToDelete.flowSupplier(), clearDeleteIdsAfterApplying)::deleteByKey));
new GroupByDeleteByKeyFlowFunction(supplierOfIdsToDelete.flowSupplier(), clearDeleteIdsAfterApplying)::deleteByKey))
.defaultValue(new GroupBy.EmptyGroupBy<>());
}

/**
Expand All @@ -169,7 +170,8 @@ public GroupByFlowBuilder<K, V> deleteByValue(SerializableFunction<V, Boolean> d
new BinaryMapToRefFlowFunction<>(
eventStream,
deleteTestFlow.defaultValue(functionInstance).eventStream,
new GroupByDeleteByNameFlowFunctionWrapper(deletePredicateFunction, functionInstance)::deleteByKey));
new GroupByDeleteByNameFlowFunctionWrapper(deletePredicateFunction, functionInstance)::deleteByKey))
.defaultValue(new GroupBy.EmptyGroupBy<>());
}

public GroupByFlowBuilder<K, V> filterValues(SerializableFunction<V, Boolean> mappingFunction) {
Expand Down Expand Up @@ -222,8 +224,8 @@ GroupByFlowBuilder<KOUT, VOUT> mapBiFunction(
SerializableBiFunction<GroupBy<K, V>, GroupBy<K2, V2>, GroupBy<KOUT, VOUT>> int2IntFunction,
GroupByFlowBuilder<K2, V2> stream2Builder) {
return new GroupByFlowBuilder<>(
new BinaryMapToRefFlowFunction<>(
eventStream, stream2Builder.eventStream, int2IntFunction)
new BinaryMapToRefFlowFunction<>(eventStream, stream2Builder.eventStream, int2IntFunction)
.defaultValue(new GroupBy.EmptyGroupBy<>())
);
}

Expand All @@ -232,8 +234,8 @@ GroupByFlowBuilder<KOUT, VOUT> mapBiFlowFunction(
SerializableBiFunction<GroupBy<K, V>, V2, GroupBy<KOUT, VOUT>> int2IntFunction,
FlowBuilder<V2> stream2Builder) {
return new GroupByFlowBuilder<>(
new BinaryMapToRefFlowFunction<>(
eventStream, stream2Builder.eventStream, int2IntFunction)
new BinaryMapToRefFlowFunction<>(eventStream, stream2Builder.eventStream, int2IntFunction)
.defaultValue(new GroupBy.EmptyGroupBy<>())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public final static class School {
@Test
public void innerJoinTest() {
sep(c -> {
val schools = DataFlow.subscribe(School.class)
GroupByFlowBuilder<String, School> schools = DataFlow.subscribe(School.class)
.groupBy(School::getName);
val pupils = DataFlow.subscribe(Pupil.class)
.groupByToList(Pupil::getSchool)
Expand Down Expand Up @@ -82,6 +82,7 @@ public void leftJoinTest() {
sep(c -> {
val schools = DataFlow.subscribe(School.class)
.groupBy(School::getName);

val pupils = DataFlow.subscribe(Pupil.class)
.groupByToList(Pupil::getSchool)
.defaultValue(GroupBy.emptyCollection());
Expand Down
5 changes: 2 additions & 3 deletions parent-root/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@
~ <http://www.mongodb.com/licensing/server-side-public-license>.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.fluxtion</groupId>
<artifactId>root-parent-pom</artifactId>
<version>9.6.0-SNAPSHOT</version>
<version>9.6.1-SNAPSHOT</version>
<packaging>pom</packaging>
<name>fluxtion :: poms :: parent root</name>

Expand Down
5 changes: 2 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@
~ along with this program. If not, see
~ <http://www.mongodb.com/licensing/server-side-public-license>.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.fluxtion</groupId>
<artifactId>fluxtion.master</artifactId>
<version>9.6.0-SNAPSHOT</version>
<version>9.6.1-SNAPSHOT</version>
<packaging>pom</packaging>
<name>fluxtion</name>

Expand Down
5 changes: 2 additions & 3 deletions runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
~ <http://www.mongodb.com/licensing/server-side-public-license>.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.fluxtion</groupId>
<artifactId>root-parent-pom</artifactId>
<version>9.6.0-SNAPSHOT</version>
<version>9.6.1-SNAPSHOT</version>
<relativePath>../parent-root/pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

import com.fluxtion.runtime.annotations.OnTrigger;
import com.fluxtion.runtime.annotations.builder.AssignToField;
import com.fluxtion.runtime.dataflow.DoubleFlowFunction;
import com.fluxtion.runtime.dataflow.FlowFunction;
import com.fluxtion.runtime.dataflow.IntFlowFunction;
import com.fluxtion.runtime.dataflow.LongFlowFunction;
import com.fluxtion.runtime.dataflow.*;
import com.fluxtion.runtime.partition.LambdaReflection.MethodReferenceReflection;
import com.fluxtion.runtime.partition.LambdaReflection.SerializableBiDoubleFunction;
import com.fluxtion.runtime.partition.LambdaReflection.SerializableBiIntFunction;
import com.fluxtion.runtime.partition.LambdaReflection.SerializableBiLongFunction;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;

import java.lang.reflect.Method;

Expand All @@ -27,6 +27,10 @@ public abstract class BinaryMapFlowFunction<R, Q, T, S extends FlowFunction<R>,

protected transient String auditInfo;
protected transient T result;
@Getter
@Setter
@Accessors(fluent = true)
protected T defaultValue;

public BinaryMapFlowFunction(
S inputEventStream,
Expand All @@ -53,9 +57,14 @@ public final boolean map() {
return fireEventUpdateNotification();
}

@Override
public boolean hasDefaultValue() {
return defaultValue != null | DefaultValueSupplier.class.isAssignableFrom(getStreamFunction().method().getDeclaringClass());
}

@Override
public T get() {
return result;
return result == null ? defaultValue : result;
}

abstract protected void mapOperation();
Expand All @@ -68,6 +77,14 @@ protected void resetOperation() {
// System.out.println("Call to binary function reset - not implemented");
}

public T getDefaultValue() {
return defaultValue;
}

public void setDefaultValue(T defaultValue) {
this.defaultValue = defaultValue;
}

public static class BinaryMapToRefFlowFunction<R, Q, T, S extends FlowFunction<R>, U extends FlowFunction<Q>>
extends BinaryMapFlowFunction<R, Q, T, S, U> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@

import com.fluxtion.runtime.annotations.NoTriggerReference;
import com.fluxtion.runtime.annotations.OnTrigger;
import com.fluxtion.runtime.dataflow.DefaultValueSupplier;
import com.fluxtion.runtime.dataflow.DoubleFlowFunction;
import com.fluxtion.runtime.dataflow.FlowFunction;
import com.fluxtion.runtime.dataflow.IntFlowFunction;
import com.fluxtion.runtime.dataflow.LongFlowFunction;
import com.fluxtion.runtime.dataflow.*;
import com.fluxtion.runtime.partition.LambdaReflection;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;

import java.lang.reflect.Method;

Expand All @@ -26,6 +25,10 @@ public abstract class MapFlowFunction<T, R, S extends FlowFunction<T>> extends A

protected transient String auditInfo;
protected transient R result;
@Getter
@Setter
@Accessors(fluent = true)
protected R defaultValue;

@SuppressWarnings("unchecked")
public MapFlowFunction(S inputEventStream, MethodReferenceReflection methodReferenceReflection) {
Expand Down Expand Up @@ -62,12 +65,12 @@ protected void initialise() {

@Override
public boolean hasDefaultValue() {
return DefaultValueSupplier.class.isAssignableFrom(getStreamFunction().method().getDeclaringClass());
return defaultValue != null | DefaultValueSupplier.class.isAssignableFrom(getStreamFunction().method().getDeclaringClass());
}

@Override
public R get() {
return result;
return result == null ? defaultValue : result;
}

abstract protected void mapOperation();
Expand All @@ -76,6 +79,13 @@ protected void resetOperation() {
result = resetFunction.reset();
}

public R getDefaultValue() {
return defaultValue;
}

public void setDefaultValue(R defaultValue) {
this.defaultValue = defaultValue;
}

//***************** REFERENCE map producers START *****************//
@EqualsAndHashCode(callSuper = true)
Expand Down

0 comments on commit 401851f

Please sign in to comment.