Skip to content

Commit

Permalink
Merge branch 'release/3.0.2'
Browse files Browse the repository at this point in the history
  • Loading branch information
runner committed Dec 12, 2021
2 parents 2bdbbb9 + 261e941 commit 378faa3
Show file tree
Hide file tree
Showing 42 changed files with 2,204 additions and 246 deletions.
7 changes: 1 addition & 6 deletions compiler/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Copyright (C) 2018 V12 Technology Ltd.
<parent>
<groupId>com.fluxtion</groupId>
<artifactId>root-parent-pom</artifactId>
<version>3.0.1</version>
<version>3.0.2</version>
<relativePath>../poms/parent-root/pom.xml</relativePath>
</parent>

Expand Down Expand Up @@ -177,7 +177,6 @@ Copyright (C) 2018 V12 Technology Ltd.
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<!--<version>1.3.1</version>-->
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
Expand All @@ -192,10 +191,6 @@ Copyright (C) 2018 V12 Technology Ltd.
<artifactId>javapoet</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
</dependency>
<dependency>
<groupId>com.intellij</groupId>
<artifactId>annotations</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.fluxtion.compiler.builder.node;

import com.fluxtion.runtim.audit.NodeNameLookup;
import com.google.auto.service.AutoService;

import java.util.Map;

