Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RowConverter keeps growing in size while merging streams on high-cardinality dictionary fields #7200

Closed
JayjeetAtGithub opened this issue Aug 4, 2023 · 10 comments · Fixed by #7401
Labels
bug Something isn't working

Comments

@JayjeetAtGithub
Copy link
Contributor

Describe the bug

On executing SortPreservingMerge on multiple streams of record batches and using a high-cardinality dictionary field as the sort key, the RowConverter instance used to merge the multiple RowCursorStreams keeps growing in memory (as it keeps accumulating the dict mappings internally in the OrderPreservingInterner structure). This unbounded memory growth eventually causes data fusion to get killed by the OOM killer.

To Reproduce

Detailed steps to reproduce this issue is given here.

Expected behavior

SortPreservingMerge on streams of record batches with high-cardinality dictionary-encoded sort keys should be memory aware and keep memory usage within a user-defined limit.

Additional context

Possible solution:

  1. Keep track of the memory usage for the RowConverter using the size() method which in the case of Dictionary fields returns the size of the OrderPreservingInterner.
  2. If the size of the RowConverter grows more than a user-defined memory limit, take note of the RowCursorStream that are still getting converted, delete the converter, create a new one, and re-do the aborted conversions.
@JayjeetAtGithub JayjeetAtGithub added the bug Something isn't working label Aug 4, 2023
@JayjeetAtGithub
Copy link
Contributor Author

@alamb
Copy link
Contributor

alamb commented Aug 9, 2023

I was thinking about how to do this -- one thought I had was to "rewrite" any existing Rows using a RowConverter / convert_rows https://docs.rs/arrow-row/45.0.0/arrow_row/struct.RowConverter.html#method.convert_rows

Something like

let old_converter: RowConverter = ...
let old_rows: Rows = old_converter.convert_columns(input);
....
// now we need to rewrite to use `new_converter`:
let new_converter: RowConverter = ...
let new_rows: Rows = new_converter.convert_columns(
  // convert old Rows back to Arrays
  &old_converter.convert_rows(old_rows)?
)?;

So in that way you don't need to keep around the original input columns

You have probably already thought of this, but I figured I would write it down

@alamb
Copy link
Contributor

alamb commented Aug 15, 2023

@wiedld, @tustvold @crepererum @JayjeetAtGithub and I had a discussion and here are some notes:

The proposal is to look at the input before starting to do the merge or convert any rows and change how the row converter works for high cardinality dictionaries

The assumption is that for low cardinality dictionaries (a small
number of distinct values), using preserve_dictionaries is
important for performance but for high cardinality dictionaries (with
a large number of distinct values) using preserve_dictionaries not
only consumes large amounts of memory as described in this ticket, but
also will be slower as the size of the interned keys will be substantial.

If we do not use preserve_dictionaries the RowInternerwill no
longer keep a mapping and thus the memory consumption will not grow.

So specificially this would look like:

  1. Based on some heuristic, if the dictionary is high cardinality then use the normal string encoding (set preserve_dictionaries false)
  2. if the dictionary is low cardinality then use the dictionaries encoding (set preserve_dictionaries true, the default)

Open questions:

  1. What heuristic to use to determine high cardinality (The heuristic needs to be reasonably fast / memory efficient to compute)
  2. Can we improve the performance of preserve_dictionaries=false, conversion (Update ticket: Improve the performance of "DictionaryValue" row encoding arrow-rs#4712)
  3. How to verify this doesn't cause a performance regressions

Other options we discussed:

  1. Try update the state of RowConverter to prune out unused entries (not clear we could make this work)
  2. Recreate the RowConverter
  3. Use the "non dictionary encoding" mode (what is described above)

@alamb
Copy link
Contributor

alamb commented Aug 17, 2023

I filed apache/arrow-rs#4712 to track a possible performance improvement

@alamb
Copy link
Contributor

alamb commented Aug 23, 2023

@JayjeetAtGithub -- in terms of calculating "high cardinality" dictionaries perhaps we can use some sort of heuristic like "total number of distinct values used in the dictionary is greater than N" where "N" is a constant like 8 or 32 (maybe @tustvold has some thoughts on the right values to use

You can find the number of values used with this method: https://docs.rs/arrow/latest/arrow/array/struct.DictionaryArray.html#method.occupancy

(and then compute the number of set bits)

@JayjeetAtGithub
Copy link
Contributor Author

Sounds good !

@alamb
Copy link
Contributor

alamb commented Aug 24, 2023

@JayjeetAtGithub I was thinking about this issue after some analysis I did on https://github.com/influxdata/influxdb_iox/issues/8568. I think my observation is that the RowConverter memory consumption explodes for high cardinality dictionaries wherever it is used, wherever it is used (not just in merge). Now that I type it out, it seems obvious 😆

Thus it seems like it might be a good patten to encapsulate / reuse the logic with some sort of wrapper around the row converter. Maybe something like:

/// wrapper around a Row converter that automatically
/// picks appropriate dictionary encoding
struct DataFusionRowConverter { 
  inner: Option<RowEncoder>
}

impl DataFusionRowConverter {
  pub fn convert_columns(
    &mut self,
    columns: &[ArrayRef]
  ) -> Result<Rows, ArrowError> {
    if self.inner.is_none() {
     // Check the arrays, detect high cardinality dictionaries
     // and fallback to normal decoding for that case
   }
   // after the first batch, use the pre-configured row coverter
   self.inner.as_mut().unwrap().convert_columns(columns)
}

@JayjeetAtGithub
Copy link
Contributor Author

JayjeetAtGithub commented Aug 24, 2023

Quick question: We are implementing this wrapper inside arrow-rs or arrow-datafusion ? Because doing this inside data fusion, I get a lot of private field errors.

I think arrow-rs is the right place.

@alamb
Copy link
Contributor

alamb commented Aug 25, 2023

Quick question: We are implementing this wrapper inside arrow-rs or arrow-datafusion ? Because doing this inside data fusion, I get a lot of private field errors.

I think it can be done in either repo but the code would look different depending on where it is

@alamb
Copy link
Contributor

alamb commented Aug 25, 2023

TLDR I left some comments on the PRs -- they are looking good. I think we should put the code into DataFusion to start.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
2 participants