perf: optimize time series memtable ingestion #5451

Open · wants to merge 6 commits into main
Changes from 4 commits
3 changes: 3 additions & 0 deletions src/datatypes/src/vectors.rs
@@ -202,13 +202,16 @@ pub trait MutableVector: Send + Sync {
/// # Panics
/// Panics if the data types mismatch.
fn push_value_ref(&mut self, value: ValueRef) {
#[cfg(debug_assertions)]
self.try_push_value_ref(value).unwrap_or_else(|_| {
panic!(
"expecting pushing value of datatype {:?}, actual {:?}",
self.data_type(),
value
);
});
#[cfg(not(debug_assertions))]
let _ = self.try_push_value_ref(value);
}

/// Push null to this mutable vector.
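The hunk above turns the datatype check in `push_value_ref` into a debug-only assertion: debug builds still panic on a mismatch, while release builds simply drop the error from `try_push_value_ref`. A minimal sketch of this conditional-compilation pattern, using a hypothetical builder type rather than the real `MutableVector`:

```rust
/// Hypothetical builder with a fallible push, mirroring `try_push_value_ref`.
struct Int64Builder {
    values: Vec<i64>,
}

impl Int64Builder {
    fn try_push(&mut self, value: i64) -> Result<(), String> {
        // Imagine a datatype check here that can fail.
        self.values.push(value);
        Ok(())
    }

    fn push(&mut self, value: i64) {
        // Debug builds: surface a mismatch loudly so tests catch it.
        #[cfg(debug_assertions)]
        self.try_push(value).expect("datatype mismatch");

        // Release builds: skip the panic path entirely.
        #[cfg(not(debug_assertions))]
        let _ = self.try_push(value);
    }
}

fn main() {
    let mut builder = Int64Builder { values: Vec::new() };
    builder.push(42);
    assert_eq!(builder.values, vec![42]);
}
```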
82 changes: 55 additions & 27 deletions src/mito2/src/memtable/time_series.rs
@@ -54,7 +54,7 @@ use crate::region::options::MergeMode;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};

/// Initial vector builder capacity.
const INITIAL_BUILDER_CAPACITY: usize = 0;
const BUILDER_CAPACITY: usize = 512;

/// Builder to build [TimeSeriesMemtable].
#[derive(Debug, Default)]
@@ -154,9 +154,7 @@ impl TimeSeriesMemtable {
);

let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?;
let fields = kv.fields().collect::<Vec<_>>();

stats.value_bytes += fields.iter().map(|v| v.data_size()).sum::<usize>();
let (series, series_allocated) = self.series_set.get_or_add_series(primary_key_encoded);
stats.key_bytes += series_allocated;

@@ -166,7 +164,8 @@
stats.max_ts = stats.max_ts.max(ts);

let mut guard = series.write().unwrap();
guard.push(kv.timestamp(), kv.sequence(), kv.op_type(), fields);
let size = guard.push(kv.timestamp(), kv.sequence(), kv.op_type(), kv.fields());
stats.value_bytes += size;

Ok(())
}
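The write path no longer collects `kv.fields()` into a temporary `Vec` just to sum `data_size()`; the iterator now goes straight to `Series::push`, which reports the byte size it appended. A rough sketch of folding the size accounting into that single pass (hypothetical `FieldValue` and `Builder` types, not the mito2 ones):

```rust
// Hypothetical field value carrying its encoded bytes.
struct FieldValue {
    bytes: Vec<u8>,
}

impl FieldValue {
    fn data_size(&self) -> usize {
        self.bytes.len()
    }
}

struct Builder {
    rows: Vec<Vec<u8>>,
}

impl Builder {
    // Appends every field in one pass and returns the total byte size,
    // so the caller never has to collect the iterator just to measure it.
    fn push(&mut self, fields: impl Iterator<Item = FieldValue>) -> usize {
        let mut size = 0;
        for field in fields {
            size += field.data_size();
            self.rows.push(field.bytes);
        }
        size
    }
}

fn main() {
    let mut builder = Builder { rows: Vec::new() };
    let fields = vec![FieldValue { bytes: vec![1, 2, 3] }, FieldValue { bytes: vec![4] }];
    let value_bytes = builder.push(fields.into_iter());
    assert_eq!(value_bytes, 4);
}
```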
@@ -617,20 +616,33 @@ struct Series {
pk_cache: Option<Vec<Value>>,
active: ValueBuilder,
frozen: Vec<Values>,
region_metadata: RegionMetadataRef,
}

impl Series {
fn new(region_metadata: &RegionMetadataRef) -> Self {
Self {
pk_cache: None,
active: ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY),
active: ValueBuilder::new(region_metadata, BUILDER_CAPACITY),
frozen: vec![],
region_metadata: region_metadata.clone(),
}
}

/// Pushes a row of values into Series.
fn push(&mut self, ts: ValueRef, sequence: u64, op_type: OpType, values: Vec<ValueRef>) {
self.active.push(ts, sequence, op_type as u8, values);
/// Pushes a row of values into Series. Returns the size of the pushed values.
fn push<'a>(
&mut self,
ts: ValueRef<'a>,
sequence: u64,
op_type: OpType,
values: impl Iterator<Item = ValueRef<'a>>,
) -> usize {
// + 10 to avoid potential reallocation.
if self.active.len() + 10 > BUILDER_CAPACITY {
let region_metadata = self.region_metadata.clone();
self.freeze(&region_metadata);
Contributor comment (on the `freeze` call above):
Looks like we should use BUILDER_CAPACITY as the capacity of freeze. We may update the capacity in freeze().

/// Freezes the active part and push it to `frozen`.
fn freeze(&mut self, region_metadata: &RegionMetadataRef) {
if self.active.len() != 0 {
let mut builder = ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY);
std::mem::swap(&mut self.active, &mut builder);
self.frozen.push(Values::from(builder));
}
}

}
self.active.push(ts, sequence, op_type as u8, values)
}

fn update_pk_cache(&mut self, pk_values: Vec<Value>) {
@@ -640,7 +652,7 @@ impl Series {
/// Freezes the active part and push it to `frozen`.
fn freeze(&mut self, region_metadata: &RegionMetadataRef) {
if self.active.len() != 0 {
let mut builder = ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY);
let mut builder = ValueBuilder::new(region_metadata, BUILDER_CAPACITY);
std::mem::swap(&mut self.active, &mut builder);
self.frozen.push(Values::from(builder));
}
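`Series::push` now freezes the active builder a few rows before it would reach `BUILDER_CAPACITY` (the `+ 10` head-room), so a builder allocated at that fixed capacity never has to grow; `freeze` then swaps in a fresh builder of the same capacity. A standalone sketch of this freeze-before-full scheme, with plain `Vec<u64>` chunks standing in for the real `ValueBuilder`:

```rust
const CAPACITY: usize = 512;

/// Hypothetical append-only series: one active chunk plus a list of frozen chunks.
struct Series {
    active: Vec<u64>,
    frozen: Vec<Vec<u64>>,
}

impl Series {
    fn new() -> Self {
        Self { active: Vec::with_capacity(CAPACITY), frozen: Vec::new() }
    }

    fn push(&mut self, value: u64) {
        // Freeze a little early so the active chunk never reallocates.
        if self.active.len() + 10 > CAPACITY {
            self.freeze();
        }
        self.active.push(value);
    }

    fn freeze(&mut self) {
        if !self.active.is_empty() {
            // Swap in a fresh builder at the same capacity and archive the full one.
            let full = std::mem::replace(&mut self.active, Vec::with_capacity(CAPACITY));
            self.frozen.push(full);
        }
    }
}

fn main() {
    let mut series = Series::new();
    for v in 0..2_000u64 {
        series.push(v);
    }
    // All 2000 rows are split across the frozen chunks plus the active one.
    let total = series.frozen.iter().map(|c| c.len()).sum::<usize>() + series.active.len();
    assert_eq!(total, 2000);
}
```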
@@ -725,26 +737,42 @@ impl ValueBuilder {

/// Pushes a new row to `ValueBuilder`.
/// We don't need primary keys since they've already been encoded.
fn push(&mut self, ts: ValueRef, sequence: u64, op_type: u8, fields: Vec<ValueRef>) {
debug_assert_eq!(fields.len(), self.fields.len());
/// Returns the size of field values.
fn push<'a>(
&mut self,
ts: ValueRef,
sequence: u64,
op_type: u8,
fields: impl Iterator<Item = ValueRef<'a>>,
) -> usize {
#[cfg(debug_assertions)]
let fields = {
let field_vec = fields.collect::<Vec<_>>();
debug_assert_eq!(field_vec.len(), self.fields.len());
field_vec.into_iter()
};

self.timestamp.push_value_ref(ts);
self.sequence.push_value_ref(ValueRef::UInt64(sequence));
self.op_type.push_value_ref(ValueRef::UInt8(op_type));
let num_rows = self.timestamp.len();
for (idx, field_value) in fields.into_iter().enumerate() {
let mut size = 0;
for (idx, field_value) in fields.enumerate() {
size += field_value.data_size();
if !field_value.is_null() || self.fields[idx].is_some() {
self.fields[idx]
.get_or_insert_with(|| {
// lazy initialize on first non-null value
let mut mutable_vector =
self.field_types[idx].create_mutable_vector(num_rows);
// fill previous rows with nulls
mutable_vector.push_nulls(num_rows - 1);
mutable_vector
})
.push_value_ref(field_value);
if let Some(field) = self.fields[idx].as_mut() {
field.push_value_ref(field_value);
} else {
let mut mutable_vector =
self.field_types[idx].create_mutable_vector(num_rows.max(BUILDER_CAPACITY));
mutable_vector.push_nulls(num_rows - 1);
mutable_vector.push_value_ref(field_value);
self.fields[idx] = Some(mutable_vector);
}
}
}

size
}

/// Returns the length of [ValueBuilder]
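In the new `push`, a field's vector builder is only allocated when the first non-null value for that field arrives; the rows it skipped are then backfilled with nulls so every column stays the same length. A simplified sketch of that lazy-allocate-and-backfill bookkeeping, with a plain `Vec<Option<i64>>` standing in for the real mutable vector:

```rust
/// Hypothetical single-column builder: stays unallocated while every value is null.
struct LazyColumn {
    rows_seen: usize,
    builder: Option<Vec<Option<i64>>>,
}

impl LazyColumn {
    fn new() -> Self {
        Self { rows_seen: 0, builder: None }
    }

    fn push(&mut self, value: Option<i64>) {
        self.rows_seen += 1;
        if self.builder.is_none() {
            if value.is_none() {
                // All values so far are null: keep the column unallocated.
                return;
            }
            // First non-null value: allocate and backfill the earlier rows with nulls.
            self.builder = Some(vec![None; self.rows_seen - 1]);
        }
        // The builder exists now; append the current value.
        self.builder.as_mut().unwrap().push(value);
    }
}

fn main() {
    let mut col = LazyColumn::new();
    col.push(None);
    col.push(None);
    col.push(Some(7)); // triggers allocation plus two backfilled nulls
    assert_eq!(col.builder, Some(vec![None, None, Some(7)]));
}
```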
@@ -951,8 +979,8 @@ mod tests {
ValueRef::Timestamp(Timestamp::new_millisecond(val))
}

fn field_value_ref(v0: i64, v1: f64) -> Vec<ValueRef<'static>> {
vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))]
fn field_value_ref(v0: i64, v1: f64) -> impl Iterator<Item = ValueRef<'static>> {
vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))].into_iter()
}

fn check_values(values: Values, expect: &[(i64, u64, u8, i64, f64)]) {
@@ -1014,20 +1042,20 @@
ts_value_ref(1),
0,
OpType::Put,
vec![ValueRef::Null, ValueRef::Null],
vec![ValueRef::Null, ValueRef::Null].into_iter(),
);
series.push(
ts_value_ref(1),
0,
OpType::Put,
vec![ValueRef::Int64(1), ValueRef::Null],
vec![ValueRef::Int64(1), ValueRef::Null].into_iter(),
);
series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2));
series.push(
ts_value_ref(1),
3,
OpType::Put,
vec![ValueRef::Int64(2), ValueRef::Null],
vec![ValueRef::Int64(2), ValueRef::Null].into_iter(),
);
assert_eq!(4, series.active.timestamp.len());
assert_eq!(0, series.frozen.len());
4 changes: 2 additions & 2 deletions src/mito2/src/row_converter/dense.rs
@@ -348,8 +348,8 @@ impl DensePrimaryKeyCodec {
I: Iterator<Item = ValueRef<'a>>,
{
let mut serializer = Serializer::new(buffer);
for (value, (_, field)) in row.zip(self.ordered_primary_key_columns.iter()) {
field.serialize(&mut serializer, &value)?;
for (idx, value) in row.enumerate() {
self.field_at(idx).serialize(&mut serializer, &value)?;
}
Ok(())
}