feat: cross platform tracing client written in rust #1107

Merged · 32 commits · Nov 26, 2024

Changes from all commits:
3a03725 add minimal test (agola11, Oct 16, 2024)
0be9ec1 add struct definitions and tests (agola11, Oct 17, 2024)
b491b09 lint and format (agola11, Oct 17, 2024)
dac80dd compilation (agola11, Oct 17, 2024)
f212ff8 all tests passing (agola11, Oct 18, 2024)
0a7d49e use shutdown method instead of janky sleep (agola11, Oct 18, 2024)
471e654 comments (agola11, Oct 18, 2024)
b7eefed cleanup (agola11, Oct 18, 2024)
f5d0ade move more stuff to common (agola11, Oct 19, 2024)
77f3dc5 make the vec<u8> optional (agola11, Oct 19, 2024)
8065b3d something kind of working with streaming (agola11, Oct 19, 2024)
b7c5508 inspect multipart form (agola11, Oct 19, 2024)
f7e3e2f fix some issues (agola11, Oct 21, 2024)
85315cd use arc::mutex to capture request body (agola11, Oct 21, 2024)
a9d71ad use bytes (agola11, Oct 21, 2024)
5ba4b10 finish writing assertions (agola11, Oct 21, 2024)
5f5f768 avoid excessive clones (agola11, Oct 21, 2024)
d8e020d remove excessive clear (agola11, Oct 21, 2024)
2cc93f3 fix tests (agola11, Oct 22, 2024)
8ad8dbf add length header (agola11, Oct 22, 2024)
d24a87b add a directory for bindings (agola11, Oct 22, 2024)
4cc7d92 several fixes (agola11, Oct 23, 2024)
d8e98d2 add benchmarks (agola11, Oct 23, 2024)
01b6305 improve benchmarks (agola11, Oct 23, 2024)
69398a2 use custom_iter (agola11, Oct 24, 2024)
4774da2 use custom_iter (agola11, Oct 24, 2024)
14f5817 baseline (agola11, Oct 24, 2024)
3a21a41 benchmark for run_bytes (agola11, Oct 25, 2024)
29aa6a1 add json serialization benchmarks (agola11, Oct 25, 2024)
d92f19d more perf stuff (agola11, Oct 26, 2024)
df21de0 more prints (agola11, Oct 26, 2024)
3459434 Ankush/pyo3 (#1139) (agola11, Oct 31, 2024)
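One commit message above, 0a7d49e ("use shutdown method instead of janky sleep"), describes the flush semantics that the Python benchmark below also depends on via client.tracing_queue.join(). As a rough, generic Python sketch of why a join-based shutdown beats sleeping (illustrative only, not code from this PR):

```python
import queue
import threading
import time

q: "queue.Queue[int]" = queue.Queue()


def worker() -> None:
    while True:
        q.get()  # take one traced run off the queue
        time.sleep(0.01)  # stand-in for exporting it
        q.task_done()  # mark the item fully processed


threading.Thread(target=worker, daemon=True).start()
for i in range(10):
    q.put(i)

# Janky: sleep "long enough" and hope the worker kept up.
# time.sleep(1.0)

# Deterministic: block exactly until every queued item is marked done.
q.join()
```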
63 changes: 63 additions & 0 deletions python/bench/json_serialization.py
@@ -0,0 +1,63 @@
import orjson
import timeit
from concurrent.futures import ThreadPoolExecutor, as_completed


def create_large_json(length):
    large_array = [
        {
            "index": i,
            "data": f"This is element number {i}",
            "nested": {
                "id": i,
                "value": f"Nested value for element {i}"
            }
        }
        for i in range(length)
    ]

    return {
        "name": "Huge JSON",
        "description": "This is a very large JSON object for benchmarking purposes.",
        "array": large_array,
        "metadata": {
            "created_at": "2024-10-22T19:00:00Z",
            "author": "Python Program",
            "version": 1.0
        }
    }


def benchmark_sequential(data):
    return [orjson.dumps(json_obj) for json_obj in data]


def benchmark_parallel(data):
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(orjson.dumps, json_obj) for json_obj in data]
        return [future.result() for future in as_completed(futures)]


def benchmark_with_map(data):
    with ThreadPoolExecutor() as executor:
        return list(executor.map(orjson.dumps, data))


def benchmark_no_return(data):
    for json_obj in data:
        orjson.dumps(json_obj)


num_json_objects = 2000
json_length = 3000
data = [create_large_json(json_length) for _ in range(num_json_objects)]

print("Sequential serialization:")
print(timeit.timeit(lambda: benchmark_sequential(data), number=10))

print("Parallel serialization with ThreadPoolExecutor:")
print(timeit.timeit(lambda: benchmark_parallel(data), number=10))

print("Parallel serialization with map:")
print(timeit.timeit(lambda: benchmark_with_map(data), number=15))

print("Parallel serialization without return:")
print(timeit.timeit(lambda: benchmark_no_return(data), number=15))
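One caveat when reading the numbers this script prints: the four benchmarks use different timeit iteration counts (number=10 vs number=15), so the raw totals are not directly comparable. A small normalizing helper, assuming the data and benchmark functions defined in this file, might look like:

```python
# Hedged helper (assumes `data`, `timeit`, and the benchmark functions above):
# normalize timeit totals taken with different `number` values into a common
# objects-per-second figure.
def objects_per_second(total_seconds: float, number: int, n_objects: int) -> float:
    return (number * n_objects) / total_seconds


seq_total = timeit.timeit(lambda: benchmark_sequential(data), number=10)
map_total = timeit.timeit(lambda: benchmark_with_map(data), number=15)
print(f"sequential: {objects_per_second(seq_total, 10, len(data)):,.0f} obj/s")
print(f"thread map: {objects_per_second(map_total, 15, len(data)):,.0f} obj/s")
```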
109 changes: 109 additions & 0 deletions python/bench/tracing_client_bench.py
@@ -0,0 +1,109 @@
import statistics
import time
from typing import Dict
from unittest.mock import Mock
from uuid import uuid4

import pytest

from langsmith.client import Client


def create_large_json(length: int) -> Dict:
    """Create a large JSON object for benchmarking purposes."""
    large_array = [
        {
            "index": i,
            "data": f"This is element number {i}",
            "nested": {"id": i, "value": f"Nested value for element {i}"},
        }
        for i in range(length)
    ]

    return {
        "name": "Huge JSON",
        "description": "This is a very large JSON object for benchmarking purposes.",
        "array": large_array,
        "metadata": {
            "created_at": "2024-10-22T19:00:00Z",
            "author": "Python Program",
            "version": 1.0,
        },
    }


def create_run_data(run_id: str, json_size: int) -> Dict:
    """Create a single run data object."""
    return {
        "name": "Run Name",
        "id": run_id,
        "run_type": "chain",
        "inputs": create_large_json(json_size),
        "outputs": create_large_json(json_size),
        "extra": {"extra_data": "value"},
        "trace_id": "trace_id",
        "dotted_order": "1.1",
        "tags": ["tag1", "tag2"],
        "session_name": "Session Name",
    }


def benchmark_run_creation(num_runs: int, json_size: int, samples: int = 1) -> Dict:
    """
    Benchmark run creation with specified parameters.
    Returns timing statistics.
    """
    timings = []

    for _ in range(samples):
        runs = [create_run_data(str(uuid4()), json_size) for i in range(num_runs)]

        mock_session = Mock()
        mock_response = Mock()
        mock_response.status_code = 202
        mock_response.text = "Accepted"
        mock_response.json.return_value = {"status": "success"}
        mock_session.request.return_value = mock_response
        client = Client(session=mock_session, api_key="xxx")

        start = time.perf_counter()
        for run in runs:
            client.create_run(**run)

        # wait for client.tracing_queue to be empty
        client.tracing_queue.join()

        elapsed = time.perf_counter() - start

        timings.append(elapsed)

    return {
        "mean": statistics.mean(timings),
        "median": statistics.median(timings),
        "stdev": statistics.stdev(timings) if len(timings) > 1 else 0,
        "min": min(timings),
        "max": max(timings),
    }


json_size = 3_000
num_runs = 1000


def main(json_size: int, num_runs: int):
    """
    Run benchmarks with different combinations of parameters and report results.
    """
    results = benchmark_run_creation(num_runs=num_runs, json_size=json_size)

    print(f"\nBenchmark Results for {num_runs} runs with JSON size {json_size}:")
    print(f"Mean time: {results['mean']:.4f} seconds")
    print(f"Median time: {results['median']:.4f} seconds")
    print(f"Std Dev: {results['stdev']:.4f} seconds")
    print(f"Min time: {results['min']:.4f} seconds")
    print(f"Max time: {results['max']:.4f} seconds")
    print(f"Throughput: {num_runs / results['mean']:.2f} runs/second")


if __name__ == "__main__":
    main(json_size, num_runs)
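Because the HTTP session is a Mock that immediately returns 202, these timings isolate client-side cost (payload construction, JSON serialization, and the background tracing queue) rather than network latency. A hypothetical parameter sweep, reusing main() from above (the specific sizes are illustrative, not from the PR):

```python
# Hypothetical sweep; the sizes are illustrative only.
for size in (1_000, 3_000):
    for runs in (500, 1_000):
        main(json_size=size, num_runs=runs)
```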
21 changes: 18 additions & 3 deletions python/langsmith/client.py
@@ -1360,7 +1360,7 @@ def create_run(
         }
         if not self._filter_for_sampling([run_create]):
             return
-        run_create = self._run_transform(run_create, copy=True)
+        run_create = self._run_transform(run_create, copy=False)
         if revision_id is not None:
             run_create["extra"]["metadata"]["revision_id"] = revision_id
         if (
@@ -1656,6 +1656,7 @@ def multipart_ingest_runs(
             ("events", payload.pop("events", None)),
         ]
         # encode the main run payload
+        # print("Dumping payload for run in multipart ingest runs")
         payloadb = _dumps_json(payload)
         acc_parts.append(
             (
@@ -1672,6 +1673,7 @@
         for key, value in fields:
             if value is None:
                 continue
+            # print("Dumping payload in multipart ingest runs", event)
             valb = _dumps_json(value)
             acc_parts.append(
                 (
@@ -5779,22 +5781,31 @@ def _tracing_thread_handle_batch(
     client: Client,
     tracing_queue: Queue,
     batch: List[TracingQueueItem],
-    use_multipart: bool,
+    use_multipart: bool = True,
 ) -> None:
+    print("Handling a batch of size", len(batch))
     create = [it.item for it in batch if it.action == "create"]
     update = [it.item for it in batch if it.action == "update"]
     try:
-        if use_multipart:
+        if True:
+            print("Using multipart endpoint")
+            now = time.perf_counter()
             client.multipart_ingest_runs(create=create, update=update, pre_sampled=True)
+            print("Multipart ingest took", time.perf_counter() - now)
         else:
+            print("Using batch endpoint")
+            now = time.time()
             client.batch_ingest_runs(create=create, update=update, pre_sampled=True)
+            print("Batch ingest took", time.time() - now)
     except Exception:
+        print("Error in tracing queue")
         logger.error("Error in tracing queue", exc_info=True)
         # exceptions are logged elsewhere, but we need to make sure the
         # background thread continues to run
         pass
     finally:
         for _ in batch:
+            # print("marking task as done")
             tracing_queue.task_done()
@@ -5856,17 +5867,20 @@ def _tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None:
             len(sub_threads) < scale_up_nthreads_limit
             and tracing_queue.qsize() > scale_up_qsize_trigger
         ):
+            print("Scaling up...")
             new_thread = threading.Thread(
                 target=_tracing_sub_thread_func,
                 args=(weakref.ref(client), use_multipart),
             )
             sub_threads.append(new_thread)
             new_thread.start()
+            print("Started new thread, there are now", len(sub_threads), "threads")
         if next_batch := _tracing_thread_drain_queue(tracing_queue, limit=size_limit):
             _tracing_thread_handle_batch(
                 client, tracing_queue, next_batch, use_multipart
             )
     # drain the queue on exit
+    print("Draining queue on exit")
     while next_batch := _tracing_thread_drain_queue(
         tracing_queue, limit=size_limit, block=False
     ):
@@ -5909,6 +5923,7 @@ def _tracing_sub_thread_func(
             seen_successive_empty_queues += 1

     # drain the queue on exit
+    print("Draining queue on exit")
     while next_batch := _tracing_thread_drain_queue(
         tracing_queue, limit=size_limit, block=False
     ):
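For orientation: the bare print() calls and the `if True:` forcing of the multipart path come from the perf-debugging commits (d92f19d "more perf stuff", df21de0 "more prints") and read like temporary scaffolding. The handler itself follows a standard queue-consumer contract: split the drained batch by action, ingest, and mark every item done in a finally block so that Client.tracing_queue.join() (which the benchmark above relies on) unblocks even when ingestion fails. A simplified, self-contained sketch of that contract with illustrative names (not the library's actual API):

```python
import queue
from dataclasses import dataclass
from typing import Any, List


@dataclass
class Item:
    action: str  # "create" or "update", as in _tracing_thread_handle_batch
    payload: Any


def ingest(create: List[Any], update: List[Any]) -> None:
    """Stand-in for client.multipart_ingest_runs(...)."""
    print(f"ingesting {len(create)} creates, {len(update)} updates")


def handle_batch(q: queue.Queue, batch: List[Item]) -> None:
    create = [it.payload for it in batch if it.action == "create"]
    update = [it.payload for it in batch if it.action == "update"]
    try:
        ingest(create, update)
    except Exception:
        pass  # logged elsewhere; the background thread must keep running
    finally:
        # One task_done() per drained item, success or failure, so that
        # producers blocked on q.join() always make progress.
        for _ in batch:
            q.task_done()


q: queue.Queue = queue.Queue()
for i in range(3):
    q.put(Item("create", {"id": i}))
batch = [q.get() for _ in range(3)]  # simplified drain
handle_batch(q, batch)
q.join()  # returns immediately: every item was marked done
```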