Skip to content

Commit

Permalink
chore: Add a file (CSV) tap for testing (WIP!) (#2668)
Browse files Browse the repository at this point in the history
* chore: Add a file (CSV) tap for testing

* Implement and test tap

* Add fixture files

* Incremental replication

* Formalize default stream name

* Update cache keys

* Enable more tests

* gitignore cache file

* More tests
  • Loading branch information
edgarrmondragon authored Sep 17, 2024
1 parent 83b3d49 commit 3d3ac7b
Show file tree
Hide file tree
Showing 9 changed files with 1,296 additions and 3 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ jobs:
nox --version
- uses: actions/cache@v4
if: always() && (matrix.session == 'tests')
if: matrix.session == 'tests'
with:
path: http_cache.sqlite
key: http_cache-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.session }}-${{ matrix.sqlalchemy }}
key: http_cache-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.sqlalchemy }}

- name: Run Nox
env:
Expand All @@ -106,7 +106,7 @@ jobs:
if: always() && (matrix.session == 'tests')
with:
include-hidden-files: true
name: coverage-data-nox_${{ matrix.session }}-${{ matrix.os }}-py${{ matrix.python-version }}_sqlalchemy_${{ matrix.sqlalchemy }}
name: coverage-data-nox_-${{ matrix.os }}-py${{ matrix.python-version }}_sqlalchemy_${{ matrix.sqlalchemy }}
path: ".coverage.*"

tests-external:
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# HTTP cache
http_cache.sqlite

# Local Poetry configuration file

poetry.toml
Expand Down
1,001 changes: 1,001 additions & 0 deletions fixtures/csv/customers.csv

Large diffs are not rendered by default.

51 changes: 51 additions & 0 deletions fixtures/csv/employees.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
id,first_name,last_name,email,ip_address
1,Tobye,Tallach,[email protected],159.80.54.64
2,Bret,Auchterlonie,[email protected],63.179.228.179
3,Chester,Leban,[email protected],109.23.123.220
4,Weston,Venny,[email protected],80.78.0.69
5,Alejoa,Hassen,[email protected],193.70.126.231
6,Otes,Ioselevich,[email protected],55.238.240.160
7,Dolley,Mc Ilwrick,[email protected],225.224.151.67
8,Cliff,Druitt,[email protected],216.35.85.142
9,Alfreda,Parysiak,[email protected],234.124.93.69
10,Alfonso,Wotherspoon,[email protected],34.94.1.132
11,Jemmy,Gavriel,[email protected],69.13.142.245
12,Ezechiel,Binion,[email protected],85.203.127.191
13,Burk,Blowfelde,[email protected],74.133.42.177
14,Danette,Brealey,[email protected],249.85.157.243
15,Brent,Collcutt,[email protected],68.202.67.52
16,Filbert,Wane,[email protected],51.190.146.189
17,Amory,Brewers,[email protected],147.155.225.194
18,Giraud,Reen,[email protected],134.254.177.66
19,Burtie,Siebert,[email protected],47.194.48.217
20,Adam,Maddick,[email protected],165.16.248.228
21,Callean,Vernall,[email protected],243.145.198.197
22,Olympie,Itzakovitz,[email protected],40.55.240.15
23,Jacky,Emney,[email protected],216.72.80.81
24,Isidoro,Novello,[email protected],153.171.11.150
25,Kora,Liversedge,[email protected],131.126.97.242
26,Salaidh,McMenamie,[email protected],228.231.31.219
27,Corey,Dowdeswell,[email protected],203.73.30.64
28,Brodie,Holwell,[email protected],64.200.225.25
29,Trudey,Ungerer,[email protected],122.82.88.41
30,Doralin,Maxted,[email protected],37.78.14.199
31,Maurie,Marklin,[email protected],22.181.178.6
32,Hermann,Voase,[email protected],155.126.157.84
33,Fanchette,Callaway,[email protected],121.161.80.246
34,Sara-ann,Birdall,[email protected],35.64.166.83
35,Harriot,Clipsham,[email protected],78.103.253.219
36,Bonita,Woolway,[email protected],70.114.50.135
37,Arleyne,MacComiskey,[email protected],80.22.221.216
38,Ethelbert,Covill,[email protected],52.66.186.124
39,Irita,Knee,[email protected],239.247.34.120
40,Naoma,Janca,[email protected],189.63.152.60
41,Kayne,Mizzen,[email protected],84.133.236.10
42,Estell,Stuckford,[email protected],246.168.153.22
43,Larine,Stack,[email protected],197.176.195.68
44,Rikki,Newbold,[email protected],27.245.43.243
45,Romonda,Charer,[email protected],137.144.236.93
46,Letizia,Monksfield,[email protected],209.47.5.147
47,Sinclare,McAreavey,[email protected],251.250.216.206
48,Athene,Haysham,[email protected],81.227.231.240
49,Gale,Tracy,[email protected],93.138.226.205
50,Dareen,O'Shields,[email protected],27.226.127.240
Empty file.
5 changes: 5 additions & 0 deletions samples/sample_tap_csv/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from __future__ import annotations

