Skip to content

Commit

Permalink
increased write buf size for FUSE
Browse files Browse the repository at this point in the history
debug print ret values for FUSE operations
fix #12 "copy the rest of the file from write pos only on release handle"
  • Loading branch information
radumarias committed Apr 27, 2024
1 parent 47afeac commit cb78d26
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 76 deletions.
109 changes: 56 additions & 53 deletions src/encryptedfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use secrecy::{ExposeSecret, SecretString, SecretVec};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use strum_macros::{Display, EnumIter, EnumString};
use thiserror::Error;
use tracing::{error, instrument};
use tracing::{error, instrument, warn};
use weak_table::WeakValueHashMap;

#[cfg(test)]
Expand Down Expand Up @@ -190,7 +190,6 @@ impl PartialEq for DirectoryEntry {
fn eq(&self, other: &Self) -> bool {
self.ino == other.ino && self.name.expose_secret() == other.name.expose_secret() && self.kind == other.kind
}

}

/// Like [`DirectoryEntry`] but with ['FileAttr'].
Expand Down Expand Up @@ -711,12 +710,17 @@ impl EncryptedFs {
decryptor.finish();
valid_fh = true;
}
if let Some((attr, path, _, encryptor, _)) = self.write_handles.remove(&handle) {
if let Some((attr, path, mut position, mut encryptor, _)) = self.write_handles.remove(&handle) {
// write attr only here to avoid serializing it multiple times while writing
// merge time fields with existing data because it might got change while we kept the handle
let mut attr_0 = self.get_inode(attr.ino)?;
merge_attr_time_and_set_size_obj(&mut attr_0, &attr);
self.write_inode(&attr_0)?;
// if position is before file end we copy the rest of the file from position to the end
if position < attr.size {
let ino_str = attr.ino.to_string();
Self::copy_remaining_of_file(&attr, &mut position, &mut encryptor, &self.cipher, &self.key, self.data_dir.join(CONTENTS_DIR).join(ino_str))?;
}
encryptor.finish()?;
// if we are in tmp file move it to actual file
if path.to_str().unwrap().ends_with(".tmp") {
Expand Down Expand Up @@ -776,18 +780,24 @@ impl EncryptedFs {
let binding = self.get_inode_lock(ino);
let _guard = binding.write().unwrap();

let mut decryptor: Option<Decryptor<File>> = None;

let (_, _, position, _, _) = self.write_handles.get_mut(&handle).unwrap();
let (attr, _, position, _, _) = self.write_handles.get_mut(&handle).unwrap();
if *position != offset {
// in order to seek we need to recreate all stream from the beginning until the desired position of file size
// for that we create a new encryptor into a tmp file reading from original file and writing to tmp one
// when we release the handle we will move this tmp file to the actual file

let mut pos = *position;
let ino_str = attr.ino.to_string();

// remove handle data because we will replace it with the tmp one
let (attr, path, _, encryptor, lock) =
let (attr, path, _, mut encryptor, lock) =
self.write_handles.remove(&handle).unwrap();

// if position is before file end we copy the rest of the file from position to the end
if pos < attr.size {
Self::copy_remaining_of_file(&attr, &mut pos, &mut encryptor, &self.cipher, &self.key, self.data_dir.join(CONTENTS_DIR).join(ino_str))?;
}

// finish the current writer so we flush all data to the file
encryptor.finish()?;

Expand Down Expand Up @@ -833,8 +843,6 @@ impl EncryptedFs {
}
}
self.replace_handle_data(handle, attr, tmp_path, position, encryptor, lock.clone());

decryptor = Some(decryptor2);
}
let (attr, _, position, encryptor, _) =
self.write_handles.get_mut(&handle).unwrap();
Expand All @@ -856,54 +864,44 @@ impl EncryptedFs {
encryptor.write_all(buf)?;
*position += buf.len() as u64;

// todo: move this to when we release the handle so if we get several writes on block after another before file end
// todo: we don't keep recreating the writer
// todo: but we need to be careful because we need to update the size of the file only when we release the handle
// if position is before file end we copy the rest of the file from position to the end
if *position < attr.size {
let mut decryptor = if let Some(mut decryptor) = decryptor {
// reuse the existing decryptor
// skip written bytes
decryptor.read_exact(vec![0_u8; buf.len()].as_mut_slice())?;
decryptor
} else {
// create a new decryptor by reading from the beginning of the file
let mut decryptor = crypto_util::create_decryptor(OpenOptions::new().read(true).open(self.data_dir.join(CONTENTS_DIR).join(attr.ino.to_string()))?,
&self.cipher, &self.key);
// move read position to the desired position
let mut buffer: [u8; 4096] = [0; 4096];
let mut read_pos = 0u64;
loop {
let len = min(4096, *position - read_pos) as usize;
decryptor.read_exact(&mut buffer[..len])?;
read_pos += len as u64;
if read_pos == *position {
break;
}
}
if *position > attr.size {
// if we write pass file size set the new size
attr.size = *position;
}
attr.mtime = SystemTime::now();
attr.ctime = SystemTime::now();

decryptor
};
Ok(())
}

// copy the rest of the file
let mut buffer: [u8; 4096] = [0; 4096];
loop {
let len = min(4096, attr.size - *position) as usize;
decryptor.read_exact(&mut buffer[..len])?;
encryptor.write_all(&buffer[..len])?;
*position += len as u64;
if *position == attr.size {
break;
}
fn copy_remaining_of_file(attr: &TimeAndSizeFileAttr, position: &mut u64, encryptor: &mut Encryptor<File>, cipher: &Cipher, key: &SecretVec<u8>, file: PathBuf) -> Result<(), FsError> {
warn!("copy_remaining_of_file");
// create a new decryptor by reading from the beginning of the file
let mut decryptor = crypto_util::create_decryptor(OpenOptions::new().read(true).open(file)?, cipher, key);
// move read position to the desired position
let mut buffer: [u8; 4096] = [0; 4096];
let mut read_pos = 0u64;
loop {
let len = min(4096, *position - read_pos) as usize;
decryptor.read_exact(&mut buffer[..len])?;
read_pos += len as u64;
if read_pos == *position {
break;
}
decryptor.finish();
}

let size = *position;
attr.size = size;
attr.mtime = SystemTime::now();
attr.ctime = SystemTime::now();

// copy the rest of the file
let mut buffer: [u8; 4096] = [0; 4096];
loop {
let len = min(4096, attr.size - *position) as usize;
decryptor.read_exact(&mut buffer[..len])?;
encryptor.write_all(&buffer[..len])?;
*position += len as u64;
if *position == attr.size {
break;
}
}
decryptor.finish();
Ok(())
}

Expand Down Expand Up @@ -1076,7 +1074,12 @@ impl EncryptedFs {
continue;
}
}
if let Some((attr, path, _, encryptor, _)) = self.write_handles.remove(&key) {
if let Some((attr, path, mut position, mut encryptor, _)) = self.write_handles.remove(&key) {
// if position is before file end we copy the rest of the file from position to the end
if position < attr.size {
let ino_str = attr.ino.to_string();
Self::copy_remaining_of_file(&attr, &mut position, &mut encryptor, &self.cipher, &self.key, self.data_dir.join(CONTENTS_DIR).join(ino_str))?;
}
encryptor.finish()?;
// if we are in tmp file move it to actual file
if path.to_str().unwrap().ends_with(".tmp") {
Expand Down
47 changes: 24 additions & 23 deletions src/encryptedfs_fuse3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl EncryptedFsFuse3 {
}
}

#[instrument(skip(self, name), fields(name = name.to_str().unwrap()), err(level = Level::DEBUG))]
#[instrument(skip(self, name), fields(name = name.to_str().unwrap()), err(level = Level::DEBUG), ret(level = Level::DEBUG))]
fn create_nod(&self, parent: u64, mut mode: u32, req: &Request, name: &OsStr, read: bool, write: bool) -> std::result::Result<(u64, FileAttr), c_int> {
let parent_attr = match self.get_fs().borrow_mut().get_inode(parent) {
Err(err) => {
Expand Down Expand Up @@ -233,7 +233,7 @@ fn from_attr(from: FileAttr) -> fuse3::raw::prelude::FileAttr {
}

impl Filesystem for EncryptedFsFuse3 {
#[instrument(skip(self), err(level = Level::DEBUG))]
#[instrument(skip(self), err(level = Level::DEBUG), ret(level = Level::DEBUG))]
async fn init(&self, _req: Request) -> Result<ReplyInit> {
trace!("");

Expand All @@ -247,7 +247,7 @@ impl Filesystem for EncryptedFsFuse3 {
trace!("");
}

#[instrument(skip(self, name), fields(name = name.to_str().unwrap()), err(level = Level::DEBUG))]
#[instrument(skip(self, name), fields(name = name.to_str().unwrap()), err(level = Level::DEBUG), ret(level = Level::DEBUG))]
async fn lookup(&self, req: Request, parent: u64, name: &OsStr) -> Result<ReplyEntry> {
trace!("");

Expand Down Expand Up @@ -305,7 +305,7 @@ impl Filesystem for EncryptedFsFuse3 {
trace!("");
}

#[instrument(skip(self), err(level = Level::DEBUG))]
#[instrument(skip(self), err(level = Level::DEBUG), ret(level = Level::DEBUG))]
async fn getattr(
&self,
_req: Request,
Expand Down Expand Up @@ -334,7 +334,7 @@ impl Filesystem for EncryptedFsFuse3 {
}
}

#[instrument(skip(self), err(level = Level::DEBUG))]
#[instrument(skip(self), err(level = Level::DEBUG), ret(level = Level::DEBUG))]
async fn setattr(
&self,
req: Request,
Expand Down Expand Up @@ -488,7 +488,7 @@ impl Filesystem for EncryptedFsFuse3 {
})
}

#[instrument(skip(self, name), fields(name = name.to_str().unwrap()), err(level = Level::DEBUG))]
#[instrument(skip(self, name), fields(name = name.to_str().unwrap()), err(level = Level::DEBUG), ret(level = Level::DEBUG))]
async fn mknod(
&self,
req: Request,
Expand Down Expand Up @@ -527,7 +527,7 @@ impl Filesystem for EncryptedFsFuse3 {
}
}

#[instrument(skip(self, name), fields(name = name.to_str().unwrap()), err(level = Level::DEBUG))]
#[instrument(skip(self, name), fields(name = name.to_str().unwrap()), err(level = Level::DEBUG), ret(level = Level::DEBUG))]
async fn mkdir(
&self,
req: Request,
Expand Down Expand Up @@ -591,7 +591,7 @@ impl Filesystem for EncryptedFsFuse3 {
}
}

#[instrument(skip(self, name), fields(name = name.to_str().unwrap()), err(level = Level::DEBUG))]
#[instrument(skip(self, name), fields(name = name.to_str().unwrap()), err(level = Level::DEBUG), ret(level = Level::DEBUG))]
async fn unlink(&self, req: Request, parent: Inode, name: &OsStr) -> Result<()> {
trace!("");

Expand Down Expand Up @@ -642,7 +642,7 @@ impl Filesystem for EncryptedFsFuse3 {
}


#[instrument(skip(self, name), fields(name = name.to_str().unwrap()), err(level = Level::DEBUG))]
#[instrument(skip(self, name), fields(name = name.to_str().unwrap()), err(level = Level::DEBUG), ret(level = Level::DEBUG))]
async fn rmdir(&self, req: Request, parent: Inode, name: &OsStr) -> Result<()> {
trace!("");

Expand Down Expand Up @@ -692,7 +692,7 @@ impl Filesystem for EncryptedFsFuse3 {
Ok(())
}

#[instrument(skip(self, name, new_name), fields(name = name.to_str().unwrap(), new_name = new_name.to_str().unwrap()), err(level = Level::DEBUG))]
#[instrument(skip(self, name, new_name), fields(name = name.to_str().unwrap(), new_name = new_name.to_str().unwrap()), err(level = Level::DEBUG), ret(level = Level::DEBUG))]
async fn rename(
&self,
req: Request,
Expand Down Expand Up @@ -786,7 +786,7 @@ impl Filesystem for EncryptedFsFuse3 {
}
}

#[instrument(skip(self), err(level = Level::DEBUG))]
#[instrument(skip(self), err(level = Level::DEBUG), ret(level = Level::DEBUG))]
async fn open(&self, req: Request, inode: Inode, flags: u32) -> Result<ReplyOpen> {
trace!("");

Expand Down Expand Up @@ -861,7 +861,7 @@ impl Filesystem for EncryptedFsFuse3 {
}
}

#[instrument(skip(self, data), err(level = Level::DEBUG))]
#[instrument(skip(self, data), err(level = Level::DEBUG), ret(level = Level::DEBUG))]
async fn write(
&self,
_req: Request,
Expand All @@ -876,24 +876,24 @@ impl Filesystem for EncryptedFsFuse3 {
trace!("");
debug!(size = data.len());

if let Err(err) = self.get_fs().borrow_mut().write_all(inode, offset, data, fh) {
self.get_fs().borrow_mut().write_all(inode, offset, data, fh).map_err(|err| {
error!(err = %err);
return Err(EIO.into());
}
EIO
})?;

Ok(ReplyWrite {
written: data.len() as u32,
})
}

#[instrument(skip(self), err(level = Level::DEBUG))]
#[instrument(skip(self), err(level = Level::DEBUG), ret(level = Level::DEBUG))]
async fn statfs(&self, _req: Request, inode: u64) -> Result<ReplyStatFs> {
trace!("");
warn!("implementation is a stub");
Ok(STATFS)
}

#[instrument(skip(self), err(level = Level::DEBUG))]
#[instrument(skip(self), err(level = Level::DEBUG), ret(level = Level::DEBUG))]
async fn release(
&self,
req: Request,
Expand Down Expand Up @@ -933,7 +933,7 @@ impl Filesystem for EncryptedFsFuse3 {
Ok(())
}

#[instrument(skip(self), err(level = Level::DEBUG))]
#[instrument(skip(self), err(level = Level::DEBUG), ret(level = Level::DEBUG))]
async fn flush(&self, req: Request, inode: Inode, fh: u64, lock_owner: u64) -> Result<()> {
trace!("");

Expand All @@ -945,7 +945,7 @@ impl Filesystem for EncryptedFsFuse3 {
Ok(())
}

#[instrument(skip(self), err(level = Level::DEBUG))]
#[instrument(skip(self), err(level = Level::DEBUG), ret(level = Level::DEBUG))]
async fn opendir(&self, req: Request, inode: Inode, flags: u32) -> Result<ReplyOpen> {
trace!("");

Expand Down Expand Up @@ -1017,14 +1017,14 @@ impl Filesystem for EncryptedFsFuse3 {
})
}

#[instrument(skip(self), err(level = Level::DEBUG))]
#[instrument(skip(self), err(level = Level::DEBUG), ret(level = Level::DEBUG))]
async fn releasedir(&self, req: Request, inode: Inode, fh: u64, flags: u32) -> Result<()> {
trace!("");

Ok(())
}

#[instrument(skip(self), err(level = Level::DEBUG))]
#[instrument(skip(self), err(level = Level::DEBUG), ret(level = Level::DEBUG))]
async fn access(&self, req: Request, inode: u64, mask: u32) -> Result<()> {
trace!("");

Expand All @@ -1040,7 +1040,7 @@ impl Filesystem for EncryptedFsFuse3 {
}
}

#[instrument(skip(self, name), fields(name = name.to_str().unwrap()), err(level = Level::DEBUG))]
#[instrument(skip(self, name), fields(name = name.to_str().unwrap()), err(level = Level::DEBUG), ret(level = Level::DEBUG))]
async fn create(
&self,
req: Request,
Expand Down Expand Up @@ -1083,6 +1083,7 @@ impl Filesystem for EncryptedFsFuse3 {

type DirEntryPlusStream<'a> = Iter<Skip<DirectoryEntryPlusIterator>> where Self: 'a;

#[instrument(skip(self), err(level = Level::DEBUG))]
async fn readdirplus(
&self,
_req: Request,
Expand All @@ -1107,7 +1108,7 @@ impl Filesystem for EncryptedFsFuse3 {
})
}

#[instrument(skip(self), err(level = Level::DEBUG))]
#[instrument(skip(self), err(level = Level::DEBUG), ret(level = Level::DEBUG))]
async fn copy_file_range(
&self,
req: Request,
Expand Down

0 comments on commit cb78d26

Please sign in to comment.