Skip to content

Commit

Permalink
feat: Implement the Buf to avoid extra memory allocation (GreptimeTea…
Browse files Browse the repository at this point in the history
…m#4585)

* feat: Implement the Buf to avoid extra memory allocation

* fmt toml

* fmt code

* mv entry.into_buffer to raw_entry_buffer

* less reuse opendal

* remove todo GreptimeTeam#4065

* Update src/mito2/src/wal/entry_reader.rs

Co-authored-by: Weny Xu <[email protected]>

* fmt code

---------

Co-authored-by: ozewr <[email protected]>
Co-authored-by: Weny Xu <[email protected]>
  • Loading branch information
3 people authored and CookiePieWw committed Sep 17, 2024
1 parent 207f934 commit 9c3a92a
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 5 deletions.
16 changes: 12 additions & 4 deletions src/mito2/src/wal/entry_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use api::v1::WalEntry;
use async_stream::stream;
use futures::StreamExt;
use object_store::Buffer;
use prost::Message;
use snafu::{ensure, ResultExt};
use store_api::logstore::entry::Entry;
Expand All @@ -28,13 +29,20 @@ pub(crate) fn decode_raw_entry(raw_entry: Entry) -> Result<(EntryId, WalEntry)>
let entry_id = raw_entry.entry_id();
let region_id = raw_entry.region_id();
ensure!(raw_entry.is_complete(), CorruptedEntrySnafu { region_id });
// TODO(weny): implement the [Buf] for return value, avoid extra memory allocation.
let bytes = raw_entry.into_bytes();
let wal_entry = WalEntry::decode(bytes.as_slice()).context(DecodeWalSnafu { region_id })?;

let buffer = into_buffer(raw_entry);
let wal_entry = WalEntry::decode(buffer).context(DecodeWalSnafu { region_id })?;
Ok((entry_id, wal_entry))
}

fn into_buffer(raw_entry: Entry) -> Buffer {
match raw_entry {
Entry::Naive(entry) => Buffer::from(entry.data),
Entry::MultiplePart(entry) => {
Buffer::from_iter(entry.parts.into_iter().map(bytes::Bytes::from))
}
}
}

/// [WalEntryReader] provides the ability to read and decode entries from the underlying store.
///
/// Notes: It will consume the inner stream and only allow invoking the `read` at once.
Expand Down
2 changes: 1 addition & 1 deletion src/object-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

pub use opendal::raw::{normalize_path as raw_normalize_path, Access, HttpClient};
pub use opendal::{
services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind,
services, Buffer, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind,
FuturesAsyncReader, FuturesAsyncWriter, Lister, Metakey, Operator as ObjectStore, Reader,
Result, Writer,
};
Expand Down

0 comments on commit 9c3a92a

Please sign in to comment.