Skip to content

Commit

Permalink
Multiple DLPs support (#9)
Browse files Browse the repository at this point in the history
* Multiple DLP processing support

* Progress bar now counts files instead of bytes
  • Loading branch information
abdolence authored Aug 11, 2024
1 parent 2af5985 commit 66b4dff
Show file tree
Hide file tree
Showing 11 changed files with 232 additions and 175 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "redacter"
version = "0.5.1"
version = "0.6.0"
edition = "2021"
authors = ["Abdulla Abdurakhmanov <[email protected]>"]
license = "Apache-2.0"
Expand Down
111 changes: 58 additions & 53 deletions src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ impl Display for RedacterType {
#[derive(Args, Debug, Clone)]
#[group(required = false)]
pub struct RedacterArgs {
#[arg(short = 'd', long, value_enum, help = "Redacter type")]
redact: Option<RedacterType>,
#[arg(short = 'd', long, value_enum, help = "List of redacters to use")]
redact: Option<Vec<RedacterType>>,

#[arg(
long,
Expand Down Expand Up @@ -156,63 +156,68 @@ impl TryInto<RedacterOptions> for RedacterArgs {
type Error = AppError;

fn try_into(self) -> Result<RedacterOptions, Self::Error> {
let provider_options = match self.redact {
Some(RedacterType::GcpDlp) => match self.gcp_project_id {
Some(project_id) => Ok(RedacterProviderOptions::GcpDlp(GcpDlpRedacterOptions {
project_id,
})),
None => Err(AppError::RedacterConfigError {
message: "GCP project id is required for GCP DLP redacter".to_string(),
}),
},
Some(RedacterType::AwsComprehend) => Ok(RedacterProviderOptions::AwsComprehend(
crate::redacters::AwsComprehendRedacterOptions {
region: self.aws_region.map(aws_config::Region::new),
let mut provider_options =
Vec::with_capacity(self.redact.as_ref().map(Vec::len).unwrap_or(0));
for options in self.redact.unwrap_or_else(Vec::new) {
let redacter_options = match options {
RedacterType::GcpDlp => match self.gcp_project_id {
Some(ref project_id) => {
Ok(RedacterProviderOptions::GcpDlp(GcpDlpRedacterOptions {
project_id: project_id.clone(),
}))
}
None => Err(AppError::RedacterConfigError {
message: "GCP project id is required for GCP DLP redacter".to_string(),
}),
},
)),
Some(RedacterType::MsPresidio) => {
if self.ms_presidio_text_analyze_url.is_none()
&& self.ms_presidio_image_redact_url.is_none()
{
return Err(AppError::RedacterConfigError {
message:
RedacterType::AwsComprehend => Ok(RedacterProviderOptions::AwsComprehend(
crate::redacters::AwsComprehendRedacterOptions {
region: self.aws_region.clone().map(aws_config::Region::new),
},
)),
RedacterType::MsPresidio => {
if self.ms_presidio_text_analyze_url.is_none()
&& self.ms_presidio_image_redact_url.is_none()
{
return Err(AppError::RedacterConfigError {
message:
"MsPresidio requires text analyze/image URL specified (at least one)"
.to_string(),
});
});
}
Ok(RedacterProviderOptions::MsPresidio(
crate::redacters::MsPresidioRedacterOptions {
text_analyze_url: self.ms_presidio_text_analyze_url.clone(),
image_redact_url: self.ms_presidio_image_redact_url.clone(),
},
))
}
Ok(RedacterProviderOptions::MsPresidio(
crate::redacters::MsPresidioRedacterOptions {
text_analyze_url: self.ms_presidio_text_analyze_url,
image_redact_url: self.ms_presidio_image_redact_url,
RedacterType::GeminiLlm => Ok(RedacterProviderOptions::GeminiLlm(
crate::redacters::GeminiLlmRedacterOptions {
project_id: self.gcp_project_id.clone().ok_or_else(|| {
AppError::RedacterConfigError {
message: "GCP project id is required for Gemini LLM redacter"
.to_string(),
}
})?,
gemini_model: self.gemini_model.clone(),
},
))
}
Some(RedacterType::GeminiLlm) => Ok(RedacterProviderOptions::GeminiLlm(
crate::redacters::GeminiLlmRedacterOptions {
project_id: self.gcp_project_id.ok_or_else(|| {
AppError::RedacterConfigError {
message: "GCP project id is required for Gemini LLM redacter"
.to_string(),
}
})?,
gemini_model: self.gemini_model,
},
)),
Some(RedacterType::OpenAiLlm) => Ok(RedacterProviderOptions::OpenAiLlm(
crate::redacters::OpenAiLlmRedacterOptions {
api_key: self
.open_ai_api_key
.ok_or_else(|| AppError::RedacterConfigError {
message: "OpenAI API key is required for OpenAI LLM redacter"
.to_string(),
)),
RedacterType::OpenAiLlm => Ok(RedacterProviderOptions::OpenAiLlm(
crate::redacters::OpenAiLlmRedacterOptions {
api_key: self.open_ai_api_key.clone().ok_or_else(|| {
AppError::RedacterConfigError {
message: "OpenAI API key is required for OpenAI LLM redacter"
.to_string(),
}
})?,
model: self.open_ai_model,
},
)),
None => Err(AppError::RedacterConfigError {
message: "Redacter type is required".to_string(),
}),
}?;
model: self.open_ai_model.clone(),
},
)),
}?;
provider_options.push(redacter_options);
}

let base_options = RedacterBaseOptions {
allow_unsupported_copies: self.allow_unsupported_copies,
csv_headers_disable: self.csv_headers_disable,
Expand Down
54 changes: 33 additions & 21 deletions src/commands/copy_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,21 +72,27 @@ pub async fn command_copy(
.as_str(),
)?;
let bar = ProgressBar::new(1);
bar.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")
.unwrap()
.with_key("eta", |state: &ProgressState, w: &mut dyn Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap())
.progress_chars("◉>◯"));
bar.set_style(
ProgressStyle::with_template(
"{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos:>3}/{len:3}",
)?
.progress_chars("◉>◯"),
);
bar.enable_steady_tick(Duration::from_millis(100));
let app_reporter = AppReporter::from(&bar);

let mut source_fs = DetectFileSystem::open(source, &app_reporter).await?;
let mut destination_fs = DetectFileSystem::open(destination, &app_reporter).await?;

let maybe_redacter = match redacter_options {
Some(options) => Some((
options.base_options,
Redacters::new_redacter(options.provider_options, &app_reporter).await?,
)),
let maybe_redacters = match redacter_options {
Some(options) => {
let mut redacters = Vec::with_capacity(options.provider_options.len());
for provider_options in options.provider_options {
let redacter = Redacters::new_redacter(provider_options, &app_reporter).await?;
redacters.push(redacter);
}
Some((options.base_options, redacters))
}
None => None,
};

Expand All @@ -113,7 +119,7 @@ pub async fn command_copy(
.as_str(),
);

bar.set_length(files_total_size);
bar.set_length(files_found as u64);
let mut total_files_copied = 0;
let mut total_files_skipped = source_files_result.skipped;
for source_file in source_files {
Expand All @@ -123,7 +129,7 @@ pub async fn command_copy(
&mut source_fs,
&mut destination_fs,
&options,
&maybe_redacter,
&maybe_redacters,
)
.await?
{
Expand All @@ -143,7 +149,7 @@ pub async fn command_copy(
&mut source_fs,
&mut destination_fs,
&options,
&maybe_redacter,
&maybe_redacters,
)
.await?
{
Expand Down Expand Up @@ -179,14 +185,14 @@ async fn transfer_and_redact_file<
source_fs: &mut SFS,
destination_fs: &mut DFS,
options: &CopyCommandOptions,
redacter: &Option<(RedacterBaseOptions, impl Redacter)>,
redacter: &Option<(RedacterBaseOptions, Vec<impl Redacter>)>,
) -> AppResult<TransferFileResult> {
let bold_style = Style::new().bold().white();
let (base_file_ref, source_reader) = source_fs.download(source_file_ref).await?;
let base_resolved_file_ref = source_fs.resolve(Some(&base_file_ref));
match options.file_matcher.matches(&base_file_ref) {
FileMatcherResult::SkippedDueToSize | FileMatcherResult::SkippedDueToName => {
bar.inc(base_file_ref.file_size.unwrap_or(0));
bar.inc(1);
return Ok(TransferFileResult::Skipped);
}
FileMatcherResult::Matched => {}
Expand Down Expand Up @@ -230,7 +236,7 @@ async fn transfer_and_redact_file<
.await?;
TransferFileResult::Copied
};
bar.inc(file_ref.file_size.unwrap_or(0));
bar.inc(1);
Ok(transfer_result)
}

Expand All @@ -246,17 +252,23 @@ async fn redact_upload_file<
source_reader: S,
base_resolved_file_ref: &AbsoluteFilePath,
dest_file_ref: &FileSystemRef,
redacter_with_options: &(RedacterBaseOptions, impl Redacter),
redacter_with_options: &(RedacterBaseOptions, Vec<impl Redacter>),
) -> AppResult<crate::commands::copy_command::TransferFileResult> {
let (redacter_base_options, redacter) = redacter_with_options;
let redacter_supported_options = redacter.redact_supported_options(dest_file_ref).await?;
if redacter_supported_options != RedactSupportedOptions::Unsupported {
let (redacter_base_options, redacters) = redacter_with_options;
let mut support_redacters = Vec::new();
for redacter in redacters {
let redacter_supported_options = redacter.redact_supported_options(dest_file_ref).await?;
if redacter_supported_options != RedactSupportedOptions::Unsupported {
support_redacters.push(redacter);
}
}
if !support_redacters.is_empty() {
match crate::redacters::redact_stream(
redacter,
&support_redacters,
redacter_base_options,
&redacter_supported_options,
source_reader,
dest_file_ref,
bar,
)
.await
{
Expand Down
3 changes: 3 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use gcloud_sdk::tonic::metadata::errors::InvalidMetadataValue;
use indicatif::style::TemplateError;
use thiserror::Error;

#[derive(Error, Debug)]
Expand Down Expand Up @@ -31,6 +32,8 @@ pub enum AppError {
CsvParserError(#[from] csv_async::Error),
#[error("Redacter config error: {message}")]
RedacterConfigError { message: String },
#[error("Template error: {0}")]
TemplateError(#[from] TemplateError),
#[error("System error: {message}")]
SystemError { message: String },
}
Expand Down
24 changes: 12 additions & 12 deletions src/redacters/aws_comprehend.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::args::RedacterType;
use crate::errors::AppError;
use crate::filesystems::FileSystemRef;
use crate::redacters::{
Expand All @@ -6,7 +7,6 @@ use crate::redacters::{
use crate::reporter::AppReporter;
use crate::AppResult;
use aws_config::Region;
use rvstruct::ValueStruct;

#[derive(Debug, Clone)]
pub struct AwsComprehendRedacterOptions {
Expand All @@ -16,6 +16,7 @@ pub struct AwsComprehendRedacterOptions {
/// Redacter that holds an AWS Comprehend SDK client together with a borrowed
/// application reporter.
#[derive(Clone)]
pub struct AwsComprehendRedacter<'a> {
// AWS Comprehend SDK client owned by this redacter.
client: aws_sdk_comprehend::Client,
// The reporter is stored at construction but is not read by the code visible
// in this change, hence the lint suppression.
// NOTE(review): presumably kept for future progress reporting — confirm.
#[allow(dead_code)]
reporter: &'a AppReporter<'a>,
}

Expand All @@ -33,15 +34,7 @@ impl<'a> AwsComprehendRedacter<'a> {
Ok(Self { client, reporter })
}

pub async fn redact_text_file(
&self,
input: RedacterDataItem,
) -> AppResult<RedacterDataItemContent> {
self.reporter.report(format!(
"Redacting a text file: {} ({:?})",
input.file_ref.relative_path.value(),
input.file_ref.media_type
))?;
pub async fn redact_text_file(&self, input: RedacterDataItem) -> AppResult<RedacterDataItem> {
let text_content = match input.content {
RedacterDataItemContent::Value(content) => Ok(content),
_ => Err(AppError::SystemError {
Expand Down Expand Up @@ -76,12 +69,15 @@ impl<'a> AwsComprehendRedacter<'a> {
}
})
});
Ok(RedacterDataItemContent::Value(redacted_content))
Ok(RedacterDataItem {
file_ref: input.file_ref,
content: RedacterDataItemContent::Value(redacted_content),
})
}
}

impl<'a> Redacter for AwsComprehendRedacter<'a> {
async fn redact(&self, input: RedacterDataItem) -> AppResult<RedacterDataItemContent> {
async fn redact(&self, input: RedacterDataItem) -> AppResult<RedacterDataItem> {
match &input.content {
RedacterDataItemContent::Value(_) => self.redact_text_file(input).await,
RedacterDataItemContent::Table { .. } | RedacterDataItemContent::Image { .. } => {
Expand All @@ -106,6 +102,10 @@ impl<'a> Redacter for AwsComprehendRedacter<'a> {
_ => RedactSupportedOptions::Unsupported,
})
}

/// Returns the redacter kind for this implementation, which is always
/// `RedacterType::AwsComprehend`.
fn redacter_type(&self) -> RedacterType {
RedacterType::AwsComprehend
}
}

#[allow(unused_imports)]
Expand Down
Loading

0 comments on commit 66b4dff

Please sign in to comment.