Skip to content

Commit

Permalink
Add error reporting
Browse files Browse the repository at this point in the history
Add an `error` method to the `Extractor` class that starts tracking an
error. It can be used in one of two ways: either by manually starting
and finishing the error:

``` python
e = extractor.error(...)

 # handle error

e.finish()
```

or by using it as a context manager:

``` python
with extractor.error(...):
    # Handle error
```

You can create an instant error (with no duration) by using the
`instant()` method:

``` python
extractor.error(...).instant()
```

Tracking of start/end times, generating and keeping track of external
IDs, reporting in checkins, and so on is all handled automatically.
  • Loading branch information
mathialo committed Nov 22, 2024
1 parent 934a01b commit 9075489
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 3 deletions.
12 changes: 12 additions & 0 deletions cognite/extractorutils/unstable/core/_dto.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from humps import camelize
from pydantic import BaseModel, ConfigDict

from cognite.extractorutils.unstable.core.errors import ErrorLevel


class CogniteModel(BaseModel):
"""
Expand All @@ -32,3 +34,13 @@ class TaskUpdate(CogniteModel):
type: Literal["started"] | Literal["ended"]
name: str
timestamp: int


class Error(CogniteModel):
    """
    Serializable form of a single reported error.

    One instance is built per tracked error during a checkin and dumped into the ``errors`` list of the checkin
    request body.
    """

    # Identifier of the error; the same ID is sent again when an already reported error is later ended
    external_id: str
    # Severity of the error (warning, error or fatal)
    level: ErrorLevel
    # Short summary of what went wrong
    description: str
    # Optional longer text, e.g. a formatted traceback for a crashed task
    details: str | None
    # Timestamp at which the error started -- NOTE(review): presumably epoch milliseconds, confirm against `now()`
    start_time: int
    # Timestamp at which the error ended, None while the error is still ongoing
    end_time: int | None
    # Name of the task the error belongs to, None when the error is not tied to a task
    task: str | None
46 changes: 44 additions & 2 deletions cognite/extractorutils/unstable/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Queue
from threading import RLock, Thread
from traceback import format_exception
from types import TracebackType
from typing import Generic, Literal, Optional, Type, TypeVar, Union

Expand All @@ -10,8 +11,10 @@

from cognite.extractorutils.threading import CancellationToken
from cognite.extractorutils.unstable.configuration.models import ConnectionConfig, ExtractorConfig
from cognite.extractorutils.unstable.core._dto import Error as DtoError
from cognite.extractorutils.unstable.core._dto import TaskUpdate
from cognite.extractorutils.unstable.core._messaging import RuntimeMessage
from cognite.extractorutils.unstable.core.errors import Error, ErrorLevel
from cognite.extractorutils.unstable.core.tasks import ContinuousTask, ScheduledTask, StartupTask, Task
from cognite.extractorutils.unstable.scheduling import TaskScheduler
from cognite.extractorutils.util import now
Expand Down Expand Up @@ -50,6 +53,7 @@ def __init__(

self._tasks: list[Task] = []
self._task_updates: list[TaskUpdate] = []
self._errors: dict[str, Error] = {}

self.logger = logging.getLogger(f"{self.EXTERNAL_ID}.main")

Expand All @@ -61,11 +65,26 @@ def _checkin(self) -> None:
task_updates = [t.model_dump() for t in self._task_updates]
self._task_updates.clear()

error_updates = [
DtoError(
external_id=e.external_id,
level=e.level,
description=e.description,
details=e.details,
start_time=e.start_time,
end_time=e.end_time,
task=e._task.name if e._task is not None else None,
).model_dump()
for e in self._errors.values()
]
self._errors.clear()

res = self.cognite_client.post(
f"/api/v1/projects/{self.cognite_client.config.project}/odin/checkin",
json={
"externalId": self.connection_config.extraction_pipeline,
"taskEvents": task_updates,
"errors": error_updates,
},
headers={"cdf-version": "alpha"},
)
Expand All @@ -83,13 +102,26 @@ def _run_checkin(self) -> None:
self.logger.exception("Error during checkin")
self.cancellation_token.wait(10)

def _report_error(self, error: Error) -> None:
    """
    Queue an error for inclusion in the next checkin.

    Errors are keyed on their external ID, so reporting the same error again replaces the pending entry instead
    of adding a duplicate.

    Args:
        error: The error to queue.
    """
    key = error.external_id
    # The checkin lock guards the pending-error dict
    with self._checkin_lock:
        self._errors[key] = error

def error(
    self,
    level: ErrorLevel,
    description: str,
    details: str | None = None,
    task: Task | None = None,
) -> Error:
    """
    Start tracking a new error on this extractor.

    The returned object can be ended manually with ``finish()``, ended immediately with ``instant()``, or used
    as a context manager.

    Args:
        level: Severity of the error.
        description: Short summary of the error.
        details: Optional longer description, such as a traceback.
        task: The task the error occurred in, if any.

    Returns:
        The newly created error, bound to this extractor.
    """
    tracked = Error(
        extractor=self,
        task=task,
        level=level,
        description=description,
        details=details,
    )
    return tracked

def restart(self) -> None:
    """
    Request a restart of the extractor.

    Puts ``RuntimeMessage.RESTART`` on the runtime message queue when one is set, and cancels the extractor's
    cancellation token.
    """
    # NOTE(review): indentation was lost in this view. The cancel() call is assumed to run unconditionally,
    # i.e. outside the `if` -- confirm against the repository.
    if self._runtime_messages:
        self._runtime_messages.put(RuntimeMessage.RESTART)
    self.cancellation_token.cancel()

@classmethod
def init_from_runtime(
def _init_from_runtime(
cls,
connection_config: ConnectionConfig,
application_config: ConfigType,
Expand All @@ -109,6 +141,16 @@ def wrapped() -> None:
try:
target()

except Exception as e:
self.error(
ErrorLevel.fatal,
description="Task crashed unexpectedly",
details="".join(format_exception(e)),
task=task,
).finish()

raise e

finally:
with self._checkin_lock:
self._task_updates.append(
Expand Down Expand Up @@ -186,7 +228,7 @@ def run(self) -> None:
case _:
assert_never(task)

self.logger.info("Starting up extractor")
self.logger.info("Starting extractor")
if startup:
with ThreadPoolExecutor() as pool:
for task in startup:
Expand Down
71 changes: 71 additions & 0 deletions cognite/extractorutils/unstable/core/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import typing
from enum import Enum
from types import TracebackType
from uuid import uuid4

from cognite.extractorutils.unstable.core.tasks import Task
from cognite.extractorutils.util import now

if typing.TYPE_CHECKING:
from .base import Extractor


class ErrorLevel(Enum):
    """
    Severity of an error reported by an extractor.
    """

    warning = "warning"
    error = "error"
    # Used by the extractor base class when a task crashes unexpectedly
    fatal = "fatal"


class Error:
    """
    An error tracked by an extractor.

    Creating an instance records the start time, generates an external ID and reports the error to the owning
    extractor. The error stays open (``end_time`` is ``None``) until ``finish()`` or ``instant()`` is called, or
    until the ``with`` block it is used in exits.
    """

    def __init__(
        self,
        level: ErrorLevel,
        description: str,
        details: str | None,
        task: Task | None,
        extractor: "Extractor",
    ) -> None:
        """
        Create the error and report it to the extractor right away.

        Args:
            level: Severity of the error.
            description: Short summary of the error.
            details: Optional longer description.
            task: The task the error occurred in, if any.
            extractor: The extractor that the error is reported to.
        """
        self._extractor = extractor
        self._task = task

        self.level = level
        self.description = description
        self.details = details

        self.external_id = uuid4().hex
        self.start_time = now()
        self.end_time: int | None = None

        self._extractor._report_error(self)

    def _close(self, end_time: int) -> None:
        """
        Set the end time and hand the error to the extractor again.
        """
        self.end_time = end_time

        # Report again, since the extractor may already have sent and dropped the open error
        self._extractor._report_error(self)

    def instant(self) -> None:
        """
        End the error with zero duration, i.e. with the end time equal to the start time.

        Does nothing if the error has already been ended.
        """
        if self.end_time is None:
            self._close(self.start_time)

    def finish(self) -> None:
        """
        End the error at the current time.

        Does nothing if the error has already been ended.
        """
        if self.end_time is None:
            self._close(now())

    def __enter__(self) -> "Error":
        """
        Enter the context; the error itself is the context value.
        """
        return self

    def __exit__(
        self,
        exc_type: typing.Type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool:
        """
        End the error when the ``with`` block exits.

        Returns:
            ``True`` when the block exited without an exception, ``False`` otherwise, so an exception raised
            inside the block is never suppressed.
        """
        self.finish()
        no_exception = exc_val is None
        return no_exception
2 changes: 1 addition & 1 deletion cognite/extractorutils/unstable/core/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def _inner_run(
current_config_revision: ConfigRevision,
) -> None:
# This code is run inside the new extractor process
extractor = self._extractor_class.init_from_runtime(
extractor = self._extractor_class._init_from_runtime(
connection_config,
application_config,
current_config_revision,
Expand Down

0 comments on commit 9075489

Please sign in to comment.