Commit

draft
isahers1 committed Dec 19, 2024
1 parent 7f7d7af commit 696e4bf
Showing 4 changed files with 115 additions and 41 deletions.
116 changes: 89 additions & 27 deletions python/langsmith/evaluation/_arunner.py
@@ -491,15 +491,24 @@ async def _aevaluate(
         cache_path = None
     with ls_utils.with_optional_cache(cache_path, ignore_hosts=[client.api_url]):
         if is_async_target:
-            manager = await manager.awith_predictions(
-                cast(ATARGET_T, target), max_concurrency=max_concurrency
-            )
-        if evaluators:
-            manager = await manager.awith_evaluators(
-                evaluators, max_concurrency=max_concurrency
-            )
-        if summary_evaluators:
-            manager = await manager.awith_summary_evaluators(summary_evaluators)
+            if evaluators:
+                # Run predictions and evaluations in a single pipeline
+                manager = await manager.awith_predictions_and_evaluators(
+                    cast(ATARGET_T, target), evaluators, max_concurrency=max_concurrency
+                )
+            else:
+                manager = await manager.awith_predictions(
+                    cast(ATARGET_T, target), max_concurrency=max_concurrency
+                )
+            if summary_evaluators:
+                manager = await manager.awith_summary_evaluators(summary_evaluators)
+        else:
+            if evaluators:
+                manager = await manager.awith_evaluators(
+                    evaluators, max_concurrency=max_concurrency
+                )
+            if summary_evaluators:
+                manager = await manager.awith_summary_evaluators(summary_evaluators)
         results = AsyncExperimentResults(manager)
         if blocking:
             await results.wait()
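
For orientation, here is a minimal sketch of how the new branch would be exercised from the public API. It assumes langsmith.evaluation.aevaluate and a dataset named "my-dataset", both illustrative and not part of this commit: an async target plus a non-empty evaluators list now routes through awith_predictions_and_evaluators, so each evaluator can run as soon as its prediction is ready instead of waiting for all predictions.

# Hedged sketch, not commit content: exercises the pipelined branch above.
import asyncio

from langsmith.evaluation import aevaluate  # public entry point assumed here


async def target(inputs: dict) -> dict:
    # Async target -> is_async_target is True inside _aevaluate.
    return {"answer": inputs["question"].upper()}


def exact_match(run, example) -> dict:
    # With the pipelined path, this runs as soon as its prediction finishes.
    return {"key": "exact_match", "score": run.outputs == example.outputs}


async def main() -> None:
    await aevaluate(
        target,
        data="my-dataset",  # hypothetical dataset name
        evaluators=[exact_match],  # non-empty -> awith_predictions_and_evaluators
        max_concurrency=4,
    )


if __name__ == "__main__":
    asyncio.run(main())
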
@@ -642,6 +651,61 @@ async def astart(self) -> _AsyncExperimentManager:
             upload_results=self._upload_results,
         )
 
+    async def awith_predictions_and_evaluators(
+        self,
+        target: ATARGET_T,
+        evaluators: Sequence[Union[EVALUATOR_T, AEVALUATOR_T]],
+        /,
+        max_concurrency: Optional[int] = None,
+    ) -> _AsyncExperimentManager:
+        """Run predictions and evaluations in a single pipeline.
+
+        This allows evaluators to process results as soon as they're available from
+        the target function, rather than waiting for all predictions to complete first.
+        """
+        evaluators = _resolve_evaluators(evaluators)
+
+        if not hasattr(self, "_evaluator_executor"):
+            self._evaluator_executor = cf.ThreadPoolExecutor(max_workers=4)
+
+        async def process_examples():
+            async for pred in self._apredict(
+                target,
+                max_concurrency=max_concurrency,
+                include_attachments=_include_attachments(target),
+            ):
+                example, run = pred["example"], pred["run"]
+                result = self._arun_evaluators(
+                    evaluators,
+                    {
+                        "run": run,
+                        "example": example,
+                        "evaluation_results": {"results": []},
+                    },
+                    executor=self._evaluator_executor,
+                )
+                yield result
+
+        experiment_results = aitertools.aiter_with_concurrency(
+            max_concurrency,
+            process_examples(),
+            _eager_consumption_timeout=0.001,
+        )
+
+        r1, r2, r3 = aitertools.atee(experiment_results, 3, lock=asyncio.Lock())
+
+        return _AsyncExperimentManager(
+            (result["example"] async for result in r1),
+            experiment=self._experiment,
+            metadata=self._metadata,
+            client=self.client,
+            runs=(result["run"] async for result in r2),
+            evaluation_results=(result["evaluation_results"] async for result in r3),
+            summary_results=self._summary_results,
+            include_attachments=self._include_attachments,
+            upload_results=self._upload_results,
+        )
+
     async def awith_predictions(
         self,
         target: ATARGET_T,
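
The new method tees the single pipelined stream into three async iterators so the existing _AsyncExperimentManager constructor (examples, runs, evaluation results) keeps its shape. As a rough stand-in for aitertools.atee, here is a toy queue-based fan-out, written from scratch for illustration and not the SDK's implementation:

import asyncio
from typing import Any, AsyncIterator, List, Tuple


async def atee(source: AsyncIterator[Any], n: int) -> Tuple[AsyncIterator[Any], ...]:
    """Fan one async iterator out to n independent consumers via per-consumer queues."""
    queues: List[asyncio.Queue] = [asyncio.Queue() for _ in range(n)]
    sentinel = object()

    async def pump() -> None:
        # Push every item to every queue, then signal completion.
        async for item in source:
            for q in queues:
                q.put_nowait(item)
        for q in queues:
            q.put_nowait(sentinel)

    async def drain(q: asyncio.Queue) -> AsyncIterator[Any]:
        while (item := await q.get()) is not sentinel:
            yield item

    asyncio.ensure_future(pump())
    return tuple(drain(q) for q in queues)


async def _demo() -> None:
    async def rows():
        for i in range(3):
            yield {"example": i, "run": i, "evaluation_results": i}

    examples, runs, evals = await atee(rows(), 3)
    print([r["example"] async for r in examples])
    print([r["run"] async for r in runs])
    print([r["evaluation_results"] async for r in evals])


if __name__ == "__main__":
    asyncio.run(_demo())
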
@@ -796,15 +860,20 @@ async def _arun_evaluators(
             run = current_results["run"]
             example = current_results["example"]
             eval_results = current_results["evaluation_results"]
-            for evaluator in evaluators:
+            lock = asyncio.Lock()
+
+            async def _run_single_evaluator(evaluator):
                 try:
                     evaluator_response = await evaluator.aevaluate_run(
                         run=run,
                         example=example,
                     )
-                    eval_results["results"].extend(
-                        self.client._select_eval_results(evaluator_response)
+                    selected_results = self.client._select_eval_results(
+                        evaluator_response
                     )
+                    async with lock:
+                        eval_results["results"].extend(selected_results)
+
                     if self._upload_results:
                         self.client._log_evaluation_feedback(
                             evaluator_response, run=run, _executor=executor
@@ -824,9 +893,11 @@ async def _arun_evaluators(
                                 for key in feedback_keys
                             ]
                         )
-                        eval_results["results"].extend(
-                            self.client._select_eval_results(error_response)
+                        selected_results = self.client._select_eval_results(
+                            error_response
                         )
+                        async with lock:
+                            eval_results["results"].extend(selected_results)
                         if self._upload_results:
                             self.client._log_evaluation_feedback(
                                 error_response, run=run, _executor=executor
Expand All @@ -839,15 +910,10 @@ async def _arun_evaluators(
f" run {run.id}: {repr(e)}",
exc_info=True,
)
logger.error(
f"Error running evaluator {repr(evaluator)} on"
f" run {run.id}: {repr(e)}",
exc_info=True,
)
if example.attachments is not None:
for attachment in example.attachments:
reader = example.attachments[attachment]["reader"]
reader.seek(0)

await asyncio.gather(
*[_run_single_evaluator(evaluator) for evaluator in evaluators]
)
return ExperimentResultRow(
run=run,
example=example,
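
The sequential for evaluator in evaluators: loop is replaced above by one coroutine per evaluator, run concurrently with asyncio.gather, with an asyncio.Lock guarding the shared results list. A self-contained sketch of that pattern with generic names (none of these helpers are SDK code):

import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Evaluator = Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]


async def run_evaluators(run: Dict[str, Any], evaluators: List[Evaluator]) -> List[Dict[str, Any]]:
    results: List[Dict[str, Any]] = []
    lock = asyncio.Lock()

    async def run_one(evaluator: Evaluator) -> None:
        try:
            result = await evaluator(run)
        except Exception as exc:  # mirror the SDK's log-and-continue behavior
            result = {"key": getattr(evaluator, "__name__", "evaluator"), "error": repr(exc)}
        async with lock:  # serialize writes to the shared list, as the commit does
            results.append(result)

    # All evaluators run concurrently instead of one after another.
    await asyncio.gather(*[run_one(e) for e in evaluators])
    return results


async def _demo() -> None:
    async def length_check(run: Dict[str, Any]) -> Dict[str, Any]:
        return {"key": "length", "score": len(run["output"]) > 0}

    async def always_one(run: Dict[str, Any]) -> Dict[str, Any]:
        return {"key": "constant", "score": 1.0}

    print(await run_evaluators({"output": "hi"}, [length_check, always_one]))


if __name__ == "__main__":
    asyncio.run(_demo())

In this toy version the lock is arguably redundant (append runs between awaits), but it mirrors the defensive locking the commit puts around extend.
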
@@ -1064,10 +1130,6 @@ def _get_run(r: run_trees.RunTree) -> None:
                 client=client,
             ),
         )
-        if include_attachments and example.attachments is not None:
-            for attachment in example.attachments:
-                reader = example.attachments[attachment]["reader"]
-                reader.seek(0)
     except Exception as e:
         logger.error(
             f"Error running target function: {e}", exc_info=True, stacklevel=1
11 changes: 10 additions & 1 deletion python/langsmith/evaluation/evaluator.py
@@ -4,6 +4,7 @@
 
 import asyncio
 import inspect
+import io
 import uuid
 from abc import abstractmethod
 from typing import (
@@ -666,7 +667,15 @@ async def awrapper(
                 "example": example,
                 "inputs": example.inputs if example else {},
                 "outputs": run.outputs or {},
-                "attachments": example.attachments or {} if example else {},
+                "attachments": {
+                    name: {
+                        "presigned_url": value["presigned_url"],
+                        "reader": io.BytesIO(value["reader"].getvalue()),
+                    }
+                    for name, value in (example.attachments or {}).items()
+                }
+                if example
+                else {},
                 "reference_outputs": example.outputs or {} if example else {},
             }
             args = (arg_map[arg] for arg in positional_args)
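
A small illustration of why the wrapper now hands each evaluator call a fresh io.BytesIO built from getvalue() (standalone snippet, not SDK code): a shared reader keeps its position, so a second consumer sees empty bytes unless something remembers to seek(0), while a copy always starts at offset 0.

import io

original = io.BytesIO(b"fake image data")

# Sharing one reader: the second read starts where the first one stopped.
shared_first = original.read()
shared_second = original.read()
assert shared_first == b"fake image data" and shared_second == b""

# Copying via getvalue() gives each consumer an independent stream at offset 0.
original_copy = io.BytesIO(original.getvalue())
assert original_copy.read() == b"fake image data"
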
8 changes: 4 additions & 4 deletions python/langsmith/schemas.py
@@ -76,14 +76,14 @@ def read(self, size: int = -1) -> bytes:
         """Read function."""
         ...
 
-    def write(self, b: bytes) -> int:
-        """Write function."""
-        ...
-
     def seek(self, offset: int, whence: int = 0) -> int:
         """Seek function."""
        ...
 
+    def getvalue(self) -> bytes:
+        """Get value function."""
+        ...
+
 
 class ExampleBase(BaseModel):
     """Example base model."""
21 changes: 12 additions & 9 deletions python/tests/integration_tests/test_client.py
@@ -1488,15 +1488,18 @@ async def test_aevaluate_with_attachments(langchain_client: Client) -> None:
         data_type=DataType.kv,
     )
 
-    example = ExampleUploadWithAttachments(
-        inputs={"question": "What is shown in the image?"},
-        outputs={"answer": "test image"},
-        attachments={
-            "image": ("image/png", b"fake image data for testing"),
-        },
-    )
+    examples = [
+        ExampleUploadWithAttachments(
+            inputs={"question": "What is shown in the image?"},
+            outputs={"answer": "test image"},
+            attachments={
+                "image": ("image/png", b"fake image data for testing"),
+            },
+        )
+        for i in range(10)
+    ]
 
-    langchain_client.upload_examples_multipart(dataset_id=dataset.id, uploads=[example])
+    langchain_client.upload_examples_multipart(dataset_id=dataset.id, uploads=examples)
 
     async def target(
         inputs: Dict[str, Any], attachments: Dict[str, Any]
@@ -1542,7 +1545,7 @@ async def evaluator_2(
         max_concurrency=3,
     )
 
-    assert len(results) == 2
+    assert len(results) == 20
     async for result in results:
         assert result["evaluation_results"]["results"][0].score == 1.0
         assert result["evaluation_results"]["results"][1].score == 1.0
