From 997792038352fb23c4a8f45a0c3903594c3770d9 Mon Sep 17 00:00:00 2001 From: zhaohaiyuan Date: Sun, 9 Feb 2025 18:38:12 +0800 Subject: [PATCH] list status when create lister --- core/src/services/hdfs_native/backend.rs | 28 +++++++++++- core/src/services/hdfs_native/lister.rs | 57 ++---------------------- 2 files changed, 30 insertions(+), 55 deletions(-) diff --git a/core/src/services/hdfs_native/backend.rs b/core/src/services/hdfs_native/backend.rs index 80f607d5140b..4eb3f788151e 100644 --- a/core/src/services/hdfs_native/backend.rs +++ b/core/src/services/hdfs_native/backend.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::collections::VecDeque; use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; @@ -240,8 +241,31 @@ impl Access for HdfsNativeBackend { async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> { let p = build_rooted_abs_path(&self.root, path); - let l = HdfsNativeLister::new(p, self.client.clone()); - Ok((RpList::default(), Some(l))) + let mut deque = VecDeque::new(); + let statues = self + .client + .list_status(&p, true) + .await + .map_err(parse_hdfs_error)?; + + for entry in statues { + let path = format!("{}/{}", p.trim_end_matches('/'), entry.path); + let path = path.trim_start_matches('/').to_string(); + + let mode = if entry.isdir { + EntryMode::DIR + } else { + EntryMode::FILE + }; + + let mut meta = Metadata::new(mode); + meta.set_content_length(entry.length as u64) + .set_last_modified(parse_datetime_from_from_timestamp_millis( + entry.modification_time as i64, + )?); + deque.push_back(oio::Entry::new(&path, meta)); + } + Ok((RpList::default(), Some(HdfsNativeLister::new(deque)))) } async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result { diff --git a/core/src/services/hdfs_native/lister.rs b/core/src/services/hdfs_native/lister.rs index 696b3d6d925e..f0e410bc4fad 100644 --- a/core/src/services/hdfs_native/lister.rs +++ b/core/src/services/hdfs_native/lister.rs @@ -16,75 +16,26 @@ // under the License. use std::collections::VecDeque; -use std::sync::Arc; -use hdfs_native::client::Client; - -use crate::raw::*; -use crate::services::hdfs_native::error::parse_hdfs_error; -use crate::*; +use crate::raw::oio; +use crate::Result; pub struct HdfsNativeLister { - client: Arc, - path: String, entries: VecDeque, } impl HdfsNativeLister { - pub fn new(path: String, client: Arc) -> Self { - HdfsNativeLister { - client, - path, - entries: VecDeque::new(), - } + pub fn new(entries: VecDeque) -> Self { + HdfsNativeLister { entries } } } impl oio::List for HdfsNativeLister { async fn next(&mut self) -> Result> { - // 如果队列中还有条目,直接返回 if let Some(entry) = self.entries.pop_front() { return Ok(Some(entry)); } - // 第一次调用时获取所有条目 - if self.entries.is_empty() { - let entries = self - .client - .list_status(&self.path, true) - .await - .map_err(parse_hdfs_error)?; - - // 如果目录为空,返回 - if entries.is_empty() { - return Ok(None); - } - - // 转换所有条目并存入队列 - for entry in entries { - let path = format!("{}/{}", self.path.trim_end_matches('/'), entry.path); - let path = path.trim_start_matches('/').to_string(); - - let mode = if entry.isdir { - EntryMode::DIR - } else { - EntryMode::FILE - }; - - let mut meta = Metadata::new(mode); - meta.set_content_length(entry.length as u64) - .set_last_modified(parse_datetime_from_from_timestamp_millis( - entry.modification_time as i64, - )?); - - self.entries.push_back(oio::Entry::new(&path, meta)); - } - - // 返回第一个条目 - return Ok(self.entries.pop_front()); - } - - // 所有条目都已返回 Ok(None) } }