From bb8b8554eb8e4e30a68d9806a7d8dd35bcc6520b Mon Sep 17 00:00:00 2001 From: congyi wang <58715567+wcy-fdu@users.noreply.github.com> Date: Tue, 7 Jan 2025 16:12:42 +0800 Subject: [PATCH] feat(connector): introduce gcs file scan (#19974) --- proto/batch_plan.proto | 14 +++ src/batch/executors/src/executor.rs | 3 + .../executors/src/executor/gcs_file_scan.rs | 118 ++++++++++++++++++ .../executors/src/executor/s3_file_scan.rs | 4 +- .../source/iceberg/parquet_file_handler.rs | 61 +++++---- src/frontend/src/expr/table_function.rs | 76 +++++++---- .../optimizer/plan_node/batch_file_scan.rs | 56 +++++---- .../optimizer/plan_node/generic/file_scan.rs | 94 +++++++++++++- .../optimizer/plan_node/logical_file_scan.rs | 33 ++++- .../rule/table_function_to_file_scan_rule.rs | 60 ++++++--- src/frontend/src/scheduler/plan_fragmenter.rs | 2 +- 11 files changed, 422 insertions(+), 99 deletions(-) create mode 100644 src/batch/executors/src/executor/gcs_file_scan.rs diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index 7ed497d3110f3..14c587cad99b4 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -89,6 +89,7 @@ message FileScanNode { enum StorageType { STORAGE_TYPE_UNSPECIFIED = 0; S3 = 1; + GCS = 2; } repeated plan_common.ColumnDesc columns = 1; @@ -100,6 +101,18 @@ message FileScanNode { repeated string file_location = 7; } +message GcsFileScanNode { + enum FileFormat { + FILE_FORMAT_UNSPECIFIED = 0; + PARQUET = 1; + } + + repeated plan_common.ColumnDesc columns = 1; + FileFormat file_format = 2; + string credential = 3; + repeated string file_location = 4; +} + // NOTE(kwannoel): This will only be used in batch mode. We can change the definition as needed. message PostgresQueryNode { repeated plan_common.ColumnDesc columns = 1; @@ -406,6 +419,7 @@ message PlanNode { IcebergScanNode iceberg_scan = 39; PostgresQueryNode postgres_query = 40; MySqlQueryNode mysql_query = 41; + GcsFileScanNode gcs_file_scan = 42; // The following nodes are used for testing. bool block_executor = 100; bool busy_loop_executor = 101; diff --git a/src/batch/executors/src/executor.rs b/src/batch/executors/src/executor.rs index 52b3df3095093..c953faaa47fe7 100644 --- a/src/batch/executors/src/executor.rs +++ b/src/batch/executors/src/executor.rs @@ -18,6 +18,7 @@ pub mod aggregation; mod delete; mod expand; mod filter; +mod gcs_file_scan; mod generic_exchange; mod group_top_n; mod hash_agg; @@ -52,6 +53,7 @@ mod values; pub use delete::*; pub use expand::*; pub use filter::*; +use gcs_file_scan::GcsFileScanExecutorBuilder; pub use generic_exchange::*; pub use group_top_n::*; pub use hash_agg::*; @@ -112,6 +114,7 @@ register_executor!(Source, SourceExecutor); register_executor!(SortOverWindow, SortOverWindowExecutor); register_executor!(MaxOneRow, MaxOneRowExecutor); register_executor!(FileScan, FileScanExecutorBuilder); +register_executor!(GcsFileScan, GcsFileScanExecutorBuilder); register_executor!(IcebergScan, IcebergScanExecutorBuilder); register_executor!(PostgresQuery, PostgresQueryExecutorBuilder); register_executor!(MysqlQuery, MySqlQueryExecutorBuilder); diff --git a/src/batch/executors/src/executor/gcs_file_scan.rs b/src/batch/executors/src/executor/gcs_file_scan.rs new file mode 100644 index 0000000000000..aa0e9c6d4a0b0 --- /dev/null +++ b/src/batch/executors/src/executor/gcs_file_scan.rs @@ -0,0 +1,118 @@ +// Copyright 2025 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use futures_async_stream::try_stream;
+use futures_util::stream::StreamExt;
+use risingwave_common::array::DataChunk;
+use risingwave_common::catalog::{Field, Schema};
+use risingwave_connector::source::iceberg::{
+    extract_bucket_and_file_name, new_gcs_operator, read_parquet_file, FileScanBackend,
+};
+use risingwave_pb::batch_plan::file_scan_node;
+use risingwave_pb::batch_plan::plan_node::NodeBody;
+
+use crate::error::BatchError;
+use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
+
+#[derive(PartialEq, Debug)]
+pub enum FileFormat {
+    Parquet,
+}
+
+/// Gcs file scan executor. Currently only supports the parquet file format.
+pub struct GcsFileScanExecutor {
+    file_format: FileFormat,
+    file_location: Vec<String>,
+    gcs_credential: String,
+    batch_size: usize,
+    schema: Schema,
+    identity: String,
+}
+
+impl Executor for GcsFileScanExecutor {
+    fn schema(&self) -> &risingwave_common::catalog::Schema {
+        &self.schema
+    }
+
+    fn identity(&self) -> &str {
+        &self.identity
+    }
+
+    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
+        self.do_execute().boxed()
+    }
+}
+
+impl GcsFileScanExecutor {
+    pub fn new(
+        file_format: FileFormat,
+        file_location: Vec<String>,
+        gcs_credential: String,
+        batch_size: usize,
+        schema: Schema,
+        identity: String,
+    ) -> Self {
+        Self {
+            file_format,
+            file_location,
+            gcs_credential,
+            batch_size,
+            schema,
+            identity,
+        }
+    }
+
+    #[try_stream(ok = DataChunk, error = BatchError)]
+    async fn do_execute(self: Box<Self>) {
+        assert_eq!(self.file_format, FileFormat::Parquet);
+        for file in self.file_location {
+            let (bucket, file_name) = extract_bucket_and_file_name(&file, &FileScanBackend::Gcs)?;
+            let op = new_gcs_operator(self.gcs_credential.clone(), bucket.clone())?;
+            let chunk_stream =
+                read_parquet_file(op, file_name, None, None, self.batch_size, 0).await?;
+            #[for_await]
+            for stream_chunk in chunk_stream {
+                let stream_chunk = stream_chunk?;
+                let (data_chunk, _) = stream_chunk.into_parts();
+                yield data_chunk;
+            }
+        }
+    }
+}
+
+pub struct GcsFileScanExecutorBuilder {}
+
+impl BoxedExecutorBuilder for GcsFileScanExecutorBuilder {
+    async fn new_boxed_executor(
+        source: &ExecutorBuilder<'_>,
+        _inputs: Vec<BoxedExecutor>,
+    ) -> crate::error::Result<BoxedExecutor> {
+        let file_scan_node = try_match_expand!(
+            source.plan_node().get_node_body().unwrap(),
+            NodeBody::GcsFileScan
+        )?;
+
+        Ok(Box::new(GcsFileScanExecutor::new(
+            match file_scan_node::FileFormat::try_from(file_scan_node.file_format).unwrap() {
+                file_scan_node::FileFormat::Parquet => FileFormat::Parquet,
+                file_scan_node::FileFormat::Unspecified => unreachable!(),
+            },
+            file_scan_node.file_location.clone(),
+            file_scan_node.credential.clone(),
+            source.context().get_config().developer.chunk_size,
+            Schema::from_iter(file_scan_node.columns.iter().map(Field::from)),
+            source.plan_node().get_identity().clone(),
+        )))
+    }
+}
diff --git a/src/batch/executors/src/executor/s3_file_scan.rs b/src/batch/executors/src/executor/s3_file_scan.rs
index 926ac5bea3778..ee3b08508cd66 100644
--- a/src/batch/executors/src/executor/s3_file_scan.rs
+++ b/src/batch/executors/src/executor/s3_file_scan.rs
@@ -17,7 +17,7 @@ use futures_util::stream::StreamExt;
 use risingwave_common::array::DataChunk;
 use risingwave_common::catalog::{Field, Schema};
 use risingwave_connector::source::iceberg::{
-    extract_bucket_and_file_name, new_s3_operator, read_parquet_file,
+    extract_bucket_and_file_name, new_s3_operator, read_parquet_file, FileScanBackend,
 };
 use risingwave_pb::batch_plan::file_scan_node;
 use risingwave_pb::batch_plan::file_scan_node::StorageType;
@@ -84,7 +84,7 @@ impl S3FileScanExecutor {
     async fn do_execute(self: Box<Self>) {
         assert_eq!(self.file_format, FileFormat::Parquet);
         for file in self.file_location {
-            let (bucket, file_name) = extract_bucket_and_file_name(&file)?;
+            let (bucket, file_name) = extract_bucket_and_file_name(&file, &FileScanBackend::S3)?;
             let op = new_s3_operator(
                 self.s3_region.clone(),
                 self.s3_access_key.clone(),
diff --git a/src/connector/src/source/iceberg/parquet_file_handler.rs b/src/connector/src/source/iceberg/parquet_file_handler.rs
index f676ec7f9e3f1..2cae369aa6a22 100644
--- a/src/connector/src/source/iceberg/parquet_file_handler.rs
+++ b/src/connector/src/source/iceberg/parquet_file_handler.rs
@@ -28,7 +28,7 @@ use iceberg::io::{
 use iceberg::{Error, ErrorKind};
 use itertools::Itertools;
 use opendal::layers::{LoggingLayer, RetryLayer};
-use opendal::services::S3;
+use opendal::services::{Gcs, S3};
 use opendal::Operator;
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{parquet_to_arrow_schema, ParquetRecordBatchStreamBuilder, ProjectionMask};
@@ -127,45 +127,56 @@ pub fn new_s3_operator(
     Ok(op)
 }
 
-pub fn extract_bucket_and_file_name(location: &str) -> ConnectorResult<(String, String)> {
+pub fn new_gcs_operator(credential: String, bucket: String) -> ConnectorResult<Operator> {
+    // Create gcs builder.
+    let builder = Gcs::default().bucket(&bucket).credential(&credential);
+
+    let operator: Operator = Operator::new(builder)?
+        .layer(LoggingLayer::default())
+        .layer(RetryLayer::default())
+        .finish();
+    Ok(operator)
+}
+
+#[derive(Debug, Clone)]
+pub enum FileScanBackend {
+    S3,
+    Gcs,
+}
+
+pub fn extract_bucket_and_file_name(
+    location: &str,
+    file_scan_backend: &FileScanBackend,
+) -> ConnectorResult<(String, String)> {
     let url = Url::parse(location)?;
     let bucket = url
         .host_str()
         .ok_or_else(|| {
             Error::new(
                 ErrorKind::DataInvalid,
-                format!("Invalid s3 url: {}, missing bucket", location),
+                format!("Invalid url: {}, missing bucket", location),
             )
         })?
        .to_owned();
-    let prefix = format!("s3://{}/", bucket);
+    let prefix = match file_scan_backend {
+        FileScanBackend::S3 => format!("s3://{}/", bucket),
+        FileScanBackend::Gcs => format!("gcs://{}/", bucket),
+    };
     let file_name = location[prefix.len()..].to_string();
     Ok((bucket, file_name))
 }
 
-pub async fn list_s3_directory(
-    s3_region: String,
-    s3_access_key: String,
-    s3_secret_key: String,
+pub async fn list_data_directory(
+    op: Operator,
     dir: String,
+    file_scan_backend: &FileScanBackend,
 ) -> Result<Vec<String>, anyhow::Error> {
-    let (bucket, file_name) = extract_bucket_and_file_name(&dir)?;
-    let prefix = format!("s3://{}/", bucket);
+    let (bucket, file_name) = extract_bucket_and_file_name(&dir, file_scan_backend)?;
+    let prefix = match file_scan_backend {
+        FileScanBackend::S3 => format!("s3://{}/", bucket),
+        FileScanBackend::Gcs => format!("gcs://{}/", bucket),
+    };
     if dir.starts_with(&prefix) {
-        let mut builder = S3::default();
-        builder = builder
-            .region(&s3_region)
-            .access_key_id(&s3_access_key)
-            .secret_access_key(&s3_secret_key)
-            .bucket(&bucket);
-        builder = builder.endpoint(&format!(
-            "https://{}.s3.{}.amazonaws.com",
-            bucket, s3_region
-        ));
-        let op = Operator::new(builder)?
-            .layer(RetryLayer::default())
-            .finish();
-
         op.list(&file_name)
             .await
             .map_err(|e| anyhow!(e))
@@ -177,7 +188,7 @@ pub async fn list_s3_directory(
     } else {
         Err(Error::new(
             ErrorKind::DataInvalid,
-            format!("Invalid s3 url: {}, should start with {}", dir, prefix),
+            format!("Invalid url: {}, should start with {}", dir, prefix),
         ))?
     }
 }
diff --git a/src/frontend/src/expr/table_function.rs b/src/frontend/src/expr/table_function.rs
index 725b12a92851a..7905747a52b88 100644
--- a/src/frontend/src/expr/table_function.rs
+++ b/src/frontend/src/expr/table_function.rs
@@ -21,7 +21,8 @@ use mysql_async::prelude::*;
 use risingwave_common::array::arrow::IcebergArrowConvert;
 use risingwave_common::types::{DataType, ScalarImpl, StructType};
 use risingwave_connector::source::iceberg::{
-    extract_bucket_and_file_name, get_parquet_fields, list_s3_directory, new_s3_operator,
+    extract_bucket_and_file_name, get_parquet_fields, list_data_directory, new_gcs_operator,
+    new_s3_operator, FileScanBackend,
 };
 pub use risingwave_pb::expr::table_function::PbType as TableFunctionType;
 use risingwave_pb::expr::PbTableFunction;
@@ -79,13 +80,11 @@ impl TableFunction {
         let return_type = {
             // arguments:
             // file format e.g. parquet
-            // storage type e.g. s3
-            // s3 region
-            // s3 access key
-            // s3 secret key
-            // file location
-            if args.len() != 6 {
-                return Err(BindError("file_scan function only accepts 6 arguments: file_scan('parquet', 's3', s3 region, s3 access key, s3 secret key, file location)".to_owned()).into());
+            // storage type e.g. s3, gcs
+            // For s3: file_scan(file_format, s3, s3_region, s3_access_key, s3_secret_key, file_location_or_directory)
+            // For gcs: file_scan(file_format, gcs, credential, file_location_or_directory)
+            if args.len() != 6 && args.len() != 4 {
+                return Err(BindError("file_scan function only accepts: file_scan('parquet', 's3', s3 region, s3 access key, s3 secret key, file location) or file_scan('parquet', 'gcs', credential, file location)".to_owned()).into());
             }
             let mut eval_args: Vec<String> = vec![];
             for arg in &args {
@@ -133,9 +132,11 @@ impl TableFunction {
                     .into());
             }
 
-            if !"s3".eq_ignore_ascii_case(&eval_args[1]) {
+            if !"s3".eq_ignore_ascii_case(&eval_args[1])
+                && !"gcs".eq_ignore_ascii_case(&eval_args[1])
+            {
                 return Err(BindError(
-                    "file_scan function only accepts 's3' as storage type".to_owned(),
+                    "file_scan function only accepts 's3' or 'gcs' as storage type".to_owned(),
                 )
                 .into());
             }
@@ -148,14 +149,40 @@ impl TableFunction {
 
             #[cfg(not(madsim))]
             {
-                let files = if eval_args[5].ends_with('/') {
+                let (file_scan_backend, input_file_location) =
+                    if "s3".eq_ignore_ascii_case(&eval_args[1]) {
+                        (FileScanBackend::S3, eval_args[5].clone())
+                    } else if "gcs".eq_ignore_ascii_case(&eval_args[1]) {
+                        (FileScanBackend::Gcs, eval_args[3].clone())
+                    } else {
+                        unreachable!();
+                    };
+                let op = match file_scan_backend {
+                    FileScanBackend::S3 => {
+                        let (bucket, _) =
+                            extract_bucket_and_file_name(&input_file_location, &file_scan_backend)?;
+
+                        new_s3_operator(
+                            eval_args[2].clone(),
+                            eval_args[3].clone(),
+                            eval_args[4].clone(),
+                            bucket.clone(),
+                        )?
+                    }
+                    FileScanBackend::Gcs => {
+                        let (bucket, _) =
+                            extract_bucket_and_file_name(&input_file_location, &file_scan_backend)?;
+
+                        new_gcs_operator(eval_args[2].clone(), bucket.clone())?
+                    }
+                };
+                let files = if input_file_location.ends_with('/') {
                     let files = tokio::task::block_in_place(|| {
                         FRONTEND_RUNTIME.block_on(async {
-                            let files = list_s3_directory(
-                                eval_args[2].clone(),
-                                eval_args[3].clone(),
-                                eval_args[4].clone(),
-                                eval_args[5].clone(),
+                            let files = list_data_directory(
+                                op.clone(),
+                                input_file_location.clone(),
+                                &file_scan_backend,
                             )
                             .await?;
 
@@ -174,20 +201,14 @@ impl TableFunction {
                 } else {
                     None
                 };
-
                 let schema = tokio::task::block_in_place(|| {
                     FRONTEND_RUNTIME.block_on(async {
                         let location = match files.as_ref() {
                             Some(files) => files[0].clone(),
-                            None => eval_args[5].clone(),
+                            None => input_file_location.clone(),
                         };
-                        let (bucket, file_name) = extract_bucket_and_file_name(&location)?;
-                        let op = new_s3_operator(
-                            eval_args[2].clone(),
-                            eval_args[3].clone(),
-                            eval_args[4].clone(),
-                            bucket.clone(),
-                        )?;
+                        let (_, file_name) =
+                            extract_bucket_and_file_name(&location, &file_scan_backend)?;
 
                         let fields = get_parquet_fields(op, file_name).await?;
 
@@ -207,7 +228,10 @@ impl TableFunction {
 
             if let Some(files) = files {
                 // if the file location is a directory, we need to remove the last argument and add all files in the directory as arguments
-                args.remove(5);
+                match file_scan_backend {
+                    FileScanBackend::S3 => args.remove(5),
+                    FileScanBackend::Gcs => args.remove(3),
+                };
                 for file in files {
                     args.push(ExprImpl::Literal(Box::new(Literal::new(
                         Some(ScalarImpl::Utf8(file.into())),
diff --git a/src/frontend/src/optimizer/plan_node/batch_file_scan.rs b/src/frontend/src/optimizer/plan_node/batch_file_scan.rs
index 3d26515a032ae..0de65f4bd8555 100644
--- a/src/frontend/src/optimizer/plan_node/batch_file_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_file_scan.rs
@@ -15,7 +15,7 @@ use pretty_xmlish::XmlNode;
 use risingwave_pb::batch_plan::file_scan_node::{FileFormat, StorageType};
 use risingwave_pb::batch_plan::plan_node::NodeBody;
-use risingwave_pb::batch_plan::FileScanNode;
+use risingwave_pb::batch_plan::{FileScanNode, GcsFileScanNode};
 
 use super::batch::prelude::*;
 use super::utils::{childless_record, column_names_pretty, Distill};
@@ -29,11 +29,11 @@ use crate::optimizer::property::{Distribution, Order};
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct BatchFileScan {
     pub base: PlanBase,
-    pub core: generic::FileScan,
+    pub core: generic::FileScanBackend,
 }
 
 impl BatchFileScan {
-    pub fn new(core: generic::FileScan) -> Self {
+    pub fn new(core: generic::FileScanBackend) -> Self {
         let base = PlanBase::new_batch_with_core(&core, Distribution::Single, Order::any());
 
         Self { base, core }
@@ -77,24 +77,38 @@ impl ToDistributedBatch for BatchFileScan {
 
 impl ToBatchPb for BatchFileScan {
     fn to_batch_prost_body(&self) -> NodeBody {
-        NodeBody::FileScan(FileScanNode {
-            columns: self
-                .core
-                .columns()
-                .into_iter()
-                .map(|col| col.to_protobuf())
-                .collect(),
-            file_format: match self.core.file_format {
-                generic::FileFormat::Parquet => FileFormat::Parquet as i32,
-            },
-            storage_type: match self.core.storage_type {
-                generic::StorageType::S3 => StorageType::S3 as i32,
-            },
-            s3_region: self.core.s3_region.clone(),
-            s3_access_key: self.core.s3_access_key.clone(),
-            s3_secret_key: self.core.s3_secret_key.clone(),
-            file_location: self.core.file_location.clone(),
-        })
+        match &self.core {
+            generic::FileScanBackend::FileScan(file_scan) => NodeBody::FileScan(FileScanNode {
+                columns: file_scan
+                    .columns()
+                    .into_iter()
+                    .map(|col| col.to_protobuf())
+                    .collect(),
+                file_format: match file_scan.file_format {
+                    generic::FileFormat::Parquet => FileFormat::Parquet as i32,
+                },
+                storage_type: StorageType::S3 as i32,
+
+                s3_region: file_scan.s3_region.clone(),
+                s3_access_key: file_scan.s3_access_key.clone(),
+                s3_secret_key: file_scan.s3_secret_key.clone(),
+                file_location: file_scan.file_location.clone(),
+            }),
+            generic::FileScanBackend::GcsFileScan(gcs_file_scan) => {
+                NodeBody::GcsFileScan(GcsFileScanNode {
+                    columns: gcs_file_scan
+                        .columns()
+                        .into_iter()
+                        .map(|col| col.to_protobuf())
+                        .collect(),
+                    file_format: match gcs_file_scan.file_format {
+                        generic::FileFormat::Parquet => FileFormat::Parquet as i32,
+                    },
+                    credential: gcs_file_scan.credential.clone(),
+                    file_location: gcs_file_scan.file_location.clone(),
+                })
+            }
+        }
     }
 }
diff --git a/src/frontend/src/optimizer/plan_node/generic/file_scan.rs b/src/frontend/src/optimizer/plan_node/generic/file_scan.rs
index c50683c2d3dcc..95ea6e1f4fcce 100644
--- a/src/frontend/src/optimizer/plan_node/generic/file_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/generic/file_scan.rs
@@ -27,6 +27,59 @@ pub enum FileFormat {
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub enum StorageType {
     S3,
+    Gcs,
+}
+
+#[derive(Debug, Clone, Educe)]
+#[educe(PartialEq, Eq, Hash)]
+pub enum FileScanBackend {
+    FileScan(FileScan),
+    GcsFileScan(GcsFileScan),
+}
+
+#[derive(Debug, Clone, Educe)]
+#[educe(PartialEq, Eq, Hash)]
+pub struct GcsFileScan {
+    pub schema: Schema,
+    pub file_format: FileFormat,
+    pub storage_type: StorageType,
+    pub credential: String,
+    pub file_location: Vec<String>,
+
+    #[educe(PartialEq(ignore))]
+    #[educe(Hash(ignore))]
+    pub ctx: OptimizerContextRef,
+}
+
+impl GenericPlanNode for GcsFileScan {
+    fn schema(&self) -> Schema {
+        self.schema.clone()
+    }
+
+    fn stream_key(&self) -> Option<Vec<usize>> {
+        None
+    }
+
+    fn ctx(&self) -> OptimizerContextRef {
+        self.ctx.clone()
+    }
+
+    fn functional_dependency(&self) -> FunctionalDependencySet {
+        FunctionalDependencySet::new(self.schema.len())
+    }
+}
+
+impl FileScan {
+    pub fn columns(&self) -> Vec<ColumnDesc> {
+        self.schema
+            .fields
+            .iter()
+            .enumerate()
+            .map(|(i, f)| {
+                ColumnDesc::named(f.name.clone(), ColumnId::new(i as i32), f.data_type.clone())
+            })
+            .collect()
+    }
 }
 
 #[derive(Debug, Clone, Educe)]
@@ -63,7 +116,7 @@ impl GenericPlanNode for FileScan {
     }
 }
 
-impl FileScan {
+impl GcsFileScan {
     pub fn columns(&self) -> Vec<ColumnDesc> {
         self.schema
             .fields
@@ -75,3 +128,42 @@
             .collect()
     }
 }
+
+impl GenericPlanNode for FileScanBackend {
+    fn schema(&self) -> Schema {
+        match self {
+            FileScanBackend::FileScan(file_scan) => file_scan.schema(),
+            FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.schema(),
+        }
+    }
+
+    fn stream_key(&self) -> Option<Vec<usize>> {
+        match self {
+            FileScanBackend::FileScan(file_scan) => file_scan.stream_key(),
+            FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.stream_key(),
+        }
+    }
+
+    fn ctx(&self) -> OptimizerContextRef {
+        match self {
+            FileScanBackend::FileScan(file_scan) => file_scan.ctx(),
+            FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.ctx(),
+        }
+    }
+
+    fn functional_dependency(&self) -> FunctionalDependencySet {
+        match self {
+            FileScanBackend::FileScan(file_scan) => file_scan.functional_dependency(),
+            FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.functional_dependency(),
+        }
+    }
+}
+
+impl FileScanBackend {
+    pub fn file_location(&self) -> Vec<String> {
+        match self {
+            FileScanBackend::FileScan(file_scan) => file_scan.file_location.clone(),
+            FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.file_location.clone(),
+        }
+    }
+}
diff --git a/src/frontend/src/optimizer/plan_node/logical_file_scan.rs b/src/frontend/src/optimizer/plan_node/logical_file_scan.rs
index 5f6a2753cfcc3..429e1974d8e84 100644
--- a/src/frontend/src/optimizer/plan_node/logical_file_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_file_scan.rs
@@ -35,11 +35,11 @@ use crate::OptimizerContextRef;
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct LogicalFileScan {
     pub base: PlanBase,
-    pub core: generic::FileScan,
+    pub core: generic::FileScanBackend,
 }
 
 impl LogicalFileScan {
-    pub fn new(
+    pub fn new_s3_logical_file_scan(
         ctx: OptimizerContextRef,
         schema: Schema,
         file_format: String,
@@ -52,7 +52,7 @@ impl LogicalFileScan {
         assert!("parquet".eq_ignore_ascii_case(&file_format));
         assert!("s3".eq_ignore_ascii_case(&storage_type));
 
-        let core = generic::FileScan {
+        let core = generic::FileScanBackend::FileScan(generic::FileScan {
             schema,
             file_format: generic::FileFormat::Parquet,
             storage_type: generic::StorageType::S3,
@@ -61,7 +61,32 @@ impl LogicalFileScan {
             s3_secret_key,
             file_location,
             ctx,
-        };
+        });
+
+        let base = PlanBase::new_logical_with_core(&core);
+
+        LogicalFileScan { base, core }
+    }
+
+    pub fn new_gcs_logical_file_scan(
+        ctx: OptimizerContextRef,
+        schema: Schema,
+        file_format: String,
+        storage_type: String,
+        credential: String,
+        file_location: Vec<String>,
+    ) -> Self {
+        assert!("parquet".eq_ignore_ascii_case(&file_format));
+        assert!("gcs".eq_ignore_ascii_case(&storage_type));
+
+        let core = generic::FileScanBackend::GcsFileScan(generic::GcsFileScan {
+            schema,
+            file_format: generic::FileFormat::Parquet,
+            storage_type: generic::StorageType::Gcs,
+            credential,
+            file_location,
+            ctx,
+        });
 
         let base = PlanBase::new_logical_with_core(&core);
 
diff --git 
a/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs b/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs index cc0ba289862ae..1c081796bbe22 100644 --- a/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs +++ b/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs @@ -43,7 +43,6 @@ impl Rule for TableFunctionToFileScanRule { let schema = Schema::new(fields); - assert!(logical_table_function.table_function().args.len() >= 6); let mut eval_args = vec![]; for arg in &logical_table_function.table_function().args { assert_eq!(arg.return_type(), DataType::Varchar); @@ -58,25 +57,48 @@ impl Rule for TableFunctionToFileScanRule { } } assert!("parquet".eq_ignore_ascii_case(&eval_args[0])); - assert!("s3".eq_ignore_ascii_case(&eval_args[1])); - let s3_region = eval_args[2].clone(); - let s3_access_key = eval_args[3].clone(); - let s3_secret_key = eval_args[4].clone(); - // The rest of the arguments are file locations - let file_location = eval_args[5..].iter().cloned().collect_vec(); - Some( - LogicalFileScan::new( - logical_table_function.ctx(), - schema, - "parquet".to_owned(), - "s3".to_owned(), - s3_region, - s3_access_key, - s3_secret_key, - file_location, + assert!( + ("s3".eq_ignore_ascii_case(&eval_args[1])) + || "gcs".eq_ignore_ascii_case(&eval_args[1]) + ); + + if "s3".eq_ignore_ascii_case(&eval_args[1]) { + let s3_region = eval_args[2].clone(); + let s3_access_key = eval_args[3].clone(); + let s3_secret_key = eval_args[4].clone(); + // The rest of the arguments are file locations + let file_location = eval_args[5..].iter().cloned().collect_vec(); + Some( + LogicalFileScan::new_s3_logical_file_scan( + logical_table_function.ctx(), + schema, + "parquet".to_owned(), + "s3".to_owned(), + s3_region, + s3_access_key, + s3_secret_key, + file_location, + ) + .into(), + ) + } else if "gcs".eq_ignore_ascii_case(&eval_args[1]) { + let credential = eval_args[2].clone(); + // The rest of the arguments are file locations + let file_location = eval_args[3..].iter().cloned().collect_vec(); + Some( + LogicalFileScan::new_gcs_logical_file_scan( + logical_table_function.ctx(), + schema, + "parquet".to_owned(), + "gcs".to_owned(), + credential, + file_location, + ) + .into(), ) - .into(), - ) + } else { + unreachable!() + } } else { unreachable!("TableFunction return type should be struct") } diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index e5f822fbaf477..76f5428348003 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -1188,7 +1188,7 @@ impl BatchPlanFragmenter { if let Some(batch_file_scan) = node.as_batch_file_scan() { return Ok(Some(FileScanInfo { - file_location: batch_file_scan.core.file_location.clone(), + file_location: batch_file_scan.core.file_location().clone(), })); }
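
Reviewer note, not part of the patch: per the binder changes in table_function.rs above, the GCS variant is invoked with four arguments, file_scan('parquet', 'gcs', credential, file location), and a location ending in '/' is expanded via list_data_directory. The following is a minimal, hypothetical sketch of how the new connector helpers compose outside the batch executor. The helper names and signatures (new_gcs_operator, extract_bucket_and_file_name, FileScanBackend, read_parquet_file) come from this diff; the driver function, the anyhow error type, and the 1024 batch size are illustrative assumptions, and the two None arguments plus the trailing 0 simply mirror the call in GcsFileScanExecutor::do_execute.

use futures::{pin_mut, StreamExt};
use risingwave_connector::source::iceberg::{
    extract_bucket_and_file_name, new_gcs_operator, read_parquet_file, FileScanBackend,
};

// Hypothetical driver: count the rows of one parquet object stored in GCS.
async fn count_gcs_parquet_rows(credential: String, location: &str) -> anyhow::Result<usize> {
    // "gcs://bucket/path/file.parquet" -> ("bucket", "path/file.parquet")
    let (bucket, file_name) = extract_bucket_and_file_name(location, &FileScanBackend::Gcs)?;
    let op = new_gcs_operator(credential, bucket)?;
    // None/None/1024/0 mirror the executor's call above; 1024 is an assumed chunk size.
    let stream = read_parquet_file(op, file_name, None, None, 1024, 0).await?;
    pin_mut!(stream);
    let mut rows = 0;
    while let Some(chunk) = stream.next().await {
        let (data_chunk, _) = chunk?.into_parts();
        rows += data_chunk.cardinality();
    }
    Ok(rows)
}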