Skip to content

Commit

Permalink
Add stream parsing in Schema class + temporary tests to verify the consumption of memory
Browse files: browse the repository at this point in the history
  • Loading branch information
antoine-b-smartway committed Apr 18, 2024
1 parent fc73462 commit 3fc9254
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 12 deletions.
25 changes: 15 additions & 10 deletions magicparse/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,20 @@ def register(cls, schema: "Schema") -> None:

cls.registry[schema.key()] = schema


def parse(self, data: Union[bytes, BytesIO]) -> Tuple[List[dict], List[dict]]:
    """Parse ``data`` eagerly and split the outcome into items and errors.

    Consumes every ``(item, row_errors)`` pair produced by
    ``self.stream_parse(data)``. A row that comes back with a non-empty
    error list contributes only its errors (its item is dropped); a row
    with no errors contributes its item.

    Returns:
        A ``(items, errors)`` tuple of two lists, each in the order the
        rows were yielded.
    """
    valid_items = []
    collected_errors = []

    for parsed_item, row_errors in self.stream_parse(data):
        if not row_errors:
            valid_items.append(parsed_item)
            continue
        # A failing row is reported through its errors only.
        collected_errors.extend(row_errors)

    return valid_items, collected_errors

def stream_parse(self, data: Union[bytes, BytesIO]) -> Iterable[Tuple[dict, list[dict]]]:
if isinstance(data, bytes):
stream = BytesIO(data)
else:
Expand All @@ -57,18 +70,15 @@ def parse(self, data: Union[bytes, BytesIO]) -> Tuple[List[dict], List[dict]]:
next(reader)
row_number += 1

result = []
errors = []
for row in reader:
errors = []
row_number += 1
row_is_valid = True
item = {}
for field in self.fields:
try:
value = field.read_value(row)
except Exception as exc:
errors.append({"row-number": row_number, **field.error(exc)})
row_is_valid = False
continue

item[field.key] = value
Expand All @@ -80,16 +90,11 @@ def parse(self, data: Union[bytes, BytesIO]) -> Tuple[List[dict], List[dict]]:
errors.append(
{"row-number": row_number, **computed_field.error(exc)}
)
row_is_valid = False
continue

item[computed_field.key] = value

if row_is_valid:
result.append(item)

return result, errors

yield item, errors

class CsvSchema(Schema):
def __init__(self, options: Dict[str, Any]) -> None:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ extend-ignore = ["E203", "E722"]
exclude = [".git/", ".pytest_cache/", ".venv"]

[tool.pytest.ini_options]
python_files = ["tests/*"]
python_files = ["tests/*"]
23 changes: 22 additions & 1 deletion tests/test_schema.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from decimal import Decimal

from magicparse import Schema
from magicparse.schema import ColumnarSchema, CsvSchema
from magicparse.fields import ColumnarField, CsvField
import pytest
from unittest import TestCase


class TestBuild(TestCase):
def test_default_csv_schema(self):
schema = Schema.build(
Expand Down Expand Up @@ -287,3 +287,24 @@ def test_register(self):
}
)
assert isinstance(schema, self.PipedSchema)

class TestSteamParse(TestCase):
    # NOTE(review): the class name looks like a typo for "TestStreamParse";
    # it is kept unchanged here so existing test selectors keep working.

    def test_stream_parse_errors_do_not_halt_parsing(self):
        # Single integer column; the middle input row holds a non-numeric
        # value, and the rows after it must still be yielded.
        definition = {
            "file_type": "csv",
            "fields": [{"key": "age", "type": "int", "column-number": 1}],
        }
        schema = Schema.build(definition)

        invalid_row_error = {
            "row-number": 2,
            "column-number": 1,
            "field-key": "age",
            "error": "value 'a' is not a valid integer",
        }
        expected = [
            ({"age": 1}, []),
            ({}, [invalid_row_error]),
            ({"age": 2}, []),
        ]

        assert list(schema.stream_parse(b"1\na\n2")) == expected

0 comments on commit 3fc9254

Please sign in to comment.