Skip to content

Commit

Permalink
faster ALP encode (#924)
Browse files Browse the repository at this point in the history
fixes #920 

Consistently cuts encoding time by 10-50%.

Before the change:

```
Running benches/alp_compress.rs (target/release/deps/alp_compress-abbdaefc5eabf343)
Timer precision: 41 ns
alp_compress          fastest       │ slowest       │ median        │ mean          │ samples │ iters
├─ alp_compress                     │               │               │               │         │
│  ├─ f32                           │               │               │               │         │
│  │  ├─ 100000       191.9 µs      │ 824.9 µs      │ 314.7 µs      │ 354 µs        │ 100     │ 100
│  │  ╰─ 10000000     21.39 ms      │ 28.95 ms      │ 21.71 ms      │ 21.89 ms      │ 100     │ 100
│  ╰─ f64                           │               │               │               │         │
│     ├─ 100000       236 µs        │ 353.7 µs      │ 238.4 µs      │ 246.4 µs      │ 100     │ 100
│     ╰─ 10000000     28.78 ms      │ 68.68 ms      │ 29.49 ms      │ 29.93 ms      │ 100     │ 100
```

After:

```
Running benches/alp_compress.rs (target/release/deps/alp_compress-abbdaefc5eabf343)
Timer precision: 41 ns
alp_compress          fastest       │ slowest       │ median        │ mean          │ samples │ iters
├─ alp_compress                     │               │               │               │         │
│  ├─ f32                           │               │               │               │         │
│  │  ├─ 100000       161 µs        │ 234.6 µs      │ 163.3 µs      │ 166 µs        │ 100     │ 100
│  │  ╰─ 10000000     18.72 ms      │ 21.54 ms      │ 19.07 ms      │ 19.14 ms      │ 100     │ 100
│  ╰─ f64                           │               │               │               │         │
│     ├─ 100000       182 µs        │ 346 µs        │ 183.9 µs      │ 187.9 µs      │ 100     │ 100
│     ╰─ 10000000     23.98 ms      │ 28.71 ms      │ 24.52 ms      │ 24.53 ms      │ 100     │ 100
```
  • Loading branch information
lwwmanning authored Sep 25, 2024
1 parent b95a2af commit a7fd730
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 47 deletions.
182 changes: 136 additions & 46 deletions encodings/alp/src/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ use std::fmt::{Display, Formatter};
use std::mem::size_of;

use itertools::Itertools;
use num_traits::{CheckedSub, Float, NumCast, PrimInt, ToPrimitive, Zero};
use num_traits::{CheckedSub, Float, PrimInt, ToPrimitive};
use serde::{Deserialize, Serialize};
use vortex_error::vortex_panic;

const SAMPLE_SIZE: usize = 32;

Expand Down Expand Up @@ -35,10 +34,11 @@ pub trait ALPFloat: Float + Display + 'static {
(self + Self::SWEET) - Self::SWEET
}

#[inline]
fn as_int(self) -> Option<Self::ALPInt> {
<Self::ALPInt as NumCast>::from(self)
}
/// Equivalent to calling `as` to cast the primitive float to the target integer type.
fn as_int(self) -> Self::ALPInt;

/// Convert from the integer type back to the float type using `as`.
fn from_int(n: Self::ALPInt) -> Self;

fn find_best_exponents(values: &[Self]) -> Exponents {
let mut best_exp = Exponents { e: 0, f: 0 };
Expand Down Expand Up @@ -72,7 +72,7 @@ pub trait ALPFloat: Float + Display + 'static {
best_exp
}

#[inline(always)]
#[inline]
fn estimate_encoded_size(encoded: &[Self::ALPInt], patches: &[Self]) -> usize {
let bits_per_encoded = encoded
.iter()
Expand Down Expand Up @@ -103,56 +103,126 @@ pub trait ALPFloat: Float + Display + 'static {
) -> (Exponents, Vec<Self::ALPInt>, Vec<u64>, Vec<Self>) {
let exp = exponents.unwrap_or_else(|| Self::find_best_exponents(values));

let mut exc_pos = Vec::new();
let mut exc_value = Vec::new();
let mut prev = Self::ALPInt::zero();
let encoded = values
.iter()
.enumerate()
.map(|(i, v)| {
match Self::encode_single(*v, exp) {
Ok(fi) => {
prev = fi;
fi
}
Err(exc) => {
exc_pos.push(i as u64);
exc_value.push(exc);
// Emit the last known good value. This helps with run-end encoding.
prev
}
}
})
.collect_vec();
let mut encoded_output = Vec::with_capacity(values.len());
let mut patch_indices = Vec::new();
let mut patch_values = Vec::new();
let mut fill_value: Option<Self::ALPInt> = None;

(exp, encoded, exc_pos, exc_value)
// this is intentionally branchless
// we batch this into 32KB of values at a time to make it more L1 cache friendly
let encode_chunk_size: usize = (32 << 10) / size_of::<Self::ALPInt>();
for chunk in values.chunks(encode_chunk_size) {
encode_chunk_unchecked(
chunk,
exp,
&mut encoded_output,
&mut patch_indices,
&mut patch_values,
&mut fill_value,
);
}

(exp, encoded_output, patch_indices, patch_values)
}

