diff --git a/Cargo.lock b/Cargo.lock index 21993b96d03f..77112f61ada5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5789,6 +5789,7 @@ version = "0.9.1" dependencies = [ "async-stream", "async-trait", + "bytes", "chrono", "common-base", "common-error", diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index e599e0334995..6a84965974eb 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -15,6 +15,7 @@ workspace = true [dependencies] async-stream.workspace = true async-trait.workspace = true +bytes.workspace = true chrono.workspace = true common-base.workspace = true common-error.workspace = true diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index 222725d06ac7..5572a05dddfa 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -21,8 +21,6 @@ use serde_json::error::Error as JsonError; use snafu::{Location, Snafu}; use store_api::storage::RegionId; -use crate::kafka::producer::ProduceRequest; - #[derive(Snafu)] #[snafu(visibility(pub))] #[stack_trace_debug] @@ -268,12 +266,10 @@ pub enum Error { attempt_index: u64, }, - #[snafu(display("Failed to send produce request"))] - SendProduceRequest { + #[snafu(display("OrderedBatchProducer is stopped",))] + OrderedBatchProducerStopped { #[snafu(implicit)] location: Location, - #[snafu(source)] - error: tokio::sync::mpsc::error::SendError, }, #[snafu(display("Failed to send produce request"))] diff --git a/src/log-store/src/kafka.rs b/src/log-store/src/kafka.rs index 21c5a397c03d..dfbf2f36d2e8 100644 --- a/src/log-store/src/kafka.rs +++ b/src/log-store/src/kafka.rs @@ -13,12 +13,17 @@ // limitations under the License. pub(crate) mod client_manager; +// TODO(weny): remove it +#[allow(dead_code)] pub(crate) mod consumer; #[allow(unused)] pub(crate) mod index; pub mod log_store; pub(crate) mod producer; pub(crate) mod util; +// TODO(weny): remove it +#[allow(dead_code)] +pub(crate) mod worker; use serde::{Deserialize, Serialize}; use store_api::logstore::entry::Id as EntryId; diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index 64523e6d0b08..3f4b4aecf75a 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -23,19 +23,16 @@ use snafu::ResultExt; use store_api::logstore::provider::KafkaProvider; use tokio::sync::{Mutex, RwLock}; -use super::producer::OrderedBatchProducer; use crate::error::{ BuildClientSnafu, BuildPartitionClientSnafu, ResolveKafkaEndpointSnafu, Result, TlsConfigSnafu, }; -use crate::kafka::producer::OrderedBatchProducerRef; +use crate::kafka::index::{GlobalIndexCollector, NoopCollector}; +use crate::kafka::producer::{OrderedBatchProducer, OrderedBatchProducerRef}; // Each topic only has one partition for now. // The `DEFAULT_PARTITION` refers to the index of the partition. const DEFAULT_PARTITION: i32 = 0; -// Max batch size for a `OrderedBatchProducer` to handle requests. -const REQUEST_BATCH_SIZE: usize = 64; - /// Arc wrapper of ClientManager. pub(crate) type ClientManagerRef = Arc; @@ -63,9 +60,8 @@ pub(crate) struct ClientManager { /// Used to initialize a new [Client]. 
mutex: Mutex<()>, instances: RwLock, Client>>, + global_index_collector: Option, - producer_channel_size: usize, - producer_request_batch_size: usize, flush_batch_size: usize, compression: Compression, } @@ -99,10 +95,9 @@ impl ClientManager { client, mutex: Mutex::new(()), instances: RwLock::new(HashMap::new()), - producer_channel_size: REQUEST_BATCH_SIZE * 2, - producer_request_batch_size: REQUEST_BATCH_SIZE, flush_batch_size: config.max_batch_bytes.as_bytes() as usize, compression: Compression::Lz4, + global_index_collector: None, }) } @@ -151,12 +146,19 @@ impl ClientManager { }) .map(Arc::new)?; + let (tx, rx) = OrderedBatchProducer::channel(); + let index_collector = if let Some(global_collector) = self.global_index_collector.as_ref() { + global_collector.provider_level_index_collector(provider.clone(), tx.clone()) + } else { + Box::new(NoopCollector) + }; let producer = Arc::new(OrderedBatchProducer::new( + (tx, rx), + provider.clone(), client.clone(), self.compression, - self.producer_channel_size, - self.producer_request_batch_size, self.flush_batch_size, + index_collector, )); Ok(Client { client, producer }) diff --git a/src/log-store/src/kafka/index.rs b/src/log-store/src/kafka/index.rs index b0b4048516c4..1c646376165b 100644 --- a/src/log-store/src/kafka/index.rs +++ b/src/log-store/src/kafka/index.rs @@ -12,8 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod collector; mod iterator; +pub(crate) use collector::{ + GlobalIndexCollector, IndexCollector, IndexEncoder, NoopCollector, ProviderLevelIndexCollector, +}; pub(crate) use iterator::{ MultipleRegionWalIndexIterator, NextBatchHint, RegionWalIndexIterator, RegionWalRange, RegionWalVecIndex, diff --git a/src/log-store/src/kafka/index/collector.rs b/src/log-store/src/kafka/index/collector.rs new file mode 100644 index 000000000000..a8cb2546b6c6 --- /dev/null +++ b/src/log-store/src/kafka/index/collector.rs @@ -0,0 +1,149 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{BTreeSet, HashMap}; +use std::io::Write; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use bytes::buf::Writer; +use bytes::{BufMut, Bytes, BytesMut}; +use common_telemetry::tracing::error; +use futures::future::try_join_all; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; +use store_api::logstore::provider::KafkaProvider; +use store_api::logstore::EntryId; +use store_api::storage::RegionId; +use tokio::select; +use tokio::sync::mpsc::Sender; +use tokio::sync::Mutex as TokioMutex; + +use crate::error::{self, Result}; +use crate::kafka::worker::{DumpIndexRequest, WorkerRequest}; + +pub trait IndexEncoder: Send + Sync { + fn encode(&self, provider: &KafkaProvider, region_index: &RegionIndexes); + + fn finish(&self) -> Result>; +} + +/// The [`IndexCollector`] trait defines the operations for managing and collecting index entries. 
+pub trait IndexCollector: Send + Sync { + /// Appends an [`EntryId`] for a specific region. + fn append(&mut self, region_id: RegionId, entry_id: EntryId); + + /// Truncates the index for a specific region up to a given [`EntryId`]. + /// + /// It removes all [`EntryId`]s smaller than `entry_id`. + fn truncate(&mut self, region_id: RegionId, entry_id: EntryId); + + /// Sets the latest [`EntryId`]. + fn set_latest_entry_id(&mut self, entry_id: EntryId); + + /// Dumps the index. + fn dump(&mut self, encoder: &dyn IndexEncoder); +} + +/// The [`GlobalIndexCollector`] struct is responsible for managing index entries +/// across multiple providers. +#[derive(Debug, Clone, Default)] +pub struct GlobalIndexCollector { + providers: Arc, Sender>>>, +} + +impl GlobalIndexCollector { + /// Creates a new [`ProviderLevelIndexCollector`] for a specified provider. + pub fn provider_level_index_collector( + &self, + provider: Arc, + sender: Sender, + ) -> Box { + Box::new(ProviderLevelIndexCollector { + indexes: Default::default(), + provider, + }) + } +} + +/// The [`RegionIndexes`] struct maintains indexes for a collection of regions. +/// Each region is identified by a `RegionId` and maps to a set of [`EntryId`]s, +/// representing the entries within that region. It also keeps track of the +/// latest [`EntryId`] across all regions. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct RegionIndexes { + regions: HashMap>, + latest_entry_id: EntryId, +} + +impl RegionIndexes { + fn append(&mut self, region_id: RegionId, entry_id: EntryId) { + self.regions.entry(region_id).or_default().insert(entry_id); + self.latest_entry_id = self.latest_entry_id.max(entry_id); + } + + fn truncate(&mut self, region_id: RegionId, entry_id: EntryId) { + if let Some(entry_ids) = self.regions.get_mut(®ion_id) { + *entry_ids = entry_ids.split_off(&entry_id); + // The `RegionIndexes` can be empty, keeps to track the latest entry id. + self.latest_entry_id = self.latest_entry_id.max(entry_id); + } + } + + fn set_latest_entry_id(&mut self, entry_id: EntryId) { + self.latest_entry_id = entry_id; + } +} + +/// The [`ProviderLevelIndexCollector`] struct is responsible for managing index entries +/// specific to a particular provider. +#[derive(Debug, Clone)] +pub struct ProviderLevelIndexCollector { + indexes: RegionIndexes, + provider: Arc, +} + +impl IndexCollector for ProviderLevelIndexCollector { + fn append(&mut self, region_id: RegionId, entry_id: EntryId) { + self.indexes.append(region_id, entry_id) + } + + fn truncate(&mut self, region_id: RegionId, entry_id: EntryId) { + self.indexes.truncate(region_id, entry_id) + } + + fn set_latest_entry_id(&mut self, entry_id: EntryId) { + self.indexes.set_latest_entry_id(entry_id); + } + + fn dump(&mut self, encoder: &dyn IndexEncoder) { + encoder.encode(&self.provider, &self.indexes) + } +} + +/// The [`NoopCollector`] struct implements the [`IndexCollector`] trait with no-op methods. +/// +/// This collector effectively ignores all operations, making it suitable for cases +/// where index collection is not required or should be disabled. 
+pub struct NoopCollector; + +impl IndexCollector for NoopCollector { + fn append(&mut self, _region_id: RegionId, _entry_id: EntryId) {} + + fn truncate(&mut self, _region_id: RegionId, _entry_id: EntryId) {} + + fn set_latest_entry_id(&mut self, _entry_id: EntryId) {} + + fn dump(&mut self, encoder: &dyn IndexEncoder) {} +} diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index 23fb19461789..67bf4019f611 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -178,7 +178,8 @@ impl LogStore for KafkaLogStore { for (region_id, (producer, records)) in region_grouped_records { // Safety: `KafkaLogStore::entry` will ensure that the // `Record`'s `approximate_size` must be less or equal to `max_batch_bytes`. - region_grouped_result_receivers.push((region_id, producer.produce(records).await?)) + region_grouped_result_receivers + .push((region_id, producer.produce(region_id, records).await?)) } let region_grouped_max_offset = diff --git a/src/log-store/src/kafka/producer.rs b/src/log-store/src/kafka/producer.rs index aaa76834cbdb..80d214ed7838 100644 --- a/src/log-store/src/kafka/producer.rs +++ b/src/log-store/src/kafka/producer.rs @@ -12,230 +12,58 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use common_telemetry::{debug, warn}; -use futures::future::try_join_all; -use rskafka::client::partition::Compression; -use rskafka::client::producer::ProducerClient; +use common_telemetry::warn; +use rskafka::client::partition::{Compression, OffsetAt, PartitionClient}; use rskafka::record::Record; -use snafu::{OptionExt, ResultExt}; +use store_api::logstore::provider::KafkaProvider; +use store_api::storage::RegionId; use tokio::sync::mpsc::{self, Receiver, Sender}; -use tokio::sync::oneshot; -use crate::error::{self, NoMaxValueSnafu, Result}; +use crate::error::{self, Result}; +use crate::kafka::index::IndexCollector; +use crate::kafka::worker::{BackgroundProducerWorker, ProduceResultHandle, WorkerRequest}; -pub struct ProduceRequest { - batch: Vec, - sender: oneshot::Sender, -} - -#[derive(Default)] -struct ProduceResultReceiver { - receivers: Vec>>>, -} - -impl ProduceResultReceiver { - fn add_receiver(&mut self, receiver: oneshot::Receiver>>) { - self.receivers.push(receiver) - } - - async fn wait(self) -> Result { - Ok(try_join_all(self.receivers) - .await - .into_iter() - .flatten() - .collect::>>()? - .into_iter() - .flatten() - .max() - .context(NoMaxValueSnafu)? as u64) - } -} - -struct BackgroundProducerWorker { - /// The [`ProducerClient`]. - client: Arc, - // The compression configuration. - compression: Compression, - // The running flag. - running: Arc, - /// Receiver of [ProduceRequest]. - receiver: Receiver, - /// Max batch size for a worker to handle requests. - request_batch_size: usize, - /// Max bytes size for a single flush. - max_batch_bytes: usize, - /// The [PendingRequest]s. - pending_requests: Vec, -} - -struct PendingRequest { - batch: Vec, - size: usize, - sender: oneshot::Sender>>, -} - -/// ## Panic -/// Panic if any [Record]'s `approximate_size` > `max_batch_bytes`. -fn handle_produce_requests( - requests: &mut Vec, - max_batch_bytes: usize, -) -> Vec { - let mut records_buffer = vec![]; - let mut batch_size = 0; - let mut pending_requests = Vec::with_capacity(requests.len()); - - for ProduceRequest { batch, sender } in requests.drain(..) 
{ - let mut receiver = ProduceResultReceiver::default(); - for record in batch { - assert!(record.approximate_size() <= max_batch_bytes); - // Yields the `PendingRequest` if buffer is full. - if batch_size + record.approximate_size() > max_batch_bytes { - let (tx, rx) = oneshot::channel(); - pending_requests.push(PendingRequest { - batch: std::mem::take(&mut records_buffer), - size: batch_size, - sender: tx, - }); - batch_size = 0; - receiver.add_receiver(rx); - } - - batch_size += record.approximate_size(); - records_buffer.push(record); - } - // The remaining records. - if batch_size > 0 { - // Yields `PendingRequest` - let (tx, rx) = oneshot::channel(); - pending_requests.push(PendingRequest { - batch: std::mem::take(&mut records_buffer), - size: batch_size, - sender: tx, - }); - batch_size = 0; - receiver.add_receiver(rx); - } - - let _ = sender.send(receiver); - } - pending_requests -} - -async fn do_flush( - client: &Arc, - PendingRequest { - batch, - sender, - size: _size, - }: PendingRequest, - compression: Compression, -) { - let result = client - .produce(batch, compression) - .await - .context(error::BatchProduceSnafu); - - if let Err(err) = sender.send(result) { - warn!(err; "BatchFlushState Receiver is dropped"); - } -} +pub type OrderedBatchProducerRef = Arc; -impl BackgroundProducerWorker { - async fn run(&mut self) { - let mut buffer = Vec::with_capacity(self.request_batch_size); - while self.running.load(Ordering::Relaxed) { - // Processes pending requests first. - if !self.pending_requests.is_empty() { - // TODO(weny): Considering merge `PendingRequest`s. - for req in self.pending_requests.drain(..) { - do_flush(&self.client, req, self.compression).await - } - } - match self.receiver.recv().await { - Some(req) => { - buffer.clear(); - buffer.push(req); - for _ in 1..self.request_batch_size { - match self.receiver.try_recv() { - Ok(req) => buffer.push(req), - Err(_) => break, - } - } - self.pending_requests = - handle_produce_requests(&mut buffer, self.max_batch_bytes); - } - None => { - debug!("The sender is dropped, BackgroundProducerWorker exited"); - // Exits the loop if the `sender` is dropped. - break; - } - } - } - } -} +// Max batch size for a `OrderedBatchProducer` to handle requests. +const REQUEST_BATCH_SIZE: usize = 64; -pub type OrderedBatchProducerRef = Arc; +// Producer channel size +const PRODUCER_CHANNEL_SIZE: usize = REQUEST_BATCH_SIZE * 2; /// [`OrderedBatchProducer`] attempts to aggregate multiple produce requests together #[derive(Debug)] pub(crate) struct OrderedBatchProducer { - sender: Sender, - /// Used to control the [`BackgroundProducerWorker`]. - running: Arc, -} - -impl Drop for OrderedBatchProducer { - fn drop(&mut self) { - self.running.store(false, Ordering::Relaxed); - } + pub(crate) sender: Sender, } -/// Receives the committed offsets when data has been committed to Kafka -/// or an unrecoverable error has been encountered. -pub(crate) struct ProduceResultHandle { - receiver: oneshot::Receiver, -} - -impl ProduceResultHandle { - /// Waits for the data has been committed to Kafka. - /// Returns the **max** committed offsets. - pub(crate) async fn wait(self) -> Result { - self.receiver - .await - .context(error::WaitProduceResultReceiverSnafu)? - .wait() - .await +impl OrderedBatchProducer { + pub(crate) fn channel() -> (Sender, Receiver) { + mpsc::channel(PRODUCER_CHANNEL_SIZE) } -} -impl OrderedBatchProducer { /// Constructs a new [`OrderedBatchProducer`]. 
pub(crate) fn new( + (tx, rx): (Sender, Receiver), + provider: Arc, client: Arc, compression: Compression, - channel_size: usize, - request_batch_size: usize, max_batch_bytes: usize, + index_collector: Box, ) -> Self { - let (tx, rx) = mpsc::channel(channel_size); - let running = Arc::new(AtomicBool::new(true)); let mut worker = BackgroundProducerWorker { + provider, client, compression, - running: running.clone(), receiver: rx, - request_batch_size, + request_batch_size: REQUEST_BATCH_SIZE, max_batch_bytes, - pending_requests: vec![], + index_collector, }; tokio::spawn(async move { worker.run().await }); - Self { - sender: tx, - running, - } + Self { sender: tx } } /// Writes `data` to the [`OrderedBatchProducer`]. @@ -245,17 +73,44 @@ impl OrderedBatchProducer { /// /// ## Panic /// Panic if any [Record]'s `approximate_size` > `max_batch_bytes`. - pub(crate) async fn produce(&self, batch: Vec) -> Result { - let receiver = { - let (tx, rx) = oneshot::channel(); - self.sender - .send(ProduceRequest { batch, sender: tx }) - .await - .context(error::SendProduceRequestSnafu)?; - rx - }; + pub(crate) async fn produce( + &self, + region_id: RegionId, + batch: Vec, + ) -> Result { + let (req, handle) = WorkerRequest::new_produce_request(region_id, batch); + if self.sender.send(req).await.is_err() { + warn!("OrderedBatchProducer is already exited"); + return error::OrderedBatchProducerStoppedSnafu {}.fail(); + } + + Ok(handle) + } +} - Ok(ProduceResultHandle { receiver }) +#[async_trait::async_trait] +pub trait ProducerClient: std::fmt::Debug + Send + Sync { + async fn produce( + &self, + records: Vec, + compression: Compression, + ) -> rskafka::client::error::Result>; + + async fn get_offset(&self, at: OffsetAt) -> rskafka::client::error::Result; +} + +#[async_trait::async_trait] +impl ProducerClient for PartitionClient { + async fn produce( + &self, + records: Vec, + compression: Compression, + ) -> rskafka::client::error::Result> { + self.produce(records, compression).await + } + + async fn get_offset(&self, at: OffsetAt) -> rskafka::client::error::Result { + self.get_offset(at).await } } @@ -267,15 +122,16 @@ mod tests { use chrono::{TimeZone, Utc}; use common_base::readable_size::ReadableSize; use common_telemetry::debug; - use futures::future::BoxFuture; use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; use rskafka::client::error::{Error as ClientError, RequestContext}; use rskafka::client::partition::Compression; - use rskafka::client::producer::ProducerClient; use rskafka::protocol::error::Error as ProtocolError; use rskafka::record::Record; + use store_api::storage::RegionId; + use super::*; + use crate::kafka::index::NoopCollector; use crate::kafka::producer::OrderedBatchProducer; #[derive(Debug)] @@ -286,38 +142,41 @@ mod tests { batch_sizes: Mutex>, } + #[async_trait::async_trait] impl ProducerClient for MockClient { - fn produce( + async fn produce( &self, records: Vec, _compression: Compression, - ) -> BoxFuture<'_, Result, ClientError>> { - Box::pin(async move { - tokio::time::sleep(self.delay).await; - - if let Some(e) = self.error { - return Err(ClientError::ServerError { - protocol_error: e, - error_message: None, - request: RequestContext::Partition("foo".into(), 1), - response: None, - is_virtual: false, - }); - } - - if let Some(p) = self.panic.as_ref() { - panic!("{}", p); - } - - let mut batch_sizes = self.batch_sizes.lock().unwrap(); - let offset_base = batch_sizes.iter().sum::(); - let offsets = (0..records.len()) - .map(|x| (x + offset_base) as 
i64) - .collect(); - batch_sizes.push(records.len()); - debug!("Return offsets: {offsets:?}"); - Ok(offsets) - }) + ) -> rskafka::client::error::Result> { + tokio::time::sleep(self.delay).await; + + if let Some(e) = self.error { + return Err(ClientError::ServerError { + protocol_error: e, + error_message: None, + request: RequestContext::Partition("foo".into(), 1), + response: None, + is_virtual: false, + }); + } + + if let Some(p) = self.panic.as_ref() { + panic!("{}", p); + } + + let mut batch_sizes = self.batch_sizes.lock().unwrap(); + let offset_base = batch_sizes.iter().sum::(); + let offsets = (0..records.len()) + .map(|x| (x + offset_base) as i64) + .collect(); + batch_sizes.push(records.len()); + debug!("Return offsets: {offsets:?}"); + Ok(offsets) + } + + async fn get_offset(&self, _at: OffsetAt) -> rskafka::client::error::Result { + todo!() } } @@ -341,18 +200,23 @@ mod tests { delay, batch_sizes: Default::default(), }); - + let provider = Arc::new(KafkaProvider::new(String::new())); let producer = OrderedBatchProducer::new( + OrderedBatchProducer::channel(), + provider, client.clone(), Compression::NoCompression, - 128, - 64, ReadableSize((record.approximate_size() * 2) as u64).as_bytes() as usize, + Box::new(NoopCollector), ); + let region_id = RegionId::new(1, 1); // Produces 3 records let handle = producer - .produce(vec![record.clone(), record.clone(), record.clone()]) + .produce( + region_id, + vec![record.clone(), record.clone(), record.clone()], + ) .await .unwrap(); assert_eq!(handle.wait().await.unwrap(), 2); @@ -360,14 +224,17 @@ mod tests { // Produces 2 records let handle = producer - .produce(vec![record.clone(), record.clone()]) + .produce(region_id, vec![record.clone(), record.clone()]) .await .unwrap(); assert_eq!(handle.wait().await.unwrap(), 4); assert_eq!(client.batch_sizes.lock().unwrap().as_slice(), &[2, 1, 2]); // Produces 1 records - let handle = producer.produce(vec![record.clone()]).await.unwrap(); + let handle = producer + .produce(region_id, vec![record.clone()]) + .await + .unwrap(); assert_eq!(handle.wait().await.unwrap(), 5); assert_eq!(client.batch_sizes.lock().unwrap().as_slice(), &[2, 1, 2, 1]); } @@ -381,31 +248,42 @@ mod tests { delay: Duration::from_millis(1), batch_sizes: Default::default(), }); - + let provider = Arc::new(KafkaProvider::new(String::new())); let producer = OrderedBatchProducer::new( + OrderedBatchProducer::channel(), + provider, client.clone(), Compression::NoCompression, - 128, - 64, ReadableSize((record.approximate_size() * 2) as u64).as_bytes() as usize, + Box::new(NoopCollector), ); + let region_id = RegionId::new(1, 1); let mut futures = FuturesUnordered::new(); futures.push( producer - .produce(vec![record.clone(), record.clone(), record.clone()]) + .produce( + region_id, + vec![record.clone(), record.clone(), record.clone()], + ) .await .unwrap() .wait(), ); futures.push( producer - .produce(vec![record.clone(), record.clone()]) + .produce(region_id, vec![record.clone(), record.clone()]) + .await + .unwrap() + .wait(), + ); + futures.push( + producer + .produce(region_id, vec![record.clone()]) .await .unwrap() .wait(), ); - futures.push(producer.produce(vec![record.clone()]).await.unwrap().wait()); futures.next().await.unwrap().unwrap_err(); futures.next().await.unwrap().unwrap_err(); @@ -422,22 +300,33 @@ mod tests { batch_sizes: Default::default(), }); + let provider = Arc::new(KafkaProvider::new(String::new())); let producer = OrderedBatchProducer::new( + OrderedBatchProducer::channel(), + provider, client.clone(), 
Compression::NoCompression, - 128, - 64, ReadableSize((record.approximate_size() * 2) as u64).as_bytes() as usize, + Box::new(NoopCollector), ); + let region_id = RegionId::new(1, 1); let a = producer - .produce(vec![record.clone(), record.clone(), record.clone()]) + .produce( + region_id, + vec![record.clone(), record.clone(), record.clone()], + ) .await .unwrap() .wait() .fuse(); - let b = producer.produce(vec![record]).await.unwrap().wait().fuse(); + let b = producer + .produce(region_id, vec![record]) + .await + .unwrap() + .wait() + .fuse(); let mut b = Box::pin(b); diff --git a/src/log-store/src/kafka/worker.rs b/src/log-store/src/kafka/worker.rs new file mode 100644 index 000000000000..972d56d6f1c4 --- /dev/null +++ b/src/log-store/src/kafka/worker.rs @@ -0,0 +1,205 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub(crate) mod checkpoint; +pub(crate) mod flush; +pub(crate) mod produce; + +use std::sync::Arc; + +use common_telemetry::debug; +use futures::future::try_join_all; +use rskafka::client::partition::Compression; +use rskafka::record::Record; +use snafu::{OptionExt, ResultExt}; +use store_api::logstore::provider::KafkaProvider; +use store_api::logstore::EntryId; +use store_api::storage::RegionId; +use tokio::sync::mpsc::Receiver; +use tokio::sync::oneshot::{self}; + +use super::index::IndexEncoder; +use crate::error::{self, NoMaxValueSnafu, Result}; +use crate::kafka::index::IndexCollector; +use crate::kafka::producer::ProducerClient; + +pub(crate) enum WorkerRequest { + Produce(ProduceRequest), + Checkpoint, + TruncateIndex(TruncateIndexRequest), + DumpIndex(DumpIndexRequest), +} + +impl WorkerRequest { + pub(crate) fn new_produce_request( + region_id: RegionId, + batch: Vec, + ) -> (WorkerRequest, ProduceResultHandle) { + let (tx, rx) = oneshot::channel(); + + ( + WorkerRequest::Produce(ProduceRequest { + region_id, + batch, + sender: tx, + }), + ProduceResultHandle { receiver: rx }, + ) + } +} + +pub(crate) struct DumpIndexRequest { + encoder: Arc, + sender: oneshot::Sender<()>, +} + +impl DumpIndexRequest { + pub fn new(encoder: Arc) -> (DumpIndexRequest, oneshot::Receiver<()>) { + let (tx, rx) = oneshot::channel(); + ( + DumpIndexRequest { + encoder, + sender: tx, + }, + rx, + ) + } +} + +pub(crate) struct TruncateIndexRequest { + region_id: RegionId, + entry_id: EntryId, +} + +pub(crate) struct ProduceRequest { + region_id: RegionId, + batch: Vec, + sender: oneshot::Sender, +} + +/// Receives the committed offsets when data has been committed to Kafka +/// or an unrecoverable error has been encountered. +pub(crate) struct ProduceResultHandle { + receiver: oneshot::Receiver, +} + +impl ProduceResultHandle { + /// Waits for the data has been committed to Kafka. + /// Returns the **max** committed offsets. + pub(crate) async fn wait(self) -> Result { + self.receiver + .await + .context(error::WaitProduceResultReceiverSnafu)? 
+ .wait() + .await + } +} + +#[derive(Default)] +pub(crate) struct ProduceResultReceiver { + receivers: Vec>>>, +} + +impl ProduceResultReceiver { + fn add_receiver(&mut self, receiver: oneshot::Receiver>>) { + self.receivers.push(receiver) + } + + async fn wait(self) -> Result { + Ok(try_join_all(self.receivers) + .await + .into_iter() + .flatten() + .collect::>>()? + .into_iter() + .flatten() + .max() + .context(NoMaxValueSnafu)? as u64) + } +} + +pub(crate) struct PendingRequest { + batch: Vec, + region_ids: Vec, + size: usize, + sender: oneshot::Sender>>, +} + +pub(crate) struct BackgroundProducerWorker { + pub(crate) provider: Arc, + /// The [`ProducerClient`]. + pub(crate) client: Arc, + // The compression configuration. + pub(crate) compression: Compression, + /// Receiver of [ProduceRequest]. + pub(crate) receiver: Receiver, + /// Max batch size for a worker to handle requests. + pub(crate) request_batch_size: usize, + /// Max bytes size for a single flush. + pub(crate) max_batch_bytes: usize, + /// Collecting ids of WAL entries. + pub(crate) index_collector: Box, +} + +impl BackgroundProducerWorker { + pub(crate) async fn run(&mut self) { + let mut buffer = Vec::with_capacity(self.request_batch_size); + loop { + match self.receiver.recv().await { + Some(req) => { + buffer.clear(); + buffer.push(req); + for _ in 1..self.request_batch_size { + match self.receiver.try_recv() { + Ok(req) => buffer.push(req), + Err(_) => break, + } + } + self.handle_requests(&mut buffer).await; + } + None => { + debug!("The sender is dropped, BackgroundProducerWorker exited"); + // Exits the loop if the `sender` is dropped. + break; + } + } + } + } + + async fn handle_requests(&mut self, buffer: &mut Vec) { + let mut produce_requests = Vec::with_capacity(buffer.len()); + let mut do_checkpoint = false; + for req in buffer.drain(..) { + match req { + WorkerRequest::Produce(req) => produce_requests.push(req), + WorkerRequest::Checkpoint => do_checkpoint = true, + WorkerRequest::TruncateIndex(TruncateIndexRequest { + region_id, + entry_id, + }) => self.index_collector.truncate(region_id, entry_id), + WorkerRequest::DumpIndex(req) => { + self.index_collector.dump(req.encoder.as_ref()); + let _ = req.sender.send(()); + } + } + } + + let pending_requests = self.aggregate_records(&mut produce_requests, self.max_batch_bytes); + self.try_flush_pending_requests(pending_requests).await; + + if do_checkpoint { + self.do_checkpoint().await; + } + } +} diff --git a/src/log-store/src/kafka/worker/checkpoint.rs b/src/log-store/src/kafka/worker/checkpoint.rs new file mode 100644 index 000000000000..0ef6cb8627a7 --- /dev/null +++ b/src/log-store/src/kafka/worker/checkpoint.rs @@ -0,0 +1,35 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use common_telemetry::error; +use rskafka::client::partition::OffsetAt; +use snafu::ResultExt; + +use crate::error; +use crate::kafka::worker::BackgroundProducerWorker; + +impl BackgroundProducerWorker { + pub(crate) async fn do_checkpoint(&mut self) { + match self + .client + .get_offset(OffsetAt::Latest) + .await + .context(error::GetOffsetSnafu { + topic: &self.provider.topic, + }) { + Ok(offset) => self.index_collector.set_latest_entry_id(offset as u64), + Err(err) => error!(err; "Failed to do checkpoint"), + } + } +} diff --git a/src/log-store/src/kafka/worker/flush.rs b/src/log-store/src/kafka/worker/flush.rs new file mode 100644 index 000000000000..427a528a14db --- /dev/null +++ b/src/log-store/src/kafka/worker/flush.rs @@ -0,0 +1,57 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_telemetry::warn; +use snafu::ResultExt; + +use crate::error; +use crate::kafka::worker::{BackgroundProducerWorker, PendingRequest}; + +impl BackgroundProducerWorker { + async fn do_flush( + &mut self, + PendingRequest { + batch, + region_ids, + sender, + size: _size, + }: PendingRequest, + ) { + let result = self + .client + .produce(batch, self.compression) + .await + .context(error::BatchProduceSnafu); + + if let Ok(result) = &result { + for (idx, region_id) in result.iter().zip(region_ids) { + self.index_collector.append(region_id, *idx as u64); + } + } + + if let Err(err) = sender.send(result) { + warn!(err; "BatchFlushState Receiver is dropped"); + } + } + + pub(crate) async fn try_flush_pending_requests( + &mut self, + pending_requests: Vec, + ) { + // TODO(weny): Considering merge `PendingRequest`s. + for req in pending_requests { + self.do_flush(req).await + } + } +} diff --git a/src/log-store/src/kafka/worker/produce.rs b/src/log-store/src/kafka/worker/produce.rs new file mode 100644 index 000000000000..465d19a57d86 --- /dev/null +++ b/src/log-store/src/kafka/worker/produce.rs @@ -0,0 +1,83 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_telemetry::tracing::warn; +use tokio::sync::oneshot; + +use crate::kafka::worker::{ + BackgroundProducerWorker, PendingRequest, ProduceRequest, ProduceResultReceiver, +}; + +impl BackgroundProducerWorker { + /// Aggregates records into batches, ensuring that the size of each batch does not exceed a specified maximum (`max_batch_bytes`). + /// + /// ## Panic + /// Panic if any [Record]'s `approximate_size` > `max_batch_bytes`. 
+ pub(crate) fn aggregate_records( + &self, + requests: &mut Vec, + max_batch_bytes: usize, + ) -> Vec { + let mut records_buffer = vec![]; + let mut region_ids = vec![]; + let mut batch_size = 0; + let mut pending_requests = Vec::with_capacity(requests.len()); + + for ProduceRequest { + batch, + sender, + region_id, + } in std::mem::take(requests) + { + let mut receiver = ProduceResultReceiver::default(); + for record in batch { + assert!(record.approximate_size() <= max_batch_bytes); + // Yields the `PendingRequest` if buffer is full. + if batch_size + record.approximate_size() > max_batch_bytes { + let (tx, rx) = oneshot::channel(); + pending_requests.push(PendingRequest { + batch: std::mem::take(&mut records_buffer), + region_ids: std::mem::take(&mut region_ids), + size: batch_size, + sender: tx, + }); + batch_size = 0; + receiver.add_receiver(rx); + } + + batch_size += record.approximate_size(); + records_buffer.push(record); + region_ids.push(region_id); + } + // The remaining records. + if batch_size > 0 { + // Yields `PendingRequest` + let (tx, rx) = oneshot::channel(); + pending_requests.push(PendingRequest { + batch: std::mem::take(&mut records_buffer), + region_ids: std::mem::take(&mut region_ids), + size: batch_size, + sender: tx, + }); + batch_size = 0; + receiver.add_receiver(rx); + } + + if sender.send(receiver).is_err() { + warn!("The Receiver of ProduceResultReceiver is dropped"); + } + } + pending_requests + } +}
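
Note: the patch introduces the `IndexEncoder` trait (with `encode` and `finish`) but ships no concrete implementation yet. Below is a minimal illustrative sketch, not part of this patch, of how an encoder could be written alongside the trait in `collector.rs`. It relies only on what the patch already provides (`RegionIndexes` derives `Serialize` and `Clone`, `KafkaProvider` exposes `topic`, `Mutex`/`HashMap` and the crate `Result` are already imported in `collector.rs`, and `serde_json` is a crate dependency). Assumptions: `finish` returns the crate's `Result<Vec<u8>>`, `RegionIndexes` serializes cleanly to JSON (including its map keys), and `JsonIndexEncoder` is a hypothetical name used only for illustration.

#[derive(Default)]
pub struct JsonIndexEncoder {
    // Collected per-topic indexes; a `Mutex` is needed because the trait
    // methods take `&self`.
    indexes: Mutex<HashMap<String, RegionIndexes>>,
}

impl IndexEncoder for JsonIndexEncoder {
    fn encode(&self, provider: &KafkaProvider, region_index: &RegionIndexes) {
        // Keep the latest snapshot of each topic's region indexes.
        self.indexes
            .lock()
            .unwrap()
            .insert(provider.topic.clone(), region_index.clone());
    }

    fn finish(&self) -> Result<Vec<u8>> {
        let indexes = self.indexes.lock().unwrap();
        // The sketch panics on a serialization failure instead of mapping it
        // into the crate's error type.
        Ok(serde_json::to_vec(&*indexes).expect("RegionIndexes is serializable"))
    }
}

Such an encoder would then be handed to the background worker via `DumpIndexRequest::new` wrapped in `WorkerRequest::DumpIndex`; the worker forwards it to `IndexCollector::dump` and signals completion through the returned `oneshot::Receiver<()>`.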