Make the BatchSerializer behind Arc to avoid unnecessary struct creation (#8666)

* Make the BatchSerializer behind Arc

* Commenting

* Review

* Incorporate review suggestions

* Use old names

---------

Co-authored-by: Mehmet Ozan Kabak <[email protected]>
metesynnada and ozankabak authored Dec 29, 2023
1 parent 8ced56e commit b85a397
Showing 5 changed files with 98 additions and 150 deletions.
69 changes: 27 additions & 42 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -19,21 +19,9 @@
use std::any::Any;
use std::collections::HashSet;
use std::fmt;
use std::fmt::Debug;
use std::fmt::{self, Debug};
use std::sync::Arc;

use arrow_array::RecordBatch;
use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};

use bytes::{Buf, Bytes};
use datafusion_physical_plan::metrics::MetricsSet;
use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};

use super::write::orchestration::stateless_multipart_put;
use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
@@ -47,11 +35,20 @@ use crate::physical_plan::insert::{DataSink, FileSinkExec};
use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};

use arrow::array::RecordBatch;
use arrow::csv::WriterBuilder;
use arrow::datatypes::{DataType, Field, Fields, Schema};
use arrow::{self, datatypes::SchemaRef};
use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
use bytes::{Buf, Bytes};
use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};

/// Character Separated Value `FileFormat` implementation.
#[derive(Debug)]
@@ -400,8 +397,6 @@ impl Default for CsvSerializer {
pub struct CsvSerializer {
// CSV writer builder
builder: WriterBuilder,
// Inner buffer for avoiding reallocation
buffer: Vec<u8>,
// Flag to indicate whether there will be a header
header: bool,
}
@@ -412,7 +407,6 @@ impl CsvSerializer {
Self {
builder: WriterBuilder::new(),
header: true,
buffer: Vec::with_capacity(4096),
}
}

@@ -431,21 +425,14 @@

#[async_trait]
impl BatchSerializer for CsvSerializer {
async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes> {
async fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes> {
let mut buffer = Vec::with_capacity(4096);
let builder = self.builder.clone();
let mut writer = builder.with_header(self.header).build(&mut self.buffer);
let header = self.header && initial;
let mut writer = builder.with_header(header).build(&mut buffer);
writer.write(&batch)?;
drop(writer);
self.header = false;
Ok(Bytes::from(self.buffer.drain(..).collect::<Vec<u8>>()))
}

fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>> {
let new_self = CsvSerializer::new()
.with_builder(self.builder.clone())
.with_header(self.header);
self.header = false;
Ok(Box::new(new_self))
Ok(Bytes::from(buffer))
}
}

@@ -488,13 +475,11 @@ impl CsvSink {
let builder_clone = builder.clone();
let options_clone = writer_options.clone();
let get_serializer = move || {
let inner_clone = builder_clone.clone();
let serializer: Box<dyn BatchSerializer> = Box::new(
Arc::new(
CsvSerializer::new()
.with_builder(inner_clone)
.with_builder(builder_clone.clone())
.with_header(options_clone.writer_options.header()),
);
serializer
) as _
};

stateless_multipart_put(
@@ -541,15 +526,15 @@ mod tests {
use crate::physical_plan::collect;
use crate::prelude::{CsvReadOptions, SessionConfig, SessionContext};
use crate::test_util::arrow_test_data;

use arrow::compute::concat_batches;
use bytes::Bytes;
use chrono::DateTime;
use datafusion_common::cast::as_string_array;
use datafusion_common::internal_err;
use datafusion_common::stats::Precision;
use datafusion_common::FileType;
use datafusion_common::GetExt;
use datafusion_common::{internal_err, FileType, GetExt};
use datafusion_expr::{col, lit};

use bytes::Bytes;
use chrono::DateTime;
use futures::StreamExt;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
@@ -836,8 +821,8 @@ mod tests {
.collect()
.await?;
let batch = concat_batches(&batches[0].schema(), &batches)?;
let mut serializer = CsvSerializer::new();
let bytes = serializer.serialize(batch).await?;
let serializer = CsvSerializer::new();
let bytes = serializer.serialize(batch, true).await?;
assert_eq!(
"c2,c3\n2,1\n5,-40\n1,29\n1,-85\n5,-82\n4,-111\n3,104\n3,13\n1,38\n4,-38\n",
String::from_utf8(bytes.into()).unwrap()
@@ -860,8 +845,8 @@ mod tests {
.collect()
.await?;
let batch = concat_batches(&batches[0].schema(), &batches)?;
let mut serializer = CsvSerializer::new().with_header(false);
let bytes = serializer.serialize(batch).await?;
let serializer = CsvSerializer::new().with_header(false);
let bytes = serializer.serialize(batch, true).await?;
assert_eq!(
"2,1\n5,-40\n1,29\n1,-85\n5,-82\n4,-111\n3,104\n3,13\n1,38\n4,-38\n",
String::from_utf8(bytes.into()).unwrap()
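The reworked `CsvSerializer` no longer carries an internal buffer or a mutable header flag; the caller signals the first batch through the new `initial` parameter. Below is a minimal sketch of driving it that way, assuming the public module paths shown and a hypothetical `serialize_csv_batches` helper (neither is part of this commit):

```rust
use std::sync::Arc;

use bytes::Bytes;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::file_format::csv::CsvSerializer;
use datafusion::datasource::file_format::write::BatchSerializer;
use datafusion::error::Result;

/// Hypothetical helper: serialize a sequence of batches into CSV chunks,
/// emitting the header only for the first batch via the new `initial` flag.
async fn serialize_csv_batches(batches: Vec<RecordBatch>) -> Result<Vec<Bytes>> {
    // One immutable serializer is enough; no per-batch `duplicate()` needed.
    let serializer = Arc::new(CsvSerializer::new().with_header(true));
    let mut chunks = Vec::with_capacity(batches.len());
    for (i, batch) in batches.into_iter().enumerate() {
        // `initial == true` only for the first batch, so the header row appears once.
        chunks.push(serializer.serialize(batch, i == 0).await?);
    }
    Ok(chunks)
}
```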
77 changes: 30 additions & 47 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -23,40 +23,34 @@ use std::fmt::Debug;
use std::io::BufReader;
use std::sync::Arc;

use super::{FileFormat, FileScanConfig};
use arrow::datatypes::Schema;
use arrow::datatypes::SchemaRef;
use arrow::json;
use arrow::json::reader::infer_json_schema_from_iterator;
use arrow::json::reader::ValueIter;
use arrow_array::RecordBatch;
use async_trait::async_trait;
use bytes::Buf;

use bytes::Bytes;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::PhysicalSortRequirement;
use datafusion_physical_plan::ExecutionPlan;
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};

use crate::datasource::physical_plan::FileGroupDisplay;
use crate::physical_plan::insert::DataSink;
use crate::physical_plan::insert::FileSinkExec;
use crate::physical_plan::SendableRecordBatchStream;
use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};

use super::write::orchestration::stateless_multipart_put;

use super::{FileFormat, FileScanConfig};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::write::BatchSerializer;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::datasource::physical_plan::FileGroupDisplay;
use crate::datasource::physical_plan::{FileSinkConfig, NdJsonExec};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::insert::{DataSink, FileSinkExec};
use crate::physical_plan::{
DisplayAs, DisplayFormatType, SendableRecordBatchStream, Statistics,
};

use arrow::datatypes::Schema;
use arrow::datatypes::SchemaRef;
use arrow::json;
use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
use arrow_array::RecordBatch;
use datafusion_common::{not_impl_err, DataFusionError, FileType};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use datafusion_physical_plan::metrics::MetricsSet;
use datafusion_physical_plan::ExecutionPlan;

use async_trait::async_trait;
use bytes::{Buf, Bytes};
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};

/// New line delimited JSON `FileFormat` implementation.
#[derive(Debug)]
@@ -201,31 +195,22 @@ impl Default for JsonSerializer {
}

/// Define a struct for serializing Json records to a stream
pub struct JsonSerializer {
// Inner buffer for avoiding reallocation
buffer: Vec<u8>,
}
pub struct JsonSerializer {}

impl JsonSerializer {
/// Constructor for the JsonSerializer object
pub fn new() -> Self {
Self {
buffer: Vec::with_capacity(4096),
}
Self {}
}
}

#[async_trait]
impl BatchSerializer for JsonSerializer {
async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes> {
let mut writer = json::LineDelimitedWriter::new(&mut self.buffer);
async fn serialize(&self, batch: RecordBatch, _initial: bool) -> Result<Bytes> {
let mut buffer = Vec::with_capacity(4096);
let mut writer = json::LineDelimitedWriter::new(&mut buffer);
writer.write(&batch)?;
//drop(writer);
Ok(Bytes::from(self.buffer.drain(..).collect::<Vec<u8>>()))
}

fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>> {
Ok(Box::new(JsonSerializer::new()))
Ok(Bytes::from(buffer))
}
}

@@ -272,10 +257,7 @@ impl JsonSink {
let writer_options = self.config.file_type_writer_options.try_into_json()?;
let compression = &writer_options.compression;

let get_serializer = move || {
let serializer: Box<dyn BatchSerializer> = Box::new(JsonSerializer::new());
serializer
};
let get_serializer = move || Arc::new(JsonSerializer::new()) as _;

stateless_multipart_put(
data,
@@ -312,16 +294,17 @@ impl DataSink for JsonSink {
#[cfg(test)]
mod tests {
use super::super::test_util::scan_format;
use datafusion_common::cast::as_int64_array;
use datafusion_common::stats::Precision;
use futures::StreamExt;
use object_store::local::LocalFileSystem;

use super::*;
use crate::physical_plan::collect;
use crate::prelude::{SessionConfig, SessionContext};
use crate::test::object_store::local_unpartitioned_file;

use datafusion_common::cast::as_int64_array;
use datafusion_common::stats::Precision;

use futures::StreamExt;
use object_store::local::LocalFileSystem;

#[tokio::test]
async fn read_small_batches() -> Result<()> {
let config = SessionConfig::new().with_batch_size(2);
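Because the serializer is now immutable and the trait requires `Sync + Send`, one `Arc<dyn BatchSerializer>` can be shared by several tasks instead of handing each task a `duplicate()`d boxed copy. A rough sketch under those assumptions follows; the `serialize_in_parallel` helper, the tokio usage, and the import paths are illustrative, not part of the commit:

```rust
use std::sync::Arc;

use bytes::Bytes;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::file_format::json::JsonSerializer;
use datafusion::datasource::file_format::write::BatchSerializer;
use datafusion::error::{DataFusionError, Result};

/// Hypothetical helper: serialize batches on separate tokio tasks that all
/// share one `Arc<dyn BatchSerializer>` instead of duplicating the serializer.
async fn serialize_in_parallel(batches: Vec<RecordBatch>) -> Result<Vec<Bytes>> {
    let serializer: Arc<dyn BatchSerializer> = Arc::new(JsonSerializer::new());
    let mut handles = Vec::with_capacity(batches.len());
    for (i, batch) in batches.into_iter().enumerate() {
        let serializer = Arc::clone(&serializer);
        // JSON lines ignore `initial`; it is forwarded only for trait uniformity.
        handles.push(tokio::spawn(
            async move { serializer.serialize(batch, i == 0).await },
        ));
    }
    let mut chunks = Vec::with_capacity(handles.len());
    for handle in handles {
        chunks.push(handle.await.map_err(|e| DataFusionError::External(Box::new(e)))??);
    }
    Ok(chunks)
}
```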
16 changes: 4 additions & 12 deletions datafusion/core/src/datasource/file_format/write/mod.rs
@@ -24,20 +24,16 @@ use std::sync::Arc;
use std::task::{Context, Poll};

use crate::datasource::file_format::file_compression_type::FileCompressionType;

use crate::error::Result;

use arrow_array::RecordBatch;

use datafusion_common::DataFusionError;

use async_trait::async_trait;
use bytes::Bytes;

use futures::future::BoxFuture;
use object_store::path::Path;
use object_store::{MultipartId, ObjectStore};

use tokio::io::AsyncWrite;

pub(crate) mod demux;
@@ -149,15 +145,11 @@

/// A trait that defines the methods required for a RecordBatch serializer.
#[async_trait]
pub trait BatchSerializer: Unpin + Send {
pub trait BatchSerializer: Sync + Send {
/// Asynchronously serializes a `RecordBatch` and returns the serialized bytes.
async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes>;
/// Duplicates self to support serializing multiple batches in parallel on multiple cores
fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>> {
Err(DataFusionError::NotImplemented(
"Parallel serialization is not implemented for this file type".into(),
))
}
/// Parameter `initial` signals whether the given batch is the first batch.
/// This distinction is important for certain serializers (like CSV).
async fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes>;
}

/// Returns an [`AbortableWrite`] which writes to the given object store location
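With `duplicate()` removed, a new format only has to implement the single stateless method. A small illustrative implementation follows; the `DebugSerializer` type is hypothetical and the trait path is assumed:

```rust
use async_trait::async_trait;
use bytes::Bytes;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::file_format::write::BatchSerializer;
use datafusion::error::Result;

/// Hypothetical serializer: no internal buffer, no header flag. Each call
/// allocates its own output, so `&self` suffices and the type is `Sync + Send`.
struct DebugSerializer;

#[async_trait]
impl BatchSerializer for DebugSerializer {
    async fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes> {
        let mut buffer = Vec::with_capacity(64);
        if initial {
            // A format needing a one-time preamble keys it off `initial`
            // rather than mutating state the way the old `header` field did.
            buffer.extend_from_slice(b"# batches\n");
        }
        buffer.extend_from_slice(
            format!("{} rows x {} cols\n", batch.num_rows(), batch.num_columns()).as_bytes(),
        );
        Ok(Bytes::from(buffer))
    }
}
```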