Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hotfix/2025 02 12 #153

Merged
merged 13 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion docs/Release_Notes.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
## JPO Conflict Monitor Release Notes

## Version 1.3.0
## Version 2.0.1
Hotfix for 2025 Q1 CIMMS Release
This fix adds a time check to the ProgressionEvents processing, preventing an issue where queries were run with an invalid start/end time range.
- Removes extraneous print statements
- Adds time check to ProgressionEvents

## Version 2.0.0

### **Summary**
The fourth release for the jpo-conflictmonitor, version 2.0.0
Expand Down
2 changes: 1 addition & 1 deletion jpo-conflictmonitor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
</parent>
<groupId>usdot.jpo.ode</groupId>
<artifactId>jpo-conflictmonitor</artifactId>
<version>2.0.0-SNAPSHOT</version>
<version>2.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>jpo-conflictmonitor</name>
<url>http://maven.apache.org</url>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ public MonitorServiceController(final ConflictMonitorProperties conflictMonitorP
startSpatMessageCountProgressionAlgorithm();

//Bsm Message Count Progression Topology
// startBsmMessageCountProgressionAlgorithm(); // Disabled until ProcessedBSMs get fully integrated
startBsmMessageCountProgressionAlgorithm();

// Combined Event Topology
final String event = "event";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,15 @@ public class BsmIntersectionIdKey extends RsuIntersectionKey {

String bsmId;

/**
* Creates a new Empty BsmIntersectionIdKey
*/
public BsmIntersectionIdKey() {}

/**
 * Creates a new BsmIntersectionIdKey.
 *
 * @param bsmId the BSM ID component of the key
 * @param rsuId the RSU ID component of the key
 * @param intersectionId the intersection ID component of the key
 */
public BsmIntersectionIdKey(String bsmId, String rsuId, int intersectionId) {
super(rsuId, intersectionId);
this.bsmId = bsmId;
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -68,49 +68,54 @@ public void process(Record<RsuIntersectionSignalGroupKey, SpatMovementState> rec
startTime = Instant.ofEpochMilli(context().currentStreamTimeMs())
.minusMillis(parameters.getBufferTimeMs());
}
var query =

// Verify that the exclude grace period is after the start time of the query
if(excludeGracePeriod.compareTo(startTime) > 0){
var query =
MultiVersionedKeyQuery.<RsuIntersectionSignalGroupKey, SpatMovementState>withKey(record.key())
.fromTime(startTime)
.toTime(excludeGracePeriod)
.withAscendingTimestamps();


QueryResult<VersionedRecordIterator<SpatMovementState>> result =
stateStore.query(query,
PositionBound.unbounded(),
new QueryConfig(false));
QueryResult<VersionedRecordIterator<SpatMovementState>> result =
stateStore.query(query,
PositionBound.unbounded(),
new QueryConfig(false));

if (result.isSuccess()) {
if (result.isSuccess()) {

// Identify transitions, and forward transition messages
VersionedRecordIterator<SpatMovementState> iterator = result.getResult();
SpatMovementState previousState = null;
// Identify transitions, and forward transition messages
VersionedRecordIterator<SpatMovementState> iterator = result.getResult();
SpatMovementState previousState = null;

while (iterator.hasNext()) {
final VersionedRecord<SpatMovementState> state = iterator.next();
final SpatMovementState thisState = state.value();
if (previousState != null && previousState.getPhaseState() != thisState.getPhaseState()) {
while (iterator.hasNext()) {
final VersionedRecord<SpatMovementState> state = iterator.next();
final SpatMovementState thisState = state.value();
if (previousState != null && previousState.getPhaseState() != thisState.getPhaseState()) {

if (parameters.isDebug()) {
log.info("transition detected at timestamp {} -> {}, signal group {}, {} -> {}",
previousState.getUtcTimeStamp(),state.timestamp(),
record.key().getSignalGroup(), previousState.getPhaseState(), thisState.getPhaseState());
}
if (parameters.isDebug()) {
log.info("transition detected at timestamp {} -> {}, signal group {}, {} -> {}",
previousState.getUtcTimeStamp(),state.timestamp(),
record.key().getSignalGroup(), previousState.getPhaseState(), thisState.getPhaseState());
}

latestTransitionStore.put(record.key(), record.timestamp());
latestTransitionStore.put(record.key(), record.timestamp());

// Transition detected,
context().forward(record
.withTimestamp(state.timestamp())
.withValue(new SpatMovementStateTransition(previousState, thisState)));
// Transition detected,
context().forward(record
.withTimestamp(state.timestamp())
.withValue(new SpatMovementStateTransition(previousState, thisState)));

}
previousState = thisState;
}
previousState = thisState;
} else {
log.error("Failed to query state store: {}", result.getFailureMessage());
}
} else {
log.error("Failed to query state store: {}", result.getFailureMessage());
}else{
log.warn("Skipping Query for Event State Progression Processor because start time " + startTime + " did not happen before end time: " + excludeGracePeriod);
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,115 +1,115 @@
package us.dot.its.jpo.conflictmonitor.monitor.topologies;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.state.Stores;
import org.slf4j.Logger;
import org.springframework.stereotype.Component;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.BaseStreamsTopology;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.aggregation.bsm_message_count_progression.BsmMessageCountProgressionAggregationAlgorithm;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.aggregation.bsm_message_count_progression.BsmMessageCountProgressionAggregationKey;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.aggregation.bsm_message_count_progression.BsmMessageCountProgressionAggregationStreamsAlgorithm;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.bsm_message_count_progression.BsmMessageCountProgressionParameters;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.bsm_message_count_progression.BsmMessageCountProgressionStreamsAlgorithm;
import us.dot.its.jpo.conflictmonitor.monitor.models.bsm.BsmRsuIdKey;
import us.dot.its.jpo.conflictmonitor.monitor.processors.BsmMessageCountProgressionProcessor;
import us.dot.its.jpo.geojsonconverter.partitioner.RsuIdPartitioner;
import us.dot.its.jpo.geojsonconverter.pojos.geojson.Point;
import us.dot.its.jpo.geojsonconverter.pojos.geojson.bsm.ProcessedBsm;
import us.dot.its.jpo.geojsonconverter.serialization.JsonSerdes;

import java.time.Duration;

import static us.dot.its.jpo.conflictmonitor.monitor.algorithms.bsm_message_count_progression.BsmMessageCountProgressionConstants.DEFAULT_BSM_MESSAGE_COUNT_PROGRESSION_ALGORITHM;

/**
 * Kafka Streams topology that watches ProcessedBsm messages per RSU and emits
 * BsmMessageCountProgressionEvents via {@link BsmMessageCountProgressionProcessor}.
 * Events are either written directly to the output topic or handed to a
 * pluggable aggregation algorithm, depending on configuration.
 */
@Component(DEFAULT_BSM_MESSAGE_COUNT_PROGRESSION_ALGORITHM)
@Slf4j
public class BsmMessageCountProgressionTopology
    extends BaseStreamsTopology<BsmMessageCountProgressionParameters>
    implements BsmMessageCountProgressionStreamsAlgorithm {

    @Override
    protected Logger getLogger() {
        return log;
    }

    // Aggregation algorithm plugged in via setAggregationAlgorithm(); used only
    // when parameters.isAggregateEvents() is true.
    BsmMessageCountProgressionAggregationStreamsAlgorithm aggregationAlgorithm;

    /**
     * Builds the topology: registers a versioned state store (buffered history of
     * ProcessedBsms) and a plain key-value store (latest ProcessedBsm per key),
     * streams the BSM input topic through the progression processor, and routes the
     * resulting events either through the aggregation sub-topology or directly to
     * the event output topic.
     *
     * @return the built Kafka Streams {@link Topology}
     */
    @Override
    public Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        final String processedBsmStateStore = parameters.getProcessedBsmStateStoreName();
        final String latestBsmStateStore = parameters.getLatestBsmStateStoreName();
        final Duration retentionTime = Duration.ofMillis(parameters.getBufferTimeMs());

        // Versioned store: retains a time-bounded history of ProcessedBsms so the
        // processor can look back over the buffer window.
        builder.addStateStore(
            Stores.versionedKeyValueStoreBuilder(
                Stores.persistentVersionedKeyValueStore(processedBsmStateStore, retentionTime),
                us.dot.its.jpo.conflictmonitor.monitor.serialization.JsonSerdes.BsmRsuIdKey(),
                JsonSerdes.ProcessedBsm()
            )
        );

        // Plain store: only the most recent ProcessedBsm per key.
        builder.addStateStore(
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(latestBsmStateStore),
                us.dot.its.jpo.conflictmonitor.monitor.serialization.JsonSerdes.BsmRsuIdKey(),
                JsonSerdes.ProcessedBsm()
            )
        );

        KStream<BsmRsuIdKey, ProcessedBsm<Point>> inputStream = builder.stream(parameters.getBsmInputTopicName(),
            Consumed.with(
                us.dot.its.jpo.conflictmonitor.monitor.serialization.JsonSerdes.BsmRsuIdKey(),
                JsonSerdes.ProcessedBsm()));

        var eventStream = inputStream
            .process(() -> new BsmMessageCountProgressionProcessor<>(parameters), processedBsmStateStore, latestBsmStateStore);

        if (parameters.isAggregateEvents()) {
            // Aggregate events
            // Select new key that includes all fields to aggregate on
            var aggKeyStream = eventStream.selectKey((key, value) -> {
                    var aggKey = new BsmMessageCountProgressionAggregationKey();
                    aggKey.setRsuId(key.getRsuId());
                    aggKey.setBsmId(key.getBsmId());
                    // TODO:
                    //aggKey.setDataFrame(value.getDataFrame());
                    //aggKey.setChange(value.getChange());
                    return aggKey;
                })
                // Use RsuIdKey partitioner, which partitions only on the RSU ID, so the partitioning won't actually change
                .repartition(
                    Repartitioned.with(
                            us.dot.its.jpo.conflictmonitor.monitor.serialization.JsonSerdes.BsmMessageCountProgressionAggregationKey(),
                            us.dot.its.jpo.conflictmonitor.monitor.serialization.JsonSerdes.BsmMessageCountProgressionEvent())
                        .withStreamPartitioner(new RsuIdPartitioner<>()));
            // Plug in the aggregation algorithm
            aggregationAlgorithm.buildTopology(builder, aggKeyStream);
        } else {
            // Don't aggregate events
            eventStream.to(parameters.getBsmMessageCountProgressionEventOutputTopicName(),
                Produced.with(
                    us.dot.its.jpo.conflictmonitor.monitor.serialization.JsonSerdes.BsmRsuIdKey(),
                    us.dot.its.jpo.conflictmonitor.monitor.serialization.JsonSerdes.BsmMessageCountProgressionEvent()));
        }
        return builder.build();
    }

    /**
     * Injects the aggregation algorithm used when event aggregation is enabled.
     *
     * @param aggregationAlgorithm the aggregation algorithm; must implement
     *        {@link BsmMessageCountProgressionAggregationStreamsAlgorithm}
     * @throws IllegalArgumentException if the algorithm is not a Streams algorithm
     */
    @Override
    public void setAggregationAlgorithm(BsmMessageCountProgressionAggregationAlgorithm aggregationAlgorithm) {
        // Enforce the algorithm being a Streams algorithm
        if (aggregationAlgorithm instanceof BsmMessageCountProgressionAggregationStreamsAlgorithm streamsAlgorithm) {
            this.aggregationAlgorithm = streamsAlgorithm;
        } else {
            throw new IllegalArgumentException("Aggregation algorithm must be a streams algorithm");
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
import us.dot.its.jpo.conflictmonitor.monitor.utils.BsmUtils;
import us.dot.its.jpo.geojsonconverter.partitioner.RsuIdPartitioner;
import us.dot.its.jpo.ode.model.OdeBsmData;
import us.dot.its.jpo.ode.model.OdeBsmMetadata;
import us.dot.its.jpo.ode.plugin.j2735.J2735Bsm;

import static us.dot.its.jpo.conflictmonitor.monitor.algorithms.repartition.RepartitionConstants.DEFAULT_REPARTITION_ALGORITHM;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ public Topology buildTopology() {
return agg;
};

signalStateEvents.print(Printed.toSysOut());

Aggregator<String, StopLinePassageEvent, StopLinePassageAggregator> signalStateEventAggregator =
(key, value, aggregate)-> {
return aggregate.add(value);
Expand Down Expand Up @@ -168,7 +166,6 @@ public Topology buildTopology() {
}
);

notificationEventStream.print(Printed.toSysOut());

KTable<String, StopLinePassageNotification> stopLinePassageNotificationTable =
notificationEventStream.groupByKey(Grouped.with(Serdes.String(), JsonSerdes.StopLinePassageNotification()))
Expand Down
Loading
Loading