diff --git a/Cargo.toml b/Cargo.toml index 5f27d1b3..9679cb48 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ uuid = "1.1.2" itertools = "0.10.5" anyhow = "1.0.86" mockall = "0.12.1" +bitcode = "0.6.3" [target.'cfg(windows)'.dependencies] winreg = "0.10.1" diff --git a/src/constants.rs b/src/constants.rs index ecc07488..1494a4aa 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -7,6 +7,12 @@ pub const DEFAULT_CONFIG_FILENAME: &str = "rrdb.config"; // 기본 Data 디렉터리 이름 pub const DEFAULT_DATA_DIRNAME: &str = "data"; +// 기본 WAL 디렉터리 이름 +pub const DEFAULT_WAL_DIRNAME: &str = "wal"; + +// 기본 WAL 확장자 +pub const DEFAULT_WAL_EXTENSION: &str = "log"; + // 운영체제별 기본 저장 경로를 반환합니다. #[cfg(target_os = "linux")] pub const DEFAULT_CONFIG_BASEPATH: &str = "/var/lib/rrdb"; diff --git a/src/errors/mod.rs b/src/errors/mod.rs index c0f3d55d..1762c08e 100644 --- a/src/errors/mod.rs +++ b/src/errors/mod.rs @@ -5,6 +5,7 @@ pub mod parsing_error; pub mod predule; pub mod server_error; pub mod type_error; +pub mod wal_errors; #[derive(Debug, PartialEq)] pub enum RRDBError { @@ -14,6 +15,7 @@ pub enum RRDBError { ParsingError(parsing_error::ParsingError), ServerError(server_error::ServerError), TypeError(type_error::TypeError), + WALError(wal_errors::WALError), } impl ToString for RRDBError { @@ -25,13 +27,14 @@ impl ToString for RRDBError { RRDBError::ParsingError(e) => e.to_string(), RRDBError::ServerError(e) => e.to_string(), RRDBError::TypeError(e) => e.to_string(), + RRDBError::WALError(e) => e.to_string(), } } } #[cfg(test)] mod tests { - use predule::{ExecuteError, IntoError, LexingError, ParsingError, ServerError, TypeError}; + use predule::{ExecuteError, IntoError, LexingError, ParsingError, ServerError, TypeError, WALError}; use super::*; @@ -54,5 +57,8 @@ mod tests { let error = TypeError::wrap("test"); assert!(error.to_string().contains("test")); + + let error = WALError::wrap("test"); + assert!(error.to_string().contains("test")); } } diff --git a/src/errors/predule.rs b/src/errors/predule.rs index 15939a08..2e224c33 100644 --- a/src/errors/predule.rs +++ b/src/errors/predule.rs @@ -4,3 +4,4 @@ pub use super::lexing_error::*; pub use super::parsing_error::*; pub use super::server_error::*; pub use super::type_error::*; +pub use super::wal_errors::*; \ No newline at end of file diff --git a/src/errors/wal_errors.rs b/src/errors/wal_errors.rs new file mode 100644 index 00000000..ee99dced --- /dev/null +++ b/src/errors/wal_errors.rs @@ -0,0 +1,55 @@ +use super::RRDBError; + +#[derive(Debug)] +pub struct WALError { + pub message: String, + pub backtrace: std::backtrace::Backtrace, +} + +impl PartialEq for WALError { + fn eq(&self, other: &Self) -> bool { + self.message == other.message + } +} + +impl WALError { + pub fn wrap(message: T) -> RRDBError { + RRDBError::WALError(Self { + message: message.to_string(), + backtrace: std::backtrace::Backtrace::capture(), + }) + } +} + +impl std::error::Error for WALError {} + +impl std::fmt::Display for WALError { + fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(formatter, "wal error: {}", self.message) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_wal_error_eq() { + let error1 = WALError::wrap("test"); + let error2 = WALError::wrap("test"); + assert_eq!(error1, error2); + } + + #[test] + fn test_wal_error_display() { + let error = WALError::wrap("test"); + + assert_eq!(error.to_string(), "wal error: test"); + } + + #[test] + fn test_wal_error_wrap() { + let error = WALError::wrap("test"); + assert_eq!(error.to_string(), "wal error: test"); + } +} diff --git a/src/executor/config/global.rs b/src/executor/config/global.rs index e3844199..d13ddc49 100644 --- a/src/executor/config/global.rs +++ b/src/executor/config/global.rs @@ -2,13 +2,18 @@ use std::path::PathBuf; use serde::{Deserialize, Serialize}; -use crate::constants::{DEFAULT_CONFIG_BASEPATH, DEFAULT_CONFIG_FILENAME, DEFAULT_DATA_DIRNAME}; +use crate::constants::{DEFAULT_CONFIG_BASEPATH, DEFAULT_CONFIG_FILENAME, DEFAULT_DATA_DIRNAME, DEFAULT_WAL_DIRNAME, DEFAULT_WAL_EXTENSION}; #[derive(Deserialize, Serialize, Debug, Clone)] pub struct GlobalConfig { pub port: u32, pub host: String, pub data_directory: String, + + pub wal_enabled: bool, + pub wal_directory: String, + pub wal_segment_size: u32, + pub wal_extension: String, } #[allow(clippy::derivable_impls)] @@ -24,6 +29,14 @@ impl std::default::Default for GlobalConfig { .to_str() .unwrap() .to_string(), + wal_enabled: true, + wal_directory: base_path + .join(DEFAULT_WAL_DIRNAME) + .to_str() + .unwrap() + .to_string(), + wal_segment_size: 1024 * 1024 * 16, // 16MB 세그먼트 사이즈 + wal_extension: DEFAULT_WAL_EXTENSION.to_string(), } } } diff --git a/src/executor/executor.rs b/src/executor/executor.rs index 7346db73..20b6aee9 100644 --- a/src/executor/executor.rs +++ b/src/executor/executor.rs @@ -5,6 +5,8 @@ use crate::errors::execute_error::ExecuteError; use crate::errors::RRDBError; use crate::executor::predule::ExecuteResult; use crate::logger::predule::Logger; +use crate::wal::endec::BitcodeEncoder; +use crate::wal::manager::WALManager; use super::config::global::GlobalConfig; use super::mocking::{CommandRunner, FileSystem, RealCommandRunner, RealFileSystem}; @@ -28,10 +30,13 @@ impl Executor { pub async fn process_query( &self, statement: SQLStatement, + wal_manager: Arc>, _connection_id: String, ) -> Result { Logger::info(format!("AST echo: {:?}", statement)); + // TODO: WAL 로깅 추가 + // 쿼리 실행 let result = match statement { SQLStatement::DDL(DDLStatement::CreateDatabaseQuery(query)) => { diff --git a/src/executor/initializer.rs b/src/executor/initializer.rs index 60d88765..2d04a822 100644 --- a/src/executor/initializer.rs +++ b/src/executor/initializer.rs @@ -26,10 +26,13 @@ impl Executor { // 3. 데이터 디렉터리 생성 (없다면) self.create_data_directory_if_not_exists().await?; - // 4. 데몬 설정파일 생성 (없다면) + // 4. WAL 디렉터리 생성 (없다면) + self.create_wal_directory_if_not_exists().await?; + + // 5. 데몬 설정파일 생성 (없다면) self.create_daemon_config_if_not_exists().await?; - // 5. 데몬 실행 + // 6. 데몬 실행 self.start_daemon().await?; Ok(()) @@ -96,6 +99,18 @@ impl Executor { Ok(()) } + async fn create_wal_directory_if_not_exists(&self) -> Result<(), RRDBError> { + let wal_path = self.config.wal_directory.clone(); + + if let Err(error) = self.file_system.create_dir(&wal_path).await { + if error.kind() != std::io::ErrorKind::AlreadyExists { + return Err(ExecuteError::wrap(error.to_string())); + } + } + + Ok(()) + } + #[cfg(target_os = "linux")] async fn create_daemon_config_if_not_exists(&self) -> Result<(), RRDBError> { use crate::constants::SYSTEMD_DAEMON_SCRIPT; @@ -188,9 +203,9 @@ mod tests { async fn test_init_config() { use mockall::predicate::eq; - use crate::executor::mocking::{ + use crate::{constants::{DEFAULT_DATA_DIRNAME, DEFAULT_WAL_DIRNAME}, executor::mocking::{ CommandRunner, FileSystem, MockCommandRunner, MockFileSystem, - }; + }}; use super::*; use std::sync::Arc; @@ -198,6 +213,10 @@ mod tests { const CONFIG: &[u8] = br##"port = 22208 host = "0.0.0.0" data_directory = "/var/lib/rrdb/data" +wal_enabled = true +wal_directory = "/var/lib/rrdb/wal" +wal_segment_size = 16777216 +wal_extension = "log" "##; use crate::constants::SYSTEMD_DAEMON_SCRIPT; @@ -240,10 +259,16 @@ data_directory = "/var/lib/rrdb/data" // 3. 데이터 디렉터리 생성 mock.expect_create_dir() .times(1) - .with(eq(DEFAULT_CONFIG_BASEPATH.to_owned() + "/data")) + .with(eq(DEFAULT_CONFIG_BASEPATH.to_owned() + "/" + DEFAULT_DATA_DIRNAME)) + .returning(|_| Ok(())); + + // 4. WAL 디렉터리 생성 + mock.expect_create_dir() + .times(1) + .with(eq(DEFAULT_CONFIG_BASEPATH.to_owned() + "/" + DEFAULT_WAL_DIRNAME)) .returning(|_| Ok(())); - // 4. 데몬 설정파일 생성 + // 5. 데몬 설정파일 생성 mock.expect_write_file() .times(1) .with( @@ -294,10 +319,15 @@ data_directory = "/var/lib/rrdb/data" // 3. 데이터 디렉터리 생성 mock.expect_create_dir() - .with(eq(DEFAULT_CONFIG_BASEPATH.to_owned() + "/data")) + .with(eq(DEFAULT_CONFIG_BASEPATH.to_owned() + "/" + DEFAULT_DATA_DIRNAME)) .returning(|_| Ok(())); - // 4. 데몬 설정파일 생성 + // 4. WAL 디렉터리 생성 + mock.expect_create_dir() + .with(eq(DEFAULT_CONFIG_BASEPATH.to_owned() + "/" + DEFAULT_WAL_DIRNAME)) + .returning(|_| Ok(())); + + // 5. 데몬 설정파일 생성 mock.expect_write_file() .with( eq("/etc/systemd/system/rrdb.service"), @@ -316,6 +346,52 @@ data_directory = "/var/lib/rrdb/data" Arc::new(mock) }), }, + TestCase { + name: "WAL 디렉터리 생성 실패", + want_error: true, + mock_config: Box::new(|| { + let config = GlobalConfig::default(); + + Arc::new(config) + }), + mock_file_system: Box::new(move || { + let mut mock = MockFileSystem::new(); + + // 1. 최상위 디렉터리 생성 + mock.expect_create_dir() + .times(1) + .with(eq(DEFAULT_CONFIG_BASEPATH)) + .returning(|_| Ok(())); + + // 2. 전역 설정파일 생성 + mock.expect_write_file() + .times(1) + .with( + eq(DEFAULT_CONFIG_BASEPATH.to_owned() + "/" + DEFAULT_CONFIG_FILENAME), + eq(CONFIG), + ) + .returning(|_, _| Ok(())); + + // 3. 데이터 디렉터리 생성 + mock.expect_create_dir() + .times(1) + .with(eq(DEFAULT_CONFIG_BASEPATH.to_owned() + "/" + DEFAULT_DATA_DIRNAME)) + .returning(|_| Ok(())); + + // 4. WAL 디렉터리 생성 + mock.expect_create_dir() + .times(1) + .with(eq(DEFAULT_CONFIG_BASEPATH.to_owned() + "/" + DEFAULT_WAL_DIRNAME)) + .returning(|_| Err(Error::from_raw_os_error(1))); + + Arc::new(mock) + }), + mock_command_runner: Box::new(|| { + let mock = MockCommandRunner::new(); + + Arc::new(mock) + }), + }, TestCase { name: "데몬 설정파일 생성 실패", want_error: true, @@ -348,7 +424,13 @@ data_directory = "/var/lib/rrdb/data" .with(eq(DEFAULT_CONFIG_BASEPATH.to_owned() + "/data")) .returning(|_| Ok(())); - // 4. 데몬 설정파일 생성 + // 4. WAL 디렉터리 생성 + mock.expect_create_dir() + .times(1) + .with(eq(DEFAULT_CONFIG_BASEPATH.to_owned() + "/" + DEFAULT_WAL_DIRNAME)) + .returning(|_| Ok(())); + + // 5. 데몬 설정파일 생성 mock.expect_write_file() .times(1) .with( @@ -397,7 +479,7 @@ data_directory = "/var/lib/rrdb/data" // 3. 데이터 디렉터리 생성 mock.expect_create_dir() .times(1) - .with(eq(DEFAULT_CONFIG_BASEPATH.to_owned() + "/data")) + .with(eq(DEFAULT_CONFIG_BASEPATH.to_owned() + "/" + DEFAULT_DATA_DIRNAME)) .returning(|_| Err(Error::from_raw_os_error(1))); Arc::new(mock) diff --git a/src/main.rs b/src/main.rs index d55cf49e..146b0c81 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,6 +10,7 @@ pub mod parser; pub mod pgwire; pub mod server; pub mod utils; +pub mod wal; use std::sync::Arc; diff --git a/src/server/server.rs b/src/server/server.rs index 4e7e4c1d..cd0990df 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -8,6 +8,8 @@ use crate::logger::predule::Logger; use crate::pgwire::predule::Connection; use crate::server::channel::ChannelResponse; use crate::server::predule::{ChannelRequest, SharedState}; +use crate::wal::endec::{BitcodeDecoder, BitcodeEncoder}; +use crate::wal::manager::WALBuilder; use futures::future::join_all; use tokio::net::TcpListener; @@ -31,21 +33,33 @@ impl Server { pub async fn run(&self) -> Result<(), RRDBError> { // TODO: 인덱스 로딩 등 기본 로직 실행. + // WAL 관리자 생성 + let encoder = BitcodeEncoder::new(); + let decoder = BitcodeDecoder::new(); + let wal_manager = Arc::new( + WALBuilder::new(&self.config, decoder) + .build(encoder) + .await + .map_err(|error| ExecuteError::wrap(error.to_string()))?, + ); + let (request_sender, mut request_receiver) = mpsc::channel::(1000); // background task // 쿼리 실행 요청을 전달받음 let config = self.config.clone(); + let wal_manager_cloned = wal_manager.clone(); let background_task = tokio::spawn(async move { while let Some(request) = request_receiver.recv().await { let config = config.clone(); + let wal_manager_cloned = wal_manager_cloned.clone(); // 쿼리 실행 태스크 tokio::spawn(async move { let executor = Executor::new(config); let result = executor - .process_query(request.statement, request.connection_id) + .process_query(request.statement, wal_manager_cloned, request.connection_id) .await; match result { diff --git a/src/wal/endec.rs b/src/wal/endec.rs new file mode 100644 index 00000000..da789d8d --- /dev/null +++ b/src/wal/endec.rs @@ -0,0 +1,40 @@ +use crate::errors::{predule::WALError, RRDBError}; + +use super::types::WALEntry; + +pub trait WALEncoder: Clone { + fn encode(&self, entry: &T) -> Result, RRDBError>; +} + +pub trait WALDecoder: Clone { + fn decode(&self, data: &[u8]) -> Result; +} + + +#[derive(Clone)] +pub struct BitcodeEncoder {} +impl BitcodeEncoder { + pub fn new() -> Self { + Self {} + } +} + +impl WALEncoder> for BitcodeEncoder { + fn encode(&self, entry: &Vec) -> Result, RRDBError> { + Ok(bitcode::encode(entry)) + } +} + +#[derive(Clone)] +pub struct BitcodeDecoder {} +impl BitcodeDecoder { + pub fn new() -> Self { + Self {} + } +} + +impl WALDecoder> for BitcodeDecoder { + fn decode(&self, data: &[u8]) -> Result, RRDBError> { + Ok(bitcode::decode(data).map_err(|e| WALError::wrap(e.to_string()))?) + } +} diff --git a/src/wal/manager.rs b/src/wal/manager.rs new file mode 100644 index 00000000..51ca635e --- /dev/null +++ b/src/wal/manager.rs @@ -0,0 +1,182 @@ +use std::time::SystemTime; +#[allow(dead_code)] +#[allow(unused_variables)] +#[allow(unused_assignments)] +#[allow(unused_imports)] + +use std::{fs, io::BufWriter, path::PathBuf}; + +use crate::{errors::{predule::WALError, RRDBError}, executor::config::global::GlobalConfig}; + +use super::{endec::{WALDecoder, WALEncoder}, types::{EntryType, WALEntry}}; + +#[derive(Default, Debug, Clone)] +pub struct WALManager +where + T: WALEncoder>, +{ + /// The sequence number of the WAL file + sequence: usize, + /// The buffer of the WAL file + buffers: Vec, + /// The page size of the WAL file + page_size: usize, + /// The directory of the WAL file + directory: PathBuf, + /// The extension of the WAL file + extension: String, + encoder: T, +} + +// TODO: gz 압축 구현 +// TODO: 대용량 페이지 파일 XLOG_CONTINUATION 처리 구현 +// TODO: 단순히 이름을 wal{}. 형식으로 로깅하지 말고, 체계적인 파일 관리 구현 +impl WALManager +where + T: WALEncoder>, +{ + fn new( + sequence: usize, + entries: Vec, + page_size: usize, + directory: PathBuf, + extension: String, + encoder: T, + ) -> Self { + Self { + sequence, + buffers: entries, + page_size, + directory, + extension, + encoder, + } + } + + pub fn append(&mut self, entry: WALEntry) -> Result<(), RRDBError> { + self.buffers.push(entry); + + self.check_and_mark()?; + Ok(()) + } + + fn check_and_mark(&mut self) -> Result<(), RRDBError> { + let size = self.buffers.iter().map(|entry| entry.size()).sum::(); + + if size > self.page_size { + self.checkpoint()?; + + self.buffers.clear(); + self.sequence += 1; + } + + Ok(()) + } + + fn save_to_file(&mut self) -> Result<(), RRDBError> { + let path = self.directory.join(format!("{}.{}", self.sequence, self.extension)); + + let encoded = self.encoder.encode(&self.buffers)?; + + fs::write(&path, encoded).map_err(|e| WALError::wrap(e.to_string()))?; + + // fsync 디스크 동기화 보장 + let file = fs::OpenOptions::new() + .write(true) + .open(path) + .map_err(|e| WALError::wrap(e.to_string()))?; + file.sync_all().map_err(|e| WALError::wrap(e.to_string()))?; + + Ok(()) + } + + fn checkpoint(&mut self) -> Result<(), RRDBError> { + self.buffers.push(WALEntry { + data: None, + entry_type: EntryType::Checkpoint, + timestamp: Self::get_current_secs()?, + transaction_id: None, + }); + self.save_to_file()?; + + Ok(()) + } + + pub fn flush(&mut self) -> Result<(), RRDBError> { + self.checkpoint()?; + Ok(()) + } + + fn get_current_secs() -> Result { + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .map_err(|e| WALError::wrap(e.to_string())) + .map(|duration| duration.as_millis()) + } +} + + +pub struct WALBuilder<'a, T> +where + T: WALDecoder>, +{ + config: &'a GlobalConfig, + decoder: T, +} + +impl<'a, T> WALBuilder<'a, T> +where + T: WALDecoder>, +{ + pub fn new(config: &'a GlobalConfig, decoder: T) -> Self { + Self { + config, + decoder, + } + } + + pub async fn build(&self, encoder: D) -> Result, RRDBError> + where + D: WALEncoder>, + { + let (sequence, entries) = self.load_data().await?; + + Ok(WALManager::new( + sequence, + entries, + self.config.wal_segment_size as usize, + PathBuf::from(self.config.wal_directory.clone()), + self.config.wal_extension.to_string(), + encoder, + )) + } + + async fn load_data(&self) -> Result<(usize, Vec), RRDBError> { + let mut sequence = 1; + + // get all log file entry + let logs = std::fs::read_dir(&self.config.wal_directory) + .map_err(|e| WALError::wrap(e.to_string()))? + .filter_map(|entry| entry.ok()) + .filter(|entry| entry.path().extension() == Some(self.config.wal_extension.as_ref())) + .collect::>(); + + let mut entries = Vec::new(); + + if let Some(last_log) = logs.last() { + sequence = logs.len(); + + let content = std::fs::read(last_log.path()) + .map_err(|e| WALError::wrap(e.to_string()))?; + let saved_entries: Vec = self.decoder.decode(&content)?; + + match saved_entries.last() { + Some(entry) + if matches!(entry.entry_type, EntryType::Checkpoint) => entries = saved_entries, + _ => (), + } + } + + Ok((sequence, entries)) + } +} diff --git a/src/wal/mod.rs b/src/wal/mod.rs new file mode 100644 index 00000000..cd93f3d5 --- /dev/null +++ b/src/wal/mod.rs @@ -0,0 +1,3 @@ +pub mod manager; +pub mod types; +pub mod endec; \ No newline at end of file diff --git a/src/wal/types.rs b/src/wal/types.rs new file mode 100644 index 00000000..6ac9ce5b --- /dev/null +++ b/src/wal/types.rs @@ -0,0 +1,28 @@ +use bitcode::{Decode, Encode}; + +#[derive(Default, Clone, Debug, Encode, Decode)] +pub struct WALEntry { + pub entry_type: EntryType, + pub data: Option>, + pub timestamp: u128, + pub transaction_id: Option, +} + +impl WALEntry { + pub fn size(&self) -> usize { + let data_size = self.data.as_ref().map_or(0, |data| data.len()); + size_of::() + size_of::() + size_of::>() + data_size + } +} + +#[derive(Default, Clone, Debug, Encode, Decode)] +pub enum EntryType { + #[default] + Insert, + Set, + Delete, + Checkpoint, + + TransactionBegin, + TransactionCommit, +}