Skip to content

Commit

Permalink
Stripping trailing null bytes + batch size of 1 = working code
Browse files Browse the repository at this point in the history
  • Loading branch information
obi1kenobi committed Dec 12, 2024
1 parent c26b5a5 commit 321b67a
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 43 deletions.
82 changes: 50 additions & 32 deletions python/bench/tracing_script.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,62 @@
import os

os.environ["LANGCHAIN_PROJECT"] = "llm_messages_test_py"
os.environ["LANGSMITH_USE_PYO3_CLIENT"] = "true"

from langsmith import traceable
from langsmith import Client, traceable

client = Client(
api_url="https://beta.api.smith.langchain.com",
api_key=os.environ["LANGCHAIN_API_KEY"],
)


@traceable
@traceable(client=client)
def format_prompt(subject):
return [
{
"role": "system",
"content": "You are a helpful assistant.",
},
{
"role": "user",
"content": f"What's a good name for a store that sells {subject}?"
}
]

@traceable(run_type="llm")
return [
{
"role": "system",
"content": "You are a helpful assistant.",
},
{
"role": "user",
"content": f"What's a good name for a store that sells {subject}?",
},
]


@traceable(run_type="llm", client=client)
def invoke_llm(messages):
return {
"choices": [
{
"message": {
"role": "assistant",
"content": "Sure, how about 'Rainbow Socks'?"
}
}
]
}

@traceable
return {
"choices": [
{
"message": {
"role": "assistant",
"content": "Sure, how about 'Rainbow Socks'?",
}
}
]
}


@traceable(client=client)
def parse_output(response):
return response["choices"][0]["message"]["content"]
return response["choices"][0]["message"]["content"]


@traceable
@traceable(client=client)
def run_pipeline():
messages = format_prompt("colorful socks")
response = invoke_llm(messages)
return parse_output(response)
messages = format_prompt("colorful socks")
response = invoke_llm(messages)
result = parse_output(response)

import time

time.sleep(2)

return result


if __name__ == "__main__":
run_pipeline()
print("running pipeline")
run_pipeline()
9 changes: 7 additions & 2 deletions python/langsmith/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,8 +550,9 @@ def __init__(

if langsmith_pyo3:
# TODO: tweak these constants as needed
print("using pyo3 client")
queue_capacity = 1_000_000
batch_size = 1
batch_size = 100
batch_timeout_millis = 1000
worker_threads = 1

Expand All @@ -571,6 +572,9 @@ def __init__(
repr(e),
)

if self._pyo3_client is None:
print("NOT using pyo3 client")

self._settings: Union[ls_schemas.LangSmithSettings, None] = None

self._manual_cleanup = False
Expand Down Expand Up @@ -1272,6 +1276,7 @@ def create_run(
"inputs": inputs,
"run_type": run_type,
}
print("RUN_CREATE called", run_create)
if not self._filter_for_sampling([run_create]):
return
if revision_id is not None:
Expand All @@ -1287,8 +1292,8 @@ def create_run(
run_create.get("trace_id") is not None
and run_create.get("dotted_order") is not None
):
print("RUN_CREATE batch", run_create)
if self._pyo3_client is not None:
print("RUN_CREATE", run_create)
self._pyo3_client.create_run(run_create)
elif self.tracing_queue is not None:
serialized_op = serialize_run_dict("post", run_create)
Expand Down
2 changes: 1 addition & 1 deletion python/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{mpsc, Arc, Mutex};
use std::time::{Duration, Instant};
use std::io::Read as _;

use rayon::iter::{IntoParallelIterator, ParallelIterator};
use reqwest::blocking::multipart::{Form, Part};
Expand Down Expand Up @@ -120,11 +121,13 @@ impl RunProcessor {
to_vec(&run_create).unwrap(), // TODO: get rid of unwrap
));

if let Some(inputs) = io.inputs {
if let Some(mut inputs) = io.inputs {
assert_eq!(inputs.pop(), Some(0));
json_data.push((format!("post.{}.inputs", run_id), inputs));
}

if let Some(outputs) = io.outputs {
if let Some(mut outputs) = io.outputs {
assert_eq!(outputs.pop(), Some(0));
json_data.push((format!("post.{}.outputs", run_id), outputs));
}

Expand Down Expand Up @@ -153,7 +156,8 @@ impl RunProcessor {
to_vec(&run_update).unwrap(), // TODO: get rid of unwrap
));

if let Some(outputs) = io.outputs {
if let Some(mut outputs) = io.outputs {
assert_eq!(outputs.pop(), Some(0));
json_data.push((format!("patch.{}.outputs", run_id), outputs));
}

Expand Down Expand Up @@ -191,31 +195,41 @@ impl RunProcessor {
.into_par_iter()
.map(|(part_name, data_bytes)| {
let part_size = data_bytes.len() as u64;
let part2 = Part::bytes(data_bytes.clone())
.mime_str(&format!("application/json; length={}", part_size))?;
let part = Part::bytes(data_bytes)
.mime_str(&format!("application/json; length={}", part_size))?;
Ok::<(String, Part), TracingClientError>((part_name, part))
Ok::<(String, Part, Part), TracingClientError>((part_name, part, part2))
})
.collect::<Result<Vec<_>, TracingClientError>>()?;
// println!("JSON processing took {:?}", start.elapsed());

let mut form2 = Form::new();
let mut form = Form::new();
for (part_name, part) in json_parts.into_iter().chain(attachment_parts) {
for (part_name, part, part2) in json_parts.into_iter() {
form2 = form2.part(part_name.clone(), part2);
form = form.part(part_name, part);
}

let mut reader = form2.reader();
let mut buf = String::new();
reader.read_to_string(&mut buf).expect("error reading multipart form");
dbg!(buf);

// send the multipart POST request
let start_send_batch = Instant::now();
let response = self
let request = dbg!(self
.http_client
.post(format!("{}/runs/multipart", self.config.endpoint))
.multipart(form)
.headers(self.config.headers.as_ref().cloned().unwrap_or_default())
.send()?;
.headers(self.config.headers.as_ref().cloned().unwrap_or_default()));
let response = dbg!(request.send())?;
// println!("Sending batch took {:?}", start_send_batch.elapsed());

if response.status().is_success() {
Ok(())
} else {
eprintln!("tracing error: {response:?}");
Err(TracingClientError::HttpError(response.status()))
}
}
Expand Down

0 comments on commit 321b67a

Please sign in to comment.