Skip to content

Commit

Permalink
Kafka splitting (metalbear-co#2740)
Browse files Browse the repository at this point in the history
* CRDs

* Improved external changes crd

* Improve external changes crd

* Whatever ;_;

* One properties for all clients

* External change CRD

* saving client properties for created kafka topics

* CRD only for tmp topics

* Fixed fields

* target patch crd

* Add owner process to change

* Fix type

* namespaced

* Improve topic details

* ...

* ......

* camelCase

* setup fix

* Command flag doc

* Config fixes

* Config again

* type name fix

* Fixed unknown queue type variant

* test cfg

* Removed todo

* test sqs config deserialization

* crd update

* CRD docs

* Fixes

* Hash + Eq for some structs in crd

* Printcols

* Schema

* Fix medschool and update configuration.md

* Fix medschool even better

* Fixed config doc

* Removed redundant analytics field
  • Loading branch information
Razz4780 authored Oct 11, 2024
1 parent a006e43 commit 6b3b4fe
Show file tree
Hide file tree
Showing 16 changed files with 572 additions and 166 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions changelog.d/2601.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added Kafka splitting feature.
5 changes: 3 additions & 2 deletions medschool/src/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,9 @@ fn dfs_fields<'a, const MAX_RECURSION_LEVEL: usize>(
recursion_level: &mut usize,
) -> Vec<String> {
if *recursion_level >= MAX_RECURSION_LEVEL {
return vec!["Recursion limit reached".to_string()];
panic!("recursion limit {MAX_RECURSION_LEVEL} reached");
}

// increment the recursion level as we're going deeper into the tree
types // get the type of the field from the types set to recurse into it's fields
.get(&field.ty)
Expand Down Expand Up @@ -281,7 +282,7 @@ fn dfs_fields<'a, const MAX_RECURSION_LEVEL: usize>(
#[tracing::instrument(level = "trace", ret)]
pub fn resolve_references(types: HashSet<PartialType>) -> Option<PartialType> {
/// Maximum recursion level for safety.
const MAX_RECURSION_LEVEL: usize = 10;
const MAX_RECURSION_LEVEL: usize = 16;
// Cache to perform memoization between recursive calls so we don't have to resolve the same
// type multiple times. Mapping between `ident` -> `resolved_docs`.
// For example, if we have a types [`A`, `B`, `C`] and A has a field of type `B` and `B` has a
Expand Down
31 changes: 26 additions & 5 deletions mirrord-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1534,6 +1534,7 @@
],
"properties": {
"message_filter": {
"description": "A filter is a mapping between message attribute names and regexes they should match. The local application will only receive messages that match **all** of the given patterns. This means, only messages that have **all** of the attributes in the filter, with values of those attributes matching the respective patterns.",
"type": "object",
"additionalProperties": {
"type": "string"
Expand All @@ -1546,6 +1547,29 @@
]
}
}
},
{
"description": "Kafka.",
"type": "object",
"required": [
"message_filter",
"queue_type"
],
"properties": {
"message_filter": {
"description": "A filter is a mapping between message header names and regexes they should match. The local application will only receive messages that match **all** of the given patterns. This means, only messages that have **all** of the headers in the filter, with values of those headers matching the respective patterns.",
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"queue_type": {
"type": "string",
"enum": [
"Kafka"
]
}
}
}
]
},
Expand All @@ -1570,11 +1594,8 @@
"additionalProperties": false
},
"SplitQueuesConfig": {
"description": "```json { \"feature\": { \"split_queues\": { \"first-queue\": { \"queue_type\": \"SQS\", \"message_filter\": { \"wows\": \"so wows\", \"coolz\": \"^very .*\" } }, \"second-queue\": { \"queue_type\": \"SQS\", \"message_filter\": { \"who\": \"*you$\" } }, } } } ```",
"type": [
"object",
"null"
],
"description": "```json { \"feature\": { \"split_queues\": { \"first-queue\": { \"queue_type\": \"SQS\", \"message_filter\": { \"wows\": \"so wows\", \"coolz\": \"^very\" } }, \"second-queue\": { \"queue_type\": \"SQS\", \"message_filter\": { \"who\": \"you$\" } }, \"third-queue\": { \"queue_type\": \"Kafka\", \"message_filter\": { \"who\": \"you$\" } }, \"fourth-queue\": { \"queue_type\": \"Kafka\", \"message_filter\": { \"wows\": \"so wows\", \"coolz\": \"^very\" } }, } } } ```",
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/QueueFilter"
}
Expand Down
93 changes: 51 additions & 42 deletions mirrord/cli/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,48 +547,7 @@ pub(super) enum OperatorCommand {
///
/// NOTE: You don't need to install the operator to use open source mirrord features.
#[command(override_usage = "mirrord operator setup [OPTIONS] | kubectl apply -f -")]
Setup {
/// ToS can be read here <https://metalbear.co/legal/terms>
#[arg(long)]
accept_tos: bool,

/// A mirrord for Teams license key (online)
#[arg(long, allow_hyphen_values(true))]
license_key: Option<String>,

/// Path to a file containing a mirrord for Teams license certificate
#[arg(long)]
license_path: Option<PathBuf>,

/// Output Kubernetes specs to file instead of stdout
#[arg(short, long)]
file: Option<PathBuf>,

/// Namespace to create the operator in (this doesn't limit the namespaces the operator
/// will be able to access)
#[arg(short, long, default_value = "mirrord")]
namespace: OperatorNamespace,

/// AWS role ARN for the operator's service account.
/// Necessary for enabling SQS queue splitting.
/// For successfully running an SQS queue splitting operator the given IAM role must be
/// able to create, read from, write to, and delete SQS queues.
/// If the queue messages are encrypted using KMS, the operator also needs the
/// `kms:Encrypt`, `kms:Decrypt` and `kms:GenerateDataKey` permissions.
#[arg(long, visible_alias = "arn")]
aws_role_arn: Option<String>,

/// Enable SQS queue splitting.
/// When set, some extra CRDs will be installed on the cluster, and the operator will run
/// an SQS splitting component.
#[arg(
long,
visible_alias = "sqs",
default_value_t = false,
requires = "aws_role_arn"
)]
sqs_splitting: bool,
},
Setup(#[clap(flatten)] OperatorSetupParams),
/// Print operator status
Status {
/// Specify config file to use
Expand All @@ -602,6 +561,56 @@ pub(super) enum OperatorCommand {
Session(SessionCommand),
}

#[derive(Args, Debug)]
pub(super) struct OperatorSetupParams {
/// ToS can be read here <https://metalbear.co/legal/terms>
#[arg(long)]
pub(super) accept_tos: bool,

/// A mirrord for Teams license key (online)
#[arg(long, allow_hyphen_values(true))]
pub(super) license_key: Option<String>,

/// Path to a file containing a mirrord for Teams license certificate
#[arg(long)]
pub(super) license_path: Option<PathBuf>,

/// Output Kubernetes specs to file instead of stdout
#[arg(short, long)]
pub(super) file: Option<PathBuf>,

/// Namespace to create the operator in (this doesn't limit the namespaces the operator
/// will be able to access)
#[arg(short, long, default_value = "mirrord")]
pub(super) namespace: OperatorNamespace,

/// AWS role ARN for the operator's service account.
/// Necessary for enabling SQS queue splitting.
/// For successfully running an SQS queue splitting operator the given IAM role must be
/// able to create, read from, write to, and delete SQS queues.
/// If the queue messages are encrypted using KMS, the operator also needs the
/// `kms:Encrypt`, `kms:Decrypt` and `kms:GenerateDataKey` permissions.
#[arg(long, visible_alias = "arn")]
pub(super) aws_role_arn: Option<String>,

/// Enable SQS queue splitting.
/// When set, some extra CRDs will be installed on the cluster, and the operator will run
/// an SQS splitting component.
#[arg(
long,
visible_alias = "sqs",
default_value_t = false,
requires = "aws_role_arn"
)]
pub(super) sqs_splitting: bool,

/// Enable Kafka queue splitting.
/// When set, some extra CRDs will be installed on the cluster, and the operator will run
/// a Kafka splitting component.
#[arg(long, visible_alias = "kafka", default_value_t = false)]
pub(super) kafka_splitting: bool,
}

