Skip to content

Commit

Permalink
CRD docs
Browse files Browse the repository at this point in the history
  • Loading branch information
Razz4780 committed Oct 4, 2024
1 parent 630f2cc commit cf976f8
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 17 deletions.
4 changes: 2 additions & 2 deletions mirrord/config/src/feature/split_queues.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ impl SplitQueuesConfig {
}

/// Out of the whole queue splitting config, get only the sqs queues.
pub fn sqs(&self) -> impl '_ + Iterator<Item = (&'_ str, &'_ BTreeMap<String, String>)> {
pub fn sqs(&self) -> impl '_ + Iterator<Item = (&'_ str, &'_ QueueMessageFilter)> {
self.0.iter().filter_map(|(name, filter)| match filter {
QueueFilter::Sqs { message_filter } => Some((name.as_str(), message_filter)),
_ => None,
})
}

/// Out of the whole queue splitting config, get only the kafka topics.
pub fn kafka(&self) -> impl '_ + Iterator<Item = (&'_ str, &'_ BTreeMap<String, String>)> {
pub fn kafka(&self) -> impl '_ + Iterator<Item = (&'_ str, &'_ QueueMessageFilter)> {
self.0.iter().filter_map(|(name, filter)| match filter {
QueueFilter::Kafka { message_filter } => Some((name.as_str(), message_filter)),
_ => None,
Expand Down
108 changes: 97 additions & 11 deletions mirrord/operator/src/crd/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

/// Properties to use when creating operator's Kafka client.
/// Configuration to use when creating operator's Kafka client.
/// Resources of this kind should live in the operator's namespace.
#[derive(CustomResource, Clone, Debug, Deserialize, Serialize, JsonSchema)]
#[kube(
Expand All @@ -16,7 +16,12 @@ use serde::{Deserialize, Serialize};
pub struct MirrordKafkaClientConfigSpec {
/// Name of parent resource to use as base when resolving final configuration.
pub parent: Option<String>,

/// Properties to set.
///
/// When performing Kafka splitting, the operator will override `group.id` property.
///
/// The list of all available properties can be found [here](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).
pub properties: Vec<MirrordKafkaClientProperty>,
}

Expand All @@ -26,11 +31,64 @@ pub struct MirrordKafkaClientConfigSpec {
pub struct MirrordKafkaClientProperty {
/// Name of the property, e.g `bootstrap.servers`.
pub name: String,

/// Value for the property, e.g `kafka.default.svc.cluster.local:9092`.
/// `null` clears the property from parent resource when resolving the final configuration.
pub value: Option<String>,
}

/// Defines splittable Kafka topics consumed by some workload living in the same namespace.
///
/// # Concurrent splitting
///
/// Concurrent Kafka splitting sessions are allowed, as long as they use the same topic id or their
/// topics' `nameSources` have no overlap.
///
/// # Example
///
/// ```yaml
/// apiVersion: queues.mirrord.metalbear.co/v1alpha
/// kind: MirrordKafkaTopicsConsumer
/// metadata:
/// name: example
/// namespace: default
/// spec:
/// consumerName: example-deployment
/// consumerApiVersion: apps/v1
/// consumerKind: Deployment
/// topics:
/// - id: example-topic
/// nameSources:
/// - directEnvVar:
/// container: example-container
/// name: KAFKA_TOPIC_NAME
/// groupIdSources:
/// - directEnvVar:
/// container: example-container
/// name: KAFKA_GROUP_ID
/// clientConfig: example-config
/// ```
///
/// 1. Creating the resource below will enable Kafka splitting on a deployment `example-deployment`
/// living in namespace `default`. Id `example-topic` can be then used in the mirrord config to
/// split the topic for the duration of the mirrord session.
///
/// 2. Topic name will be resolved based on `example-deployment`'s pod template by extracting value
/// of variable `KAFKA_TOPIC_NAME` defined directly in `example-container`.
///
/// 3. Consumer group id used by the mirrord operator will be resolved based on
/// `example-deployment`'s pod template by extracting value of variable `KAFKA_GROUP_ID` defined
/// directly in `example-container`.
///
/// 4. For the duration of the session, `example-deployment` will be patched - the mirrord operator
/// will substitute topic name in `KAFKA_TOPIC_NAME` variable with a name of an ephemeral Kafka
/// topic.
///
/// 5. Local application will see a different value of the `KAFKA_TOPIC_NAME` - it will be a name of
/// another ephemeral Kafka topic.
///
/// 6. `MirrordKafkaClientConfig` named `example-config` living in mirrord operator's namespace will
/// be used to manage ephemeral Kafka topics and consume/produce messages.
#[derive(CustomResource, Clone, Debug, Deserialize, Serialize, JsonSchema)]
#[kube(
group = "queues.mirrord.metalbear.co",
Expand All @@ -40,51 +98,79 @@ pub struct MirrordKafkaClientProperty {
)]
#[serde(rename_all = "camelCase")]
pub struct MirrordKafkaTopicsConsumerSpec {
/// Workload name, for example `my-deployment`.
pub consumer_name: String,

/// Workload kind, for example `Deployment`.
pub consumer_kind: String,

/// Workload api version, for example `apps/v1`.
pub consumer_api_version: String,

/// Timeout for waiting until workload patch takes effect, that is at least one pod with
/// patched spec is running.
///
/// Specified in seconds. Defaults to 60s.
#[serde(skip_serializing_if = "Option::is_none")]
pub consumer_restart_timeout: Option<u32>,

/// List of consumed splittable topics.
pub topics: Vec<KafkaTopicDetails>,
}

/// Splittable Kafka topic consumed by some remote target.
#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct KafkaTopicDetails {
/// Id of this topic.
/// Id of this topic. Can be used in mirrord config to identify this topic.
pub id: String,
/// All occurrences of this topic's name in the pod spec.

/// All occurrences of this topic's name in the workload's pod template.
pub name_sources: Vec<TopicPropertySource>,
/// All occurrences of this topic's group id in the pod spec.

/// All occurrences of this topic's group id in the workload's pod template.
pub group_id_sources: Vec<TopicPropertySource>,

/// Links to [`MirrordKafkaClientConfig`] in the operator's namespace.
/// This config will be used to manage ephemeral Kafka topics and consume/produce messages.
pub client_config: String,
}

/// Source of some topic property required for Kafka splitting.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub enum TopicPropertySource {
DirectEnvVar(DirectEnvVar),
/// Environment variable with value defined directly in the pod template.
DirectEnvVar(EnvVarLocation),
}

