RowConverter keeps growing in size while merging streams on high-cardinality dictionary fields #7200
I was thinking about how to do this -- one thought I had was to "rewrite" any existing Rows to use a new RowConverter. Something like:

let old_converter: RowConverter = ...;
let old_rows: Rows = old_converter.convert_columns(input);
...
// now we need to rewrite to use `new_converter`:
let new_converter: RowConverter = ...;
let new_rows: Rows = new_converter.convert_columns(
    // convert old Rows back to Arrays
    &old_converter.convert_rows(old_rows)?
)?;

So in that way you don't need to keep the original input columns around. You have probably already thought of this, but I figured I would write it down. |
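For concreteness, a minimal, self-contained sketch of that round trip against the public arrow row API is below. It assumes a recent arrow-rs (where convert_columns takes &self); the single Int32 sort column is made up for the example.

use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::DataType;
use arrow::error::ArrowError;
use arrow::row::{RowConverter, SortField};

fn main() -> Result<(), ArrowError> {
    // some already-converted input (a single Int32 sort column, made up for the example)
    let input: Vec<ArrayRef> = vec![Arc::new(Int32Array::from(vec![3, 1, 2]))];
    let old_converter = RowConverter::new(vec![SortField::new(DataType::Int32)])?;
    let old_rows = old_converter.convert_columns(&input)?;

    // decode the old Rows back to Arrays, then re-encode them with a freshly
    // created converter, discarding whatever state the old converter accumulated
    let decoded: Vec<ArrayRef> = old_converter.convert_rows(old_rows.iter())?;
    let new_converter = RowConverter::new(vec![SortField::new(DataType::Int32)])?;
    let new_rows = new_converter.convert_columns(&decoded)?;

    // row-wise comparisons still work on the re-encoded rows
    assert!(new_rows.row(1) < new_rows.row(2));
    Ok(())
}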
@wiedld, @tustvold, @crepererum, @JayjeetAtGithub and I had a discussion and here are some notes: The proposal is to look at the input before starting to do the merge or convert any rows, and to change how the row converter works for high cardinality dictionaries. The assumption is that for low cardinality dictionaries (a small number of distinct values) the current interning approach works well. If we do not use interning for high cardinality dictionaries, the converter no longer accumulates dictionary mappings and its memory use stays bounded (at the cost of larger row encodings). So specifically this would look like:
Open questions:
Other options we discussed:
|
I filed apache/arrow-rs#4712 to track a possible performance improvement |
@JayjeetAtGithub -- in terms of calculating "high cardinality" dictionaries, perhaps we can use some sort of heuristic like "total number of distinct values used in the dictionary is greater than N", where "N" is some fixed constant. You can find the number of values used with this method: https://docs.rs/arrow/latest/arrow/array/struct.DictionaryArray.html#method.occupancy (and then compute the number of set bits). |
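A minimal sketch of that heuristic, assuming Int32 dictionary keys and a made-up threshold constant (the actual cutoff was not settled in this discussion):

use arrow::array::DictionaryArray;
use arrow::datatypes::Int32Type;

// hypothetical threshold; pick whatever constant makes sense for the workload
const HIGH_CARDINALITY_THRESHOLD: usize = 512;

/// Returns true if the dictionary references more distinct values than the threshold
fn is_high_cardinality(dict: &DictionaryArray<Int32Type>) -> bool {
    // occupancy() marks which dictionary values are actually referenced by a key;
    // counting the set bits gives the number of distinct values in use
    dict.occupancy().count_set_bits() > HIGH_CARDINALITY_THRESHOLD
}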
Sounds good! |
@JayjeetAtGithub I was thinking about this issue after some analysis I did on https://github.com/influxdata/influxdb_iox/issues/8568. Based on that analysis, it seems like it might be a good pattern to encapsulate / reuse the logic with some sort of wrapper around the row converter. Maybe something like:

/// wrapper around a RowConverter that automatically
/// picks an appropriate dictionary encoding
struct DataFusionRowConverter {
    inner: Option<RowConverter>,
}

impl DataFusionRowConverter {
    pub fn convert_columns(
        &mut self,
        columns: &[ArrayRef],
    ) -> Result<Rows, ArrowError> {
        if self.inner.is_none() {
            // Check the arrays, detect high cardinality dictionaries
            // and fall back to the normal (non-dictionary) encoding for that case
        }
        // after the first batch, use the pre-configured row converter
        self.inner.as_mut().unwrap().convert_columns(columns)
    }
} |
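Filling in the detection step, a rough, self-contained sketch of what such a wrapper could do is below. It assumes Int32 dictionary keys, the made-up HIGH_CARDINALITY_THRESHOLD from above, and that the fallback is simply to cast high-cardinality dictionary columns to their value type before handing them to a plain RowConverter; the actual DataFusion implementation may differ.

use std::sync::Arc;
use arrow::array::{ArrayRef, DictionaryArray};
use arrow::compute::cast;
use arrow::datatypes::{DataType, Int32Type};
use arrow::error::ArrowError;
use arrow::row::{RowConverter, Rows, SortField};

// hypothetical threshold for "high cardinality"
const HIGH_CARDINALITY_THRESHOLD: usize = 512;

struct DataFusionRowConverter {
    inner: Option<RowConverter>,
    // columns whose dictionaries should be unpacked to their value type
    unpack: Vec<bool>,
}

impl DataFusionRowConverter {
    fn new() -> Self {
        Self { inner: None, unpack: vec![] }
    }

    fn convert_columns(&mut self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
        if self.inner.is_none() {
            // Inspect the first batch: any high-cardinality dictionary column is
            // described by its value type so the converter never interns its values
            let mut fields = Vec::with_capacity(columns.len());
            self.unpack = columns
                .iter()
                .map(|col| match col.data_type() {
                    DataType::Dictionary(_, value_type) if is_high_cardinality(col) => {
                        fields.push(SortField::new(value_type.as_ref().clone()));
                        true
                    }
                    other => {
                        fields.push(SortField::new(other.clone()));
                        false
                    }
                })
                .collect();
            self.inner = Some(RowConverter::new(fields)?);
        }

        // Cast the flagged columns to their value type, then delegate
        let columns = columns
            .iter()
            .zip(&self.unpack)
            .map(|(col, unpack)| match (unpack, col.data_type()) {
                (true, DataType::Dictionary(_, value_type)) => cast(col.as_ref(), value_type.as_ref()),
                _ => Ok(Arc::clone(col)),
            })
            .collect::<Result<Vec<_>, _>>()?;
        self.inner.as_mut().unwrap().convert_columns(&columns)
    }
}

// Simplified cardinality check: only looks at Int32-keyed dictionaries and
// treats everything else as low cardinality
fn is_high_cardinality(col: &ArrayRef) -> bool {
    col.as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .map(|dict| dict.occupancy().count_set_bits() > HIGH_CARDINALITY_THRESHOLD)
        .unwrap_or(false)
}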
Quick question: should we implement this wrapper inside DataFusion, or in arrow-rs? |
I think it can be done in either repo but the code would look different depending on where it is |
TLDR I left some comments on the PRs -- they are looking good. I think we should put the code into DataFusion to start. |
Describe the bug
On executing SortPreservingMerge on multiple streams of record batches and using a high-cardinality dictionary field as the sort key, the RowConverter instance used to merge the multiple RowCursorStreams keeps growing in memory (as it keeps accumulating the dict mappings internally in the OrderPreservingInterner structure). This unbounded memory growth eventually causes DataFusion to get killed by the OOM killer.

To Reproduce
Detailed steps to reproduce this issue are given here.

Expected behavior
SortPreservingMerge on streams of record batches with high-cardinality dictionary-encoded sort keys should be memory aware and keep memory usage within a user-defined limit.

Additional context
Possible solution:
- Track the size of the RowConverter using the size() method, which in the case of Dictionary fields returns the size of the OrderPreservingInterner.
- When the RowConverter grows beyond a user-defined memory limit, take note of the RowCursorStreams that are still getting converted, delete the converter, create a new one, and re-do the aborted conversions.
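A rough, hypothetical sketch of the size-tracking part of that idea (the wrapper name, reset strategy, and memory limit handling are illustrative assumptions, not DataFusion's actual implementation; re-encoding the already-converted Rows would work as in the round-trip example earlier in the thread):

use arrow::array::ArrayRef;
use arrow::error::ArrowError;
use arrow::row::{RowConverter, Rows, SortField};

/// Hypothetical wrapper that recreates the inner converter once it grows too large
struct BoundedRowConverter {
    fields: Vec<SortField>,
    inner: RowConverter,
    /// user-defined memory limit, in bytes
    memory_limit: usize,
}

impl BoundedRowConverter {
    fn try_new(fields: Vec<SortField>, memory_limit: usize) -> Result<Self, ArrowError> {
        Ok(Self {
            inner: RowConverter::new(fields.clone())?,
            fields,
            memory_limit,
        })
    }

    /// Returns true if the converter was reset; the caller must then re-encode
    /// any Rows produced by the previous converter (e.g. by round-tripping them
    /// through convert_rows / convert_columns with the new converter)
    fn reset_if_too_large(&mut self) -> Result<bool, ArrowError> {
        // size() reports the converter's memory usage, which for Dictionary
        // fields included the OrderPreservingInterner at the time of this issue
        if self.inner.size() > self.memory_limit {
            self.inner = RowConverter::new(self.fields.clone())?;
            return Ok(true);
        }
        Ok(false)
    }

    fn convert_columns(&mut self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
        self.inner.convert_columns(columns)
    }
}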