From 42d33c60a6da1d10a9e19c571439e164f3892dcd Mon Sep 17 00:00:00 2001 From: William FH <13333726+hinthornw@users.noreply.github.com> Date: Tue, 4 Jun 2024 13:56:29 -0700 Subject: [PATCH] Limit batch size (#659) --- python/langsmith/client.py | 100 +++++++++++++++++++++++-------------- python/pyproject.toml | 2 +- 2 files changed, 64 insertions(+), 38 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index d3966f597..029badf35 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -2156,7 +2156,7 @@ def get_test_results( project_id: Optional[ID_TYPE] = None, project_name: Optional[str] = None, ) -> "pd.DataFrame": - """Read the record-level information from a test project into a Pandas DF. + """Read the record-level information from an experiment into a Pandas DF. Note: this will fetch whatever data exists in the DB. Results are not immediately available in the DB upon evaluation run completion. @@ -2166,24 +2166,47 @@ def get_test_results( pd.DataFrame A dataframe containing the test results. """ + from concurrent.futures import ThreadPoolExecutor, as_completed # type: ignore + import pandas as pd # type: ignore runs = self.list_runs( - project_id=project_id, project_name=project_name, is_root=True + project_id=project_id, + project_name=project_name, + is_root=True, + select=[ + "id", + "reference_example_id", + "inputs", + "outputs", + "error", + "feedback_stats", + "start_time", + "end_time", + ], ) - results = [] + results: list[dict] = [] example_ids = [] - for r in runs: - row = { - "example_id": r.reference_example_id, - **{f"input.{k}": v for k, v in r.inputs.items()}, - **{f"outputs.{k}": v for k, v in (r.outputs or {}).items()}, - } - if r.feedback_stats: - for k, v in r.feedback_stats.items(): - row[f"feedback.{k}"] = v.get("avg") - row.update( + + def fetch_examples(batch): + examples = self.list_examples(example_ids=batch) + return [ { + "example_id": example.id, + **{f"reference.{k}": v for k, v in (example.outputs or {}).items()}, + } + for example in examples + ] + + batch_size = 50 + cursor = 0 + with ThreadPoolExecutor() as executor: + futures = [] + for r in runs: + row = { + "example_id": r.reference_example_id, + **{f"input.{k}": v for k, v in r.inputs.items()}, + **{f"outputs.{k}": v for k, v in (r.outputs or {}).items()}, "execution_time": ( (r.end_time - r.start_time).total_seconds() if r.end_time @@ -2192,32 +2215,35 @@ def get_test_results( "error": r.error, "id": r.id, } - ) - if r.reference_example_id: - example_ids.append(r.reference_example_id) - results.append(row) - result = pd.DataFrame(results).set_index("example_id") - batch_size = 100 - example_outputs = [] - for batch in [ - example_ids[i : i + batch_size] - for i in range(0, len(example_ids), batch_size) - ]: - for example in self.list_examples(example_ids=batch): - example_outputs.append( - { - "example_id": example.id, - **{ - f"reference.{k}": v - for k, v in (example.outputs or {}).items() - }, - } - ) + if r.feedback_stats: + row.update( + { + f"feedback.{k}": v.get("avg") + for k, v in r.feedback_stats.items() + } + ) + if r.reference_example_id: + example_ids.append(r.reference_example_id) + if len(results) % batch_size == 0: + # Ensure not empty + if batch := example_ids[cursor : cursor + batch_size]: + futures.append(executor.submit(fetch_examples, batch)) + cursor += batch_size + results.append(row) + + # Handle any remaining examples + if example_ids[cursor:]: + futures.append(executor.submit(fetch_examples, example_ids[cursor:])) + result_df = pd.DataFrame(results).set_index("example_id") + example_outputs = [ + output for future in as_completed(futures) for output in future.result() + ] if example_outputs: - df = pd.DataFrame(example_outputs).set_index("example_id") - result = df.merge(result, left_index=True, right_index=True) + example_df = pd.DataFrame(example_outputs).set_index("example_id") + result_df = example_df.merge(result_df, left_index=True, right_index=True) + # Flatten dict columns into dot syntax for easier access - return pd.json_normalize(result.to_dict(orient="records")) + return pd.json_normalize(result_df.to_dict(orient="records")) def list_projects( self, diff --git a/python/pyproject.toml b/python/pyproject.toml index e12e09ccb..c40cd6387 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "langsmith" -version = "0.1.69" +version = "0.1.70" description = "Client library to connect to the LangSmith LLM Tracing and Evaluation Platform." authors = ["LangChain "] license = "MIT"