Skip to content

Commit

Permalink
feat(webdav): add list and improve create (#1330)
Browse files Browse the repository at this point in the history
* add initial

* add list response

* add webdav list_response parser

* add more response

* RFR: making PUT more generic

* progress implementation

* add dir_stream

* make fields public

* finish backend

* finish

* rename to DirStream

* add file header

* fix lint

* update comments

* set_capabilities

* address backend feedback

* refine lock & clean up

* fix logic

* fix lint: dir_stream.rs

* trying to fix

* Update src/services/webdav/dir_stream.rs per comment from @ClSlaid

Co-authored-by: ClSlaid <[email protected]>

* dav_ext_methods PROPFIND;

* PROPFIND

* fix formatting

* improve nginx config

* Install nginx full

Signed-off-by: Xuanwo <[email protected]>

* Fix apt install

Signed-off-by: Xuanwo <[email protected]>

* import module

Signed-off-by: Xuanwo <[email protected]>

* fix lint

* nginx

* build relative path

* trying to fix self.root

* add default depth for propfind

* feat: support auth for HttpBackend (#1359)

support auth for HttpBackend

* feat: Add batch delete support (#1357)

* Save work

Signed-off-by: Xuanwo <[email protected]>

* Refactor batch operator

Signed-off-by: Xuanwo <[email protected]>

* Add docs

Signed-off-by: Xuanwo <[email protected]>

* Add behavior test

Signed-off-by: Xuanwo <[email protected]>

* Format code

Signed-off-by: Xuanwo <[email protected]>

* Fix test on fs

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>

* docs: clarify about opendal user defined client (#1356)

* docs: clarify opendal user defined client

1. no auto redirection
2. examples for disable it

Signed-off-by: ClSlaid <[email protected]>

* fix: make document test compilable

Signed-off-by: ClSlaid <[email protected]>

---------

Signed-off-by: ClSlaid <[email protected]>

* fix(webhdfs): should prepend http:// scheme (#1354)

* fix(webhdfs): should prepend http:// scheme

Signed-off-by: ClSlaid <[email protected]>

* refact: check scheme existence only on create

Signed-off-by: ClSlaid <[email protected]>

* fmt: make msrv-clippy happy

Signed-off-by: ClSlaid <[email protected]>

---------

Signed-off-by: ClSlaid <[email protected]>

* ci: Pin time <= 0.3.17 until we decide to bump MSRV (#1361)

* ci: Pin time <= 0.3.17 until we decide to bump MSRV

Signed-off-by: Xuanwo <[email protected]>

* Fix build

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>

* ci: Only run service test on changing (#1363)

Only run service test on changing

Signed-off-by: Xuanwo <[email protected]>

* add auth for propfind

* fix list op

* fix root path

* add xml header

* fix prop xml

* remove TODO

* skip current path while listing

* handle 404 for dir

* add MKCOL to fix mkdir

* add MKCOL for dirs

* end

* create dir recursively

* handle StatusCode::METHOD_NOT_ALLOWED

* fix iteration

* fix typo

* fix dir creation

* fix write

---------

Signed-off-by: Xuanwo <[email protected]>
Signed-off-by: ClSlaid <[email protected]>
Co-authored-by: ClSlaid <[email protected]>
Co-authored-by: Xuanwo <[email protected]>
Co-authored-by: Young-Flash <[email protected]>
  • Loading branch information
4 people authored Feb 19, 2023
1 parent 3141029 commit 9896845
Show file tree
Hide file tree
Showing 7 changed files with 692 additions and 29 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/service_test_webdav.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ jobs:
steps:
- uses: actions/checkout@v3

- name: Install nginx full for dav_ext modules
run: sudo apt install nginx-full

- name: Start nginx
shell: bash
run: |
Expand All @@ -51,6 +54,9 @@ jobs:
steps:
- uses: actions/checkout@v3

- name: Install nginx full for dav_ext modules
run: sudo apt install nginx-full

- name: Start nginx
shell: bash
run: |
Expand Down
180 changes: 153 additions & 27 deletions src/services/webdav/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::fmt::Debug;
use std::fmt::Formatter;

use async_trait::async_trait;
use bytes::Buf;
use http::header;
use http::Request;
use http::Response;
use http::StatusCode;
use log::debug;
use std::collections::HashMap;
use std::fmt::Debug;
use std::fmt::Formatter;

use super::dir_stream::DirStream;
use super::error::parse_error;
use super::list_response::Multistatus;
use crate::ops::*;
use crate::raw::*;
use crate::*;
Expand All @@ -36,7 +38,7 @@ use crate::*;
///
/// - [x] read
/// - [x] write
/// - [ ] list
/// - [x] list
/// - [ ] ~~scan~~
/// - [ ] ~~presign~~
/// - [ ] ~~multipart~~
Expand All @@ -47,10 +49,6 @@ use crate::*;
/// Bazel Remote Caching and Ccache HTTP Storage is also part of this service.
/// Users can use `webdav` to connect those services.
///
/// # Status
///
/// - `list` is not supported so far.
///
/// # Configuration
///
/// - `endpoint`: set the endpoint for webdav
Expand Down Expand Up @@ -255,40 +253,83 @@ impl Debug for WebdavBackend {
impl Accessor for WebdavBackend {
type Reader = IncomingAsyncBody;
type BlockingReader = ();
type Pager = ();
type Pager = DirStream;
type BlockingPager = ();

fn metadata(&self) -> AccessorMetadata {
let mut ma = AccessorMetadata::default();
ma.set_scheme(Scheme::Webdav)
.set_root(&self.root)
.set_capabilities(AccessorCapability::Read | AccessorCapability::Write)
.set_capabilities(
AccessorCapability::Read | AccessorCapability::Write | AccessorCapability::List,
)
.set_hints(AccessorHint::ReadStreamable);

ma
}

async fn create(&self, path: &str, _: OpCreate) -> Result<RpCreate> {
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
// XML body must start without a new line. Otherwise, the server will panic: `xmlParseChunk() failed`
let all_prop_xml_body = r#"<?xml version="1.0" encoding="utf-8" ?>
<D:propfind xmlns:D="DAV:">
<D:allprop/>
</D:propfind>
"#;

let async_body = AsyncBody::Bytes(bytes::Bytes::from(all_prop_xml_body));
let resp = self
.webdav_put(path, Some(0), None, None, AsyncBody::Empty)
.webdav_propfind(path, None, "application/xml".into(), async_body)
.await?;

let status = resp.status();

match status {
StatusCode::CREATED
| StatusCode::OK
// create existing dir will return conflict
| StatusCode::CONFLICT
// create existing file will return no_content
| StatusCode::NO_CONTENT => {
resp.into_body().consume().await?;
Ok(RpCreate::default())
StatusCode::OK | StatusCode::MULTI_STATUS => {
let bs = resp.into_body().bytes().await?;
let result: Multistatus =
quick_xml::de::from_reader(bs.reader()).map_err(|err| {
Error::new(ErrorKind::Unexpected, &err.to_string())
.with_context("service", Scheme::Webdav)
})?;

Ok((
RpList::default(),
DirStream::new(&self.root, path, result, args.limit()),
))
}
StatusCode::NOT_FOUND if path.ends_with('/') => Ok((
RpList::default(),
DirStream::new(
&self.root,
path,
Multistatus {
response: Vec::new(),
},
args.limit(),
),
)),
_ => Err(parse_error(resp).await?),
}
}

/// Create the object at `path`, creating every ancestor directory first.
///
/// The path is resolved against the backend root, then each ancestor
/// prefix (always ending in `/`) is passed to `create_internal` from the
/// outermost one inwards, followed by the full resolved path itself.
async fn create(&self, path: &str, _: OpCreate) -> Result<RpCreate> {
    let abs_path = build_abs_path(&self.root, path);

    // Every non-empty segment except the last names an ancestor directory;
    // the last segment is the target and is created separately below.
    let mut ancestors: Vec<&str> = abs_path
        .split('/')
        .filter(|segment| !segment.is_empty())
        .collect();
    ancestors.pop();

    // Grow the directory prefix one segment at a time: `a/`, `a/b/`, ...
    let mut prefix = String::new();
    for segment in ancestors {
        prefix.push_str(segment);
        prefix.push('/');
        self.create_internal(&prefix).await?;
    }

    self.create_internal(&abs_path).await
}

async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
let resp = self.webdav_get(path, args.range()).await?;

Expand All @@ -304,9 +345,10 @@ impl Accessor for WebdavBackend {
}

async fn write(&self, path: &str, args: OpWrite, r: input::Reader) -> Result<RpWrite> {
let abs_path = &build_abs_path(&self.root, path);
let resp = self
.webdav_put(
path,
abs_path,
Some(args.size()),
args.content_type(),
args.content_disposition(),
Expand Down Expand Up @@ -387,15 +429,13 @@ impl WebdavBackend {

async fn webdav_put(
&self,
path: &str,
abs_path: &str,
size: Option<u64>,
content_type: Option<&str>,
content_disposition: Option<&str>,
body: AsyncBody,
) -> Result<Response<IncomingAsyncBody>> {
let p = build_abs_path(&self.root, path);

let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
let url = format!("{}/{}", self.endpoint, percent_encode_path(abs_path));

let mut req = Request::put(&url);

Expand All @@ -421,6 +461,65 @@ impl WebdavBackend {
self.client.send_async(req).await
}

/// Send a `MKCOL` request for `abs_path`.
///
/// `abs_path` is percent-encoded and appended to the endpoint as given; the
/// backend root is not joined here. The `Authorization` header is added when
/// the backend has one configured, and `Content-Type` /
/// `Content-Disposition` are added only when the matching argument is `Some`.
///
/// Returns the raw response; status handling is left to the caller.
async fn webdav_mkcol(
    &self,
    abs_path: &str,
    content_type: Option<&str>,
    content_disposition: Option<&str>,
    body: AsyncBody,
) -> Result<Response<IncomingAsyncBody>> {
    let target = format!("{}/{}", self.endpoint, percent_encode_path(abs_path));

    // `MKCOL` has no dedicated constructor on `Request`, so the method is set by name.
    let mut builder = Request::builder().method("MKCOL").uri(&target);

    if let Some(credential) = &self.authorization {
        builder = builder.header(header::AUTHORIZATION, credential);
    }
    if let Some(mime) = content_type {
        builder = builder.header(header::CONTENT_TYPE, mime);
    }
    if let Some(disposition) = content_disposition {
        builder = builder.header(header::CONTENT_DISPOSITION, disposition);
    }

    let request = builder.body(body).map_err(new_request_build_error)?;
    self.client.send_async(request).await
}

/// Send a `PROPFIND` request with `Depth: 1` for `path`.
///
/// `path` is resolved against the backend root before being percent-encoded
/// and appended to the endpoint. The `Authorization` header is added when
/// the backend has one configured; `Content-Length` and `Content-Type` are
/// added only when `size` / `content_type` are `Some`.
///
/// Returns the raw response; status handling is left to the caller.
async fn webdav_propfind(
    &self,
    path: &str,
    size: Option<u64>,
    content_type: Option<&str>,
    body: AsyncBody,
) -> Result<Response<IncomingAsyncBody>> {
    let abs_path = build_abs_path(&self.root, path);
    let target = format!("{}/{}", self.endpoint, percent_encode_path(&abs_path));

    // `PROPFIND` has no dedicated constructor on `Request`, so the method is set by name.
    let mut builder = Request::builder()
        .method("PROPFIND")
        .uri(&target)
        .header("Depth", "1");

    if let Some(credential) = &self.authorization {
        builder = builder.header(header::AUTHORIZATION, credential);
    }
    if let Some(length) = size {
        builder = builder.header(header::CONTENT_LENGTH, length);
    }
    if let Some(mime) = content_type {
        builder = builder.header(header::CONTENT_TYPE, mime);
    }

    let request = builder.body(body).map_err(new_request_build_error)?;
    self.client.send_async(request).await
}

async fn webdav_head(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
let p = build_rooted_abs_path(&self.root, path);

Expand Down Expand Up @@ -456,4 +555,31 @@ impl WebdavBackend {

self.client.send_async(req).await
}

/// Create a single directory or empty file at `abs_path`.
///
/// A path ending in `/` is created with `MKCOL`; any other path is created
/// with a zero-length `PUT`. Status codes that indicate the target already
/// exists are treated as success; every other status is converted into an
/// error by `parse_error`.
async fn create_internal(&self, abs_path: &str) -> Result<RpCreate> {
    let is_dir = abs_path.ends_with('/');
    let resp = if is_dir {
        self.webdav_mkcol(abs_path, None, None, AsyncBody::Empty)
            .await?
    } else {
        self.webdav_put(abs_path, Some(0), None, None, AsyncBody::Empty)
            .await?
    };

    // Accepted statuses:
    // - CREATED / OK: the resource was created.
    // - METHOD_NOT_ALLOWED: `File exists` will return `Method Not Allowed`.
    // - CONFLICT: create existing dir will return conflict.
    // - NO_CONTENT: create existing file will return no_content.
    let accepted = matches!(
        resp.status(),
        StatusCode::CREATED
            | StatusCode::OK
            | StatusCode::METHOD_NOT_ALLOWED
            | StatusCode::CONFLICT
            | StatusCode::NO_CONTENT
    );
    if !accepted {
        return Err(parse_error(resp).await?);
    }

    // Consume the body before reporting success.
    resp.into_body().consume().await?;
    Ok(RpCreate::default())
}
}
71 changes: 71 additions & 0 deletions src/services/webdav/dir_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::raw::build_rel_path;
use crate::Result;
use crate::{raw::output, ObjectMetadata, ObjectMode};
use async_trait::async_trait;

use super::list_response::Multistatus;

/// Pager over the entries of an already-parsed WebDAV multistatus response.
pub struct DirStream {
/// Backend root; entry hrefs are made relative to it.
root: String,
/// The path being listed; an entry equal to this path is skipped.
path: String,
/// Upper bound on the number of entries returned by one `next_page` call.
size: usize,
/// Parsed multistatus response; entries are popped from the end of `response`.
multistates: Multistatus,
}

impl DirStream {
    /// Build a pager over `multistates` for a listing of `path` under `root`.
    ///
    /// `limit` sets the page size; when it is `None` the page size is 1000.
    pub fn new(root: &str, path: &str, multistates: Multistatus, limit: Option<usize>) -> Self {
        let size = limit.unwrap_or(1000);

        Self {
            root: root.to_string(),
            path: path.to_string(),
            size,
            multistates,
        }
    }
}

#[async_trait]
impl output::Page for DirStream {
    /// Return the next page of entries, or `None` when no entries are left.
    ///
    /// Responses are taken from the end of the parsed multistatus list and at
    /// most `self.size` entries are returned per call. The response that
    /// describes the listed path itself is dropped without counting towards
    /// the page size, so skipping it can never produce an empty page (which is
    /// reported as `None`) while other responses are still pending.
    ///
    /// Directories are detected via the `Collection` resource type; every
    /// other response is reported as a file.
    async fn next_page(&mut self) -> Result<Option<Vec<output::Entry>>> {
        let mut oes: Vec<output::Entry> = Vec::new();

        while oes.len() < self.size {
            // Stop as soon as the response list is drained instead of
            // iterating through the rest of the page budget.
            let de = match self.multistates.response.pop() {
                Some(de) => de,
                None => break,
            };

            // Only clone the href in the branch that actually keeps it as-is.
            let normalized_path = if self.root != de.href {
                build_rel_path(&self.root, &de.href)
            } else {
                de.href.clone()
            };

            if normalized_path == self.path {
                // WebDav server may return the current path as an entry.
                continue;
            }

            let mode = if de.propstat.prop.resourcetype.value
                == Some(super::list_response::ResourceType::Collection)
            {
                ObjectMode::DIR
            } else {
                ObjectMode::FILE
            };
            oes.push(output::Entry::new(
                &normalized_path,
                ObjectMetadata::new(mode),
            ));
        }

        Ok(if oes.is_empty() { None } else { Some(oes) })
    }
}
5 changes: 4 additions & 1 deletion src/services/webdav/fixtures/nginx-with-basic-auth.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
load_module /usr/lib/nginx/modules/ngx_http_dav_ext_module.so;

error_log /tmp/error.log;
pid /tmp/nginx.pid;

Expand All @@ -15,7 +17,8 @@ http {
location / {
client_body_temp_path /tmp;
log_not_found off;
dav_methods PUT DELETE;
dav_methods PUT DELETE MKCOL;
dav_ext_methods PROPFIND;
create_full_put_path on;
client_max_body_size 1024M;
auth_basic "Administrator’s Area";
Expand Down
5 changes: 4 additions & 1 deletion src/services/webdav/fixtures/nginx.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
load_module /usr/lib/nginx/modules/ngx_http_dav_ext_module.so;

error_log /tmp/error.log;
pid /tmp/nginx.pid;

Expand All @@ -15,7 +17,8 @@ http {
location / {
client_body_temp_path /tmp;
log_not_found off;
dav_methods PUT DELETE;
dav_methods PUT DELETE MKCOL;
dav_ext_methods PROPFIND;
create_full_put_path on;
client_max_body_size 1024M;
}
Expand Down
Loading

1 comment on commit 9896845

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for opendal ready!

✅ Preview
https://opendal-qpsxmqyx3-databend.vercel.app

Built with commit 9896845.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.