Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(remote_wal): remove topic alias #3120

Merged
merged 1 commit into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/common/config/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ pub mod raft_engine;
use serde::{Deserialize, Serialize};
use serde_with::with_prefix;

pub use crate::wal::kafka::{
KafkaConfig, KafkaOptions as KafkaWalOptions, StandaloneKafkaConfig, Topic as KafkaWalTopic,
};
pub use crate::wal::kafka::{KafkaConfig, KafkaOptions as KafkaWalOptions, StandaloneKafkaConfig};
pub use crate::wal::raft_engine::RaftEngineConfig;

/// An encoded wal options will be wrapped into a (WAL_OPTIONS_KEY, encoded wal options) key-value pair
Expand Down
7 changes: 1 addition & 6 deletions src/common/config/src/wal/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,6 @@ use rskafka::client::partition::Compression as RsKafkaCompression;
use serde::{Deserialize, Serialize};
use serde_with::with_prefix;

/// Topic name prefix.
pub const TOPIC_NAME_PREFIX: &str = "greptimedb_wal_topic";
/// Kafka wal topic.
pub type Topic = String;

/// The type of the topic selector, i.e. with which strategy to select a topic.
#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
Expand Down Expand Up @@ -138,5 +133,5 @@ impl Default for StandaloneKafkaConfig {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct KafkaOptions {
/// Kafka wal topic.
pub topic: Topic,
pub topic: String,
}
1 change: 0 additions & 1 deletion src/common/meta/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use serde::{Deserialize, Serialize};
use store_api::storage::{RegionId, RegionNumber};

use crate::wal::kafka::KafkaConfig;
pub use crate::wal::kafka::Topic as KafkaWalTopic;
pub use crate::wal::options_allocator::{
allocate_region_wal_options, WalOptionsAllocator, WalOptionsAllocatorRef,
};
Expand Down
2 changes: 0 additions & 2 deletions src/common/meta/src/wal/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#[cfg(any(test, feature = "testing"))]
pub mod test_util;
pub mod topic;
pub mod topic_manager;
pub mod topic_selector;

Expand All @@ -23,7 +22,6 @@ use std::time::Duration;
use common_config::wal::kafka::{kafka_backoff, KafkaBackoffConfig, TopicSelectorType};
use serde::{Deserialize, Serialize};

pub use crate::wal::kafka::topic::Topic;
pub use crate::wal::kafka::topic_manager::TopicManager;

/// Configurations for kafka wal.
Expand Down
19 changes: 0 additions & 19 deletions src/common/meta/src/wal/kafka/topic.rs

This file was deleted.

19 changes: 9 additions & 10 deletions src/common/meta/src/wal/kafka/topic_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use crate::error::{
};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::PutRequest;
use crate::wal::kafka::topic::Topic;
use crate::wal::kafka::topic_selector::{RoundRobinTopicSelector, TopicSelectorRef};
use crate::wal::kafka::KafkaConfig;

Expand All @@ -46,7 +45,7 @@ const DEFAULT_PARTITION: i32 = 0;
/// Manages topic initialization and selection.
pub struct TopicManager {
config: KafkaConfig,
pub(crate) topic_pool: Vec<Topic>,
pub(crate) topic_pool: Vec<String>,
pub(crate) topic_selector: TopicSelectorRef,
kv_backend: KvBackendRef,
}
Expand Down Expand Up @@ -86,7 +85,7 @@ impl TopicManager {
let created_topics = Self::restore_created_topics(&self.kv_backend)
.await?
.into_iter()
.collect::<HashSet<Topic>>();
.collect::<HashSet<String>>();

// Creates missing topics.
let to_be_created = topics
Expand All @@ -108,7 +107,7 @@ impl TopicManager {
}

/// Tries to create topics specified by indexes in `to_be_created`.
async fn try_create_topics(&self, topics: &[Topic], to_be_created: &[usize]) -> Result<()> {
async fn try_create_topics(&self, topics: &[String], to_be_created: &[usize]) -> Result<()> {
// Builds a Kafka controller client for creating topics.
let backoff_config = BackoffConfig {
init_backoff: self.config.backoff.init,
Expand Down Expand Up @@ -141,18 +140,18 @@ impl TopicManager {
}

/// Selects one topic from the topic pool through the topic selector.
pub fn select(&self) -> Result<&Topic> {
pub fn select(&self) -> Result<&String> {
self.topic_selector.select(&self.topic_pool)
}

/// Selects a batch of topics from the topic pool through the topic selector.
pub fn select_batch(&self, num_topics: usize) -> Result<Vec<&Topic>> {
pub fn select_batch(&self, num_topics: usize) -> Result<Vec<&String>> {
(0..num_topics)
.map(|_| self.topic_selector.select(&self.topic_pool))
.collect()
}

async fn try_append_noop_record(&self, topic: &Topic, client: &Client) -> Result<()> {
async fn try_append_noop_record(&self, topic: &String, client: &Client) -> Result<()> {
let partition_client = client
.partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry)
.await
Expand All @@ -177,7 +176,7 @@ impl TopicManager {
Ok(())
}

async fn try_create_topic(&self, topic: &Topic, client: &ControllerClient) -> Result<()> {
async fn try_create_topic(&self, topic: &String, client: &ControllerClient) -> Result<()> {
match client
.create_topic(
topic.clone(),
Expand All @@ -203,7 +202,7 @@ impl TopicManager {
}
}

async fn restore_created_topics(kv_backend: &KvBackendRef) -> Result<Vec<Topic>> {
async fn restore_created_topics(kv_backend: &KvBackendRef) -> Result<Vec<String>> {
kv_backend
.get(CREATED_TOPICS_KEY.as_bytes())
.await?
Expand All @@ -213,7 +212,7 @@ impl TopicManager {
)
}

async fn persist_created_topics(topics: &[Topic], kv_backend: &KvBackendRef) -> Result<()> {
async fn persist_created_topics(topics: &[String], kv_backend: &KvBackendRef) -> Result<()> {
let raw_topics = serde_json::to_vec(topics).context(EncodeJsonSnafu)?;
kv_backend
.put(PutRequest {
Expand Down
5 changes: 2 additions & 3 deletions src/common/meta/src/wal/kafka/topic_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ use rand::Rng;
use snafu::ensure;

use crate::error::{EmptyTopicPoolSnafu, Result};
use crate::wal::kafka::topic::Topic;

/// Controls topic selection.
pub(crate) trait TopicSelector: Send + Sync {
/// Selects a topic from the topic pool.
fn select<'a>(&self, topic_pool: &'a [Topic]) -> Result<&'a Topic>;
fn select<'a>(&self, topic_pool: &'a [String]) -> Result<&'a String>;
}

/// Arc wrapper of TopicSelector.
Expand All @@ -48,7 +47,7 @@ impl RoundRobinTopicSelector {
}

impl TopicSelector for RoundRobinTopicSelector {
fn select<'a>(&self, topic_pool: &'a [Topic]) -> Result<&'a Topic> {
fn select<'a>(&self, topic_pool: &'a [String]) -> Result<&'a String> {
ensure!(!topic_pool.is_empty(), EmptyTopicPoolSnafu);
let which = self.cursor.fetch_add(1, Ordering::Relaxed) % topic_pool.len();
Ok(&topic_pool[which])
Expand Down
5 changes: 2 additions & 3 deletions src/log-store/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::any::Any;

use common_config::wal::KafkaWalTopic;
use common_error::ext::ErrorExt;
use common_macro::stack_trace_debug;
use common_runtime::error::Error as RuntimeError;
Expand Down Expand Up @@ -119,7 +118,7 @@ pub enum Error {
error
))]
GetClient {
topic: KafkaWalTopic,
topic: String,
location: Location,
error: String,
},
Expand All @@ -140,7 +139,7 @@ pub enum Error {
limit,
))]
ProduceRecord {
topic: KafkaWalTopic,
topic: String,
size: usize,
limit: usize,
location: Location,
Expand Down
3 changes: 1 addition & 2 deletions src/log-store/src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ pub(crate) mod util;

use std::fmt::Display;

use common_meta::wal::KafkaWalTopic as Topic;
use serde::{Deserialize, Serialize};
use store_api::logstore::entry::{Entry, Id as EntryId};
use store_api::logstore::namespace::Namespace;
Expand All @@ -29,7 +28,7 @@ use crate::error::Error;
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct NamespaceImpl {
pub region_id: u64,
pub topic: Topic,
pub topic: String,
}

impl Namespace for NamespaceImpl {
Expand Down
10 changes: 5 additions & 5 deletions src/log-store/src/kafka/client_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use common_config::wal::{KafkaConfig, KafkaWalTopic as Topic};
use common_config::wal::KafkaConfig;
use rskafka::client::partition::{PartitionClient, UnknownTopicHandling};
use rskafka::client::producer::aggregator::RecordAggregator;
use rskafka::client::producer::{BatchProducer, BatchProducerBuilder};
Expand Down Expand Up @@ -67,7 +67,7 @@ pub(crate) struct ClientManager {
client_factory: RsKafkaClient,
/// A pool maintaining a collection of clients.
/// Key: a topic. Value: the associated client of the topic.
client_pool: RwLock<HashMap<Topic, Client>>,
client_pool: RwLock<HashMap<String, Client>>,
}

impl ClientManager {
Expand Down Expand Up @@ -97,7 +97,7 @@ impl ClientManager {

/// Gets the client associated with the topic. If the client does not exist, a new one will
/// be created and returned.
pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result<Client> {
pub(crate) async fn get_or_insert(&self, topic: &String) -> Result<Client> {
{
let client_pool = self.client_pool.read().await;
if let Some(client) = client_pool.get(topic) {
Expand All @@ -116,7 +116,7 @@ impl ClientManager {
}
}

async fn try_create_client(&self, topic: &Topic) -> Result<Client> {
async fn try_create_client(&self, topic: &String) -> Result<Client> {
// Sets to Retry to retry connecting if the Kafka cluster replies with an UnknownTopic error.
// That's because the topic is believed to exist as the metasrv is expected to create required topics upon start.
// The reconnecting won't stop until succeed or a different error returns.
Expand Down Expand Up @@ -147,7 +147,7 @@ mod tests {
test_name: &str,
num_topics: usize,
broker_endpoints: Vec<String>,
) -> (ClientManager, Vec<Topic>) {
) -> (ClientManager, Vec<String>) {
let topics = create_topics(
num_topics,
|i| format!("{test_name}_{}_{}", i, uuid::Uuid::new_v4()),
Expand Down
3 changes: 1 addition & 2 deletions src/log-store/src/kafka/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,6 @@ fn check_termination(
#[cfg(test)]
mod tests {
use common_base::readable_size::ReadableSize;
use common_config::wal::KafkaWalTopic as Topic;
use rand::seq::IteratorRandom;

use super::*;
Expand All @@ -304,7 +303,7 @@ mod tests {
test_name: &str,
num_topics: usize,
broker_endpoints: Vec<String>,
) -> (KafkaLogStore, Vec<Topic>) {
) -> (KafkaLogStore, Vec<String>) {
let topics = create_topics(
num_topics,
|i| format!("{test_name}_{}_{}", i, uuid::Uuid::new_v4()),
Expand Down
3 changes: 1 addition & 2 deletions src/log-store/src/test_util/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::sync::atomic::{AtomicU64 as AtomicEntryId, Ordering};
use std::sync::Mutex;

use common_meta::wal::KafkaWalTopic as Topic;
use rand::distributions::Alphanumeric;
use rand::rngs::ThreadRng;
use rand::{thread_rng, Rng};
Expand All @@ -29,7 +28,7 @@ pub async fn create_topics<F>(
num_topics: usize,
decorator: F,
broker_endpoints: &[String],
) -> Vec<Topic>
) -> Vec<String>
where
F: Fn(usize) -> String,
{
Expand Down
Loading