from samples.sample_tap_csv.sample_tap_csv import SampleTapCSV

SampleTapCSV.cli()
91 changes: 91 additions & 0 deletions samples/sample_tap_csv/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from __future__ import annotations

import csv
import datetime
import os
import typing as t

from singer_sdk import Stream
from singer_sdk.streams.core import REPLICATION_INCREMENTAL

if t.TYPE_CHECKING:
from singer_sdk.helpers.types import Context, Record
from singer_sdk.tap_base import Tap

SDC_META_FILEPATH = "_sdc_path"
SDC_META_MODIFIED_AT = "_sdc_modified_at"


def _to_datetime(value: float) -> str:
return datetime.datetime.fromtimestamp(value).astimezone()


class CSVStream(Stream):
"""CSV stream class."""

def __init__(
self,
tap: Tap,
name: str | None = None,
*,
partitions: list[str] | None = None,
) -> None:
# TODO(edgarmondragon): Build schema from CSV file.
schema = {
"type": ["object"],
"properties": {
SDC_META_FILEPATH: {"type": "string"},
SDC_META_MODIFIED_AT: {"type": "string", "format": "date-time"},
},
"required": [],
"additionalProperties": {"type": "string"},
}
super().__init__(tap, schema, name)

# TODO(edgarrmondragon): Make this None if the filesytem does not support it.
self.replication_key = SDC_META_MODIFIED_AT

self._partitions = partitions or []

@property
def partitions(self) -> list[Context]:
return self._partitions

def _read_file(self, path: str) -> t.Iterable[Record]: # noqa: PLR6301
# Make these configurable.
delimiter = ","
quotechar = '"'
escapechar = None
doublequote = True
lineterminator = "\r\n"

# TODO: Use filesytem-specific file open method.
with open(path, encoding="utf-8") as file: # noqa: PTH123
reader = csv.DictReader(
file,
delimiter=delimiter,
quotechar=quotechar,
escapechar=escapechar,
doublequote=doublequote,
lineterminator=lineterminator,
)
yield from reader

def get_records(
self,
context: Context | None,
) -> t.Iterable[Record | tuple[Record, Context | None]]:
path: str = context[SDC_META_FILEPATH]
mtime = os.path.getmtime(path) # noqa: PTH204

if (
self.replication_method is REPLICATION_INCREMENTAL
and (previous_bookmark := self.get_starting_timestamp(context))
and _to_datetime(mtime) < previous_bookmark
):
self.logger.info("File has not been modified since last read, skipping")
return

