From b016dca552d31f4c93a5b5653747b533d416dd34 Mon Sep 17 00:00:00 2001 From: tmontaigu Date: Wed, 1 Nov 2023 12:25:23 +0100 Subject: [PATCH] Improve dbase::File performances --- src/field/mod.rs | 88 ++++++++++-- src/file.rs | 353 +++++++++++++++++++++++++++++++++++------------ src/lib.rs | 2 +- src/memo.rs | 11 +- src/reading.rs | 10 +- 5 files changed, 363 insertions(+), 101 deletions(-) diff --git a/src/field/mod.rs b/src/field/mod.rs index f0ce300..53f0160 100644 --- a/src/field/mod.rs +++ b/src/field/mod.rs @@ -1,5 +1,7 @@ use std::convert::TryFrom; use std::io::{Read, Write}; +use std::ops::Index; +use std::slice::SliceIndex; use byteorder::{ReadBytesExt, WriteBytesExt}; @@ -155,6 +157,67 @@ impl FieldInfo { } } +pub struct FieldsInfo { + pub(crate) inner: Vec, +} + +impl FieldsInfo { + pub(crate) fn read_from(source: &mut R, num_fields: usize) -> Result { + let mut fields_info = Vec::::with_capacity(num_fields); + for _ in 0..num_fields { + let info = FieldInfo::read_from(source)?; + fields_info.push(info); + } + + Ok(Self { inner: fields_info }) + } + + pub(crate) fn field_position_in_record(&self, index: usize) -> Option { + self.inner + .get(..index) + .map(|slc| slc.iter().map(|i| i.field_length as usize).sum::()) + .map(|s| s + DELETION_FLAG_SIZE) + } + + pub(crate) fn size_of_all_fields(&self) -> usize { + self.inner + .iter() + .map(|i| i.field_length as usize) + .sum::() + } + + pub(crate) fn at_least_one_field_is_memo(&self) -> bool { + self.inner + .iter() + .any(|f_info| f_info.field_type == FieldType::Memo) + } + + pub fn len(&self) -> usize { + self.inner.len() + } + + pub fn iter(&self) -> std::slice::Iter<'_, FieldInfo> { + self.inner.iter() + } +} + +impl AsRef<[FieldInfo]> for FieldsInfo { + fn as_ref(&self) -> &[FieldInfo] { + &self.inner + } +} + +impl Index for FieldsInfo +where + I: SliceIndex<[FieldInfo]>, +{ + type Output = I::Output; + + fn index(&self, index: I) -> &Self::Output { + &self.inner.as_slice()[index] + } +} + impl 
std::fmt::Display for FieldInfo { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( @@ -172,21 +235,28 @@ pub(crate) enum DeletionFlag { } impl DeletionFlag { - pub(crate) fn read_from(source: &mut T) -> std::io::Result { - let byte = source.read_u8()?; + pub(crate) const fn to_byte(self) -> u8 { + match self { + Self::NotDeleted => 0x20, + Self::Deleted => 0x2A, + } + } + + pub(crate) const fn from_byte(byte: u8) -> Self { match byte { - 0x20 => Ok(Self::NotDeleted), - 0x2A => Ok(Self::Deleted), + 0x20 => Self::NotDeleted, + 0x2A => Self::Deleted, // Silently consider other values as not deleted - _ => Ok(Self::NotDeleted), + _ => Self::NotDeleted, } } + pub(crate) fn read_from(source: &mut T) -> std::io::Result { + source.read_u8().map(Self::from_byte) + } + pub(crate) fn write_to(self, dst: &mut T) -> std::io::Result<()> { - match self { - Self::NotDeleted => dst.write_u8(0x20), - Self::Deleted => dst.write_u8(0x2A), - } + dst.write_u8(self.to_byte()) } } /// Flags describing a field diff --git a/src/file.rs b/src/file.rs index 7f1d7a7..28876a3 100644 --- a/src/file.rs +++ b/src/file.rs @@ -1,7 +1,7 @@ use crate::encoding::DynEncoding; -use crate::field::types::TrimOption; -use crate::field::{DeletionFlag, DELETION_FLAG_SIZE}; +use crate::field::{DeletionFlag, FieldsInfo, DELETION_FLAG_SIZE}; use crate::header::Header; +use crate::memo::MemoReader; use crate::reading::{ReadingOptions, BACKLINK_SIZE, TERMINATOR_VALUE}; use crate::writing::{write_header_parts, WritableAsDbaseField}; use crate::ErrorKind::UnsupportedCodePage; @@ -85,12 +85,15 @@ impl<'a, T> FieldRef<'a, T> { .header .record_position(self.record_index.0) .unwrap() as u64; - let position_in_record = self.file.fields_info[..self.field_index.0] - .iter() - .map(|i| i.field_length as u64) - .sum::(); - record_position + position_in_record + DELETION_FLAG_SIZE as u64 + record_position + self.position_in_record() as u64 + } + + fn position_in_record(&self) -> usize { + self.file 
+ .fields_info + .field_position_in_record(self.field_index.0) + .expect("internal error: invalid field index") } } @@ -98,7 +101,7 @@ impl<'a, T> FieldRef<'a, T> where T: Seek, { - pub(crate) fn seek_to_beginning(&mut self) -> Result { + fn seek_to_beginning(&mut self) -> Result { let field_info = &self.file.fields_info[self.field_index.0]; self.file @@ -114,25 +117,21 @@ where { /// Reads and returns the value pub fn read(&mut self) -> Result { - self.seek_to_beginning() - .map_err(|e| Error::new(e, self.record_index.0))?; + self.file + .ensure_record_has_been_read_into_buffer(self.record_index)?; let field_info = &self.file.fields_info[self.field_index.0]; - let buffer = &mut self.file.field_data_buffer[..field_info.field_length as usize]; - self.file.inner.read(buffer).map_err(|e| { - Error::new( - FieldIOError::new(ErrorKind::IoError(e), Some(field_info.clone())), - self.record_index.0, - ) - })?; + let start_pos = self.position_in_record(); + let field_bytes = &mut self.file.record_data_buffer.get_mut() + [start_pos..start_pos + field_info.field_length as usize]; - FieldValue::read_from::>, _>( - &buffer, - &mut None, + FieldValue::read_from( + field_bytes, + &mut self.file.memo_reader, field_info, &self.file.encoding, - TrimOption::BeginEnd, + self.file.options.character_trim, ) .map_err(|e| { Error::new( @@ -170,14 +169,21 @@ where where ValueType: WritableAsDbaseField, { - self.seek_to_beginning() + self.file.file_position = self + .seek_to_beginning() .map_err(|e| Error::new(e, self.record_index.0))?; let field_info = &self.file.fields_info[self.field_index.0]; - let buffer = &mut self.file.field_data_buffer[..field_info.field_length as usize]; - buffer.fill(0); - let mut cursor = Cursor::new(buffer); + let start_pos = self.position_in_record(); + let field_bytes = &mut self.file.record_data_buffer.get_mut() + [start_pos..start_pos + field_info.field_length as usize]; + field_bytes.fill(0); + + // Note that since we modify the internal buffer, we don't 
need to re-read the + // record / buffer, meaning if a user writes then reads it should get correct + // value, and we did not re-read from file. + let mut cursor = Cursor::new(field_bytes); value .write_as(field_info, &self.file.encoding, &mut cursor) .map_err(|e| { @@ -188,13 +194,16 @@ where })?; let buffer = cursor.into_inner(); - self.file.inner.write_all(&buffer).map_err(|e| { + + self.file.inner.write_all(buffer).map_err(|e| { Error::new( FieldIOError::new(ErrorKind::IoError(e), Some(field_info.clone())), self.record_index.0, ) })?; + self.file.file_position += buffer.len() as u64; + Ok(()) } } @@ -240,15 +249,6 @@ impl<'a, T> RecordRef<'a, T> where T: Seek, { - pub fn seek_to_beginning(&mut self) -> Result { - self.file - .inner - .seek(SeekFrom::Start( - self.position_in_source() + DELETION_FLAG_SIZE as u64, - )) - .map_err(|e| FieldIOError::new(ErrorKind::IoError(e), None)) - } - pub fn seek_before_deletion_flag(&mut self) -> Result { self.file .inner @@ -266,33 +266,57 @@ where /// - true -> the record is marked as deleted /// - false -> the record is **not** marked as deleted pub fn is_deleted(&mut self) -> Result { - let deletion_flag_pos = self.position_in_source(); self.file - .inner - .seek(SeekFrom::Start(deletion_flag_pos)) - .map_err(|error| Error::io_error(error, self.index.0))?; - - let deletion_flag = DeletionFlag::read_from(&mut self.file.inner) - .map_err(|error| Error::io_error(error, self.index.0))?; + .ensure_record_has_been_read_into_buffer(self.index)?; + let deletion_flag = DeletionFlag::from_byte(self.file.record_data_buffer.get_ref()[0]); Ok(deletion_flag == DeletionFlag::Deleted) } + /// reads a field from the record + /// + /// Shortcut for `.field(index).unwrap().read().unwrap();` + pub fn read_field(&mut self, field_index: FieldIndex) -> Result { + let record_index = self.index.0; + let mut field = self + .field(field_index) + .ok_or_else(|| Error::new(FieldIOError::end_of_record(), record_index))?; + field.read() + } + + /// 
reads a field from the record + /// + /// Shortcut for `.field(index).unwrap().read_as().unwrap();` + pub fn read_field_as(&mut self, field_index: FieldIndex) -> Result + where + ValueType: TryFrom, + { + let record_index = self.index.0; + let mut field = self + .field(field_index) + .ok_or_else(|| Error::new(FieldIOError::end_of_record(), record_index))?; + field.read_as() + } + + /// Reads the record pub fn read(&mut self) -> Result { self.read_as() } + /// Reads the record as the given type pub fn read_as(&mut self) -> Result where R: ReadableRecord, { - self.seek_to_beginning() - .map_err(|error| Error::new(error, self.index.0))?; - - let mut field_iterator = FieldIterator::<_, Cursor>> { - source: &mut self.file.inner, + self.file + .ensure_record_has_been_read_into_buffer(self.index)?; + self.file + .record_data_buffer + .set_position(DELETION_FLAG_SIZE as u64); + let mut field_iterator = FieldIterator { + source: &mut self.file.record_data_buffer, fields_info: self.file.fields_info.iter().peekable(), - memo_reader: &mut None, + memo_reader: &mut self.file.memo_reader, field_data_buffer: &mut self.file.field_data_buffer, encoding: &self.file.encoding, options: self.file.options, @@ -306,29 +330,60 @@ impl<'a, T> RecordRef<'a, T> where T: Write + Seek, { + /// writes a field to the record + /// + /// Shortcut for `.field(index).unwrap().write(&value).unwrap();` + pub fn write_field( + &mut self, + field_index: FieldIndex, + value: &ValueType, + ) -> Result<(), Error> + where + ValueType: WritableAsDbaseField, + { + let record_index = self.index.0; + let mut field = self + .field(field_index) + .ok_or_else(|| Error::new(FieldIOError::end_of_record(), record_index))?; + field.write(value) + } + /// Writes the content of `record` ath the position /// pointed by `self`. 
pub fn write(&mut self, record: &R) -> Result<(), Error> where R: WritableRecord, { - self.seek_before_deletion_flag() - .map_err(|error| Error::new(error, self.index.0))?; + self.file.record_data_buffer.get_mut().fill(0); + self.file.record_data_buffer.get_mut()[0] = DeletionFlag::NotDeleted.to_byte(); + self.file.record_data_buffer.set_position(1); let mut field_writer = FieldWriter { - dst: &mut self.file.inner, + dst: &mut self.file.record_data_buffer, fields_info: self.file.fields_info.iter().peekable(), field_buffer: &mut Cursor::new(&mut self.file.field_data_buffer), encoding: &self.file.encoding, }; - field_writer - .write_deletion_flag() - .map_err(|error| Error::io_error(error, self.index.0))?; - record .write_using(&mut field_writer) - .map_err(|error| Error::new(error, self.index.0)) + .map_err(|error| Error::new(error, self.index.0))?; + + self.seek_before_deletion_flag() + .map_err(|error| Error::new(error, self.index.0))?; + + self.file + .inner + .write_all(self.file.record_data_buffer.get_ref()) + .map_err(|error| Error::io_error(error, self.index.0))?; + + // We don't need to update the file's inner position as we re-wrote the whole record + debug_assert_eq!( + self.file.file_position, + self.file.inner.stream_position().unwrap() + ); + + Ok(()) } } @@ -338,20 +393,18 @@ pub struct FileRecordIterator<'a, T> { current_record: RecordIndex, } -impl<'a, T> FileRecordIterator<'a, T> { +impl<'a, T> FileRecordIterator<'a, T> +where + T: Seek + Read, +{ // To implement iterator we need the Iterator trait to make use of GATs // which is not the case, to iteration will have to use the while let Some() pattern pub fn next<'s>(&'s mut self) -> Option> { - if self.current_record.0 >= self.file.header.num_records as usize { - None - } else { - let r = RecordRef { - file: &mut self.file, - index: self.current_record, - }; - self.current_record.0 += 1; - Some(r) + let record_ref = self.file.record(self.current_record.0); + if let Some(_) = record_ref { + 
self.current_record.0 += 1 } + record_ref } } @@ -396,22 +449,30 @@ impl<'a, T> FileRecordIterator<'a, T> { /// ``` pub struct File { pub(crate) inner: T, + memo_reader: Option>, pub(crate) header: Header, - pub(crate) fields_info: Vec, + pub(crate) fields_info: FieldsInfo, pub(crate) encoding: DynEncoding, + /// Buffer that contains a whole record worth of data + /// It also contains the deletion flag + record_data_buffer: Cursor>, /// Non-Memo field length is stored on a u8, /// so fields cannot exceed 255 bytes field_data_buffer: [u8; 255], pub(crate) options: ReadingOptions, + /// We track the position in the file + /// to avoid calling `seek` when we are reading records + /// in order (0, 1, 2, etc) + file_position: u64, } impl File { /// Returns the information about fields present in the records pub fn fields(&self) -> &[FieldInfo] { - self.fields_info.as_slice() + self.fields_info.as_ref() } - /// Returns the field infex that corresponds to the given name + /// Returns the field index that corresponds to the given name pub fn field_index(&self, name: &str) -> Option { self.fields_info .iter() @@ -430,9 +491,10 @@ impl File { } impl File { - /// creates of File using source as the storate space. + /// Creates a File using source as the storage space. 
pub fn open(mut source: T) -> Result { - let header = Header::read_from(&mut source).map_err(|error| Error::io_error(error, 0))?; + let mut header = + Header::read_from(&mut source).map_err(|error| Error::io_error(error, 0))?; let offset = if header.file_type.is_visual_fox_pro() { if BACKLINK_SIZE > header.offset_to_first_record { @@ -445,15 +507,12 @@ impl File { let num_fields = (offset as usize - Header::SIZE - std::mem::size_of::()) / FieldInfo::SIZE; - let mut fields_info = Vec::::with_capacity(num_fields as usize); - for _ in 0..num_fields { - let info = FieldInfo::read_from(&mut source).map_err(|error| Error { + let fields_info = + FieldsInfo::read_from(&mut source, num_fields).map_err(|error| Error { record_num: 0, field: None, kind: error, })?; - fields_info.push(info); - } let terminator = source .read_u8() @@ -469,14 +528,24 @@ impl File { let field_error = FieldIOError::new(UnsupportedCodePage(header.code_page_mark), None); Error::new(field_error, 0) })?; + + let record_size: usize = DELETION_FLAG_SIZE + fields_info.size_of_all_fields(); + let record_data_buffer = Cursor::new(vec![0u8; record_size]); + // Some files seem not to include the DELETION_FLAG_SIZE in the record size, + // but we rely on it + header.size_of_record = record_size as u16; + // debug_assert_eq!(record_size - DELETION_FLAG_SIZE, header.size_of_record as usize); + Ok(Self { inner: source, - // memo_reader: None, + memo_reader: None, header, fields_info, encoding, + record_data_buffer, field_data_buffer: [0u8; 255], options: ReadingOptions::default(), + file_position: header.offset_to_first_record as u64, }) } @@ -487,10 +556,11 @@ impl File { if index >= self.header.num_records as usize { None } else { - Some(RecordRef { + let record_ref = RecordRef { file: self, index: RecordIndex(index), - }) + }; + Some(record_ref) } } @@ -503,19 +573,65 @@ impl File { current_record: RecordIndex(0), } } + + /// Returns true if it read from the source, false otherwise (used in tests). 
+ fn ensure_record_has_been_read_into_buffer( + &mut self, + record_index: RecordIndex, + ) -> Result { + let record_ref = RecordRef { + file: self, + index: record_index, + }; + let start_of_record_pos = record_ref.position_in_source(); + let end_of_record_pos = start_of_record_pos + u64::from(self.header.size_of_record); + + if self.file_position > start_of_record_pos && self.file_position <= end_of_record_pos { + // If pos is in this range, then the record was already read into the buffer + return Ok(false); + } + + if start_of_record_pos != self.file_position { + // Only call seek if the record we need to read + // is not the one just after the one we read last + self.file_position = self + .inner + .seek(SeekFrom::Start(start_of_record_pos)) + .map_err(|e| Error::io_error(e, record_index.0))?; + } + + self.inner + .read_exact(self.record_data_buffer.get_mut()) + .map_err(|e| Error::io_error(e, record_index.0))?; + self.file_position += self.record_data_buffer.get_mut().len() as u64; + Ok(true) + } } impl File { pub fn create_new(mut dst: T, table_info: TableInfo) -> Result { write_header_parts(&mut dst, &table_info.header, &table_info.fields_info)?; - + let record_size: usize = DELETION_FLAG_SIZE + + table_info + .fields_info + .iter() + .map(|i| i.field_length as usize) + .sum::(); + let record_data_buffer = Cursor::new(vec![0u8; record_size]); + let file_position = table_info.header.offset_to_first_record as u64; + debug_assert_eq!(file_position, dst.stream_position().unwrap()); Ok(Self { inner: dst, + memo_reader: None, header: table_info.header, - fields_info: table_info.fields_info, + fields_info: FieldsInfo { + inner: table_info.fields_info, + }, encoding: table_info.encoding, + record_data_buffer, field_data_buffer: [0u8; 255], options: ReadingOptions::default(), + file_position, }) } @@ -574,7 +690,7 @@ impl File { } pub fn sync_all(&mut self) -> std::io::Result<()> { - let current_pos = self.inner.seek(SeekFrom::Current(0))?; + let current_pos = 
self.inner.stream_position()?; self.inner.seek(SeekFrom::Start(0))?; self.header.write_to(&mut self.inner)?; self.inner.seek(SeekFrom::Start(current_pos))?; @@ -595,9 +711,29 @@ impl File { /// Opens an existing dBase file in read only mode pub fn open_read_only>(path: P) -> Result { - let file = std::fs::File::open(path).map_err(|error| Error::io_error(error, 0))?; - - File::open(BufReadWriteFile::new(file).unwrap()) + let file = std::fs::File::open(path.as_ref()).map_err(|error| Error::io_error(error, 0))?; + + let mut file = File::open(BufReadWriteFile::new(file).unwrap())?; + if file.fields_info.at_least_one_field_is_memo() { + let p = path.as_ref(); + let memo_type = file.header.file_type.supported_memo_type(); + if let Some(mt) = memo_type { + let memo_path = p.with_extension(mt.extension()); + + let memo_file = std::fs::File::open(memo_path).map_err(|error| Error { + record_num: 0, + field: None, + kind: ErrorKind::ErrorOpeningMemoFile(error), + })?; + + let memo_reader = BufReadWriteFile::new(memo_file) + .and_then(|memo_file| MemoReader::new(mt, memo_file)) + .map_err(|error| Error::io_error(error, 0))?; + + file.memo_reader = Some(memo_reader); + } + } + Ok(file) } /// Opens an existing dBase file in write only mode @@ -627,3 +763,50 @@ impl File { File::create_new(BufReadWriteFile::new(file).unwrap(), table_info) } } + +#[cfg(test)] +mod tests { + #[test] + fn ensure_record_has_been_read_into_buffer() { + let mut file = crate::File::open_read_only("tests/data/stations.dbf").unwrap(); + + { + let mut record = file.record(0).unwrap(); + let _ = record.read_field(crate::FieldIndex(0)).unwrap(); + // Must return false, meaning it correctly understands the record 0 is in memory + assert!(!file + .ensure_record_has_been_read_into_buffer(crate::RecordIndex(0)) + .unwrap()); + assert!(file + .ensure_record_has_been_read_into_buffer(crate::RecordIndex(1)) + .unwrap()); + } + + { + let mut record = file.record(4).unwrap(); + let _ = 
record.read_field(crate::FieldIndex(3)).unwrap(); + // Must return false, meaning it correctly understands the record 4 is in memory + assert!(!file + .ensure_record_has_been_read_into_buffer(crate::RecordIndex(4)) + .unwrap()); + assert!(file + .ensure_record_has_been_read_into_buffer(crate::RecordIndex(1)) + .unwrap()); + } + + // Make sure writing a field still work with the ensure mechanism + { + let mut record = file.record(10).unwrap(); + let value = record.read_field(crate::FieldIndex(2)).unwrap(); + // Use record.file to avoid double borrow + assert!(!record + .file + .ensure_record_has_been_read_into_buffer(crate::RecordIndex(10)) + .unwrap()); + record.write_field(crate::FieldIndex(2), &value).unwrap(); + assert!(!file + .ensure_record_has_been_read_into_buffer(crate::RecordIndex(10)) + .unwrap()); + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 802dd75..4f2b41a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -288,7 +288,7 @@ mod reading; mod record; mod writing; -pub use file::{FieldRef, File, RecordRef}; +pub use file::{FieldIndex, FieldRef, File, RecordIndex, RecordRef}; #[cfg(feature = "datafusion")] pub use crate::datafusion::{DbaseTable, DbaseTableFactory}; diff --git a/src/memo.rs b/src/memo.rs index 82fc240..39a156f 100644 --- a/src/memo.rs +++ b/src/memo.rs @@ -9,6 +9,15 @@ pub(crate) enum MemoFileType { FoxBaseMemo, } +impl MemoFileType { + pub(crate) const fn extension(self) -> &'static str { + match self { + MemoFileType::DbaseMemo | MemoFileType::DbaseMemo4 => "dbt", + MemoFileType::FoxBaseMemo => "fpt", + } + } +} + /// Although there are different memo file type with each a different /// header organisation, we use the same struct internally #[derive(Debug, Copy, Clone)] @@ -45,7 +54,7 @@ impl MemoHeader { /// Struct that reads knows how to read data from a memo source #[derive(Debug, Clone)] -pub(crate) struct MemoReader { +pub(crate) struct MemoReader { memo_file_type: MemoFileType, header: MemoHeader, source: T, diff --git 
a/src/reading.rs b/src/reading.rs index 70e75f3..c7970c1 100644 --- a/src/reading.rs +++ b/src/reading.rs @@ -52,7 +52,7 @@ pub struct TableInfo { /// Options related to reading #[derive(Copy, Clone, Debug)] pub struct ReadingOptions { - character_trim: TrimOption, + pub(crate) character_trim: TrimOption, } impl Default for ReadingOptions { @@ -152,7 +152,7 @@ impl ReaderBuilder { source: file.inner, memo_reader, header: file.header, - fields_info: file.fields_info, + fields_info: file.fields_info.inner, encoding: self .encoding .map_or_else(|| file.encoding, DynEncoding::new), @@ -208,7 +208,7 @@ impl Reader { source: file.inner, memo_reader: None, header: file.header, - fields_info: file.fields_info, + fields_info: file.fields_info.inner, encoding: file.encoding, options: ReadingOptions::default(), }) @@ -622,7 +622,7 @@ pub fn read>(path: P) -> Result, Error> { #[cfg(test)] mod test { use std::fs::File; - use std::io::{Seek, SeekFrom}; + use std::io::Seek; use super::*; @@ -630,7 +630,7 @@ mod test { fn pos_after_reading() { let file = File::open("tests/data/line.dbf").unwrap(); let mut reader = Reader::new(file).unwrap(); - let pos_after_reading = reader.source.seek(SeekFrom::Current(0)).unwrap(); + let pos_after_reading = reader.source.stream_position().unwrap(); let mut expected_pos = Header::SIZE + ((reader.fields_info.len()) * FieldInfo::SIZE); // Don't forget terminator