feat(connector): introduce gcs file scan (#19974)
wcy-fdu authored Jan 7, 2025
1 parent 189319d commit bb8b855
Showing 11 changed files with 422 additions and 99 deletions.
14 changes: 14 additions & 0 deletions proto/batch_plan.proto
@@ -89,6 +89,7 @@ message FileScanNode {
enum StorageType {
STORAGE_TYPE_UNSPECIFIED = 0;
S3 = 1;
GCS = 2;
}

repeated plan_common.ColumnDesc columns = 1;
@@ -100,6 +101,18 @@ message FileScanNode {
repeated string file_location = 7;
}

message GcsFileScanNode {
enum FileFormat {
FILE_FORMAT_UNSPECIFIED = 0;
PARQUET = 1;
}

repeated plan_common.ColumnDesc columns = 1;
FileFormat file_format = 2;
string credential = 3;
repeated string file_location = 4;
}

// NOTE(kwannoel): This will only be used in batch mode. We can change the definition as needed.
message PostgresQueryNode {
repeated plan_common.ColumnDesc columns = 1;
@@ -406,6 +419,7 @@ message PlanNode {
IcebergScanNode iceberg_scan = 39;
PostgresQueryNode postgres_query = 40;
MySqlQueryNode mysql_query = 41;
GcsFileScanNode gcs_file_scan = 42;
// The following nodes are used for testing.
bool block_executor = 100;
bool busy_loop_executor = 101;
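For reference, prost generates a `GcsFileScanNode` struct from the message above. A minimal sketch of populating it (hypothetical values; the generated module path and `i32` enum field follow the usual prost conventions):

use risingwave_pb::batch_plan::gcs_file_scan_node::FileFormat;
use risingwave_pb::batch_plan::GcsFileScanNode;

// Hypothetical plan fragment; `credential` is a GCS service-account JSON string.
let node = GcsFileScanNode {
    columns: vec![],
    file_format: FileFormat::Parquet as i32,
    credential: "<service-account-json>".to_owned(),
    file_location: vec!["gcs://my-bucket/data/part-0.parquet".to_owned()],
};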
3 changes: 3 additions & 0 deletions src/batch/executors/src/executor.rs
@@ -18,6 +18,7 @@ pub mod aggregation;
mod delete;
mod expand;
mod filter;
mod gcs_file_scan;
mod generic_exchange;
mod group_top_n;
mod hash_agg;
@@ -52,6 +53,7 @@ mod values;
pub use delete::*;
pub use expand::*;
pub use filter::*;
use gcs_file_scan::GcsFileScanExecutorBuilder;
pub use generic_exchange::*;
pub use group_top_n::*;
pub use hash_agg::*;
@@ -112,6 +114,7 @@ register_executor!(Source, SourceExecutor);
register_executor!(SortOverWindow, SortOverWindowExecutor);
register_executor!(MaxOneRow, MaxOneRowExecutor);
register_executor!(FileScan, FileScanExecutorBuilder);
register_executor!(GcsFileScan, GcsFileScanExecutorBuilder);
register_executor!(IcebergScan, IcebergScanExecutorBuilder);
register_executor!(PostgresQuery, PostgresQueryExecutorBuilder);
register_executor!(MysqlQuery, MySqlQueryExecutorBuilder);
118 changes: 118 additions & 0 deletions src/batch/executors/src/executor/gcs_file_scan.rs
@@ -0,0 +1,118 @@
// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use futures_async_stream::try_stream;
use futures_util::stream::StreamExt;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{Field, Schema};
use risingwave_connector::source::iceberg::{
extract_bucket_and_file_name, new_gcs_operator, read_parquet_file, FileScanBackend,
};
use risingwave_pb::batch_plan::file_scan_node;
use risingwave_pb::batch_plan::plan_node::NodeBody;

use crate::error::BatchError;
use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};

#[derive(PartialEq, Debug)]
pub enum FileFormat {
Parquet,
}

/// GCS file scan executor. Currently only supports the Parquet file format.
pub struct GcsFileScanExecutor {
file_format: FileFormat,
file_location: Vec<String>,
gcs_credential: String,
batch_size: usize,
schema: Schema,
identity: String,
}

impl Executor for GcsFileScanExecutor {
fn schema(&self) -> &risingwave_common::catalog::Schema {
&self.schema
}

fn identity(&self) -> &str {
&self.identity
}

fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
self.do_execute().boxed()
}
}

impl GcsFileScanExecutor {
pub fn new(
file_format: FileFormat,
file_location: Vec<String>,
gcs_credential: String,
batch_size: usize,
schema: Schema,
identity: String,
) -> Self {
Self {
file_format,
file_location,
gcs_credential,
batch_size,
schema,
identity,
}
}

#[try_stream(ok = DataChunk, error = BatchError)]
async fn do_execute(self: Box<Self>) {
assert_eq!(self.file_format, FileFormat::Parquet);
for file in self.file_location {
let (bucket, file_name) = extract_bucket_and_file_name(&file, &FileScanBackend::Gcs)?;
let op = new_gcs_operator(self.gcs_credential.clone(), bucket.clone())?;
let chunk_stream =
read_parquet_file(op, file_name, None, None, self.batch_size, 0).await?;
#[for_await]
for stream_chunk in chunk_stream {
let stream_chunk = stream_chunk?;
let (data_chunk, _) = stream_chunk.into_parts();
yield data_chunk;
}
}
}
}

pub struct GcsFileScanExecutorBuilder {}

impl BoxedExecutorBuilder for GcsFileScanExecutorBuilder {
async fn new_boxed_executor(
source: &ExecutorBuilder<'_>,
_inputs: Vec<BoxedExecutor>,
) -> crate::error::Result<BoxedExecutor> {
let file_scan_node = try_match_expand!(
source.plan_node().get_node_body().unwrap(),
NodeBody::GcsFileScan
)?;

Ok(Box::new(GcsFileScanExecutor::new(
match file_scan_node::FileFormat::try_from(file_scan_node.file_format).unwrap() {
file_scan_node::FileFormat::Parquet => FileFormat::Parquet,
file_scan_node::FileFormat::Unspecified => unreachable!(),
},
file_scan_node.file_location.clone(),
file_scan_node.credential.clone(),
source.context().get_config().developer.chunk_size,
Schema::from_iter(file_scan_node.columns.iter().map(Field::from)),
source.plan_node().get_identity().clone(),
)))
}
}
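
The executor streams each Parquet file from GCS as a sequence of `DataChunk`s. A minimal sketch of driving it directly, outside the batch task runtime (the schema, batch size, and file location below are assumptions for illustration):

use futures_util::stream::StreamExt;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;

async fn scan_one_file(credential: String) -> Result<(), BatchError> {
    let executor = Box::new(GcsFileScanExecutor::new(
        FileFormat::Parquet,
        vec!["gcs://my-bucket/data/part-0.parquet".to_owned()],
        credential,
        1024, // rows per emitted DataChunk
        Schema::new(vec![Field::with_name(DataType::Int64, "id")]),
        "GcsFileScanExecutor".to_owned(),
    ));
    let mut stream = executor.execute();
    while let Some(chunk) = stream.next().await {
        let chunk = chunk?;
        println!("scanned {} rows", chunk.cardinality());
    }
    Ok(())
}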
4 changes: 2 additions & 2 deletions src/batch/executors/src/executor/s3_file_scan.rs
@@ -17,7 +17,7 @@ use futures_util::stream::StreamExt;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{Field, Schema};
use risingwave_connector::source::iceberg::{
-    extract_bucket_and_file_name, new_s3_operator, read_parquet_file,
+    extract_bucket_and_file_name, new_s3_operator, read_parquet_file, FileScanBackend,
};
use risingwave_pb::batch_plan::file_scan_node;
use risingwave_pb::batch_plan::file_scan_node::StorageType;
@@ -84,7 +84,7 @@ impl S3FileScanExecutor {
async fn do_execute(self: Box<Self>) {
assert_eq!(self.file_format, FileFormat::Parquet);
for file in self.file_location {
-            let (bucket, file_name) = extract_bucket_and_file_name(&file)?;
+            let (bucket, file_name) = extract_bucket_and_file_name(&file, &FileScanBackend::S3)?;
let op = new_s3_operator(
self.s3_region.clone(),
self.s3_access_key.clone(),
61 changes: 36 additions & 25 deletions src/connector/src/source/iceberg/parquet_file_handler.rs
@@ -28,7 +28,7 @@ use iceberg::io::{
use iceberg::{Error, ErrorKind};
use itertools::Itertools;
use opendal::layers::{LoggingLayer, RetryLayer};
-use opendal::services::S3;
+use opendal::services::{Gcs, S3};
use opendal::Operator;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{parquet_to_arrow_schema, ParquetRecordBatchStreamBuilder, ProjectionMask};
@@ -127,45 +127,56 @@ pub fn new_s3_operator(
Ok(op)
}

-pub fn extract_bucket_and_file_name(location: &str) -> ConnectorResult<(String, String)> {
+pub fn new_gcs_operator(credential: String, bucket: String) -> ConnectorResult<Operator> {
+    // Create gcs builder.
+    let builder = Gcs::default().bucket(&bucket).credential(&credential);
+
+    let operator: Operator = Operator::new(builder)?
+        .layer(LoggingLayer::default())
+        .layer(RetryLayer::default())
+        .finish();
+    Ok(operator)
+}
+
+#[derive(Debug, Clone)]
+pub enum FileScanBackend {
+    S3,
+    Gcs,
+}
+
+pub fn extract_bucket_and_file_name(
+    location: &str,
+    file_scan_backend: &FileScanBackend,
+) -> ConnectorResult<(String, String)> {
let url = Url::parse(location)?;
let bucket = url
.host_str()
.ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!("Invalid s3 url: {}, missing bucket", location),
format!("Invalid url: {}, missing bucket", location),
)
})?
.to_owned();
let prefix = format!("s3://{}/", bucket);
let prefix = match file_scan_backend {
FileScanBackend::S3 => format!("s3://{}/", bucket),
FileScanBackend::Gcs => format!("gcs://{}/", bucket),
};
let file_name = location[prefix.len()..].to_string();
Ok((bucket, file_name))
}

-pub async fn list_s3_directory(
-    s3_region: String,
-    s3_access_key: String,
-    s3_secret_key: String,
+pub async fn list_data_directory(
+    op: Operator,
     dir: String,
+    file_scan_backend: &FileScanBackend,
 ) -> Result<Vec<String>, anyhow::Error> {
-    let (bucket, file_name) = extract_bucket_and_file_name(&dir)?;
-    let prefix = format!("s3://{}/", bucket);
+    let (bucket, file_name) = extract_bucket_and_file_name(&dir, file_scan_backend)?;
+    let prefix = match file_scan_backend {
+        FileScanBackend::S3 => format!("s3://{}/", bucket),
+        FileScanBackend::Gcs => format!("gcs://{}/", bucket),
+    };
if dir.starts_with(&prefix) {
-        let mut builder = S3::default();
-        builder = builder
-            .region(&s3_region)
-            .access_key_id(&s3_access_key)
-            .secret_access_key(&s3_secret_key)
-            .bucket(&bucket);
-        builder = builder.endpoint(&format!(
-            "https://{}.s3.{}.amazonaws.com",
-            bucket, s3_region
-        ));
-        let op = Operator::new(builder)?
-            .layer(RetryLayer::default())
-            .finish();

op.list(&file_name)
.await
.map_err(|e| anyhow!(e))
@@ -177,7 +188,7 @@ pub async fn list_s3_directory(
} else {
Err(Error::new(
ErrorKind::DataInvalid,
format!("Invalid s3 url: {}, should start with {}", dir, prefix),
format!("Invalid url: {}, should start with {}", dir, prefix),
))?
}
}
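
The helpers above compose: extract the bucket from a `gcs://` URL, build an operator for it, then list the prefix. A minimal sketch (bucket and prefix are made up; the credential is assumed to be a service-account JSON string):

use risingwave_connector::source::iceberg::{
    extract_bucket_and_file_name, list_data_directory, new_gcs_operator, FileScanBackend,
};

async fn list_gcs_files(credential: String) -> anyhow::Result<Vec<String>> {
    let dir = "gcs://my-bucket/data/".to_owned();
    // "gcs://my-bucket/data/" -> bucket "my-bucket", prefix "data/"
    let (bucket, _prefix) = extract_bucket_and_file_name(&dir, &FileScanBackend::Gcs)?;
    let op = new_gcs_operator(credential, bucket)?;
    // Enumerates objects under the prefix, e.g. to plan one scan per file.
    list_data_directory(op, dir, &FileScanBackend::Gcs).await
}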