Skip to content

Commit

Permalink
Simple text sampling (#7)
Browse files Browse the repository at this point in the history
* Redact stream refactoring

* Sampling size argument support for text
  • Loading branch information
abdolence authored Aug 8, 2024
1 parent 39eb872 commit 27e5be7
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 117 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ Options:
URL for image redact endpoint for MsPresidio redacter
--gemini-model <GEMINI_MODEL>
Gemini model name for Gemini LLM redacter. Default is 'models/gemini-1.5-flash'
--sampling-size <SAMPLING_SIZE>
Sampling size in bytes before redacting files. Disabled by default
-h, --help
Print help
```
Expand Down
7 changes: 7 additions & 0 deletions src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ pub struct RedacterArgs {
help = "Gemini model name for Gemini LLM redacter. Default is 'models/gemini-1.5-flash'"
)]
pub gemini_model: Option<GeminiLlmModelName>,

#[arg(
long,
help = "Sampling size in bytes before redacting files. Disabled by default"
)]
pub sampling_size: Option<usize>,
}

impl TryInto<RedacterOptions> for RedacterArgs {
Expand Down Expand Up @@ -189,6 +195,7 @@ impl TryInto<RedacterOptions> for RedacterArgs {
allow_unsupported_copies: self.allow_unsupported_copies,
csv_headers_disable: self.csv_headers_disable,
csv_delimiter: self.csv_delimiter.map(|c| c as u8),
sampling_size: self.sampling_size,
})
}
}
129 changes: 15 additions & 114 deletions src/commands/copy_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@ use crate::filesystems::{
AbsoluteFilePath, DetectFileSystem, FileMatcher, FileMatcherResult, FileSystemConnection,
FileSystemRef,
};
use crate::redacters::{
RedactSupportedOptions, Redacter, RedacterDataItem, RedacterDataItemContent, RedacterOptions,
Redacters,
};
use crate::redacters::{RedactSupportedOptions, Redacter, RedacterOptions, Redacters};
use crate::reporter::AppReporter;
use crate::AppResult;
use console::{Style, Term};
use futures::{Stream, TryStreamExt};
use futures::Stream;
use gcloud_sdk::prost::bytes;
use indicatif::*;
use std::error::Error;
Expand Down Expand Up @@ -46,20 +43,27 @@ pub async fn command_copy(
redacter_options: Option<RedacterOptions>,
) -> AppResult<CopyCommandResult> {
let bold_style = Style::new().bold();
let redacted_output = if let Some(ref options) = redacter_options {
let redacted_output = if let Some(ref options) = redacter_options.as_ref() {
bold_style
.clone()
.green()
.apply_to(format!("✓ Yes ({})", options))
.apply_to(format!("✓ Yes ({})", &options))
} else {
bold_style.clone().red().apply_to("✗ No".to_string())
};
let sampling_output =
if let Some(ref sampling_size) = redacter_options.as_ref().and_then(|o| o.sampling_size) {
Style::new().apply_to(format!("{} bytes.", sampling_size))
} else {
Style::new().dim().apply_to("-".to_string())
};
term.write_line(
format!(
"Copying from {} to {}.\nRedacting: {}.",
"Copying from {} to {}.\nRedacting: {}.\nSampling: {}\n",
bold_style.clone().white().apply_to(source),
bold_style.clone().yellow().apply_to(destination),
redacted_output
redacted_output,
sampling_output
)
.as_str(),
)?;
Expand Down Expand Up @@ -235,10 +239,10 @@ async fn redact_upload_file<
base_resolved_file_ref: &AbsoluteFilePath,
dest_file_ref: &FileSystemRef,
redacter: &impl Redacter,
) -> AppResult<TransferFileResult> {
) -> AppResult<crate::commands::copy_command::TransferFileResult> {
let redacter_supported_options = redacter.redact_supported_options(dest_file_ref).await?;
if redacter_supported_options != RedactSupportedOptions::Unsupported {
match redact_stream(
match crate::redacters::redact_stream(
redacter,
&redacter_supported_options,
source_reader,
Expand Down Expand Up @@ -300,106 +304,3 @@ async fn redact_upload_file<
Ok(TransferFileResult::Skipped)
}
}

/// Redacts the content of `input` with `redacter` and returns the redacted bytes as a stream.
///
/// The whole input stream is buffered in memory and turned into a [`RedacterDataItem`]
/// according to `file_ref.media_type`:
/// * text, or a table when `supported_options` is `SupportedAsText`: a UTF-8 string value;
/// * image: the raw bytes together with their MIME type;
/// * table: CSV parsed into headers and rows using the redacter's CSV options.
///
/// The redacted result is serialised back to bytes and returned as a single-chunk stream.
///
/// # Errors
/// Returns an error when reading the input stream fails, when text content is not valid
/// UTF-8, when the media type is missing or not supported, when CSV parsing or writing
/// fails, or when the redacter itself fails.
async fn redact_stream<
    S: Stream<Item = AppResult<bytes::Bytes>> + Send + Unpin + Sync + 'static,
>(
    redacter: &impl Redacter,
    supported_options: &RedactSupportedOptions,
    input: S,
    file_ref: &FileSystemRef,
) -> AppResult<Box<dyn Stream<Item = AppResult<bytes::Bytes>> + Send + Sync + Unpin + 'static>> {
    let content_to_redact = match file_ref.media_type {
        // Plain text, or tabular data the redacter can only handle as text.
        Some(ref mime)
            if Redacters::is_mime_text(mime)
                || (Redacters::is_mime_table(mime)
                    && matches!(supported_options, RedactSupportedOptions::SupportedAsText)) =>
        {
            let all_chunks: Vec<bytes::Bytes> = input.try_collect().await?;
            let all_bytes = all_chunks.concat();
            let content = String::from_utf8(all_bytes).map_err(|e| AppError::SystemError {
                message: format!("Failed to convert bytes to string: {}", e),
            })?;
            Ok(RedacterDataItem {
                content: RedacterDataItemContent::Value(content),
                file_ref: file_ref.clone(),
            })
        }
        Some(ref mime) if Redacters::is_mime_image(mime) => {
            let all_chunks: Vec<bytes::Bytes> = input.try_collect().await?;
            let all_bytes = all_chunks.concat();
            Ok(RedacterDataItem {
                content: RedacterDataItemContent::Image {
                    mime_type: mime.clone(),
                    data: all_bytes.into(),
                },
                file_ref: file_ref.clone(),
            })
        }
        Some(ref mime) if Redacters::is_mime_table(mime) => {
            // The CSV reader needs an AsyncRead, so adapt the byte stream first.
            let reader = tokio_util::io::StreamReader::new(
                input.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)),
            );
            let mut reader = csv_async::AsyncReaderBuilder::default()
                .has_headers(!redacter.options().csv_headers_disable)
                .delimiter(
                    redacter
                        .options()
                        .csv_delimiter
                        .as_ref()
                        .cloned()
                        .unwrap_or(b','),
                )
                .create_reader(reader);
            let headers = if !redacter.options().csv_headers_disable {
                reader
                    .headers()
                    .await?
                    .into_iter()
                    .map(|h| h.to_string())
                    .collect()
            } else {
                vec![]
            };
            let records: Vec<csv_async::StringRecord> = reader.records().try_collect().await?;
            Ok(RedacterDataItem {
                content: RedacterDataItemContent::Table {
                    headers,
                    rows: records
                        .iter()
                        .map(|r| r.iter().map(|c| c.to_string()).collect())
                        .collect(),
                },
                file_ref: file_ref.clone(),
            })
        }
        Some(ref mime) => Err(AppError::SystemError {
            message: format!("Media type {} is not supported for redaction", mime),
        }),
        None => Err(AppError::SystemError {
            message: "Media type is not provided to redact".to_string(),
        }),
    }?;

    let content = redacter.redact(content_to_redact).await?;

    match content {
        RedacterDataItemContent::Value(content) => {
            let bytes = bytes::Bytes::from(content.into_bytes());
            Ok(Box::new(futures::stream::iter(vec![Ok(bytes)])))
        }
        RedacterDataItemContent::Image { data, .. } => {
            Ok(Box::new(futures::stream::iter(vec![Ok(data)])))
        }
        RedacterDataItemContent::Table { headers, rows } => {
            // NOTE(review): the writer uses its default settings, not `csv_delimiter`;
            // confirm whether the output should preserve the input delimiter.
            let mut writer = csv_async::AsyncWriter::from_writer(vec![]);
            // With headers disabled `headers` is empty; writing it would emit a spurious
            // empty first record ahead of the data rows, so skip it.
            if !headers.is_empty() {
                writer.write_record(headers).await?;
            }
            for row in rows {
                writer.write_record(row).await?;
            }
            writer.flush().await?;
            let bytes = bytes::Bytes::from(writer.into_inner().await?);
            Ok(Box::new(futures::stream::iter(vec![Ok(bytes)])))
        }
    }
}
1 change: 1 addition & 0 deletions src/redacters/aws_comprehend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ mod tests {
allow_unsupported_copies: false,
csv_headers_disable: false,
csv_delimiter: None,
sampling_size: None,
};

let redacter = AwsComprehendRedacter::new(
Expand Down
1 change: 1 addition & 0 deletions src/redacters/gcp_dlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ mod tests {
allow_unsupported_copies: false,
csv_headers_disable: false,
csv_delimiter: None,
sampling_size: None,
};

let redacter = GcpDlpRedacter::new(
Expand Down
1 change: 1 addition & 0 deletions src/redacters/gemini_llm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ mod tests {
allow_unsupported_copies: false,
csv_headers_disable: false,
csv_delimiter: None,
sampling_size: None,
};

let redacter = GeminiLlmRedacter::new(
Expand Down
121 changes: 118 additions & 3 deletions src/redacters/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::filesystems::FileSystemRef;
use crate::reporter::AppReporter;
use crate::AppResult;
use futures::{Stream, TryStreamExt};
use gcloud_sdk::prost::bytes;
use mime::Mime;
use std::fmt::Display;

use crate::filesystems::FileSystemRef;
use crate::reporter::AppReporter;

mod gcp_dlp;
pub use gcp_dlp::*;

Expand All @@ -16,6 +16,7 @@ mod ms_presidio;
pub use ms_presidio::*;

mod gemini_llm;
use crate::errors::AppError;
pub use gemini_llm::*;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -51,6 +52,7 @@ pub struct RedacterOptions {
pub allow_unsupported_copies: bool,
pub csv_headers_disable: bool,
pub csv_delimiter: Option<u8>,
pub sampling_size: Option<usize>,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -171,3 +173,116 @@ impl<'a> Redacter for Redacters<'a> {
}
}
}

/// Redacts the content of `input` with `redacter` and returns the redacted bytes as a stream.
///
/// The whole input stream is buffered in memory and turned into a [`RedacterDataItem`]
/// according to `file_ref.media_type`:
/// * text, or a table when `supported_options` is `SupportedAsText`: a UTF-8 string value,
///   truncated to at most `sampling_size` bytes when that option is set;
/// * image: the raw bytes together with their MIME type;
/// * table: CSV parsed into headers and rows using the redacter's CSV options.
///
/// The redacted result is serialised back to bytes and returned as a single-chunk stream.
///
/// # Errors
/// Returns an error when reading the input stream fails, when text content is not valid
/// UTF-8, when the media type is missing or not supported, when CSV parsing or writing
/// fails, or when the redacter itself fails.
pub async fn redact_stream<
    S: Stream<Item = AppResult<bytes::Bytes>> + Send + Unpin + Sync + 'static,
>(
    redacter: &impl Redacter,
    supported_options: &RedactSupportedOptions,
    input: S,
    file_ref: &FileSystemRef,
) -> AppResult<Box<dyn Stream<Item = AppResult<bytes::Bytes>> + Send + Sync + Unpin + 'static>> {
    let content_to_redact = match file_ref.media_type {
        // Plain text, or tabular data the redacter can only handle as text.
        Some(ref mime)
            if Redacters::is_mime_text(mime)
                || (Redacters::is_mime_table(mime)
                    && matches!(supported_options, RedactSupportedOptions::SupportedAsText)) =>
        {
            let all_chunks: Vec<bytes::Bytes> = input.try_collect().await?;
            let all_bytes = all_chunks.concat();
            let whole_content =
                String::from_utf8(all_bytes).map_err(|e| AppError::SystemError {
                    message: format!("Failed to convert bytes to string: {}", e),
                })?;
            let content = match redacter.options().sampling_size {
                Some(sampling_size) if sampling_size < whole_content.len() => {
                    // The sampling size is a byte budget. Back off to the nearest char
                    // boundary so a multi-byte character is never split; index 0 is
                    // always a boundary, so the loop terminates.
                    let mut cut = sampling_size;
                    while !whole_content.is_char_boundary(cut) {
                        cut -= 1;
                    }
                    let mut sampled = whole_content;
                    sampled.truncate(cut);
                    sampled
                }
                // Sampling disabled, or the content already fits within the budget.
                _ => whole_content,
            };
            Ok(RedacterDataItem {
                content: RedacterDataItemContent::Value(content),
                file_ref: file_ref.clone(),
            })
        }
        Some(ref mime) if Redacters::is_mime_image(mime) => {
            let all_chunks: Vec<bytes::Bytes> = input.try_collect().await?;
            let all_bytes = all_chunks.concat();
            Ok(RedacterDataItem {
                content: RedacterDataItemContent::Image {
                    mime_type: mime.clone(),
                    data: all_bytes.into(),
                },
                file_ref: file_ref.clone(),
            })
        }
        Some(ref mime) if Redacters::is_mime_table(mime) => {
            // The CSV reader needs an AsyncRead, so adapt the byte stream first.
            let reader = tokio_util::io::StreamReader::new(
                input.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)),
            );
            let mut reader = csv_async::AsyncReaderBuilder::default()
                .has_headers(!redacter.options().csv_headers_disable)
                .delimiter(
                    redacter
                        .options()
                        .csv_delimiter
                        .as_ref()
                        .cloned()
                        .unwrap_or(b','),
                )
                .create_reader(reader);
            let headers = if !redacter.options().csv_headers_disable {
                reader
                    .headers()
                    .await?
                    .into_iter()
                    .map(|h| h.to_string())
                    .collect()
            } else {
                vec![]
            };
            let records: Vec<csv_async::StringRecord> = reader.records().try_collect().await?;
            Ok(RedacterDataItem {
                content: RedacterDataItemContent::Table {
                    headers,
                    rows: records
                        .iter()
                        .map(|r| r.iter().map(|c| c.to_string()).collect())
                        .collect(),
                },
                file_ref: file_ref.clone(),
            })
        }
        Some(ref mime) => Err(AppError::SystemError {
            message: format!("Media type {} is not supported for redaction", mime),
        }),
        None => Err(AppError::SystemError {
            message: "Media type is not provided to redact".to_string(),
        }),
    }?;

    let content = redacter.redact(content_to_redact).await?;

    match content {
        RedacterDataItemContent::Value(content) => {
            let bytes = bytes::Bytes::from(content.into_bytes());
            Ok(Box::new(futures::stream::iter(vec![Ok(bytes)])))
        }
        RedacterDataItemContent::Image { data, .. } => {
            Ok(Box::new(futures::stream::iter(vec![Ok(data)])))
        }
        RedacterDataItemContent::Table { headers, rows } => {
            // NOTE(review): the writer uses its default settings, not `csv_delimiter`;
            // confirm whether the output should preserve the input delimiter.
            let mut writer = csv_async::AsyncWriter::from_writer(vec![]);
            // With headers disabled `headers` is empty; writing it would emit a spurious
            // empty first record ahead of the data rows, so skip it.
            if !headers.is_empty() {
                writer.write_record(headers).await?;
            }
            for row in rows {
                writer.write_record(row).await?;
            }
            writer.flush().await?;
            let bytes = bytes::Bytes::from(writer.into_inner().await?);
            Ok(Box::new(futures::stream::iter(vec![Ok(bytes)])))
        }
    }
}
1 change: 1 addition & 0 deletions src/redacters/ms_presidio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ mod tests {
allow_unsupported_copies: false,
csv_headers_disable: false,
csv_delimiter: None,
sampling_size: None,
};

let redacter = MsPresidioRedacter::new(
Expand Down

0 comments on commit 27e5be7

Please sign in to comment.