diff --git a/.github/workflows/security-audit.yml b/.github/workflows/security-audit.yml new file mode 100644 index 0000000..9897e8b --- /dev/null +++ b/.github/workflows/security-audit.yml @@ -0,0 +1,21 @@ +name: security audit +on: + push: + paths: + - '**/Cargo.toml' + - '**/Cargo.lock' + schedule: + - cron: '5 4 * * 6' +concurrency: + group: ${{ github.workflow }}-${{ github.ref_protected && github.run_id || github.event.pull_request.number || github.ref }} + cancel-in-progress: true +jobs: + security_audit: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + toolchain: stable + components: rustfmt, clippy + - run: cargo install cargo-audit && cargo audit || true && cargo audit diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..59b035b --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,45 @@ +name: tests & formatting +on: + push: + branches: + - master + pull_request: + workflow_dispatch: +env: + GCP_PROJECT: latestbit + GCP_PROJECT_ID: 288860578009 +concurrency: + group: ${{ github.workflow }}-${{ github.ref_protected && github.run_id || github.event.pull_request.number || github.ref }} + cancel-in-progress: true +jobs: + build: + runs-on: ubuntu-latest + permissions: + contents: 'read' + id-token: 'write' + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + toolchain: stable + components: rustfmt, clippy + - name: Authenticate to Google Cloud development + id: auth + uses: google-github-actions/auth@v2 + if: github.ref == 'refs/heads/master' + with: + workload_identity_provider: 'projects/${{ env.GCP_PROJECT_ID }}/locations/global/workloadIdentityPools/lb-github-identity-pool/providers/lb-github-identity-pool-provider' + service_account: 'lb-github-service-account@${{ env.GCP_PROJECT }}.iam.gserviceaccount.com' + create_credentials_file: true + access_token_lifetime: '240s' + - name: 'Set up Cloud 
SDK' + uses: google-github-actions/setup-gcloud@v2 + if: github.ref == 'refs/heads/master' + - name: 'Checking formatting and clippy' + run: cargo fmt -- --check && cargo clippy -- -Dwarnings + - name: 'Run tests without access to GCP' + run: cargo test + if: github.ref != 'refs/heads/master' + - name: 'Run all tests' + run: cargo test --features "ci" + if: github.ref == 'refs/heads/master' diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b18da39 --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +/target/ +Cargo.lock +**/*.rs.bk +.idea/ +*.tmp +*.orig +*.swp +tmp/ diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..a52a4dc --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,76 @@ +# Contributing + +Welcome! Please read this document to understand what you can do: +* [Analyze Issues](#analyze-issues) +* [Report an Issue](#report-an-issue) +* [Contribute Code](#contribute-code) + +## Analyze Issues + +Analyzing issue reports can be a lot of effort. Any help is welcome! +Go to the GitHub issue tracker and find an open issue which needs additional work or a bugfix (e.g. issues labeled with "help wanted" or "bug"). +Additional work could include any further information, or a gist, or it might be a hint that helps understanding the issue. + +## Report an Issue + +If you find a bug - you are welcome to report it. +You can go to the GitHub issue tracker to report the issue. + +### Quick Checklist for Bug Reports + +Issue report checklist: +* Real, current bug for the latest/supported version +* No duplicate +* Reproducible +* Minimal example + +### Issue handling process + +When an issue is reported, a committer will look at it and either confirm it as a real issue, close it if it is not an issue, or ask for more details. +An issue that is about a real bug is closed as soon as the fix is committed. 
+ + +### Reporting Security Issues + +If you find or suspect a security issue, please act responsibly and do not report it in the public issue tracker, but directly to us, so we can fix it before it can be exploited. +For details please check our [Security policy](SECURITY.md). + +## Contribute Code + +You are welcome to contribute code in order to fix bugs or to implement new features. + +There are three important things to know: + +1. You must be aware of the Apache License (which describes contributions) and **agree to the Contributors License Agreement**. This is common practice in all major Open Source projects. + For company contributors special rules apply. See the respective section below for details. +2. **Not all proposed contributions can be accepted**. Some features may e.g. just fit a third-party add-on better. The code must fit the overall direction and really improve it. The more effort you invest, the better you should clarify in advance whether the contribution fits: the best way would be to just open an issue to discuss the feature you plan to implement (make it clear you intend to contribute). + +### Contributor License Agreement + +When you contribute (code, documentation, or anything else), you have to be aware that your contribution is covered by the same [Apache 2.0 License](https://www.apache.org/licenses/LICENSE-2.0). + +This applies to all contributors, including those contributing on behalf of a company. 
+ +### Contribution Content Guidelines + +These are some of the rules we try to follow: + +- Apply a clean coding style adapted to the surrounding code, even though we are aware the existing code is not fully clean +- Use variable naming conventions like in the other files you are seeing +- No println() - use logging service if needed +- Comment your code where it gets non-trivial +- Keep an eye on performance and memory consumption, properly destroy objects when not used anymore +- Avoid incompatible changes if possible, especially do not modify the name or behavior of public API methods or properties + +### How to contribute - the Process + +1. Make sure the change would be welcome (e.g. a bugfix or a useful feature); best do so by proposing it in a GitHub issue +2. Create a branch forking the repository and do your change +3. Commit and push your changes on that branch +4. In the commit message + - Describe the problem you fix with this change. + - Describe the effect that this change has from a user's point of view. App crashes and lockups are pretty convincing for example, but not all bugs are that obvious and should be mentioned in the text. + - Describe the technical details of what you changed. It is important to describe the change in a most understandable way so the reviewer is able to verify that the code is behaving as you intend it to. +5. Create a Pull Request +6. Once the change has been approved we will inform you in a comment +7. We will close the pull request, feel free to delete the now obsolete branch diff --git a/COPYRIGHT.md b/COPYRIGHT.md new file mode 100644 index 0000000..a1af672 --- /dev/null +++ b/COPYRIGHT.md @@ -0,0 +1,13 @@ + Copyright 2022 Abdulla Abdurakhmanov (me@abdolence.dev) + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..f3683e6 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,50 @@ +[package] +name = "redacter" +version = "0.1.0" +edition = "2021" +authors = ["Abdulla Abdurakhmanov "] +license = "Apache-2.0" +homepage = "https://github.com/abdolence/redacter-rs" +repository = "https://github.com/abdolence/redacter-rs" +documentation = "https://docs.rs/redacter" +readme = "README.md" +include = ["Cargo.toml", "src/**/*.rs", "README.md", "LICENSE"] +rust-version = "1.77.0" + +[features] +default = [] +ci-gcp = [] # For testing on CI/GCP +ci-aws = [] # For testing on CI/AWS +ci = ["ci-gcp", "ci-aws"] + + +[dependencies] +rsb_derive = "0.5" +rvstruct = "0.3" +chrono = { version = "0.4", features = ["serde"] } +serde = { version = "1.0", features = ["derive"] } +console = { version = "0.15" } +indicatif = { version = "0.17" } +clap = { version = "4.1", features = ["derive"] } +tokio = { version = "1.14", features = ["fs", "rt-multi-thread", "sync", "rt", "macros"] } +tokio-util = { version = "0.7", features = ["compat"] } +gcloud-sdk = { version = "0.25.4", features = ["google-privacy-dlp-v2", "google-rest-storage-v1"] } +futures = "0.3" +sha2 = "0.10" +async-trait = "0.1" +hex = "0.4" +thiserror = "1" +sync_wrapper = { version = "1", features = ["futures"] } +async-recursion = "1" +mime = "0.3" +mime_guess = "2" +zip = "2" +globset = "0.4" +tempdir = "0.3" +csv-async = { version = "1", default-features = false, features = ["tokio", "tokio-stream"] } +aws-config = { version = "1", features = ["behavior-version-latest"] } 
+aws-sdk-s3 = { version = "1" } + + +[dev-dependencies] +cargo-husky = { version = "1.5", default-features = false, features = ["run-for-all", "prepush-hook", "run-cargo-fmt"] } diff --git a/SECURITY.md b/SECURITY.md new file mode 100644 index 0000000..6bc5dbb --- /dev/null +++ b/SECURITY.md @@ -0,0 +1,9 @@ +# Security Policy + +## Reporting a Vulnerability + +Please follow general guidelines defined here: +https://cheatsheetseries.owasp.org/cheatsheets/Vulnerability_Disclosure_Cheat_Sheet.html + +## Contacts +E-mail: me@abdolence.dev diff --git a/src/args.rs b/src/args.rs new file mode 100644 index 0000000..4b34cb8 --- /dev/null +++ b/src/args.rs @@ -0,0 +1,118 @@ +use crate::common_types::GcpProjectId; +use crate::errors::AppError; +use crate::redacters::{GcpDlpRedacterOptions, RedacterOptions, RedacterProviderOptions}; +use clap::*; +use std::fmt::Display; + +#[derive(Parser, Debug)] +#[command(author, about)] +pub struct CliArgs { + #[command(subcommand)] + pub command: CliCommand, +} + +#[derive(Subcommand, Debug)] +pub enum CliCommand { + #[command(about = "Copy and redact files from source to destination")] + Cp { + #[arg( + help = "Source directory or file such as /tmp, /tmp/file.txt or gs://bucket/file.txt and others supported providers" + )] + source: String, + #[arg( + help = "Destination directory or file such as /tmp, /tmp/file.txt or gs://bucket/file.txt and others supported providers" + )] + destination: String, + #[arg(short = 'm', long, help = "Maximum size of files to copy in bytes")] + max_size_limit: Option, + #[arg( + short = 'f', + long, + help = "Filter by name using glob patterns such as *.txt" + )] + filename_filter: Option, + + #[command(flatten)] + redacter_args: Option, + }, +} + +#[derive(ValueEnum, Debug, Clone)] +pub enum RedacterType { + GcpDlp, +} + +impl std::str::FromStr for RedacterType { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "gcp-dlp" => Ok(RedacterType::GcpDlp), + _ => Err(format!("Unknown 
redacter type: {}", s)), + } + } +} + +impl Display for RedacterType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RedacterType::GcpDlp => write!(f, "gcp-dlp"), + } + } +} + +#[derive(Args, Debug, Clone)] +#[group(required = false)] +pub struct RedacterArgs { + #[arg(short = 'd', long, value_enum, help = "Redacter type")] + redact: Option, + + #[arg( + long, + help = "GCP project id that will be used to redact and bill API calls" + )] + pub gcp_project_id: Option, + + #[arg( + long, + help = "Allow unsupported types to be copied without redaction", + default_value = "false" + )] + pub allow_unsupported_copies: bool, + + #[arg( + long, + help = "Disable CSV headers (if they are not present)", + default_value = "false" + )] + pub csv_headers_disable: bool, + + #[arg(long, help = "CSV delimiter (default is ','")] + pub csv_delimiter: Option, +} + +impl TryInto for RedacterArgs { + type Error = AppError; + + fn try_into(self) -> Result { + let provider_options = match self.redact { + Some(RedacterType::GcpDlp) => match self.gcp_project_id { + Some(project_id) => Ok(RedacterProviderOptions::GcpDlp(GcpDlpRedacterOptions { + project_id, + })), + None => Err(AppError::RedacterConfigError { + message: "GCP project id is required for GCP DLP redacter".to_string(), + }), + }, + None => Err(AppError::RedacterConfigError { + message: "Redacter type is required".to_string(), + }), + }?; + Ok(RedacterOptions { + provider_options, + allow_unsupported_copies: self.allow_unsupported_copies, + csv_headers_disable: self.csv_headers_disable, + csv_delimiter: self.csv_delimiter.map(|c| c as u8), + }) + } +} diff --git a/src/commands/copy_command.rs b/src/commands/copy_command.rs new file mode 100644 index 0000000..971e207 --- /dev/null +++ b/src/commands/copy_command.rs @@ -0,0 +1,286 @@ +use crate::errors::AppError; +use crate::filesystems::{ + AbsoluteFilePath, DetectFileSystem, FileMatcher, FileMatcherResult, FileSystemConnection, + 
FileSystemRef, +}; +use crate::redacters::{Redacter, RedacterOptions, Redacters}; +use crate::reporter::AppReporter; +use crate::AppResult; +use console::{Style, Term}; +use futures::Stream; +use gcloud_sdk::prost::bytes; +use indicatif::*; +use std::error::Error; +use std::fmt::Write; +use std::time::Duration; + +pub struct CopyCommandResult { + pub files_copied: usize, + pub files_skipped: usize, +} + +#[derive(Debug, Clone)] +pub struct CopyCommandOptions { + pub file_matcher: FileMatcher, +} + +impl CopyCommandOptions { + pub fn new(filename_filter: Option, max_size_limit: Option) -> Self { + let filename_matcher = filename_filter + .as_ref() + .map(|filter| filter.compile_matcher()); + CopyCommandOptions { + file_matcher: FileMatcher::new(filename_matcher, max_size_limit), + } + } +} + +pub async fn command_copy( + term: &Term, + source: &str, + destination: &str, + options: CopyCommandOptions, + redacter_options: Option, +) -> AppResult { + let bold_style = Style::new().bold(); + let redacted_output = if let Some(ref options) = redacter_options { + bold_style + .clone() + .green() + .apply_to(format!("✓ Yes ({})", options)) + } else { + bold_style.clone().red().apply_to("✗ No".to_string()) + }; + term.write_line( + format!( + "Copying and redacting from {} to {}.\nRedacting: {}.", + bold_style.clone().white().apply_to(source), + bold_style.clone().yellow().apply_to(destination), + redacted_output + ) + .as_str(), + )?; + let bar = ProgressBar::new(1); + bar.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})") + .unwrap() + .with_key("eta", |state: &ProgressState, w: &mut dyn Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap()) + .progress_chars("◉>◯")); + bar.enable_steady_tick(Duration::from_millis(100)); + let app_reporter = AppReporter::from(&bar); + + let mut source_fs = DetectFileSystem::open(source, &app_reporter).await?; + let mut destination_fs = 
DetectFileSystem::open(destination, &app_reporter).await?; + + let maybe_redacter = match redacter_options { + Some(ref options) => Some(Redacters::new_redacter(options, &app_reporter).await?), + None => None, + }; + + let copy_result: AppResult = if source_fs.has_multiple_files().await? { + if !destination_fs.accepts_multiple_files().await? { + return Err(AppError::DestinationDoesNotSupportMultipleFiles { + destination: destination.to_string(), + }); + } + bar.println("Copying directory and listing source files..."); + let source_files_result = source_fs.list_files(Some(&options.file_matcher)).await?; + let source_files = source_files_result.files; + let files_found = source_files.len(); + let files_total_size: u64 = source_files + .iter() + .map(|file| file.file_size.unwrap_or(0)) + .sum(); + bar.println( + format!( + "Found {} files. Total size: {}", + bold_style.apply_to(files_found), + bold_style.apply_to(HumanBytes(files_total_size)) + ) + .as_str(), + ); + + bar.set_length(files_total_size); + let mut total_files_copied = 0; + let mut total_files_skipped = source_files_result.skipped; + for source_file in source_files { + match transfer_and_redact_file( + Some(&source_file), + &bar, + &mut source_fs, + &mut destination_fs, + &options, + &maybe_redacter, + ) + .await? + { + TransferFileResult::Copied => total_files_copied += 1, + TransferFileResult::Skipped => total_files_skipped += 1, + } + } + Ok(CopyCommandResult { + files_copied: total_files_copied, + files_skipped: total_files_skipped, + }) + } else { + Ok( + match transfer_and_redact_file( + None, + &bar, + &mut source_fs, + &mut destination_fs, + &options, + &maybe_redacter, + ) + .await? 
+ { + TransferFileResult::Copied => CopyCommandResult { + files_copied: 1, + files_skipped: 0, + }, + TransferFileResult::Skipped => CopyCommandResult { + files_copied: 0, + files_skipped: 1, + }, + }, + ) + }; + + destination_fs.close().await?; + source_fs.close().await?; + copy_result +} + +enum TransferFileResult { + Copied, + Skipped, +} + +async fn transfer_and_redact_file< + 'a, + SFS: FileSystemConnection<'a>, + DFS: FileSystemConnection<'a>, +>( + source_file_ref: Option<&FileSystemRef>, + bar: &ProgressBar, + source_fs: &mut SFS, + destination_fs: &mut DFS, + options: &CopyCommandOptions, + redacter: &Option, +) -> AppResult { + let bold_style = Style::new().bold().white(); + let (base_file_ref, source_reader) = source_fs.download(source_file_ref).await?; + let base_resolved_file_ref = source_fs.resolve(Some(&base_file_ref)); + match options.file_matcher.matches(&base_file_ref) { + FileMatcherResult::SkippedDueToSize | FileMatcherResult::SkippedDueToName => { + bar.inc(base_file_ref.file_size.unwrap_or(0)); + return Ok(TransferFileResult::Skipped); + } + FileMatcherResult::Matched => {} + } + + let file_ref = source_file_ref.unwrap_or(&base_file_ref); + let dest_file_ref = FileSystemRef { + relative_path: file_ref.relative_path.clone(), + media_type: file_ref.media_type.clone(), + file_size: file_ref.file_size, + }; + bar.println( + format!( + "Copying {} ({}) to {}. Size: {}", + bold_style.apply_to(&base_resolved_file_ref.file_path), + file_ref + .media_type + .as_ref() + .map(|media_type| media_type.to_string()) + .unwrap_or_else(|| "unknown".to_string()), + bold_style.apply_to(destination_fs.resolve(Some(&dest_file_ref)).file_path), + bold_style.apply_to(HumanBytes(file_ref.file_size.unwrap_or(0))) + ) + .as_str(), + ); + let transfer_result = if let Some(ref redacter) = redacter { + redact_upload_file::( + bar, + destination_fs, + bold_style, + source_reader, + &base_resolved_file_ref, + file_ref, + redacter, + ) + .await? 
+ } else { + destination_fs + .upload(source_reader, Some(&dest_file_ref)) + .await?; + TransferFileResult::Copied + }; + bar.inc(file_ref.file_size.unwrap_or(0)); + Ok(transfer_result) +} + +async fn redact_upload_file<'a, SFS: FileSystemConnection<'a>, DFS: FileSystemConnection<'a>>( + bar: &ProgressBar, + destination_fs: &mut DFS, + bold_style: Style, + source_reader: Box> + Send + Sync + Unpin + 'static>, + base_resolved_file_ref: &AbsoluteFilePath, + dest_file_ref: &FileSystemRef, + redacter: &impl Redacter, +) -> AppResult { + if redacter.is_redact_supported(dest_file_ref).await? { + match redacter.redact_stream(source_reader, dest_file_ref).await { + Ok(redacted_reader) => { + destination_fs + .upload(redacted_reader, Some(dest_file_ref)) + .await?; + Ok(TransferFileResult::Copied) + } + Err(ref error) => { + bar.println( + format!( + "{}. Skipping {} due to: {}\n{:?}\n", + bold_style.clone().red().apply_to("Error redacting"), + bold_style.apply_to(&base_resolved_file_ref.file_path), + bold_style.apply_to(error), + error.source() + ) + .as_str(), + ); + Ok(TransferFileResult::Skipped) + } + } + } else if redacter.options().allow_unsupported_copies { + bar.println( + format!( + "Still copying {} {} because it is allowed by arguments", + bold_style.apply_to(&base_resolved_file_ref.file_path), + bold_style + .clone() + .yellow() + .apply_to("unredacted".to_string()) + ) + .as_str(), + ); + destination_fs + .upload(source_reader, Some(dest_file_ref)) + .await?; + Ok(TransferFileResult::Copied) + } else { + bar.println( + format!( + "Skipping redaction of {} because {} media type is not supported", + bold_style.apply_to(&base_resolved_file_ref.file_path), + bold_style.apply_to( + dest_file_ref + .media_type + .as_ref() + .map(|mt| mt.to_string()) + .unwrap_or("".to_string()) + ) + ) + .as_str(), + ); + Ok(TransferFileResult::Skipped) + } +} diff --git a/src/commands/mod.rs b/src/commands/mod.rs new file mode 100644 index 0000000..48ac525 --- /dev/null +++ 
b/src/commands/mod.rs @@ -0,0 +1,2 @@ +mod copy_command; +pub use copy_command::*; diff --git a/src/common_types.rs b/src/common_types.rs new file mode 100644 index 0000000..4e2840b --- /dev/null +++ b/src/common_types.rs @@ -0,0 +1,7 @@ +use rvstruct::ValueStruct; + +#[derive(Debug, Clone, ValueStruct)] +pub struct GcpProjectId(String); + +#[derive(Debug, Clone, ValueStruct)] +pub struct AwsAccountId(String); diff --git a/src/errors.rs b/src/errors.rs new file mode 100644 index 0000000..16f5466 --- /dev/null +++ b/src/errors.rs @@ -0,0 +1,60 @@ +use gcloud_sdk::google_rest_apis::storage_v1::objects_api::{ + StoragePeriodObjectsPeriodGetError, StoragePeriodObjectsPeriodInsertError, + StoragePeriodObjectsPeriodListError, +}; +use gcloud_sdk::tonic::metadata::errors::InvalidMetadataValue; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum AppError { + #[error("Unknown file system is specified: {file_path}")] + UnknownFileSystem { file_path: String }, + #[error("Unknown file system is specified: {redacter_type}")] + UnknownRedacter { redacter_type: String }, + #[error("Input/output error")] + InputOutputError(#[from] std::io::Error), + #[error("Destination '{destination}' doesn't support multiple files. 
Trailing slash needed?")] + DestinationDoesNotSupportMultipleFiles { destination: String }, + #[error("Google Cloud SDK error")] + GoogleCloudRestSdkError(#[from] gcloud_sdk::error::Error), + #[error("Google Cloud Storage download error")] + GoogleCloudStorageGetObjectError( + #[from] gcloud_sdk::google_rest_apis::storage_v1::Error, + ), + #[error("Google Cloud Storage upload error")] + GoogleCloudStorageInsertObjectError( + #[from] + gcloud_sdk::google_rest_apis::storage_v1::Error, + ), + #[error("Google Cloud Storage upload error")] + GoogleCloudStorageListObjectError( + #[from] + gcloud_sdk::google_rest_apis::storage_v1::Error, + ), + #[error("Google Cloud SDK error")] + GoogleCloudGrpcError(#[from] gcloud_sdk::tonic::Status), + #[error("Google Cloud invalid metadata value")] + GoogleCloudInvalidMetadataValue(#[from] InvalidMetadataValue), + #[error("AWS SDK error occurred")] + AwsSdkError(#[from] Box), + #[error("MIME error")] + MimeError(#[from] mime::FromStrError), + #[error("Zip error")] + ZipError(#[from] zip::result::ZipError), + #[error("CSV parser error")] + CsvParserError(#[from] csv_async::Error), + #[error("Redacter config error: {message}")] + RedacterConfigError { message: String }, + #[error("System error: {message}")] + SystemError { message: String }, +} + +impl< + O: std::error::Error + std::fmt::Debug + Send + Sync + 'static, + H: std::fmt::Debug + Send + Sync + 'static, + > From> for AppError +{ + fn from(err: aws_sdk_s3::error::SdkError) -> Self { + Self::AwsSdkError(Box::new(err)) + } +} diff --git a/src/filesystems/aws_s3.rs b/src/filesystems/aws_s3.rs new file mode 100644 index 0000000..d81e710 --- /dev/null +++ b/src/filesystems/aws_s3.rs @@ -0,0 +1,340 @@ +use crate::errors::AppError; +use crate::filesystems::{ + AbsoluteFilePath, FileMatcher, FileMatcherResult, FileSystemConnection, FileSystemRef, + ListFilesResult, RelativeFilePath, +}; +use crate::reporter::AppReporter; +use crate::AppResult; +use futures::Stream; +use 
futures::TryStreamExt; +use gcloud_sdk::prost::bytes::Bytes; +use rvstruct::ValueStruct; + +pub struct AwsS3FileSystem<'a> { + bucket_name: String, + object_name: String, + client: aws_sdk_s3::Client, + is_dir: bool, + reporter: &'a AppReporter<'a>, +} + +impl<'a> AwsS3FileSystem<'a> { + pub async fn new(path: &str, reporter: &'a AppReporter<'a>) -> AppResult { + let shared_config = aws_config::load_from_env().await; + let (bucket_name, object_name) = Self::parse_s3_path(path)?; + let is_dir = object_name.ends_with('/'); + println!("Bucket: {}, Object: {}", bucket_name, object_name); + let client = aws_sdk_s3::Client::new(&shared_config); + + Ok(AwsS3FileSystem { + bucket_name, + object_name, + client, + is_dir, + reporter, + }) + } + + fn parse_s3_path(path: &str) -> AppResult<(String, String)> { + let path_parts: Vec<&str> = path.trim_start_matches("s3://").split('/').collect(); + if path_parts.len() < 2 { + return Err(AppError::SystemError { + message: format!("Invalid S3 path: {}", path), + }); + } + if path_parts[1].is_empty() { + Ok((path_parts[0].to_string(), "/".to_string())) + } else { + Ok((path_parts[0].to_string(), path_parts[1..].join("/"))) + } + } + + #[async_recursion::async_recursion] + async fn list_files_recursively( + &self, + prefix: Option, + continuation_token: Option, + file_matcher: &Option<&FileMatcher>, + ) -> AppResult { + let list_req = self + .client + .list_objects_v2() + .bucket(&self.bucket_name) + .set_prefix(prefix) + .set_continuation_token(continuation_token.clone()); + let list_resp = list_req.send().await?; + + match list_resp.contents { + Some(contents) => { + let all_found: Vec = contents + .into_iter() + .filter(|item| item.key.iter().all(|key| !key.ends_with('/'))) + .filter_map(|item| { + item.key.map(|name| { + let relative_path: RelativeFilePath = + name.trim_start_matches(&self.object_name).into(); + let media_type = mime_guess::from_path(&name).first(); + FileSystemRef { + relative_path: relative_path, + media_type: 
media_type, + file_size: item.size.map(|v| v as u64), + } + }) + }) + .collect(); + + let next_list_result = if list_resp + .next_continuation_token + .as_ref() + .iter() + .any(|v| !v.is_empty()) + { + self.list_files_recursively( + None, + list_resp.next_continuation_token, + file_matcher, + ) + .await? + } else { + ListFilesResult::EMPTY + }; + + let all_found_len = all_found.len(); + let filtered_files: Vec = all_found + .into_iter() + .filter(|file_ref| { + file_matcher.iter().all(|matcher| { + matches!(matcher.matches(file_ref), FileMatcherResult::Matched) + }) + }) + .collect(); + let skipped = all_found_len - filtered_files.len(); + + Ok(ListFilesResult { + files: [filtered_files, next_list_result.files].concat(), + skipped: next_list_result.skipped + skipped, + }) + } + None => Ok(ListFilesResult::EMPTY), + } + } +} + +impl<'a> FileSystemConnection<'a> for AwsS3FileSystem<'a> { + async fn download( + &mut self, + file_ref: Option<&FileSystemRef>, + ) -> AppResult<( + FileSystemRef, + Box> + Send + Sync + Unpin + 'static>, + )> { + let object_name = self.resolve(file_ref).file_path; + let relative_path: RelativeFilePath = if self.is_dir { + object_name + .clone() + .trim_start_matches(&self.object_name) + .into() + } else { + object_name + .split('/') + .last() + .map(|file_name| file_name.to_string()) + .unwrap_or_else(|| object_name.clone()) + .into() + }; + + let object = self + .client + .get_object() + .bucket(&self.bucket_name) + .key(&object_name) + .send() + .await?; + + let found_file_ref = FileSystemRef { + relative_path: relative_path.clone(), + media_type: object + .content_type + .map(|v| v.parse()) + .transpose()? 
+ .or_else(|| mime_guess::from_path(relative_path.value()).first()), + file_size: object.content_length.map(|v| v as u64), + }; + + let reader = object.body.into_async_read(); + let stream = tokio_util::io::ReaderStream::new(reader).map_err(AppError::from); + + Ok((found_file_ref, Box::new(stream))) + } + + async fn upload> + Send + Unpin + Sync + 'static>( + &mut self, + input: S, + file_ref: Option<&FileSystemRef>, + ) -> AppResult<()> { + let object_name = self.resolve(file_ref).file_path; + let content_type = file_ref + .and_then(|fr| fr.media_type.as_ref()) + .map(|v| v.to_string()); + let body_bytes: Vec = input.try_collect().await?; + let all_bytes = body_bytes.concat(); + let body = aws_sdk_s3::primitives::ByteStream::from(all_bytes); + + self.client + .put_object() + .bucket(&self.bucket_name) + .key(&object_name) + .set_content_type(content_type) + .body(body) + .send() + .await?; + + Ok(()) + } + + async fn list_files( + &mut self, + file_matcher: Option<&FileMatcher>, + ) -> AppResult { + self.reporter.report(format!( + "Listing files in bucket: {} with prefix: {}", + self.bucket_name, self.object_name + ))?; + if self.object_name.ends_with('/') { + self.list_files_recursively( + if self.object_name == "/" { + None + } else { + Some(self.object_name.clone()) + }, + None, + &file_matcher, + ) + .await + } else { + Ok(ListFilesResult::EMPTY) + } + } + + async fn close(self) -> AppResult<()> { + Ok(()) + } + + async fn has_multiple_files(&self) -> AppResult { + Ok(self.is_dir) + } + + async fn accepts_multiple_files(&self) -> AppResult { + Ok(self.is_dir) + } + + fn resolve(&self, file_ref: Option<&FileSystemRef>) -> AbsoluteFilePath { + AbsoluteFilePath { + file_path: if self.is_dir { + format!( + "{}{}", + &self.object_name, + file_ref + .map(|fr| fr.relative_path.value().clone()) + .unwrap_or_default() + ) + } else { + self.object_name.clone() + }, + scheme: "s3".to_string(), + } + } +} + +mod tests { + use super::*; + use crate::reporter::AppReporter; 
+ use rvstruct::ValueStruct; + use tokio_util::bytes; + + #[tokio::test] + #[cfg_attr(not(feature = "ci-aws"), ignore)] + async fn upload_download_test() -> Result<(), Box> { + let term = console::Term::stdout(); + let reporter: AppReporter = AppReporter::from(&term); + let test_gcp_bucket_name = + std::env::var("TEST_AWS_BUCKET_NAME").expect("TEST_AWS_BUCKET_NAME required"); + + let mut fs = AwsS3FileSystem::new( + &format!("s3://{}/redacter/test-upload/", test_gcp_bucket_name), + &reporter, + ) + .await?; + + let test_data = "test content"; + let test_data_stream = futures::stream::iter(vec![Ok(bytes::Bytes::from(test_data))]); + fs.upload( + test_data_stream, + Some(&FileSystemRef { + relative_path: "test-upload.txt".into(), + media_type: Some(mime::TEXT_PLAIN), + file_size: Some(test_data.len() as u64), + }), + ) + .await?; + + let (file_ref, down_stream) = fs + .download(Some(&FileSystemRef { + relative_path: "test-upload.txt".into(), + media_type: Some(mime::TEXT_PLAIN), + file_size: Some(test_data.len() as u64), + })) + .await?; + + let downloaded_bytes: Vec = down_stream.try_collect().await?; + let flattened_bytes = downloaded_bytes.concat(); + let downloaded_content = std::str::from_utf8(&flattened_bytes)?; + assert_eq!(downloaded_content, test_data); + + assert_eq!(file_ref.relative_path.value(), "test-upload.txt"); + assert_eq!(file_ref.media_type, Some(mime::TEXT_PLAIN)); + assert_eq!(file_ref.file_size, Some(test_data.len() as u64)); + + fs.close().await?; + + Ok(()) + } + + #[tokio::test] + #[cfg_attr(not(feature = "ci-aws"), ignore)] + async fn list_test() -> Result<(), Box> { + let term = console::Term::stdout(); + let reporter: AppReporter = AppReporter::from(&term); + let test_gcp_bucket_name = + std::env::var("TEST_AWS_BUCKET_NAME").expect("TEST_AWS_BUCKET_NAME required"); + + let mut fs = AwsS3FileSystem::new( + &format!("s3://{}/redacter/test-list/", test_gcp_bucket_name), + &reporter, + ) + .await?; + + let test_data = "test content"; + let 
test_data_stream = futures::stream::iter(vec![Ok(bytes::Bytes::from(test_data))]); + fs.upload( + test_data_stream, + Some(&FileSystemRef { + relative_path: "test-upload.txt".into(), + media_type: Some(mime::TEXT_PLAIN), + file_size: Some(test_data.len() as u64), + }), + ) + .await?; + + let list_result = fs.list_files(None).await?; + assert_eq!(list_result.files.len(), 1); + let file_ref = &list_result.files[0]; + assert_eq!(file_ref.relative_path.value(), "test-upload.txt"); + assert_eq!(file_ref.media_type, Some(mime::TEXT_PLAIN)); + assert_eq!(file_ref.file_size, Some(test_data.len() as u64)); + + fs.close().await?; + + Ok(()) + } +} diff --git a/src/filesystems/file_matcher.rs b/src/filesystems/file_matcher.rs new file mode 100644 index 0000000..9314d6e --- /dev/null +++ b/src/filesystems/file_matcher.rs @@ -0,0 +1,90 @@ +use crate::filesystems::FileSystemRef; +use rvstruct::ValueStruct; + +#[derive(Debug, Clone)] +pub struct FileMatcher { + pub filename_matcher: Option, + pub max_size_limit: Option, +} + +#[derive(Debug, Clone, PartialEq)] +pub enum FileMatcherResult { + Matched, + SkippedDueToSize, + SkippedDueToName, +} + +impl FileMatcher { + pub fn new( + filename_matcher: Option, + max_size_limit: Option, + ) -> Self { + FileMatcher { + filename_matcher, + max_size_limit, + } + } + + pub fn matches(&self, file_ref: &FileSystemRef) -> FileMatcherResult { + if let Some(max_size_limit) = self.max_size_limit { + if let Some(file_size) = file_ref.file_size { + if file_size > max_size_limit { + return FileMatcherResult::SkippedDueToSize; + } + } + } + + if let Some(filename_matcher) = &self.filename_matcher { + if !filename_matcher.is_match(file_ref.relative_path.value().as_str()) { + return FileMatcherResult::SkippedDueToName; + } + } + + FileMatcherResult::Matched + } +} + +mod tests { + use super::*; + use crate::filesystems::*; + use mime::Mime; + use std::str::FromStr; + + #[test] + fn test_file_matcher() { + let file_matcher = FileMatcher::new( + 
Some(globset::Glob::new("*.txt").unwrap().compile_matcher()), + Some(100), + ); + + let file_ref = FileSystemRef { + relative_path: RelativeFilePath("test.txt".to_string()), + media_type: Some(Mime::from_str("text/plain").unwrap()), + file_size: Some(50), + }; + + assert_eq!(file_matcher.matches(&file_ref), FileMatcherResult::Matched); + + let file_ref = FileSystemRef { + relative_path: RelativeFilePath("test.txt".to_string()), + media_type: Some(Mime::from_str("text/plain").unwrap()), + file_size: Some(150), + }; + + assert_eq!( + file_matcher.matches(&file_ref), + FileMatcherResult::SkippedDueToSize + ); + + let file_ref = FileSystemRef { + relative_path: RelativeFilePath("test.md".to_string()), + media_type: Some(Mime::from_str("text/plain").unwrap()), + file_size: Some(50), + }; + + assert_eq!( + file_matcher.matches(&file_ref), + FileMatcherResult::SkippedDueToName + ); + } +} diff --git a/src/filesystems/gcs.rs b/src/filesystems/gcs.rs new file mode 100644 index 0000000..fd1d0b0 --- /dev/null +++ b/src/filesystems/gcs.rs @@ -0,0 +1,330 @@ +use crate::filesystems::{ + AbsoluteFilePath, FileMatcher, FileMatcherResult, FileSystemConnection, FileSystemRef, + ListFilesResult, RelativeFilePath, +}; +use crate::reporter::AppReporter; +use crate::AppResult; +use futures::{Stream, TryStreamExt}; +use gcloud_sdk::prost::bytes; +use rvstruct::ValueStruct; +use std::default::Default; + +pub struct GoogleCloudStorageFileSystem<'a> { + google_rest_client: gcloud_sdk::GoogleRestApi, + bucket_name: String, + object_name: String, + is_dir: bool, + reporter: &'a AppReporter<'a>, +} + +impl<'a> GoogleCloudStorageFileSystem<'a> { + pub async fn new(path: &str, reporter: &'a AppReporter<'a>) -> AppResult { + let google_rest_client = gcloud_sdk::GoogleRestApi::new().await?; + let (bucket_name, object_name) = GoogleCloudStorageFileSystem::parse_gcs_path(path); + let is_dir = object_name.ends_with('/'); + Ok(GoogleCloudStorageFileSystem { + google_rest_client, + bucket_name, + 
object_name, + is_dir, + reporter, + }) + } + + fn parse_gcs_path(path: &str) -> (String, String) { + let path = path.trim_start_matches("gs://"); + let parts: Vec<&str> = path.split('/').collect(); + let bucket = parts[0]; + let object = parts[1..].join("/"); + (bucket.to_string(), object.to_string()) + } + + #[async_recursion::async_recursion] + async fn list_files_with_token( + &self, + prefix: Option, + page_token: Option, + file_matcher: &Option<&FileMatcher>, + ) -> AppResult { + let config = self + .google_rest_client + .create_google_storage_v1_config() + .await?; + let list_params = gcloud_sdk::google_rest_apis::storage_v1::objects_api::StoragePeriodObjectsPeriodListParams { + bucket: self.bucket_name.clone(), + prefix, + page_token, + ..gcloud_sdk::google_rest_apis::storage_v1::objects_api::StoragePeriodObjectsPeriodListParams::default() + }; + let list = gcloud_sdk::google_rest_apis::storage_v1::objects_api::storage_objects_list( + &config, + list_params, + ) + .await?; + + match list.items { + Some(items) => Ok({ + let all_found: Vec = items + .into_iter() + .filter_map(|item| { + item.name.map(|name| FileSystemRef { + relative_path: name.trim_start_matches(&self.object_name).into(), + media_type: item.content_type.and_then(|v| v.parse().ok()), + file_size: item.size.and_then(|v| v.parse::().ok()), + }) + }) + .collect(); + let next_list_result = + if list.next_page_token.as_ref().iter().any(|v| !v.is_empty()) { + self.list_files_with_token(None, list.next_page_token, file_matcher) + .await? 
+ } else { + ListFilesResult::EMPTY + }; + let all_found_len = all_found.len(); + let filtered_files: Vec = all_found + .into_iter() + .filter(|file_ref| { + file_matcher.iter().all(|matcher| { + matches!(matcher.matches(file_ref), FileMatcherResult::Matched) + }) + }) + .collect(); + let skipped = all_found_len - filtered_files.len(); + ListFilesResult { + files: [filtered_files, next_list_result.files].concat(), + skipped: next_list_result.skipped + skipped, + } + }), + None => Ok(ListFilesResult::EMPTY), + } + } +} + +impl<'a> FileSystemConnection<'a> for GoogleCloudStorageFileSystem<'a> { + async fn download( + &mut self, + file_ref: Option<&FileSystemRef>, + ) -> AppResult<( + FileSystemRef, + Box> + Send + Sync + Unpin + 'static>, + )> { + let config = self + .google_rest_client + .create_google_storage_v1_config() + .await?; + + let object_name = self.resolve(file_ref).file_path; + + let object = gcloud_sdk::google_rest_apis::storage_v1::objects_api::storage_objects_get( + &config, + gcloud_sdk::google_rest_apis::storage_v1::objects_api::StoragePeriodObjectsPeriodGetParams { + bucket: self.bucket_name.clone(), + object: object_name.clone(), + ..gcloud_sdk::google_rest_apis::storage_v1::objects_api::StoragePeriodObjectsPeriodGetParams::default() + }, + ).await?; + + let relative_path: RelativeFilePath = if self.is_dir { + object_name + .clone() + .trim_start_matches(&self.object_name) + .into() + } else { + object_name + .split('/') + .last() + .map(|file_name| file_name.to_string()) + .unwrap_or_else(|| object_name.clone()) + .into() + }; + + let found_file_ref = FileSystemRef { + relative_path: relative_path.clone(), + media_type: object + .content_type + .map(|v| v.parse()) + .transpose()? 
+ .or_else(|| mime_guess::from_path(relative_path.value()).first()), + file_size: object.size.and_then(|v| v.parse::().ok()), + }; + + let stream = gcloud_sdk::google_rest_apis::storage_v1::objects_api::storage_objects_get_stream( + &config, + gcloud_sdk::google_rest_apis::storage_v1::objects_api::StoragePeriodObjectsPeriodGetParams { + bucket: self.bucket_name.clone(), + object: object_name.clone(), + ..gcloud_sdk::google_rest_apis::storage_v1::objects_api::StoragePeriodObjectsPeriodGetParams::default() + } + ).await?; + Ok(( + found_file_ref, + Box::new(stream.map_err(|err| gcloud_sdk::error::Error::from(err).into())), + )) + } + + async fn upload> + Send + Unpin + Sync + 'static>( + &mut self, + input: S, + file_ref: Option<&FileSystemRef>, + ) -> AppResult<()> { + let object_name = self.resolve(file_ref).file_path; + + let config = self + .google_rest_client + .create_google_storage_v1_config() + .await?; + let content_type = file_ref + .and_then(|fr| fr.media_type.as_ref()) + .map(|v| v.to_string()); + let reader = sync_wrapper::SyncStream::new(input); + let params =gcloud_sdk::google_rest_apis::storage_v1::objects_api::StoragePeriodObjectsPeriodInsertParams { + bucket: self.bucket_name.clone(), + name: Some(object_name), + ..gcloud_sdk::google_rest_apis::storage_v1::objects_api::StoragePeriodObjectsPeriodInsertParams::default() + }; + let _ = gcloud_sdk::google_rest_apis::storage_v1::objects_api::storage_objects_insert_ext_stream( + &config, + params, + content_type, + reader + ).await?; + Ok(()) + } + + async fn list_files( + &mut self, + file_matcher: Option<&FileMatcher>, + ) -> AppResult { + self.reporter.report(format!( + "Listing files in bucket: {} with prefix: {}", + self.bucket_name, self.object_name + ))?; + if self.object_name.ends_with('/') { + self.list_files_with_token(Some(self.object_name.clone()), None, &file_matcher) + .await + } else { + Ok(ListFilesResult::EMPTY) + } + } + + async fn close(self) -> AppResult<()> { + Ok(()) + } + + async fn 
has_multiple_files(&self) -> AppResult { + Ok(self.is_dir) + } + + async fn accepts_multiple_files(&self) -> AppResult { + Ok(self.is_dir) + } + + fn resolve(&self, file_ref: Option<&FileSystemRef>) -> AbsoluteFilePath { + AbsoluteFilePath { + file_path: if self.is_dir { + format!( + "{}{}", + &self.object_name, + file_ref + .map(|fr| fr.relative_path.value().clone()) + .unwrap_or_default() + ) + } else { + self.object_name.clone() + }, + scheme: "gcs".to_string(), + } + } +} + +mod tests { + use super::*; + use crate::reporter::AppReporter; + + #[tokio::test] + #[cfg_attr(not(feature = "ci-gcp"), ignore)] + async fn upload_download_test() -> Result<(), Box> { + let term = console::Term::stdout(); + let reporter: AppReporter = AppReporter::from(&term); + let test_gcp_bucket_name = + std::env::var("TEST_GCS_BUCKET_NAME").expect("TEST_GCS_BUCKET_NAME required"); + + let mut fs = GoogleCloudStorageFileSystem::new( + &format!("gs://{}/redacter/test-upload/", test_gcp_bucket_name), + &reporter, + ) + .await?; + + let test_data = "test content"; + let test_data_stream = futures::stream::iter(vec![Ok(bytes::Bytes::from(test_data))]); + fs.upload( + test_data_stream, + Some(&FileSystemRef { + relative_path: "test-upload.txt".into(), + media_type: Some(mime::TEXT_PLAIN), + file_size: Some(test_data.len() as u64), + }), + ) + .await?; + + let (file_ref, down_stream) = fs + .download(Some(&FileSystemRef { + relative_path: "test-upload.txt".into(), + media_type: Some(mime::TEXT_PLAIN), + file_size: Some(test_data.len() as u64), + })) + .await?; + + let downloaded_bytes: Vec = down_stream.try_collect().await?; + let flattened_bytes = downloaded_bytes.concat(); + let downloaded_content = std::str::from_utf8(&flattened_bytes)?; + assert_eq!(downloaded_content, test_data); + + assert_eq!(file_ref.relative_path.value(), "test-upload.txt"); + assert_eq!(file_ref.media_type, Some(mime::TEXT_PLAIN)); + assert_eq!(file_ref.file_size, Some(test_data.len() as u64)); + + 
fs.close().await?; + + Ok(()) + } + + #[tokio::test] + #[cfg_attr(not(feature = "ci-gcp"), ignore)] + async fn list_test() -> Result<(), Box> { + let term = console::Term::stdout(); + let reporter: AppReporter = AppReporter::from(&term); + let test_gcp_bucket_name = + std::env::var("TEST_GCS_BUCKET_NAME").expect("TEST_GCS_BUCKET_NAME required"); + + let mut fs = GoogleCloudStorageFileSystem::new( + &format!("gs://{}/redacter/test-list/", test_gcp_bucket_name), + &reporter, + ) + .await?; + + let test_data = "test content"; + let test_data_stream = futures::stream::iter(vec![Ok(bytes::Bytes::from(test_data))]); + fs.upload( + test_data_stream, + Some(&FileSystemRef { + relative_path: "test-upload.txt".into(), + media_type: Some(mime::TEXT_PLAIN), + file_size: Some(test_data.len() as u64), + }), + ) + .await?; + + let list_result = fs.list_files(None).await?; + assert_eq!(list_result.files.len(), 1); + let file_ref = &list_result.files[0]; + assert_eq!(file_ref.relative_path.value(), "test-upload.txt"); + assert_eq!(file_ref.media_type, Some(mime::TEXT_PLAIN)); + assert_eq!(file_ref.file_size, Some(test_data.len() as u64)); + + fs.close().await?; + + Ok(()) + } +} diff --git a/src/filesystems/local.rs b/src/filesystems/local.rs new file mode 100644 index 0000000..694191c --- /dev/null +++ b/src/filesystems/local.rs @@ -0,0 +1,288 @@ +use crate::errors::AppError; +use crate::filesystems::{ + AbsoluteFilePath, FileMatcher, FileMatcherResult, FileSystemConnection, FileSystemRef, + ListFilesResult, +}; +use crate::reporter::AppReporter; +use crate::AppResult; +use futures::{Stream, TryStreamExt}; +use gcloud_sdk::prost::bytes; +use rvstruct::ValueStruct; +use std::path::PathBuf; +use tokio::fs::File; + +pub struct LocalFileSystem<'a> { + root_path: String, + is_dir: bool, + reporter: &'a AppReporter<'a>, +} + +impl<'a> LocalFileSystem<'a> { + pub async fn new(root_path: &str, reporter: &'a AppReporter<'a>) -> AppResult { + let root_path_base_str = 
root_path.trim_start_matches("file://").to_string(); + let root_path_path = PathBuf::from(&root_path_base_str); + let is_dir = root_path.ends_with('/') || root_path_path.is_dir(); + let root_path_str = if is_dir && !root_path_base_str.ends_with('/') { + format!("{}/", root_path_base_str) + } else { + root_path_base_str + }; + Ok(LocalFileSystem { + root_path: root_path_str, + is_dir, + reporter, + }) + } + + #[async_recursion::async_recursion] + pub async fn list_files_recursive( + &self, + dir_path: String, + file_matcher: &Option<&FileMatcher>, + ) -> AppResult { + let mut entries = tokio::fs::read_dir(dir_path).await?; + let mut files = Vec::new(); + let mut skipped: usize = 0; + while let Some(entry) = entries.next_entry().await? { + let file_type = entry.file_type().await?; + if file_type.is_file() { + let file_ref = FileSystemRef { + relative_path: entry + .path() + .to_string_lossy() + .to_string() + .replace(self.root_path.as_str(), "") + .into(), + media_type: mime_guess::from_path(entry.path()).first(), + file_size: Some(entry.metadata().await?.len()), + }; + if file_matcher + .iter() + .all(|matcher| matches!(matcher.matches(&file_ref), FileMatcherResult::Matched)) + { + files.push(file_ref); + } else { + skipped += 1; + } + } else if file_type.is_dir() { + let dir_files = self + .list_files_recursive(entry.path().to_string_lossy().to_string(), file_matcher) + .await?; + skipped += dir_files.skipped; + files.extend(dir_files.files); + } + } + Ok(ListFilesResult { files, skipped }) + } +} + +impl<'a> FileSystemConnection<'a> for LocalFileSystem<'a> { + async fn download( + &mut self, + file_ref: Option<&FileSystemRef>, + ) -> AppResult<( + FileSystemRef, + Box> + Send + Sync + Unpin + 'static>, + )> { + use futures::TryStreamExt; + let file_path = PathBuf::from(self.resolve(file_ref).file_path); + + let file = tokio::fs::File::open(&file_path).await?; + let stream = tokio_util::io::ReaderStream::new(file).map_err(AppError::from); + let relative_file_path 
= file_path + .file_name() + .ok_or_else(|| AppError::SystemError { + message: "Filename is empty".to_string(), + })? + .to_string_lossy() + .replace(self.root_path.as_str(), "") + .to_string(); + let file_metadata = tokio::fs::metadata(&file_path).await?; + let file_ref = FileSystemRef { + relative_path: relative_file_path.into(), + media_type: mime_guess::from_path(&file_path).first(), + file_size: Some(file_metadata.len()), + }; + Ok((file_ref, Box::new(stream))) + } + + async fn upload> + Send + Sync + Unpin + 'static>( + &mut self, + input: S, + file_ref: Option<&FileSystemRef>, + ) -> AppResult<()> { + let file_path = PathBuf::from(self.resolve(file_ref).file_path); + + if let Some(parent) = file_path.parent() { + if !parent.exists() { + tokio::fs::create_dir_all(parent).await?; + } + } + + let mut file = File::create(file_path).await?; + let mut reader = tokio_util::io::StreamReader::new( + input.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)), + ); + tokio::io::copy(&mut reader, &mut file).await?; + Ok(()) + } + + async fn list_files( + &mut self, + file_matcher: Option<&FileMatcher>, + ) -> AppResult { + self.reporter + .report(format!("Listing files in dir: {}", self.root_path.as_str()))?; + let source = PathBuf::from(self.root_path.as_str()); + let source_str = source.to_string_lossy().to_string(); + self.list_files_recursive(source_str.clone(), &file_matcher) + .await + } + + async fn close(self) -> AppResult<()> { + Ok(()) + } + + async fn has_multiple_files(&self) -> AppResult { + Ok(self.is_dir) + } + + async fn accepts_multiple_files(&self) -> AppResult { + Ok(self.is_dir) + } + + fn resolve(&self, file_ref: Option<&FileSystemRef>) -> AbsoluteFilePath { + AbsoluteFilePath { + file_path: if self.is_dir { + format!( + "{}{}", + self.root_path, + file_ref + .map(|fr| fr.relative_path.value().clone()) + .unwrap_or("".to_string()) + ) + } else { + self.root_path.clone() + }, + scheme: "file".to_string(), + } + } +} + +mod tests { + use 
super::*; + use crate::filesystems::DetectFileSystem; + use console::Term; + + #[tokio::test] + async fn download_test() -> Result<(), Box> { + let term = Term::stdout(); + let reporter: AppReporter = AppReporter::from(&term); + let temp_dir = tempdir::TempDir::new("local_file_system_tests_download")?; + let temp_dir_path = temp_dir.path(); + + let fs = DetectFileSystem::open( + &format!("file://{}", temp_dir_path.to_string_lossy()), + &reporter, + ) + .await?; + // Create a temp file in the temp dir + let temp_file = temp_dir_path.join("temp_file.txt"); + let temp_content = "test content"; + tokio::fs::write(&temp_file, temp_content).await?; + + let mut fs = fs; + let (file_ref, stream) = fs + .download(Some(&FileSystemRef { + relative_path: "temp_file.txt".into(), + media_type: None, + file_size: None, + })) + .await?; + + let downloaded_bytes: Vec = stream.try_collect().await?; + let flattened_bytes = downloaded_bytes.concat(); + let downloaded_content = std::str::from_utf8(&flattened_bytes)?; + assert_eq!(downloaded_content, temp_content); + assert_eq!(file_ref.relative_path.value(), "temp_file.txt"); + assert_eq!(file_ref.media_type, Some(mime::TEXT_PLAIN)); + assert_eq!(file_ref.file_size, Some(temp_content.len() as u64)); + + fs.close().await?; + + Ok(()) + } + + #[tokio::test] + async fn upload_test() -> Result<(), Box> { + let term = Term::stdout(); + let reporter: AppReporter = AppReporter::from(&term); + let temp_dir = tempdir::TempDir::new("local_file_system_tests_upload")?; + let temp_dir_path = temp_dir.path(); + + let fs = DetectFileSystem::open( + &format!("file://{}", temp_dir_path.to_string_lossy()), + &reporter, + ) + .await?; + + let mut fs = fs; + let content = "test content"; + let stream = futures::stream::iter(vec![Ok(bytes::Bytes::from(content))]); + fs.upload( + stream, + Some(&FileSystemRef { + relative_path: "temp_file.txt".into(), + media_type: None, + file_size: None, + }), + ) + .await?; + + let temp_file = 
temp_dir_path.join("temp_file.txt"); + let file_content = tokio::fs::read_to_string(&temp_file).await?; + assert_eq!(file_content, content); + + fs.close().await?; + + Ok(()) + } + + #[tokio::test] + async fn list_test() -> Result<(), Box> { + let term = Term::stdout(); + let reporter: AppReporter = AppReporter::from(&term); + let temp_dir = tempdir::TempDir::new("local_file_system_tests_list")?; + let temp_dir_path = temp_dir.path(); + + let fs = DetectFileSystem::open( + &format!("file://{}", temp_dir_path.to_string_lossy()), + &reporter, + ) + .await?; + + let mut fs = fs; + let content = "test content"; + let stream = futures::stream::iter(vec![Ok(bytes::Bytes::from(content))]); + fs.upload( + stream, + Some(&FileSystemRef { + relative_path: "temp_file.txt".into(), + media_type: None, + file_size: None, + }), + ) + .await?; + + let list_files_result = fs.list_files(None).await?; + assert_eq!(list_files_result.files.len(), 1); + assert_eq!( + list_files_result.files[0].relative_path.value(), + "temp_file.txt" + ); + + fs.close().await?; + + Ok(()) + } +} diff --git a/src/filesystems/mod.rs b/src/filesystems/mod.rs new file mode 100644 index 0000000..87416c1 --- /dev/null +++ b/src/filesystems/mod.rs @@ -0,0 +1,204 @@ +use crate::errors::AppError; +use crate::filesystems::gcs::GoogleCloudStorageFileSystem; +use crate::filesystems::local::LocalFileSystem; +use crate::filesystems::zip::ZipFileSystem; +use crate::AppResult; +use futures::Stream; +use gcloud_sdk::prost::bytes; +use gcloud_sdk::prost::bytes::Bytes; +use mime::Mime; +use rvstruct::ValueStruct; + +mod aws_s3; +mod gcs; +mod local; +mod zip; + +mod file_matcher; +use crate::filesystems::aws_s3::AwsS3FileSystem; +use crate::reporter::AppReporter; +pub use file_matcher::*; + +#[derive(Debug, Clone, ValueStruct)] +pub struct RelativeFilePath(pub String); + +#[derive(Debug, Clone)] +pub struct AbsoluteFilePath { + pub file_path: String, + pub scheme: String, +} + +impl AbsoluteFilePath { + pub fn 
value(&self) -> String { + format!("{}://{}", self.scheme, self.file_path) + } +} + +impl RelativeFilePath { + pub fn is_dir(&self) -> bool { + self.value().ends_with('/') + } +} + +#[derive(Debug, Clone)] +pub struct FileSystemRef { + pub relative_path: RelativeFilePath, + pub media_type: Option, + pub file_size: Option, +} + +#[derive(Debug, Clone)] +pub struct ListFilesResult { + pub files: Vec, + pub skipped: usize, +} + +impl ListFilesResult { + pub const EMPTY: ListFilesResult = ListFilesResult { + files: Vec::new(), + skipped: 0, + }; +} + +pub trait FileSystemConnection<'a> { + async fn download( + &mut self, + file_ref: Option<&FileSystemRef>, + ) -> AppResult<( + FileSystemRef, + Box> + Send + Sync + Unpin + 'static>, + )>; + + async fn upload> + Send + Unpin + Sync + 'static>( + &mut self, + input: S, + file_ref: Option<&FileSystemRef>, + ) -> AppResult<()>; + + async fn list_files( + &mut self, + file_matcher: Option<&FileMatcher>, + ) -> AppResult; + + async fn close(self) -> AppResult<()>; + + async fn has_multiple_files(&self) -> AppResult; + + async fn accepts_multiple_files(&self) -> AppResult; + + fn resolve(&self, file_ref: Option<&FileSystemRef>) -> AbsoluteFilePath; +} + +pub enum DetectFileSystem<'a> { + Local(LocalFileSystem<'a>), + GoogleCloudStorage(GoogleCloudStorageFileSystem<'a>), + AwsS3(AwsS3FileSystem<'a>), + ZipFile(ZipFileSystem<'a>), +} + +impl<'a> DetectFileSystem<'a> { + pub async fn open( + file_path: &str, + reporter: &'a AppReporter<'a>, + ) -> AppResult> { + if file_path.starts_with("file://") || !file_path.contains("://") { + return Ok(DetectFileSystem::Local( + LocalFileSystem::new(file_path, reporter).await?, + )); + } else if file_path.starts_with("gs://") { + return Ok(DetectFileSystem::GoogleCloudStorage( + GoogleCloudStorageFileSystem::new(file_path, reporter).await?, + )); + } else if file_path.starts_with("s3://") { + return Ok(DetectFileSystem::AwsS3( + AwsS3FileSystem::new(file_path, reporter).await?, + )); + } 
else if file_path.starts_with("zip://") { + return Ok(DetectFileSystem::ZipFile( + ZipFileSystem::new(file_path, reporter).await?, + )); + } else { + Err(AppError::UnknownFileSystem { + file_path: file_path.to_string(), + }) + } + } +} + +impl<'a> FileSystemConnection<'a> for DetectFileSystem<'a> { + async fn download( + &mut self, + file_ref: Option<&FileSystemRef>, + ) -> AppResult<( + FileSystemRef, + Box> + Send + Sync + Unpin + 'static>, + )> { + match self { + DetectFileSystem::Local(fs) => fs.download(file_ref).await, + DetectFileSystem::GoogleCloudStorage(fs) => fs.download(file_ref).await, + DetectFileSystem::AwsS3(fs) => fs.download(file_ref).await, + DetectFileSystem::ZipFile(fs) => fs.download(file_ref).await, + } + } + + async fn upload> + Send + Unpin + Sync + 'static>( + &mut self, + input: S, + file_ref: Option<&FileSystemRef>, + ) -> AppResult<()> { + match self { + DetectFileSystem::Local(fs) => fs.upload(input, file_ref).await, + DetectFileSystem::GoogleCloudStorage(fs) => fs.upload(input, file_ref).await, + DetectFileSystem::AwsS3(fs) => fs.upload(input, file_ref).await, + DetectFileSystem::ZipFile(fs) => fs.upload(input, file_ref).await, + } + } + + async fn list_files( + &mut self, + file_matcher: Option<&FileMatcher>, + ) -> AppResult { + match self { + DetectFileSystem::Local(fs) => fs.list_files(file_matcher).await, + DetectFileSystem::GoogleCloudStorage(fs) => fs.list_files(file_matcher).await, + DetectFileSystem::AwsS3(fs) => fs.list_files(file_matcher).await, + DetectFileSystem::ZipFile(fs) => fs.list_files(file_matcher).await, + } + } + + async fn close(self) -> AppResult<()> { + match self { + DetectFileSystem::Local(fs) => fs.close().await, + DetectFileSystem::GoogleCloudStorage(fs) => fs.close().await, + DetectFileSystem::AwsS3(fs) => fs.close().await, + DetectFileSystem::ZipFile(fs) => fs.close().await, + } + } + + async fn has_multiple_files(&self) -> AppResult { + match self { + DetectFileSystem::Local(fs) => 
fs.has_multiple_files().await, + DetectFileSystem::GoogleCloudStorage(fs) => fs.has_multiple_files().await, + DetectFileSystem::AwsS3(fs) => fs.has_multiple_files().await, + DetectFileSystem::ZipFile(fs) => fs.has_multiple_files().await, + } + } + + async fn accepts_multiple_files(&self) -> AppResult { + match self { + DetectFileSystem::Local(fs) => fs.accepts_multiple_files().await, + DetectFileSystem::GoogleCloudStorage(fs) => fs.accepts_multiple_files().await, + DetectFileSystem::AwsS3(fs) => fs.accepts_multiple_files().await, + DetectFileSystem::ZipFile(fs) => fs.accepts_multiple_files().await, + } + } + + fn resolve(&self, file_ref: Option<&FileSystemRef>) -> AbsoluteFilePath { + match self { + DetectFileSystem::Local(fs) => fs.resolve(file_ref), + DetectFileSystem::GoogleCloudStorage(fs) => fs.resolve(file_ref), + DetectFileSystem::AwsS3(fs) => fs.resolve(file_ref), + DetectFileSystem::ZipFile(fs) => fs.resolve(file_ref), + } + } +} diff --git a/src/filesystems/zip.rs b/src/filesystems/zip.rs new file mode 100644 index 0000000..6e6fe55 --- /dev/null +++ b/src/filesystems/zip.rs @@ -0,0 +1,286 @@ +use crate::errors::AppError; +use crate::filesystems::local::LocalFileSystem; +use crate::filesystems::{ + AbsoluteFilePath, FileMatcher, FileSystemConnection, FileSystemRef, ListFilesResult, +}; +use crate::reporter::AppReporter; +use crate::AppResult; +use futures::{Stream, TryStreamExt}; +use gcloud_sdk::prost::bytes::Bytes; +use rvstruct::ValueStruct; +use std::io::Write; +use std::path::{Path, PathBuf}; +use tempdir::TempDir; +use zip::*; + +pub struct ZipFileSystem<'a> { + zip_file_path: PathBuf, + mode: Option>, + reporter: &'a AppReporter<'a>, +} + +#[allow(clippy::large_enum_variant)] +enum ZipFileSystemMode<'a> { + Read { + _temp_dir: TempDir, + temp_file_system: LocalFileSystem<'a>, + }, + Write { + zip_writer: ZipWriter, + }, +} + +impl<'a> ZipFileSystem<'a> { + pub async fn new(file_path: &str, reporter: &'a AppReporter<'a>) -> AppResult { + let 
root_path_base_str = file_path.trim_start_matches("zip://").to_string(); + let root_path_path = PathBuf::from(&root_path_base_str); + let is_dir = file_path.ends_with('/') || root_path_path.is_dir(); + if is_dir { + return Err(AppError::SystemError { + message: "ZipFileSystem does not support directories".into(), + }); + } + Ok(Self { + zip_file_path: root_path_path, + mode: None, + reporter, + }) + } + + async fn extract_zip_for_read(&mut self) -> Result<(), AppError> { + if self.mode.is_none() { + let file = std::fs::File::open(&self.zip_file_path)?; + let mut archive = ZipArchive::new(file)?; + let temp_dir = TempDir::new("redacter")?; + archive.extract(temp_dir.path())?; + let temp_dir_str = temp_dir.path().to_string_lossy(); + self.reporter + .report(format!("Extracting files to temp dir: {}", temp_dir_str))?; + let temp_file_system = + LocalFileSystem::new(temp_dir_str.as_ref(), self.reporter).await?; + self.mode = Some(ZipFileSystemMode::Read { + _temp_dir: temp_dir, + temp_file_system, + }); + } + Ok(()) + } +} + +impl<'a> FileSystemConnection<'a> for ZipFileSystem<'a> { + async fn download( + &mut self, + file_ref: Option<&FileSystemRef>, + ) -> AppResult<( + FileSystemRef, + Box> + Send + Sync + Unpin + 'static>, + )> { + self.extract_zip_for_read().await?; + match self.mode { + Some(ZipFileSystemMode::Read { + _temp_dir: _, + ref mut temp_file_system, + }) => match file_ref { + Some(file_ref) => temp_file_system.download(Some(file_ref)).await, + None => Err(AppError::SystemError { + message: "FileSystemRef is required for ZipFileSystem".into(), + }), + }, + _ => Err(AppError::SystemError { + message: "ZipFileSystem is not in read mode".into(), + }), + } + } + + async fn upload> + Send + Unpin + Sync + 'static>( + &mut self, + mut input: S, + file_ref: Option<&FileSystemRef>, + ) -> AppResult<()> { + if self.mode.is_none() { + let zip_file = if self.zip_file_path.exists() { + return Err(AppError::SystemError { + message: "Zip file already exists".into(), 
+ }); + } else { + std::fs::File::create_new(&self.zip_file_path)? + }; + + let zip_writer = ZipWriter::new(zip_file); + self.mode = Some(ZipFileSystemMode::Write { zip_writer }); + } + match self.mode { + Some(ZipFileSystemMode::Write { ref mut zip_writer }) => match file_ref { + Some(file_ref) => { + let file_path = Path::new(file_ref.relative_path.value()); + let file_path_str = file_path.to_string_lossy().to_string(); + let file_options = zip::write::FullFileOptions::default(); + zip_writer.start_file(file_path_str, file_options)?; + while let Some(chunk) = input.try_next().await? { + zip_writer.write_all(&chunk)?; + } + Ok(()) + } + None => Err(AppError::SystemError { + message: "FileSystemRef is required for ZipFileSystem".into(), + }), + }, + _ => Err(AppError::SystemError { + message: "ZipFileSystem is not in write mode".into(), + }), + } + } + + async fn list_files( + &mut self, + file_matcher: Option<&FileMatcher>, + ) -> AppResult { + self.extract_zip_for_read().await?; + match self.mode { + Some(ZipFileSystemMode::Read { + _temp_dir: _, + ref mut temp_file_system, + }) => temp_file_system.list_files(file_matcher).await, + _ => Err(AppError::SystemError { + message: "ZipFileSystem is not in read mode".into(), + }), + } + } + + async fn close(mut self) -> AppResult<()> { + if let Some(ZipFileSystemMode::Write { zip_writer }) = self.mode { + zip_writer.finish()?; + } + self.mode = None; + Ok(()) + } + + async fn has_multiple_files(&self) -> AppResult { + Ok(true) + } + + async fn accepts_multiple_files(&self) -> AppResult { + Ok(true) + } + + fn resolve(&self, file_ref: Option<&FileSystemRef>) -> AbsoluteFilePath { + AbsoluteFilePath { + file_path: format!( + "{}{}", + self.zip_file_path.to_string_lossy(), + file_ref + .map(|fr| format!(":{}", fr.relative_path.value())) + .unwrap_or("".to_string()) + ), + scheme: "zip".to_string(), + } + } +} + +mod tests { + use super::*; + use gcloud_sdk::prost::bytes; + use std::io::Read; + use tempdir::TempDir; + + 
#[tokio::test] + async fn download_test() -> Result<(), Box> { + let term = console::Term::stdout(); + let reporter: AppReporter = AppReporter::from(&term); + let temp_dir = TempDir::new("zip_file_system_tests_download")?; + let temp_dir_path = temp_dir.path(); + let zip_file_path = temp_dir_path.join("test.zip"); + let mut zip = ZipWriter::new(std::fs::File::create(&zip_file_path)?); + zip.start_file("file1.txt", zip::write::SimpleFileOptions::default())?; + let test_content = b"test content"; + zip.write_all(test_content)?; + zip.finish()?; + + let mut fs = ZipFileSystem::new( + &format!("zip://{}", zip_file_path.to_string_lossy()), + &reporter, + ) + .await?; + let (file_ref, stream) = fs + .download(Some(&FileSystemRef { + relative_path: "file1.txt".into(), + media_type: None, + file_size: None, + })) + .await?; + let downloaded_bytes: Vec = stream.try_collect().await?; + let flattened_bytes = downloaded_bytes.concat(); + let downloaded_content = std::str::from_utf8(&flattened_bytes)?; + assert_eq!(downloaded_content, std::str::from_utf8(test_content)?); + assert_eq!(file_ref.relative_path.value(), "file1.txt"); + assert_eq!(file_ref.media_type, Some(mime::TEXT_PLAIN)); + assert_eq!(file_ref.file_size, Some(test_content.len() as u64)); + + fs.close().await?; + + Ok(()) + } + + #[tokio::test] + async fn upload_test() -> Result<(), Box> { + let term = console::Term::stdout(); + let reporter: AppReporter = AppReporter::from(&term); + let temp_dir = TempDir::new("zip_file_system_tests_upload")?; + let temp_dir_path = temp_dir.path(); + let zip_file_path = temp_dir_path.join("test.zip"); + + let mut fs = ZipFileSystem::new( + &format!("zip://{}", zip_file_path.to_string_lossy()), + &reporter, + ) + .await?; + + let test_content = b"test content"; + let stream = futures::stream::iter(vec![Ok(bytes::Bytes::from(test_content.to_vec()))]); + fs.upload( + stream, + Some(&FileSystemRef { + relative_path: "file1.txt".into(), + media_type: None, + file_size: None, + }), + ) 
+ .await?; + + fs.close().await?; + + let mut zip = ZipArchive::new(std::fs::File::open(&zip_file_path)?)?; + let mut file = zip.by_index(0)?; + let mut content = Vec::new(); + file.read_to_end(&mut content)?; + assert_eq!(content, test_content); + + Ok(()) + } + + #[tokio::test] + async fn list_files_test() -> Result<(), Box> { + let term = console::Term::stdout(); + let reporter: AppReporter = AppReporter::from(&term); + let temp_dir = TempDir::new("zip_file_system_tests_list_files")?; + let temp_dir_path = temp_dir.path(); + let zip_file_path = temp_dir_path.join("test.zip"); + let mut zip = ZipWriter::new(std::fs::File::create(&zip_file_path)?); + zip.start_file("file1.txt", zip::write::SimpleFileOptions::default())?; + zip.start_file("file2.txt", zip::write::SimpleFileOptions::default())?; + zip.finish()?; + + let mut fs = ZipFileSystem::new( + &format!("zip://{}", zip_file_path.to_string_lossy()), + &reporter, + ) + .await?; + let list_files_result = fs.list_files(None).await?; + assert_eq!(list_files_result.files.len(), 2); + assert_eq!(list_files_result.skipped, 0); + + fs.close().await?; + + Ok(()) + } +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..cc280b0 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,96 @@ +use clap::Parser; +use console::{Style, Term}; + +use std::error::Error; + +mod args; +use crate::commands::*; +use crate::errors::AppError; +use args::*; + +mod reporter; + +mod filesystems; + +mod errors; + +mod commands; + +mod redacters; + +pub type AppResult = Result; + +mod common_types; + +pub fn config_env_var(name: &str) -> Result { + std::env::var(name).map_err(|e| format!("{}: {}", name, e)) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let term = Term::stdout(); + let bold_style = Style::new().bold(); + + term.write_line( + format!( + "{} v{} (https://github.com/abdolence/redacter-rs)", + bold_style.clone().green().apply_to("Redacter"), + bold_style.apply_to(env!("CARGO_PKG_VERSION")) + ) + 
.as_str(), + )?; + + let cli = CliArgs::parse(); + if let Err(err) = handle_args(cli, &term).await { + term.write_line( + format!( + "{}: {}\nDetails: {:?}", + bold_style.clone().red().apply_to("Error"), + err, + err.source() + ) + .as_str(), + )?; + } + + Ok(()) +} + +async fn handle_args(cli: CliArgs, term: &Term) -> AppResult<()> { + let bold_style = Style::new().bold(); + + match cli.command { + CliCommand::Cp { + source, + destination, + max_size_limit, + filename_filter, + redacter_args, + } => { + let options = CopyCommandOptions::new(filename_filter, max_size_limit); + let copy_result = command_copy( + term, + &source, + &destination, + options, + redacter_args.map(|args| args.try_into()).transpose()?, + ) + .await?; + term.write_line( + format!( + "{} -> {}\n{} files processed.\n{} files skipped.", + source, + destination, + bold_style + .clone() + .green() + .apply_to(copy_result.files_copied), + Style::new().yellow().apply_to(copy_result.files_skipped), + ) + .as_str(), + )?; + } + } + + Ok(()) +} diff --git a/src/redacters/gcp_dlp.rs b/src/redacters/gcp_dlp.rs new file mode 100644 index 0000000..6d6e1ba --- /dev/null +++ b/src/redacters/gcp_dlp.rs @@ -0,0 +1,413 @@ +use crate::common_types::GcpProjectId; +use crate::errors::AppError; +use crate::filesystems::FileSystemRef; +use crate::redacters::{ + Redacter, RedacterDataItem, RedacterDataItemContent, RedacterOptions, Redacters, +}; +use crate::reporter::AppReporter; +use crate::AppResult; +use gcloud_sdk::google::privacy::dlp::v2::dlp_service_client::DlpServiceClient; +use gcloud_sdk::tonic::metadata::MetadataValue; +use gcloud_sdk::{tonic, GoogleApi, GoogleAuthMiddleware}; +use mime::Mime; +use rvstruct::ValueStruct; + +#[derive(Clone)] +pub struct GcpDlpRedacter<'a> { + pub client: GoogleApi>, + pub redacter_options: RedacterOptions, + pub gcp_dlp_options: GcpDlpRedacterOptions, + pub reporter: &'a AppReporter<'a>, +} + +#[derive(Debug, Clone)] +pub struct GcpDlpRedacterOptions { + pub project_id: 
GcpProjectId, +} + +impl<'a> GcpDlpRedacter<'a> { + pub const INFO_TYPES: [&'static str; 11] = [ + "PHONE_NUMBER", + "EMAIL_ADDRESS", + "CREDIT_CARD_NUMBER", + "LOCATION", + "PERSON_NAME", + "AGE", + "DATE_OF_BIRTH", + "FINANCIAL_ACCOUNT_NUMBER", + "GENDER", + "IP_ADDRESS", + "PASSPORT", + ]; + pub async fn new( + redacter_options: RedacterOptions, + gcp_dlp_options: GcpDlpRedacterOptions, + reporter: &'a AppReporter<'a>, + ) -> AppResult { + let client = + GoogleApi::from_function(DlpServiceClient::new, "https://dlp.googleapis.com", None) + .await?; + Ok(GcpDlpRedacter { + client, + redacter_options, + gcp_dlp_options, + reporter, + }) + } + + pub async fn redact_text_file( + &self, + input: RedacterDataItem, + ) -> AppResult { + self.reporter.report(format!( + "Redacting a text file: {} ({:?})", + input.file_ref.relative_path.value(), + input.file_ref.media_type + ))?; + let mut request = tonic::Request::new( + gcloud_sdk::google::privacy::dlp::v2::DeidentifyContentRequest { + parent: format!( + "projects/{}/locations/global", + self.gcp_dlp_options.project_id.value() + ), + inspect_config: Some(Self::create_inspect_config()), + deidentify_config: Some(Self::create_deidentify_config()), + item: Some(input.content.try_into()?), + ..gcloud_sdk::google::privacy::dlp::v2::DeidentifyContentRequest::default() + }, + ); + request.metadata_mut().insert( + "x-goog-user-project", + MetadataValue::::try_from( + self.gcp_dlp_options.project_id.value(), + )?, + ); + let response = self.client.get().deidentify_content(request).await?; + + if let Some(content_item) = response.into_inner().item { + content_item.try_into() + } else { + Err(AppError::SystemError { + message: "No content item in the response".to_string(), + }) + } + } + + pub async fn redact_image_file( + &self, + input: RedacterDataItem, + ) -> AppResult { + match &input.content { + RedacterDataItemContent::Image { mime_type, data } => { + self.reporter.report(format!( + "Redacting an image file: {} ({:?}). 
Size: {}", + input.file_ref.relative_path.value(), + &mime_type, + data.len() + ))?; + let output_mime = mime_type.clone(); + let mut request = + tonic::Request::new(gcloud_sdk::google::privacy::dlp::v2::RedactImageRequest { + parent: format!( + "projects/{}/locations/global", + self.gcp_dlp_options.project_id.value() + ), + inspect_config: Some(Self::create_inspect_config()), + byte_item: Some(input.content.try_into()?), + ..gcloud_sdk::google::privacy::dlp::v2::RedactImageRequest::default() + }); + request.metadata_mut().insert( + "x-goog-user-project", + MetadataValue::::try_from( + self.gcp_dlp_options.project_id.value(), + )?, + ); + let response = self.client.get().redact_image(request).await?; + + Ok(RedacterDataItemContent::Image { + mime_type: output_mime, + data: response.into_inner().redacted_image.into(), + }) + } + _ => Err(AppError::SystemError { + message: "Attempt to redact of unsupported image type".to_string(), + }), + } + } + + fn create_inspect_config() -> gcloud_sdk::google::privacy::dlp::v2::InspectConfig { + gcloud_sdk::google::privacy::dlp::v2::InspectConfig { + info_types: Self::INFO_TYPES + .iter() + .map(|v| gcloud_sdk::google::privacy::dlp::v2::InfoType { + name: v.to_string(), + ..gcloud_sdk::google::privacy::dlp::v2::InfoType::default() + }) + .collect(), + ..gcloud_sdk::google::privacy::dlp::v2::InspectConfig::default() + } + } + + fn create_deidentify_config() -> gcloud_sdk::google::privacy::dlp::v2::DeidentifyConfig { + gcloud_sdk::google::privacy::dlp::v2::DeidentifyConfig { + transformation: Some(gcloud_sdk::google::privacy::dlp::v2::deidentify_config::Transformation::InfoTypeTransformations( + gcloud_sdk::google::privacy::dlp::v2::InfoTypeTransformations { + transformations: vec![ + gcloud_sdk::google::privacy::dlp::v2::info_type_transformations::InfoTypeTransformation { + info_types: Self::INFO_TYPES.iter().map(|v| gcloud_sdk::google::privacy::dlp::v2::InfoType { + name: v.to_string(), + 
..gcloud_sdk::google::privacy::dlp::v2::InfoType::default() + }).collect(), + primitive_transformation: Some(gcloud_sdk::google::privacy::dlp::v2::PrimitiveTransformation { + transformation: Some( + gcloud_sdk::google::privacy::dlp::v2::primitive_transformation::Transformation::ReplaceConfig(gcloud_sdk::google::privacy::dlp::v2::ReplaceValueConfig { + new_value: Some(gcloud_sdk::google::privacy::dlp::v2::Value { + r#type: Some(gcloud_sdk::google::privacy::dlp::v2::value::Type::StringValue( + "[REDACTED]".to_string() + )) + }) + }) + ) + }), + } + ] + })), + ..gcloud_sdk::google::privacy::dlp::v2::DeidentifyConfig::default() + } + } + + fn check_supported_image_type(mime_type: &Mime) -> bool { + Redacters::is_mime_image(mime_type) + && (mime_type.subtype() == "png" + || mime_type.subtype() == "jpeg" + || mime_type.subtype() == "jpg" + || mime_type.subtype() == "jpe" + || mime_type.subtype() == "gif" + || mime_type.subtype() == "bmp") + } +} + +impl<'a> Redacter for GcpDlpRedacter<'a> { + async fn redact(&self, input: RedacterDataItem) -> AppResult { + match &input.content { + RedacterDataItemContent::Table { .. } | RedacterDataItemContent::Value(_) => { + self.redact_text_file(input).await + } + RedacterDataItemContent::Image { mime_type, .. } + if Self::check_supported_image_type(mime_type) => + { + self.redact_image_file(input).await + } + RedacterDataItemContent::Image { .. 
} => Err(AppError::SystemError { + message: "Attempt to redact of unsupported image type".to_string(), + }), + } + } + + async fn is_redact_supported(&self, file_ref: &FileSystemRef) -> AppResult { + Ok(file_ref.media_type.as_ref().iter().all(|media_type| { + Redacters::is_mime_text(media_type) + || Redacters::is_mime_table(media_type) + || Self::check_supported_image_type(media_type) + })) + } + + fn options(&self) -> &RedacterOptions { + &self.redacter_options + } +} + +impl TryInto for RedacterDataItemContent { + type Error = AppError; + + fn try_into(self) -> Result { + match self { + RedacterDataItemContent::Value(value) => { + Ok(gcloud_sdk::google::privacy::dlp::v2::ContentItem { + data_item: Some( + gcloud_sdk::google::privacy::dlp::v2::content_item::DataItem::Value(value), + ), + }) + } + RedacterDataItemContent::Table { headers, rows } => { + let headers = if headers.is_empty() { + rows.first().map_or(vec![], |row| { + (0..row.len()) + .map(|i| gcloud_sdk::google::privacy::dlp::v2::FieldId { + name: format!("Column {}", i), + }) + .collect() + }) + } else { + headers + .into_iter() + .map(|header| gcloud_sdk::google::privacy::dlp::v2::FieldId { + name: header, + }) + .collect() + }; + Ok(gcloud_sdk::google::privacy::dlp::v2::ContentItem { + data_item: Some( + gcloud_sdk::google::privacy::dlp::v2::content_item::DataItem::Table( + gcloud_sdk::google::privacy::dlp::v2::Table { + headers, + rows: rows + .iter() + .map(|cols| gcloud_sdk::google::privacy::dlp::v2::table::Row { + values: cols.iter().map(|col| { + gcloud_sdk::google::privacy::dlp::v2::Value { + r#type: Some(gcloud_sdk::google::privacy::dlp::v2::value::Type::StringValue( + col.to_string(), + )), + } + }).collect() + + }) + .collect(), + }, + ), + ), + }) + } + RedacterDataItemContent::Image { .. 
} => Err(AppError::SystemError { + message: "Attempt to convert image content to ContentItem".to_string(), + }), + } + } +} + +impl TryFrom for RedacterDataItemContent { + type Error = AppError; + + fn try_from( + value: gcloud_sdk::google::privacy::dlp::v2::ContentItem, + ) -> Result { + match value.data_item { + Some(gcloud_sdk::google::privacy::dlp::v2::content_item::DataItem::Value(value)) => { + Ok(RedacterDataItemContent::Value(value)) + } + Some(gcloud_sdk::google::privacy::dlp::v2::content_item::DataItem::Table(table)) => { + Ok(RedacterDataItemContent::Table { + headers: table + .headers + .into_iter() + .map(|header| header.name) + .collect(), + rows: table + .rows + .into_iter() + .map(|row| { + row.values + .into_iter() + .map(|value| match value.r#type { + Some(gcloud_sdk::google::privacy::dlp::v2::value::Type::StringValue( + value, + )) => value, + _ => "".to_string(), + }) + .collect() + }) + .collect(), + }) + } + _ => Err(AppError::SystemError { + message: "Unknown data item type".to_string(), + }), + } + } +} + +impl TryInto for RedacterDataItemContent { + type Error = AppError; + + fn try_into( + self, + ) -> Result { + fn mime_type_to_image_type( + mime_type: &Mime, + ) -> gcloud_sdk::google::privacy::dlp::v2::byte_content_item::BytesType { + match mime_type { + mime if mime.subtype() == "png" => { + gcloud_sdk::google::privacy::dlp::v2::byte_content_item::BytesType::ImagePng + } + mime if mime.subtype() == "jpeg" || mime.subtype() == "jpg" => { + gcloud_sdk::google::privacy::dlp::v2::byte_content_item::BytesType::ImageJpeg + } + mime if mime.subtype() == "jpe" => { + gcloud_sdk::google::privacy::dlp::v2::byte_content_item::BytesType::ImageJpeg + } + mime if mime.subtype() == "gif" => { + gcloud_sdk::google::privacy::dlp::v2::byte_content_item::BytesType::Image + } + mime if mime.subtype() == "bmp" => { + gcloud_sdk::google::privacy::dlp::v2::byte_content_item::BytesType::ImageBmp + } + _ => 
gcloud_sdk::google::privacy::dlp::v2::byte_content_item::BytesType::Image, + } + } + match self { + RedacterDataItemContent::Image { mime_type, data } => { + Ok(gcloud_sdk::google::privacy::dlp::v2::ByteContentItem { + data: data.to_vec(), + r#type: mime_type_to_image_type(&mime_type).into(), + }) + } + _ => Err(AppError::SystemError { + message: "Attempt to convert non-image content to ByteContentItem".to_string(), + }), + } + } +} + +mod tests { + use super::*; + use crate::redacters::RedacterProviderOptions; + use console::Term; + + #[tokio::test] + #[cfg_attr(not(feature = "ci"), ignore)] + async fn redact_text_file_test() -> Result<(), Box> { + let term = Term::stdout(); + let reporter: AppReporter = AppReporter::from(&term); + let test_gcp_project_id = + std::env::var("TEST_GCP_PROJECT").expect("TEST_GCP_PROJECT required"); + let test_content = "Hello, John"; + + let file_ref = FileSystemRef { + relative_path: "temp_file.txt".into(), + media_type: Some(mime::TEXT_PLAIN), + file_size: Some(test_content.len() as u64), + }; + + let content = RedacterDataItemContent::Value(test_content.to_string()); + let input = RedacterDataItem { file_ref, content }; + + let redacter_options = RedacterOptions { + provider_options: RedacterProviderOptions::GcpDlp(GcpDlpRedacterOptions { + project_id: GcpProjectId::new(test_gcp_project_id.clone()), + }), + allow_unsupported_copies: false, + csv_headers_disable: false, + csv_delimiter: None, + }; + + let redacter = GcpDlpRedacter::new( + redacter_options, + GcpDlpRedacterOptions { + project_id: GcpProjectId::new(test_gcp_project_id), + }, + &reporter, + ) + .await?; + + let redacted_content = redacter.redact(input).await?; + match redacted_content { + RedacterDataItemContent::Value(value) => { + assert_eq!(value, "Hello, [REDACTED]"); + } + _ => panic!("Unexpected redacted content type"), + } + + Ok(()) + } +} diff --git a/src/redacters/mod.rs b/src/redacters/mod.rs new file mode 100644 index 0000000..551bc3e --- /dev/null +++ 
b/src/redacters/mod.rs @@ -0,0 +1,219 @@ +use crate::AppResult; +use csv_async::StringRecord; +use futures::{Stream, TryStreamExt}; +use gcloud_sdk::prost::bytes; +use mime::Mime; +use std::fmt::Display; + +mod gcp_dlp; +use crate::errors::AppError; +use crate::filesystems::FileSystemRef; +use crate::reporter::AppReporter; +pub use gcp_dlp::*; + +#[derive(Debug, Clone)] +pub struct RedacterDataItem { + pub content: RedacterDataItemContent, + pub file_ref: FileSystemRef, +} + +#[derive(Debug, Clone)] +pub enum RedacterDataItemContent { + Value(String), + Table { + headers: Vec<String>, + rows: Vec<Vec<String>>, + }, + Image { + mime_type: Mime, + data: bytes::Bytes, + }, +} + +#[derive(Clone)] +pub enum Redacters<'a> { + GcpDlp(GcpDlpRedacter<'a>), +} + +#[derive(Debug, Clone)] +pub struct RedacterOptions { + pub provider_options: RedacterProviderOptions, + pub allow_unsupported_copies: bool, + pub csv_headers_disable: bool, + pub csv_delimiter: Option<u8>, +} + +#[derive(Debug, Clone)] +pub enum RedacterProviderOptions { + GcpDlp(GcpDlpRedacterOptions), +} + +impl Display for RedacterOptions { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self.provider_options { + RedacterProviderOptions::GcpDlp(_) => write!(f, "gcp-dlp"), + } + } +} + +impl<'a> Redacters<'a> { + pub async fn new_redacter( + redacter_options: &RedacterOptions, + reporter: &'a AppReporter<'a>, + ) -> AppResult<Self> { + match redacter_options.provider_options { + RedacterProviderOptions::GcpDlp(ref options) => Ok(Redacters::GcpDlp( + GcpDlpRedacter::new(redacter_options.clone(), options.clone(), reporter).await?, + )), + } + } + + pub fn is_mime_text(mime: &Mime) -> bool { + let mime_subtype_as_str = mime.subtype().as_str().to_lowercase(); + (mime.type_() == mime::TEXT + && (mime.subtype() == mime::PLAIN + || mime.subtype() == mime::HTML + || mime.subtype() == mime::XML + || mime.subtype() == mime::CSS)) + || (mime.type_() == mime::APPLICATION + && (mime.subtype() == mime::XML + || mime.subtype()
== mime::JSON + || mime_subtype_as_str == "yaml" + || mime_subtype_as_str == "x-yaml")) + } + + pub fn is_mime_table(mime: &Mime) -> bool { + mime.type_() == mime::TEXT && mime.subtype() == mime::CSV + } + + pub fn is_mime_image(mime: &Mime) -> bool { + mime.type_() == mime::IMAGE + } +} + +pub trait Redacter { + async fn redact(&self, input: RedacterDataItem) -> AppResult; + + async fn is_redact_supported(&self, file_ref: &FileSystemRef) -> AppResult; + + fn options(&self) -> &RedacterOptions; + + async fn redact_stream< + S: Stream> + Send + Unpin + Sync + 'static, + >( + &self, + input: S, + file_ref: &FileSystemRef, + ) -> AppResult> + Send + Sync + Unpin + 'static>> + { + let content_to_redact = match file_ref.media_type { + Some(ref mime) if Redacters::is_mime_text(mime) => { + let all_chunks: Vec = input.try_collect().await?; + let all_bytes = all_chunks.concat(); + let content = + String::from_utf8(all_bytes).map_err(|e| crate::AppError::SystemError { + message: format!("Failed to convert bytes to string: {}", e), + })?; + Ok(RedacterDataItem { + content: RedacterDataItemContent::Value(content), + file_ref: file_ref.clone(), + }) + } + Some(ref mime) if Redacters::is_mime_image(mime) => { + let all_chunks: Vec = input.try_collect().await?; + let all_bytes = all_chunks.concat(); + Ok(RedacterDataItem { + content: RedacterDataItemContent::Image { + mime_type: mime.clone(), + data: all_bytes.into(), + }, + file_ref: file_ref.clone(), + }) + } + Some(ref mime) if Redacters::is_mime_table(mime) => { + let reader = tokio_util::io::StreamReader::new( + input.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)), + ); + let mut reader = csv_async::AsyncReaderBuilder::default() + .has_headers(!self.options().csv_headers_disable) + .delimiter( + self.options() + .csv_delimiter + .as_ref() + .cloned() + .unwrap_or(b','), + ) + .create_reader(reader); + let headers = if !self.options().csv_headers_disable { + reader + .headers() + .await? 
+ .into_iter() + .map(|h| h.to_string()) + .collect() + } else { + vec![] + }; + let records: Vec = reader.records().try_collect().await?; + Ok(RedacterDataItem { + content: RedacterDataItemContent::Table { + headers, + rows: records + .iter() + .map(|r| r.iter().map(|c| c.to_string()).collect()) + .collect(), + }, + file_ref: file_ref.clone(), + }) + } + Some(ref mime) => Err(AppError::SystemError { + message: format!("Media type {} is not supported for redaction", mime), + }), + None => Err(AppError::SystemError { + message: "Media type is not provided to redact".to_string(), + }), + }?; + + let content = self.redact(content_to_redact).await?; + + match content { + RedacterDataItemContent::Value(content) => { + let bytes = bytes::Bytes::from(content.into_bytes()); + Ok(Box::new(futures::stream::iter(vec![Ok(bytes)]))) + } + RedacterDataItemContent::Image { data, .. } => { + Ok(Box::new(futures::stream::iter(vec![Ok(data)]))) + } + RedacterDataItemContent::Table { headers, rows } => { + let mut writer = csv_async::AsyncWriter::from_writer(vec![]); + writer.write_record(headers).await?; + for row in rows { + writer.write_record(row).await?; + } + writer.flush().await?; + let bytes = bytes::Bytes::from(writer.into_inner().await?); + Ok(Box::new(futures::stream::iter(vec![Ok(bytes)]))) + } + } + } +} + +impl<'a> Redacter for Redacters<'a> { + async fn redact(&self, input: RedacterDataItem) -> AppResult { + match self { + Redacters::GcpDlp(redacter) => redacter.redact(input).await, + } + } + + async fn is_redact_supported(&self, file_ref: &FileSystemRef) -> AppResult { + match self { + Redacters::GcpDlp(redacter) => redacter.is_redact_supported(file_ref).await, + } + } + + fn options(&self) -> &RedacterOptions { + match self { + Redacters::GcpDlp(redacter) => redacter.options(), + } + } +} diff --git a/src/reporter.rs b/src/reporter.rs new file mode 100644 index 0000000..6f16c14 --- /dev/null +++ b/src/reporter.rs @@ -0,0 +1,45 @@ +use crate::AppResult; +use 
console::Term; +use indicatif::ProgressBar; + +#[derive(Debug, Clone)] +pub struct AppReporter<'a> { + inner: AppReporterInner<'a>, +} + +impl<'a> AppReporter<'a> { + pub fn report<S>(&'a self, message: S) -> AppResult<()> + where + S: AsRef<str>, + { + match &self.inner { + AppReporterInner::Term(term) => Ok(term.write_line(message.as_ref())?), + AppReporterInner::ProgressBar(progress_bar) => { + progress_bar.println(message.as_ref()); + Ok(()) + } + } + } +} + +#[derive(Debug, Clone)] +enum AppReporterInner<'a> { + Term(&'a Term), + ProgressBar(&'a ProgressBar), +} + +impl<'a> From<&'a Term> for AppReporter<'a> { + fn from(term: &'a Term) -> Self { + AppReporter { + inner: AppReporterInner::Term(term), + } + } +} + +impl<'a> From<&'a ProgressBar> for AppReporter<'a> { + fn from(progress_bar: &'a ProgressBar) -> Self { + AppReporter { + inner: AppReporterInner::ProgressBar(progress_bar), + } + } +}