Skip to content

Commit

Permalink
fix(api): delete provider (keephq#3237)
Browse files: browse the repository at this point in the history
  • Loading branch information
talboren authored Jan 30, 2025
1 parent 8fc68ca commit 12dfe36
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 21 deletions.
32 changes: 26 additions & 6 deletions keep/parser/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def parse(
parsed_workflow_yaml: dict,
providers_file: str = None,
actions_file: str = None,
workflow_db_id: str = None,
) -> typing.List[Workflow]:
"""_summary_
Expand All @@ -83,6 +84,7 @@ def parse(
workflow_providers,
actions_file,
workflow_actions,
workflow_db_id,
)
for workflow in raw_workflows
]
Expand Down Expand Up @@ -138,6 +140,7 @@ def _parse_workflow(
workflow_providers: dict = None,
actions_file: str = None,
workflow_actions: dict = None,
workflow_db_id: str = None,
) -> Workflow:
self.logger.debug("Parsing workflow")
workflow_id = self._get_workflow_id(tenant_id, workflow)
Expand All @@ -158,10 +161,10 @@ def _parse_workflow(
workflow_owners = self._parse_owners(workflow)
workflow_tags = self._parse_tags(workflow)
workflow_steps = self._parse_steps(
context_manager, workflow, workflow_id, workflow_description
context_manager, workflow, workflow_id, workflow_description, workflow_db_id
)
workflow_actions = self._parse_actions(
context_manager, workflow, workflow_id, workflow_description
context_manager, workflow, workflow_id, workflow_description, workflow_db_id
)
workflow_interval = self.parse_interval(workflow)
on_failure_action = self._get_on_failure_action(context_manager, workflow)
Expand Down Expand Up @@ -431,13 +434,18 @@ def _parse_steps(
workflow: dict,
workflow_id: str | None = None,
workflow_description: str | None = None,
workflow_db_id: str | None = None,
) -> typing.List[Step]:
self.logger.debug("Parsing steps")
workflow_steps = workflow.get("steps", [])
workflow_steps_parsed = []
for _step in workflow_steps:
provider = self._get_step_provider(
context_manager, _step, workflow_id, workflow_description
context_manager,
_step,
workflow_id,
workflow_description,
workflow_db_id,
)
provider_parameters = _step.get("provider", {}).get("with")
parsed_provider_parameters = Parser.parse_provider_parameters(
Expand All @@ -462,6 +470,7 @@ def _get_step_provider(
_step: dict,
workflow_id: str | None = None,
workflow_description: str | None = None,
workflow_db_id: str | None = None,
) -> dict:
step_provider = _step.get("provider")
try:
Expand All @@ -483,11 +492,13 @@ def _get_step_provider(
self.logger.exception(
f"Error getting provider {provider_id} for step {_step.get('name')}",
extra={
"workflow_id": workflow_id,
"workflow_name": workflow_id,
"workflow_description": workflow_description,
"provider_id": provider_id,
"provider_type": step_provider_type,
"provider_config_name": step_provider_config,
"workflow_db_id": workflow_db_id,
"tenant_id": context_manager.tenant_id,
},
)
raise
Expand Down Expand Up @@ -558,6 +569,7 @@ def _get_action(
action_name: str | None = None,
workflow_id: str | None = None,
workflow_description: str | None = None,
workflow_db_id: str | None = None,
) -> Step:
name = action_name or action.get("name")
provider = action.get("provider", {})
Expand All @@ -582,11 +594,13 @@ def _get_action(
self.logger.exception(
f"Error getting provider {provider_id} for action {name}",
extra={
"workflow_id": workflow_id,
"workflow_name": workflow_id,
"workflow_description": workflow_description,
"provider_id": provider_id,
"provider_type": provider_type,
"provider_config_name": provider_config_name,
"workflow_db_id": workflow_db_id,
"tenant_id": context_manager.tenant_id,
},
)
raise
Expand All @@ -606,6 +620,7 @@ def _parse_actions(
workflow: dict,
workflow_id: str | None = None,
workflow_description: str | None = None,
workflow_db_id: str | None = None,
) -> typing.List[Step]:
self.logger.debug("Parsing actions")
workflow_actions_raw = workflow.get("actions", [])
Expand All @@ -616,7 +631,12 @@ def _parse_actions(
workflow_actions_parsed = []
for _action in workflow_actions:
parsed_action = self._get_action(
context_manager, _action, None, workflow_id, workflow_description
context_manager,
_action,
None,
workflow_id,
workflow_description,
workflow_db_id,
)
workflow_actions_parsed.append(parsed_action)
self.logger.debug("Actions parsed successfully")
Expand Down
27 changes: 22 additions & 5 deletions keep/providers/grafana_provider/grafana_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class GrafanaProviderAuthConfig:
"description": "Datasource UID",
"hint": "Provide if you want to pull topology data",
},
default="",
)


Expand Down Expand Up @@ -866,7 +867,10 @@ def simulate_alert(cls, **kwargs) -> dict:

def query_datasource_for_topology(self):
self.logger.info("Attempting to query datasource for topology data.")
headers = {"Authorization": f"Bearer {self.authentication_config.token}", "Content-Type": "application/json",}
headers = {
"Authorization": f"Bearer {self.authentication_config.token}",
"Content-Type": "application/json",
}
json_data = {
"queries": [
{
Expand Down Expand Up @@ -918,7 +922,10 @@ def query_datasource_for_topology(self):
raise Exception(response.text)
return response.json()
except Exception as e:
self.logger.error("Error while querying datasource for topology map", extra={"exception": str(e)})
self.logger.error(
"Error while querying datasource for topology map",
extra={"exception": str(e)},
)

@staticmethod
def __extract_schema_value_pair(results, query: str):
Expand All @@ -940,16 +947,23 @@ def __extract_schema_value_pair(results, query: str):

def pull_topology(self):
self.logger.info("Pulling Topology data from Grafana...")
if not self.authentication_config.datasource_uid:
self.logger.debug("No datasource uid found, skipping topology pull")
return [], {}
try:
service_topology = {}
results = self.query_datasource_for_topology().get("results", {})

self.logger.info("Scraping traces_service_graph_request_total data from the response")
self.logger.info(
"Scraping traces_service_graph_request_total data from the response"
)
requests_per_second_data = GrafanaProvider.__extract_schema_value_pair(
results=results, query="traces_service_graph_request_total"
)

self.logger.info("Scraping traces_service_graph_request_server_seconds_sum data from the response")
self.logger.info(
"Scraping traces_service_graph_request_server_seconds_sum data from the response"
)
total_response_times_data = GrafanaProvider.__extract_schema_value_pair(
results=results, query="traces_service_graph_request_server_seconds_sum"
)
Expand Down Expand Up @@ -981,7 +995,10 @@ def pull_topology(self):
self.logger.info("Successfully pulled Topology data from Grafana...")
return list(service_topology.values()), {}
except Exception as e:
self.logger.error("Error while pulling topology data from Grafana", extra={"exception": str(e)})
self.logger.error(
"Error while pulling topology data from Grafana",
extra={"exception": str(e)},
)
raise e


Expand Down
14 changes: 8 additions & 6 deletions keep/providers/providers_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,10 +301,9 @@ def delete_provider(

context_manager = ContextManager(tenant_id=tenant_id)
secret_manager = SecretManagerFactory.get_secret_manager(context_manager)
config = secret_manager.read_secret(provider_model.configuration_key, is_json=True)
provider = ProvidersFactory.get_provider(
context_manager, provider_model.id, provider_model.type, config
)
config = secret_manager.read_secret(
provider_model.configuration_key, is_json=True
)

try:
secret_manager.delete_secret(provider_model.configuration_key)
Expand All @@ -319,14 +318,17 @@ def delete_provider(
logger.exception("Failed to unregister provider as a consumer")

try:
provider = ProvidersFactory.get_provider(
context_manager, provider_model.id, provider_model.type, config
)
provider.clean_up()
except NotImplementedError:
logger.info(
"Being deleted provider of type %s does not have a clean_up method",
provider_model.type
provider_model.type,
)
except Exception:
logger.exception(msg="Failed to clean up provider")
logger.exception(msg="Provider deleted but failed to clean up provider")

session.delete(provider_model)
session.commit()
Expand Down
27 changes: 24 additions & 3 deletions keep/workflowmanager/workflowscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,14 @@ def _handle_interval_workflows(self):
)
continue
except Exception as e:
self.logger.error(f"Error getting workflow: {e}")
self.logger.error(
f"Error getting workflow: {e}",
extra={
"workflow_id": workflow_id,
"workflow_execution_id": workflow_execution_id,
"tenant_id": tenant_id,
},
)
self._finish_workflow_execution(
tenant_id=tenant_id,
workflow_id=workflow_id,
Expand Down Expand Up @@ -424,7 +431,14 @@ def _handle_event_workflows(self):
)
# In case the provider are not configured properly
except ProviderConfigurationException as e:
self.logger.error(f"Error getting workflow: {e}")
self.logger.error(
f"Error getting workflow: {e}",
extra={
"workflow_id": workflow_id,
"workflow_execution_id": workflow_execution_id,
"tenant_id": tenant_id,
},
)
self._finish_workflow_execution(
tenant_id=tenant_id,
workflow_id=workflow_id,
Expand All @@ -434,7 +448,14 @@ def _handle_event_workflows(self):
)
continue
except Exception as e:
self.logger.error(f"Error getting workflow: {e}")
self.logger.error(
f"Error getting workflow: {e}",
extra={
"workflow_id": workflow_id,
"workflow_execution_id": workflow_execution_id,
"tenant_id": tenant_id,
},
)
self._finish_workflow_execution(
tenant_id=tenant_id,
workflow_id=workflow_id,
Expand Down
4 changes: 3 additions & 1 deletion keep/workflowmanager/workflowstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ def get_workflow(self, tenant_id: str, workflow_id: str) -> Workflow:
detail=f"Workflow {workflow_id} not found",
)
workflow_yaml = cyaml.safe_load(workflow)
workflow = self.parser.parse(tenant_id, workflow_yaml)
workflow = self.parser.parse(
tenant_id, workflow_yaml, workflow_db_id=workflow_id
)
if len(workflow) > 1:
raise HTTPException(
status_code=500,
Expand Down

0 comments on commit 12dfe36

Please sign in to comment.