In batch tracing endpoint, combine patch and post payloads where possible
nfcampos committed Jan 19, 2024
1 parent 82ba901 commit 19d608d
Showing 2 changed files with 36 additions and 16 deletions.
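
The heart of the change is a merge step in batch_ingest_runs: when a run id appears in both the create (post) and update (patch) payloads, the update's non-null fields are folded into the create payload, so the run is sent to the batch endpoint once instead of twice. Below is a minimal standalone sketch of that merge; the function name and signature are illustrative, not part of the client.

```python
def combine_post_and_patch(
    post: list[dict], patch: list[dict]
) -> tuple[list[dict], list[dict]]:
    """Fold each patch payload into a matching post payload, keyed by run id.

    Updates whose id has no matching create are returned as standalone patches.
    Illustrative sketch of the merge performed inside batch_ingest_runs.
    """
    post_by_id = {run["id"]: run for run in post}
    standalone_patches: list[dict] = []
    for run in patch:
        if run["id"] in post_by_id:
            # Copy only the fields the update actually sets.
            post_by_id[run["id"]].update(
                {k: v for k, v in run.items() if v is not None}
            )
        else:
            standalone_patches.append(run)
    return list(post_by_id.values()), standalone_patches


# Example: a create and an update for the same (made-up) run id collapse
# into a single post entry with no leftover patch.
posts, patches = combine_post_and_patch(
    post=[{"id": "run-1", "inputs": {"q": "hello"}, "outputs": None}],
    patch=[{"id": "run-1", "outputs": {"a": "world"}}],
)
assert patches == []
assert posts[0]["outputs"] == {"a": "world"}
```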
41 changes: 26 additions & 15 deletions python/langsmith/client.py
@@ -713,7 +713,7 @@ def upload_csv(
             **result, _host_url=self._host_url, _tenant_id=self._get_tenant_id()
         )
 
-    def _hide_run_io(
+    def _run_transform(
         self, run: Union[ls_schemas.Run, dict, ls_schemas.RunLikeDict]
     ) -> dict:
         if hasattr(run, "dict") and callable(getattr(run, "dict")):
@@ -800,7 +800,7 @@ def create_run(
             "execution_order": execution_order if execution_order is not None else 1,
         }
 
-        run_create = self._hide_run_io(run_create)
+        run_create = self._run_transform(run_create)
         self._insert_runtime_env([run_create])
         headers = {
             **self._headers,
@@ -851,31 +851,42 @@ def batch_ingest_runs(
 
         if not create and not update:
             return
-        headers = {
-            **self._headers,
-            "Accept": "application/json",
-            "Content-Type": "application/json",
-        }
 
+        # transform and convert to dicts
+        create_dicts = [self._run_transform(run) for run in create or []]
+        update_dicts = [self._run_transform(run) for run in update or []]
+        # combine post and patch dicts where possible
+        if update_dicts and create_dicts:
+            create_by_id = {run["id"]: run for run in create_dicts}
+            standalone_updates: list[dict] = []
+            for run in update_dicts:
+                if run["id"] in create_by_id:
+                    create_by_id[run["id"]].update(
+                        {k: v for k, v in run.items() if v is not None}
+                    )
+                else:
+                    standalone_updates.append(run)
+            update_dicts = standalone_updates
+        # filter out runs that are not sampled
         body = {
-            "post": self._filter_for_sampling(
-                self._hide_run_io(run) for run in create or []
-            ),
-            "patch": self._filter_for_sampling(
-                (self._hide_run_io(run) for run in update or []), patch=True
-            ),
+            "post": self._filter_for_sampling(create_dicts),
+            "patch": self._filter_for_sampling(update_dicts, patch=True),
         }
         if not body["post"] and not body["patch"]:
            return
 
         self._insert_runtime_env(body["post"])
 
         self.request_with_retries(
             "post",
             f"{self.api_url}/runs/batch",
             request_kwargs={
                 "data": json.dumps(body, default=_serialize_json),
-                "headers": headers,
                 "timeout": self.timeout_ms / 1000,
+                "headers": {
+                    **self._headers,
+                    "Accept": "application/json",
+                    "Content-Type": "application/json",
+                },
             },
         )
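
Net effect on the wire: a create/update pair that shares a run id now yields a single combined entry under "post" and nothing under "patch" in the /runs/batch body. A rough sketch of that body, with made-up run ids and fields (the real client also applies sampling and runtime-env enrichment):

```python
# Hypothetical arguments to batch_ingest_runs (ids and fields are made up):
create = [{"id": "run-1", "name": "my_chain", "inputs": {"q": "hello"}}]
update = [{"id": "run-1", "outputs": {"a": "world"}}]

# After the merge above, the JSON body posted to /runs/batch looks roughly like:
body = {
    "post": [
        {
            "id": "run-1",
            "name": "my_chain",
            "inputs": {"q": "hello"},
            "outputs": {"a": "world"},
        }
    ],
    "patch": [],
}
```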

11 changes: 10 additions & 1 deletion python/tests/integration_tests/test_client.py
@@ -455,10 +455,19 @@ def test_batch_ingest_runs(langchain_client: Client) -> None:
             "trace_id": str(trace_id),
             "parent_run_id": str(trace_id),
             "inputs": {"input1": 5, "input2": 6},
         },
     ]
+    runs_to_update = [
+        {
+            "id": str(run_id_2),
+            "dotted_order": f"{current_time}{str(trace_id)}."
+            f"{later_time}{str(run_id_2)}",
+            "trace_id": str(trace_id),
+            "parent_run_id": str(trace_id),
+            "outputs": {"output1": 7, "output2": 8},
+        },
+    ]
-    langchain_client.batch_ingest_runs(create=runs_to_create)
+    langchain_client.batch_ingest_runs(create=runs_to_create, update=runs_to_update)
     runs = []
     wait = 2
     for _ in range(5):
