From 8773ab7ac670806b8e4b457156d61584e8df46e2 Mon Sep 17 00:00:00 2001 From: William FH <13333726+hinthornw@users.noreply.github.com> Date: Fri, 19 Jul 2024 17:45:27 -0700 Subject: [PATCH] Add run stats endpoints (#890) --- .github/workflows/js_test.yml | 4 +- js/package.json | 4 +- js/src/client.ts | 88 ++++++++++++++++++ js/src/index.ts | 2 +- js/src/tests/client.int.test.ts | 9 ++ python/langsmith/client.py | 91 +++++++++++++++++++ python/tests/integration_tests/test_client.py | 7 ++ python/tests/integration_tests/test_runs.py | 72 ++++++++++----- 8 files changed, 248 insertions(+), 29 deletions(-) diff --git a/.github/workflows/js_test.yml b/.github/workflows/js_test.yml index 172ed9034..1778178cc 100644 --- a/.github/workflows/js_test.yml +++ b/.github/workflows/js_test.yml @@ -81,7 +81,7 @@ jobs: strategy: matrix: os: [ubuntu-latest] - node-version: [18.x, 19.x, 20.x, 21.x, 22.x] + node-version: [18.x, 20.x, "22.4.1"] # See Node.js release schedule at https://nodejs.org/en/about/releases/ include: - os: windows-latest @@ -107,4 +107,4 @@ jobs: - name: Check version run: yarn run check-version - name: Test - run: yarn run test \ No newline at end of file + run: yarn run test diff --git a/js/package.json b/js/package.json index 45d3394fe..90e3086bb 100644 --- a/js/package.json +++ b/js/package.json @@ -1,6 +1,6 @@ { "name": "langsmith", - "version": "0.1.38", + "version": "0.1.39", "description": "Client library to connect to the LangSmith LLM Tracing and Evaluation Platform.", "packageManager": "yarn@1.22.19", "files": [ @@ -261,4 +261,4 @@ }, "./package.json": "./package.json" } -} +} \ No newline at end of file diff --git a/js/src/client.ts b/js/src/client.ts index 05752d578..14746a17a 100644 --- a/js/src/client.ts +++ b/js/src/client.ts @@ -1229,6 +1229,94 @@ export class Client { } } + public async getRunStats({ + id, + trace, + parentRun, + runType, + projectNames, + projectIds, + referenceExampleIds, + startTime, + endTime, + error, + query, + filter, + traceFilter, + treeFilter, + isRoot, + dataSourceType, + }: { + id?: string[]; + trace?: string; + parentRun?: string; + runType?: string; + projectNames?: string[]; + projectIds?: string[]; + referenceExampleIds?: string[]; + startTime?: string; + endTime?: string; + error?: boolean; + query?: string; + filter?: string; + traceFilter?: string; + treeFilter?: string; + isRoot?: boolean; + dataSourceType?: string; + }): Promise { + let projectIds_ = projectIds || []; + if (projectNames) { + projectIds_ = [ + ...(projectIds || []), + ...(await Promise.all( + projectNames.map((name) => + this.readProject({ projectName: name }).then( + (project) => project.id + ) + ) + )), + ]; + } + + const payload = { + id, + trace, + parent_run: parentRun, + run_type: runType, + session: projectIds_, + reference_example: referenceExampleIds, + start_time: startTime, + end_time: endTime, + error, + query, + filter, + trace_filter: traceFilter, + tree_filter: treeFilter, + is_root: isRoot, + data_source_type: dataSourceType, + }; + + // Remove undefined values from the payload + const filteredPayload = Object.fromEntries( + Object.entries(payload).filter(([_, value]) => value !== undefined) + ); + + const response = await this.caller.call( + fetch, + `${this.apiUrl}/runs/stats`, + { + method: "POST", + headers: this.headers, + body: JSON.stringify(filteredPayload), + signal: AbortSignal.timeout(this.timeout_ms), + ...this.fetchOptions, + } + ); + + const result = await response.json(); + return result; + } + public async shareRun( runId: string, { shareId }: { shareId?: string } = {} diff --git a/js/src/index.ts b/js/src/index.ts index 75c978d6d..73f1007da 100644 --- a/js/src/index.ts +++ b/js/src/index.ts @@ -12,4 +12,4 @@ export type { export { RunTree, type RunTreeConfig } from "./run_trees.js"; // Update using yarn bump-version -export const __version__ = "0.1.38"; +export const __version__ = "0.1.39"; diff --git a/js/src/tests/client.int.test.ts b/js/src/tests/client.int.test.ts index 7275369aa..29200ce57 100644 --- a/js/src/tests/client.int.test.ts +++ b/js/src/tests/client.int.test.ts @@ -739,3 +739,12 @@ test.concurrent("list runs limit arg works", async () => { } } }); + +test.concurrent("Test run stats", async () => { + const client = new Client(); + const stats = await client.getRunStats({ + projectNames: ["default"], + runType: "llm", + }); + expect(stats).toBeDefined(); +}); diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 203527392..be40dfb02 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -1768,6 +1768,93 @@ def list_runs( if limit is not None and i + 1 >= limit: break + def get_run_stats( + self, + *, + id: Optional[List[ID_TYPE]] = None, + trace: Optional[ID_TYPE] = None, + parent_run: Optional[ID_TYPE] = None, + run_type: Optional[str] = None, + project_names: Optional[List[str]] = None, + project_ids: Optional[List[ID_TYPE]] = None, + reference_example_ids: Optional[List[ID_TYPE]] = None, + start_time: Optional[str] = None, + end_time: Optional[str] = None, + error: Optional[bool] = None, + query: Optional[str] = None, + filter: Optional[str] = None, + trace_filter: Optional[str] = None, + tree_filter: Optional[str] = None, + is_root: Optional[bool] = None, + data_source_type: Optional[str] = None, + ) -> Dict[str, Any]: + """Get aggregate statistics over queried runs. + + Takes in similar query parameters to `list_runs` and returns statistics + based on the runs that match the query. + + Args: + id (Optional[List[ID_TYPE]]): List of run IDs to filter by. + trace (Optional[ID_TYPE]): Trace ID to filter by. + parent_run (Optional[ID_TYPE]): Parent run ID to filter by. + run_type (Optional[str]): Run type to filter by. + projects (Optional[List[ID_TYPE]]): List of session IDs to filter by. + reference_example (Optional[List[ID_TYPE]]): List of reference example IDs to filter by. + start_time (Optional[str]): Start time to filter by. + end_time (Optional[str]): End time to filter by. + error (Optional[bool]): Filter by error status. + query (Optional[str]): Query string to filter by. + filter (Optional[str]): Filter string to apply. + trace_filter (Optional[str]): Trace filter string to apply. + tree_filter (Optional[str]): Tree filter string to apply. + is_root (Optional[bool]): Filter by root run status. + data_source_type (Optional[str]): Data source type to filter by. + + Returns: + Dict[str, Any]: A dictionary containing the run statistics. + """ # noqa: E501 + from concurrent.futures import ThreadPoolExecutor, as_completed # type: ignore + + project_ids = project_ids or [] + if project_names: + with ThreadPoolExecutor() as executor: + futures = [ + executor.submit(self.read_project, project_name=name) + for name in project_names + ] + for future in as_completed(futures): + project_ids.append(future.result().id) + payload = { + "id": id, + "trace": trace, + "parent_run": parent_run, + "run_type": run_type, + "session": project_ids, + "reference_example": reference_example_ids, + "start_time": start_time, + "end_time": end_time, + "error": error, + "query": query, + "filter": filter, + "trace_filter": trace_filter, + "tree_filter": tree_filter, + "is_root": is_root, + "data_source_type": data_source_type, + } + + # Remove None values from the payload + payload = {k: v for k, v in payload.items() if v is not None} + + response = self.request_with_retries( + "POST", + "/runs/stats", + request_kwargs={ + "data": _dumps_json(payload), + }, + ) + ls_utils.raise_for_status_with_text(response) + return response.json() + def get_run_url( self, *, @@ -1777,6 +1864,10 @@ def get_run_url( ) -> str: """Get the URL for a run. + Not recommended for use within your agent runtime. + More for use interacting with runs after the fact + for data analysis or ETL workloads. + Parameters ---------- run : Run diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py index ea01b257c..89d57da26 100644 --- a/python/tests/integration_tests/test_client.py +++ b/python/tests/integration_tests/test_client.py @@ -698,3 +698,10 @@ def test_surrogates(): run_type="llm", end_time=datetime.datetime.now(datetime.timezone.utc), ) + + +def test_runs_stats(): + langchain_client = Client() + # We always have stuff in the "default" project... + stats = langchain_client.get_run_stats(project_names=["default"], run_type="llm") + assert stats diff --git a/python/tests/integration_tests/test_runs.py b/python/tests/integration_tests/test_runs.py index 405571dee..fbf87ea92 100644 --- a/python/tests/integration_tests/test_runs.py +++ b/python/tests/integration_tests/test_runs.py @@ -117,7 +117,6 @@ async def my_run(text: str): filter_ = f'and(eq(metadata_key, "test_run"), eq(metadata_value, "{run_meta}"))' poll_runs_until_count(langchain_client, project_names[0], 1, filter_=filter_) - poll_runs_until_count(langchain_client, project_names[1], 1, filter_=filter_) runs = list( langchain_client.list_runs( project_name=project_names, @@ -296,20 +295,29 @@ async def my_llm(prompt: str) -> str: assert len(runs_) == 8 -async def test_sync_generator(langchain_client: Client): +def test_sync_generator(langchain_client: Client): project_name = "__My Tracer Project - test_sync_generator" - if langchain_client.has_project(project_name): - langchain_client.delete_project(project_name=project_name) + run_meta = uuid.uuid4().hex @traceable(run_type="chain") def my_generator(num: int) -> Generator[str, None, None]: for i in range(num): yield f"Yielded {i}" - results = list(my_generator(5, langsmith_extra=dict(project_name=project_name))) + results = list( + my_generator( + 5, + langsmith_extra=dict( + project_name=project_name, metadata={"test_run": run_meta} + ), + ) + ) assert results == ["Yielded 0", "Yielded 1", "Yielded 2", "Yielded 3", "Yielded 4"] - poll_runs_until_count(langchain_client, project_name, 1, max_retries=20) - runs = list(langchain_client.list_runs(project_name=project_name)) + _filter = f'and(eq(metadata_key, "test_run"), eq(metadata_value, "{run_meta}"))' + poll_runs_until_count( + langchain_client, project_name, 1, max_retries=20, filter_=_filter + ) + runs = list(langchain_client.list_runs(project_name=project_name, filter=_filter)) run = runs[0] assert run.run_type == "chain" assert run.name == "my_generator" @@ -318,10 +326,9 @@ def my_generator(num: int) -> Generator[str, None, None]: } -async def test_sync_generator_reduce_fn(langchain_client: Client): +def test_sync_generator_reduce_fn(langchain_client: Client): project_name = "__My Tracer Project - test_sync_generator_reduce_fn" - if langchain_client.has_project(project_name): - langchain_client.delete_project(project_name=project_name) + run_meta = uuid.uuid4().hex def reduce_fn(outputs: list) -> dict: return {"my_output": " ".join(outputs)} @@ -331,10 +338,20 @@ def my_generator(num: int) -> Generator[str, None, None]: for i in range(num): yield f"Yielded {i}" - results = list(my_generator(5, langsmith_extra=dict(project_name=project_name))) + results = list( + my_generator( + 5, + langsmith_extra=dict( + project_name=project_name, metadata={"test_run": run_meta} + ), + ) + ) + filter_ = f'and(eq(metadata_key, "test_run"), eq(metadata_value, "{run_meta}"))' assert results == ["Yielded 0", "Yielded 1", "Yielded 2", "Yielded 3", "Yielded 4"] - poll_runs_until_count(langchain_client, project_name, 1, max_retries=20) - runs = list(langchain_client.list_runs(project_name=project_name)) + poll_runs_until_count( + langchain_client, project_name, 1, max_retries=20, filter_=filter_ + ) + runs = list(langchain_client.list_runs(project_name=project_name, filter=filter_)) run = runs[0] assert run.run_type == "chain" assert run.name == "my_generator" @@ -347,8 +364,7 @@ def my_generator(num: int) -> Generator[str, None, None]: async def test_async_generator(langchain_client: Client): project_name = "__My Tracer Project - test_async_generator" - if langchain_client.has_project(project_name): - langchain_client.delete_project(project_name=project_name) + run_meta = uuid.uuid4().hex @traceable(run_type="chain") async def my_async_generator(num: int) -> AsyncGenerator[str, None]: @@ -359,7 +375,10 @@ async def my_async_generator(num: int) -> AsyncGenerator[str, None]: results = [ item async for item in my_async_generator( - 5, langsmith_extra=dict(project_name=project_name) + 5, + langsmith_extra=dict( + project_name=project_name, metadata={"test_run": run_meta} + ), ) ] assert results == [ @@ -369,8 +388,11 @@ async def my_async_generator(num: int) -> AsyncGenerator[str, None]: "Async yielded 3", "Async yielded 4", ] - poll_runs_until_count(langchain_client, project_name, 1, max_retries=20) - runs = list(langchain_client.list_runs(project_name=project_name)) + _filter = f'and(eq(metadata_key, "test_run"), eq(metadata_value, "{run_meta}"))' + poll_runs_until_count( + langchain_client, project_name, 1, max_retries=20, filter_=_filter + ) + runs = list(langchain_client.list_runs(project_name=project_name, filter=_filter)) run = runs[0] assert run.run_type == "chain" assert run.name == "my_async_generator" @@ -387,8 +409,7 @@ async def my_async_generator(num: int) -> AsyncGenerator[str, None]: async def test_async_generator_reduce_fn(langchain_client: Client): project_name = "__My Tracer Project - test_async_generator_reduce_fn" - if langchain_client.has_project(project_name): - langchain_client.delete_project(project_name=project_name) + run_meta = uuid.uuid4().hex def reduce_fn(outputs: list) -> dict: return {"my_output": " ".join(outputs)} @@ -402,7 +423,10 @@ async def my_async_generator(num: int) -> AsyncGenerator[str, None]: results = [ item async for item in my_async_generator( - 5, langsmith_extra=dict(project_name=project_name) + 5, + langsmith_extra=dict( + project_name=project_name, metadata={"test_run": run_meta} + ), ) ] assert results == [ @@ -412,11 +436,11 @@ async def my_async_generator(num: int) -> AsyncGenerator[str, None]: "Async yielded 3", "Async yielded 4", ] - + filter_ = f'and(eq(metadata_key, "test_run"), eq(metadata_value, "{run_meta}"))' poll_runs_until_count( - langchain_client, project_name, 1, max_retries=20, sleep_time=5 + langchain_client, project_name, 1, max_retries=20, sleep_time=5, filter_=filter_ ) - runs = list(langchain_client.list_runs(project_name=project_name)) + runs = list(langchain_client.list_runs(project_name=project_name, filter=filter_)) run = runs[0] assert run.run_type == "chain" assert run.name == "my_async_generator"