From b9d4c9d374b3a84085dcf6ad8c8f145b68b9106b Mon Sep 17 00:00:00 2001 From: "m.bazire" Date: Thu, 2 Nov 2023 17:31:21 +0100 Subject: [PATCH] :recycle::white_check_mark: Schema - CSV Reader TU/multi buffer read --- magicparse/schema.py | 20 ++++--- tests/test_schema.py | 121 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 133 insertions(+), 8 deletions(-) diff --git a/magicparse/schema.py b/magicparse/schema.py index 90c5c5e..a8bfa66 100644 --- a/magicparse/schema.py +++ b/magicparse/schema.py @@ -23,6 +23,11 @@ def __init__(self, options: Dict[str, Any]) -> None: def get_reader(self, stream: BytesIO) -> Iterable: pass + def get_stream_readers(self, content: bytes) -> Tuple[Iterable, Iterable]: + schema_reader = self.get_reader(BytesIO(content)) + raw_reader = BytesIO(content) + return schema_reader, raw_reader + @staticmethod def key() -> str: pass @@ -83,19 +88,18 @@ def stream_parse( on_valid_parsed_row: OnValidRowCallback, on_invalid_parsed_row: OnInvalidRowCallback, ) -> None: - if isinstance(data, bytes): - stream = BytesIO(data) - else: - stream = data + if isinstance(data, BytesIO): + data = data.read() - reader = self.get_reader(stream) + reader, raw_stream = self.get_stream_readers(data) row_number = 0 if self.has_header: next(reader) + next(raw_stream) row_number += 1 - for row in reader: + for row, raw_row in zip(reader, raw_stream): errors = [] row_is_valid = True item = {} @@ -110,9 +114,9 @@ def stream_parse( item[field.key] = value if row_is_valid: - on_valid_parsed_row(index=row_number, parsed_row=item, raw_data=row) + on_valid_parsed_row(index=row_number, parsed_row=item, raw_data=raw_row) else: - on_invalid_parsed_row(errors_info=errors, raw_data=row) + on_invalid_parsed_row(errors_info=errors, raw_data=raw_row) row_number += 1 diff --git a/tests/test_schema.py b/tests/test_schema.py index 65e3fd7..41e8449 100644 --- a/tests/test_schema.py +++ b/tests/test_schema.py @@ -1,4 +1,5 @@ from decimal import Decimal +from unittest.mock import Mock from magicparse import Schema from magicparse.schema import ColumnarSchema, CsvSchema from magicparse.fields import ColumnarField, CsvField @@ -130,6 +131,126 @@ def test_errors_do_not_halt_parsing(self): ] +class TestStreamParse: + def test_with_no_data(self): + on_valid_row = Mock() + on_error_row = Mock() + schema = Schema.build( + { + "file_type": "csv", + "fields": [{"key": "name", "type": "str", "column-number": 1}], + } + ) + schema.stream_parse(b"", on_valid_row, on_error_row) + assert not on_valid_row.called + assert not on_error_row.called + + def test_with_no_field_definition(self): + on_valid_row = Mock() + on_error_row = Mock() + schema = Schema.build({"file_type": "csv", "fields": []}) + schema.stream_parse(b"a,b,c\n", on_valid_row, on_error_row) + on_valid_row.assert_called_once_with( + index=0, parsed_row={}, raw_data=b"a,b,c\n" + ) + assert not on_error_row.called + + def test_without_header(self): + on_valid_row = Mock() + on_error_row = Mock() + schema = Schema.build( + { + "file_type": "csv", + "fields": [{"key": "name", "type": "str", "column-number": 1}], + } + ) + schema.stream_parse(b"1\n", on_valid_row, on_error_row) + on_valid_row.assert_called_once_with( + index=0, parsed_row={"name": "1"}, raw_data=b"1\n" + ) + assert not on_error_row.called + + def test_with_header(self): + on_valid_row = Mock() + on_error_row = Mock() + schema = Schema.build( + { + "file_type": "csv", + "has_header": True, + "fields": [{"key": "name", "type": "str", "column-number": 1}], + } + ) + schema.stream_parse(b"column_name\n1\n", on_valid_row, on_error_row) + + on_valid_row.assert_called_once_with( + index=1, parsed_row={"name": "1"}, raw_data=b"1\n" + ) + assert not on_error_row.called + + def test_multiple_lines(self): + on_valid_row = Mock() + on_error_row = Mock() + schema = Schema.build( + { + "file_type": "csv", + "fields": [{"key": "name", "type": "str", "column-number": 1}], + } + ) + schema.stream_parse(b"1\n2\n", on_valid_row, on_error_row) + assert on_valid_row.call_count == 2 + on_valid_row.assert_any_call(index=0, parsed_row={"name": "1"}, raw_data=b"1\n") + on_valid_row.assert_any_call(index=1, parsed_row={"name": "2"}, raw_data=b"2\n") + assert not on_error_row.called + + def test_error_display_row_number(self): + on_valid_row = Mock() + on_error_row = Mock() + schema = Schema.build( + { + "file_type": "csv", + "fields": [{"key": "age", "type": "int", "column-number": 1}], + } + ) + schema.stream_parse(b"a", on_valid_row, on_error_row) + assert not on_valid_row.called + on_error_row.assert_called_once_with( + errors_info=[ + { + "row-number": 0, + "column-number": 1, + "field-key": "age", + "error": "value is not a valid integer", + } + ], + raw_data=b"a", + ) + + def test_errors_do_not_halt_parsing(self): + on_valid_row = Mock() + on_error_row = Mock() + schema = Schema.build( + { + "file_type": "csv", + "fields": [{"key": "age", "type": "int", "column-number": 1}], + } + ) + schema.stream_parse(b"1\na\n2\n", on_valid_row, on_error_row) + assert on_valid_row.call_count == 2 + on_valid_row.assert_any_call(index=0, parsed_row={"age": 1}, raw_data=b"1\n") + on_valid_row.assert_any_call(index=2, parsed_row={"age": 2}, raw_data=b"2\n") + on_error_row.assert_called_once_with( + errors_info=[ + { + "row-number": 1, + "column-number": 1, + "field-key": "age", + "error": "value is not a valid integer", + } + ], + raw_data=b"a\n", + ) + + class TestColumnarParse(TestCase): def test_with_no_data(self): schema = Schema.build(