feat(remote_wal): introduce kafka remote wal #3001

Merged · 20 commits · Dec 26, 2023
Changes from 13 commits
2 changes: 1 addition & 1 deletion config/datanode.example.toml
@@ -56,7 +56,7 @@ sync_write = false
# produce_record_timeout = "100ms"
# backoff_init = "500ms"
# backoff_max = "10s"
# backoff_base = 2.0
# backoff_base = 2
# backoff_deadline = "5mins"

# Storage options, see `standalone.example.toml`.
2 changes: 1 addition & 1 deletion config/metasrv.example.toml
@@ -73,7 +73,7 @@ provider = "raft_engine"
# The maximum backoff for kafka clients.
# backoff_max = "10s"
# Exponential backoff rate, i.e. next backoff = base * current backoff.
# backoff_base = 2.0
# backoff_base = 2
# Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate.
# backoff_deadline = "5mins"

2 changes: 1 addition & 1 deletion config/standalone.example.toml
@@ -119,7 +119,7 @@ provider = "raft_engine"
# The maximum backoff for kafka clients.
# backoff_max = "10s"
# Exponential backoff rate, i.e. next backoff = base * current backoff.
# backoff_base = 2.0
# backoff_base = 2
# Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate.
# backoff_deadline = "5mins"

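The backoff settings repeated in the three example configs above describe one policy: each reconnect waits backoff_base times the previous delay, the delay is capped at backoff_max, and retries stop once the total time waited reaches backoff_deadline. A minimal sketch of that schedule in plain Rust (illustration only; backoff_schedule is a made-up helper, not the rskafka BackoffConfig implementation):

use std::time::Duration;

// next = min(current * base, max); stop when the accumulated wait hits the deadline.
fn backoff_schedule(init: Duration, base: u32, max: Duration, deadline: Duration) -> Vec<Duration> {
    let mut delays = Vec::new();
    let mut current = init;
    let mut total = Duration::ZERO;
    while total + current <= deadline {
        delays.push(current);
        total += current;
        current = (current * base).min(max);
    }
    delays
}

fn main() {
    // backoff_init = "500ms", backoff_base = 2, backoff_max = "10s", backoff_deadline = "5mins"
    let delays = backoff_schedule(
        Duration::from_millis(500),
        2,
        Duration::from_secs(10),
        Duration::from_secs(300),
    );
    // 500ms, 1s, 2s, 4s, 8s, then 10s repeatedly until ~5 minutes of waiting accumulate.
    println!("{} retries, first six delays: {:?}", delays.len(), &delays[..6]);
}

The config diffs above only adjust the documented default from 2.0 to 2, presumably to match the integer type backoff_base expects.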
7 changes: 5 additions & 2 deletions src/common/meta/src/wal/kafka.rs
@@ -50,13 +50,16 @@ pub struct KafkaConfig {

impl Default for KafkaConfig {
fn default() -> Self {
let broker_endpoints = vec!["127.0.0.1:9092".to_string()];
let replication_factor = broker_endpoints.len() as i16;

Self {
broker_endpoints: vec!["127.0.0.1:9092".to_string()],
broker_endpoints,
num_topics: 64,
selector_type: TopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_wal_topic".to_string(),
num_partitions: 1,
replication_factor: 3,
replication_factor,
create_topic_timeout: Duration::from_secs(30),
backoff: KafkaBackoffConfig::default(),
}
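Tying the default replication factor to broker_endpoints.len() instead of a hard-coded 3 means the out-of-the-box config works against the single local broker it points at; a one-broker cluster cannot satisfy a replication factor of 3, so topic creation would fail. A trivial check of the derived default (illustration only):

fn main() {
    // The default endpoint list holds one local broker, so the derived
    // replication factor becomes 1 rather than the old hard-coded 3.
    let broker_endpoints = vec!["127.0.0.1:9092".to_string()];
    let replication_factor = broker_endpoints.len() as i16;
    assert_eq!(replication_factor, 1);
}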
63 changes: 45 additions & 18 deletions src/common/meta/src/wal/kafka/topic_manager.rs
@@ -17,10 +17,13 @@ use std::sync::Arc;
use std::time::Duration;

use common_config::wal::kafka::TopicSelectorType;
use common_telemetry::debug;
use common_telemetry::{debug, error, info};
use rskafka::client::controller::ControllerClient;
use rskafka::client::error::Error as RsKafkaError;
use rskafka::client::error::ProtocolError::TopicAlreadyExists;
use rskafka::client::ClientBuilder;
use rskafka::BackoffConfig;
use snafu::{ensure, ResultExt};
use snafu::{ensure, AsErrorSource, ResultExt};

use crate::error::{
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu, DecodeJsonSnafu,
@@ -79,7 +82,6 @@ impl TopicManager {
.await?
.into_iter()
.collect::<HashSet<Topic>>();
debug!("Restored {} topics", created_topics.len());

// Creates missing topics.
let to_be_created = topics
@@ -92,10 +94,10 @@
Some(i)
})
.collect::<Vec<_>>();

if !to_be_created.is_empty() {
self.try_create_topics(topics, &to_be_created).await?;
Self::persist_created_topics(topics, &self.kv_backend).await?;
debug!("Persisted {} topics", topics.len());
}
Ok(())
}
@@ -119,23 +121,12 @@
.controller_client()
.context(BuildKafkaCtrlClientSnafu)?;

// Spawns tokio tasks for creating missing topics.
// Try to create missing topics.
let tasks = to_be_created
.iter()
.map(|i| {
client.create_topic(
topics[*i].clone(),
self.config.num_partitions,
self.config.replication_factor,
self.config.create_topic_timeout.as_millis() as i32,
)
})
.map(|i| self.try_create_topic(&topics[*i], &client))
.collect::<Vec<_>>();
// FIXME(niebayes): try to create an already-exist topic would raise an error.
futures::future::try_join_all(tasks)
.await
.context(CreateKafkaWalTopicSnafu)
.map(|_| ())
futures::future::try_join_all(tasks).await.map(|_| ())
}

/// Selects one topic from the topic pool through the topic selector.
@@ -150,6 +141,32 @@
.collect()
}

async fn try_create_topic(&self, topic: &Topic, client: &ControllerClient) -> Result<()> {
match client
.create_topic(
topic.clone(),
self.config.num_partitions,
self.config.replication_factor,
self.config.create_topic_timeout.as_millis() as i32,
)
.await
{
Ok(_) => {
info!("Successfully created topic {}", topic);
Ok(())
}
Err(e) => {
if Self::is_topic_already_exist_err(&e) {
info!("The topic {} already exists", topic);
Ok(())
} else {
error!("Failed to create a topic {}, error {:?}", topic, e);
Err(e).context(CreateKafkaWalTopicSnafu)
}
}
}
}

async fn restore_created_topics(kv_backend: &KvBackendRef) -> Result<Vec<Topic>> {
kv_backend
.get(CREATED_TOPICS_KEY.as_bytes())
@@ -171,6 +188,16 @@
.await
.map(|_| ())
}

fn is_topic_already_exist_err(e: &RsKafkaError) -> bool {
matches!(
e,
&RsKafkaError::ServerError {
protocol_error: TopicAlreadyExists,
..
}
)
}
}

#[cfg(test)]
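TopicManager picks a WAL topic for each region through the configured selector (TopicSelectorType::RoundRobin by default, per the KafkaConfig change above). The selector itself is not part of this diff; the sketch below is only a guess at the general shape of a round-robin selector over the topic pool, with a made-up RoundRobinSelector type:

use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical round-robin selector: each call hands out the next topic
/// in the pool and wraps around at the end. Assumes a non-empty pool.
struct RoundRobinSelector {
    cursor: AtomicUsize,
}

impl RoundRobinSelector {
    fn new() -> Self {
        Self { cursor: AtomicUsize::new(0) }
    }

    fn select<'a>(&self, topic_pool: &'a [String]) -> &'a str {
        let i = self.cursor.fetch_add(1, Ordering::Relaxed) % topic_pool.len();
        &topic_pool[i]
    }
}

fn main() {
    let pool: Vec<String> = (0..3).map(|i| format!("greptimedb_wal_topic_{i}")).collect();
    let selector = RoundRobinSelector::new();
    // Prints topic 0, 1, 2, then wraps back to 0: regions get spread across the pool.
    for _ in 0..4 {
        println!("{}", selector.select(&pool));
    }
}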
21 changes: 12 additions & 9 deletions src/log-store/src/error.rs
@@ -20,6 +20,8 @@
use common_runtime::error::Error as RuntimeError;
use snafu::{Location, Snafu};

use crate::kafka::NamespaceImpl as KafkaNamespace;

#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
@@ -152,16 +154,17 @@
error: rskafka::client::producer::Error,
},

#[snafu(display(
"Failed to read a record from Kafka, topic: {}, region_id: {}, offset: {}",
topic,
region_id,
offset,
))]
#[snafu(display("Failed to read a record from Kafka, ns: {}", ns))]
ConsumeRecord {
topic: String,
region_id: u64,
offset: i64,
ns: KafkaNamespace,
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,
},

#[snafu(display("Failed to get the lastest offset, ns: {}", ns))]

Check warning (GitHub Actions / Spell Check with Typos) on line 165 in src/log-store/src/error.rs: "lastest" should be "latest".
GetOffset {
ns: KafkaNamespace,
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,
Expand Down
20 changes: 20 additions & 0 deletions src/log-store/src/kafka.rs
@@ -17,6 +17,8 @@ pub mod log_store;
mod offset;
mod record_utils;

use std::fmt::Display;

use common_meta::wal::KafkaWalTopic as Topic;
use serde::{Deserialize, Serialize};
use store_api::logstore::entry::{Entry, Id as EntryId};
@@ -37,6 +39,12 @@ impl Namespace for NamespaceImpl {
}
}

impl Display for NamespaceImpl {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}", self.topic, self.region_id)
}
}

/// Kafka Entry implementation.
#[derive(Debug, PartialEq, Clone)]
pub struct EntryImpl {
@@ -64,3 +72,15 @@ impl Entry for EntryImpl {
self.ns.clone()
}
}

impl Display for EntryImpl {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Entry (ns: {}, id: {}, data_len: {})",
self.ns,
self.id,
self.data.len()
)
}
}
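The two Display impls give Kafka namespaces and entries a stable human-readable form ("topic/region_id" and a short entry summary) that the ns-carrying error variants in error.rs and the debug logs in log_store.rs rely on. A sketch of a unit test exercising them (field names are taken from this diff; their visibility at the test site is assumed):

#[test]
fn display_formats() {
    let ns = NamespaceImpl {
        region_id: 42,
        topic: "greptimedb_wal_topic_7".to_string(),
    };
    assert_eq!(ns.to_string(), "greptimedb_wal_topic_7/42");

    let entry = EntryImpl {
        data: vec![0u8; 128],
        id: 10,
        ns,
    };
    assert_eq!(
        entry.to_string(),
        "Entry (ns: greptimedb_wal_topic_7/42, id: 10, data_len: 128)"
    );
}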
59 changes: 53 additions & 6 deletions src/log-store/src/kafka/log_store.rs
@@ -16,17 +16,20 @@
use std::sync::Arc;

use common_config::wal::{KafkaConfig, WalOptions};
use common_telemetry::debug;
use futures_util::StreamExt;
use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder};
use rskafka::client::partition::OffsetAt;
use snafu::ResultExt;
use store_api::logstore::entry::Id as EntryId;
use store_api::logstore::entry_stream::SendableEntryStream;
use store_api::logstore::namespace::Id as NamespaceId;
use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore};

use crate::error::{Error, Result};
use crate::error::{ConsumeRecordSnafu, Error, GetOffsetSnafu, Result};
use crate::kafka::client_manager::{ClientManager, ClientManagerRef};
use crate::kafka::offset::Offset;
use crate::kafka::record_utils::{handle_consume_result, RecordProducer};
use crate::kafka::record_utils::{decode_from_record, RecordProducer};
use crate::kafka::{EntryImpl, NamespaceImpl};

/// A log store backed by Kafka.
@@ -82,6 +85,8 @@
/// Appends a batch of entries and returns a response containing a map where the key is a region id
/// while the value is the id of the last successfully written entry of the region.
async fn append_batch(&self, entries: Vec<Self::Entry>) -> Result<AppendBatchResponse> {
debug!("LogStore handles append_batch with entries {:?}", entries);

if entries.is_empty() {
return Ok(AppendBatchResponse::default());
}
@@ -97,6 +102,7 @@

// Builds a record from entries belong to a region and produces them to kafka server.
let region_ids = producers.keys().cloned().collect::<Vec<_>>();

let tasks = producers
.into_values()
.map(|producer| producer.produce(&self.client_manager))
@@ -108,6 +114,8 @@
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<_>>>()?;
debug!("The entries are appended at offsets {:?}", entry_ids);

Ok(AppendBatchResponse {
last_entry_ids: region_ids.into_iter().zip(entry_ids).collect(),
})
@@ -131,15 +139,54 @@
.raw_client
.clone();

// Reads the entries starting from exactly the specified offset.
let offset = Offset::try_from(entry_id)?.0;
let mut stream_consumer = StreamConsumerBuilder::new(client, StartOffset::At(offset))
// Gets the offset of the lastest record in the topic. Actually, it's the lastest record of the single partition in the topic.

Check warning (GitHub Actions / Spell Check with Typos) on line 142 in src/log-store/src/kafka/log_store.rs: "lastest" should be "latest".
// The read operation terminates when this record is consumed.
let end_offset = client
.get_offset(OffsetAt::Latest)
.await
.context(GetOffsetSnafu { ns: ns.clone() })?;
// Reads entries with offsets in the range [start_offset, end_offset].
let start_offset = Offset::try_from(entry_id)?.0;

// Abort if there're no new entries.
// FIXME(niebayes): how come this case happens?
if start_offset > end_offset {
debug!("No new entries in ns {}", ns);
return Ok(futures_util::stream::empty().boxed());
}

let mut stream_consumer = StreamConsumerBuilder::new(client, StartOffset::At(start_offset))
.with_max_batch_size(self.config.max_batch_size.as_bytes() as i32)
.with_max_wait_ms(self.config.produce_record_timeout.as_millis() as i32)
.build();

debug!(
"Built a stream consumer for ns {} to consume entries in range [{}, {}]",
ns, start_offset, end_offset
);

let ns_clone = ns.clone();
let stream = async_stream::stream!({
while let Some(consume_result) = stream_consumer.next().await {
yield handle_consume_result(consume_result, &topic, region_id, offset);
let (record, offset) = consume_result.context(ConsumeRecordSnafu {
ns: ns_clone.clone(),
})?;
let entries = decode_from_record(record.record)?;

// Filters entries by region id.
if let Some(entry) = entries.first()
&& entry.ns.region_id == region_id
{
yield Ok(entries);
} else {
yield Ok(vec![]);
}

// Terminates the stream if the entry with the end offset was read.
if offset >= end_offset {
debug!("Stream for ns {} terminates at offset {}", ns_clone, offset);
break;
}
}
});
Ok(Box::pin(stream))
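In the reworked read path above, the log store pins end_offset to the partition's latest offset before consuming, streams records from start_offset, keeps only batches whose entries belong to the requested region (other regions share the same topic), and terminates once the pinned offset has been consumed. The snippet below is a self-contained, simplified model of that control flow over plain tuples; the real code operates on decoded Kafka record batches inside an async stream:

// Simulates the bounded, region-filtered consumption in KafkaLogStore::read:
// walk records from `start_offset`, keep offsets owned by `region_id`,
// and stop as soon as the record at `end_offset` has been seen.
fn collect_region_offsets(
    records: &[(u64, i64)], // (region_id, offset) pairs in offset order
    region_id: u64,
    start_offset: i64,
    end_offset: i64,
) -> Vec<i64> {
    let mut out = Vec::new();
    for &(rid, offset) in records.iter().filter(|(_, o)| *o >= start_offset) {
        if rid == region_id {
            out.push(offset);
        }
        if offset >= end_offset {
            break; // mirrors the stream's early termination
        }
    }
    out
}

fn main() {
    // Two regions interleaved in one topic partition.
    let records: [(u64, i64); 6] = [(1, 0), (2, 1), (1, 2), (2, 3), (1, 4), (1, 5)];
    // Replay region 1 from offset 2; the end offset was captured as 4 before reading,
    // so the record written later at offset 5 is not visible to this read.
    assert_eq!(collect_region_offsets(&records, 1, 2, 4), vec![2, 4]);
}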