
Re-encode dictionaries in selection kernels (take / concat_batches) #3558

Merged (24 commits, Sep 5, 2023)

Conversation

@tustvold (Contributor) commented Jan 18, 2023

Which issue does this PR close?

Closes #506
Relates to #2832

Rationale for this change

MutableArrayData blindly concatenates the dictionary values arrays together when given multiple different dictionaries. This is straightforward and often fast, but degrades spectacularly if either of the following hold:

  • The dictionary values are large
  • Not all dictionary values are referenced by the keys (e.g. the array has been filtered, sliced, or read from parquet)

This leads to high memory usage, wasted CPU cycles, and potential byte array offset overflow.
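To make the failure mode concrete, here is a minimal sketch using the public concat kernel and Int32-keyed string dictionaries (illustrative only; whether the result ends up re-encoded depends on the arrow version and on the heuristic discussed below):

use arrow::array::{Array, DictionaryArray};
use arrow::compute::concat;
use arrow::datatypes::Int32Type;

fn main() {
    // Two dictionary arrays that between them reference only two distinct values
    let a = DictionaryArray::<Int32Type>::from_iter(["a", "b", "a"]);
    let b = DictionaryArray::<Int32Type>::from_iter(["a", "b", "b"]);

    // The naive approach appends the values arrays, carrying ["a", "b", "a", "b"]
    // in the output even though only "a" and "b" are referenced; with large or
    // sparsely referenced values this wastes memory and CPU, and can eventually
    // overflow byte array offsets.
    let input: Vec<&dyn Array> = vec![&a, &b];
    let c = concat(&input).unwrap();
    let c = c.as_any().downcast_ref::<DictionaryArray<Int32Type>>().unwrap();
    println!("concatenated values length: {}", c.values().len());
}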

What changes are included in this PR?

Are there any user-facing changes?

@github-actions bot added the arrow (Changes to the arrow crate) label Jan 18, 2023
/// containing all unique, referenced values, along with mappings from the [`DictionaryArray`]
/// keys to the new keys within this values array
pub fn merge_dictionaries<K: ArrowDictionaryKeyType>(
    dictionaries: &[(&DictionaryArray<K>, Option<&[u8]>)],

Contributor Author:
I plan to use the masks for the interleave kernel

@tustvold (Contributor Author)

I think this is now ready for review, benchmarks suggest re-encoding dictionaries is on the order of ~10µs for 1024 rows.

The difficulty is that the old method is very fast most of the time, on the order of 1µs, and so always re-encoding would be a significant performance regression in the common case. To avoid this, I've added a fairly pessimistic heuristic that will only re-encode if the concatenation of the dictionary values would end up with more elements than the output has keys.

This helps to avoid the worst case most of the time, and I think it is an acceptable trade-off, but I welcome other thoughts.
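As a rough sketch of that heuristic (a hypothetical helper for illustration only; the actual check is should_merge_dictionary_values in arrow-select, which also accounts for byte-offset overflow and skips inputs that already share a single values array):

// Re-encode only when the naively concatenated values array would contain
// more elements than the output has keys, i.e. when it could not possibly
// be fully referenced by the output.
fn should_re_encode(output_len: usize, total_values: usize, all_share_one_dictionary: bool) -> bool {
    !all_share_one_dictionary && total_values > output_len
}

fn main() {
    // 1024 output rows drawing on dictionaries with 4096 combined values: merge
    assert!(should_re_encode(1024, 4096, false));
    // all inputs share one dictionary: never merge
    assert!(!should_re_encode(1024, 4096, true));
}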

Interleave with the same dictionary (which won't merge)

interleave dict(20, 0.0) 100 [0..100, 100..230, 450..1000]
                        time:   [1.6196 µs 1.6204 µs 1.6212 µs]
Found 3 outliers among 100 measurements (3.00%)
  2 (2.00%) high mild
  1 (1.00%) high severe

interleave dict(20, 0.0) 400 [0..100, 100..230, 450..1000]
                        time:   [4.4878 µs 4.4911 µs 4.4946 µs]
Found 4 outliers among 100 measurements (4.00%)
  3 (3.00%) high mild
  1 (1.00%) high severe

interleave dict(20, 0.0) 1024 [0..100, 100..230, 450..1000]
                        time:   [10.931 µs 10.937 µs 10.944 µs]
Found 6 outliers among 100 measurements (6.00%)
  3 (3.00%) high mild
  3 (3.00%) high severe

interleave dict(20, 0.0) 1024 [0..100, 100..230, 450..1000, 0..1000]
                        time:   [10.675 µs 10.678 µs 10.681 µs]
Found 6 outliers among 100 measurements (6.00%)
  4 (4.00%) high mild
  2 (2.00%) high severe

interleave dict_sparse(20, 0.0) 100 [0..100, 100..230, 450..1000]
                        time:   [1.6390 µs 1.6399 µs 1.6407 µs]
Found 6 outliers among 100 measurements (6.00%)
  3 (3.00%) low mild
  2 (2.00%) high mild
  1 (1.00%) high severe

interleave dict_sparse(20, 0.0) 400 [0..100, 100..230, 450..1000]
                        time:   [4.5065 µs 4.5080 µs 4.5096 µs]
Found 3 outliers among 100 measurements (3.00%)
  2 (2.00%) high mild
  1 (1.00%) high severe

interleave dict_sparse(20, 0.0) 1024 [0..100, 100..230, 450..1000]
                        time:   [11.008 µs 11.014 µs 11.020 µs]
Found 3 outliers among 100 measurements (3.00%)
  2 (2.00%) high mild
  1 (1.00%) high severe

interleave dict_sparse(20, 0.0) 1024 [0..100, 100..230, 450..1000, 0..1000]
                        time:   [10.684 µs 10.688 µs 10.692 µs]
Found 6 outliers among 100 measurements (6.00%)
  1 (1.00%) low mild
  4 (4.00%) high mild
  1 (1.00%) high severe

Interleave with different dictionaries where one is sparse (will merge)

interleave dict_distinct 100
                        time:   [5.2581 µs 5.2605 µs 5.2631 µs]
Found 6 outliers among 100 measurements (6.00%)
  2 (2.00%) high mild
  4 (4.00%) high severe

interleave dict_distinct 1024
                        time:   [5.2225 µs 5.2256 µs 5.2289 µs]
Found 8 outliers among 100 measurements (8.00%)
  5 (5.00%) high mild
  3 (3.00%) high severe

interleave dict_distinct 2048
                        time:   [5.0365 µs 5.0419 µs 5.0478 µs]
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) high mild
  1 (1.00%) high severe

The concatenate kernel is typically much faster, and so merging does represent a non-trivial performance regression. (Note this is concatenating 2 arrays of 1024, whereas the interleave benchmarks are interleaving to produce an array of 1024 rows).

concat str_dict 1024    time:   [2.7670 µs 2.7692 µs 2.7711 µs]

concat str_dict_sparse 1024
                        time:   [21.122 µs 21.136 µs 21.151 µs]
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high severe

@tustvold marked this pull request as ready for review January 20, 2023 19:03

@alamb (Contributor) left a comment:

I went through the code and tests carefully and it looks 👨‍🍳 👌

I was inspired by @jorgecarleitao's comment #506 (comment) and think it may be a better idea to expose the "optimizing concat" as its own kernel rather than heuristically trying to choose when to call it within the existing concat kernel.

Making it a separate kernel might better align with the current design principle of "give the user full control"

Perhaps @viirya or @jhorstmann has some thoughts on how to expose this code?

arrow-schema/src/datatype.rs (outdated; resolved)
Comment on lines 80 to 84
for ((d, _), mapping) in dictionaries.iter().zip(merged.key_mappings) {
    for key in d.keys_iter() {
        keys.append_option(key.map(|x| mapping[x]));
    }
}

Contributor:
Is there any way to use the new extend function that got added? Something like (untested)

Suggested change
-for ((d, _), mapping) in dictionaries.iter().zip(merged.key_mappings) {
-    for key in d.keys_iter() {
-        keys.append_option(key.map(|x| mapping[x]));
-    }
-}
+for ((d, _), mapping) in dictionaries.iter().zip(merged.key_mappings) {
+    keys.extend(d.keys_iter().map(|x| mapping[x]))
+}
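One wrinkle with the suggested form (an observation only, untested like the original suggestion): keys_iter() yields Option values for null keys, so the extend variant would presumably need to preserve them, e.g.:

for ((d, _), mapping) in dictionaries.iter().zip(merged.key_mappings) {
    keys.extend(d.keys_iter().map(|key| key.map(|x| mapping[x])))
}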

arrow-select/src/concat.rs (resolved)
assert_eq!(actual, expected);

// Should have merged inputs together
// Not 30 as this is done on a best-effort basis

Contributor:
👍

}

/// A weak heuristic of whether to merge dictionary values that aims to only
/// perform the expensive computation when is likely to yield at least

Contributor:
Suggested change
-/// perform the expensive computation when is likely to yield at least
+/// perform the expensive merge computation when is likely to yield at least

arrow-select/src/dictionary.rs (resolved)
}

impl<'a, V> Interner<'a, V> {
    fn new(capacity: usize) -> Self {

Contributor:
It might help to document the implications of capacity size here -- it seems like the idea is that the smaller the capacity, the greater the chance of collisions (and thus duplicates)
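For intuition, a fixed-capacity, best-effort interner could look like the following hypothetical sketch (not the implementation in this PR): a slot collision simply appends a duplicate value, so a smaller capacity trades deduplication quality for a smaller lookup table.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical best-effort interner: `capacity` slots, collisions allowed
struct BestEffortInterner {
    slots: Vec<Option<(u64, usize)>>, // (hash, index into `values`)
    values: Vec<Vec<u8>>,
}

impl BestEffortInterner {
    fn new(capacity: usize) -> Self {
        // capacity must be non-zero
        Self { slots: vec![None; capacity], values: Vec::new() }
    }

    fn intern(&mut self, value: &[u8]) -> usize {
        let mut hasher = DefaultHasher::new();
        value.hash(&mut hasher);
        let hash = hasher.finish();
        let slot = (hash % self.slots.len() as u64) as usize;
        let existing = self.slots[slot];
        match existing {
            // Same hash and same bytes: reuse the existing index
            Some((h, idx)) if h == hash && self.values[idx] == value => idx,
            // Empty slot or collision: append, possibly duplicating a value
            // already stored under another slot
            _ => {
                let idx = self.values.len();
                self.values.push(value.to_vec());
                self.slots[slot] = Some((hash, idx));
                idx
            }
        }
    }
}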

let a =
    DictionaryArray::<Int32Type>::from_iter(["a", "b", "a", "b", "d", "c", "e"]);
let b = DictionaryArray::<Int32Type>::from_iter(["c", "f", "c", "d", "a", "d"]);
let merged = merge_dictionary_values(&[(&a, None), (&b, None)]).unwrap();

Contributor:
I think we should have at least one test with NULL values in the dictionary keys
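A possible shape for such a test (a sketch only, assuming merge_dictionary_values ignores null keys when collecting referenced values):

let a = DictionaryArray::<Int32Type>::from_iter([Some("a"), None, Some("b"), Some("a")]);
let b = DictionaryArray::<Int32Type>::from_iter([None, Some("c"), Some("a")]);
let merged = merge_dictionary_values(&[(&a, None), (&b, None)]).unwrap();
// Null keys reference no value, so only "a", "b" and "c" should appear in the
// merged values, and the key mappings should leave null keys as nulls.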

arrow-select/src/interleave.rs (resolved)
let mut values = Vec::with_capacity(dictionaries.len());
let mut value_slices = Vec::with_capacity(dictionaries.len());

for (dictionary, key_mask) in dictionaries {

Contributor:
I saw in the should_merge_dictionary_values function that dictionaries with the same pointers are considered the same (ArrayData::ptr_eq(values, first_values)); I don't know how frequent that scenario is. But is it worth keeping track of merged dictionary pointers and skipping the merge if they are seen again?


let values = &data.child_data()[0];
total_values += values.len();
single_dictionary &= ArrayData::ptr_eq(values, first_values);

Contributor:
Why check only the first value? Is there no possibility of duplicate pointers at indexes other than the first?

Contributor Author:
We only care whether all the dictionary values are the same; if any dictionaries differ, we will end up concatenating.
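Said differently, a simplified sketch of the check (assuming the inputs are available as ArrayData; not the exact code in this PR): every values array is compared against the first one's, and a single mismatch is enough to conclude the inputs do not share one dictionary.

use arrow::array::ArrayData;

// Assumes at least one input; any pointer mismatch means a concatenation or
// merge of the values will be needed.
fn all_share_values(inputs: &[ArrayData]) -> bool {
    let first_values = &inputs[0].child_data()[0];
    inputs
        .iter()
        .all(|data| ArrayData::ptr_eq(&data.child_data()[0], first_values))
}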

@viirya (Member) left a comment:

I think I agree with @alamb's comment on a separate optimized kernel.

/// some return over the naive approach used by MutableArrayData
///
/// `len` is the total length of the merged output
pub fn should_merge_dictionary_values<K: ArrowDictionaryKeyType>(

Member:
Does this need to be public?

Contributor Author:
The module isn't public

!single_dictionary && is_supported && (overflow || values_exceed_length)
}

/// Given an array of dictionaries and an optional row mask compute a values array

Member:
Suggested change
-/// Given an array of dictionaries and an optional row mask compute a values array
+/// Given an array of dictionaries and an optional key row mask compute a values array

@tustvold marked this pull request as draft January 30, 2023 15:18

@tustvold (Contributor Author):
Converting to draft whilst I continue to think about how we should be handling this, especially in light of the RunEncodedArray work

@tustvold closed this Jun 1, 2023

@tustvold (Contributor Author) commented Jun 1, 2023

Closing this for now, may re-open in future if I have further ideas in this space

@tustvold reopened this Aug 25, 2023
@tustvold force-pushed the dictionary-re-encode branch from ea9af09 to dfe0f7b on August 25, 2023 13:40

@tustvold (Contributor Author) commented Sep 1, 2023

The latest benchmarks are

concat str_dict 1024    time:   [4.1847 µs 4.1892 µs 4.1935 µs]
                        change: [-11.560% -11.248% -10.897%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 3 outliers among 100 measurements (3.00%)
  2 (2.00%) high mild
  1 (1.00%) high severe

concat str_dict_sparse 1024
                        time:   [11.443 µs 11.452 µs 11.463 µs]
                        change: [+13.983% +14.341% +14.645%] (p = 0.00 < 0.05)
                        Performance has regressed.
interleave dict(20, 0.0) 100 [0..100, 100..230, 450..1000]
                        time:   [3.4982 µs 3.5000 µs 3.5020 µs]
                        change: [+27.355% +27.663% +28.183%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) high mild
  1 (1.00%) high severe

interleave dict(20, 0.0) 400 [0..100, 100..230, 450..1000]
                        time:   [6.5197 µs 6.5226 µs 6.5257 µs]
                        change: [+6.0766% +6.3786% +6.6982%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 3 outliers among 100 measurements (3.00%)
  1 (1.00%) high mild
  2 (2.00%) high severe

interleave dict(20, 0.0) 1024 [0..100, 100..230, 450..1000]
                        time:   [12.783 µs 12.787 µs 12.791 µs]
                        change: [-4.4718% -4.2584% -4.1254%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 4 outliers among 100 measurements (4.00%)
  1 (1.00%) low mild
  1 (1.00%) high mild
  2 (2.00%) high severe

interleave dict(20, 0.0) 1024 [0..100, 100..230, 450..1000, 0..1000]
                        time:   [13.352 µs 13.358 µs 13.364 µs]
                        change: [-2.1913% -1.9010% -1.6320%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 8 outliers among 100 measurements (8.00%)
  4 (4.00%) high mild
  4 (4.00%) high severe

interleave dict_sparse(20, 0.0) 100 [0..100, 100..230, 450..1000]
                        time:   [3.5297 µs 3.5310 µs 3.5323 µs]
                        change: [+26.230% +26.553% +26.772%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 7 outliers among 100 measurements (7.00%)
  5 (5.00%) high mild
  2 (2.00%) high severe

interleave dict_sparse(20, 0.0) 400 [0..100, 100..230, 450..1000]
                        time:   [6.5106 µs 6.5152 µs 6.5201 µs]
                        change: [+6.5054% +6.8053% +7.1029%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 3 outliers among 100 measurements (3.00%)
  3 (3.00%) high severe

interleave dict_sparse(20, 0.0) 1024 [0..100, 100..230, 450..1000]
                        time:   [13.007 µs 13.010 µs 13.014 µs]
                        change: [-3.6668% -3.2717% -2.9611%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 7 outliers among 100 measurements (7.00%)
  4 (4.00%) high mild
  3 (3.00%) high severe

interleave dict_sparse(20, 0.0) 1024 [0..100, 100..230, 450..1000, 0..1000]
                        time:   [13.288 µs 13.297 µs 13.307 µs]
                        change: [-2.8566% -2.6176% -2.4714%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 4 outliers among 100 measurements (4.00%)
  3 (3.00%) high mild
  1 (1.00%) high severe

interleave dict_distinct 100
                        time:   [5.5472 µs 5.5536 µs 5.5606 µs]
                        change: [+0.6246% +0.9056% +1.1312%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 5 outliers among 100 measurements (5.00%)
  4 (4.00%) high mild
  1 (1.00%) high severe

interleave dict_distinct 1024
                        time:   [5.5718 µs 5.5794 µs 5.5875 µs]
                        change: [+0.9659% +1.2001% +1.4061%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 10 outliers among 100 measurements (10.00%)
  6 (6.00%) high mild
  4 (4.00%) high severe

interleave dict_distinct 2048
                        time:   [5.4061 µs 5.4136 µs 5.4212 µs]
                        change: [-2.1558% -1.8430% -1.5765%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 3 outliers among 100 measurements (3.00%)

My takeaway from this is that the performance regression is small enough in absolute terms (on the order of microseconds) that it should not prevent making this the default behaviour.

@tustvold marked this pull request as ready for review September 1, 2023 13:22

@tustvold (Contributor Author) commented Sep 1, 2023

Looking at a flamegraph for the regressed interleave dict(20, 0.0) 100 [0..100, 100..230, 450..1000] benchmark shows a non-trivial amount of time spent in should_merge_dictionary_values, associated with the ArrayData conversion. I may see if I can optimise this further.

[flamegraph omitted]

@tustvold (Contributor Author) commented Sep 1, 2023

With the changes in c2d4009 there is no longer a performance regression, except for the concat re-encoding case:

interleave dict(20, 0.0) 100 [0..100, 100..230, 450..1000]
                        time:   [2.7514 µs 2.7549 µs 2.7582 µs]
                        change: [+0.2105% +0.4287% +0.7010%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 3 outliers among 100 measurements (3.00%)
  1 (1.00%) low mild
  1 (1.00%) high mild
  1 (1.00%) high severe

interleave dict(20, 0.0) 400 [0..100, 100..230, 450..1000]
                        time:   [5.7040 µs 5.7069 µs 5.7101 µs]
                        change: [-7.1155% -6.8505% -6.5598%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 4 outliers among 100 measurements (4.00%)
  2 (2.00%) high mild
  2 (2.00%) high severe

interleave dict(20, 0.0) 1024 [0..100, 100..230, 450..1000]
                        time:   [11.952 µs 11.958 µs 11.964 µs]
                        change: [-10.702% -10.503% -10.379%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 5 outliers among 100 measurements (5.00%)
  4 (4.00%) high mild
  1 (1.00%) high severe

interleave dict(20, 0.0) 1024 [0..100, 100..230, 450..1000, 0..1000]
                        time:   [12.001 µs 12.006 µs 12.011 µs]
                        change: [-12.141% -11.932% -11.799%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 7 outliers among 100 measurements (7.00%)
  6 (6.00%) high mild
  1 (1.00%) high severe

interleave dict_sparse(20, 0.0) 100 [0..100, 100..230, 450..1000]
                        time:   [2.7432 µs 2.7468 µs 2.7506 µs]
                        change: [-1.5949% -1.2917% -0.9539%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high severe

interleave dict_sparse(20, 0.0) 400 [0..100, 100..230, 450..1000]
                        time:   [5.6984 µs 5.7024 µs 5.7064 µs]
                        change: [-6.7690% -6.4981% -6.2229%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 5 outliers among 100 measurements (5.00%)
  4 (4.00%) high mild
  1 (1.00%) high severe

interleave dict_sparse(20, 0.0) 1024 [0..100, 100..230, 450..1000]
                        time:   [12.065 µs 12.071 µs 12.077 µs]
                        change: [-10.666% -10.329% -10.110%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 5 outliers among 100 measurements (5.00%)
  1 (1.00%) high mild
  4 (4.00%) high severe

interleave dict_sparse(20, 0.0) 1024 [0..100, 100..230, 450..1000, 0..1000]
                        time:   [11.995 µs 12.000 µs 12.007 µs]
                        change: [-12.334% -12.126% -11.999%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 8 outliers among 100 measurements (8.00%)
  1 (1.00%) low mild
  5 (5.00%) high mild
  2 (2.00%) high severe

interleave dict_distinct 100
                        time:   [4.9384 µs 4.9423 µs 4.9467 µs]
                        change: [-10.644% -10.405% -10.218%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 5 outliers among 100 measurements (5.00%)
  4 (4.00%) high mild
  1 (1.00%) high severe

interleave dict_distinct 1024
                        time:   [4.8532 µs 4.8597 µs 4.8655 µs]
                        change: [-12.302% -12.073% -11.862%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) high mild
  1 (1.00%) high severe

interleave dict_distinct 2048
                        time:   [4.8942 µs 4.9006 µs 4.9081 µs]
                        change: [-11.126% -10.785% -10.446%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 3 outliers among 100 measurements (3.00%)
  1 (1.00%) high mild
  2 (2.00%) high severe

concat str_dict 1024    time:   [4.7026 µs 4.7050 µs 4.7078 µs]
                        change: [-0.4268% -0.1402% +0.0898%] (p = 0.34 > 0.05)
                        No change in performance detected.
Found 3 outliers among 100 measurements (3.00%)
  1 (1.00%) high mild
  2 (2.00%) high severe

concat str_dict_sparse 1024
                        time:   [11.014 µs 11.022 µs 11.031 µs]
                        change: [+10.100% +10.428% +10.688%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 4 outliers among 100 measurements (4.00%)
  4 (4.00%) high mild

/// Performs a cheap, pointer-based comparison of two byte arrays
///
/// See [`ScalarBuffer::ptr_eq`]
fn bytes_ptr_eq<T: ByteArrayType>(a: &dyn Array, b: &dyn Array) -> bool {

Contributor Author:
I could see us eventually promoting this to a first-class API on array

@tustvold requested a review from alamb September 2, 2023 11:39
let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
assert_eq!(&collected, &["c", "c", "c", "a", "c", "b"]);

// Should recompute dictionary

Contributor:
I don't understand why this one is recomputed but the other case isn't.

Contributor Author:
Because the interleave output is smaller: the combined dictionary values then exceed the number of output keys, so the heuristic chooses to re-encode

@tustvold merged commit 65edbb1 into apache:master Sep 5, 2023
25 checks passed
@alamb (Contributor) commented Sep 5, 2023

🎉

@alamb changed the title from "Re-encode dictionaries in selection kernels" to "Re-encode dictionaries in selection kernels (take / concat_batches)" Sep 14, 2023
Labels: arrow (Changes to the arrow crate)

Successfully merging this pull request may close these issues:
"Optimize" Dictionary contents in DictionaryArray / concat_batches

5 participants