diff --git a/core/src/services/hdfs_native/backend.rs b/core/src/services/hdfs_native/backend.rs
index 4eb3f788151..00b651d2e74 100644
--- a/core/src/services/hdfs_native/backend.rs
+++ b/core/src/services/hdfs_native/backend.rs
@@ -213,7 +213,7 @@ impl Access for HdfsNativeBackend {
 
         let f = self.client.read(&p).await.map_err(parse_hdfs_error)?;
 
-        let r = HdfsNativeReader::new(f);
+        let r = HdfsNativeReader::new(f, args.range().size().unwrap_or(u64::MAX) as _);
 
         Ok((RpRead::new(), r))
     }
@@ -241,31 +241,13 @@ impl Access for HdfsNativeBackend {
-    async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> {
+    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
         let p = build_rooted_abs_path(&self.root, path);
 
-        let mut deque = VecDeque::new();
-        let statues = self
-            .client
-            .list_status(&p, true)
-            .await
-            .map_err(parse_hdfs_error)?;
+        let iter = self.client.list_status_iter(&p, args.recursive());
+        let stream: BoxStream<'static, Result<FileStatus, HdfsError>> = iter.into_stream();
 
-        for entry in statues {
-            let path = format!("{}/{}", p.trim_end_matches('/'), entry.path);
-            let path = path.trim_start_matches('/').to_string();
-
-            let mode = if entry.isdir {
-                EntryMode::DIR
-            } else {
-                EntryMode::FILE
-            };
-
-            let mut meta = Metadata::new(mode);
-            meta.set_content_length(entry.length as u64)
-                .set_last_modified(parse_datetime_from_from_timestamp_millis(
-                    entry.modification_time as i64,
-                )?);
-            deque.push_back(oio::Entry::new(&path, meta));
-        }
-        Ok((RpList::default(), Some(HdfsNativeLister::new(deque))))
+        Ok((
+            RpList::default(),
+            Some(HdfsNativeLister::new(&self.root, stream, path)),
+        ))
     }
 
     async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
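Note: the `list()` rewrite above replaces an eager `list_status()` call, which buffered every entry in a `VecDeque`, with hdfs-native's lazy listing. A minimal sketch of the consumption pattern, assuming only the `Client::list_status_iter` / `into_stream` API already used in the diff; `count_entries` is a hypothetical helper, not OpenDAL code:

    use futures::StreamExt;
    use hdfs_native::{Client, HdfsError};

    // Drain a lazy HDFS listing without materializing it up front:
    // entries arrive from the stream as listing progresses, so memory
    // stays bounded even for very large directories.
    async fn count_entries(client: &Client, path: &str) -> Result<usize, HdfsError> {
        let mut stream = client.list_status_iter(path, /* recursive */ true).into_stream();
        let mut n = 0;
        while let Some(status) = stream.next().await {
            status?; // surface listing errors as they arrive
            n += 1;
        }
        Ok(n)
    }
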
diff --git a/core/src/services/hdfs_native/lister.rs b/core/src/services/hdfs_native/lister.rs
index f0e410bc4fa..ba794092a77 100644
--- a/core/src/services/hdfs_native/lister.rs
+++ b/core/src/services/hdfs_native/lister.rs
@@ -21,21 +21,46 @@ use crate::raw::oio;
 use crate::Result;
 
 pub struct HdfsNativeLister {
-    entries: VecDeque<oio::Entry>,
+    root: String,
+    stream: BoxStream<'static, Result<FileStatus, HdfsError>>,
+    current_path: Option<String>,
 }
 
 impl HdfsNativeLister {
-    pub fn new(entries: VecDeque<oio::Entry>) -> Self {
-        HdfsNativeLister { entries }
+    pub fn new(
+        root: &str,
+        stream: BoxStream<'static, Result<FileStatus, HdfsError>>,
+        path: &str,
+    ) -> Self {
+        HdfsNativeLister {
+            root: root.to_string(),
+            stream,
+            current_path: Some(path.to_string()),
+        }
     }
 }
 
 impl oio::List for HdfsNativeLister {
     async fn next(&mut self) -> Result<Option<oio::Entry>> {
-        if let Some(entry) = self.entries.pop_front() {
-            return Ok(Some(entry));
+        // Yield the listed directory itself exactly once.
+        if let Some(path) = self.current_path.take() {
+            return Ok(Some(oio::Entry::new(&path, Metadata::new(EntryMode::DIR))));
         }
-        Ok(None)
+
+        // `next()` yields `Option<Result<_>>`; transpose it so the hdfs
+        // error can be mapped before we unwrap the option.
+        let status: FileStatus = match self
+            .stream
+            .next()
+            .await
+            .transpose()
+            .map_err(parse_hdfs_error)?
+        {
+            Some(status) => status,
+            None => return Ok(None),
+        };
+
+        let path = build_rel_path(&self.root, &status.path);
+
+        let entry = if status.isdir {
+            // Make sure we are returning the correct path: directory
+            // entries must carry a trailing `/`.
+            oio::Entry::new(&format!("{path}/"), Metadata::new(EntryMode::DIR))
+        } else {
+            let meta = Metadata::new(EntryMode::FILE)
+                .with_content_length(status.length as u64)
+                .with_last_modified(parse_datetime_from_from_timestamp_millis(
+                    status.modification_time as i64,
+                )?);
+            oio::Entry::new(&path, meta)
+        };
+
+        Ok(Some(entry))
     }
 }
diff --git a/core/src/services/hdfs_native/reader.rs b/core/src/services/hdfs_native/reader.rs
index 968dcfbf68f..8b9f01a49d4 100644
--- a/core/src/services/hdfs_native/reader.rs
+++ b/core/src/services/hdfs_native/reader.rs
@@ -26,51 +26,18 @@ use crate::*;
 
 pub struct HdfsNativeReader {
     f: FileReader,
-    read: usize,
     size: usize,
-    buf_size: usize,
-    buf: BytesMut,
 }
 
 impl HdfsNativeReader {
-    pub fn new(f: FileReader) -> Self {
-        HdfsNativeReader {
-            f,
-            read: 0,
-            size: 0,
-            buf_size: 0,
-            buf: BytesMut::new(),
-        }
+    pub fn new(f: FileReader, size: usize) -> Self {
+        HdfsNativeReader { f, size }
     }
 }
 
 impl oio::Read for HdfsNativeReader {
     async fn read(&mut self) -> Result<Buffer> {
-        if self.read >= self.size {
-            return Ok(Buffer::new());
-        }
-
-        let size = (self.size - self.read).min(self.buf_size);
-        self.buf.reserve(size);
-
-        let buf = &mut self.buf.spare_capacity_mut()[..size];
-        let mut read_buf: ReadBuf = ReadBuf::uninit(buf);
-
-        // SAFETY: Read at most `limit` bytes into `read_buf`.
-        unsafe {
-            read_buf.assume_init(size);
-        }
-
-        let len = read_buf.initialize_unfilled().len();
-        let bytes: Bytes = self.f.read(len).await.map_err(parse_hdfs_error)?;
-        read_buf.put_slice(&bytes);
-        self.read += bytes.len();
-
-        // Safety: We make sure that bs contains `n` more bytes.
-        let filled = read_buf.filled().len();
-        unsafe { self.buf.set_len(filled) }
-
-        let frozen = self.buf.split().freeze();
-        Ok(Buffer::from(frozen))
+        let bytes: Bytes = self.f.read(self.size).await.map_err(parse_hdfs_error)?;
+        Ok(Buffer::from(bytes))
     }
 }
diff --git a/core/src/services/hdfs_native/writer.rs b/core/src/services/hdfs_native/writer.rs
index 4f01cf4204e..82045d286ee 100644
--- a/core/src/services/hdfs_native/writer.rs
+++ b/core/src/services/hdfs_native/writer.rs
@@ -34,12 +34,8 @@ impl HdfsNativeWriter {
 
 impl oio::Write for HdfsNativeWriter {
-    async fn write(&mut self, mut bs: Buffer) -> Result<()> {
-        while bs.has_remaining() {
-            let n = self
-                .f
-                .write(Bytes::copy_from_slice(bs.chunk()))
-                .await
-                .map_err(parse_hdfs_error)?;
-            bs.advance(n);
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
+        // Iterating the Buffer yields owned Bytes chunks, so each chunk can
+        // be handed to FileWriter::write without the copy_from_slice detour.
+        for bytes in bs {
+            self.f.write(bytes).await.map_err(parse_hdfs_error)?;
         }
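
Note: the writer loop above leans on the fact that opendal's `Buffer` is a possibly non-contiguous sequence of `Bytes` chunks that iterates as `Iterator<Item = Bytes>`. A minimal stand-alone sketch of the same pattern; `MockSink` is hypothetical and only mimics the shape of hdfs-native's `FileWriter::write`:

    use bytes::Bytes;
    use opendal::Buffer;

    // Hypothetical sink standing in for hdfs_native's FileWriter.
    struct MockSink {
        written: usize,
    }

    impl MockSink {
        async fn write(&mut self, chunk: Bytes) -> std::io::Result<usize> {
            self.written += chunk.len();
            Ok(chunk.len())
        }
    }

    async fn write_all(sink: &mut MockSink, bs: Buffer) -> std::io::Result<()> {
        // Each iteration hands over one Bytes chunk (a refcount bump),
        // so the chunked Buffer is never flattened into one allocation.
        for chunk in bs {
            sink.write(chunk).await?;
        }
        Ok(())
    }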