Skip to content

Commit

Permalink
Implement CSVFileLoader to write data to csv files
Browse files Browse the repository at this point in the history
  • Loading branch information
ti1uan committed Feb 17, 2024
1 parent 2b96a4c commit a9495c0
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 1 deletion.
1 change: 0 additions & 1 deletion src/loader_interface/base_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,5 @@ async def run(self, data_generator: AsyncGenerator[Any, None]) -> None:
"""
self.logger.info("Starting data loading process.")
async for data in data_generator:
print(self.write_data)
await self.write_data(data)
self.logger.info("Data loading process completed.")
29 changes: 29 additions & 0 deletions src/loader_interface/csv_file_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from loader_interface.base_loader import BaseLoader
from pandas import DataFrame
from pathlib import Path


class CSVLoader(BaseLoader):
def __init__(self, destination_file: str) -> None:
"""
Initialize the CSVLoader object.
Args:
destination_file (str): The path to the destination CSV file.
"""
super().__init__()
self.destination_file = destination_file
self.initial_write = not Path(self.destination_file).exists()

async def write_data(self, data: DataFrame) -> None:
"""
Writes the given data to a CSV file.
Args:
data (DataFrame): The data to be written. Expected to be a pandas DataFrame.
"""
# Write DataFrame to CSV file
data.to_csv(self.destination_file, mode='a', header=self.initial_write, index=False)

# After the first write, set initial_write to False so headers are not repeated
self.initial_write = False
41 changes: 41 additions & 0 deletions tests/loader_interface/test_csv_file_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import pytest
import pandas as pd
from pandas.testing import assert_frame_equal
from loader_interface.csv_file_loader import CSVLoader
import asyncio

@pytest.mark.asyncio
async def test_csv_loader(tmp_path):
# Create a temp path
destination_file = tmp_path / "output.csv"

# Create DataFrame used for tests
df1 = pd.DataFrame({'A': [1, 2], 'B': [3, 4]})
df2 = pd.DataFrame({'A': [5, 6], 'B': [7, 8]})

loader = CSVLoader(str(destination_file))

# Write the first batch of data
await loader.write_data(df1)

# Verify if the first write successful
loaded_df1 = pd.read_csv(destination_file)
assert_frame_equal(loaded_df1, df1, check_dtype=False)

# Write the second batch of data
await loader.write_data(df2)

# Verify if the second write successful
loaded_df2 = pd.read_csv(destination_file)
combined_df = pd.concat([df1, df2], ignore_index=True)
assert_frame_equal(loaded_df2, combined_df, check_dtype=False)

# Clean files
@pytest.fixture(autouse=True)
def run_around_tests():
# Do nothing before test
yield
# Execute cleanup
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.sleep(0.1))
# Delete anything here if need

0 comments on commit a9495c0

Please sign in to comment.