
Commit

refactor(log_store): remove associated type Namespace and Entry in `LogStore` (#4038)

* refactor(log_store): remove associated type `Namespace` in `LogStore`

* fix(test): filter entries

* refactor: ignore incomplete parts

* refactor: simplify `RawEntryReaderFilter`

* chore: avoid cloning

* test: add tests for `maybe_emit_entry`

* refactor: remove `Namespace` trait and rename `LogStoreNamespace` to `Namespace`

* chore: apply suggestions from CR

* refactor: refine `entry` method signature

* feat: ignore any potential incomplete parts

* refactor: rename `namespace` to `provider`

* chore: add debug assertion

* refactor: remove associated type `Entry` in `LogStore`

* refactor: rename `namespace` to `provider`

* refactor: remove unwrap

* refactor: let `remaining_entries` return an optional vector

* test: add basic tests for kafka logstore

* refactor: move `append` method under `cfg(test)`

* refactor: rename `RawEntry` to `Entry`

* refactor: rename `CorruptedLogEntry` to `CorruptedEntry`

* test: add tests for handling corrupted raw entry stream

* refactor: rename `ns` to `provider`

* refactor: remove `entry_stream.rs` file

* chore: remove unused code

* chore: update comments

* chore: apply suggestions from CR

* chore: update comments

* chore: apply suggestions from CR

* chore: remove Deref

* chore: add comments

* fix: ignore tail corrupted data

* chore: add comments

* fix: add `MIN_BATCH_SIZE` limit
WenyXu authored May 29, 2024
1 parent 848bd7e commit 6e9a9dc
Showing 27 changed files with 1,491 additions and 1,622 deletions.
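
At a high level, callers of `Wal`/`LogStore` now pass an explicit `Provider` value instead of a store-specific `Namespace`, and entries are shared structs rather than an associated `Entry` type. Below is a minimal sketch of the new call pattern, assuming only what the `wal_bench.rs` diff further down shows (`Provider::raft_engine_provider` and `Provider::kafka_provider`); the stand-in `WalOptions` enum is simplified for the sketch and is not the real type:

```rust
use store_api::logstore::provider::Provider;

// Local stand-in for the real WAL options type, kept minimal so the sketch
// is self-contained; the actual `WalOptions` lives elsewhere in the workspace.
enum WalOptions {
    RaftEngine,
    Kafka { topic: String },
}

// Mirrors the mapping introduced in `benchmarks/src/wal_bench.rs` below:
// each region resolves its WAL options into a `Provider` once, then threads
// `&provider` through its `Wal`/`LogStore` calls (e.g. `scan`) instead of a
// namespace value.
fn build_provider(region_id: u64, wal_options: WalOptions) -> Provider {
    match wal_options {
        WalOptions::RaftEngine => Provider::raft_engine_provider(region_id),
        WalOptions::Kafka { topic } => Provider::kafka_provider(topic),
    }
}
```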
13 changes: 9 additions & 4 deletions benchmarks/src/wal_bench.rs
@@ -28,6 +28,7 @@ use rand::distributions::{Alphanumeric, DistString, Uniform};
 use rand::rngs::SmallRng;
 use rand::{Rng, SeedableRng};
 use serde::{Deserialize, Serialize};
+use store_api::logstore::provider::Provider;
 use store_api::logstore::LogStore;
 use store_api::storage::RegionId;
 
@@ -210,7 +211,7 @@ impl From<Args> for Config {
 pub struct Region {
     id: RegionId,
     schema: Vec<ColumnSchema>,
-    wal_options: WalOptions,
+    provider: Provider,
     next_sequence: AtomicU64,
     next_entry_id: AtomicU64,
     next_timestamp: AtomicI64,
@@ -227,10 +228,14 @@ impl Region {
         num_rows: u32,
         rng_seed: u64,
     ) -> Self {
+        let provider = match wal_options {
+            WalOptions::RaftEngine => Provider::raft_engine_provider(id.as_u64()),
+            WalOptions::Kafka(opts) => Provider::kafka_provider(opts.topic),
+        };
         Self {
             id,
             schema,
-            wal_options,
+            provider,
             next_sequence: AtomicU64::new(1),
             next_entry_id: AtomicU64::new(1),
             next_timestamp: AtomicI64::new(1655276557000),
@@ -258,14 +263,14 @@ impl Region {
             self.id,
             self.next_entry_id.fetch_add(1, Ordering::Relaxed),
             &entry,
-            &self.wal_options,
+            &self.provider,
         )
         .unwrap();
     }
 
     /// Replays the region.
     pub async fn replay<S: LogStore>(&self, wal: &Arc<Wal<S>>) {
-        let mut wal_stream = wal.scan(self.id, 0, &self.wal_options).unwrap();
+        let mut wal_stream = wal.scan(self.id, 0, &self.provider).unwrap();
         while let Some(res) = wal_stream.next().await {
             let (_, entry) = res.unwrap();
             metrics::METRIC_WAL_READ_BYTES_TOTAL.inc_by(Self::entry_estimated_size(&entry) as u64);
2 changes: 1 addition & 1 deletion src/common/telemetry/src/logging.rs
@@ -94,7 +94,7 @@ pub fn init_default_ut_logging() {
         env::var("UNITTEST_LOG_DIR").unwrap_or_else(|_| "/tmp/__unittest_logs".to_string());
 
     let level = env::var("UNITTEST_LOG_LEVEL").unwrap_or_else(|_|
-        "debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info".to_string()
+        "debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info,rskafka=info".to_string()
     );
     let opts = LoggingOptions {
         dir: dir.clone(),
26 changes: 13 additions & 13 deletions src/log-store/src/error.rs
@@ -21,12 +21,18 @@ use serde_json::error::Error as JsonError;
 use snafu::{Location, Snafu};
 use store_api::storage::RegionId;
 
-use crate::kafka::NamespaceImpl as KafkaNamespace;
-
 #[derive(Snafu)]
 #[snafu(visibility(pub))]
 #[stack_trace_debug]
 pub enum Error {
+    #[snafu(display("Invalid provider type, expected: {}, actual: {}", expected, actual))]
+    InvalidProvider {
+        #[snafu(implicit)]
+        location: Location,
+        expected: String,
+        actual: String,
+    },
+
     #[snafu(display("Failed to start log store gc task"))]
     StartGcTask {
         #[snafu(implicit)]
@@ -170,34 +176,28 @@
         location: Location,
     },
 
-    #[snafu(display(
-        "Failed to produce records to Kafka, topic: {}, size: {}, limit: {}",
-        topic,
-        size,
-        limit,
-    ))]
+    #[snafu(display("Failed to produce records to Kafka, topic: {}, size: {}", topic, size))]
     ProduceRecord {
         topic: String,
         size: usize,
-        limit: usize,
         #[snafu(implicit)]
         location: Location,
         #[snafu(source)]
         error: rskafka::client::producer::Error,
     },
 
-    #[snafu(display("Failed to read a record from Kafka, ns: {}", ns))]
+    #[snafu(display("Failed to read a record from Kafka, topic: {}", topic))]
     ConsumeRecord {
-        ns: KafkaNamespace,
+        topic: String,
         #[snafu(implicit)]
         location: Location,
         #[snafu(source)]
         error: rskafka::client::error::Error,
     },
 
-    #[snafu(display("Failed to get the latest offset, ns: {}", ns))]
+    #[snafu(display("Failed to get the latest offset, topic: {}", topic))]
     GetOffset {
-        ns: KafkaNamespace,
+        topic: String,
         #[snafu(implicit)]
         location: Location,
         #[snafu(source)]
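
The new `InvalidProvider` variant gives the log stores a uniform way to reject a `Provider` of the wrong kind (for example, a Kafka provider handed to the raft-engine store). A hedged sketch of how such an error would typically be raised through snafu's generated context selector; the `check_provider_kind` helper and the literal strings are placeholders for illustration, not code from this diff:

```rust
use snafu::ensure;

use crate::error::{InvalidProviderSnafu, Result};

// Sketch only: validate that the caller passed the provider kind this
// store understands before touching the WAL. The implicit `location`
// field is filled in by snafu automatically.
fn check_provider_kind(provider_kind: &str) -> Result<()> {
    ensure!(
        provider_kind == "raft_engine",
        InvalidProviderSnafu {
            expected: "raft_engine",
            actual: provider_kind,
        }
    );
    Ok(())
}
```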
81 changes: 1 addition & 80 deletions src/log-store/src/kafka.rs
@@ -12,17 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::mem::size_of;
 pub(crate) mod client_manager;
 pub mod log_store;
 pub(crate) mod util;
 
-use std::fmt::Display;
-
 use serde::{Deserialize, Serialize};
-use store_api::logstore::entry::{Entry, Id as EntryId, RawEntry};
-use store_api::logstore::namespace::Namespace;
-use store_api::storage::RegionId;
+use store_api::logstore::entry::Id as EntryId;
 
 /// Kafka Namespace implementation.
 #[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
@@ -31,18 +26,6 @@ pub struct NamespaceImpl {
     pub topic: String,
 }
 
-impl Namespace for NamespaceImpl {
-    fn id(&self) -> u64 {
-        self.region_id
-    }
-}
-
-impl Display for NamespaceImpl {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "[topic: {}, region: {}]", self.topic, self.region_id)
-    }
-}
-
 /// Kafka Entry implementation.
 #[derive(Debug, PartialEq, Clone)]
 pub struct EntryImpl {
@@ -53,65 +36,3 @@ pub struct EntryImpl {
     /// The namespace used to identify and isolate log entries from different regions.
     pub ns: NamespaceImpl,
 }
-
-impl Entry for EntryImpl {
-    fn into_raw_entry(self) -> RawEntry {
-        RawEntry {
-            region_id: self.region_id(),
-            entry_id: self.id(),
-            data: self.data,
-        }
-    }
-
-    fn data(&self) -> &[u8] {
-        &self.data
-    }
-
-    fn id(&self) -> EntryId {
-        self.id
-    }
-
-    fn region_id(&self) -> RegionId {
-        RegionId::from_u64(self.ns.region_id)
-    }
-
-    fn estimated_size(&self) -> usize {
-        size_of::<Self>() + self.data.capacity() * size_of::<u8>() + self.ns.topic.capacity()
-    }
-}
-
-impl Display for EntryImpl {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(
-            f,
-            "Entry [ns: {}, id: {}, data_len: {}]",
-            self.ns,
-            self.id,
-            self.data.len()
-        )
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use std::mem::size_of;
-
-    use store_api::logstore::entry::Entry;
-
-    use crate::kafka::{EntryImpl, NamespaceImpl};
-
-    #[test]
-    fn test_estimated_size() {
-        let entry = EntryImpl {
-            data: Vec::with_capacity(100),
-            id: 0,
-            ns: NamespaceImpl {
-                region_id: 0,
-                topic: String::with_capacity(10),
-            },
-        };
-        let expected = size_of::<EntryImpl>() + 100 * size_of::<u8>() + 10;
-        let got = entry.estimated_size();
-        assert_eq!(expected, got);
-    }
-}
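
With the Kafka-specific trait impls and their test removed, entries no longer carry a per-store type; they flow through the shared `store_api::logstore::entry::Entry` (the `RawEntry` renamed earlier in this commit). Judging only from the removed `into_raw_entry` conversion above, that type carries roughly the fields sketched here; this is a hypothetical shape, and the real definition may additionally model the multi-part entries mentioned in the commit messages:

```rust
// Hypothetical sketch inferred from the removed `into_raw_entry` body above;
// the authoritative definition is `store_api::logstore::entry::Entry`.
use store_api::storage::RegionId;

// Aliased here only for the sketch; the real alias lives in `store_api`.
type EntryId = u64;

struct EntrySketch {
    region_id: RegionId,
    entry_id: EntryId,
    data: Vec<u8>,
}
```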
4 changes: 3 additions & 1 deletion src/log-store/src/kafka/client_manager.rs
@@ -27,6 +27,7 @@ use tokio::sync::RwLock;
 use crate::error::{
     BuildClientSnafu, BuildPartitionClientSnafu, ResolveKafkaEndpointSnafu, Result,
 };
+use crate::kafka::util::record::MIN_BATCH_SIZE;
 
 // Each topic only has one partition for now.
 // The `DEFAULT_PARTITION` refers to the index of the partition.
@@ -48,7 +49,8 @@ pub(crate) struct Client {
 impl Client {
     /// Creates a Client from the raw client.
     pub(crate) fn new(raw_client: Arc<PartitionClient>, config: &DatanodeKafkaConfig) -> Self {
-        let record_aggregator = RecordAggregator::new(config.max_batch_size.as_bytes() as usize);
+        let record_aggregator =
+            RecordAggregator::new((config.max_batch_size.as_bytes() as usize).max(MIN_BATCH_SIZE));
         let batch_producer = BatchProducerBuilder::new(raw_client.clone())
             .with_compression(config.compression)
             .with_linger(config.linger)
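
The `MIN_BATCH_SIZE` guard clamps a configured `max_batch_size` that would otherwise be too small for the record aggregator to accept even a single record. A standalone sketch of the clamping behaviour; the concrete value of `MIN_BATCH_SIZE` is not shown in this diff and is assumed here for illustration:

```rust
// The real constant lives in `src/log-store/src/kafka/util/record.rs`;
// the value below is an assumption made only for this sketch.
const MIN_BATCH_SIZE: usize = 4 * 1024;

fn effective_batch_size(configured_max_batch_size: usize) -> usize {
    // Never hand the aggregator a limit smaller than MIN_BATCH_SIZE,
    // otherwise even a single small record could be rejected.
    configured_max_batch_size.max(MIN_BATCH_SIZE)
}

fn main() {
    assert_eq!(effective_batch_size(512), MIN_BATCH_SIZE);
    assert_eq!(effective_batch_size(1024 * 1024), 1024 * 1024);
}
```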