Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#159] WAL 로그 저장 포맷 선택 및 구현 #169

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ uuid = "1.1.2"
itertools = "0.10.5"
anyhow = "1.0.86"
mockall = "0.12.1"
bitcode = "0.6.3"

[target.'cfg(windows)'.dependencies]
winreg = "0.10.1"
Expand Down
8 changes: 7 additions & 1 deletion src/errors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod parsing_error;
pub mod predule;
pub mod server_error;
pub mod type_error;
pub mod wal_errors;

#[derive(Debug, PartialEq)]
pub enum RRDBError {
Expand All @@ -14,6 +15,7 @@ pub enum RRDBError {
ParsingError(parsing_error::ParsingError),
ServerError(server_error::ServerError),
TypeError(type_error::TypeError),
WALError(wal_errors::WALError),
}

impl ToString for RRDBError {
Expand All @@ -25,13 +27,14 @@ impl ToString for RRDBError {
RRDBError::ParsingError(e) => e.to_string(),
RRDBError::ServerError(e) => e.to_string(),
RRDBError::TypeError(e) => e.to_string(),
RRDBError::WALError(e) => e.to_string(),
}
}
}

#[cfg(test)]
mod tests {
use predule::{ExecuteError, IntoError, LexingError, ParsingError, ServerError, TypeError};
use predule::{ExecuteError, IntoError, LexingError, ParsingError, ServerError, TypeError, WALError};

use super::*;

Expand All @@ -54,5 +57,8 @@ mod tests {

let error = TypeError::wrap("test");
assert!(error.to_string().contains("test"));

let error = WALError::wrap("test");
assert!(error.to_string().contains("test"));
}
}
1 change: 1 addition & 0 deletions src/errors/predule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ pub use super::lexing_error::*;
pub use super::parsing_error::*;
pub use super::server_error::*;
pub use super::type_error::*;
pub use super::wal_errors::*;
55 changes: 55 additions & 0 deletions src/errors/wal_errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use super::RRDBError;

#[derive(Debug)]
pub struct WALError {
pub message: String,
pub backtrace: std::backtrace::Backtrace,
}

impl PartialEq for WALError {
fn eq(&self, other: &Self) -> bool {
self.message == other.message
}
}

impl WALError {
pub fn wrap<T: ToString>(message: T) -> RRDBError {
RRDBError::WALError(Self {
message: message.to_string(),
backtrace: std::backtrace::Backtrace::capture(),
})
}
}

impl std::error::Error for WALError {}

impl std::fmt::Display for WALError {
fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(formatter, "wal error: {}", self.message)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_wal_error_eq() {
let error1 = WALError::wrap("test");
let error2 = WALError::wrap("test");
assert_eq!(error1, error2);
}

#[test]
fn test_wal_error_display() {
let error = WALError::wrap("test");

assert_eq!(error.to_string(), "wal error: test");
}

#[test]
fn test_wal_error_wrap() {
let error = WALError::wrap("test");
assert_eq!(error.to_string(), "wal error: test");
}
}
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub mod parser;
pub mod pgwire;
pub mod server;
pub mod utils;
pub mod wal;

use std::sync::Arc;

Expand Down
154 changes: 154 additions & 0 deletions src/wal/manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
use std::time::SystemTime;
#[allow(dead_code)]
#[allow(unused_variables)]
#[allow(unused_assignments)]
#[allow(unused_imports)]

use std::{fs, io::BufWriter, path::PathBuf};

use crate::errors::{predule::WALError, RRDBError};

use super::types::{EntryType, WALEntry};

pub struct WALManager {
sequence: usize,
buffers: Vec<WALEntry>,
page_size: usize,
directory: PathBuf,
}

