
Kafka: Add topic details based on new bulk format #1495

Open · wants to merge 11 commits into base: mvp_demo
Conversation

khansaad
Contributor

@khansaad khansaad commented Feb 4, 2025

Description

This PR contains the topic changes based on the new Bulk API format.
Note: This is on top of #1493

Fixes # (issue)

Type of change

  • Bug fix
  • New feature
  • Docs update
  • Breaking change (What changes might users need to make in their application due to this PR?)
  • Requires DB changes

How has this been tested?

Tested manually by running it on Openshift.

  • New Test X
  • Functional testsuite

Test Configuration

  • Kubernetes clusters tested on: Openshift

Checklist 🎯

  • Followed coding guidelines
  • Comments added
  • Dependent changes merged
  • Documentation updated
  • Tests added or updated

Additional information

Include any additional information such as links, test results, screenshots here

@khansaad khansaad added the enhancement New feature or request label Feb 4, 2025
@khansaad khansaad added this to the Kruize 0.4 Release milestone Feb 4, 2025
@khansaad khansaad self-assigned this Feb 4, 2025
experiment.getApis().getRecommendations().setResponse(new KruizeResponse(e.getMessage(),
        HttpURLConnection.HTTP_INTERNAL_ERROR, null, FAILED));
// if Kafka is enabled, push the error response to the error topic
if (KruizeDeploymentInfo.is_kafka_enabled) {
Contributor
Also check whether the topic is available via the config; publish only if it is.
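The guard being requested could look something like this minimal sketch; the field and class names (`KafkaTopicGuard`, `isKafkaEnabled`, `configuredTopics`) are illustrative assumptions, not Kruize's actual config API:

```java
import java.util.Set;

// Illustrative sketch only: names are assumptions, not Kruize's actual config classes.
class KafkaTopicGuard {
    static boolean isKafkaEnabled = true;
    // Topics enabled via an ENV/config list, e.g. KAFKA_TOPICS="summary,error"
    static Set<String> configuredTopics = Set.of("summary", "error");

    /** Publish only when Kafka is enabled AND the topic appears in the config. */
    static boolean shouldPublish(String topic) {
        return isKafkaEnabled && configuredTopics.contains(topic);
    }

    public static void main(String[] args) {
        System.out.println(shouldPublish("error"));           // topic configured
        System.out.println(shouldPublish("recommendations")); // topic not configured
    }
}
```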

setFinalJobStatus(COMPLETED, null, null, finalDatasource);
// if Kafka is enabled, push the final summary to the summary topic
if (KruizeDeploymentInfo.is_kafka_enabled) {
Contributor

Also check here whether the topic is available via the configs.

Contributor Author

Added the config list in the ENV and validations for it in the code.

setFinalJobStatus(COMPLETED, null, null, finalDatasource);
// if Kafka is enabled, push the final summary to the summary topic
if (KruizeDeploymentInfo.is_kafka_enabled) {
Contributor

I noticed that the code checks KruizeDeploymentInfo.is_kafka_enabled three times. Instead, consider gathering the necessary details into a custom object, such as KruizeKafka.

In the finally block, per container, check whether KruizeDeploymentInfo.is_kafka_enabled is true. If so, use the KruizeKafka object to invoke the KruizeKafka.publish() method.

Inside the publish() method, determine which topics the messages should be sent to.
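The suggestion above can be sketched roughly as follows; the class shapes (`KruizeKafkaMessage`, `KafkaPublisherSketch`) and the in-memory `sent` list are assumptions for illustration, not the PR's actual implementation:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the suggested Kafka message object; the PR's real KruizeKafka class may differ.
class KruizeKafkaMessage {
    final String topic;
    final String payload;
    KruizeKafkaMessage(String topic, String payload) {
        this.topic = topic;
        this.payload = payload;
    }
}

class KafkaPublisherSketch {
    final List<String> sent = new ArrayList<>();

    /** Decide which topic the message goes to; a real impl would call a KafkaProducer here. */
    void publish(KruizeKafkaMessage msg) {
        switch (msg.topic) {
            case "summary":
            case "error":
                sent.add(msg.topic + ":" + msg.payload); // record instead of producing
                break;
            default:
                throw new IllegalArgumentException("Unknown Kafka topic: " + msg.topic);
        }
    }

    public static void main(String[] args) {
        KafkaPublisherSketch publisher = new KafkaPublisherSketch();
        publisher.publish(new KruizeKafkaMessage("summary", "{\"status\":\"COMPLETED\"}"));
        System.out.println(publisher.sent);
    }
}
```

With this shape, BulkManager only builds a message object and hands it off; topic routing lives in one place.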

Contributor Author

Done

* @return filtered JSON message to be sent to kafka topic
* @throws Exception exception if raised during filter process
*/
private String buildKafkaResponse(BulkJobStatus jobData, String experimentName, String topic, String failedAPI) throws Exception {
Contributor

All of this function should be part of the KruizeKafka object and KruizeKafkaManager.

Contributor Author

Moved

* @return filtered JSON message to be sent to kafka topic
* @throws Exception exception if raised during filter process
*/
private String buildKafkaResponse(BulkJobStatus jobData, String experimentName, String topic, String failedAPI) throws Exception {
Contributor

The Kafka response should be configurable via a config variable.

@msvinaykumar
Contributor

Ideally, BulkManager and Kafka should be loosely coupled and managed through KruizeKafka and KruizeKafkaManager objects. There should be a single check in BulkManager for is_kafka_enabled, followed by invoking a method in KruizeKafkaManager. This approach minimizes Kafka-related code within BulkManager, centralizing it in one place, which improves readability, maintainability, and scalability for future enhancements.

@khansaad
Contributor Author

> Ideally, BulkManager and Kafka should be loosely coupled and managed through KruizeKafka and KruizeKafkaManager objects. There should be a single check in BulkManager for is_kafka_enabled, followed by invoking a method in KruizeKafkaManager. This approach minimizes Kafka-related code within BulkManager, centralizing it in one place, which improves readability, maintainability, and scalability for future enhancements.

Refactored the code now by adding KruizeKafkaManager and KruizeKafka classes
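The loose coupling described above reduces BulkManager to a single flag check that delegates to the manager. A minimal sketch, assuming hypothetical class internals (only the KruizeKafkaManager name and the is_kafka_enabled flag come from the discussion; everything else is illustrative):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the refactoring described above; internals are assumed, not the PR's actual code.
class KafkaManagerSketch {
    static final List<String> published = new ArrayList<>();

    /** Central place for Kafka logic: topic validation and producing would live here. */
    static void publish(String topic, String payload) {
        published.add(topic + "=" + payload);
    }
}

class BulkManagerSketch {
    static void finishJob(String summaryJson) {
        // The single Kafka-aware line in BulkManager: one flag check, then delegate.
        boolean isKafkaEnabled = true; // stands in for KruizeDeploymentInfo.is_kafka_enabled
        if (isKafkaEnabled) {
            KafkaManagerSketch.publish("summary", summaryJson);
        }
    }

    public static void main(String[] args) {
        finishJob("{\"status\":\"COMPLETED\"}");
        System.out.println(KafkaManagerSketch.published);
    }
}
```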

Labels
enhancement New feature or request
Projects
Status: Under Review
Development

Successfully merging this pull request may close these issues.

2 participants