From b44830342d691057069450691ebabff35cd3ae49 Mon Sep 17 00:00:00 2001 From: msvinaykumar Date: Fri, 8 Nov 2024 22:40:14 +0530 Subject: [PATCH 1/5] Handling GatewayTimout 503 , ConnectionTimeout 502 and HttpHostConnection Signed-off-by: msvinaykumar --- src/main/java/com/autotune/Autotune.java | 24 +- .../datasource/DataSourceCollection.java | 231 ++++++++------- .../DataSourceMetadataOperator.java | 102 +++---- .../common/datasource/DataSourceOperator.java | 27 +- .../datasource/DataSourceOperatorImpl.java | 271 +++++++++--------- .../PrometheusDataOperatorImpl.java | 114 +++----- .../handler/MetricCollectionHandler.java | 23 +- .../autotune/utils/GenericRestApiClient.java | 64 ++++- 8 files changed, 462 insertions(+), 394 deletions(-) diff --git a/src/main/java/com/autotune/Autotune.java b/src/main/java/com/autotune/Autotune.java index ad6ad9834..42eee9986 100644 --- a/src/main/java/com/autotune/Autotune.java +++ b/src/main/java/com/autotune/Autotune.java @@ -24,6 +24,9 @@ import com.autotune.analyzer.utils.AnalyzerConstants; import com.autotune.common.datasource.DataSourceCollection; import com.autotune.common.datasource.DataSourceInfo; +import com.autotune.common.exceptions.datasource.DataSourceAlreadyExist; +import com.autotune.common.exceptions.datasource.DataSourceNotServiceable; +import com.autotune.common.exceptions.datasource.UnsupportedDataSourceProvider; import com.autotune.database.helper.DBConstants; import com.autotune.database.init.KruizeHibernateUtil; import com.autotune.experimentManager.core.ExperimentManager; @@ -31,7 +34,10 @@ import com.autotune.operator.KruizeDeploymentInfo; import com.autotune.service.HealthService; import com.autotune.service.InitiateListener; -import com.autotune.utils.*; +import com.autotune.utils.CloudWatchAppender; +import com.autotune.utils.KruizeConstants; +import com.autotune.utils.MetricsConfig; +import com.autotune.utils.ServerContext; import com.autotune.utils.filter.KruizeCORSFilter; import io.prometheus.client.exporter.MetricsServlet; import io.prometheus.client.hotspot.DefaultExports; @@ -50,8 +56,12 @@ import javax.servlet.DispatcherType; import java.io.File; +import java.io.IOException; import java.nio.file.Path; import java.nio.file.Paths; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; import java.util.EnumSet; import java.util.HashMap; import java.util.Scanner; @@ -112,7 +122,11 @@ public static void main(String[] args) { // load available datasources from db loadDataSourcesFromDB(); // setting up DataSources - setUpDataSources(); + try { + setUpDataSources(); + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } // checking available DataSources checkAvailableDataSources(); // load available metric profiles from db @@ -124,7 +138,7 @@ public static void main(String[] args) { //Regenerate a Hibernate session following the creation of new tables KruizeHibernateUtil.buildSessionFactory(); } catch (Exception | K8sTypeNotSupportedException | MonitoringAgentNotSupportedException | - MonitoringAgentNotFoundException e) { + MonitoringAgentNotFoundException e) { e.printStackTrace(); System.exit(1); } @@ -170,7 +184,7 @@ public static void main(String[] args) { /** * Set up the data sources available at installation time from config file */ - private static void setUpDataSources() { + private static void setUpDataSources() throws UnsupportedDataSourceProvider, DataSourceNotServiceable, DataSourceAlreadyExist, IOException, 
NoSuchAlgorithmException, KeyStoreException, KeyManagementException { DataSourceCollection dataSourceCollection = DataSourceCollection.getInstance(); dataSourceCollection.addDataSourcesFromConfigFile(KruizeConstants.CONFIG_FILE); } @@ -190,7 +204,7 @@ private static void checkAvailableDataSources() { DataSourceCollection dataSourceCollection = DataSourceCollection.getInstance(); LOGGER.info(KruizeConstants.DataSourceConstants.DataSourceInfoMsgs.CHECKING_AVAILABLE_DATASOURCE); HashMap dataSources = dataSourceCollection.getDataSourcesCollection(); - for (String name: dataSources.keySet()) { + for (String name : dataSources.keySet()) { DataSourceInfo dataSource = dataSources.get(name); String dataSourceName = dataSource.getName(); String url = dataSource.getUrl().toString(); diff --git a/src/main/java/com/autotune/common/datasource/DataSourceCollection.java b/src/main/java/com/autotune/common/datasource/DataSourceCollection.java index 6734adc1c..f84b653a7 100644 --- a/src/main/java/com/autotune/common/datasource/DataSourceCollection.java +++ b/src/main/java/com/autotune/common/datasource/DataSourceCollection.java @@ -16,8 +16,8 @@ package com.autotune.common.datasource; import com.autotune.common.auth.AuthenticationConfig; -import com.autotune.common.exceptions.datasource.*; import com.autotune.common.data.ValidationOutputData; +import com.autotune.common.exceptions.datasource.*; import com.autotune.common.utils.CommonUtils; import com.autotune.database.service.ExperimentDBService; import com.autotune.utils.KruizeConstants; @@ -31,6 +31,9 @@ import java.io.InputStream; import java.net.URL; import java.nio.charset.StandardCharsets; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; import java.util.HashMap; import java.util.List; @@ -43,13 +46,22 @@ private DataSourceCollection() { this.dataSourceCollection = new HashMap<>(); } + /** + * Returns the instance of dataSourceCollection class + * + * @return DataSourceCollection instance + */ + public static DataSourceCollection getInstance() { + return dataSourceCollectionInstance; + } + public void loadDataSourcesFromDB() { try { LOGGER.info(KruizeConstants.DataSourceConstants.DataSourceInfoMsgs.CHECKING_AVAILABLE_DATASOURCE_FROM_DB); List availableDataSources = new ExperimentDBService().loadAllDataSources(); if (null == availableDataSources) { LOGGER.info(KruizeConstants.DataSourceConstants.DataSourceInfoMsgs.NO_DATASOURCE_FOUND_IN_DB); - }else { + } else { for (DataSourceInfo dataSourceInfo : availableDataSources) { LOGGER.info(KruizeConstants.DataSourceConstants.DataSourceSuccessMsgs.DATASOURCE_FOUND + dataSourceInfo.getName()); dataSourceCollection.put(dataSourceInfo.getName(), dataSourceInfo); @@ -61,16 +73,10 @@ public void loadDataSourcesFromDB() { } } - /** - * Returns the instance of dataSourceCollection class - * @return DataSourceCollection instance - */ - public static DataSourceCollection getInstance() { - return dataSourceCollectionInstance; - } /** * Returns the hashmap of data sources + * * @return HashMap containing dataSourceInfo objects */ public HashMap getDataSourcesCollection() { @@ -79,116 +85,111 @@ public HashMap getDataSourcesCollection() { /** * Adds datasource to collection + * * @param datasource DataSourceInfo object containing details of datasource */ - public void addDataSource(DataSourceInfo datasource) { + public void addDataSource(DataSourceInfo datasource) throws DataSourceAlreadyExist, DataSourceNotServiceable, 
UnsupportedDataSourceProvider, IOException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException { final String name = datasource.getName(); final String provider = datasource.getProvider(); ValidationOutputData addedToDB = null; LOGGER.info(KruizeConstants.DataSourceConstants.DataSourceInfoMsgs.ADDING_DATASOURCE + name); - try { - if (dataSourceCollection.containsKey(name)) { - throw new DataSourceAlreadyExist(KruizeConstants.DataSourceConstants.DataSourceErrorMsgs.DATASOURCE_ALREADY_EXIST); - } - if (provider.equalsIgnoreCase(KruizeConstants.SupportedDatasources.PROMETHEUS)) { - LOGGER.info(KruizeConstants.DataSourceConstants.DataSourceInfoMsgs.VERIFYING_DATASOURCE_REACHABILITY + name); - DataSourceOperatorImpl op = DataSourceOperatorImpl.getInstance().getOperator(KruizeConstants.SupportedDatasources.PROMETHEUS); - if (op.isServiceable(datasource) == CommonUtils.DatasourceReachabilityStatus.REACHABLE) { - LOGGER.info(KruizeConstants.DataSourceConstants.DataSourceSuccessMsgs.DATASOURCE_SERVICEABLE); - // add the data source to DB - addedToDB = new ExperimentDBService().addDataSourceToDB(datasource); - if (addedToDB.isSuccess()) { - LOGGER.info("Datasource added to the DB successfully."); - } else { - LOGGER.error("Failed to add datasource to DB: {}", addedToDB.getMessage()); - } - dataSourceCollection.put(name, datasource); - LOGGER.info(KruizeConstants.DataSourceConstants.DataSourceSuccessMsgs.DATASOURCE_ADDED); + if (dataSourceCollection.containsKey(name)) { + throw new DataSourceAlreadyExist(KruizeConstants.DataSourceConstants.DataSourceErrorMsgs.DATASOURCE_ALREADY_EXIST); + } + + if (provider.equalsIgnoreCase(KruizeConstants.SupportedDatasources.PROMETHEUS)) { + LOGGER.info(KruizeConstants.DataSourceConstants.DataSourceInfoMsgs.VERIFYING_DATASOURCE_REACHABILITY + name); + DataSourceOperatorImpl op = DataSourceOperatorImpl.getInstance().getOperator(KruizeConstants.SupportedDatasources.PROMETHEUS); + if (op.isServiceable(datasource) == CommonUtils.DatasourceReachabilityStatus.REACHABLE) { + LOGGER.info(KruizeConstants.DataSourceConstants.DataSourceSuccessMsgs.DATASOURCE_SERVICEABLE); + // add the data source to DB + addedToDB = new ExperimentDBService().addDataSourceToDB(datasource); + if (addedToDB.isSuccess()) { + LOGGER.info("Datasource added to the DB successfully."); } else { - throw new DataSourceNotServiceable(KruizeConstants.DataSourceConstants.DataSourceErrorMsgs.DATASOURCE_NOT_SERVICEABLE); + LOGGER.error("Failed to add datasource to DB: {}", addedToDB.getMessage()); } + dataSourceCollection.put(name, datasource); + LOGGER.info(KruizeConstants.DataSourceConstants.DataSourceSuccessMsgs.DATASOURCE_ADDED); } else { - throw new UnsupportedDataSourceProvider(KruizeConstants.DataSourceConstants.DataSourceErrorMsgs.UNSUPPORTED_DATASOURCE_PROVIDER); + throw new DataSourceNotServiceable(KruizeConstants.DataSourceConstants.DataSourceErrorMsgs.DATASOURCE_NOT_SERVICEABLE); } - } catch (UnsupportedDataSourceProvider e) { - LOGGER.error(e.getMessage()); - } catch (DataSourceNotServiceable e) { - LOGGER.error(e.getMessage()); - } catch (DataSourceAlreadyExist e) { - LOGGER.error(e.getMessage()); + } else { + throw new UnsupportedDataSourceProvider(KruizeConstants.DataSourceConstants.DataSourceErrorMsgs.UNSUPPORTED_DATASOURCE_PROVIDER); } + } /** * Loads the data sources available at installation time + * * @param configFileName name of the config file mounted */ - public void addDataSourcesFromConfigFile(String configFileName) { - try { - String configFile = 
System.getenv(configFileName); - JSONObject configObject = null; - - InputStream is = new FileInputStream(configFile); - String jsonTxt = new String(is.readAllBytes(), StandardCharsets.UTF_8); - configObject = new JSONObject(jsonTxt); - JSONArray dataSourceArr = configObject.getJSONArray(KruizeConstants.DataSourceConstants.KRUIZE_DATASOURCE); - - for (Object dataSourceObj: dataSourceArr) { - JSONObject dataSourceObject = (JSONObject) dataSourceObj; - String name = dataSourceObject.getString(KruizeConstants.DataSourceConstants.DATASOURCE_NAME); - // check the DB if the datasource already exists - try { - DataSourceInfo dataSourceInfo = new ExperimentDBService().loadDataSourceFromDBByName(name); - if (null != dataSourceInfo) { - LOGGER.error("Datasource: {} already exists!", name); - continue; - } - } catch (Exception e) { - LOGGER.error("Loading saved datasource {} failed: {} ", name, e.getMessage()); - } - String provider = dataSourceObject.getString(KruizeConstants.DataSourceConstants.DATASOURCE_PROVIDER); - String serviceName = dataSourceObject.getString(KruizeConstants.DataSourceConstants.DATASOURCE_SERVICE_NAME); - String namespace = dataSourceObject.getString(KruizeConstants.DataSourceConstants.DATASOURCE_SERVICE_NAMESPACE); - String dataSourceURL = dataSourceObject.getString(KruizeConstants.DataSourceConstants.DATASOURCE_URL); - AuthenticationConfig authConfig; - try { - JSONObject authenticationObj = dataSourceObject.optJSONObject(KruizeConstants.AuthenticationConstants.AUTHENTICATION); - // create the corresponding authentication object - authConfig = AuthenticationConfig.createAuthenticationConfigObject(authenticationObj); - } catch (Exception e) { - LOGGER.warn("Auth details are missing for datasource: {}", name); - authConfig = AuthenticationConfig.noAuth(); - } - - DataSourceInfo datasource; - // Validate input - if (!validateInput(name, provider, serviceName, dataSourceURL, namespace)) { + public void addDataSourcesFromConfigFile(String configFileName) throws UnsupportedDataSourceProvider, DataSourceNotServiceable, DataSourceAlreadyExist, IOException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException { + + String configFile = System.getenv(configFileName); + JSONObject configObject = null; + + InputStream is = new FileInputStream(configFile); + String jsonTxt = new String(is.readAllBytes(), StandardCharsets.UTF_8); + configObject = new JSONObject(jsonTxt); + JSONArray dataSourceArr = configObject.getJSONArray(KruizeConstants.DataSourceConstants.KRUIZE_DATASOURCE); + + for (Object dataSourceObj : dataSourceArr) { + JSONObject dataSourceObject = (JSONObject) dataSourceObj; + String name = dataSourceObject.getString(KruizeConstants.DataSourceConstants.DATASOURCE_NAME); + // check the DB if the datasource already exists + try { + DataSourceInfo dataSourceInfo = new ExperimentDBService().loadDataSourceFromDBByName(name); + if (null != dataSourceInfo) { + LOGGER.error("Datasource: {} already exists!", name); continue; } - if (dataSourceURL.isEmpty()) { - datasource = new DataSourceInfo(name, provider, serviceName, namespace, null); - } else { - datasource = new DataSourceInfo(name, provider, serviceName, namespace, new URL(dataSourceURL)); - } - // set the authentication config - datasource.setAuthenticationConfig(authConfig); - addDataSource(datasource); + } catch (Exception e) { + LOGGER.error("Loading saved datasource {} failed: {} ", name, e.getMessage()); } - } catch (IOException e) { - LOGGER.error(e.getMessage()); + String provider = 
dataSourceObject.getString(KruizeConstants.DataSourceConstants.DATASOURCE_PROVIDER); + String serviceName = dataSourceObject.getString(KruizeConstants.DataSourceConstants.DATASOURCE_SERVICE_NAME); + String namespace = dataSourceObject.getString(KruizeConstants.DataSourceConstants.DATASOURCE_SERVICE_NAMESPACE); + String dataSourceURL = dataSourceObject.getString(KruizeConstants.DataSourceConstants.DATASOURCE_URL); + AuthenticationConfig authConfig; + try { + JSONObject authenticationObj = dataSourceObject.optJSONObject(KruizeConstants.AuthenticationConstants.AUTHENTICATION); + // create the corresponding authentication object + authConfig = AuthenticationConfig.createAuthenticationConfigObject(authenticationObj); + } catch (Exception e) { + LOGGER.warn("Auth details are missing for datasource: {}", name); + authConfig = AuthenticationConfig.noAuth(); + } + + DataSourceInfo datasource; + // Validate input + if (!validateInput(name, provider, serviceName, dataSourceURL, namespace)) { + continue; + } + if (dataSourceURL.isEmpty()) { + datasource = new DataSourceInfo(name, provider, serviceName, namespace, null); + } else { + datasource = new DataSourceInfo(name, provider, serviceName, namespace, new URL(dataSourceURL)); + } + // set the authentication config + datasource.setAuthenticationConfig(authConfig); + addDataSource(datasource); } + } /** * validates the input parameters before creating dataSourceInfo objects - * @param name String containing name of the datasource - * @param provider String containing provider of the datasource + * + * @param name String containing name of the datasource + * @param provider String containing provider of the datasource * @param servicename String containing service name for the datasource - * @param url String containing URL of the data source - * @param namespace String containing namespace for the datasource service + * @param url String containing URL of the data source + * @param namespace String containing namespace for the datasource service * @return boolean returns true if validation is successful otherwise return false */ public boolean validateInput(String name, String provider, String servicename, String url, String namespace) { @@ -214,42 +215,38 @@ public boolean validateInput(String name, String provider, String servicename, S /** * deletes the datasource from the Hashmap + * * @param name String containing the name of the datasource to be deleted - * TODO: add db related operations + * TODO: add db related operations */ - public void deleteDataSource(String name) { - try { - if (name == null) { - throw new DataSourceMissingRequiredFiled(KruizeConstants.DataSourceConstants.DataSourceErrorMsgs.MISSING_DATASOURCE_NAME); - } - if (dataSourceCollection.containsKey(name)) { - dataSourceCollection.remove(name); - } else { - throw new DataSourceDoesNotExist(KruizeConstants.DataSourceConstants.DataSourceErrorMsgs.DATASOURCE_NOT_EXIST); - } - } catch (DataSourceMissingRequiredFiled e) { - LOGGER.error(e.getMessage()); - } catch (DataSourceDoesNotExist e) { - LOGGER.error(e.getMessage()); + public void deleteDataSource(String name) throws DataSourceMissingRequiredFiled, DataSourceDoesNotExist { + + if (name == null) { + throw new DataSourceMissingRequiredFiled(KruizeConstants.DataSourceConstants.DataSourceErrorMsgs.MISSING_DATASOURCE_NAME); + } + if (dataSourceCollection.containsKey(name)) { + dataSourceCollection.remove(name); + } else { + throw new 
DataSourceDoesNotExist(KruizeConstants.DataSourceConstants.DataSourceErrorMsgs.DATASOURCE_NOT_EXIST); } + } /** * updates the existing datasource in the Hashmap - * @param name String containing the name of the datasource to be updated + * + * @param name String containing the name of the datasource to be updated * @param newDataSource DataSourceInfo object with updated values - * TODO: add db related operations + * TODO: add db related operations */ - public void updateDataSource(String name, DataSourceInfo newDataSource) { - try { - if (dataSourceCollection.containsKey(name)) { - dataSourceCollection.remove(name); - addDataSource(newDataSource); - } else { - throw new DataSourceDoesNotExist(name + ": " + KruizeConstants.DataSourceConstants.DataSourceErrorMsgs.DATASOURCE_NOT_EXIST); - } - } catch (DataSourceDoesNotExist e) { - LOGGER.error(e.getMessage()); + public void updateDataSource(String name, DataSourceInfo newDataSource) throws DataSourceDoesNotExist, UnsupportedDataSourceProvider, DataSourceNotServiceable, DataSourceAlreadyExist, IOException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException { + + if (dataSourceCollection.containsKey(name)) { + dataSourceCollection.remove(name); + addDataSource(newDataSource); + } else { + throw new DataSourceDoesNotExist(name + ": " + KruizeConstants.DataSourceConstants.DataSourceErrorMsgs.DATASOURCE_NOT_EXIST); } + } } diff --git a/src/main/java/com/autotune/common/datasource/DataSourceMetadataOperator.java b/src/main/java/com/autotune/common/datasource/DataSourceMetadataOperator.java index 99c663402..5c22e9001 100644 --- a/src/main/java/com/autotune/common/datasource/DataSourceMetadataOperator.java +++ b/src/main/java/com/autotune/common/datasource/DataSourceMetadataOperator.java @@ -55,7 +55,7 @@ public static DataSourceMetadataOperator getInstance() { * @param startTime Get metadata from starttime to endtime * @param endTime Get metadata from starttime to endtime * @param steps the interval between data points in a range query - * TODO - support multiple data sources + * TODO - support multiple data sources */ public DataSourceMetadataInfo createDataSourceMetadata(DataSourceInfo dataSourceInfo, String uniqueKey, long startTime, long endTime, int steps) throws Exception { return processQueriesAndPopulateDataSourceMetadataInfo(dataSourceInfo, uniqueKey, startTime, endTime, steps); @@ -97,8 +97,8 @@ public DataSourceMetadataInfo getDataSourceMetadataInfo(DataSourceInfo dataSourc * @param dataSourceInfo The DataSourceInfo object containing information about the * data source to be updated. *

- * TODO - Currently Create and Update functions have identical functionalities, based on UI workflow and requirements - * need to further enhance updateDataSourceMetadata() to support namespace, workload level granular updates + * TODO - Currently Create and Update functions have identical functionalities, based on UI workflow and requirements + * need to further enhance updateDataSourceMetadata() to support namespace, workload level granular updates */ public DataSourceMetadataInfo updateDataSourceMetadata(DataSourceInfo dataSourceInfo, String uniqueKey, long startTime, long endTime, int steps) throws Exception { return processQueriesAndPopulateDataSourceMetadataInfo(dataSourceInfo, uniqueKey, startTime, endTime, steps); @@ -180,56 +180,56 @@ public DataSourceMetadataInfo processQueriesAndPopulateDataSourceMetadataInfo(Da JsonArray namespacesDataResultArray = op.getResultArrayForQuery(dataSourceInfo, namespaceQuery); if (!op.validateResultArray(namespacesDataResultArray)) { dataSourceMetadataInfo = dataSourceDetailsHelper.createDataSourceMetadataInfoObject(dataSourceName, null); - throw new Exception(KruizeConstants.DataSourceConstants.DataSourceMetadataErrorMsgs.NAMESPACE_QUERY_VALIDATION_FAILED); - } - - /** - * Key: Name of namespace - * Value: DataSourceNamespace object corresponding to a namespace - */ - HashMap datasourceNamespaces = dataSourceDetailsHelper.getActiveNamespaces(namespacesDataResultArray); - dataSourceMetadataInfo = dataSourceDetailsHelper.createDataSourceMetadataInfoObject(dataSourceName, datasourceNamespaces); - - /** - * Outer map: - * Key: Name of namespace - *

- * Inner map: - * Key: Name of workload - * Value: DataSourceWorkload object matching the name - * TODO - get workload metadata for a given namespace - */ - HashMap> datasourceWorkloads = new HashMap<>(); - JsonArray workloadDataResultArray = op.getResultArrayForQuery(dataSourceInfo, - workloadQuery); - - if (op.validateResultArray(workloadDataResultArray)) { - datasourceWorkloads = dataSourceDetailsHelper.getWorkloadInfo(workloadDataResultArray); - } - dataSourceDetailsHelper.updateWorkloadDataSourceMetadataInfoObject(dataSourceName, dataSourceMetadataInfo, - datasourceWorkloads); - - /** - * Outer map: - * Key: Name of workload - *

- * Inner map: - * Key: Name of container - * Value: DataSourceContainer object matching the name - * TODO - get container metadata for a given workload - */ - HashMap> datasourceContainers = new HashMap<>(); - JsonArray containerDataResultArray = op.getResultArrayForQuery(dataSourceInfo, - containerQuery); - LOGGER.debug("containerDataResultArray: {}", containerDataResultArray); - - if (op.validateResultArray(containerDataResultArray)) { - datasourceContainers = dataSourceDetailsHelper.getContainerInfo(containerDataResultArray); + } else { + /** + * Key: Name of namespace + * Value: DataSourceNamespace object corresponding to a namespace + */ + HashMap datasourceNamespaces = dataSourceDetailsHelper.getActiveNamespaces(namespacesDataResultArray); + dataSourceMetadataInfo = dataSourceDetailsHelper.createDataSourceMetadataInfoObject(dataSourceName, datasourceNamespaces); + + /** + * Outer map: + * Key: Name of namespace + *

+ * Inner map: + * Key: Name of workload + * Value: DataSourceWorkload object matching the name + * TODO - get workload metadata for a given namespace + */ + HashMap> datasourceWorkloads = new HashMap<>(); + JsonArray workloadDataResultArray = op.getResultArrayForQuery(dataSourceInfo, + workloadQuery); + + if (op.validateResultArray(workloadDataResultArray)) { + datasourceWorkloads = dataSourceDetailsHelper.getWorkloadInfo(workloadDataResultArray); + } + dataSourceDetailsHelper.updateWorkloadDataSourceMetadataInfoObject(dataSourceName, dataSourceMetadataInfo, + datasourceWorkloads); + + /** + * Outer map: + * Key: Name of workload + *

+ * Inner map: + * Key: Name of container + * Value: DataSourceContainer object matching the name + * TODO - get container metadata for a given workload + */ + HashMap> datasourceContainers = new HashMap<>(); + JsonArray containerDataResultArray = op.getResultArrayForQuery(dataSourceInfo, + containerQuery); + LOGGER.debug("containerDataResultArray: {}", containerDataResultArray); + + if (op.validateResultArray(containerDataResultArray)) { + datasourceContainers = dataSourceDetailsHelper.getContainerInfo(containerDataResultArray); + } + dataSourceDetailsHelper.updateContainerDataSourceMetadataInfoObject(dataSourceName, dataSourceMetadataInfo, + datasourceWorkloads, datasourceContainers); + return getDataSourceMetadataInfo(dataSourceInfo); } - dataSourceDetailsHelper.updateContainerDataSourceMetadataInfoObject(dataSourceName, dataSourceMetadataInfo, - datasourceWorkloads, datasourceContainers); - return getDataSourceMetadataInfo(dataSourceInfo); + return null; } catch (Exception e) { LOGGER.error(e.getMessage()); throw e; diff --git a/src/main/java/com/autotune/common/datasource/DataSourceOperator.java b/src/main/java/com/autotune/common/datasource/DataSourceOperator.java index f4fa77f86..a0266c95e 100644 --- a/src/main/java/com/autotune/common/datasource/DataSourceOperator.java +++ b/src/main/java/com/autotune/common/datasource/DataSourceOperator.java @@ -15,30 +15,38 @@ *******************************************************************************/ package com.autotune.common.datasource; +import com.autotune.analyzer.exceptions.FetchMetricsError; import com.autotune.common.utils.CommonUtils; import com.google.gson.JsonArray; import org.json.JSONObject; +import java.io.IOException; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; + /** * DataSourceOperator is an abstraction which has a generic and implementation, * and it can also be implemented by each data source provider type. - * + *

* Currently Supported Implementations: - * - Prometheus - * - * The Implementation should have helper functions to perform operations related - * to datasource + * - Prometheus + *

+ * The Implementation should have helper functions to perform operations related + * to datasource */ public interface DataSourceOperator { /** * Returns the default service port for provider + * * @return String containing the port number */ String getDefaultServicePortForProvider(); /** * Returns the instance of specific operator class based on provider type + * * @param provider String containing the name of provider * @return instance of specific operator */ @@ -51,7 +59,7 @@ public interface DataSourceOperator { * @param dataSource DatasourceInfo object containing the datasource details * @return DatasourceReachabilityStatus */ - CommonUtils.DatasourceReachabilityStatus isServiceable(DataSourceInfo dataSource); + CommonUtils.DatasourceReachabilityStatus isServiceable(DataSourceInfo dataSource) throws FetchMetricsError, Exception; /** * executes specified query on datasource and returns the result value @@ -60,7 +68,7 @@ public interface DataSourceOperator { * @param query String containing the query to be executed * @return Object containing the result value for the specified query */ - Object getValueForQuery(DataSourceInfo dataSource, String query); + Object getValueForQuery(DataSourceInfo dataSource, String query) throws FetchMetricsError, Exception; /** * executes specified query on datasource and returns the JSON Object @@ -69,7 +77,7 @@ public interface DataSourceOperator { * @param query String containing the query to be executed * @return JSONObject for the specified query */ - JSONObject getJsonObjectForQuery(DataSourceInfo dataSource, String query); + JSONObject getJsonObjectForQuery(DataSourceInfo dataSource, String query) throws Exception, FetchMetricsError; /** * executes specified query on datasource and returns the result array @@ -78,7 +86,7 @@ public interface DataSourceOperator { * @param query String containing the query to be executed * @return JsonArray containing the result array for the specified query */ - public JsonArray getResultArrayForQuery(DataSourceInfo dataSource, String query); + public JsonArray getResultArrayForQuery(DataSourceInfo dataSource, String query) throws IOException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException; /** * Validates a JSON array to ensure it is not null, not a JSON null, and has at least one element. 
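A minimal caller sketch for the revised contract above (illustrative only, not part of the patch; assumes an SLF4J LOGGER plus a populated DataSourceInfo dataSource and query String in scope). It shows how the checked exceptions now declared on the interface reach the caller instead of being logged and swallowed inside the operator:

    DataSourceOperator op = DataSourceOperatorImpl.getInstance()
            .getOperator(KruizeConstants.SupportedDatasources.PROMETHEUS);
    try {
        JsonArray resultArray = op.getResultArrayForQuery(dataSource, query);
        if (op.validateResultArray(resultArray)) {
            // process the non-empty result array
        }
    } catch (IOException e) {
        // surfaces HttpHostConnectException and gateway/connection timeouts to the caller
        LOGGER.error("Datasource unreachable: {}", e.getMessage());
    } catch (NoSuchAlgorithmException | KeyStoreException | KeyManagementException e) {
        // TLS context setup for the underlying HTTP client failed
        LOGGER.error("SSL setup failed: {}", e.getMessage());
    }
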
@@ -90,6 +98,7 @@ public interface DataSourceOperator { /** * returns query endpoint for datasource + * * @return String containing query endpoint */ String getQueryEndpoint(); diff --git a/src/main/java/com/autotune/common/datasource/DataSourceOperatorImpl.java b/src/main/java/com/autotune/common/datasource/DataSourceOperatorImpl.java index 5404a9d4b..6ff6a3d2b 100644 --- a/src/main/java/com/autotune/common/datasource/DataSourceOperatorImpl.java +++ b/src/main/java/com/autotune/common/datasource/DataSourceOperatorImpl.java @@ -4,9 +4,6 @@ import com.autotune.analyzer.exceptions.MonitoringAgentNotFoundException; import com.autotune.analyzer.exceptions.TooManyRecursiveCallsException; import com.autotune.analyzer.utils.AnalyzerConstants; -import com.autotune.common.auth.AuthenticationConfig; -import com.autotune.common.auth.AuthenticationStrategy; -import com.autotune.common.auth.AuthenticationStrategyFactory; import com.autotune.common.datasource.prometheus.PrometheusDataOperatorImpl; import com.autotune.common.exceptions.datasource.ServiceNotFound; import com.autotune.common.target.kubernetes.service.KubernetesServices; @@ -20,6 +17,7 @@ import org.json.JSONArray; import org.json.JSONObject; import org.slf4j.LoggerFactory; + import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; @@ -35,11 +33,13 @@ public class DataSourceOperatorImpl implements DataSourceOperator { private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(DataSourceOperatorImpl.class); private static DataSourceOperatorImpl dataSourceOperator = null; + protected DataSourceOperatorImpl() { } /** * Returns the instance of DataSourceOperatorImpl class + * * @return DataSourceOperatorImpl instance */ public static DataSourceOperatorImpl getInstance() { @@ -49,8 +49,119 @@ public static DataSourceOperatorImpl getInstance() { return dataSourceOperator; } + /** + * TODO: monitoring agent will be replaced by default datasource later + * returns DataSourceInfo objects for default datasource which is currently monitoring agent + * + * @return DataSourceInfo objects + */ + public static DataSourceInfo getMonitoringAgent(String dataSource) throws MonitoringAgentNotFoundException, MalformedURLException { + String monitoringAgentEndpoint; + DataSourceInfo monitoringAgent = null; + + if (dataSource.toLowerCase().equals(KruizeDeploymentInfo.monitoring_agent)) { + monitoringAgentEndpoint = KruizeDeploymentInfo.monitoring_agent_endpoint; + // Monitoring agent endpoint not set in the configmap + if (monitoringAgentEndpoint == null || monitoringAgentEndpoint.isEmpty()) { + monitoringAgentEndpoint = getServiceEndpoint(KruizeDeploymentInfo.monitoring_service); + } + if (dataSource.equals(AnalyzerConstants.PROMETHEUS_DATA_SOURCE)) { + monitoringAgent = new DataSourceInfo(KruizeDeploymentInfo.monitoring_agent, AnalyzerConstants.PROMETHEUS_DATA_SOURCE, null, null, new URL(monitoringAgentEndpoint)); + } + } + + if (monitoringAgent == null) { + LOGGER.error("Datasource " + dataSource + " not supported"); + } + + return monitoringAgent; + } + + /** + * TODO: To find a suitable place for this function later + * Gets the service endpoint for the datasource service through the cluster IP + * of the service. + * + * @return Endpoint of the service. + * @throws ServiceNotFound + */ + private static String getServiceEndpoint(String serviceName) { + //No endpoint was provided in the configmap, find the endpoint from the service. 
+ KubernetesServices kubernetesServices = new KubernetesServicesImpl(); + List serviceList = kubernetesServices.getServicelist(null); + kubernetesServices.shutdownClient(); + String serviceEndpoint = null; + + try { + if (serviceName == null) { + throw new ServiceNotFound(); + } + + for (Service service : serviceList) { + String name = service.getMetadata().getName(); + if (name.toLowerCase().equals(serviceName)) { + String clusterIP = service.getSpec().getClusterIP(); + int port = service.getSpec().getPorts().get(0).getPort(); + LOGGER.debug(KruizeDeploymentInfo.cluster_type); + if (KruizeDeploymentInfo.k8s_type.equalsIgnoreCase(KruizeConstants.MINIKUBE)) { + serviceEndpoint = AnalyzerConstants.HTTP_PROTOCOL + "://" + clusterIP + ":" + port; + } + if (KruizeDeploymentInfo.k8s_type.equalsIgnoreCase(KruizeConstants.OPENSHIFT)) { + serviceEndpoint = AnalyzerConstants.HTTPS_PROTOCOL + "://" + clusterIP + ":" + port; + } + } + } + } catch (ServiceNotFound e) { + LOGGER.error(KruizeConstants.DataSourceConstants.DataSourceErrorMsgs.SERVICE_NOT_FOUND); + } + + if (serviceEndpoint == null) { + LOGGER.error(KruizeConstants.DataSourceConstants.DataSourceErrorMsgs.ENDPOINT_NOT_FOUND); + } + + return serviceEndpoint; + } + + /** + * TODO: To find a suitable place for this function later + * + * @param jsonObj The JSON that needs to be parsed + * @param key The key to search in the JSON + * @param values ArrayList to hold the key values in the JSON + * @param level Level of recursion + */ + static void parseJsonForKey(JSONObject jsonObj, String key, ArrayList values, int level) throws TooManyRecursiveCallsException { + level += 1; + + if (level > 30) + throw new TooManyRecursiveCallsException(); + + for (String keyStr : jsonObj.keySet()) { + Object keyvalue = jsonObj.get(keyStr); + + if (keyStr.equals(key)) + values.add(keyvalue.toString()); + + //for nested objects + if (keyvalue instanceof JSONObject) + parseJsonForKey((JSONObject) keyvalue, key, values, level); + + //for json array, iterate and recursively get values + if (keyvalue instanceof JSONArray) { + JSONArray jsonArray = (JSONArray) keyvalue; + for (int index = 0; index < jsonArray.length(); index++) { + Object jsonObject = jsonArray.get(index); + if (jsonObject instanceof JSONObject) { + parseJsonForKey((JSONObject) jsonObject, key, values, level); + } + } + } + } + } + /** * Returns the instance of specific operator class based on provider type + * * @param provider String containg the name of provider * @return instance of specific operator */ @@ -64,6 +175,7 @@ public DataSourceOperatorImpl getOperator(String provider) { /** * Returns the default service port for prometheus + * * @return String containing the port number */ @Override @@ -79,7 +191,7 @@ public String getDefaultServicePortForProvider() { * @return DatasourceReachabilityStatus */ @Override - public CommonUtils.DatasourceReachabilityStatus isServiceable(DataSourceInfo dataSource) { + public CommonUtils.DatasourceReachabilityStatus isServiceable(DataSourceInfo dataSource) throws IOException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException { return null; } @@ -91,18 +203,20 @@ public CommonUtils.DatasourceReachabilityStatus isServiceable(DataSourceInfo dat * @return Object containing the result value for the specified query */ @Override - public Object getValueForQuery(DataSourceInfo dataSource, String query) { + public Object getValueForQuery(DataSourceInfo dataSource, String query) throws FetchMetricsError, Exception { return null; } /** * returns query 
endpoint for datasource + * * @return String containing query endpoint */ @Override public String getQueryEndpoint() { return null; } + /** * executes specified query on datasource and returns the JSON Object * @@ -111,7 +225,7 @@ public String getQueryEndpoint() { * @return JSONObject for the specified query */ @Override - public JSONObject getJsonObjectForQuery(DataSourceInfo dataSource, String query) { + public JSONObject getJsonObjectForQuery(DataSourceInfo dataSource, String query) throws Exception, FetchMetricsError { return null; } @@ -123,7 +237,7 @@ public JSONObject getJsonObjectForQuery(DataSourceInfo dataSource, String query) * @return JsonArray containing the result array for the specified query */ @Override - public JsonArray getResultArrayForQuery(DataSourceInfo dataSource, String query) { + public JsonArray getResultArrayForQuery(DataSourceInfo dataSource, String query) throws IOException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException { return null; } @@ -134,15 +248,18 @@ public JsonArray getResultArrayForQuery(DataSourceInfo dataSource, String query) * @return True if the JSON array is valid (not null, not a JSON null, and has at least one element), otherwise false. */ @Override - public boolean validateResultArray(JsonArray resultArray) { return false;} + public boolean validateResultArray(JsonArray resultArray) { + return false; + } /** * TODO: To find a suitable place for this function later * returns authentication token for datasource + * * @return String containing token */ public String getToken() throws IOException { - String fileName = KruizeConstants.AUTH_MOUNT_PATH+"token"; + String fileName = KruizeConstants.AUTH_MOUNT_PATH + "token"; String authToken = new String(Files.readAllBytes(Paths.get(fileName))); return authToken; } @@ -150,13 +267,14 @@ public String getToken() throws IOException { /** * TODO: To find a suitable place for this function later * Run the getAppsForLayer and return the list of applications matching the layer. 
+ * * @param dataSource - * @param query getAppsForLayer query for the layer - * @param key The key to search for in the response + * @param query getAppsForLayer query for the layer + * @param key The key to search for in the response * @return ArrayList of all applications from the query * @throws MalformedURLException */ - public ArrayList getAppsForLayer(DataSourceInfo dataSource, String query, String key) throws MalformedURLException { + public ArrayList getAppsForLayer(DataSourceInfo dataSource, String query, String key) throws IOException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException { String dataSourceURL = dataSource.getUrl().toString(); String provider = dataSource.getProvider(); DataSourceOperator op = this.getOperator(provider); @@ -165,128 +283,19 @@ public ArrayList getAppsForLayer(DataSourceInfo dataSource, String query ArrayList valuesList = new ArrayList<>(); String queryURL = dataSourceURL + queryEndpoint + query; LOGGER.debug("Query URL is: {}", queryURL); - try { - // Create the client - GenericRestApiClient genericRestApiClient = new GenericRestApiClient(dataSource); - genericRestApiClient.setBaseURL(dataSourceURL + queryEndpoint); - JSONObject responseJson = genericRestApiClient.fetchMetricsJson("GET", query); - int level = 0; - try { - parseJsonForKey(responseJson, key, valuesList, level); - LOGGER.debug("Applications for the query: {}", valuesList.toString()); - } catch (TooManyRecursiveCallsException e) { - e.printStackTrace(); - } - } catch (IOException | NoSuchAlgorithmException | KeyStoreException | KeyManagementException | FetchMetricsError e) { - LOGGER.error("Unable to proceed due to invalid connection to URL: "+ queryURL); - } - return valuesList; - } - - /** - * TODO: monitoring agent will be replaced by default datasource later - * returns DataSourceInfo objects for default datasource which is currently monitoring agent - * @return DataSourceInfo objects - */ - public static DataSourceInfo getMonitoringAgent(String dataSource) throws MonitoringAgentNotFoundException, MalformedURLException { - String monitoringAgentEndpoint; - DataSourceInfo monitoringAgent = null; - - if (dataSource.toLowerCase().equals(KruizeDeploymentInfo.monitoring_agent)) { - monitoringAgentEndpoint = KruizeDeploymentInfo.monitoring_agent_endpoint; - // Monitoring agent endpoint not set in the configmap - if (monitoringAgentEndpoint == null || monitoringAgentEndpoint.isEmpty()) { - monitoringAgentEndpoint = getServiceEndpoint(KruizeDeploymentInfo.monitoring_service); - } - if (dataSource.equals(AnalyzerConstants.PROMETHEUS_DATA_SOURCE)) { - monitoringAgent = new DataSourceInfo(KruizeDeploymentInfo.monitoring_agent, AnalyzerConstants.PROMETHEUS_DATA_SOURCE, null, null, new URL(monitoringAgentEndpoint)); - } - } - - if (monitoringAgent == null) { - LOGGER.error("Datasource " + dataSource + " not supported"); - } - - return monitoringAgent; - } - - /** - * TODO: To find a suitable place for this function later - * Gets the service endpoint for the datasource service through the cluster IP - * of the service. - * @return Endpoint of the service. - * @throws ServiceNotFound - */ - private static String getServiceEndpoint(String serviceName) { - //No endpoint was provided in the configmap, find the endpoint from the service. 
- KubernetesServices kubernetesServices = new KubernetesServicesImpl(); - List serviceList = kubernetesServices.getServicelist(null); - kubernetesServices.shutdownClient(); - String serviceEndpoint = null; + // Create the client + GenericRestApiClient genericRestApiClient = new GenericRestApiClient(dataSource); + genericRestApiClient.setBaseURL(dataSourceURL + queryEndpoint); + JSONObject responseJson = genericRestApiClient.fetchMetricsJson("GET", query); + int level = 0; try { - if (serviceName == null) { - throw new ServiceNotFound(); - } - - for (Service service : serviceList) { - String name = service.getMetadata().getName(); - if (name.toLowerCase().equals(serviceName)) { - String clusterIP = service.getSpec().getClusterIP(); - int port = service.getSpec().getPorts().get(0).getPort(); - LOGGER.debug(KruizeDeploymentInfo.cluster_type); - if (KruizeDeploymentInfo.k8s_type.equalsIgnoreCase(KruizeConstants.MINIKUBE)) { - serviceEndpoint = AnalyzerConstants.HTTP_PROTOCOL + "://" + clusterIP + ":" + port; - } - if (KruizeDeploymentInfo.k8s_type.equalsIgnoreCase(KruizeConstants.OPENSHIFT)) { - serviceEndpoint = AnalyzerConstants.HTTPS_PROTOCOL + "://" + clusterIP + ":" + port; - } - } - } - } catch (ServiceNotFound e) { - LOGGER.error(KruizeConstants.DataSourceConstants.DataSourceErrorMsgs.SERVICE_NOT_FOUND); + parseJsonForKey(responseJson, key, valuesList, level); + LOGGER.debug("Applications for the query: {}", valuesList.toString()); + } catch (TooManyRecursiveCallsException e) { + e.printStackTrace(); } - if (serviceEndpoint == null) { - LOGGER.error(KruizeConstants.DataSourceConstants.DataSourceErrorMsgs.ENDPOINT_NOT_FOUND); - } - - return serviceEndpoint; - } - - /** - * TODO: To find a suitable place for this function later - * @param jsonObj The JSON that needs to be parsed - * @param key The key to search in the JSON - * @param values ArrayList to hold the key values in the JSON - * @param level Level of recursion - */ - static void parseJsonForKey(JSONObject jsonObj, String key, ArrayList values, int level) throws TooManyRecursiveCallsException { - level += 1; - - if (level > 30) - throw new TooManyRecursiveCallsException(); - - for (String keyStr : jsonObj.keySet()) { - Object keyvalue = jsonObj.get(keyStr); - - if (keyStr.equals(key)) - values.add(keyvalue.toString()); - - //for nested objects - if (keyvalue instanceof JSONObject) - parseJsonForKey((JSONObject) keyvalue, key, values, level); - - //for json array, iterate and recursively get values - if (keyvalue instanceof JSONArray) { - JSONArray jsonArray = (JSONArray) keyvalue; - for (int index = 0; index < jsonArray.length(); index++) { - Object jsonObject = jsonArray.get(index); - if (jsonObject instanceof JSONObject) { - parseJsonForKey((JSONObject) jsonObject, key, values, level); - } - } - } - } + return valuesList; } } diff --git a/src/main/java/com/autotune/common/datasource/prometheus/PrometheusDataOperatorImpl.java b/src/main/java/com/autotune/common/datasource/prometheus/PrometheusDataOperatorImpl.java index 56614d24a..9e86a4d50 100644 --- a/src/main/java/com/autotune/common/datasource/prometheus/PrometheusDataOperatorImpl.java +++ b/src/main/java/com/autotune/common/datasource/prometheus/PrometheusDataOperatorImpl.java @@ -15,20 +15,16 @@ *******************************************************************************/ package com.autotune.common.datasource.prometheus; -import com.autotune.analyzer.exceptions.FetchMetricsError; import com.autotune.analyzer.utils.AnalyzerConstants; -import 
com.autotune.common.auth.AuthenticationStrategy; -import com.autotune.common.auth.AuthenticationStrategyFactory; import com.autotune.common.datasource.DataSourceInfo; import com.autotune.common.datasource.DataSourceOperatorImpl; import com.autotune.common.utils.CommonUtils; import com.autotune.operator.KruizeDeploymentInfo; -import com.autotune.utils.KruizeConstants; import com.autotune.utils.GenericRestApiClient; -import com.google.gson.*; -import org.apache.http.conn.HttpHostConnectException; -import org.json.JSONArray; -import org.json.JSONException; +import com.autotune.utils.KruizeConstants; +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; import org.json.JSONObject; import org.slf4j.LoggerFactory; @@ -38,19 +34,22 @@ import java.security.NoSuchAlgorithmException; /** - * PrometheusDataOperatorImpl extends DataSourceOperatorImpl class - * This class provides Prometheus specific implementation for DataSourceOperator functions + * PrometheusDataOperatorImpl extends DataSourceOperatorImpl class + * This class provides Prometheus specific implementation for DataSourceOperator functions */ public class PrometheusDataOperatorImpl extends DataSourceOperatorImpl { private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(PrometheusDataOperatorImpl.class); - private static PrometheusDataOperatorImpl prometheusDataOperator = null;; + private static PrometheusDataOperatorImpl prometheusDataOperator = null; + ; + private PrometheusDataOperatorImpl() { super(); } /** * Returns the instance of PrometheusDataOperatorImpl class + * * @return PrometheusDataOperatorImpl instance */ public static PrometheusDataOperatorImpl getInstance() { @@ -62,6 +61,7 @@ public static PrometheusDataOperatorImpl getInstance() { /** * Returns the default service port for prometheus + * * @return String containing the port number */ @Override @@ -80,7 +80,7 @@ public String getDefaultServicePortForProvider() { * @return DatasourceReachabilityStatus */ @Override - public CommonUtils.DatasourceReachabilityStatus isServiceable(DataSourceInfo dataSource) { + public CommonUtils.DatasourceReachabilityStatus isServiceable(DataSourceInfo dataSource) throws IOException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException { String dataSourceStatus; Object queryResult; @@ -89,13 +89,13 @@ public CommonUtils.DatasourceReachabilityStatus isServiceable(DataSourceInfo dat queryResult = this.getValueForQuery(dataSource, query); - if (queryResult != null){ + if (queryResult != null) { dataSourceStatus = queryResult.toString(); } else { dataSourceStatus = "0"; } - if (dataSourceStatus.equalsIgnoreCase("1")){ + if (dataSourceStatus.equalsIgnoreCase("1")) { reachabilityStatus = CommonUtils.DatasourceReachabilityStatus.REACHABLE; } else { reachabilityStatus = CommonUtils.DatasourceReachabilityStatus.NOT_REACHABLE; @@ -111,23 +111,17 @@ public CommonUtils.DatasourceReachabilityStatus isServiceable(DataSourceInfo dat * @return Object containing the result value for the specified query */ @Override - public Object getValueForQuery(DataSourceInfo dataSource, String query) { - try { - JSONObject jsonObject = getJsonObjectForQuery(dataSource, query); - - if (null == jsonObject) { - return null; - } else { - return "1"; //if it returns HTTP STATUS_OK 200 - } + public Object getValueForQuery(DataSourceInfo dataSource, String query) throws IOException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException { + JSONObject jsonObject = 
getJsonObjectForQuery(dataSource, query); - } catch (JSONException e) { - LOGGER.error(e.getMessage()); - } catch (NullPointerException e) { - LOGGER.error(e.getMessage()); + if (null == jsonObject) { + return null; + } else { + return "1"; //if it returns HTTP STATUS_OK 200 } - return null; + + } /** @@ -138,7 +132,7 @@ public Object getValueForQuery(DataSourceInfo dataSource, String query) { * @return JSONObject for the specified query */ @Override - public JSONObject getJsonObjectForQuery(DataSourceInfo dataSource, String query) { + public JSONObject getJsonObjectForQuery(DataSourceInfo dataSource, String query) throws IOException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException { // Create the client GenericRestApiClient apiClient = new GenericRestApiClient(dataSource); apiClient.setBaseURL(CommonUtils.getBaseDataSourceUrl( @@ -150,10 +144,10 @@ public JSONObject getJsonObjectForQuery(DataSourceInfo dataSource, String query) return null; } - try { - JSONObject jsonObject = apiClient.fetchMetricsJson( - KruizeConstants.HttpConstants.MethodType.GET, - query); + + JSONObject jsonObject = apiClient.fetchMetricsJson( + KruizeConstants.HttpConstants.MethodType.GET, + query); /* TODO need to separate it out this logic form here if (!jsonObject.has(KruizeConstants.DataSourceConstants.DataSourceQueryJSONKeys.STATUS)) return null; @@ -168,26 +162,14 @@ public JSONObject getJsonObjectForQuery(DataSourceInfo dataSource, String query) */ - return jsonObject; - - } catch (HttpHostConnectException e) { - LOGGER.error(KruizeConstants.DataSourceConstants.DataSourceErrorMsgs.DATASOURCE_CONNECTION_FAILED); - } catch (IOException e) { - e.printStackTrace(); - } catch (NoSuchAlgorithmException e) { - e.printStackTrace(); - } catch (KeyStoreException e) { - e.printStackTrace(); - } catch (KeyManagementException e) { - e.printStackTrace(); - } catch (FetchMetricsError e) { - e.printStackTrace(); - } - return null; + return jsonObject; + + } /** * returns query endpoint for prometheus datasource + * * @return String containing query endpoint */ @Override @@ -201,45 +183,37 @@ public String getQueryEndpoint() { * @param dataSource DatasourceInfo object containing the datasource details * @param query String containing the query to be executed * @return JsonArray containing the result array for the specified query - * + *

* Example output JsonArray - * [ - * { - * "metric": { - * "__name__": "exampleMetric" - * }, - * "value": [1642612628.987, "1"] - * } + * { + * "metric": { + * "__name__": "exampleMetric" + * }, + * "value": [1642612628.987, "1"] + * } * ] */ @Override - public JsonArray getResultArrayForQuery(DataSourceInfo dataSource, String query) { - try { - JSONObject jsonObject = getJsonObjectForQuery(dataSource, query); + public JsonArray getResultArrayForQuery(DataSourceInfo dataSource, String query) throws IOException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException { - if (null == jsonObject) { - return null; - } + JSONObject jsonObject = getJsonObjectForQuery(dataSource, query); + if (null == jsonObject) { + return null; + } else { String jsonString = jsonObject.toString(); JsonObject parsedJsonObject = JsonParser.parseString(jsonString).getAsJsonObject(); JsonObject dataObject = parsedJsonObject.get(KruizeConstants.DataSourceConstants.DataSourceQueryJSONKeys.DATA).getAsJsonObject(); - if (dataObject.has(KruizeConstants.DataSourceConstants.DataSourceQueryJSONKeys.RESULT) && dataObject.get(KruizeConstants.DataSourceConstants.DataSourceQueryJSONKeys.RESULT).isJsonArray()) { JsonArray resultArray = dataObject.getAsJsonArray(KruizeConstants.DataSourceConstants.DataSourceQueryJSONKeys.RESULT); - if (null != resultArray) { return resultArray; } } - } catch (JsonParseException e) { - LOGGER.error(e.getMessage()); - throw e; - } catch (NullPointerException e) { - LOGGER.error(e.getMessage()); - throw e; } + return null; } diff --git a/src/main/java/com/autotune/experimentManager/handler/MetricCollectionHandler.java b/src/main/java/com/autotune/experimentManager/handler/MetricCollectionHandler.java index 0aa8c41d7..bb9ea9103 100644 --- a/src/main/java/com/autotune/experimentManager/handler/MetricCollectionHandler.java +++ b/src/main/java/com/autotune/experimentManager/handler/MetricCollectionHandler.java @@ -15,6 +15,7 @@ *******************************************************************************/ package com.autotune.experimentManager.handler; +import com.autotune.analyzer.exceptions.FetchMetricsError; import com.autotune.analyzer.utils.AnalyzerConstants; import com.autotune.common.data.metrics.Metric; import com.autotune.common.data.metrics.MetricResults; @@ -121,8 +122,15 @@ public void execute(ExperimentTrial experimentTrial, TrialDetails trialDetails, if (null == ado) { // TODO: Return an error saying unsupported datasource } - String queryResult = (String) ado.getValueForQuery(experimentTrial.getDatasourceInfoHashMap() - .get(podMetric.getDatasource()), updatedPodQuery); + String queryResult = null; + try { + queryResult = (String) ado.getValueForQuery(experimentTrial.getDatasourceInfoHashMap() + .get(podMetric.getDatasource()), updatedPodQuery); + } catch (FetchMetricsError e) { + throw new RuntimeException(e); + } catch (Exception e) { + throw new RuntimeException(e); + } if (null != queryResult && !queryResult.isEmpty() && !queryResult.isBlank()) { try { queryResult = queryResult.trim(); @@ -159,8 +167,15 @@ public void execute(ExperimentTrial experimentTrial, TrialDetails trialDetails, } if (null != updatedContainerQuery) { LOGGER.debug("Updated Query - " + updatedContainerQuery); - String queryResult = (String) ado.getValueForQuery(experimentTrial.getDatasourceInfoHashMap() - .get(containerMetric.getDatasource()), updatedContainerQuery); + String queryResult = null; + try { + queryResult = (String) ado.getValueForQuery(experimentTrial.getDatasourceInfoHashMap() + 
.get(containerMetric.getDatasource()), updatedContainerQuery); + } catch (FetchMetricsError e) { + throw new RuntimeException(e); + } catch (Exception e) { + throw new RuntimeException(e); + } if (null != queryResult && !queryResult.isEmpty() && !queryResult.isBlank()) { try { queryResult = queryResult.trim(); diff --git a/src/main/java/com/autotune/utils/GenericRestApiClient.java b/src/main/java/com/autotune/utils/GenericRestApiClient.java index 99a3e8e13..0d178d219 100644 --- a/src/main/java/com/autotune/utils/GenericRestApiClient.java +++ b/src/main/java/com/autotune/utils/GenericRestApiClient.java @@ -37,9 +37,11 @@ import org.apache.http.impl.client.HttpClients; import org.apache.http.ssl.SSLContexts; import org.apache.http.util.EntityUtils; +import org.json.JSONException; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import javax.net.ssl.SSLContext; import java.io.IOException; import java.net.URLEncoder; @@ -64,6 +66,7 @@ public class GenericRestApiClient { /** * constructor to set the authentication based on the datasourceInfo object + * * @param dataSourceInfo object containing the datasource details */ public GenericRestApiClient(DataSourceInfo dataSourceInfo) { @@ -74,12 +77,13 @@ public GenericRestApiClient(DataSourceInfo dataSourceInfo) { /** * This method appends queryString with baseURL and returns response in JSON using specified authentication. - * @param methodType Http methods like GET,POST,PATCH etc + * + * @param methodType Http methods like GET,POST,PATCH etc * @param queryString * @return Json object which contains API response. * @throws IOException */ - public JSONObject fetchMetricsJson(String methodType, String queryString) throws IOException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException, FetchMetricsError { + public JSONObject fetchMetricsJson(String methodType, String queryString) throws IOException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException { String jsonResponse; try (CloseableHttpClient httpclient = setupHttpClient()) { @@ -97,6 +101,8 @@ public JSONObject fetchMetricsJson(String methodType, String queryString) throws // Execute the request jsonResponse = httpclient.execute(httpRequestBase, new StringResponseHandler()); + } catch (IOException e) { + throw e; } return new JSONObject(jsonResponse); } @@ -104,6 +110,7 @@ public JSONObject fetchMetricsJson(String methodType, String queryString) throws /** * Common method to setup SSL context for trust-all certificates. + * * @return CloseableHttpClient */ private CloseableHttpClient setupHttpClient() throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException { @@ -115,6 +122,7 @@ private CloseableHttpClient setupHttpClient() throws NoSuchAlgorithmException, K /** * Common method to apply authentication to the HTTP request. + * * @param httpRequestBase the HTTP request (GET, POST, etc.) */ private void applyAuthentication(HttpRequestBase httpRequestBase) { @@ -126,12 +134,13 @@ private void applyAuthentication(HttpRequestBase httpRequestBase) { /** * Method to call the Experiment API (e.g., to create an experiment) using POST request. 
+ * * @param payload JSON payload containing the experiment details * @return API response code * @throws IOException */ - public int callKruizeAPI(String payload) throws IOException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException, FetchMetricsError { - + public HttpResponseWrapper callKruizeAPI(String payload) throws IOException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException, FetchMetricsError { + HttpResponseWrapper httpResponseWrapper = null; // Create an HTTP client try (CloseableHttpClient httpclient = setupHttpClient()) { // Prepare the HTTP POST request @@ -150,15 +159,34 @@ public int callKruizeAPI(String payload) throws IOException, NoSuchAlgorithmExce // Get the status code from the response int responseCode = response.getStatusLine().getStatusCode(); LOGGER.debug("Response code: {}", responseCode); - return responseCode; + if (response.getEntity() != null) { + // Convert response entity to string + String responseBody = EntityUtils.toString(response.getEntity(), "UTF-8"); + try { + // Attempt to parse as JSON + JSONObject json = new JSONObject(responseBody); + httpResponseWrapper = new HttpResponseWrapper(responseCode, json); + } catch (JSONException e) { + // If JSON parsing fails, return as plain string + httpResponseWrapper = new HttpResponseWrapper(responseCode, responseBody); + } + } } catch (Exception e) { LOGGER.error("Error occurred while calling Kruize API: {}", e.getMessage()); throw new FetchMetricsError(e.getMessage()); } + } catch (Exception e) { + LOGGER.error("Error occurred while calling Kruize API: {}", e.getMessage()); + throw new FetchMetricsError(e.getMessage()); } + return httpResponseWrapper; } + public void setBaseURL(String baseURL) { + this.baseURL = baseURL; + } + private static class StringResponseHandler implements ResponseHandler { @Override public String handleResponse(HttpResponse response) throws IOException { @@ -174,7 +202,29 @@ public String handleResponse(HttpResponse response) throws IOException { } - public void setBaseURL(String baseURL) { - this.baseURL = baseURL; + public class HttpResponseWrapper { + private int statusCode; + private Object responseBody; + + public HttpResponseWrapper(int statusCode, Object responseBody) { + this.statusCode = statusCode; + this.responseBody = responseBody; + } + + public int getStatusCode() { + return statusCode; + } + + public Object getResponseBody() { + return responseBody; + } + + @Override + public String toString() { + return "HttpResponseWrapper{" + + "statusCode=" + statusCode + + ", responseBody=" + responseBody + + '}'; + } } } From ac82b857bcca27c7d9c8b2c4cfc4394b796da2c3 Mon Sep 17 00:00:00 2001 From: msvinaykumar Date: Fri, 8 Nov 2024 22:51:01 +0530 Subject: [PATCH 2/5] BulkAPI Response Code change Signed-off-by: msvinaykumar --- .../serviceObjects/BulkJobStatus.java | 252 +++++------------- .../analyzer/services/BulkService.java | 14 +- .../analyzer/workerimpl/BulkJobManager.java | 187 +++++++------ .../com/autotune/utils/KruizeConstants.java | 105 ++++++++ 4 files changed, 278 insertions(+), 280 deletions(-) diff --git a/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java b/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java index 1a988808e..24a658610 100644 --- a/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java +++ b/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java @@ -15,78 +15,87 @@ *******************************************************************************/ 
package com.autotune.analyzer.serviceObjects; +import com.autotune.utils.KruizeConstants; import com.fasterxml.jackson.annotation.JsonFilter; import com.fasterxml.jackson.annotation.JsonProperty; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.Instant; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; -import java.util.ArrayList; import java.util.Collections; -import java.util.List; +import java.util.HashMap; +import java.util.Map; import static com.autotune.utils.KruizeConstants.KRUIZE_BULK_API.JOB_ID; +import static com.autotune.utils.KruizeConstants.KRUIZE_BULK_API.NotificationConstants.Status.UNPROCESSED; /** * Bulk API Response payload Object. */ @JsonFilter("jobFilter") public class BulkJobStatus { + private static final Logger LOGGER = LoggerFactory.getLogger(BulkJobStatus.class); @JsonProperty(JOB_ID) private String jobID; private String status; private int total_experiments; private int processed_experiments; - private Data data; @JsonProperty("job_start_time") private String startTime; // Change to String to store formatted time @JsonProperty("job_end_time") private String endTime; // Change to String to store formatted time - private String message; + private Map notifications; + private Map experiments = Collections.synchronizedMap(new HashMap<>()); - public BulkJobStatus(String jobID, String status, Data data, Instant startTime) { + public BulkJobStatus(String jobID, String status, Instant startTime) { this.jobID = jobID; this.status = status; - this.data = data; setStartTime(startTime); } - public String getJobID() { - return jobID; + + // Method to set a notification in the map + public void setNotification(String key, KruizeConstants.KRUIZE_BULK_API.NotificationConstants.Notification notification) { + if (notifications == null) { + notifications = new HashMap<>(); // Initialize if null + } + notifications.put(key, notification); } - public String getStatus() { - return status; + public Map getExperiments() { + return experiments; } - public void setStatus(String status) { - this.status = status; + public void setExperiments(Map experiments) { + this.experiments = experiments; } - public String getStartTime() { - return startTime; + // Method to add a new experiment with "unprocessed" status and null notification + public synchronized Experiment addExperiment(String experimentName) { + Experiment experiment = new Experiment(experimentName); + experiments.put(experimentName, experiment); + return experiment; } - public void setStartTime(Instant startTime) { - this.startTime = formatInstantAsUTCString(startTime); + + public String getStatus() { + return status; } - public void setStartTime(String startTime) { - this.startTime = startTime; + public void setStatus(String status) { + this.status = status; } - public String getEndTime() { - return endTime; + public void setStartTime(Instant startTime) { + this.startTime = formatInstantAsUTCString(startTime); } public void setEndTime(Instant endTime) { this.endTime = formatInstantAsUTCString(endTime); } - public void setEndTime(String endTime) { - this.endTime = endTime; - } - public int getTotal_experiments() { return total_experiments; } @@ -103,14 +112,6 @@ public void setProcessed_experiments(int processed_experiments) { this.processed_experiments = processed_experiments; } - public Data getData() { - return data; - } - - public void setData(Data data) { - this.data = data; - } - // Utility function to format Instant into the required UTC format private String 
formatInstantAsUTCString(Instant instant) { DateTimeFormatter formatter = DateTimeFormatter @@ -120,183 +121,68 @@ private String formatInstantAsUTCString(Instant instant) { return formatter.format(instant); } - public String getMessage() { - return message; - } - public void setMessage(String message) { - this.message = message; - } + public static class Experiment { + private String name; + private Notification notification; // Empty by default + private Recommendation recommendation; - // Inner class for the data field - public static class Data { - private Experiments experiments; - private Recommendations recommendations; - - public Data(Experiments experiments, Recommendations recommendations) { - this.experiments = experiments; - this.recommendations = recommendations; - } - - public Experiments getExperiments() { - return experiments; + public Experiment(String name) { + this.name = name; + this.notification = null; // Start with null notification + this.recommendation = new Recommendation(UNPROCESSED); // Start with unprocessed status } - public void setExperiments(Experiments experiments) { - this.experiments = experiments; + // Getters and setters + public Recommendation getRecommendation() { + return recommendation; } - public Recommendations getRecommendations() { - return recommendations; - } - - public void setRecommendations(Recommendations recommendations) { - this.recommendations = recommendations; + public void setNotification(Notification notification) { + this.notification = notification; } } - // Inner class for experiments - public static class Experiments { - @JsonProperty("new") - private List newExperiments; - @JsonProperty("updated") - private List updatedExperiments; - @JsonProperty("failed") - private List failedExperiments; - - public Experiments(List newExperiments, List updatedExperiments, List failedExperiments) { - this.newExperiments = newExperiments; - this.updatedExperiments = updatedExperiments; - this.failedExperiments = failedExperiments; - } + public static class Recommendation { + private KruizeConstants.KRUIZE_BULK_API.NotificationConstants.Status status; + private Notification notification; // Notifications can hold multiple entries - public List getNewExperiments() { - return newExperiments; + public Recommendation(KruizeConstants.KRUIZE_BULK_API.NotificationConstants.Status status) { + this.status = status; } - public void setNewExperiments(List newExperiments) { - this.newExperiments = newExperiments; - } + // Getters and setters - public List getUpdatedExperiments() { - return updatedExperiments; + public KruizeConstants.KRUIZE_BULK_API.NotificationConstants.Status getStatus() { + return status; } - public void setUpdatedExperiments(List updatedExperiments) { - this.updatedExperiments = updatedExperiments; + public void setStatus(KruizeConstants.KRUIZE_BULK_API.NotificationConstants.Status status) { + this.status = status; } - public List getFailedExperiments() { - return failedExperiments; + public Notification getNotification() { + return notification; } - public void setFailedExperiments(List failedExperiments) { - this.failedExperiments = failedExperiments; + public void setNotification(Notification notification) { + this.notification = notification; } } - // Inner class for recommendations - public static class Recommendations { - private RecommendationData data; + public static class Notification { + private String type; + private String message; + private int code; - public Recommendations(RecommendationData data) { - this.data = data; 
- } - - public RecommendationData getData() { - return data; - } + // Constructor, getters, and setters - public void setData(RecommendationData data) { - this.data = data; + public Notification(String type, String message, int code) { + this.type = type; + this.message = message; + this.code = code; } } - // Inner class for recommendation data - public static class RecommendationData { - private List processed = Collections.synchronizedList(new ArrayList<>()); - private List processing = Collections.synchronizedList(new ArrayList<>()); - private List unprocessed = Collections.synchronizedList(new ArrayList<>()); - private List failed = Collections.synchronizedList(new ArrayList<>()); - - public RecommendationData(List processed, List processing, List unprocessed, List failed) { - this.processed = processed; - this.processing = processing; - this.unprocessed = unprocessed; - this.failed = failed; - } - - public List getProcessed() { - return processed; - } - - public synchronized void setProcessed(List processed) { - this.processed = processed; - } - - public List getProcessing() { - return processing; - } - - public synchronized void setProcessing(List processing) { - this.processing = processing; - } - - public List getUnprocessed() { - return unprocessed; - } - - public synchronized void setUnprocessed(List unprocessed) { - this.unprocessed = unprocessed; - } - - public List getFailed() { - return failed; - } - - public synchronized void setFailed(List failed) { - this.failed = failed; - } - - // Move elements from inqueue to progress - public synchronized void moveToProgress(String element) { - if (unprocessed.contains(element)) { - unprocessed.remove(element); - if (!processing.contains(element)) { - processing.add(element); - } - } - } - // Move elements from progress to completed - public synchronized void moveToCompleted(String element) { - if (processing.contains(element)) { - processing.remove(element); - if (!processed.contains(element)) { - processed.add(element); - } - } - } - - // Move elements from progress to failed - public synchronized void moveToFailed(String element) { - if (processing.contains(element)) { - processing.remove(element); - if (!failed.contains(element)) { - failed.add(element); - } - } - } - - // Calculate the percentage of completion - public int completionPercentage() { - int totalTasks = processed.size() + processing.size() + unprocessed.size() + failed.size(); - if (totalTasks == 0) { - return (int) 0.0; - } - return (int) ((processed.size() * 100.0) / totalTasks); - } - - - } } diff --git a/src/main/java/com/autotune/analyzer/services/BulkService.java b/src/main/java/com/autotune/analyzer/services/BulkService.java index dd4bc795f..127b2f483 100644 --- a/src/main/java/com/autotune/analyzer/services/BulkService.java +++ b/src/main/java/com/autotune/analyzer/services/BulkService.java @@ -33,7 +33,6 @@ import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.time.Instant; -import java.util.ArrayList; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -88,7 +87,7 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws Se // Return the JSON representation of the JobStatus object ObjectMapper objectMapper = new ObjectMapper(); if (!verbose) { - filters.addFilter("jobFilter", SimpleBeanPropertyFilter.serializeAllExcept("data")); + filters.addFilter("jobFilter", SimpleBeanPropertyFilter.serializeAllExcept("experiments")); } else { filters.addFilter("jobFilter", 
SimpleBeanPropertyFilter.serializeAll()); } @@ -121,16 +120,7 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response) // Generate a unique jobID String jobID = UUID.randomUUID().toString(); - BulkJobStatus.Data data = new BulkJobStatus.Data( - new BulkJobStatus.Experiments(new ArrayList<>(), new ArrayList<>(), new ArrayList<>()), - new BulkJobStatus.Recommendations(new BulkJobStatus.RecommendationData( - new ArrayList<>(), - new ArrayList<>(), - new ArrayList<>(), - new ArrayList<>() - )) - ); - jobStatusMap.put(jobID, new BulkJobStatus(jobID, IN_PROGRESS, data, Instant.now())); + jobStatusMap.put(jobID, new BulkJobStatus(jobID, IN_PROGRESS, Instant.now())); // Submit the job to be processed asynchronously executorService.submit(new BulkJobManager(jobID, jobStatusMap, payload)); diff --git a/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java b/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java index c1d237a0c..3cf261050 100644 --- a/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java +++ b/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java @@ -31,12 +31,14 @@ import com.autotune.utils.Utils; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.gson.Gson; +import org.apache.http.conn.ConnectTimeoutException; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.HttpURLConnection; +import java.net.SocketTimeoutException; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.sql.Timestamp; @@ -52,6 +54,7 @@ import static com.autotune.operator.KruizeDeploymentInfo.bulk_thread_pool_size; import static com.autotune.utils.KruizeConstants.KRUIZE_BULK_API.*; +import static com.autotune.utils.KruizeConstants.KRUIZE_BULK_API.NotificationConstants.*; /** @@ -117,113 +120,127 @@ private static Map parseLabelString(String labelString) { @Override public void run() { + BulkJobStatus jobData = jobStatusMap.get(jobID); try { - BulkJobStatus jobData = jobStatusMap.get(jobID); String labelString = getLabels(this.bulkInput.getFilter()); if (null == this.bulkInput.getDatasource()) { this.bulkInput.setDatasource(CREATE_EXPERIMENT_CONFIG_BEAN.getDatasourceName()); } DataSourceMetadataInfo metadataInfo = null; DataSourceManager dataSourceManager = new DataSourceManager(); - DataSourceInfo datasource = CommonUtils.getDataSourceInfo(this.bulkInput.getDatasource()); - JSONObject daterange = processDateRange(this.bulkInput.getTime_range()); - if (null != daterange) - metadataInfo = dataSourceManager.importMetadataFromDataSource(datasource, labelString, (Long) daterange.get("start_time"), (Long) daterange.get("end_time"), (Integer) daterange.get("steps")); - else { - metadataInfo = dataSourceManager.importMetadataFromDataSource(datasource, labelString, 0, 0, 0); + DataSourceInfo datasource = null; + try { + datasource = CommonUtils.getDataSourceInfo(this.bulkInput.getDatasource()); + } catch (Exception e) { + LOGGER.error(e.getMessage()); + e.printStackTrace(); + jobData.setStatus(FAILED); + jobData.setNotification(String.valueOf(HttpURLConnection.HTTP_BAD_REQUEST), DATASOURCE_NOT_REG_INFO); } - if (null == metadataInfo) { - jobData.setStatus(COMPLETED); - jobData.setMessage(NOTHING); - } else { - Map createExperimentAPIObjectMap = getExperimentMap(labelString, jobData, metadataInfo, datasource); //Todo Store this map in buffer and use it if BulkAPI pods restarts and support experiment_type - 
jobData.setTotal_experiments(createExperimentAPIObjectMap.size()); - jobData.setProcessed_experiments(0); - if (jobData.getTotal_experiments() > KruizeDeploymentInfo.BULK_API_LIMIT) { - jobStatusMap.get(jobID).setStatus(FAILED); - jobStatusMap.get(jobID).setMessage(String.format(LIMIT_MESSAGE, KruizeDeploymentInfo.BULK_API_LIMIT)); + if (null != datasource) { + JSONObject daterange = processDateRange(this.bulkInput.getTime_range()); + if (null != daterange) + metadataInfo = dataSourceManager.importMetadataFromDataSource(datasource, labelString, (Long) daterange.get("start_time"), (Long) daterange.get("end_time"), (Integer) daterange.get("steps")); + else { + metadataInfo = dataSourceManager.importMetadataFromDataSource(datasource, labelString, 0, 0, 0); + } + if (null == metadataInfo) { + jobData.setStatus(COMPLETED); + jobData.setNotification(String.valueOf(HttpURLConnection.HTTP_OK), NOTHING_INFO); } else { - ExecutorService createExecutor = Executors.newFixedThreadPool(bulk_thread_pool_size); - ExecutorService generateExecutor = Executors.newFixedThreadPool(bulk_thread_pool_size); - for (CreateExperimentAPIObject apiObject : createExperimentAPIObjectMap.values()) { - createExecutor.submit(() -> { + Map createExperimentAPIObjectMap = getExperimentMap(labelString, jobData, metadataInfo, datasource); //Todo Store this map in buffer and use it if BulkAPI pods restarts and support experiment_type + jobData.setTotal_experiments(createExperimentAPIObjectMap.size()); + jobData.setProcessed_experiments(0); + if (jobData.getTotal_experiments() > KruizeDeploymentInfo.BULK_API_LIMIT) { + jobData.setStatus(FAILED); + jobData.setNotification(String.valueOf(HttpURLConnection.HTTP_BAD_REQUEST), LIMIT_INFO); + } else { + ExecutorService createExecutor = Executors.newFixedThreadPool(bulk_thread_pool_size); + ExecutorService generateExecutor = Executors.newFixedThreadPool(bulk_thread_pool_size); + for (CreateExperimentAPIObject apiObject : createExperimentAPIObjectMap.values()) { String experiment_name = apiObject.getExperimentName(); - BulkJobStatus.Experiments newExperiments = jobData.getData().getExperiments(); - BulkJobStatus.RecommendationData recommendationData = jobData.getData().getRecommendations().getData(); - try { - // send request to createExperiment API for experiment creation - GenericRestApiClient apiClient = new GenericRestApiClient(datasource); - apiClient.setBaseURL(KruizeDeploymentInfo.experiments_url); - int responseCode; - boolean expriment_exists = false; + BulkJobStatus.Experiment experiment = jobData.addExperiment(experiment_name); + DataSourceInfo finalDatasource = datasource; + createExecutor.submit(() -> { try { - responseCode = apiClient.callKruizeAPI("[" + new Gson().toJson(apiObject) + "]"); - LOGGER.debug("API Response code: {}", responseCode); - if (responseCode == HttpURLConnection.HTTP_CREATED) { - newExperiments.setNewExperiments( - appendExperiments(newExperiments.getNewExperiments(), experiment_name)); - expriment_exists = true; - } else if (responseCode == HttpURLConnection.HTTP_CONFLICT) { - expriment_exists = true; - } else { - newExperiments.setFailedExperiments( - appendExperiments(newExperiments.getFailedExperiments(), experiment_name)); + // send request to createExperiment API for experiment creation + GenericRestApiClient apiClient = new GenericRestApiClient(finalDatasource); + apiClient.setBaseURL(KruizeDeploymentInfo.experiments_url); + GenericRestApiClient.HttpResponseWrapper responseCode; + boolean expriment_exists = false; + try { + responseCode = 
apiClient.callKruizeAPI("[" + new Gson().toJson(apiObject) + "]"); + LOGGER.debug("API Response code: {}", responseCode); + if (responseCode.getStatusCode() == HttpURLConnection.HTTP_CREATED) { + expriment_exists = true; + } else if (responseCode.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) { + expriment_exists = true; + } else { + jobData.setProcessed_experiments(jobData.getProcessed_experiments() + 1); + experiment.setNotification(new BulkJobStatus.Notification(ERROR, responseCode.getResponseBody().toString(), responseCode.getStatusCode())); + } + } catch (FetchMetricsError e) { + e.printStackTrace(); jobData.setProcessed_experiments(jobData.getProcessed_experiments() + 1); + experiment.setNotification(new BulkJobStatus.Notification(ERROR, e.getMessage(), HttpURLConnection.HTTP_BAD_REQUEST)); } - } catch (FetchMetricsError e) { - e.printStackTrace(); - newExperiments.setFailedExperiments( - appendExperiments(newExperiments.getFailedExperiments(), experiment_name)); - jobData.setProcessed_experiments(jobData.getProcessed_experiments() + 1); - } - - if (expriment_exists) { - recommendationData.setUnprocessed( - appendExperiments(recommendationData.getUnprocessed(), experiment_name) - ); - generateExecutor.submit(() -> { - - // send request to generateRecommendations API - GenericRestApiClient recommendationApiClient = new GenericRestApiClient(datasource); - String encodedExperimentName; - encodedExperimentName = URLEncoder.encode(experiment_name, StandardCharsets.UTF_8); - recommendationApiClient.setBaseURL(String.format(KruizeDeploymentInfo.recommendations_url, encodedExperimentName)); - int recommendationResponseCode = 0; - try { - recommendationData.moveToProgress(experiment_name); - recommendationResponseCode = recommendationApiClient.callKruizeAPI(null); - LOGGER.debug("API Response code: {}", recommendationResponseCode); - } catch (Exception | FetchMetricsError e) { - e.printStackTrace(); - } - if (recommendationResponseCode == HttpURLConnection.HTTP_CREATED) { - recommendationData.moveToCompleted(experiment_name); - jobData.setProcessed_experiments(jobData.getProcessed_experiments() + 1); - if (jobData.getTotal_experiments() == jobData.getProcessed_experiments()) { - jobData.setStatus(COMPLETED); - jobStatusMap.get(jobID).setEndTime(Instant.now()); + if (expriment_exists) { + generateExecutor.submit(() -> { + // send request to generateRecommendations API + GenericRestApiClient recommendationApiClient = new GenericRestApiClient(finalDatasource); + String encodedExperimentName; + encodedExperimentName = URLEncoder.encode(experiment_name, StandardCharsets.UTF_8); + recommendationApiClient.setBaseURL(String.format(KruizeDeploymentInfo.recommendations_url, encodedExperimentName)); + GenericRestApiClient.HttpResponseWrapper recommendationResponseCode = null; + try { + recommendationResponseCode = recommendationApiClient.callKruizeAPI(null); + LOGGER.debug("API Response code: {}", recommendationResponseCode); + if (recommendationResponseCode.getStatusCode() == HttpURLConnection.HTTP_CREATED) { + jobData.setProcessed_experiments(jobData.getProcessed_experiments() + 1); + if (jobData.getTotal_experiments() == jobData.getProcessed_experiments()) { + jobData.setStatus(COMPLETED); + jobData.setEndTime(Instant.now()); + } + experiment.getRecommendation().setStatus(NotificationConstants.Status.PROCESSED); + } else { + experiment.getRecommendation().setStatus(NotificationConstants.Status.FAILED); + experiment.setNotification(new BulkJobStatus.Notification(ERROR, 
recommendationResponseCode.getResponseBody().toString(), recommendationResponseCode.getStatusCode())); + } + } catch (Exception | FetchMetricsError e) { + e.printStackTrace(); + experiment.getRecommendation().setStatus(NotificationConstants.Status.FAILED); + experiment.getRecommendation().setNotification(new BulkJobStatus.Notification(ERROR, e.getMessage(), HttpURLConnection.HTTP_BAD_REQUEST)); } - - } else { - recommendationData.moveToFailed(experiment_name); - } - }); + }); + } + } catch (Exception e) { + e.printStackTrace(); + experiment.setNotification(new BulkJobStatus.Notification(ERROR, e.getMessage(), HttpURLConnection.HTTP_INTERNAL_ERROR)); } - - } catch (Exception e) { - e.printStackTrace(); - recommendationData.moveToFailed(experiment_name); - } - }); + }); + } } } } + } catch (IOException e) { + LOGGER.error(e.getMessage()); + jobData.setStatus("FAILED"); + jobData.setEndTime(Instant.now()); + + if (e instanceof SocketTimeoutException) { + jobData.setNotification(String.valueOf(HttpURLConnection.HTTP_GATEWAY_TIMEOUT), DATASOURCE_GATEWAY_TIMEOUT_INFO); + } else if (e instanceof ConnectTimeoutException) { + jobData.setNotification(String.valueOf(HttpURLConnection.HTTP_UNAVAILABLE), DATASOURCE_CONNECT_TIMEOUT_INFO); + } else { + jobData.setNotification(String.valueOf(HttpURLConnection.HTTP_UNAVAILABLE), DATASOURCE_DOWN_INFO); + } } catch (Exception e) { LOGGER.error(e.getMessage()); e.printStackTrace(); - jobStatusMap.get(jobID).setStatus("FAILED"); - jobStatusMap.get(jobID).setMessage(e.getMessage()); + jobData.setStatus("FAILED"); + jobData.setEndTime(Instant.now()); + jobData.setNotification(String.valueOf(HttpURLConnection.HTTP_INTERNAL_ERROR), new NotificationConstants.Notification(NotificationConstants.NotificationType.ERROR, e.getMessage(), HttpURLConnection.HTTP_INTERNAL_ERROR)); } } diff --git a/src/main/java/com/autotune/utils/KruizeConstants.java b/src/main/java/com/autotune/utils/KruizeConstants.java index 0eb900aab..7c8560d88 100644 --- a/src/main/java/com/autotune/utils/KruizeConstants.java +++ b/src/main/java/com/autotune/utils/KruizeConstants.java @@ -795,5 +795,110 @@ public static final class KRUIZE_BULK_API { CREATE_EXPERIMENT_CONFIG_BEAN.setMeasurementDurationStr("15min"); CREATE_EXPERIMENT_CONFIG_BEAN.setMeasurementDuration(15); } + + public static final class NotificationConstants { + + public static final Notification FETCH_METRIC_FAILURE = new Notification( + NotificationType.ERROR, + "Not able to fetch metrics", + 400 + ); + public static final Notification DATASOURCE_NOT_REG_INFO = new Notification( + NotificationType.ERROR, + "Datasource not registered with Kruize.", + 400 + ); + public static final Notification DATASOURCE_DOWN_INFO = new Notification( + NotificationType.ERROR, + "HttpHostConnectException: Unable to connect to the data source. 
Please try again later.", + 503 + ); + public static final Notification DATASOURCE_GATEWAY_TIMEOUT_INFO = new Notification( + NotificationType.ERROR, + "SocketTimeoutException: request timed out waiting for a data source response", + 504 + ); + public static final Notification DATASOURCE_CONNECT_TIMEOUT_INFO = new Notification( + NotificationType.ERROR, + "ConnectTimeoutException: cannot establish a data source connection in a given time frame due to connectivity issues", + 503 + ); + public static final Notification JOB_NOT_FOUND_INFO = new Notification( + NotificationType.WARNING, + JOB_NOT_FOUND_MSG, + 404 + ); + public static final Notification LIMIT_INFO = new Notification( + NotificationType.INFO, + LIMIT_MESSAGE, + 400 + ); + + public static final Notification NOTHING_INFO = new Notification( + NotificationType.INFO, + NOTHING, + 400 + ); + + public enum NotificationType { + ERROR("error"), + WARNING("warning"), + INFO("info"); + + private final String type; + + NotificationType(String type) { + this.type = type; + } + + public String getType() { + return type; + } + } + + // More notification constants can be added here as needed + + public enum Status { + PROCESSED("PROCESSED"), + UNPROCESSED("UNPROCESSED"), + PROCESSING("PROCESSING"), + FAILED("FAILED"); + + private final String status; + + Status(String status) { + this.status = status; + } + + public String getStatus() { + return status; + } + } + + // Notification class representing each constant's details + public static class Notification { + private final NotificationType type; + private final String message; + private final int code; + + public Notification(NotificationType type, String message, int code) { + this.type = type; + this.message = message; + this.code = code; + } + + public NotificationType getType() { + return type; + } + + public String getMessage() { + return message; + } + + public int getCode() { + return code; + } + } + } } } From 0a21e975a591d5884aabd7e8eb40b0731e9c1387 Mon Sep 17 00:00:00 2001 From: msvinaykumar Date: Sun, 10 Nov 2024 14:25:22 +0530 Subject: [PATCH 3/5] Error handling Signed-off-by: msvinaykumar --- .../engine/RecommendationEngine.java | 140 +++++++++--------- .../serviceObjects/BulkJobStatus.java | 49 ++++-- .../analyzer/services/BulkService.java | 65 ++++---- .../analyzer/workerimpl/BulkJobManager.java | 1 + 4 files changed, 151 insertions(+), 104 deletions(-) diff --git a/src/main/java/com/autotune/analyzer/recommendations/engine/RecommendationEngine.java b/src/main/java/com/autotune/analyzer/recommendations/engine/RecommendationEngine.java index f157550a4..1e9241333 100644 --- a/src/main/java/com/autotune/analyzer/recommendations/engine/RecommendationEngine.java +++ b/src/main/java/com/autotune/analyzer/recommendations/engine/RecommendationEngine.java @@ -35,7 +35,10 @@ import com.autotune.utils.KruizeConstants; import com.autotune.utils.MetricsConfig; import com.autotune.utils.Utils; -import com.google.gson.*; +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; import io.micrometer.core.instrument.Timer; import org.json.JSONObject; import org.slf4j.Logger; @@ -100,6 +103,26 @@ private static int getNumPods(Map filteredResultsMap private static void getPromQls(Map promQls) { } + /** + * Calculates the number of pods for a namespace based on the provided results map. + * + * @param filteredResultsMap A map containing timestamp as keys and contains metric results for the corresponding timestamp. 
+ * @return int maximum number of pods observed across all timestamps in the filtered results map. + */ + private static int getNumPodsForNamespace(Map filteredResultsMap) { + LOGGER.debug("Size of Filter Map: {}", filteredResultsMap.size()); + Double max_pods_cpu = filteredResultsMap.values() + .stream() + .map(e -> { + Optional numPodsResults = Optional.ofNullable(e.getMetricResultsMap().get(AnalyzerConstants.MetricName.namespaceTotalPods)); + double numPods = numPodsResults.map(m -> m.getAggregationInfoResult().getAvg()).orElse(0.0); + return numPods; + }) + .max(Double::compareTo).get(); + + return (int) Math.ceil(max_pods_cpu); + } + private void init() { // Add new models recommendationModels = new ArrayList<>(); @@ -236,7 +259,7 @@ public String validate_local() { //TODO Instead of relying on the 'lo * @param calCount The count of incoming requests. * @return The KruizeObject containing the prepared recommendations. */ - public KruizeObject prepareRecommendations(int calCount) throws FetchMetricsError{ + public KruizeObject prepareRecommendations(int calCount) throws FetchMetricsError { Map mainKruizeExperimentMAP = new ConcurrentHashMap<>(); Map terms = new HashMap<>(); ValidationOutputData validationOutputData; @@ -311,6 +334,7 @@ public KruizeObject prepareRecommendations(int calCount) throws FetchMetricsErro /** * Generates recommendations for the specified KruizeObject + * * @param kruizeObject The KruizeObject containing experiment data */ public void generateRecommendations(KruizeObject kruizeObject) { @@ -322,7 +346,7 @@ public void generateRecommendations(KruizeObject kruizeObject) { NamespaceData namespaceData = k8sObject.getNamespaceData(); LOGGER.info("Generating recommendations for namespace: {}", namespaceName); generateRecommendationsBasedOnNamespace(namespaceData, kruizeObject); - } else if (kruizeObject.isContainerExperiment()){ + } else if (kruizeObject.isContainerExperiment()) { for (String containerName : k8sObject.getContainerDataMap().keySet()) { ContainerData containerData = k8sObject.getContainerDataMap().get(containerName); @@ -740,25 +764,6 @@ private MappedRecommendationForModel generateRecommendationBasedOnModel(Timestam return mappedRecommendationForModel; } - /** - * Calculates the number of pods for a namespace based on the provided results map. - * @param filteredResultsMap A map containing timestamp as keys and contains metric results for the corresponding timestamp. - * @return int maximum number of pods observed across all timestamps in the filtered results map. 
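In outline, the relocated helper above reduces the per-timestamp averages of namespaceTotalPods to a single ceiling value. A simplified model, with plain doubles standing in for the IntervalResults/MetricResults chain:

    // Simplified model of getNumPodsForNamespace: max of the per-timestamp
    // averages, rounded up to a whole pod count.
    static int maxPods(java.util.Collection<Double> avgPodsPerInterval) {
        double max = avgPodsPerInterval.stream()
                .max(Double::compareTo)
                .orElse(0.0); // the real code calls .get(), which throws on an empty map
        return (int) Math.ceil(max);
    }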
- */ - private static int getNumPodsForNamespace(Map filteredResultsMap) { - LOGGER.debug("Size of Filter Map: {}", filteredResultsMap.size()); - Double max_pods_cpu = filteredResultsMap.values() - .stream() - .map(e -> { - Optional numPodsResults = Optional.ofNullable(e.getMetricResultsMap().get(AnalyzerConstants.MetricName.namespaceTotalPods)); - double numPods = numPodsResults.map(m -> m.getAggregationInfoResult().getAvg()).orElse(0.0); - return numPods; - }) - .max(Double::compareTo).get(); - - return (int) Math.ceil(max_pods_cpu); - } - private void generateRecommendationsBasedOnNamespace(NamespaceData namespaceData, KruizeObject kruizeObject) { Timestamp monitoringEndTime = namespaceData.getResults().keySet().stream().max(Timestamp::compareTo).get(); NamespaceRecommendations namespaceRecommendations = namespaceData.getNamespaceRecommendations(); @@ -806,8 +811,8 @@ private void generateRecommendationsBasedOnNamespace(NamespaceData namespaceData } private HashMap> getCurrentNamespaceConfigData(NamespaceData namespaceData, - Timestamp monitoringEndTime, - MappedRecommendationForTimestamp timestampRecommendation) { + Timestamp monitoringEndTime, + MappedRecommendationForTimestamp timestampRecommendation) { HashMap> currentNamespaceConfig = new HashMap<>(); @@ -1096,13 +1101,13 @@ private MappedRecommendationForModel generateNamespaceRecommendationBasedOnModel * DO NOT EDIT THIS METHOD UNLESS THERE ARE ANY CHANGES TO BE ADDED IN VALIDATION OR POPULATION MECHANISM * EDITING THIS METHOD MIGHT LEAD TO UNEXPECTED OUTCOMES IN RECOMMENDATIONS, PLEASE PROCEED WITH CAUTION * - * @param termEntry The entry containing a term key and its associated {@link Terms} object. - * @param recommendationModel The model used to map recommendations. - * @param notifications A list to which recommendation notifications will be added. - * @param internalMapToPopulate The internal map to populate with recommendation configuration items. - * @param numPods The number of pods to consider for the recommendation. - * @param cpuThreshold The CPU usage threshold for the recommendation. - * @param memoryThreshold The memory usage threshold for the recommendation. + * @param termEntry The entry containing a term key and its associated {@link Terms} object. + * @param recommendationModel The model used to map recommendations. + * @param notifications A list to which recommendation notifications will be added. + * @param internalMapToPopulate The internal map to populate with recommendation configuration items. + * @param numPods The number of pods to consider for the recommendation. + * @param cpuThreshold The CPU usage threshold for the recommendation. + * @param memoryThreshold The memory usage threshold for the recommendation. * @param recommendationAcceleratorRequestMap The Map which has Accelerator recommendations * @return {@code true} if the internal map was successfully populated; {@code false} otherwise. */ @@ -1800,10 +1805,10 @@ private String getResults(Map mainKruizeExperimentMAP, Kru /** * Fetches metrics based on the specified datasource using queries from the metricProfile for the given time interval. * - * @param kruizeObject KruizeObject - * @param interval_end_time The end time of the interval in the format yyyy-MM-ddTHH:mm:sssZ - * @param interval_start_time The start time of the interval in the format yyyy-MM-ddTHH:mm:sssZ. 
- * @param dataSourceInfo DataSource object + * @param kruizeObject KruizeObject + * @param interval_end_time The end time of the interval in the format yyyy-MM-ddTHH:mm:sssZ + * @param interval_start_time The start time of the interval in the format yyyy-MM-ddTHH:mm:sssZ. + * @param dataSourceInfo DataSource object * @throws Exception */ public void fetchMetricsBasedOnProfileAndDatasource(KruizeObject kruizeObject, Timestamp interval_end_time, Timestamp interval_start_time, DataSourceInfo dataSourceInfo) throws Exception, FetchMetricsError { @@ -1839,12 +1844,13 @@ public void fetchMetricsBasedOnProfileAndDatasource(KruizeObject kruizeObject, T /** * Fetches namespace metrics based on the specified datasource using queries from the metricProfile for the given time interval. - * @param kruizeObject KruizeObject - * @param interval_end_time The end time of the interval in the format yyyy-MM-ddTHH:mm:sssZ - * @param interval_start_time The start time of the interval in the format yyyy-MM-ddTHH:mm:sssZ. - * @param dataSourceInfo DataSource object - * @param metricProfile performance profile to be used - * @param maxDateQuery max date query for namespace + * + * @param kruizeObject KruizeObject + * @param interval_end_time The end time of the interval in the format yyyy-MM-ddTHH:mm:sssZ + * @param interval_start_time The start time of the interval in the format yyyy-MM-ddTHH:mm:sssZ. + * @param dataSourceInfo DataSource object + * @param metricProfile performance profile to be used + * @param maxDateQuery max date query for namespace * @throws Exception */ private void fetchNamespaceMetricsBasedOnDataSourceAndProfile(KruizeObject kruizeObject, Timestamp interval_end_time, Timestamp interval_start_time, DataSourceInfo dataSourceInfo, PerformanceProfile metricProfile, String maxDateQuery) throws Exception, FetchMetricsError { @@ -1995,12 +2001,12 @@ private void fetchNamespaceMetricsBasedOnDataSourceAndProfile(KruizeObject kruiz /** * Fetches Container metrics based on the specified datasource using queries from the metricProfile for the given time interval. * - * @param kruizeObject KruizeObject - * @param interval_end_time The end time of the interval in the format yyyy-MM-ddTHH:mm:sssZ - * @param interval_start_time The start time of the interval in the format yyyy-MM-ddTHH:mm:sssZ. - * @param dataSourceInfo DataSource object - * @param metricProfile performance profile to be used - * @param maxDateQuery max date query for containers + * @param kruizeObject KruizeObject + * @param interval_end_time The end time of the interval in the format yyyy-MM-ddTHH:mm:sssZ + * @param interval_start_time The start time of the interval in the format yyyy-MM-ddTHH:mm:sssZ. 
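Each PromQL template is then turned into a range query against the data source. A sketch of the URL shape, assuming Prometheus's standard /api/v1/query_range endpoint; the real format string lives in the project's constants, and the step is derived below as measurement-duration minutes times NO_OF_SECONDS_PER_MINUTE:

    // Illustrative URL assembly; the project builds this from its own constants.
    static String rangeQueryUrl(String promBase, String promQL,
                                long startEpoch, long endEpoch, int stepSeconds)
            throws java.io.UnsupportedEncodingException {
        return String.format("%s/api/v1/query_range?query=%s&start=%d&end=%d&step=%d",
                promBase, java.net.URLEncoder.encode(promQL, "UTF-8"),
                startEpoch, endEpoch, stepSeconds);
    }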
+ * @param dataSourceInfo DataSource object + * @param metricProfile performance profile to be used + * @param maxDateQuery max date query for containers * @throws Exception */ private void fetchContainerMetricsBasedOnDataSourceAndProfile(KruizeObject kruizeObject, @@ -2052,7 +2058,7 @@ private void fetchContainerMetricsBasedOnDataSourceAndProfile(KruizeObject kruiz LOGGER.debug("maxDateQuery: {}", maxDateQuery); - queryToEncode = maxDateQuery + queryToEncode = maxDateQuery .replace(AnalyzerConstants.NAMESPACE_VARIABLE, namespace) .replace(AnalyzerConstants.CONTAINER_VARIABLE, containerName) .replace(AnalyzerConstants.WORKLOAD_VARIABLE, workload) @@ -2062,7 +2068,7 @@ private void fetchContainerMetricsBasedOnDataSourceAndProfile(KruizeObject kruiz dataSourceInfo.getUrl(), URLEncoder.encode(queryToEncode, CHARACTER_ENCODING) ); - LOGGER.info(dateMetricsUrl); + LOGGER.debug(dateMetricsUrl); client.setBaseURL(dateMetricsUrl); JSONObject genericJsonObject = client.fetchMetricsJson(KruizeConstants.APIMessages.GET, ""); JsonObject jsonObject = new Gson().fromJson(genericJsonObject.toString(), JsonObject.class); @@ -2125,7 +2131,7 @@ private void fetchContainerMetricsBasedOnDataSourceAndProfile(KruizeObject kruiz continue; HashMap aggregationFunctions = metricEntry.getAggregationFunctionsMap(); - for (Map.Entry aggregationFunctionsEntry: aggregationFunctions.entrySet()) { + for (Map.Entry aggregationFunctionsEntry : aggregationFunctions.entrySet()) { // Determine promQL query on metric type String promQL = aggregationFunctionsEntry.getValue().getQuery(); @@ -2163,7 +2169,7 @@ private void fetchContainerMetricsBasedOnDataSourceAndProfile(KruizeObject kruiz interval_start_time_epoc, interval_end_time_epoc, measurementDurationMinutesInDouble.intValue() * KruizeConstants.TimeConv.NO_OF_SECONDS_PER_MINUTE); - LOGGER.info(podMetricsUrl); + LOGGER.debug(podMetricsUrl); client.setBaseURL(podMetricsUrl); JSONObject genericJsonObject = client.fetchMetricsJson(KruizeConstants.APIMessages.GET, ""); JsonObject jsonObject = new Gson().fromJson(genericJsonObject.toString(), JsonObject.class); @@ -2174,7 +2180,7 @@ private void fetchContainerMetricsBasedOnDataSourceAndProfile(KruizeObject kruiz continue; // Process fetched metrics - if (isAcceleratorMetric){ + if (isAcceleratorMetric) { for (JsonElement result : resultArray) { JsonObject resultObject = result.getAsJsonObject(); JsonObject metricObject = resultObject.getAsJsonObject(KruizeConstants.JSONKeys.METRIC); @@ -2316,13 +2322,14 @@ private void fetchContainerMetricsBasedOnDataSourceAndProfile(KruizeObject kruiz /** * Fetches max date query for namespace and containers from performance profile - * @param metricProfile performance profile to be used + * + * @param metricProfile performance profile to be used */ private String getMaxDateQuery(PerformanceProfile metricProfile, String metricName) { List metrics = metricProfile.getSloInfo().getFunctionVariables(); - for (Metric metric: metrics) { + for (Metric metric : metrics) { String name = metric.getName(); - if(name.equals(metricName)) { + if (name.equals(metricName)) { return metric.getAggregationFunctionsMap().get("max").getQuery(); } } @@ -2376,6 +2383,7 @@ private void prepareIntervalResults(Map dataResultsM /** * Filters out maxDateQuery and includes metrics based on the experiment type and kubernetes_object + * * @param metricProfile Metric profile to be used * @param maxDateQuery maxDateQuery metric to be filtered out * @param experimentType experiment type @@ -2384,17 +2392,17 @@ public List 
filterMetricsBasedOnExpTypeAndK8sObject(PerformanceProfile m String namespace = KruizeConstants.JSONKeys.NAMESPACE; String container = KruizeConstants.JSONKeys.CONTAINER; return metricProfile.getSloInfo().getFunctionVariables().stream() - .filter(Metric -> { - String name = Metric.getName(); - String kubernetes_object = Metric.getKubernetesObject(); - - // Include metrics based on experiment_type, kubernetes_object and exclude maxDate metric - return !name.equals(maxDateQuery) && ( - (experimentType.equals(AnalyzerConstants.ExperimentTypes.NAMESPACE_EXPERIMENT) && kubernetes_object.equals(namespace)) || - (experimentType.equals(AnalyzerConstants.ExperimentTypes.CONTAINER_EXPERIMENT) && kubernetes_object.equals(container)) - ); - }) - .toList(); + .filter(Metric -> { + String name = Metric.getName(); + String kubernetes_object = Metric.getKubernetesObject(); + + // Include metrics based on experiment_type, kubernetes_object and exclude maxDate metric + return !name.equals(maxDateQuery) && ( + (experimentType.equals(AnalyzerConstants.ExperimentTypes.NAMESPACE_EXPERIMENT) && kubernetes_object.equals(namespace)) || + (experimentType.equals(AnalyzerConstants.ExperimentTypes.CONTAINER_EXPERIMENT) && kubernetes_object.equals(container)) + ); + }) + .toList(); } } diff --git a/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java b/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java index 24a658610..3537ffce0 100644 --- a/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java +++ b/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java @@ -64,6 +64,46 @@ public void setNotification(String key, KruizeConstants.KRUIZE_BULK_API.Notifica notifications.put(key, notification); } + public String getJobID() { + return jobID; + } + + public void setJobID(String jobID) { + this.jobID = jobID; + } + + public String getStartTime() { + return startTime; + } + + public void setStartTime(String startTime) { + this.startTime = startTime; + } + + public void setStartTime(Instant startTime) { + this.startTime = formatInstantAsUTCString(startTime); + } + + public String getEndTime() { + return endTime; + } + + public void setEndTime(String endTime) { + this.endTime = endTime; + } + + public void setEndTime(Instant endTime) { + this.endTime = formatInstantAsUTCString(endTime); + } + + public Map getNotifications() { + return notifications; + } + + public void setNotifications(Map notifications) { + this.notifications = notifications; + } + public Map getExperiments() { return experiments; } @@ -79,7 +119,6 @@ public synchronized Experiment addExperiment(String experimentName) { return experiment; } - public String getStatus() { return status; } @@ -88,14 +127,6 @@ public void setStatus(String status) { this.status = status; } - public void setStartTime(Instant startTime) { - this.startTime = formatInstantAsUTCString(startTime); - } - - public void setEndTime(Instant endTime) { - this.endTime = formatInstantAsUTCString(endTime); - } - public int getTotal_experiments() { return total_experiments; } diff --git a/src/main/java/com/autotune/analyzer/services/BulkService.java b/src/main/java/com/autotune/analyzer/services/BulkService.java index 127b2f483..3ac40f6f5 100644 --- a/src/main/java/com/autotune/analyzer/services/BulkService.java +++ b/src/main/java/com/autotune/analyzer/services/BulkService.java @@ -65,38 +65,45 @@ public void init(ServletConfig config) throws ServletException { */ @Override protected void doGet(HttpServletRequest req, 
HttpServletResponse resp) throws ServletException, IOException { - String jobID = req.getParameter(JOB_ID); - String verboseParam = req.getParameter(VERBOSE); - // If the parameter is not provided (null), default it to false - boolean verbose = verboseParam != null && Boolean.parseBoolean(verboseParam); - BulkJobStatus jobDetails = jobStatusMap.get(jobID); - resp.setContentType(JSON_CONTENT_TYPE); - resp.setCharacterEncoding(CHARACTER_ENCODING); - SimpleFilterProvider filters = new SimpleFilterProvider(); + try { + String jobID = req.getParameter(JOB_ID); + String verboseParam = req.getParameter(VERBOSE); + // If the parameter is not provided (null), default it to false + boolean verbose = verboseParam != null && Boolean.parseBoolean(verboseParam); + BulkJobStatus jobDetails; + synchronized (jobStatusMap) { + jobDetails = jobStatusMap.get(jobID); + } + resp.setContentType(JSON_CONTENT_TYPE); + resp.setCharacterEncoding(CHARACTER_ENCODING); + SimpleFilterProvider filters = new SimpleFilterProvider(); - if (jobDetails == null) { - sendErrorResponse( - resp, - null, - HttpServletResponse.SC_NOT_FOUND, - JOB_NOT_FOUND_MSG - ); - } else { - try { - resp.setStatus(HttpServletResponse.SC_OK); - // Return the JSON representation of the JobStatus object - ObjectMapper objectMapper = new ObjectMapper(); - if (!verbose) { - filters.addFilter("jobFilter", SimpleBeanPropertyFilter.serializeAllExcept("experiments")); - } else { - filters.addFilter("jobFilter", SimpleBeanPropertyFilter.serializeAll()); + if (jobDetails == null) { + sendErrorResponse( + resp, + null, + HttpServletResponse.SC_NOT_FOUND, + JOB_NOT_FOUND_MSG + ); + } else { + try { + resp.setStatus(HttpServletResponse.SC_OK); + // Return the JSON representation of the JobStatus object + ObjectMapper objectMapper = new ObjectMapper(); + if (!verbose) { + filters.addFilter("jobFilter", SimpleBeanPropertyFilter.serializeAllExcept("experiments")); + } else { + filters.addFilter("jobFilter", SimpleBeanPropertyFilter.serializeAll()); + } + objectMapper.setFilterProvider(filters); + String jsonResponse = objectMapper.writeValueAsString(jobDetails); + resp.getWriter().write(jsonResponse); + } catch (Exception e) { + e.printStackTrace(); } - objectMapper.setFilterProvider(filters); - String jsonResponse = objectMapper.writeValueAsString(jobDetails); - resp.getWriter().write(jsonResponse); - } catch (Exception e) { - e.printStackTrace(); } + } catch (Exception e) { + e.printStackTrace(); } } diff --git a/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java b/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java index 3cf261050..a2d98fc89 100644 --- a/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java +++ b/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java @@ -146,6 +146,7 @@ public void run() { } if (null == metadataInfo) { jobData.setStatus(COMPLETED); + jobData.setEndTime(Instant.now()); jobData.setNotification(String.valueOf(HttpURLConnection.HTTP_OK), NOTHING_INFO); } else { Map createExperimentAPIObjectMap = getExperimentMap(labelString, jobData, metadataInfo, datasource); //Todo Store this map in buffer and use it if BulkAPI pods restarts and support experiment_type From f8f6a66112b2b34d27388bf96c7ee30dfc9e7724 Mon Sep 17 00:00:00 2001 From: msvinaykumar Date: Mon, 11 Nov 2024 09:50:54 +0530 Subject: [PATCH 4/5] Exception handler for Bulk API Signed-off-by: msvinaykumar --- .../serviceObjects/BulkJobStatus.java | 52 +++++- .../analyzer/workerimpl/BulkJobManager.java | 16 +- 
.../common/datasource/DataSourceManager.java | 84 +++++----- .../DataSourceMetadataOperator.java | 157 +++++++++--------- .../autotune/utils/GenericRestApiClient.java | 43 ++++- .../com/autotune/utils/KruizeConstants.java | 94 ++++------- 6 files changed, 246 insertions(+), 200 deletions(-) diff --git a/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java b/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java index 3537ffce0..926bf04b1 100644 --- a/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java +++ b/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java @@ -46,7 +46,7 @@ public class BulkJobStatus { private String startTime; // Change to String to store formatted time @JsonProperty("job_end_time") private String endTime; // Change to String to store formatted time - private Map notifications; + private Map notifications; private Map experiments = Collections.synchronizedMap(new HashMap<>()); public BulkJobStatus(String jobID, String status, Instant startTime) { @@ -57,7 +57,7 @@ public BulkJobStatus(String jobID, String status, Instant startTime) { // Method to set a notification in the map - public void setNotification(String key, KruizeConstants.KRUIZE_BULK_API.NotificationConstants.Notification notification) { + public void setNotification(String key, Notification notification) { if (notifications == null) { notifications = new HashMap<>(); // Initialize if null } @@ -96,11 +96,11 @@ public void setEndTime(Instant endTime) { this.endTime = formatInstantAsUTCString(endTime); } - public Map getNotifications() { + public Map getNotifications() { return notifications; } - public void setNotifications(Map notifications) { + public void setNotifications(Map notifications) { this.notifications = notifications; } @@ -153,6 +153,22 @@ private String formatInstantAsUTCString(Instant instant) { } + public static enum NotificationType { + ERROR("error"), + WARNING("warning"), + INFO("info"); + + private final String type; + + NotificationType(String type) { + this.type = type; + } + + public String getType() { + return type; + } + } + public static class Experiment { private String name; private Notification notification; // Empty by default @@ -202,17 +218,41 @@ public void setNotification(Notification notification) { } public static class Notification { - private String type; + private NotificationType type; private String message; private int code; // Constructor, getters, and setters - public Notification(String type, String message, int code) { + public Notification(NotificationType type, String message, int code) { this.type = type; this.message = message; this.code = code; } + + public NotificationType getType() { + return type; + } + + public void setType(NotificationType type) { + this.type = type; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } } diff --git a/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java b/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java index a2d98fc89..c7acfd149 100644 --- a/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java +++ b/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java @@ -135,7 +135,9 @@ public void run() { LOGGER.error(e.getMessage()); e.printStackTrace(); jobData.setStatus(FAILED); - 
jobData.setNotification(String.valueOf(HttpURLConnection.HTTP_BAD_REQUEST), DATASOURCE_NOT_REG_INFO); + BulkJobStatus.Notification notification = DATASOURCE_NOT_REG_INFO; + notification.setMessage(String.format(notification.getMessage(), e.getMessage())); + jobData.setNotification(String.valueOf(HttpURLConnection.HTTP_BAD_REQUEST), notification); } if (null != datasource) { JSONObject daterange = processDateRange(this.bulkInput.getTime_range()); @@ -178,12 +180,12 @@ public void run() { expriment_exists = true; } else { jobData.setProcessed_experiments(jobData.getProcessed_experiments() + 1); - experiment.setNotification(new BulkJobStatus.Notification(ERROR, responseCode.getResponseBody().toString(), responseCode.getStatusCode())); + experiment.setNotification(new BulkJobStatus.Notification(BulkJobStatus.NotificationType.ERROR, responseCode.getResponseBody().toString(), responseCode.getStatusCode())); } } catch (FetchMetricsError e) { e.printStackTrace(); jobData.setProcessed_experiments(jobData.getProcessed_experiments() + 1); - experiment.setNotification(new BulkJobStatus.Notification(ERROR, e.getMessage(), HttpURLConnection.HTTP_BAD_REQUEST)); + experiment.setNotification(new BulkJobStatus.Notification(BulkJobStatus.NotificationType.ERROR, e.getMessage(), HttpURLConnection.HTTP_BAD_REQUEST)); } if (expriment_exists) { @@ -206,18 +208,18 @@ public void run() { experiment.getRecommendation().setStatus(NotificationConstants.Status.PROCESSED); } else { experiment.getRecommendation().setStatus(NotificationConstants.Status.FAILED); - experiment.setNotification(new BulkJobStatus.Notification(ERROR, recommendationResponseCode.getResponseBody().toString(), recommendationResponseCode.getStatusCode())); + experiment.setNotification(new BulkJobStatus.Notification(BulkJobStatus.NotificationType.ERROR, recommendationResponseCode.getResponseBody().toString(), recommendationResponseCode.getStatusCode())); } } catch (Exception | FetchMetricsError e) { e.printStackTrace(); experiment.getRecommendation().setStatus(NotificationConstants.Status.FAILED); - experiment.getRecommendation().setNotification(new BulkJobStatus.Notification(ERROR, e.getMessage(), HttpURLConnection.HTTP_BAD_REQUEST)); + experiment.getRecommendation().setNotification(new BulkJobStatus.Notification(BulkJobStatus.NotificationType.ERROR, e.getMessage(), HttpURLConnection.HTTP_BAD_REQUEST)); } }); } } catch (Exception e) { e.printStackTrace(); - experiment.setNotification(new BulkJobStatus.Notification(ERROR, e.getMessage(), HttpURLConnection.HTTP_INTERNAL_ERROR)); + experiment.setNotification(new BulkJobStatus.Notification(BulkJobStatus.NotificationType.ERROR, e.getMessage(), HttpURLConnection.HTTP_INTERNAL_ERROR)); } }); } @@ -241,7 +243,7 @@ public void run() { e.printStackTrace(); jobData.setStatus("FAILED"); jobData.setEndTime(Instant.now()); - jobData.setNotification(String.valueOf(HttpURLConnection.HTTP_INTERNAL_ERROR), new NotificationConstants.Notification(NotificationConstants.NotificationType.ERROR, e.getMessage(), HttpURLConnection.HTTP_INTERNAL_ERROR)); + jobData.setNotification(String.valueOf(HttpURLConnection.HTTP_INTERNAL_ERROR), new BulkJobStatus.Notification(BulkJobStatus.NotificationType.ERROR, e.getMessage(), HttpURLConnection.HTTP_INTERNAL_ERROR)); } } diff --git a/src/main/java/com/autotune/common/datasource/DataSourceManager.java b/src/main/java/com/autotune/common/datasource/DataSourceManager.java index cf28c73b4..1a1fc9e37 100644 --- a/src/main/java/com/autotune/common/datasource/DataSourceManager.java +++ 
diff --git a/src/main/java/com/autotune/common/datasource/DataSourceManager.java b/src/main/java/com/autotune/common/datasource/DataSourceManager.java
index cf28c73b4..1a1fc9e37 100644
--- a/src/main/java/com/autotune/common/datasource/DataSourceManager.java
+++ b/src/main/java/com/autotune/common/datasource/DataSourceManager.java
@@ -17,25 +17,31 @@
 import com.autotune.analyzer.utils.AnalyzerErrorConstants;
 import com.autotune.common.data.ValidationOutputData;
+import com.autotune.common.data.dataSourceMetadata.DataSource;
+import com.autotune.common.data.dataSourceMetadata.DataSourceCluster;
+import com.autotune.common.data.dataSourceMetadata.DataSourceMetadataInfo;
 import com.autotune.common.exceptions.datasource.DataSourceDoesNotExist;
-import com.autotune.common.data.dataSourceMetadata.*;
 import com.autotune.database.dao.ExperimentDAOImpl;
 import com.autotune.database.service.ExperimentDBService;
 import com.autotune.utils.KruizeConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
 import java.util.HashMap;
 import java.util.Map;
 
 /**
  * DataSourceManager is an interface to manage (create and update) metadata
  * of data sources
- *
- *
+ * <br>
+ * <br>
  * Currently Supported Implementations:
- * - importMetadataFromDataSource
- * - getMetadataFromDataSource
+ * - importMetadataFromDataSource
+ * - getMetadataFromDataSource
  * TODO - DB integration for update and delete functionalities
  */
 public class DataSourceManager {
@@ -47,29 +53,24 @@ public DataSourceManager() {
 
     /**
      * Imports Metadata for a specific data source using associated DataSourceInfo.
+     *
      * @param dataSourceInfo
-     * @param uniqueKey this is used as labels in query example container="xyz" namespace="abc"
-     * @param startTime Get metadata from starttime to endtime
-     * @param endTime Get metadata from starttime to endtime
-     * @param steps the interval between data points in a range query
+     * @param uniqueKey      this is used as labels in query example container="xyz" namespace="abc"
+     * @param startTime      Get metadata from starttime to endtime
+     * @param endTime        Get metadata from starttime to endtime
+     * @param steps          the interval between data points in a range query
      * @return
      */
-    public DataSourceMetadataInfo importMetadataFromDataSource(DataSourceInfo dataSourceInfo,String uniqueKey,long startTime,long endTime,int steps) throws Exception {
-        try {
-            if (null == dataSourceInfo) {
-                throw new DataSourceDoesNotExist(KruizeConstants.DataSourceConstants.DataSourceErrorMsgs.MISSING_DATASOURCE_INFO);
-            }
-            DataSourceMetadataInfo dataSourceMetadataInfo = dataSourceMetadataOperator.createDataSourceMetadata(dataSourceInfo,uniqueKey, startTime, endTime, steps);
-            if (null == dataSourceMetadataInfo) {
-                LOGGER.error(KruizeConstants.DataSourceConstants.DataSourceMetadataErrorMsgs.DATASOURCE_METADATA_INFO_NOT_AVAILABLE, "for datasource {}" + dataSourceInfo.getName());
-                return null;
-            }
-            return dataSourceMetadataInfo;
-        } catch (Exception e) {
-            LOGGER.error(e.getMessage());
-            throw e;
+    public DataSourceMetadataInfo importMetadataFromDataSource(DataSourceInfo dataSourceInfo, String uniqueKey, long startTime, long endTime, int steps) throws DataSourceDoesNotExist, IOException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
+        if (null == dataSourceInfo) {
+            throw new DataSourceDoesNotExist(KruizeConstants.DataSourceConstants.DataSourceErrorMsgs.MISSING_DATASOURCE_INFO);
         }
-
+        DataSourceMetadataInfo dataSourceMetadataInfo = dataSourceMetadataOperator.createDataSourceMetadata(dataSourceInfo, uniqueKey, startTime, endTime, steps);
+        if (null == dataSourceMetadataInfo) {
+            LOGGER.error(KruizeConstants.DataSourceConstants.DataSourceMetadataErrorMsgs.DATASOURCE_METADATA_INFO_NOT_AVAILABLE, "for datasource {}" + dataSourceInfo.getName());
+            return null;
+        }
+        return dataSourceMetadataInfo;
     }
 
     /**
@@ -93,7 +94,7 @@ public DataSourceMetadataInfo getMetadataFromDataSource(DataSourceInfo dataSourc
             return dataSourceMetadataInfo;
         } catch (DataSourceDoesNotExist e) {
             LOGGER.error(e.getMessage());
-        }catch (Exception e) {
+        } catch (Exception e) {
             LOGGER.error("Loading saved datasource metadata failed: {} ", e.getMessage());
         }
         return null;
@@ -101,9 +102,10 @@ public DataSourceMetadataInfo getMetadataFromDataSource(DataSourceInfo dataSourc
 
     /**
      * Updates metadata of the specified data source and metadata object
-     * @param dataSource The information about the data source to be updated.
+     *
+     * @param dataSource             The information about the data source to be updated.
      * @param dataSourceMetadataInfo The existing DataSourceMetadataInfo object containing the current
-     * metadata information of the data source.
+     *                               metadata information of the data source.
      */
     public void updateMetadataFromDataSource(DataSourceInfo dataSource, DataSourceMetadataInfo dataSourceMetadataInfo) {
         try {
@@ -113,7 +115,7 @@ public void updateMetadataFromDataSource(DataSourceInfo dataSource, DataSourceMe
             if (null == dataSourceMetadataInfo) {
                 throw new DataSourceDoesNotExist(KruizeConstants.DataSourceConstants.DataSourceMetadataErrorMsgs.DATASOURCE_METADATA_INFO_NOT_AVAILABLE);
             }
-            dataSourceMetadataOperator.updateDataSourceMetadata(dataSource,"",0,0,0);
+            dataSourceMetadataOperator.updateDataSourceMetadata(dataSource, "", 0, 0, 0);
         } catch (Exception e) {
             LOGGER.error(e.getMessage());
         }
@@ -121,6 +123,7 @@ public void updateMetadataFromDataSource(DataSourceInfo dataSource, DataSourceMe
 
     /**
      * Deletes metadata of the specified data source
+     *
      * @param dataSource The metadata associated with the specified data source to be deleted.
     */
     public void deleteMetadataFromDataSource(DataSourceInfo dataSource) {
@@ -137,8 +140,9 @@ public void deleteMetadataFromDataSource(DataSourceInfo dataSource) {
 
     /**
      * Adds Metadata object to DB
+     *
      * @param dataSourceMetadataInfo DataSourceMetadataInfo object
-     * Note - It's assumed that metadata will be added to database after validating dataSourceMetadataInfo object
+     *                               Note - It's assumed that metadata will be added to database after validating dataSourceMetadataInfo object
      */
     public void addMetadataToDB(DataSourceMetadataInfo dataSourceMetadataInfo) {
         ValidationOutputData addedToDB = null;
@@ -159,7 +163,7 @@ public void addMetadataToDB(DataSourceMetadataInfo dataSourceMetadataInfo) {
     private boolean checkIfDataSourceMetadataExists(String dataSourceName) {
         boolean isPresent = false;
         try {
-            DataSourceMetadataInfo dataSourceMetadataInfo = new ExperimentDBService().loadMetadataFromDBByName(dataSourceName,"false");
+            DataSourceMetadataInfo dataSourceMetadataInfo = new ExperimentDBService().loadMetadataFromDBByName(dataSourceName, "false");
             if (null != dataSourceMetadataInfo) {
                 LOGGER.error("Metadata already exists for datasource: {}!", dataSourceName);
                 isPresent = true;
@@ -172,6 +176,7 @@ private boolean checkIfDataSourceMetadataExists(String dataSourceName) {
 
     /**
      * Fetches and deletes DataSourceMetadata of the specified datasource from Database
+     *
      * @param dataSourceInfo DataSourceInfo object
      */
     public void deleteMetadataFromDBByDataSource(DataSourceInfo dataSourceInfo) {
@@ -195,6 +200,7 @@ public void deleteMetadataFromDBByDataSource(DataSourceInfo dataSourceInfo) {
 
     /**
      * Deletes DataSourceMetadata entry from Database
+     *
      * @param dataSourceName datasource name
     */
     public void deleteMetadataFromDB(String dataSourceName) {
@@ -215,12 +221,13 @@ public void deleteMetadataFromDB(String dataSourceName) {
 
     /**
      * Fetches Datasource details from Database by name
+     *
     * @param dataSourceName Name of the datasource to be fetched
     * @return DataSourceInfo object of the specified datasource name
     */
     public DataSourceInfo fetchDataSourceFromDBByName(String dataSourceName) {
         try {
-            if(null == dataSourceName || dataSourceName.isEmpty()) {
+            if (null == dataSourceName || dataSourceName.isEmpty()) {
                 throw new Exception(KruizeConstants.DataSourceConstants.DataSourceErrorMsgs.MISSING_DATASOURCE_NAME);
             }
             DataSourceInfo datasource = new ExperimentDBService().loadDataSourceFromDBByName(dataSourceName);
@@ -233,13 +240,14 @@ public DataSourceInfo fetchDataSourceFromDBByName(String dataSourceName) {
 
     /**
      * Fetches Datasource metadata details from Database by name
-     * @param dataSourceName Name of the datasource to be fetched
-     * @param verbose Flag indicating granularity of metadata to be fetched
+     *
+     * @param dataSourceName Name of the datasource to be fetched
+     * @param verbose        Flag indicating granularity of metadata to be fetched
      * @return DataSourceMetadataInfo object of the specified datasource name
     */
     public DataSourceMetadataInfo fetchDataSourceMetadataFromDBByName(String dataSourceName, String verbose) {
         try {
-            if(null == dataSourceName || dataSourceName.isEmpty()) {
+            if (null == dataSourceName || dataSourceName.isEmpty()) {
                 throw new Exception(KruizeConstants.DataSourceConstants.DataSourceErrorMsgs.MISSING_DATASOURCE_NAME);
             }
             DataSourceMetadataInfo metadataInfo = new ExperimentDBService().loadMetadataFromDBByName(dataSourceName, verbose);
@@ -255,13 +263,13 @@ public DataSourceMetadataInfo fetchDataSourceMetadataFromDBByName(String dataSou
      * This method processes the provided metadata includes only the datasource
      * names and their associated cluster names, pruning all other details.
      *
-     * @param dataSourceName Datasource name
-     * @param dataSourceMetadataInfo DataSourceMetadataInfo object containing granular metadata
+     * @param dataSourceName         Datasource name
+     * @param dataSourceMetadataInfo DataSourceMetadataInfo object containing granular metadata
      * @return A new DataSourceMetadataInfo object containing only the cluster details.
-     *
+     * <br>
      * Note - It's assumed that Cluster view will be requested after validating dataSourceMetadataInfo object
     */
-    public DataSourceMetadataInfo DataSourceMetadataClusterView(String dataSourceName, DataSourceMetadataInfo dataSourceMetadataInfo){
+    public DataSourceMetadataInfo DataSourceMetadataClusterView(String dataSourceName, DataSourceMetadataInfo dataSourceMetadataInfo) {
         try {
             HashMap<String, DataSource> filteredDataSourceHashMap = new HashMap<>();
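Because importMetadataFromDataSource() now declares concrete checked exceptions instead of a blanket throws Exception, callers can tell a missing datasource apart from connectivity and TLS-setup failures. A sketch of one such caller; the method name and log messages are illustrative, not taken from the patch:

    // Hypothetical caller; DataSourceInfo/DataSourceManager come from com.autotune.common.datasource,
    // and the exception types match the new throws clause above.
    DataSourceMetadataInfo importWithDiagnostics(DataSourceInfo dataSourceInfo) {
        DataSourceManager dataSourceManager = new DataSourceManager();
        try {
            // Empty uniqueKey and a zero time range mirror the updateMetadataFromDataSource() call above
            return dataSourceManager.importMetadataFromDataSource(dataSourceInfo, "", 0, 0, 0);
        } catch (DataSourceDoesNotExist e) {
            LOGGER.error("No datasource info supplied: {}", e.getMessage());
        } catch (IOException e) {
            LOGGER.error("Data source unreachable or request timed out: {}", e.getMessage());
        } catch (NoSuchAlgorithmException | KeyStoreException | KeyManagementException e) {
            LOGGER.error("TLS client initialisation failed: {}", e.getMessage());
        }
        return null;
    }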
diff --git a/src/main/java/com/autotune/common/datasource/DataSourceMetadataOperator.java b/src/main/java/com/autotune/common/datasource/DataSourceMetadataOperator.java
index 5c22e9001..7c1d09033 100644
--- a/src/main/java/com/autotune/common/datasource/DataSourceMetadataOperator.java
+++ b/src/main/java/com/autotune/common/datasource/DataSourceMetadataOperator.java
@@ -22,6 +22,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
 import java.util.HashMap;
 
 /**
@@ -55,9 +59,9 @@ public static DataSourceMetadataOperator getInstance() {
      * @param startTime Get metadata from starttime to endtime
      * @param endTime   Get metadata from starttime to endtime
      * @param steps     the interval between data points in a range query
-     * TODO - support multiple data sources
+     *                  TODO - support multiple data sources
      */
-    public DataSourceMetadataInfo createDataSourceMetadata(DataSourceInfo dataSourceInfo, String uniqueKey, long startTime, long endTime, int steps) throws Exception {
+    public DataSourceMetadataInfo createDataSourceMetadata(DataSourceInfo dataSourceInfo, String uniqueKey, long startTime, long endTime, int steps) throws IOException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
         return processQueriesAndPopulateDataSourceMetadataInfo(dataSourceInfo, uniqueKey, startTime, endTime, steps);
     }
 
@@ -97,8 +101,8 @@ public DataSourceMetadataInfo getDataSourceMetadataInfo(DataSourceInfo dataSourc
      * @param dataSourceInfo The DataSourceInfo object containing information about the
      *                       data source to be updated.
      * <br>
-     * TODO - Currently Create and Update functions have identical functionalities, based on UI workflow and requirements
-     * need to further enhance updateDataSourceMetadata() to support namespace, workload level granular updates
+     * TODO - Currently Create and Update functions have identical functionalities, based on UI workflow and requirements
+     *        need to further enhance updateDataSourceMetadata() to support namespace, workload level granular updates
     */
     public DataSourceMetadataInfo updateDataSourceMetadata(DataSourceInfo dataSourceInfo, String uniqueKey, long startTime, long endTime, int steps) throws Exception {
         return processQueriesAndPopulateDataSourceMetadataInfo(dataSourceInfo, uniqueKey, startTime, endTime, steps);
@@ -141,7 +145,7 @@ public void deleteDataSourceMetadata(DataSourceInfo dataSourceInfo) {
      * @return DataSourceMetadataInfo object with populated metadata fields
      * todo rename processQueriesAndFetchClusterMetadataInfo
     */
-    public DataSourceMetadataInfo processQueriesAndPopulateDataSourceMetadataInfo(DataSourceInfo dataSourceInfo, String uniqueKey, long startTime, long endTime, int steps) throws Exception {
+    public DataSourceMetadataInfo processQueriesAndPopulateDataSourceMetadataInfo(DataSourceInfo dataSourceInfo, String uniqueKey, long startTime, long endTime, int steps) throws IOException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
         DataSourceMetadataHelper dataSourceDetailsHelper = new DataSourceMetadataHelper();
         /**
          * Get DataSourceOperatorImpl instance on runtime based on dataSource provider
@@ -158,81 +162,78 @@ public DataSourceMetadataInfo processQueriesAndPopulateDataSourceMetadataInfo(Da
          * creating a comprehensive DataSourceMetadataInfo object that is then added to a list.
          * TODO - Process cluster metadata using a custom query
          */
-        try {
-            String dataSourceName = dataSourceInfo.getName();
-            String namespaceQuery = PromQLDataSourceQueries.NAMESPACE_QUERY;
-            String workloadQuery = PromQLDataSourceQueries.WORKLOAD_QUERY;
-            String containerQuery = PromQLDataSourceQueries.CONTAINER_QUERY;
-            if (null != uniqueKey && !uniqueKey.isEmpty()) {
-                LOGGER.debug("uniquekey: {}", uniqueKey);
-                namespaceQuery = namespaceQuery.replace("ADDITIONAL_LABEL", "," + uniqueKey);
-                workloadQuery = workloadQuery.replace("ADDITIONAL_LABEL", "," + uniqueKey);
-                containerQuery = containerQuery.replace("ADDITIONAL_LABEL", "," + uniqueKey);
-            } else {
-                namespaceQuery = namespaceQuery.replace("ADDITIONAL_LABEL", "");
-                workloadQuery = workloadQuery.replace("ADDITIONAL_LABEL", "");
-                containerQuery = containerQuery.replace("ADDITIONAL_LABEL", "");
+
+        String dataSourceName = dataSourceInfo.getName();
+        String namespaceQuery = PromQLDataSourceQueries.NAMESPACE_QUERY;
+        String workloadQuery = PromQLDataSourceQueries.WORKLOAD_QUERY;
+        String containerQuery = PromQLDataSourceQueries.CONTAINER_QUERY;
+        if (null != uniqueKey && !uniqueKey.isEmpty()) {
+            LOGGER.debug("uniquekey: {}", uniqueKey);
+            namespaceQuery = namespaceQuery.replace("ADDITIONAL_LABEL", "," + uniqueKey);
+            workloadQuery = workloadQuery.replace("ADDITIONAL_LABEL", "," + uniqueKey);
+            containerQuery = containerQuery.replace("ADDITIONAL_LABEL", "," + uniqueKey);
+        } else {
+            namespaceQuery = namespaceQuery.replace("ADDITIONAL_LABEL", "");
+            workloadQuery = workloadQuery.replace("ADDITIONAL_LABEL", "");
+            containerQuery = containerQuery.replace("ADDITIONAL_LABEL", "");
+        }
+        LOGGER.info("namespaceQuery: {}", namespaceQuery);
+        LOGGER.info("workloadQuery: {}", workloadQuery);
+        LOGGER.info("containerQuery: {}", containerQuery);
+
+        JsonArray namespacesDataResultArray = op.getResultArrayForQuery(dataSourceInfo, namespaceQuery);
+        if (!op.validateResultArray(namespacesDataResultArray)) {
+            dataSourceMetadataInfo = dataSourceDetailsHelper.createDataSourceMetadataInfoObject(dataSourceName, null);
+        } else {
+            /**
+             * Key: Name of namespace
+             * Value: DataSourceNamespace object corresponding to a namespace
+             */
+            HashMap<String, DataSourceNamespace> datasourceNamespaces = dataSourceDetailsHelper.getActiveNamespaces(namespacesDataResultArray);
+            dataSourceMetadataInfo = dataSourceDetailsHelper.createDataSourceMetadataInfoObject(dataSourceName, datasourceNamespaces);
+
+            /**
+             * Outer map:
+             * Key: Name of namespace
+             * <br>
+             * Inner map:
+             * Key: Name of workload
+             * Value: DataSourceWorkload object matching the name
+             * TODO - get workload metadata for a given namespace
+             */
+            HashMap<String, HashMap<String, DataSourceWorkload>> datasourceWorkloads = new HashMap<>();
+            JsonArray workloadDataResultArray = op.getResultArrayForQuery(dataSourceInfo,
+                    workloadQuery);
+
+            if (op.validateResultArray(workloadDataResultArray)) {
+                datasourceWorkloads = dataSourceDetailsHelper.getWorkloadInfo(workloadDataResultArray);
             }
-            LOGGER.info("namespaceQuery: {}", namespaceQuery);
-            LOGGER.info("workloadQuery: {}", workloadQuery);
-            LOGGER.info("containerQuery: {}", containerQuery);
-
-            JsonArray namespacesDataResultArray = op.getResultArrayForQuery(dataSourceInfo, namespaceQuery);
-            if (!op.validateResultArray(namespacesDataResultArray)) {
-                dataSourceMetadataInfo = dataSourceDetailsHelper.createDataSourceMetadataInfoObject(dataSourceName, null);
-            } else {
-                /**
-                 * Key: Name of namespace
-                 * Value: DataSourceNamespace object corresponding to a namespace
-                 */
-                HashMap<String, DataSourceNamespace> datasourceNamespaces = dataSourceDetailsHelper.getActiveNamespaces(namespacesDataResultArray);
-                dataSourceMetadataInfo = dataSourceDetailsHelper.createDataSourceMetadataInfoObject(dataSourceName, datasourceNamespaces);
-
-                /**
-                 * Outer map:
-                 * Key: Name of namespace
-                 * <br>
-                 * Inner map:
-                 * Key: Name of workload
-                 * Value: DataSourceWorkload object matching the name
-                 * TODO - get workload metadata for a given namespace
-                 */
-                HashMap<String, HashMap<String, DataSourceWorkload>> datasourceWorkloads = new HashMap<>();
-                JsonArray workloadDataResultArray = op.getResultArrayForQuery(dataSourceInfo,
-                        workloadQuery);
-
-                if (op.validateResultArray(workloadDataResultArray)) {
-                    datasourceWorkloads = dataSourceDetailsHelper.getWorkloadInfo(workloadDataResultArray);
-                }
-                dataSourceDetailsHelper.updateWorkloadDataSourceMetadataInfoObject(dataSourceName, dataSourceMetadataInfo,
-                        datasourceWorkloads);
-
-                /**
-                 * Outer map:
-                 * Key: Name of workload
-                 * <br>
-                 * Inner map:
-                 * Key: Name of container
-                 * Value: DataSourceContainer object matching the name
-                 * TODO - get container metadata for a given workload
-                 */
-                HashMap<String, HashMap<String, DataSourceContainer>> datasourceContainers = new HashMap<>();
-                JsonArray containerDataResultArray = op.getResultArrayForQuery(dataSourceInfo,
-                        containerQuery);
-                LOGGER.debug("containerDataResultArray: {}", containerDataResultArray);
-
-                if (op.validateResultArray(containerDataResultArray)) {
-                    datasourceContainers = dataSourceDetailsHelper.getContainerInfo(containerDataResultArray);
-                }
-                dataSourceDetailsHelper.updateContainerDataSourceMetadataInfoObject(dataSourceName, dataSourceMetadataInfo,
-                        datasourceWorkloads, datasourceContainers);
-                return getDataSourceMetadataInfo(dataSourceInfo);
+            dataSourceDetailsHelper.updateWorkloadDataSourceMetadataInfoObject(dataSourceName, dataSourceMetadataInfo,
+                    datasourceWorkloads);
+
+            /**
+             * Outer map:
+             * Key: Name of workload
+             * <br>
+             * Inner map:
+             * Key: Name of container
+             * Value: DataSourceContainer object matching the name
+             * TODO - get container metadata for a given workload
+             */
+            HashMap<String, HashMap<String, DataSourceContainer>> datasourceContainers = new HashMap<>();
+            JsonArray containerDataResultArray = op.getResultArrayForQuery(dataSourceInfo,
+                    containerQuery);
+            LOGGER.debug("containerDataResultArray: {}", containerDataResultArray);
+
+            if (op.validateResultArray(containerDataResultArray)) {
+                datasourceContainers = dataSourceDetailsHelper.getContainerInfo(containerDataResultArray);
             }
-
-            return null;
-        } catch (Exception e) {
-            LOGGER.error(e.getMessage());
-            throw e;
+            dataSourceDetailsHelper.updateContainerDataSourceMetadataInfoObject(dataSourceName, dataSourceMetadataInfo,
+                    datasourceWorkloads, datasourceContainers);
+            return getDataSourceMetadataInfo(dataSourceInfo);
         }
+
+        return null;
     }
 }
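The ADDITIONAL_LABEL handling above is plain string substitution on the PromQL templates: with a uniqueKey the label is appended after a comma, otherwise the placeholder is dropped. A toy illustration; the template string is simplified and is not the real PromQLDataSourceQueries constant:

    // Hypothetical, simplified template; the actual NAMESPACE_QUERY is more involved.
    String template = "sum by (namespace) (kube_namespace_status_phase{phase='Active' ADDITIONAL_LABEL})";
    String uniqueKey = "org_id=\"org-123\"";
    // With a uniqueKey, the label lands inside the selector after a comma;
    // without one, the placeholder simply disappears.
    String withKey = template.replace("ADDITIONAL_LABEL", "," + uniqueKey);
    String withoutKey = template.replace("ADDITIONAL_LABEL", "");
    System.out.println(withKey);    // ...{phase='Active' ,org_id="org-123"})
    System.out.println(withoutKey); // ...{phase='Active' })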
warningMessage.contains("Please reduce your request rate")) { + LOGGER.warn("Warning detected: {}", warningMessage); + throw new IOException(warningMessage); + } else { + LOGGER.info("no warnings detected"); + } + } + } else { + LOGGER.info("resultNode is not empty"); + } } return new JSONObject(jsonResponse); } diff --git a/src/main/java/com/autotune/utils/KruizeConstants.java b/src/main/java/com/autotune/utils/KruizeConstants.java index 7c8560d88..6431bdaa1 100644 --- a/src/main/java/com/autotune/utils/KruizeConstants.java +++ b/src/main/java/com/autotune/utils/KruizeConstants.java @@ -18,6 +18,7 @@ package com.autotune.utils; import com.autotune.analyzer.kruizeObject.CreateExperimentConfigBean; +import com.autotune.analyzer.serviceObjects.BulkJobStatus; import com.autotune.analyzer.utils.AnalyzerConstants; import java.text.SimpleDateFormat; @@ -796,65 +797,49 @@ public static final class KRUIZE_BULK_API { CREATE_EXPERIMENT_CONFIG_BEAN.setMeasurementDuration(15); } - public static final class NotificationConstants { + public static class NotificationConstants { - public static final Notification FETCH_METRIC_FAILURE = new Notification( - NotificationType.ERROR, + public static final BulkJobStatus.Notification JOB_NOT_FOUND_INFO = new BulkJobStatus.Notification( + BulkJobStatus.NotificationType.WARNING, + JOB_NOT_FOUND_MSG, + 404 + ); + public static final BulkJobStatus.Notification LIMIT_INFO = new BulkJobStatus.Notification( + BulkJobStatus.NotificationType.INFO, + LIMIT_MESSAGE, + 400 + ); + public static final BulkJobStatus.Notification NOTHING_INFO = new BulkJobStatus.Notification( + BulkJobStatus.NotificationType.INFO, + NOTHING, + 400 + ); + public static BulkJobStatus.Notification FETCH_METRIC_FAILURE = new BulkJobStatus.Notification( + BulkJobStatus.NotificationType.ERROR, "Not able to fetch metrics", 400 ); - public static final Notification DATASOURCE_NOT_REG_INFO = new Notification( - NotificationType.ERROR, - "Datasource not registered with Kruize.", + public static BulkJobStatus.Notification DATASOURCE_NOT_REG_INFO = new BulkJobStatus.Notification( + BulkJobStatus.NotificationType.ERROR, + "Datasource not registered with Kruize. (%s)", 400 ); - public static final Notification DATASOURCE_DOWN_INFO = new Notification( - NotificationType.ERROR, + public static BulkJobStatus.Notification DATASOURCE_DOWN_INFO = new BulkJobStatus.Notification( + BulkJobStatus.NotificationType.ERROR, "HttpHostConnectException: Unable to connect to the data source. 
diff --git a/src/main/java/com/autotune/utils/KruizeConstants.java b/src/main/java/com/autotune/utils/KruizeConstants.java
index 7c8560d88..6431bdaa1 100644
--- a/src/main/java/com/autotune/utils/KruizeConstants.java
+++ b/src/main/java/com/autotune/utils/KruizeConstants.java
@@ -18,6 +18,7 @@
 package com.autotune.utils;
 
 import com.autotune.analyzer.kruizeObject.CreateExperimentConfigBean;
+import com.autotune.analyzer.serviceObjects.BulkJobStatus;
 import com.autotune.analyzer.utils.AnalyzerConstants;
 
 import java.text.SimpleDateFormat;
@@ -796,65 +797,49 @@ public static final class KRUIZE_BULK_API {
             CREATE_EXPERIMENT_CONFIG_BEAN.setMeasurementDuration(15);
         }
 
-        public static final class NotificationConstants {
+        public static class NotificationConstants {
 
-            public static final Notification FETCH_METRIC_FAILURE = new Notification(
-                    NotificationType.ERROR,
+            public static final BulkJobStatus.Notification JOB_NOT_FOUND_INFO = new BulkJobStatus.Notification(
+                    BulkJobStatus.NotificationType.WARNING,
+                    JOB_NOT_FOUND_MSG,
+                    404
+            );
+            public static final BulkJobStatus.Notification LIMIT_INFO = new BulkJobStatus.Notification(
+                    BulkJobStatus.NotificationType.INFO,
+                    LIMIT_MESSAGE,
+                    400
+            );
+            public static final BulkJobStatus.Notification NOTHING_INFO = new BulkJobStatus.Notification(
+                    BulkJobStatus.NotificationType.INFO,
+                    NOTHING,
+                    400
+            );
+            public static BulkJobStatus.Notification FETCH_METRIC_FAILURE = new BulkJobStatus.Notification(
+                    BulkJobStatus.NotificationType.ERROR,
                     "Not able to fetch metrics",
                     400
             );
-            public static final Notification DATASOURCE_NOT_REG_INFO = new Notification(
-                    NotificationType.ERROR,
-                    "Datasource not registered with Kruize.",
+            public static BulkJobStatus.Notification DATASOURCE_NOT_REG_INFO = new BulkJobStatus.Notification(
+                    BulkJobStatus.NotificationType.ERROR,
+                    "Datasource not registered with Kruize. (%s)",
                     400
             );
-            public static final Notification DATASOURCE_DOWN_INFO = new Notification(
-                    NotificationType.ERROR,
+            public static BulkJobStatus.Notification DATASOURCE_DOWN_INFO = new BulkJobStatus.Notification(
+                    BulkJobStatus.NotificationType.ERROR,
                     "HttpHostConnectException: Unable to connect to the data source. Please try again later.",
                     503
             );
-            public static final Notification DATASOURCE_GATEWAY_TIMEOUT_INFO = new Notification(
-                    NotificationType.ERROR,
+            public static BulkJobStatus.Notification DATASOURCE_GATEWAY_TIMEOUT_INFO = new BulkJobStatus.Notification(
+                    BulkJobStatus.NotificationType.ERROR,
                     "SocketTimeoutException: request timed out waiting for a data source response",
                     504
             );
-            public static final Notification DATASOURCE_CONNECT_TIMEOUT_INFO = new Notification(
-                    NotificationType.ERROR,
+            public static BulkJobStatus.Notification DATASOURCE_CONNECT_TIMEOUT_INFO = new BulkJobStatus.Notification(
+                    BulkJobStatus.NotificationType.ERROR,
                     "ConnectTimeoutException: cannot establish a data source connection in a given time frame due to connectivity issues",
                     503
             );
-            public static final Notification JOB_NOT_FOUND_INFO = new Notification(
-                    NotificationType.WARNING,
-                    JOB_NOT_FOUND_MSG,
-                    404
-            );
-            public static final Notification LIMIT_INFO = new Notification(
-                    NotificationType.INFO,
-                    LIMIT_MESSAGE,
-                    400
-            );
-            public static final Notification NOTHING_INFO = new Notification(
-                    NotificationType.INFO,
-                    NOTHING,
-                    400
-            );
-
-            public enum NotificationType {
-                ERROR("error"),
-                WARNING("warning"),
-                INFO("info");
-
-                private final String type;
-
-                NotificationType(String type) {
-                    this.type = type;
-                }
-
-                public String getType() {
-                    return type;
-                }
-            }
 
             // More notification constants can be added here as needed
@@ -875,30 +860,7 @@ public String getStatus() {
             }
         }
 
-        // Notification class representing each constant's details
-        public static class Notification {
-            private final NotificationType type;
-            private final String message;
-            private final int code;
 
-            public Notification(NotificationType type, String message, int code) {
-                this.type = type;
-                this.message = message;
-                this.code = code;
-            }
-
-            public NotificationType getType() {
-                return type;
-            }
-
-            public String getMessage() {
-                return message;
-            }
-
-            public int getCode() {
-                return code;
-            }
-        }
     }
 }
 }
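DATASOURCE_NOT_REG_INFO now embeds a %s placeholder that BulkJobManager fills with the exception message via String.format(). A short sketch of how the placeholder resolves; building a fresh Notification, as shown here, would keep the shared template unchanged between jobs, and the failure text is illustrative:

    // Hypothetical illustration; NotificationConstants refers to
    // KruizeConstants.KRUIZE_BULK_API.NotificationConstants.
    BulkJobStatus.Notification template = NotificationConstants.DATASOURCE_NOT_REG_INFO;
    String resolved = String.format(template.getMessage(), "datasource 'prometheus-1' not found");
    // resolved -> "Datasource not registered with Kruize. (datasource 'prometheus-1' not found)"
    BulkJobStatus.Notification filled =
            new BulkJobStatus.Notification(template.getType(), resolved, template.getCode());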
From 518a389f182f031f31c5516bdd1be86431e3cf91 Mon Sep 17 00:00:00 2001
From: msvinaykumar
Date: Mon, 11 Nov 2024 09:56:22 +0530
Subject: [PATCH 5/5] Kruize Bulk Api functional Test

Signed-off-by: msvinaykumar
---
 tests/scripts/helpers/kruize.py               |  16 ++-
 .../rest_apis/test_bulkAPI.py                 | 119 ++++++++++++++++++
 2 files changed, 133 insertions(+), 2 deletions(-)
 create mode 100644 tests/scripts/local_monitoring_tests/rest_apis/test_bulkAPI.py

diff --git a/tests/scripts/helpers/kruize.py b/tests/scripts/helpers/kruize.py
index bcae2bbe5..9c4994b12 100644
--- a/tests/scripts/helpers/kruize.py
+++ b/tests/scripts/helpers/kruize.py
@@ -15,9 +15,12 @@
 """
 
 import json
+import requests
 import subprocess
-import requests
+
+
+def get_kruize_url():
+    return URL
 
 
 def form_kruize_url(cluster_type, SERVER_IP=None):
@@ -28,6 +31,10 @@ def form_kruize_url(cluster_type, SERVER_IP=None):
         print("\nKRUIZE AUTOTUNE URL = ", URL)
         return
 
+    if (cluster_type == "local"):
+        AUTOTUNE_PORT = 8080
+        SERVER_IP = '127.0.0.1'
+        URL = "http://" + str(SERVER_IP) + ":" + str(AUTOTUNE_PORT)
     if (cluster_type == "minikube"):
         port = subprocess.run(
             ['kubectl -n monitoring get svc kruize --no-headers -o=custom-columns=PORT:.spec.ports[*].nodePort'],
@@ -43,11 +50,14 @@ def form_kruize_url(cluster_type, SERVER_IP=None):
 
         subprocess.run(['oc expose svc/kruize -n openshift-tuning'], shell=True, stdout=subprocess.PIPE)
         ip = subprocess.run(
-            ['oc status -n openshift-tuning | grep "kruize" | grep -v "kruize-ui" | grep -v "kruize-db" | grep port | cut -d " " -f1 | cut -d "/" -f3'], shell=True,
+            [
+                'oc status -n openshift-tuning | grep "kruize" | grep -v "kruize-ui" | grep -v "kruize-db" | grep port | cut -d " " -f1 | cut -d "/" -f3'],
+            shell=True,
             stdout=subprocess.PIPE)
         SERVER_IP = ip.stdout.decode('utf-8').strip('\n')
         print("IP = ", SERVER_IP)
         URL = "http://" + str(SERVER_IP)
+
     print("\nKRUIZE AUTOTUNE URL = ", URL)
 
 
@@ -381,6 +391,7 @@ def create_metric_profile(metric_profile_json_file):
     print(response.text)
     return response
 
+
 # Description: This function deletes the metric profile
 # Input Parameters: metric profile input json
 def delete_metric_profile(input_json_file, invalid_header=False):
@@ -437,6 +448,7 @@ def list_metric_profiles(name=None, verbose=None, logging=True):
     print("\n************************************************************")
     return response
 
+
 # Description: This function generates recommendation for the given experiment_name
 def generate_recommendations(experiment_name):
     print("\n************************************************************")
diff --git a/tests/scripts/local_monitoring_tests/rest_apis/test_bulkAPI.py b/tests/scripts/local_monitoring_tests/rest_apis/test_bulkAPI.py
new file mode 100644
index 000000000..634eb83fb
--- /dev/null
+++ b/tests/scripts/local_monitoring_tests/rest_apis/test_bulkAPI.py
@@ -0,0 +1,119 @@
+"""
+Copyright (c) 2022, 2024 Red Hat, IBM Corporation and others.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import logging
+import pytest
+import requests
+import sys
+
+sys.path.append("../../")
+from helpers.fixtures import *
+from helpers.kruize import *
+from helpers.utils import *
+
+# Set up logging
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+logger = logging.getLogger(__name__)
+
+
+@pytest.mark.sanity
+@pytest.mark.parametrize("bulk_request_payload, expected_job_id_present", [
+    ({}, True),  # Test with an empty payload to check if a job_id is created.
+    ({
+         "filter": {
+             "exclude": {
+                 "namespace": [],
+                 "workload": [],
+                 "containers": [],
+                 "labels": {}
+             },
+             "include": {
+                 "namespace": [],
+                 "workload": [],
+                 "containers": [],
+                 "labels": {}
+             }
+         },
+         "time_range": {}
+     }, True)  # Test with a sample payload with some JSON content
+])
+def test_bulk_post_request(cluster_type, bulk_request_payload, expected_job_id_present, caplog):
+    form_kruize_url(cluster_type)
+    URL = get_kruize_url()
+
+    with caplog.at_level(logging.INFO):
+        # Log request payload and curl command for POST request
+        logger.info("Sending POST request to URL: %s", f"{URL}/bulk")
+        logger.info("Request Payload: %s", bulk_request_payload)
+        curl_command = f"curl -X POST {URL}/bulk -H 'Content-Type: application/json' -d '{json.dumps(bulk_request_payload)}'"
+        logger.info("Equivalent cURL command: %s", curl_command)
+
+        # Send the POST request
+        response = requests.post(f"{URL}/bulk", json=bulk_request_payload)
+        logger.info("Response Status Code: %s", response.status_code)
+        logger.info("Response JSON: %s", response.json())
+
+        # Check if job_id is present in the response
+        job_id_present = "job_id" in response.json() and isinstance(response.json()["job_id"], str)
+        assert job_id_present == expected_job_id_present, f"Expected job_id presence to be {expected_job_id_present} but was {job_id_present}"
+
+        # If a job_id is generated, run the GET request test
+        if job_id_present:
+            test_get_job_status(response.json()["job_id"], URL, caplog)
+
+
+def test_get_job_status(job_id, base_url, caplog):
+    # Define URLs for both requests
+    url_basic = f"{base_url}/bulk?job_id={job_id}"
+    url_verbose = f"{base_url}/bulk?job_id={job_id}&verbose=true"
+
+    # Common keys expected in both responses
+    common_keys = {
+        "status", "total_experiments", "processed_experiments", "job_id", "job_start_time", "job_end_time"
+    }
+
+    # Extra keys expected when verbose=true
+    verbose_keys = {
+        "experiments"
+    }
+
+    with caplog.at_level(logging.INFO):
+        # Make the GET request without verbose
+        logger.info("Sending GET request to URL (basic): %s", url_basic)
+        curl_command_basic = f"curl -X GET '{url_basic}'"
+        logger.info("Equivalent cURL command (basic): %s", curl_command_basic)
+        response_basic = requests.get(url_basic)
+
+        logger.info("Basic GET Response Status Code: %s", response_basic.status_code)
+        logger.info("Basic GET Response JSON: %s", response_basic.json())
+
+        # Verify common keys in the basic response
+        assert common_keys.issubset(
+            response_basic.json().keys()), f"Missing keys in response: {common_keys - response_basic.json().keys()}"
+
+        # Make the GET request with verbose=true
+        logger.info("Sending GET request to URL (verbose): %s", url_verbose)
+        curl_command_verbose = f"curl -X GET '{url_verbose}'"
+        logger.info("Equivalent cURL command (verbose): %s", curl_command_verbose)
+        response_verbose = requests.get(url_verbose)
+
+        logger.info("Verbose GET Response Status Code: %s", response_verbose.status_code)
+        logger.info("Verbose GET Response JSON: %s", response_verbose.json())
+
+        # Verify common and verbose keys in the verbose response
+        assert common_keys.issubset(
+            response_verbose.json().keys()), f"Missing keys in verbose response: {common_keys - response_verbose.json().keys()}"
+        assert verbose_keys.issubset(
+            response_verbose.json().keys()), f"Missing verbose keys in response: {verbose_keys - response_verbose.json().keys()}"
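The two tests above drive the full bulk flow: POST /bulk returns a job_id, and GET /bulk?job_id=...&verbose=true adds the per-experiment details. The same round trip as a standalone Java sketch, assuming a locally reachable Kruize instance; the host and port mirror the "local" default added to kruize.py, and the class name is hypothetical:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    // Hypothetical smoke client for the /bulk endpoints exercised by test_bulkAPI.py.
    public class BulkApiSmokeTest {
        public static void main(String[] args) throws Exception {
            String base = "http://127.0.0.1:8080"; // matches the "local" cluster_type default
            HttpClient client = HttpClient.newHttpClient();

            // POST an empty payload; the response is expected to carry a job_id
            HttpRequest post = HttpRequest.newBuilder(URI.create(base + "/bulk"))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString("{}"))
                    .build();
            String postBody = client.send(post, HttpResponse.BodyHandlers.ofString()).body();
            System.out.println("POST /bulk -> " + postBody);

            // Poll job status with verbose=true to include per-experiment details
            String jobId = new org.json.JSONObject(postBody).getString("job_id");
            HttpRequest get = HttpRequest.newBuilder(
                    URI.create(base + "/bulk?job_id=" + jobId + "&verbose=true")).GET().build();
            System.out.println("GET -> " + client.send(get, HttpResponse.BodyHandlers.ofString()).body());
        }
    }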