Skip to content

Commit

Permalink
feature/columns first work
Browse files Browse the repository at this point in the history
  • Loading branch information
greg-higgins committed Dec 11, 2023
1 parent 510fec4 commit 15fb5fb
Show file tree
Hide file tree
Showing 16 changed files with 637 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.fluxtion.compiler.builder.dataflow;

import com.fluxtion.runtime.EventProcessorBuilderService;
import com.fluxtion.runtime.dataflow.FlowSupplier;
import com.fluxtion.runtime.dataflow.TriggeredFlowFunction;
import com.fluxtion.runtime.dataflow.column.Column;
import com.fluxtion.runtime.dataflow.column.ColumnFilterFlowFunction;
import com.fluxtion.runtime.dataflow.column.ColumnMapFlowFunction;
import com.fluxtion.runtime.dataflow.function.MapFlowFunction;
import com.fluxtion.runtime.dataflow.function.NotifyFlowFunction;
import com.fluxtion.runtime.dataflow.function.PeekFlowFunction;
import com.fluxtion.runtime.dataflow.function.PushFlowFunction;
import com.fluxtion.runtime.dataflow.helpers.Peekers;
import com.fluxtion.runtime.output.SinkPublisher;
import com.fluxtion.runtime.partition.LambdaReflection;

import java.util.List;

