-
Notifications
You must be signed in to change notification settings - Fork 57
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
2. Kafka: Add topic details based on new bulk format #1495
base: mvp_demo
Are you sure you want to change the base?
Conversation
Signed-off-by: msvinaykumar <[email protected]>
Signed-off-by: Saad Khan <[email protected]>
Signed-off-by: msvinaykumar <[email protected]>
Signed-off-by: Saad Khan <[email protected]>
src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java
Outdated
Show resolved
Hide resolved
experiment.getApis().getRecommendations().setResponse(new KruizeResponse(e.getMessage(), | ||
HttpURLConnection.HTTP_INTERNAL_ERROR, null, FAILED)); | ||
// if kafka is enabled, push the error response in the error topic | ||
if(KruizeDeploymentInfo.is_kafka_enabled) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also check if TOPIC is available via config , IF yes then only publish
setFinalJobStatus(COMPLETED, null, null, finalDatasource); | ||
// if kafka is enabled, push the final summary in the summary topic | ||
if (KruizeDeploymentInfo.is_kafka_enabled) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here also check if topic is available via Configs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added config list in ENV and validations for the same in the code
setFinalJobStatus(COMPLETED, null, null, finalDatasource); | ||
// if kafka is enabled, push the final summary in the summary topic | ||
if (KruizeDeploymentInfo.is_kafka_enabled) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I noticed that the code checks KruizeDeploymentInfo.is_kafka_enabled three times. Instead, consider gathering the necessary details into a custom object, such as KruizeKafka.
In the finally block per container , check if KruizeDeploymentInfo.is_kafka_enabled is true. If so, use the KruizeKafka object to invoke the KruizeKafka.publish() method.
Inside the publish() method, determine which topics the messages should be sent to.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
* @return filtered JSON message to be sent to kafka topic | ||
* @throws Exception exception if raised during filter process | ||
*/ | ||
private String buildKafkaResponse(BulkJobStatus jobData, String experimentName, String topic, String failedAPI) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All this function should be part of KruizKafa object and KruizeKafkaManger.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved
* @return filtered JSON message to be sent to kafka topic | ||
* @throws Exception exception if raised during filter process | ||
*/ | ||
private String buildKafkaResponse(BulkJobStatus jobData, String experimentName, String topic, String failedAPI) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
kafka Response should be configurable via config varaible
Ideally, BulkManager and Kafka should be loosely coupled and managed through KruizeKafka and KruizeKafkaManager objects. There should be a single check in BulkManager for is_kafka_enabled, followed by invoking a method in KruizeKafkaManager. This approach minimizes Kafka-related code within BulkManager, centralizing it in one place, which improves readability, maintainability, and scalability for future enhancements. |
Refactored the code now by adding KruizeKafkaManager and KruizeKafka classes |
Description
This PR contains the topic changes based on the new Bulk API format.
Note: This is on top of #1493
Fixes # (issue)
Type of change
How has this been tested?
Tested manually by running it on Openshift.
Test Configuration
Checklist 🎯
Additional information
Include any additional information such as links, test results, screenshots here