From ce9ef07a546d90dd5c8516b0b92a7ec058652504 Mon Sep 17 00:00:00 2001 From: tabversion Date: Fri, 24 Jan 2025 14:29:11 +0800 Subject: [PATCH 1/2] fix s3 behavior --- .../src/source/filesystem/s3/enumerator.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/connector/src/source/filesystem/s3/enumerator.rs b/src/connector/src/source/filesystem/s3/enumerator.rs index ff15b1c00dc37..f19a95d2293c5 100644 --- a/src/connector/src/source/filesystem/s3/enumerator.rs +++ b/src/connector/src/source/filesystem/s3/enumerator.rs @@ -99,15 +99,14 @@ impl SplitEnumerator for S3SplitEnumerator { } async fn list_splits(&mut self) -> crate::error::ConnectorResult> { - let mut objects = Vec::new(); - loop { - let (files, has_finished) = self.get_next_page::().await?; - objects.extend(files); - if has_finished { - break; - } - } - Ok(objects) + // fetch one page as validation, no need to get all pages + let (_, _) = self.get_next_page::().await?; + + Ok(vec![FsSplit { + name: "empty_split".to_string(), + offset: 0, + size: 0, + }]) } } From 4c4225228adae44413acd99a4ab0447a259dd3e6 Mon Sep 17 00:00:00 2001 From: tabversion Date: Fri, 24 Jan 2025 14:56:16 +0800 Subject: [PATCH 2/2] fix --- .../source/filesystem/opendal_source/opendal_enumerator.rs | 6 ++++-- src/connector/src/source/filesystem/s3/enumerator.rs | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs index a770d600282a2..ff1cf47b9b364 100644 --- a/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs +++ b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs @@ -18,7 +18,7 @@ use anyhow::anyhow; use async_trait::async_trait; use chrono::{DateTime, Utc}; use futures::stream::{self, BoxStream}; -use futures::StreamExt; +use futures::{StreamExt, TryStreamExt}; use opendal::{Metakey, Operator}; use risingwave_common::types::Timestamptz; @@ -54,7 +54,9 @@ impl SplitEnumerator for OpendalEnumerator { let empty_split: OpendalFsSplit = OpendalFsSplit::empty_split(); let prefix = self.prefix.as_deref().unwrap_or("/"); - match self.op.list(prefix).await { + let mut lister = self.op.lister(prefix).await?; + // fetch one item as validation, no need to get all + match lister.try_next().await { Ok(_) => return Ok(vec![empty_split]), Err(e) => { return Err(anyhow!(e) diff --git a/src/connector/src/source/filesystem/s3/enumerator.rs b/src/connector/src/source/filesystem/s3/enumerator.rs index f19a95d2293c5..65eb551afd183 100644 --- a/src/connector/src/source/filesystem/s3/enumerator.rs +++ b/src/connector/src/source/filesystem/s3/enumerator.rs @@ -103,7 +103,7 @@ impl SplitEnumerator for S3SplitEnumerator { let (_, _) = self.get_next_page::().await?; Ok(vec![FsSplit { - name: "empty_split".to_string(), + name: "empty_split".to_owned(), offset: 0, size: 0, }])