✨ StreamParsing - Add streaming parsing capability #12

Closed · wants to merge 2 commits
17 changes: 16 additions & 1 deletion magicparse/__init__.py
@@ -1,11 +1,12 @@
from io import BytesIO
from typing import Any, Dict, List, Tuple, Union

from .callbacks import OnInvalidRowCallback, OnValidRowCallback
from .schema import Schema, builtins as builtins_schemas
from .post_processors import PostProcessor, builtins as builtins_post_processors
from .pre_processors import PreProcessor, builtins as builtins_pre_processors
from .transform import Transform
from .type_converters import TypeConverter, builtins as builtins_type_converters
from .validators import Validator, builtins as builtins_validators


@@ -26,6 +27,20 @@ def parse(
return schema_definition.parse(data)


def stream_parse(
data: Union[bytes, BytesIO],
schema_options: Dict[str, Any],
on_valid_parsed_row: OnValidRowCallback,
on_invalid_parsed_row: OnInvalidRowCallback,
) -> None:
schema_definition = Schema.build(schema_options)
return schema_definition.stream_parse(
data=data,
on_valid_parsed_row=on_valid_parsed_row,
on_invalid_parsed_row=on_invalid_parsed_row,
Comment on lines +39 to +40

Contributor: What exactly is the need here, please? @antoine-b-smartway @pewho @EwenBALOUIN

Contributor: The goal is to aggregate several article import files into a single one. We don't necessarily need the parsed data; we just want the EAN and the original raw line so we can write it back into the output file.

Contributor: And why do you need that? (sorry, I really just want to understand 😄)

@a-bertrand (Nov 3, 2023): For example, to replay a store's X articles in order.

Contributor: Why keep the raw version? I may have missed something, but if in the end our file is going to be "tagged/named" in a "specific" way, then we already know we don't need to apply a parser to it.

Contributor (Author): That's the other possibility, but it means building a dedicated parser just for this one. I'm not sure that's what we want?

Contributor: Well, exactly: no parser at all.
)


Registrable = Union[Schema, Transform]


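The thread above describes the motivating use case: aggregate several article import files into one, keeping each original raw line. Below is a minimal sketch of that workflow with the new stream_parse entry point — the input/output file names and the single "ean" field are illustrative assumptions, not part of this PR:

from magicparse import stream_parse

schema_options = {
    "file_type": "csv",
    "fields": [{"key": "ean", "type": "str", "column-number": 1}],
}

with open("merged.csv", "wb") as out:  # hypothetical output file
    def keep_raw_line(index, parsed_row, raw_data):
        # Valid row: we only need the EAN and the untouched raw line.
        out.write(raw_data)

    def log_reject(errors_info, raw_data):
        # Invalid row: report it and keep going.
        print("rejected:", errors_info)

    for name in ("import_a.csv", "import_b.csv"):  # hypothetical inputs
        with open(name, "rb") as f:
            stream_parse(
                f.read(),
                schema_options,
                on_valid_parsed_row=keep_raw_line,
                on_invalid_parsed_row=log_reject,
            )

Nothing is returned; every result flows through one of the two callbacks, which lets the caller decide whether to keep the parsed dict, the raw line, or both.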
11 changes: 11 additions & 0 deletions magicparse/callbacks.py
@@ -0,0 +1,11 @@
from typing import Any, Dict, List, Protocol


class OnValidRowCallback(Protocol):
def __call__(self, index: int, parsed_row: Dict[str, Any], raw_data: Any) -> None:
...


class OnInvalidRowCallback(Protocol):
def __call__(self, errors_info: List[Dict[str, Any]], raw_data: Any) -> None:
...
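These are typing.Protocol definitions, so callbacks are matched structurally: any function or object whose __call__ has the same keyword signature satisfies them, with no subclassing required. A small sketch (the RowCollector class is an illustration, not part of this PR):

from typing import Any, Dict, List

from magicparse.callbacks import OnValidRowCallback


class RowCollector:
    # Accumulates parsed rows; satisfies OnValidRowCallback structurally.
    def __init__(self) -> None:
        self.rows: List[Dict[str, Any]] = []

    def __call__(self, index: int, parsed_row: Dict[str, Any], raw_data: Any) -> None:
        self.rows.append(parsed_row)


collector: OnValidRowCallback = RowCollector()  # type-checks without inheritance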
45 changes: 45 additions & 0 deletions magicparse/schema.py
@@ -1,6 +1,8 @@
import codecs
from abc import ABC, abstractmethod
import csv

from .callbacks import OnInvalidRowCallback, OnValidRowCallback
from .fields import Field
from io import BytesIO
from typing import Any, Dict, List, Tuple, Union, Iterable
@@ -21,6 +23,11 @@ def __init__(self, options: Dict[str, Any]) -> None:
def get_reader(self, stream: BytesIO) -> Iterable:
pass

def get_stream_readers(self, content: bytes) -> Tuple[Iterable, Iterable]:
Contributor (Author): I couldn't find a better way to get the stream twice: with CSV, you can't access the line as it was read from the file (the reader doesn't expose it).
=> So the whole file ends up in RAM :/

schema_reader = self.get_reader(BytesIO(content))
raw_reader = BytesIO(content)
return schema_reader, raw_reader
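To see the duplicated-stream trick from the comment above in isolation: csv.reader only yields parsed columns, so a second, independent reader over the same bytes is what recovers each raw line. A sketch, assuming get_reader wraps csv.reader over codecs.iterdecode (consistent with the codecs import at the top of this file):

import codecs
import csv
from io import BytesIO

content = b"name,qty\nwidget,3\n"
# Two independent views over the same in-memory bytes: csv.reader yields the
# parsed columns, while iterating the plain BytesIO yields the raw lines.
schema_reader = csv.reader(codecs.iterdecode(BytesIO(content), "utf-8"))
raw_reader = BytesIO(content)
for row, raw_line in zip(schema_reader, raw_reader):
    print(row, raw_line)
# ['name', 'qty'] b'name,qty\n'
# ['widget', '3'] b'widget,3\n'

One caveat: a quoted CSV field containing a newline spans two raw lines and would desynchronize the zip, so this pattern (and stream_parse below) assumes one record per physical line.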

@staticmethod
def key() -> str:
pass
@@ -75,6 +82,44 @@ def parse(self, data: Union[bytes, BytesIO]) -> Tuple[List[dict], List[dict]]:

return result, errors

def stream_parse(
self,
data: Union[bytes, BytesIO],
on_valid_parsed_row: OnValidRowCallback,
on_invalid_parsed_row: OnInvalidRowCallback,
) -> None:
if isinstance(data, BytesIO):
data = data.read()

reader, raw_stream = self.get_stream_readers(data)

row_number = 0
if self.has_header:
next(reader)
next(raw_stream)
row_number += 1

for row, raw_row in zip(reader, raw_stream):
errors = []
row_is_valid = True
item = {}
for field in self.fields:
try:
value = field.read_value(row)
except Exception as exc:
errors.append({"row-number": row_number, **field.error(exc)})
row_is_valid = False
continue

item[field.key] = value

if row_is_valid:
on_valid_parsed_row(index=row_number, parsed_row=item, raw_data=raw_row)
else:
on_invalid_parsed_row(errors_info=errors, raw_data=raw_row)

row_number += 1


class CsvSchema(Schema):
def __init__(self, options: Dict[str, Any]) -> None:
121 changes: 121 additions & 0 deletions tests/test_schema.py
@@ -1,4 +1,5 @@
from decimal import Decimal
from unittest.mock import Mock
from magicparse import Schema
from magicparse.schema import ColumnarSchema, CsvSchema
from magicparse.fields import ColumnarField, CsvField
@@ -130,6 +131,126 @@ def test_errors_do_not_halt_parsing(self):
]


class TestStreamParse:
def test_with_no_data(self):
on_valid_row = Mock()
on_error_row = Mock()
schema = Schema.build(
{
"file_type": "csv",
"fields": [{"key": "name", "type": "str", "column-number": 1}],
}
)
schema.stream_parse(b"", on_valid_row, on_error_row)
assert not on_valid_row.called
assert not on_error_row.called

def test_with_no_field_definition(self):
on_valid_row = Mock()
on_error_row = Mock()
schema = Schema.build({"file_type": "csv", "fields": []})
schema.stream_parse(b"a,b,c\n", on_valid_row, on_error_row)
on_valid_row.assert_called_once_with(
index=0, parsed_row={}, raw_data=b"a,b,c\n"
)
assert not on_error_row.called

def test_without_header(self):
on_valid_row = Mock()
on_error_row = Mock()
schema = Schema.build(
{
"file_type": "csv",
"fields": [{"key": "name", "type": "str", "column-number": 1}],
}
)
schema.stream_parse(b"1\n", on_valid_row, on_error_row)
on_valid_row.assert_called_once_with(
index=0, parsed_row={"name": "1"}, raw_data=b"1\n"
)
assert not on_error_row.called

def test_with_header(self):
on_valid_row = Mock()
on_error_row = Mock()
schema = Schema.build(
{
"file_type": "csv",
"has_header": True,
"fields": [{"key": "name", "type": "str", "column-number": 1}],
}
)
schema.stream_parse(b"column_name\n1\n", on_valid_row, on_error_row)

on_valid_row.assert_called_once_with(
index=1, parsed_row={"name": "1"}, raw_data=b"1\n"
)
assert not on_error_row.called

def test_multiple_lines(self):
on_valid_row = Mock()
on_error_row = Mock()
schema = Schema.build(
{
"file_type": "csv",
"fields": [{"key": "name", "type": "str", "column-number": 1}],
}
)
schema.stream_parse(b"1\n2\n", on_valid_row, on_error_row)
assert on_valid_row.call_count == 2
on_valid_row.assert_any_call(index=0, parsed_row={"name": "1"}, raw_data=b"1\n")
on_valid_row.assert_any_call(index=1, parsed_row={"name": "2"}, raw_data=b"2\n")
assert not on_error_row.called

def test_error_display_row_number(self):
on_valid_row = Mock()
on_error_row = Mock()
schema = Schema.build(
{
"file_type": "csv",
"fields": [{"key": "age", "type": "int", "column-number": 1}],
}
)
schema.stream_parse(b"a", on_valid_row, on_error_row)
assert not on_valid_row.called
on_error_row.assert_called_once_with(
errors_info=[
{
"row-number": 0,
"column-number": 1,
"field-key": "age",
"error": "value is not a valid integer",
}
],
raw_data=b"a",
)

def test_errors_do_not_halt_parsing(self):
on_valid_row = Mock()
on_error_row = Mock()
schema = Schema.build(
{
"file_type": "csv",
"fields": [{"key": "age", "type": "int", "column-number": 1}],
}
)
schema.stream_parse(b"1\na\n2\n", on_valid_row, on_error_row)
assert on_valid_row.call_count == 2
on_valid_row.assert_any_call(index=0, parsed_row={"age": 1}, raw_data=b"1\n")
on_valid_row.assert_any_call(index=2, parsed_row={"age": 2}, raw_data=b"2\n")
on_error_row.assert_called_once_with(
errors_info=[
{
"row-number": 1,
"column-number": 1,
"field-key": "age",
"error": "value is not a valid integer",
}
],
raw_data=b"a\n",
)


class TestColumnarParse(TestCase):
def test_with_no_data(self):
schema = Schema.build(