wip
isahers1 committed Dec 16, 2024
1 parent f684f2c commit 977340a
Showing 1 changed file with 79 additions and 25 deletions.
104 changes: 79 additions & 25 deletions python/langsmith/evaluation/_arunner.py
@@ -491,15 +491,24 @@ async def _aevaluate(
         cache_path = None
     with ls_utils.with_optional_cache(cache_path, ignore_hosts=[client.api_url]):
         if is_async_target:
-            manager = await manager.awith_predictions(
-                cast(ATARGET_T, target), max_concurrency=max_concurrency
-            )
-        if evaluators:
-            manager = await manager.awith_evaluators(
-                evaluators, max_concurrency=max_concurrency
-            )
-        if summary_evaluators:
-            manager = await manager.awith_summary_evaluators(summary_evaluators)
+            if evaluators:
+                # Run predictions and evaluations in a single pipeline
+                manager = await manager.awith_predictions_and_evaluators(
+                    cast(ATARGET_T, target), evaluators, max_concurrency=max_concurrency
+                )
+            else:
+                manager = await manager.awith_predictions(
+                    cast(ATARGET_T, target), max_concurrency=max_concurrency
+                )
+            if summary_evaluators:
+                manager = await manager.awith_summary_evaluators(summary_evaluators)
+        else:
+            if evaluators:
+                manager = await manager.awith_evaluators(
+                    evaluators, max_concurrency=max_concurrency
+                )
+            if summary_evaluators:
+                manager = await manager.awith_summary_evaluators(summary_evaluators)
         results = AsyncExperimentResults(manager)
         if blocking:
             await results.wait()
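With this change, an async target that has evaluators is routed through the combined awith_predictions_and_evaluators pipeline instead of running every prediction before the first evaluation starts. A minimal usage sketch of the public entry point (not part of this commit; the dataset name and the toy target/evaluator are illustrative assumptions):

    import asyncio

    from langsmith.evaluation import aevaluate

    async def target(inputs: dict) -> dict:
        # Stand-in for the async app or chain under test.
        return {"answer": inputs["question"][::-1]}

    def exact_match(run, example) -> dict:
        # Assumes the dataset's reference outputs contain an "answer" field.
        predicted = (run.outputs or {}).get("answer")
        expected = (example.outputs or {}).get("answer")
        return {"key": "exact_match", "score": int(predicted == expected)}

    async def main():
        results = await aevaluate(
            target,
            data="my-dataset",  # assumed: name of an existing LangSmith dataset
            evaluators=[exact_match],
            max_concurrency=4,
        )
        async for row in results:  # rows stream as prediction + evaluation finish
            print(row["evaluation_results"]["results"])

    asyncio.run(main())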
@@ -642,6 +651,56 @@ async def astart(self) -> _AsyncExperimentManager:
             upload_results=self._upload_results,
         )
 
+    async def awith_predictions_and_evaluators(
+        self,
+        target: ATARGET_T,
+        evaluators: Sequence[Union[EVALUATOR_T, AEVALUATOR_T]],
+        /,
+        max_concurrency: Optional[int] = None,
+    ) -> _AsyncExperimentManager:
+        """Run predictions and evaluations in a single pipeline.
+
+        This allows evaluators to process results as soon as they're available from
+        the target function, rather than waiting for all predictions to complete first.
+        """
+        evaluators = _resolve_evaluators(evaluators)
+
+        if not hasattr(self, '_evaluator_executor'):
+            self._evaluator_executor = cf.ThreadPoolExecutor(max_workers=4)
+        async def process_examples():
+            async for pred in self._apredict(
+                target,
+                max_concurrency=max_concurrency,
+                include_attachments=_include_attachments(target),
+            ):
+                example, run = pred["example"], pred["run"]
+                result = self._arun_evaluators(
+                    evaluators,
+                    {"run": run, "example": example, "evaluation_results": {"results": []}},
+                    executor=self._evaluator_executor,
+                )
+                yield result
+
+        experiment_results = aitertools.aiter_with_concurrency(
+            max_concurrency,
+            process_examples(),
+            _eager_consumption_timeout=0.001,
+        )
+
+        r1, r2, r3 = aitertools.atee(experiment_results, 3, lock=asyncio.Lock())
+
+        return _AsyncExperimentManager(
+            (result["example"] async for result in r1),
+            experiment=self._experiment,
+            metadata=self._metadata,
+            client=self.client,
+            runs=(result["run"] async for result in r2),
+            evaluation_results=(result["evaluation_results"] async for result in r3),
+            summary_results=self._summary_results,
+            include_attachments=self._include_attachments,
+            upload_results=self._upload_results,
+        )
+
     async def awith_predictions(
         self,
         target: ATARGET_T,
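The new method wires one streaming pipeline: _apredict yields predictions as the target finishes each example, every prediction is immediately turned into an _arun_evaluators coroutine, aiter_with_concurrency caps how many of those run at once, and atee splits the single result stream into the three iterators the returned manager consumes (examples, runs, evaluation results). A self-contained sketch of the same producer/consumer idea using only the standard library (illustrative names; it does not use LangSmith's internal aitertools helpers):

    import asyncio
    from typing import AsyncIterator

    async def predict_stream(n: int) -> AsyncIterator[dict]:
        for i in range(n):
            await asyncio.sleep(0.01)  # stand-in for the target function
            yield {"example": i, "run": f"run-{i}"}

    async def evaluate_one(pred: dict) -> dict:
        await asyncio.sleep(0.05)  # stand-in for an evaluator call
        return {**pred, "evaluation_results": {"results": ["ok"]}}

    async def pipeline(max_concurrency: int = 4) -> list:
        sem = asyncio.Semaphore(max_concurrency)

        async def bounded(pred: dict) -> dict:
            async with sem:
                return await evaluate_one(pred)

        tasks = []
        async for pred in predict_stream(10):
            # Evaluation of each prediction starts as soon as it is available,
            # rather than after all predictions have completed.
            tasks.append(asyncio.create_task(bounded(pred)))
        return await asyncio.gather(*tasks)

    print(asyncio.run(pipeline())[:2])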
Expand Down Expand Up @@ -796,15 +855,17 @@ async def _arun_evaluators(
run = current_results["run"]
example = current_results["example"]
eval_results = current_results["evaluation_results"]
for evaluator in evaluators:
lock = asyncio.Lock()
async def _run_single_evaluator(evaluator):
try:
evaluator_response = await evaluator.aevaluate_run(
run=run,
example=example,
)
eval_results["results"].extend(
self.client._select_eval_results(evaluator_response)
)
selected_results = self.client._select_eval_results(evaluator_response)
async with lock:
eval_results["results"].extend(selected_results)

if self._upload_results:
self.client._log_evaluation_feedback(
evaluator_response, run=run, _executor=executor
@@ -824,9 +885,9 @@ async def _arun_evaluators(
                             for key in feedback_keys
                         ]
                     )
-                    eval_results["results"].extend(
-                        self.client._select_eval_results(error_response)
-                    )
+                    selected_results = self.client._select_eval_results(error_response)
+                    async with lock:
+                        eval_results["results"].extend(selected_results)
                     if self._upload_results:
                         self.client._log_evaluation_feedback(
                             error_response, run=run, _executor=executor
@@ -839,15 +900,8 @@ async def _arun_evaluators(
                     f"Error running evaluator {repr(evaluator)} on"
                     f" run {run.id}: {repr(e)}",
                     exc_info=True,
-                logger.error(
-                    f"Error running evaluator {repr(evaluator)} on"
-                    f" run {run.id}: {repr(e)}",
-                    exc_info=True,
-                )
-        if example.attachments is not None:
-            for attachment in example.attachments:
-                reader = example.attachments[attachment]["reader"]
-                reader.seek(0)
+
+        await asyncio.gather(*[_run_single_evaluator(evaluator) for evaluator in evaluators])
         return ExperimentResultRow(
             run=run,
             example=example,
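The _arun_evaluators hunks above replace the sequential for-evaluator loop with one coroutine per evaluator, executed together via asyncio.gather, while an asyncio.Lock serializes appends to the shared eval_results["results"] list. A minimal sketch of that pattern with made-up async evaluators (illustrative only, not the LangSmith implementation):

    import asyncio

    async def run_all_evaluators(evaluators, run, example) -> dict:
        eval_results = {"results": []}
        lock = asyncio.Lock()

        async def run_single(evaluator):
            try:
                result = await evaluator(run, example)
                async with lock:  # guard the shared results list, as in the diff
                    eval_results["results"].append(result)
            except Exception as e:
                # Mirror the diff: log and continue so one failing evaluator
                # does not stop the others.
                print(f"evaluator failed: {e!r}")

        await asyncio.gather(*(run_single(ev) for ev in evaluators))
        return eval_results

    async def length_check(run, example):
        return {"key": "length", "score": len(str(run))}

    async def always_one(run, example):
        return {"key": "constant", "score": 1}

    row = asyncio.run(run_all_evaluators([length_check, always_one], "run-1", "ex-1"))
    print(row)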