/// Location of an environment variable defined in the pod template.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct DirectEnvVar {
pub struct EnvVarLocation {
/// Name of the container.
pub container: String,
pub name: String,

/// Name of the variable.
pub variable: String,
}

/// Created temporary topic in a Kafka cluster.
/// Resources of this kind should live in the operator's namespace.
/// Ephemeral topic created in your Kafka cluster for the purpose of running a Kafka splitting
/// session.
///
/// Resources of this kind should live in the operator's namespace. They will be used to clean up
/// topics that are no longer used.
#[derive(CustomResource, Clone, Debug, Deserialize, Serialize, JsonSchema)]
#[kube(
group = "queues.mirrord.metalbear.co",
version = "v1alpha",
kind = "MirrordKafkaTemporaryTopic",
kind = "MirrordKafkaEphemeralTopic",
namespaced,
printcolumn = r#"{"name":"NAME", "type":"string", "description":"Name of the topic.", "jsonPath":".spec.name"}"#,
printcolumn = r#"{"name":"CLIENT-CONFIG", "type":"string", "description":"Name of MirrordKafkaClientProperties to use when creating Kafka client.", "jsonPath":".spec.clientConfig"}"#
)]
#[serde(rename_all = "camelCase")]
pub struct MirrordKafkaTemporaryTopicSpec {
pub struct MirrordKafkaEphemeralTopicSpec {
/// Name of the topic.
pub name: String,
/// Links to [`MirrordKafkaClientConfigSpec`] resource living in the same namespace.
Expand Down
8 changes: 4 additions & 4 deletions mirrord/operator/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use kube::{CustomResourceExt, Resource};
use thiserror::Error;

use crate::crd::{
kafka::{MirrordKafkaClientConfig, MirrordKafkaTemporaryTopic, MirrordKafkaTopicsConsumer},
kafka::{MirrordKafkaClientConfig, MirrordKafkaEphemeralTopic, MirrordKafkaTopicsConsumer},
MirrordPolicy, MirrordSqsSession, MirrordWorkloadQueueRegistry, TargetCrd,
};

Expand Down Expand Up @@ -225,7 +225,7 @@ impl OperatorSetup for Operator {
MirrordKafkaClientConfig::crd().to_writer(&mut writer)?;

writer.write_all(b"---\n")?;
MirrordKafkaTemporaryTopic::crd().to_writer(&mut writer)?;
MirrordKafkaEphemeralTopic::crd().to_writer(&mut writer)?;

writer.write_all(b"---\n")?;
MirrordKafkaTopicsConsumer::crd().to_writer(&mut writer)?;
Expand Down Expand Up @@ -552,8 +552,8 @@ impl OperatorRole {
if kafka_splitting {
rules.extend([
PolicyRule {
api_groups: Some(vec![MirrordKafkaTemporaryTopic::group(&()).into_owned()]),
resources: Some(vec![MirrordKafkaTemporaryTopic::plural(&()).into_owned()]),
api_groups: Some(vec![MirrordKafkaEphemeralTopic::group(&()).into_owned()]),
resources: Some(vec![MirrordKafkaEphemeralTopic::plural(&()).into_owned()]),
verbs: ["get", "list", "watch", "create", "delete"]
.into_iter()
.map(String::from)
Expand Down

0 comments on commit cf976f8

Please sign in to comment.