/// `mirrord operator session` family of commands.
///
/// Allows the user to forcefully kill operator sessions, use with care!
Expand Down
48 changes: 15 additions & 33 deletions mirrord/cli/src/operator.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
use std::{
fs::File,
path::{Path, PathBuf},
time::Duration,
};
use std::{fs::File, path::Path, time::Duration};

use futures::TryFutureExt;
use kube::{Api, Client};
Expand All @@ -15,7 +11,7 @@ use mirrord_kube::api::kubernetes::create_kube_config;
use mirrord_operator::{
client::OperatorApi,
crd::{MirrordOperatorCrd, MirrordOperatorSpec},
setup::{LicenseType, Operator, OperatorNamespace, OperatorSetup, SetupOptions},
setup::{LicenseType, Operator, OperatorSetup, SetupOptions},
types::LicenseInfoOwned,
};
use mirrord_progress::{Progress, ProgressTracker};
Expand All @@ -29,7 +25,7 @@ use crate::{
config::{OperatorArgs, OperatorCommand},
error::{CliError, OperatorSetupError},
util::remove_proxy_env,
Result,
OperatorSetupParams, Result,
};

mod session;
Expand All @@ -54,13 +50,16 @@ async fn get_last_version() -> Result<String, reqwest::Error> {

/// Setup the operator into a file or to stdout, with explanation.
async fn operator_setup(
accept_tos: bool,
file: Option<PathBuf>,
namespace: OperatorNamespace,
license_key: Option<String>,
license_path: Option<PathBuf>,
aws_role_arn: Option<String>,
sqs_splitting: bool,
OperatorSetupParams {
accept_tos,
license_key,
license_path,
file,
namespace,
aws_role_arn,
sqs_splitting,
kafka_splitting,
}: OperatorSetupParams,
) -> Result<(), OperatorSetupError> {
if !accept_tos {
eprintln!("Please note that mirrord operator installation requires an active subscription for the mirrord Operator provided by MetalBear Tech LTD.\nThe service ToS can be read here - https://metalbear.co/legal/terms\nPass --accept-tos to accept the TOS");
Expand Down Expand Up @@ -105,6 +104,7 @@ async fn operator_setup(
image,
aws_role_arn,
sqs_splitting,
kafka_splitting,
});

match file {
Expand Down Expand Up @@ -297,25 +297,7 @@ Operator License
/// Handle commands related to the operator `mirrord operator ...`
pub(crate) async fn operator_command(args: OperatorArgs) -> Result<()> {
match args.command {
OperatorCommand::Setup {
accept_tos,
file,
namespace,
license_key,
license_path,
aws_role_arn,
sqs_splitting,
} => operator_setup(
accept_tos,
file,
namespace,
license_key,
license_path,
aws_role_arn,
sqs_splitting,
)
.await
.map_err(CliError::from),
OperatorCommand::Setup(params) => operator_setup(params).await.map_err(CliError::from),
OperatorCommand::Status { config_file } => operator_status(config_file.as_deref()).await,
OperatorCommand::Session(session_command) => {
SessionCommandHandler::new(session_command)
Expand Down
3 changes: 2 additions & 1 deletion mirrord/config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ ipnet = "2.8"
bitflags = "2"
k8s-openapi = { workspace = true, features = ["schemars", "earliest"] }
tera = "1"
fancy-regex.workspace = true

[dev-dependencies]
rstest = "0.23"
rstest = "0.23"
17 changes: 15 additions & 2 deletions mirrord/config/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -1236,13 +1236,26 @@ will be used, and your local application will not receive any messages from that
"queue_type": "SQS",
"message_filter": {
"wows": "so wows",
"coolz": "^very .*"
"coolz": "^very"
}
},
"second-queue": {
"queue_type": "SQS",
"message_filter": {
"who": "*you$"
"who": "you$"
}
},
"third-queue": {
"queue_type": "Kafka",
"message_filter": {
"who": "you$"
}
},
"fourth-queue": {
"queue_type": "Kafka",
"message_filter": {
"wows": "so wows",
"coolz": "^very"
}
},
}
Expand Down
5 changes: 5 additions & 0 deletions mirrord/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use std::error::Error;

use thiserror::Error;

use crate::feature::split_queues::QueueSplittingVerificationError;

/// <!--${internal}-->
/// Error that would be returned from [MirrordConfig::generate_config]
#[derive(Error, Debug)]
Expand Down Expand Up @@ -71,6 +73,9 @@ pub enum ConfigError {

#[error("Target type requires the mirrord-operator, but operator usage was explicitly disabled. Consider enabling mirrord-operator in your mirrord config.")]
TargetRequiresOperator,

#[error("Queue splitting config is invalid: {0}")]
QueueSplittingVerificationError(#[from] QueueSplittingVerificationError),
}

impl From<tera::Error> for ConfigError {
Expand Down
2 changes: 1 addition & 1 deletion mirrord/config/src/feature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ pub struct FeatureConfig {
/// If you don't specify any filter for a queue that is however declared in the
/// `MirrordWorkloadQueueRegistry` of the target you're using, a match-nothing filter
/// will be used, and your local application will not receive any messages from that queue.
#[config(nested, unstable)]
#[config(nested, default, unstable)]
pub split_queues: SplitQueuesConfig,
}

Expand Down
Loading

0 comments on commit 6b3b4fe

Please sign in to comment.