Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaohaiyuan committed Feb 12, 2025
1 parent 9977920 commit 6d91de4
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 74 deletions.
32 changes: 7 additions & 25 deletions core/src/services/hdfs_native/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ impl Access for HdfsNativeBackend {

let f = self.client.read(&p).await.map_err(parse_hdfs_error)?;

let r = HdfsNativeReader::new(f);
let r = HdfsNativeReader::new(f, args.range().size().unwrap_or(u64::MAX) as _);

Ok((RpRead::new(), r))
}
Expand Down Expand Up @@ -241,31 +241,13 @@ impl Access for HdfsNativeBackend {

async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> {
let p = build_rooted_abs_path(&self.root, path);
let mut deque = VecDeque::new();
let statues = self
.client
.list_status(&p, true)
.await
.map_err(parse_hdfs_error)?;
let iter = self.client.list_status_iter(path, recursive);
let stream: BoxStream<'static, Result<FileStatus>> = iter.into_stream();

for entry in statues {
let path = format!("{}/{}", p.trim_end_matches('/'), entry.path);
let path = path.trim_start_matches('/').to_string();

let mode = if entry.isdir {
EntryMode::DIR
} else {
EntryMode::FILE
};

let mut meta = Metadata::new(mode);
meta.set_content_length(entry.length as u64)
.set_last_modified(parse_datetime_from_from_timestamp_millis(
entry.modification_time as i64,
)?);
deque.push_back(oio::Entry::new(&path, meta));
}
Ok((RpList::default(), Some(HdfsNativeLister::new(deque))))
Ok((
RpList::default(),
Some(HdfsNativeLister::new(&self.root, stream, path)),
))
}

async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
Expand Down
37 changes: 31 additions & 6 deletions core/src/services/hdfs_native/lister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,46 @@ use crate::raw::oio;
use crate::Result;

pub struct HdfsNativeLister {
entries: VecDeque<oio::Entry>,
root: String,
stream: BoxStream<'static, Result<FileStatus>>,
current_path: Option<String>,
}

impl HdfsNativeLister {
pub fn new(entries: VecDeque<oio::Entry>) -> Self {
HdfsNativeLister { entries }
pub fn new(root: &str, stream: BoxStream<'static, Result<FileStatus>>, path: &str) -> Self {
HdfsNativeLister {
root: root.to_string(),
stream,
current_path: Some(path.to_string()),
}
}
}

impl oio::List for HdfsNativeLister {
async fn next(&mut self) -> Result<Option<oio::Entry>> {
if let Some(entry) = self.entries.pop_front() {
return Ok(Some(entry));
if let Some(path) = self.current_path.take() {
return Ok(Some(oio::Entry::new(&path, Metadata::new(EntryMode::DIR))));
}

Ok(None)
let status: FileStatus = match self.stream.next().await.map_err(parse_hdfs_error)? {
Some(status) => status,
None => return Ok(None),
};

let path = build_rel_path(&self.root, status.path());

let entry = if status.isdir {
// Make sure we are returning the correct path.
oio::Entry::new(&format!("{path}/"), Metadata::new(EntryMode::DIR))
} else {
let meta = Metadata::new(EntryMode::FILE)
.with_content_length(status.length as u64)
.with_last_modified(parse_datetime_from_from_timestamp_millis(
status.modification_time as i64,
)?);
oio::Entry::new(&path, meta)
};

Ok(Some(entry))
}
}
41 changes: 4 additions & 37 deletions core/src/services/hdfs_native/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,51 +26,18 @@ use crate::*;

pub struct HdfsNativeReader {
f: FileReader,
read: usize,
size: usize,
buf_size: usize,
buf: BytesMut,
}

impl HdfsNativeReader {
pub fn new(f: FileReader) -> Self {
HdfsNativeReader {
f,
read: 0,
size: 0,
buf_size: 0,
buf: BytesMut::new(),
}
pub fn new(f: FileReader, size: usize) -> Self {
HdfsNativeReader { f, size }
}
}

impl oio::Read for HdfsNativeReader {
async fn read(&mut self) -> Result<Buffer> {
if self.read >= self.size {
return Ok(Buffer::new());
}

let size = (self.size - self.read).min(self.buf_size);
self.buf.reserve(size);

let buf = &mut self.buf.spare_capacity_mut()[..size];
let mut read_buf: ReadBuf = ReadBuf::uninit(buf);

// SAFETY: Read at most `limit` bytes into `read_buf`.
unsafe {
read_buf.assume_init(size);
}

let len = read_buf.initialize_unfilled().len();
let bytes: Bytes = self.f.read(len).await.map_err(parse_hdfs_error)?;
read_buf.put_slice(&bytes);
self.read += bytes.len();

// Safety: We make sure that bs contains `n` more bytes.
let filled = read_buf.filled().len();
unsafe { self.buf.set_len(filled) }

let frozen = self.buf.split().freeze();
Ok(Buffer::from(frozen))
let bytes: Bytes = self.f.read(self.size).await.map_err(parse_hdfs_error)?;
Ok(Buffer::from(bytes))
}
}
8 changes: 2 additions & 6 deletions core/src/services/hdfs_native/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,8 @@ impl HdfsNativeWriter {

impl oio::Write for HdfsNativeWriter {
async fn write(&mut self, mut bs: Buffer) -> Result<()> {
while bs.has_remaining() {
let n = self
.f
.write(Bytes::copy_from_slice(bs.chunk()))
.await
.map_err(parse_hdfs_error)?;
for chunk in bs {
let n = self.f.write(chunk).await.map_err(parse_hdfs_error)?;
bs.advance(n);
}

Expand Down

0 comments on commit 6d91de4

Please sign in to comment.