fix: abort parquet writer (GreptimeTeam#1785)
* fix: sst file size

* fix: avoid creating file when no row's been written

* chore: rename tests

* fix: some clippy issues

* fix: some cr comments
v0y4g3r authored Jun 19, 2023
1 parent 69854c0 commit 960b842
Showing 4 changed files with 151 additions and 80 deletions.
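The gist of the change: the eager `BufferedWriter`, which opened the destination writer as soon as it was constructed, is replaced by a `LazyBufferedWriter` that takes a writer factory and only creates the underlying file when data is actually flushed. An SST write that produces no rows therefore leaves no partial file behind, and the accumulated `bytes_written` now reports the correct SST file size. Below is a minimal, synchronous sketch of the lazy-initialization idea; the names (`LazyWriter`, `maybe_init`, `flush_chunk`) and the `std::fs` types are illustrative stand-ins, not the GreptimeDB types, which are async and generic over a record-batch encoder.

use std::fs::File;
use std::io::{self, Write};

// Illustrative stand-in for `LazyBufferedWriter` (the real type is async and
// lives in buffered_writer.rs below).
struct LazyWriter<W, F> {
    path: String,
    factory: F,
    writer: Option<W>,
}

impl<W: Write, F: FnMut(String) -> io::Result<W>> LazyWriter<W, F> {
    fn new(path: &str, factory: F) -> Self {
        Self { path: path.to_string(), factory, writer: None }
    }

    // The destination file is created only on the first flush, so a writer
    // that never receives any rows leaves nothing behind on storage.
    fn maybe_init(&mut self) -> io::Result<&mut W> {
        if let Some(ref mut w) = self.writer {
            Ok(w)
        } else {
            let w = (self.factory)(self.path.clone())?;
            Ok(self.writer.insert(w))
        }
    }

    fn flush_chunk(&mut self, chunk: &[u8]) -> io::Result<()> {
        self.maybe_init()?.write_all(chunk)
    }
}

fn main() -> io::Result<()> {
    let mut writer = LazyWriter::new("/tmp/example.bin", |p| File::create(p));
    // Nothing exists on disk yet; the file appears only once data arrives.
    writer.flush_chunk(b"some bytes")?;
    Ok(())
}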
src/common/datasource/src/buffered_writer.rs (101 changes: 65 additions & 36 deletions)
@@ -12,24 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;

use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion::parquet::format::FileMetaData;
use object_store::Writer;
use snafu::{OptionExt, ResultExt};
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio_util::compat::Compat;

use crate::error::{self, Result};
use crate::share_buffer::SharedBuffer;

pub struct BufferedWriter<T, U> {
writer: T,
/// None stands for [`BufferedWriter`] closed.
pub struct LazyBufferedWriter<T, U, F> {
path: String,
writer_factory: F,
writer: Option<T>,
/// None stands for [`LazyBufferedWriter`] closed.
encoder: Option<U>,
buffer: SharedBuffer,
rows_written: usize,
bytes_written: u64,
flushed: bool,
threshold: usize,
}

@@ -42,58 +44,79 @@ pub trait ArrowWriterCloser {
async fn close(mut self) -> Result<FileMetaData>;
}

pub type DefaultBufferedWriter<E> = BufferedWriter<Compat<Writer>, E>;

impl<T: AsyncWrite + Send + Unpin, U: DfRecordBatchEncoder + ArrowWriterCloser>
BufferedWriter<T, U>
impl<
T: AsyncWrite + Send + Unpin,
U: DfRecordBatchEncoder + ArrowWriterCloser,
F: FnMut(String) -> Fut,
Fut: Future<Output = Result<T>>,
> LazyBufferedWriter<T, U, F>
{
/// Closes the `LazyBufferedWriter`, flushing all data to the underlying storage
/// only if at least one row has been written.
pub async fn close_with_arrow_writer(mut self) -> Result<(FileMetaData, u64)> {
let encoder = self
.encoder
.take()
.context(error::BufferedWriterClosedSnafu)?;
let metadata = encoder.close().await?;
let written = self.try_flush(true).await?;

// Use `rows_written` to keep track of whether any rows have been written.
// If no rows have been written, we can simply close the underlying
// writer without flushing, so that no file is actually created.
if self.rows_written != 0 {
self.bytes_written += self.try_flush(true).await?;
}
// It's important to shut down! flushes all pending writes
self.close().await?;
Ok((metadata, written))
self.close_inner_writer().await?;
Ok((metadata, self.bytes_written))
}
}

impl<T: AsyncWrite + Send + Unpin, U: DfRecordBatchEncoder> BufferedWriter<T, U> {
pub async fn close(&mut self) -> Result<()> {
self.writer.shutdown().await.context(error::AsyncWriteSnafu)
impl<
T: AsyncWrite + Send + Unpin,
U: DfRecordBatchEncoder,
F: FnMut(String) -> Fut,
Fut: Future<Output = Result<T>>,
> LazyBufferedWriter<T, U, F>
{
/// Closes the writer without flushing the buffered data.
pub async fn close_inner_writer(&mut self) -> Result<()> {
if let Some(writer) = &mut self.writer {
writer.shutdown().await.context(error::AsyncWriteSnafu)?;
}
Ok(())
}

pub fn new(threshold: usize, buffer: SharedBuffer, encoder: U, writer: T) -> Self {
pub fn new(
threshold: usize,
buffer: SharedBuffer,
encoder: U,
path: impl AsRef<str>,
writer_factory: F,
) -> Self {
Self {
path: path.as_ref().to_string(),
threshold,
writer,
encoder: Some(encoder),
buffer,
rows_written: 0,
bytes_written: 0,
flushed: false,
writer_factory,
writer: None,
}
}

pub fn bytes_written(&self) -> u64 {
self.bytes_written
}

pub async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
let encoder = self
.encoder
.as_mut()
.context(error::BufferedWriterClosedSnafu)?;
encoder.write(batch)?;
self.try_flush(false).await?;
self.rows_written += batch.num_rows();
self.bytes_written += self.try_flush(false).await?;
Ok(())
}

pub fn flushed(&self) -> bool {
self.flushed
}

pub async fn try_flush(&mut self, all: bool) -> Result<u64> {
let mut bytes_written: u64 = 0;

@@ -106,7 +129,8 @@ impl<T: AsyncWrite + Send + Unpin, U: DfRecordBatchEncoder> BufferedWriter<T, U>
};
let size = chunk.len();

self.writer
self.maybe_init_writer()
.await?
.write_all(&chunk)
.await
.context(error::AsyncWriteSnafu)?;
@@ -117,22 +141,27 @@ impl<T: AsyncWrite + Send + Unpin, U: DfRecordBatchEncoder> BufferedWriter<T, U>
if all {
bytes_written += self.try_flush_all().await?;
}

self.flushed = bytes_written > 0;
self.bytes_written += bytes_written;

Ok(bytes_written)
}

/// Only initializes the underlying file writer once rows have been written.
async fn maybe_init_writer(&mut self) -> Result<&mut T> {
if let Some(ref mut writer) = self.writer {
Ok(writer)
} else {
let writer = (self.writer_factory)(self.path.clone()).await?;
Ok(self.writer.insert(writer))
}
}

async fn try_flush_all(&mut self) -> Result<u64> {
let remain = self.buffer.buffer.lock().unwrap().split();
let size = remain.len();

self.writer
self.maybe_init_writer()
.await?
.write_all(&remain)
.await
.context(error::AsyncWriteSnafu)?;

Ok(size as u64)
}
}
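As a side note, the threshold-based flushing that `try_flush` implements can be pictured with a plain `Vec<u8>` standing in for `SharedBuffer` and a synchronous writer. This is a simplified sketch of the idea, not the actual implementation:

use std::io::Write;

// Data accumulates in an in-memory buffer and is pushed to the underlying
// writer in `threshold`-sized chunks; `all` forces the remainder out as well.
fn try_flush(
    buffer: &mut Vec<u8>,
    out: &mut impl Write,
    threshold: usize,
    all: bool,
) -> std::io::Result<u64> {
    let mut written = 0u64;
    while buffer.len() >= threshold {
        let chunk: Vec<u8> = buffer.drain(..threshold).collect();
        out.write_all(&chunk)?;
        written += chunk.len() as u64;
    }
    if all && !buffer.is_empty() {
        out.write_all(&buffer[..])?;
        written += buffer.len() as u64;
        buffer.clear();
    }
    Ok(written)
}

fn main() -> std::io::Result<()> {
    let mut buffer = b"0123456789".to_vec();
    let mut sink = Vec::new();
    let written = try_flush(&mut buffer, &mut sink, 4, true)?;
    assert_eq!(written, 10);
    Ok(())
}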
src/common/datasource/src/file_format.rs (19 changes: 8 additions & 11 deletions)
@@ -35,12 +35,11 @@ use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;
use object_store::ObjectStore;
use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncWriteCompatExt;

use self::csv::CsvFormat;
use self::json::JsonFormat;
use self::parquet::ParquetFormat;
use crate::buffered_writer::{BufferedWriter, DfRecordBatchEncoder};
use crate::buffered_writer::{DfRecordBatchEncoder, LazyBufferedWriter};
use crate::compression::CompressionType;
use crate::error::{self, Result};
use crate::share_buffer::SharedBuffer;
@@ -181,15 +180,14 @@ pub async fn stream_to_file<T: DfRecordBatchEncoder, U: Fn(SharedBuffer) -> T>(
threshold: usize,
encoder_factory: U,
) -> Result<usize> {
let writer = store
.writer(path)
.await
.context(error::WriteObjectSnafu { path })?
.compat_write();

let buffer = SharedBuffer::with_capacity(threshold);
let encoder = encoder_factory(buffer.clone());
let mut writer = BufferedWriter::new(threshold, buffer, encoder, writer);
let mut writer = LazyBufferedWriter::new(threshold, buffer, encoder, path, |path| async {
store
.writer(&path)
.await
.context(error::WriteObjectSnafu { path })
});

let mut rows = 0;

@@ -201,8 +199,7 @@ pub async fn stream_to_file<T: DfRecordBatchEncoder, U: Fn(SharedBuffer) -> T>(

// Flushes all pending writes
writer.try_flush(true).await?;

writer.close().await?;
writer.close_inner_writer().await?;

Ok(rows)
}
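The closure passed to `LazyBufferedWriter::new` here satisfies the `F: FnMut(String) -> Fut`, `Fut: Future<Output = Result<T>>` bounds: an ordinary closure returning an async block, so the object-store writer is only opened when the first flush actually happens. A small self-contained sketch of that shape follows; `open_lazily`, the tokio file factory, and the `#[tokio::main]` harness are illustrative assumptions, not part of this crate:

use std::future::Future;

use tokio::fs::File;

// Generic over any factory closure that returns a future resolving to a writer.
async fn open_lazily<W, F, Fut>(path: String, mut factory: F) -> std::io::Result<W>
where
    F: FnMut(String) -> Fut,
    Fut: Future<Output = std::io::Result<W>>,
{
    // Awaited only when a writer is actually needed, mirroring `maybe_init_writer`.
    factory(path).await
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // `path` is moved into the returned future; the surrounding environment
    // (in `stream_to_file`: the object store) is only borrowed.
    let _writer = open_lazily("/tmp/example.out".to_string(), |path| async move {
        File::create(path).await
    })
    .await?;
    Ok(())
}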
src/storage/src/sst/parquet.rs (58 changes: 47 additions & 11 deletions)
@@ -27,7 +27,7 @@ use arrow_array::{
use async_compat::CompatExt;
use async_stream::try_stream;
use async_trait::async_trait;
use common_telemetry::{error, warn};
use common_telemetry::{debug, error};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
@@ -132,13 +132,8 @@ impl<'a> ParquetWriter<'a> {
}

if rows_written == 0 {
// if the source does not contain any batch, we skip writing an empty parquet file.
if !buffered_writer.abort().await {
warn!(
"Partial file {} has been uploaded to remote storage",
self.file_path
);
}
debug!("No data written, try abort writer: {}", self.file_path);
buffered_writer.close().await?;
return Ok(None);
}

Expand Down Expand Up @@ -547,8 +542,10 @@ impl BatchReader for ChunkStream {

#[cfg(test)]
mod tests {
use std::ops::Range;
use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use common_test_util::temp_dir::create_temp_dir;
use datatypes::arrow::array::{Array, UInt64Array, UInt8Array};
use datatypes::prelude::{ScalarVector, Vector};
@@ -653,6 +650,44 @@ mod tests {
);
}

#[tokio::test]
async fn test_write_large_data() {
common_telemetry::init_default_ut_logging();
let schema = memtable_tests::schema_for_test();
let memtable = DefaultMemtableBuilder::default().build(schema);

let mut rows_written = 0;
for i in 0..16 {
let range: Range<i64> = i * 1024..(i + 1) * 1024;
let keys = range.clone().collect::<Vec<_>>();
let values = range
.map(|idx| (Some(idx as u64), Some(idx as u64)))
.collect::<Vec<_>>();
memtable_tests::write_kvs(&*memtable, i as u64, OpType::Put, &keys, &values);
rows_written += keys.len();
}

let dir = create_temp_dir("write_large_parquet");
let path = dir.path().to_str().unwrap();

let object_store = create_object_store(path);
let sst_file_name = "test-large.parquet";
let iter = memtable.iter(IterContext::default()).unwrap();
let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone());

let sst_info = writer
.write_sst(&sst::WriteOptions {
sst_write_buffer_size: ReadableSize::kb(4),
})
.await
.unwrap()
.unwrap();
let file_meta = object_store.stat(sst_file_name).await.unwrap();
assert!(file_meta.is_file());
assert_eq!(sst_info.file_size, file_meta.content_length());
assert_eq!(rows_written, sst_info.num_rows);
}

#[tokio::test]
async fn test_parquet_read_large_batch() {
common_telemetry::init_default_ut_logging();
@@ -962,21 +997,22 @@
let schema = memtable_tests::schema_for_test();
let memtable = DefaultMemtableBuilder::default().build(schema.clone());

let dir = create_temp_dir("read-parquet-by-range");
let dir = create_temp_dir("write-empty-file");
let path = dir.path().to_str().unwrap();
let mut builder = Fs::default();
builder.root(path);
let object_store = ObjectStore::new(builder).unwrap().finish();
let sst_file_name = "test-read.parquet";
let sst_file_name = "test-empty.parquet";
let iter = memtable.iter(IterContext::default()).unwrap();
let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone());

let sst_info_opt = writer
.write_sst(&sst::WriteOptions::default())
.await
.unwrap();

assert!(sst_info_opt.is_none());
// The file should not exist when no row has been written.
assert!(!object_store.is_exist(sst_file_name).await.unwrap());
}

#[test]
