fix(api): timeout (#2858)
shahargl authored Dec 17, 2024
1 parent d41adfe commit 57907ac
Showing 1 changed file with 24 additions and 59 deletions.
83 changes: 24 additions & 59 deletions keep/providers/gcpmonitoring_provider/gcpmonitoring_provider.py
@@ -2,13 +2,11 @@
import datetime
import json
import logging
from typing import Optional

import google.api_core
import google.api_core.exceptions
import google.cloud.logging
import pydantic
from google.cloud.logging_v2.services.logging_service_v2 import LoggingServiceV2Client

from keep.api.models.alert import AlertDto, AlertSeverity, AlertStatus
from keep.contextmanager.contextmanager import ContextManager
@@ -44,13 +42,6 @@ class GcpmonitoringProviderAuthConfig:
"file_type": "application/json", # this is used to filter the file type in the UI
}
)
timeout: Optional[int] = dataclasses.field(
default=30,
metadata={
"required": False,
"description": "Query timeout in seconds",
},
)


class GcpmonitoringProvider(BaseProvider):
@@ -109,26 +100,20 @@ def __init__(
self.authentication_config.service_account_json
)
self._client = None
self.timeout = self.authentication_config.timeout

def validate_config(self):
self.authentication_config = GcpmonitoringProviderAuthConfig(
**self.config.authentication
)

def dispose(self):
if self._client:
self._client.transport.close()
pass

def validate_scopes(self) -> dict[str, bool | str]:
scopes = {}
# try initializing the client to validate the scopes
try:
# Use a small page size and timeout for validation
self.client.list_log_entries(
page_size=1,
timeout=10,
)
self.client.list_entries(max_results=1)
scopes["roles/logs.viewer"] = True
except google.api_core.exceptions.PermissionDenied:
scopes["roles/logs.viewer"] = (
@@ -139,15 +124,14 @@ def validate_scopes(self) -> dict[str, bool | str]:
return scopes

@property
def client(self) -> LoggingServiceV2Client:
def client(self) -> google.cloud.logging.Client:
if self._client is None:
self._client = self.__generate_client()
return self._client

def __generate_client(self) -> LoggingServiceV2Client:
def __generate_client(self) -> google.cloud.logging.Client:
if not self._client:

self._client = LoggingServiceV2Client.from_service_account_info(
self._client = google.cloud.logging.Client.from_service_account_info(
self._service_account_data
)
return self._client
@@ -159,11 +143,9 @@ def _query(
page_size=1000,
raw="true",
project="",
timeout=None,
**kwargs,
):
raw = raw == "true"
timeout = timeout or self.timeout
self.logger.info(
f"Querying GCP Monitoring with filter: {filter} and timedelta_in_days: {timedelta_in_days}"
)
@@ -177,42 +159,25 @@
if project:
self.client.project = project

try:
self.logger.info(f"Querying logs with filter: {filter}")
entries_iterator = self.client.list_log_entries(
filter=filter,
page_size=page_size,
timeout=timeout,
)
self.logger.info("Querying logs completed")
entries = []
for entry in entries_iterator:
if raw:
entries.append(entry)
else:
try:
log_entry = LogEntry(
timestamp=entry.timestamp,
severity=entry.severity,
payload=entry.payload,
http_request=entry.http_request,
payload_exists=entry.payload is not None,
http_request_exists=entry.http_request is not None,
)
entries.append(log_entry)
except Exception:
self.logger.error("Error parsing log entry", exc_info=True)
continue

self.logger.info(f"Found {len(entries)} entries")
return entries

except google.api_core.exceptions.DeadlineExceeded:
self.logger.error(f"Query timed out after {timeout} seconds")
raise
except Exception as e:
self.logger.error(f"Error querying logs: {str(e)}", exc_info=True)
raise
entries_iterator = self.client.list_entries(filter_=filter, page_size=page_size)
entries = []
for entry in entries_iterator:
if raw:
entries.append(entry)
else:
try:
log_entry = LogEntry(
timestamp=entry.timestamp,
severity=entry.severity,
payload=entry.payload,
http_request=entry.http_request,
payload_exists=entry.payload is not None,
http_request_exists=entry.http_request is not None,
)
entries.append(log_entry)
except Exception:
self.logger.error("Error parsing log entry")
continue

self.logger.info(f"Found {len(entries)} entries")
return entries
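The change above swaps the low-level LoggingServiceV2Client, and the per-call timeout that was threaded through it, for the higher-level google.cloud.logging.Client. As a rough illustration of the new query path only, here is a minimal standalone sketch: the service-account.json path and the filter string are hypothetical placeholders, not part of the commit, and the provider's own LogEntry wrapping is omitted.

import json

import google.api_core.exceptions
import google.cloud.logging

# Hypothetical credentials file; the provider receives this JSON via its auth config.
with open("service-account.json") as f:
    service_account_info = json.load(f)

client = google.cloud.logging.Client.from_service_account_info(service_account_info)

# Scope check, mirroring the new validate_scopes(): try to fetch a single entry.
try:
    list(client.list_entries(max_results=1))
    print("roles/logs.viewer: ok")
except google.api_core.exceptions.PermissionDenied:
    print("roles/logs.viewer: missing")

# Query, mirroring the new _query(): list_entries() handles paging and relies on the
# client library's default retry/timeout, so no explicit timeout is passed per call.
log_filter = 'severity>=ERROR AND timestamp>="2024-12-16T00:00:00Z"'  # illustrative filter
for entry in client.list_entries(filter_=log_filter, page_size=1000):
    print(entry.timestamp, entry.severity, entry.payload)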
