From c6088810e83324b5ad15469401a46f5eb3ce9296 Mon Sep 17 00:00:00 2001
From: "m.bazire"
Date: Thu, 2 Nov 2023 14:08:52 +0100
Subject: [PATCH] :sparkles: StreamParsing - Add streaming parsing capability

---
 magicparse/__init__.py | 22 +++++++++++++++++++++-
 magicparse/schema.py   | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 75 insertions(+), 1 deletion(-)

diff --git a/magicparse/__init__.py b/magicparse/__init__.py
index cc43a53..5762cf1 100644
--- a/magicparse/__init__.py
+++ b/magicparse/__init__.py
@@ -5,7 +5,7 @@
 from .pre_processors import PreProcessor, builtins as builtins_pre_processors
 from .transform import Transform
 from .type_converters import TypeConverter, builtins as builtins_type_converters
-from typing import Any, Dict, List, Tuple, Union
+from typing import Any, Dict, List, Protocol, Tuple, Union
 from .validators import Validator, builtins as builtins_validators
 
 
@@ -26,6 +26,26 @@ def parse(
     return schema_definition.parse(data)
 
 
+class OnNewRowCallback(Protocol):
+    def __call__(self, index: int, parsed_row: Dict[str, Any], raw_data: Any) -> None:
+        ...
+
+
+class OnErrorCallback(Protocol):
+    def __call__(self, errors_info: List[Dict[str, Any]], raw_data: Any) -> None:
+        ...
+
+
+def stream_parse(
+    data: Union[bytes, BytesIO],
+    schema_options: Dict[str, Any],
+    on_new_row: OnNewRowCallback,
+    on_error: OnErrorCallback,
+) -> None:
+    schema_definition = Schema.build(schema_options)
+    return schema_definition.stream_parse(data, on_new_row, on_error)
+
+
 Registrable = Union[Schema, Transform]
 
 
diff --git a/magicparse/schema.py b/magicparse/schema.py
index 40f1002..5e7c7b3 100644
--- a/magicparse/schema.py
+++ b/magicparse/schema.py
@@ -1,6 +1,13 @@
 import codecs
 from abc import ABC, abstractmethod
 import csv
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    # Imported for type checking only: importing at runtime would be circular,
+    # since magicparse/__init__.py itself imports Schema from this module.
+    from magicparse import OnErrorCallback, OnNewRowCallback
 from .fields import Field
 from io import BytesIO
 from typing import Any, Dict, List, Tuple, Union, Iterable
@@ -75,6 +82,53 @@ def parse(self, data: Union[bytes, BytesIO]) -> Tuple[List[dict], List[dict]]:
 
         return result, errors
 
+    def stream_parse(
+        self,
+        data: Union[bytes, BytesIO],
+        on_new_row: "OnNewRowCallback",
+        on_error: "OnErrorCallback",
+    ) -> None:
+        """Parse rows one at a time, reporting each parsed row or its errors
+        through the callbacks instead of accumulating them in memory."""
+        if isinstance(data, bytes):
+            stream = BytesIO(data)
+        else:
+            stream = data
+
+        reader = self.get_reader(stream)
+
+        row_number = 0
+        if self.has_header:
+            next(reader)
+            row_number += 1
+
+        for row in reader:
+            errors = []
+            row_is_valid = True
+            item = {}
+            for field in self.fields:
+                try:
+                    value = field.read_value(row)
+                except Exception as exc:
+                    errors.append({"row-number": row_number, **field.error(exc)})
+                    row_is_valid = False
+                    continue
+
+                item[field.key] = value
+
+            if row_is_valid:
+                on_new_row(
+                    index=row_number,
+                    parsed_row=item,
+                    raw_data=row
+                )
+            else:
+                on_error(errors_info=errors, raw_data=row)
+
+            row_number += 1
+
+        return None
+
 
 class CsvSchema(Schema):
     def __init__(self, options: Dict[str, Any]) -> None:
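
Usage note for reviewers (not part of the patch): a minimal sketch of how the new
stream_parse entry point is intended to be called. The callback signatures come from
the OnNewRowCallback/OnErrorCallback protocols added above; the schema options and
field definitions below are illustrative assumptions based on the schema format
already accepted by magicparse.parse, and may need adjusting to your data.

    import magicparse

    # Called once per successfully parsed row.
    def handle_row(index, parsed_row, raw_data):
        print(f"row {index}: {parsed_row}")

    # Called once per rejected row, with one error entry per failing field.
    def handle_errors(errors_info, raw_data):
        print(f"rejected row: {errors_info}")

    # Illustrative schema: option keys assumed to match the existing parse() options.
    schema_options = {
        "file_type": "csv",
        "has_header": True,
        "fields": [
            {"key": "name", "type": "str", "column-number": 1},
            {"key": "age", "type": "int", "column-number": 2},
        ],
    }

    data = b"name,age\nalice,31\nbob,not-a-number\n"
    magicparse.stream_parse(data, schema_options, on_new_row=handle_row, on_error=handle_errors)

Unlike parse(), nothing is accumulated: each row is handed to a callback as soon as it
is read, so large files can be processed with constant memory.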