diff --git a/python/bench/tracing_script.py b/python/bench/tracing_script.py index 30084db9f..c55aa3f13 100644 --- a/python/bench/tracing_script.py +++ b/python/bench/tracing_script.py @@ -1,44 +1,62 @@ import os + os.environ["LANGCHAIN_PROJECT"] = "llm_messages_test_py" os.environ["LANGSMITH_USE_PYO3_CLIENT"] = "true" -from langsmith import traceable +from langsmith import Client, traceable + +client = Client( + api_url="https://beta.api.smith.langchain.com", + api_key=os.environ["LANGCHAIN_API_KEY"], +) + -@traceable +@traceable(client=client) def format_prompt(subject): - return [ - { - "role": "system", - "content": "You are a helpful assistant.", - }, - { - "role": "user", - "content": f"What's a good name for a store that sells {subject}?" - } - ] - -@traceable(run_type="llm") + return [ + { + "role": "system", + "content": "You are a helpful assistant.", + }, + { + "role": "user", + "content": f"What's a good name for a store that sells {subject}?", + }, + ] + + +@traceable(run_type="llm", client=client) def invoke_llm(messages): - return { - "choices": [ - { - "message": { - "role": "assistant", - "content": "Sure, how about 'Rainbow Socks'?" - } - } - ] -} - -@traceable + return { + "choices": [ + { + "message": { + "role": "assistant", + "content": "Sure, how about 'Rainbow Socks'?", + } + } + ] + } + + +@traceable(client=client) def parse_output(response): - return response["choices"][0]["message"]["content"] + return response["choices"][0]["message"]["content"] + -@traceable +@traceable(client=client) def run_pipeline(): - messages = format_prompt("colorful socks") - response = invoke_llm(messages) - return parse_output(response) + messages = format_prompt("colorful socks") + response = invoke_llm(messages) + result = parse_output(response) + + import time + + time.sleep(2) + + return result + if __name__ == "__main__": - run_pipeline() \ No newline at end of file + print("running pipeline") + run_pipeline() diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 54888db01..ecf620b76 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -550,8 +550,9 @@ def __init__( if langsmith_pyo3: # TODO: tweak these constants as needed + print("using pyo3 client") queue_capacity = 1_000_000 - batch_size = 1 + batch_size = 100 batch_timeout_millis = 1000 worker_threads = 1 @@ -571,6 +572,9 @@ def __init__( repr(e), ) + if self._pyo3_client is None: + print("NOT using pyo3 client") + self._settings: Union[ls_schemas.LangSmithSettings, None] = None self._manual_cleanup = False @@ -1272,6 +1276,7 @@ def create_run( "inputs": inputs, "run_type": run_type, } + print("RUN_CREATE called", run_create) if not self._filter_for_sampling([run_create]): return if revision_id is not None: @@ -1287,8 +1292,8 @@ def create_run( run_create.get("trace_id") is not None and run_create.get("dotted_order") is not None ): + print("RUN_CREATE batch", run_create) if self._pyo3_client is not None: - print("RUN_CREATE", run_create) self._pyo3_client.create_run(run_create) elif self.tracing_queue is not None: serialized_op = serialize_run_dict("post", run_create) diff --git a/python/poetry.lock b/python/poetry.lock index 32a77f0dd..92a48a976 100644 --- a/python/poetry.lock +++ b/python/poetry.lock @@ -2107,4 +2107,4 @@ vcr = [] [metadata] lock-version = "2.0" python-versions = ">=3.9,<4.0" -content-hash = "d8f0d4a97b9e1131e51c992dba0ee0d0d34dcb03b5965cca249950d0bdaea1f2" +content-hash = "2a72dff2f1cc3dfea3f5f7406b3be7bd45c9978fbbb0b3e7e9e927c2e46223ca" diff --git a/rust/crates/langsmith-tracing-client/src/client/blocking/processor.rs b/rust/crates/langsmith-tracing-client/src/client/blocking/processor.rs index 12d4bdc8d..516a82548 100644 --- a/rust/crates/langsmith-tracing-client/src/client/blocking/processor.rs +++ b/rust/crates/langsmith-tracing-client/src/client/blocking/processor.rs @@ -1,6 +1,7 @@ use std::sync::mpsc::{Receiver, Sender}; use std::sync::{mpsc, Arc, Mutex}; use std::time::{Duration, Instant}; +use std::io::Read as _; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use reqwest::blocking::multipart::{Form, Part}; @@ -120,11 +121,13 @@ impl RunProcessor { to_vec(&run_create).unwrap(), // TODO: get rid of unwrap )); - if let Some(inputs) = io.inputs { + if let Some(mut inputs) = io.inputs { + assert_eq!(inputs.pop(), Some(0)); json_data.push((format!("post.{}.inputs", run_id), inputs)); } - if let Some(outputs) = io.outputs { + if let Some(mut outputs) = io.outputs { + assert_eq!(outputs.pop(), Some(0)); json_data.push((format!("post.{}.outputs", run_id), outputs)); } @@ -153,7 +156,8 @@ impl RunProcessor { to_vec(&run_update).unwrap(), // TODO: get rid of unwrap )); - if let Some(outputs) = io.outputs { + if let Some(mut outputs) = io.outputs { + assert_eq!(outputs.pop(), Some(0)); json_data.push((format!("patch.{}.outputs", run_id), outputs)); } @@ -191,31 +195,41 @@ impl RunProcessor { .into_par_iter() .map(|(part_name, data_bytes)| { let part_size = data_bytes.len() as u64; + let part2 = Part::bytes(data_bytes.clone()) + .mime_str(&format!("application/json; length={}", part_size))?; let part = Part::bytes(data_bytes) .mime_str(&format!("application/json; length={}", part_size))?; - Ok::<(String, Part), TracingClientError>((part_name, part)) + Ok::<(String, Part, Part), TracingClientError>((part_name, part, part2)) }) .collect::, TracingClientError>>()?; // println!("JSON processing took {:?}", start.elapsed()); + let mut form2 = Form::new(); let mut form = Form::new(); - for (part_name, part) in json_parts.into_iter().chain(attachment_parts) { + for (part_name, part, part2) in json_parts.into_iter() { + form2 = form2.part(part_name.clone(), part2); form = form.part(part_name, part); } + let mut reader = form2.reader(); + let mut buf = String::new(); + reader.read_to_string(&mut buf).expect("error reading multipart form"); + dbg!(buf); + // send the multipart POST request let start_send_batch = Instant::now(); - let response = self + let request = dbg!(self .http_client .post(format!("{}/runs/multipart", self.config.endpoint)) .multipart(form) - .headers(self.config.headers.as_ref().cloned().unwrap_or_default()) - .send()?; + .headers(self.config.headers.as_ref().cloned().unwrap_or_default())); + let response = dbg!(request.send())?; // println!("Sending batch took {:?}", start_send_batch.elapsed()); if response.status().is_success() { Ok(()) } else { + eprintln!("tracing error: {response:?}"); Err(TracingClientError::HttpError(response.status())) } }