replace get_array_memory_size with get_slice_memory_size
Signed-off-by: zenghua <[email protected]>
zenghua committed Sep 6, 2024
1 parent 3800166 commit a49f7b5
Showing 8 changed files with 27 additions and 36 deletions.
@@ -11,7 +11,7 @@ public class NativeOptions {
public static final ConfigOption<String> MEM_LIMIT =
key("lakesoul.native_writer.mem_limit")
.stringType()
- .defaultValue(String.valueOf(10 * 1024 * 1024))
+ .defaultValue(String.valueOf(50 * 1024 * 1024))
.withDescription("Option to set memory limit of native writer");

public static final ConfigOption<String> HASH_BUCKET_ID =
@@ -8,6 +8,7 @@
public class SinkMemoryLeakTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getCheckpointConfig().setCheckpointInterval(60_000);
TableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
tableEnvironment.executeSql("create catalog lakesoul with ('type'='lakesoul')").await();
tableEnvironment.executeSql(
@@ -31,7 +31,6 @@ public class NativeIOWriter extends NativeIOBase implements AutoCloseable {
public NativeIOWriter(Schema schema) {
super("NativeWriter");
setSchema(schema);
- // setOption("mem_limit", String.valueOf(1024 * 1024 * 100));
}


@@ -101,6 +100,7 @@ public int writeIpc(byte[] encodedBatch) throws IOException {
}

public void write(VectorSchemaRoot batch) throws IOException {
+ System.out.println("writing batch: " + batch.getRowCount());
ArrowArray array = ArrowArray.allocateNew(allocator);
ArrowSchema schema = ArrowSchema.allocateNew(allocator);
Data.exportVectorSchemaRoot(allocator, batch, provider, array, schema);
4 changes: 2 additions & 2 deletions rust/lakesoul-io/src/async_writer/multipart_writer.rs
@@ -15,7 +15,7 @@ use tokio::io::{AsyncWrite, AsyncWriteExt};
use url::Url;

use crate::{
- constant::TBD_PARTITION_DESC, lakesoul_io_config::{create_session_context, LakeSoulIOConfig}, transform::{uniform_record_batch, uniform_schema}
+ constant::TBD_PARTITION_DESC, helpers::get_batch_memory_size, lakesoul_io_config::{create_session_context, LakeSoulIOConfig}, transform::{uniform_record_batch, uniform_schema}
};

use super::{AsyncBatchWriter, WriterFlushResult, InMemBuf};
@@ -173,7 +173,7 @@ impl AsyncBatchWriter for MultiPartAsyncWriter {
async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<()> {
let batch = uniform_record_batch(batch)?;
self.num_rows += batch.num_rows() as u64;
- self.buffered_size += batch.get_array_memory_size() as u64;
+ self.buffered_size += get_batch_memory_size(&batch)? as u64;
MultiPartAsyncWriter::write_batch(batch, &mut self.arrow_writer, &mut self.in_mem_buf, &mut self.writer).await
}

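The pattern that this and the following writer changes share: write_record_batch measures each accepted batch and adds the result to buffered_size, which the surrounding code compares against a byte budget when deciding to roll files or spill. A minimal, self-contained sketch of that accounting (the SizeTrackingWriter struct and the example batch are illustrative assumptions, not LakeSoul's actual MultiPartAsyncWriter):

```rust
use std::sync::Arc;

use arrow_array::{Array, ArrayRef, Int64Array, RecordBatch};
use arrow_schema::{ArrowError, DataType, Field, Schema};

/// Illustrative buffer accounting only; the real writer also encodes each
/// batch into an in-memory Parquet buffer and uploads multipart chunks.
#[derive(Default)]
struct SizeTrackingWriter {
    num_rows: u64,
    buffered_size: u64,
}

impl SizeTrackingWriter {
    fn write_record_batch(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.num_rows += batch.num_rows() as u64;
        // Sum the bytes actually covered by each column's slice, mirroring the
        // helpers::get_batch_memory_size function introduced by this commit.
        let bytes: usize = batch
            .columns()
            .iter()
            .map(|array| array.to_data().get_slice_memory_size())
            .collect::<Result<Vec<usize>, ArrowError>>()?
            .into_iter()
            .sum();
        self.buffered_size += bytes as u64;
        Ok(())
    }
}

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let column: ArrayRef = Arc::new(Int64Array::from(vec![1i64, 2, 3]));
    let batch = RecordBatch::try_new(schema, vec![column])?;

    let mut writer = SizeTrackingWriter::default();
    writer.write_record_batch(&batch)?;
    // Three Int64 values with no null bitmap: 24 bytes of buffered payload.
    println!("rows = {}, buffered_size = {} B", writer.num_rows, writer.buffered_size);
    Ok(())
}
```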
27 changes: 2 additions & 25 deletions rust/lakesoul-io/src/async_writer/partitioning_writer.rs
@@ -30,7 +30,7 @@ use tokio_stream::StreamExt;
use tracing::debug;

use crate::{
- helpers::{columnar_values_to_partition_desc, columnar_values_to_sub_path, get_columnar_values},
+ helpers::{columnar_values_to_partition_desc, columnar_values_to_sub_path, get_batch_memory_size, get_columnar_values},
lakesoul_io_config::{create_session_context, LakeSoulIOConfig, LakeSoulIOConfigBuilder},
repartition::RepartitionByRangeAndHashExec,
};
@@ -300,30 +300,7 @@ impl PartitioningAsyncWriter {
Err(e) => return Err(DataFusionError::Execution(format!("{}", e))),
}
}
- // dbg!(&flatten_results);
Ok(flatten_results)
- // let mut map: HashMap<String, Vec<FlushResult>> = HashMap::new();
- // let results = futures::future::join_all(join_handles).await;
- // for result in results {
- // match result {
- // Ok(Ok(partitioned_flush_result)) => {
- // todo!()
- // }
- // Ok(Err(e)) => return Err(DataFusionError::External(Box::new(e))),
- // Err(e) => return Err(DataFusionError::External(Box::new(e))),
- // }
- // }
- // Ok(map)
- // let partitioned_file_path_and_row_count = partitioned_file_path_and_row_count.lock().await;
-
- // let mut summary = format!("{}", partitioned_file_path_and_row_count.len());
- // for (partition_desc, (files, _)) in partitioned_file_path_and_row_count.iter() {
- // summary += "\x01";
- // summary += partition_desc.as_str();
- // summary += "\x02";
- // summary += files.join("\x02").as_str();
- // }
- // Ok(summary.into_bytes())
}
}

@@ -338,7 +315,7 @@ impl AsyncBatchWriter for PartitioningAsyncWriter {
)));
}

- let memory_size = batch.get_array_memory_size() as u64;
+ let memory_size = get_batch_memory_size(&batch)? as u64;
let send_result = self.sorter_sender.send(Ok(batch)).await;
self.buffered_size += memory_size;
match send_result {
4 changes: 2 additions & 2 deletions rust/lakesoul-io/src/async_writer/sort_writer.rs
@@ -22,7 +22,7 @@ use datafusion_common::{DataFusionError, Result};
use tokio::{sync::mpsc::Sender, task::JoinHandle};
use tokio_stream::StreamExt;

- use crate::lakesoul_io_config::LakeSoulIOConfig;
+ use crate::{helpers::get_batch_memory_size, lakesoul_io_config::LakeSoulIOConfig};

use super::{AsyncBatchWriter, WriterFlushResult, MultiPartAsyncWriter, ReceiverStreamExec};

@@ -141,7 +141,7 @@ impl AsyncBatchWriter for SortAsyncWriter {
)));
}

- let memory_size = batch.get_array_memory_size() as u64;
+ let memory_size = get_batch_memory_size(&batch)? as u64;
let send_result = self.sorter_sender.send(Ok(batch)).await;
self.buffered_size += memory_size;

19 changes: 16 additions & 3 deletions rust/lakesoul-io/src/helpers.rs
@@ -7,7 +7,7 @@ use std::{collections::HashMap, sync::Arc};
use arrow::datatypes::UInt32Type;
use arrow_array::{RecordBatch, UInt32Array};
use arrow_buffer::i256;
- use arrow_schema::{DataType, Field, Schema, SchemaBuilder, SchemaRef, TimeUnit};
+ use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaBuilder, SchemaRef, TimeUnit};
use chrono::{DateTime, Duration};
use datafusion::{
datasource::{
@@ -32,7 +32,9 @@ use url::Url;

use crate::{
constant::{
- DATE32_FORMAT, FLINK_TIMESTAMP_FORMAT, LAKESOUL_COMMA, LAKESOUL_EMPTY_STRING, LAKESOUL_EQ, LAKESOUL_NULL_STRING, TIMESTAMP_MICROSECOND_FORMAT, TIMESTAMP_MILLSECOND_FORMAT, TIMESTAMP_NANOSECOND_FORMAT, TIMESTAMP_SECOND_FORMAT
+ DATE32_FORMAT, FLINK_TIMESTAMP_FORMAT, LAKESOUL_EMPTY_STRING, LAKESOUL_NULL_STRING,
+ TIMESTAMP_MICROSECOND_FORMAT, TIMESTAMP_MILLSECOND_FORMAT, TIMESTAMP_NANOSECOND_FORMAT,
+ TIMESTAMP_SECOND_FORMAT, LAKESOUL_COMMA, LAKESOUL_EQ
},
filter::parser::Parser,
lakesoul_io_config::LakeSoulIOConfig,
@@ -515,4 +517,15 @@ pub fn column_with_name_and_name2index<'a>(schema: &'a SchemaRef, name: &str, na
} else {
schema.column_with_name(name)
}
- }
+ }
+
+ pub fn get_batch_memory_size(batch: &RecordBatch) -> Result<usize> {
+ Ok(
+ batch.columns()
+ .iter()
+ .map(|array| array.to_data().get_slice_memory_size())
+ .collect::<std::result::Result<Vec<usize>, ArrowError>>()?
+ .into_iter()
+ .sum()
+ )
+ }
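For context on why the new helper sums ArrayData::get_slice_memory_size per column instead of calling Array::get_array_memory_size: the latter counts the capacity of every backing buffer, so a RecordBatch that is a zero-copy slice of a larger batch (or shares buffers with one) reports far more bytes than its rows actually occupy, while the former counts only the bytes covered by the slice. A standalone sketch of the difference; the column contents and sizes are assumptions for illustration:

```rust
use std::sync::Arc;

use arrow_array::{Array, ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{ArrowError, DataType, Field, Schema};

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    let column: ArrayRef = Arc::new(Int32Array::from_iter_values(0..1_000_000));
    let batch = RecordBatch::try_new(schema, vec![column])?;

    // A zero-copy slice of 10 rows still shares the original ~4 MB value buffer.
    let small = batch.slice(0, 10);

    // Capacity of every buffer the slice points at (megabytes).
    let array_size: usize = small
        .columns()
        .iter()
        .map(|array| array.get_array_memory_size())
        .sum();

    // Bytes actually covered by the 10-row slice (tens of bytes).
    let slice_size: usize = small
        .columns()
        .iter()
        .map(|array| array.to_data().get_slice_memory_size())
        .collect::<Result<Vec<usize>, ArrowError>>()?
        .into_iter()
        .sum();

    println!("get_array_memory_size: {array_size} B, get_slice_memory_size: {slice_size} B");
    Ok(())
}
```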
4 changes: 2 additions & 2 deletions rust/lakesoul-io/src/lakesoul_writer.rs
@@ -16,6 +16,7 @@ use tokio::sync::Mutex;
use tracing::debug;

use crate::async_writer::{AsyncBatchWriter, MultiPartAsyncWriter, PartitioningAsyncWriter, SortAsyncWriter};
+ use crate::helpers::get_batch_memory_size;
use crate::lakesoul_io_config::{IOSchema, LakeSoulIOConfig};
use crate::transform::uniform_schema;

@@ -147,10 +148,9 @@ impl SyncSendableMutableLakeSoulWriter {
};
let mut guard = in_progress_writer.lock().await;

- let batch_memory_size = record_batch.get_array_memory_size() as u64;
+ let batch_memory_size = get_batch_memory_size(&record_batch)? as u64;
let batch_rows = record_batch.num_rows() as u64;
// If would exceed max_file_size, split batch

if !do_spill && guard.buffered_size() + batch_memory_size > max_file_size {
let to_write = (batch_rows * (max_file_size - guard.buffered_size())) / batch_memory_size;
if to_write + 1 < batch_rows {
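The split in the hunk above is plain proportion arithmetic: given the bytes already buffered and the max_file_size budget, the number of rows of the incoming batch that still fit is batch_rows * remaining_bytes / batch_memory_size, assuming rows of roughly uniform size. A hypothetical helper (not part of the codebase) with a worked example:

```rust
/// Hypothetical helper mirroring the arithmetic above: how many rows of the
/// incoming batch still fit in the remaining byte budget, assuming rows of
/// roughly uniform size.
fn rows_that_fit(batch_rows: u64, batch_memory_size: u64, buffered_size: u64, max_file_size: u64) -> u64 {
    if batch_memory_size == 0 || buffered_size >= max_file_size {
        return 0;
    }
    batch_rows * (max_file_size - buffered_size) / batch_memory_size
}

fn main() {
    // 10_000 rows occupying 8 MiB, with 48 MiB already buffered against a 50 MiB limit:
    // 10_000 * 2 MiB / 8 MiB = 2_500 rows go into the current file.
    let to_write = rows_that_fit(10_000, 8 << 20, 48 << 20, 50 << 20);
    assert_eq!(to_write, 2_500);
    println!("write {to_write} rows now, start a new file with the rest");
}
```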
