From ea01119e34762557f866d00977c01f622c10b2c7 Mon Sep 17 00:00:00 2001 From: devillove084 <786537003@qq.com> Date: Wed, 18 Sep 2024 16:02:26 +0000 Subject: [PATCH] feat(file_index): add basic bitmap index --- crates/paimon/Cargo.toml | 2 + crates/paimon/src/error.rs | 14 + crates/paimon/src/file_index/bitmap_index.rs | 347 +++++++++++++++++++ crates/paimon/src/file_index/mod.rs | 3 + 4 files changed, 366 insertions(+) create mode 100644 crates/paimon/src/file_index/bitmap_index.rs diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml index ceb6100..555f587 100644 --- a/crates/paimon/Cargo.toml +++ b/crates/paimon/Cargo.toml @@ -40,6 +40,8 @@ bytes = "1.7.1" bitflags = "2.6.0" tokio = { version = "1.39.2", features = ["macros"] } chrono = { version = "0.4.38", features = ["serde"] } +roaring = "0.10.6" +indexmap = { version = "2.5.0", features = ["serde"] } serde = { version = "1", features = ["derive"] } serde_bytes = "0.11.15" serde_json = "1.0.120" diff --git a/crates/paimon/src/error.rs b/crates/paimon/src/error.rs index d7cfd18..5355df9 100644 --- a/crates/paimon/src/error.rs +++ b/crates/paimon/src/error.rs @@ -65,6 +65,20 @@ pub enum Error { display("Paimon hitting invalid file index format: {}", message) )] FileIndexFormatInvalid { message: String }, + #[snafu(visibility(pub(crate)), display("Serialization error: {}", source))] + SerializationError { source: serde_json::Error }, + #[snafu(visibility(pub(crate)), display("Deserialization error: {}", source))] + DeserializationError { source: serde_json::Error }, + #[snafu( + visibility(pub(crate)), + display("Roaring bitmap serialization error: {}", source) + )] + BitmapSerializationError { source: std::io::Error }, + #[snafu( + visibility(pub(crate)), + display("Bitmap deserialization error: {}", source) + )] + BitmapDeserializationError { source: std::io::Error }, } impl From for Error { diff --git a/crates/paimon/src/file_index/bitmap_index.rs b/crates/paimon/src/file_index/bitmap_index.rs new file mode 100644 index 0000000..a7a1b0d --- /dev/null +++ b/crates/paimon/src/file_index/bitmap_index.rs @@ -0,0 +1,347 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::hash::Hash; +use std::sync::Arc; + +use crate::error::{BitmapSerializationSnafu, DeserializationSnafu, SerializationSnafu}; +use crate::io::{FileRead, InputFile}; + +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use indexmap::IndexMap; +use roaring::RoaringBitmap; +use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; + +const BITMAP_VERSION_1: u8 = 1; + +#[derive(Serialize, Deserialize)] +struct BitmapFileIndexMeta +where + K: Serialize + DeserializeOwned + Clone + Eq + Hash, +{ + row_count: u32, + non_null_bitmap_number: u32, + has_null_value: bool, + null_value_offset: Option, + #[serde(with = "indexmap::map::serde_seq")] + bitmap_offsets: IndexMap, +} + +pub struct BitmapFileIndexWriter +where + K: Serialize + DeserializeOwned + Clone + Eq + Hash, +{ + id2bitmap: IndexMap, + null_bitmap: RoaringBitmap, + row_number: u32, +} + +impl Default for BitmapFileIndexWriter +where + K: Serialize + DeserializeOwned + Clone + Eq + Hash, +{ + fn default() -> Self { + Self::new() + } +} + +impl BitmapFileIndexWriter +where + K: Serialize + DeserializeOwned + Clone + Eq + Hash, +{ + pub fn new() -> Self { + Self { + id2bitmap: IndexMap::new(), + null_bitmap: RoaringBitmap::new(), + row_number: 0, + } + } + + pub fn write(&mut self, key: Option) { + if let Some(key) = key { + self.id2bitmap + .entry(key) + .or_default() + .insert(self.row_number); + } else { + self.null_bitmap.insert(self.row_number); + } + self.row_number += 1; + } + + pub fn serialized_bytes(&self) -> crate::Result { + // 1. Serialize the null_bitmap + let mut null_bitmap_bytes = Vec::new(); + self.null_bitmap + .serialize_into(&mut null_bitmap_bytes) + .context(BitmapSerializationSnafu)?; + let null_bitmap_size = null_bitmap_bytes.len(); + + // 2. Serialize each bitmap and calculate total size + let mut bitmap_offsets = IndexMap::new(); + let mut serialized_bitmaps = Vec::new(); + let mut total_bitmap_size = 0; + + for (key, bitmap) in &self.id2bitmap { + if bitmap.len() == 1 { + // Single value bitmap, offset is negative + let value = -1_i64 - bitmap.iter().next().unwrap() as i64; + bitmap_offsets.insert(key.clone(), value); + } else { + let mut bitmap_bytes = Vec::new(); + bitmap + .serialize_into(&mut bitmap_bytes) + .context(BitmapSerializationSnafu)?; + let bitmap_size = bitmap_bytes.len(); + serialized_bitmaps.push((bitmap_bytes, bitmap_size)); + bitmap_offsets.insert(key.clone(), total_bitmap_size as i64); + total_bitmap_size += bitmap_size; + } + } + + // 3. Handle null bitmap offset + let null_value_offset = if !self.null_bitmap.is_empty() { + Some(if self.null_bitmap.len() == 1 { + -1_i64 - self.null_bitmap.iter().next().unwrap() as i64 + } else { + 0_i64 + }) + } else { + None + }; + + // 4. Create metadata and serialize it + let meta = BitmapFileIndexMeta { + row_count: self.row_number, + non_null_bitmap_number: self.id2bitmap.len() as u32, + has_null_value: !self.null_bitmap.is_empty(), + null_value_offset, + bitmap_offsets, + }; + + let meta_bytes = serde_json::to_vec(&meta).context(SerializationSnafu)?; + let meta_size = meta_bytes.len(); + + // 5. Calculate total size + let version_size = 1; // BITMAP_VERSION_1 is a single byte + let meta_size_size = 8; // u64 + let total_size = version_size + + meta_size_size + + meta_size + + if self.null_bitmap.len() > 1 { + null_bitmap_size + } else { + 0 + } + + total_bitmap_size; + + // 6. Allocate buffer with total_size + let mut output = BytesMut::with_capacity(total_size); + + // 7. Write data into buffer + // Write version + output.put_u8(BITMAP_VERSION_1); + + // Write meta_size as u64 + output.put_u64_le(meta_size as u64); + + // Write metadata + output.put_slice(&meta_bytes); + + // Write null_bitmap if necessary + if self.null_bitmap.len() > 1 { + output.put_slice(&null_bitmap_bytes); + } + + // Write all bitmaps + for (bitmap_bytes, _size) in serialized_bitmaps { + output.put_slice(&bitmap_bytes); + } + + Ok(output.freeze()) + } +} + +pub struct BitmapFileIndexReader +where + K: Serialize + DeserializeOwned + Clone + Eq + Hash, +{ + input_file: Arc, + meta: BitmapFileIndexMeta, + bitmaps: IndexMap, + null_bitmap: Option, + body_offset: u64, +} + +impl BitmapFileIndexReader +where + K: Serialize + DeserializeOwned + Clone + Eq + Hash, +{ + pub async fn new(input_file: Arc) -> crate::Result { + let input = input_file.read().await?; + let mut buf = input.clone(); + + if buf.remaining() < 1 { + return Err(crate::Error::FileIndexFormatInvalid { + message: "File too small to contain version byte".to_string(), + }); + } + let version = buf.get_u8(); + if version != BITMAP_VERSION_1 { + return Err(crate::Error::FileIndexFormatInvalid { + message: format!("Unsupported version: {}", version), + }); + } + + if buf.remaining() < 8 { + return Err(crate::Error::FileIndexFormatInvalid { + message: "File too small to contain meta_size".to_string(), + }); + } + let meta_size = buf.get_u64_le() as usize; + + if buf.remaining() < meta_size { + return Err(crate::Error::FileIndexFormatInvalid { + message: "File too small to contain metadata".to_string(), + }); + } + let meta_bytes = buf.copy_to_bytes(meta_size); + + let meta: BitmapFileIndexMeta = + serde_json::from_slice(&meta_bytes).context(DeserializationSnafu)?; + + let body_offset = input.len() - buf.remaining(); + + Ok(Self { + input_file, + meta, + bitmaps: IndexMap::new(), + null_bitmap: None, + body_offset: body_offset as u64, + }) + } + + pub async fn get_bitmap(&mut self, key: Option<&K>) -> crate::Result { + if let Some(key) = key { + if let Some(bitmap) = self.bitmaps.get(key) { + return Ok(bitmap.clone()); + } + if let Some(&offset) = self.meta.bitmap_offsets.get(key) { + let bitmap = self.read_bitmap(offset).await?; + self.bitmaps.insert(key.clone(), bitmap.clone()); + Ok(bitmap) + } else { + Ok(RoaringBitmap::new()) + } + } else { + if let Some(bitmap) = &self.null_bitmap { + return Ok(bitmap.clone()); + } + if let Some(offset) = self.meta.null_value_offset { + let bitmap = self.read_bitmap(offset).await?; + self.null_bitmap = Some(bitmap.clone()); + Ok(bitmap) + } else { + Ok(RoaringBitmap::new()) + } + } + } + + async fn read_bitmap(&self, offset: i64) -> crate::Result { + if offset < 0 { + let index = (-1 - offset) as u32; + let mut bitmap = RoaringBitmap::new(); + bitmap.insert(index); + Ok(bitmap) + } else { + let bitmap_pos = self.body_offset as i64 + offset; + let file_meta = self.input_file.metadata().await?; + if bitmap_pos < 0 { + return Err(crate::Error::FileIndexFormatInvalid { + message: format!("Invalid bitmap offset: {}", bitmap_pos), + }); + } + + if bitmap_pos as u64 > file_meta.size { + return Err(crate::Error::FileIndexFormatInvalid { + message: format!( + "Bitmap offset {} exceeds file size {}", + bitmap_pos, file_meta.size + ), + }); + } + + let reader = self.input_file.reader().await?; + let range = bitmap_pos as u64..file_meta.size; + let buf = reader.read(range).await?; + let bitmap = RoaringBitmap::deserialize_from(&mut &buf[..]) + .map_err(|e| crate::Error::BitmapDeserializationError { source: e })?; + + Ok(bitmap) + } + } +} + +#[cfg(test)] +mod basic_bitmap_index_test { + use super::*; + use crate::io::FileIO; + use std::sync::Arc; + + #[tokio::test] + async fn test_basic_bitmap_index_read_write() -> crate::Result<()> { + let path = "memory:/tmp/test_basic_bitmap_index"; + let file_io = FileIO::from_url(path)?.build()?; + + let mut writer = BitmapFileIndexWriter::::new(); + + writer.write(Some("key1".to_string())); + writer.write(None); + writer.write(Some("key2".to_string())); + writer.write(Some("key1".to_string())); + + let bytes = writer.serialized_bytes()?; + + let output = file_io.new_output(path)?; + let mut file_writer = output.writer().await?; + file_writer.write(bytes).await?; + file_writer.close().await?; + + let input_file = output.to_input_file(); + + let mut reader = BitmapFileIndexReader::::new(Arc::new(input_file)).await?; + + let bitmap_key1 = reader.get_bitmap(Some(&"key1".to_string())).await?; + assert_eq!(bitmap_key1.len(), 2); + assert!(bitmap_key1.contains(0)); + assert!(bitmap_key1.contains(3)); + + let bitmap_key2 = reader.get_bitmap(Some(&"key2".to_string())).await?; + assert_eq!(bitmap_key2.len(), 1); + assert!(bitmap_key2.contains(2)); + + let bitmap_none = reader.get_bitmap(None).await?; + assert_eq!(bitmap_none.len(), 1); + assert!(bitmap_none.contains(1)); + + file_io.delete_file(path).await?; + + Ok(()) + } +} diff --git a/crates/paimon/src/file_index/mod.rs b/crates/paimon/src/file_index/mod.rs index ca9ee54..215f192 100644 --- a/crates/paimon/src/file_index/mod.rs +++ b/crates/paimon/src/file_index/mod.rs @@ -17,3 +17,6 @@ mod file_index_format; pub use file_index_format::*; + +mod bitmap_index; +pub use bitmap_index::*;