From 27a6a0ebe5c695a49bb7e18249411af2a0c5ea4c Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Sat, 16 Mar 2024 13:47:23 +0900 Subject: [PATCH] test mrecordlog --- quickwit/Cargo.lock | 2 +- quickwit/Cargo.toml | 2 +- quickwit/quickwit-ingest/src/ingest_v2/fetch.rs | 9 ++++----- quickwit/quickwit-ingest/src/queue.rs | 2 +- 4 files changed, 7 insertions(+), 8 deletions(-) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 551dec93f13..7ea27ffa83a 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -3977,7 +3977,7 @@ dependencies = [ [[package]] name = "mrecordlog" version = "0.4.0" -source = "git+https://github.com/quickwit-oss/mrecordlog?rev=187486f#187486fcde8dcfc4d570af4af19be852ab325cde" +source = "git+https://github.com/quickwit-oss/mrecordlog?rev=6a2fcab#6a2fcabba78436a31567d68dcb235210e1979b86" dependencies = [ "bytes", "crc32fast", diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index edc7bb0e821..d13b052ee17 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -153,7 +153,7 @@ matches = "0.1.9" md5 = "0.7" mime_guess = "2.0.4" mockall = "0.11" -mrecordlog = { git = "https://github.com/quickwit-oss/mrecordlog", rev = "187486f" } +mrecordlog = { git = "https://github.com/quickwit-oss/mrecordlog", rev = "bdd759f9b0" } new_string_template = "1.4.0" nom = "7.1.3" num_cpus = "1" diff --git a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs index 617fda9a93c..9eabf11c9df 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs @@ -17,13 +17,12 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::borrow::Borrow; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::fmt; use std::sync::Arc; -use bytes::{BufMut, BytesMut}; +use bytes::{Buf, BufMut, BytesMut}; use futures::StreamExt; use mrecordlog::Record; use quickwit_common::retry::RetryParams; @@ -143,12 +142,12 @@ impl FetchStreamTask { break; }; for Record { payload, .. } in mrecords { - if mrecord_buffer.len() + payload.len() > mrecord_buffer.capacity() { + if mrecord_buffer.len() + payload.remaining() > mrecord_buffer.capacity() { has_drained_queue = false; break; } - mrecord_buffer.put(payload.borrow()); - mrecord_lengths.push(payload.len() as u32); + mrecord_lengths.push(payload.remaining() as u32); + mrecord_buffer.put(payload); } // Drop the lock while we send the message. drop(mrecordlog_guard); diff --git a/quickwit/quickwit-ingest/src/queue.rs b/quickwit/quickwit-ingest/src/queue.rs index 17bab541c9e..87d84883dd6 100644 --- a/quickwit/quickwit-ingest/src/queue.rs +++ b/quickwit/quickwit-ingest/src/queue.rs @@ -183,7 +183,7 @@ impl Queues { if first_key_opt.is_none() { first_key_opt = Some(position); } - num_bytes += doc_batch.command_from_buf(payload.as_ref()); + num_bytes += doc_batch.command_from_buf(payload); if num_bytes > size_limit { break; }