@AutoService(NodeFactory.class)
public class NodeNameLookupFactory implements NodeFactory<NodeNameLookup> {

static NodeNameLookup SINGLETON = new NodeNameLookup();

@Override
public NodeNameLookup createNode(Map<?, ?> config, NodeRegistry registry) {
registry.registerAuditor(SINGLETON, "nodeNameLookup");
return SINGLETON;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.fluxtion.compiler.builder.stream;

import com.fluxtion.runtim.SepContext;
import com.fluxtion.runtim.partition.LambdaReflection.*;
import com.fluxtion.runtim.stream.*;
import com.fluxtion.runtim.stream.EventStream.DoubleEventStream;
import com.fluxtion.runtim.stream.helpers.DefaultValue;

public class DoubleStreamBuilder<I, S extends EventStream<I>> {

final DoubleEventStream eventStream;

DoubleStreamBuilder(DoubleEventStream eventStream) {
SepContext.service().add(eventStream);
this.eventStream = eventStream;
}

//TRIGGERS - START
public DoubleStreamBuilder<I, S> updateTrigger(Object updateTrigger){
eventStream.setUpdateTriggerNode(StreamHelper.getSource(updateTrigger));
return this;
}

public DoubleStreamBuilder<I, S> publishTrigger(Object publishTrigger){
eventStream.setPublishTriggerNode(StreamHelper.getSource(publishTrigger));
return this;
}

public DoubleStreamBuilder<I, S> resetTrigger(Object resetTrigger){
eventStream.setResetTriggerNode(StreamHelper.getSource(resetTrigger));
return this;
}

public DoubleStreamBuilder<Double, DoubleEventStream> filter(SerializableDoubleFunction<Boolean> filterFunction){
return new DoubleStreamBuilder<>( new FilterEventStream.DoubleFilterEventStream(eventStream, filterFunction));
}

public DoubleStreamBuilder<Double, DoubleEventStream>defaultValue(double defaultValue){
return map(new DefaultValue.DefaultDouble(defaultValue)::getOrDefault);
}

//PROCESSING - START
public DoubleStreamBuilder<Double, DoubleEventStream> map(SerializableDoubleUnaryOperator int2IntFunction) {
return new DoubleStreamBuilder<>(new MapEventStream.MapDouble2ToDoubleEventStream(eventStream, int2IntFunction));
}

public IntStreamBuilder<Double, DoubleEventStream> mapToInt(SerializableDoubleToIntFunction int2IntFunction) {
return new IntStreamBuilder<>(new MapEventStream.MapDouble2ToIntEventStream(eventStream, int2IntFunction));
}

public LongStreamBuilder<Double, DoubleEventStream> mapToLong(SerializableDoubleToLongFunction int2IntFunction) {
return new LongStreamBuilder<>(new MapEventStream.MapDouble2ToLongEventStream(eventStream, int2IntFunction));
}

//OUTPUTS - START
public DoubleStreamBuilder<Double, DoubleEventStream> notify(Object target) {
SepContext.service().add(target);
return new DoubleStreamBuilder<>(new NotifyEventStream.DoubleNotifyEventStream(eventStream, target));
}

public DoubleStreamBuilder<Double, DoubleEventStream> push(SerializableDoubleConsumer pushFunction) {
SepContext.service().add(pushFunction.captured()[0]);
return new DoubleStreamBuilder<>(new PushEventStream.DoublePushEventStream(eventStream, pushFunction));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.fluxtion.compiler.builder.stream;

import com.fluxtion.runtim.event.DefaultFilteredEventHandler;
import com.fluxtion.runtim.stream.NodeEventStream;

public interface EventFlow {

static <T> EventStreamBuilder<T> subscribe(Class<T> classSubscription) {
return new EventStreamBuilder<>(new DefaultFilteredEventHandler<>(classSubscription));
}

static <T> EventStreamBuilder<T> streamFromNode(T source){
return new EventStreamBuilder<>(new NodeEventStream<>(source));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package com.fluxtion.compiler.builder.stream;

import com.fluxtion.runtim.SepContext;
import com.fluxtion.runtim.partition.LambdaReflection;
import com.fluxtion.runtim.partition.LambdaReflection.SerializableConsumer;
import com.fluxtion.runtim.stream.*;
import com.fluxtion.runtim.stream.helpers.DefaultValue;

public class EventStreamBuilder<T> {

final TriggeredEventStream<T> eventStream;

EventStreamBuilder(TriggeredEventStream<T> eventStream) {
SepContext.service().add(eventStream);
this.eventStream = eventStream;
}

//TRIGGERS - START
public EventStreamBuilder<T> updateTrigger(Object updateTrigger){
eventStream.setUpdateTriggerNode(StreamHelper.getSource(updateTrigger));
return this;
}

public EventStreamBuilder<T> publishTrigger(Object publishTrigger){
eventStream.setPublishTriggerNode(StreamHelper.getSource(publishTrigger));
return this;
}

public EventStreamBuilder<T> resetTrigger(Object resetTrigger){
eventStream.setResetTriggerNode(StreamHelper.getSource(resetTrigger));
return this;
}

public EventStreamBuilder<T> filter( LambdaReflection.SerializableFunction<T, Boolean> filterFunction){
return new EventStreamBuilder<>( new FilterEventStream<>(eventStream, filterFunction));
}

public EventStreamBuilder<T> defaultValue(T defaultValue){
return map(new DefaultValue<>(defaultValue)::getOrDefault);
}

//PROCESSING - START
public <R> EventStreamBuilder<R> map(LambdaReflection.SerializableFunction<T, R> mapFunction) {
return new EventStreamBuilder<>( new MapEventStream.MapRef2RefEventStream<>(eventStream, mapFunction));
}

public IntStreamBuilder<T, EventStream<T>> mapToInt(LambdaReflection.SerializableToIntFunction<T> mapFunction) {
return new IntStreamBuilder<>( new MapEventStream.MapRef2ToIntEventStream<>(eventStream, mapFunction));
}

public DoubleStreamBuilder<T, EventStream<T>> mapToDouble(LambdaReflection.SerializableToDoubleFunction<T> mapFunction) {
return new DoubleStreamBuilder<>( new MapEventStream.MapRef2ToDoubleEventStream<>(eventStream, mapFunction));
}

public LongStreamBuilder<T, EventStream<T>> mapToLong(LambdaReflection.SerializableToLongFunction<T> mapFunction) {
return new LongStreamBuilder<>( new MapEventStream.MapRef2ToLongEventStream<>(eventStream, mapFunction));
}

//OUTPUTS - START
public EventStreamBuilder<T> push(SerializableConsumer<T> pushFunction) {
SepContext.service().add(pushFunction.captured()[0]);
return new EventStreamBuilder<>(new PushEventStream<>(eventStream, pushFunction));
}

public EventStreamBuilder<T> notify(Object target) {
SepContext.service().add(target);
return new EventStreamBuilder<>(new NotifyEventStream<>(eventStream, target));
}

public EventStreamBuilder<T> peek(SerializableConsumer<T> peekFunction) {
return new EventStreamBuilder<>(new PeekEventStream<>(eventStream, peekFunction));
}


/*
TODO:
================
De-dupe filter
binaryMap
optional:
================
log - helper function
audit - helper function
merge/zip
flatmap
DONE
================
Default helper
subscribe
wrapNode
updateTrigger
peek
get
push
filter
notify
tests
resetTrigger
publishTrigger
primitive map
primitive tests
*/

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.fluxtion.compiler.builder.stream;

import com.fluxtion.runtim.SepContext;
import com.fluxtion.runtim.partition.LambdaReflection;
import com.fluxtion.runtim.partition.LambdaReflection.SerializableIntConsumer;
import com.fluxtion.runtim.partition.LambdaReflection.SerializableIntFunction;
import com.fluxtion.runtim.partition.LambdaReflection.SerializableIntUnaryOperator;
import com.fluxtion.runtim.stream.*;
import com.fluxtion.runtim.stream.EventStream.IntEventStream;
import com.fluxtion.runtim.stream.helpers.DefaultValue;

public class IntStreamBuilder<I, S extends EventStream<I>> {

final IntEventStream eventStream;

IntStreamBuilder(IntEventStream eventStream) {
SepContext.service().add(eventStream);
this.eventStream = eventStream;
}

//TRIGGERS - START
public IntStreamBuilder<I, S> updateTrigger(Object updateTrigger){
eventStream.setUpdateTriggerNode(StreamHelper.getSource(updateTrigger));
return this;
}

public IntStreamBuilder<I, S> publishTrigger(Object publishTrigger){
eventStream.setPublishTriggerNode(StreamHelper.getSource(publishTrigger));
return this;
}

public IntStreamBuilder<I, S> resetTrigger(Object resetTrigger){
eventStream.setResetTriggerNode(StreamHelper.getSource(resetTrigger));
return this;
}

public IntStreamBuilder<Integer, IntEventStream> filter( SerializableIntFunction<Boolean> filterFunction){
return new IntStreamBuilder<>( new FilterEventStream.IntFilterEventStream(eventStream, filterFunction));
}

public IntStreamBuilder<Integer, IntEventStream> defaultValue( int defaultValue){
return map(new DefaultValue.DefaultInt(defaultValue)::getOrDefault);
}

//PROCESSING - START
public IntStreamBuilder<Integer, IntEventStream> map(SerializableIntUnaryOperator int2IntFunction) {
return new IntStreamBuilder<>(new MapEventStream.MapInt2ToIntEventStream(eventStream, int2IntFunction));
}

public DoubleStreamBuilder<Integer, IntEventStream> mapToDouble(LambdaReflection.SerializableIntToDoubleFunction int2IntFunction) {
return new DoubleStreamBuilder<>(new MapEventStream.MapInt2ToDoubleEventStream(eventStream, int2IntFunction));
}

public LongStreamBuilder<Integer, IntEventStream> mapToLong(LambdaReflection.SerializableIntToLongFunction int2IntFunction) {
return new LongStreamBuilder<>(new MapEventStream.MapInt2ToLongEventStream(eventStream, int2IntFunction));
}

//OUTPUTS - START
public IntStreamBuilder<Integer, IntEventStream> notify(Object target) {
SepContext.service().add(target);
return new IntStreamBuilder<>(new NotifyEventStream.IntNotifyEventStream(eventStream, target));
}

public IntStreamBuilder<Integer, IntEventStream> push(SerializableIntConsumer pushFunction) {
SepContext.service().add(pushFunction.captured()[0]);
return new IntStreamBuilder<>(new PushEventStream.IntPushEventStream(eventStream, pushFunction));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.fluxtion.compiler.builder.stream;

import com.fluxtion.runtim.SepContext;
import com.fluxtion.runtim.partition.LambdaReflection;
import com.fluxtion.runtim.partition.LambdaReflection.SerializableLongConsumer;
import com.fluxtion.runtim.partition.LambdaReflection.SerializableLongFunction;
import com.fluxtion.runtim.partition.LambdaReflection.SerializableLongUnaryOperator;
import com.fluxtion.runtim.stream.*;
import com.fluxtion.runtim.stream.EventStream.LongEventStream;
import com.fluxtion.runtim.stream.helpers.DefaultValue;

public class LongStreamBuilder<I, S extends EventStream<I>> {

final LongEventStream eventStream;

LongStreamBuilder(LongEventStream eventStream) {
SepContext.service().add(eventStream);
this.eventStream = eventStream;
}

//TRIGGERS - START
public LongStreamBuilder<I, S> updateTrigger(Object updateTrigger){
eventStream.setUpdateTriggerNode(StreamHelper.getSource(updateTrigger));
return this;
}

public LongStreamBuilder<I, S> publishTrigger(Object publishTrigger){
eventStream.setPublishTriggerNode(StreamHelper.getSource(publishTrigger));
return this;
}

public LongStreamBuilder<I, S> resetTrigger(Object resetTrigger){
eventStream.setResetTriggerNode(StreamHelper.getSource(resetTrigger));
return this;
}

public LongStreamBuilder<Long, LongEventStream> filter(SerializableLongFunction<Boolean> filterFunction){
return new LongStreamBuilder<>( new FilterEventStream.LongFilterEventStream(eventStream, filterFunction));
}

public LongStreamBuilder<Long, LongEventStream> defaultValue(long defaultValue){
return map(new DefaultValue.DefaultLong(defaultValue)::getOrDefault);
}

//PROCESSING - START
public LongStreamBuilder<Long, LongEventStream> map(SerializableLongUnaryOperator int2IntFunction) {
return new LongStreamBuilder<>(new MapEventStream.MapLong2ToLongEventStream(eventStream, int2IntFunction));
}

public IntStreamBuilder<Long, LongEventStream> mapToInt(LambdaReflection.SerializableLongToIntFunction int2IntFunction) {
return new IntStreamBuilder<>(new MapEventStream.MapLong2ToIntEventStream(eventStream, int2IntFunction));
}

public DoubleStreamBuilder<Long, LongEventStream> mapToDouble(LambdaReflection.SerializableLongToDoubleFunction int2IntFunction) {
return new DoubleStreamBuilder<>(new MapEventStream.MapLong2ToDoubleEventStream(eventStream, int2IntFunction));
}

//OUTPUTS - START
public LongStreamBuilder<Long, LongEventStream> notify(Object target) {
SepContext.service().add(target);
return new LongStreamBuilder<>(new NotifyEventStream.LongNotifyEventStream(eventStream, target));
}

public LongStreamBuilder<Long, LongEventStream> push(SerializableLongConsumer pushFunction) {
SepContext.service().add(pushFunction.captured()[0]);
return new LongStreamBuilder<>(new PushEventStream.LongPushEventStream(eventStream, pushFunction));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.fluxtion.compiler.builder.stream;

class StreamHelper {
static Object getSource(Object input) {
Object returnValue = input;
if (input instanceof EventStreamBuilder<?>) {
EventStreamBuilder<?> eventStreamBuilder = (EventStreamBuilder<?>) input;
returnValue = eventStreamBuilder.eventStream;
} else if (input instanceof IntStreamBuilder<?, ?>) {
IntStreamBuilder<?, ?> eventStreamBuilder = (IntStreamBuilder<?, ?>) input;
returnValue = eventStreamBuilder.eventStream;
}
return returnValue;
}

}
Loading

0 comments on commit 378faa3

Please sign in to comment.