From 2351d7eac62e3712b40c78b4dfd609ea3cbcf9b6 Mon Sep 17 00:00:00 2001 From: Bohan Zhang Date: Fri, 24 Jan 2025 16:19:12 +0800 Subject: [PATCH] fix: list one page in fs source executor (#20292) Co-authored-by: tabversion --- .../opendal_source/opendal_enumerator.rs | 6 ++++-- .../src/source/filesystem/s3/enumerator.rs | 17 ++++++++--------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs index a770d600282a2..ff1cf47b9b364 100644 --- a/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs +++ b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs @@ -18,7 +18,7 @@ use anyhow::anyhow; use async_trait::async_trait; use chrono::{DateTime, Utc}; use futures::stream::{self, BoxStream}; -use futures::StreamExt; +use futures::{StreamExt, TryStreamExt}; use opendal::{Metakey, Operator}; use risingwave_common::types::Timestamptz; @@ -54,7 +54,9 @@ impl SplitEnumerator for OpendalEnumerator { let empty_split: OpendalFsSplit = OpendalFsSplit::empty_split(); let prefix = self.prefix.as_deref().unwrap_or("/"); - match self.op.list(prefix).await { + let mut lister = self.op.lister(prefix).await?; + // fetch one item as validation, no need to get all + match lister.try_next().await { Ok(_) => return Ok(vec![empty_split]), Err(e) => { return Err(anyhow!(e) diff --git a/src/connector/src/source/filesystem/s3/enumerator.rs b/src/connector/src/source/filesystem/s3/enumerator.rs index ff15b1c00dc37..65eb551afd183 100644 --- a/src/connector/src/source/filesystem/s3/enumerator.rs +++ b/src/connector/src/source/filesystem/s3/enumerator.rs @@ -99,15 +99,14 @@ impl SplitEnumerator for S3SplitEnumerator { } async fn list_splits(&mut self) -> crate::error::ConnectorResult<Vec<Self::Split>> { - let mut objects = Vec::new(); - loop { - let (files, has_finished) = self.get_next_page::<FsSplit>().await?; -
 objects.extend(files); - if has_finished { - break; - } - } - Ok(objects) + // fetch one page as validation, no need to get all pages + let (_, _) = self.get_next_page::<FsSplit>().await?; + + Ok(vec![FsSplit { + name: "empty_split".to_owned(), + offset: 0, + size: 0, + }]) } }