Skip to content

Commit

Permalink
feat: add ability to override node field params
Browse files Browse the repository at this point in the history
  • Loading branch information
olbychos committed Jan 27, 2025
1 parent d113486 commit 286c61e
Showing 1 changed file with 24 additions and 7 deletions.
31 changes: 24 additions & 7 deletions dynamiq/nodes/converters/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ class CSVConverterInputSchema(BaseModel):
Attributes:
file_paths (list[str] | None): List of paths to CSV files on the filesystem.
files (list[BytesIO | bytes] | None): List of file objects or bytes containing CSV data.
delimiter (str): Character used to separate fields in the CSV files. Defaults to comma.
content_column (str): Name of the column to use as the main document content.
metadata_columns (list[str] | None): Column names to extract as metadata for each document.
"""

file_paths: list[str] | None = Field(
Expand All @@ -28,6 +30,18 @@ class CSVConverterInputSchema(BaseModel):
files: list[BytesIO | bytes] | None = Field(
default=None, description="List of file objects or bytes representing CSV files."
)
delimiter: str | None = Field(
default=None,
description="Delimiter used in the CSV files. If not provided, the Node's configured delimiter is used."
)
content_column: str | None = Field(
default=None,
description="Name of the column that will be used as the document's main content. If not provided, the Node's configured content_column is used."
)
metadata_columns: list[str] | None = Field(
default=None,
description="Optional list of column names to extract as metadata for each document. Can be None.",
)

model_config = ConfigDict(arbitrary_types_allowed=True)

Expand Down Expand Up @@ -97,17 +111,20 @@ def execute(
last_error = None
success_count = 0
total_files = len(input_data.file_paths or []) + len(input_data.files or [])
delimiter = input_data.delimiter if input_data.delimiter is not None else self.delimiter
content_column = input_data.content_column if input_data.content_column is not None else self.content_column
metadata_columns = input_data.metadata_columns if input_data.metadata_columns is not None else self.metadata_columns

if input_data.file_paths:
for path in input_data.file_paths:
try:
with open(path, encoding="utf-8") as csv_file:
reader = csv.DictReader(csv_file, delimiter=self.delimiter)
reader = csv.DictReader(csv_file, delimiter=delimiter)
for doc in self._process_rows_generator(
reader,
source=path,
content_column=self.content_column,
metadata_columns=self.metadata_columns,
content_column=content_column,
metadata_columns=metadata_columns,
):
all_documents.append(doc)
success_count += 1
Expand All @@ -123,13 +140,13 @@ def execute(
file_obj = BytesIO(file_obj)

file_text = TextIOWrapper(file_obj, encoding="utf-8")
reader = csv.DictReader(file_text, delimiter=self.delimiter)
reader = csv.DictReader(file_text, delimiter=delimiter)

for doc in self._process_rows_generator(
reader,
source=source_name,
content_column=self.content_column,
metadata_columns=self.metadata_columns,
content_column=content_column,
metadata_columns=metadata_columns,
):
all_documents.append(doc)
success_count += 1
Expand Down

0 comments on commit 286c61e

Please sign in to comment.