Skip to content

Commit

Permalink
improve benchmarks
Browse files Browse the repository at this point in the history
  • Loading branch information
agola11 committed Oct 23, 2024
1 parent d8e98d2 commit 01b6305
Showing 1 changed file with 76 additions and 97 deletions.
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
use criterion::{black_box, criterion_group, criterion_main, BatchSize, Criterion};
use tokio::runtime::Runtime;
use std::time::Duration;
use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion};
use langsmith_tracing_client::client::run::{
Attachment, RunCommon, RunCreate, RunCreateExtended, RunIO, TimeValue,
};
use langsmith_tracing_client::client::tracing_client::{ClientConfig, TracingClient};
use langsmith_tracing_client::client::run::{RunCreateExtended, RunCreate, RunIO, Attachment, TimeValue, RunCommon};
use mockito::{ Server};
use mockito::Server;
use std::time::Duration;
use tokio::runtime::Runtime;

fn create_mock_client_config(server_url: &str) -> ClientConfig {
fn create_mock_client_config(server_url: &str, batch_size: usize) -> ClientConfig {
ClientConfig {
endpoint: server_url.to_string(),
queue_capacity: 10000,
batch_size: 50,
queue_capacity: 1_000_000,
batch_size,
batch_timeout: Duration::from_secs(1),
headers: Default::default(),
}
}

fn create_run_create(attachments: Option<Vec<Attachment>>, inputs: Option<serde_json::Value>, outputs: Option<serde_json::Value>) -> RunCreateExtended {
fn create_run_create(
attachments: Option<Vec<Attachment>>,
inputs: Option<serde_json::Value>,
outputs: Option<serde_json::Value>,
) -> RunCreateExtended {
RunCreateExtended {
run_create: RunCreate {
common: RunCommon {
Expand All @@ -38,23 +44,22 @@ fn create_run_create(attachments: Option<Vec<Attachment>>, inputs: Option<serde_
reference_example_id: None,
},
attachments,
io: RunIO {
inputs,
outputs,
},
io: RunIO { inputs, outputs },
}
}

fn create_large_json() -> serde_json::Value {
let large_array: Vec<serde_json::Value> = (0..1_000)
.map(|i| serde_json::json!({
"index": i,
"data": format!("This is element number {}", i),
"nested": {
"id": i,
"value": format!("Nested value for element {}", i),
}
}))
fn create_large_json(len: usize) -> serde_json::Value {
let large_array: Vec<serde_json::Value> = (0..len)
.map(|i| {
serde_json::json!({
"index": i,
"data": format!("This is element number {}", i),
"nested": {
"id": i,
"value": format!("Nested value for element {}", i),
}
})
})
.collect();

serde_json::json!({
Expand All @@ -68,50 +73,7 @@ fn create_large_json() -> serde_json::Value {
}
})
}

fn bench_single_request_without_attachments(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let server = rt.block_on(async {
let mut server = Server::new_async().await;
server
.mock("POST", "/runs/multipart")
.with_status(202)
.create_async()
.await;
server
});

let client_config = create_mock_client_config(&server.url());
let client = rt.block_on(async {
TracingClient::new(client_config).unwrap()
});

c.bench_function("simple run create", |b| {
b.to_async(&rt).iter_batched(
|| create_run_create(None, Some(serde_json::json!({"input": "value"})), Some(serde_json::json!({"output": "value"}))),
|run_create| async {
client.submit_run_create(black_box(run_create)).await.unwrap()
},
BatchSize::SmallInput,
);
});

c.bench_function("run create with large i/o", |b| {
b.to_async(&rt).iter_batched(
|| create_run_create(None, Some(create_large_json()), Some(create_large_json())),
|run_create| async {
client.submit_run_create(black_box(run_create)).await.unwrap()
},
BatchSize::LargeInput,
);
});

rt.block_on(async {
client.shutdown().await.unwrap();
});
}

fn bench_shutdown_large_buffer(c: &mut Criterion) {
fn bench_run_create(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let server = rt.block_on(async {
let mut server = Server::new_async().await;
Expand All @@ -123,39 +85,56 @@ fn bench_shutdown_large_buffer(c: &mut Criterion) {
server
});

let num_runs = 1000;

c.bench_function("client shutdown with large buffer", |b| {
b.to_async(&rt).iter_batched(
|| {
let runs: Vec<RunCreateExtended> = (0..num_runs)
.map(|i| {
let mut run = create_run_create(None, Some(create_large_json()), Some(create_large_json()));
run.run_create.common.id = format!("test_id_{}", i);
run
})
.collect();
let client_config = create_mock_client_config(&server.url());
let client = TracingClient::new(client_config).unwrap();
(client, runs)
},
|(client, runs)| async {
for run in runs {
client.submit_run_create(run).await.unwrap();
}
client.shutdown().await.unwrap();
},
BatchSize::LargeInput,
);
});
let mut group = c.benchmark_group("run_create");
for batch_size in vec![50] {
for json_len in vec![1_000, 5_000] {
for num_runs in vec![500, 1_000] {
group.bench_with_input(
BenchmarkId::new(
"run_create",
format!("batch_{}_json_{}_runs_{}", batch_size, json_len, num_runs),
),
&(batch_size, json_len, num_runs),
|b, &(batch_size, json_len, num_runs)| {
b.to_async(&rt).iter_batched(
|| {
let runs: Vec<RunCreateExtended> = (0..num_runs)
.map(|i| {
let mut run = create_run_create(
None,
Some(create_large_json(json_len)),
Some(create_large_json(json_len)),
);
run.run_create.common.id = format!("test_id_{}", i);
run
})
.collect();
let client_config =
create_mock_client_config(&server.url(), batch_size);
let client = TracingClient::new(client_config).unwrap();
(client, runs)
},
|(client, runs)| async {
for run in runs {
client.submit_run_create(run).await.unwrap();
}
// shutdown the client to flush the queue
client.shutdown().await.unwrap();
},
BatchSize::LargeInput,
);
},
);
}
}
}
group.finish();
}

//criterion_group!(benches, bench_single_request_without_attachments);
//criterion_group!(benches, bench_shutdown_large_buffer);
criterion_group!{
criterion_group! {
name = benches;
config = Criterion::default().sample_size(10);
targets = bench_shutdown_large_buffer
targets = bench_run_create
}

criterion_main!(benches);
criterion_main!(benches);

0 comments on commit 01b6305

Please sign in to comment.