diff --git a/.github/workflows/service_test_webdav.yml b/.github/workflows/service_test_webdav.yml index eb12d5733b6e..bce1c69c7cee 100644 --- a/.github/workflows/service_test_webdav.yml +++ b/.github/workflows/service_test_webdav.yml @@ -27,6 +27,9 @@ jobs: steps: - uses: actions/checkout@v3 + - name: Install nginx full for dav_ext modules + run: sudo apt install nginx-full + - name: Start nginx shell: bash run: | @@ -51,6 +54,9 @@ jobs: steps: - uses: actions/checkout@v3 + - name: Install nginx full for dav_ext modules + run: sudo apt install nginx-full + - name: Start nginx shell: bash run: | diff --git a/src/services/webdav/backend.rs b/src/services/webdav/backend.rs index 5cfc57b941c3..2c6d6d34ef4e 100644 --- a/src/services/webdav/backend.rs +++ b/src/services/webdav/backend.rs @@ -12,18 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; -use std::fmt::Debug; -use std::fmt::Formatter; - use async_trait::async_trait; +use bytes::Buf; use http::header; use http::Request; use http::Response; use http::StatusCode; use log::debug; +use std::collections::HashMap; +use std::fmt::Debug; +use std::fmt::Formatter; +use super::dir_stream::DirStream; use super::error::parse_error; +use super::list_response::Multistatus; use crate::ops::*; use crate::raw::*; use crate::*; @@ -36,7 +38,7 @@ use crate::*; /// /// - [x] read /// - [x] write -/// - [ ] list +/// - [x] list /// - [ ] ~~scan~~ /// - [ ] ~~presign~~ /// - [ ] ~~multipart~~ @@ -47,10 +49,6 @@ use crate::*; /// Bazel Remote Caching and Ccache HTTP Storage is also part of this service. /// Users can use `webdav` to connect those services. /// -/// # Status -/// -/// - `list` is not supported so far. -/// /// # Configuration /// /// - `endpoint`: set the endpoint for webdav @@ -255,40 +253,83 @@ impl Debug for WebdavBackend { impl Accessor for WebdavBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Pager = (); + type Pager = DirStream; type BlockingPager = (); fn metadata(&self) -> AccessorMetadata { let mut ma = AccessorMetadata::default(); ma.set_scheme(Scheme::Webdav) .set_root(&self.root) - .set_capabilities(AccessorCapability::Read | AccessorCapability::Write) + .set_capabilities( + AccessorCapability::Read | AccessorCapability::Write | AccessorCapability::List, + ) .set_hints(AccessorHint::ReadStreamable); ma } - async fn create(&self, path: &str, _: OpCreate) -> Result { + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { + // XML body must start without a new line. Otherwise, the server will panic: `xmlParseChunk() failed` + let all_prop_xml_body = r#" + + + + "#; + + let async_body = AsyncBody::Bytes(bytes::Bytes::from(all_prop_xml_body)); let resp = self - .webdav_put(path, Some(0), None, None, AsyncBody::Empty) + .webdav_propfind(path, None, "application/xml".into(), async_body) .await?; - let status = resp.status(); match status { - StatusCode::CREATED - | StatusCode::OK - // create existing dir will return conflict - | StatusCode::CONFLICT - // create existing file will return no_content - | StatusCode::NO_CONTENT => { - resp.into_body().consume().await?; - Ok(RpCreate::default()) + StatusCode::OK | StatusCode::MULTI_STATUS => { + let bs = resp.into_body().bytes().await?; + let result: Multistatus = + quick_xml::de::from_reader(bs.reader()).map_err(|err| { + Error::new(ErrorKind::Unexpected, &err.to_string()) + .with_context("service", Scheme::Webdav) + })?; + + Ok(( + RpList::default(), + DirStream::new(&self.root, path, result, args.limit()), + )) } + StatusCode::NOT_FOUND if path.ends_with('/') => Ok(( + RpList::default(), + DirStream::new( + &self.root, + path, + Multistatus { + response: Vec::new(), + }, + args.limit(), + ), + )), _ => Err(parse_error(resp).await?), } } + async fn create(&self, path: &str, _: OpCreate) -> Result { + // create dir recursively, split path by `/` and create each dir except the last one + let abs_path = build_abs_path(&self.root, path); + let abs_path = abs_path.as_str(); + let mut parts: Vec<&str> = abs_path.split('/').filter(|x| !x.is_empty()).collect(); + if !parts.is_empty() { + parts.pop(); + } + + let mut sub_path = String::new(); + for sub_part in parts { + let sub_path_with_slash = sub_part.to_owned() + "/"; + sub_path.push_str(&sub_path_with_slash); + self.create_internal(&sub_path).await?; + } + + self.create_internal(abs_path).await + } + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { let resp = self.webdav_get(path, args.range()).await?; @@ -304,9 +345,10 @@ impl Accessor for WebdavBackend { } async fn write(&self, path: &str, args: OpWrite, r: input::Reader) -> Result { + let abs_path = &build_abs_path(&self.root, path); let resp = self .webdav_put( - path, + abs_path, Some(args.size()), args.content_type(), args.content_disposition(), @@ -387,15 +429,13 @@ impl WebdavBackend { async fn webdav_put( &self, - path: &str, + abs_path: &str, size: Option, content_type: Option<&str>, content_disposition: Option<&str>, body: AsyncBody, ) -> Result> { - let p = build_abs_path(&self.root, path); - - let url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); + let url = format!("{}/{}", self.endpoint, percent_encode_path(abs_path)); let mut req = Request::put(&url); @@ -421,6 +461,65 @@ impl WebdavBackend { self.client.send_async(req).await } + async fn webdav_mkcol( + &self, + abs_path: &str, + content_type: Option<&str>, + content_disposition: Option<&str>, + body: AsyncBody, + ) -> Result> { + let url = format!("{}/{}", self.endpoint, percent_encode_path(abs_path)); + + let mut req = Request::builder().method("MKCOL").uri(&url); + if let Some(auth) = &self.authorization { + req = req.header(header::AUTHORIZATION, auth); + } + + if let Some(mime) = content_type { + req = req.header(header::CONTENT_TYPE, mime) + } + + if let Some(cd) = content_disposition { + req = req.header(header::CONTENT_DISPOSITION, cd) + } + + let req = req.body(body).map_err(new_request_build_error)?; + + self.client.send_async(req).await + } + + async fn webdav_propfind( + &self, + path: &str, + size: Option, + content_type: Option<&str>, + body: AsyncBody, + ) -> Result> { + let p = build_abs_path(&self.root, path); + + let url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); + let mut req = Request::builder() + .method("PROPFIND") + .uri(&url) + .header("Depth", "1"); + + if let Some(auth) = &self.authorization { + req = req.header(header::AUTHORIZATION, auth); + } + + if let Some(size) = size { + req = req.header(header::CONTENT_LENGTH, size) + } + + if let Some(mime) = content_type { + req = req.header(header::CONTENT_TYPE, mime) + } + + let req = req.body(body).map_err(new_request_build_error)?; + + self.client.send_async(req).await + } + async fn webdav_head(&self, path: &str) -> Result> { let p = build_rooted_abs_path(&self.root, path); @@ -456,4 +555,31 @@ impl WebdavBackend { self.client.send_async(req).await } + + async fn create_internal(&self, abs_path: &str) -> Result { + let resp = if abs_path.ends_with('/') { + self.webdav_mkcol(abs_path, None, None, AsyncBody::Empty) + .await? + } else { + self.webdav_put(abs_path, Some(0), None, None, AsyncBody::Empty) + .await? + }; + + let status = resp.status(); + + match status { + StatusCode::CREATED + | StatusCode::OK + // `File exists` will return `Method Not Allowed` + | StatusCode::METHOD_NOT_ALLOWED + // create existing dir will return conflict + | StatusCode::CONFLICT + // create existing file will return no_content + | StatusCode::NO_CONTENT => { + resp.into_body().consume().await?; + Ok(RpCreate::default()) + } + _ => Err(parse_error(resp).await?), + } + } } diff --git a/src/services/webdav/dir_stream.rs b/src/services/webdav/dir_stream.rs new file mode 100644 index 000000000000..f13bb6d46653 --- /dev/null +++ b/src/services/webdav/dir_stream.rs @@ -0,0 +1,71 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::raw::build_rel_path; +use crate::Result; +use crate::{raw::output, ObjectMetadata, ObjectMode}; +use async_trait::async_trait; + +use super::list_response::Multistatus; + +pub struct DirStream { + root: String, + path: String, + size: usize, + multistates: Multistatus, +} + +impl DirStream { + pub fn new(root: &str, path: &str, multistates: Multistatus, limit: Option) -> Self { + Self { + root: root.into(), + path: path.into(), + size: limit.unwrap_or(1000), + multistates, + } + } +} + +#[async_trait] +impl output::Page for DirStream { + async fn next_page(&mut self) -> Result>> { + let mut oes: Vec = Vec::new(); + for _ in 0..self.size { + if let Some(de) = self.multistates.response.pop() { + let path = de.href.clone(); + let normalized_path = &if self.root != path { + build_rel_path(&self.root, &path) + } else { + path + }; + + if normalized_path.eq(&self.path) { + // WebDav server may return the current path as an entry. + continue; + } + + let entry = if de.propstat.prop.resourcetype.value + == Some(super::list_response::ResourceType::Collection) + { + output::Entry::new(normalized_path, ObjectMetadata::new(ObjectMode::DIR)) + } else { + output::Entry::new(normalized_path, ObjectMetadata::new(ObjectMode::FILE)) + }; + oes.push(entry); + } + } + + Ok(if oes.is_empty() { None } else { Some(oes) }) + } +} diff --git a/src/services/webdav/fixtures/nginx-with-basic-auth.conf b/src/services/webdav/fixtures/nginx-with-basic-auth.conf index 63d89768dc7e..384defa3abdd 100644 --- a/src/services/webdav/fixtures/nginx-with-basic-auth.conf +++ b/src/services/webdav/fixtures/nginx-with-basic-auth.conf @@ -1,3 +1,5 @@ +load_module /usr/lib/nginx/modules/ngx_http_dav_ext_module.so; + error_log /tmp/error.log; pid /tmp/nginx.pid; @@ -15,7 +17,8 @@ http { location / { client_body_temp_path /tmp; log_not_found off; - dav_methods PUT DELETE; + dav_methods PUT DELETE MKCOL; + dav_ext_methods PROPFIND; create_full_put_path on; client_max_body_size 1024M; auth_basic "Administrator’s Area"; diff --git a/src/services/webdav/fixtures/nginx.conf b/src/services/webdav/fixtures/nginx.conf index f76ece6c1394..22ab20df8b36 100644 --- a/src/services/webdav/fixtures/nginx.conf +++ b/src/services/webdav/fixtures/nginx.conf @@ -1,3 +1,5 @@ +load_module /usr/lib/nginx/modules/ngx_http_dav_ext_module.so; + error_log /tmp/error.log; pid /tmp/nginx.pid; @@ -15,7 +17,8 @@ http { location / { client_body_temp_path /tmp; log_not_found off; - dav_methods PUT DELETE; + dav_methods PUT DELETE MKCOL; + dav_ext_methods PROPFIND; create_full_put_path on; client_max_body_size 1024M; } diff --git a/src/services/webdav/list_response.rs b/src/services/webdav/list_response.rs new file mode 100644 index 000000000000..07e3d6708d7b --- /dev/null +++ b/src/services/webdav/list_response.rs @@ -0,0 +1,452 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use serde::Deserialize; + +#[derive(Deserialize, Debug, PartialEq)] +pub struct LockScopeContainer { + #[serde(rename = "$value")] + pub value: LockScope, +} + +#[derive(Deserialize, Debug, PartialEq)] +#[serde(rename_all = "lowercase")] +pub enum LockScope { + Exclusive, +} + +#[derive(Deserialize, Debug, PartialEq)] +pub struct LockTypeContainer { + #[serde(rename = "$value")] + pub value: LockType, +} + +#[derive(Deserialize, Debug, PartialEq)] +#[serde(rename_all = "lowercase")] +pub enum LockType { + Write, +} + +#[derive(Deserialize, Debug, PartialEq)] +pub struct Multistatus { + pub response: Vec, +} + +#[derive(Deserialize, Debug, PartialEq)] +pub struct ListOpResponse { + pub href: String, + pub propstat: Propstat, +} + +#[derive(Deserialize, Debug, PartialEq)] +pub struct Propstat { + pub prop: Prop, + pub status: String, +} + +#[derive(Deserialize, Debug, PartialEq)] +pub struct Prop { + pub getlastmodified: String, + pub resourcetype: ResourceTypeContainer, +} + +#[derive(Deserialize, Debug, PartialEq)] +pub struct ResourceTypeContainer { + #[serde(rename = "$value")] + pub value: Option, +} + +#[derive(Deserialize, Debug, PartialEq)] +#[serde(rename_all = "lowercase")] +pub enum ResourceType { + Collection, +} + +#[cfg(test)] +mod tests { + use super::*; + use quick_xml::de::from_str; + + #[test] + fn test_propstat() { + let xml = r#" + + / + Tue, 01 May 2022 06:39:47 GMT + + + + + + + + + + HTTP/1.1 200 OK + "#; + + let propstat = from_str::(xml).unwrap(); + assert_eq!( + propstat.prop.getlastmodified, + "Tue, 01 May 2022 06:39:47 GMT" + ); + assert_eq!( + propstat.prop.resourcetype.value.unwrap(), + ResourceType::Collection + ); + + assert_eq!(propstat.status, "HTTP/1.1 200 OK"); + } + + #[test] + fn test_response_simple() { + let xml = r#" + / + + + / + Tue, 01 May 2022 06:39:47 GMT + + + + + + + + + + HTTP/1.1 200 OK + + "#; + + let response = from_str::(xml).unwrap(); + assert_eq!(response.href, "/"); + assert_eq!( + response.propstat.prop.getlastmodified, + "Tue, 01 May 2022 06:39:47 GMT" + ); + assert_eq!( + response.propstat.prop.resourcetype.value.unwrap(), + ResourceType::Collection + ); + assert_eq!(response.propstat.status, "HTTP/1.1 200 OK"); + } + + #[test] + fn test_response_file() { + let xml = r#" + /test_file + + + test_file + 1 + Tue, 07 May 2022 05:52:22 GMT + + + + + + + + + + + + + + HTTP/1.1 200 OK + + "#; + + let response = from_str::(xml).unwrap(); + assert_eq!(response.href, "/test_file"); + assert_eq!( + response.propstat.prop.getlastmodified, + "Tue, 07 May 2022 05:52:22 GMT" + ); + assert_eq!(response.propstat.prop.resourcetype.value, None); + assert_eq!(response.propstat.status, "HTTP/1.1 200 OK"); + } + + #[test] + fn test_with_multiple_items_simple() { + let xml = r#" + + / + + + / + Tue, 01 May 2022 06:39:47 GMT + + + + + + + + + + HTTP/1.1 200 OK + + + + / + + + / + Tue, 01 May 2022 06:39:47 GMT + + + + + + + + + + HTTP/1.1 200 OK + + + "#; + + let multistatus = from_str::(xml).unwrap(); + assert_eq!(multistatus.response.len(), 2); + assert_eq!(multistatus.response[0].href, "/"); + assert_eq!( + multistatus.response[0].propstat.prop.getlastmodified, + "Tue, 01 May 2022 06:39:47 GMT" + ); + } + + #[test] + fn test_with_multiple_items_mixed() { + let xml = r#" + + + / + + + / + Tue, 07 May 2022 06:39:47 GMT + + + + + + + + + + + + + + + + HTTP/1.1 200 OK + + + + /testdir/ + + + testdir + Tue, 07 May 2022 06:40:10 GMT + + + + + + + + + + + + + + + + HTTP/1.1 200 OK + + + + /test_file + + + test_file + 1 + Tue, 07 May 2022 05:52:22 GMT + + + + + + + + + + + + + + HTTP/1.1 200 OK + + + "#; + + let multistatus = from_str::(xml).unwrap(); + + assert_eq!(multistatus.response.len(), 3); + let first_response = &multistatus.response[0]; + assert_eq!(first_response.href, "/"); + assert_eq!( + first_response.propstat.prop.getlastmodified, + "Tue, 07 May 2022 06:39:47 GMT" + ); + + let second_response = &multistatus.response[1]; + assert_eq!(second_response.href, "/testdir/"); + assert_eq!( + second_response.propstat.prop.getlastmodified, + "Tue, 07 May 2022 06:40:10 GMT" + ); + + let third_response = &multistatus.response[2]; + assert_eq!(third_response.href, "/test_file"); + assert_eq!( + third_response.propstat.prop.getlastmodified, + "Tue, 07 May 2022 05:52:22 GMT" + ); + } + + #[test] + fn test_with_multiple_items_mixed_nginx() { + let xml = r#" + + + / + + + Fri, 17 Feb 2023 03:37:22 GMT + + + + + HTTP/1.1 200 OK + + + + /test_file_75 + + + 1 + Fri, 17 Feb 2023 03:36:54 GMT + + + HTTP/1.1 200 OK + + + + /test_file_36 + + + 1 + Fri, 17 Feb 2023 03:36:54 GMT + + + HTTP/1.1 200 OK + + + + /test_file_38 + + + 1 + Fri, 17 Feb 2023 03:36:54 GMT + + + HTTP/1.1 200 OK + + + + /test_file_59 + + + 1 + Fri, 17 Feb 2023 03:36:54 GMT + + + HTTP/1.1 200 OK + + + + /test_file_9 + + + 1 + Fri, 17 Feb 2023 03:36:54 GMT + + + HTTP/1.1 200 OK + + + + /test_file_93 + + + 1 + Fri, 17 Feb 2023 03:36:54 GMT + + + HTTP/1.1 200 OK + + + + /test_file_43 + + + 1 + Fri, 17 Feb 2023 03:36:54 GMT + + + HTTP/1.1 200 OK + + + + /test_file_95 + + + 1 + Fri, 17 Feb 2023 03:36:54 GMT + + + HTTP/1.1 200 OK + + + + "#; + + let multistatus: Multistatus = from_str(xml).unwrap(); + + assert_eq!(multistatus.response.len(), 9); + + let first_response = &multistatus.response[0]; + assert_eq!(first_response.href, "/"); + assert_eq!( + first_response.propstat.prop.getlastmodified, + "Fri, 17 Feb 2023 03:37:22 GMT" + ); + } +} diff --git a/src/services/webdav/mod.rs b/src/services/webdav/mod.rs index ba8c6e5b3b39..90cc4694ea71 100644 --- a/src/services/webdav/mod.rs +++ b/src/services/webdav/mod.rs @@ -15,4 +15,6 @@ mod backend; pub use backend::WebdavBuilder as Webdav; +mod dir_stream; mod error; +mod list_response;