diff --git a/cookbook/src/main/java/com/fluxtion/example/cookbook/racing/RaceCalculator.java b/cookbook/src/main/java/com/fluxtion/example/cookbook/racing/RaceCalculator.java new file mode 100644 index 0000000..dcb1ab5 --- /dev/null +++ b/cookbook/src/main/java/com/fluxtion/example/cookbook/racing/RaceCalculator.java @@ -0,0 +1,60 @@ +package com.fluxtion.example.cookbook.racing; + +import com.fluxtion.runtime.annotations.ExportService; +import com.fluxtion.runtime.annotations.OnEventHandler; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; + +public class RaceCalculator { + + public record RunnerStarted(long runnerId, Instant startTime) { + } + + public record RunnerFinished(long runnerId, Instant finishTime) { + } + + public interface ResultsPublisher { + void publishAllResults(); + } + + @Getter + public static class RaceTimeTracker { + + private final transient Map runnerRaceTimeMap = new HashMap<>(); + + @OnEventHandler(propagate = false) + public boolean runnerStarted(RunnerStarted runnerStarted) { + //add runner start time to map + return false; + } + + @OnEventHandler + public boolean runnerFinished(RunnerFinished runnerFinished) { + //calc runner total race time and add to map + return true; + } + } + + @RequiredArgsConstructor + public static class ResultsPublisherImpl implements @ExportService ResultsPublisher{ + + private final RaceTimeTracker raceTimeTracker; + + @OnEventHandler(propagate = false) + public boolean runnerFinished(RunnerFinished runnerFinished) { + //get the runner race time and send individual their results + long raceTime = raceTimeTracker.getRunnerRaceTimeMap().get(runnerFinished.runnerId()); + return false; + } + + @Override + public void publishAllResults() { + //get all results and publish + var runnerRaceTimeMap = raceTimeTracker.getRunnerRaceTimeMap(); + } + } +} diff --git a/cookbook/src/main/java/com/fluxtion/example/cookbook/racing/RaceCalculatorAotBuilder.java b/cookbook/src/main/java/com/fluxtion/example/cookbook/racing/RaceCalculatorAotBuilder.java new file mode 100644 index 0000000..c0c2be1 --- /dev/null +++ b/cookbook/src/main/java/com/fluxtion/example/cookbook/racing/RaceCalculatorAotBuilder.java @@ -0,0 +1,22 @@ +package com.fluxtion.example.cookbook.racing; + +import com.fluxtion.compiler.EventProcessorConfig; +import com.fluxtion.compiler.FluxtionCompilerConfig; +import com.fluxtion.compiler.FluxtionGraphBuilder; +import com.fluxtion.example.cookbook.racing.RaceCalculator.RaceTimeTracker; +import com.fluxtion.example.cookbook.racing.RaceCalculator.ResultsPublisherImpl; + +public class RaceCalculatorAotBuilder implements FluxtionGraphBuilder { + @Override + public void buildGraph(EventProcessorConfig eventProcessorConfig) { + RaceTimeTracker raceCalculator = eventProcessorConfig.addNode(new RaceTimeTracker(), "raceCalculator"); + eventProcessorConfig.addNode(new ResultsPublisherImpl(raceCalculator), "resultsPublisher"); + } + + @Override + public void configureGeneration(FluxtionCompilerConfig fluxtionCompilerConfig) { + fluxtionCompilerConfig.setClassName("RaceCalculatorProcessor"); + fluxtionCompilerConfig.setPackageName("com.fluxtion.example.cookbook.racing.generated"); + } +} + diff --git a/cookbook/src/main/java/com/fluxtion/example/cookbook/racing/RaceCalculatorApp.java b/cookbook/src/main/java/com/fluxtion/example/cookbook/racing/RaceCalculatorApp.java new file mode 100644 index 0000000..cfae436 --- /dev/null +++ b/cookbook/src/main/java/com/fluxtion/example/cookbook/racing/RaceCalculatorApp.java @@ -0,0 +1,28 @@ +package com.fluxtion.example.cookbook.racing; + +import com.fluxtion.example.cookbook.racing.generated.RaceCalculatorProcessor; + +import java.time.Instant; + +import static com.fluxtion.example.cookbook.racing.RaceCalculator.*; + +public class RaceCalculatorApp { + public static void main(String[] args) { + RaceCalculatorProcessor raceCalculatorProcessor = new RaceCalculatorProcessor(); + raceCalculatorProcessor.init(); + + ResultsPublisher resultsPublisher = raceCalculatorProcessor.getExportedService(); + + //connect to event stream and process runner timing events + raceCalculatorProcessor.onEvent(new RunnerStarted(1, Instant.now())); + raceCalculatorProcessor.onEvent(new RunnerStarted(2, Instant.now())); + raceCalculatorProcessor.onEvent(new RunnerStarted(3, Instant.now())); + + raceCalculatorProcessor.onEvent(new RunnerFinished(2, Instant.now())); + raceCalculatorProcessor.onEvent(new RunnerFinished(3, Instant.now())); + raceCalculatorProcessor.onEvent(new RunnerFinished(1, Instant.now())); + + //publish full results + resultsPublisher.publishAllResults(); + } +} diff --git a/cookbook/src/main/java/com/fluxtion/example/cookbook/racing/generated/RaceCalculatorProcessor.java b/cookbook/src/main/java/com/fluxtion/example/cookbook/racing/generated/RaceCalculatorProcessor.java new file mode 100644 index 0000000..9f1a707 --- /dev/null +++ b/cookbook/src/main/java/com/fluxtion/example/cookbook/racing/generated/RaceCalculatorProcessor.java @@ -0,0 +1,386 @@ +/* +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the Server Side Public License, version 1, +* as published by MongoDB, Inc. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* Server Side License for more details. +* +* You should have received a copy of the Server Side Public License +* along with this program. If not, see +* +. +*/ +package com.fluxtion.example.cookbook.racing.generated; + +import com.fluxtion.runtime.StaticEventProcessor; +import com.fluxtion.runtime.lifecycle.BatchHandler; +import com.fluxtion.runtime.lifecycle.Lifecycle; +import com.fluxtion.runtime.EventProcessor; +import com.fluxtion.runtime.callback.InternalEventProcessor; +import com.fluxtion.example.cookbook.racing.RaceCalculator.RaceTimeTracker; +import com.fluxtion.example.cookbook.racing.RaceCalculator.ResultsPublisher; +import com.fluxtion.example.cookbook.racing.RaceCalculator.ResultsPublisherImpl; +import com.fluxtion.example.cookbook.racing.RaceCalculator.RunnerFinished; +import com.fluxtion.example.cookbook.racing.RaceCalculator.RunnerStarted; +import com.fluxtion.runtime.EventProcessorContext; +import com.fluxtion.runtime.audit.Auditor; +import com.fluxtion.runtime.audit.EventLogManager; +import com.fluxtion.runtime.audit.NodeNameAuditor; +import com.fluxtion.runtime.callback.CallbackDispatcherImpl; +import com.fluxtion.runtime.callback.ExportFunctionAuditEvent; +import com.fluxtion.runtime.event.Event; +import com.fluxtion.runtime.input.EventFeed; +import com.fluxtion.runtime.input.SubscriptionManagerNode; +import com.fluxtion.runtime.node.ForkedTriggerTask; +import com.fluxtion.runtime.node.MutableEventProcessorContext; +import com.fluxtion.runtime.time.Clock; +import com.fluxtion.runtime.time.ClockStrategy.ClockStrategyEvent; +import java.util.Map; + +import java.util.IdentityHashMap; +import java.util.function.BooleanSupplier; +import java.util.function.Consumer; + +/** + * + * + *
+ * generation time                 : Not available
+ * eventProcessorGenerator version : 9.2.17
+ * api version                     : 9.2.17
+ * 
+ * + * Event classes supported: + * + * + * + * @author Greg Higgins + */ +@SuppressWarnings({"unchecked", "rawtypes"}) +public class RaceCalculatorProcessor + implements EventProcessor, + StaticEventProcessor, + InternalEventProcessor, + BatchHandler, + Lifecycle, + ResultsPublisher { + + // Node declarations + private final CallbackDispatcherImpl callbackDispatcher = new CallbackDispatcherImpl(); + public final NodeNameAuditor nodeNameLookup = new NodeNameAuditor(); + public final RaceTimeTracker raceCalculator = new RaceTimeTracker(); + public final ResultsPublisherImpl resultsPublisher = new ResultsPublisherImpl(raceCalculator); + private final SubscriptionManagerNode subscriptionManager = new SubscriptionManagerNode(); + private final MutableEventProcessorContext context = + new MutableEventProcessorContext( + nodeNameLookup, callbackDispatcher, subscriptionManager, callbackDispatcher); + public final Clock clock = new Clock(); + private final ExportFunctionAuditEvent functionAudit = new ExportFunctionAuditEvent(); + // Dirty flags + private boolean initCalled = false; + private boolean processing = false; + private boolean buffering = false; + private final IdentityHashMap dirtyFlagSupplierMap = + new IdentityHashMap<>(1); + private final IdentityHashMap> dirtyFlagUpdateMap = + new IdentityHashMap<>(1); + + private boolean isDirty_raceCalculator = false; + // Forked declarations + + // Filter constants + + public RaceCalculatorProcessor(Map contextMap) { + context.replaceMappings(contextMap); + // node auditors + initialiseAuditor(clock); + initialiseAuditor(nodeNameLookup); + subscriptionManager.setSubscribingEventProcessor(this); + context.setEventProcessorCallback(this); + } + + public RaceCalculatorProcessor() { + this(null); + } + + @Override + public void init() { + initCalled = true; + auditEvent(Lifecycle.LifecycleEvent.Init); + // initialise dirty lookup map + isDirty("test"); + clock.init(); + afterEvent(); + } + + @Override + public void start() { + if (!initCalled) { + throw new RuntimeException("init() must be called before start()"); + } + processing = true; + auditEvent(Lifecycle.LifecycleEvent.Start); + + afterEvent(); + callbackDispatcher.dispatchQueuedCallbacks(); + processing = false; + } + + @Override + public void stop() { + if (!initCalled) { + throw new RuntimeException("init() must be called before stop()"); + } + processing = true; + auditEvent(Lifecycle.LifecycleEvent.Stop); + + afterEvent(); + callbackDispatcher.dispatchQueuedCallbacks(); + processing = false; + } + + @Override + public void tearDown() { + initCalled = false; + auditEvent(Lifecycle.LifecycleEvent.TearDown); + nodeNameLookup.tearDown(); + clock.tearDown(); + subscriptionManager.tearDown(); + afterEvent(); + } + + @Override + public void setContextParameterMap(Map newContextMapping) { + context.replaceMappings(newContextMapping); + } + + @Override + public void addContextParameter(Object key, Object value) { + context.addMapping(key, value); + } + + // EVENT DISPATCH - START + @Override + public void onEvent(Object event) { + if (buffering) { + triggerCalculation(); + } + if (processing) { + callbackDispatcher.processReentrantEvent(event); + } else { + processing = true; + onEventInternal(event); + callbackDispatcher.dispatchQueuedCallbacks(); + processing = false; + } + } + + @Override + public void onEventInternal(Object event) { + if (event instanceof com.fluxtion.example.cookbook.racing.RaceCalculator.RunnerFinished) { + RunnerFinished typedEvent = (RunnerFinished) event; + handleEvent(typedEvent); + } else if (event instanceof com.fluxtion.example.cookbook.racing.RaceCalculator.RunnerStarted) { + RunnerStarted typedEvent = (RunnerStarted) event; + handleEvent(typedEvent); + } else if (event instanceof com.fluxtion.runtime.time.ClockStrategy.ClockStrategyEvent) { + ClockStrategyEvent typedEvent = (ClockStrategyEvent) event; + handleEvent(typedEvent); + } + } + + public void handleEvent(RunnerFinished typedEvent) { + auditEvent(typedEvent); + // Default, no filter methods + isDirty_raceCalculator = raceCalculator.runnerFinished(typedEvent); + resultsPublisher.runnerFinished(typedEvent); + afterEvent(); + } + + public void handleEvent(RunnerStarted typedEvent) { + auditEvent(typedEvent); + // Default, no filter methods + isDirty_raceCalculator = raceCalculator.runnerStarted(typedEvent); + afterEvent(); + } + + public void handleEvent(ClockStrategyEvent typedEvent) { + auditEvent(typedEvent); + // Default, no filter methods + clock.setClockStrategy(typedEvent); + afterEvent(); + } + // EVENT DISPATCH - END + + // EXPORTED SERVICE FUNCTIONS - START + @Override + public void publishAllResults() { + beforeServiceCall( + "public void com.fluxtion.example.cookbook.racing.RaceCalculator$ResultsPublisherImpl.publishAllResults()"); + ExportFunctionAuditEvent typedEvent = functionAudit; + resultsPublisher.publishAllResults(); + afterServiceCall(); + } + // EXPORTED SERVICE FUNCTIONS - END + + public void bufferEvent(Object event) { + buffering = true; + if (event instanceof com.fluxtion.example.cookbook.racing.RaceCalculator.RunnerFinished) { + RunnerFinished typedEvent = (RunnerFinished) event; + auditEvent(typedEvent); + isDirty_raceCalculator = raceCalculator.runnerFinished(typedEvent); + resultsPublisher.runnerFinished(typedEvent); + } else if (event instanceof com.fluxtion.example.cookbook.racing.RaceCalculator.RunnerStarted) { + RunnerStarted typedEvent = (RunnerStarted) event; + auditEvent(typedEvent); + isDirty_raceCalculator = raceCalculator.runnerStarted(typedEvent); + } else if (event instanceof com.fluxtion.runtime.time.ClockStrategy.ClockStrategyEvent) { + ClockStrategyEvent typedEvent = (ClockStrategyEvent) event; + auditEvent(typedEvent); + clock.setClockStrategy(typedEvent); + } + } + + public void triggerCalculation() { + buffering = false; + String typedEvent = "No event information - buffered dispatch"; + afterEvent(); + } + + private void auditEvent(Object typedEvent) { + clock.eventReceived(typedEvent); + nodeNameLookup.eventReceived(typedEvent); + } + + private void auditEvent(Event typedEvent) { + clock.eventReceived(typedEvent); + nodeNameLookup.eventReceived(typedEvent); + } + + private void initialiseAuditor(Auditor auditor) { + auditor.init(); + auditor.nodeRegistered(raceCalculator, "raceCalculator"); + auditor.nodeRegistered(resultsPublisher, "resultsPublisher"); + auditor.nodeRegistered(callbackDispatcher, "callbackDispatcher"); + auditor.nodeRegistered(subscriptionManager, "subscriptionManager"); + auditor.nodeRegistered(context, "context"); + } + + private void beforeServiceCall(String functionDescription) { + functionAudit.setFunctionDescription(functionDescription); + auditEvent(functionAudit); + if (buffering) { + triggerCalculation(); + } + processing = true; + } + + private void afterServiceCall() { + afterEvent(); + callbackDispatcher.dispatchQueuedCallbacks(); + processing = false; + } + + private void afterEvent() { + + clock.processingComplete(); + nodeNameLookup.processingComplete(); + isDirty_raceCalculator = false; + } + + @Override + public void batchPause() { + auditEvent(Lifecycle.LifecycleEvent.BatchPause); + processing = true; + + afterEvent(); + callbackDispatcher.dispatchQueuedCallbacks(); + processing = false; + } + + @Override + public void batchEnd() { + auditEvent(Lifecycle.LifecycleEvent.BatchEnd); + processing = true; + + afterEvent(); + callbackDispatcher.dispatchQueuedCallbacks(); + processing = false; + } + + @Override + public boolean isDirty(Object node) { + return dirtySupplier(node).getAsBoolean(); + } + + @Override + public BooleanSupplier dirtySupplier(Object node) { + if (dirtyFlagSupplierMap.isEmpty()) { + dirtyFlagSupplierMap.put(raceCalculator, () -> isDirty_raceCalculator); + } + return dirtyFlagSupplierMap.getOrDefault(node, StaticEventProcessor.ALWAYS_FALSE); + } + + @Override + public void setDirty(Object node, boolean dirtyFlag) { + if (dirtyFlagUpdateMap.isEmpty()) { + dirtyFlagUpdateMap.put(raceCalculator, (b) -> isDirty_raceCalculator = b); + } + dirtyFlagUpdateMap.get(node).accept(dirtyFlag); + } + + private boolean guardCheck_resultsPublisher() { + return isDirty_raceCalculator; + } + + @Override + public T getNodeById(String id) throws NoSuchFieldException { + return nodeNameLookup.getInstanceById(id); + } + + @Override + public A getAuditorById(String id) + throws NoSuchFieldException, IllegalAccessException { + return (A) this.getClass().getField(id).get(this); + } + + @Override + public void addEventFeed(EventFeed eventProcessorFeed) { + subscriptionManager.addEventProcessorFeed(eventProcessorFeed); + } + + @Override + public void removeEventFeed(EventFeed eventProcessorFeed) { + subscriptionManager.removeEventProcessorFeed(eventProcessorFeed); + } + + @Override + public RaceCalculatorProcessor newInstance() { + return new RaceCalculatorProcessor(); + } + + @Override + public RaceCalculatorProcessor newInstance(Map contextMap) { + return new RaceCalculatorProcessor(); + } + + @Override + public String getLastAuditLogRecord() { + try { + EventLogManager eventLogManager = + (EventLogManager) this.getClass().getField(EventLogManager.NODE_NAME).get(this); + return eventLogManager.lastRecordAsString(); + } catch (Throwable e) { + return ""; + } + } +} diff --git a/cookbook/src/main/resources/com/fluxtion/example/cookbook/racing/generated/RaceCalculatorProcessor.graphml b/cookbook/src/main/resources/com/fluxtion/example/cookbook/racing/generated/RaceCalculatorProcessor.graphml new file mode 100644 index 0000000..17d0415 --- /dev/null +++ b/cookbook/src/main/resources/com/fluxtion/example/cookbook/racing/generated/RaceCalculatorProcessor.graphml @@ -0,0 +1,127 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/cookbook/src/main/resources/com/fluxtion/example/cookbook/racing/generated/RaceCalculatorProcessor.png b/cookbook/src/main/resources/com/fluxtion/example/cookbook/racing/generated/RaceCalculatorProcessor.png new file mode 100644 index 0000000..c1a5a29 Binary files /dev/null and b/cookbook/src/main/resources/com/fluxtion/example/cookbook/racing/generated/RaceCalculatorProcessor.png differ