Nc/1oct/py prepare multipart #1054
Conversation
nfcampos
commented
Oct 1, 2024
- Add requests-toolbelt dependency
- Use a custom HTTP adapter (a sketch follows the first diff below)
  - subclass so that we can override blocksize (the default is too small to maximize performance on modern network stacks)
  - override the default pool size to match the max number of batch tracing threads (otherwise some threads would be left waiting for connections to become available)
  - actually mount the adapter (the previous behavior never mounted it, because sessions always come with http and https adapters already registered)
  - note: the retry policy was previously being ignored, so no transport-level retries were being done
- Add attachments field to the run dict/model
  - update _run_transform to collect/drop attachments
  - warn if attachments are passed to an endpoint that can't accept them; in this case ingest the runs without attachments
- Use the streaming multipart encoder from requests_toolbelt (see the sketch after this list)
  - Currently we dump each part to JSON before sending the request, as that's the only way to enforce the payload size limit
  - When we lift the payload size limit we should implement true streaming encoding, where each part is only encoded immediately before being sent over the connection, and use transfer-encoding: chunked
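A minimal sketch of that encoder in use. The part name, payload, and endpoint path here are illustrative assumptions, not the exact LangSmith API:

```python
import json

import requests
from requests_toolbelt import MultipartEncoder

# Illustrative run payload; real parts follow the LangSmith naming scheme.
run = {"id": "123e4567-e89b-12d3-a456-426614174000", "name": "my_run"}

# Each field is (filename, body, content_type); dumping to JSON up front is
# what lets us measure the payload against the size limit before sending.
encoder = MultipartEncoder(
    fields={f"post.{run['id']}": (None, json.dumps(run), "application/json")}
)

requests.post(
    "https://api.smith.langchain.com/runs/multipart",  # assumed endpoint
    data=encoder,  # streamed from the encoder rather than buffered whole
    headers={"Content-Type": encoder.content_type},
)
```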
@@ -435,6 +443,34 @@ class TracingQueueItem:
     item: Any = field(compare=False)


+class _LangSmithHttpAdapter(requests_adapters.HTTPAdapter):
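For reference, a minimal sketch of what such an adapter can look like; the parameter values are illustrative and the real class in this PR may differ:

```python
import requests
from requests import adapters as requests_adapters


class _LangSmithHttpAdapter(requests_adapters.HTTPAdapter):
    """HTTPAdapter subclass that exposes urllib3's blocksize knob."""

    def __init__(self, *args, blocksize: int = 16384, **kwargs):
        self._blocksize = blocksize
        super().__init__(*args, **kwargs)

    def init_poolmanager(self, connections, maxsize, block=False, **pool_kwargs):
        # forwarded through urllib3 down to http.client.HTTPConnection
        # (blocksize is supported there on Python 3.7+)
        pool_kwargs["blocksize"] = self._blocksize
        super().init_poolmanager(connections, maxsize, block, **pool_kwargs)


session = requests.Session()
adapter = _LangSmithHttpAdapter(
    pool_maxsize=32,      # illustrative: match the batch tracing thread count
    blocksize=64 * 1024,  # illustrative: larger writes per syscall
)
# mount for both schemes; otherwise the default adapters keep handling traffic
session.mount("http://", adapter)
session.mount("https://", adapter)
```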
Not for this PR, but I wonder if we should be running simple tests on our min/max bounds for the requests lib here, since we are taking on some maintenance burden by subclassing; just like 1 unit test to check the bounds.
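Something like this, perhaps (a hypothetical sketch building on the adapter sketch above; `conn_kw` is a urllib3 implementation detail):

```python
def test_adapter_forwards_blocksize():
    adapter = _LangSmithHttpAdapter(blocksize=32768)
    pool = adapter.poolmanager.connection_from_url("https://example.com")
    # if requests/urllib3 ever stop forwarding blocksize, this catches it
    assert pool.conn_kw.get("blocksize") == 32768
```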
@@ -781,7 +821,7 @@ def request_with_retries(
             ls_utils.FilterPoolFullWarning(host=str(self._host)),
         ]
         retry_on_: Tuple[Type[BaseException], ...] = (
-            *(retry_on or []),
+            *(retry_on or ()),
Suggested change:
-            *(retry_on or ()),
+            *(retry_on or EMPTY_SEQ),
I don't think this one works, because it's the wrong type.
okok cool
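For readers following along, the annotation in question expects a tuple fallback; a minimal sketch (`_build_retry_on` is a hypothetical wrapper around the expression in the diff, not the actual function):

```python
from typing import Optional, Sequence, Tuple, Type


def _build_retry_on(
    retry_on: Optional[Sequence[Type[BaseException]]] = None,
) -> Tuple[Type[BaseException], ...]:
    # falling back to () keeps the unpacked literal a plain tuple,
    # matching the Tuple[Type[BaseException], ...] annotation
    return (*(retry_on or ()), ConnectionError, TimeoutError)
```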
# Check that no duplicate run_ids are present in the request bodies
assert len(request_bodies) == len(set([body["id"] for body in request_bodies]))
Likely stupid, but maybe add a quick test calling multipart_ingest_runs on two empty lists (or just one empty list), if that's not done elsewhere.
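A sketch of such a test; `multipart_ingest_runs` and its create/update parameters are taken from the comment above, not verified against the final API:

```python
from langsmith import Client


def test_multipart_ingest_runs_empty():
    client = Client(api_url="http://localhost:1984", api_key="test")
    # an empty batch should be a no-op: no request sent, nothing raised
    client.multipart_ingest_runs(create=[], update=[])
```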
        self._insert_runtime_env(create_dicts)
        self._insert_runtime_env(update_dicts)
        # check size limit
        size_limit_bytes = (self.info.batch_ingest_config or {}).get(
Should we also have an EMPTY_DICT? (or just a frozen dict-like)

from types import MappingProxyType

EMPTY_DICT = MappingProxyType({})
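A quick illustration of why the frozen variant is safer as a shared default (a sketch; the config lookup mirrors the diff above):

```python
from types import MappingProxyType

EMPTY_DICT = MappingProxyType({})

batch_ingest_config = None  # e.g. when self.info.batch_ingest_config is unset
size_limit_bytes = (batch_ingest_config or EMPTY_DICT).get("size_limit_bytes")

# read-only: a shared module-level default can't be mutated by accident
EMPTY_DICT["size_limit_bytes"] = 1  # raises TypeError
```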
Co-authored-by: William FH <[email protected]>