Add stream parsing in Schema class + temporary tests to verify the consumption of memory
antoine-b-smartway committed Apr 17, 2024
1 parent ea301a7 commit 7506094
Showing 4 changed files with 144 additions and 13 deletions.
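
In short, this commit turns row parsing into a generator: the new stream_parse yields one (item, errors) tuple per row, and parse becomes a thin wrapper that drains it. A minimal usage sketch, assembled from the new tests below (the schema and payload are illustrative, not part of the diff itself):

    from magicparse import Schema

    schema = Schema.build(
        {
            "file_type": "csv",
            "fields": [{"key": "age", "type": "int", "column-number": 1}],
        }
    )

    # Each row arrives as (item, errors); a bad row reports its errors
    # without halting iteration over the remaining rows.
    for item, errors in schema.stream_parse(b"1\na\n2"):
        if errors:
            print("row failed:", errors)
        else:
            print("row parsed:", item)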
2 changes: 1 addition & 1 deletion README.md
@@ -106,7 +106,7 @@ rows, errors= magicparse.parse(data="...", schema=schema)
from uuid import UUID
import magicparse

class GuidConverter(magicparse.Converter):
class GuidConverter(magicparse.TypeConverter):
@staticmethod
def key() -> str:
return "guid"
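
For reference, the rename above only touches the base class name: a custom converter still defines key() and apply() and is registered the same way. A minimal sketch based on the converters used in this diff's tests (UpperConverter is a hypothetical example, not part of the library):

    from magicparse import TypeConverter, register

    class UpperConverter(TypeConverter):
        # key() is the name referenced by the "type" entry of a schema field
        @staticmethod
        def key() -> str:
            return "upper"

        # apply() transforms the raw value read from the row
        def apply(self, value):
            return value.upper()

    register(UpperConverter)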
25 changes: 15 additions & 10 deletions magicparse/schema.py
@@ -44,7 +44,20 @@ def register(cls, schema: "Schema") -> None:

cls.registry[schema.key()] = schema


def parse(self, data: Union[bytes, BytesIO]) -> Tuple[List[dict], List[dict]]:
items = []
errors = []

        for item, row_errors in self.stream_parse(data):
            if row_errors:
                errors.extend(row_errors)
else:
items.append(item)

return items, errors

    def stream_parse(self, data: Union[bytes, BytesIO]) -> Iterable[Tuple[dict, List[dict]]]:
if isinstance(data, bytes):
stream = BytesIO(data)
else:
@@ -57,18 +70,15 @@ def parse(self, data: Union[bytes, BytesIO]) -> Tuple[List[dict], List[dict]]:
next(reader)
row_number += 1

result = []
errors = []
for row in reader:
errors = []
row_number += 1
row_is_valid = True
item = {}
for field in self.fields:
try:
value = field.read_value(row)
except Exception as exc:
errors.append({"row-number": row_number, **field.error(exc)})
row_is_valid = False
continue

item[field.key] = value
@@ -80,16 +90,11 @@ def parse(self, data: Union[bytes, BytesIO]) -> Tuple[List[dict], List[dict]]:
errors.append(
{"row-number": row_number, **computed_field.error(exc)}
)
row_is_valid = False
continue

item[computed_field.key] = value

if row_is_valid:
result.append(item)

return result, errors

yield item, errors

class CsvSchema(Schema):
def __init__(self, options: Dict[str, Any]) -> None:
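
Because stream_parse never accumulates parsed rows, a large input can be reduced on the fly while memory stays flat. A hedged sketch of that consumption pattern, reusing the age schema from the sketch above (sum_ages is illustrative, not part of the library):

    from typing import Tuple

    def sum_ages(schema, data: bytes) -> Tuple[int, int]:
        # Aggregate row by row; each row is discarded once consumed,
        # so memory use does not grow with the size of the input.
        total, invalid = 0, 0
        for item, errors in schema.stream_parse(data):
            if errors:
                invalid += 1
            else:
                total += item["age"]
        return total, invalid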
1 change: 1 addition & 0 deletions pyproject.toml
@@ -33,3 +33,4 @@ exclude = [".git/", ".pytest_cache/", ".venv"]

[tool.pytest.ini_options]
python_files = ["tests/*"]
log_cli = true
129 changes: 127 additions & 2 deletions tests/test_schema.py
@@ -1,10 +1,13 @@
from decimal import Decimal
from magicparse import Schema

from magicparse import Schema, TypeConverter, register
from magicparse.schema import ColumnarSchema, CsvSchema
from magicparse.fields import ColumnarField, CsvField
import pytest
from unittest import TestCase

import logging
import psutil

class TestBuild(TestCase):
def test_default_csv_schema(self):
@@ -287,3 +290,125 @@ def test_register(self):
}
)
assert isinstance(schema, self.PipedSchema)

class TestStreaming(TestCase):

def test_stream_parse_errors_do_not_halt_parsing(self):
schema = Schema.build(
{
"file_type": "csv",
"fields": [{"key": "age", "type": "int", "column-number": 1}],
}
)
rows = list(schema.stream_parse(b"1\na\n2"))
assert rows == [
({"age": 1}, []),
({}, [{
"row-number": 2,
"column-number": 1,
"field-key": "age",
"error": "value 'a' is not a valid integer",
}]),
({"age": 2}, [])
]

# TODO: remove everything below (temporary tests that verify memory consumption)
LOGGER = logging.getLogger(__name__)
ITERATION_COUNT = 1_000_000
class TestPerformanceToRemove(TestCase):

    class LogConverter(TypeConverter):

@staticmethod
def key() -> str:
return "log"

def apply(self, value):
LOGGER.critical("Read value " + value)
return value

def test_parsing_order(self):
register(self.LogConverter)

schema = Schema.build(
{
"file_type": "csv",
"fields": [{"key": "name", "type": "log", "column-number": 1}],
}
)
input_csv = b"1\n2\n3\n4\n5"
rows, errors = schema.parse(input_csv)
assert not errors

for row in rows:
LOGGER.critical("Write value " + row['name'])

def test_streaming_order(self):
register(self.LogConverter)

schema = Schema.build(
{
"file_type": "csv",
"fields": [{"key": "name", "type": "log", "column-number": 1}],
}
)
input_csv = b"1\n2\n3\n4\n5\n"

        for item, errors in schema.stream_parse(input_csv):
            LOGGER.critical("Write value " + item['name'])

def test_parsing_memory_usage(self):
schema = Schema.build(
{
"file_type": "csv",
"fields": [
{"key": "num", "type": "decimal", "column-number": 1},
{"key": "name", "type": "str", "column-number": 2},
{"key": "date", "type": "datetime", "column-number": 3},
{"key": "description", "type": "str", "column-number": 4},
],
}
)
        input_csv = ''.join(
            f'{num},"This is my name {num}",2022-01-12T10:12:03+03:00,"This is a very long description to load the memory with data"\n'
            for num in range(ITERATION_COUNT)
        ).encode('utf-8')

process = psutil.Process()
memory_percent = process.memory_percent()
LOGGER.critical(f"Memory Usage: {memory_percent}%")

rows, errors = schema.parse(input_csv)
assert errors == []
        # Log memory usage every tenth of the dataset to observe growth
        for i, row in enumerate(rows):
            if i % (ITERATION_COUNT // 10) == 0:
                memory_percent = process.memory_percent()
                LOGGER.critical(f"Memory Usage: {memory_percent}%")

def test_streaming_memory_usage(self):
schema = Schema.build(
{
"file_type": "csv",
"fields": [
{"key": "num", "type": "decimal", "column-number": 1},
{"key": "name", "type": "str", "column-number": 2},
{"key": "date", "type": "datetime", "column-number": 3},
{"key": "description", "type": "str", "column-number": 4},
],
}
)
        input_csv = ''.join(
            f'{num},"This is my name {num}","2022-01-12T10:12:03+03:00","This is a very long description to load the memory with data"\n'
            for num in range(ITERATION_COUNT)
        ).encode('utf-8')

process = psutil.Process()
memory_percent = process.memory_percent()
LOGGER.critical(f"Memory Usage: {memory_percent}%")

        # Consume the generator row by row; memory should stay flat
        for i, (item, errors) in enumerate(schema.stream_parse(input_csv)):
            if i % (ITERATION_COUNT // 10) == 0:
                memory_percent = process.memory_percent()
                LOGGER.critical(f"Memory Usage: {memory_percent}%")
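
These temporary tests report their measurements through the logger; with log_cli = true (added to pyproject.toml above) pytest prints the records live while the tests run. One way to select only them, assuming psutil is installed (the -k filter is just one option):

    pytest tests/test_schema.py -k TestPerformanceToRemove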