public class ColumFlowBuilder<T> implements FlowDataSupplier<FlowSupplier<Column<T>>> {

protected TriggeredFlowFunction<Column<T>> eventStream;

public ColumFlowBuilder(TriggeredFlowFunction<Column<T>> eventStream) {
this.eventStream = eventStream;
EventProcessorBuilderService.service().add(eventStream);
}

@Override
public FlowSupplier<Column<T>> flowSupplier() {
return eventStream;
}

public <R> ColumFlowBuilder<R> map(LambdaReflection.SerializableFunction<T, R> mapColumnFunction) {
LambdaReflection.SerializableFunction<Column<T>, Column<R>> function = new ColumnMapFlowFunction<>(mapColumnFunction)::map;
MapFlowFunction.MapRef2RefFlowFunction<Column<T>, Column<R>, TriggeredFlowFunction<Column<T>>> mapRef2RefFlowFunction = new MapFlowFunction.MapRef2RefFlowFunction<>(eventStream, function);
return new ColumFlowBuilder<>(mapRef2RefFlowFunction);
}

public ColumFlowBuilder<T> filter(LambdaReflection.SerializableFunction<T, Boolean> filterFunction) {
LambdaReflection.SerializableFunction<Column<T>, Column<T>> filter = new ColumnFilterFlowFunction<>(filterFunction)::filter;
MapFlowFunction.MapRef2RefFlowFunction<Column<T>, Column<T>, TriggeredFlowFunction<Column<T>>> mapRef2RefFlowFunction = new MapFlowFunction.MapRef2RefFlowFunction<>(eventStream, filter);
return new ColumFlowBuilder<>(mapRef2RefFlowFunction);
}

public FlowBuilder<List<T>> asList() {
return new FlowBuilder<>(new MapFlowFunction.MapRef2RefFlowFunction<>(eventStream, Column::values));
}

//OUTPUTS - START
public ColumFlowBuilder<T> notify(Object target) {
EventProcessorBuilderService.service().add(target);
return new ColumFlowBuilder<>(new NotifyFlowFunction<>(eventStream, target));
}

public ColumFlowBuilder<T> sink(String sinkId) {
return push(new SinkPublisher<>(sinkId)::publish);
}

public ColumFlowBuilder<T> push(LambdaReflection.SerializableConsumer<Column<T>> pushFunction) {
if (pushFunction.captured().length > 0) {
EventProcessorBuilderService.service().add(pushFunction.captured()[0]);
}
return new ColumFlowBuilder<>(new PushFlowFunction<>(eventStream, pushFunction));
}

public ColumFlowBuilder<T> peek(LambdaReflection.SerializableConsumer<Column<T>> peekFunction) {
return new ColumFlowBuilder<>(new PeekFlowFunction<>(eventStream, peekFunction));
}


public <R> ColumFlowBuilder<T> console(String in, LambdaReflection.SerializableFunction<Column<T>, R> peekFunction) {
peek(Peekers.console(in, peekFunction));
return this;
}

public ColumFlowBuilder<T> console(String in) {
peek(Peekers.console(in, Peekers::columnToList));
return this;
}

public ColumFlowBuilder<T> console() {
return console("{}");
}

//META-DATA
public ColumFlowBuilder<T> id(String nodeId) {
EventProcessorBuilderService.service().add(eventStream, nodeId);
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import com.fluxtion.runtime.event.Event;
import com.fluxtion.runtime.event.Signal;
import com.fluxtion.runtime.node.DefaultEventHandlerNode;
import com.fluxtion.runtime.node.DefaultEventToColumnHandlerNode;
import com.fluxtion.runtime.partition.LambdaReflection;
import com.fluxtion.runtime.partition.LambdaReflection.SerializableBiFunction;
import com.fluxtion.runtime.partition.LambdaReflection.SerializableFunction;
import com.fluxtion.runtime.partition.LambdaReflection.SerializableSupplier;
Expand Down Expand Up @@ -38,6 +40,47 @@ static <T> FlowBuilder<T> subscribe(Class<T> classSubscription) {
);
}

static <T> ColumFlowBuilder<T> buildColumn(Class<T> classSubscription) {
DefaultEventToColumnHandlerNode<T> columnSubscriber = EventProcessorBuilderService.service().addOrReuse(new DefaultEventToColumnHandlerNode<>(classSubscription));
return new ColumFlowBuilder<>(columnSubscriber);
}

static <T, R> ColumFlowBuilder<R> buildColumn(SerializableFunction<T, R> sourceProperty) {
Class<T> classSubscription = (Class<T>) sourceProperty.getContainingClass();
DefaultEventToColumnHandlerNode<T> columnSubscriber = EventProcessorBuilderService.service().addOrReuse(new DefaultEventToColumnHandlerNode<>(classSubscription));
return new ColumFlowBuilder<>(columnSubscriber).map(sourceProperty);
}

@SuppressWarnings("unckecked")
static <T, R> FlowBuilder<R> subscribe(SerializableFunction<T, R> sourceProperty) {
Class<T> classSubscription = (Class<T>) sourceProperty.getContainingClass();
return new FlowBuilder<>(
EventProcessorBuilderService.service().addOrReuse(new DefaultEventHandlerNode<>(classSubscription)))
.map(sourceProperty);
}

@SuppressWarnings("unckecked")
static <T> IntFlowBuilder subscribeToInt(LambdaReflection.SerializableToIntFunction<T> sourceProperty) {
Class<T> classSubscription = (Class<T>) sourceProperty.getContainingClass();
return new FlowBuilder<>(
EventProcessorBuilderService.service().addOrReuse(new DefaultEventHandlerNode<>(classSubscription)))
.mapToInt(sourceProperty);
}

static <T> DoubleFlowBuilder subscribeToDouble(LambdaReflection.SerializableToDoubleFunction<T> sourceProperty) {
Class<T> classSubscription = (Class<T>) sourceProperty.getContainingClass();
return new FlowBuilder<>(
EventProcessorBuilderService.service().addOrReuse(new DefaultEventHandlerNode<>(classSubscription)))
.mapToDouble(sourceProperty);
}

static <T> LongFlowBuilder subscribeToDouble(LambdaReflection.SerializableToLongFunction<T> sourceProperty) {
Class<T> classSubscription = (Class<T>) sourceProperty.getContainingClass();
return new FlowBuilder<>(
EventProcessorBuilderService.service().addOrReuse(new DefaultEventHandlerNode<>(classSubscription)))
.mapToLong(sourceProperty);
}

/**
* Subscribes to events of type {@literal <T>} filtering by {@link Event#filterString()}. Creates a handler method in the generated {@link com.fluxtion.runtime.StaticEventProcessor}
* so that if {@link com.fluxtion.runtime.StaticEventProcessor#onEvent(Object)} is called an invocation is routed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,7 @@
import com.fluxtion.runtime.dataflow.function.BinaryMapFlowFunction.BinaryMapToRefFlowFunction;
import com.fluxtion.runtime.dataflow.function.MapFlowFunction;
import com.fluxtion.runtime.dataflow.function.MapFlowFunction.MapRef2RefFlowFunction;
import com.fluxtion.runtime.dataflow.groupby.GroupBy;
import com.fluxtion.runtime.dataflow.groupby.GroupByFilterFlowFunctionWrapper;
import com.fluxtion.runtime.dataflow.groupby.GroupByMapFlowFunction;
import com.fluxtion.runtime.dataflow.groupby.GroupByReduceFlowFunction;
import com.fluxtion.runtime.dataflow.groupby.InnerJoin;
import com.fluxtion.runtime.dataflow.groupby.LeftJoin;
import com.fluxtion.runtime.dataflow.groupby.OuterJoin;
import com.fluxtion.runtime.dataflow.groupby.RightJoin;
import com.fluxtion.runtime.dataflow.groupby.*;
import com.fluxtion.runtime.dataflow.helpers.DefaultValue;
import com.fluxtion.runtime.dataflow.helpers.DefaultValue.DefaultValueFromSupplier;
import com.fluxtion.runtime.dataflow.helpers.Peekers;
Expand Down Expand Up @@ -94,7 +87,6 @@ public <T, R> GroupByFlowBuilder<K, R> mapNestedValues(SerializableFunction<T, R
GroupByFlowBuilder<K, VOUT> biMapValuesByKey(
SerializableBiFunction<V, V2, VOUT> mappingBiFunction,
GroupByFlowBuilder<K2, V2> secondArgumentStream) {
GroupByMapFlowFunction invoker = new GroupByMapFlowFunction(null, mappingBiFunction, null);
return biMapValuesByKey(mappingBiFunction, secondArgumentStream, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public IntFlowBuilder map(SerializableIntUnaryOperator int2IntFunction) {
return new IntFlowBuilder(new MapInt2ToIntFlowFunction(eventStream, int2IntFunction));
}

public IntFlowBuilder mapBiFunction(SerializableBiIntFunction int2IntFunction, IntFlowBuilder stream2Builder) {
public IntFlowBuilder map(SerializableBiIntFunction int2IntFunction, IntFlowBuilder stream2Builder) {
return new IntFlowBuilder(
new BinaryMapToIntFlowFunction<>(
eventStream, stream2Builder.eventStream, int2IntFunction)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Locale;

import static com.fluxtion.compiler.builder.dataflow.DataFlow.subscribe;
import static com.fluxtion.compiler.builder.dataflow.DataFlow.subscribeToInt;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.closeTo;
Expand All @@ -30,7 +31,7 @@ public void testIntBinaryFunctionWith() {
sep(c ->
subscribe(Data_1.class)
.mapToInt(Data_1::getIntValue)
.mapBiFunction(BinaryMapTest::add, subscribe(Data_2.class).mapToInt(Data_2::getIntValue))
.map(BinaryMapTest::add, subscribe(Data_2.class).mapToInt(Data_2::getIntValue))
.push(new NotifyAndPushTarget()::setIntPushValue)
);
NotifyAndPushTarget target = getField(NotifyAndPushTarget.DEFAULT_NAME);
Expand All @@ -42,6 +43,41 @@ public void testIntBinaryFunctionWith() {
assertThat(target.getIntPushValue(), is(100));
}

@Test
public void testIntBinaryFunctionWithProperty() {
sep(c ->
subscribe(Data_1::getIntValue)
.mapToInt(Integer::intValue)
.map(BinaryMapTest::add, subscribe(Data_2.class).mapToInt(Data_2::getIntValue))
.push(new NotifyAndPushTarget()::setIntPushValue)
);
NotifyAndPushTarget target = getField(NotifyAndPushTarget.DEFAULT_NAME);
onEvent(new Data_1(10));
assertThat(target.getIntPushValue(), is(0));
onEvent(new Data_1(20));
assertThat(target.getIntPushValue(), is(0));
onEvent(new Data_2(80));
assertThat(target.getIntPushValue(), is(100));
}

@Test
public void testIntBinaryFunctionWitIntProperty() {
sep(c -> {
IntFlowBuilder data1ValueStream = subscribeToInt(Data_1::getIntValue);
IntFlowBuilder data2ValueStream = subscribeToInt(Data_2::getIntValue);
data1ValueStream.map(BinaryMapTest::add, data2ValueStream)
.push(new NotifyAndPushTarget()::setIntPushValue);
}
);
NotifyAndPushTarget target = getField(NotifyAndPushTarget.DEFAULT_NAME);
onEvent(new Data_1(10));
assertThat(target.getIntPushValue(), is(0));
onEvent(new Data_1(20));
assertThat(target.getIntPushValue(), is(0));
onEvent(new Data_2(80));
assertThat(target.getIntPushValue(), is(100));
}

@Test
public void testDoubleBinaryFunction() {
sep(c ->
Expand Down Expand Up @@ -82,7 +118,7 @@ public void testIntBinaryFunctionWithDefaultValue() {
sep(c ->
subscribe(Data_1.class)
.mapToInt(Data_1::getIntValue)
.mapBiFunction(BinaryMapTest::add,
.map(BinaryMapTest::add,
subscribe(Data_2.class).mapToInt(Data_2::getIntValue).defaultValue(50)
)
.push(new NotifyAndPushTarget()::setIntPushValue)
Expand All @@ -101,9 +137,9 @@ public void testIntAddStandardFunction() {
sep(c -> {
IntFlowBuilder int1 = subscribe(Data_1.class).mapToInt(Data_1::getIntValue);
IntFlowBuilder int2 = subscribe(Data_2.class).mapToInt(Data_2::getIntValue);
int1.mapBiFunction(Mappers.ADD_INTS, int2).id("add");
int1.mapBiFunction(Mappers.SUBTRACT_INTS, int2).id("subtract");
int1.mapBiFunction(Mappers.MULTIPLY_INTS, int2).id("multiply");
int1.map(Mappers.ADD_INTS, int2).id("add");
int1.map(Mappers.SUBTRACT_INTS, int2).id("subtract");
int1.map(Mappers.MULTIPLY_INTS, int2).id("multiply");
});

onEvent(new Data_1(10));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package com.fluxtion.compiler.builder.dataflow;

import com.fluxtion.compiler.generation.util.CompiledAndInterpretedSepTest;
import com.fluxtion.compiler.generation.util.MultipleSepTargetInProcessTest;
import com.fluxtion.runtime.dataflow.column.Column;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.junit.Test;

public class ColumnTest extends MultipleSepTargetInProcessTest {
public ColumnTest(CompiledAndInterpretedSepTest.SepTestConfig testConfig) {
super(testConfig);
}

@Test
public void buildColumn() {
writeSourceFile = true;
sep(c -> {
DataFlow.buildColumn(Data.class).map(Data::getName).console("column:{}");
});
onEvent(new Data(12, "Steve H"));
onEvent(new Data(65, "Steve G"));
onEvent(new Data(7, "Bob"));
}

@Test
public void buildColumnFromProperty() {
writeSourceFile = true;
sep(c -> {
DataFlow.buildColumn(Data::getName).console("column:{}");
});
onEvent(new Data(12, "Steve H"));
onEvent(new Data(65, "Steve G"));
onEvent(new Data(7, "Bob"));
}

@Test
public void buildColumnFromPropertyAndMap() {
writeSourceFile = true;
sep(c -> {
DataFlow.buildColumn(Data::getAge)
.map(ColumnTest::multiply10x)
.console("column:{}");
});
onEvent(new Data(12, "Steve H"));
onEvent(new Data(65, "Steve G"));
onEvent(new Data(7, "Bob"));
}

@Test
public void buildColumnPublishToSink() {
writeSourceFile = true;
sep(c -> {
DataFlow.buildColumn(Data::getName).sink("xxx");
});
addSink("xxx", (Column<Data> o) -> {
System.out.println("sink -> " + o.values());
});
onEvent(new Data(12, "Steve H"));
onEvent(new Data(65, "Steve G"));
onEvent(new Data(7, "Bob"));
}

@Test
public void buildColumnFilterPublishToSink() {
writeSourceFile = true;
sep(c -> {
DataFlow.buildColumn(Data::getName).filter(ColumnTest::startsWithSteve).sink("xxx");
});
addSink("xxx", (Column<Data> o) -> {
System.out.println("sink -> " + o.values());
});
onEvent(new Data(12, "Steve H"));
onEvent(new Data(7, "Bob"));
onEvent(new Data(65, "Steve G"));
}

public static boolean startsWithSteve(String in) {
return in.startsWith("Steve");
}

public static int multiply10x(int in) {
return in * 10;
}

@lombok.Data
@AllArgsConstructor
@NoArgsConstructor
public static class Data {
private int age;
private String name;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ public void groupByIdentityTest() {
public void groupByAsListIdentityTest() {
Map<String, List<Data>> expected = new HashMap<>();
sep(c -> {
DataFlow.subscribe(Data.class)


FlowBuilder<Data> dataFlow = subscribe(Data.class);
// dataFlow.map(Data::getValue)

dataFlow
.groupByToList(Data::getName)
.map(GroupBy::toMap).id("results");
});
Expand Down Expand Up @@ -503,12 +508,9 @@ public void biMapKeyedItemFromAnotherStreamTest() {
public void bimapKeyedParamStream() {
Map<String, KeyedData> expected = new HashMap<>();
sep(c -> {
subscribe(KeyedData.class).groupBy(KeyedData::getId)
.biMapValuesByKey(
GroupByTest::applyFactor,
subscribe(Data.class).groupBy(Data::getName).defaultValue(GroupBy.emptyCollection()),
new Data("default", 3)
)
val stream_1 = subscribe(KeyedData.class).groupBy(KeyedData::getId);
val stream_2 = subscribe(Data.class).groupBy(Data::getName).defaultValue(GroupBy.emptyCollection());
stream_1.biMapValuesByKey(GroupByTest::applyFactor, stream_2, new Data("default", 3))
.map(GroupBy::toMap)
.id("results");
});
Expand Down Expand Up @@ -613,8 +615,6 @@ public static class Data3 {
String name;
int value;
int x;


}

@Getter
Expand Down
Loading

0 comments on commit 15fb5fb

Please sign in to comment.