diff --git a/libs/libcommon/src/libcommon/parquet_utils.py b/libs/libcommon/src/libcommon/parquet_utils.py
index bcfc7dfad2..e497d9048f 100644
--- a/libs/libcommon/src/libcommon/parquet_utils.py
+++ b/libs/libcommon/src/libcommon/parquet_utils.py
@@ -26,7 +26,7 @@
 PARTIAL_PREFIX = "partial-"
 
 
-class ParquetResponseEmptyError(Exception):
+class EmptyParquetMetadataError(Exception):
     pass
 
 
@@ -228,7 +228,7 @@ def from_parquet_metadata_items(
         unsupported_features: list[FeatureType] = [],
     ) -> "ParquetIndexWithMetadata":
         if not parquet_file_metadata_items:
-            raise ParquetResponseEmptyError("No parquet files found.")
+            raise EmptyParquetMetadataError("No parquet files found.")
 
         partial = parquet_export_is_partial(parquet_file_metadata_items[0]["url"])
 
diff --git a/services/worker/src/worker/job_runners/split/first_rows_from_parquet.py b/services/worker/src/worker/job_runners/split/first_rows_from_parquet.py
index 30590eef2e..0f4882d3cb 100644
--- a/services/worker/src/worker/job_runners/split/first_rows_from_parquet.py
+++ b/services/worker/src/worker/job_runners/split/first_rows_from_parquet.py
@@ -10,11 +10,12 @@
     PROCESSING_STEP_SPLIT_FIRST_ROWS_FROM_STREAMING_VERSION,
 )
 from libcommon.exceptions import (
+    ParquetResponseEmptyError,
     RowsPostProcessingError,
     TooBigContentError,
     TooManyColumnsError,
 )
-from libcommon.parquet_utils import Indexer, TooBigRows
+from libcommon.parquet_utils import EmptyParquetMetadataError, Indexer, TooBigRows
 from libcommon.processing_graph import ProcessingGraph, ProcessingStep
 from libcommon.public_assets_storage import PublicAssetsStorage
 from libcommon.storage import StrPath
@@ -71,11 +72,14 @@ def compute_first_rows_response(
 ) -> SplitFirstRowsResponse:
     logging.info(f"get first-rows for dataset={dataset} config={config} split={split}")
 
-    rows_index = indexer.get_rows_index(
-        dataset=dataset,
-        config=config,
-        split=split,
-    )
+    try:
+        rows_index = indexer.get_rows_index(
+            dataset=dataset,
+            config=config,
+            split=split,
+        )
+    except EmptyParquetMetadataError:
+        raise ParquetResponseEmptyError("No parquet files found.")
 
     # validate the features
     features = rows_index.parquet_index.features
diff --git a/services/worker/tests/job_runners/split/test_first_rows_from_parquet.py b/services/worker/tests/job_runners/split/test_first_rows_from_parquet.py
index 980af8eece..464b368d1b 100644
--- a/services/worker/tests/job_runners/split/test_first_rows_from_parquet.py
+++ b/services/worker/tests/job_runners/split/test_first_rows_from_parquet.py
@@ -121,11 +121,12 @@ def ds_fs(ds: Dataset, tmpfs: AbstractFileSystem) -> Generator[AbstractFileSyste
 
 
 @pytest.mark.parametrize(
-    "rows_max_bytes,columns_max_number,error_code",
+    "rows_max_bytes,columns_max_number,has_parquet_files,error_code",
     [
-        (0, 10, "TooBigContentError"),  # too small limit, even with truncation
-        (1_000, 1, "TooManyColumnsError"),  # too small columns limit
-        (1_000, 10, None),
+        (0, 10, True, "TooBigContentError"),  # too small limit, even with truncation
+        (1_000, 1, True, "TooManyColumnsError"),  # too small columns limit
+        (1_000, 10, True, None),
+        (1_000, 10, False, "ParquetResponseEmptyError"),
     ],
 )
 def test_compute(
@@ -136,6 +137,7 @@
     app_config: AppConfig,
     rows_max_bytes: int,
     columns_max_number: int,
+    has_parquet_files: bool,
     error_code: str,
 ) -> None:
     dataset, config, split = "dataset", "config", "split"
@@ -158,6 +160,8 @@
                 "parquet_metadata_subpath": fake_metadata_subpath,
             }
         ]
+        if has_parquet_files
+        else []
     }
 
     upsert_response(