Skip to content

Commit

Permalink
Merge pull request #89 from usdot-jpo-ode/spat-timestamp-fix
Browse files Browse the repository at this point in the history
SPAT Timestamp - Latest
  • Loading branch information
John-Wiens authored May 31, 2024
2 parents 487ed45 + de63b99 commit e4a1470
Show file tree
Hide file tree
Showing 15 changed files with 73 additions and 82 deletions.
2 changes: 1 addition & 1 deletion api/jpo-conflictmonitor
Submodule jpo-conflictmonitor updated 28 files
+23 −3 docker-compose.yml
+7 −0 docker/mongo/a_init_replicas.js
+5 −2 docker/mongo/b_create_indexes.js
+13 −0 docker/mongo/manage-volume-cron
+241 −0 docker/mongo/manage_volume.js
+3 −3 jpo-conflictmonitor/pom.xml
+2 −0 jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/RegulatorIntersectionId.java
+4 −5 jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/SpatMap.java
+2 −1 jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/processors/SpatSequenceProcessor.java
+59 −59 ...nitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/MapSpatMessageAssessmentTopology.java
+6 −8 ...itor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/validation/BaseValidationTopology.java
+86 −0 ...monitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/validation/BaseZeroRateChecker.java
+51 −23 ...nitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/validation/MapValidationTopology.java
+31 −0 ...tmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/validation/MapZeroRateChecker.java
+27 −1 ...itor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/validation/SpatValidationTopology.java
+25 −0 ...monitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/validation/SpatZeroRateChecker.java
+1 −0 jpo-conflictmonitor/src/main/resources/application.yaml
+17 −13 ...t/its/jpo/conflictmonitor/monitor/notifications/IntersectionReferenceAlignmentNotificationTopologyTest.java
+2,535 −41 ...java/us/dot/its/jpo/conflictmonitor/monitor/notifications/SignalGroupAlignmentNotificationTopologyTest.java
+0 −6 jpo-deduplicator/pom.xml
+11 −3 jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/DeduplicatorServiceController.java
+20 −0 jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/models/ProcessedMapWktPair.java
+7 −0 jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/serialization/PairSerdes.java
+9 −5 ...tor/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/topologies/ProcessedMapDeduplicatorTopology.java
+134 −0 .../src/main/java/us/dot/its/jpo/deduplicator/deduplicator/topologies/ProcessedMapWktDeduplicatorTopology.java
+7 −0 jpo-deduplicator/src/main/resources/application.yaml
+94 −0 jpo-deduplicator/src/test/java/deduplicator/ProcessedMapWktDeduplicatorTopologyTest.java
+20 −10 sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import us.dot.its.jpo.ode.api.models.IntersectionReferenceData;