#[inline]
fn encode_single(value: Self, exponents: Exponents) -> Result<Self::ALPInt, Self> {
let encoded = (value * Self::F10[exponents.e as usize] * Self::IF10[exponents.f as usize])
.fast_round();
if let Some(e) = encoded.as_int() {
let decoded = Self::decode_single(e, exponents);
if decoded == value {
return Ok(e);
}
let encoded = unsafe { Self::encode_single_unchecked(value, exponents) };
let decoded = Self::decode_single(encoded, exponents);
if decoded == value {
return Ok(encoded);
}

Err(value)
}

#[inline]
fn decode_single(encoded: Self::ALPInt, exponents: Exponents) -> Self {
let encoded_float: Self = Self::from(encoded).unwrap_or_else(|| {
vortex_panic!(
"Failed to convert encoded value {} from {} to {} in ALPFloat::decode_single",
encoded,
std::any::type_name::<Self::ALPInt>(),
std::any::type_name::<Self>()
)
});
encoded_float * Self::F10[exponents.f as usize] * Self::IF10[exponents.e as usize]
Self::from_int(encoded) * Self::F10[exponents.f as usize] * Self::IF10[exponents.e as usize]
}

/// # Safety
///
/// The returned value may not decode back to the original value.
#[inline(always)]
unsafe fn encode_single_unchecked(value: Self, exponents: Exponents) -> Self::ALPInt {
(value * Self::F10[exponents.e as usize] * Self::IF10[exponents.f as usize])
.fast_round()
.as_int()
}
}

fn encode_chunk_unchecked<T: ALPFloat>(
chunk: &[T],
exp: Exponents,
encoded_output: &mut Vec<T::ALPInt>,
patch_indices: &mut Vec<u64>,
patch_values: &mut Vec<T>,
fill_value: &mut Option<T::ALPInt>,
) {
let num_prev_encoded = encoded_output.len();
let num_prev_patches = patch_indices.len();
assert_eq!(patch_indices.len(), patch_values.len());
let has_filled = fill_value.is_some();

// encode the chunk, counting the number of patches
let mut chunk_patch_count = 0;
encoded_output.extend(chunk.iter().map(|v| {
let encoded = unsafe { T::encode_single_unchecked(*v, exp) };
let decoded = T::decode_single(encoded, exp);
let neq = (decoded != *v) as usize;
chunk_patch_count += neq;
encoded
}));
let chunk_patch_count = chunk_patch_count; // immutable hereafter
assert_eq!(encoded_output.len(), num_prev_encoded + chunk.len());

// find the first successfully encoded value (i.e., not patched)
// this is our fill value for missing values
if fill_value.is_none() && (num_prev_encoded + chunk_patch_count < encoded_output.len()) {
assert_eq!(num_prev_encoded, num_prev_patches);
for i in num_prev_encoded..encoded_output.len() {
if i >= patch_indices.len() || patch_indices[i] != i as u64 {
*fill_value = Some(encoded_output[i]);
break;
}
}
}

// if there are no patches, we are done
if chunk_patch_count == 0 {
return;
}

// we need to gather the patches for this chunk
// preallocate space for the patches (plus one because our loop may attempt to write one past the end)
patch_indices.reserve(chunk_patch_count + 1);
patch_values.reserve(chunk_patch_count + 1);

// record the patches in this chunk
let patch_indices_mut = patch_indices.spare_capacity_mut();
let patch_values_mut = patch_values.spare_capacity_mut();
let mut chunk_patch_index = 0;
for i in num_prev_encoded..encoded_output.len() {
let decoded = T::decode_single(encoded_output[i], exp);
// write() is only safe to call more than once because the values are primitive (i.e., Drop is a no-op)
patch_indices_mut[chunk_patch_index].write(i as u64);
patch_values_mut[chunk_patch_index].write(chunk[i - num_prev_encoded]);
chunk_patch_index += (decoded != chunk[i - num_prev_encoded]) as usize;
}
assert_eq!(chunk_patch_index, chunk_patch_count);
unsafe {
patch_indices.set_len(num_prev_patches + chunk_patch_count);
patch_values.set_len(num_prev_patches + chunk_patch_count);
}

// replace the patched values in the encoded array with the fill value
// for better downstream compression
if let Some(fill_value) = fill_value {
// handle the edge case where the first N >= 1 chunks are all patches
let start_patch = if !has_filled { 0 } else { num_prev_patches };
for patch_idx in &patch_indices[start_patch..] {
encoded_output[*patch_idx as usize] = *fill_value;
}
}
}

Expand Down Expand Up @@ -189,6 +259,16 @@ impl ALPFloat for f32 {
0.000000001,
0.0000000001, // 10^-10
];

#[inline(always)]
fn as_int(self) -> Self::ALPInt {
self as _
}

#[inline(always)]
fn from_int(n: Self::ALPInt) -> Self {
n as _
}
}

impl ALPFloat for f64 {
Expand Down Expand Up @@ -250,4 +330,14 @@ impl ALPFloat for f64 {
0.0000000000000000000001,
0.00000000000000000000001, // 10^-23
];

#[inline(always)]
fn as_int(self) -> Self::ALPInt {
self as _
}

#[inline(always)]
fn from_int(n: Self::ALPInt) -> Self {
n as _
}
}
2 changes: 1 addition & 1 deletion encodings/alp/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ mod tests {
assert!(encoded.patches().is_some());
assert_eq!(
encoded.encoded().as_primitive().maybe_null_slice::<i64>(),
vec![1234i64, 2718, 2718, 4000] // fill forward
vec![1234i64, 2718, 1234, 4000] // fill forward
);
assert_eq!(encoded.exponents(), Exponents { e: 16, f: 13 });

Expand Down

0 comments on commit a7fd730

Please sign in to comment.