diff --git a/src/common/datasource/src/buffered_writer.rs b/src/common/datasource/src/buffered_writer.rs
index f29d12caedf3..5ab3ce588681 100644
--- a/src/common/datasource/src/buffered_writer.rs
+++ b/src/common/datasource/src/buffered_writer.rs
@@ -12,24 +12,26 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::future::Future;
+
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
 use datafusion::parquet::format::FileMetaData;
-use object_store::Writer;
 use snafu::{OptionExt, ResultExt};
 use tokio::io::{AsyncWrite, AsyncWriteExt};
-use tokio_util::compat::Compat;
 
 use crate::error::{self, Result};
 use crate::share_buffer::SharedBuffer;
 
-pub struct BufferedWriter<T, U> {
-    writer: T,
-    /// None stands for [`BufferedWriter`] closed.
+pub struct LazyBufferedWriter<T, U, F> {
+    path: String,
+    writer_factory: F,
+    writer: Option<T>,
+    /// `None` indicates that the [`LazyBufferedWriter`] has been closed.
     encoder: Option<U>,
     buffer: SharedBuffer,
+    rows_written: usize,
     bytes_written: u64,
-    flushed: bool,
     threshold: usize,
 }
 
@@ -42,58 +44,79 @@
     async fn close(mut self) -> Result<FileMetaData>;
 }
 
-pub type DefaultBufferedWriter<E> = BufferedWriter<Compat<object_store::Writer>, E>;
-
-impl<T: AsyncWrite + Send + Unpin, U: DfRecordBatchEncoder + ArrowWriterCloser>
-    BufferedWriter<T, U>
+impl<
+        T: AsyncWrite + Send + Unpin,
+        U: DfRecordBatchEncoder + ArrowWriterCloser,
+        F: FnMut(String) -> Fut,
+        Fut: Future<Output = Result<T>>,
+    > LazyBufferedWriter<T, U, F>
 {
+    /// Closes the `LazyBufferedWriter` and flushes all buffered data to the underlying
+    /// storage, but only if any rows have been written.
     pub async fn close_with_arrow_writer(mut self) -> Result<(FileMetaData, u64)> {
         let encoder = self
             .encoder
             .take()
             .context(error::BufferedWriterClosedSnafu)?;
         let metadata = encoder.close().await?;
-        let written = self.try_flush(true).await?;
+
+        // Use `rows_written` to keep track of whether any rows have been written.
+        // If no rows have been written, we can simply close the underlying
+        // writer without flushing, so that no file is actually created.
+        if self.rows_written != 0 {
+            self.bytes_written += self.try_flush(true).await?;
+        }
         // It's important to shut down! flushes all pending writes
-        self.close().await?;
-        Ok((metadata, written))
+        self.close_inner_writer().await?;
+        Ok((metadata, self.bytes_written))
     }
 }
 
-impl<T: AsyncWrite + Send + Unpin, U: DfRecordBatchEncoder> BufferedWriter<T, U> {
-    pub async fn close(&mut self) -> Result<()> {
-        self.writer.shutdown().await.context(error::AsyncWriteSnafu)
+impl<
+        T: AsyncWrite + Send + Unpin,
+        U: DfRecordBatchEncoder,
+        F: FnMut(String) -> Fut,
+        Fut: Future<Output = Result<T>>,
+    > LazyBufferedWriter<T, U, F>
+{
+    /// Closes the writer without flushing the buffered data.
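+    ///
+    /// Since the underlying writer is created lazily by `writer_factory`, this is a
+    /// no-op when no rows have been written and the writer was never initialized.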
+    pub async fn close_inner_writer(&mut self) -> Result<()> {
+        if let Some(writer) = &mut self.writer {
+            writer.shutdown().await.context(error::AsyncWriteSnafu)?;
+        }
+        Ok(())
     }
 
-    pub fn new(threshold: usize, buffer: SharedBuffer, encoder: U, writer: T) -> Self {
+    pub fn new(
+        threshold: usize,
+        buffer: SharedBuffer,
+        encoder: U,
+        path: impl AsRef<str>,
+        writer_factory: F,
+    ) -> Self {
         Self {
+            path: path.as_ref().to_string(),
             threshold,
-            writer,
             encoder: Some(encoder),
             buffer,
+            rows_written: 0,
             bytes_written: 0,
-            flushed: false,
+            writer_factory,
+            writer: None,
         }
     }
 
-    pub fn bytes_written(&self) -> u64 {
-        self.bytes_written
-    }
-
     pub async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
         let encoder = self
             .encoder
             .as_mut()
             .context(error::BufferedWriterClosedSnafu)?;
         encoder.write(batch)?;
-        self.try_flush(false).await?;
+        self.rows_written += batch.num_rows();
+        self.bytes_written += self.try_flush(false).await?;
         Ok(())
     }
 
-    pub fn flushed(&self) -> bool {
-        self.flushed
-    }
-
     pub async fn try_flush(&mut self, all: bool) -> Result<u64> {
         let mut bytes_written: u64 = 0;
 
@@ -106,7 +129,8 @@ impl<T: AsyncWrite + Send + Unpin, U: DfRecordBatchEncoder> BufferedWriter<T, U>
         };
 
         let size = chunk.len();
-        self.writer
+        self.maybe_init_writer()
+            .await?
             .write_all(&chunk)
             .await
             .context(error::AsyncWriteSnafu)?;
@@ -117,22 +141,27 @@ impl<T: AsyncWrite + Send + Unpin, U: DfRecordBatchEncoder> BufferedWriter<T, U>
         if all {
             bytes_written += self.try_flush_all().await?;
         }
-
-        self.flushed = bytes_written > 0;
-        self.bytes_written += bytes_written;
-
         Ok(bytes_written)
     }
 
+    /// Initializes the underlying file writer only when rows have been written.
+    async fn maybe_init_writer(&mut self) -> Result<&mut T> {
+        if let Some(ref mut writer) = self.writer {
+            Ok(writer)
+        } else {
+            let writer = (self.writer_factory)(self.path.clone()).await?;
+            Ok(self.writer.insert(writer))
+        }
+    }
+
     async fn try_flush_all(&mut self) -> Result<u64> {
         let remain = self.buffer.buffer.lock().unwrap().split();
         let size = remain.len();
-
-        self.writer
+        self.maybe_init_writer()
+            .await?
             .write_all(&remain)
             .await
             .context(error::AsyncWriteSnafu)?;
-
         Ok(size as u64)
     }
 }
diff --git a/src/common/datasource/src/file_format.rs b/src/common/datasource/src/file_format.rs
index 4987510ecc0d..bcb777a9ddf3 100644
--- a/src/common/datasource/src/file_format.rs
+++ b/src/common/datasource/src/file_format.rs
@@ -35,12 +35,11 @@ use datafusion::physical_plan::SendableRecordBatchStream;
 use futures::StreamExt;
 use object_store::ObjectStore;
 use snafu::ResultExt;
-use tokio_util::compat::FuturesAsyncWriteCompatExt;
 
 use self::csv::CsvFormat;
 use self::json::JsonFormat;
 use self::parquet::ParquetFormat;
-use crate::buffered_writer::{BufferedWriter, DfRecordBatchEncoder};
+use crate::buffered_writer::{DfRecordBatchEncoder, LazyBufferedWriter};
 use crate::compression::CompressionType;
 use crate::error::{self, Result};
 use crate::share_buffer::SharedBuffer;
@@ -181,15 +180,14 @@ pub async fn stream_to_file<T: DfRecordBatchEncoder, U: Fn(SharedBuffer) -> T>(
     threshold: usize,
     encoder_factory: U,
 ) -> Result<usize> {
-    let writer = store
-        .writer(path)
-        .await
-        .context(error::WriteObjectSnafu { path })?
-        .compat_write();
-
     let buffer = SharedBuffer::with_capacity(threshold);
     let encoder = encoder_factory(buffer.clone());
-    let mut writer = BufferedWriter::new(threshold, buffer, encoder, writer);
+    let mut writer = LazyBufferedWriter::new(threshold, buffer, encoder, path, |path| async {
+        store
+            .writer(&path)
+            .await
+            .context(error::WriteObjectSnafu { path })
+    });
 
     let mut rows = 0;
 
@@ -201,8 +199,7 @@ pub async fn stream_to_file<T: DfRecordBatchEncoder, U: Fn(SharedBuffer) -> T>(
 
     // Flushes all pending writes
     writer.try_flush(true).await?;
-
-    writer.close().await?;
+    writer.close_inner_writer().await?;
 
     Ok(rows)
 }
diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs
index 43a4a81a34d3..7f9d697ebf6b 100644
--- a/src/storage/src/sst/parquet.rs
+++ b/src/storage/src/sst/parquet.rs
@@ -27,7 +27,7 @@ use arrow_array::{
 use async_compat::CompatExt;
 use async_stream::try_stream;
 use async_trait::async_trait;
-use common_telemetry::{error, warn};
+use common_telemetry::{debug, error};
 use common_time::range::TimestampRange;
 use common_time::timestamp::TimeUnit;
 use common_time::Timestamp;
@@ -132,13 +132,8 @@ impl<'a> ParquetWriter<'a> {
         }
 
         if rows_written == 0 {
-            // if the source does not contain any batch, we skip writing an empty parquet file.
-            if !buffered_writer.abort().await {
-                warn!(
-                    "Partial file {} has been uploaded to remote storage",
-                    self.file_path
-                );
-            }
+            debug!("No data written, closing writer without creating file: {}", self.file_path);
+            buffered_writer.close().await?;
             return Ok(None);
         }
 
@@ -547,8 +542,10 @@ impl BatchReader for ChunkStream {
 
 #[cfg(test)]
 mod tests {
+    use std::ops::Range;
     use std::sync::Arc;
 
+    use common_base::readable_size::ReadableSize;
     use common_test_util::temp_dir::create_temp_dir;
     use datatypes::arrow::array::{Array, UInt64Array, UInt8Array};
     use datatypes::prelude::{ScalarVector, Vector};
@@ -653,6 +650,44 @@ mod tests {
         );
     }
 
+    #[tokio::test]
+    async fn test_write_large_data() {
+        common_telemetry::init_default_ut_logging();
+        let schema = memtable_tests::schema_for_test();
+        let memtable = DefaultMemtableBuilder::default().build(schema);
+
+        let mut rows_written = 0;
+        for i in 0..16 {
+            let range: Range<i64> = i * 1024..(i + 1) * 1024;
+            let keys = range.clone().collect::<Vec<_>>();
+            let values = range
+                .map(|idx| (Some(idx as u64), Some(idx as u64)))
+                .collect::<Vec<_>>();
+            memtable_tests::write_kvs(&*memtable, i as u64, OpType::Put, &keys, &values);
+            rows_written += keys.len();
+        }
+
+        let dir = create_temp_dir("write_large_parquet");
+        let path = dir.path().to_str().unwrap();
+
+        let object_store = create_object_store(path);
+        let sst_file_name = "test-large.parquet";
+        let iter = memtable.iter(IterContext::default()).unwrap();
+        let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone());
+
+        let sst_info = writer
+            .write_sst(&sst::WriteOptions {
+                sst_write_buffer_size: ReadableSize::kb(4),
+            })
+            .await
+            .unwrap()
+            .unwrap();
+        let file_meta = object_store.stat(sst_file_name).await.unwrap();
+        assert!(file_meta.is_file());
+        assert_eq!(sst_info.file_size, file_meta.content_length());
+        assert_eq!(rows_written, sst_info.num_rows);
+    }
+
     #[tokio::test]
     async fn test_parquet_read_large_batch() {
         common_telemetry::init_default_ut_logging();
@@ -962,12 +997,12 @@ mod tests {
         let schema = memtable_tests::schema_for_test();
         let memtable = DefaultMemtableBuilder::default().build(schema.clone());
 
-        let dir = create_temp_dir("read-parquet-by-range");
+        let dir = create_temp_dir("write-empty-file");
         let path = dir.path().to_str().unwrap();
         let mut builder = Fs::default();
         builder.root(path);
         let object_store = ObjectStore::new(builder).unwrap().finish();
-        let sst_file_name = "test-read.parquet";
+        let sst_file_name = "test-empty.parquet";
         let iter = memtable.iter(IterContext::default()).unwrap();
         let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone());
 
@@ -975,8 +1010,9 @@
             .write_sst(&sst::WriteOptions::default())
             .await
             .unwrap();
-
         assert!(sst_info_opt.is_none());
+        // The file should not exist when no row has been written.
+        assert!(!object_store.is_exist(sst_file_name).await.unwrap());
     }
 
     #[test]
diff --git a/src/storage/src/sst/stream_writer.rs b/src/storage/src/sst/stream_writer.rs
index f82c80204b67..6ae2aff150d0 100644
--- a/src/storage/src/sst/stream_writer.rs
+++ b/src/storage/src/sst/stream_writer.rs
@@ -12,10 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::future::Future;
+use std::pin::Pin;
+
 use arrow_array::RecordBatch;
-use common_datasource::buffered_writer::{
-    BufferedWriter as DatasourceBufferedWriter, DefaultBufferedWriter,
-};
+use common_datasource::buffered_writer::LazyBufferedWriter as DatasourceBufferedWriter;
 use common_datasource::share_buffer::SharedBuffer;
 use datatypes::schema::SchemaRef;
 use object_store::ObjectStore;
@@ -23,10 +24,9 @@ use parquet::arrow::ArrowWriter;
 use parquet::file::properties::WriterProperties;
 use parquet::format::FileMetaData;
 use snafu::ResultExt;
-use tokio_util::compat::FuturesAsyncWriteCompatExt;
 
 use crate::error;
-use crate::error::{NewRecordBatchSnafu, WriteObjectSnafu, WriteParquetSnafu};
+use crate::error::{NewRecordBatchSnafu, WriteParquetSnafu};
 use crate::read::Batch;
 
 /// Parquet writer that buffers row groups in memory and writes buffered data to an underlying
 /// storage by chunks to reduce memory consumption.
@@ -35,7 +35,20 @@ pub struct BufferedWriter {
     arrow_schema: arrow::datatypes::SchemaRef,
 }
 
-type InnerBufferedWriter = DefaultBufferedWriter<ArrowWriter<SharedBuffer>>;
+type InnerBufferedWriter = DatasourceBufferedWriter<
+    object_store::Writer,
+    ArrowWriter<SharedBuffer>,
+    Box<
+        dyn FnMut(
+            String,
+        ) -> Pin<
+            Box<
+                dyn Future<Output = common_datasource::error::Result<object_store::Writer>>
+                    + Send,
+            >,
+        > + Send,
+    >,
+>;
 
 impl BufferedWriter {
     pub async fn try_new(
@@ -47,22 +60,25 @@
     ) -> error::Result<BufferedWriter> {
         let arrow_schema = schema.arrow_schema();
         let buffer = SharedBuffer::with_capacity(buffer_threshold);
-        let writer = store
-            .writer(&path)
-            .await
-            .context(WriteObjectSnafu { path: &path })?;
 
         let arrow_writer = ArrowWriter::try_new(buffer.clone(), arrow_schema.clone(), props)
             .context(WriteParquetSnafu)?;
-        let writer = writer.compat_write();
-
         Ok(Self {
             inner: DatasourceBufferedWriter::new(
                 buffer_threshold,
-                buffer.clone(),
+                buffer,
                 arrow_writer,
-                writer,
+                &path,
+                Box::new(move |path| {
+                    let store = store.clone();
+                    Box::pin(async move {
+                        store
+                            .writer(&path)
+                            .await
+                            .context(common_datasource::error::WriteObjectSnafu { path })
+                    })
+                }),
             ),
             arrow_schema: arrow_schema.clone(),
         })
@@ -92,14 +108,7 @@ impl BufferedWriter {
         Ok(())
     }
 
-    /// Abort writer.
-    pub async fn abort(self) -> bool {
-        // TODO(hl): Currently we can do nothing if file's parts have been uploaded to remote storage
-        // on abortion, we need to find a way to abort the upload. see https://help.aliyun.com/document_detail/31996.htm?spm=a2c4g.11186623.0.0.3eb42cb7b2mwUz#reference-txp-bvx-wdb
-        !self.inner.flushed()
-    }
-
-    /// Close parquet writer and ensure all buffered data are written into underlying storage.
+    /// Closes the parquet writer.
     pub async fn close(self) -> error::Result<(FileMetaData, u64)> {
         self.inner
             .close_with_arrow_writer()
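For reference, a minimal usage sketch of the new `LazyBufferedWriter` API introduced above. It mirrors the `stream_to_file` change: the factory closure defers creating the object-store writer until the first flush, so nothing is uploaded when the input is empty. The function name `write_batches` and the `encoder_factory` parameter are hypothetical stand-ins, and the crate paths assume a caller that already depends on `common-datasource`, `object-store`, `arrow`, and `snafu`.

// Illustrative sketch only; not part of the patch.
use arrow::record_batch::RecordBatch;
use common_datasource::buffered_writer::{DfRecordBatchEncoder, LazyBufferedWriter};
use common_datasource::error::{self, Result};
use common_datasource::share_buffer::SharedBuffer;
use object_store::ObjectStore;
use snafu::ResultExt;

async fn write_batches<T: DfRecordBatchEncoder, U: Fn(SharedBuffer) -> T>(
    batches: &[RecordBatch],
    store: ObjectStore,
    path: &str,
    threshold: usize,
    encoder_factory: U,
) -> Result<()> {
    let buffer = SharedBuffer::with_capacity(threshold);
    let encoder = encoder_factory(buffer.clone());

    // The factory closure runs only on the first flush, so an empty `batches`
    // slice never creates an object under `path`.
    let mut writer = LazyBufferedWriter::new(threshold, buffer, encoder, path, |path| async {
        store
            .writer(&path)
            .await
            .context(error::WriteObjectSnafu { path })
    });

    for batch in batches {
        writer.write(batch).await?;
    }

    // Flush whatever is still buffered, then shut the inner writer down
    // (a no-op if nothing was ever written).
    writer.try_flush(true).await?;
    writer.close_inner_writer().await?;
    Ok(())
}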