diff --git a/cognite/extractorutils/unstable/core/_dto.py b/cognite/extractorutils/unstable/core/_dto.py index 8d605043..3cf8d960 100644 --- a/cognite/extractorutils/unstable/core/_dto.py +++ b/cognite/extractorutils/unstable/core/_dto.py @@ -7,6 +7,8 @@ from humps import camelize from pydantic import BaseModel, ConfigDict +from cognite.extractorutils.unstable.core.errors import ErrorLevel + class CogniteModel(BaseModel): """ @@ -32,3 +34,13 @@ class TaskUpdate(CogniteModel): type: Literal["started"] | Literal["ended"] name: str timestamp: int + + +class Error(CogniteModel): + external_id: str + level: ErrorLevel + description: str + details: str | None + start_time: int + end_time: int | None + task: str | None diff --git a/cognite/extractorutils/unstable/core/base.py b/cognite/extractorutils/unstable/core/base.py index 323ea62e..99b2d101 100644 --- a/cognite/extractorutils/unstable/core/base.py +++ b/cognite/extractorutils/unstable/core/base.py @@ -2,6 +2,7 @@ from concurrent.futures import ThreadPoolExecutor from multiprocessing import Queue from threading import RLock, Thread +from traceback import format_exception from types import TracebackType from typing import Generic, Literal, Optional, Type, TypeVar, Union @@ -10,8 +11,10 @@ from cognite.extractorutils.threading import CancellationToken from cognite.extractorutils.unstable.configuration.models import ConnectionConfig, ExtractorConfig +from cognite.extractorutils.unstable.core._dto import Error as DtoError from cognite.extractorutils.unstable.core._dto import TaskUpdate from cognite.extractorutils.unstable.core._messaging import RuntimeMessage +from cognite.extractorutils.unstable.core.errors import Error, ErrorLevel from cognite.extractorutils.unstable.core.tasks import ContinuousTask, ScheduledTask, StartupTask, Task from cognite.extractorutils.unstable.scheduling import TaskScheduler from cognite.extractorutils.util import now @@ -50,6 +53,7 @@ def __init__( self._tasks: list[Task] = [] self._task_updates: list[TaskUpdate] = [] + self._errors: dict[str, Error] = {} self.logger = logging.getLogger(f"{self.EXTERNAL_ID}.main") @@ -61,11 +65,26 @@ def _checkin(self) -> None: task_updates = [t.model_dump() for t in self._task_updates] self._task_updates.clear() + error_updates = [ + DtoError( + external_id=e.external_id, + level=e.level, + description=e.description, + details=e.details, + start_time=e.start_time, + end_time=e.end_time, + task=e._task.name if e._task is not None else None, + ).model_dump() + for e in self._errors.values() + ] + self._errors.clear() + res = self.cognite_client.post( f"/api/v1/projects/{self.cognite_client.config.project}/odin/checkin", json={ "externalId": self.connection_config.extraction_pipeline, "taskEvents": task_updates, + "errors": error_updates, }, headers={"cdf-version": "alpha"}, ) @@ -83,13 +102,26 @@ def _run_checkin(self) -> None: self.logger.exception("Error during checkin") self.cancellation_token.wait(10) + def _report_error(self, error: Error) -> None: + with self._checkin_lock: + self._errors[error.external_id] = error + + def error( + self, + level: ErrorLevel, + description: str, + details: str | None = None, + task: Task | None = None, + ) -> Error: + return Error(level=level, description=description, details=details, extractor=self, task=task) + def restart(self) -> None: if self._runtime_messages: self._runtime_messages.put(RuntimeMessage.RESTART) self.cancellation_token.cancel() @classmethod - def init_from_runtime( + def _init_from_runtime( cls, connection_config: ConnectionConfig, application_config: ConfigType, @@ -109,6 +141,16 @@ def wrapped() -> None: try: target() + except Exception as e: + self.error( + ErrorLevel.fatal, + description="Task crashed unexpectedly", + details="".join(format_exception(e)), + task=task, + ).finish() + + raise e + finally: with self._checkin_lock: self._task_updates.append( @@ -186,7 +228,7 @@ def run(self) -> None: case _: assert_never(task) - self.logger.info("Starting up extractor") + self.logger.info("Starting extractor") if startup: with ThreadPoolExecutor() as pool: for task in startup: diff --git a/cognite/extractorutils/unstable/core/errors.py b/cognite/extractorutils/unstable/core/errors.py new file mode 100644 index 00000000..ad2c8384 --- /dev/null +++ b/cognite/extractorutils/unstable/core/errors.py @@ -0,0 +1,71 @@ +import typing +from enum import Enum +from types import TracebackType +from uuid import uuid4 + +from cognite.extractorutils.unstable.core.tasks import Task +from cognite.extractorutils.util import now + +if typing.TYPE_CHECKING: + from .base import Extractor + + +class ErrorLevel(Enum): + warning = "warning" + error = "error" + fatal = "fatal" + + +class Error: + def __init__( + self, + level: ErrorLevel, + description: str, + details: str | None, + task: Task | None, + extractor: "Extractor", + ) -> None: + self.level = level + self.description = description + self.details = details + + self.external_id = uuid4().hex + self.start_time = now() + self.end_time: int | None = None + + self._extractor = extractor + self._task = task + + self._extractor._report_error(self) + + def instant(self) -> None: + # Only end the error once + if self.end_time is not None: + return + + self.end_time = self.start_time + + # Re-add in case the error has already been reported and dict cleared + self._extractor._report_error(self) + + def finish(self) -> None: + # Only end the error once + if self.end_time is not None: + return + + self.end_time = now() + + # Re-add in case the error has already been reported and dict cleared + self._extractor._report_error(self) + + def __enter__(self) -> "Error": + return self + + def __exit__( + self, + exc_type: typing.Type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> bool: + self.finish() + return exc_val is None diff --git a/cognite/extractorutils/unstable/core/runtime.py b/cognite/extractorutils/unstable/core/runtime.py index c6621a86..67395335 100644 --- a/cognite/extractorutils/unstable/core/runtime.py +++ b/cognite/extractorutils/unstable/core/runtime.py @@ -87,7 +87,7 @@ def _inner_run( current_config_revision: ConfigRevision, ) -> None: # This code is run inside the new extractor process - extractor = self._extractor_class.init_from_runtime( + extractor = self._extractor_class._init_from_runtime( connection_config, application_config, current_config_revision,