-
Notifications
You must be signed in to change notification settings - Fork 802
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Prevent FlightData overflowing max size limit whenever possible. #6690
base: master
Are you sure you want to change the base?
Prevent FlightData overflowing max size limit whenever possible. #6690
Conversation
}; | ||
|
||
match array_data.data_type() { | ||
DataType::Dictionary(_, _) => Ok(()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is something that I wasn't able to figure out about this encoding process - it seems we don't write the child_data
for dictionaries into the encoded message, but that's where all the values of the dictionary are. Without this, we only have the keys written. Does anyone know why this is?
It looks like CI failed due to some network flakiness - I'm going to close and reopen to try it again |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @itsjunetime
I went over this PR carefully.
I think this is a very nice improvement -- I think the reason it is so large is that it makes the slice size calculation more accurate, which seems like a good think to me
One thing that would (selfishly) help me review these PRs is if you could annotate anything "non obvuous" with the rationale of why.
Examples of non obvious changes are:
- The change to bit_slice
- Switching the code back to use the (deprecated) method
flight_data_from_arrow_batch
BTW what do you think we should do about flight_data_from_arrow_batch
given that you changed a bunch of code in this PR to use it it seems like the suggested alternatives are not a good fit?
/// otherwise a new buffer is allocated and filled with a copy of the bits in the range. | ||
pub fn bit_slice(&self, offset: usize, len: usize) -> Self { | ||
if offset % 8 == 0 { | ||
return self.slice(offset / 8); | ||
if offset % 8 == 0 && len % 8 == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like it may fix a bug where the length is incorrectly set after slice if the offset is zero 👍
Can you please also add a unit tests showing this bug fix (aka a unit test in immutable.rs)
I verified that the tests in this PR fail after this change:
Encoding 1023 with a maximum size of 1024
test encode::tests::flight_data_size_string_dictionary ... FAILED
failures:
failures:
encode::tests::flight_data_size_string_dictionary
test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 28 filtered out; finished in 0.00s
--- STDERR: arrow-flight encode::tests::flight_data_size_string_dictionary ---
thread 'encode::tests::flight_data_size_string_dictionary' panicked at arrow-flight/src/encode.rs:1717:21:
assertion `left == right` failed: encoded data[1]: actual size 1136, actual_overage: 112
left: 112
right: 0
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes - I'll pull this out to a separate PR to make this easier to review and add a unit test there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've moved this to #6707
@@ -327,6 +327,10 @@ impl FlightDataEncoder { | |||
|
|||
/// Encodes batch into one or more `FlightData` messages in self.queue | |||
fn encode_batch(&mut self, batch: RecordBatch) -> Result<()> { | |||
if batch.num_rows() == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the purpose of this optimization?
I vaguely remember at least one usecase of encoding empty batches is send the schema information. With this change it seems empty batches can no longer be encoded
Unless there is a reason to prevent empty batches, I would suggest we remove this check here and instead callers of the encoder can decide to filter out empty batches if they want
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not an optimization, but rather due to a difference in behavior that the tests dictate - in some of the tests, encoding an empty batch should result in an IPC header + empty data, but in some other tests, encoding an empty batch should result in nothing. I haven't yet figured out exactly why that is, and need to do some more poking to figure out why different parts of the code want this different behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So it looks like there are two contradicting comments regarding this behavior:
- The doc comment for this function says
one or more
FlightData messages - implying that if we send an empty batch, it should still load oneFlightData
message into the queue. This is at odds with the previous behavior, which just used the response fromsplit_batch_for_grpc_response
as an iterator for batches to load in, butsplit_batch...
returned an empty vector when given aRecordBatch
with 0 rows. - A comment in the arrow-flight tests (here) that are relevant to this change says that empty batches should not be encoded. If we comment out the
filter
that removes empty batches AND comment out this check for empty batches, all tests pass.
Because there are multiple conflicting records of what is the expected behavior, I'm not sure what to do. I feel like we should respect the doc-comment, as that's what's documented to the users, but the behavior seems to be contrary to that, and it's also contrary to some other tests in arrow-ipc that always expect encoding of a Batch to return some encoded data (even if it's just a header).
@@ -700,6 +682,9 @@ mod tests { | |||
use super::*; | |||
|
|||
#[test] | |||
// flight_data_from_arrow_batch is deprecated but does exactly what we need. Would probably be |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not continue to use make_flight_data
(which seems like is exactly what this comment proposes -- a copy of flight_data_from_arrow_batch
?
This change seems like it would only make the final removal of flight_data_from_arrow_batch
harder (would have to re-create make_flight_data
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess I just didn't see the purpose for duplicating code when this method already existed and did the exact same thing as make_flight_data
. It also seems that flight_data_from_arrow_batch
is a perfectly fine method, just easy to misuse, and that's why it was deprecated. Its behavior was duplicated in many different places throughout this crate, though - I don't think it's something that we would ever want to completely remove from usage, as it clearly has an internal purpose.
When we want to remove flight_data_from_arrow_batch
from the public API, we could just comfortably make it private, and keep using its functionality internally.
Does that make sense? I feel like I may not be explaining my thoughts perfectly.
let b = pretty_format_batches(&[batch]).unwrap().to_string(); | ||
assert_eq!(a, b); | ||
} | ||
|
||
#[test] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please explain why this test removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test simply checked something that isn't applicable anymore - it assumes that we can split a single RecordBatch into smaller RecordBatches with less rows which saturate the given size, but we don't want to verify that we can do that anymore as we don't want to provide any way to split RecordBatches without also accomodating for the size of the IPC Headers (and the rest of the functions in this mod are already doing that).
This test was just mainly there for the split_batch_for_grpc_response
function, and that's just not part of the design anymore.
@@ -1679,9 +1637,7 @@ mod tests { | |||
|
|||
let batch = RecordBatch::try_from_iter(vec![("a1", Arc::new(array) as _)]).unwrap(); | |||
|
|||
// overage is much higher than ideal | |||
// https://github.com/apache/arrow-rs/issues/3478 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🎉
|
||
// If only 1 row was sent over via this recordBatch, there was no way to avoid | ||
// going over the limit. There's currently no mechanism for splitting a single row | ||
// of results over multiple messages, so we allow going over the limit if it's the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this makes sense -- could you also put this caveat in the documentation for max_encoding_message_size
? (we can do it as a follow on PR too)
let dictionary_flight_data: Vec<FlightData> = | ||
encoded_dictionaries.into_iter().map(Into::into).collect(); | ||
let mut batch_flight_data: FlightData = encoded_batch.into(); | ||
#[allow(deprecated)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems unfortunate that we are reverting back to the deprecated method. Maybe we can figure out some non deprecated API 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @itsjunetime I got though a bunch of this PR today but even after reviewing it for an hour I was not able to make it through the entire thing
The changes to flight make sense, but I am struggling to review the changes in arrow-ipc
Is there any way you can break it up into smaller pieces to help review? If not I will try and find more contiguous time to review this but it may be a while (like in a few weeks) before I find it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had a brief skim, couple of comments
Adding an IPC specific API to ArrayData seems a touch unfortunate. It's probably ok, but a little off.
I'm not really sure of the context for this PR, but assuming it is to better avoid the gRPC limits, I worry this may be a fools errand. There is a lot beyond the data buffers going into those payloads (e.g. metadata flatbuffers, framing protobuf, HTTP framing, etc...) and trying to account for all of this is going to be a never ending game of wack a mole. Ultimately the only solution I can see is to set the soft limit in the encoder well below the hard limit enforced by the transport.
match spec { | ||
// Just pulled from arrow-ipc | ||
#[inline] | ||
fn pad_to_alignment(alignment: u8, len: usize) -> usize { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is assuming alignment is a power of two
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for catching that - I think the best move would just be documenting this in the function doc-comment so people can be aware of that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done in ae37b13
The context of this PR is that we have set the soft limit well below the hard limit (2MB vs 4MB) and somehow a customer still managed to hit the hard limit. So @itsjunetime is trying to improve the specificity of the size. We (at least our team in Influx) don't directly control all layers involved in gRPC communication -- this limit is being enforced by one of the various programs / mixings installed in kubernetes to manage traffic, report on things, etc. While in theory we should be able to figure out which one and increase its limits, that will also likely be a never ending game of whack a mole. Let me see if I can help to find a way to break this PR up into smaller, more manageable pieces, so that we can get this in / tested in a way that is reasonable to maintain |
One change I could make that may help with breaking this up and also with @tustvold's concern about the IPC-specific interface would be to maybe genericize the enum SizeSource {
Buffer(BufferType),
NullBuffer,
ChildData,
// does it need to be more granular? idk
}
trait MemoryAccountant {
fn count_size(&mut self, size: usize, source: SizeSource);
}
impl ArrayData {
fn get_slice_memory_size_with_accountant<A: MemoryAccountant>(
&self,
acc: &mut A
) -> Result<(), ArrowError> {
// ...
}
fn get_slice_memory_size(&self) -> Result<usize, ArrowError> {
struct DefaultAccountant(usize);
impl MemoryAccountant for DefaultAccountant {
fn count_size(&mut self, size: usize, _: SizeSource) {
self.0 += size;
}
}
let mut acc = DefaultAccountant(0);
self.get_slice_memory_size_with_accountant(&mut acc)?;
Ok(acc.0)
}
} This would allows us to use it nicely with the 'alignment' accounting that we need without being too IPC-specific. It would also allow us to remove the ugly re-accounting for Obviously, I'd be happy to make this change and pull it out to a separate PR (to make this PR easier to review once that separate PR is merged) if we feel like this would be a better move. |
Which issue does this PR close?
Closes #3478
What changes are included in this PR?
This reworks the encoding/writing step of flight data messages to ensure it never overflows the given limit whenever possible (specifically, it's impossible when we can't even fit a single row + header within the limit - there are still no mechanisms for splitting a single row of data between multiple messages).
It does this by first constructing a fake IPC header, then getting that header's encoded length, and then subtracting that length from the provided max size. Because the header's size stays the same with the same schema (the only thing that changes is the value within the 'length of data' fields), we don't need to continually recalculate it.
There are more tests I'd like to add before merging this, I was just hoping to get this filed first so that I could get feedback in case any behavior seemed seriously off.
Rationale for these changes
Since we are dynamically checking array data sizes to see if they can fit within the alloted size, this ensures that they will never go over if possible. Of course, as I said before, they will still go over if necessary, but I've rewritten the tests to check this behavior (if the tests sense an overage, but decode it to see that only one row was written, they allow it as there is no other way to get the data across).
Are there any user-facing changes?
Yes, there are API additions. They are documented. As far as I can tell, this shouldn't require a breaking release, but I haven't run anything like cargo-semver-checks on it to actually verify.