Skip to content

Commit

Permalink
feat: 유연함을 위해 encoding / decoding 동작 분리
Browse files Browse the repository at this point in the history
  • Loading branch information
wHoIsDReAmer committed Nov 22, 2024
1 parent 6674a34 commit 2b6d3a7
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 22 deletions.
30 changes: 30 additions & 0 deletions src/wal/endec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use crate::errors::{predule::WALError, RRDBError};

use super::types::WALEntry;

pub trait WALEncoder<T>: Clone {
fn encode(&self, entry: &T) -> Result<Vec<u8>, RRDBError>;
}

pub trait WALDecoder<T>: Clone {
fn decode(&self, data: &[u8]) -> Result<T, RRDBError>;
}


#[derive(Clone)]
pub struct WALEncodeImpl {}

impl WALEncoder<Vec<WALEntry>> for WALEncodeImpl {
fn encode(&self, entry: &Vec<WALEntry>) -> Result<Vec<u8>, RRDBError> {
Ok(bitcode::encode(entry))
}
}

#[derive(Clone)]
pub struct WALDecodeImpl {}

impl WALDecoder<Vec<WALEntry>> for WALDecodeImpl {
fn decode(&self, data: &[u8]) -> Result<Vec<WALEntry>, RRDBError> {
Ok(bitcode::decode(data).map_err(|e| WALError::wrap(e.to_string()))?)
}
}
70 changes: 49 additions & 21 deletions src/wal/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ use std::{fs, io::BufWriter, path::PathBuf};

use crate::{errors::{predule::WALError, RRDBError}, executor::config::global::GlobalConfig};

use super::types::{EntryType, WALEntry};
use super::{endec::{WALDecoder, WALEncoder}, types::{EntryType, WALEntry}};

pub struct WALManager {
pub struct WALManager<T>
where
T: WALEncoder<Vec<WALEntry>>,
{
/// The sequence number of the WAL file
sequence: usize,
/// The buffer of the WAL file
Expand All @@ -21,19 +24,31 @@ pub struct WALManager {
directory: PathBuf,
/// The extension of the WAL file
extension: String,
encoder: T,
}

// TODO: gz 압축 구현
// TODO: 대용량 페이지 파일 XLOG_CONTINUATION 처리 구현
// TODO: 단순히 이름을 wal{}. 형식으로 로깅하지 말고, 체계적인 파일 관리 구현
impl WALManager {
fn new(sequence: usize, entries: Vec<WALEntry>, page_size: usize, directory: PathBuf, extension: String) -> Self {
impl<T> WALManager<T>
where
T: WALEncoder<Vec<WALEntry>>,
{
fn new(
sequence: usize,
entries: Vec<WALEntry>,
page_size: usize,
directory: PathBuf,
extension: String,
encoder: T,
) -> Self {
Self {
sequence,
buffers: entries,
page_size,
directory,
extension,
encoder,
}
}

Expand All @@ -60,7 +75,7 @@ impl WALManager {
fn save_to_file(&mut self) -> Result<(), RRDBError> {
let path = self.directory.join(format!("{}.{}", self.sequence, self.extension));

let encoded = bitcode::encode(&self.buffers);
let encoded = self.encoder.encode(&self.buffers)?;

fs::write(&path, encoded).map_err(|e| WALError::wrap(e.to_string()))?;

Expand All @@ -78,7 +93,7 @@ impl WALManager {
self.buffers.push(WALEntry {
data: None,
entry_type: EntryType::Checkpoint,
timestamp: WALManager::get_current_secs()?,
timestamp: Self::get_current_secs()?,
transaction_id: None,
});
self.save_to_file()?;
Expand All @@ -100,35 +115,49 @@ impl WALManager {
}


pub struct WALBuilder {
page_size: usize,
directory: PathBuf,
extension: String,
pub struct WALBuilder<'a, T>
where
T: WALDecoder<Vec<WALEntry>>,
{
config: &'a GlobalConfig,
decoder: T,
}

impl WALBuilder {
pub fn new(config: &GlobalConfig) -> Self {
impl<'a, T> WALBuilder<'a, T>
where
T: WALDecoder<Vec<WALEntry>>,
{
pub fn new(config: &'a GlobalConfig, decoder: T) -> Self {
Self {
page_size: config.wal_segment_size as usize,
directory: PathBuf::from(config.wal_directory.clone()),
extension: config.wal_extension.to_string(),
config,
decoder,
}
}

pub async fn build(&self) -> Result<WALManager, RRDBError> {
pub async fn build<D>(&self, encoder: D) -> Result<WALManager<D>, RRDBError>
where
D: WALEncoder<Vec<WALEntry>>,
{
let (sequence, entries) = self.load_data().await?;

Ok(WALManager::new(sequence, entries, self.page_size, self.directory.clone(), self.extension.clone()))
Ok(WALManager::new(
sequence,
entries,
self.config.wal_segment_size as usize,
PathBuf::from(self.config.wal_directory.clone()),
self.config.wal_extension.to_string(),
encoder,
))
}

async fn load_data(&self) -> Result<(usize, Vec<WALEntry>), RRDBError> {
let mut sequence = 1;

// get all log file entry
let logs = std::fs::read_dir(&self.directory)
let logs = std::fs::read_dir(&self.config.wal_directory)
.map_err(|e| WALError::wrap(e.to_string()))?
.filter_map(|entry| entry.ok())
.filter(|entry| entry.path().extension() == Some(self.extension.as_ref()))
.filter(|entry| entry.path().extension() == Some(self.config.wal_extension.as_ref()))
.collect::<Vec<_>>();

let mut entries = Vec::new();
Expand All @@ -138,8 +167,7 @@ impl WALBuilder {

let content = std::fs::read(last_log.path())
.map_err(|e| WALError::wrap(e.to_string()))?;
let saved_entries: Vec<WALEntry> = bitcode::decode(&content)
.map_err(|e| WALError::wrap(e.to_string()))?;
let saved_entries: Vec<WALEntry> = self.decoder.decode(&content)?;

match saved_entries.last() {
Some(entry)
Expand Down
3 changes: 2 additions & 1 deletion src/wal/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod manager;
pub mod types;
pub mod types;
pub mod endec;

0 comments on commit 2b6d3a7

Please sign in to comment.