for record in self._read_file(path):
record[SDC_META_MODIFIED_AT] = _to_datetime(mtime)
yield record
106 changes: 106 additions & 0 deletions samples/sample_tap_csv/sample_tap_csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""Sample Tap for CSV files."""

from __future__ import annotations

import enum
import functools
import os

import singer_sdk.typing as th
from samples.sample_tap_csv.client import SDC_META_FILEPATH, CSVStream
from singer_sdk import Tap

DEFAULT_MERGE_STREAM_NAME = "files"


def file_path_to_stream_name(file_path: str) -> str:
"""Convert a file path to a stream name."""
return os.path.basename(file_path).replace(".csv", "").replace(os.sep, "__") # noqa: PTH119


class ReadMode(str, enum.Enum):
"""Sync mode for the tap."""

one_stream_per_file = "one_stream_per_file"
merge = "merge"


class SampleTapCSV(Tap):
"""Sample Tap for CSV files."""

name = "sample-tap-csv"

config_jsonschema = th.PropertiesList(
th.Property(
"path",
th.StringType,
required=True,
description="Path to CSV files.",
),
th.Property(
"read_mode",
th.StringType,
required=True,
description=(
"Use `one_stream_per_file` to read each file as a separate stream, or "
"`merge` to merge all files into a single stream."
),
allowed_values=[
ReadMode.one_stream_per_file,
ReadMode.merge,
],
),
th.Property(
"stream_name",
th.StringType,
required=True,
default=DEFAULT_MERGE_STREAM_NAME,
description="Name of the stream to use when `read_mode` is `merge`.",
),
# TODO(edgarmondragon): Other configuration options.
).to_dict()

@functools.cached_property
def read_mode(self) -> ReadMode:
return ReadMode(self.config["read_mode"])

def discover_streams(self) -> list:
# TODO(edgarmondragon): Implement stream discovery, based on the configured path
# and read mode.
path: str = self.config[
"path"
] # a directory for now, but could be a glob pattern

# One stream per file
if self.read_mode == ReadMode.one_stream_per_file:
if os.path.isdir(path): # noqa: PTH112
return [
CSVStream(
tap=self,
name=file_path_to_stream_name(member),
partitions=[{SDC_META_FILEPATH: os.path.join(path, member)}], # noqa: PTH118
)
for member in os.listdir(path)
if member.endswith(".csv")
]

msg = f"Path {path} is not a directory."
raise ValueError(msg)

# Merge
if os.path.isdir(path): # noqa: PTH112
contexts = [
{
SDC_META_FILEPATH: os.path.join(path, member), # noqa: PTH118
}
for member in os.listdir(path)
if member.endswith(".csv")
]
return [
CSVStream(
tap=self,
name=self.config["stream_name"],
partitions=contexts,
)
]
return []
36 changes: 36 additions & 0 deletions tests/samples/test_tap_csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from __future__ import annotations

import pytest

from samples.sample_tap_csv.sample_tap_csv import SampleTapCSV
from singer_sdk.testing import get_tap_test_class

_TestCSVMerge = get_tap_test_class(
tap_class=SampleTapCSV,
config={
"path": "fixtures/csv",
"read_mode": "merge",
"stream_name": "people",
},
)


class TestCSVMerge(_TestCSVMerge):
@pytest.mark.xfail(reason="Schema generation not implemented", strict=True)
def test_tap_stream_record_schema_matches_transformed_catalog(self, stream: str):
super().test_tap_stream_record_schema_matches_transformed_catalog(stream)


TestCSVOneStreamPerFile = get_tap_test_class(
tap_class=SampleTapCSV,
config={
"path": "fixtures/csv",
"read_mode": "one_stream_per_file",
},
)


class TestCSVOneStreamPerFile(TestCSVOneStreamPerFile):
@pytest.mark.xfail(reason="Schema generation not implemented", strict=True)
def test_tap_stream_record_schema_matches_transformed_catalog(self, stream: str):
super().test_tap_stream_record_schema_matches_transformed_catalog(stream)

0 comments on commit 3d3ac7b

Please sign in to comment.