
get_array_memory_size() returns the wrong result (varying with the compression method) after decoding a record batch from IPC format #6363

Open
haohuaijin opened this issue Sep 6, 2024 · 5 comments
haohuaijin commented Sep 6, 2024

Describe the bug

We use the IPC format to transfer RecordBatches between nodes, and we found that with LZ4 compression or no compression, the value returned by get_array_memory_size() on the RecordBatch after transmission is extremely large. With zstd compression the value is much smaller.

To Reproduce

See this repo: https://github.com/haohuaijin/ipc-bug, or use the code below. The arrow version is 52.2.0.

use std::sync::Arc;

use arrow::array::{ArrayRef, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};

use arrow::ipc::{
    writer::{FileWriter, IpcWriteOptions},
    CompressionType,
};
use std::io::Cursor;

fn main() {
    let batch = generate_record_batch(100, 100);
    println!(
        "num_rows: {}, original batch size: {:?}",
        batch.num_rows(),
        batch.get_array_memory_size(),
    );

    let ipc_options = IpcWriteOptions::default();
    let ipc_options = ipc_options
        .try_with_compression(Some(CompressionType::LZ4_FRAME))
        .unwrap();
    let buf = Vec::new();
    let mut writer = FileWriter::try_new_with_options(buf, &batch.schema(), ipc_options).unwrap();

    writer.write(&batch).unwrap();
    let hits_buf = writer.into_inner().unwrap();

    println!("ipc buffer size: {:?}", hits_buf.len());

    // read from ipc
    let buf = Cursor::new(hits_buf);
    let reader = arrow::ipc::reader::FileReader::try_new(buf, None).unwrap();
    let batches = reader.into_iter().map(|r| r.unwrap()).collect::<Vec<_>>();

    println!(
        "num rows: {}, after decode batch size: {}, batches len: {}",
        batches.iter().map(|b| b.num_rows()).sum::<usize>(),
        batches
            .iter()
            .map(|b| b.get_array_memory_size())
            .sum::<usize>(),
        batches.len()
    );
}

pub fn generate_record_batch(columns: usize, rows: usize) -> RecordBatch {
    let mut fields = Vec::with_capacity(columns);
    for i in 0..columns {
        fields.push(Field::new(&format!("column_{}", i), DataType::Utf8, false));
    }
    let schema = Arc::new(Schema::new(fields));

    let mut arrays: Vec<ArrayRef> = Vec::with_capacity(columns);
    for i in 0..columns {
        let column_data: Vec<String> = (0..rows).map(|j| format!("row_{}_col_{}", j, i)).collect();
        let array = StringArray::from(column_data);
        arrays.push(Arc::new(array) as ArrayRef);
    }
    RecordBatch::try_new(schema, arrays).unwrap()
}

The output with LZ4_FRAME:

num_rows: 100, original batch size: 261600 <-- (255KB)
ipc buffer size: 112446
num rows: 100, after decode batch size: 10392800, batches len: 1 <-- (10MB)

The output with ZSTD:

num_rows: 100, original batch size: 261600
ipc buffer size: 73406
num rows: 100, after decode batch size: 180400, batches len: 1 <-- (176KB)

The output without compression:

num_rows: 100, original batch size: 261600
ipc buffer size: 200766
num rows: 100, after decode batch size: 38181600, batches len: 1 <-- (36MB)

Expected behavior

The decoded RecordBatch size should be similar regardless of which compression type was used during encoding.

Additional context

@haohuaijin haohuaijin added the bug label Sep 6, 2024
haohuaijin commented Sep 10, 2024

Hi @alamb, the use case is that we use DataFusion for distributed search and Arrow Flight for data transmission. When dealing with large tables (e.g., 1000 columns), DataFusion reports "resource exhausted" errors even with a small amount of data (e.g., a few MB). After investigation, we found that the issue stems from DataFusion's use of get_array_memory_size() to calculate the size of a RecordBatch. When LZ4 compression or no compression is used, the size returned by get_array_memory_size() after decoding is significantly inflated, leading to the "resource exhausted" errors.
You can check the code above: with the RecordBatch changed to 1000 columns and the LZ4 method, the decoded data size reported by get_array_memory_size() is almost 1 GB, while the original data size is only 2.4 MB. (Note: arrow-flight uses IPC to encode and decode.)
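
For illustration, one possible interim workaround (just a sketch; compact_batch is a hypothetical helper, not an arrow API) is to copy each column through the take kernel so that no array keeps a reference to the large shared IPC buffer; after such a copy, get_array_memory_size() should report something close to the original size:

use arrow::array::{ArrayRef, RecordBatch, UInt32Array};
use arrow::compute::take;
use arrow::error::ArrowError;

fn compact_batch(batch: &RecordBatch) -> Result<RecordBatch, ArrowError> {
    // Taking indices 0..num_rows materializes each column into freshly
    // allocated buffers instead of slices of the shared IPC body buffer.
    let indices = UInt32Array::from_iter_values(0..batch.num_rows() as u32);
    let columns: Vec<ArrayRef> = batch
        .columns()
        .iter()
        .map(|c| take(c.as_ref(), &indices, None))
        .collect::<Result<_, _>>()?;
    RecordBatch::try_new(batch.schema(), columns)
}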

alamb commented Sep 11, 2024

Hi @haohuaijin -- the symptoms you describe certainly sound non-ideal.

I wonder if the problem is with the reporting of memory used (get_array_memory_size) or if that much memory is really being allocated by the IPC decoder.

For example, if something in the IPC decoder was using 1MB buffers by default, then 1000 columns * 1MB buffers would result in 1GB of memory used.

So I agree this sounds like a bug, but my guess is that it is a real memory allocation issue rather than a reporting issue (but I haven't confirmed this)

Perhaps you can use something like https://github.com/KDE/heaptrack to track the actual allocations
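
One rough way to tell the two apart (a sketch only, assuming ArrayData::get_slice_memory_size() is available in the arrow version used here) is to compare the reported size with the bytes the sliced data actually occupies: a large gap points at over-reporting from shared buffers, while similar numbers point at real allocations.

use arrow::array::{Array, RecordBatch};

fn report_sizes(batch: &RecordBatch) {
    // What arrow reports today: roughly the sum of Buffer::capacity() over
    // all buffers, which counts a shared backing allocation repeatedly.
    let reported = batch.get_array_memory_size();
    // Bytes the sliced data actually occupies, column by column.
    let occupied: usize = batch
        .columns()
        .iter()
        .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
        .sum();
    println!("reported: {reported} bytes, slice-occupied: {occupied} bytes");
}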

haohuaijin commented:

Sorry for the delay, @alamb. I have been traveling recently.

I set the parameters of generate_record_batch to (1000, 100), compiled https://github.com/haohuaijin/ipc-bug with cargo build --release, and ran heaptrack ./target/release/ipc-bug to collect the data.

heaptrack output will be written to "/home/hhj/ipc-bug/heaptrack.ipc-bug.1013097.zst"
/usr/lib/heaptrack/libheaptrack_preload.so
starting application, this might take some time...
num_rows: 100, original batch size: 2616000
ipc buffer size: 1120446
num rows: 100, after decode batch size: 1025627000, batches len: 1
heaptrack stats:
        allocations:            137148
        leaked allocations:     0
        temporary allocations:  3034
Heaptrack finished! Now run the following to investigate the data:

  heaptrack --analyze "/home/hhj/ipc-bug/heaptrack.ipc-bug.1013097.zst"

Then I used heaptrack to visualize the data and got the following pictures.
[four heaptrack analysis screenshots omitted]
The heaptrack data file is at https://github.com/haohuaijin/ipc-bug/blob/main/heaptrack.ipc-bug.1013097.zst.

haohuaijin commented:

Hi @alamb, I think I found the cause after reading the code.

While decoding the RecordBatch, we first construct an ArrayReader. We then call its next_buffer method to get the Buffer needed to construct each Array; next_buffer uses slice_with_length to obtain a slice of the whole message Buffer.

As the code below shows, the backing data is shared among all Buffer instances created this way. When get_array_memory_size() is called on a RecordBatch, it calls get_array_memory_size() on each Array, and each Array calls capacity() on each of its Buffers. Since capacity() reports the capacity of the whole shared allocation (roughly the size of the entire RecordBatch) rather than the portion used by that Array, get_array_memory_size() becomes very large, especially when there are many columns.

// In the IPC reader (arrow-ipc):
let mut reader = ArrayReader {
    dictionaries_by_id,
    compression,
    version: *metadata,
    data: buf,
    nodes: field_nodes.iter(),
    buffers: buffers.iter(),
};

fn next_buffer(&mut self) -> Result<Buffer, ArrowError> {
    read_buffer(self.buffers.next().unwrap(), self.data, self.compression)
}

fn read_buffer(
    buf: &crate::Buffer,
    a_data: &Buffer,
    compression_codec: Option<CompressionCodec>,
) -> Result<Buffer, ArrowError> {
    let start_offset = buf.offset() as usize;
    let buf_data = a_data.slice_with_length(start_offset, buf.length() as usize);
    // corner case: empty buffer
    match (buf_data.is_empty(), compression_codec) {
        (true, _) | (_, None) => Ok(buf_data),
        (false, Some(decompressor)) => decompressor.decompress_to_buffer(&buf_data),
    }
}

// In arrow-buffer's Buffer:
pub fn slice_with_length(&self, offset: usize, length: usize) -> Self {
    assert!(
        offset.saturating_add(length) <= self.length,
        "the offset of the new Buffer cannot exceed the existing length: slice offset={offset} length={length} selflen={}",
        self.length
    );
    // Safety:
    // offset + length <= self.length
    let ptr = unsafe { self.ptr.add(offset) };
    Self {
        data: self.data.clone(),
        ptr,
        length,
    }
}

pub fn capacity(&self) -> usize {
    self.data.capacity()
}
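
A minimal standalone sketch of the effect, using only the public Buffer API shown above (Buffer::from(Vec<u8>), slice_with_length, len, and capacity):

use arrow::buffer::Buffer;

fn main() {
    // One large backing allocation, standing in for the IPC message body
    // that the reader slices per-column buffers from.
    let backing = Buffer::from(vec![0u8; 1_000_000]);

    // A tiny logical slice, like the buffers next_buffer() hands out.
    let slice = backing.slice_with_length(0, 100);

    assert_eq!(slice.len(), 100);
    // capacity() reports the whole shared allocation, not the 100 bytes this
    // slice uses, so summing capacities over many columns over-counts badly.
    assert_eq!(slice.capacity(), backing.capacity());
}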

alamb commented Sep 24, 2024

See related discussion: #6439
