Skip to content

Commit

Permalink
♻️✅ Schema - CSV Reader TU/multi buffer read
Browse files Browse the repository at this point in the history
  • Loading branch information
m.bazire committed Nov 2, 2023
1 parent bf62187 commit b9d4c9d
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 8 deletions.
20 changes: 12 additions & 8 deletions magicparse/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ def __init__(self, options: Dict[str, Any]) -> None:
def get_reader(self, stream: BytesIO) -> Iterable:
pass

def get_stream_readers(self, content: bytes) -> Tuple[Iterable, Iterable]:
    """Build two independent readers over the same raw byte content.

    Returns a ``(schema_reader, raw_reader)`` pair: the first is the
    schema-aware row iterator produced by ``get_reader``, the second a
    plain ``BytesIO`` over the identical bytes. Each reader owns its
    own buffer/cursor, so advancing one never disturbs the other —
    callers can consume them in lockstep (e.g. ``zip``) to pair every
    parsed row with its original raw line.
    """
    return self.get_reader(BytesIO(content)), BytesIO(content)

@staticmethod
def key() -> str:
pass
Expand Down Expand Up @@ -83,19 +88,18 @@ def stream_parse(
on_valid_parsed_row: OnValidRowCallback,
on_invalid_parsed_row: OnInvalidRowCallback,
) -> None:
if isinstance(data, bytes):
stream = BytesIO(data)
else:
stream = data
if isinstance(data, BytesIO):
data = data.read()

reader = self.get_reader(stream)
reader, raw_stream = self.get_stream_readers(data)

row_number = 0
if self.has_header:
next(reader)
next(raw_stream)
row_number += 1

for row in reader:
for row, raw_row in zip(reader, raw_stream):
errors = []
row_is_valid = True
item = {}
Expand All @@ -110,9 +114,9 @@ def stream_parse(
item[field.key] = value

if row_is_valid:
on_valid_parsed_row(index=row_number, parsed_row=item, raw_data=row)
on_valid_parsed_row(index=row_number, parsed_row=item, raw_data=raw_row)
else:
on_invalid_parsed_row(errors_info=errors, raw_data=row)
on_invalid_parsed_row(errors_info=errors, raw_data=raw_row)

row_number += 1

Expand Down
121 changes: 121 additions & 0 deletions tests/test_schema.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from decimal import Decimal
from unittest.mock import Mock
from magicparse import Schema
from magicparse.schema import ColumnarSchema, CsvSchema
from magicparse.fields import ColumnarField, CsvField
Expand Down Expand Up @@ -130,6 +131,126 @@ def test_errors_do_not_halt_parsing(self):
]


class TestStreamParse:
    """Exercise ``Schema.stream_parse``: the valid/invalid callbacks must
    fire once per data row, carrying the parsed dict, the 0-based row
    index, and the raw bytes of the originating line."""

    @staticmethod
    def _csv_schema(fields, has_header=False):
        # Build a CSV schema definition; only include the header flag
        # when requested so the no-header tests match the default path.
        definition = {"file_type": "csv", "fields": fields}
        if has_header:
            definition["has_header"] = True
        return Schema.build(definition)

    def test_with_no_data(self):
        valid_cb, error_cb = Mock(), Mock()
        schema = self._csv_schema(
            [{"key": "name", "type": "str", "column-number": 1}]
        )
        schema.stream_parse(b"", valid_cb, error_cb)
        valid_cb.assert_not_called()
        error_cb.assert_not_called()

    def test_with_no_field_definition(self):
        valid_cb, error_cb = Mock(), Mock()
        schema = self._csv_schema([])
        schema.stream_parse(b"a,b,c\n", valid_cb, error_cb)
        valid_cb.assert_called_once_with(
            index=0, parsed_row={}, raw_data=b"a,b,c\n"
        )
        error_cb.assert_not_called()

    def test_without_header(self):
        valid_cb, error_cb = Mock(), Mock()
        schema = self._csv_schema(
            [{"key": "name", "type": "str", "column-number": 1}]
        )
        schema.stream_parse(b"1\n", valid_cb, error_cb)
        valid_cb.assert_called_once_with(
            index=0, parsed_row={"name": "1"}, raw_data=b"1\n"
        )
        error_cb.assert_not_called()

    def test_with_header(self):
        valid_cb, error_cb = Mock(), Mock()
        schema = self._csv_schema(
            [{"key": "name", "type": "str", "column-number": 1}],
            has_header=True,
        )
        schema.stream_parse(b"column_name\n1\n", valid_cb, error_cb)

        # The header line is skipped, so the first data row has index 1.
        valid_cb.assert_called_once_with(
            index=1, parsed_row={"name": "1"}, raw_data=b"1\n"
        )
        error_cb.assert_not_called()

    def test_multiple_lines(self):
        valid_cb, error_cb = Mock(), Mock()
        schema = self._csv_schema(
            [{"key": "name", "type": "str", "column-number": 1}]
        )
        schema.stream_parse(b"1\n2\n", valid_cb, error_cb)
        assert valid_cb.call_count == 2
        valid_cb.assert_any_call(index=0, parsed_row={"name": "1"}, raw_data=b"1\n")
        valid_cb.assert_any_call(index=1, parsed_row={"name": "2"}, raw_data=b"2\n")
        error_cb.assert_not_called()

    def test_error_display_row_number(self):
        valid_cb, error_cb = Mock(), Mock()
        schema = self._csv_schema(
            [{"key": "age", "type": "int", "column-number": 1}]
        )
        schema.stream_parse(b"a", valid_cb, error_cb)
        valid_cb.assert_not_called()
        error_cb.assert_called_once_with(
            errors_info=[
                {
                    "row-number": 0,
                    "column-number": 1,
                    "field-key": "age",
                    "error": "value is not a valid integer",
                }
            ],
            raw_data=b"a",
        )

    def test_errors_do_not_halt_parsing(self):
        valid_cb, error_cb = Mock(), Mock()
        schema = self._csv_schema(
            [{"key": "age", "type": "int", "column-number": 1}]
        )
        schema.stream_parse(b"1\na\n2\n", valid_cb, error_cb)
        # Rows before and after the bad line must both be reported valid.
        assert valid_cb.call_count == 2
        valid_cb.assert_any_call(index=0, parsed_row={"age": 1}, raw_data=b"1\n")
        valid_cb.assert_any_call(index=2, parsed_row={"age": 2}, raw_data=b"2\n")
        error_cb.assert_called_once_with(
            errors_info=[
                {
                    "row-number": 1,
                    "column-number": 1,
                    "field-key": "age",
                    "error": "value is not a valid integer",
                }
            ],
            raw_data=b"a\n",
        )


class TestColumnarParse(TestCase):
def test_with_no_data(self):
schema = Schema.build(
Expand Down

0 comments on commit b9d4c9d

Please sign in to comment.