diff --git a/libs/libcommon/src/libcommon/exceptions.py b/libs/libcommon/src/libcommon/exceptions.py index 5eb104fdb8..bcde4018d5 100644 --- a/libs/libcommon/src/libcommon/exceptions.py +++ b/libs/libcommon/src/libcommon/exceptions.py @@ -111,6 +111,7 @@ def as_response(self) -> ErrorResponse: "ParquetResponseEmptyError", "PreviousStepFormatError", "PreviousStepStatusError", + "PreviousStepStillProcessingError", "ResponseAlreadyComputedError", "RowsPostProcessingError", "SplitsNamesError", @@ -441,6 +442,13 @@ def __init__(self, message: str, cause: Optional[BaseException] = None): super().__init__(message, HTTPStatus.INTERNAL_SERVER_ERROR, "PreviousStepStatusError", cause, False) +class PreviousStepStillProcessingError(CacheableError): + """The previous steps are still being processed.""" + + def __init__(self, message: str, cause: Optional[BaseException] = None): + super().__init__(message, HTTPStatus.INTERNAL_SERVER_ERROR, "PreviousStepStillProcessingError", cause, False) + + class ResponseAlreadyComputedError(CacheableError): """The response has been already computed by another job runner.""" diff --git a/services/worker/src/worker/job_manager.py b/services/worker/src/worker/job_manager.py index 14b5ea6297..bcace4dd51 100644 --- a/services/worker/src/worker/job_manager.py +++ b/services/worker/src/worker/job_manager.py @@ -13,6 +13,7 @@ DatasetScriptError, JobManagerCrashedError, JobManagerExceededMaximumDurationError, + PreviousStepStillProcessingError, ResponseAlreadyComputedError, TooBigContentError, UnexpectedError, @@ -21,6 +22,7 @@ from libcommon.processing_graph import ProcessingGraph, ProcessingStep from libcommon.simple_cache import ( CachedArtifactError, + CachedArtifactNotFoundError, CacheEntryDoesNotExistError, get_response_without_content_params, ) @@ -174,6 +176,10 @@ def process( "The computed response content exceeds the supported size in bytes" f" ({self.worker_config.content_max_bytes})." ) + except CachedArtifactNotFoundError as err: + raise PreviousStepStillProcessingError( + message="The previous steps are still being processed", cause=err + ) finally: # ensure the post_compute hook is called even if the compute raises an exception self.job_runner.post_compute()