public interface ProcessedMapRepository extends DataLoader<ProcessedMap<LineString>>{
Query getQuery(Integer intersectionID, Long startTime, Long endTime,boolean latest);
Query getQuery(Integer intersectionID, Long startTime, Long endTime,boolean latest, boolean compact);

long getQueryResultCount(Query query);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class ProcessedMapRepositoryImpl implements ProcessedMapRepository {
private ObjectMapper mapper = DateJsonMapper.getInstance();
private Logger logger = LoggerFactory.getLogger(ProcessedMapRepositoryImpl.class);

public Query getQuery(Integer intersectionID, Long startTime, Long endTime, boolean latest) {
public Query getQuery(Integer intersectionID, Long startTime, Long endTime, boolean latest, boolean compact) {
Query query = new Query();

if (intersectionID != null) {
Expand All @@ -78,8 +78,13 @@ public Query getQuery(Integer intersectionID, Long startTime, Long endTime, bool
query.limit(props.getMaximumResponseSize());
}

if (compact){
query.fields().exclude("recordGeneratedAt", "properties.validationMessages");
}else{
query.fields().exclude("recordGeneratedAt");
}

query.addCriteria(Criteria.where("properties.timeStamp").gte(startTimeString).lte(endTimeString));
query.fields().exclude("recordGeneratedAt");
return query;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import org.springframework.data.mongodb.core.query.Query;

public interface ProcessedSpatRepository extends DataLoader<ProcessedSpat>{
Query getQuery(Integer intersectionID, Long startTime, Long endTime);
Query getQuery(Integer intersectionID, Long startTime, Long endTime, boolean latest, boolean compact);

long getQueryResultCount(Query query);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.ArrayList;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
Expand Down Expand Up @@ -40,7 +41,7 @@ public class ProcessedSpatRepositoryImpl implements ProcessedSpatRepository {
private DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
private ObjectMapper mapper = DateJsonMapper.getInstance();

public Query getQuery(Integer intersectionID, Long startTime, Long endTime) {
public Query getQuery(Integer intersectionID, Long startTime, Long endTime, boolean latest, boolean compact) {
Query query = new Query();

if (intersectionID != null) {
Expand All @@ -56,9 +57,21 @@ public Query getQuery(Integer intersectionID, Long startTime, Long endTime) {
if (endTime != null) {
endTimeString = Instant.ofEpochMilli(endTime).toString();
}
query.limit(props.getMaximumResponseSize());
query.addCriteria(Criteria.where("odeReceivedAt").gte(startTimeString).lte(endTimeString));
query.fields().exclude("recordGeneratedAt");

if (latest) {
query.with(Sort.by(Sort.Direction.DESC, "utcTimeStamp"));
query.limit(1);
}else{
query.limit(props.getMaximumResponseSize());
}

if (compact){
query.fields().exclude("recordGeneratedAt", "validationMessages");
}else{
query.fields().exclude("recordGeneratedAt");
}

query.addCriteria(Criteria.where("utcTimeStamp").gte(startTimeString).lte(endTimeString));
return query;
}

Expand All @@ -72,26 +85,20 @@ public List<ProcessedSpat> findProcessedSpats(Query query) {
}

public List<IDCount> getSpatBroadcastRates(int intersectionID, Long startTime, Long endTime){
Query query = getQuery(intersectionID, startTime, endTime);
Query query = getQuery(intersectionID, startTime, endTime, false, true);

query.fields().include("utcTimeStamp");
List<Map> times = mongoTemplate.find(query, Map.class, collectionName);

//List<ZonedDateTime> spats = findProcessedSpats(query);
System.out.println("Retreived Spat List" + times.size());
Map<String, IDCount> results = new HashMap<>();

for(Map doc: times){
//ZonedDateTime time = spat.getUtcTimeStamp();
System.out.println(doc);
ZonedDateTime time = mapper.convertValue(doc.get("utcTimeStamp"), ZonedDateTime.class);
String key = time.format(formatter);

//String key = map.getProperties().getTimeStamp().substring(0,10) + map.getProperties().getTimeStamp().substring(11,13);
if(results.containsKey(key)){
IDCount count = results.get(key);
count.setCount(count.getCount() +1);
//results.put(key, count);
}
else{
IDCount count = new IDCount();
Expand All @@ -101,46 +108,12 @@ public List<IDCount> getSpatBroadcastRates(int intersectionID, Long startTime, L
}
}

System.out.println("Finished Message Parsing");

//AggregationResults<IDCount> result = mongoTemplate.aggregate(aggregation, collectionName, IDCount.class);
//List<IDCount> results = result.getMappedResults();
//results = new ArrayList<IDCount>(results);

List<IDCount> outputCounts = new ArrayList<>(results.values());
for (IDCount r : outputCounts) {
r.setCount((double) r.getCount() / 3600.0);
}
return outputCounts;
// String startTimeString = Instant.ofEpochMilli(0).toString();
// String endTimeString = Instant.now().toString();

// if (startTime != null) {
// startTimeString = Instant.ofEpochMilli(startTime).toString();
// }
// if (endTime != null) {
// endTimeString = Instant.ofEpochMilli(endTime).toString();
// }

// AggregationOptions options = AggregationOptions.builder().allowDiskUse(true).build();

// Aggregation aggregation = Aggregation.newAggregation(
// Aggregation.match(Criteria.where("intersectionId").is(intersectionID)),
// Aggregation.match(Criteria.where("utcTimeStamp").gte(startTimeString).lte(endTimeString)),
// Aggregation.project("utcTimeStamp"),
// Aggregation.project()
// .and(DateOperators.DateFromString.fromStringOf("utcTimeStamp")).as("date"),
// Aggregation.project()
// .and(DateOperators.DateToString.dateOf("date").toString("%Y-%m-%d-%H")).as("dateStr"),
// Aggregation.group("dateStr").count().as("count"),
// Aggregation.sort(Sort.Direction.ASC, "_id")
// ).withOptions(options);

// AggregationResults<IDCount> result = mongoTemplate.aggregate(aggregation, collectionName, IDCount.class);
// List<IDCount> results = result.getMappedResults();
// for (IDCount r: results){
// r.setCount((float)r.getCount() / 3600.0);
// }


}

Expand Down Expand Up @@ -187,5 +160,4 @@ public List<IDCount> getSpatBroadcastRateDistribution(int intersectionID, Long s
public void add(ProcessedSpat item) {
mongoTemplate.save(item, collectionName);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@ public ResponseEntity<List<ProcessedMap>> findMaps(
@RequestParam(name = "intersection_id", required = false) Integer intersectionID,
@RequestParam(name = "start_time_utc_millis", required = false) Long startTime,
@RequestParam(name = "end_time_utc_millis", required = false) Long endTime,
@RequestParam(name= "latest", required = false, defaultValue = "false") boolean latest,
@RequestParam(name = "test", required = false, defaultValue = "false") boolean testData) {
@RequestParam(name = "latest", required = false, defaultValue = "false") boolean latest,
@RequestParam(name = "test", required = false, defaultValue = "false") boolean testData,
@RequestParam(name = "compact", required = false, defaultValue = "false") boolean compact) {

if (testData) {
return ResponseEntity.ok(MockMapGenerator.getProcessedMaps());
} else {
Query query = processedMapRepo.getQuery(intersectionID, startTime, endTime, latest);
Query query = processedMapRepo.getQuery(intersectionID, startTime, endTime, latest, compact);
long count = processedMapRepo.getQueryResultCount(query);

logger.info("Returning ProcessedMap Response with Size: " + count);
Expand All @@ -71,7 +72,7 @@ public ResponseEntity<Long> countMaps(
if (testData) {
return ResponseEntity.ok(5L);
} else {
Query query = processedMapRepo.getQuery(intersectionID, startTime, endTime, false);
Query query = processedMapRepo.getQuery(intersectionID, startTime, endTime, false, true);
long count = processedMapRepo.getQueryResultCount(query);

logger.info("Found: " + count + "Processed Map Messages");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ public ResponseEntity<List<ProcessedSpat>> findSpats(
@RequestParam(name = "intersection_id", required = false) Integer intersectionID,
@RequestParam(name = "start_time_utc_millis", required = false) Long startTime,
@RequestParam(name = "end_time_utc_millis", required = false) Long endTime,
@RequestParam(name = "latest", required = false, defaultValue = "false") boolean latest,
@RequestParam(name = "compact", required = false, defaultValue = "false") boolean compact,
@RequestParam(name = "test", required = false, defaultValue = "false") boolean testData) {

if (testData) {
return ResponseEntity.ok(MockSpatGenerator.getProcessedSpats());
} else {
Query query = processedSpatRepo.getQuery(intersectionID, startTime, endTime);
Query query = processedSpatRepo.getQuery(intersectionID, startTime, endTime, latest, compact);
long count = processedSpatRepo.getQueryResultCount(query);
logger.info("Returning Processed Spat Response with Size: " + count);
return ResponseEntity.ok(processedSpatRepo.findProcessedSpats(query));
Expand All @@ -70,7 +72,7 @@ public ResponseEntity<Long> countSpats(
if (testData) {
return ResponseEntity.ok(80L);
} else {
Query query = processedSpatRepo.getQuery(intersectionID, startTime, endTime);
Query query = processedSpatRepo.getQuery(intersectionID, startTime, endTime,false, true);
long count = processedSpatRepo.getQueryResultCount(query);
logger.info("Found: " + count + "Processed Spat Messages");
return ResponseEntity.ok(count);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import us.dot.its.jpo.conflictmonitor.monitor.models.assessments.ConnectionOfTravelAssessment;
import us.dot.its.jpo.conflictmonitor.monitor.models.assessments.LaneDirectionOfTravelAssessment;
import us.dot.its.jpo.conflictmonitor.monitor.models.assessments.SignalStateAssessment;
import us.dot.its.jpo.conflictmonitor.monitor.models.assessments.StopLinePassageAssessment;
import us.dot.its.jpo.conflictmonitor.monitor.models.assessments.StopLineStopAssessment;
import us.dot.its.jpo.ode.api.accessors.assessments.ConnectionOfTravelAssessment.ConnectionOfTravelAssessmentRepository;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ public void testProcessedMap() {

List<ProcessedMap> list = MockMapGenerator.getProcessedMaps();

Query query = processedMapRepo.getQuery(null, null, null, false);
Query query = processedMapRepo.getQuery(null, null, null, false, false);
when(processedMapRepo.findProcessedMaps(query)).thenReturn(list);

ResponseEntity<List<ProcessedMap>> result = controller.findMaps(null, null, null, false, false);
ResponseEntity<List<ProcessedMap>> result = controller.findMaps(null, null, null, false, false, false);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo(list);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import java.util.Collection;
import java.util.Set;


import org.mockito.Mockito;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.GrantedAuthority;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ public void testProcessedSpat() {

List<ProcessedSpat> list = MockSpatGenerator.getProcessedSpats();

Query query = processedSpatRepo.getQuery(null, null, null);
Query query = processedSpatRepo.getQuery(null, null, null, false, false);
when(processedSpatRepo.findProcessedSpats(query)).thenReturn(list);

ResponseEntity<List<ProcessedSpat>> result = controller.findSpats(null, null, null, false);
ResponseEntity<List<ProcessedSpat>> result = controller.findSpats(null, null, null, false,false,false);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
// assertThat(result.getHeaders().getContentType()).isEqualTo(MediaType.APPLICATION_JSON);
assertThat(result.getBody()).isEqualTo(list);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void testGetQuery() {

boolean latest = true;

Query query = repository.getQuery(intersectionID, startTime, endTime, latest);
Query query = repository.getQuery(intersectionID, startTime, endTime, latest, false);


// Assert IntersectionID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ void setUp() {
@Test
public void testGetQuery() {

Query query = repository.getQuery(intersectionID, startTime, endTime);
Query query = repository.getQuery(intersectionID, startTime, endTime, false, false);



Expand All @@ -60,7 +60,7 @@ public void testGetQuery() {


// Assert Start and End Time
Document queryTimeDocument = (Document)query.getQueryObject().get("odeReceivedAt");
Document queryTimeDocument = (Document)query.getQueryObject().get("utcTimeStamp");
assertThat(queryTimeDocument.getString("$gte")).isEqualTo(Instant.ofEpochMilli(startTime).toString());
assertThat(queryTimeDocument.getString("$lte")).isEqualTo(Instant.ofEpochMilli(endTime).toString());

Expand Down
6 changes: 6 additions & 0 deletions gui/src/apis/mm-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,24 @@ class MessageMonitorApi {
roadRegulatorId,
startTime,
endTime,
latest,
compact,
}: {
token: string;
intersectionId: number;
roadRegulatorId: number;
startTime?: Date;
endTime?: Date;
latest?: boolean;
compact?: boolean;
}): Promise<ProcessedSpat[]> {
const queryParams: Record<string, string> = {};
queryParams["intersection_id"] = intersectionId.toString();
queryParams["road_regulator_id"] = roadRegulatorId.toString();
if (startTime) queryParams["start_time_utc_millis"] = startTime.getTime().toString();
if (endTime) queryParams["end_time_utc_millis"] = endTime.getTime().toString();
if (latest) queryParams["latest"] = latest.toString();
if (compact) queryParams["compact"] = compact.toString();

var response = await authApiHelper.invokeApi({
path: "/spat/json",
Expand Down
37 changes: 22 additions & 15 deletions gui/src/components/map/map-component.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,11 @@ const getTimestamp = (dt: any): number => {
try {
const dtFromString = Date.parse(dt as any as string);
if (isNaN(dtFromString)) {
if (dt > 1000000000000) {
return dt; // already in milliseconds
} else {
return dt * 1000;
}
if (dt > 1000000000000) {
return dt; // already in milliseconds
} else {
return dt * 1000;
}
} else {
return dtFromString;
}
Expand Down Expand Up @@ -240,6 +240,7 @@ const generateQueryParams = (
endDate: new Date(Date.now() + endOffset),
eventDate: new Date(Date.now()),
vehicleId: undefined,
default: true,
};
}
};
Expand All @@ -266,6 +267,7 @@ const MapTab = (props: MyProps) => {
vehicleId?: string;
intersectionId?: number;
roadRegulatorId?: number;
default?: boolean;
}>({
...generateQueryParams(props.sourceData, props.sourceDataType),
intersectionId: props.intersectionId,
Expand Down Expand Up @@ -788,6 +790,21 @@ const MapTab = (props: MyProps) => {
let rawMap: ProcessedMap[] = [];
let rawSpat: ProcessedSpat[] = [];
let rawBsm: OdeBsmData[] = [];
if (queryParams.default == true) {
const latestSpats = await MessageMonitorApi.getSpatMessages({
token: session?.accessToken,
intersectionId: queryParams.intersectionId,
roadRegulatorId: queryParams.roadRegulatorId,
latest: true,
});
if (latestSpats && latestSpats.length > 0) {
setQueryParams({
...generateQueryParams({ timestamp: getTimestamp(latestSpats.at(-1)?.utcTimeStamp) }, "timestamp"),
intersectionId: queryParams.intersectionId,
roadRegulatorId: queryParams.roadRegulatorId,
});
}
}
if (importedMessageData == undefined) {
// ######################### Retrieve MAP Data #########################
const rawMapPromise = MessageMonitorApi.getMapMessages({
Expand Down Expand Up @@ -833,11 +850,6 @@ const MapTab = (props: MyProps) => {
queryParams.startDate,
queryParams.endDate
);
toast.promise(surroundingEventsPromise, {
loading: `Loading Event Data`,
success: `Successfully got Event Data`,
error: `Failed to get Event data. Please see console`,
});
surroundingEventsPromise.then((events) => setSurroundingEvents(events));

// ######################### BSM Events By Minute #########################
Expand All @@ -863,11 +875,6 @@ const MapTab = (props: MyProps) => {
startTime: queryParams.startDate,
endTime: queryParams.endDate,
});
toast.promise(surroundingNotificationsPromise, {
loading: `Loading Notification Data`,
success: `Successfully got Notification Data`,
error: `Failed to get Notification data. Please see console`,
});
surroundingNotificationsPromise.then((notifications) => setSurroundingNotifications(notifications));
} else {
rawMap = importedMessageData.mapData;
Expand Down

0 comments on commit e4a1470

Please sign in to comment.