From b941c19d31c47f9ccb8c6ded861aa4729235c29f Mon Sep 17 00:00:00 2001
From: Mingzhuo Yin
Date: Wed, 22 Jan 2025 00:56:32 +0800
Subject: [PATCH 1/2] refactor: improve posting cursor

Signed-off-by: Mingzhuo Yin
---
 src/algorithm/block_encode/delta_bitpack.rs   | 425 +++++++++++++++
 src/algorithm/block_encode/mod.rs             |  23 +
 .../block_partition/fixed_block_partition.rs  |  58 ++
 src/algorithm/block_partition/mod.rs          |  13 +
 src/algorithm/block_wand.rs                   |  54 +-
 src/algorithm/mod.rs                          |   2 +
 src/index/vacuum.rs                           |   8 +-
 src/page/postgres.rs                          |   9 +-
 src/segment/posting/append.rs                 | 226 ++++++++
 src/segment/posting/mod.rs                    |  52 +-
 src/segment/posting/reader.rs                 | 323 ++++++------
 src/segment/posting/serializer.rs             | 498 ++++++------
 src/segment/posting/writer.rs                 |  30 +-
 src/segment/sealed.rs                         |  20 +-
 src/utils/compress_block.rs                   | 249 ---------
 src/utils/mod.rs                              |   1 -
 16 files changed, 1147 insertions(+), 844 deletions(-)
 create mode 100644 src/algorithm/block_encode/delta_bitpack.rs
 create mode 100644 src/algorithm/block_encode/mod.rs
 create mode 100644 src/algorithm/block_partition/fixed_block_partition.rs
 create mode 100644 src/algorithm/block_partition/mod.rs
 create mode 100644 src/segment/posting/append.rs
 delete mode 100644 src/utils/compress_block.rs

diff --git a/src/algorithm/block_encode/delta_bitpack.rs b/src/algorithm/block_encode/delta_bitpack.rs
new file mode 100644
index 0000000..3eeb02d
--- /dev/null
+++ b/src/algorithm/block_encode/delta_bitpack.rs
@@ -0,0 +1,425 @@
+// compress docid with delta encoding and bitpacking
+// compress tf with bitpacking
+
+use std::num::NonZeroU32;
+
+use bitpacking::{BitPacker, BitPacker4x};
+use lazy_static::lazy_static;
+
+use super::{BlockDecodeTrait, BlockEncodeTrait};
+
+const BLOCK_SIZE: usize = 128;
+
+lazy_static! {
+    static ref BITPACKER: BitPacker4x = BitPacker4x::new();
+}
+
+pub struct DeltaBitpackEncode {
+    output: Vec<u8>,
+}
+
+impl DeltaBitpackEncode {
+    pub fn new() -> Self {
+        Self { output: Vec::new() }
+    }
+}
+
+impl BlockEncodeTrait for DeltaBitpackEncode {
+    fn encode(
+        &mut self,
+        offset: Option<NonZeroU32>,
+        docids: &mut [u32],
+        freqs: &mut [u32],
+    ) -> &[u8] {
+        assert!(docids.len() == freqs.len());
+        assert!(docids.len() == BLOCK_SIZE);
+
+        self.output.clear();
+        freqs.iter_mut().for_each(|v| *v -= 1);
+        let offset = offset.map(|x| x.get());
+
+        let docid_bits = BITPACKER.num_bits_strictly_sorted(offset, docids);
+        let freq_bits = BITPACKER.num_bits(freqs);
+        let docid_size = compress_size(docid_bits, docids.len());
+        let freq_size = compress_size(freq_bits, freqs.len());
+        self.output.extend_from_slice(&[docid_bits, freq_bits]);
+        self.output.resize(docid_size + freq_size + 2, 0);
+
+        let mut output = &mut self.output[2..];
+        BITPACKER.compress_strictly_sorted(offset, docids, output, docid_bits);
+        output = &mut output[docid_size..];
+        BITPACKER.compress(freqs, output, freq_bits);
+        &self.output
+    }
+}
+
+pub struct DeltaBitpackDecode {
+    inner: Box<DeltaBitpackDecodeInner>,
+}
+
+impl DeltaBitpackDecode {
+    pub fn new() -> Self {
+        Self {
+            inner: Box::new(DeltaBitpackDecodeInner::new()),
+        }
+    }
+}
+
+impl BlockDecodeTrait for DeltaBitpackDecode {
+    fn decode(&mut self, data: &[u8], offset: Option<NonZeroU32>) {
+        self.inner.decode(data, offset);
+    }
+
+    fn next(&mut self) -> bool {
+        self.inner.next()
+    }
+
+    fn seek(&mut self, target: u32) -> bool {
+        self.inner.seek(target)
+    }
+
+    fn docid(&self) -> u32 {
+        self.inner.docid()
+    }
+
+    fn freq(&self) -> u32 {
+        self.inner.freq()
+    }
+}
+
+pub struct DeltaBitpackDecodeInner {
+    docids: [u32; BLOCK_SIZE],
+    freqs: [u32; BLOCK_SIZE],
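+    // `docids`/`freqs` hold one fully decoded block of BLOCK_SIZE postings;
+    // `offset` (the next field) is the in-block cursor advanced by `next`/`seek`.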
+ offset: usize, +} + +impl DeltaBitpackDecodeInner { + pub fn new() -> Self { + Self { + docids: [0; BLOCK_SIZE], + freqs: [0; BLOCK_SIZE], + offset: 0, + } + } +} + +impl BlockDecodeTrait for DeltaBitpackDecodeInner { + fn decode(&mut self, mut data: &[u8], offset: Option) { + let offset = offset.map(|x| x.get()); + + let docid_bits = data[0]; + let freq_bits = data[1]; + data = &data[2..]; + let docid_size = + BITPACKER.decompress_strictly_sorted(offset, data, &mut self.docids, docid_bits); + data = &data[docid_size..]; + BITPACKER.decompress(data, &mut self.freqs, freq_bits); + + self.freqs.iter_mut().for_each(|v| *v += 1); + self.offset = 0; + } + + fn next(&mut self) -> bool { + self.offset += 1; + + if self.offset == BLOCK_SIZE { + return false; + } + true + } + + fn seek(&mut self, target: u32) -> bool { + self.offset += self.docids[self.offset..].partition_point(|&v| v < target); + self.offset < BLOCK_SIZE + } + + fn docid(&self) -> u32 { + self.docids[self.offset] + } + + fn freq(&self) -> u32 { + self.freqs[self.offset] + } +} + +// fn num_bits_strictly_sorted(offset: Option, values: &[u32]) -> u8 { +// let mut prev = offset.map(|x| x.get()).unwrap_or(u32::MAX); +// let mut max = 0; +// for &v in values { +// let delta = v.wrapping_sub(prev) - 1; +// prev = v; +// max = max.max(delta); +// } +// 32 - max.leading_zeros() as u8 +// } + +// fn num_bits(values: &[u32]) -> u8 { +// let max = values.iter().copied().max().unwrap_or(0); +// 32 - max.leading_zeros() as u8 +// } + +fn compress_size(num_bits: u8, len: usize) -> usize { + (num_bits as usize * len).div_ceil(8) +} + +// fn compress_strictly_sorted( +// offset: Option, +// uncompressed: &[u32], +// mut compressed: &mut [u8], +// bit_width: u8, +// ) { +// let mut prev = offset.map(|x| x.get()).unwrap_or(u32::MAX); +// let mut mini_buffer: u32 = 0u32; +// let mut cursor = 0; //< number of bits written in the mini_buffer. +// for &v in uncompressed { +// let delta = v.wrapping_sub(prev) - 1; +// prev = v; +// let remaining = 32 - cursor; +// match bit_width.cmp(&remaining) { +// Ordering::Less => { +// // Plenty of room remaining in our mini buffer. +// mini_buffer |= delta << cursor; +// cursor += bit_width; +// } +// Ordering::Equal => { +// mini_buffer |= delta << cursor; +// // We have completed our minibuffer exactly. +// // Let's write it to `compressed`. +// compressed[..4].copy_from_slice(&mini_buffer.to_le_bytes()); +// compressed = &mut compressed[4..]; +// mini_buffer = 0u32; +// cursor = 0; +// } +// Ordering::Greater => { +// mini_buffer |= delta << cursor; +// // We have completed our minibuffer. +// // Let's write it to `compressed` and set the fresh mini_buffer +// // with the remaining bits. +// compressed[..4].copy_from_slice(&mini_buffer.to_le_bytes()); +// compressed = &mut compressed[4..]; +// cursor = bit_width - remaining; +// mini_buffer = delta >> remaining; +// } +// } +// } +// let bit = cursor.div_ceil(8) as usize; +// compressed[..bit].copy_from_slice(&mini_buffer.to_le_bytes()[..bit]); +// } + +// fn compress(uncompressed: &[u32], mut compressed: &mut [u8], bit_width: u8) { +// let mut mini_buffer: u32 = 0u32; +// let mut cursor = 0; //< number of bits written in the mini_buffer. +// for &v in uncompressed { +// let remaining = 32 - cursor; +// match bit_width.cmp(&remaining) { +// Ordering::Less => { +// // Plenty of room remaining in our mini buffer. 
+// mini_buffer |= v << cursor; +// cursor += bit_width; +// } +// Ordering::Equal => { +// mini_buffer |= v << cursor; +// // We have completed our minibuffer exactly. +// // Let's write it to `compressed`. +// compressed[..4].copy_from_slice(&mini_buffer.to_le_bytes()); +// compressed = &mut compressed[4..]; +// mini_buffer = 0u32; +// cursor = 0; +// } +// Ordering::Greater => { +// mini_buffer |= v << cursor; +// // We have completed our minibuffer. +// // Let's write it to `compressed` and set the fresh mini_buffer +// // with the remaining bits. +// compressed[..4].copy_from_slice(&mini_buffer.to_le_bytes()); +// compressed = &mut compressed[4..]; +// cursor = bit_width - remaining; +// mini_buffer = v >> remaining; +// } +// } +// } +// let bit = cursor.div_ceil(8) as usize; +// compressed[..bit].copy_from_slice(&mini_buffer.to_le_bytes()[..bit]); +// } + +// fn decompress_strictly_sorted( +// offset: Option, +// compressed: &[u8], +// uncompressed: &mut [u32], +// bit_width: u8, +// ) { +// assert!(bit_width <= 32); +// let mut prev = offset.map(|x| x.get()).unwrap_or(u32::MAX); + +// if bit_width == 0 { +// for v in uncompressed.iter_mut() { +// prev = prev.wrapping_add(1); +// *v = prev; +// } +// return; +// } + +// let mut mini_buffer: u64 = 0; +// let mut cursor = 0; //< number of bits read in the mini_buffer. +// let mut idx = 0; +// for &byte in compressed { +// mini_buffer |= (byte as u64) << cursor; +// cursor += 8; +// while cursor >= bit_width { +// let delta = mini_buffer & ((1 << bit_width) - 1); +// mini_buffer >>= bit_width; +// cursor -= bit_width; +// let v = prev.wrapping_add(delta as u32).wrapping_add(1); +// prev = v; +// uncompressed[idx] = v; +// idx += 1; +// if idx == uncompressed.len() { +// return; +// } +// } +// } +// } + +// fn decompress(compressed: &[u8], uncompressed: &mut [u32], bit_width: u8) { +// assert!(bit_width <= 32); +// let mut mini_buffer: u64 = 0; +// let mut cursor = 0; //< number of bits read in the mini_buffer. 
+// let mut idx = 0; +// for &byte in compressed { +// mini_buffer |= (byte as u64) << cursor; +// cursor += 8; +// while cursor >= bit_width { +// let v = mini_buffer & ((1 << bit_width) - 1); +// mini_buffer >>= bit_width; +// cursor -= bit_width; +// uncompressed[idx] = v as u32; +// idx += 1; +// if idx == uncompressed.len() { +// return; +// } +// } +// } +// } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_next() { + let mut encoder = DeltaBitpackEncode::new(); + let mut decoder = DeltaBitpackDecode::new(); + + let mut docids = rand::seq::index::sample(&mut rand::thread_rng(), 10000, BLOCK_SIZE) + .into_iter() + .map(|x| x as u32) + .collect::>(); + docids.sort_unstable(); + let freqs = (0..BLOCK_SIZE) + .map(|_| rand::random::() % 1000 + 1) + .collect::>(); + let offset = NonZeroU32::new(0); + + println!("docids: {:?}", docids); + println!("freqs: {:?}", freqs); + + let data = encoder.encode(offset, &mut docids.clone(), &mut freqs.clone()); + decoder.decode(data, offset); + + for i in 0..docids.len() { + assert_eq!(docids[i], decoder.docid()); + assert_eq!(freqs[i], decoder.freq()); + if i + 1 < docids.len() { + assert!(decoder.next()); + } else { + assert!(!decoder.next()); + } + } + } + + #[test] + fn test_seek() { + let mut encoder = DeltaBitpackEncode::new(); + let mut decoder = DeltaBitpackDecode::new(); + + let mut docids = rand::seq::index::sample(&mut rand::thread_rng(), 10000, BLOCK_SIZE) + .into_iter() + .map(|x| x as u32) + .collect::>(); + docids.sort_unstable(); + let freqs = (0..BLOCK_SIZE) + .map(|_| rand::random::() % 1000 + 1) + .collect::>(); + let offset = NonZeroU32::new(0); + + println!("docids: {:?}", docids); + println!("freqs: {:?}", freqs); + + let data = encoder.encode(offset, &mut docids.clone(), &mut freqs.clone()); + decoder.decode(data, offset); + + for i in 0..docids.len() { + assert_eq!(docids[i], decoder.docid()); + assert_eq!(freqs[i], decoder.freq()); + if i + 1 < docids.len() { + assert!(decoder.seek(docids[i] + 1)); + } else { + assert!(!decoder.seek(docids[i] + 1)); + } + } + } + + #[test] + fn test_seek_long() { + let mut encoder = DeltaBitpackEncode::new(); + let mut decoder = DeltaBitpackDecode::new(); + + let mut docids = rand::seq::index::sample(&mut rand::thread_rng(), 10000, BLOCK_SIZE) + .into_iter() + .map(|x| x as u32) + .collect::>(); + docids.sort_unstable(); + let freqs = (0..BLOCK_SIZE) + .map(|_| rand::random::() % 1000 + 1) + .collect::>(); + let offset = NonZeroU32::new(0); + + println!("docids: {:?}", docids); + println!("freqs: {:?}", freqs); + + let data = encoder.encode(offset, &mut docids.clone(), &mut freqs.clone()); + decoder.decode(data, offset); + + assert_eq!(docids[0], decoder.docid()); + assert_eq!(freqs[0], decoder.freq()); + + assert!(decoder.seek(docids.last().unwrap().clone())); + assert_eq!(docids.last().unwrap().clone(), decoder.docid()); + } + + #[test] + fn test_zero_bit_width() { + let mut encoder = DeltaBitpackEncode::new(); + let mut decoder = DeltaBitpackDecode::new(); + + let docids = (10..).take(BLOCK_SIZE).collect::>(); + let freqs = vec![1; BLOCK_SIZE]; + let offset = NonZeroU32::new(9); + + println!("docids: {:?}", docids); + println!("freqs: {:?}", freqs); + + let data = encoder.encode(offset, &mut docids.clone(), &mut freqs.clone()); + decoder.decode(data, offset); + + for i in 0..docids.len() { + assert_eq!(docids[i], decoder.docid()); + assert_eq!(freqs[i], decoder.freq()); + if i + 1 < docids.len() { + assert!(decoder.seek(docids[i] + 1)); + } else { + 
assert!(!decoder.seek(docids[i] + 1)); + } + } + } +} diff --git a/src/algorithm/block_encode/mod.rs b/src/algorithm/block_encode/mod.rs new file mode 100644 index 0000000..00529d5 --- /dev/null +++ b/src/algorithm/block_encode/mod.rs @@ -0,0 +1,23 @@ +mod delta_bitpack; + +use std::num::NonZero; + +pub trait BlockEncodeTrait { + fn encode( + &mut self, + offset: Option>, + docids: &mut [u32], + freqs: &mut [u32], + ) -> &[u8]; +} + +pub trait BlockDecodeTrait { + fn decode(&mut self, data: &[u8], offset: Option>); + fn next(&mut self) -> bool; + fn seek(&mut self, target: u32) -> bool; + fn docid(&self) -> u32; + fn freq(&self) -> u32; +} + +pub type BlockEncode = delta_bitpack::DeltaBitpackEncode; +pub type BlockDecode = delta_bitpack::DeltaBitpackDecode; diff --git a/src/algorithm/block_partition/fixed_block_partition.rs b/src/algorithm/block_partition/fixed_block_partition.rs new file mode 100644 index 0000000..ac5b20a --- /dev/null +++ b/src/algorithm/block_partition/fixed_block_partition.rs @@ -0,0 +1,58 @@ +use super::BlockPartitionTrait; + +const BLOCK_SIZE: usize = 128; + +pub struct FixedBlockPartition { + scores: Vec, + partitions: Vec, + max_doc: Vec, +} + +impl FixedBlockPartition { + pub fn new() -> Self { + Self { + scores: Vec::new(), + partitions: Vec::new(), + max_doc: Vec::new(), + } + } +} + +impl BlockPartitionTrait for FixedBlockPartition { + fn partitions(&self) -> &[u32] { + &self.partitions + } + + fn max_doc(&self) -> &[u32] { + &self.max_doc + } + + fn add_doc(&mut self, score: f32) { + self.scores.push(score); + } + + fn reset(&mut self) { + self.scores.clear(); + self.partitions.clear(); + self.max_doc.clear(); + } + + fn make_partitions(&mut self) { + let doc_cnt = self.scores.len(); + let full_block_cnt = doc_cnt / BLOCK_SIZE; + for i in 0..full_block_cnt { + let start: u32 = (i * BLOCK_SIZE).try_into().unwrap(); + self.partitions.push(start + BLOCK_SIZE as u32 - 1); + let max_doc: u32 = self.scores[start as usize..][..BLOCK_SIZE] + .iter() + .cloned() + .enumerate() + .max_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap()) + .unwrap() + .0 + .try_into() + .unwrap(); + self.max_doc.push(max_doc + start); + } + } +} diff --git a/src/algorithm/block_partition/mod.rs b/src/algorithm/block_partition/mod.rs new file mode 100644 index 0000000..119cfeb --- /dev/null +++ b/src/algorithm/block_partition/mod.rs @@ -0,0 +1,13 @@ +use fixed_block_partition::FixedBlockPartition; + +mod fixed_block_partition; + +pub trait BlockPartitionTrait { + fn partitions(&self) -> &[u32]; + fn max_doc(&self) -> &[u32]; + fn add_doc(&mut self, score: f32); + fn reset(&mut self); + fn make_partitions(&mut self); +} + +pub type BlockPartition = FixedBlockPartition; diff --git a/src/algorithm/block_wand.rs b/src/algorithm/block_wand.rs index 22b812b..14c4bd1 100644 --- a/src/algorithm/block_wand.rs +++ b/src/algorithm/block_wand.rs @@ -4,14 +4,14 @@ use crate::{ segment::{ delete::DeleteBitmapReader, field_norm::{id_to_fieldnorm, FieldNormRead, FieldNormReader}, - posting::{PostingReader, TERMINATED_DOC}, + posting::{PostingCursor, TERMINATED_DOC}, }, utils::topk_computer::TopKComputer, weight::Bm25Weight, }; pub struct SealedScorer { - pub posting: PostingReader, + pub posting: PostingCursor, pub weight: Bm25Weight, pub max_score: f32, } @@ -27,19 +27,19 @@ impl SealedScorer { loop { scorer.posting.decode_block(); loop { - let doc_id = scorer.posting.doc_id(); - if !delete_bitmap_reader.is_delete(doc_id) { - let tf = scorer.posting.term_freq(); - let fieldnorm_id = 
fieldnorm_reader.read(doc_id); + let docid = scorer.posting.docid(); + if !delete_bitmap_reader.is_delete(docid) { + let tf = scorer.posting.freq(); + let fieldnorm_id = fieldnorm_reader.read(docid); let fieldnorm = id_to_fieldnorm(fieldnorm_id); let score = scorer.weight.score(fieldnorm, tf); - s.yield_with((score, scorer.posting.doc_id())); + s.yield_with((score, scorer.posting.docid())); } - if !scorer.posting.advance_cur() { + if !scorer.posting.next_doc() { break; } } - if !scorer.posting.advance_block() { + if !scorer.posting.next_block() { break; } } @@ -57,25 +57,25 @@ pub fn block_wand_single( ) { 'outer: loop { while scorer.posting.block_max_score(&scorer.weight) <= computer.threshold() { - if !scorer.posting.advance_block() { + if !scorer.posting.next_block() { break 'outer; } } scorer.posting.decode_block(); loop { - let doc_id = scorer.posting.doc_id(); - if !delete_bitmap_reader.is_delete(doc_id) { - let tf = scorer.posting.term_freq(); - let fieldnorm_id = fieldnorm_reader.read(doc_id); + let docid = scorer.posting.docid(); + if !delete_bitmap_reader.is_delete(docid) { + let tf = scorer.posting.freq(); + let fieldnorm_id = fieldnorm_reader.read(docid); let fieldnorm = id_to_fieldnorm(fieldnorm_id); let score = scorer.weight.score(fieldnorm, tf); - computer.push(score, scorer.posting.doc_id()); + computer.push(score, scorer.posting.docid()); } - if !scorer.posting.advance_cur() { + if !scorer.posting.next_doc() { break; } } - if !scorer.posting.advance_block() { + if !scorer.posting.next_block() { break; } } @@ -90,7 +90,7 @@ pub fn block_wand( for s in &mut scorers { s.posting.decode_block(); } - scorers.sort_by_key(|s| s.posting.doc_id()); + scorers.sort_by_key(|s| s.posting.docid()); while let Some((before_pivot_len, pivot_len, pivot_doc)) = find_pivot_doc(&scorers, computer.threshold()) @@ -116,7 +116,7 @@ pub fn block_wand( let len = id_to_fieldnorm(fieldnorm_reader.read(pivot_doc)); let score = scorers[..pivot_len] .iter() - .map(|scorer| scorer.weight.score(len, scorer.posting.term_freq())) + .map(|scorer| scorer.weight.score(len, scorer.posting.freq())) .sum(); computer.push(score, pivot_doc); } @@ -133,7 +133,7 @@ fn find_pivot_doc(scorers: &[SealedScorer], threshold: f32) -> Option<(usize, us let scorer = &scorers[before_pivot_len]; max_score += scorer.max_score; if max_score > threshold { - pivot_doc = scorer.posting.doc_id(); + pivot_doc = scorer.posting.docid(); break; } before_pivot_len += 1; @@ -145,7 +145,7 @@ fn find_pivot_doc(scorers: &[SealedScorer], threshold: f32) -> Option<(usize, us let mut pivot_len = before_pivot_len + 1; pivot_len += scorers[pivot_len..] .iter() - .take_while(|term_scorer| term_scorer.posting.doc_id() == pivot_doc) + .take_while(|term_scorer| term_scorer.posting.docid() == pivot_doc) .count(); Some((before_pivot_len, pivot_len, pivot_doc)) } @@ -168,8 +168,8 @@ fn block_max_was_too_low_advance_one_scorer(scorers: &mut [SealedScorer], pivot_ doc_to_seek_after = doc_to_seek_after.saturating_add(1); for scorer in &mut scorers[pivot_len..] 
{ - if scorer.posting.doc_id() <= doc_to_seek_after { - doc_to_seek_after = scorer.posting.doc_id(); + if scorer.posting.docid() <= doc_to_seek_after { + doc_to_seek_after = scorer.posting.docid(); } } scorers[scorer_to_seek].posting.seek(doc_to_seek_after); @@ -178,9 +178,9 @@ fn block_max_was_too_low_advance_one_scorer(scorers: &mut [SealedScorer], pivot_ } fn restore_ordering(term_scorers: &mut [SealedScorer], ord: usize) { - let doc = term_scorers[ord].posting.doc_id(); + let doc = term_scorers[ord].posting.docid(); for i in ord + 1..term_scorers.len() { - if term_scorers[i].posting.doc_id() >= doc { + if term_scorers[i].posting.docid() >= doc { break; } term_scorers.swap(i, i - 1); @@ -207,8 +207,8 @@ fn align_scorers( fn advance_all_scorers_on_pivot(term_scorers: &mut Vec, pivot_len: usize) { for scorer in &mut term_scorers[..pivot_len] { - scorer.posting.advance(); + scorer.posting.next_with_auto_decode(); } term_scorers.retain(|scorer| !scorer.posting.completed()); - term_scorers.sort_unstable_by_key(|scorer| scorer.posting.doc_id()); + term_scorers.sort_unstable_by_key(|scorer| scorer.posting.docid()); } diff --git a/src/algorithm/mod.rs b/src/algorithm/mod.rs index 45d7bb9..3ffcb91 100644 --- a/src/algorithm/mod.rs +++ b/src/algorithm/mod.rs @@ -1 +1,3 @@ +pub mod block_encode; +pub mod block_partition; pub mod block_wand; diff --git a/src/index/vacuum.rs b/src/index/vacuum.rs index 59e3633..c25a552 100644 --- a/src/index/vacuum.rs +++ b/src/index/vacuum.rs @@ -107,21 +107,21 @@ pub unsafe extern "C" fn amvacuumcleanup( let sealed_reader = SealedSegmentReader::new(index, meta.sealed_segment); for i in 0..meta.sealed_segment.term_id_cnt { - let Some(mut posting) = sealed_reader.get_postings_docid_only(i) else { + let Some(mut posting) = sealed_reader.get_postings(i) else { continue; }; loop { posting.decode_block(); loop { - let doc_id = posting.doc_id(); + let doc_id = posting.docid(); if !delete_bitmap_reader.is_delete(doc_id) { term_stats[i as usize] += 1; } - if !posting.advance_cur() { + if !posting.next_doc() { break; } } - if !posting.advance_block() { + if !posting.next_block() { break; } } diff --git a/src/page/postgres.rs b/src/page/postgres.rs index 32f5cf4..333e8ba 100644 --- a/src/page/postgres.rs +++ b/src/page/postgres.rs @@ -21,10 +21,11 @@ bitflags::bitflags! 
{ const FIELD_NORM = 1 << 2; const TERM_STATISTIC = 1 << 3; const TERM_INFO = 1 << 4; - const SKIP_INFO = 1 << 5; - const BLOCK_DATA = 1 << 6; - const GROWING = 1 << 7; - const DELETE = 1 << 8; + const TERM_META = 1 << 5; + const SKIP_INFO = 1 << 6; + const BLOCK_DATA = 1 << 7; + const GROWING = 1 << 8; + const DELETE = 1 << 9; const FREE = 1 << 15; } } diff --git a/src/segment/posting/append.rs b/src/segment/posting/append.rs new file mode 100644 index 0000000..6779bb4 --- /dev/null +++ b/src/segment/posting/append.rs @@ -0,0 +1,226 @@ +use std::num::NonZeroU32; + +use crate::{ + algorithm::block_encode::{BlockEncode, BlockEncodeTrait}, + page::{ + page_alloc, page_alloc_with_fsm, page_write, PageFlags, PageWriteGuard, VirtualPageWriter, + }, + segment::{ + field_norm::{id_to_fieldnorm, FieldNormRead, FieldNormReader}, + meta::MetaPageData, + posting::{PostingTermInfo, PostingTermMetaData}, + term_stat::TermStatReader, + }, + weight::{idf, Bm25Weight}, +}; + +use super::{ + serializer::PostingSerializer, writer::TFRecorder, InvertedWrite, PostingTermInfoReader, + SkipBlock, SkipBlockFlags, COMPRESSION_BLOCK_SIZE, +}; + +pub struct InvertedAppender { + index: pgrx::pg_sys::Relation, + block_encode: BlockEncode, + term_info_reader: PostingTermInfoReader, + term_stat_reader: TermStatReader, + term_id: u32, + doc_cnt: u32, + avgdl: f32, + fieldnorm_reader: FieldNormReader, +} + +impl InvertedAppender { + pub fn new(index: pgrx::pg_sys::Relation, meta: &MetaPageData) -> Self { + let block_encode = BlockEncode::new(); + let term_info_reader = PostingTermInfoReader::new(index, meta.sealed_segment); + let term_stat_reader = TermStatReader::new(index, meta); + let fieldnorm_reader = FieldNormReader::new(index, meta.field_norm_blkno); + Self { + index, + block_encode, + term_info_reader, + term_stat_reader, + term_id: 0, + doc_cnt: meta.doc_cnt, + avgdl: meta.avgdl(), + fieldnorm_reader, + } + } +} + +impl InvertedWrite for InvertedAppender { + fn write(&mut self, recorder: Option<&TFRecorder>) { + let Some(recorder) = recorder else { + self.term_id += 1; + return; + }; + + let doc_cnt = recorder.doc_cnt(); + if recorder.doc_cnt() == 0 { + self.term_id += 1; + return; + } + + let term_doc_cnt = self.term_stat_reader.read(self.term_id); + let idf = idf(self.doc_cnt, doc_cnt + term_doc_cnt); + let weight = Bm25Weight::new(1, idf, self.avgdl); + + let term_info = self.term_info_reader.read(self.term_id); + if term_info.meta_blkno == pgrx::pg_sys::InvalidBlockNumber { + let mut serializer = PostingSerializer::new(self.index); + serializer.new_term(); + + let mut blockwand_tf = 0; + let mut blockwand_fieldnorm_id = 0; + let mut blockwand_score = 0.0; + for (i, (docid, freq)) in recorder.iter().enumerate() { + serializer.write_doc(docid, freq); + + let fieldnorm_id = self.fieldnorm_reader.read(docid); + let len = id_to_fieldnorm(fieldnorm_id); + let score = weight.score(len, freq); + if score > blockwand_score { + blockwand_tf = freq; + blockwand_fieldnorm_id = fieldnorm_id; + blockwand_score = score; + } + + if (i + 1) % COMPRESSION_BLOCK_SIZE == 0 { + serializer.flush_block(blockwand_tf, blockwand_fieldnorm_id); + blockwand_tf = 0; + blockwand_fieldnorm_id = 0; + blockwand_score = 0.0; + } + } + + let mut term_meta_guard = page_alloc(self.index, PageFlags::TERM_META, true); + let term_meta_page = &mut *term_meta_guard; + term_meta_page.header.pd_lower += std::mem::size_of::() as u16; + let term_meta: &mut PostingTermMetaData = term_meta_page.as_mut(); + + let (unflushed_docids, unflushed_term_freqs) = 
serializer.unflushed_data(); + let unfulled_doc_cnt = unflushed_docids.len(); + assert!(unfulled_doc_cnt < 128); + term_meta.unfulled_docid[..unfulled_doc_cnt].copy_from_slice(unflushed_docids); + term_meta.unfulled_freq[..unfulled_doc_cnt].copy_from_slice(unflushed_term_freqs); + term_meta.unfulled_doc_cnt = unfulled_doc_cnt as u32; + + let (skip_info_blkno, skip_info_last_blkno, block_data_blkno) = + serializer.close_term(&weight, &self.fieldnorm_reader); + term_meta.skip_info_blkno = skip_info_blkno; + term_meta.skip_info_last_blkno = skip_info_last_blkno; + term_meta.block_data_blkno = block_data_blkno; + + self.term_info_reader.write( + self.term_id, + PostingTermInfo { + meta_blkno: term_meta_guard.blkno(), + }, + ); + } else { + let mut term_meta_guard = page_write(self.index, term_info.meta_blkno); + let term_meta: &mut PostingTermMetaData = term_meta_guard.as_mut(); + + let mut block_data_writer = + VirtualPageWriter::open(self.index, term_meta.block_data_blkno, false); + let mut skip_info_guard = page_write(self.index, term_meta.skip_info_last_blkno); + + let mut block_count = term_meta.block_count - 1; + let mut unfulled_doc_cnt = term_meta.unfulled_doc_cnt; + let mut last_full_block_last_docid = term_meta.last_full_block_last_docid; + let mut blockwand_tf = 0; + let mut blockwand_fieldnorm_id = 0; + let mut blockwand_score = 0.0; + + let skip_info_data = skip_info_guard.data(); + let last_skip_info: &SkipBlock = bytemuck::from_bytes( + &skip_info_data[(skip_info_data.len() - std::mem::size_of::())..], + ); + if last_skip_info.flag.contains(SkipBlockFlags::UNFULLED) { + blockwand_tf = last_skip_info.blockwand_tf; + blockwand_fieldnorm_id = last_skip_info.blockwand_fieldnorm_id; + blockwand_score = + weight.score(id_to_fieldnorm(blockwand_fieldnorm_id), blockwand_tf); + skip_info_guard.header.pd_lower -= std::mem::size_of::() as u16; + } + + for (docid, freq) in recorder.iter() { + term_meta.unfulled_docid[unfulled_doc_cnt as usize] = docid; + term_meta.unfulled_freq[unfulled_doc_cnt as usize] = freq; + + let fieldnorm_id = self.fieldnorm_reader.read(docid); + let len = id_to_fieldnorm(fieldnorm_id); + let score = weight.score(len, freq); + if score > blockwand_score { + blockwand_tf = freq; + blockwand_fieldnorm_id = fieldnorm_id; + blockwand_score = score; + } + + unfulled_doc_cnt += 1; + if unfulled_doc_cnt == 128 { + let mew_last_full_block_last_docid = Some(NonZeroU32::new(docid).unwrap()); + let data = self.block_encode.encode( + last_full_block_last_docid, + &mut term_meta.unfulled_docid, + &mut term_meta.unfulled_freq, + ); + last_full_block_last_docid = mew_last_full_block_last_docid; + unfulled_doc_cnt = 0; + block_count += 1; + + let page_changed = block_data_writer.write_vectorized_no_cross(&[data]); + let mut flag = SkipBlockFlags::empty(); + if page_changed { + flag |= SkipBlockFlags::PAGE_CHANGED; + } + let skip_info = SkipBlock { + last_doc: last_full_block_last_docid.unwrap().get(), + blockwand_tf, + doc_cnt: 128, + size: data.len().try_into().unwrap(), + blockwand_fieldnorm_id, + flag, + }; + append_skip_info(self.index, &mut skip_info_guard, skip_info); + } + } + + if unfulled_doc_cnt != 0 { + let skip_info = SkipBlock { + last_doc: term_meta.unfulled_docid[unfulled_doc_cnt as usize - 1], + blockwand_tf, + doc_cnt: unfulled_doc_cnt, + size: 0, + blockwand_fieldnorm_id, + flag: SkipBlockFlags::UNFULLED, + }; + append_skip_info(self.index, &mut skip_info_guard, skip_info); + block_count += 1; + } + term_meta.unfulled_doc_cnt = unfulled_doc_cnt as u32; + 
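+            // Persist the append cursor back into the term metadata page so a later
+            // append can resume from it: the partially filled ("unfulled") block
+            // buffer size, the block count, the tail skip-info page, and the last
+            // docid of the last fully encoded block (the delta base for the next block).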
term_meta.block_count = block_count; + term_meta.skip_info_last_blkno = skip_info_guard.blkno(); + term_meta.last_full_block_last_docid = last_full_block_last_docid; + } + + self.term_id += 1; + } +} + +fn append_skip_info( + index: pgrx::pg_sys::Relation, + guard: &mut PageWriteGuard, + skip_info: SkipBlock, +) { + let mut freespace = guard.freespace_mut(); + if freespace.len() < std::mem::size_of::() { + let new_skip_info_guard = page_alloc_with_fsm(index, PageFlags::SKIP_INFO, false); + guard.opaque.next_blkno = new_skip_info_guard.blkno(); + *guard = new_skip_info_guard; + freespace = guard.freespace_mut(); + } + freespace[..std::mem::size_of::()].copy_from_slice(bytemuck::bytes_of(&skip_info)); + guard.header.pd_lower += std::mem::size_of::() as u16; +} diff --git a/src/segment/posting/mod.rs b/src/segment/posting/mod.rs index c5cda16..c597ddb 100644 --- a/src/segment/posting/mod.rs +++ b/src/segment/posting/mod.rs @@ -1,34 +1,30 @@ +mod append; mod reader; mod serializer; mod writer; +use std::num::NonZero; + use bytemuck::{Pod, Zeroable}; -pub use reader::{PostingReader, PostingTermInfoReader}; -pub use serializer::{InvertedAppender, InvertedSerialize, InvertedSerializer}; -pub use writer::InvertedWriter; -use crate::utils::compress_block::compressed_block_size; +pub use append::InvertedAppender; +pub use reader::{PostingCursor, PostingTermInfoReader}; +pub use serializer::{InvertedSerializer, InvertedWrite}; +pub use writer::InvertedWriter; pub const TERMINATED_DOC: u32 = u32::MAX; -pub const COMPRESSION_BLOCK_SIZE: usize = - ::BLOCK_LEN; +pub const COMPRESSION_BLOCK_SIZE: usize = 128; #[derive(Clone, Copy)] pub struct PostingTermInfo { - pub doc_count: u32, - pub skip_info_blkno: pgrx::pg_sys::BlockNumber, - pub skip_info_last_blkno: pgrx::pg_sys::BlockNumber, - pub block_data_blkno: pgrx::pg_sys::BlockNumber, + pub meta_blkno: pgrx::pg_sys::BlockNumber, } impl PostingTermInfo { pub fn empty() -> Self { Self { - doc_count: 0, - skip_info_blkno: pgrx::pg_sys::InvalidBlockNumber, - skip_info_last_blkno: pgrx::pg_sys::InvalidBlockNumber, - block_data_blkno: pgrx::pg_sys::InvalidBlockNumber, + meta_blkno: pgrx::pg_sys::InvalidBlockNumber, } } } @@ -36,6 +32,17 @@ impl PostingTermInfo { unsafe impl Zeroable for PostingTermInfo {} unsafe impl Pod for PostingTermInfo {} +pub struct PostingTermMetaData { + pub skip_info_blkno: pgrx::pg_sys::BlockNumber, + pub skip_info_last_blkno: pgrx::pg_sys::BlockNumber, + pub block_data_blkno: pgrx::pg_sys::BlockNumber, + pub block_count: u32, + pub last_full_block_last_docid: Option>, + pub unfulled_doc_cnt: u32, + pub unfulled_docid: [u32; 128], + pub unfulled_freq: [u32; 128], +} + bitflags::bitflags! 
{ #[derive(Debug, Clone, Copy)] pub struct SkipBlockFlags: u8 { @@ -50,27 +57,16 @@ impl Default for SkipBlockFlags { } } -// for unfulled block, docid_bits and tf_bits are combined into a single u16 to store the block size #[derive(Clone, Copy, Default, Debug)] pub struct SkipBlock { last_doc: u32, blockwand_tf: u32, - docid_bits: u8, - tf_bits: u8, + #[allow(dead_code)] + doc_cnt: u32, // unused now + size: u16, blockwand_fieldnorm_id: u8, flag: SkipBlockFlags, } unsafe impl Zeroable for SkipBlock {} unsafe impl Pod for SkipBlock {} - -impl SkipBlock { - // unfulled block will return invalid block size - pub fn block_size(&self) -> usize { - if !self.flag.contains(SkipBlockFlags::UNFULLED) { - compressed_block_size(self.docid_bits) + compressed_block_size(self.tf_bits) - } else { - ((self.docid_bits as usize) << 8) | (self.tf_bits as usize) - } - } -} diff --git a/src/segment/posting/reader.rs b/src/segment/posting/reader.rs index 94d1561..26c0a88 100644 --- a/src/segment/posting/reader.rs +++ b/src/segment/posting/reader.rs @@ -1,13 +1,13 @@ -use std::{fmt::Debug, io::Read, mem::MaybeUninit}; +use std::num::NonZeroU32; use crate::{ - page::{page_read, PageReader, VirtualPageReader}, + algorithm::block_encode::{BlockDecode, BlockDecodeTrait}, + page::{bm25_page_size, page_read, VirtualPageReader}, segment::{field_norm::id_to_fieldnorm, sealed::SealedSegmentData}, - utils::compress_block::BlockDecoder, weight::Bm25Weight, }; -use super::{PostingTermInfo, SkipBlock, SkipBlockFlags, COMPRESSION_BLOCK_SIZE, TERMINATED_DOC}; +use super::{PostingTermInfo, PostingTermMetaData, SkipBlock, SkipBlockFlags, TERMINATED_DOC}; pub struct PostingTermInfoReader { page_reader: VirtualPageReader, @@ -47,114 +47,113 @@ impl PostingTermInfoReader { } } -pub struct PostingReader { +pub struct PostingCursor { index: pgrx::pg_sys::Relation, - skip_blocks: Box<[SkipBlock]>, - doc_count: u32, - // decoders - doc_decoder: BlockDecoder, - freq_decoder: BlockDecoder, - // skip cursor - block_data_reader: VirtualPageReader, - cur_page: pgrx::pg_sys::BlockNumber, - page_offset: usize, - cur_block: usize, - block_offset: usize, - remain_doc_cnt: u32, + block_decode: BlockDecode, + // block reader + block_page_reader: VirtualPageReader, + block_page_id: u32, + page_offset: u32, + // skip info reader + skip_info_page_id: u32, + skip_info_offset: u32, + decode_offset: u32, + cur_skip_info: SkipBlock, + // helper state block_decoded: bool, + remain_block_cnt: u32, + // unfulled block + unfulled_docid: Box<[u32]>, + unfulled_freq: Box<[u32]>, + unfulled_offset: u32, } -impl Debug for PostingReader { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("PostingReader") - .field("with_freq", &WITH_FREQ) - .field("doc_count", &self.doc_count) - .field("cur_page", &self.cur_page) - .field("page_offset", &self.page_offset) - .field("cur_block", &self.cur_block) - .field("block_offset", &self.block_offset) - .field("remain_doc_cnt", &self.remain_doc_cnt) - .field("block_decoded", &self.block_decoded) - .finish() - } -} - -// This api is used in 2 ways: -// - advance_block + advance_cur to move forward, manually call decode_block -// - shallow_seek + seek to move to a specific doc_id, advance to move forward. 
it will decode_block automatically -impl PostingReader { +impl PostingCursor { pub fn new(index: pgrx::pg_sys::Relation, term_info: PostingTermInfo) -> Self { - assert!(term_info.doc_count > 0); - assert!(term_info.skip_info_blkno != pgrx::pg_sys::InvalidBlockNumber); - assert!(term_info.block_data_blkno != pgrx::pg_sys::InvalidBlockNumber); - let mut skip_info_reader = PageReader::new(index, term_info.skip_info_blkno); - let block_cnt = (term_info.doc_count).div_ceil(COMPRESSION_BLOCK_SIZE as u32); - - // for memory alignment - let mut buf: Box<[MaybeUninit]> = Box::new_uninit_slice(block_cnt as usize); - let slice_mut = unsafe { - std::slice::from_raw_parts_mut( - buf.as_mut_ptr() as *mut u8, - block_cnt as usize * std::mem::size_of::(), - ) - }; - skip_info_reader.read_exact(slice_mut).unwrap(); - drop(skip_info_reader); - let skip_blocks = unsafe { buf.assume_init() }; + let PostingTermInfo { meta_blkno } = term_info; - let block_data_reader = VirtualPageReader::new(index, term_info.block_data_blkno); + let term_meta_guard = page_read(index, meta_blkno); + let block_decode = BlockDecode::new(); + let term_meta: &PostingTermMetaData = term_meta_guard.as_ref(); + let block_page_reader = VirtualPageReader::new(index, term_meta.block_data_blkno); + let remain_block_cnt = term_meta.block_count; + let unfulled_docid = term_meta.unfulled_docid[..term_meta.unfulled_doc_cnt as usize].into(); + let unfulled_freq = term_meta.unfulled_freq[..term_meta.unfulled_doc_cnt as usize].into(); - Self { + let mut this = Self { index, - doc_count: term_info.doc_count, - skip_blocks, - doc_decoder: BlockDecoder::new(), - freq_decoder: BlockDecoder::new(), - block_data_reader, - cur_page: 0, + block_decode, + block_page_reader, + block_page_id: 0, page_offset: 0, - cur_block: 0, - block_offset: 0, - remain_doc_cnt: term_info.doc_count, + skip_info_page_id: term_meta.skip_info_blkno, + skip_info_offset: 0, + decode_offset: 0, + cur_skip_info: SkipBlock::default(), block_decoded: false, - } - } + remain_block_cnt, + unfulled_docid, + unfulled_freq, + unfulled_offset: u32::MAX, + }; - pub fn doc_count(&self) -> u32 { - self.doc_count + this.update_skip_info(); + this } - pub fn advance_block(&mut self) -> bool { + pub fn next_block(&mut self) -> bool { debug_assert!(!self.completed()); - self.cur_block += 1; + self.remain_block_cnt -= 1; self.block_decoded = false; - self.remain_doc_cnt -= std::cmp::min(COMPRESSION_BLOCK_SIZE as u32, self.remain_doc_cnt); - self.update_page_cursor(); if self.completed() { return false; } + + self.decode_offset = self.cur_skip_info.last_doc; + self.page_offset += self.cur_skip_info.size as u32; + + self.skip_info_offset += std::mem::size_of::() as u32; + if self.skip_info_offset == bm25_page_size() as u32 { + let page = page_read(self.index, self.skip_info_page_id); + self.skip_info_page_id = page.opaque.next_blkno; + self.skip_info_offset = 0; + } + self.update_skip_info(); + + if self + .cur_skip_info + .flag + .contains(SkipBlockFlags::PAGE_CHANGED) + { + self.block_page_id += 1; + self.page_offset = 0; + } + true } - pub fn advance_cur(&mut self) -> bool { + pub fn next_doc(&mut self) -> bool { debug_assert!(self.block_decoded); - if self.block_offset < COMPRESSION_BLOCK_SIZE.min(self.remain_doc_cnt as usize) { - self.block_offset += 1; - } - if self.block_offset == COMPRESSION_BLOCK_SIZE.min(self.remain_doc_cnt as usize) { - return false; + if self.is_in_unfulled_block() { + self.unfulled_offset += 1; + debug_assert!(self.unfulled_offset <= self.unfulled_doc_cnt()); + if 
self.unfulled_offset == self.unfulled_doc_cnt() { + return false; + } + true + } else { + self.block_decode.next() } - true } - pub fn advance(&mut self) -> bool { + pub fn next_with_auto_decode(&mut self) -> bool { if self.completed() { return false; } - if self.advance_cur() { + if self.next_doc() { return true; } - if self.advance_block() { + if self.next_block() { self.decode_block(); true } else { @@ -162,140 +161,122 @@ impl PostingReader { } } - pub fn shallow_seek(&mut self, doc_id: u32) -> bool { + pub fn shallow_seek(&mut self, docid: u32) -> bool { if self.completed() { return false; } - while self.skip_blocks[self.cur_block].last_doc < doc_id { - if !self.advance_block() { + let prev_docid = self.docid(); + while self.last_doc_in_block() < docid { + if !self.next_block() { + debug_assert!(prev_docid == self.docid()); return false; } } + debug_assert!(prev_docid == self.docid()); true } - pub fn seek(&mut self, doc_id: u32) -> u32 { + pub fn seek(&mut self, docid: u32) -> u32 { if self.completed() { - self.block_offset = 128; + self.unfulled_offset = self.unfulled_doc_cnt(); return TERMINATED_DOC; } - if !self.shallow_seek(doc_id) { + if !self.shallow_seek(docid) { return TERMINATED_DOC; } if !self.block_decoded { self.decode_block(); } - self.block_offset = self.doc_decoder.output().partition_point(|&v| v < doc_id); - self.doc_id() + + if self.is_in_unfulled_block() { + self.unfulled_offset = self + .unfulled_docid + .partition_point(|&d| d < docid) + .try_into() + .unwrap(); + debug_assert!(self.unfulled_offset < self.unfulled_doc_cnt()); + } else { + let incomplete = self.block_decode.seek(docid); + debug_assert!(incomplete); + } + debug_assert!(self.docid() >= docid); + self.docid() + } + + pub fn decode_block(&mut self) { + debug_assert!(!self.completed()); + if self.block_decoded { + return; + } + self.block_decoded = true; + if self.is_in_unfulled_block() { + self.unfulled_offset = 0; + return; + } + + let skip = &self.cur_skip_info; + let page = page_read( + self.index, + self.block_page_reader.get_block_id(self.block_page_id), + ); + self.block_decode.decode( + &page.data()[self.page_offset as usize..][..skip.size as usize], + NonZeroU32::new(self.decode_offset), + ); } - pub fn doc_id(&self) -> u32 { - if self.completed() && self.block_offset == 128 { + pub fn docid(&self) -> u32 { + if self.completed() && self.unfulled_offset == self.unfulled_doc_cnt() { return TERMINATED_DOC; } - self.doc_decoder.output()[self.block_offset] + if self.is_in_unfulled_block() && self.unfulled_offset != u32::MAX { + return self.unfulled_docid[self.unfulled_offset as usize]; + } + debug_assert!(self.block_decode.docid() <= self.last_doc_in_block()); + self.block_decode.docid() } - pub fn term_freq(&self) -> u32 { + pub fn freq(&self) -> u32 { debug_assert!(!self.completed()); debug_assert!(self.block_decoded); - const { - assert!(WITH_FREQ); + if self.is_in_unfulled_block() { + return self.unfulled_freq[self.unfulled_offset as usize]; } - self.freq_decoder.output()[self.block_offset] + self.block_decode.freq() } - pub fn block_max_score(&self, bm25_weight: &Bm25Weight) -> f32 { + pub fn block_max_score(&self, weight: &Bm25Weight) -> f32 { if self.completed() { return 0.0; } - let fieldnorm_id = self.skip_blocks[self.cur_block].blockwand_fieldnorm_id; - let fieldnorm = id_to_fieldnorm(fieldnorm_id); - let tf = self.skip_blocks[self.cur_block].blockwand_tf; - bm25_weight.score(fieldnorm, tf) + let len = id_to_fieldnorm(self.cur_skip_info.blockwand_fieldnorm_id); + weight.score(len, 
self.cur_skip_info.blockwand_tf) } pub fn last_doc_in_block(&self) -> u32 { if self.completed() { return TERMINATED_DOC; } - self.skip_blocks[self.cur_block].last_doc + self.cur_skip_info.last_doc } pub fn completed(&self) -> bool { - self.remain_doc_cnt == 0 + self.remain_block_cnt == 0 } - pub fn decode_block(&mut self) { - debug_assert!(!self.completed()); - if self.block_decoded { - return; - } - let skip = &self.skip_blocks[self.cur_block]; - let last_doc = if self.cur_block == 0 { - 0 - } else { - self.skip_blocks[self.cur_block - 1].last_doc - }; - - let page = page_read( - self.index, - self.block_data_reader.get_block_id(self.cur_page), + fn update_skip_info(&mut self) { + let page = page_read(self.index, self.skip_info_page_id); + let skip_info = *bytemuck::from_bytes( + &page.data()[self.skip_info_offset as usize..][..std::mem::size_of::()], ); - - if self.remain_doc_cnt < COMPRESSION_BLOCK_SIZE as u32 { - debug_assert!(skip.flag.contains(SkipBlockFlags::UNFULLED)); - let bytes = self.doc_decoder.decompress_vint_sorted( - &page.data()[self.page_offset + std::mem::size_of::()..], - last_doc, - self.remain_doc_cnt, - ); - if WITH_FREQ { - self.freq_decoder.decompress_vint_unsorted( - &page.data()[(self.page_offset + std::mem::size_of::() + bytes)..], - self.remain_doc_cnt, - ); - self.freq_decoder - .output_mut() - .iter_mut() - .for_each(|v| *v += 1); - } - } else { - debug_assert!(!skip.flag.contains(SkipBlockFlags::UNFULLED)); - let bytes = self.doc_decoder.decompress_block_sorted( - &page.data()[self.page_offset..], - skip.docid_bits, - last_doc, - ); - if WITH_FREQ { - self.freq_decoder.decompress_block_unsorted( - &page.data()[(self.page_offset + bytes)..], - skip.tf_bits, - ); - self.freq_decoder - .output_mut() - .iter_mut() - .for_each(|v| *v += 1); - } - } - self.block_offset = 0; - self.block_decoded = true; + self.cur_skip_info = skip_info; } - fn update_page_cursor(&mut self) { - self.page_offset += self.skip_blocks[self.cur_block - 1].block_size(); - - if self.completed() { - self.page_offset = 0; - return; - } + fn unfulled_doc_cnt(&self) -> u32 { + self.unfulled_docid.len() as u32 + } - if self.skip_blocks[self.cur_block] - .flag - .contains(SkipBlockFlags::PAGE_CHANGED) - { - self.cur_page += 1; - self.page_offset = 0; - } + fn is_in_unfulled_block(&self) -> bool { + !self.unfulled_docid.is_empty() && self.remain_block_cnt <= 1 } } diff --git a/src/segment/posting/serializer.rs b/src/segment/posting/serializer.rs index 9e3eb7a..c116c7d 100644 --- a/src/segment/posting/serializer.rs +++ b/src/segment/posting/serializer.rs @@ -1,41 +1,52 @@ +use std::num::NonZeroU32; + use crate::{ - page::{page_read, page_write, PageFlags, PageWriter, VirtualPageWriter}, + algorithm::{ + block_encode::{BlockEncode, BlockEncodeTrait}, + block_partition::{BlockPartition, BlockPartitionTrait}, + }, + page::{page_alloc, PageFlags, PageWriter, VirtualPageWriter}, segment::{ - field_norm::{id_to_fieldnorm, FieldNormRead, FieldNormReader, MAX_FIELD_NORM}, - meta::MetaPageData, + field_norm::{id_to_fieldnorm, FieldNormRead}, posting::SkipBlockFlags, }, - utils::compress_block::{BlockDecoder, BlockEncoder}, weight::{idf, Bm25Weight}, }; -use super::{PostingTermInfo, PostingTermInfoReader, SkipBlock, COMPRESSION_BLOCK_SIZE}; +use super::{writer::TFRecorder, PostingTermInfo, PostingTermMetaData, SkipBlock}; -pub trait InvertedSerialize { - fn new_term(&mut self, doc_count: u32); - fn write_doc(&mut self, doc_id: u32, freq: u32); - fn close_term(&mut self); +pub trait InvertedWrite { + fn 
write(&mut self, recorder: Option<&TFRecorder>); } pub struct InvertedSerializer { - postings_serializer: PostingSerializer, + index: pgrx::pg_sys::Relation, + postings_serializer: PostingSerializer, term_info_serializer: PostingTermInfoSerializer, - current_term_info: PostingTermInfo, + block_parttion: BlockPartition, + // block wand helper + avgdl: f32, + corpus_doc_cnt: u32, + fieldnorm_reader: R, } impl InvertedSerializer { pub fn new( index: pgrx::pg_sys::Relation, - doc_cnt: u32, + corpus_doc_cnt: u32, avgdl: f32, fieldnorm_reader: R, ) -> Self { - let postings_serializer = PostingSerializer::new(index, doc_cnt, avgdl, fieldnorm_reader); + let postings_serializer = PostingSerializer::new(index); let term_info_serializer = PostingTermInfoSerializer::new(index); Self { + index, postings_serializer, term_info_serializer, - current_term_info: PostingTermInfo::empty(), + block_parttion: BlockPartition::new(), + avgdl, + corpus_doc_cnt, + fieldnorm_reader, } } @@ -45,110 +56,76 @@ impl InvertedSerializer { } } -impl InvertedSerialize for InvertedSerializer { - fn new_term(&mut self, doc_count: u32) { - if doc_count != 0 { - self.postings_serializer.new_term(doc_count); - } - self.current_term_info = PostingTermInfo { - doc_count, - ..PostingTermInfo::empty() +impl InvertedWrite for InvertedSerializer { + fn write(&mut self, recorder: Option<&TFRecorder>) { + let Some(recorder) = recorder else { + self.term_info_serializer.push(PostingTermInfo::empty()); + return; }; - } - - fn write_doc(&mut self, doc_id: u32, freq: u32) { - self.postings_serializer.write_doc(doc_id, freq); - } - fn close_term(&mut self) { - if self.current_term_info.doc_count != 0 { - let (skip_info_blkno, skip_info_last_blkno, block_data_blkno) = - self.postings_serializer.close_term(); - self.current_term_info.skip_info_blkno = skip_info_blkno; - self.current_term_info.skip_info_last_blkno = skip_info_last_blkno; - self.current_term_info.block_data_blkno = block_data_blkno; - } - self.term_info_serializer.push(self.current_term_info); - } -} - -enum AppendState { - Empty, - New, - Append, -} - -pub struct InvertedAppender { - postings_serializer: PostingSerializer, - term_info_reader: PostingTermInfoReader, - current_term_info: PostingTermInfo, - term_id: u32, - state: AppendState, -} - -impl InvertedAppender { - pub fn new(index: pgrx::pg_sys::Relation, meta: &MetaPageData) -> Self { - let doc_cnt = meta.doc_cnt; - let avgdl = meta.avgdl(); - let fieldnorm_reader = FieldNormReader::new(index, meta.field_norm_blkno); - let postings_serializer = PostingSerializer::new(index, doc_cnt, avgdl, fieldnorm_reader); - let term_info_reader = PostingTermInfoReader::new(index, meta.sealed_segment); - Self { - postings_serializer, - term_info_reader, - current_term_info: PostingTermInfo::empty(), - term_id: 0, - state: AppendState::Empty, + let doc_cnt = recorder.doc_cnt(); + if doc_cnt == 0 { + self.term_info_serializer.push(PostingTermInfo::empty()); + return; } - } -} -impl InvertedSerialize for InvertedAppender { - fn new_term(&mut self, doc_count: u32) { - if doc_count != 0 { - let term_info = self.term_info_reader.read(self.term_id); - if term_info.doc_count != 0 { - self.postings_serializer.open_append(term_info, doc_count); - self.current_term_info = term_info; - self.current_term_info.doc_count += doc_count; - self.state = AppendState::Append; - } else { - self.postings_serializer.new_term(doc_count); - self.current_term_info = PostingTermInfo { - doc_count, - ..PostingTermInfo::empty() - }; - self.state = 
AppendState::New; - } - } else { - self.state = AppendState::Empty; + let idf = idf(self.corpus_doc_cnt, doc_cnt); + let bm25_weight = Bm25Weight::new(1, idf, self.avgdl); + for (doc_id, tf) in recorder.iter() { + let len = id_to_fieldnorm(self.fieldnorm_reader.read(doc_id)); + self.block_parttion.add_doc(bm25_weight.score(len, tf)); } - } - - fn write_doc(&mut self, doc_id: u32, freq: u32) { - self.postings_serializer.write_doc(doc_id, freq); - } - - fn close_term(&mut self) { - match self.state { - AppendState::Empty => {} - AppendState::New => { - let (skip_info_blkno, skip_info_last_blkno, block_data_blkno) = - self.postings_serializer.close_term(); - self.current_term_info.skip_info_blkno = skip_info_blkno; - self.current_term_info.skip_info_last_blkno = skip_info_last_blkno; - self.current_term_info.block_data_blkno = block_data_blkno; - self.term_info_reader - .write(self.term_id, self.current_term_info); + self.block_parttion.make_partitions(); + let partitions = self.block_parttion.partitions(); + let max_doc = self.block_parttion.max_doc(); + let mut block_count = 0; + let mut blockwand_tf = 0; + let mut blockwand_fieldnorm_id = 0; + + self.postings_serializer.new_term(); + for (i, (doc_id, freq)) in recorder.iter().enumerate() { + self.postings_serializer.write_doc(doc_id, freq); + if Some(i as u32) == partitions.get(block_count).copied() { + self.postings_serializer + .flush_block(blockwand_tf, blockwand_fieldnorm_id); + block_count += 1; } - AppendState::Append => { - let (_, skip_info_last_blkno, _) = self.postings_serializer.close_term(); - self.current_term_info.skip_info_last_blkno = skip_info_last_blkno; - self.term_info_reader - .write(self.term_id, self.current_term_info); + if Some(i as u32) == max_doc.get(block_count).copied() { + blockwand_tf = freq; + blockwand_fieldnorm_id = self.fieldnorm_reader.read(doc_id); } } - self.term_id += 1; + assert!(block_count == partitions.len()); + self.block_parttion.reset(); + + let mut term_meta_guard = page_alloc(self.index, PageFlags::TERM_META, true); + let term_meta_page = &mut *term_meta_guard; + term_meta_page.header.pd_lower += std::mem::size_of::() as u16; + let term_meta: &mut PostingTermMetaData = term_meta_page.as_mut(); + + let (unflushed_docids, unflushed_term_freqs) = self.postings_serializer.unflushed_data(); + let unfulled_doc_cnt = unflushed_docids.len(); + assert!(unfulled_doc_cnt < 128); + term_meta.unfulled_docid[..unfulled_doc_cnt].copy_from_slice(unflushed_docids); + term_meta.unfulled_freq[..unfulled_doc_cnt].copy_from_slice(unflushed_term_freqs); + term_meta.unfulled_doc_cnt = unfulled_doc_cnt as u32; + if unfulled_doc_cnt != 0 { + block_count += 1; + } + + term_meta.last_full_block_last_docid = + NonZeroU32::new(self.postings_serializer.prev_block_last_doc_id()); + let (skip_info_blkno, skip_info_last_blkno, block_data_blkno) = self + .postings_serializer + .close_term(&bm25_weight, &self.fieldnorm_reader); + term_meta.block_count = block_count.try_into().unwrap(); + term_meta.skip_info_blkno = skip_info_blkno; + term_meta.skip_info_last_blkno = skip_info_last_blkno; + term_meta.block_data_blkno = block_data_blkno; + + self.term_info_serializer.push(PostingTermInfo { + meta_blkno: term_meta_guard.blkno(), + }); } } @@ -176,282 +153,145 @@ impl PostingTermInfoSerializer { } } -struct PostingSerializer { +pub struct PostingSerializer { index: pgrx::pg_sys::Relation, // block encoder - doc_id_encoder: BlockEncoder, - term_freq_encoder: BlockEncoder, - last_doc_id: u32, + block_encode: BlockEncode, + 
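+    // Last docid of the previously flushed block; used as the delta-encoding
+    // base (`offset`) when the next block is encoded.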
prev_block_last_doc_id: u32, // block buffer - doc_ids: [u32; COMPRESSION_BLOCK_SIZE], - term_freqs: [u32; COMPRESSION_BLOCK_SIZE], - block_size: usize, + doc_ids: Vec, + term_freqs: Vec, // skip info writer skip_info_writer: Option, // block data writer block_data_writer: Option, - is_new_page: bool, - // block wand helper - avg_dl: f32, - doc_cnt: u32, - bm25_weight: Option, - fieldnorm_reader: R, } -impl PostingSerializer { - pub fn new( - index: pgrx::pg_sys::Relation, - doc_cnt: u32, - avg_dl: f32, - fieldnorm_reader: R, - ) -> Self { +impl PostingSerializer { + pub fn new(index: pgrx::pg_sys::Relation) -> Self { Self { index, - doc_id_encoder: BlockEncoder::new(), - term_freq_encoder: BlockEncoder::new(), - last_doc_id: 0, - doc_ids: [0; COMPRESSION_BLOCK_SIZE], - term_freqs: [0; COMPRESSION_BLOCK_SIZE], - block_size: 0, + block_encode: BlockEncode::new(), + prev_block_last_doc_id: 0, + doc_ids: Vec::with_capacity(128), + term_freqs: Vec::with_capacity(128), skip_info_writer: None, block_data_writer: None, - is_new_page: false, - avg_dl, - doc_cnt, - bm25_weight: None, - fieldnorm_reader, } } - pub fn open_append(&mut self, term_info: PostingTermInfo, append_doc_count: u32) { - let doc_count = term_info.doc_count + append_doc_count; - self.block_size = term_info.doc_count as usize % COMPRESSION_BLOCK_SIZE; - if self.block_size != 0 { - let mut skip_info_last_page = page_write(self.index, term_info.skip_info_last_blkno); - let data = skip_info_last_page.data(); - let offset = data.len() - std::mem::size_of::(); - let skip_info_last = - unsafe { (&data[offset..] as *const _ as *const SkipBlock).read() }; - skip_info_last_page.header.pd_lower -= std::mem::size_of::() as u16; - drop(skip_info_last_page); - - let block_size = skip_info_last.block_size(); - let mut decoder = BlockDecoder::new(); - - let mut block_data_writer = - VirtualPageWriter::open(self.index, term_info.block_data_blkno, false); - let data_page = block_data_writer.data_page(); - let data = data_page.data(); - let mut last_block = &data[data.len() - block_size..]; - self.last_doc_id = u32::from_le_bytes(last_block[..4].try_into().unwrap()); - last_block = &last_block[4..]; - let offset = decoder.decompress_vint_sorted( - last_block, - self.last_doc_id, - self.block_size as u32, - ); - self.doc_ids[..self.block_size].copy_from_slice(decoder.output()); - last_block = &last_block[offset..]; - decoder.decompress_vint_unsorted(last_block, self.block_size as u32); - self.term_freqs[..self.block_size].copy_from_slice(decoder.output()); - self.term_freqs[..self.block_size] - .iter_mut() - .for_each(|x| *x += 1); - data_page.header.pd_lower -= block_size as u16; - let is_new_page = data_page.data().is_empty(); - - self.skip_info_writer = Some(PageWriter::open( - self.index, - term_info.skip_info_last_blkno, - false, - )); - self.block_data_writer = Some(block_data_writer); - self.is_new_page = is_new_page; - } else { - let skip_info_last_page = page_read(self.index, term_info.skip_info_last_blkno); - let data = skip_info_last_page.data(); - let offset = data.len() - std::mem::size_of::(); - let skip_info_last = - unsafe { (&data[offset..] 
as *const _ as *const SkipBlock).read() }; - drop(skip_info_last_page); - self.last_doc_id = skip_info_last.last_doc; - - self.skip_info_writer = Some(PageWriter::open( - self.index, - term_info.skip_info_last_blkno, - false, - )); - self.block_data_writer = Some(VirtualPageWriter::open( - self.index, - term_info.block_data_blkno, - false, - )); - self.is_new_page = false; - } - - let idf = idf(self.doc_cnt, doc_count); - self.bm25_weight = Some(Bm25Weight::new(1, idf, self.avg_dl)); - } - - pub fn new_term(&mut self, doc_count: u32) { + pub fn new_term(&mut self) { self.skip_info_writer = Some(PageWriter::new(self.index, PageFlags::SKIP_INFO, true)); self.block_data_writer = Some(VirtualPageWriter::new( self.index, PageFlags::BLOCK_DATA, true, )); - self.last_doc_id = 0; - self.is_new_page = false; - let idf = idf(self.doc_cnt, doc_count); - self.bm25_weight = Some(Bm25Weight::new(1, idf, self.avg_dl)); + self.prev_block_last_doc_id = 0; } pub fn write_doc(&mut self, doc_id: u32, freq: u32) { - self.doc_ids[self.block_size] = doc_id; - self.term_freqs[self.block_size] = freq; - self.block_size += 1; - if self.block_size == COMPRESSION_BLOCK_SIZE { - self.flush_block(); - } + self.doc_ids.push(doc_id); + self.term_freqs.push(freq); } // return (skip_info_blkno, skip_info_last_blkno, block_data_blkno) - pub fn close_term(&mut self) -> (u32, u32, u32) { - if self.block_size > 0 { - if self.block_size == COMPRESSION_BLOCK_SIZE { - self.flush_block(); - } else { - self.flush_block_unfull(); - } + pub fn close_term( + &mut self, + bm25_weight: &Bm25Weight, + fieldnorm_reader: &R, + ) -> (u32, u32, u32) { + if !self.doc_ids.is_empty() { + let (blockwand_tf, blockwand_fieldnorm_id) = blockwand_max_calculate( + &self.doc_ids, + &self.term_freqs, + bm25_weight, + fieldnorm_reader, + ); + let skip_block = SkipBlock { + last_doc: *self.doc_ids.last().unwrap(), + doc_cnt: self.doc_ids.len().try_into().unwrap(), + blockwand_tf, + size: 0, + blockwand_fieldnorm_id, + flag: SkipBlockFlags::UNFULLED, + }; + self.skip_info_writer + .as_mut() + .unwrap() + .write(bytemuck::bytes_of(&skip_block)); } + let skip_info_last_blkno = self.skip_info_writer.as_ref().unwrap().blkno(); let skip_info_blkno = self.skip_info_writer.take().unwrap().finalize(); let block_data_blkno = self.block_data_writer.take().unwrap().finalize(); - self.bm25_weight = None; + self.doc_ids.clear(); + self.term_freqs.clear(); (skip_info_blkno, skip_info_last_blkno, block_data_blkno) } - fn flush_block(&mut self) { - assert!(self.block_size == COMPRESSION_BLOCK_SIZE); - - let (blockwand_tf, blockwand_fieldnorm_id) = self.block_wand(); + pub fn flush_block(&mut self, blockwand_tf: u32, blockwand_fieldnorm_id: u8) { + let offset = NonZeroU32::new(self.prev_block_last_doc_id); + self.prev_block_last_doc_id = *self.doc_ids.last().unwrap(); + let data = self + .block_encode + .encode(offset, &mut self.doc_ids, &mut self.term_freqs); - // doc_id - let (docid_bits, docid_block) = self - .doc_id_encoder - .compress_block_sorted(&self.doc_ids[..self.block_size], self.last_doc_id); - self.last_doc_id = self.doc_ids[self.block_size - 1]; - - // term_freq - for i in 0..self.block_size { - self.term_freqs[i] -= 1; - } - let (tf_bits, term_freq_block) = self - .term_freq_encoder - .compress_block_unsorted(&self.term_freqs[..self.block_size]); - - let change_page = self + let page_changed = self .block_data_writer .as_mut() .unwrap() - .write_vectorized_no_cross(&[docid_block, term_freq_block]); + .write_vectorized_no_cross(&[data]); let mut flag = 
SkipBlockFlags::empty(); - if change_page { - flag |= SkipBlockFlags::PAGE_CHANGED; - } - if self.is_new_page { + if page_changed { flag |= SkipBlockFlags::PAGE_CHANGED; - self.is_new_page = false; } + let doc_cnt = self.doc_ids.len().try_into().unwrap(); let skip_block = SkipBlock { - last_doc: self.last_doc_id, - docid_bits, - tf_bits, + last_doc: self.prev_block_last_doc_id, + doc_cnt, blockwand_tf, + size: data.len().try_into().unwrap(), blockwand_fieldnorm_id, flag, }; self.skip_info_writer .as_mut() .unwrap() - .write(bytemuck::cast_slice(&[skip_block])); + .write(bytemuck::bytes_of(&skip_block)); - self.block_size = 0; + self.doc_ids.clear(); + self.term_freqs.clear(); } - fn flush_block_unfull(&mut self) { - assert!(self.block_size > 0); - - let (blockwand_tf, blockwand_fieldnorm_id) = self.block_wand(); - - // doc_id - let docid_block = self - .doc_id_encoder - .compress_vint_sorted(&self.doc_ids[..self.block_size], self.last_doc_id); - let prev_last_doc_id = self.last_doc_id; - self.last_doc_id = self.doc_ids[self.block_size - 1]; - - // term_freq - for i in 0..self.block_size { - self.term_freqs[i] -= 1; - } - let term_freq_block = self - .term_freq_encoder - .compress_vint_unsorted(&self.term_freqs[..self.block_size]); - - let change_page = self - .block_data_writer - .as_mut() - .unwrap() - .write_vectorized_no_cross(&[ - &prev_last_doc_id.to_le_bytes(), - docid_block, - term_freq_block, - ]); - - let block_len = std::mem::size_of::() + docid_block.len() + term_freq_block.len(); - let mut flag = SkipBlockFlags::UNFULLED; - if change_page { - flag |= SkipBlockFlags::PAGE_CHANGED; - } - if self.is_new_page { - flag |= SkipBlockFlags::PAGE_CHANGED; - self.is_new_page = false; - } - let skip_block = SkipBlock { - last_doc: self.last_doc_id, - docid_bits: (block_len >> 8) as u8, - tf_bits: block_len as u8, - blockwand_tf, - blockwand_fieldnorm_id, - flag, - }; - self.skip_info_writer - .as_mut() - .unwrap() - .write(bytemuck::cast_slice(&[skip_block])); + pub fn unflushed_data(&self) -> (&[u32], &[u32]) { + (&self.doc_ids, &self.term_freqs) + } - self.block_size = 0; + pub fn prev_block_last_doc_id(&self) -> u32 { + self.prev_block_last_doc_id } +} - fn block_wand(&self) -> (u32, u8) { - let mut blockwand_tf = MAX_FIELD_NORM; - let mut blockwand_fieldnorm_id = u8::MAX; - let mut blockwand_max = 0.0f32; - let bm25_weight = self.bm25_weight.as_ref().expect("no bm25 weight"); - for i in 0..self.block_size { - let doc_id = self.doc_ids[i]; - let tf = self.term_freqs[i]; - let fieldnorm_id = self.fieldnorm_reader.read(doc_id); - let len = id_to_fieldnorm(fieldnorm_id); - let bm25_score = bm25_weight.score(len, tf); - if bm25_score > blockwand_max { - blockwand_max = bm25_score; - blockwand_tf = tf; - blockwand_fieldnorm_id = fieldnorm_id; - } +fn blockwand_max_calculate( + docids: &[u32], + freqs: &[u32], + bm25_weight: &Bm25Weight, + fieldnorm_reader: &R, +) -> (u32, u8) { + let mut max_score = 0.0; + let mut max_fieldnorm_id = 0; + let mut max_tf = 0; + for (&doc_id, &freq) in docids.iter().zip(freqs.iter()) { + let fieldnorm_id = fieldnorm_reader.read(doc_id); + let fieldnorm = id_to_fieldnorm(fieldnorm_id); + let score = bm25_weight.score(fieldnorm, freq); + if score > max_score { + max_score = score; + max_fieldnorm_id = fieldnorm_id; + max_tf = freq; } - (blockwand_tf, blockwand_fieldnorm_id) } + (max_tf, max_fieldnorm_id) } diff --git a/src/segment/posting/writer.rs b/src/segment/posting/writer.rs index 312e3cf..88f7811 100644 --- a/src/segment/posting/writer.rs +++ 
b/src/segment/posting/writer.rs @@ -1,8 +1,9 @@ use std::collections::BTreeMap; -use super::InvertedSerialize; use crate::{datatype::Bm25VectorBorrowed, utils::vint}; +use super::serializer::InvertedWrite; + // inverted lists in memory pub struct InvertedWriter { term_index: BTreeMap, @@ -35,19 +36,14 @@ impl InvertedWriter { } } - pub fn serialize(&self, s: &mut I) { + pub fn serialize(&self, s: &mut I) { let mut last_term_id = 0; for (&term_id, recorder) in &self.term_index { for _ in last_term_id..term_id { - s.new_term(0); - s.close_term(); + s.write(None); } - s.new_term(recorder.total_docs); - for (doc_id, tf) in recorder.iter() { - s.write_doc(doc_id, tf); - } - s.close_term(); + s.write(Some(recorder)); last_term_id = term_id + 1; } } @@ -61,7 +57,7 @@ impl InvertedWriter { last_term_id += 1; return Some(0); } - let total_docs = recorder.total_docs; + let total_docs = recorder.doc_cnt(); iter.next(); last_term_id += 1; Some(total_docs) @@ -77,11 +73,11 @@ impl InvertedWriter { } // Store (doc_id, tf) tuples, doc_id is delta encoded -struct TFRecorder { +pub struct TFRecorder { buffer: Vec, current_doc: u32, current_tf: u32, - total_docs: u32, + doc_cnt: u32, } impl TFRecorder { @@ -90,7 +86,7 @@ impl TFRecorder { buffer: Vec::new(), current_doc: u32::MAX, current_tf: 0, - total_docs: 0, + doc_cnt: 0, } } @@ -100,7 +96,7 @@ impl TFRecorder { fn new_doc(&mut self, doc_id: u32) { let delta = doc_id.wrapping_sub(self.current_doc); - self.total_docs += 1; + self.doc_cnt += 1; self.current_doc = doc_id; vint::encode_vint32(delta, &mut self.buffer).unwrap(); } @@ -117,7 +113,7 @@ impl TFRecorder { self.current_tf = 0; } - fn iter(&self) -> impl Iterator + '_ { + pub fn iter(&self) -> impl Iterator + '_ { let mut doc_id = u32::MAX; let mut buffer = self.buffer.as_slice(); std::iter::from_fn(move || { @@ -130,4 +126,8 @@ impl TFRecorder { Some((doc_id, tf)) }) } + + pub fn doc_cnt(&self) -> u32 { + self.doc_cnt + } } diff --git a/src/segment/sealed.rs b/src/segment/sealed.rs index 8eecc82..dbf4d3b 100644 --- a/src/segment/sealed.rs +++ b/src/segment/sealed.rs @@ -3,7 +3,7 @@ use crate::{datatype::Bm25VectorBorrowed, page::VirtualPageWriter}; use super::{ field_norm::FieldNormRead, posting::{ - InvertedSerializer, InvertedWriter, PostingReader, PostingTermInfo, PostingTermInfoReader, + InvertedSerializer, InvertedWriter, PostingCursor, PostingTermInfo, PostingTermInfoReader, }, }; @@ -53,28 +53,16 @@ impl SealedSegmentReader { } } - pub fn get_postings(&self, term_id: u32) -> Option> { + pub fn get_postings(&self, term_id: u32) -> Option { if term_id >= self.term_id_cnt { return None; } let term_info = self.term_info_reader.read(term_id); - if term_info.doc_count == 0 { + if term_info.meta_blkno == pgrx::pg_sys::InvalidBlockNumber { return None; } - Some(PostingReader::new(self.index, term_info)) - } - - pub fn get_postings_docid_only(&self, term_id: u32) -> Option> { - if term_id >= self.term_id_cnt { - return None; - } - - let term_info = self.term_info_reader.read(term_id); - if term_info.doc_count == 0 { - return None; - } - Some(PostingReader::new(self.index, term_info)) + Some(PostingCursor::new(self.index, term_info)) } } diff --git a/src/utils/compress_block.rs b/src/utils/compress_block.rs deleted file mode 100644 index 7570654..0000000 --- a/src/utils/compress_block.rs +++ /dev/null @@ -1,249 +0,0 @@ -use bitpacking::{BitPacker, BitPacker4x}; - -use crate::segment::posting::COMPRESSION_BLOCK_SIZE; - -use super::vint; - -const COMPRESSED_BLOCK_MAX_BYTES: usize = 
COMPRESSION_BLOCK_SIZE * std::mem::size_of::(); - -pub struct BlockEncoder { - bitpacker: BitPacker4x, - output: [u8; COMPRESSED_BLOCK_MAX_BYTES], -} - -impl BlockEncoder { - pub fn new() -> Self { - Self { - bitpacker: BitPacker4x::new(), - output: [0; COMPRESSED_BLOCK_MAX_BYTES], - } - } - - pub fn compress_block_sorted(&mut self, block: &[u32], offset: u32) -> (u8, &[u8]) { - let offset = if offset == 0u32 { None } else { Some(offset) }; - - let num_bits = self.bitpacker.num_bits_strictly_sorted(offset, block); - let written_size = - self.bitpacker - .compress_strictly_sorted(offset, block, &mut self.output[..], num_bits); - (num_bits, &self.output[..written_size]) - } - - pub fn compress_block_unsorted(&mut self, block: &[u32]) -> (u8, &[u8]) { - let num_bits = self.bitpacker.num_bits(block); - let written_size = self - .bitpacker - .compress(block, &mut self.output[..], num_bits); - (num_bits, &self.output[..written_size]) - } - - pub fn compress_vint_sorted(&mut self, block: &[u32], mut offset: u32) -> &[u8] { - let mut byte_written = 0; - for &v in block { - let mut to_encode: u32 = v - offset; - offset = v; - loop { - let next_byte: u8 = (to_encode % 128u32) as u8; - to_encode /= 128u32; - if to_encode == 0u32 { - self.output[byte_written] = next_byte | 128u8; - byte_written += 1; - break; - } else { - self.output[byte_written] = next_byte; - byte_written += 1; - } - } - } - &self.output[..byte_written] - } - - pub fn compress_vint_unsorted(&mut self, block: &[u32]) -> &[u8] { - let mut byte_written = 0; - for &v in block { - let mut to_encode: u32 = v; - loop { - let next_byte: u8 = (to_encode % 128u32) as u8; - to_encode /= 128u32; - if to_encode == 0u32 { - self.output[byte_written] = next_byte | 128u8; - byte_written += 1; - break; - } else { - self.output[byte_written] = next_byte; - byte_written += 1; - } - } - } - &self.output[..byte_written] - } -} - -pub struct BlockDecoder { - bitpacker: BitPacker4x, - output: [u32; COMPRESSION_BLOCK_SIZE], - len: usize, -} - -impl BlockDecoder { - pub fn new() -> Self { - Self { - bitpacker: BitPacker4x::new(), - output: [0; COMPRESSION_BLOCK_SIZE], - len: 0, - } - } - - pub fn decompress_block_sorted(&mut self, block: &[u8], num_bits: u8, offset: u32) -> usize { - let offset = if offset == 0u32 { None } else { Some(offset) }; - self.len = COMPRESSION_BLOCK_SIZE; - self.bitpacker - .decompress_strictly_sorted(offset, block, &mut self.output, num_bits) - } - - pub fn decompress_block_unsorted(&mut self, block: &[u8], num_bits: u8) -> usize { - self.len = COMPRESSION_BLOCK_SIZE; - self.bitpacker.decompress(block, &mut self.output, num_bits) - } - - pub fn decompress_vint_sorted(&mut self, mut block: &[u8], offset: u32, count: u32) -> usize { - let count = count as usize; - let start = block; - self.len = count; - let mut res = offset; - for i in 0..count { - res += vint::decode_vint32(&mut block); - self.output[i] = res; - } - block.as_ptr() as usize - start.as_ptr() as usize - } - - pub fn decompress_vint_unsorted(&mut self, mut block: &[u8], count: u32) -> usize { - let count = count as usize; - let start = block; - self.len = count; - for i in 0..count { - self.output[i] = vint::decode_vint32(&mut block); - } - block.as_ptr() as usize - start.as_ptr() as usize - } - - pub fn output(&self) -> &[u32] { - &self.output[..self.len] - } - - pub fn output_mut(&mut self) -> &mut [u32] { - &mut self.output[..self.len] - } -} - -pub fn compressed_block_size(num_bits: u8) -> usize { - (num_bits as usize) * COMPRESSION_BLOCK_SIZE / 8 -} - 
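For reference, the vint path being removed here delta-encodes each doc id against the previous one and writes 7 bits per byte, least-significant group first, setting the high bit on the final byte of each value. Below is a minimal standalone sketch of that format; the function names are illustrative and it does not use the crate's `utils::vint` helpers.

```rust
// Sketch of the removed vint encoding: delta against the previous docid,
// 7 bits per byte, high bit set on the *last* byte of each value.
fn vint_encode_sorted(block: &[u32], mut offset: u32, out: &mut Vec<u8>) {
    for &v in block {
        let mut delta = v - offset;
        offset = v;
        loop {
            let byte = (delta % 128) as u8;
            delta /= 128;
            if delta == 0 {
                out.push(byte | 0x80); // terminator: high bit marks the last byte
                break;
            }
            out.push(byte);
        }
    }
}

fn vint_decode_sorted(mut data: &[u8], mut offset: u32, count: usize) -> Vec<u32> {
    let mut out = Vec::with_capacity(count);
    for _ in 0..count {
        let mut delta = 0u32;
        let mut shift = 0;
        loop {
            let byte = data[0];
            data = &data[1..];
            delta |= u32::from(byte & 0x7f) << shift;
            if byte & 0x80 != 0 {
                break; // last byte of this value
            }
            shift += 7;
        }
        offset += delta;
        out.push(offset);
    }
    out
}

fn main() {
    let docids: Vec<u32> = vec![3, 7, 20, 21, 90];
    let mut buf = Vec::new();
    vint_encode_sorted(&docids, 0, &mut buf);
    assert_eq!(vint_decode_sorted(&buf, 0, docids.len()), docids);
}
```

The terminator-on-last-byte convention matches `compress_vint_sorted` / `decompress_vint_sorted` above. After this patch, partial blocks are kept in memory (`unflushed_data`) and full blocks are encoded through `BlockEncode` instead.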
-#[cfg(test)] -mod test { - use super::*; - - #[test] - fn test_block_encoder() { - let mut encoder = BlockEncoder::new(); - let block = (0..128).collect::>(); - let (num_bits, compressed) = encoder.compress_block_sorted(&block, 0); - let mut decoder = BlockDecoder::new(); - let bytes = decoder.decompress_block_sorted(compressed, num_bits, 0); - assert_eq!(decoder.output(), block); - assert_eq!(bytes, compressed.len()); - } - - #[test] - fn test_block_encoder_unsorted() { - let mut encoder = BlockEncoder::new(); - let block = (0..128).collect::>(); - let (num_bits, compressed) = encoder.compress_block_unsorted(&block); - let mut decoder = BlockDecoder::new(); - let bytes = decoder.decompress_block_unsorted(compressed, num_bits); - assert_eq!(decoder.output(), block); - assert_eq!(bytes, compressed.len()); - } - - #[test] - fn test_block_encoder_vint_sorted() { - let mut encoder = BlockEncoder::new(); - let block = (0..100).collect::>(); - let compressed = encoder.compress_vint_sorted(&block, 0); - let mut decoder = BlockDecoder::new(); - let bytes = decoder.decompress_vint_sorted(compressed, 0, 100); - assert_eq!(decoder.output(), block); - assert_eq!(bytes, compressed.len()); - } - - #[test] - fn test_block_encoder_vint_unsorted() { - let mut encoder = BlockEncoder::new(); - let block = (0..100).collect::>(); - let compressed = encoder.compress_vint_unsorted(&block); - let mut decoder = BlockDecoder::new(); - let bytes = decoder.decompress_vint_unsorted(compressed, 100); - assert_eq!(decoder.output(), block); - assert_eq!(bytes, compressed.len()); - } - - #[test] - fn test_block_encoder_random() { - let mut encoder = BlockEncoder::new(); - let mut block = rand::seq::index::sample(&mut rand::thread_rng(), 100000, 128) - .into_iter() - .map(|i| i as u32) - .collect::>(); - block.sort_unstable(); - let (num_bits, compressed) = encoder.compress_block_sorted(&block, 0); - let mut decoder = BlockDecoder::new(); - let bytes = decoder.decompress_block_sorted(compressed, num_bits, 0); - assert_eq!(decoder.output(), block); - assert_eq!(bytes, compressed.len()); - } - - #[test] - fn test_block_encoder_unsorted_random() { - let mut encoder = BlockEncoder::new(); - let block = rand::seq::index::sample(&mut rand::thread_rng(), 100000, 128) - .into_iter() - .map(|i| i as u32) - .collect::>(); - let (num_bits, compressed) = encoder.compress_block_unsorted(&block); - let mut decoder = BlockDecoder::new(); - let bytes = decoder.decompress_block_unsorted(compressed, num_bits); - assert_eq!(decoder.output(), block); - assert_eq!(bytes, compressed.len()); - } - - #[test] - fn test_block_encoder_vint_sorted_random() { - let mut encoder = BlockEncoder::new(); - let mut block = rand::seq::index::sample(&mut rand::thread_rng(), 100000, 100) - .into_iter() - .map(|i| i as u32) - .collect::>(); - block.sort_unstable(); - let compressed = encoder.compress_vint_sorted(&block, 0); - let mut decoder = BlockDecoder::new(); - let bytes = decoder.decompress_vint_sorted(compressed, 0, 100); - assert_eq!(decoder.output(), block); - assert_eq!(bytes, compressed.len()); - } - - #[test] - fn test_block_encoder_vint_unsorted_random() { - let mut encoder = BlockEncoder::new(); - let block = rand::seq::index::sample(&mut rand::thread_rng(), 100000, 100) - .into_iter() - .map(|i| i as u32) - .collect::>(); - let compressed = encoder.compress_vint_unsorted(&block); - let mut decoder = BlockDecoder::new(); - let bytes = decoder.decompress_vint_unsorted(compressed, 100); - assert_eq!(decoder.output(), block); - assert_eq!(bytes, 
compressed.len()); - } -} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 8ca9de1..cb459de 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,5 +1,4 @@ pub mod cells; -pub mod compress_block; pub mod loser_tree; pub mod topk_computer; pub mod vint; From 908eb62113a7bfd0826532e42acb716ad06f72d0 Mon Sep 17 00:00:00 2001 From: Mingzhuo Yin Date: Fri, 24 Jan 2025 12:33:55 +0800 Subject: [PATCH 2/2] feat: support custom stopwords Signed-off-by: Mingzhuo Yin --- Cargo.toml | 6 +++ src/sql/tokenizer.sql | 14 +++-- src/token.rs | 68 ++++++++++++++++++------ tests/sqllogictest/tokenizer.slt | 2 +- tests/sqllogictest/unicode_tokenizer.slt | 2 +- tokenizer.md | 14 ++--- 6 files changed, 76 insertions(+), 30 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3f65070..f07c2f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,8 @@ name = "pgrx_embed_vchord_bm25" path = "./src/bin/pgrx_embed.rs" [features] +pg12 = ["pgrx/pg12"] +pg13 = ["pgrx/pg13"] pg14 = ["pgrx/pg14"] pg15 = ["pgrx/pg15"] pg16 = ["pgrx/pg16"] @@ -34,6 +36,7 @@ tantivy-stemmers = { version = "0.4.0", features = [ thiserror = "2" tokenizers = { version = "0.20", default-features = false, features = ["onig"] } +bincode = "1.3.3" generator = "0.8.4" lending-iterator = "0.1.7" serde = { version = "1.0.217", features = ["derive"] } @@ -59,3 +62,6 @@ codegen-units = 8 missing_safety_doc = "allow" new_without_default = "allow" not_unsafe_ptr_arg_deref = "allow" + +[lints.rust] +unexpected_cfgs = { level = "warn", check-cfg = ['cfg(pgrx_embed)'] } diff --git a/src/sql/tokenizer.sql b/src/sql/tokenizer.sql index cbbc753..fc4a486 100644 --- a/src/sql/tokenizer.sql +++ b/src/sql/tokenizer.sql @@ -1,6 +1,6 @@ CREATE TABLE bm25_catalog.tokenizers ( name TEXT NOT NULL UNIQUE PRIMARY KEY, - config TEXT NOT NULL + config BYTEA NOT NULL ); CREATE FUNCTION unicode_tokenizer_insert_trigger() @@ -10,8 +10,12 @@ DECLARE target_column TEXT := TG_ARGV[1]; BEGIN EXECUTE format(' - WITH new_tokens AS ( - SELECT unnest(unicode_tokenizer_split($1.%I)) AS token + WITH + config AS ( + SELECT config FROM bm25_catalog.tokenizers WHERE name = %L + ), + new_tokens AS ( + SELECT unnest(unicode_tokenizer_split($1.%I, config)) AS token FROM config ), to_insert AS ( SELECT token FROM new_tokens @@ -19,7 +23,7 @@ BEGIN SELECT 1 FROM bm25_catalog.%I WHERE token = new_tokens.token ) ) - INSERT INTO bm25_catalog.%I (token) SELECT token FROM to_insert ON CONFLICT (token) DO NOTHING', target_column, tokenizer_name, tokenizer_name) USING NEW; + INSERT INTO bm25_catalog.%I (token) SELECT token FROM to_insert ON CONFLICT (token) DO NOTHING', tokenizer_name, target_column, tokenizer_name, tokenizer_name) USING NEW; RETURN NEW; END; $$ LANGUAGE plpgsql; @@ -28,7 +32,7 @@ CREATE FUNCTION create_unicode_tokenizer_and_trigger(tokenizer_name TEXT, table_ RETURNS VOID AS $body$ BEGIN EXECUTE format('SELECT create_tokenizer(%L, $$ - tokenizer = ''Unicode'' + tokenizer = ''unicode'' table = %L column = %L $$)', tokenizer_name, table_name, source_column); diff --git a/src/token.rs b/src/token.rs index 165ae94..2d66d47 100644 --- a/src/token.rs +++ b/src/token.rs @@ -18,17 +18,22 @@ const TOKEN_PATTERN: &str = r"(?u)\b\w\w+\b"; lazy_static::lazy_static! 
{ static ref TOKEN_PATTERN_RE: regex::Regex = regex::Regex::new(TOKEN_PATTERN).unwrap(); - pub static ref STOP_WORDS_LUCENE: HashSet = { + static ref STOP_WORDS_LUCENE: HashSet = { [ "a", "an", "and", "are", "as", "at", "be", "but", "by", "for", "if", "in", "into", "is", "it", "no", "not", "of", "on", "or", "such", "that", "the", "their", "then", "there", "these", "they", "this", "to", "was", "will", "with", ].iter().map(|s| s.to_string()).collect() }; - pub static ref STOP_WORDS_NLTK: HashSet = { + static ref STOP_WORDS_NLTK: HashSet = { let words = stop_words::get(stop_words::LANGUAGE::English); words.into_iter().collect() }; + static ref STOP_WORDS_LUCENE_PLUS_NLTK: HashSet = { + let mut words = STOP_WORDS_LUCENE.clone(); + words.extend(STOP_WORDS_NLTK.iter().cloned()); + words + }; static ref BERT_TOKENIZER: BertWithStemmerAndSplit = BertWithStemmerAndSplit::new(); static ref TOCKENIZER: Tocken = Tocken::new(); @@ -73,7 +78,12 @@ impl Tocken { } #[pgrx::pg_extern(immutable, strict, parallel_safe)] -pub fn unicode_tokenizer_split(text: &str) -> Vec { +pub fn unicode_tokenizer_split(text: &str, config: &[u8]) -> Vec { + let config: TokenizerConfig = bincode::deserialize(config).unwrap_or_report(); + unicode_tokenizer_split_inner(text, &config) +} + +fn unicode_tokenizer_split_inner(text: &str, config: &TokenizerConfig) -> Vec { let mut tokens = Vec::new(); for word in text.unicode_words() { // trim `'s` for English @@ -89,10 +99,12 @@ pub fn unicode_tokenizer_split(text: &str) -> Vec { if token.is_empty() { continue; } - if !STOP_WORDS_LUCENE.contains(&lowercase) { - tokens.push(token.clone()); - } - if !STOP_WORDS_NLTK.contains(&lowercase) { + let stopwords = match config.stopwords { + StopWordsKind::Lucene => &*STOP_WORDS_LUCENE, + StopWordsKind::Nltk => &*STOP_WORDS_NLTK, + StopWordsKind::LucenePlusNltk => &*STOP_WORDS_LUCENE_PLUS_NLTK, + }; + if !stopwords.contains(&token) { tokens.push(token); } } @@ -100,18 +112,28 @@ pub fn unicode_tokenizer_split(text: &str) -> Vec { } #[derive(Clone, Copy, Serialize, Deserialize)] -#[repr(i32)] +#[serde(rename_all = "snake_case")] enum TokenizerKind { Bert, Tocken, Unicode, } +#[derive(Clone, Copy, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +enum StopWordsKind { + Lucene, + Nltk, + LucenePlusNltk, +} + #[derive(Clone, Serialize, Deserialize, Validate)] #[validate(schema(function = "TokenizerConfig::validate_unicode"))] #[serde(deny_unknown_fields)] struct TokenizerConfig { tokenizer: TokenizerKind, + #[serde(default = "TokenizerConfig::default_stopwords")] + stopwords: StopWordsKind, #[serde(default)] table: Option, #[serde(default)] @@ -119,6 +141,10 @@ struct TokenizerConfig { } impl TokenizerConfig { + fn default_stopwords() -> StopWordsKind { + StopWordsKind::LucenePlusNltk + } + fn validate_unicode(&self) -> Result<(), ValidationError> { if !matches!(self.tokenizer, TokenizerKind::Unicode) { return Ok(()); @@ -161,7 +187,10 @@ pub fn create_tokenizer(tokenizer_name: &str, config_str: &str) { pgrx::PgBuiltInOids::TEXTOID.oid(), tokenizer_name.into_datum(), ), - (pgrx::PgBuiltInOids::TEXTOID.oid(), config_str.into_datum()), + ( + pgrx::PgBuiltInOids::BYTEAOID.oid(), + bincode::serialize(&config).unwrap().into_datum(), + ), ]); client.update(query, None, args).unwrap_or_report(); if matches!(config.tokenizer, TokenizerKind::Unicode) { @@ -187,13 +216,13 @@ fn drop_tokenizer(tokenizer_name: &str) { panic!("Tokenizer not found"); } - let config: &str = rows + let config: &[u8] = rows .next() .unwrap() .get(1) .expect("no 
config value") .expect("no config value"); - let config: TokenizerConfig = toml::from_str(config).unwrap_or_report(); + let config: TokenizerConfig = bincode::deserialize(config).unwrap_or_report(); if matches!(config.tokenizer, TokenizerKind::Unicode) { let table_name = format!("bm25_catalog.\"{}\"", tokenizer_name); let drop_table = format!("DROP TABLE IF EXISTS {}", table_name); @@ -272,7 +301,7 @@ fn create_unicode_tokenizer_table( let mut tokens = HashSet::new(); for row in rows { let text: &str = row.get(1).unwrap_or_report().expect("no text value"); - let words = unicode_tokenizer_split(text); + let words = unicode_tokenizer_split_inner(text, config); tokens.extend(words); } @@ -303,8 +332,13 @@ fn create_unicode_tokenizer_table( client.update(&trigger, None, None).unwrap_or_report(); } -fn unicode_tokenize(client: &SpiClient<'_>, text: &str, tokenizer_name: &str) -> Vec { - let tokens = unicode_tokenizer_split(text); +fn unicode_tokenize( + client: &SpiClient<'_>, + text: &str, + tokenizer_name: &str, + config: &TokenizerConfig, +) -> Vec { + let tokens = unicode_tokenizer_split_inner(text, config); let query = format!( "SELECT id, token FROM bm25_catalog.\"{}\" WHERE token = ANY($1)", tokenizer_name @@ -351,17 +385,17 @@ fn custom_tokenize(text: &str, tokenizer_name: &str) -> Vec { panic!("Tokenizer not found"); } - let config: &str = rows + let config: &[u8] = rows .next() .unwrap() .get(1) .expect("no config value") .expect("no config value"); - let config: TokenizerConfig = toml::from_str(config).unwrap_or_report(); + let config: TokenizerConfig = bincode::deserialize(config).unwrap_or_report(); match config.tokenizer { TokenizerKind::Bert => BERT_TOKENIZER.encode(text), TokenizerKind::Tocken => TOCKENIZER.encode(text), - TokenizerKind::Unicode => unicode_tokenize(&client, text, tokenizer_name), + TokenizerKind::Unicode => unicode_tokenize(&client, text, tokenizer_name, &config), } }) } diff --git a/tests/sqllogictest/tokenizer.slt b/tests/sqllogictest/tokenizer.slt index 90314f4..70eb7e0 100644 --- a/tests/sqllogictest/tokenizer.slt +++ b/tests/sqllogictest/tokenizer.slt @@ -13,7 +13,7 @@ SELECT tokenize('PostgreSQL', 'Tocken'); statement ok SELECT create_tokenizer('test_bert', $$ -tokenizer = "Bert" +tokenizer = "bert" $$); query I diff --git a/tests/sqllogictest/unicode_tokenizer.slt b/tests/sqllogictest/unicode_tokenizer.slt index a27fd1b..34e2fab 100644 --- a/tests/sqllogictest/unicode_tokenizer.slt +++ b/tests/sqllogictest/unicode_tokenizer.slt @@ -21,7 +21,7 @@ INSERT INTO documents (passage) VALUES statement ok SELECT create_tokenizer('documents_tokenizer', $$ -tokenizer = 'Unicode' +tokenizer = 'unicode' table = 'documents' column = 'passage' $$); diff --git a/tokenizer.md b/tokenizer.md index d403384..5e4353c 100644 --- a/tokenizer.md +++ b/tokenizer.md @@ -40,7 +40,8 @@ SELECT id, text, embedding <&> to_bm25query('corpus_embedding_bm25', 'PostgreSQL CREATE TABLE corpus (id SERIAL, text TEXT, embedding bm25vector); INSERT INTO corpus (text) VALUES ('PostgreSQL is a powerful, open-source object-relational database system.'); -- insert text to the table SELECT create_tokenizer('test_token', $$ -tokenizer = 'Unicode' +tokenizer = 'unicode' +stopwords = 'lucene_plus_nltk' table = 'corpus' column = 'text' $$); @@ -58,11 +59,12 @@ We utilize [`TOML`](https://toml.io/en/) to configure the tokenizer. 
You can specify the tokenizer and its options when calling `create_tokenizer`. Here is what each field means:

-| Field     | Type   | Description                                          |
-| --------- | ------ | ---------------------------------------------------- |
-| tokenizer | String | The tokenizer type (`Bert`, `Tocken`, or `Unicode`). |
-| table     | String | The table name to train on for Unicode tokenizer.    |
-| column    | String | The column name to train on for Unicode tokenizer.   |
+| Field     | Type   | Description                                                                                                            |
+| --------- | ------ | ---------------------------------------------------------------------------------------------------------------------- |
+| tokenizer | String | The tokenizer type (`bert`, `tocken`, or `unicode`).                                                                     |
+| stopwords | String | The stopwords used by the Unicode tokenizer (`lucene`, `nltk`, or `lucene_plus_nltk`); default is `lucene_plus_nltk`.   |
+| table     | String | The table name to train on for the Unicode tokenizer.                                                                    |
+| column    | String | The column name to train on for the Unicode tokenizer.                                                                   |

## Note