From bf6218718eef171a0faf8cd55c94e644b0068729 Mon Sep 17 00:00:00 2001 From: "m.bazire" Date: Thu, 2 Nov 2023 14:08:52 +0100 Subject: [PATCH 1/2] :sparkles: StreamParsing - Add streaming parsing capacity --- magicparse/__init__.py | 17 ++++++++++++++++- magicparse/callbacks.py | 11 +++++++++++ magicparse/schema.py | 41 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 68 insertions(+), 1 deletion(-) create mode 100644 magicparse/callbacks.py diff --git a/magicparse/__init__.py b/magicparse/__init__.py index cc43a53..f778fd3 100644 --- a/magicparse/__init__.py +++ b/magicparse/__init__.py @@ -1,11 +1,12 @@ from io import BytesIO +from typing import Any, Dict, List, Tuple, Union +from .callbacks import OnInvalidRowCallback, OnValidRowCallback from .schema import Schema, builtins as builtins_schemas from .post_processors import PostProcessor, builtins as builtins_post_processors from .pre_processors import PreProcessor, builtins as builtins_pre_processors from .transform import Transform from .type_converters import TypeConverter, builtins as builtins_type_converters -from typing import Any, Dict, List, Tuple, Union from .validators import Validator, builtins as builtins_validators @@ -26,6 +27,20 @@ def parse( return schema_definition.parse(data) +def stream_parse( + data: Union[bytes, BytesIO], + schema_options: Dict[str, Any], + on_valid_parsed_row: OnValidRowCallback, + on_invalid_parsed_row: OnInvalidRowCallback, +) -> None: + schema_definition = Schema.build(schema_options) + return schema_definition.stream_parse( + data=data, + on_valid_parsed_row=on_valid_parsed_row, + on_invalid_parsed_row=on_invalid_parsed_row, + ) + + Registrable = Union[Schema, Transform] diff --git a/magicparse/callbacks.py b/magicparse/callbacks.py new file mode 100644 index 0000000..26909ba --- /dev/null +++ b/magicparse/callbacks.py @@ -0,0 +1,11 @@ +from typing import Any, Dict, List, Protocol + + +class OnValidRowCallback(Protocol): + def __call__(self, index: int, 
parsed_row: Dict[str, Any], raw_data: Any) -> None: + ... + + +class OnInvalidRowCallback(Protocol): + def __call__(self, errors_info: List[Dict[str, Any]], raw_data: Any) -> None: + ... diff --git a/magicparse/schema.py b/magicparse/schema.py index 40f1002..90c5c5e 100644 --- a/magicparse/schema.py +++ b/magicparse/schema.py @@ -1,6 +1,8 @@ import codecs from abc import ABC, abstractmethod import csv + +from .callbacks import OnInvalidRowCallback, OnValidRowCallback from .fields import Field from io import BytesIO from typing import Any, Dict, List, Tuple, Union, Iterable @@ -75,6 +77,45 @@ def parse(self, data: Union[bytes, BytesIO]) -> Tuple[List[dict], List[dict]]: return result, errors + def stream_parse( + self, + data: Union[bytes, BytesIO], + on_valid_parsed_row: OnValidRowCallback, + on_invalid_parsed_row: OnInvalidRowCallback, + ) -> None: + if isinstance(data, bytes): + stream = BytesIO(data) + else: + stream = data + + reader = self.get_reader(stream) + + row_number = 0 + if self.has_header: + next(reader) + row_number += 1 + + for row in reader: + errors = [] + row_is_valid = True + item = {} + for field in self.fields: + try: + value = field.read_value(row) + except Exception as exc: + errors.append({"row-number": row_number, **field.error(exc)}) + row_is_valid = False + continue + + item[field.key] = value + + if row_is_valid: + on_valid_parsed_row(index=row_number, parsed_row=item, raw_data=row) + else: + on_invalid_parsed_row(errors_info=errors, raw_data=row) + + row_number += 1 + class CsvSchema(Schema): def __init__(self, options: Dict[str, Any]) -> None: From b9d4c9d374b3a84085dcf6ad8c8f145b68b9106b Mon Sep 17 00:00:00 2001 From: "m.bazire" Date: Thu, 2 Nov 2023 17:31:21 +0100 Subject: [PATCH 2/2] :recycle::white_check_mark: Schema - CSV Reader TU/multi buffer read --- magicparse/schema.py | 20 ++++--- tests/test_schema.py | 121 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 133 insertions(+), 8 deletions(-) diff --git 
a/magicparse/schema.py b/magicparse/schema.py index 90c5c5e..a8bfa66 100644 --- a/magicparse/schema.py +++ b/magicparse/schema.py @@ -23,6 +23,11 @@ def __init__(self, options: Dict[str, Any]) -> None: def get_reader(self, stream: BytesIO) -> Iterable: pass + def get_stream_readers(self, content: bytes) -> Tuple[Iterable, Iterable]: + schema_reader = self.get_reader(BytesIO(content)) + raw_reader = BytesIO(content) + return schema_reader, raw_reader + @staticmethod def key() -> str: pass @@ -83,19 +88,18 @@ def stream_parse( on_valid_parsed_row: OnValidRowCallback, on_invalid_parsed_row: OnInvalidRowCallback, ) -> None: - if isinstance(data, bytes): - stream = BytesIO(data) - else: - stream = data + if isinstance(data, BytesIO): + data = data.read() - reader = self.get_reader(stream) + reader, raw_stream = self.get_stream_readers(data) row_number = 0 if self.has_header: next(reader) + next(raw_stream) row_number += 1 - for row in reader: + for row, raw_row in zip(reader, raw_stream): errors = [] row_is_valid = True item = {} @@ -110,9 +114,9 @@ def stream_parse( item[field.key] = value if row_is_valid: - on_valid_parsed_row(index=row_number, parsed_row=item, raw_data=row) + on_valid_parsed_row(index=row_number, parsed_row=item, raw_data=raw_row) else: - on_invalid_parsed_row(errors_info=errors, raw_data=row) + on_invalid_parsed_row(errors_info=errors, raw_data=raw_row) row_number += 1 diff --git a/tests/test_schema.py b/tests/test_schema.py index 65e3fd7..41e8449 100644 --- a/tests/test_schema.py +++ b/tests/test_schema.py @@ -1,4 +1,5 @@ from decimal import Decimal +from unittest.mock import Mock from magicparse import Schema from magicparse.schema import ColumnarSchema, CsvSchema from magicparse.fields import ColumnarField, CsvField @@ -130,6 +131,126 @@ def test_errors_do_not_halt_parsing(self): ] +class TestStreamParse: + def test_with_no_data(self): + on_valid_row = Mock() + on_error_row = Mock() + schema = Schema.build( + { + "file_type": "csv", + "fields": 
[{"key": "name", "type": "str", "column-number": 1}], + } + ) + schema.stream_parse(b"", on_valid_row, on_error_row) + assert not on_valid_row.called + assert not on_error_row.called + + def test_with_no_field_definition(self): + on_valid_row = Mock() + on_error_row = Mock() + schema = Schema.build({"file_type": "csv", "fields": []}) + schema.stream_parse(b"a,b,c\n", on_valid_row, on_error_row) + on_valid_row.assert_called_once_with( + index=0, parsed_row={}, raw_data=b"a,b,c\n" + ) + assert not on_error_row.called + + def test_without_header(self): + on_valid_row = Mock() + on_error_row = Mock() + schema = Schema.build( + { + "file_type": "csv", + "fields": [{"key": "name", "type": "str", "column-number": 1}], + } + ) + schema.stream_parse(b"1\n", on_valid_row, on_error_row) + on_valid_row.assert_called_once_with( + index=0, parsed_row={"name": "1"}, raw_data=b"1\n" + ) + assert not on_error_row.called + + def test_with_header(self): + on_valid_row = Mock() + on_error_row = Mock() + schema = Schema.build( + { + "file_type": "csv", + "has_header": True, + "fields": [{"key": "name", "type": "str", "column-number": 1}], + } + ) + schema.stream_parse(b"column_name\n1\n", on_valid_row, on_error_row) + + on_valid_row.assert_called_once_with( + index=1, parsed_row={"name": "1"}, raw_data=b"1\n" + ) + assert not on_error_row.called + + def test_multiple_lines(self): + on_valid_row = Mock() + on_error_row = Mock() + schema = Schema.build( + { + "file_type": "csv", + "fields": [{"key": "name", "type": "str", "column-number": 1}], + } + ) + schema.stream_parse(b"1\n2\n", on_valid_row, on_error_row) + assert on_valid_row.call_count == 2 + on_valid_row.assert_any_call(index=0, parsed_row={"name": "1"}, raw_data=b"1\n") + on_valid_row.assert_any_call(index=1, parsed_row={"name": "2"}, raw_data=b"2\n") + assert not on_error_row.called + + def test_error_display_row_number(self): + on_valid_row = Mock() + on_error_row = Mock() + schema = Schema.build( + { + "file_type": "csv", + 
"fields": [{"key": "age", "type": "int", "column-number": 1}], + } + ) + schema.stream_parse(b"a", on_valid_row, on_error_row) + assert not on_valid_row.called + on_error_row.assert_called_once_with( + errors_info=[ + { + "row-number": 0, + "column-number": 1, + "field-key": "age", + "error": "value is not a valid integer", + } + ], + raw_data=b"a", + ) + + def test_errors_do_not_halt_parsing(self): + on_valid_row = Mock() + on_error_row = Mock() + schema = Schema.build( + { + "file_type": "csv", + "fields": [{"key": "age", "type": "int", "column-number": 1}], + } + ) + schema.stream_parse(b"1\na\n2\n", on_valid_row, on_error_row) + assert on_valid_row.call_count == 2 + on_valid_row.assert_any_call(index=0, parsed_row={"age": 1}, raw_data=b"1\n") + on_valid_row.assert_any_call(index=2, parsed_row={"age": 2}, raw_data=b"2\n") + on_error_row.assert_called_once_with( + errors_info=[ + { + "row-number": 1, + "column-number": 1, + "field-key": "age", + "error": "value is not a valid integer", + } + ], + raw_data=b"a\n", + ) + + class TestColumnarParse(TestCase): def test_with_no_data(self): schema = Schema.build(