Skip to content

Commit

Permalink
compression changes
Browse files Browse the repository at this point in the history
  • Loading branch information
agola11 committed Dec 9, 2024
1 parent 33dc4e1 commit f376e33
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 1 deletion.
21 changes: 21 additions & 0 deletions python/langsmith/_internal/_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,27 @@ def serialize_run_dict(
attachments=attachments if attachments is not None else None,
)

def serialize_run_dict_for_compressed_ingest(
operation: Literal["post", "patch"], payload: dict
):
inputs = payload.pop("inputs", None)
outputs = payload.pop("outputs", None)
events = payload.pop("events", None)
attachments = payload.pop("attachments", None)
serialized = ...
extra = ...
return SerializedRunOperation(
operation=operation,
id=payload["id"],
trace_id=payload["trace_id"],
_none=_dumps_json(payload),
inputs=_dumps_json(inputs) if inputs is not None else None,
outputs=_dumps_json(outputs) if outputs is not None else None,
events=_dumps_json(events) if events is not None else None,
attachments=attachments if attachments is not None else None,
)



def combine_serialized_queue_operations(
ops: list[Union[SerializedRunOperation, SerializedFeedbackOperation]],
Expand Down
9 changes: 8 additions & 1 deletion python/langsmith/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,10 +494,13 @@ def __init__(
if info is None or isinstance(info, ls_schemas.LangSmithInfo)
else ls_schemas.LangSmithInfo(**info)
)
self.compressed_multipart_buffer = ...
weakref.finalize(self, close_session, self.session)
atexit.register(close_session, session_)
# Initialize auto batching
if auto_batch_tracing:
if auto_batch_tracing and _compression_enabled:
...
elif auto_batch_tracing:
self.tracing_queue: Optional[PriorityQueue] = PriorityQueue()

threading.Thread(
Expand Down Expand Up @@ -1295,6 +1298,10 @@ def create_run(
self.tracing_queue.put(
TracingQueueItem(run_create["dotted_order"], serialized_op)
)
elif os.environ["COMP"]:
# Do something different
# Use existing serialized_run_dict

else:
# Neither Rust nor Python batch ingestion is configured,
# fall back to the non-batch approach.
Expand Down

0 comments on commit f376e33

Please sign in to comment.