Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simple text sampling #7

Merged
merged 2 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ Options:
URL for image redact endpoint for MsPresidio redacter
--gemini-model <GEMINI_MODEL>
Gemini model name for Gemini LLM redacter. Default is 'models/gemini-1.5-flash'
--sampling-size <SAMPLING_SIZE>
Sampling size in bytes before redacting files. Disabled by default
-h, --help
Print help
```
Expand Down
7 changes: 7 additions & 0 deletions src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ pub struct RedacterArgs {
help = "Gemini model name for Gemini LLM redacter. Default is 'models/gemini-1.5-flash'"
)]
pub gemini_model: Option<GeminiLlmModelName>,

#[arg(
long,
help = "Sampling size in bytes before redacting files. Disabled by default"
)]
pub sampling_size: Option<usize>,
}

impl TryInto<RedacterOptions> for RedacterArgs {
Expand Down Expand Up @@ -189,6 +195,7 @@ impl TryInto<RedacterOptions> for RedacterArgs {
allow_unsupported_copies: self.allow_unsupported_copies,
csv_headers_disable: self.csv_headers_disable,
csv_delimiter: self.csv_delimiter.map(|c| c as u8),
sampling_size: self.sampling_size,
})
}
}
129 changes: 15 additions & 114 deletions src/commands/copy_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@ use crate::filesystems::{
AbsoluteFilePath, DetectFileSystem, FileMatcher, FileMatcherResult, FileSystemConnection,
FileSystemRef,
};
use crate::redacters::{
RedactSupportedOptions, Redacter, RedacterDataItem, RedacterDataItemContent, RedacterOptions,
Redacters,
};
use crate::redacters::{RedactSupportedOptions, Redacter, RedacterOptions, Redacters};
use crate::reporter::AppReporter;
use crate::AppResult;
use console::{Style, Term};
use futures::{Stream, TryStreamExt};
use futures::Stream;
use gcloud_sdk::prost::bytes;
use indicatif::*;
use std::error::Error;
Expand Down Expand Up @@ -46,20 +43,27 @@ pub async fn command_copy(
redacter_options: Option<RedacterOptions>,
) -> AppResult<CopyCommandResult> {
let bold_style = Style::new().bold();
let redacted_output = if let Some(ref options) = redacter_options {
let redacted_output = if let Some(ref options) = redacter_options.as_ref() {
bold_style
.clone()
.green()
.apply_to(format!("✓ Yes ({})", options))
.apply_to(format!("✓ Yes ({})", &options))
} else {
bold_style.clone().red().apply_to("✗ No".to_string())
};
let sampling_output =
if let Some(ref sampling_size) = redacter_options.as_ref().and_then(|o| o.sampling_size) {
Style::new().apply_to(format!("{} bytes.", sampling_size))
} else {
Style::new().dim().apply_to("-".to_string())
};
term.write_line(
format!(
"Copying from {} to {}.\nRedacting: {}.",
"Copying from {} to {}.\nRedacting: {}.\nSampling: {}\n",
bold_style.clone().white().apply_to(source),
bold_style.clone().yellow().apply_to(destination),
redacted_output
redacted_output,
sampling_output
)
.as_str(),
)?;
Expand Down Expand Up @@ -235,10 +239,10 @@ async fn redact_upload_file<
base_resolved_file_ref: &AbsoluteFilePath,
dest_file_ref: &FileSystemRef,
redacter: &impl Redacter,
) -> AppResult<TransferFileResult> {
) -> AppResult<crate::commands::copy_command::TransferFileResult> {
let redacter_supported_options = redacter.redact_supported_options(dest_file_ref).await?;
if redacter_supported_options != RedactSupportedOptions::Unsupported {
match redact_stream(
match crate::redacters::redact_stream(
redacter,
&redacter_supported_options,
source_reader,
Expand Down Expand Up @@ -300,106 +304,3 @@ async fn redact_upload_file<
Ok(TransferFileResult::Skipped)
}
}

/// Runs the content of `input` through `redacter` and hands the result back as a stream.
///
/// The complete input is buffered in memory and shaped according to
/// `file_ref.media_type`:
/// * text (or a table when `supported_options` is `SupportedAsText`) becomes a UTF-8 string,
/// * an image becomes raw bytes tagged with its MIME type,
/// * any other table is parsed as CSV into headers and rows.
///
/// The returned stream always yields exactly one chunk holding the redacted content.
///
/// # Errors
///
/// Fails when the media type is absent or unsupported, when text is not valid UTF-8,
/// and whenever reading the input, parsing/writing CSV, or the redacter itself fails.
async fn redact_stream<
    S: Stream<Item = AppResult<bytes::Bytes>> + Send + Unpin + Sync + 'static,
>(
    redacter: &impl Redacter,
    supported_options: &RedactSupportedOptions,
    input: S,
    file_ref: &FileSystemRef,
) -> AppResult<Box<dyn Stream<Item = AppResult<bytes::Bytes>> + Send + Sync + Unpin + 'static>> {
    // Nothing can be redacted without knowing what kind of content this is.
    let mime = file_ref
        .media_type
        .as_ref()
        .ok_or_else(|| AppError::SystemError {
            message: "Media type is not provided to redact".to_string(),
        })?;

    // Tables are handled as plain text when the redacter only supports text.
    let handle_as_text = Redacters::is_mime_text(mime)
        || (Redacters::is_mime_table(mime)
            && matches!(supported_options, RedactSupportedOptions::SupportedAsText));

    let item_content = if handle_as_text {
        let chunks: Vec<bytes::Bytes> = input.try_collect().await?;
        let raw = chunks.concat();
        let text = String::from_utf8(raw).map_err(|e| AppError::SystemError {
            message: format!("Failed to convert bytes to string: {}", e),
        })?;
        RedacterDataItemContent::Value(text)
    } else if Redacters::is_mime_image(mime) {
        let chunks: Vec<bytes::Bytes> = input.try_collect().await?;
        RedacterDataItemContent::Image {
            mime_type: mime.clone(),
            data: chunks.concat().into(),
        }
    } else if Redacters::is_mime_table(mime) {
        // Read the CSV settings up front so they can be reused below.
        let has_headers = !redacter.options().csv_headers_disable;
        let delimiter = redacter.options().csv_delimiter.unwrap_or(b',');

        let byte_reader = tokio_util::io::StreamReader::new(
            input.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)),
        );
        let mut csv_reader = csv_async::AsyncReaderBuilder::default()
            .has_headers(has_headers)
            .delimiter(delimiter)
            .create_reader(byte_reader);

        let headers = if has_headers {
            csv_reader
                .headers()
                .await?
                .into_iter()
                .map(|header| header.to_string())
                .collect()
        } else {
            vec![]
        };
        let parsed: Vec<csv_async::StringRecord> = csv_reader.records().try_collect().await?;
        let rows = parsed
            .iter()
            .map(|record| record.iter().map(|cell| cell.to_string()).collect())
            .collect();

        RedacterDataItemContent::Table { headers, rows }
    } else {
        return Err(AppError::SystemError {
            message: format!("Media type {} is not supported for redaction", mime),
        }
        .into());
    };

    let content_to_redact = RedacterDataItem {
        content: item_content,
        file_ref: file_ref.clone(),
    };
    let redacted = redacter.redact(content_to_redact).await?;

    // Serialize whatever came back into one contiguous buffer.
    let output = match redacted {
        RedacterDataItemContent::Value(text) => bytes::Bytes::from(text.into_bytes()),
        RedacterDataItemContent::Image { data, .. } => data,
        RedacterDataItemContent::Table { headers, rows } => {
            let mut csv_writer = csv_async::AsyncWriter::from_writer(vec![]);
            csv_writer.write_record(headers).await?;
            for row in rows {
                csv_writer.write_record(row).await?;
            }
            csv_writer.flush().await?;
            bytes::Bytes::from(csv_writer.into_inner().await?)
        }
    };

    Ok(Box::new(futures::stream::iter(vec![Ok(output)])))
}
1 change: 1 addition & 0 deletions src/redacters/aws_comprehend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ mod tests {
allow_unsupported_copies: false,
csv_headers_disable: false,
csv_delimiter: None,
sampling_size: None,
};

let redacter = AwsComprehendRedacter::new(
Expand Down
1 change: 1 addition & 0 deletions src/redacters/gcp_dlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ mod tests {
allow_unsupported_copies: false,
csv_headers_disable: false,
csv_delimiter: None,
sampling_size: None,
};

let redacter = GcpDlpRedacter::new(
Expand Down
1 change: 1 addition & 0 deletions src/redacters/gemini_llm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ mod tests {
allow_unsupported_copies: false,
csv_headers_disable: false,
csv_delimiter: None,
sampling_size: None,
};

let redacter = GeminiLlmRedacter::new(
Expand Down
121 changes: 118 additions & 3 deletions src/redacters/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::filesystems::FileSystemRef;
use crate::reporter::AppReporter;
use crate::AppResult;
use futures::{Stream, TryStreamExt};
use gcloud_sdk::prost::bytes;
use mime::Mime;
use std::fmt::Display;

use crate::filesystems::FileSystemRef;
use crate::reporter::AppReporter;

mod gcp_dlp;
pub use gcp_dlp::*;

Expand All @@ -16,6 +16,7 @@ mod ms_presidio;
pub use ms_presidio::*;

mod gemini_llm;
use crate::errors::AppError;
pub use gemini_llm::*;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -51,6 +52,7 @@ pub struct RedacterOptions {
pub allow_unsupported_copies: bool,
pub csv_headers_disable: bool,
pub csv_delimiter: Option<u8>,
pub sampling_size: Option<usize>,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -171,3 +173,116 @@ impl<'a> Redacter for Redacters<'a> {
}
}
}

/// Redacts the content of `input` with `redacter` and returns it as a new stream.
///
/// The whole input stream is collected in memory and converted into a
/// [`RedacterDataItem`] selected by `file_ref.media_type`:
/// * text, or a table when `supported_options` is `SupportedAsText`, is decoded as UTF-8
///   and, if `sampling_size` is set in the redacter options, cut down to at most that
///   many bytes;
/// * an image is passed on as raw bytes together with its MIME type;
/// * any other table is parsed as CSV using the `csv_headers_disable` and
///   `csv_delimiter` options.
///
/// The returned stream yields exactly one chunk containing the redacted content.
///
/// # Errors
///
/// Returns an error when the media type is missing or unsupported, when text content is
/// not valid UTF-8, or when reading the input, parsing/writing CSV, or the redacter fails.
pub async fn redact_stream<
    S: Stream<Item = AppResult<bytes::Bytes>> + Send + Unpin + Sync + 'static,
>(
    redacter: &impl Redacter,
    supported_options: &RedactSupportedOptions,
    input: S,
    file_ref: &FileSystemRef,
) -> AppResult<Box<dyn Stream<Item = AppResult<bytes::Bytes>> + Send + Sync + Unpin + 'static>> {
    let content_to_redact = match file_ref.media_type {
        Some(ref mime)
            if Redacters::is_mime_text(mime)
                || (Redacters::is_mime_table(mime)
                    && matches!(supported_options, RedactSupportedOptions::SupportedAsText)) =>
        {
            let all_chunks: Vec<bytes::Bytes> = input.try_collect().await?;
            let all_bytes = all_chunks.concat();
            let mut content =
                String::from_utf8(all_bytes).map_err(|e| AppError::SystemError {
                    message: format!("Failed to convert bytes to string: {}", e),
                })?;
            if let Some(sampling_size) = redacter.options().sampling_size {
                // `sampling_size` is a budget in bytes, not characters. Back off to the
                // nearest char boundary so a multi-byte character is never split and the
                // sample never exceeds the requested number of bytes.
                let mut sample_end = sampling_size.min(content.len());
                // Index 0 is always a char boundary, so this loop terminates.
                while !content.is_char_boundary(sample_end) {
                    sample_end -= 1;
                }
                content.truncate(sample_end);
            }
            Ok(RedacterDataItem {
                content: RedacterDataItemContent::Value(content),
                file_ref: file_ref.clone(),
            })
        }
        Some(ref mime) if Redacters::is_mime_image(mime) => {
            let all_chunks: Vec<bytes::Bytes> = input.try_collect().await?;
            let all_bytes = all_chunks.concat();
            Ok(RedacterDataItem {
                content: RedacterDataItemContent::Image {
                    mime_type: mime.clone(),
                    data: all_bytes.into(),
                },
                file_ref: file_ref.clone(),
            })
        }
        Some(ref mime) if Redacters::is_mime_table(mime) => {
            // The CSV reader needs an `AsyncRead`, so adapt the byte stream and map its
            // errors into `std::io::Error`.
            let reader = tokio_util::io::StreamReader::new(
                input.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)),
            );
            let mut reader = csv_async::AsyncReaderBuilder::default()
                .has_headers(!redacter.options().csv_headers_disable)
                .delimiter(
                    redacter
                        .options()
                        .csv_delimiter
                        .as_ref()
                        .cloned()
                        .unwrap_or(b','),
                )
                .create_reader(reader);
            let headers = if !redacter.options().csv_headers_disable {
                reader
                    .headers()
                    .await?
                    .into_iter()
                    .map(|h| h.to_string())
                    .collect()
            } else {
                vec![]
            };
            let records: Vec<csv_async::StringRecord> = reader.records().try_collect().await?;
            Ok(RedacterDataItem {
                content: RedacterDataItemContent::Table {
                    headers,
                    rows: records
                        .iter()
                        .map(|r| r.iter().map(|c| c.to_string()).collect())
                        .collect(),
                },
                file_ref: file_ref.clone(),
            })
        }
        Some(ref mime) => Err(AppError::SystemError {
            message: format!("Media type {} is not supported for redaction", mime),
        }),
        None => Err(AppError::SystemError {
            message: "Media type is not provided to redact".to_string(),
        }),
    }?;

    let content = redacter.redact(content_to_redact).await?;

    // Whatever the redacter returns is serialized into a single in-memory chunk.
    match content {
        RedacterDataItemContent::Value(content) => {
            let bytes = bytes::Bytes::from(content.into_bytes());
            Ok(Box::new(futures::stream::iter(vec![Ok(bytes)])))
        }
        RedacterDataItemContent::Image { data, .. } => {
            Ok(Box::new(futures::stream::iter(vec![Ok(data)])))
        }
        RedacterDataItemContent::Table { headers, rows } => {
            let mut writer = csv_async::AsyncWriter::from_writer(vec![]);
            // NOTE(review): when CSV headers are disabled the input side produces an empty
            // `headers` list, yet a header record is still written here — confirm that the
            // resulting first line is intended.
            writer.write_record(headers).await?;
            for row in rows {
                writer.write_record(row).await?;
            }
            writer.flush().await?;
            let bytes = bytes::Bytes::from(writer.into_inner().await?);
            Ok(Box::new(futures::stream::iter(vec![Ok(bytes)])))
        }
    }
}
1 change: 1 addition & 0 deletions src/redacters/ms_presidio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ mod tests {
allow_unsupported_copies: false,
csv_headers_disable: false,
csv_delimiter: None,
sampling_size: None,
};

let redacter = MsPresidioRedacter::new(
Expand Down