Limit batch size (#659)
hinthornw authored Jun 4, 2024
1 parent 7326476 commit 42d33c6
Showing 2 changed files with 64 additions and 38 deletions.
python/langsmith/client.py: 100 changes (63 additions, 37 deletions)
@@ -2156,7 +2156,7 @@ def get_test_results(
         project_id: Optional[ID_TYPE] = None,
         project_name: Optional[str] = None,
     ) -> "pd.DataFrame":
-        """Read the record-level information from a test project into a Pandas DF.
+        """Read the record-level information from an experiment into a Pandas DF.
 
         Note: this will fetch whatever data exists in the DB. Results are not
         immediately available in the DB upon evaluation run completion.
@@ -2166,24 +2166,47 @@ def get_test_results(
         pd.DataFrame
             A dataframe containing the test results.
         """
+        from concurrent.futures import ThreadPoolExecutor, as_completed  # type: ignore
+
         import pandas as pd  # type: ignore
 
         runs = self.list_runs(
-            project_id=project_id, project_name=project_name, is_root=True
+            project_id=project_id,
+            project_name=project_name,
+            is_root=True,
+            select=[
+                "id",
+                "reference_example_id",
+                "inputs",
+                "outputs",
+                "error",
+                "feedback_stats",
+                "start_time",
+                "end_time",
+            ],
         )
-        results = []
+        results: list[dict] = []
         example_ids = []
-        for r in runs:
-            row = {
-                "example_id": r.reference_example_id,
-                **{f"input.{k}": v for k, v in r.inputs.items()},
-                **{f"outputs.{k}": v for k, v in (r.outputs or {}).items()},
-            }
-            if r.feedback_stats:
-                for k, v in r.feedback_stats.items():
-                    row[f"feedback.{k}"] = v.get("avg")
-            row.update(
+
+        def fetch_examples(batch):
+            examples = self.list_examples(example_ids=batch)
+            return [
                 {
+                    "example_id": example.id,
+                    **{f"reference.{k}": v for k, v in (example.outputs or {}).items()},
+                }
+                for example in examples
+            ]
+
+        batch_size = 50
+        cursor = 0
+        with ThreadPoolExecutor() as executor:
+            futures = []
+            for r in runs:
+                row = {
+                    "example_id": r.reference_example_id,
+                    **{f"input.{k}": v for k, v in r.inputs.items()},
+                    **{f"outputs.{k}": v for k, v in (r.outputs or {}).items()},
                     "execution_time": (
                         (r.end_time - r.start_time).total_seconds()
                         if r.end_time
@@ -2192,32 +2215,35 @@ def get_test_results(
                     "error": r.error,
                     "id": r.id,
                 }
-            )
-            if r.reference_example_id:
-                example_ids.append(r.reference_example_id)
-            results.append(row)
-        result = pd.DataFrame(results).set_index("example_id")
-        batch_size = 100
-        example_outputs = []
-        for batch in [
-            example_ids[i : i + batch_size]
-            for i in range(0, len(example_ids), batch_size)
-        ]:
-            for example in self.list_examples(example_ids=batch):
-                example_outputs.append(
-                    {
-                        "example_id": example.id,
-                        **{
-                            f"reference.{k}": v
-                            for k, v in (example.outputs or {}).items()
-                        },
-                    }
-                )
+                if r.feedback_stats:
+                    row.update(
+                        {
+                            f"feedback.{k}": v.get("avg")
+                            for k, v in r.feedback_stats.items()
+                        }
+                    )
+                if r.reference_example_id:
+                    example_ids.append(r.reference_example_id)
+                    if len(results) % batch_size == 0:
+                        # Ensure not empty
+                        if batch := example_ids[cursor : cursor + batch_size]:
+                            futures.append(executor.submit(fetch_examples, batch))
+                            cursor += batch_size
+                results.append(row)
+
+            # Handle any remaining examples
+            if example_ids[cursor:]:
+                futures.append(executor.submit(fetch_examples, example_ids[cursor:]))
+        result_df = pd.DataFrame(results).set_index("example_id")
+        example_outputs = [
+            output for future in as_completed(futures) for output in future.result()
+        ]
         if example_outputs:
-            df = pd.DataFrame(example_outputs).set_index("example_id")
-            result = df.merge(result, left_index=True, right_index=True)
+            example_df = pd.DataFrame(example_outputs).set_index("example_id")
+            result_df = example_df.merge(result_df, left_index=True, right_index=True)
+
         # Flatten dict columns into dot syntax for easier access
-        return pd.json_normalize(result.to_dict(orient="records"))
+        return pd.json_normalize(result_df.to_dict(orient="records"))
 
     def list_projects(
         self,
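For orientation, the method being changed is the public `Client.get_test_results` helper. A minimal usage sketch (the project name is a hypothetical placeholder, and a `LANGSMITH_API_KEY` is assumed to be set in the environment):

```python
from langsmith import Client

client = Client()  # picks up LANGSMITH_API_KEY from the environment

# Pull record-level results for one experiment into a pandas DataFrame.
df = client.get_test_results(project_name="my-experiment")  # hypothetical project name

# Per the code above, columns use dot syntax such as
# input.*, outputs.*, reference.*, feedback.*, plus execution_time, error, and id.
print(df.columns.tolist())
```

The visible behavior is unchanged; the commit only changes how the reference examples behind the `reference.*` columns are fetched.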
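Stripped of the LangSmith specifics, the new code replaces one sequential pass over `list_examples` with capped batches submitted to a thread pool while the run loop is still iterating, then gathers the results with `as_completed`. A self-contained sketch of that pattern, with illustrative stand-in names rather than the library's API:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_batch(batch):
    # Stand-in for a remote call such as self.list_examples(example_ids=batch).
    return [f"fetched-{i}" for i in batch]

ids = list(range(237))   # ids discovered while iterating over runs
batch_size = 50          # same cap the commit applies per request
cursor = 0
futures = []

with ThreadPoolExecutor() as executor:
    seen = []
    for i in ids:
        seen.append(i)
        # As soon as another full batch has accumulated, submit it instead of
        # waiting for the whole iteration to finish.
        if len(seen) % batch_size == 0:
            if batch := ids[cursor : cursor + batch_size]:  # ensure not empty
                futures.append(executor.submit(fetch_batch, batch))
                cursor += batch_size
    # Submit whatever remains after the loop.
    if ids[cursor:]:
        futures.append(executor.submit(fetch_batch, ids[cursor:]))

results = [item for future in as_completed(futures) for item in future.result()]
assert len(results) == len(ids)
```

Submitting batches inside the loop overlaps network latency with the remaining iteration, and the fixed batch size keeps any single request from growing without bound as the number of runs grows.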
python/pyproject.toml: 2 changes (1 addition, 1 deletion)
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "langsmith"
-version = "0.1.69"
+version = "0.1.70"
 description = "Client library to connect to the LangSmith LLM Tracing and Evaluation Platform."
 authors = ["LangChain <[email protected]>"]
 license = "MIT"
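The pyproject.toml change publishes this behavior as langsmith 0.1.70, so upgrading the installed package from PyPI (for example with `pip install -U langsmith`) is enough to pick up the batched fetching above.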
