Skip to content

Commit

Permalink
Add stream parsing in Schema class + temporary tests to verify the co…
Browse files Browse the repository at this point in the history
…nsumption of memory
  • Loading branch information
antoine-b-smartway committed Apr 19, 2024
1 parent fc73462 commit e11a82d
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 10 deletions.
25 changes: 16 additions & 9 deletions magicparse/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,20 @@ def register(cls, schema: "Schema") -> None:
cls.registry[schema.key()] = schema

def parse(self, data: Union[bytes, BytesIO]) -> Tuple[List[dict], List[dict]]:
items = []
errors = []

for item, row_errors in self.stream_parse(data):
if row_errors:
errors.extend(row_errors)
else:
items.append(item)

return items, errors

def stream_parse(
self, data: Union[bytes, BytesIO]
) -> Iterable[Tuple[dict, list[dict]]]:
if isinstance(data, bytes):
stream = BytesIO(data)
else:
Expand All @@ -57,18 +71,15 @@ def parse(self, data: Union[bytes, BytesIO]) -> Tuple[List[dict], List[dict]]:
next(reader)
row_number += 1

result = []
errors = []
for row in reader:
errors = []
row_number += 1
row_is_valid = True
item = {}
for field in self.fields:
try:
value = field.read_value(row)
except Exception as exc:
errors.append({"row-number": row_number, **field.error(exc)})
row_is_valid = False
continue

item[field.key] = value
Expand All @@ -80,15 +91,11 @@ def parse(self, data: Union[bytes, BytesIO]) -> Tuple[List[dict], List[dict]]:
errors.append(
{"row-number": row_number, **computed_field.error(exc)}
)
row_is_valid = False
continue

item[computed_field.key] = value

if row_is_valid:
result.append(item)

return result, errors
yield item, errors


class CsvSchema(Schema):
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ extend-ignore = ["E203", "E722"]
exclude = [".git/", ".pytest_cache/", ".venv"]

[tool.pytest.ini_options]
python_files = ["tests/*"]
python_files = ["tests/*"]
27 changes: 27 additions & 0 deletions tests/test_schema.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from decimal import Decimal

from magicparse import Schema
from magicparse.schema import ColumnarSchema, CsvSchema
from magicparse.fields import ColumnarField, CsvField
Expand Down Expand Up @@ -287,3 +288,29 @@ def test_register(self):
}
)
assert isinstance(schema, self.PipedSchema)


class TestSteamParse(TestCase):
def test_stream_parse_errors_do_not_halt_parsing(self):
schema = Schema.build(
{
"file_type": "csv",
"fields": [{"key": "age", "type": "int", "column-number": 1}],
}
)
rows = list(schema.stream_parse(b"1\na\n2"))
assert rows == [
({"age": 1}, []),
(
{},
[
{
"row-number": 2,
"column-number": 1,
"field-key": "age",
"error": "value 'a' is not a valid integer",
}
],
),
({"age": 2}, []),
]

0 comments on commit e11a82d

Please sign in to comment.