From c05653e6e2dc066279603553ac6a16a13670726d Mon Sep 17 00:00:00 2001 From: Predrag Gruevski Date: Mon, 9 Dec 2024 22:19:19 +0000 Subject: [PATCH 1/2] Add Rust implementation for Python client's `update_run()` method. --- python/langsmith/client.py | 5 +- .../src/blocking_tracing_client.rs | 14 ++- rust/crates/langsmith-pyo3/src/py_run.rs | 116 +++++++++++++----- 3 files changed, 102 insertions(+), 33 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 1d27e7c99..e422dbec1 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -1752,7 +1752,10 @@ def update_run( data["events"] = events if data["extra"]: self._insert_runtime_env([data]) - if use_multipart and self.tracing_queue is not None: + + if self._pyo3_client is not None: + self._pyo3_client.update_run(data) + elif use_multipart and self.tracing_queue is not None: # not collecting attachments currently, use empty dict serialized_op = serialize_run_dict(operation="patch", payload=data) self.tracing_queue.put( diff --git a/rust/crates/langsmith-pyo3/src/blocking_tracing_client.rs b/rust/crates/langsmith-pyo3/src/blocking_tracing_client.rs index 284e26a17..77020bd74 100644 --- a/rust/crates/langsmith-pyo3/src/blocking_tracing_client.rs +++ b/rust/crates/langsmith-pyo3/src/blocking_tracing_client.rs @@ -47,8 +47,7 @@ impl BlockingTracingClient { Ok(Self { client: Arc::from(client) }) } - // N.B.: We use `Py` so that we don't hold the GIL while running this method. - // `slf.get()` below is only valid if the `Self` type is `Sync` and `pyclass(frozen)`, + // N.B.: `slf.get()` below is only valid if the `Self` type is `Sync` and `pyclass(frozen)`, // which is enforced at compile-time. pub fn create_run( slf: &Bound<'_, Self>, @@ -59,6 +58,17 @@ impl BlockingTracingClient { .map_err(|e| into_py_err(slf.py(), e)) } + // N.B.: `slf.get()` below is only valid if the `Self` type is `Sync` and `pyclass(frozen)`, + // which is enforced at compile-time. + pub fn update_run( + slf: &Bound<'_, Self>, + run: super::py_run::RunUpdateExtended, + ) -> PyResult<()> { + let unpacked = slf.get(); + Python::allow_threads(slf.py(), || unpacked.client.submit_run_update(run.into_inner())) + .map_err(|e| into_py_err(slf.py(), e)) + } + pub fn drain(slf: &Bound<'_, Self>) -> PyResult<()> { let unpacked = slf.get(); Python::allow_threads(slf.py(), || unpacked.client.drain()) diff --git a/rust/crates/langsmith-pyo3/src/py_run.rs b/rust/crates/langsmith-pyo3/src/py_run.rs index 9133370fa..51b5e682f 100644 --- a/rust/crates/langsmith-pyo3/src/py_run.rs +++ b/rust/crates/langsmith-pyo3/src/py_run.rs @@ -46,43 +46,40 @@ impl FromPyObject<'_> for RunCreateExtended { } } -fn extract_attachments(value: &Bound<'_, PyAny>) -> PyResult>> { - if value.is_none() { - return Ok(None); - } - - let mapping = value.downcast::()?; +#[derive(Debug)] +pub struct RunUpdateExtended(langsmith_tracing_client::client::RunUpdateExtended); - let size = mapping.len()?; - if size == 0 { - return Ok(None); +impl RunUpdateExtended { + #[inline] + pub(crate) fn into_inner(self) -> langsmith_tracing_client::client::RunUpdateExtended { + self.0 } +} - let mut attachments = Vec::with_capacity(size); - - for result in mapping.items()?.iter()? { - let key_value_pair = result?; +impl FromPyObject<'_> for RunUpdateExtended { + fn extract_bound(value: &Bound<'_, PyAny>) -> PyResult { + let run_update = value.extract::()?.into_inner(); - let key_item = key_value_pair.get_item(0)?; - let key = key_item.extract::<&str>()?; + let attachments = { + if let Ok(attachments_value) = value.get_item(pyo3::intern!(value.py(), "attachments")) + { + extract_attachments(&attachments_value)? + } else { + None + } + }; - // Each value in the attachments dict is a (mime_type, bytes) tuple. - let value = key_value_pair.get_item(1)?; - let value_tuple = value.downcast_exact::()?; - let mime_type_value = value_tuple.get_item(0)?; - let bytes_value = value_tuple.get_item(1)?; + let io = RunIO { + inputs: serialize_optional_dict_value(value, pyo3::intern!(value.py(), "inputs"))?, + outputs: serialize_optional_dict_value(value, pyo3::intern!(value.py(), "outputs"))?, + }; - attachments.push(Attachment { - // TODO: It's unclear whether the key in the attachments dict is - // the `filename`` or the `ref_name`, and where the other one is coming from. - ref_name: key.to_string(), - filename: key.to_string(), - data: bytes_value.extract()?, - content_type: mime_type_value.extract()?, - }); + Ok(Self(langsmith_tracing_client::client::RunUpdateExtended { + run_update, + io, + attachments, + })) } - - Ok(Some(attachments)) } #[derive(Debug)] @@ -137,6 +134,26 @@ impl FromPyObject<'_> for RunCreate { } } +#[derive(Debug)] +pub(crate) struct RunUpdate(langsmith_tracing_client::client::RunUpdate); + +impl FromPyObject<'_> for RunUpdate { + fn extract_bound(value: &Bound<'_, PyAny>) -> PyResult { + let common = RunCommon::extract_bound(value)?.into_inner(); + + let end_time = extract_time_value(&value.get_item(pyo3::intern!(value.py(), "end_time"))?)?; + + Ok(Self(langsmith_tracing_client::client::RunUpdate { common, end_time })) + } +} + +impl RunUpdate { + #[inline] + pub(crate) fn into_inner(self) -> langsmith_tracing_client::client::RunUpdate { + self.0 + } +} + #[derive(Debug)] pub(crate) struct RunCommon(langsmith_tracing_client::client::RunCommon); @@ -196,6 +213,45 @@ impl FromPyObject<'_> for RunCommon { } } +fn extract_attachments(value: &Bound<'_, PyAny>) -> PyResult>> { + if value.is_none() { + return Ok(None); + } + + let mapping = value.downcast::()?; + + let size = mapping.len()?; + if size == 0 { + return Ok(None); + } + + let mut attachments = Vec::with_capacity(size); + + for result in mapping.items()?.iter()? { + let key_value_pair = result?; + + let key_item = key_value_pair.get_item(0)?; + let key = key_item.extract::<&str>()?; + + // Each value in the attachments dict is a (mime_type, bytes) tuple. + let value = key_value_pair.get_item(1)?; + let value_tuple = value.downcast_exact::()?; + let mime_type_value = value_tuple.get_item(0)?; + let bytes_value = value_tuple.get_item(1)?; + + attachments.push(Attachment { + // TODO: It's unclear whether the key in the attachments dict is + // the `filename`` or the `ref_name`, and where the other one is coming from. + ref_name: key.to_string(), + filename: key.to_string(), + data: bytes_value.extract()?, + content_type: mime_type_value.extract()?, + }); + } + + Ok(Some(attachments)) +} + /// Get an optional string from a Python `None`, string, or string-like object such as a UUID value. fn extract_string_like_or_none(value: Option<&Bound<'_, PyAny>>) -> PyResult> { match value { From 3928a34c2c0023fcc5867d0a205fa2918df943d1 Mon Sep 17 00:00:00 2001 From: Predrag Gruevski Date: Mon, 9 Dec 2024 23:13:43 +0000 Subject: [PATCH 2/2] Disable attachments in `update_run()` for now. --- rust/crates/langsmith-pyo3/src/py_run.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/rust/crates/langsmith-pyo3/src/py_run.rs b/rust/crates/langsmith-pyo3/src/py_run.rs index 51b5e682f..9725c5cd8 100644 --- a/rust/crates/langsmith-pyo3/src/py_run.rs +++ b/rust/crates/langsmith-pyo3/src/py_run.rs @@ -60,14 +60,17 @@ impl FromPyObject<'_> for RunUpdateExtended { fn extract_bound(value: &Bound<'_, PyAny>) -> PyResult { let run_update = value.extract::()?.into_inner(); - let attachments = { - if let Ok(attachments_value) = value.get_item(pyo3::intern!(value.py(), "attachments")) - { - extract_attachments(&attachments_value)? - } else { - None - } - }; + // TODO: attachments are WIP at the moment, ignore them here for now. + // + // let attachments = { + // if let Ok(attachments_value) = value.get_item(pyo3::intern!(value.py(), "attachments")) + // { + // extract_attachments(&attachments_value)? + // } else { + // None + // } + // }; + let attachments = None; let io = RunIO { inputs: serialize_optional_dict_value(value, pyo3::intern!(value.py(), "inputs"))?,