Skip to content

Commit

Permalink
add cacheable ParquetResponseEmptyError in first-rows-from-parquet (#2101)
Browse files Browse the repository at this point in the history

* add ParquetResponseEmptyError

* fix test style

* rename exception name
  • Loading branch information
AndreaFrancis authored Nov 14, 2023
1 parent 976fdc0 commit 882de92
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 12 deletions.
4 changes: 2 additions & 2 deletions libs/libcommon/src/libcommon/parquet_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
PARTIAL_PREFIX = "partial-"


class ParquetResponseEmptyError(Exception):
class EmptyParquetMetadataError(Exception):
    """Raised when no parquet metadata items are available to build an index.

    Raised by `ParquetIndexWithMetadata.from_parquet_metadata_items` when the
    list of parquet file metadata items is empty (i.e. no parquet files were
    found for the dataset split). Callers (e.g. the first-rows-from-parquet
    worker) catch this and re-raise it as a cacheable error.
    """

    pass


Expand Down Expand Up @@ -228,7 +228,7 @@ def from_parquet_metadata_items(
unsupported_features: list[FeatureType] = [],
) -> "ParquetIndexWithMetadata":
if not parquet_file_metadata_items:
raise ParquetResponseEmptyError("No parquet files found.")
raise EmptyParquetMetadataError("No parquet files found.")

partial = parquet_export_is_partial(parquet_file_metadata_items[0]["url"])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@
PROCESSING_STEP_SPLIT_FIRST_ROWS_FROM_STREAMING_VERSION,
)
from libcommon.exceptions import (
ParquetResponseEmptyError,
RowsPostProcessingError,
TooBigContentError,
TooManyColumnsError,
)
from libcommon.parquet_utils import Indexer, TooBigRows
from libcommon.parquet_utils import EmptyParquetMetadataError, Indexer, TooBigRows
from libcommon.processing_graph import ProcessingGraph, ProcessingStep
from libcommon.public_assets_storage import PublicAssetsStorage
from libcommon.storage import StrPath
Expand Down Expand Up @@ -71,11 +72,14 @@ def compute_first_rows_response(
) -> SplitFirstRowsResponse:
logging.info(f"get first-rows for dataset={dataset} config={config} split={split}")

rows_index = indexer.get_rows_index(
dataset=dataset,
config=config,
split=split,
)
try:
rows_index = indexer.get_rows_index(
dataset=dataset,
config=config,
split=split,
)
except EmptyParquetMetadataError:
raise ParquetResponseEmptyError("No parquet files found.")

# validate the features
features = rows_index.parquet_index.features
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,12 @@ def ds_fs(ds: Dataset, tmpfs: AbstractFileSystem) -> Generator[AbstractFileSyste


@pytest.mark.parametrize(
"rows_max_bytes,columns_max_number,error_code",
"rows_max_bytes,columns_max_number,has_parquet_files,error_code",
[
(0, 10, "TooBigContentError"), # too small limit, even with truncation
(1_000, 1, "TooManyColumnsError"), # too small columns limit
(1_000, 10, None),
(0, 10, True, "TooBigContentError"), # too small limit, even with truncation
(1_000, 1, True, "TooManyColumnsError"), # too small columns limit
(1_000, 10, True, None),
(1_000, 10, False, "ParquetResponseEmptyError"),
],
)
def test_compute(
Expand All @@ -136,6 +137,7 @@ def test_compute(
app_config: AppConfig,
rows_max_bytes: int,
columns_max_number: int,
has_parquet_files: bool,
error_code: str,
) -> None:
dataset, config, split = "dataset", "config", "split"
Expand All @@ -158,6 +160,8 @@ def test_compute(
"parquet_metadata_subpath": fake_metadata_subpath,
}
]
if has_parquet_files
else []
}

upsert_response(
Expand Down

0 comments on commit 882de92

Please sign in to comment.