Skip to content

Commit

Permalink
feat(core): Implement list with deleted and versions for gcs
Browse files Browse the repository at this point in the history
  • Loading branch information
hoslo committed Jan 23, 2025
1 parent a8b793b commit 8b0d9cc
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 36 deletions.
33 changes: 23 additions & 10 deletions core/src/services/gcs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,13 @@ impl GcsBuilder {
self
}

/// Set bucket versioning status for this backend
pub fn enable_versioning(mut self, enabled: bool) -> Self {
self.config.enable_versioning = enabled;

self
}

/// Set the predefined acl for GCS.
///
/// Available values are:
Expand Down Expand Up @@ -326,6 +333,7 @@ impl Builder for GcsBuilder {
predefined_acl: self.config.predefined_acl.clone(),
default_storage_class: self.config.default_storage_class.clone(),
allow_anonymous: self.config.allow_anonymous,
enable_versioning: self.config.enable_versioning,
}),
};

Expand Down Expand Up @@ -362,20 +370,22 @@ impl Access for GcsBackend {
stat_has_content_md5: true,
stat_has_content_length: true,
stat_has_content_type: true,
stat_with_version: self.core.enable_versioning,
stat_has_last_modified: true,
stat_has_user_metadata: true,

read: true,

read_with_if_match: true,
read_with_if_none_match: true,
read_with_version: self.core.enable_versioning,

write: true,
write_can_empty: true,
write_can_multi: true,
write_with_content_type: true,
write_with_user_metadata: true,
write_with_if_not_exists: true,
write_with_if_not_exists: !self.core.enable_versioning,

// The min multipart size of Gcs is 5 MiB.
//
Expand All @@ -392,6 +402,7 @@ impl Access for GcsBackend {

delete: true,
delete_max_size: Some(100),
delete_with_version: self.core.enable_versioning,
copy: true,

list: true,
Expand All @@ -403,6 +414,8 @@ impl Access for GcsBackend {
list_has_content_length: true,
list_has_content_type: true,
list_has_last_modified: true,
list_with_versions: self.core.enable_versioning,
list_with_deleted: self.core.enable_versioning,

presign: true,
presign_stat: true,
Expand Down Expand Up @@ -432,6 +445,7 @@ impl Access for GcsBackend {

m.set_etag(&meta.etag);
m.set_content_md5(&meta.md5_hash);
m.set_version(&meta.generation);

let size = meta
.size
Expand Down Expand Up @@ -485,15 +499,10 @@ impl Access for GcsBackend {
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
let l = GcsLister::new(
self.core.clone(),
path,
args.recursive(),
args.limit(),
args.start_after(),
);

Ok((RpList::default(), oio::PageLister::new(l)))
Ok((
RpList::default(),
oio::PageLister::new(GcsLister::new(self.core.clone(), path, args)),
))
}

async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
Expand Down Expand Up @@ -554,6 +563,10 @@ struct GetObjectJsonResponse {
///
/// For example: `"contentType": "image/png",`
content_type: String,
/// Generation of this object.
///
/// For example: `"generation": "1660563214863653"`
generation: String,
/// Custom metadata of this object.
///
/// For example: `"metadata" : { "my-key": "my-value" }`
Expand Down
2 changes: 2 additions & 0 deletions core/src/services/gcs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ pub struct GcsConfig {
pub disable_vm_metadata: bool,
/// Disable loading configuration from the environment.
pub disable_config_load: bool,
/// Enable versioning for the bucket.
pub enable_versioning: bool,
/// A Google Cloud OAuth2 token.
///
/// Takes precedence over `credential` and `credential_path`.
Expand Down
84 changes: 74 additions & 10 deletions core/src/services/gcs/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ pub mod constants {
pub const X_GOOG_ACL: &str = "x-goog-acl";
pub const X_GOOG_STORAGE_CLASS: &str = "x-goog-storage-class";
pub const X_GOOG_META_PREFIX: &str = "x-goog-meta-";
pub const X_GOOG_IF_GENERATION_MATCH: &str = "x-goog-if-generation-match";
pub const GENERATION: &str = "generation";
}

pub struct GcsCore {
Expand All @@ -69,6 +71,7 @@ pub struct GcsCore {
pub default_storage_class: Option<String>,

pub allow_anonymous: bool,
pub enable_versioning: bool,
}

impl Debug for GcsCore {
Expand Down Expand Up @@ -184,13 +187,25 @@ impl GcsCore {
) -> Result<Request<Buffer>> {
let p = build_abs_path(&self.root, path);

let url = format!(
let mut url = format!(
"{}/storage/v1/b/{}/o/{}?alt=media",
self.endpoint,
self.bucket,
percent_encode_path(&p)
);

let mut query_args = Vec::new();
if let Some(version) = args.version() {
query_args.push(format!(
"{}={}",
constants::GENERATION,
percent_decode_path(version)
))
}
if !query_args.is_empty() {
url.push_str(&format!("&{}", query_args.join("&")));
}

let mut req = Request::get(&url);

if let Some(if_match) = args.if_match() {
Expand All @@ -216,6 +231,10 @@ impl GcsCore {

let mut req = Request::get(&url);

if let Some(version) = args.version() {
req = req.header(constants::X_GOOG_IF_GENERATION_MATCH, version);
}

if let Some(if_match) = args.if_match() {
req = req.header(IF_MATCH, if_match);
}
Expand Down Expand Up @@ -363,13 +382,25 @@ impl GcsCore {
pub fn gcs_head_object_request(&self, path: &str, args: &OpStat) -> Result<Request<Buffer>> {
let p = build_abs_path(&self.root, path);

let url = format!(
let mut url = format!(
"{}/storage/v1/b/{}/o/{}",
self.endpoint,
self.bucket,
percent_encode_path(&p)
);

let mut query_args = Vec::new();
if let Some(version) = args.version() {
query_args.push(format!(
"{}={}",
constants::GENERATION,
percent_decode_path(version)
))
}
if !query_args.is_empty() {
url.push_str(&format!("?{}", query_args.join("&")));
}

let mut req = Request::get(&url);

if let Some(if_none_match) = args.if_none_match() {
Expand All @@ -393,7 +424,19 @@ impl GcsCore {
) -> Result<Request<Buffer>> {
let p = build_abs_path(&self.root, path);

let url = format!("{}/{}/{}", self.endpoint, self.bucket, p);
let mut url = format!("{}/{}/{}", self.endpoint, self.bucket, p);

let mut query_args = Vec::new();
if let Some(version) = args.version() {
query_args.push(format!(
"{}={}",
constants::GENERATION,
percent_decode_path(version)
))
}
if !query_args.is_empty() {
url.push_str(&format!("?{}", query_args.join("&")));
}

let mut req = Request::head(&url);

Expand Down Expand Up @@ -422,35 +465,50 @@ impl GcsCore {
self.send(req).await
}

pub async fn gcs_delete_object(&self, path: &str) -> Result<Response<Buffer>> {
let mut req = self.gcs_delete_object_request(path)?;
pub async fn gcs_delete_object(&self, path: &str, args: OpDelete) -> Result<Response<Buffer>> {
let mut req = self.gcs_delete_object_request(path, args)?;

self.sign(&mut req).await?;
self.send(req).await
}

pub fn gcs_delete_object_request(&self, path: &str) -> Result<Request<Buffer>> {
pub fn gcs_delete_object_request(&self, path: &str, args: OpDelete) -> Result<Request<Buffer>> {
let p = build_abs_path(&self.root, path);

let url = format!(
let mut url = format!(
"{}/storage/v1/b/{}/o/{}",
self.endpoint,
self.bucket,
percent_encode_path(&p)
);

let mut query_args = Vec::new();
if let Some(version) = args.version() {
query_args.push(format!(
"{}={}",
constants::GENERATION,
percent_decode_path(version)
))
}
if !query_args.is_empty() {
url.push_str(&format!("?{}", query_args.join("&")));
}

Request::delete(&url)
.body(Buffer::new())
.map_err(new_request_build_error)
}

pub async fn gcs_delete_objects(&self, paths: Vec<String>) -> Result<Response<Buffer>> {
pub async fn gcs_delete_objects(
&self,
batch: Vec<(String, OpDelete)>,
) -> Result<Response<Buffer>> {
let uri = format!("{}/batch/storage/v1", self.endpoint);

let mut multipart = Multipart::new();

for (idx, path) in paths.iter().enumerate() {
let req = self.gcs_delete_object_request(path)?;
for (idx, (path, args)) in batch.iter().enumerate() {
let req = self.gcs_delete_object_request(path, args.clone())?;

multipart = multipart.part(
MixedPart::from_request(req).part_header("content-id".parse().unwrap(), idx.into()),
Expand Down Expand Up @@ -493,6 +551,7 @@ impl GcsCore {
delimiter: &str,
limit: Option<usize>,
start_after: Option<String>,
versions: bool,
) -> Result<Response<Buffer>> {
let p = build_abs_path(&self.root, path);

Expand All @@ -502,6 +561,9 @@ impl GcsCore {
self.bucket,
percent_encode_path(&p)
);
if versions {
write!(url, "&versions=true").expect("write into string must succeed");
}
if !delimiter.is_empty() {
write!(url, "&delimiter={delimiter}").expect("write into string must succeed");
}
Expand Down Expand Up @@ -681,6 +743,8 @@ pub struct ListResponseItem {
pub md5_hash: String,
pub updated: String,
pub content_type: String,
pub time_deleted: Option<String>,
pub generation: String,
}

/// Result of CreateMultipartUpload
Expand Down
8 changes: 4 additions & 4 deletions core/src/services/gcs/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ impl GcsDeleter {
}

impl oio::BatchDelete for GcsDeleter {
async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
let resp = self.core.gcs_delete_object(&path).await?;
async fn delete_once(&self, path: String, args: OpDelete) -> Result<()> {
let resp = self.core.gcs_delete_object(&path, args).await?;

// deleting not existing objects is ok
if resp.status().is_success() || resp.status() == StatusCode::NOT_FOUND {
Expand All @@ -46,8 +46,8 @@ impl oio::BatchDelete for GcsDeleter {
}

async fn delete_batch(&self, batch: Vec<(String, OpDelete)>) -> Result<BatchDeleteResult> {
let paths: Vec<String> = batch.into_iter().map(|(p, _)| p).collect();
let resp = self.core.gcs_delete_objects(paths.clone()).await?;
let paths: Vec<String> = batch.clone().into_iter().map(|(p, _)| p).collect();
let resp = self.core.gcs_delete_objects(batch).await?;

let status = resp.status();

Expand Down
Loading

0 comments on commit 8b0d9cc

Please sign in to comment.