From 66b4dff41d42b29595cb7ba78d849f4f59149992 Mon Sep 17 00:00:00 2001 From: Abdulla Abdurakhmanov Date: Sun, 11 Aug 2024 19:44:38 +0200 Subject: [PATCH] Multiple DLPs support (#9) * Multiple DLP processing support * Progress bar update to files counter instead of bytes --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/args.rs | 111 +++++++++++++++++--------------- src/commands/copy_command.rs | 54 ++++++++++------ src/errors.rs | 3 + src/redacters/aws_comprehend.rs | 24 +++---- src/redacters/gcp_dlp.rs | 43 ++++++------- src/redacters/gemini_llm.rs | 25 ++++--- src/redacters/mod.rs | 83 +++++++++++++++++------- src/redacters/ms_presidio.rs | 37 +++++------ src/redacters/open_ai_llm.rs | 23 +++---- 11 files changed, 232 insertions(+), 175 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ea7e198..f588a86 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2242,7 +2242,7 @@ dependencies = [ [[package]] name = "redacter" -version = "0.5.1" +version = "0.6.0" dependencies = [ "async-recursion", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 7e94ead..006fb01 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "redacter" -version = "0.5.1" +version = "0.6.0" edition = "2021" authors = ["Abdulla Abdurakhmanov "] license = "Apache-2.0" diff --git a/src/args.rs b/src/args.rs index a1a7164..61ee878 100644 --- a/src/args.rs +++ b/src/args.rs @@ -95,8 +95,8 @@ impl Display for RedacterType { #[derive(Args, Debug, Clone)] #[group(required = false)] pub struct RedacterArgs { - #[arg(short = 'd', long, value_enum, help = "Redacter type")] - redact: Option, + #[arg(short = 'd', long, value_enum, help = "List of redacters to use")] + redact: Option>, #[arg( long, @@ -156,63 +156,68 @@ impl TryInto for RedacterArgs { type Error = AppError; fn try_into(self) -> Result { - let provider_options = match self.redact { - Some(RedacterType::GcpDlp) => match self.gcp_project_id { - Some(project_id) => Ok(RedacterProviderOptions::GcpDlp(GcpDlpRedacterOptions { - project_id, - })), - None => Err(AppError::RedacterConfigError { - message: "GCP project id is required for GCP DLP redacter".to_string(), - }), - }, - Some(RedacterType::AwsComprehend) => Ok(RedacterProviderOptions::AwsComprehend( - crate::redacters::AwsComprehendRedacterOptions { - region: self.aws_region.map(aws_config::Region::new), + let mut provider_options = + Vec::with_capacity(self.redact.as_ref().map(Vec::len).unwrap_or(0)); + for options in self.redact.unwrap_or_else(Vec::new) { + let redacter_options = match options { + RedacterType::GcpDlp => match self.gcp_project_id { + Some(ref project_id) => { + Ok(RedacterProviderOptions::GcpDlp(GcpDlpRedacterOptions { + project_id: project_id.clone(), + })) + } + None => Err(AppError::RedacterConfigError { + message: "GCP project id is required for GCP DLP redacter".to_string(), + }), }, - )), - Some(RedacterType::MsPresidio) => { - if self.ms_presidio_text_analyze_url.is_none() - && self.ms_presidio_image_redact_url.is_none() - { - return Err(AppError::RedacterConfigError { - message: + RedacterType::AwsComprehend => Ok(RedacterProviderOptions::AwsComprehend( + crate::redacters::AwsComprehendRedacterOptions { + region: self.aws_region.clone().map(aws_config::Region::new), + }, + )), + RedacterType::MsPresidio => { + if self.ms_presidio_text_analyze_url.is_none() + && self.ms_presidio_image_redact_url.is_none() + { + return Err(AppError::RedacterConfigError { + message: "MsPresidio requires text analyze/image URL specified (at least one)" .to_string(), - }); + }); + } + Ok(RedacterProviderOptions::MsPresidio( + crate::redacters::MsPresidioRedacterOptions { + text_analyze_url: self.ms_presidio_text_analyze_url.clone(), + image_redact_url: self.ms_presidio_image_redact_url.clone(), + }, + )) } - Ok(RedacterProviderOptions::MsPresidio( - crate::redacters::MsPresidioRedacterOptions { - text_analyze_url: self.ms_presidio_text_analyze_url, - image_redact_url: self.ms_presidio_image_redact_url, + RedacterType::GeminiLlm => Ok(RedacterProviderOptions::GeminiLlm( + crate::redacters::GeminiLlmRedacterOptions { + project_id: self.gcp_project_id.clone().ok_or_else(|| { + AppError::RedacterConfigError { + message: "GCP project id is required for Gemini LLM redacter" + .to_string(), + } + })?, + gemini_model: self.gemini_model.clone(), }, - )) - } - Some(RedacterType::GeminiLlm) => Ok(RedacterProviderOptions::GeminiLlm( - crate::redacters::GeminiLlmRedacterOptions { - project_id: self.gcp_project_id.ok_or_else(|| { - AppError::RedacterConfigError { - message: "GCP project id is required for Gemini LLM redacter" - .to_string(), - } - })?, - gemini_model: self.gemini_model, - }, - )), - Some(RedacterType::OpenAiLlm) => Ok(RedacterProviderOptions::OpenAiLlm( - crate::redacters::OpenAiLlmRedacterOptions { - api_key: self - .open_ai_api_key - .ok_or_else(|| AppError::RedacterConfigError { - message: "OpenAI API key is required for OpenAI LLM redacter" - .to_string(), + )), + RedacterType::OpenAiLlm => Ok(RedacterProviderOptions::OpenAiLlm( + crate::redacters::OpenAiLlmRedacterOptions { + api_key: self.open_ai_api_key.clone().ok_or_else(|| { + AppError::RedacterConfigError { + message: "OpenAI API key is required for OpenAI LLM redacter" + .to_string(), + } })?, - model: self.open_ai_model, - }, - )), - None => Err(AppError::RedacterConfigError { - message: "Redacter type is required".to_string(), - }), - }?; + model: self.open_ai_model.clone(), + }, + )), + }?; + provider_options.push(redacter_options); + } + let base_options = RedacterBaseOptions { allow_unsupported_copies: self.allow_unsupported_copies, csv_headers_disable: self.csv_headers_disable, diff --git a/src/commands/copy_command.rs b/src/commands/copy_command.rs index 7893569..e8a64df 100644 --- a/src/commands/copy_command.rs +++ b/src/commands/copy_command.rs @@ -72,21 +72,27 @@ pub async fn command_copy( .as_str(), )?; let bar = ProgressBar::new(1); - bar.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})") - .unwrap() - .with_key("eta", |state: &ProgressState, w: &mut dyn Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap()) - .progress_chars("◉>◯")); + bar.set_style( + ProgressStyle::with_template( + "{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos:>3}/{len:3}", + )? + .progress_chars("◉>◯"), + ); bar.enable_steady_tick(Duration::from_millis(100)); let app_reporter = AppReporter::from(&bar); let mut source_fs = DetectFileSystem::open(source, &app_reporter).await?; let mut destination_fs = DetectFileSystem::open(destination, &app_reporter).await?; - let maybe_redacter = match redacter_options { - Some(options) => Some(( - options.base_options, - Redacters::new_redacter(options.provider_options, &app_reporter).await?, - )), + let maybe_redacters = match redacter_options { + Some(options) => { + let mut redacters = Vec::with_capacity(options.provider_options.len()); + for provider_options in options.provider_options { + let redacter = Redacters::new_redacter(provider_options, &app_reporter).await?; + redacters.push(redacter); + } + Some((options.base_options, redacters)) + } None => None, }; @@ -113,7 +119,7 @@ pub async fn command_copy( .as_str(), ); - bar.set_length(files_total_size); + bar.set_length(files_found as u64); let mut total_files_copied = 0; let mut total_files_skipped = source_files_result.skipped; for source_file in source_files { @@ -123,7 +129,7 @@ pub async fn command_copy( &mut source_fs, &mut destination_fs, &options, - &maybe_redacter, + &maybe_redacters, ) .await? { @@ -143,7 +149,7 @@ pub async fn command_copy( &mut source_fs, &mut destination_fs, &options, - &maybe_redacter, + &maybe_redacters, ) .await? { @@ -179,14 +185,14 @@ async fn transfer_and_redact_file< source_fs: &mut SFS, destination_fs: &mut DFS, options: &CopyCommandOptions, - redacter: &Option<(RedacterBaseOptions, impl Redacter)>, + redacter: &Option<(RedacterBaseOptions, Vec)>, ) -> AppResult { let bold_style = Style::new().bold().white(); let (base_file_ref, source_reader) = source_fs.download(source_file_ref).await?; let base_resolved_file_ref = source_fs.resolve(Some(&base_file_ref)); match options.file_matcher.matches(&base_file_ref) { FileMatcherResult::SkippedDueToSize | FileMatcherResult::SkippedDueToName => { - bar.inc(base_file_ref.file_size.unwrap_or(0)); + bar.inc(1); return Ok(TransferFileResult::Skipped); } FileMatcherResult::Matched => {} @@ -230,7 +236,7 @@ async fn transfer_and_redact_file< .await?; TransferFileResult::Copied }; - bar.inc(file_ref.file_size.unwrap_or(0)); + bar.inc(1); Ok(transfer_result) } @@ -246,17 +252,23 @@ async fn redact_upload_file< source_reader: S, base_resolved_file_ref: &AbsoluteFilePath, dest_file_ref: &FileSystemRef, - redacter_with_options: &(RedacterBaseOptions, impl Redacter), + redacter_with_options: &(RedacterBaseOptions, Vec), ) -> AppResult { - let (redacter_base_options, redacter) = redacter_with_options; - let redacter_supported_options = redacter.redact_supported_options(dest_file_ref).await?; - if redacter_supported_options != RedactSupportedOptions::Unsupported { + let (redacter_base_options, redacters) = redacter_with_options; + let mut support_redacters = Vec::new(); + for redacter in redacters { + let redacter_supported_options = redacter.redact_supported_options(dest_file_ref).await?; + if redacter_supported_options != RedactSupportedOptions::Unsupported { + support_redacters.push(redacter); + } + } + if !support_redacters.is_empty() { match crate::redacters::redact_stream( - redacter, + &support_redacters, redacter_base_options, - &redacter_supported_options, source_reader, dest_file_ref, + bar, ) .await { diff --git a/src/errors.rs b/src/errors.rs index ee62e58..4ca5002 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,4 +1,5 @@ use gcloud_sdk::tonic::metadata::errors::InvalidMetadataValue; +use indicatif::style::TemplateError; use thiserror::Error; #[derive(Error, Debug)] @@ -31,6 +32,8 @@ pub enum AppError { CsvParserError(#[from] csv_async::Error), #[error("Redacter config error: {message}")] RedacterConfigError { message: String }, + #[error("Template error: {0}")] + TemplateError(#[from] TemplateError), #[error("System error: {message}")] SystemError { message: String }, } diff --git a/src/redacters/aws_comprehend.rs b/src/redacters/aws_comprehend.rs index 723bd1a..7ad072b 100644 --- a/src/redacters/aws_comprehend.rs +++ b/src/redacters/aws_comprehend.rs @@ -1,3 +1,4 @@ +use crate::args::RedacterType; use crate::errors::AppError; use crate::filesystems::FileSystemRef; use crate::redacters::{ @@ -6,7 +7,6 @@ use crate::redacters::{ use crate::reporter::AppReporter; use crate::AppResult; use aws_config::Region; -use rvstruct::ValueStruct; #[derive(Debug, Clone)] pub struct AwsComprehendRedacterOptions { @@ -16,6 +16,7 @@ pub struct AwsComprehendRedacterOptions { #[derive(Clone)] pub struct AwsComprehendRedacter<'a> { client: aws_sdk_comprehend::Client, + #[allow(dead_code)] reporter: &'a AppReporter<'a>, } @@ -33,15 +34,7 @@ impl<'a> AwsComprehendRedacter<'a> { Ok(Self { client, reporter }) } - pub async fn redact_text_file( - &self, - input: RedacterDataItem, - ) -> AppResult { - self.reporter.report(format!( - "Redacting a text file: {} ({:?})", - input.file_ref.relative_path.value(), - input.file_ref.media_type - ))?; + pub async fn redact_text_file(&self, input: RedacterDataItem) -> AppResult { let text_content = match input.content { RedacterDataItemContent::Value(content) => Ok(content), _ => Err(AppError::SystemError { @@ -76,12 +69,15 @@ impl<'a> AwsComprehendRedacter<'a> { } }) }); - Ok(RedacterDataItemContent::Value(redacted_content)) + Ok(RedacterDataItem { + file_ref: input.file_ref, + content: RedacterDataItemContent::Value(redacted_content), + }) } } impl<'a> Redacter for AwsComprehendRedacter<'a> { - async fn redact(&self, input: RedacterDataItem) -> AppResult { + async fn redact(&self, input: RedacterDataItem) -> AppResult { match &input.content { RedacterDataItemContent::Value(_) => self.redact_text_file(input).await, RedacterDataItemContent::Table { .. } | RedacterDataItemContent::Image { .. } => { @@ -106,6 +102,10 @@ impl<'a> Redacter for AwsComprehendRedacter<'a> { _ => RedactSupportedOptions::Unsupported, }) } + + fn redacter_type(&self) -> RedacterType { + RedacterType::AwsComprehend + } } #[allow(unused_imports)] diff --git a/src/redacters/gcp_dlp.rs b/src/redacters/gcp_dlp.rs index ff8424f..910ea40 100644 --- a/src/redacters/gcp_dlp.rs +++ b/src/redacters/gcp_dlp.rs @@ -1,3 +1,4 @@ +use crate::args::RedacterType; use crate::common_types::GcpProjectId; use crate::errors::AppError; use crate::filesystems::FileSystemRef; @@ -16,6 +17,7 @@ use rvstruct::ValueStruct; pub struct GcpDlpRedacter<'a> { client: GoogleApi>, gcp_dlp_options: GcpDlpRedacterOptions, + #[allow(dead_code)] reporter: &'a AppReporter<'a>, } @@ -61,15 +63,7 @@ impl<'a> GcpDlpRedacter<'a> { }) } - pub async fn redact_text_file( - &self, - input: RedacterDataItem, - ) -> AppResult { - self.reporter.report(format!( - "Redacting a text file: {} ({:?})", - input.file_ref.relative_path.value(), - input.file_ref.media_type - ))?; + pub async fn redact_text_file(&self, input: RedacterDataItem) -> AppResult { let mut request = tonic::Request::new( gcloud_sdk::google::privacy::dlp::v2::DeidentifyContentRequest { parent: format!( @@ -91,7 +85,11 @@ impl<'a> GcpDlpRedacter<'a> { let response = self.client.get().deidentify_content(request).await?; if let Some(content_item) = response.into_inner().item { - content_item.try_into() + let content: RedacterDataItemContent = content_item.try_into()?; + Ok(RedacterDataItem { + file_ref: input.file_ref, + content, + }) } else { Err(AppError::SystemError { message: "No content item in the response".to_string(), @@ -99,18 +97,9 @@ impl<'a> GcpDlpRedacter<'a> { } } - pub async fn redact_image_file( - &self, - input: RedacterDataItem, - ) -> AppResult { + pub async fn redact_image_file(&self, input: RedacterDataItem) -> AppResult { match &input.content { - RedacterDataItemContent::Image { mime_type, data } => { - self.reporter.report(format!( - "Redacting an image file: {} ({:?}). Size: {}", - input.file_ref.relative_path.value(), - &mime_type, - data.len() - ))?; + RedacterDataItemContent::Image { mime_type, data: _ } => { let output_mime = mime_type.clone(); let mut request = tonic::Request::new(gcloud_sdk::google::privacy::dlp::v2::RedactImageRequest { @@ -130,9 +119,13 @@ impl<'a> GcpDlpRedacter<'a> { ); let response = self.client.get().redact_image(request).await?; - Ok(RedacterDataItemContent::Image { + let content = RedacterDataItemContent::Image { mime_type: output_mime, data: response.into_inner().redacted_image.into(), + }; + Ok(RedacterDataItem { + file_ref: input.file_ref, + content, }) } _ => Err(AppError::SystemError { @@ -194,7 +187,7 @@ impl<'a> GcpDlpRedacter<'a> { } impl<'a> Redacter for GcpDlpRedacter<'a> { - async fn redact(&self, input: RedacterDataItem) -> AppResult { + async fn redact(&self, input: RedacterDataItem) -> AppResult { match &input.content { RedacterDataItemContent::Table { .. } | RedacterDataItemContent::Value(_) => { self.redact_text_file(input).await @@ -226,6 +219,10 @@ impl<'a> Redacter for GcpDlpRedacter<'a> { }, ) } + + fn redacter_type(&self) -> RedacterType { + RedacterType::GcpDlp + } } impl TryInto for RedacterDataItemContent { diff --git a/src/redacters/gemini_llm.rs b/src/redacters/gemini_llm.rs index ed8f2d2..6c61f3d 100644 --- a/src/redacters/gemini_llm.rs +++ b/src/redacters/gemini_llm.rs @@ -1,3 +1,4 @@ +use crate::args::RedacterType; use crate::common_types::GcpProjectId; use crate::errors::AppError; use crate::filesystems::FileSystemRef; @@ -24,6 +25,7 @@ pub struct GeminiLlmModelName(String); pub struct GeminiLlmRedacter<'a> { client: GoogleApi>, gemini_llm_options: crate::redacters::GeminiLlmRedacterOptions, + #[allow(dead_code)] reporter: &'a AppReporter<'a>, } @@ -49,23 +51,13 @@ impl<'a> GeminiLlmRedacter<'a> { }) } - pub async fn redact_text_file( - &self, - input: RedacterDataItem, - ) -> AppResult { + pub async fn redact_text_file(&self, input: RedacterDataItem) -> AppResult { let model_name = self .gemini_llm_options .gemini_model .as_ref() .map(|model_name| model_name.value().to_string()) .unwrap_or_else(|| Self::DEFAULT_GEMINI_MODEL.to_string()); - self.reporter.report(format!( - "Redacting a text file: {} ({:?}) using Gemini LLM model: {}", - input.file_ref.relative_path.value(), - input.file_ref.media_type, - model_name - ))?; - let mut rand = rand::thread_rng(); let generate_random_text_separator = format!("---{}", rand.gen::()); @@ -154,7 +146,10 @@ impl<'a> GeminiLlmRedacter<'a> { _ => acc, }); - Ok(RedacterDataItemContent::Value(redacted_content_text)) + Ok(RedacterDataItem { + file_ref: input.file_ref, + content: RedacterDataItemContent::Value(redacted_content_text), + }) } else { Err(AppError::SystemError { message: "No content item in the response".to_string(), @@ -169,7 +164,7 @@ impl<'a> GeminiLlmRedacter<'a> { } impl<'a> Redacter for GeminiLlmRedacter<'a> { - async fn redact(&self, input: RedacterDataItem) -> AppResult { + async fn redact(&self, input: RedacterDataItem) -> AppResult { match &input.content { RedacterDataItemContent::Value(_) => self.redact_text_file(input).await, RedacterDataItemContent::Table { .. } | RedacterDataItemContent::Image { .. } => { @@ -194,6 +189,10 @@ impl<'a> Redacter for GeminiLlmRedacter<'a> { _ => RedactSupportedOptions::Unsupported, }) } + + fn redacter_type(&self) -> RedacterType { + RedacterType::GeminiLlm + } } #[allow(unused_imports)] diff --git a/src/redacters/mod.rs b/src/redacters/mod.rs index d9e8e51..1286825 100644 --- a/src/redacters/mod.rs +++ b/src/redacters/mod.rs @@ -4,7 +4,9 @@ use crate::reporter::AppReporter; use crate::AppResult; use futures::{Stream, TryStreamExt}; use gcloud_sdk::prost::bytes; +use indicatif::ProgressBar; use mime::Mime; +use rvstruct::ValueStruct; use std::fmt::Display; mod gcp_dlp; @@ -20,6 +22,7 @@ mod gemini_llm; pub use gemini_llm::*; mod open_ai_llm; +use crate::args::RedacterType; pub use open_ai_llm::*; #[derive(Debug, Clone)] @@ -44,7 +47,7 @@ pub enum RedacterDataItemContent { #[derive(Clone)] pub enum Redacters<'a> { GcpDlp(GcpDlpRedacter<'a>), - AwsComprehendDlp(AwsComprehendRedacter<'a>), + AwsComprehend(AwsComprehendRedacter<'a>), MsPresidio(MsPresidioRedacter<'a>), GeminiLlm(GeminiLlmRedacter<'a>), OpenAiLlm(OpenAiLlmRedacter<'a>), @@ -52,7 +55,7 @@ pub enum Redacters<'a> { #[derive(Debug, Clone)] pub struct RedacterOptions { - pub provider_options: RedacterProviderOptions, + pub provider_options: Vec, pub base_options: RedacterBaseOptions, } @@ -75,13 +78,19 @@ pub enum RedacterProviderOptions { impl Display for RedacterOptions { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self.provider_options { - RedacterProviderOptions::GcpDlp(_) => write!(f, "gcp-dlp"), - RedacterProviderOptions::AwsComprehend(_) => write!(f, "aws-comprehend-dlp"), - RedacterProviderOptions::MsPresidio(_) => write!(f, "ms-presidio"), - RedacterProviderOptions::GeminiLlm(_) => write!(f, "gemini-llm"), - RedacterProviderOptions::OpenAiLlm(_) => write!(f, "openai-llm"), - } + let to_display = self + .provider_options + .iter() + .map(|o| match o { + RedacterProviderOptions::GcpDlp(_) => "gcp-dlp".to_string(), + RedacterProviderOptions::AwsComprehend(_) => "aws-comprehend-dlp".to_string(), + RedacterProviderOptions::MsPresidio(_) => "ms-presidio".to_string(), + RedacterProviderOptions::GeminiLlm(_) => "gemini-llm".to_string(), + RedacterProviderOptions::OpenAiLlm(_) => "openai-llm".to_string(), + }) + .collect::>() + .join(", "); + write!(f, "{}", to_display) } } @@ -94,7 +103,7 @@ impl<'a> Redacters<'a> { RedacterProviderOptions::GcpDlp(options) => Ok(Redacters::GcpDlp( GcpDlpRedacter::new(options, reporter).await?, )), - RedacterProviderOptions::AwsComprehend(options) => Ok(Redacters::AwsComprehendDlp( + RedacterProviderOptions::AwsComprehend(options) => Ok(Redacters::AwsComprehend( AwsComprehendRedacter::new(options, reporter).await?, )), RedacterProviderOptions::MsPresidio(options) => Ok(Redacters::MsPresidio( @@ -142,19 +151,21 @@ pub enum RedactSupportedOptions { } pub trait Redacter { - async fn redact(&self, input: RedacterDataItem) -> AppResult; + async fn redact(&self, input: RedacterDataItem) -> AppResult; async fn redact_supported_options( &self, file_ref: &FileSystemRef, ) -> AppResult; + + fn redacter_type(&self) -> RedacterType; } impl<'a> Redacter for Redacters<'a> { - async fn redact(&self, input: RedacterDataItem) -> AppResult { + async fn redact(&self, input: RedacterDataItem) -> AppResult { match self { Redacters::GcpDlp(redacter) => redacter.redact(input).await, - Redacters::AwsComprehendDlp(redacter) => redacter.redact(input).await, + Redacters::AwsComprehend(redacter) => redacter.redact(input).await, Redacters::MsPresidio(redacter) => redacter.redact(input).await, Redacters::GeminiLlm(redacter) => redacter.redact(input).await, Redacters::OpenAiLlm(redacter) => redacter.redact(input).await, @@ -167,30 +178,49 @@ impl<'a> Redacter for Redacters<'a> { ) -> AppResult { match self { Redacters::GcpDlp(redacter) => redacter.redact_supported_options(file_ref).await, - Redacters::AwsComprehendDlp(redacter) => { - redacter.redact_supported_options(file_ref).await - } + Redacters::AwsComprehend(redacter) => redacter.redact_supported_options(file_ref).await, Redacters::MsPresidio(redacter) => redacter.redact_supported_options(file_ref).await, Redacters::GeminiLlm(redacter) => redacter.redact_supported_options(file_ref).await, Redacters::OpenAiLlm(redacter) => redacter.redact_supported_options(file_ref).await, } } + + fn redacter_type(&self) -> RedacterType { + match self { + Redacters::GcpDlp(_) => RedacterType::GcpDlp, + Redacters::AwsComprehend(_) => RedacterType::AwsComprehend, + Redacters::MsPresidio(_) => RedacterType::MsPresidio, + Redacters::GeminiLlm(_) => RedacterType::GeminiLlm, + Redacters::OpenAiLlm(_) => RedacterType::OpenAiLlm, + } + } } pub async fn redact_stream< S: Stream> + Send + Unpin + Sync + 'static, >( - redacter: &impl Redacter, + redacters: &Vec<&impl Redacter>, redacter_base_options: &RedacterBaseOptions, - supported_options: &RedactSupportedOptions, input: S, file_ref: &FileSystemRef, + bar: &ProgressBar, ) -> AppResult> + Send + Sync + Unpin + 'static>> { - let content_to_redact = match file_ref.media_type { + let mut redacters_supported_options = Vec::with_capacity(redacters.len()); + for redacter in redacters { + let supported_options = redacter.redact_supported_options(file_ref).await?; + redacters_supported_options.push((redacter, supported_options)); + } + + let mut item_to_redact = match file_ref.media_type { Some(ref mime) if Redacters::is_mime_text(mime) || (Redacters::is_mime_table(mime) - && matches!(supported_options, RedactSupportedOptions::SupportedAsText)) => + && redacters_supported_options + .iter() + .any(|(_, o)| matches!(o, RedactSupportedOptions::SupportedAsText)) + && !redacters_supported_options + .iter() + .all(|(_, o)| matches!(o, RedactSupportedOptions::Supported))) => { let all_chunks: Vec = input.try_collect().await?; let all_bytes = all_chunks.concat(); @@ -267,9 +297,18 @@ pub async fn redact_stream< }), }?; - let content = redacter.redact(content_to_redact).await?; + for (redacter, options) in redacters_supported_options { + if !matches!(options, RedactSupportedOptions::Unsupported) { + bar.println(format!( + "Redacting {} with {} redacter", + file_ref.relative_path.value(), + redacter.redacter_type() + )); + item_to_redact = redacter.redact(item_to_redact).await?; + } + } - match content { + match item_to_redact.content { RedacterDataItemContent::Value(content) => { let bytes = bytes::Bytes::from(content.into_bytes()); Ok(Box::new(futures::stream::iter(vec![Ok(bytes)]))) diff --git a/src/redacters/ms_presidio.rs b/src/redacters/ms_presidio.rs index fb74ae9..6a74f9b 100644 --- a/src/redacters/ms_presidio.rs +++ b/src/redacters/ms_presidio.rs @@ -2,6 +2,7 @@ use rvstruct::ValueStruct; use serde::{Deserialize, Serialize}; use url::Url; +use crate::args::RedacterType; use crate::errors::AppError; use crate::filesystems::FileSystemRef; use crate::redacters::{ @@ -20,6 +21,7 @@ pub struct MsPresidioRedacterOptions { pub struct MsPresidioRedacter<'a> { client: reqwest::Client, ms_presidio_options: MsPresidioRedacterOptions, + #[allow(dead_code)] reporter: &'a AppReporter<'a>, } @@ -53,15 +55,7 @@ impl<'a> MsPresidioRedacter<'a> { }) } - pub async fn redact_text_file( - &self, - input: RedacterDataItem, - ) -> AppResult { - self.reporter.report(format!( - "Redacting a text file: {} ({:?})", - input.file_ref.relative_path.value(), - input.file_ref.media_type - ))?; + pub async fn redact_text_file(&self, input: RedacterDataItem) -> AppResult { let text_content = match input.content { RedacterDataItemContent::Value(content) => Ok(content), _ => Err(AppError::SystemError { @@ -119,13 +113,13 @@ impl<'a> MsPresidioRedacter<'a> { _ => acc, } }); - Ok(RedacterDataItemContent::Value(redacted_text_content)) + Ok(RedacterDataItem { + file_ref: input.file_ref, + content: RedacterDataItemContent::Value(redacted_text_content), + }) } - pub async fn redact_image_file( - &self, - input: RedacterDataItem, - ) -> AppResult { + pub async fn redact_image_file(&self, input: RedacterDataItem) -> AppResult { let redact_url = self.ms_presidio_options.image_redact_url.as_ref().ok_or( AppError::RedacterConfigError { message: "Image redact URL is not configured".to_string(), @@ -161,9 +155,12 @@ impl<'a> MsPresidioRedacter<'a> { }); } let redacted_image_bytes = response.bytes().await?; - Ok(RedacterDataItemContent::Image { - mime_type, - data: redacted_image_bytes, + Ok(RedacterDataItem { + file_ref: input.file_ref, + content: RedacterDataItemContent::Image { + mime_type, + data: redacted_image_bytes, + }, }) } _ => Err(AppError::SystemError { @@ -174,7 +171,7 @@ impl<'a> MsPresidioRedacter<'a> { } impl<'a> Redacter for MsPresidioRedacter<'a> { - async fn redact(&self, input: RedacterDataItem) -> AppResult { + async fn redact(&self, input: RedacterDataItem) -> AppResult { match &input.content { RedacterDataItemContent::Value(_) => self.redact_text_file(input).await, RedacterDataItemContent::Image { .. } => self.redact_image_file(input).await, @@ -210,6 +207,10 @@ impl<'a> Redacter for MsPresidioRedacter<'a> { _ => RedactSupportedOptions::Unsupported, }) } + + fn redacter_type(&self) -> RedacterType { + RedacterType::MsPresidio + } } #[allow(unused_imports)] diff --git a/src/redacters/open_ai_llm.rs b/src/redacters/open_ai_llm.rs index aa10a6c..03e907e 100644 --- a/src/redacters/open_ai_llm.rs +++ b/src/redacters/open_ai_llm.rs @@ -2,6 +2,7 @@ use rand::Rng; use rvstruct::ValueStruct; use serde::{Deserialize, Serialize}; +use crate::args::RedacterType; use crate::errors::AppError; use crate::filesystems::FileSystemRef; use crate::redacters::{ @@ -26,6 +27,7 @@ pub struct OpenAiLlmRedacterOptions { pub struct OpenAiLlmRedacter<'a> { client: reqwest::Client, open_ai_llm_options: OpenAiLlmRedacterOptions, + #[allow(dead_code)] reporter: &'a AppReporter<'a>, } @@ -66,15 +68,7 @@ impl<'a> OpenAiLlmRedacter<'a> { }) } - pub async fn redact_text_file( - &self, - input: RedacterDataItem, - ) -> AppResult { - self.reporter.report(format!( - "Redacting a text file: {} ({:?})", - input.file_ref.relative_path.value(), - input.file_ref.media_type - ))?; + pub async fn redact_text_file(&self, input: RedacterDataItem) -> AppResult { let text_content = match input.content { RedacterDataItemContent::Value(content) => Ok(content), _ => Err(AppError::SystemError { @@ -136,7 +130,10 @@ impl<'a> OpenAiLlmRedacter<'a> { } let mut open_ai_response: OpenAiLlmAnalyzeResponse = response.json().await?; if let Some(content) = open_ai_response.choices.pop() { - Ok(RedacterDataItemContent::Value(content.message.content)) + Ok(RedacterDataItem { + file_ref: input.file_ref, + content: RedacterDataItemContent::Value(content.message.content), + }) } else { Err(AppError::SystemError { message: "No content item in the response".to_string(), @@ -146,7 +143,7 @@ impl<'a> OpenAiLlmRedacter<'a> { } impl<'a> Redacter for OpenAiLlmRedacter<'a> { - async fn redact(&self, input: RedacterDataItem) -> AppResult { + async fn redact(&self, input: RedacterDataItem) -> AppResult { match &input.content { RedacterDataItemContent::Value(_) => self.redact_text_file(input).await, RedacterDataItemContent::Image { .. } | RedacterDataItemContent::Table { .. } => { @@ -171,6 +168,10 @@ impl<'a> Redacter for OpenAiLlmRedacter<'a> { _ => RedactSupportedOptions::Unsupported, }) } + + fn redacter_type(&self) -> RedacterType { + RedacterType::OpenAiLlm + } } #[allow(unused_imports)]