Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spring Kafka Integration, Partial J2735 2024 Compatibility, and Developer Experience Enhancements #562

Merged
merged 318 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
318 commits
Select commit Hold shift + click to select a range
6538eac
Extract environment variables from OdeProperties into Spring Configur…
mcook42 Nov 14, 2024
c3333d4
Address polish comments
drewjj Nov 15, 2024
b9e6ae1
Merge pull request #117 from CDOT-CV/ssm-processing-fix
drewjj Nov 15, 2024
e5187ba
Add Approval Tests for MAP Data Flows (#116)
mcook42 Nov 18, 2024
29f4f38
Merge branch 'dev' into fix/update-circle-json-properties
dmccoystephenson Nov 20, 2024
3cb982b
Added missing imports in TimTransmogrifierTest
dmccoystephenson Nov 20, 2024
a089aa7
Apply suggestions from code review
dmccoystephenson Nov 20, 2024
7a8f7bf
Merge pull request #114 from CDOT-CV/fix/update-circle-json-properties
dmccoystephenson Nov 20, 2024
898638d
Updated TravelerMessageFromHumanToAsnConverter to ensure compliance w…
dmccoystephenson Nov 20, 2024
d27207b
Removed error handling from unit tests in `TravelerMessageFromHumanTo…
dmccoystephenson Nov 20, 2024
c828175
Removed access modifiers from `TravelerMessageFromHumanToAsnConverte…
dmccoystephenson Nov 20, 2024
2e1c179
Add TIM POJO and builder
drewjj Nov 21, 2024
5effd8a
Modify TimCreator to use the new creator. Add and update unit tests
drewjj Nov 25, 2024
69f5137
Merge remote-tracking branch 'origin/dev' into tim-pojo-rework
drewjj Nov 25, 2024
d93e6ec
Update schemas
drewjj Nov 25, 2024
6848ede
Add support for unsigned TIMs to go to the TMC filtered topic
drewjj Nov 25, 2024
8b8de0c
Update unit tests to address recent changes
drewjj Nov 26, 2024
1973dc7
Updated models to ensure compliance with J2735 2024
dmccoystephenson Nov 26, 2024
6f700e2
Spring Kafka Proof-of-Concept - Implemented in MAP Data Flow (#118)
mcook42 Nov 26, 2024
d6a038d
Switching adopt builder to temurin
John-Wiens Nov 26, 2024
11063ba
Merge branch 'dev' into j2735/update-to-2024-revision
dmccoystephenson Nov 27, 2024
7c5f91a
Cleaned up a TimTransmogrifier test by extracting some helper methods
dmccoystephenson Nov 27, 2024
9317895
Updated expected output for a TimTransmogrifier test to comply with J…
dmccoystephenson Nov 27, 2024
e05405f
Updated TIM schema to comply with J2735 2024
dmccoystephenson Nov 27, 2024
796e5ce
Updated data files to comply with J2735 2024
dmccoystephenson Nov 27, 2024
247c20b
Updated docs to comply with J2735 2024
dmccoystephenson Nov 27, 2024
092c9a5
Replaced older fields with doNotUseX to comply with J2735 2024
dmccoystephenson Nov 27, 2024
7c9c2da
Merge pull request #121 from John-Wiens/adopt-workflow-fix
drewjj Nov 27, 2024
623acf6
Updated expected output in tests for TravelerMessageFromHumanToAsnCon…
dmccoystephenson Nov 27, 2024
80a061f
Implemented unit tests for converting pre-2024 TIM requests to comply…
dmccoystephenson Nov 27, 2024
641ae07
Introduce Checkstyle Linter to GH pipeline and provide steps for loca…
mcook42 Nov 27, 2024
99b156f
Merge branch 'dev' of github.com:CDOT-CV/jpo-ode into j2735/update-to…
dmccoystephenson Nov 27, 2024
41a93df
created failing test for OdeTravelerInputDataTest to validate serdes
dmccoystephenson Nov 27, 2024
046c7b1
added JsonAlias annotations to OdeTravelerInformationMessage.java to …
dmccoystephenson Nov 27, 2024
3a9c298
generated hashCode and equals methods for all subclasses needed to te…
dmccoystephenson Nov 27, 2024
420acc1
Finished moving conversion tests to jpo-ode-core and adding json alia…
dmccoystephenson Dec 1, 2024
5f3966a
Fixed parameter reference in InetPacketSender
dmccoystephenson Dec 1, 2024
56ae0cb
Removed unused imports
dmccoystephenson Dec 1, 2024
c37684d
Updated data types in json strings in TravelerMessageFromHumanToAsnCo…
dmccoystephenson Dec 1, 2024
397df56
Modified TravelerMessageFromHumanToAsnConverter to throw exception if…
dmccoystephenson Dec 1, 2024
07ab12d
Updated major version to 4 due to breaking changes with J2735 2024
dmccoystephenson Dec 1, 2024
53514f2
Added placeholder to release notes for 4.0.0
dmccoystephenson Dec 1, 2024
cac7459
Reverted changes to archived files
dmccoystephenson Dec 2, 2024
bfab325
Removed unnecessary getters & setters from OdeTravelerInputData
dmccoystephenson Dec 2, 2024
c7819e0
Removed unused imports from ServiceRequest
dmccoystephenson Dec 2, 2024
9a15b9b
Removed unused fields in TravelerMessageFromHumanToAsnConverter
dmccoystephenson Dec 2, 2024
e6b0d3b
Updated description of `convertTravelerInputDataToEncodableTim()` met…
dmccoystephenson Dec 2, 2024
642b2e0
Reverted changes to deprecated files
dmccoystephenson Dec 2, 2024
1a41bbc
Updated testing data
dmccoystephenson Dec 2, 2024
da4f03e
Updated types of notUsedX fields in TIM schema
dmccoystephenson Dec 2, 2024
c7af0f2
Updated testing data used in OdeTimDataTest
dmccoystephenson Dec 2, 2024
884aa85
Updated encoded TIM in `udpsender_tim.py`
dmccoystephenson Dec 2, 2024
76c833d
Removed unnecessary comments in OdeTravelerInputDataTest helper method
dmccoystephenson Dec 2, 2024
f311752
Moved to using `@EqualsAndHashCode` annotation in OdeTravelerInputDat…
dmccoystephenson Dec 2, 2024
3b9a0c1
Defined `project.ode.version` property in pom.xml
dmccoystephenson Dec 2, 2024
b94e5d0
Reverted changes to pom.xml
dmccoystephenson Dec 2, 2024
af8d0d0
Added `@JsonAlias` and `@EqualsAndHashCode` annotations to TravelerDa…
dmccoystephenson Dec 2, 2024
82bd2a7
Formatted aemInputContainingCircleGeometry.xml for ease of review & u…
dmccoystephenson Dec 2, 2024
595f47c
Remove the array from the computed lane
drewjj Dec 2, 2024
f3d5cf6
Add dynamic serializing and deserializing of TIM messages with new dy…
drewjj Dec 3, 2024
d01ae2a
Formatted modified java files
dmccoystephenson Dec 3, 2024
e5f447f
add missing confluent.password and confluent.username to application.…
mcook42 Dec 3, 2024
b094a6c
swap ODE_KAFKA_BROKERS in for DOCKER_HOST_IP for kafka configs. updat…
mcook42 Dec 3, 2024
270e5bb
fixing confluent authentication properties and updating environmental…
Michael7371 Dec 3, 2024
0e5223a
Merge branch 'mcook42/spring-kafka/confluent-config-props' of https:/…
Michael7371 Dec 3, 2024
0cb3b29
Merge pull request #123 from CDOT-CV/mcook42/spring-kafka/confluent-c…
Michael7371 Dec 3, 2024
5c1892e
Add @JsonIgnore to fields on asn1 type base classes
iyourshaw Dec 3, 2024
f4a85e6
Serialize Bitstrings as verbose maps for ODE JSON
iyourshaw Dec 3, 2024
3603e0f
Set extension presence booleans to jsonignore annotations
drewjj Dec 4, 2024
7c991f5
BitString serialization rules to build them out as verbose boolean se…
drewjj Dec 4, 2024
c5e7bda
Bitstring deserializer for verbose ODE json
iyourshaw Dec 4, 2024
578e2cc
Remove diagnostic logging statements
iyourshaw Dec 4, 2024
651941e
Remove printf statements
iyourshaw Dec 4, 2024
afee483
Set tim udp buffer size to 2048. Add complex tim example to script
iyourshaw Dec 4, 2024
235f51a
Complex example tims
iyourshaw Dec 4, 2024
ef385c6
Comment manual edits
iyourshaw Dec 4, 2024
2568fab
Java style: use Lobok for getters/setters, capitalize enum items
iyourshaw Dec 5, 2024
f223988
Use 2020/2024 modules as package names. Package names lower case.
iyourshaw Dec 5, 2024
ff5a742
Fix test namespace
iyourshaw Dec 5, 2024
29d7bf6
Merge remote-tracking branch 'iy/tim-pojo-rework' into tim-pojo-rework
drewjj Dec 5, 2024
c726baa
Remove unused older POJOs and update the TIM test with a better XML t…
drewjj Dec 5, 2024
2b8ebcb
Merge remote-tracking branch 'origin/dev' into tim-pojo-rework
drewjj Dec 5, 2024
37e5399
Add test for NodeAttributeSetLL. Fix JSON serialization of dElevatio…
iyourshaw Dec 5, 2024
8ec1957
Updated ensureComplianceWithJ2735Revision2024() in TravelerMessageFro…
dmccoystephenson Dec 5, 2024
795fef6
Pulled object mapper out of individual tests in TravelerDataFrameTest
dmccoystephenson Dec 5, 2024
b643ea6
Updated TravelerMessageFromHumanToAsnConverter to throw custom Noncom…
dmccoystephenson Dec 5, 2024
80064d6
Addressed checkstyle warnings for TimDepositController
dmccoystephenson Dec 5, 2024
dfd600f
Addressed checkstyle warnings for TravelerMessageFromHumanToAsnConver…
dmccoystephenson Dec 5, 2024
08292e5
Addressed checkstyle warnings for RegulatorySpeedLimit, NodeSetXY, No…
dmccoystephenson Dec 5, 2024
f46bcb6
Addressed checkstyle warnings for LaneDataAttributeList, Area, Region…
dmccoystephenson Dec 5, 2024
d6d1942
Addressed checkstyle warnings for Content, Description, SpeedLimitLis…
dmccoystephenson Dec 5, 2024
3abe4e3
use KafkaType instead of Producer Type so that confluent is configure…
mcook42 Dec 5, 2024
3ce6931
Addressed checkstyle warnings for NodeAttributeSetXY, ValidRegion, Of…
dmccoystephenson Dec 5, 2024
921b05e
Merge remote-tracking branch 'origin/dev' into tim-pojo-rework
drewjj Dec 6, 2024
f676a6b
Code styling changes to appease the code styling lords
drewjj Dec 6, 2024
2fc4615
Update checkstyle xml
drewjj Dec 6, 2024
ce5538a
Even more code styling changes
drewjj Dec 6, 2024
fb4f577
Last bit of code styling. Please.
drewjj Dec 6, 2024
abce4e5
Update the TIM schema and resolve some test issues
drewjj Dec 6, 2024
dc0e975
Addressed checkstyle warnings for InetPacketSender, TimDepositControl…
dmccoystephenson Dec 6, 2024
1d1b61c
Merge branch 'dev' into j2735/update-to-2024-revision
dmccoystephenson Dec 6, 2024
1851067
Addressed checkstyle warnings for TimDepositControllerTest & TimTrans…
dmccoystephenson Dec 6, 2024
48fea3a
Regenerated TIM classes: Fix for sequence-of choice serialization to …
iyourshaw Dec 9, 2024
a397df0
itis, region, fix tests
iyourshaw Dec 9, 2024
92c4504
Update attribution documentation to note where the TIM POJOs came from
drewjj Dec 9, 2024
8c35ad2
Address pull request comments
drewjj Dec 9, 2024
7dccf75
Update the imported assertEquals
drewjj Dec 9, 2024
9e48c45
update to TimData class to allow for Tim payload serialization to wor…
Michael7371 Dec 10, 2024
cd232b6
Merge pull request #119 from CDOT-CV/tim-pojo-rework
drewjj Dec 11, 2024
d7fc79a
Migrate UDPRecievers to use Spring Kafka (#129)
mcook42 Dec 11, 2024
e03605e
Migrate Asn1DecodedDataRouter to use Spring Kafka (#131)
mcook42 Dec 13, 2024
9059b15
Merge branch 'dev' into j2735/update-to-2024-revision
dmccoystephenson Dec 13, 2024
43e0333
Finishing addressing merge conflicts in TIM_test.json, TimDepositCont…
dmccoystephenson Dec 13, 2024
05108c5
Addressed checkstyle warnings in TimDepositController & TimDepositCon…
dmccoystephenson Dec 16, 2024
bac7f0a
Update POM version
drewjj Dec 16, 2024
cb5e446
Update last pom
drewjj Dec 16, 2024
c3ce7bb
Remove the "SNAPSHOT" from the version
drewjj Dec 16, 2024
402a84e
Update run bat and sh scripts
drewjj Dec 16, 2024
d2a6812
Updated new TravelerDataFrame POJO to be compliant with J2735 2024 & …
dmccoystephenson Dec 16, 2024
24e7789
Merge branch 'dev' into j2735/update-to-2024-revision
dmccoystephenson Dec 16, 2024
0d900a3
Updated test data used in Asn1DecodedDataRouterTest for TIM data flow
dmccoystephenson Dec 16, 2024
4221056
Updated comment in TravelerMessageFromHumanToAsnConverter
dmccoystephenson Dec 16, 2024
d94cbca
Added `@JsonAlias` to new TravelerDataFrame POJO & accompanying unit …
dmccoystephenson Dec 16, 2024
52e8726
Added unit test to create TIM from decoded 2016 message
dmccoystephenson Dec 16, 2024
5ce6ee2
Merge pull request #132 from CDOT-CV/version-update-4.0.0
dmccoystephenson Dec 16, 2024
363b834
Merge branch 'dev' into j2735/update-to-2024-revision
dmccoystephenson Dec 16, 2024
b89b98c
Updated comments in TravelerMessageFromHumanToAsnConverter.buildItem(…
dmccoystephenson Dec 16, 2024
cb72731
Removed unnecessary empty comments in TravelerMessageFromHumanToAsnCo…
dmccoystephenson Dec 16, 2024
9e126c8
Added `@Data` annotation to Anchor class
dmccoystephenson Dec 17, 2024
9e27a56
Added `@Data` and `@EqualsAndHashCode` annotations to all classes in …
dmccoystephenson Dec 17, 2024
854f7ad
Merge pull request #122 from CDOT-CV/j2735/update-to-2024-revision
payneBrandon Dec 17, 2024
072cffd
Migrate AsnCodecMessageServiceController to Spring Kafka (#134)
mcook42 Dec 20, 2024
6c2914a
Update the schema version to 8 for the output messages
drewjj Dec 23, 2024
5aa498a
Update formatting for checkstyle
drewjj Dec 24, 2024
cc850ac
adding caching to github actions runners
Michael7371 Dec 24, 2024
14d1bb2
Update docs/schemas/README.md
drewjj Dec 29, 2024
d59c4ac
Updated schema constant requirement and associated tests
drewjj Dec 29, 2024
cb51b7b
Merge branch 'fix/schema-version-update' of https://github.com/CDOT-C…
drewjj Dec 29, 2024
bc59dbe
Merge pull request #137 from CDOT-CV/github-actions-caching
Michael7371 Dec 30, 2024
b0ed4a0
Remove outdated comment
drewjj Dec 30, 2024
db4b514
Data flow diagram updates - Spring Kafka related (#140)
mcook42 Dec 30, 2024
9ab6086
Merge pull request #136 from CDOT-CV/fix/schema-version-update
drewjj Dec 31, 2024
31768cd
updating the asn1 codec submodule reference to the 2024 j2735 support
Michael7371 Jan 3, 2025
2eb57ea
Merge pull request #141 from CDOT-CV/fix/asn1_codec_2024-submodule-ref
drewjj Jan 3, 2025
9f266ff
Updated `compatibility.md` for 2025 Q1 release
dmccoystephenson Jan 6, 2025
ad22001
Updated new version for jpo-geojsonconverter in `compatibility.md`
dmccoystephenson Jan 6, 2025
f0e25b4
chore: set scope to provided for annotation processors (#142)
mcook42 Jan 6, 2025
9385a6d
Updated new version for jpo-conflictmonitor in compatibility.md
dmccoystephenson Jan 6, 2025
b0d52cc
Updated `Release_notes.md` for 2025 Q1 release
dmccoystephenson Jan 6, 2025
6534bc5
Add hyphens to the nodexy and nodell objects since it is the correct …
drewjj Jan 7, 2025
b01612b
feat: add Spring Actuator dependency and update configuration
mcook42 Jan 7, 2025
fa2ba93
feat: add trace ID handling for improved observability
mcook42 Jan 7, 2025
a64da09
feat: enable observation in Kafka consumers
mcook42 Jan 7, 2025
abd8baa
Added CDOT PR 145 to 2025 Q1 release notes
dmccoystephenson Jan 7, 2025
722044c
chore: set retries to 1 to reduce noise in logs
mcook42 Jan 7, 2025
f305d33
chore: delete swap
mcook42 Jan 8, 2025
1d7c1a6
Updated jpo-utils git submodule reference to latest commit in CDOT de…
dmccoystephenson Jan 8, 2025
5bfaa70
Added CDOT PR 146 to 2025 Q1 release notes
dmccoystephenson Jan 8, 2025
a56f68e
Merge pull request #146 from CDOT-CV/git/update-jpo-utils-commit-refe…
dmccoystephenson Jan 8, 2025
5c7ccf0
Implemented failing unit test to ensure no missing bytes after retrie…
dmccoystephenson Jan 8, 2025
1d17934
Updated UdpHexDecoder and UperUtil to ensure no missing bytes when re…
dmccoystephenson Jan 8, 2025
b77f0bc
refactor: increase producer retries to 2
mcook42 Jan 8, 2025
3dc0349
refactor: reduce listener retries to zero until we can properly handl…
mcook42 Jan 8, 2025
3418552
Removed UperUtil.stripTrailingZeros, updated decoder input JSON for e…
dmccoystephenson Jan 8, 2025
a7df9b6
Updated UdpHexDecoder.getPayloadHexString() for clarity & updated uni…
dmccoystephenson Jan 8, 2025
e93a213
Updated javadocs for UdpHexDecoder.retrieveRelevantBytes
dmccoystephenson Jan 8, 2025
f4ddc84
Formatted changed java files
dmccoystephenson Jan 8, 2025
b09c66c
Formatted UperUtil.java
dmccoystephenson Jan 8, 2025
8b9ff7d
Reinserted headers into the modified test data
dmccoystephenson Jan 8, 2025
11b5c09
Added CDOT PR 147 to 2025 Q1 release notes
dmccoystephenson Jan 8, 2025
40159ea
chore: delete unnecessary CorrelationIDHandler.java
mcook42 Jan 8, 2025
e849542
chore: remove value from kafka logging to reduce noise
mcook42 Jan 8, 2025
28cc28f
updating jpo utils reference to fix env var requirementes
Michael7371 Jan 8, 2025
e9aebbf
Merge pull request #145 from CDOT-CV/hotfix/tim-schema-v8
dmccoystephenson Jan 8, 2025
dbcec81
Merge pull request #148 from CDOT-CV/git/update-jpo-utils
dmccoystephenson Jan 8, 2025
cffdde9
Added CDOT PR 148 to 2025 Q1 release notes
dmccoystephenson Jan 8, 2025
e781a63
Update schema-tim.json with TIM deposit request metadata field value …
drewjj Jan 9, 2025
336b907
Removed unnecessary else block in UperUtil
dmccoystephenson Jan 9, 2025
76e33d9
update to jpo utils to have sample passwords to reduce docker compose…
Michael7371 Jan 9, 2025
489ce54
Added CDOT PR 149 to 2025 Q1 release notes
dmccoystephenson Jan 9, 2025
59275bb
Merge pull request #149 from CDOT-CV/git/update-jpo-utils
dmccoystephenson Jan 9, 2025
c09d420
Update docs/Release_notes.md
dmccoystephenson Jan 9, 2025
55d630b
chore: turn off consumer observation
mcook42 Jan 9, 2025
5901177
refactor: set producerPerThread and include execption message in fail…
mcook42 Jan 9, 2025
f671b5a
refactor: set producerPerThread in all factories
mcook42 Jan 9, 2025
f3379bd
chore: remove hardcoded log levels from logback.xml
mcook42 Jan 9, 2025
5df2614
chore: observability dependencies from pom and config
mcook42 Jan 9, 2025
2c73e69
chore: undo setProducerPerThread(true)
mcook42 Jan 9, 2025
94e5039
Added CDOT PR 151 to 2025 Q1 release notes
dmccoystephenson Jan 9, 2025
7352e74
Merge pull request #151 from CDOT-CV/mcook42/chore/kafka-and-logging-…
dmccoystephenson Jan 9, 2025
d65f56c
Merge branch 'dev' into udp/convert-packet-bytes-based-on-length
dmccoystephenson Jan 9, 2025
3dd3ed6
perf: set linger.ms to 1 millis to prevent large batching and reduce …
mcook42 Jan 9, 2025
6a4d951
Update map schema elevation to be a String
drewjj Jan 9, 2025
f112784
Update the tim request odeposition3d to be different due to the field…
drewjj Jan 9, 2025
e59d3fe
Update Release_notes.md
dmccoystephenson Jan 9, 2025
e0dfc08
Merge pull request #152 from CDOT-CV/fix/schema-request-metdata
dmccoystephenson Jan 9, 2025
05b60dc
Added CDOT PR 153 to 2025 Q1 release notes
dmccoystephenson Jan 10, 2025
3937a8e
Merge pull request #153 from CDOT-CV/mcook42/fix/producer-failures
dmccoystephenson Jan 10, 2025
e9ac777
use odeKafkaProperties env vars to drive producer retries
mcook42 Jan 10, 2025
f2f4f5e
Added CDOT PR 154 to 2025 Q1 release notes
dmccoystephenson Jan 10, 2025
03d6731
Merge pull request #154 from CDOT-CV/mcook42/fix/producer-config
drewjj Jan 10, 2025
b1d1e35
Moved contents of `ReleaseNotes.md` to `docs/Release_notes.md`
dmccoystephenson Jan 10, 2025
e34ecf0
Merge pull request #143 from CDOT-CV/docs/update-version-compatibilit…
dmccoystephenson Jan 10, 2025
740921e
Merge pull request #147 from CDOT-CV/udp/convert-packet-bytes-based-o…
dmccoystephenson Jan 12, 2025
b988e33
Update docs/Release_notes.md
dmccoystephenson Jan 12, 2025
0b475bb
Update docs/Release_notes.md
dmccoystephenson Jan 12, 2025
e584738
Merge pull request #144 from CDOT-CV/docs/2025-q1/update-release-notes
dmccoystephenson Jan 12, 2025
4f375d7
update to fix PSM schema "recordGeneratedBy" field
Michael7371 Jan 16, 2025
736fe61
added a unit test that failed on old schema
Michael7371 Jan 16, 2025
08a2e58
address checkstyle violations
Michael7371 Jan 16, 2025
97b1c2f
adding PSM schema fix to the release notes
Michael7371 Jan 16, 2025
3a95b03
Update docs/Release_notes.md
Michael7371 Jan 16, 2025
501a1f5
Merge pull request #158 from CDOT-CV/release/psm-schema-fix
drewjj Jan 16, 2025
6839a93
Updated BSM schema to address field renamings in J2735 2024
dmccoystephenson Jan 19, 2025
3428ed1
Updated BSM-related POJOs to address field renamings in J2735 2024
dmccoystephenson Jan 19, 2025
585d36e
Updated 2025 Q1 release notes to include details on partial compatibi…
dmccoystephenson Jan 20, 2025
fa102d9
Reverted change to header for v3 release notes
dmccoystephenson Jan 20, 2025
d986aec
style: address checkstyle warnings in OdeBsmDataCreatorHelperTest
mcook42 Jan 20, 2025
452a417
refactor: simplify test flow in OdeBsmDataCreatorHelperTest
mcook42 Jan 20, 2025
0dcdf1d
test: add BSM validation to assert generated BSM aligns with schema
mcook42 Jan 20, 2025
1cf577f
Pulled XML into separate file from OdeBsmDataCreatorHelperTest
dmccoystephenson Jan 20, 2025
5cfba96
Updated BsmBuilder partII reference, added a unit test for single par…
dmccoystephenson Jan 21, 2025
f280828
Updated test data in BsmBuilderTest.java
dmccoystephenson Jan 21, 2025
161faa7
Addressed checkstyle comments
dmccoystephenson Jan 21, 2025
917795d
Added `@Data` and `@EqualsAndHashCode` annotations to J2735Supplement…
dmccoystephenson Jan 21, 2025
24a9652
Commented out `testCreateOdeBsmData_ThreePartIIExtensions` test case …
dmccoystephenson Jan 21, 2025
eb8b82d
Pulled JSON into separate files for J2735BsmPart2ContentTest.java
dmccoystephenson Jan 21, 2025
bc08490
Pulled test data into separate files for BsmBuilderTest.java
dmccoystephenson Jan 21, 2025
3a38c8a
Added assertion to BsmBuilderTest.shouldTranslateBsm to ensure partII…
dmccoystephenson Jan 21, 2025
14efc44
Added CDOT PR 160 to 2025 Q1 release notes
dmccoystephenson Jan 21, 2025
2da8ff1
Updated wording in 2025 Q1 release notes
dmccoystephenson Jan 21, 2025
f7fe9a1
Added link to usdot-asn1c github issue
dmccoystephenson Jan 21, 2025
fc2e9c6
Updated multi-partII-extension test in OdeBsmDataCreatorHelperTest.java
dmccoystephenson Jan 21, 2025
24132b2
Pulled partII extension definitions into separate schemas for `schema…
dmccoystephenson Jan 21, 2025
2e8b4d1
Updated SupplementalVehicleExtensions schema definition to make doNot…
dmccoystephenson Jan 21, 2025
de3c9f8
Reverted license headers & switched to normal-comment format
dmccoystephenson Jan 21, 2025
7bd94c1
Updated `@EqualsAndHashCode` annotation for J2735VehicleData class to…
dmccoystephenson Jan 21, 2025
13dbb00
Updated 2025 Q1 release notes to include specific unsupported fields
dmccoystephenson Jan 21, 2025
cfbd4d7
Updated 2025 Q1 release notes to include unsupported new fields for S…
dmccoystephenson Jan 21, 2025
c9466c9
Updated comment in BsmBuilder
dmccoystephenson Jan 22, 2025
cfe7caa
Merge pull request #160 from CDOT-CV/j2735/bsm/update-schema-and-pojos
dmccoystephenson Jan 22, 2025
a9ed9bb
Merge pull request #159 from CDOT-CV/rel2025q1/update-release-notes-f…
dmccoystephenson Jan 22, 2025
01b228f
fix: force BigDecimal serialization to NUMBER format
mcook42 Jan 22, 2025
411bbd7
fix: force BigDecimal serialization to NUMBER format in Serialization…
mcook42 Jan 22, 2025
6463f51
fix: set elevation type to number in schema-map.json
mcook42 Jan 22, 2025
137b777
Added CDOT PR 165 to 2025 Q1 release notes
dmccoystephenson Jan 22, 2025
acbe6b7
Merge branch 'mcook42/hotfix/set-big-decimal-serialization-2' of gith…
dmccoystephenson Jan 22, 2025
58f80f0
chore: correct typo in comment
mcook42 Jan 22, 2025
e089177
Merge branch 'mcook42/hotfix/set-big-decimal-serialization-2' of http…
mcook42 Jan 22, 2025
d200673
Merge pull request #165 from CDOT-CV/mcook42/hotfix/set-big-decimal-s…
drewjj Jan 22, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Spring Kafka Proof-of-Concept - Implemented in MAP Data Flow (#118)
* unify tests in Asn1DecodedDataRouterApprovalTest and prepend TEST to topic names in Asn1DecodeMAPJSONTest for unique topic naming to avoid topic name conflicts across tests

* add spring.application.name to application.yaml to support Spring Boot kafka consumer and producer automatic naming

* Add Asn1DecodeMAPJSONListener and KafkaConsumerConfig to use Spring Kafka listener for Asn1DecodeMAPJSONTest. Test is not passing, but is proving that the new listener consumes the message as expected

* comment out outputSchemaVersion in OdeProperties and add Import annotation for BuildProperties.class to allow usage of SpringBootTest annotations.

BuildProperties wasn't found unless mvn package was run prior to testing. That's slow and not intuitive, so the annotation fixed the issue. Commenting out outputSchemaVersion allowed for setting OdeProperties as a Component instead of a Configuration class, since that's essentially what it is, and we don't need output schema to be served from a bean in the long run. It can be used as a Value annotated field wherever needed instead

* Use EmbeddedKafkaHolder for single EmbeddedKafka instance across test cases and migrate Asn1DecodeMAPJSONTest to use it

* Implement KafkaProducerConfig and XMLOdeObjectSerializer to support producing OdeObjects. Not correct for use case, but does prove we can use KafkaTemplates alongside hand-rolled producers

* Set kafka broker address to localhost:9093 in test application.yaml to ensure EmbeddedKafka broker doesn't fail to start up when a kafka broker is already running in a dev container on 9092

* Set kafka broker address to localhost:4242 in test application.yaml to ensure EmbeddedKafka broker doesn't fail to start up when a kafka broker is already running in a dev container on 9092 and 9093

* set kafka.consumer and kafka.producer properties in application.yaml to defaults from MessageConsumer and MessageProducer to enable correct startup connections.

This proves we can simultaneously run Spring Kafka with our hand-rolled kafka implementations!

* increase TimIngestTrackerProperies default interval from 1 to 10 seconds for test runs to reduce log noise

* remove Asn1DecodedDataRouter from Asn1DecodedDataRouterApprovalTest since SpringBootTest loads the beans into the context automatically now

* use asn1 encoded hex strings as inputs for MapReceiverTest instead of plain json objects

* replace hand-rolled producer in MapReceiver with KafkaTemplate

* simplify test setup by using SpringBootTest annotation and the EmbeddedKafkaBrokerHolder in MapReceiverTest

* use the correct test group name when creating test consumer in MapReceiverTest

* comment out MAP routing in Asn1DecodedDataRouter to prove Asn1DecodedDataRouterApprovalTest fails before implementation of spring kafka consumer-producer

* Move all Topics configuration objects into subpackage named topics

* Move Asn1DecodeMAPJSONListener to kafka.listeners package

* Add UnsupportedDataTypeException to prevent acknowledgment of MAP consumption by Asn1DecodedDataRouter

* Add filtering consumer factory to support processing of MAP messages in Asn1DecodedDataListener (also implemented)

By filtering out messages that are not MAP messages, we can allow for processing of non-map messages via the current Asn1DecodedDataRouter paths while also allowing Asn1DecodedDataListener to process the MAP messages from the same topic (topic.Asn1DecoderOutput)

* remove commented out code and update javadoc for filter strategy in KafkaConsumerConfig

* remove routeMAP in favor of explicitly throwing UnsupportedDataTypeException

* test application.yaml bootstrap-server port 9093->4242 to match tests

* properly serialize data before publishing in Asn1DecodeMAPJSONListener

* moved all TopicsTests under kafka.topics test package for consistency

* moved Asn1DecodeMAPJSONTest to kafka package for consistency with what it's testing

* use constructor injection instead of field injection AND constructor injection in Asn1DecodeMAPJSONListener

* remove references to Asn1DecodeMAPJSON from Asn1DecodeMAPJSONTest to prepare for deletion of Asn1DecodeMAPJSON

* delete Asn1DecodeMAPJSON.java and all references now that we've migrated to the Spring Kafka implementation for this data flow

* only create one partition per topic in EmbeddedKafkaHolder

* add Awaitility as test dependency to support async testing

* replace EmbeddedKafka annotation with EmbeddedKafkaHolder for all Asn1Decode*JSONTests

* delete obsolete AsnCodecMessageServiceControllerTest.java. Any test annotated with SpringBootTest will already confirm we can instantiate the AsnCodecMessageServiceController as expected

* remove UnsupportedDataTypeException.java from consumption flows and let kafka's consumerGroups manage the separate offsets

* inject test queue names with SpringBootTest annotation in Asn1DecodeMAPJSONTest to support unique test topic naming

* inject test queue names with SpringBootTest annotation in MapReceiverTest to support unique test topic naming and port assignment

* delete disabled AsnCodecRouterServiceControllerTest.java because bean instantiation is already tested by any SpringBootTest annotated tests

* make OdeTimJsonTopology a spring Component and inject to properly manage streams lifecycle

Before this change, whenever we would run any SpringBootTest tests in sequence we would get an INVALID_STATE error and fail tests unrelated to OdeTimJsonTopology. By turning the OdeTimJsonTopology into a Component we can allow Spring to manage the singleton lifecycle of the OdeTimJsonTopology and its kafka streams.

* add consumer ids to listeners and move from property injection to constructor injection in Asn1DecodedDataListener

* inject topic names into Asn1DecodedDataRouterApprovalTest to keep topics under test separate between tests

* remove unnecessary annotations from Asn1DecodeMAPJSONTest

* convert properties to local variable in OdeTimJsonTopology constructor

* add KAFKA_TYPE env variable into OdeKafkaProperties

* add ConfluentProperties to OdeKafkaProperties

* default to empty string for kafkaType in OdeKafkaProperties to prevent NPE errors

* replace OdeTimJsonTopology.java with Spring Kafka provided KStream via KafkaStreamConfig

The Spring Kafka managed implementation provides SmartLifecycleManagement so that we don't need to manually manage the start/cleanup steps of the kstreams lifecycles. Before this implementation our SpringBootTest integration tests could not run in sequence because the streams would not be in a valid state between tests.

* add confluent properties to the consumer and producer configuration conditionally

* implement disabledTopic filtering in publish steps of Asn1DecodedDataListener and Asn1DecodeMAPJSONListener

* correct indentation in TimDepositController

* Revert "correct indentation in TimDepositController"

This reverts commit d12d12c.

* Revert "replace OdeTimJsonTopology.java with Spring Kafka provided KStream via KafkaStreamConfig"

This reverts commit 29a7fa1.

* Revert "convert properties to local variable in OdeTimJsonTopology constructor"

This reverts commit 9e2b332.

* Revert "make OdeTimJsonTopology a spring Component and inject to properly manage streams lifecycle"

This reverts commit 1427534.

* make streams private final instead of static in OdeTimJsonTopology to better manage lifecycle

* use OdeKafkaProperties.Confluent to set stream props for OdeTimJsonTopology to remove reliance on late stage props validation

* make stream properties local to OdeTimJsonTopology constructor

* pass in topic to OdeTimJsonTopology, add state listener, take start() internal

* use Awaitility to await states in OdeTimJsonTopologyTest and remove testQuery as it is not unit testable; it requires an integration test with multiple kafka topic interactions

* configure default timeout for Awaitility in Asn1DecodeMAPJSONTest to prevent flakiness

* remove slf4j annotation from MessageProcessor where it is not needed

* remove setStaticSchemaVersion from OdeProperties in favor of hard coding in OdeMsgMetadata for now

* remove null string for key in send method Asn1DecodedDataListener

* replace environment variable lookup with configuration value in OdeTimJsonTopology for kafkaType

* deleted unnecessary OdePropertiesTest.java

* reformat OdeKafkaPropertiesValidatorTest

* correctly initialize embedded kafka and needed topics in Asn1Decode*JSONTest files

* add missing confluent configuration to odeDatProducerFactory

* add missing confluent configuration to odeDataConsumerFactory
  • Loading branch information
mcook42 authored Nov 26, 2024
commit 6f700e28a3eb0e7ad32a21c5e26817f52affe5fd
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public enum GeneratedBy {

private static final long serialVersionUID = 3979762143291085955L;

private static int staticSchemaVersion;
private static int staticSchemaVersion = 7;

private String payloadType;
private SerialId serialId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
******************************************************************************/
package us.dot.its.jpo.ode.wrapper;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

/**
* @author 572682
* This abstract class provides the common and basic functionality for processinf messages
Expand Down
6 changes: 6 additions & 0 deletions jpo-ode-svcs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>3.0.0</version>
<scope>test</scope>
</dependency>
<!-- WEB UI Boot Dependencies -->
<dependency>
<groupId>org.webjars</groupId>
Expand Down
19 changes: 9 additions & 10 deletions jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/OdeProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,30 @@
import jakarta.annotation.PostConstruct;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.info.BuildProperties;
import org.springframework.context.annotation.Configuration;
import us.dot.its.jpo.ode.model.OdeMsgMetadata;
import org.springframework.context.annotation.Import;
import org.springframework.stereotype.Component;


@Configuration
@ConfigurationProperties(prefix = "ode")
@Component
@Data
@Slf4j
@Import(BuildProperties.class)
public class OdeProperties {

private int outputSchemaVersion = 7;
private static final byte[] JPO_ODE_GROUP_ID = "jode".getBytes();

@Autowired
BuildProperties buildProperties;
final BuildProperties buildProperties;

public OdeProperties(BuildProperties buildProperties) {
this.buildProperties = buildProperties;
}

@PostConstruct
void initialize() {
log.info("groupId: {}", buildProperties.getGroup());
log.info("artifactId: {}", buildProperties.getArtifact());
log.info("version: {}", buildProperties.getVersion());
OdeMsgMetadata.setStaticSchemaVersion(this.outputSchemaVersion);
}

public String getVersion() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,71 +23,42 @@
@Slf4j
public class OdeTimJsonTopology {

private final KafkaStreams streams;

private final Properties streamsProperties = new Properties();
static KafkaStreams streams;
public OdeTimJsonTopology(OdeKafkaProperties odeKafkaProps, String topic) {

public OdeTimJsonTopology(OdeKafkaProperties odeKafkaProps) {
if (odeKafkaProps.getBrokers() != null) {
this.streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "KeyedOdeTimJson");
this.streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, odeKafkaProps.getBrokers());
this.streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
this.streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

String kafkaType = System.getenv("KAFKA_TYPE");
if (kafkaType != null && kafkaType.equals("CONFLUENT")) {
addConfluentProperties(this.streamsProperties);
}
} else {
log.error("Kafka Brokers not set in OdeProperties");
}
}
Properties streamsProperties = new Properties();
streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "KeyedOdeTimJson");
streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, odeKafkaProps.getBrokers());
streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

public void start() {
if (streams != null && streams.state().isRunningOrRebalancing()) {
throw new IllegalStateException("Start called while streams is already running.");
} else {
if (streams == null) {
streams = new KafkaStreams(buildTopology(), streamsProperties);
}
log.info("Starting Ode Tim Json Topology");
streams.start();
if ("CONFLUENT".equals(odeKafkaProps.getKafkaType())) {
streamsProperties.put("sasl.jaas.config", odeKafkaProps.getConfluent().getSaslJaasConfig());
}
streams = new KafkaStreams(buildTopology(topic), streamsProperties);
streams.setStateListener((newState, oldState) ->
log.info("Transitioning from {} to {}", oldState, newState)
);
streams.start();
}

public void stop() {
if (streams != null) {
log.info("Stopping Ode Tim Json Topology");
streams.close();
}
log.info("Stopping Ode Tim Json Topology");
streams.close();
}

public boolean isRunning() {
return streams != null && streams.state().isRunningOrRebalancing();
return streams.state().isRunningOrRebalancing();
}

public Topology buildTopology() {
public Topology buildTopology(String topic) {
StreamsBuilder builder = new StreamsBuilder();
builder.table("topic.OdeTimJson", Materialized.<String, String>as(Stores.inMemoryKeyValueStore("timjson-store")));
builder.table(topic, Materialized.<String, String>as(Stores.inMemoryKeyValueStore("timjson-store")));
return builder.build();
}

public String query(String uuid) {
return (String) streams.store(StoreQueryParameters.fromNameAndType("timjson-store", QueryableStoreTypes.keyValueStore())).get(uuid);
}

private void addConfluentProperties(Properties properties) {
String username = System.getenv("CONFLUENT_KEY");
String password = System.getenv("CONFLUENT_SECRET");

if (username != null && password != null) {
String auth = "org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"" + username + "\" " +
"password=\"" + password + "\";";
this.streamsProperties.put("sasl.jaas.config", auth);
}
else {
log.error("Environment variables CONFLUENT_KEY and CONFLUENT_SECRET are not set. Set these in the .env file to use Confluent Cloud");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
import lombok.extern.slf4j.Slf4j;
import us.dot.its.jpo.ode.coder.stream.LogFileToAsn1CodecPublisher;
import us.dot.its.jpo.ode.importer.ImporterDirectoryWatcher.ImporterFileType;
import us.dot.its.jpo.ode.kafka.JsonTopics;
import us.dot.its.jpo.ode.kafka.topics.JsonTopics;
import us.dot.its.jpo.ode.kafka.OdeKafkaProperties;
import us.dot.its.jpo.ode.kafka.RawEncodedJsonTopics;
import us.dot.its.jpo.ode.kafka.topics.RawEncodedJsonTopics;

import java.io.BufferedInputStream;
import java.nio.file.Path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import us.dot.its.jpo.ode.importer.ImporterDirectoryWatcher.ImporterFileType;
import us.dot.its.jpo.ode.importer.parser.*;
import us.dot.its.jpo.ode.importer.parser.FileParser.ParserStatus;
import us.dot.its.jpo.ode.kafka.JsonTopics;
import us.dot.its.jpo.ode.kafka.RawEncodedJsonTopics;
import us.dot.its.jpo.ode.kafka.topics.JsonTopics;
import us.dot.its.jpo.ode.kafka.topics.RawEncodedJsonTopics;
import us.dot.its.jpo.ode.model.*;
import us.dot.its.jpo.ode.uper.UperUtil;
import us.dot.its.jpo.ode.util.JsonUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import lombok.extern.slf4j.Slf4j;
import us.dot.its.jpo.ode.coder.FileAsn1CodecPublisher;
import us.dot.its.jpo.ode.coder.stream.FileImporterProperties;
import us.dot.its.jpo.ode.kafka.JsonTopics;
import us.dot.its.jpo.ode.kafka.topics.JsonTopics;
import us.dot.its.jpo.ode.kafka.OdeKafkaProperties;
import us.dot.its.jpo.ode.kafka.RawEncodedJsonTopics;
import us.dot.its.jpo.ode.kafka.topics.RawEncodedJsonTopics;

import java.io.IOException;
import java.nio.file.Path;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package us.dot.its.jpo.ode.kafka;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "ode.kafka.confluent")
@Data
public class ConfluentProperties {
private String username;
private String password;

public String getSaslJaasConfig() {
return "org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"" + username + "\" " +
"password=\"" + password + "\";";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package us.dot.its.jpo.ode.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.json.JSONObject;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import us.dot.its.jpo.ode.context.AppContext;
import us.dot.its.jpo.ode.model.OdeAsn1Data;
import us.dot.its.jpo.ode.model.OdeMapData;
import us.dot.its.jpo.ode.plugin.j2735.J2735DSRCmsgID;
import us.dot.its.jpo.ode.util.XmlUtils;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
@Slf4j
public class KafkaConsumerConfig {

private final KafkaProperties kafkaProperties;
private final OdeKafkaProperties odeKafkaProperties;

public KafkaConsumerConfig(KafkaProperties kafkaProperties, OdeKafkaProperties odeKafkaProperties) {
this.kafkaProperties = kafkaProperties;
this.odeKafkaProperties = odeKafkaProperties;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
var consumerProps = kafkaProperties.buildConsumerProperties();
if ("CONFLUENT".equals(this.odeKafkaProperties.getKafkaType())) {
consumerProps.put("sasl.jaas.config", odeKafkaProperties.getConfluent().getSaslJaasConfig());
}
return new DefaultKafkaConsumerFactory<>(consumerProps);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}

@Bean
public ConsumerFactory<String, OdeMapData> odeMapDataConsumerFactory() {
Map<String, Object> props = new HashMap<>(kafkaProperties.buildConsumerProperties());
if ("CONFLUENT".equals(this.odeKafkaProperties.getKafkaType())) {
props.put("sasl.jaas.config", odeKafkaProperties.getConfluent().getSaslJaasConfig());
}
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(OdeMapData.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, OdeMapData> odeMapDataConsumerListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, OdeMapData> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(odeMapDataConsumerFactory());
return factory;
}

/**
* @return factory A listener factory that supports filtering out messages that don't match a specific pattern
* <p>
* @deprecated This method is intended to be short-lived. It exists to allow consumption via the Asn1DecodedDataRouter &
* the Asn1DecodedDataListener while we are migrating from hand-rolled Kafka implementation to Spring's Kafka implementation
*/
@Bean
@Deprecated(forRemoval = true)
public ConcurrentKafkaListenerContainerFactory<String, String> tempFilteringKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordFilterStrategy(getFilterStrategySpringKafkaSupportedMessageTypesOnly());

return factory;
}

/**
* While migrating to Spring Kafka the consumers provided from this factory will only consume (and ack) messages
* we support via the Spring Kafka implementation. All other messages will be handled by the Asn1DecodedDataRouter
*
* @return RecordFilterStrategy<String, String> filter
*/
private static RecordFilterStrategy<String, String> getFilterStrategySpringKafkaSupportedMessageTypesOnly() {
return consumerRecord -> {
try {
JSONObject consumed = XmlUtils.toJSONObject(consumerRecord.value()).getJSONObject(OdeAsn1Data.class.getSimpleName());

J2735DSRCmsgID messageId = J2735DSRCmsgID.valueOf(
consumed.getJSONObject(AppContext.PAYLOAD_STRING)
.getJSONObject(AppContext.DATA_STRING)
.getJSONObject("MessageFrame")
.getInt("messageId")
);

// Filter out all messages EXCEPT for MAP messages
return !J2735DSRCmsgID.MAPMessage.equals(messageId);
} catch (XmlUtils.XmlUtilsException e) {
log.warn("Unable to parse JSON object", e);
return false;
} catch (Exception e) {
log.warn("Failed to detect message ID", e);
return false;
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package us.dot.its.jpo.ode.kafka;

import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import us.dot.its.jpo.ode.model.OdeObject;

@EnableKafka
@Configuration
public class KafkaProducerConfig {

private final KafkaProperties kafkaProperties;
private final OdeKafkaProperties odeKafkaProperties;

public KafkaProducerConfig(KafkaProperties kafkaProperties, OdeKafkaProperties odeKafkaProperties) {
this.kafkaProperties = kafkaProperties;
this.odeKafkaProperties = odeKafkaProperties;
}

@Bean
public ProducerFactory<String, String> producerFactory() {
var producerProps = kafkaProperties.buildProducerProperties();
if ("CONFLUENT".equals(this.odeKafkaProperties.getKafkaType())) {
producerProps.put("sasl.jaas.config", odeKafkaProperties.getConfluent().getSaslJaasConfig());
}
return new DefaultKafkaProducerFactory<>(producerProps);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}

@Bean
public ProducerFactory<String, OdeObject> odeDataProducerFactory() {
var producerProps = kafkaProperties.buildProducerProperties();
if ("CONFLUENT".equals(this.odeKafkaProperties.getKafkaType())) {
producerProps.put("sasl.jaas.config", odeKafkaProperties.getConfluent().getSaslJaasConfig());
}
return new DefaultKafkaProducerFactory<>(producerProps,
new StringSerializer(), new XMLOdeObjectSerializer());
}

@Bean
public KafkaTemplate<String, OdeObject> odeDataKafkaTemplate() {
return new KafkaTemplate<>(odeDataProducerFactory());
}
}
Loading