// TODO: gz 압축 구현
// TODO: 대용량 페이지 파일 TOAST 등 처리 방법 고려
impl WALManager {
fn new(sequence: usize, entries: Vec<WALEntry>, page_size: usize, directory: PathBuf) -> Self {
Self {
sequence,
buffers: entries,
page_size,
directory,
}
}

pub fn append(&mut self, entry: WALEntry) -> Result<(), RRDBError> {
self.buffers.push(entry);

self.check_and_mark()?;
Ok(())
}

fn check_and_mark(&mut self) -> Result<(), RRDBError> {
let size = self.buffers.iter().map(|entry| entry.size()).sum::<usize>();

if size > self.page_size {
self.checkpoint()?;
}

Ok(())
}

fn save_to_file(&mut self) -> Result<(), RRDBError> {
let path = self.directory.join(format!("{}.log", self.sequence));

let encoded = bitcode::encode(&self.buffers);

fs::write(&path, encoded).map_err(|e| WALError::wrap(e.to_string()))?;

// fsync 디스크 동기화 보장
let file = fs::OpenOptions::new()
.write(true)
.open(path)
.map_err(|e| WALError::wrap(e.to_string()))?;
file.sync_all().map_err(|e| WALError::wrap(e.to_string()))?;

Ok(())
}

fn checkpoint(&mut self) -> Result<(), RRDBError> {
self.buffers.push(WALEntry {
data: None,
entry_type: EntryType::Checkpoint,
timestamp: WALManager::get_current_secs()?,
transaction_id: None,
});
self.save_to_file()?;

self.buffers.clear();
self.sequence += 1;

Ok(())
}

pub fn flush(&mut self) -> Result<(), WALError> {
todo!()
}

fn get_current_secs() -> Result<f64, RRDBError> {
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map_err(|e| WALError::wrap(e.to_string()))
.map(|duration| duration.as_secs_f64())
}
}


pub struct WALBuilder {
page_size: usize,
directory: PathBuf,
}

impl Default for WALBuilder {
fn default() -> Self {
Self {
page_size: 4096,
directory: PathBuf::from("./wal"),
}
}
}

impl WALBuilder {
pub fn build(&self) -> Result<WALManager, RRDBError> {
let (sequence, entries) = self.load_data()?;

Ok(WALManager::new(sequence, entries, self.page_size, self.directory.clone()))
}

pub fn set_page_size(mut self, page_size: usize) -> Self {
self.page_size = page_size;
self
}

pub fn set_directory(mut self, directory: PathBuf) -> Self {
self.directory = directory;
self
}

fn load_data(&self) -> Result<(usize, Vec<WALEntry>), RRDBError> {
wHoIsDReAmer marked this conversation as resolved.
Show resolved Hide resolved
let mut sequence = 1;

// get all log file entry
let logs = std::fs::read_dir(&self.directory)
.map_err(|e| WALError::wrap(e.to_string()))?
.filter_map(|entry| entry.ok())
.filter(|entry| entry.path().extension() == Some("log".as_ref()))
wHoIsDReAmer marked this conversation as resolved.
Show resolved Hide resolved
.collect::<Vec<_>>();

let mut entries = Vec::new();

if let Some(last_log) = logs.last() {
sequence = logs.len();

let content = std::fs::read(last_log.path())
.map_err(|e| WALError::wrap(e.to_string()))?;
let saved_entries: Vec<WALEntry> = bitcode::decode(&content)
wHoIsDReAmer marked this conversation as resolved.
Show resolved Hide resolved
.map_err(|e| WALError::wrap(e.to_string()))?;

match saved_entries.last() {
Some(entry)
if matches!(entry.entry_type, EntryType::Checkpoint) => entries = saved_entries,
_ => (),
}
}

Ok((sequence, entries))
}
}
2 changes: 2 additions & 0 deletions src/wal/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod manager;
pub mod types;
27 changes: 27 additions & 0 deletions src/wal/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use bitcode::{Decode, Encode};

#[derive(Clone, Debug, Encode, Decode)]
pub struct WALEntry {
pub entry_type: EntryType,
pub data: Option<Vec<u8>>,
pub timestamp: f64,
wHoIsDReAmer marked this conversation as resolved.
Show resolved Hide resolved
pub transaction_id: Option<u64>,
}

impl WALEntry {
pub fn size(&self) -> usize {
let data_size = self.data.as_ref().map_or(0, |data| data.len());
size_of::<EntryType>() + size_of::<f64>() + size_of::<u64>() + data_size
}
}

#[derive(Clone, Debug, Encode, Decode)]
pub enum EntryType {
Insert,
Set,
Delete,
Checkpoint,

TransactionBegin,
TransactionCommit,
}
Loading