diff --git a/magicparse/schema.py b/magicparse/schema.py index aa0c6da..348dc55 100644 --- a/magicparse/schema.py +++ b/magicparse/schema.py @@ -44,7 +44,20 @@ def register(cls, schema: "Schema") -> None: cls.registry[schema.key()] = schema + def parse(self, data: Union[bytes, BytesIO]) -> Tuple[List[dict], List[dict]]: + items = [] + errors = [] + + for item, row_errors in self.stream_parse(data): + if(row_errors): + errors.extend(row_errors) + else: + items.append(item) + + return items, errors + + def stream_parse(self, data: Union[bytes, BytesIO]) -> Iterable[Tuple[dict, list[dict]]]: if isinstance(data, bytes): stream = BytesIO(data) else: @@ -57,18 +70,15 @@ def parse(self, data: Union[bytes, BytesIO]) -> Tuple[List[dict], List[dict]]: next(reader) row_number += 1 - result = [] - errors = [] for row in reader: + errors = [] row_number += 1 - row_is_valid = True item = {} for field in self.fields: try: value = field.read_value(row) except Exception as exc: errors.append({"row-number": row_number, **field.error(exc)}) - row_is_valid = False continue item[field.key] = value @@ -80,16 +90,11 @@ def parse(self, data: Union[bytes, BytesIO]) -> Tuple[List[dict], List[dict]]: errors.append( {"row-number": row_number, **computed_field.error(exc)} ) - row_is_valid = False continue item[computed_field.key] = value - if row_is_valid: - result.append(item) - - return result, errors - + yield item, errors class CsvSchema(Schema): def __init__(self, options: Dict[str, Any]) -> None: diff --git a/pyproject.toml b/pyproject.toml index b69cac2..70870c3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,3 +33,4 @@ exclude = [".git/", ".pytest_cache/", ".venv"] [tool.pytest.ini_options] python_files = ["tests/*"] +log_cli = true diff --git a/tests/test_schema.py b/tests/test_schema.py index 7df6b47..545cc32 100644 --- a/tests/test_schema.py +++ b/tests/test_schema.py @@ -1,10 +1,13 @@ from decimal import Decimal -from magicparse import Schema + +from pyparsing import unicode_string +from magicparse import Schema, TypeConverter, register from magicparse.schema import ColumnarSchema, CsvSchema from magicparse.fields import ColumnarField, CsvField import pytest from unittest import TestCase - +import logging +import psutil class TestBuild(TestCase): def test_default_csv_schema(self): @@ -287,3 +290,125 @@ def test_register(self): } ) assert isinstance(schema, self.PipedSchema) + +class TestSteamParse(TestCase): + + def test_stream_parse_errors_do_not_halt_parsing(self): + schema = Schema.build( + { + "file_type": "csv", + "fields": [{"key": "age", "type": "int", "column-number": 1}], + } + ) + rows = list(schema.stream_parse(b"1\na\n2")) + assert rows == [ + ({"age": 1}, []), + ({}, [{ + "row-number": 2, + "column-number": 1, + "field-key": "age", + "error": "value 'a' is not a valid integer", + }]), + ({"age": 2}, []) + ] + +#TODO: TO REMOVE BELOW +LOGGER = logging.getLogger(__name__) +ITERATION_COUNT = 1000000 +class TestPerformanceToRemove(TestCase): + + class LogConverter(TypeConverter): + #LOGGER = logging.getLogger(__name__) + + @staticmethod + def key() -> str: + return "log" + + def apply(self, value): + LOGGER.critical("Read value " + value) + return value + + def test_parsing_order(self): + register(self.LogConverter) + + schema = Schema.build( + { + "file_type": "csv", + "fields": [{"key": "name", "type": "log", "column-number": 1}], + } + ) + input_csv = b"1\n2\n3\n4\n5" + rows, errors = schema.parse(input_csv) + assert not errors + + for row in rows: + LOGGER.critical("Write value " + row['name']) + + def test_streaming_order(self): + register(self.LogConverter) + + schema = Schema.build( + { + "file_type": "csv", + "fields": [{"key": "name", "type": "log", "column-number": 1}], + } + ) + input_csv = b"1\n2\n3\n4\n5\n" + + for row in schema.stream_parse(input_csv): + item, errors = row + LOGGER.critical("Write value " + item['name']) + + def test_parsing_memory_usage(self): + schema = Schema.build( + { + "file_type": "csv", + "fields": [ + {"key": "num", "type": "decimal", "column-number": 1}, + {"key": "name", "type": "str", "column-number": 2}, + {"key": "date", "type": "datetime", "column-number": 3}, + {"key": "description", "type": "str", "column-number": 4}, + ], + } + ) + input_csv = ''.join([f'{num},"This is my name {num}",2022-01-12T10:12:03+03:00,"This is a very long description to load the memory with data\n' for num in range(ITERATION_COUNT)]).encode('utf-8') + + process = psutil.Process() + memory_percent = process.memory_percent() + LOGGER.critical(f"Memory Usage: {memory_percent}%") + + rows, errors = schema.parse(input_csv) + assert errors == [] + i = 0 + for row in rows: + if i % (ITERATION_COUNT / 10) == 0 : + memory_percent = process.memory_percent() + LOGGER.critical(f"Memory Usage: {memory_percent}%") + i = i + 1 + + def test_streaming_memory_usage(self): + schema = Schema.build( + { + "file_type": "csv", + "fields": [ + {"key": "num", "type": "decimal", "column-number": 1}, + {"key": "name", "type": "str", "column-number": 2}, + {"key": "date", "type": "datetime", "column-number": 3}, + {"key": "description", "type": "str", "column-number": 4}, + ], + } + ) + input_csv = ''.join([f'{num},"This is my name {num}","2022-01-12T10:12:03+03:00","This is a very long description to load the memory with data\n' for num in range(ITERATION_COUNT)]).encode('utf-8') + + process = psutil.Process() + memory_percent = process.memory_percent() + LOGGER.critical(f"Memory Usage: {memory_percent}%") + + i = 0 + for row in schema.stream_parse(input_csv): + item, errors = row + if i % (ITERATION_COUNT / 10) == 0 : + memory_percent = process.memory_percent() + LOGGER.critical(f"Memory Usage: {memory_percent}%") + i = i + 1 + \ No newline at end of file