Skip to content

Commit

Permalink
list status when create lister
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaohaiyuan committed Feb 9, 2025
1 parent 8e3bdf9 commit 9977920
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 55 deletions.
28 changes: 26 additions & 2 deletions core/src/services/hdfs_native/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::VecDeque;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;
Expand Down Expand Up @@ -240,8 +241,31 @@ impl Access for HdfsNativeBackend {

async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> {
let p = build_rooted_abs_path(&self.root, path);
let l = HdfsNativeLister::new(p, self.client.clone());
Ok((RpList::default(), Some(l)))
let mut deque = VecDeque::new();
let statues = self
.client
.list_status(&p, true)
.await
.map_err(parse_hdfs_error)?;

for entry in statues {
let path = format!("{}/{}", p.trim_end_matches('/'), entry.path);
let path = path.trim_start_matches('/').to_string();

let mode = if entry.isdir {
EntryMode::DIR
} else {
EntryMode::FILE
};

let mut meta = Metadata::new(mode);
meta.set_content_length(entry.length as u64)
.set_last_modified(parse_datetime_from_from_timestamp_millis(
entry.modification_time as i64,
)?);
deque.push_back(oio::Entry::new(&path, meta));
}
Ok((RpList::default(), Some(HdfsNativeLister::new(deque))))
}

async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
Expand Down
57 changes: 4 additions & 53 deletions core/src/services/hdfs_native/lister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,75 +16,26 @@
// under the License.

use std::collections::VecDeque;
use std::sync::Arc;

use hdfs_native::client::Client;

use crate::raw::*;
use crate::services::hdfs_native::error::parse_hdfs_error;
use crate::*;
use crate::raw::oio;
use crate::Result;

pub struct HdfsNativeLister {
client: Arc<Client>,
path: String,
entries: VecDeque<oio::Entry>,
}

impl HdfsNativeLister {
pub fn new(path: String, client: Arc<Client>) -> Self {
HdfsNativeLister {
client,
path,
entries: VecDeque::new(),
}
pub fn new(entries: VecDeque<oio::Entry>) -> Self {
HdfsNativeLister { entries }
}
}

impl oio::List for HdfsNativeLister {
async fn next(&mut self) -> Result<Option<oio::Entry>> {
// 如果队列中还有条目,直接返回
if let Some(entry) = self.entries.pop_front() {
return Ok(Some(entry));
}

// 第一次调用时获取所有条目
if self.entries.is_empty() {
let entries = self
.client
.list_status(&self.path, true)
.await
.map_err(parse_hdfs_error)?;

// 如果目录为空,返回
if entries.is_empty() {
return Ok(None);
}

// 转换所有条目并存入队列
for entry in entries {
let path = format!("{}/{}", self.path.trim_end_matches('/'), entry.path);
let path = path.trim_start_matches('/').to_string();

let mode = if entry.isdir {
EntryMode::DIR
} else {
EntryMode::FILE
};

let mut meta = Metadata::new(mode);
meta.set_content_length(entry.length as u64)
.set_last_modified(parse_datetime_from_from_timestamp_millis(
entry.modification_time as i64,
)?);

self.entries.push_back(oio::Entry::new(&path, meta));
}

// 返回第一个条目
return Ok(self.entries.pop_front());
}

// 所有条目都已返回
Ok(None)
}
}

0 comments on commit 9977920

Please sign in to comment.