From 913452be161563c44343b0ecafc888d7c59b39d9 Mon Sep 17 00:00:00 2001 From: niebayes Date: Tue, 9 Jan 2024 14:35:02 +0800 Subject: [PATCH] chore: remove topic alias --- src/common/config/src/wal.rs | 4 +--- src/common/config/src/wal/kafka.rs | 7 +------ src/common/meta/src/wal.rs | 1 - src/common/meta/src/wal/kafka.rs | 2 -- src/common/meta/src/wal/kafka/topic.rs | 19 ------------------- .../meta/src/wal/kafka/topic_manager.rs | 19 +++++++++---------- .../meta/src/wal/kafka/topic_selector.rs | 5 ++--- src/log-store/src/error.rs | 5 ++--- src/log-store/src/kafka.rs | 3 +-- src/log-store/src/kafka/client_manager.rs | 10 +++++----- src/log-store/src/kafka/log_store.rs | 3 +-- src/log-store/src/test_util/kafka.rs | 3 +-- 12 files changed, 23 insertions(+), 58 deletions(-) delete mode 100644 src/common/meta/src/wal/kafka/topic.rs diff --git a/src/common/config/src/wal.rs b/src/common/config/src/wal.rs index 0b951a942662..f2d3d2c286a1 100644 --- a/src/common/config/src/wal.rs +++ b/src/common/config/src/wal.rs @@ -18,9 +18,7 @@ pub mod raft_engine; use serde::{Deserialize, Serialize}; use serde_with::with_prefix; -pub use crate::wal::kafka::{ - KafkaConfig, KafkaOptions as KafkaWalOptions, StandaloneKafkaConfig, Topic as KafkaWalTopic, -}; +pub use crate::wal::kafka::{KafkaConfig, KafkaOptions as KafkaWalOptions, StandaloneKafkaConfig}; pub use crate::wal::raft_engine::RaftEngineConfig; /// An encoded wal options will be wrapped into a (WAL_OPTIONS_KEY, encoded wal options) key-value pair diff --git a/src/common/config/src/wal/kafka.rs b/src/common/config/src/wal/kafka.rs index e7179f96f106..998c895db50b 100644 --- a/src/common/config/src/wal/kafka.rs +++ b/src/common/config/src/wal/kafka.rs @@ -19,11 +19,6 @@ use rskafka::client::partition::Compression as RsKafkaCompression; use serde::{Deserialize, Serialize}; use serde_with::with_prefix; -/// Topic name prefix. -pub const TOPIC_NAME_PREFIX: &str = "greptimedb_wal_topic"; -/// Kafka wal topic. -pub type Topic = String; - /// The type of the topic selector, i.e. with which strategy to select a topic. #[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] @@ -138,5 +133,5 @@ impl Default for StandaloneKafkaConfig { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct KafkaOptions { /// Kafka wal topic. - pub topic: Topic, + pub topic: String, } diff --git a/src/common/meta/src/wal.rs b/src/common/meta/src/wal.rs index 2c018145ab42..053b82350ed2 100644 --- a/src/common/meta/src/wal.rs +++ b/src/common/meta/src/wal.rs @@ -23,7 +23,6 @@ use serde::{Deserialize, Serialize}; use store_api::storage::{RegionId, RegionNumber}; use crate::wal::kafka::KafkaConfig; -pub use crate::wal::kafka::Topic as KafkaWalTopic; pub use crate::wal::options_allocator::{ allocate_region_wal_options, WalOptionsAllocator, WalOptionsAllocatorRef, }; diff --git a/src/common/meta/src/wal/kafka.rs b/src/common/meta/src/wal/kafka.rs index 18cde0fdaa52..921f2942f82f 100644 --- a/src/common/meta/src/wal/kafka.rs +++ b/src/common/meta/src/wal/kafka.rs @@ -14,7 +14,6 @@ #[cfg(any(test, feature = "testing"))] pub mod test_util; -pub mod topic; pub mod topic_manager; pub mod topic_selector; @@ -23,7 +22,6 @@ use std::time::Duration; use common_config::wal::kafka::{kafka_backoff, KafkaBackoffConfig, TopicSelectorType}; use serde::{Deserialize, Serialize}; -pub use crate::wal::kafka::topic::Topic; pub use crate::wal::kafka::topic_manager::TopicManager; /// Configurations for kafka wal. diff --git a/src/common/meta/src/wal/kafka/topic.rs b/src/common/meta/src/wal/kafka/topic.rs deleted file mode 100644 index 34e15ad22450..000000000000 --- a/src/common/meta/src/wal/kafka/topic.rs +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/// Kafka wal topic. -/// Publishers publish log entries to the topic while subscribers pull log entries from the topic. -/// A topic is simply a string right now. But it may be more complex in the future. -// TODO(niebayes): remove the Topic alias. -pub type Topic = String; diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs index f57d2044862b..eba47f119da8 100644 --- a/src/common/meta/src/wal/kafka/topic_manager.rs +++ b/src/common/meta/src/wal/kafka/topic_manager.rs @@ -33,7 +33,6 @@ use crate::error::{ }; use crate::kv_backend::KvBackendRef; use crate::rpc::store::PutRequest; -use crate::wal::kafka::topic::Topic; use crate::wal::kafka::topic_selector::{RoundRobinTopicSelector, TopicSelectorRef}; use crate::wal::kafka::KafkaConfig; @@ -46,7 +45,7 @@ const DEFAULT_PARTITION: i32 = 0; /// Manages topic initialization and selection. pub struct TopicManager { config: KafkaConfig, - pub(crate) topic_pool: Vec, + pub(crate) topic_pool: Vec, pub(crate) topic_selector: TopicSelectorRef, kv_backend: KvBackendRef, } @@ -86,7 +85,7 @@ impl TopicManager { let created_topics = Self::restore_created_topics(&self.kv_backend) .await? .into_iter() - .collect::>(); + .collect::>(); // Creates missing topics. let to_be_created = topics @@ -108,7 +107,7 @@ impl TopicManager { } /// Tries to create topics specified by indexes in `to_be_created`. - async fn try_create_topics(&self, topics: &[Topic], to_be_created: &[usize]) -> Result<()> { + async fn try_create_topics(&self, topics: &[String], to_be_created: &[usize]) -> Result<()> { // Builds an kafka controller client for creating topics. let backoff_config = BackoffConfig { init_backoff: self.config.backoff.init, @@ -141,18 +140,18 @@ impl TopicManager { } /// Selects one topic from the topic pool through the topic selector. - pub fn select(&self) -> Result<&Topic> { + pub fn select(&self) -> Result<&String> { self.topic_selector.select(&self.topic_pool) } /// Selects a batch of topics from the topic pool through the topic selector. - pub fn select_batch(&self, num_topics: usize) -> Result> { + pub fn select_batch(&self, num_topics: usize) -> Result> { (0..num_topics) .map(|_| self.topic_selector.select(&self.topic_pool)) .collect() } - async fn try_append_noop_record(&self, topic: &Topic, client: &Client) -> Result<()> { + async fn try_append_noop_record(&self, topic: &String, client: &Client) -> Result<()> { let partition_client = client .partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry) .await @@ -177,7 +176,7 @@ impl TopicManager { Ok(()) } - async fn try_create_topic(&self, topic: &Topic, client: &ControllerClient) -> Result<()> { + async fn try_create_topic(&self, topic: &String, client: &ControllerClient) -> Result<()> { match client .create_topic( topic.clone(), @@ -203,7 +202,7 @@ impl TopicManager { } } - async fn restore_created_topics(kv_backend: &KvBackendRef) -> Result> { + async fn restore_created_topics(kv_backend: &KvBackendRef) -> Result> { kv_backend .get(CREATED_TOPICS_KEY.as_bytes()) .await? @@ -213,7 +212,7 @@ impl TopicManager { ) } - async fn persist_created_topics(topics: &[Topic], kv_backend: &KvBackendRef) -> Result<()> { + async fn persist_created_topics(topics: &[String], kv_backend: &KvBackendRef) -> Result<()> { let raw_topics = serde_json::to_vec(topics).context(EncodeJsonSnafu)?; kv_backend .put(PutRequest { diff --git a/src/common/meta/src/wal/kafka/topic_selector.rs b/src/common/meta/src/wal/kafka/topic_selector.rs index 432900ebacc3..609d54820692 100644 --- a/src/common/meta/src/wal/kafka/topic_selector.rs +++ b/src/common/meta/src/wal/kafka/topic_selector.rs @@ -19,12 +19,11 @@ use rand::Rng; use snafu::ensure; use crate::error::{EmptyTopicPoolSnafu, Result}; -use crate::wal::kafka::topic::Topic; /// Controls topic selection. pub(crate) trait TopicSelector: Send + Sync { /// Selects a topic from the topic pool. - fn select<'a>(&self, topic_pool: &'a [Topic]) -> Result<&'a Topic>; + fn select<'a>(&self, topic_pool: &'a [String]) -> Result<&'a String>; } /// Arc wrapper of TopicSelector. @@ -48,7 +47,7 @@ impl RoundRobinTopicSelector { } impl TopicSelector for RoundRobinTopicSelector { - fn select<'a>(&self, topic_pool: &'a [Topic]) -> Result<&'a Topic> { + fn select<'a>(&self, topic_pool: &'a [String]) -> Result<&'a String> { ensure!(!topic_pool.is_empty(), EmptyTopicPoolSnafu); let which = self.cursor.fetch_add(1, Ordering::Relaxed) % topic_pool.len(); Ok(&topic_pool[which]) diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index b3f8b5d08585..70a2ede1519b 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -14,7 +14,6 @@ use std::any::Any; -use common_config::wal::KafkaWalTopic; use common_error::ext::ErrorExt; use common_macro::stack_trace_debug; use common_runtime::error::Error as RuntimeError; @@ -119,7 +118,7 @@ pub enum Error { error ))] GetClient { - topic: KafkaWalTopic, + topic: String, location: Location, error: String, }, @@ -140,7 +139,7 @@ pub enum Error { limit, ))] ProduceRecord { - topic: KafkaWalTopic, + topic: String, size: usize, limit: usize, location: Location, diff --git a/src/log-store/src/kafka.rs b/src/log-store/src/kafka.rs index 27a422d2f647..ef84b9a68acf 100644 --- a/src/log-store/src/kafka.rs +++ b/src/log-store/src/kafka.rs @@ -18,7 +18,6 @@ pub(crate) mod util; use std::fmt::Display; -use common_meta::wal::KafkaWalTopic as Topic; use serde::{Deserialize, Serialize}; use store_api::logstore::entry::{Entry, Id as EntryId}; use store_api::logstore::namespace::Namespace; @@ -29,7 +28,7 @@ use crate::error::Error; #[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)] pub struct NamespaceImpl { pub region_id: u64, - pub topic: Topic, + pub topic: String, } impl Namespace for NamespaceImpl { diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index d5402a42b1d1..1d89fe3f43cf 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::sync::Arc; -use common_config::wal::{KafkaConfig, KafkaWalTopic as Topic}; +use common_config::wal::KafkaConfig; use rskafka::client::partition::{PartitionClient, UnknownTopicHandling}; use rskafka::client::producer::aggregator::RecordAggregator; use rskafka::client::producer::{BatchProducer, BatchProducerBuilder}; @@ -67,7 +67,7 @@ pub(crate) struct ClientManager { client_factory: RsKafkaClient, /// A pool maintaining a collection of clients. /// Key: a topic. Value: the associated client of the topic. - client_pool: RwLock>, + client_pool: RwLock>, } impl ClientManager { @@ -97,7 +97,7 @@ impl ClientManager { /// Gets the client associated with the topic. If the client does not exist, a new one will /// be created and returned. - pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result { + pub(crate) async fn get_or_insert(&self, topic: &String) -> Result { { let client_pool = self.client_pool.read().await; if let Some(client) = client_pool.get(topic) { @@ -116,7 +116,7 @@ impl ClientManager { } } - async fn try_create_client(&self, topic: &Topic) -> Result { + async fn try_create_client(&self, topic: &String) -> Result { // Sets to Retry to retry connecting if the kafka cluter replies with an UnknownTopic error. // That's because the topic is believed to exist as the metasrv is expected to create required topics upon start. // The reconnecting won't stop until succeed or a different error returns. @@ -147,7 +147,7 @@ mod tests { test_name: &str, num_topics: usize, broker_endpoints: Vec, - ) -> (ClientManager, Vec) { + ) -> (ClientManager, Vec) { let topics = create_topics( num_topics, |i| format!("{test_name}_{}_{}", i, uuid::Uuid::new_v4()), diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index 2e05683fe246..f4bf35d6915b 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -283,7 +283,6 @@ fn check_termination( #[cfg(test)] mod tests { use common_base::readable_size::ReadableSize; - use common_config::wal::KafkaWalTopic as Topic; use rand::seq::IteratorRandom; use super::*; @@ -304,7 +303,7 @@ mod tests { test_name: &str, num_topics: usize, broker_endpoints: Vec, - ) -> (KafkaLogStore, Vec) { + ) -> (KafkaLogStore, Vec) { let topics = create_topics( num_topics, |i| format!("{test_name}_{}_{}", i, uuid::Uuid::new_v4()), diff --git a/src/log-store/src/test_util/kafka.rs b/src/log-store/src/test_util/kafka.rs index 7107c6e5c3f1..d56d9b9405b2 100644 --- a/src/log-store/src/test_util/kafka.rs +++ b/src/log-store/src/test_util/kafka.rs @@ -15,7 +15,6 @@ use std::sync::atomic::{AtomicU64 as AtomicEntryId, Ordering}; use std::sync::Mutex; -use common_meta::wal::KafkaWalTopic as Topic; use rand::distributions::Alphanumeric; use rand::rngs::ThreadRng; use rand::{thread_rng, Rng}; @@ -29,7 +28,7 @@ pub async fn create_topics( num_topics: usize, decorator: F, broker_endpoints: &[String], -) -> Vec +) -> Vec where F: Fn(usize) -> String, {