Skip to content

Commit

Permalink
Redacting stream refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
abdolence committed Aug 8, 2024
1 parent a8cd479 commit 39eb872
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 118 deletions.
135 changes: 126 additions & 9 deletions src/commands/copy_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ use crate::filesystems::{
AbsoluteFilePath, DetectFileSystem, FileMatcher, FileMatcherResult, FileSystemConnection,
FileSystemRef,
};
use crate::redacters::{RedactSupportedOptions, Redacter, RedacterOptions, Redacters};
use crate::redacters::{
RedactSupportedOptions, Redacter, RedacterDataItem, RedacterDataItemContent, RedacterOptions,
Redacters,
};
use crate::reporter::AppReporter;
use crate::AppResult;
use console::{Style, Term};
use futures::Stream;
use futures::{Stream, TryStreamExt};
use gcloud_sdk::prost::bytes;
use indicatif::*;
use std::error::Error;
Expand Down Expand Up @@ -199,7 +202,7 @@ async fn transfer_and_redact_file<
.as_str(),
);
let transfer_result = if let Some(ref redacter) = redacter {
redact_upload_file::<SFS, DFS>(
redact_upload_file::<SFS, DFS, _>(
bar,
destination_fs,
bold_style,
Expand All @@ -219,19 +222,30 @@ async fn transfer_and_redact_file<
Ok(transfer_result)
}

async fn redact_upload_file<'a, SFS: FileSystemConnection<'a>, DFS: FileSystemConnection<'a>>(
async fn redact_upload_file<
'a,
SFS: FileSystemConnection<'a>,
DFS: FileSystemConnection<'a>,
S: Stream<Item = AppResult<bytes::Bytes>> + Send + Unpin + Sync + 'static,
>(
bar: &ProgressBar,
destination_fs: &mut DFS,
bold_style: Style,
source_reader: Box<dyn Stream<Item = AppResult<bytes::Bytes>> + Send + Sync + Unpin + 'static>,
source_reader: S,
base_resolved_file_ref: &AbsoluteFilePath,
dest_file_ref: &FileSystemRef,
redacter: &impl Redacter,
) -> AppResult<TransferFileResult> {
if redacter.redact_supported_options(dest_file_ref).await?
!= RedactSupportedOptions::Unsupported
{
match redacter.redact_stream(source_reader, dest_file_ref).await {
let redacter_supported_options = redacter.redact_supported_options(dest_file_ref).await?;
if redacter_supported_options != RedactSupportedOptions::Unsupported {
match redact_stream(
redacter,
&redacter_supported_options,
source_reader,
dest_file_ref,
)
.await
{
Ok(redacted_reader) => {
destination_fs
.upload(redacted_reader, Some(dest_file_ref))
Expand Down Expand Up @@ -286,3 +300,106 @@ async fn redact_upload_file<'a, SFS: FileSystemConnection<'a>, DFS: FileSystemCo
Ok(TransferFileResult::Skipped)
}
}

/// Buffers `input`, redacts it with `redacter`, and returns the redacted bytes as a stream.
///
/// How the input is interpreted depends on `file_ref.media_type`:
/// - text, or a table when `supported_options` is `SupportedAsText`: the bytes are
///   decoded as UTF-8 and redacted as a single string value;
/// - image: the raw bytes are redacted together with their MIME type;
/// - table: the bytes are parsed as CSV using the header and delimiter settings from
///   `redacter.options()` and redacted as headers plus rows.
///
/// The input is collected completely before redaction, and the returned stream yields
/// the whole redacted payload as a single chunk.
///
/// # Errors
///
/// Returns an error when the media type is missing or is not text/image/table, when
/// text content is not valid UTF-8, or when reading the input, parsing or writing CSV,
/// or the redacter itself fails.
async fn redact_stream<
    S: Stream<Item = AppResult<bytes::Bytes>> + Send + Unpin + Sync + 'static,
>(
    redacter: &impl Redacter,
    supported_options: &RedactSupportedOptions,
    input: S,
    file_ref: &FileSystemRef,
) -> AppResult<Box<dyn Stream<Item = AppResult<bytes::Bytes>> + Send + Sync + Unpin + 'static>> {
    // Arm order matters: a table that the redacter only supports as text must be
    // caught by the first arm before the structured CSV arm gets a chance.
    let content_to_redact = match file_ref.media_type {
        Some(ref mime)
            if Redacters::is_mime_text(mime)
                || (Redacters::is_mime_table(mime)
                    && matches!(supported_options, RedactSupportedOptions::SupportedAsText)) =>
        {
            let all_chunks: Vec<bytes::Bytes> = input.try_collect().await?;
            let all_bytes = all_chunks.concat();
            let content = String::from_utf8(all_bytes).map_err(|e| AppError::SystemError {
                message: format!("Failed to convert bytes to string: {}", e),
            })?;
            Ok(RedacterDataItem {
                content: RedacterDataItemContent::Value(content),
                file_ref: file_ref.clone(),
            })
        }
        Some(ref mime) if Redacters::is_mime_image(mime) => {
            let all_chunks: Vec<bytes::Bytes> = input.try_collect().await?;
            let all_bytes = all_chunks.concat();
            Ok(RedacterDataItem {
                content: RedacterDataItemContent::Image {
                    mime_type: mime.clone(),
                    data: all_bytes.into(),
                },
                file_ref: file_ref.clone(),
            })
        }
        Some(ref mime) if Redacters::is_mime_table(mime) => {
            // The CSV reader needs an AsyncRead, so adapt the byte stream and
            // convert its errors into io::Error.
            let reader = tokio_util::io::StreamReader::new(
                input.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)),
            );
            let mut reader = csv_async::AsyncReaderBuilder::default()
                .has_headers(!redacter.options().csv_headers_disable)
                .delimiter(
                    redacter
                        .options()
                        .csv_delimiter
                        .as_ref()
                        .cloned()
                        .unwrap_or(b','),
                )
                .create_reader(reader);
            let headers = if !redacter.options().csv_headers_disable {
                reader
                    .headers()
                    .await?
                    .into_iter()
                    .map(|h| h.to_string())
                    .collect()
            } else {
                vec![]
            };
            let records: Vec<csv_async::StringRecord> = reader.records().try_collect().await?;
            Ok(RedacterDataItem {
                content: RedacterDataItemContent::Table {
                    headers,
                    rows: records
                        .iter()
                        .map(|r| r.iter().map(|c| c.to_string()).collect())
                        .collect(),
                },
                file_ref: file_ref.clone(),
            })
        }
        Some(ref mime) => Err(AppError::SystemError {
            message: format!("Media type {} is not supported for redaction", mime),
        }),
        None => Err(AppError::SystemError {
            message: "Media type is not provided to redact".to_string(),
        }),
    }?;

    let content = redacter.redact(content_to_redact).await?;

    match content {
        RedacterDataItemContent::Value(content) => {
            let bytes = bytes::Bytes::from(content.into_bytes());
            Ok(Box::new(futures::stream::iter(vec![Ok(bytes)])))
        }
        RedacterDataItemContent::Image { data, .. } => {
            Ok(Box::new(futures::stream::iter(vec![Ok(data)])))
        }
        RedacterDataItemContent::Table { headers, rows } => {
            // Write with the same delimiter the table was parsed with, so a
            // non-default delimiter survives the round trip.
            let mut writer = csv_async::AsyncWriterBuilder::new()
                .delimiter(
                    redacter
                        .options()
                        .csv_delimiter
                        .as_ref()
                        .cloned()
                        .unwrap_or(b','),
                )
                .create_writer(vec![]);
            // Headers are empty when header handling is disabled; writing them
            // anyway would put a spurious empty record in front of the data rows.
            if !headers.is_empty() {
                writer.write_record(headers).await?;
            }
            for row in rows {
                writer.write_record(row).await?;
            }
            writer.flush().await?;
            let bytes = bytes::Bytes::from(writer.into_inner().await?);
            Ok(Box::new(futures::stream::iter(vec![Ok(bytes)])))
        }
    }
}
109 changes: 0 additions & 109 deletions src/redacters/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use crate::AppResult;
use csv_async::StringRecord;
use futures::{Stream, TryStreamExt};
use gcloud_sdk::prost::bytes;
use mime::Mime;
use std::fmt::Display;

use crate::errors::AppError;
use crate::filesystems::FileSystemRef;
use crate::reporter::AppReporter;

Expand Down Expand Up @@ -139,112 +136,6 @@ pub trait Redacter {
) -> AppResult<RedactSupportedOptions>;

fn options(&self) -> &RedacterOptions;

/// Buffers `input`, redacts it with this redacter, and returns the redacted bytes as a stream.
///
/// The supported options for `file_ref` are queried first. The input is then
/// interpreted according to `file_ref.media_type`:
/// - text, or a table when the redacter reports `SupportedAsText`: the bytes are
///   decoded as UTF-8 and redacted as a single string value;
/// - image: the raw bytes are redacted together with their MIME type;
/// - table: the bytes are parsed as CSV using the header and delimiter settings from
///   `self.options()` and redacted as headers plus rows.
///
/// The input is collected completely before redaction, and the returned stream yields
/// the whole redacted payload as a single chunk.
///
/// # Errors
///
/// Returns an error when the media type is missing or is not text/image/table, when
/// text content is not valid UTF-8, or when reading the input, parsing or writing CSV,
/// or the redaction itself fails.
async fn redact_stream<
    S: Stream<Item = AppResult<bytes::Bytes>> + Send + Unpin + Sync + 'static,
>(
    &self,
    input: S,
    file_ref: &FileSystemRef,
) -> AppResult<Box<dyn Stream<Item = AppResult<bytes::Bytes>> + Send + Sync + Unpin + 'static>>
{
    let supported_options = self.redact_supported_options(file_ref).await?;
    // Arm order matters: a table that is only supported as text must be caught
    // by the first arm before the structured CSV arm gets a chance.
    let content_to_redact = match file_ref.media_type {
        Some(ref mime)
            if Redacters::is_mime_text(mime)
                || (Redacters::is_mime_table(mime)
                    && matches!(
                        supported_options,
                        RedactSupportedOptions::SupportedAsText
                    )) =>
        {
            let all_chunks: Vec<bytes::Bytes> = input.try_collect().await?;
            let all_bytes = all_chunks.concat();
            let content = String::from_utf8(all_bytes).map_err(|e| AppError::SystemError {
                message: format!("Failed to convert bytes to string: {}", e),
            })?;
            Ok(RedacterDataItem {
                content: RedacterDataItemContent::Value(content),
                file_ref: file_ref.clone(),
            })
        }
        Some(ref mime) if Redacters::is_mime_image(mime) => {
            let all_chunks: Vec<bytes::Bytes> = input.try_collect().await?;
            let all_bytes = all_chunks.concat();
            Ok(RedacterDataItem {
                content: RedacterDataItemContent::Image {
                    mime_type: mime.clone(),
                    data: all_bytes.into(),
                },
                file_ref: file_ref.clone(),
            })
        }
        Some(ref mime) if Redacters::is_mime_table(mime) => {
            // The CSV reader needs an AsyncRead, so adapt the byte stream and
            // convert its errors into io::Error.
            let reader = tokio_util::io::StreamReader::new(
                input.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)),
            );
            let mut reader = csv_async::AsyncReaderBuilder::default()
                .has_headers(!self.options().csv_headers_disable)
                .delimiter(
                    self.options()
                        .csv_delimiter
                        .as_ref()
                        .cloned()
                        .unwrap_or(b','),
                )
                .create_reader(reader);
            let headers = if !self.options().csv_headers_disable {
                reader
                    .headers()
                    .await?
                    .into_iter()
                    .map(|h| h.to_string())
                    .collect()
            } else {
                vec![]
            };
            let records: Vec<StringRecord> = reader.records().try_collect().await?;
            Ok(RedacterDataItem {
                content: RedacterDataItemContent::Table {
                    headers,
                    rows: records
                        .iter()
                        .map(|r| r.iter().map(|c| c.to_string()).collect())
                        .collect(),
                },
                file_ref: file_ref.clone(),
            })
        }
        Some(ref mime) => Err(AppError::SystemError {
            message: format!("Media type {} is not supported for redaction", mime),
        }),
        None => Err(AppError::SystemError {
            message: "Media type is not provided to redact".to_string(),
        }),
    }?;

    let content = self.redact(content_to_redact).await?;

    match content {
        RedacterDataItemContent::Value(content) => {
            let bytes = bytes::Bytes::from(content.into_bytes());
            Ok(Box::new(futures::stream::iter(vec![Ok(bytes)])))
        }
        RedacterDataItemContent::Image { data, .. } => {
            Ok(Box::new(futures::stream::iter(vec![Ok(data)])))
        }
        RedacterDataItemContent::Table { headers, rows } => {
            // Write with the same delimiter the table was parsed with, so a
            // non-default delimiter survives the round trip.
            let mut writer = csv_async::AsyncWriterBuilder::new()
                .delimiter(
                    self.options()
                        .csv_delimiter
                        .as_ref()
                        .cloned()
                        .unwrap_or(b','),
                )
                .create_writer(vec![]);
            // Headers are empty when header handling is disabled; writing them
            // anyway would put a spurious empty record in front of the data rows.
            if !headers.is_empty() {
                writer.write_record(headers).await?;
            }
            for row in rows {
                writer.write_record(row).await?;
            }
            writer.flush().await?;
            let bytes = bytes::Bytes::from(writer.into_inner().await?);
            Ok(Box::new(futures::stream::iter(vec![Ok(bytes)])))
        }
    }
}
}

impl<'a> Redacter for Redacters<'a> {
Expand Down

0 comments on commit 39eb872

Please sign in to comment.