Ankush/pyo3 #1139

Merged (4 commits) on Oct 31, 2024
Changes from all commits
63 changes: 63 additions & 0 deletions python/bench/json_serialization.py
@@ -0,0 +1,63 @@
import orjson
import timeit
from concurrent.futures import ThreadPoolExecutor, as_completed

def create_large_json(length):
    large_array = [
        {
            "index": i,
            "data": f"This is element number {i}",
            "nested": {
                "id": i,
                "value": f"Nested value for element {i}"
            }
        }
        for i in range(length)
    ]

    return {
        "name": "Huge JSON",
        "description": "This is a very large JSON object for benchmarking purposes.",
        "array": large_array,
        "metadata": {
            "created_at": "2024-10-22T19:00:00Z",
            "author": "Python Program",
            "version": 1.0
        }
    }


def benchmark_sequential(data):
    return [orjson.dumps(json_obj) for json_obj in data]


def benchmark_parallel(data):
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(orjson.dumps, json_obj) for json_obj in data]
        return [future.result() for future in as_completed(futures)]


def benchmark_with_map(data):
    with ThreadPoolExecutor() as executor:
        return list(executor.map(orjson.dumps, data))

def benchmark_no_return(data):
    for json_obj in data:
        orjson.dumps(json_obj)


num_json_objects = 2000
json_length = 3000
data = [create_large_json(json_length) for _ in range(num_json_objects)]

print("Sequential serialization:")
print(timeit.timeit(lambda: benchmark_sequential(data), number=10))

print("Parallel serialization with ThreadPoolExecutor:")
print(timeit.timeit(lambda: benchmark_parallel(data), number=10))

print("Parallel serialization with map:")
print(timeit.timeit(lambda: benchmark_with_map(data), number=15))

print("Parallel serialization without return:")
print(timeit.timeit(lambda: benchmark_no_return(data), number=15))
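A note for reading this script's output: timeit.timeit returns the total time for all iterations, and the first two cases use number=10 while the last two use number=15, so the raw totals are not directly comparable across all four. A small helper appended to the script (a hypothetical addition, not part of this PR, reusing the names defined above) would normalize to seconds per call:

def per_iteration_seconds(fn, number):
    # timeit.timeit returns the total for `number` calls; divide so that cases
    # run with different iteration counts can be compared directly.
    return timeit.timeit(fn, number=number) / number

print(per_iteration_seconds(lambda: benchmark_sequential(data), number=10))
print(per_iteration_seconds(lambda: benchmark_with_map(data), number=15))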
109 changes: 109 additions & 0 deletions python/bench/tracing_client_bench.py
@@ -0,0 +1,109 @@
import statistics
import time
from typing import Dict
from unittest.mock import Mock
from uuid import uuid4

import pytest

from langsmith.client import Client


def create_large_json(length: int) -> Dict:
    """Create a large JSON object for benchmarking purposes."""
    large_array = [
        {
            "index": i,
            "data": f"This is element number {i}",
            "nested": {"id": i, "value": f"Nested value for element {i}"},
        }
        for i in range(length)
    ]

    return {
        "name": "Huge JSON",
        "description": "This is a very large JSON object for benchmarking purposes.",
        "array": large_array,
        "metadata": {
            "created_at": "2024-10-22T19:00:00Z",
            "author": "Python Program",
            "version": 1.0,
        },
    }


def create_run_data(run_id: str, json_size: int) -> Dict:
    """Create a single run data object."""
    return {
        "name": "Run Name",
        "id": run_id,
        "run_type": "chain",
        "inputs": create_large_json(json_size),
        "outputs": create_large_json(json_size),
        "extra": {"extra_data": "value"},
        "trace_id": "trace_id",
        "dotted_order": "1.1",
        "tags": ["tag1", "tag2"],
        "session_name": "Session Name",
    }


def benchmark_run_creation(num_runs: int, json_size: int, samples: int = 1) -> Dict:
    """
    Benchmark run creation with specified parameters.
    Returns timing statistics.
    """
    timings = []

    for _ in range(samples):
        runs = [create_run_data(str(uuid4()), json_size) for i in range(num_runs)]

        mock_session = Mock()
        mock_response = Mock()
        mock_response.status_code = 202
        mock_response.text = "Accepted"
        mock_response.json.return_value = {"status": "success"}
        mock_session.request.return_value = mock_response
        client = Client(session=mock_session, api_key="xxx")

        start = time.perf_counter()
        for run in runs:
            client.create_run(**run)

        # wait for client.tracing_queue to be empty
        client.tracing_queue.join()

        elapsed = time.perf_counter() - start

        timings.append(elapsed)

    return {
        "mean": statistics.mean(timings),
        "median": statistics.median(timings),
        "stdev": statistics.stdev(timings) if len(timings) > 1 else 0,
        "min": min(timings),
        "max": max(timings),
    }


json_size = 3_000
num_runs = 1000

def main(json_size: int, num_runs: int):
    """
    Run benchmarks with different combinations of parameters and report results.
    """

    results = benchmark_run_creation(num_runs=num_runs, json_size=json_size)

    print(f"\nBenchmark Results for {num_runs} runs with JSON size {json_size}:")
    print(f"Mean time: {results['mean']:.4f} seconds")
    print(f"Median time: {results['median']:.4f} seconds")
    print(f"Std Dev: {results['stdev']:.4f} seconds")
    print(f"Min time: {results['min']:.4f} seconds")
    print(f"Max time: {results['max']:.4f} seconds")
    print(f"Throughput: {num_runs / results['mean']:.2f} runs/second")


if __name__ == "__main__":
    main(json_size, num_runs)
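The main docstring mentions running different combinations of parameters, but the script as committed benchmarks a single configuration (json_size = 3_000, num_runs = 1000). If a sweep were wanted, the __main__ block could be replaced with something along these lines; the values below are arbitrary placeholders, not figures from this PR:

if __name__ == "__main__":
    # Hypothetical sweep over a few payload sizes and run counts, reusing main() above.
    for size in (1_000, 3_000):
        for runs in (500, 1_000):
            main(json_size=size, num_runs=runs)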
21 changes: 18 additions & 3 deletions python/langsmith/client.py
@@ -1360,7 +1360,7 @@ def create_run(
         }
         if not self._filter_for_sampling([run_create]):
             return
-        run_create = self._run_transform(run_create, copy=True)
+        run_create = self._run_transform(run_create, copy=False)
         if revision_id is not None:
             run_create["extra"]["metadata"]["revision_id"] = revision_id
         if (
@@ -1656,6 +1656,7 @@ def multipart_ingest_runs(
("events", payload.pop("events", None)),
]
# encode the main run payload
# print("Dumping payload for run in multipart ingest runs")
payloadb = _dumps_json(payload)
acc_parts.append(
(
@@ -1672,6 +1673,7 @@
                 for key, value in fields:
                     if value is None:
                         continue
+                    # print("Dumping payload in multipart ingest runs", event)
                     valb = _dumps_json(value)
                     acc_parts.append(
                         (
@@ -5779,22 +5781,31 @@ def _tracing_thread_handle_batch(
     client: Client,
     tracing_queue: Queue,
     batch: List[TracingQueueItem],
-    use_multipart: bool,
+    use_multipart: bool = True,
 ) -> None:
+    print("Handling a batch of size", len(batch))
     create = [it.item for it in batch if it.action == "create"]
     update = [it.item for it in batch if it.action == "update"]
     try:
-        if use_multipart:
+        if True:
+            print("Using multipart endpoint")
+            now = time.perf_counter()
             client.multipart_ingest_runs(create=create, update=update, pre_sampled=True)
+            print("Multipart ingest took", time.perf_counter() - now)
         else:
+            print("Using batch endpoint")
+            now = time.time()
             client.batch_ingest_runs(create=create, update=update, pre_sampled=True)
+            print("Batch ingest took", time.time() - now)
     except Exception:
+        print("Error in tracing queue")
         logger.error("Error in tracing queue", exc_info=True)
         # exceptions are logged elsewhere, but we need to make sure the
         # background thread continues to run
         pass
     finally:
         for _ in batch:
+            # print("marking task as done")
             tracing_queue.task_done()


@@ -5856,17 +5867,20 @@ def _tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None:
             len(sub_threads) < scale_up_nthreads_limit
             and tracing_queue.qsize() > scale_up_qsize_trigger
         ):
+            print("Scaling up...")
             new_thread = threading.Thread(
                 target=_tracing_sub_thread_func,
                 args=(weakref.ref(client), use_multipart),
             )
             sub_threads.append(new_thread)
             new_thread.start()
+            print("Started new thread, there are now", len(sub_threads), "threads")
         if next_batch := _tracing_thread_drain_queue(tracing_queue, limit=size_limit):
             _tracing_thread_handle_batch(
                 client, tracing_queue, next_batch, use_multipart
             )
     # drain the queue on exit
+    print("Draining queue on exit")
     while next_batch := _tracing_thread_drain_queue(
         tracing_queue, limit=size_limit, block=False
     ):
@@ -5909,6 +5923,7 @@ def _tracing_sub_thread_func(
             seen_successive_empty_queues += 1

     # drain the queue on exit
+    print("Draining queue on exit")
     while next_batch := _tracing_thread_drain_queue(
         tracing_queue, limit=size_limit, block=False
     ):
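For reference, the multipart-versus-batch timing that the temporary prints above capture inside the worker thread can also be sketched outside the library, calling the two public ingest methods directly with the keyword arguments shown in the hunks. This is only an illustrative harness, not part of the change; it assumes a Client wired to a mocked session as in python/bench/tracing_client_bench.py, and it copies the run dicts because the multipart path pops inputs/outputs/events out of each payload, as the hunk above shows.

import copy
import time

def compare_ingest(client, runs):
    # Time the multipart endpoint on its own copy of the payloads.
    start = time.perf_counter()
    client.multipart_ingest_runs(create=copy.deepcopy(runs), update=[], pre_sampled=True)
    multipart_s = time.perf_counter() - start

    # Time the JSON batch endpoint on a fresh copy.
    start = time.perf_counter()
    client.batch_ingest_runs(create=copy.deepcopy(runs), update=[], pre_sampled=True)
    batch_s = time.perf_counter() - start

    return {"multipart": multipart_s, "batch": batch_s}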
1 change: 1 addition & 0 deletions rust/crates/langsmith_tracing_client/Cargo.lock

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions rust/crates/langsmith_tracing_client/Cargo.toml
@@ -15,6 +15,7 @@ tokio-util = "0.7.12"
 tempfile = "3.13.0"
 pyo3-asyncio = { version = "0.20.0", features = ["testing", "attributes", "async-std-runtime", "tokio-runtime"] }
 pyo3 = "0.20.3"
+rayon = "1.10.0"

 [dev-dependencies]
 multipart = "0.18.0"
@@ -24,3 +25,7 @@ criterion = { version = "0.5.1", features = ["async_tokio"] }
 [[bench]]
 name = "tracing_client_benchmark"
 harness = false
+
+[[bench]]
+name = "json_serialization_benchmark"
+harness = false
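With this target registered (Criterion bench targets need harness = false, as set here), the new benchmark should be runnable from the crate directory with: cargo bench --bench json_serialization_benchmark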
@@ -0,0 +1,80 @@
use rayon::prelude::*;
use serde_json::Value;
use criterion::{black_box, criterion_group, criterion_main, Criterion};

fn create_large_json(len: usize) -> Value {
    let large_array: Vec<Value> = (0..len)
        .map(|i| {
            serde_json::json!({
                "index": i,
                "data": format!("This is element number {}", i),
                "nested": {
                    "id": i,
                    "value": format!("Nested value for element {}", i),
                }
            })
        })
        .collect();

    serde_json::json!({
        "name": "Huge JSON",
        "description": "This is a very large JSON object for benchmarking purposes.",
        "array": large_array,
        "metadata": {
            "created_at": "2024-10-22T19:00:00Z",
            "author": "Rust Program",
            "version": 1.0
        }
    })
}

// Sequential processing
fn benchmark_sequential(data: &[Value]) -> Vec<Vec<u8>> {
    data.iter()
        .map(|json| serde_json::to_vec(json).expect("Failed to serialize JSON"))
        .collect()
}

// Parallel processing
fn benchmark_parallel(data: &[Value]) -> Vec<Vec<u8>> {
    data.par_iter()
        .map(|json| serde_json::to_vec(json).expect("Failed to serialize JSON"))
        .collect()
}

// into par iter
fn benchmark_into_par_iter(data: &[Value]) -> Vec<Vec<u8>> {
    let start = std::time::Instant::now();
    let meow = data.into_par_iter()
        .map(|json| serde_json::to_vec(&json).expect("Failed to serialize JSON"))
        .collect();
    println!("into_par_iter: {:?}", start.elapsed());
    meow
}

fn json_benchmark(c: &mut Criterion) {
    let num_json_objects = 2000;
    let json_length = 3000;
    let data: Vec<Value> = (0..num_json_objects)
        .map(|_| create_large_json(json_length))
        .collect();

    c.bench_function("sequential serialization", |b| {
        b.iter(|| benchmark_sequential(&data))
    });

    c.bench_function("parallel serialization", |b| {
        b.iter(|| benchmark_parallel(&data))
    });

    c.bench_function("into par iter serialization", |b| {
        b.iter(|| benchmark_into_par_iter(black_box(&data)))
    });
}

criterion_group! {
    name = benches;
    config = Criterion::default().sample_size(10);
    targets = json_benchmark
}
criterion_main!(benches);