Skip to content

Commit

Permalink
✨ StreamParsing - Add streaming parsing capacity
Browse files Browse the repository at this point in the history
  • Loading branch information
m.bazire committed Nov 2, 2023
1 parent 6634fd8 commit c608881
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 1 deletion.
22 changes: 21 additions & 1 deletion magicparse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from .pre_processors import PreProcessor, builtins as builtins_pre_processors
from .transform import Transform
from .type_converters import TypeConverter, builtins as builtins_type_converters
from typing import Any, Dict, List, Tuple, Union
from typing import Any, Dict, List, Protocol, Tuple, Union
from .validators import Validator, builtins as builtins_validators


Expand All @@ -26,6 +26,26 @@ def parse(
return schema_definition.parse(data)


class OnNewRowCallback(Protocol):
def __call__(self, index: int, parsed_row: Dict[str, Any], raw_data: Any) -> None:
...


class OnErrorCallback(Protocol):
def __call__(self, errors_info: List[Dict[str, Any]], raw_data: Any) -> None:
...


def stream_parse(
data: Union[bytes, BytesIO],
schema_options: Dict[str, Any],
on_new_row: OnNewRowCallback,
on_error: OnErrorCallback
) -> None:
schema_definition = Schema.build(schema_options)
return schema_definition.stream_parse(data, on_new_row, on_error)


Registrable = Union[Schema, Transform]


Expand Down
42 changes: 42 additions & 0 deletions magicparse/schema.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import codecs
from abc import ABC, abstractmethod
import csv

from magicparse import OnErrorCallback, OnNewRowCallback
from .fields import Field
from io import BytesIO
from typing import Any, Dict, List, Tuple, Union, Iterable
Expand Down Expand Up @@ -75,6 +77,46 @@ def parse(self, data: Union[bytes, BytesIO]) -> Tuple[List[dict], List[dict]]:

return result, errors

def stream_parse(self, data: Union[bytes, BytesIO], on_new_row: OnNewRowCallback, on_error: OnErrorCallback) -> None:
if isinstance(data, bytes):
stream = BytesIO(data)
else:
stream = data

reader = self.get_reader(stream)

row_number = 0
if self.has_header:
next(reader)
row_number += 1

for row in reader:
errors = []
row_is_valid = True
item = {}
for field in self.fields:
try:
value = field.read_value(row)
except Exception as exc:
errors.append({"row-number": row_number, **field.error(exc)})
row_is_valid = False
continue

item[field.key] = value

if row_is_valid:
on_new_row(
index=row_number,
parsed_row=item,
raw_data=row
)
else:
on_error(errors_info=errors, raw_data=row)

row_number += 1

return None


class CsvSchema(Schema):
def __init__(self, options: Dict[str, Any]) -> None:
Expand Down

0 comments on commit c608881

Please sign in to comment.