Skip to content

Commit

Permalink
feat(remote_wal): append a noop record after kafka topic initializati…
Browse files Browse the repository at this point in the history
…on (#3040)

* feat: append a noop record after kafka topic initialization

* chore: apply suggestions from CR

* feat: ignore the noop record during the read
  • Loading branch information
WenyXu authored Dec 29, 2023
1 parent d22072f commit 301ffc1
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 5 deletions.
23 changes: 23 additions & 0 deletions src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,27 @@ pub enum Error {
error: rskafka::client::error::Error,
},

#[snafu(display(
"Failed to build a Kafka partition client, topic: {}, partition: {}",
topic,
partition
))]
BuildKafkaPartitionClient {
topic: String,
partition: i32,
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,
},

#[snafu(display("Failed to produce records to Kafka, topic: {}", topic))]
ProduceRecord {
topic: String,
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,
},

#[snafu(display("Failed to create a Kafka wal topic"))]
CreateKafkaWalTopic {
location: Location,
Expand Down Expand Up @@ -368,6 +389,8 @@ impl ErrorExt for Error {
| EncodeWalOptions { .. }
| BuildKafkaClient { .. }
| BuildKafkaCtrlClient { .. }
| BuildKafkaPartitionClient { .. }
| ProduceRecord { .. }
| CreateKafkaWalTopic { .. }
| EmptyTopicPool { .. } => StatusCode::Unexpected,

Expand Down
48 changes: 43 additions & 5 deletions src/common/meta/src/wal/kafka/topic_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@ use common_telemetry::{debug, error, info};
use rskafka::client::controller::ControllerClient;
use rskafka::client::error::Error as RsKafkaError;
use rskafka::client::error::ProtocolError::TopicAlreadyExists;
use rskafka::client::ClientBuilder;
use rskafka::client::partition::{Compression, UnknownTopicHandling};
use rskafka::client::{Client, ClientBuilder};
use rskafka::record::Record;
use rskafka::BackoffConfig;
use snafu::{ensure, AsErrorSource, ResultExt};

use crate::error::{
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu, DecodeJsonSnafu,
EncodeJsonSnafu, InvalidNumTopicsSnafu, Result,
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, BuildKafkaPartitionClientSnafu,
CreateKafkaWalTopicSnafu, DecodeJsonSnafu, EncodeJsonSnafu, InvalidNumTopicsSnafu,
ProduceRecordSnafu, Result,
};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::PutRequest;
Expand All @@ -37,6 +40,10 @@ use crate::wal::kafka::KafkaConfig;

const CREATED_TOPICS_KEY: &str = "__created_wal_topics/kafka/";

// Each topic only has one partition for now.
// The `DEFAULT_PARTITION` refers to the index of the partition.
const DEFAULT_PARTITION: i32 = 0;

/// Manages topic initialization and selection.
pub struct TopicManager {
config: KafkaConfig,
Expand Down Expand Up @@ -117,14 +124,20 @@ impl TopicManager {
.await
.with_context(|_| BuildKafkaClientSnafu {
broker_endpoints: self.config.broker_endpoints.clone(),
})?
})?;

let control_client = client
.controller_client()
.context(BuildKafkaCtrlClientSnafu)?;

// Try to create missing topics.
let tasks = to_be_created
.iter()
.map(|i| self.try_create_topic(&topics[*i], &client))
.map(|i| async {
self.try_create_topic(&topics[*i], &control_client).await?;
self.try_append_noop_record(&topics[*i], &client).await?;
Ok(())
})
.collect::<Vec<_>>();
futures::future::try_join_all(tasks).await.map(|_| ())
}
Expand All @@ -141,6 +154,31 @@ impl TopicManager {
.collect()
}

async fn try_append_noop_record(&self, topic: &Topic, client: &Client) -> Result<()> {
let partition_client = client
.partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry)
.await
.context(BuildKafkaPartitionClientSnafu {
topic,
partition: DEFAULT_PARTITION,
})?;

partition_client
.produce(
vec![Record {
key: None,
value: None,
timestamp: rskafka::chrono::Utc::now(),
headers: Default::default(),
}],
Compression::NoCompression,
)
.await
.context(ProduceRecordSnafu { topic })?;

Ok(())
}

async fn try_create_topic(&self, topic: &Topic, client: &ControllerClient) -> Result<()> {
match client
.create_topic(
Expand Down
4 changes: 4 additions & 0 deletions src/log-store/src/kafka/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ impl LogStore for KafkaLogStore {
record_offset, ns_clone, high_watermark
);

// Ignores the noop record.
if record.record.value.is_none() {
continue;
}
let entries = decode_from_record(record.record)?;

// Filters entries by region id.
Expand Down

0 comments on commit 301ffc1

Please sign in to comment.