
Commit

Add data operations for reading from zip, iterating over csv and json records, and writing to parquet (#6)

* Add data operations: reading from zip file, json and csv record parsing, writing to parquet

* Update version to 0.1

* Fix linting; update to run multiple dependency versions

* Run dependency version checks with specific python versions
simw authored Nov 7, 2023
1 parent a52122d commit 79955e2
Showing 19 changed files with 773 additions and 11 deletions.
1 change: 1 addition & 0 deletions .github/workflows/publish.yml
@@ -30,6 +30,7 @@ jobs:
- run: curl -sSL https://install.python-poetry.org | python - -y
- run: poetry config virtualenvs.in-project true
- run: make test
- run: make test-dep-versions

build:
needs:
3 changes: 3 additions & 0 deletions .github/workflows/test.yml
@@ -68,3 +68,6 @@ jobs:

- name: Run tests
run: make test

- name: Run tests on different dependency versions
run: make test-dep-versions
26 changes: 24 additions & 2 deletions Makefile
@@ -4,7 +4,7 @@ sources = src tests

.PHONY: prepare
prepare:
poetry install
poetry install --with ops


.PHONY: lintable
@@ -21,13 +21,35 @@ lint: prepare
poetry run mypy $(sources)



.PHONY: test
test: prepare
poetry run coverage run -m pytest
poetry run coverage report


.PHONY: test-python-versions
test-python-versions:
poetry env use python3.8
make test

poetry env use python3.9
make test

poetry env use python3.10
make test

poetry env use python3.11
make test

poetry env use python3.12
make test


.PHONY: test-dep-versions
test-dep-versions: prepare
./scripts/test_dependency_versions.sh


.PHONY: clean
clean:
rm -rf `find . -name __pycache__`
63 changes: 62 additions & 1 deletion README.md
@@ -8,7 +8,68 @@ Chained operations in Python, applied to data processing.
pip install pipedata
```

## An Example
## Examples

### Chaining Data Operations

`pipedata.ops` provides some operations for streaming data through memory.

```py
import json
import zipfile

import pyarrow.parquet as pq

from pipedata.core import StreamStart
from pipedata.ops import json_records, parquet_writer, zipped_files


data1 = [
    {"col1": 1, "col2": "Hello"},
    {"col1": 2, "col2": "world"},
]
data2 = [
    {"col1": 3, "col2": "!"},
]

with zipfile.ZipFile("test_input.json.zip", "w") as zipped:
    zipped.writestr("file1.json", json.dumps(data1))
    zipped.writestr("file2.json", json.dumps(data2))

result = (
    StreamStart(["test_input.json.zip"])
    .flat_map(zipped_files)
    .flat_map(json_records())
    .flat_map(parquet_writer("test_output.parquet"))
    .to_list()
)

table = pq.read_table("test_output.parquet")
print(table.to_pydict())
#> {'col1': [1, 2, 3], 'col2': ['Hello', 'world', '!']}
```

Alternatively, you can construct the pipeline as a chain:

```py
import pyarrow.parquet as pq

from pipedata.core import ChainStart, StreamStart
from pipedata.ops import json_records, parquet_writer, zipped_files

# Run this after the input file has been created in the example above
chain = (
    ChainStart()
    .flat_map(zipped_files)
    .flat_map(json_records())
    .flat_map(parquet_writer("test_output_2.parquet"))
)
result = StreamStart(["test_input.json.zip"]).flat_map(chain).to_list()
table = pq.read_table("test_output_2.parquet")
print(table.to_pydict())
#> {'col1': [1, 2, 3], 'col2': ['Hello', 'world', '!']}

```

### Core Framework

261 changes: 260 additions & 1 deletion poetry.lock

Large diffs are not rendered by default.

27 changes: 24 additions & 3 deletions pyproject.toml
@@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "pipedata"
version = "0.0.1"
version = "0.1"
description = "Framework for building pipelines for data processing"
authors = ["Simon Wicks <[email protected]>"]
readme = "README.md"
@@ -35,17 +35,36 @@ packages = [{include = "pipedata", from = "src"}]
[tool.poetry.dependencies]
python = "^3.8"

[tool.poetry.group.ops.dependencies]
fsspec = [
{ version = ">=0.9.0", python = "<3.12" },
{ version = ">=2022.1.0", python = ">=3.12,<3.13"},
]
ijson = "^3.0.0"
pyarrow = [
{ version = ">=9.0.0", python = "<3.11" },
{ version = ">=11.0.0", python = ">=3.11,<3.12" },
{ version = ">=14.0.0", python = ">=3.12,<=3.13" },
]
# We don't have a direct numpy dependency, but pyarrow depends on numpy,
# and numpy has Python version constraints that matter for Python 3.12
numpy = [
{ version = "<1.25.0", python = "<3.9" },
{ version = "^1.26.0", python = ">=3.12,<3.13" }
]

[tool.poetry.group.lint.dependencies]
black = "^23.9.1"
ruff = "^0.1.3"
mypy = "^1.6.0"


[tool.poetry.group.test.dependencies]
pytest = "^7.4.2"
coverage = "^7.3.2"

[tool.poetry.group.ops]
optional = true


[tool.mypy]
strict = true
@@ -103,7 +122,9 @@ keep-runtime-typing = true
testpaths = "tests"
xfail_strict = true
filterwarnings = [
"error"
"error",
"ignore:distutils Version classes:DeprecationWarning",
"ignore:SelectableGroups dict:DeprecationWarning",
]


43 changes: 43 additions & 0 deletions scripts/test_dependency_versions.sh
@@ -0,0 +1,43 @@
#!/usr/bin/env bash

set -o errexit # Abort on non-zero exit status
set -o nounset # Abort on unbound variable
set -o pipefail # Abort on non-zero exit in pipeline

main() {
    PYTHON_MINOR_VERSION=$(poetry run python -c 'import sys; version=sys.version_info[:3]; print("{1}".format(*version))')
    echo "Python minor version: $PYTHON_MINOR_VERSION"

    # The errors are mostly/all installation errors about building
    # from source. The version requirements could be lowered if
    # building from source were possible.
    if (( PYTHON_MINOR_VERSION < 11 )); then
        poetry run pip install pyarrow==9.0.0
        poetry run python -m pytest

        poetry run pip install pyarrow==10.0.0
        poetry run python -m pytest
    fi

    if (( PYTHON_MINOR_VERSION < 12 )); then
        poetry run pip install pyarrow==11.0.0
        poetry run python -m pytest

        poetry run pip install pyarrow==13.0.0
        poetry run python -m pytest

        poetry run pip install fsspec==0.9.0
        poetry run python -m pytest
    fi

    poetry run pip install pyarrow==14.0.0
    poetry run python -m pytest

    poetry run pip install ijson==3.0.0
    poetry run python -m pytest

    poetry run pip install fsspec==2022.1.0
    poetry run python -m pytest
}

main
2 changes: 1 addition & 1 deletion src/pipedata/__init__.py
@@ -1,4 +1,4 @@
__version__ = "0.0.1"
__version__ = "0.1"

__all__ = [
"__version__",
4 changes: 2 additions & 2 deletions src/pipedata/core/chain.py
@@ -22,7 +22,7 @@
TOther = TypeVar("TOther")


def _batched(iterable: Iterator[TEnd], n: Optional[int]) -> Iterator[Tuple[TEnd, ...]]:
def batched(iterable: Iterator[TEnd], n: Optional[int]) -> Iterator[Tuple[TEnd, ...]]:
"""Can be replaced by itertools.batched once using Python 3.12+."""
while (elements := tuple(itertools.islice(iterable, n))) != ():
yield elements
@@ -160,7 +160,7 @@ def batched_map(

        @functools.wraps(func)
        def new_action(previous_step: Iterator[TEnd]) -> Iterator[TOther]:
            for elements in _batched(previous_step, n):
            for elements in batched(previous_step, n):
                yield func(elements)

        return self.flat_map(new_action)
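
For reference, a minimal usage sketch of the now-public `batched` helper (illustrative only, not part of this diff); the import path matches the one used by `storage.py` below:

```py
from pipedata.core.chain import batched

# Group an iterator into tuples of at most 2 elements;
# the final tuple may be shorter.
print(list(batched(iter(range(5)), 2)))
#> [(0, 1), (2, 3), (4,)]
```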
10 changes: 10 additions & 0 deletions src/pipedata/ops/__init__.py
@@ -0,0 +1,10 @@
from .files import zipped_files
from .records import csv_records, json_records
from .storage import parquet_writer

__all__ = [
"zipped_files",
"csv_records",
"json_records",
"parquet_writer",
]
16 changes: 16 additions & 0 deletions src/pipedata/ops/files.py
@@ -0,0 +1,16 @@
import logging
import zipfile
from typing import IO, Iterator

import fsspec # type: ignore

logger = logging.getLogger(__name__)


def zipped_files(file_refs: Iterator[str]) -> Iterator[IO[bytes]]:
    for file_ref in file_refs:
        with fsspec.open(file_ref, "rb") as file:
            with zipfile.ZipFile(file) as zip_file:
                for name in zip_file.namelist():
                    with zip_file.open(name) as inner_file:
                        yield inner_file
34 changes: 34 additions & 0 deletions src/pipedata/ops/records.py
@@ -0,0 +1,34 @@
import csv
import io
import logging
from typing import IO, Any, Callable, Dict, Iterator, Optional

import ijson # type: ignore

logger = logging.getLogger(__name__)


def json_records(
    json_path: str = "item", multiple_values: Optional[bool] = False
) -> Callable[[Iterator[IO[bytes]]], Iterator[Dict[str, Any]]]:
    logger.info(f"Initializing json reader for {json_path}")

    def json_records_func(json_files: Iterator[IO[bytes]]) -> Iterator[Dict[str, Any]]:
        for json_file in json_files:
            logger.info(f"Reading json file {json_file}")
            records = ijson.items(json_file, json_path, multiple_values=multiple_values)
            yield from records

    return json_records_func


def csv_records() -> Callable[[Iterator[IO[bytes]]], Iterator[Dict[str, Any]]]:
    def csv_records_func(csv_paths: Iterator[IO[bytes]]) -> Iterator[Dict[str, Any]]:
        for csv_path in csv_paths:
            logger.info(f"Reading csv file {csv_path}")
            csv_reader = csv.DictReader(
                io.TextIOWrapper(csv_path, "utf-8"), delimiter=","
            )
            yield from csv_reader

    return csv_records_func
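
A hypothetical usage sketch for `csv_records`, mirroring the json example in the README above (file names and sample data are illustrative only; expected output shown for illustration):

```py
import zipfile

from pipedata.core import StreamStart
from pipedata.ops import csv_records, zipped_files

# Illustrative input: a zip archive containing a single csv file.
with zipfile.ZipFile("test_input.csv.zip", "w") as zipped:
    zipped.writestr("file1.csv", "col1,col2\n1,Hello\n2,world\n")

result = (
    StreamStart(["test_input.csv.zip"])
    .flat_map(zipped_files)
    .flat_map(csv_records())
    .to_list()
)
print(result)
#> [{'col1': '1', 'col2': 'Hello'}, {'col1': '2', 'col2': 'world'}]
```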
51 changes: 51 additions & 0 deletions src/pipedata/ops/storage.py
@@ -0,0 +1,51 @@
from typing import Any, Callable, Dict, Iterator, Optional

import pyarrow as pa # type: ignore
import pyarrow.parquet as pq # type: ignore

from pipedata.core.chain import batched


def parquet_writer(
    file_path: str,
    schema: Optional[pa.Schema] = None,
    row_group_length: Optional[int] = None,
    max_file_length: Optional[int] = None,
) -> Callable[[Iterator[Dict[str, Any]]], Iterator[str]]:
    if row_group_length is None and max_file_length is not None:
        row_group_length = max_file_length

    if max_file_length is not None:
        if file_path.format(i=1) == file_path:
            msg = "When (possibly) writing to multiple files (as the max_file_length"
            msg += " argument is not None), the file_path argument must be a"
            msg += " format string that contains an {i} format specifier for the file number."
            raise ValueError(msg)

    def parquet_writer_func(records: Iterator[Dict[str, Any]]) -> Iterator[str]:
        writer = None
        file_number = 0
        file_length = 0
        for batch in batched(records, row_group_length):
            table = pa.Table.from_pylist(batch, schema=schema)
            if writer is None:
                formatted_file_path = file_path
                if max_file_length is not None:
                    formatted_file_path = file_path.format(i=file_number)
                writer = pq.ParquetWriter(formatted_file_path, table.schema)

            writer.write_table(table)
            file_length += len(batch)

            if max_file_length is not None and file_length >= max_file_length:
                writer.close()
                writer = None
                file_length = 0
                file_number += 1
                yield formatted_file_path

        if writer is not None:
            writer.close()
            yield formatted_file_path

    return parquet_writer_func
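
A hypothetical sketch of the multi-file behaviour enabled by `max_file_length` (file names and record counts are illustrative; assumes `StreamStart` accepts any iterable of records, as it does with `range` in the tests):

```py
from pipedata.core import StreamStart
from pipedata.ops import parquet_writer

records = ({"col1": i} for i in range(5))

# With max_file_length set, file_path must contain an {i} placeholder;
# here each output file holds at most 2 records.
written = (
    StreamStart(records)
    .flat_map(parquet_writer("test_output_{i}.parquet", max_file_length=2))
    .to_list()
)
print(written)
#> ['test_output_0.parquet', 'test_output_1.parquet', 'test_output_2.parquet']
```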
4 changes: 3 additions & 1 deletion tests/core/test_stream.py
@@ -12,8 +12,10 @@ def test_stream_to_list() -> None:
def test_stream_on_range() -> None:
    stream = StreamStart(range(3))
    result = stream.to_list()
    print(stream.get_counts())
    assert result == [0, 1, 2]
    assert stream.get_counts() == [
        {"name": "_identity", "inputs": 3, "outputs": 3},
    ]


def test_repeated_stream() -> None:
Empty file added tests/ops/__init__.py
