Skip to content

Commit

Permalink
Merge pull request #3269 from danielSanchezQ/fragment-logs-iterators
Browse files Browse the repository at this point in the history
  • Loading branch information
eugene-babichenko authored May 7, 2021
2 parents fb0dc3b + 212391f commit 6f2e354
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 1 deletion.
1 change: 1 addition & 0 deletions jormungandr-lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ edition = "2018"
description = "Data structures and formats used by Jormungandr node API and configuration files"

[dependencies]
bincode = "1.3"
serde = { version = "1.0", features = ["derive"] }
serde_with = { version = "1.8", features = ["macros"] }
chain-impl-mockchain = { git = "https://github.com/input-output-hk/chain-libs.git", branch = "master" }
Expand Down
109 changes: 109 additions & 0 deletions jormungandr-lib/src/interfaces/fragment_log_persistent.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,24 @@
use std::fs;
use std::io;
use std::io::BufReader;
use std::path::{Path, PathBuf};

use crate::interfaces::FragmentDef;
use crate::time::SecondsSinceUnixEpoch;

use chain_impl_mockchain::fragment::Fragment;

use bincode::Options;
use serde::{Deserialize, Serialize};

#[derive(thiserror::Error, Debug)]
#[error("Couldn't deserialize entry {entry} in {file} due to: {cause}")]
pub struct DeserializeError {
file: String,
entry: usize,
cause: bincode::Error,
}

/// Represents a persistent fragments log entry.
#[derive(Debug, Serialize, Deserialize)]
pub struct PersistentFragmentLog {
Expand All @@ -14,3 +28,98 @@ pub struct PersistentFragmentLog {
#[serde(with = "FragmentDef")]
pub fragment: Fragment,
}

pub struct FileFragments {
reader: BufReader<fs::File>,
file_path: PathBuf,
}

pub struct FileFragmentsIterator {
reader: BufReader<fs::File>,
file_path: PathBuf,
counter: usize,
}

impl FileFragments {
pub fn from_path(
file_path: PathBuf,
) -> std::io::Result<Box<dyn Iterator<Item = Result<PersistentFragmentLog, DeserializeError>>>>
{
let metadata = fs::metadata(file_path.clone())?;
if metadata.len() == 0 {
return Ok(Box::new(vec![].into_iter()));
}
let reader = BufReader::new(fs::File::open(file_path.clone())?);
Ok(Box::new(Self { reader, file_path }.into_iter()))
}
}

impl IntoIterator for FileFragments {
type Item = Result<PersistentFragmentLog, DeserializeError>;
type IntoIter = FileFragmentsIterator;

fn into_iter(self) -> Self::IntoIter {
FileFragmentsIterator {
reader: self.reader,
file_path: self.file_path,
counter: 0,
}
}
}

impl Iterator for FileFragmentsIterator {
type Item = Result<PersistentFragmentLog, DeserializeError>;

fn next(&mut self) -> Option<Self::Item> {
// EOF reached when buffer is empty.
// Then we stop the iterator. File is guaranteed to be non-empty by construction, the check
// cannot be done till buffer is filled, hence first time should be able to read at least something.
if self.reader.buffer().is_empty() && self.counter != 0 {
return None;
}
let codec = bincode::DefaultOptions::new()
.with_fixint_encoding()
.allow_trailing_bytes();

let result = codec
.deserialize_from(&mut self.reader)
.map_err(|cause| DeserializeError {
file: self.file_path.to_string_lossy().to_string(),
entry: self.counter,
cause,
});
self.counter += 1;
Some(result)
}
}

pub fn list_persistent_fragment_log_files_from_folder_path(
folder: &Path,
) -> io::Result<impl Iterator<Item = PathBuf>> {
let mut entries: Vec<_> = fs::read_dir(folder)?
.filter_map(|entry| match entry {
Ok(entry) => Some(folder.join(entry.path())),
_ => None,
})
.collect();
entries.sort();
Ok(entries.into_iter())
}

pub fn read_persistent_fragment_logs_from_file_path(
entries: impl Iterator<Item = PathBuf>,
) -> io::Result<impl Iterator<Item = Result<PersistentFragmentLog, DeserializeError>>> {
let mut handles = Vec::new();
for entry in entries {
handles.push(FileFragments::from_path(entry)?);
}
Ok(handles.into_iter().flatten())
}

pub fn load_persistent_fragments_logs_from_folder_path(
folder: &Path,
) -> io::Result<impl Iterator<Item = Result<PersistentFragmentLog, DeserializeError>>> {
read_persistent_fragment_logs_from_file_path(
list_persistent_fragment_log_files_from_folder_path(folder)?,
)
}
5 changes: 4 additions & 1 deletion jormungandr-lib/src/interfaces/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ pub use self::committee::CommitteeIdDef;
pub use self::config::*;
pub use self::fragment::FragmentDef;
pub use self::fragment_log::{FragmentLog, FragmentOrigin, FragmentStatus};
pub use self::fragment_log_persistent::PersistentFragmentLog;
pub use self::fragment_log_persistent::{
load_persistent_fragments_logs_from_folder_path, read_persistent_fragment_logs_from_file_path,
DeserializeError as FragmentLogDeserializeError, FileFragments, PersistentFragmentLog,
};
pub use self::fragments_batch::FragmentsBatch;
pub use self::fragments_processing_summary::{
FragmentRejectionReason, FragmentsProcessingSummary, RejectedFragmentInfo,
Expand Down

0 comments on commit 6f2e354

Please sign in to comment.