From c3f82b81c5f315bd2291df08a011a1a09eed7881 Mon Sep 17 00:00:00 2001 From: William Fu-Hinthorn <13333726+hinthornw@users.noreply.github.com> Date: Mon, 8 Jan 2024 12:47:20 -0800 Subject: [PATCH 01/31] Update --- python/langsmith/client.py | 76 +++++++++++++++---- python/langsmith/schemas.py | 29 +++++++ python/tests/integration_tests/test_client.py | 12 +-- python/tests/integration_tests/test_runs.py | 2 +- 4 files changed, 98 insertions(+), 21 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 60dfd6164..8df771cd3 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -689,6 +689,26 @@ def upload_csv( **result, _host_url=self._host_url, _tenant_id=self._get_tenant_id() ) + def _hide_run_io( + self, run: Union[ls_schemas.Run, dict, ls_schemas.RunLikeDict] + ) -> dict: + if hasattr(run, "dict") and callable(getattr(run, "dict")): + run_create = run.dict() # type: ignore + else: + run_create = cast(dict, run) + if "inputs" in run_create: + run_create["inputs"] = _hide_inputs(run_create["inputs"]) + if "outputs" in run_create: + run_create["outputs"] = _hide_outputs(run_create["outputs"]) + return run_create + + def _insert_runtime_env(self, runs: Sequence[dict]) -> None: + runtime_env = ls_env.get_runtime_and_metrics() + for run_create in runs: + run_extra = cast(dict, run_create.setdefault("extra", {})) + runtime = run_extra.setdefault("runtime", {}) + run_extra["runtime"] = {**runtime_env, **runtime} + def create_run( self, name: str, @@ -696,6 +716,7 @@ def create_run( run_type: str, *, execution_order: Optional[int] = None, + project_name: Optional[str] = None, **kwargs: Any, ) -> None: """Persist a run to the LangSmith API. @@ -720,28 +741,22 @@ def create_run( LangSmithUserError If the API key is not provided when using the hosted service. """ - project_name = kwargs.pop( - "project_name", - kwargs.pop( - "session_name", - # if the project is not provided, use the environment's project - ls_utils.get_tracer_project(), - ), + project_name = project_name or kwargs.pop( + "session_name", + # if the project is not provided, use the environment's project + ls_utils.get_tracer_project(), ) run_create = { **kwargs, "session_name": project_name, "name": name, - "inputs": _hide_inputs(inputs), + "inputs": inputs, "run_type": run_type, "execution_order": execution_order if execution_order is not None else 1, } - if "outputs" in run_create: - run_create["outputs"] = _hide_outputs(run_create["outputs"]) - run_extra = cast(dict, run_create.setdefault("extra", {})) - runtime = run_extra.setdefault("runtime", {}) - runtime_env = ls_env.get_runtime_and_metrics() - run_extra["runtime"] = {**runtime_env, **runtime} + + run_create = self._hide_run_io(run_create) + self._insert_runtime_env([run_create]) headers = { **self._headers, "Accept": "application/json", @@ -757,6 +772,39 @@ def create_run( }, ) + def batch_upsert_runs( + self, + create: Optional[ + Sequence[Union[ls_schemas.Run, ls_schemas.RunLikeDict]] + ] = None, + update: Optional[ + Sequence[Union[ls_schemas.Run, ls_schemas.RunLikeDict]] + ] = None, + ): + if not create and not update: + return + headers = { + **self._headers, + "Accept": "application/json", + "Content-Type": "application/json", + } + + body = { + "create": [self._hide_run_io(run) for run in create or []], + "update": [self._hide_run_io(run) for run in update or []], + } + self._insert_runtime_env(body["create"]) + + self.request_with_retries( + "post", + f"{self.api_url}/runs/batch", + request_kwargs={ + "data": json.dumps(body, default=_serialize_json), + "headers": headers, + "timeout": self.timeout_ms / 1000, + }, + ) + def update_run( self, run_id: ID_TYPE, diff --git a/python/langsmith/schemas.py b/python/langsmith/schemas.py index 2b3fba966..bec59b043 100644 --- a/python/langsmith/schemas.py +++ b/python/langsmith/schemas.py @@ -287,6 +287,35 @@ def url(self) -> Optional[str]: return None +class RunLikeDict(TypedDict, total=False): + """Run-like dictionary, for type-hinting.""" + + name: str + run_type: RunTypeEnum + start_time: datetime + inputs: Optional[dict] + outputs: Optional[dict] + end_time: Optional[datetime] + extra: Optional[dict] + error: Optional[str] + execution_order: int + serialized: Optional[dict] + parent_run_id: Optional[UUID] + manifest_id: Optional[UUID] + events: Optional[List[dict]] + tags: Optional[List[str]] + inputs_s3_urls: Optional[dict] + outputs_s3_urls: Optional[dict] + id: Optional[UUID] + session_id: Optional[UUID] + session_name: Optional[str] + reference_example_id: Optional[UUID] + input_attachments: Optional[dict] + output_attachments: Optional[dict] + trace_id: UUID + dotted_order: str + + class RunWithAnnotationQueueInfo(RunBase): """Run schema with annotation queue info.""" diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py index 4b5462df6..81dd93465 100644 --- a/python/tests/integration_tests/test_client.py +++ b/python/tests/integration_tests/test_client.py @@ -25,7 +25,7 @@ @pytest.fixture def langchain_client(monkeypatch: pytest.MonkeyPatch) -> Client: - monkeypatch.setenv("LANGCHAIN_ENDPOINT", "http://localhost:1984") + monkeypatch.setenv("LANGCHAIN_ENDPOINT", "https://api.smith.langchain.com") return Client() @@ -40,7 +40,7 @@ def test_projects(langchain_client: Client, monkeypatch: pytest.MonkeyPatch) -> ) assert new_project not in project_names - monkeypatch.setenv("LANGCHAIN_ENDPOINT", "http://localhost:1984") + monkeypatch.setenv("LANGCHAIN_ENDPOINT", "https://api.smith.langchain.com") langchain_client.create_project( project_name=new_project, project_extra={"evaluator": "THE EVALUATOR"}, @@ -134,7 +134,7 @@ def test_persist_update_run( monkeypatch: pytest.MonkeyPatch, langchain_client: Client ) -> None: """Test the persist and update methods work as expected.""" - monkeypatch.setenv("LANGCHAIN_ENDPOINT", "http://localhost:1984") + monkeypatch.setenv("LANGCHAIN_ENDPOINT", "https://api.smith.langchain.com") project_name = "__test_persist_update_run" if project_name in [sess.name for sess in langchain_client.list_projects()]: langchain_client.delete_project(project_name=project_name) @@ -172,7 +172,7 @@ def test_evaluate_run( monkeypatch: pytest.MonkeyPatch, langchain_client: Client ) -> None: """Test persisting runs and adding feedback.""" - monkeypatch.setenv("LANGCHAIN_ENDPOINT", "http://localhost:1984") + monkeypatch.setenv("LANGCHAIN_ENDPOINT", "https://api.smith.langchain.com") project_name = "__test_evaluate_run" dataset_name = "__test_evaluate_run_dataset" if project_name in [sess.name for sess in langchain_client.list_projects()]: @@ -255,7 +255,7 @@ def test_create_project( monkeypatch: pytest.MonkeyPatch, langchain_client: Client ) -> None: """Test the project creation""" - monkeypatch.setenv("LANGCHAIN_ENDPOINT", "http://localhost:1984") + monkeypatch.setenv("LANGCHAIN_ENDPOINT", "https://api.smith.langchain.com") try: langchain_client.read_project(project_name="__test_create_project") langchain_client.delete_project(project_name="__test_create_project") @@ -272,7 +272,7 @@ def test_create_dataset( monkeypatch: pytest.MonkeyPatch, langchain_client: Client ) -> None: """Test persisting runs and adding feedback.""" - monkeypatch.setenv("LANGCHAIN_ENDPOINT", "http://localhost:1984") + monkeypatch.setenv("LANGCHAIN_ENDPOINT", "https://api.smith.langchain.com") dataset_name = "__test_create_dataset" if dataset_name in [dataset.name for dataset in langchain_client.list_datasets()]: langchain_client.delete_dataset(dataset_name=dataset_name) diff --git a/python/tests/integration_tests/test_runs.py b/python/tests/integration_tests/test_runs.py index 6098e8d03..84f6b24ed 100644 --- a/python/tests/integration_tests/test_runs.py +++ b/python/tests/integration_tests/test_runs.py @@ -16,7 +16,7 @@ @pytest.fixture def langchain_client() -> Generator[Client, None, None]: original = os.environ.get("LANGCHAIN_ENDPOINT") - os.environ["LANGCHAIN_ENDPOINT"] = "http://localhost:1984" + os.environ["LANGCHAIN_ENDPOINT"] = "http://api.smith.langchain.com" yield Client() if original is None: os.environ.pop("LANGCHAIN_ENDPOINT") From 7e96bf2295e928e355ed3b02ba2f215a3167eaae Mon Sep 17 00:00:00 2001 From: William Fu-Hinthorn <13333726+hinthornw@users.noreply.github.com> Date: Mon, 8 Jan 2024 13:15:55 -0800 Subject: [PATCH 02/31] update --- python/langsmith/client.py | 82 ++++++++++++++++++++++++-------------- 1 file changed, 52 insertions(+), 30 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 8df771cd3..20bb00cc9 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -772,38 +772,60 @@ def create_run( }, ) - def batch_upsert_runs( - self, - create: Optional[ - Sequence[Union[ls_schemas.Run, ls_schemas.RunLikeDict]] - ] = None, - update: Optional[ - Sequence[Union[ls_schemas.Run, ls_schemas.RunLikeDict]] - ] = None, - ): - if not create and not update: - return - headers = { - **self._headers, - "Accept": "application/json", - "Content-Type": "application/json", - } + def batch_ingest_runs( + self, + create: Optional[ + Sequence[Union[ls_schemas.Run, ls_schemas.RunLikeDict]] + ] = None, + update: Optional[ + Sequence[Union[ls_schemas.Run, ls_schemas.RunLikeDict]] + ] = None, + ): + """ + Batch ingest/upsert multiple runs in the Langsmith system. + + Args: + create (Optional[Sequence[Union[ls_schemas.Run, RunLikeDict]]]): + A sequence of `Run` objects or equivalent dictionaries representing + runs to be created / posted. + update (Optional[Sequence[Union[ls_schemas.Run, RunLikeDict]]]): + A sequence of `Run` objects or equivalent dictionaries representing + runs that have already been created and should be updated / patched. + + Returns: + None: If both `create` and `update` are None. + + Raises: + LangsmithAPIError: If there is an error in the API request. + + Note: + - The run objects MUST contain the dotted_order and trace_id fields + to be accepted by the API. + """ + + if not create and not update: + return + headers = { + **self._headers, + "Accept": "application/json", + "Content-Type": "application/json", + } - body = { - "create": [self._hide_run_io(run) for run in create or []], - "update": [self._hide_run_io(run) for run in update or []], - } - self._insert_runtime_env(body["create"]) + body = { + "post": [self._hide_run_io(run) for run in create or []], + "patch": [self._hide_run_io(run) for run in update or []], + } + self._insert_runtime_env(body["post"]) - self.request_with_retries( - "post", - f"{self.api_url}/runs/batch", - request_kwargs={ - "data": json.dumps(body, default=_serialize_json), - "headers": headers, - "timeout": self.timeout_ms / 1000, - }, - ) + self.request_with_retries( + "post", + f"{self.api_url}/runs/batch", + request_kwargs={ + "data": json.dumps(body, default=_serialize_json), + "headers": headers, + "timeout": self.timeout_ms / 1000, + }, + ) def update_run( self, From f9c17ad814a39517b17f0ffa60f5e7f4358622ef Mon Sep 17 00:00:00 2001 From: William Fu-Hinthorn <13333726+hinthornw@users.noreply.github.com> Date: Mon, 8 Jan 2024 13:16:38 -0800 Subject: [PATCH 03/31] Update test --- python/tests/integration_tests/test_client.py | 45 +++++++++++++++++-- python/tests/integration_tests/test_runs.py | 2 +- 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py index 81dd93465..437fc0472 100644 --- a/python/tests/integration_tests/test_client.py +++ b/python/tests/integration_tests/test_client.py @@ -25,7 +25,7 @@ @pytest.fixture def langchain_client(monkeypatch: pytest.MonkeyPatch) -> Client: - monkeypatch.setenv("LANGCHAIN_ENDPOINT", "https://api.smith.langchain.com") + monkeypatch.setenv("LANGCHAIN_ENDPOINT", "https://dev.api.smith.langchain.com") return Client() @@ -40,7 +40,7 @@ def test_projects(langchain_client: Client, monkeypatch: pytest.MonkeyPatch) -> ) assert new_project not in project_names - monkeypatch.setenv("LANGCHAIN_ENDPOINT", "https://api.smith.langchain.com") + monkeypatch.setenv("LANGCHAIN_ENDPOINT", "https://dev.api.smith.langchain.com") langchain_client.create_project( project_name=new_project, project_extra={"evaluator": "THE EVALUATOR"}, @@ -424,6 +424,43 @@ def test_create_chat_example( }, }, } - - # Delete dataset langchain_client.delete_dataset(dataset_id=dataset.id) + + +@pytest.mark.freeze_time("2023-01-01") +def test_batch_ingest_runs(langchain_client: Client) -> None: + trace_id = uuid4() + trace_id_2 = uuid4() + runs_to_create = [ + { + "name": "run 1", + "run_type": "chain", + "dotted_order": str(trace_id), + "trace_id": str(trace_id), + "inputs": {"input1": 1, "input2": 2}, + "outputs": {"output1": 3, "output2": 4}, + }, + { + "name": "run 2", + "run_type": "chain", + "dotted_order": str(trace_id_2), + "trace_id": str(trace_id_2), + "inputs": {"input1": 5, "input2": 6}, + "outputs": {"output1": 7, "output2": 8}, + }, + ] + langchain_client.batch_ingest_runs(create=runs_to_create) + + runs = list(langchain_client.list_runs()) + assert len(runs) == 2 + + assert runs[0].dotted_order == "1" + assert runs[0].trace_id == "123" + assert runs[0].inputs == {"input1": 1, "input2": 2} + assert runs[0].outputs == {"output1": 3, "output2": 4} + + assert runs[1].dotted_order == "2" + assert runs[1].trace_id == "456" + assert runs[1].inputs == {"input1": 5, "input2": 6} + assert runs[1].outputs == {"output1": 7, "output2": 8} + \ No newline at end of file diff --git a/python/tests/integration_tests/test_runs.py b/python/tests/integration_tests/test_runs.py index 84f6b24ed..6098e8d03 100644 --- a/python/tests/integration_tests/test_runs.py +++ b/python/tests/integration_tests/test_runs.py @@ -16,7 +16,7 @@ @pytest.fixture def langchain_client() -> Generator[Client, None, None]: original = os.environ.get("LANGCHAIN_ENDPOINT") - os.environ["LANGCHAIN_ENDPOINT"] = "http://api.smith.langchain.com" + os.environ["LANGCHAIN_ENDPOINT"] = "http://localhost:1984" yield Client() if original is None: os.environ.pop("LANGCHAIN_ENDPOINT") From 0ccc957ca6621313a3498fe28429f06d39bd6916 Mon Sep 17 00:00:00 2001 From: William Fu-Hinthorn <13333726+hinthornw@users.noreply.github.com> Date: Mon, 8 Jan 2024 17:22:41 -0800 Subject: [PATCH 04/31] Update test --- python/langsmith/client.py | 102 +++++++++--------- python/tests/integration_tests/test_client.py | 64 +++++++---- 2 files changed, 95 insertions(+), 71 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 20bb00cc9..39db2779a 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -773,59 +773,59 @@ def create_run( ) def batch_ingest_runs( - self, - create: Optional[ - Sequence[Union[ls_schemas.Run, ls_schemas.RunLikeDict]] - ] = None, - update: Optional[ - Sequence[Union[ls_schemas.Run, ls_schemas.RunLikeDict]] - ] = None, - ): - """ - Batch ingest/upsert multiple runs in the Langsmith system. - - Args: - create (Optional[Sequence[Union[ls_schemas.Run, RunLikeDict]]]): - A sequence of `Run` objects or equivalent dictionaries representing - runs to be created / posted. - update (Optional[Sequence[Union[ls_schemas.Run, RunLikeDict]]]): - A sequence of `Run` objects or equivalent dictionaries representing - runs that have already been created and should be updated / patched. - - Returns: - None: If both `create` and `update` are None. - - Raises: - LangsmithAPIError: If there is an error in the API request. - - Note: - - The run objects MUST contain the dotted_order and trace_id fields - to be accepted by the API. - """ - - if not create and not update: - return - headers = { - **self._headers, - "Accept": "application/json", - "Content-Type": "application/json", - } + self, + create: Optional[ + Sequence[Union[ls_schemas.Run, ls_schemas.RunLikeDict]] + ] = None, + update: Optional[ + Sequence[Union[ls_schemas.Run, ls_schemas.RunLikeDict]] + ] = None, + ): + """ + Batch ingest/upsert multiple runs in the Langsmith system. - body = { - "post": [self._hide_run_io(run) for run in create or []], - "patch": [self._hide_run_io(run) for run in update or []], - } - self._insert_runtime_env(body["post"]) + Args: + create (Optional[Sequence[Union[ls_schemas.Run, RunLikeDict]]]): + A sequence of `Run` objects or equivalent dictionaries representing + runs to be created / posted. + update (Optional[Sequence[Union[ls_schemas.Run, RunLikeDict]]]): + A sequence of `Run` objects or equivalent dictionaries representing + runs that have already been created and should be updated / patched. - self.request_with_retries( - "post", - f"{self.api_url}/runs/batch", - request_kwargs={ - "data": json.dumps(body, default=_serialize_json), - "headers": headers, - "timeout": self.timeout_ms / 1000, - }, - ) + Returns: + None: If both `create` and `update` are None. + + Raises: + LangsmithAPIError: If there is an error in the API request. + + Note: + - The run objects MUST contain the dotted_order and trace_id fields + to be accepted by the API. + """ + + if not create and not update: + return + headers = { + **self._headers, + "Accept": "application/json", + "Content-Type": "application/json", + } + + body = { + "post": [self._hide_run_io(run) for run in create or []], + "patch": [self._hide_run_io(run) for run in update or []], + } + self._insert_runtime_env(body["post"]) + + self.request_with_retries( + "post", + f"{self.api_url}/runs/batch", + request_kwargs={ + "data": json.dumps(body, default=_serialize_json), + "headers": headers, + "timeout": self.timeout_ms / 1000, + }, + ) def update_run( self, diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py index 437fc0472..9c7f1606d 100644 --- a/python/tests/integration_tests/test_client.py +++ b/python/tests/integration_tests/test_client.py @@ -4,7 +4,7 @@ import random import string import time -from datetime import datetime +from datetime import datetime, timedelta from typing import List, Optional from uuid import uuid4 @@ -25,7 +25,7 @@ @pytest.fixture def langchain_client(monkeypatch: pytest.MonkeyPatch) -> Client: - monkeypatch.setenv("LANGCHAIN_ENDPOINT", "https://dev.api.smith.langchain.com") + monkeypatch.setenv("LANGCHAIN_ENDPOINT", "https://api.smith.langchain.com") return Client() @@ -40,7 +40,7 @@ def test_projects(langchain_client: Client, monkeypatch: pytest.MonkeyPatch) -> ) assert new_project not in project_names - monkeypatch.setenv("LANGCHAIN_ENDPOINT", "https://dev.api.smith.langchain.com") + monkeypatch.setenv("LANGCHAIN_ENDPOINT", "https://api.smith.langchain.com") langchain_client.create_project( project_name=new_project, project_extra={"evaluator": "THE EVALUATOR"}, @@ -425,42 +425,66 @@ def test_create_chat_example( }, } langchain_client.delete_dataset(dataset_id=dataset.id) - -@pytest.mark.freeze_time("2023-01-01") + +@freeze_time("2023-01-01") def test_batch_ingest_runs(langchain_client: Client) -> None: + _session = "__test_batch_ingest_runs" trace_id = uuid4() - trace_id_2 = uuid4() + run_id_2 = uuid4() + current_time = datetime.utcnow().strftime("%Y%m%dT%H%M%S%fZ") + later_time = (datetime.utcnow() + timedelta(seconds=1)).strftime("%Y%m%dT%H%M%S%fZ") runs_to_create = [ { + "id": str(trace_id), + "session_name": _session, "name": "run 1", "run_type": "chain", - "dotted_order": str(trace_id), + "dotted_order": f"{current_time}{str(trace_id)}", "trace_id": str(trace_id), "inputs": {"input1": 1, "input2": 2}, "outputs": {"output1": 3, "output2": 4}, }, { + "id": str(run_id_2), + "session_name": _session, "name": "run 2", "run_type": "chain", - "dotted_order": str(trace_id_2), - "trace_id": str(trace_id_2), + "dotted_order": f"{current_time}{str(trace_id)}." + f"{later_time}{str(run_id_2)}", + "trace_id": str(trace_id), + "parent_run_id": str(trace_id), "inputs": {"input1": 5, "input2": 6}, "outputs": {"output1": 7, "output2": 8}, }, ] langchain_client.batch_ingest_runs(create=runs_to_create) - - runs = list(langchain_client.list_runs()) + runs = [] + wait = 2 + for _ in range(5): + try: + runs = list(langchain_client.list_runs(project_name=_session)) + if len(runs) == 2: + break + raise LangSmithError("Runs not created yet") + except LangSmithError: + time.sleep(wait) + wait += 4 + else: + raise ValueError("Runs not created in time") assert len(runs) == 2 + # Write all the assertions here + runs = sorted(runs, key=lambda x: x.dotted_order) + assert len(runs) == 2 + + # Assert inputs and outputs of run 1 + run1 = runs[0] + assert run1.inputs == {"input1": 1, "input2": 2} + assert run1.outputs == {"output1": 3, "output2": 4} - assert runs[0].dotted_order == "1" - assert runs[0].trace_id == "123" - assert runs[0].inputs == {"input1": 1, "input2": 2} - assert runs[0].outputs == {"output1": 3, "output2": 4} + # Assert inputs and outputs of run 2 + run2 = runs[1] + assert run2.inputs == {"input1": 5, "input2": 6} + assert run2.outputs == {"output1": 7, "output2": 8} - assert runs[1].dotted_order == "2" - assert runs[1].trace_id == "456" - assert runs[1].inputs == {"input1": 5, "input2": 6} - assert runs[1].outputs == {"output1": 7, "output2": 8} - \ No newline at end of file + langchain_client.delete_project(project_name=_session) From fae1b6572fd86eb8973edd8133bf65d49887de33 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Fri, 19 Jan 2024 15:38:46 -0800 Subject: [PATCH 05/31] Add optional tracing sampling rate to langsmith sdk - controlled by LANGCHAIN_TRACING_SAMPLING_RATE - if a POST was allowed then the matching PATCH is allowed too - applied on both single and batch tracing endpoints --- python/langsmith/client.py | 93 ++++++++++++++++----- python/tests/integration_tests/test_runs.py | 4 +- 2 files changed, 76 insertions(+), 21 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 39db2779a..d4254a0b7 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -21,6 +21,7 @@ Callable, DefaultDict, Dict, + Iterable, Iterator, List, Mapping, @@ -202,6 +203,25 @@ def _validate_api_key_if_hosted(api_url: str, api_key: Optional[str]) -> None: ) +def _get_tracing_sampling_rate() -> float | None: + """Get the tracing sampling rate. + + Returns + ------- + float + The tracing sampling rate. + """ + sampling_rate_str = os.getenv("LANGCHAIN_TRACING_SAMPLING_RATE") + if sampling_rate_str is None: + return None + sampling_rate = float(sampling_rate_str) + if sampling_rate < 0 or sampling_rate > 1: + raise ls_utils.LangSmithUserError( + "LANGCHAIN_TRACING_SAMPLING_RATE must be between 0 and 1 if set" + ) + return sampling_rate + + def _get_api_key(api_key: Optional[str]) -> Optional[str]: api_key = api_key if api_key is not None else os.getenv("LANGCHAIN_API_KEY") if api_key is None or not api_key.strip(): @@ -257,6 +277,8 @@ class Client: "_get_data_type_cached", "_web_url", "_tenant_id", + "tracing_sample_rate", + "_sampled_post_uuids", ] def __init__( @@ -295,6 +317,8 @@ def __init__( LangSmithUserError If the API key is not provided when using the hosted service. """ + self.tracing_sample_rate = _get_tracing_sampling_rate() + self._sampled_post_uuids = set() self.api_key = _get_api_key(api_key) self.api_url = _get_api_url(api_url, self.api_key) _validate_api_key_if_hosted(self.api_url, self.api_key) @@ -709,6 +733,27 @@ def _insert_runtime_env(self, runs: Sequence[dict]) -> None: runtime = run_extra.setdefault("runtime", {}) run_extra["runtime"] = {**runtime_env, **runtime} + def _filter_for_sampling( + self, runs: Iterable[dict], *, patch: bool = False + ) -> list[dict]: + if self.tracing_sample_rate is None: + return runs + + if patch: + sampled = [] + for run in runs: + if run["id"] in self._sampled_post_uuids: + sampled.append(run) + self._sampled_post_uuids.remove(run["id"]) + return sampled + else: + sampled = [] + for run in runs: + if random.random() < self.tracing_sample_rate: + sampled.append(run) + self._sampled_post_uuids.add(run["id"]) + return sampled + def create_run( self, name: str, @@ -762,15 +807,16 @@ def create_run( "Accept": "application/json", "Content-Type": "application/json", } - self.request_with_retries( - "post", - f"{self.api_url}/runs", - request_kwargs={ - "data": json.dumps(run_create, default=_serialize_json), - "headers": headers, - "timeout": self.timeout_ms / 1000, - }, - ) + if self._filter_for_sampling([run_create]): + self.request_with_retries( + "post", + f"{self.api_url}/runs", + request_kwargs={ + "data": json.dumps(run_create, default=_serialize_json), + "headers": headers, + "timeout": self.timeout_ms / 1000, + }, + ) def batch_ingest_runs( self, @@ -812,9 +858,15 @@ def batch_ingest_runs( } body = { - "post": [self._hide_run_io(run) for run in create or []], - "patch": [self._hide_run_io(run) for run in update or []], + "post": self._filter_for_sampling( + self._hide_run_io(run) for run in create or [] + ), + "patch": self._filter_for_sampling( + (self._hide_run_io(run) for run in update or []), patch=True + ), } + if not body["post"] and not body["patch"]: + return self._insert_runtime_env(body["post"]) self.request_with_retries( @@ -873,15 +925,16 @@ def update_run( data["outputs"] = _hide_outputs(outputs) if events is not None: data["events"] = events - self.request_with_retries( - "patch", - f"{self.api_url}/runs/{_as_uuid(run_id, 'run_id')}", - request_kwargs={ - "data": json.dumps(data, default=_serialize_json), - "headers": headers, - "timeout": self.timeout_ms / 1000, - }, - ) + if self._filter_for_sampling([data]): + self.request_with_retries( + "patch", + f"{self.api_url}/runs/{_as_uuid(run_id, 'run_id')}", + request_kwargs={ + "data": json.dumps(data, default=_serialize_json), + "headers": headers, + "timeout": self.timeout_ms / 1000, + }, + ) def _load_child_runs(self, run: ls_schemas.Run) -> ls_schemas.Run: """Load child runs for a given run. diff --git a/python/tests/integration_tests/test_runs.py b/python/tests/integration_tests/test_runs.py index 6098e8d03..623dd4fd2 100644 --- a/python/tests/integration_tests/test_runs.py +++ b/python/tests/integration_tests/test_runs.py @@ -72,13 +72,15 @@ def my_chain_run(text: str): my_chain_run("foo", langsmith_extra=dict(project_name=project_name)) executor.shutdown(wait=True) - for _ in range(5): + for _ in range(15): try: runs = list(langchain_client.list_runs(project_name=project_name)) assert len(runs) == 3 break except (ls_utils.LangSmithError, AssertionError): time.sleep(1) + else: + raise AssertionError("Failed to get runs after 5 attempts.") assert len(runs) == 3 runs_dict = {run.name: run for run in runs} assert runs_dict["my_chain_run"].parent_run_id is None From 82ba90151431a25be9b6658e2258efd05a2b556b Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Fri, 19 Jan 2024 15:42:00 -0800 Subject: [PATCH 06/31] Lint --- python/langsmith/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index d4254a0b7..c074adf09 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -318,7 +318,7 @@ def __init__( If the API key is not provided when using the hosted service. """ self.tracing_sample_rate = _get_tracing_sampling_rate() - self._sampled_post_uuids = set() + self._sampled_post_uuids: set[uuid.UUID] = set() self.api_key = _get_api_key(api_key) self.api_url = _get_api_url(api_url, self.api_key) _validate_api_key_if_hosted(self.api_url, self.api_key) @@ -737,7 +737,7 @@ def _filter_for_sampling( self, runs: Iterable[dict], *, patch: bool = False ) -> list[dict]: if self.tracing_sample_rate is None: - return runs + return list(runs) if patch: sampled = [] From 19d608dc256988a49a48bbd2250c0ce55cd3e71e Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Fri, 19 Jan 2024 15:55:32 -0800 Subject: [PATCH 07/31] In batch tracing endpoint, combine patch and post payloads where possible --- python/langsmith/client.py | 41 ++++++++++++------- python/tests/integration_tests/test_client.py | 11 ++++- 2 files changed, 36 insertions(+), 16 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index c074adf09..565ad351f 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -713,7 +713,7 @@ def upload_csv( **result, _host_url=self._host_url, _tenant_id=self._get_tenant_id() ) - def _hide_run_io( + def _run_transform( self, run: Union[ls_schemas.Run, dict, ls_schemas.RunLikeDict] ) -> dict: if hasattr(run, "dict") and callable(getattr(run, "dict")): @@ -800,7 +800,7 @@ def create_run( "execution_order": execution_order if execution_order is not None else 1, } - run_create = self._hide_run_io(run_create) + run_create = self._run_transform(run_create) self._insert_runtime_env([run_create]) headers = { **self._headers, @@ -851,22 +851,29 @@ def batch_ingest_runs( if not create and not update: return - headers = { - **self._headers, - "Accept": "application/json", - "Content-Type": "application/json", - } - + # transform and convert to dicts + create_dicts = [self._run_transform(run) for run in create or []] + update_dicts = [self._run_transform(run) for run in update or []] + # combine post and patch dicts where possible + if update_dicts and create_dicts: + create_by_id = {run["id"]: run for run in create_dicts} + standalone_updates: list[dict] = [] + for run in update_dicts: + if run["id"] in create_by_id: + create_by_id[run["id"]].update( + {k: v for k, v in run.items() if v is not None} + ) + else: + standalone_updates.append(run) + update_dicts = standalone_updates + # filter out runs that are not sampled body = { - "post": self._filter_for_sampling( - self._hide_run_io(run) for run in create or [] - ), - "patch": self._filter_for_sampling( - (self._hide_run_io(run) for run in update or []), patch=True - ), + "post": self._filter_for_sampling(create_dicts), + "patch": self._filter_for_sampling(update_dicts, patch=True), } if not body["post"] and not body["patch"]: return + self._insert_runtime_env(body["post"]) self.request_with_retries( @@ -874,8 +881,12 @@ def batch_ingest_runs( f"{self.api_url}/runs/batch", request_kwargs={ "data": json.dumps(body, default=_serialize_json), - "headers": headers, "timeout": self.timeout_ms / 1000, + "headers": { + **self._headers, + "Accept": "application/json", + "Content-Type": "application/json", + }, }, ) diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py index 9c7f1606d..a0c9aadef 100644 --- a/python/tests/integration_tests/test_client.py +++ b/python/tests/integration_tests/test_client.py @@ -455,10 +455,19 @@ def test_batch_ingest_runs(langchain_client: Client) -> None: "trace_id": str(trace_id), "parent_run_id": str(trace_id), "inputs": {"input1": 5, "input2": 6}, + }, + ] + runs_to_update = [ + { + "id": str(run_id_2), + "dotted_order": f"{current_time}{str(trace_id)}." + f"{later_time}{str(run_id_2)}", + "trace_id": str(trace_id), + "parent_run_id": str(trace_id), "outputs": {"output1": 7, "output2": 8}, }, ] - langchain_client.batch_ingest_runs(create=runs_to_create) + langchain_client.batch_ingest_runs(create=runs_to_create, update=runs_to_update) runs = [] wait = 2 for _ in range(5): From 099162a42050fd8fe3a0fbc99690c59d7b23a476 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Fri, 19 Jan 2024 16:01:41 -0800 Subject: [PATCH 08/31] Update python/tests/integration_tests/test_runs.py Co-authored-by: William FH <13333726+hinthornw@users.noreply.github.com> --- python/tests/integration_tests/test_runs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/tests/integration_tests/test_runs.py b/python/tests/integration_tests/test_runs.py index 623dd4fd2..4b6263bc7 100644 --- a/python/tests/integration_tests/test_runs.py +++ b/python/tests/integration_tests/test_runs.py @@ -80,7 +80,7 @@ def my_chain_run(text: str): except (ls_utils.LangSmithError, AssertionError): time.sleep(1) else: - raise AssertionError("Failed to get runs after 5 attempts.") + raise AssertionError("Failed to get runs after 15 attempts.") assert len(runs) == 3 runs_dict = {run.name: run for run in runs} assert runs_dict["my_chain_run"].parent_run_id is None From 511bcac8ec6457e0b6e6dc562b5f075fe3303cbe Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Fri, 19 Jan 2024 16:01:45 -0800 Subject: [PATCH 09/31] Update python/langsmith/client.py Co-authored-by: William FH <13333726+hinthornw@users.noreply.github.com> --- python/langsmith/client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index c074adf09..391dcc5ea 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -217,7 +217,8 @@ def _get_tracing_sampling_rate() -> float | None: sampling_rate = float(sampling_rate_str) if sampling_rate < 0 or sampling_rate > 1: raise ls_utils.LangSmithUserError( - "LANGCHAIN_TRACING_SAMPLING_RATE must be between 0 and 1 if set" + "LANGCHAIN_TRACING_SAMPLING_RATE must be between 0 and 1 if set." + f" Got: {sampling_rate}" ) return sampling_rate From fc3414614d6ff8761b6afe4b41421456baad944a Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Fri, 19 Jan 2024 16:08:18 -0800 Subject: [PATCH 10/31] Lint --- python/langsmith/client.py | 6 ++++-- python/tests/integration_tests/test_client.py | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 75dd4f962..3fb11fd4c 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -722,6 +722,8 @@ def _run_transform( run_create = run.dict() # type: ignore else: run_create = cast(dict, run) + if isinstance(run["id"], str): + run["id"] = uuid.UUID(run["id"]) if "inputs" in run_create: run_create["inputs"] = _hide_inputs(run_create["inputs"]) if "outputs" in run_create: @@ -830,10 +832,10 @@ def create_run( def batch_ingest_runs( self, create: Optional[ - Sequence[Union[ls_schemas.Run, ls_schemas.RunLikeDict]] + Sequence[Union[ls_schemas.Run, ls_schemas.RunLikeDict, Dict]] ] = None, update: Optional[ - Sequence[Union[ls_schemas.Run, ls_schemas.RunLikeDict]] + Sequence[Union[ls_schemas.Run, ls_schemas.RunLikeDict, Dict]] ] = None, ): """ diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py index a0c9aadef..1f9334553 100644 --- a/python/tests/integration_tests/test_client.py +++ b/python/tests/integration_tests/test_client.py @@ -5,7 +5,7 @@ import string import time from datetime import datetime, timedelta -from typing import List, Optional +from typing import List, Optional, cast from uuid import uuid4 import pytest @@ -483,7 +483,7 @@ def test_batch_ingest_runs(langchain_client: Client) -> None: raise ValueError("Runs not created in time") assert len(runs) == 2 # Write all the assertions here - runs = sorted(runs, key=lambda x: x.dotted_order) + runs = sorted(runs, key=lambda x: cast(str, x.dotted_order)) assert len(runs) == 2 # Assert inputs and outputs of run 1 From 1f83125197940fbe07c320ef443bb466e857a202 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Fri, 19 Jan 2024 17:29:44 -0800 Subject: [PATCH 11/31] Add auto_batch_tracing modality for Client - if on, starts a background thread that batches inserts/updates - only applies to insert/updates w/ trace_id and dotted_order --- python/langsmith/client.py | 128 ++++++++++++++++++++----- python/tests/unit_tests/test_client.py | 36 ++++++- 2 files changed, 134 insertions(+), 30 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 3fb11fd4c..84ca3b63e 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -11,10 +11,13 @@ import logging import os import random +import signal import socket +import threading import time import uuid import weakref +from queue import Empty, Queue from typing import ( TYPE_CHECKING, Any, @@ -281,6 +284,8 @@ class Client: "_tenant_id", "tracing_sample_rate", "_sampled_post_uuids", + "tracing_queue", + "tracing_thread", ] def __init__( @@ -292,6 +297,7 @@ def __init__( timeout_ms: Optional[int] = None, web_url: Optional[str] = None, session: Optional[requests.Session] = None, + auto_batch_tracing: bool = True, ) -> None: """Initialize a Client instance. @@ -331,6 +337,22 @@ def __init__( # Create a session and register a finalizer to close it self.session = session if session else requests.Session() weakref.finalize(self, close_session, self.session) + # Initialize auto batching + if auto_batch_tracing: + self.tracing_queue: Queue = Queue() + exit_event = threading.Event() + + def signal_exit(signum: int, frame: Any) -> None: + exit_event.set() + + signal.signal(signal.SIGINT, signal_exit) + self.tracing_thread = threading.Thread( + target=self._tracing_thread_func, args=(exit_event,) + ) + self.tracing_thread.start() + else: + self.tracing_queue = None + self.tracing_thread = None # Mount the HTTPAdapter with the retry configuration adapter = requests_adapters.HTTPAdapter(max_retries=self.retry_config) @@ -340,6 +362,35 @@ def __init__( self._get_data_type ) + def _tracing_thread_func(self, exit_event: threading.Event) -> None: + def drain_queue(limit: Optional[int] = None) -> List[Tuple[str, dict]]: + next_batch: List[Tuple[str, dict]] = [] + try: + while item := self.tracing_queue.get(block=True, timeout=0.25): + next_batch.append(item) + if limit and len(next_batch) >= limit: + break + except Empty: + pass + return next_batch + + def handle_batch(batch: List[Tuple[str, dict]]) -> None: + create = [item[1] for item in batch if item[0] == "create"] + update = [item[1] for item in batch if item[0] == "update"] + try: + self.batch_ingest_runs(create=create, update=update, pre_sampled=True) + finally: + for _ in batch: + self.tracing_queue.task_done() + + # loop until we receive a signal to exit or the main thread dies + while not exit_event.is_set() and threading.main_thread().is_alive(): + if next_batch := drain_queue(100): + handle_batch(next_batch) + # drain the queue on exit + if next_batch := drain_queue(): + handle_batch(next_batch) + def _repr_html_(self) -> str: """Return an HTML representation of the instance with a link to the URL. @@ -810,6 +861,14 @@ def create_run( "run_type": run_type, "execution_order": execution_order if execution_order is not None else 1, } + if not self._filter_for_sampling([run_create]): + return + if ( + self.tracing_queue is not None + and run_create.get("trace_id") is not None + and run_create.get("dotted_order") is not None + ): + return self.tracing_queue.put(("create", run_create)) run_create = self._run_transform(run_create) self._insert_runtime_env([run_create]) @@ -818,16 +877,16 @@ def create_run( "Accept": "application/json", "Content-Type": "application/json", } - if self._filter_for_sampling([run_create]): - self.request_with_retries( - "post", - f"{self.api_url}/runs", - request_kwargs={ - "data": json.dumps(run_create, default=_serialize_json), - "headers": headers, - "timeout": self.timeout_ms / 1000, - }, - ) + + self.request_with_retries( + "post", + f"{self.api_url}/runs", + request_kwargs={ + "data": json.dumps(run_create, default=_serialize_json), + "headers": headers, + "timeout": self.timeout_ms / 1000, + }, + ) def batch_ingest_runs( self, @@ -837,6 +896,7 @@ def batch_ingest_runs( update: Optional[ Sequence[Union[ls_schemas.Run, ls_schemas.RunLikeDict, Dict]] ] = None, + pre_sampled: bool = False, ): """ Batch ingest/upsert multiple runs in the Langsmith system. @@ -878,10 +938,16 @@ def batch_ingest_runs( standalone_updates.append(run) update_dicts = standalone_updates # filter out runs that are not sampled - body = { - "post": self._filter_for_sampling(create_dicts), - "patch": self._filter_for_sampling(update_dicts, patch=True), - } + if pre_sampled: + body = { + "post": create_dicts, + "patch": update_dicts, + } + else: + body = { + "post": self._filter_for_sampling(create_dicts), + "patch": self._filter_for_sampling(update_dicts, patch=True), + } if not body["post"] and not body["patch"]: return @@ -936,7 +1002,13 @@ def update_run( "Accept": "application/json", "Content-Type": "application/json", } - data: Dict[str, Any] = {} + data: Dict[str, Any] = { + "id": _as_uuid(run_id, "run_id"), + "trace_id": kwargs.pop("trace_id", None), + "dotted_order": kwargs.pop("dotted_order", None), + } + if not self._filter_for_sampling([data], patch=True): + return if end_time is not None: data["end_time"] = end_time.isoformat() if error is not None: @@ -947,16 +1019,22 @@ def update_run( data["outputs"] = _hide_outputs(outputs) if events is not None: data["events"] = events - if self._filter_for_sampling([data]): - self.request_with_retries( - "patch", - f"{self.api_url}/runs/{_as_uuid(run_id, 'run_id')}", - request_kwargs={ - "data": json.dumps(data, default=_serialize_json), - "headers": headers, - "timeout": self.timeout_ms / 1000, - }, - ) + if ( + self.tracing_queue is not None + and data["trace_id"] is not None + and data["dotted_order"] is not None + ): + return self.tracing_queue.put(("update", data)) + + self.request_with_retries( + "patch", + f"{self.api_url}/runs/{data['id']}", + request_kwargs={ + "data": json.dumps(data, default=_serialize_json), + "headers": headers, + "timeout": self.timeout_ms / 1000, + }, + ) def _load_child_runs(self, run: ls_schemas.Run) -> ls_schemas.Run: """Load child runs for a given run. diff --git a/python/tests/unit_tests/test_client.py b/python/tests/unit_tests/test_client.py index 930d4211d..8adb5d2a8 100644 --- a/python/tests/unit_tests/test_client.py +++ b/python/tests/unit_tests/test_client.py @@ -195,8 +195,15 @@ def test_create_run_unicode() -> None: client.update_run(id_, status="completed") -def test_create_run_includes_langchain_env_var_metadata() -> None: - client = Client(api_url="http://localhost:1984", api_key="123") +@pytest.mark.parametrize("auto_batch_tracing", [True, False]) +def test_create_run_includes_langchain_env_var_metadata( + auto_batch_tracing: bool +) -> None: + client = Client( + api_url="http://localhost:1984", + api_key="123", + auto_batch_tracing=auto_batch_tracing, + ) inputs = { "foo": "これは私の友達です", "bar": "این یک کتاب است", @@ -212,13 +219,32 @@ def test_create_run_includes_langchain_env_var_metadata() -> None: ls_env.get_langchain_env_var_metadata.cache_clear() with patch.object(client, "session", session): id_ = uuid.uuid4() + start_time = datetime.now() client.create_run( - "my_run", inputs=inputs, run_type="llm", execution_order=1, id=id_ + "my_run", + inputs=inputs, + run_type="llm", + execution_order=1, + id=id_, + trace_id=id_, + dotted_order=f"{start_time.strftime('%Y%m%dT%H%M%S%fZ')}{id_}", + start_time=start_time, ) + if auto_batch_tracing: + client.tracing_queue.join() # Check the posted value in the request posted_value = json.loads(session.request.call_args[1]["data"]) - assert posted_value["extra"]["metadata"]["LANGCHAIN_REVISION"] == "abcd2234" - assert "LANGCHAIN_API_KEY" not in posted_value["extra"]["metadata"] + if not auto_batch_tracing: + assert ( + posted_value["extra"]["metadata"]["LANGCHAIN_REVISION"] + == "abcd2234" + ) + assert "LANGCHAIN_API_KEY" not in posted_value["extra"]["metadata"] + else: + assert ( + posted_value["post"][0]["extra"]["metadata"]["LANGCHAIN_REVISION"] + == "abcd2234" + ) @pytest.mark.parametrize("source_type", ["api", "model"]) From 278042130f026f10aba42b022779c07dd591245f Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Fri, 19 Jan 2024 17:52:46 -0800 Subject: [PATCH 12/31] Lint --- python/langsmith/client.py | 82 +++++++++++++------------- python/tests/unit_tests/test_client.py | 6 +- 2 files changed, 45 insertions(+), 43 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 84ca3b63e..85271c791 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -184,6 +184,10 @@ def close_session(session: requests.Session) -> None: session.close() +def signal_exit(event: threading.Event, signum: int, frame: Any) -> None: + event.set() + + def _validate_api_key_if_hosted(api_url: str, api_key: Optional[str]) -> None: """Verify API key is provided if url not localhost. @@ -285,7 +289,6 @@ class Client: "tracing_sample_rate", "_sampled_post_uuids", "tracing_queue", - "tracing_thread", ] def __init__( @@ -339,20 +342,15 @@ def __init__( weakref.finalize(self, close_session, self.session) # Initialize auto batching if auto_batch_tracing: - self.tracing_queue: Queue = Queue() + self.tracing_queue: Optional[Queue] = Queue() exit_event = threading.Event() - def signal_exit(signum: int, frame: Any) -> None: - exit_event.set() - - signal.signal(signal.SIGINT, signal_exit) - self.tracing_thread = threading.Thread( - target=self._tracing_thread_func, args=(exit_event,) - ) - self.tracing_thread.start() + signal.signal(signal.SIGINT, functools.partial(signal_exit, exit_event)) + threading.Thread( + target=_tracing_thread_func, args=(self, exit_event) + ).start() else: self.tracing_queue = None - self.tracing_thread = None # Mount the HTTPAdapter with the retry configuration adapter = requests_adapters.HTTPAdapter(max_retries=self.retry_config) @@ -362,35 +360,6 @@ def signal_exit(signum: int, frame: Any) -> None: self._get_data_type ) - def _tracing_thread_func(self, exit_event: threading.Event) -> None: - def drain_queue(limit: Optional[int] = None) -> List[Tuple[str, dict]]: - next_batch: List[Tuple[str, dict]] = [] - try: - while item := self.tracing_queue.get(block=True, timeout=0.25): - next_batch.append(item) - if limit and len(next_batch) >= limit: - break - except Empty: - pass - return next_batch - - def handle_batch(batch: List[Tuple[str, dict]]) -> None: - create = [item[1] for item in batch if item[0] == "create"] - update = [item[1] for item in batch if item[0] == "update"] - try: - self.batch_ingest_runs(create=create, update=update, pre_sampled=True) - finally: - for _ in batch: - self.tracing_queue.task_done() - - # loop until we receive a signal to exit or the main thread dies - while not exit_event.is_set() and threading.main_thread().is_alive(): - if next_batch := drain_queue(100): - handle_batch(next_batch) - # drain the queue on exit - if next_batch := drain_queue(): - handle_batch(next_batch) - def _repr_html_(self) -> str: """Return an HTML representation of the instance with a link to the URL. @@ -3114,3 +3083,36 @@ def _evaluate_strings(self, prediction, reference=None, input=None, **kwargs) -> input_mapper=input_mapper, revision_id=revision_id, ) + + +def _tracing_thread_func(client: Client, exit_event: threading.Event) -> None: + tracing_queue = client.tracing_queue + assert tracing_queue is not None + + def drain_queue(limit: Optional[int] = None) -> List[Tuple[str, dict]]: + next_batch: List[Tuple[str, dict]] = [] + try: + while item := tracing_queue.get(block=True, timeout=0.25): + next_batch.append(item) + if limit and len(next_batch) >= limit: + break + except Empty: + pass + return next_batch + + def handle_batch(batch: List[Tuple[str, dict]]) -> None: + create = [item[1] for item in batch if item[0] == "create"] + update = [item[1] for item in batch if item[0] == "update"] + try: + client.batch_ingest_runs(create=create, update=update, pre_sampled=True) + finally: + for _ in batch: + tracing_queue.task_done() + + # loop until we receive a signal to exit or the main thread dies + while not exit_event.is_set() and threading.main_thread().is_alive(): + if next_batch := drain_queue(100): + handle_batch(next_batch) + # drain the queue on exit + if next_batch := drain_queue(): + handle_batch(next_batch) diff --git a/python/tests/unit_tests/test_client.py b/python/tests/unit_tests/test_client.py index 8adb5d2a8..46f93bf11 100644 --- a/python/tests/unit_tests/test_client.py +++ b/python/tests/unit_tests/test_client.py @@ -197,7 +197,7 @@ def test_create_run_unicode() -> None: @pytest.mark.parametrize("auto_batch_tracing", [True, False]) def test_create_run_includes_langchain_env_var_metadata( - auto_batch_tracing: bool + auto_batch_tracing: bool, ) -> None: client = Client( api_url="http://localhost:1984", @@ -230,8 +230,8 @@ def test_create_run_includes_langchain_env_var_metadata( dotted_order=f"{start_time.strftime('%Y%m%dT%H%M%S%fZ')}{id_}", start_time=start_time, ) - if auto_batch_tracing: - client.tracing_queue.join() + if tracing_queue := client.tracing_queue: + tracing_queue.join() # Check the posted value in the request posted_value = json.loads(session.request.call_args[1]["data"]) if not auto_batch_tracing: From 439e0e42b0efd6460a8a953788f8c165c6030b00 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Fri, 19 Jan 2024 18:28:13 -0800 Subject: [PATCH 13/31] Fix GC problem --- python/langsmith/client.py | 75 ++++++++++++++++---------- python/tests/unit_tests/test_client.py | 28 ++++++++++ 2 files changed, 76 insertions(+), 27 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 85271c791..464d820a1 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -13,6 +13,7 @@ import random import signal import socket +import sys import threading import time import uuid @@ -347,7 +348,7 @@ def __init__( signal.signal(signal.SIGINT, functools.partial(signal_exit, exit_event)) threading.Thread( - target=_tracing_thread_func, args=(self, exit_event) + target=_tracing_thread_func, args=(weakref.ref(self), exit_event) ).start() else: self.tracing_queue = None @@ -3085,34 +3086,54 @@ def _evaluate_strings(self, prediction, reference=None, input=None, **kwargs) -> ) -def _tracing_thread_func(client: Client, exit_event: threading.Event) -> None: +def _tracing_thread_drain_queue( + tracing_queue: Queue, limit: Optional[int] = None +) -> List[Tuple[str, dict]]: + next_batch: List[Tuple[str, dict]] = [] + try: + while item := tracing_queue.get(block=True, timeout=0.25): + next_batch.append(item) + if limit and len(next_batch) >= limit: + break + except Empty: + pass + return next_batch + + +def _tracing_thread_handle_batch( + client: Client, tracing_queue: Queue, batch: List[Tuple[str, dict]] +) -> None: + create = [item[1] for item in batch if item[0] == "create"] + update = [item[1] for item in batch if item[0] == "update"] + try: + client.batch_ingest_runs(create=create, update=update, pre_sampled=True) + finally: + for _ in batch: + tracing_queue.task_done() + + +def _tracing_thread_func( + client_ref: weakref.ref[Client], exit_event: threading.Event +) -> None: + client = client_ref() + if client is None: + return tracing_queue = client.tracing_queue assert tracing_queue is not None - def drain_queue(limit: Optional[int] = None) -> List[Tuple[str, dict]]: - next_batch: List[Tuple[str, dict]] = [] - try: - while item := tracing_queue.get(block=True, timeout=0.25): - next_batch.append(item) - if limit and len(next_batch) >= limit: - break - except Empty: - pass - return next_batch - - def handle_batch(batch: List[Tuple[str, dict]]) -> None: - create = [item[1] for item in batch if item[0] == "create"] - update = [item[1] for item in batch if item[0] == "update"] - try: - client.batch_ingest_runs(create=create, update=update, pre_sampled=True) - finally: - for _ in batch: - tracing_queue.task_done() + # loop until + while ( + # we receive a signal to exit + not exit_event.is_set() + # or the main thread dies + and threading.main_thread().is_alive() + # or we're the only remaining reference to the client + and sys.getrefcount(client) > 3 + # 1 for this func, 1 for getrefcount, 1 for _get_data_type_cached + ): + if next_batch := _tracing_thread_drain_queue(tracing_queue, 100): + _tracing_thread_handle_batch(client, tracing_queue, next_batch) - # loop until we receive a signal to exit or the main thread dies - while not exit_event.is_set() and threading.main_thread().is_alive(): - if next_batch := drain_queue(100): - handle_batch(next_batch) # drain the queue on exit - if next_batch := drain_queue(): - handle_batch(next_batch) + if next_batch := _tracing_thread_drain_queue(tracing_queue): + _tracing_thread_handle_batch(client, tracing_queue, next_batch) diff --git a/python/tests/unit_tests/test_client.py b/python/tests/unit_tests/test_client.py index 46f93bf11..f98a5af13 100644 --- a/python/tests/unit_tests/test_client.py +++ b/python/tests/unit_tests/test_client.py @@ -1,8 +1,11 @@ """Test the LangSmith client.""" import asyncio +import gc import json import os +import time import uuid +import weakref from datetime import datetime from io import BytesIO from typing import Optional @@ -195,6 +198,31 @@ def test_create_run_unicode() -> None: client.update_run(id_, status="completed") +class CallTracker: + def __init__(self) -> None: + self.counter = 0 + + def __call__(self, *args: object, **kwargs: object) -> None: + self.counter += 1 + + +@pytest.mark.parametrize("auto_batch_tracing", [True, False]) +def test_client_gc(auto_batch_tracing: bool) -> None: + client = Client( + api_url="http://localhost:1984", + api_key="123", + auto_batch_tracing=auto_batch_tracing, + ) + tracker = CallTracker() + weakref.finalize(client, tracker) + assert tracker.counter == 0 + + del client + time.sleep(1) # Give the background thread time to stop + gc.collect() # Force garbage collection + assert tracker.counter == 1, "Client was not garbage collected" + + @pytest.mark.parametrize("auto_batch_tracing", [True, False]) def test_create_run_includes_langchain_env_var_metadata( auto_batch_tracing: bool, From 1f98fbedc7595fdf1e2ccbf4d4820817f59081e6 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Fri, 19 Jan 2024 19:26:28 -0800 Subject: [PATCH 14/31] Remove unused signal, add comments --- python/langsmith/client.py | 38 +++++++++++++++++++++++--------------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 464d820a1..f9ed5ad04 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -11,7 +11,6 @@ import logging import os import random -import signal import socket import sys import threading @@ -185,10 +184,6 @@ def close_session(session: requests.Session) -> None: session.close() -def signal_exit(event: threading.Event, signum: int, frame: Any) -> None: - event.set() - - def _validate_api_key_if_hosted(api_url: str, api_key: Optional[str]) -> None: """Verify API key is provided if url not localhost. @@ -344,11 +339,12 @@ def __init__( # Initialize auto batching if auto_batch_tracing: self.tracing_queue: Optional[Queue] = Queue() - exit_event = threading.Event() - signal.signal(signal.SIGINT, functools.partial(signal_exit, exit_event)) threading.Thread( - target=_tracing_thread_func, args=(weakref.ref(self), exit_event) + target=_tracing_thread_func, + # arg must be a weakref to self to avoid the Thread object + # preventing garbage collection of the Client object + args=(weakref.ref(self),), ).start() else: self.tracing_queue = None @@ -835,6 +831,7 @@ def create_run( return if ( self.tracing_queue is not None + # batch ingest requires trace_id and dotted_order to be set and run_create.get("trace_id") is not None and run_create.get("dotted_order") is not None ): @@ -866,6 +863,7 @@ def batch_ingest_runs( update: Optional[ Sequence[Union[ls_schemas.Run, ls_schemas.RunLikeDict, Dict]] ] = None, + *, pre_sampled: bool = False, ): """ @@ -878,6 +876,9 @@ def batch_ingest_runs( update (Optional[Sequence[Union[ls_schemas.Run, RunLikeDict]]]): A sequence of `Run` objects or equivalent dictionaries representing runs that have already been created and should be updated / patched. + pre_sampled (bool, optional): Whether the runs have already been subject + to sampling, and therefore should not be sampled again. + Defaults to False. Returns: None: If both `create` and `update` are None. @@ -907,6 +908,16 @@ def batch_ingest_runs( else: standalone_updates.append(run) update_dicts = standalone_updates + for run in create_dicts: + if not run.get("trace_id") or not run.get("dotted_order"): + raise ls_utils.LangSmithUserError( + "Batch ingest requires trace_id and dotted_order to be set." + ) + for run in update_dicts: + if not run.get("trace_id") or not run.get("dotted_order"): + raise ls_utils.LangSmithUserError( + "Batch ingest requires trace_id and dotted_order to be set." + ) # filter out runs that are not sampled if pre_sampled: body = { @@ -991,6 +1002,7 @@ def update_run( data["events"] = events if ( self.tracing_queue is not None + # batch ingest requires trace_id and dotted_order to be set and data["trace_id"] is not None and data["dotted_order"] is not None ): @@ -3112,9 +3124,7 @@ def _tracing_thread_handle_batch( tracing_queue.task_done() -def _tracing_thread_func( - client_ref: weakref.ref[Client], exit_event: threading.Event -) -> None: +def _tracing_thread_func(client_ref: weakref.ref[Client]) -> None: client = client_ref() if client is None: return @@ -3123,10 +3133,8 @@ def _tracing_thread_func( # loop until while ( - # we receive a signal to exit - not exit_event.is_set() - # or the main thread dies - and threading.main_thread().is_alive() + # the main thread dies + threading.main_thread().is_alive() # or we're the only remaining reference to the client and sys.getrefcount(client) > 3 # 1 for this func, 1 for getrefcount, 1 for _get_data_type_cached From 3faa5eb6577573902134e55f3d28c71ada45dbc1 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Fri, 19 Jan 2024 19:45:31 -0800 Subject: [PATCH 15/31] Add missing parent_run_id --- python/langsmith/client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index f9ed5ad04..2e6905a24 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -986,6 +986,7 @@ def update_run( data: Dict[str, Any] = { "id": _as_uuid(run_id, "run_id"), "trace_id": kwargs.pop("trace_id", None), + "parent_run_id": kwargs.pop("parent_run_id", None), "dotted_order": kwargs.pop("dotted_order", None), } if not self._filter_for_sampling([data], patch=True): From 3bd9e6f324f9ac11f2cf6a660f07629a34fc9955 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Sat, 20 Jan 2024 16:40:12 -0800 Subject: [PATCH 16/31] Adjust config --- python/langsmith/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 2e6905a24..757a6b939 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -330,7 +330,7 @@ def __init__( self.api_url = _get_api_url(api_url, self.api_key) _validate_api_key_if_hosted(self.api_url, self.api_key) self.retry_config = retry_config or _default_retry_config() - self.timeout_ms = timeout_ms or 7000 + self.timeout_ms = timeout_ms or 10000 self._web_url = web_url self._tenant_id: Optional[uuid.UUID] = None # Create a session and register a finalizer to close it @@ -3140,7 +3140,7 @@ def _tracing_thread_func(client_ref: weakref.ref[Client]) -> None: and sys.getrefcount(client) > 3 # 1 for this func, 1 for getrefcount, 1 for _get_data_type_cached ): - if next_batch := _tracing_thread_drain_queue(tracing_queue, 100): + if next_batch := _tracing_thread_drain_queue(tracing_queue, 250): _tracing_thread_handle_batch(client, tracing_queue, next_batch) # drain the queue on exit From 64febeb690f07be1bd91b37a0b88e31dddb9967b Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Sun, 21 Jan 2024 09:34:45 -0800 Subject: [PATCH 17/31] Use a priority queue to group runs from same trace in same batch where possible --- python/langsmith/client.py | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 757a6b939..cef63d26e 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -17,7 +17,8 @@ import time import uuid import weakref -from queue import Empty, Queue +from dataclasses import dataclass, field +from queue import Empty, PriorityQueue, Queue from typing import ( TYPE_CHECKING, Any, @@ -269,6 +270,13 @@ def _as_uuid(value: ID_TYPE, var: str) -> uuid.UUID: ) from e +@dataclass(order=True) +class TracingQueueItem: + priority: str + action: str + item: Any = field(compare=False) + + class Client: """Client for interacting with the LangSmith API.""" @@ -338,7 +346,7 @@ def __init__( weakref.finalize(self, close_session, self.session) # Initialize auto batching if auto_batch_tracing: - self.tracing_queue: Optional[Queue] = Queue() + self.tracing_queue: Optional[PriorityQueue] = PriorityQueue() threading.Thread( target=_tracing_thread_func, @@ -835,7 +843,9 @@ def create_run( and run_create.get("trace_id") is not None and run_create.get("dotted_order") is not None ): - return self.tracing_queue.put(("create", run_create)) + return self.tracing_queue.put( + TracingQueueItem(run_create["dotted_order"], "create", run_create) + ) run_create = self._run_transform(run_create) self._insert_runtime_env([run_create]) @@ -1007,7 +1017,9 @@ def update_run( and data["trace_id"] is not None and data["dotted_order"] is not None ): - return self.tracing_queue.put(("update", data)) + return self.tracing_queue.put( + TracingQueueItem(data["dotted_order"], "update", data) + ) self.request_with_retries( "patch", @@ -3101,8 +3113,8 @@ def _evaluate_strings(self, prediction, reference=None, input=None, **kwargs) -> def _tracing_thread_drain_queue( tracing_queue: Queue, limit: Optional[int] = None -) -> List[Tuple[str, dict]]: - next_batch: List[Tuple[str, dict]] = [] +) -> List[TracingQueueItem]: + next_batch: List[TracingQueueItem] = [] try: while item := tracing_queue.get(block=True, timeout=0.25): next_batch.append(item) @@ -3114,10 +3126,10 @@ def _tracing_thread_drain_queue( def _tracing_thread_handle_batch( - client: Client, tracing_queue: Queue, batch: List[Tuple[str, dict]] + client: Client, tracing_queue: Queue, batch: List[TracingQueueItem] ) -> None: - create = [item[1] for item in batch if item[0] == "create"] - update = [item[1] for item in batch if item[0] == "update"] + create = [it.item for it in batch if it.action == "create"] + update = [it.item for it in batch if it.action == "update"] try: client.batch_ingest_runs(create=create, update=update, pre_sampled=True) finally: @@ -3140,7 +3152,7 @@ def _tracing_thread_func(client_ref: weakref.ref[Client]) -> None: and sys.getrefcount(client) > 3 # 1 for this func, 1 for getrefcount, 1 for _get_data_type_cached ): - if next_batch := _tracing_thread_drain_queue(tracing_queue, 250): + if next_batch := _tracing_thread_drain_queue(tracing_queue, 100): _tracing_thread_handle_batch(client, tracing_queue, next_batch) # drain the queue on exit From 4f5c7934f86151b78b81b1d8e3e84a2c6e1a3053 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Sun, 21 Jan 2024 09:41:43 -0800 Subject: [PATCH 18/31] Also chunk at end --- python/langsmith/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index cef63d26e..efa7bf226 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -3156,5 +3156,5 @@ def _tracing_thread_func(client_ref: weakref.ref[Client]) -> None: _tracing_thread_handle_batch(client, tracing_queue, next_batch) # drain the queue on exit - if next_batch := _tracing_thread_drain_queue(tracing_queue): + while next_batch := _tracing_thread_drain_queue(tracing_queue, 100): _tracing_thread_handle_batch(client, tracing_queue, next_batch) From 86580987720e32484387267381b15a9c7b1a1d2c Mon Sep 17 00:00:00 2001 From: William Fu-Hinthorn <13333726+hinthornw@users.noreply.github.com> Date: Mon, 22 Jan 2024 15:28:57 -0800 Subject: [PATCH 19/31] Update retry --- python/langsmith/client.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index efa7bf226..3b1bb189b 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -453,6 +453,10 @@ def request_with_retries( LangSmithError If the request fails. """ + retry_on_ = tuple( + *(retry_on or []), + *(ls_utils.LangSmithConnectionError, ls_utils.LangSmithAPIError), + ) for idx in range(stop_after_attempt): try: try: @@ -506,7 +510,7 @@ def request_with_retries( raise ls_utils.LangSmithError( f"Failed to {request_method} {url} in LangSmith API. {emsg}" ) from e - except tuple(retry_on or []): + except retry_on_: if idx + 1 == stop_after_attempt: raise sleep_time = 2**idx + (random.random() * 0.5) From a6757192515ff166cb5c41b0e2383e70d9b9f941 Mon Sep 17 00:00:00 2001 From: William Fu-Hinthorn <13333726+hinthornw@users.noreply.github.com> Date: Mon, 22 Jan 2024 16:57:03 -0800 Subject: [PATCH 20/31] Actually retry --- python/langsmith/client.py | 3 +- python/tests/unit_tests/test_client.py | 75 ++++++++++++++++++++++++-- 2 files changed, 73 insertions(+), 5 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 3b1bb189b..9b5e48172 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -453,7 +453,8 @@ def request_with_retries( LangSmithError If the request fails. """ - retry_on_ = tuple( + + retry_on_: Tuple[Type[BaseException], ...] = ( *(retry_on or []), *(ls_utils.LangSmithConnectionError, ls_utils.LangSmithAPIError), ) diff --git a/python/tests/unit_tests/test_client.py b/python/tests/unit_tests/test_client.py index f98a5af13..fc026fd3b 100644 --- a/python/tests/unit_tests/test_client.py +++ b/python/tests/unit_tests/test_client.py @@ -10,12 +10,15 @@ from io import BytesIO from typing import Optional from unittest import mock -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pytest +import requests from pydantic import BaseModel +from requests import HTTPError import langsmith.env as ls_env +import langsmith.utils as ls_utils from langsmith.client import ( Client, _get_api_key, @@ -25,7 +28,6 @@ _serialize_json, ) from langsmith.schemas import Example -from langsmith.utils import LangSmithUserError _CREATED_AT = datetime(2015, 1, 1, 0, 0, 0) @@ -45,7 +47,7 @@ def test__is_langchain_hosted() -> None: def test_validate_api_key_if_hosted(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.delenv("LANGCHAIN_API_KEY", raising=False) - with pytest.raises(LangSmithUserError, match="API key must be provided"): + with pytest.raises(ls_utils.LangSmithUserError, match="API key must be provided"): Client(api_url="https://api.smith.langchain.com") client = Client(api_url="http://localhost:1984") assert client.api_url == "http://localhost:1984" @@ -175,7 +177,7 @@ def test_get_api_url() -> None: with patch.dict(os.environ, {"LANGCHAIN_ENDPOINT": "http://env.url"}): assert _get_api_url(None, None) == "http://env.url" - with pytest.raises(LangSmithUserError): + with pytest.raises(ls_utils.LangSmithUserError): _get_api_url(" ", "api_key") @@ -350,3 +352,68 @@ def test_host_url() -> None: client = Client(api_url="https://api.smith.langchain.com", api_key="API_KEY") assert client._host_url == "https://smith.langchain.com" + + +@patch("langsmith.client.time.sleep") +def test_retry_on_connection_error(mock_sleep): + client = Client(api_key="test") + with patch.object(client, "session") as mock_session: + mock_session.request.side_effect = requests.ConnectionError() + + with pytest.raises(ls_utils.LangSmithConnectionError): + client.request_with_retries( + "GET", "https://test.url", {}, stop_after_attempt=2 + ) + assert mock_session.request.call_count == 2 + + +@patch("langsmith.client.time.sleep") +def test_http_status_500_handling(mock_sleep): + client = Client(api_key="test") + with patch.object(client, "session") as mock_session: + mock_response = MagicMock() + mock_response.status_code = 500 + mock_response.raise_for_status.side_effect = HTTPError() + mock_session.request.return_value = mock_response + + with pytest.raises(ls_utils.LangSmithAPIError): + client.request_with_retries( + "GET", "https://test.url", {}, stop_after_attempt=2 + ) + assert mock_session.request.call_count == 2 + + +@patch("langsmith.client.ls_utils.raise_for_status_with_text") +def test_http_status_429_handling(mock_raise_for_status): + client = Client(api_key="test") + with patch.object(client, "session") as mock_session: + mock_response = MagicMock() + mock_response.status_code = 429 + mock_session.request.return_value = mock_response + mock_raise_for_status.side_effect = HTTPError() + with pytest.raises(ls_utils.LangSmithRateLimitError): + client.request_with_retries("GET", "https://test.url", {}) + + +@patch("langsmith.client.ls_utils.raise_for_status_with_text") +def test_http_status_401_handling(mock_raise_for_status): + client = Client(api_key="test") + with patch.object(client, "session") as mock_session: + mock_response = MagicMock() + mock_response.status_code = 401 + mock_session.request.return_value = mock_response + mock_raise_for_status.side_effect = HTTPError() + with pytest.raises(ls_utils.LangSmithAuthError): + client.request_with_retries("GET", "https://test.url", {}) + + +@patch("langsmith.client.ls_utils.raise_for_status_with_text") +def test_http_status_404_handling(mock_raise_for_status): + client = Client(api_key="test") + with patch.object(client, "session") as mock_session: + mock_response = MagicMock() + mock_response.status_code = 404 + mock_session.request.return_value = mock_response + mock_raise_for_status.side_effect = HTTPError() + with pytest.raises(ls_utils.LangSmithNotFoundError): + client.request_with_retries("GET", "https://test.url", {}) From 5bed31181b90dd8bfe40968d20bc2acaada9533c Mon Sep 17 00:00:00 2001 From: William FH <13333726+hinthornw@users.noreply.github.com> Date: Wed, 24 Jan 2024 07:06:20 -0800 Subject: [PATCH 21/31] Catch 409's (#375) --- python/langsmith/client.py | 21 +++++++++++++++++++++ python/langsmith/utils.py | 4 ++++ python/tests/unit_tests/test_client.py | 20 ++++++++++++++++++++ 3 files changed, 45 insertions(+) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 9b5e48172..d8bc7a8bc 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -425,6 +425,7 @@ def request_with_retries( request_kwargs: Mapping, stop_after_attempt: int = 1, retry_on: Optional[Sequence[Type[BaseException]]] = None, + to_ignore: Optional[Sequence[Type[BaseException]]] = None, ) -> requests.Response: """Send a request with retries. @@ -436,6 +437,13 @@ def request_with_retries( The URL to send the request to. request_kwargs : Mapping Additional request parameters. + stop_after_attempt : int, default=1 + The number of attempts to make. + retry_on : Sequence[Type[BaseException]] or None, default=None + The exceptions to retry on. In addition to: + [LangSmithConnectionError, LangSmithAPIError]. + to_ignore : Sequence[Type[BaseException]] or None, default=None + The exceptions to ignore / pass on. Returns ------- @@ -458,6 +466,8 @@ def request_with_retries( *(retry_on or []), *(ls_utils.LangSmithConnectionError, ls_utils.LangSmithAPIError), ) + to_ignore_: Tuple[Type[BaseException], ...] = (*(to_ignore or ()),) + response = None for idx in range(stop_after_attempt): try: try: @@ -486,6 +496,10 @@ def request_with_retries( raise ls_utils.LangSmithNotFoundError( f"Resource not found for {url}. {repr(e)}" ) + elif response.status_code == 409: + raise ls_utils.LangSmithConflictError( + f"Conflict for {url}. {repr(e)}" + ) else: raise ls_utils.LangSmithError( f"Failed to {request_method} {url} in LangSmith" @@ -511,6 +525,11 @@ def request_with_retries( raise ls_utils.LangSmithError( f"Failed to {request_method} {url} in LangSmith API. {emsg}" ) from e + except to_ignore_ as e: + if response is not None: + logger.debug("Passing on exception %s", e) + return response + # Else we still raise an error except retry_on_: if idx + 1 == stop_after_attempt: raise @@ -868,6 +887,7 @@ def create_run( "headers": headers, "timeout": self.timeout_ms / 1000, }, + to_ignore=(ls_utils.LangSmithConflictError,), ) def batch_ingest_runs( @@ -961,6 +981,7 @@ def batch_ingest_runs( "Content-Type": "application/json", }, }, + to_ignore=(ls_utils.LangSmithConflictError,), ) def update_run( diff --git a/python/langsmith/utils.py b/python/langsmith/utils.py index 1d87f5cf1..382dc6190 100644 --- a/python/langsmith/utils.py +++ b/python/langsmith/utils.py @@ -37,6 +37,10 @@ class LangSmithNotFoundError(LangSmithError): """Couldn't find the requested resource.""" +class LangSmithConflictError(LangSmithError): + """The resource already exists.""" + + class LangSmithConnectionError(LangSmithError): """Couldn't connect to the LangSmith API.""" diff --git a/python/tests/unit_tests/test_client.py b/python/tests/unit_tests/test_client.py index fc026fd3b..9e455c87e 100644 --- a/python/tests/unit_tests/test_client.py +++ b/python/tests/unit_tests/test_client.py @@ -383,6 +383,26 @@ def test_http_status_500_handling(mock_sleep): assert mock_session.request.call_count == 2 +@patch("langsmith.client.time.sleep") +def test_pass_on_409_handling(mock_sleep): + client = Client(api_key="test") + with patch.object(client, "session") as mock_session: + mock_response = MagicMock() + mock_response.status_code = 409 + mock_response.raise_for_status.side_effect = HTTPError() + mock_session.request.return_value = mock_response + + response = client.request_with_retries( + "GET", + "https://test.url", + {}, + stop_after_attempt=5, + to_ignore=[ls_utils.LangSmithConflictError], + ) + assert mock_session.request.call_count == 1 + assert response == mock_response + + @patch("langsmith.client.ls_utils.raise_for_status_with_text") def test_http_status_429_handling(mock_raise_for_status): client = Client(api_key="test") From 836648cdd618fd468273ccd290c587f05cc8a990 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Thu, 25 Jan 2024 12:59:12 -0800 Subject: [PATCH 22/31] Autoscale background threads for tracer auto batching --- python/langsmith/client.py | 53 +++++++++-- python/tests/unit_tests/test_client.py | 116 +++++++++++++++++++++++++ 2 files changed, 162 insertions(+), 7 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index d8bc7a8bc..50629173d 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -349,7 +349,7 @@ def __init__( self.tracing_queue: Optional[PriorityQueue] = PriorityQueue() threading.Thread( - target=_tracing_thread_func, + target=_tracing_control_thread_func, # arg must be a weakref to self to avoid the Thread object # preventing garbage collection of the Client object args=(weakref.ref(self),), @@ -3138,11 +3138,11 @@ def _evaluate_strings(self, prediction, reference=None, input=None, **kwargs) -> def _tracing_thread_drain_queue( - tracing_queue: Queue, limit: Optional[int] = None + tracing_queue: Queue, limit: int = 100, block: bool = True ) -> List[TracingQueueItem]: next_batch: List[TracingQueueItem] = [] try: - while item := tracing_queue.get(block=True, timeout=0.25): + while item := tracing_queue.get(block=block, timeout=0.25): next_batch.append(item) if limit and len(next_batch) >= limit: break @@ -3163,24 +3163,63 @@ def _tracing_thread_handle_batch( tracing_queue.task_done() -def _tracing_thread_func(client_ref: weakref.ref[Client]) -> None: +def _tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None: client = client_ref() if client is None: return tracing_queue = client.tracing_queue assert tracing_queue is not None + sub_threads: List[threading.Thread] = [] + # loop until while ( # the main thread dies threading.main_thread().is_alive() # or we're the only remaining reference to the client - and sys.getrefcount(client) > 3 + and sys.getrefcount(client) > 3 + len(sub_threads) # 1 for this func, 1 for getrefcount, 1 for _get_data_type_cached ): - if next_batch := _tracing_thread_drain_queue(tracing_queue, 100): + print("im looping", tracing_queue.qsize()) + for thread in sub_threads: + if not thread.is_alive(): + sub_threads.remove(thread) + if tracing_queue.qsize() > 1000: + new_thread = threading.Thread( + target=_tracing_sub_thread_func, args=(weakref.ref(client),) + ) + sub_threads.append(new_thread) + new_thread.start() + if next_batch := _tracing_thread_drain_queue(tracing_queue): _tracing_thread_handle_batch(client, tracing_queue, next_batch) # drain the queue on exit - while next_batch := _tracing_thread_drain_queue(tracing_queue, 100): + while next_batch := _tracing_thread_drain_queue(tracing_queue, block=False): + _tracing_thread_handle_batch(client, tracing_queue, next_batch) + + +def _tracing_sub_thread_func(client_ref: weakref.ref[Client]) -> None: + client = client_ref() + if client is None: + return + tracing_queue = client.tracing_queue + assert tracing_queue is not None + + seen_successive_empty_queues = 0 + + # loop until + while ( + # the main thread dies + threading.main_thread().is_alive() + # or we've seen the queue empty 4 times in a row + and seen_successive_empty_queues < 5 + ): + if next_batch := _tracing_thread_drain_queue(tracing_queue): + seen_successive_empty_queues = 0 + _tracing_thread_handle_batch(client, tracing_queue, next_batch) + else: + seen_successive_empty_queues += 1 + + # drain the queue on exit + while next_batch := _tracing_thread_drain_queue(tracing_queue, block=False): _tracing_thread_handle_batch(client, tracing_queue, next_batch) diff --git a/python/tests/unit_tests/test_client.py b/python/tests/unit_tests/test_client.py index 9e455c87e..7cc484e5b 100644 --- a/python/tests/unit_tests/test_client.py +++ b/python/tests/unit_tests/test_client.py @@ -208,23 +208,139 @@ def __call__(self, *args: object, **kwargs: object) -> None: self.counter += 1 +@pytest.mark.parametrize("auto_batch_tracing", [True, False]) +def test_client_gc_empty(auto_batch_tracing: bool) -> None: + client = Client( + api_url="http://localhost:1984", + api_key="123", + auto_batch_tracing=auto_batch_tracing, + ) + tracker = CallTracker() + weakref.finalize(client, tracker) + assert tracker.counter == 0 + + del client + time.sleep(1) # Give the background thread time to stop + gc.collect() # Force garbage collection + assert tracker.counter == 1, "Client was not garbage collected" + + @pytest.mark.parametrize("auto_batch_tracing", [True, False]) def test_client_gc(auto_batch_tracing: bool) -> None: + session = mock.MagicMock(spec=requests.Session) client = Client( api_url="http://localhost:1984", api_key="123", auto_batch_tracing=auto_batch_tracing, + session=session, ) tracker = CallTracker() weakref.finalize(client, tracker) assert tracker.counter == 0 + for _ in range(10): + id = uuid.uuid4() + client.create_run( + "my_run", + inputs={}, + run_type="llm", + execution_order=1, + id=id, + trace_id=id, + dotted_order=id, + ) + + if auto_batch_tracing: + assert client.tracing_queue + client.tracing_queue.join() + + request_calls = [call for call in session.request.mock_calls if call.args] + assert len(request_calls) == 1 + for call in request_calls: + assert call.args[0] == "post" + assert call.args[1] == "http://localhost:1984/runs/batch" + else: + request_calls = [call for call in session.request.mock_calls if call.args] + assert len(request_calls) == 10 + for call in request_calls: + assert call.args[0] == "post" + assert call.args[1] == "http://localhost:1984/runs" + del client time.sleep(1) # Give the background thread time to stop gc.collect() # Force garbage collection assert tracker.counter == 1, "Client was not garbage collected" +@pytest.mark.parametrize("auto_batch_tracing", [True, False]) +def test_client_gc_no_batched_runs(auto_batch_tracing: bool) -> None: + session = mock.MagicMock(spec=requests.Session) + client = Client( + api_url="http://localhost:1984", + api_key="123", + auto_batch_tracing=auto_batch_tracing, + session=session, + ) + tracker = CallTracker() + weakref.finalize(client, tracker) + assert tracker.counter == 0 + + # because no trace_id/dotted_order provided, auto batch is disabled + for _ in range(10): + client.create_run( + "my_run", inputs={}, run_type="llm", execution_order=1, id=uuid.uuid4() + ) + request_calls = [call for call in session.request.mock_calls if call.args] + assert len(request_calls) == 10 + for call in request_calls: + assert call.args[0] == "post" + assert call.args[1] == "http://localhost:1984/runs" + + del client + time.sleep(1) # Give the background thread time to stop + gc.collect() # Force garbage collection + assert tracker.counter == 1, "Client was not garbage collected" + + +def test_client_gc_after_autoscale() -> None: + session = mock.MagicMock(spec=requests.Session) + client = Client( + api_url="http://localhost:1984", + api_key="123", + session=session, + ) + tracker = CallTracker() + weakref.finalize(client, tracker) + assert tracker.counter == 0 + + tracing_queue = client.tracing_queue + assert tracing_queue is not None + + for _ in range(50_000): + id = uuid.uuid4() + client.create_run( + "my_run", + inputs={}, + run_type="llm", + execution_order=1, + id=id, + trace_id=id, + dotted_order=id, + ) + + del client + tracing_queue.join() + time.sleep(2) # Give the background threads time to stop + gc.collect() # Force garbage collection + assert tracker.counter == 1, "Client was not garbage collected" + + request_calls = [call for call in session.request.mock_calls if call.args] + assert len(request_calls) >= 500 and len(request_calls) <= 550 + for call in request_calls: + assert call.args[0] == "post" + assert call.args[1] == "http://localhost:1984/runs/batch" + + @pytest.mark.parametrize("auto_batch_tracing", [True, False]) def test_create_run_includes_langchain_env_var_metadata( auto_batch_tracing: bool, From 8d00547c263f2319a26cc83d342eb20c75a74ae1 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Thu, 25 Jan 2024 13:22:20 -0800 Subject: [PATCH 23/31] Remove print --- python/langsmith/client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 50629173d..4af29679f 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -3180,7 +3180,6 @@ def _tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None: and sys.getrefcount(client) > 3 + len(sub_threads) # 1 for this func, 1 for getrefcount, 1 for _get_data_type_cached ): - print("im looping", tracing_queue.qsize()) for thread in sub_threads: if not thread.is_alive(): sub_threads.remove(thread) From ddadf9a78a30aa4fd9f07cf5d1c917b2b4a85a9c Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Thu, 25 Jan 2024 13:38:48 -0800 Subject: [PATCH 24/31] Compromise --- python/langsmith/client.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 4af29679f..8718a711a 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -3142,7 +3142,14 @@ def _tracing_thread_drain_queue( ) -> List[TracingQueueItem]: next_batch: List[TracingQueueItem] = [] try: - while item := tracing_queue.get(block=block, timeout=0.25): + # wait 250ms for the first item, then + # - drain the queue with a 10ms block timeout + # - stop draining if we hit the limit + # shorter drain timeout is used instead of non-blocking calls to + # avoid creating too many small batches + if item := tracing_queue.get(block=block, timeout=0.25): + next_batch.append(item) + while item := tracing_queue.get(block=block, timeout=0.01): next_batch.append(item) if limit and len(next_batch) >= limit: break From 08f67351802fce0c77eb89d49637c377c9a64bab Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Thu, 25 Jan 2024 13:48:56 -0800 Subject: [PATCH 25/31] Add limits --- python/langsmith/client.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 8718a711a..4757f851e 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -3170,6 +3170,10 @@ def _tracing_thread_handle_batch( tracing_queue.task_done() +_AUTO_SCALE_UP_QSIZE_TRIGGER = 1000 +_AUTO_SCALE_UP_NTHREADS_LIMIT = 16 + + def _tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None: client = client_ref() if client is None: @@ -3190,7 +3194,10 @@ def _tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None: for thread in sub_threads: if not thread.is_alive(): sub_threads.remove(thread) - if tracing_queue.qsize() > 1000: + if ( + len(sub_threads) < _AUTO_SCALE_UP_NTHREADS_LIMIT + and tracing_queue.qsize() > _AUTO_SCALE_UP_QSIZE_TRIGGER + ): new_thread = threading.Thread( target=_tracing_sub_thread_func, args=(weakref.ref(client),) ) From 4538ac9ac19cf358bf28c3fc79ea9f731c86d66b Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Thu, 25 Jan 2024 13:50:41 -0800 Subject: [PATCH 26/31] Add one more constant --- python/langsmith/client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 4757f851e..5f84f9e8c 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -3172,6 +3172,7 @@ def _tracing_thread_handle_batch( _AUTO_SCALE_UP_QSIZE_TRIGGER = 1000 _AUTO_SCALE_UP_NTHREADS_LIMIT = 16 +_AUTO_SCALE_DOWN_NEMPTY_TRIGGER = 4 def _tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None: @@ -3225,7 +3226,7 @@ def _tracing_sub_thread_func(client_ref: weakref.ref[Client]) -> None: # the main thread dies threading.main_thread().is_alive() # or we've seen the queue empty 4 times in a row - and seen_successive_empty_queues < 5 + and seen_successive_empty_queues <= _AUTO_SCALE_DOWN_NEMPTY_TRIGGER ): if next_batch := _tracing_thread_drain_queue(tracing_queue): seen_successive_empty_queues = 0 From 1645fcde70548dc49e5d6fdbaf88729a7320b40f Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Thu, 25 Jan 2024 14:35:59 -0800 Subject: [PATCH 27/31] Adjust --- python/langsmith/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 5f84f9e8c..aa13341b8 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -3143,13 +3143,13 @@ def _tracing_thread_drain_queue( next_batch: List[TracingQueueItem] = [] try: # wait 250ms for the first item, then - # - drain the queue with a 10ms block timeout + # - drain the queue with a 50ms block timeout # - stop draining if we hit the limit # shorter drain timeout is used instead of non-blocking calls to # avoid creating too many small batches if item := tracing_queue.get(block=block, timeout=0.25): next_batch.append(item) - while item := tracing_queue.get(block=block, timeout=0.01): + while item := tracing_queue.get(block=block, timeout=0.05): next_batch.append(item) if limit and len(next_batch) >= limit: break From e9de9f90c9015d7b8dc2aa52f50e118233d156d5 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Sat, 27 Jan 2024 17:26:59 -0800 Subject: [PATCH 28/31] Add auto_batch_tracing modality for Client (#372) - if on, starts a background thread that batches inserts/updates - only applies to insert/updates w/ trace_id and dotted_order - after release, bump sdk version here https://github.com/langchain-ai/langchain/pull/16305 --------- Co-authored-by: William Fu-Hinthorn <13333726+hinthornw@users.noreply.github.com> --- python/langsmith/client.py | 200 +++++++++++++++++++++---- python/langsmith/utils.py | 4 + python/tests/unit_tests/test_client.py | 159 ++++++++++++++++++-- 3 files changed, 328 insertions(+), 35 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 3fb11fd4c..d8bc7a8bc 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -12,9 +12,13 @@ import os import random import socket +import sys +import threading import time import uuid import weakref +from dataclasses import dataclass, field +from queue import Empty, PriorityQueue, Queue from typing import ( TYPE_CHECKING, Any, @@ -266,6 +270,13 @@ def _as_uuid(value: ID_TYPE, var: str) -> uuid.UUID: ) from e +@dataclass(order=True) +class TracingQueueItem: + priority: str + action: str + item: Any = field(compare=False) + + class Client: """Client for interacting with the LangSmith API.""" @@ -281,6 +292,7 @@ class Client: "_tenant_id", "tracing_sample_rate", "_sampled_post_uuids", + "tracing_queue", ] def __init__( @@ -292,6 +304,7 @@ def __init__( timeout_ms: Optional[int] = None, web_url: Optional[str] = None, session: Optional[requests.Session] = None, + auto_batch_tracing: bool = True, ) -> None: """Initialize a Client instance. @@ -325,12 +338,24 @@ def __init__( self.api_url = _get_api_url(api_url, self.api_key) _validate_api_key_if_hosted(self.api_url, self.api_key) self.retry_config = retry_config or _default_retry_config() - self.timeout_ms = timeout_ms or 7000 + self.timeout_ms = timeout_ms or 10000 self._web_url = web_url self._tenant_id: Optional[uuid.UUID] = None # Create a session and register a finalizer to close it self.session = session if session else requests.Session() weakref.finalize(self, close_session, self.session) + # Initialize auto batching + if auto_batch_tracing: + self.tracing_queue: Optional[PriorityQueue] = PriorityQueue() + + threading.Thread( + target=_tracing_thread_func, + # arg must be a weakref to self to avoid the Thread object + # preventing garbage collection of the Client object + args=(weakref.ref(self),), + ).start() + else: + self.tracing_queue = None # Mount the HTTPAdapter with the retry configuration adapter = requests_adapters.HTTPAdapter(max_retries=self.retry_config) @@ -400,6 +425,7 @@ def request_with_retries( request_kwargs: Mapping, stop_after_attempt: int = 1, retry_on: Optional[Sequence[Type[BaseException]]] = None, + to_ignore: Optional[Sequence[Type[BaseException]]] = None, ) -> requests.Response: """Send a request with retries. @@ -411,6 +437,13 @@ def request_with_retries( The URL to send the request to. request_kwargs : Mapping Additional request parameters. + stop_after_attempt : int, default=1 + The number of attempts to make. + retry_on : Sequence[Type[BaseException]] or None, default=None + The exceptions to retry on. In addition to: + [LangSmithConnectionError, LangSmithAPIError]. + to_ignore : Sequence[Type[BaseException]] or None, default=None + The exceptions to ignore / pass on. Returns ------- @@ -428,6 +461,13 @@ def request_with_retries( LangSmithError If the request fails. """ + + retry_on_: Tuple[Type[BaseException], ...] = ( + *(retry_on or []), + *(ls_utils.LangSmithConnectionError, ls_utils.LangSmithAPIError), + ) + to_ignore_: Tuple[Type[BaseException], ...] = (*(to_ignore or ()),) + response = None for idx in range(stop_after_attempt): try: try: @@ -456,6 +496,10 @@ def request_with_retries( raise ls_utils.LangSmithNotFoundError( f"Resource not found for {url}. {repr(e)}" ) + elif response.status_code == 409: + raise ls_utils.LangSmithConflictError( + f"Conflict for {url}. {repr(e)}" + ) else: raise ls_utils.LangSmithError( f"Failed to {request_method} {url} in LangSmith" @@ -481,7 +525,12 @@ def request_with_retries( raise ls_utils.LangSmithError( f"Failed to {request_method} {url} in LangSmith API. {emsg}" ) from e - except tuple(retry_on or []): + except to_ignore_ as e: + if response is not None: + logger.debug("Passing on exception %s", e) + return response + # Else we still raise an error + except retry_on_: if idx + 1 == stop_after_attempt: raise sleep_time = 2**idx + (random.random() * 0.5) @@ -810,6 +859,17 @@ def create_run( "run_type": run_type, "execution_order": execution_order if execution_order is not None else 1, } + if not self._filter_for_sampling([run_create]): + return + if ( + self.tracing_queue is not None + # batch ingest requires trace_id and dotted_order to be set + and run_create.get("trace_id") is not None + and run_create.get("dotted_order") is not None + ): + return self.tracing_queue.put( + TracingQueueItem(run_create["dotted_order"], "create", run_create) + ) run_create = self._run_transform(run_create) self._insert_runtime_env([run_create]) @@ -818,16 +878,17 @@ def create_run( "Accept": "application/json", "Content-Type": "application/json", } - if self._filter_for_sampling([run_create]): - self.request_with_retries( - "post", - f"{self.api_url}/runs", - request_kwargs={ - "data": json.dumps(run_create, default=_serialize_json), - "headers": headers, - "timeout": self.timeout_ms / 1000, - }, - ) + + self.request_with_retries( + "post", + f"{self.api_url}/runs", + request_kwargs={ + "data": json.dumps(run_create, default=_serialize_json), + "headers": headers, + "timeout": self.timeout_ms / 1000, + }, + to_ignore=(ls_utils.LangSmithConflictError,), + ) def batch_ingest_runs( self, @@ -837,6 +898,8 @@ def batch_ingest_runs( update: Optional[ Sequence[Union[ls_schemas.Run, ls_schemas.RunLikeDict, Dict]] ] = None, + *, + pre_sampled: bool = False, ): """ Batch ingest/upsert multiple runs in the Langsmith system. @@ -848,6 +911,9 @@ def batch_ingest_runs( update (Optional[Sequence[Union[ls_schemas.Run, RunLikeDict]]]): A sequence of `Run` objects or equivalent dictionaries representing runs that have already been created and should be updated / patched. + pre_sampled (bool, optional): Whether the runs have already been subject + to sampling, and therefore should not be sampled again. + Defaults to False. Returns: None: If both `create` and `update` are None. @@ -877,11 +943,27 @@ def batch_ingest_runs( else: standalone_updates.append(run) update_dicts = standalone_updates + for run in create_dicts: + if not run.get("trace_id") or not run.get("dotted_order"): + raise ls_utils.LangSmithUserError( + "Batch ingest requires trace_id and dotted_order to be set." + ) + for run in update_dicts: + if not run.get("trace_id") or not run.get("dotted_order"): + raise ls_utils.LangSmithUserError( + "Batch ingest requires trace_id and dotted_order to be set." + ) # filter out runs that are not sampled - body = { - "post": self._filter_for_sampling(create_dicts), - "patch": self._filter_for_sampling(update_dicts, patch=True), - } + if pre_sampled: + body = { + "post": create_dicts, + "patch": update_dicts, + } + else: + body = { + "post": self._filter_for_sampling(create_dicts), + "patch": self._filter_for_sampling(update_dicts, patch=True), + } if not body["post"] and not body["patch"]: return @@ -899,6 +981,7 @@ def batch_ingest_runs( "Content-Type": "application/json", }, }, + to_ignore=(ls_utils.LangSmithConflictError,), ) def update_run( @@ -936,7 +1019,14 @@ def update_run( "Accept": "application/json", "Content-Type": "application/json", } - data: Dict[str, Any] = {} + data: Dict[str, Any] = { + "id": _as_uuid(run_id, "run_id"), + "trace_id": kwargs.pop("trace_id", None), + "parent_run_id": kwargs.pop("parent_run_id", None), + "dotted_order": kwargs.pop("dotted_order", None), + } + if not self._filter_for_sampling([data], patch=True): + return if end_time is not None: data["end_time"] = end_time.isoformat() if error is not None: @@ -947,17 +1037,26 @@ def update_run( data["outputs"] = _hide_outputs(outputs) if events is not None: data["events"] = events - if self._filter_for_sampling([data]): - self.request_with_retries( - "patch", - f"{self.api_url}/runs/{_as_uuid(run_id, 'run_id')}", - request_kwargs={ - "data": json.dumps(data, default=_serialize_json), - "headers": headers, - "timeout": self.timeout_ms / 1000, - }, + if ( + self.tracing_queue is not None + # batch ingest requires trace_id and dotted_order to be set + and data["trace_id"] is not None + and data["dotted_order"] is not None + ): + return self.tracing_queue.put( + TracingQueueItem(data["dotted_order"], "update", data) ) + self.request_with_retries( + "patch", + f"{self.api_url}/runs/{data['id']}", + request_kwargs={ + "data": json.dumps(data, default=_serialize_json), + "headers": headers, + "timeout": self.timeout_ms / 1000, + }, + ) + def _load_child_runs(self, run: ls_schemas.Run) -> ls_schemas.Run: """Load child runs for a given run. @@ -3036,3 +3135,52 @@ def _evaluate_strings(self, prediction, reference=None, input=None, **kwargs) -> input_mapper=input_mapper, revision_id=revision_id, ) + + +def _tracing_thread_drain_queue( + tracing_queue: Queue, limit: Optional[int] = None +) -> List[TracingQueueItem]: + next_batch: List[TracingQueueItem] = [] + try: + while item := tracing_queue.get(block=True, timeout=0.25): + next_batch.append(item) + if limit and len(next_batch) >= limit: + break + except Empty: + pass + return next_batch + + +def _tracing_thread_handle_batch( + client: Client, tracing_queue: Queue, batch: List[TracingQueueItem] +) -> None: + create = [it.item for it in batch if it.action == "create"] + update = [it.item for it in batch if it.action == "update"] + try: + client.batch_ingest_runs(create=create, update=update, pre_sampled=True) + finally: + for _ in batch: + tracing_queue.task_done() + + +def _tracing_thread_func(client_ref: weakref.ref[Client]) -> None: + client = client_ref() + if client is None: + return + tracing_queue = client.tracing_queue + assert tracing_queue is not None + + # loop until + while ( + # the main thread dies + threading.main_thread().is_alive() + # or we're the only remaining reference to the client + and sys.getrefcount(client) > 3 + # 1 for this func, 1 for getrefcount, 1 for _get_data_type_cached + ): + if next_batch := _tracing_thread_drain_queue(tracing_queue, 100): + _tracing_thread_handle_batch(client, tracing_queue, next_batch) + + # drain the queue on exit + while next_batch := _tracing_thread_drain_queue(tracing_queue, 100): + _tracing_thread_handle_batch(client, tracing_queue, next_batch) diff --git a/python/langsmith/utils.py b/python/langsmith/utils.py index 1d87f5cf1..382dc6190 100644 --- a/python/langsmith/utils.py +++ b/python/langsmith/utils.py @@ -37,6 +37,10 @@ class LangSmithNotFoundError(LangSmithError): """Couldn't find the requested resource.""" +class LangSmithConflictError(LangSmithError): + """The resource already exists.""" + + class LangSmithConnectionError(LangSmithError): """Couldn't connect to the LangSmith API.""" diff --git a/python/tests/unit_tests/test_client.py b/python/tests/unit_tests/test_client.py index 930d4211d..9e455c87e 100644 --- a/python/tests/unit_tests/test_client.py +++ b/python/tests/unit_tests/test_client.py @@ -1,18 +1,24 @@ """Test the LangSmith client.""" import asyncio +import gc import json import os +import time import uuid +import weakref from datetime import datetime from io import BytesIO from typing import Optional from unittest import mock -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pytest +import requests from pydantic import BaseModel +from requests import HTTPError import langsmith.env as ls_env +import langsmith.utils as ls_utils from langsmith.client import ( Client, _get_api_key, @@ -22,7 +28,6 @@ _serialize_json, ) from langsmith.schemas import Example -from langsmith.utils import LangSmithUserError _CREATED_AT = datetime(2015, 1, 1, 0, 0, 0) @@ -42,7 +47,7 @@ def test__is_langchain_hosted() -> None: def test_validate_api_key_if_hosted(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.delenv("LANGCHAIN_API_KEY", raising=False) - with pytest.raises(LangSmithUserError, match="API key must be provided"): + with pytest.raises(ls_utils.LangSmithUserError, match="API key must be provided"): Client(api_url="https://api.smith.langchain.com") client = Client(api_url="http://localhost:1984") assert client.api_url == "http://localhost:1984" @@ -172,7 +177,7 @@ def test_get_api_url() -> None: with patch.dict(os.environ, {"LANGCHAIN_ENDPOINT": "http://env.url"}): assert _get_api_url(None, None) == "http://env.url" - with pytest.raises(LangSmithUserError): + with pytest.raises(ls_utils.LangSmithUserError): _get_api_url(" ", "api_key") @@ -195,8 +200,40 @@ def test_create_run_unicode() -> None: client.update_run(id_, status="completed") -def test_create_run_includes_langchain_env_var_metadata() -> None: - client = Client(api_url="http://localhost:1984", api_key="123") +class CallTracker: + def __init__(self) -> None: + self.counter = 0 + + def __call__(self, *args: object, **kwargs: object) -> None: + self.counter += 1 + + +@pytest.mark.parametrize("auto_batch_tracing", [True, False]) +def test_client_gc(auto_batch_tracing: bool) -> None: + client = Client( + api_url="http://localhost:1984", + api_key="123", + auto_batch_tracing=auto_batch_tracing, + ) + tracker = CallTracker() + weakref.finalize(client, tracker) + assert tracker.counter == 0 + + del client + time.sleep(1) # Give the background thread time to stop + gc.collect() # Force garbage collection + assert tracker.counter == 1, "Client was not garbage collected" + + +@pytest.mark.parametrize("auto_batch_tracing", [True, False]) +def test_create_run_includes_langchain_env_var_metadata( + auto_batch_tracing: bool, +) -> None: + client = Client( + api_url="http://localhost:1984", + api_key="123", + auto_batch_tracing=auto_batch_tracing, + ) inputs = { "foo": "これは私の友達です", "bar": "این یک کتاب است", @@ -212,13 +249,32 @@ def test_create_run_includes_langchain_env_var_metadata() -> None: ls_env.get_langchain_env_var_metadata.cache_clear() with patch.object(client, "session", session): id_ = uuid.uuid4() + start_time = datetime.now() client.create_run( - "my_run", inputs=inputs, run_type="llm", execution_order=1, id=id_ + "my_run", + inputs=inputs, + run_type="llm", + execution_order=1, + id=id_, + trace_id=id_, + dotted_order=f"{start_time.strftime('%Y%m%dT%H%M%S%fZ')}{id_}", + start_time=start_time, ) + if tracing_queue := client.tracing_queue: + tracing_queue.join() # Check the posted value in the request posted_value = json.loads(session.request.call_args[1]["data"]) - assert posted_value["extra"]["metadata"]["LANGCHAIN_REVISION"] == "abcd2234" - assert "LANGCHAIN_API_KEY" not in posted_value["extra"]["metadata"] + if not auto_batch_tracing: + assert ( + posted_value["extra"]["metadata"]["LANGCHAIN_REVISION"] + == "abcd2234" + ) + assert "LANGCHAIN_API_KEY" not in posted_value["extra"]["metadata"] + else: + assert ( + posted_value["post"][0]["extra"]["metadata"]["LANGCHAIN_REVISION"] + == "abcd2234" + ) @pytest.mark.parametrize("source_type", ["api", "model"]) @@ -296,3 +352,88 @@ def test_host_url() -> None: client = Client(api_url="https://api.smith.langchain.com", api_key="API_KEY") assert client._host_url == "https://smith.langchain.com" + + +@patch("langsmith.client.time.sleep") +def test_retry_on_connection_error(mock_sleep): + client = Client(api_key="test") + with patch.object(client, "session") as mock_session: + mock_session.request.side_effect = requests.ConnectionError() + + with pytest.raises(ls_utils.LangSmithConnectionError): + client.request_with_retries( + "GET", "https://test.url", {}, stop_after_attempt=2 + ) + assert mock_session.request.call_count == 2 + + +@patch("langsmith.client.time.sleep") +def test_http_status_500_handling(mock_sleep): + client = Client(api_key="test") + with patch.object(client, "session") as mock_session: + mock_response = MagicMock() + mock_response.status_code = 500 + mock_response.raise_for_status.side_effect = HTTPError() + mock_session.request.return_value = mock_response + + with pytest.raises(ls_utils.LangSmithAPIError): + client.request_with_retries( + "GET", "https://test.url", {}, stop_after_attempt=2 + ) + assert mock_session.request.call_count == 2 + + +@patch("langsmith.client.time.sleep") +def test_pass_on_409_handling(mock_sleep): + client = Client(api_key="test") + with patch.object(client, "session") as mock_session: + mock_response = MagicMock() + mock_response.status_code = 409 + mock_response.raise_for_status.side_effect = HTTPError() + mock_session.request.return_value = mock_response + + response = client.request_with_retries( + "GET", + "https://test.url", + {}, + stop_after_attempt=5, + to_ignore=[ls_utils.LangSmithConflictError], + ) + assert mock_session.request.call_count == 1 + assert response == mock_response + + +@patch("langsmith.client.ls_utils.raise_for_status_with_text") +def test_http_status_429_handling(mock_raise_for_status): + client = Client(api_key="test") + with patch.object(client, "session") as mock_session: + mock_response = MagicMock() + mock_response.status_code = 429 + mock_session.request.return_value = mock_response + mock_raise_for_status.side_effect = HTTPError() + with pytest.raises(ls_utils.LangSmithRateLimitError): + client.request_with_retries("GET", "https://test.url", {}) + + +@patch("langsmith.client.ls_utils.raise_for_status_with_text") +def test_http_status_401_handling(mock_raise_for_status): + client = Client(api_key="test") + with patch.object(client, "session") as mock_session: + mock_response = MagicMock() + mock_response.status_code = 401 + mock_session.request.return_value = mock_response + mock_raise_for_status.side_effect = HTTPError() + with pytest.raises(ls_utils.LangSmithAuthError): + client.request_with_retries("GET", "https://test.url", {}) + + +@patch("langsmith.client.ls_utils.raise_for_status_with_text") +def test_http_status_404_handling(mock_raise_for_status): + client = Client(api_key="test") + with patch.object(client, "session") as mock_session: + mock_response = MagicMock() + mock_response.status_code = 404 + mock_session.request.return_value = mock_response + mock_raise_for_status.side_effect = HTTPError() + with pytest.raises(ls_utils.LangSmithNotFoundError): + client.request_with_retries("GET", "https://test.url", {}) From 7916eecc4040a5017ca2c588619c58c5b72350da Mon Sep 17 00:00:00 2001 From: William Fu-Hinthorn <13333726+hinthornw@users.noreply.github.com> Date: Sat, 27 Jan 2024 17:28:39 -0800 Subject: [PATCH 29/31] Turn auto-batch tracing off --- python/langsmith/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index d8bc7a8bc..d64444867 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -304,7 +304,7 @@ def __init__( timeout_ms: Optional[int] = None, web_url: Optional[str] = None, session: Optional[requests.Session] = None, - auto_batch_tracing: bool = True, + auto_batch_tracing: bool = False, ) -> None: """Initialize a Client instance. From 8732a810b75d6caa6f59b69d12bfeeacaa14affc Mon Sep 17 00:00:00 2001 From: William Fu-Hinthorn <13333726+hinthornw@users.noreply.github.com> Date: Sat, 27 Jan 2024 17:50:37 -0800 Subject: [PATCH 30/31] fixup int tests --- python/langsmith/client.py | 4 +- python/tests/integration_tests/test_client.py | 38 +++++++++---------- 2 files changed, 22 insertions(+), 20 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index d2015553b..a674b034f 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -771,7 +771,9 @@ def _run_transform( run_create = run.dict() # type: ignore else: run_create = cast(dict, run) - if isinstance(run["id"], str): + if "id" not in run_create: + run_create["id"] = uuid.uuid4() + elif isinstance(run["id"], str): run["id"] = uuid.UUID(run["id"]) if "inputs" in run_create: run_create["inputs"] = _hide_inputs(run_create["inputs"]) diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py index 1f9334553..c1ee8becd 100644 --- a/python/tests/integration_tests/test_client.py +++ b/python/tests/integration_tests/test_client.py @@ -139,6 +139,7 @@ def test_persist_update_run( if project_name in [sess.name for sess in langchain_client.list_projects()]: langchain_client.delete_project(project_name=project_name) start_time = datetime.now() + revision_id = uuid4() run: dict = dict( id=uuid4(), name="test_run", @@ -149,22 +150,27 @@ def test_persist_update_run( execution_order=1, start_time=start_time, extra={"extra": "extra"}, + revision_id=revision_id, ) langchain_client.create_run(**run) run["outputs"] = {"output": ["Hi"]} run["extra"]["foo"] = "bar" langchain_client.update_run(run["id"], **run) - for _ in range(5): - try: - stored_run = langchain_client.read_run(run["id"]) - break - except LangSmithError: - time.sleep(2) - - assert stored_run.id == run["id"] - assert stored_run.outputs == run["outputs"] - assert stored_run.start_time == run["start_time"] - langchain_client.delete_project(project_name=project_name) + try: + for _ in range(10): + try: + stored_run = langchain_client.read_run(run["id"]) + if stored_run.end_time is not None: + break + except LangSmithError: + time.sleep(2) + + assert stored_run.id == run["id"] + assert stored_run.outputs == run["outputs"] + assert stored_run.start_time == run["start_time"] + assert stored_run.extra["metadata"]["revision_id"] == str(revision_id) + finally: + langchain_client.delete_project(project_name=project_name) @freeze_time("2023-01-01") @@ -313,8 +319,8 @@ def test_list_datasets(langchain_client: Client) -> None: assert dataset2.data_type == DataType.kv # Sub-filter on data type datasets = list(langchain_client.list_datasets(data_type=DataType.llm.value)) - assert len(datasets) == 1 - assert datasets[0].id == dataset1.id + assert len(datasets) > 0 + assert dataset1.id in {dataset.id for dataset in datasets} # Sub-filter on name datasets = list( langchain_client.list_datasets( @@ -325,12 +331,6 @@ def test_list_datasets(langchain_client: Client) -> None: # Delete datasets langchain_client.delete_dataset(dataset_id=dataset1.id) langchain_client.delete_dataset(dataset_id=dataset2.id) - assert ( - len( - list(langchain_client.list_datasets(dataset_ids=[dataset1.id, dataset2.id])) - ) - == 0 - ) @freeze_time("2023-01-01") From 0a5adfea770486ede66d050196e4fad33acb9d83 Mon Sep 17 00:00:00 2001 From: William Fu-Hinthorn <13333726+hinthornw@users.noreply.github.com> Date: Sat, 27 Jan 2024 17:56:21 -0800 Subject: [PATCH 31/31] fixup --- python/tests/integration_tests/test_client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py index c1ee8becd..1eae4f974 100644 --- a/python/tests/integration_tests/test_client.py +++ b/python/tests/integration_tests/test_client.py @@ -168,6 +168,7 @@ def test_persist_update_run( assert stored_run.id == run["id"] assert stored_run.outputs == run["outputs"] assert stored_run.start_time == run["start_time"] + assert stored_run.extra assert stored_run.extra["metadata"]["revision_id"] == str(revision_id) finally: langchain_client.delete_project(project_name=project_name)