diff --git a/.semversioner/next-release/patch-20250213222251109897.json b/.semversioner/next-release/patch-20250213222251109897.json new file mode 100644 index 0000000000..9870ded838 --- /dev/null +++ b/.semversioner/next-release/patch-20250213222251109897.json @@ -0,0 +1,4 @@ +{ + "type": "patch", + "description": "Optimize data iteration by removing some iterrows from code" +} diff --git a/graphrag/index/operations/create_graph.py b/graphrag/index/operations/create_graph.py index 54b63b70aa..a28b1ceb5b 100644 --- a/graphrag/index/operations/create_graph.py +++ b/graphrag/index/operations/create_graph.py @@ -18,6 +18,6 @@ def create_graph( if nodes is not None: nodes.set_index(node_id, inplace=True) - graph.add_nodes_from((n, dict(d)) for n, d in nodes.iterrows()) + graph.add_nodes_from(nodes.to_dict("index").items()) return graph diff --git a/graphrag/index/operations/snapshot_rows.py b/graphrag/index/operations/snapshot_rows.py deleted file mode 100644 index 1140ee555b..0000000000 --- a/graphrag/index/operations/snapshot_rows.py +++ /dev/null @@ -1,85 +0,0 @@ -# Copyright (c) 2024 Microsoft Corporation. -# Licensed under the MIT License - -"""A module containing 'FormatSpecifier' model.""" - -import json -from dataclasses import dataclass -from typing import Any - -import pandas as pd - -from graphrag.storage.pipeline_storage import PipelineStorage - - -@dataclass -class FormatSpecifier: - """Format specifier class definition.""" - - format: str - extension: str - - -async def snapshot_rows( - input: pd.DataFrame, - column: str | None, - base_name: str, - storage: PipelineStorage, - formats: list[str | dict[str, Any]], - row_name_column: str | None = None, -) -> None: - """Take a by-row snapshot of the tabular data.""" - parsed_formats = _parse_formats(formats) - num_rows = len(input) - - def get_row_name(row: Any, row_idx: Any): - if row_name_column is None: - if num_rows == 1: - return base_name - return f"{base_name}.{row_idx}" - return f"{base_name}.{row[row_name_column]}" - - for row_idx, row in input.iterrows(): - for fmt in parsed_formats: - row_name = get_row_name(row, row_idx) - extension = fmt.extension - if fmt.format == "json": - await storage.set( - f"{row_name}.{extension}", - ( - json.dumps(row[column], ensure_ascii=False) - if column is not None - else json.dumps(row.to_dict(), ensure_ascii=False) - ), - ) - elif fmt.format == "text": - if column is None: - msg = "column must be specified for text format" - raise ValueError(msg) - await storage.set(f"{row_name}.{extension}", str(row[column])) - - -def _parse_formats(formats: list[str | dict[str, Any]]) -> list[FormatSpecifier]: - """Parse the formats into a list of FormatSpecifiers.""" - return [ - ( - FormatSpecifier(**fmt) - if isinstance(fmt, dict) - else FormatSpecifier(format=fmt, extension=_get_format_extension(fmt)) - ) - for fmt in formats - ] - - -def _get_format_extension(fmt: str) -> str: - """Get the file extension for a given format.""" - if fmt == "json": - return "json" - if fmt == "text": - return "txt" - if fmt == "parquet": - return "parquet" - if fmt == "csv": - return "csv" - msg = f"Unknown format: {fmt}" - raise ValueError(msg) diff --git a/graphrag/index/operations/summarize_descriptions/summarize_descriptions.py b/graphrag/index/operations/summarize_descriptions/summarize_descriptions.py index 31dcf96d3b..25331b9071 100644 --- a/graphrag/index/operations/summarize_descriptions/summarize_descriptions.py +++ b/graphrag/index/operations/summarize_descriptions/summarize_descriptions.py @@ -89,12 +89,12 @@ async def get_summarized( node_futures = [ do_summarize_descriptions( - str(row[1]["title"]), - sorted(set(row[1]["description"])), + str(row.title), # type: ignore + sorted(set(row.description)), # type: ignore ticker, semaphore, ) - for row in nodes.iterrows() + for row in nodes.itertuples(index=False) ] node_results = await asyncio.gather(*node_futures) @@ -109,12 +109,12 @@ async def get_summarized( edge_futures = [ do_summarize_descriptions( - (str(row[1]["source"]), str(row[1]["target"])), - sorted(set(row[1]["description"])), + (str(row.source), str(row.target)), # type: ignore + sorted(set(row.description)), # type: ignore ticker, semaphore, ) - for row in edges.iterrows() + for row in edges.itertuples(index=False) ] edge_results = await asyncio.gather(*edge_futures) diff --git a/graphrag/index/update/entities.py b/graphrag/index/update/entities.py index 42be21848b..89ac6712f6 100644 --- a/graphrag/index/update/entities.py +++ b/graphrag/index/update/entities.py @@ -119,11 +119,12 @@ async def _run_entity_summarization( # Prepare tasks for async summarization where needed async def process_row(row): - description = row["description"] + # Accessing attributes directly from the named tuple. + description = row.description if isinstance(description, list) and len(description) > 1: # Run entity summarization asynchronously result = await run_entity_summarization( - row["title"], + row.title, description, callbacks, cache, @@ -134,7 +135,9 @@ async def process_row(row): return description[0] if isinstance(description, list) else description # Create a list of async tasks for summarization - tasks = [process_row(row) for _, row in entities_df.iterrows()] + tasks = [ + process_row(row) for row in entities_df.itertuples(index=False, name="Entity") + ] results = await asyncio.gather(*tasks) # Update the 'description' column in the DataFrame diff --git a/graphrag/query/input/loaders/dfs.py b/graphrag/query/input/loaders/dfs.py index 5450fd92ef..8b0ce42b53 100644 --- a/graphrag/query/input/loaders/dfs.py +++ b/graphrag/query/input/loaders/dfs.py @@ -22,6 +22,16 @@ ) +def _prepare_records(df: pd.DataFrame) -> list[dict]: + """ + Reset index and convert the DataFrame to a list of dictionaries. + + We rename the reset index column to 'Index' for consistency. + """ + df_reset = df.reset_index().rename(columns={"index": "Index"}) + return df_reset.to_dict("records") + + def read_entities( df: pd.DataFrame, id_col: str = "id", @@ -36,12 +46,14 @@ def read_entities( rank_col: str | None = "degree", attributes_cols: list[str] | None = None, ) -> list[Entity]: - """Read entities from a dataframe.""" - entities = [] - for idx, row in df.iterrows(): - entity = Entity( + """Read entities from a dataframe using pre-converted records.""" + records = _prepare_records(df) + return [ + Entity( id=to_str(row, id_col), - short_id=to_optional_str(row, short_id_col) if short_id_col else str(idx), + short_id=to_optional_str(row, short_id_col) + if short_id_col + else str(row["Index"]), title=to_str(row, title_col), type=to_optional_str(row, type_col), description=to_optional_str(row, description_col), @@ -58,8 +70,8 @@ def read_entities( else None ), ) - entities.append(entity) - return entities + for row in records + ] def read_relationships( @@ -75,12 +87,14 @@ def read_relationships( text_unit_ids_col: str | None = "text_unit_ids", attributes_cols: list[str] | None = None, ) -> list[Relationship]: - """Read relationships from a dataframe.""" - relationships = [] - for idx, row in df.iterrows(): - rel = Relationship( + """Read relationships from a dataframe using pre-converted records.""" + records = _prepare_records(df) + return [ + Relationship( id=to_str(row, id_col), - short_id=to_optional_str(row, short_id_col) if short_id_col else str(idx), + short_id=to_optional_str(row, short_id_col) + if short_id_col + else str(row["Index"]), source=to_str(row, source_col), target=to_str(row, target_col), description=to_optional_str(row, description_col), @@ -96,8 +110,8 @@ def read_relationships( else None ), ) - relationships.append(rel) - return relationships + for row in records + ] def read_covariates( @@ -109,12 +123,14 @@ def read_covariates( text_unit_ids_col: str | None = "text_unit_ids", attributes_cols: list[str] | None = None, ) -> list[Covariate]: - """Read covariates from a dataframe.""" - covariates = [] - for idx, row in df.iterrows(): - cov = Covariate( + """Read covariates from a dataframe using pre-converted records.""" + records = _prepare_records(df) + return [ + Covariate( id=to_str(row, id_col), - short_id=to_optional_str(row, short_id_col) if short_id_col else str(idx), + short_id=to_optional_str(row, short_id_col) + if short_id_col + else str(row["Index"]), subject_id=to_str(row, subject_col), covariate_type=( to_str(row, covariate_type_col) if covariate_type_col else "claim" @@ -126,8 +142,8 @@ def read_covariates( else None ), ) - covariates.append(cov) - return covariates + for row in records + ] def read_communities( @@ -143,12 +159,14 @@ def read_communities( children_col: str | None = "children", attributes_cols: list[str] | None = None, ) -> list[Community]: - """Read communities from a dataframe.""" - communities = [] - for idx, row in df.iterrows(): - comm = Community( + """Read communities from a dataframe using pre-converted records.""" + records = _prepare_records(df) + return [ + Community( id=to_str(row, id_col), - short_id=to_optional_str(row, short_id_col) if short_id_col else str(idx), + short_id=to_optional_str(row, short_id_col) + if short_id_col + else str(row["Index"]), title=to_str(row, title_col), level=to_str(row, level_col), entity_ids=to_optional_list(row, entities_col, item_type=str), @@ -164,8 +182,8 @@ def read_communities( else None ), ) - communities.append(comm) - return communities + for row in records + ] def read_community_reports( @@ -180,12 +198,14 @@ def read_community_reports( content_embedding_col: str | None = "full_content_embedding", attributes_cols: list[str] | None = None, ) -> list[CommunityReport]: - """Read community reports from a dataframe.""" - reports = [] - for idx, row in df.iterrows(): - report = CommunityReport( + """Read community reports from a dataframe using pre-converted records.""" + records = _prepare_records(df) + return [ + CommunityReport( id=to_str(row, id_col), - short_id=to_optional_str(row, short_id_col) if short_id_col else str(idx), + short_id=to_optional_str(row, short_id_col) + if short_id_col + else str(row["Index"]), title=to_str(row, title_col), community_id=to_str(row, community_col), summary=to_str(row, summary_col), @@ -200,8 +220,8 @@ def read_community_reports( else None ), ) - reports.append(report) - return reports + for row in records + ] def read_text_units( @@ -215,12 +235,12 @@ def read_text_units( document_ids_col: str | None = "document_ids", attributes_cols: list[str] | None = None, ) -> list[TextUnit]: - """Read text units from a dataframe.""" - text_units = [] - for idx, row in df.iterrows(): - chunk = TextUnit( + """Read text units from a dataframe using pre-converted records.""" + records = _prepare_records(df) + return [ + TextUnit( id=to_str(row, id_col), - short_id=str(idx), + short_id=str(row["Index"]), text=to_str(row, text_col), entity_ids=to_optional_list(row, entities_col, item_type=str), relationship_ids=to_optional_list(row, relationships_col, item_type=str), @@ -235,5 +255,5 @@ def read_text_units( else None ), ) - text_units.append(chunk) - return text_units + for row in records + ] diff --git a/graphrag/query/input/loaders/utils.py b/graphrag/query/input/loaders/utils.py index a96844ba76..40fe2cfd89 100644 --- a/graphrag/query/input/loaders/utils.py +++ b/graphrag/query/input/loaders/utils.py @@ -3,245 +3,185 @@ """Data load utils.""" +from collections.abc import Mapping +from typing import Any + import numpy as np -import pandas as pd -def to_str(data: pd.Series, column_name: str | None) -> str: - """Convert and validate a value to a string.""" +def _get_value( + data: Mapping[str, Any], column_name: str | None, required: bool = True +) -> Any: + """ + Retrieve a column value from data. + + If `required` is True, raises a ValueError when: + - column_name is None, or + - column_name is not in data. + + For optional columns (required=False), returns None if column_name is None. + """ if column_name is None: - msg = "Column name is None" + if required: + msg = "Column name is None" + raise ValueError(msg) + return None + if column_name in data: + return data[column_name] + if required: + msg = f"Column [{column_name}] not found in data" raise ValueError(msg) + return None - if column_name in data: - return str(data[column_name]) - msg = f"Column [{column_name}] not found in data" - raise ValueError(msg) +def to_str(data: Mapping[str, Any], column_name: str | None) -> str: + """Convert and validate a value to a string.""" + value = _get_value(data, column_name, required=True) + return str(value) -def to_optional_str(data: pd.Series, column_name: str | None) -> str | None: - """Convert and validate a value to an optional string.""" - if column_name is None: - msg = "Column name is None" - raise ValueError(msg) - if column_name in data: - value = data[column_name] - if value is None: - return None - return str(data[column_name]) - msg = f"Column [{column_name}] not found in data" - raise ValueError(msg) +def to_optional_str(data: Mapping[str, Any], column_name: str | None) -> str | None: + """Convert and validate a value to an optional string.""" + value = _get_value(data, column_name, required=True) + return None if value is None else str(value) def to_list( - data: pd.Series, column_name: str | None, item_type: type | None = None + data: Mapping[str, Any], column_name: str | None, item_type: type | None = None ) -> list: """Convert and validate a value to a list.""" - if column_name is None: - msg = "Column name is None" - raise ValueError(msg) - - if column_name in data: - value = data[column_name] - if isinstance(value, np.ndarray): - value = value.tolist() - - if not isinstance(value, list): - msg = f"value is not a list: {value} ({type(value)})" - raise ValueError(msg) - - if item_type is not None: - for v in value: - if not isinstance(v, item_type): - msg = ( - f"list item has item that is not [{item_type}]: {v} ({type(v)})" - ) - raise TypeError(msg) - return value - - msg = f"Column [{column_name}] not found in data" - raise ValueError(msg) + value = _get_value(data, column_name, required=True) + if isinstance(value, np.ndarray): + value = value.tolist() + if not isinstance(value, list): + msg = f"value is not a list: {value} ({type(value)})" + raise TypeError(msg) + if item_type is not None: + for v in value: + if not isinstance(v, item_type): + msg = f"list item is not [{item_type}]: {v} ({type(v)})" + raise TypeError(msg) + return value def to_optional_list( - data: pd.Series, column_name: str | None, item_type: type | None = None + data: Mapping[str, Any], column_name: str | None, item_type: type | None = None ) -> list | None: """Convert and validate a value to an optional list.""" - if column_name is None: + if column_name is None or column_name not in data: return None - - if column_name in data: - value = data[column_name] # type: ignore - if value is None: - return None - - if isinstance(value, np.ndarray): - value = value.tolist() - - if isinstance(value, str): - value = [value] - - if not isinstance(value, list): - msg = f"value is not a list: {value} ({type(value)})" - raise ValueError(msg) - - if item_type is not None: - for v in value: - if not isinstance(v, item_type): - msg = ( - f"list item has item that is not [{item_type}]: {v} ({type(v)})" - ) - raise TypeError(msg) - return value - - return None - - -def to_int(data: pd.Series, column_name: str | None) -> int: + value = data[column_name] + if value is None: + return None + if isinstance(value, np.ndarray): + value = value.tolist() + if isinstance(value, str): + value = [value] + if not isinstance(value, list): + msg = f"value is not a list: {value} ({type(value)})" + raise TypeError(msg) + if item_type is not None: + for v in value: + if not isinstance(v, item_type): + msg = f"list item is not [{item_type}]: {v} ({type(v)})" + raise TypeError(msg) + return value + + +def to_int(data: Mapping[str, Any], column_name: str | None) -> int: """Convert and validate a value to an int.""" - if column_name is None: - msg = "Column name is None" - raise ValueError(msg) - - if column_name in data: - value = data[column_name] - if isinstance(value, float): - value = int(value) - if not isinstance(value, int): - msg = f"value is not an int: {value} ({type(value)})" - raise ValueError(msg) - else: - msg = f"Column [{column_name}] not found in data" - raise ValueError(msg) - + value = _get_value(data, column_name, required=True) + if isinstance(value, float): + value = int(value) + if not isinstance(value, int): + msg = f"value is not an int: {value} ({type(value)})" + raise TypeError(msg) return int(value) -def to_optional_int(data: pd.Series, column_name: str | None) -> int | None: +def to_optional_int(data: Mapping[str, Any], column_name: str | None) -> int | None: """Convert and validate a value to an optional int.""" - if column_name is None: + if column_name is None or column_name not in data: return None - - if column_name in data: - value = data[column_name] - - if value is None: - return None - - if isinstance(value, float): - value = int(value) - if not isinstance(value, int): - msg = f"value is not an int: {value} ({type(value)})" - raise ValueError(msg) - else: - msg = f"Column [{column_name}] not found in data" - raise ValueError(msg) - + value = data[column_name] + if value is None: + return None + if isinstance(value, float): + value = int(value) + if not isinstance(value, int): + msg = f"value is not an int: {value} ({type(value)})" + raise TypeError(msg) return int(value) -def to_float(data: pd.Series, column_name: str | None) -> float: +def to_float(data: Mapping[str, Any], column_name: str | None) -> float: """Convert and validate a value to a float.""" - if column_name is None: - msg = "Column name is None" - raise ValueError(msg) - - if column_name in data: - value = data[column_name] - if not isinstance(value, float): - msg = f"value is not a float: {value} ({type(value)})" - raise ValueError(msg) - else: - msg = f"Column [{column_name}] not found in data" - raise ValueError(msg) - + value = _get_value(data, column_name, required=True) + if not isinstance(value, float): + msg = f"value is not a float: {value} ({type(value)})" + raise TypeError(msg) return float(value) -def to_optional_float(data: pd.Series, column_name: str | None) -> float | None: +def to_optional_float(data: Mapping[str, Any], column_name: str | None) -> float | None: """Convert and validate a value to an optional float.""" - if column_name is None: + if column_name is None or column_name not in data: return None - - if column_name in data: - value = data[column_name] - if value is None: - return None - if not isinstance(value, float): - return float(value) - else: - msg = f"Column {column_name} not found in data" - raise ValueError(msg) - + value = data[column_name] + if value is None: + return None + if not isinstance(value, float): + return float(value) return float(value) def to_dict( - data: pd.Series, + data: Mapping[str, Any], column_name: str | None, key_type: type | None = None, value_type: type | None = None, ) -> dict: """Convert and validate a value to a dict.""" - if column_name is None: - msg = "Column name is None" - raise ValueError(msg) - - if column_name in data: - value = data[column_name] - if not isinstance(value, dict): - msg = f"value is not a dict: {value} ({type(value)})" - raise ValueError(msg) - - if key_type is not None: - for v in value: - if not isinstance(v, key_type): - msg = f"dict key has item that is not [{key_type}]: {v} ({type(v)})" - raise TypeError(msg) - - if value_type is not None: - for v in value.values(): - if not isinstance(v, value_type): - msg = f"dict value has item that is not [{value_type}]: {v} ({type(v)})" - raise TypeError(msg) - return value - - msg = f"Column [{column_name}] not found in data" - raise ValueError(msg) + value = _get_value(data, column_name, required=True) + if not isinstance(value, dict): + msg = f"value is not a dict: {value} ({type(value)})" + raise TypeError(msg) + if key_type is not None: + for k in value: + if not isinstance(k, key_type): + msg = f"dict key is not [{key_type}]: {k} ({type(k)})" + raise TypeError(msg) + if value_type is not None: + for v in value.values(): + if not isinstance(v, value_type): + msg = f"dict value is not [{value_type}]: {v} ({type(v)})" + raise TypeError(msg) + return value def to_optional_dict( - data: pd.Series, + data: Mapping[str, Any], column_name: str | None, key_type: type | None = None, value_type: type | None = None, ) -> dict | None: """Convert and validate a value to an optional dict.""" - if column_name is None: + if column_name is None or column_name not in data: return None - - if column_name in data: - value = data[column_name] - if value is None: - return None - if not isinstance(value, dict): - msg = f"value is not a dict: {value} ({type(value)})" - raise TypeError(msg) - - if key_type is not None: - for v in value: - if not isinstance(v, key_type): - msg = f"dict key has item that is not [{key_type}]: {v} ({type(v)})" - raise TypeError(msg) - - if value_type is not None: - for v in value.values(): - if not isinstance(v, value_type): - msg = f"dict value has item that is not [{value_type}]: {v} ({type(v)})" - raise TypeError(msg) - - return value - - msg = f"Column {column_name} not found in data" - raise ValueError(msg) + value = data[column_name] + if value is None: + return None + if not isinstance(value, dict): + msg = f"value is not a dict: {value} ({type(value)})" + raise TypeError(msg) + if key_type is not None: + for k in value: + if not isinstance(k, key_type): + msg = f"dict key is not [{key_type}]: {k} ({type(k)})" + raise TypeError(msg) + if value_type is not None: + for v in value.values(): + if not isinstance(v, value_type): + msg = f"dict value is not [{value_type}]: {v} ({type(v)})" + raise TypeError(msg) + return value