diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 16d290d0e8..b9135c3726 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -397,7 +397,7 @@ func (a *FlowableActivity) SyncFlow( var res *model.SyncResponse errGroup.Go(func() error { syncBatchID, err := dstConn.GetLastSyncBatchID(errCtx, flowName) - if err != nil && config.Destination.Type != protos.DBType_EVENTHUB { + if err != nil && config.Destination.Type != protos.DBType_EVENTHUBS { return err } syncBatchID += 1 diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 4ed9315872..15c0e22ccf 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -50,6 +50,7 @@ func NewEventHubConnector( config: config, creds: defaultAzureCreds, hubManager: hubManager, + logger: logger, }, nil } @@ -135,7 +136,7 @@ func (c *EventHubConnector) processBatch( return 0, err } - ehConfig, ok := c.hubManager.peerConfig.Get(destination.PeerName) + ehConfig, ok := c.hubManager.namespaceToEventhubMap.Get(destination.NamespaceName) if !ok { c.logger.Error("failed to get eventhub config", slog.Any("error", err)) return 0, err diff --git a/flow/connectors/eventhub/hubmanager.go b/flow/connectors/eventhub/hubmanager.go index 7b791e52ca..b374d12543 100644 --- a/flow/connectors/eventhub/hubmanager.go +++ b/flow/connectors/eventhub/hubmanager.go @@ -22,8 +22,8 @@ import ( type EventHubManager struct { creds *azidentity.DefaultAzureCredential - // eventhub peer name -> config - peerConfig cmap.ConcurrentMap[string, *protos.EventHubConfig] + // eventhub namespace name -> config + namespaceToEventhubMap cmap.ConcurrentMap[string, *protos.EventHubConfig] // eventhub name -> client hubs sync.Map } @@ -32,22 +32,22 @@ func NewEventHubManager( creds *azidentity.DefaultAzureCredential, groupConfig *protos.EventHubGroupConfig, ) *EventHubManager { - peerConfig := cmap.New[*protos.EventHubConfig]() + namespaceToEventhubMap := cmap.New[*protos.EventHubConfig]() for name, config := range groupConfig.Eventhubs { - peerConfig.Set(name, config) + namespaceToEventhubMap.Set(name, config) } return &EventHubManager{ - creds: creds, - peerConfig: peerConfig, + creds: creds, + namespaceToEventhubMap: namespaceToEventhubMap, } } func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedEventhub) ( *azeventhubs.ProducerClient, error, ) { - ehConfig, ok := m.peerConfig.Get(name.PeerName) + ehConfig, ok := m.namespaceToEventhubMap.Get(name.NamespaceName) if !ok { return nil, fmt.Errorf("eventhub '%s' not configured", name.Eventhub) } @@ -152,9 +152,9 @@ func (m *EventHubManager) CreateEventDataBatch(ctx context.Context, destination // EnsureEventHubExists ensures that the eventhub exists. func (m *EventHubManager) EnsureEventHubExists(ctx context.Context, name ScopedEventhub) error { - cfg, ok := m.peerConfig.Get(name.PeerName) + cfg, ok := m.namespaceToEventhubMap.Get(name.NamespaceName) if !ok { - return fmt.Errorf("eventhub peer '%s' not configured", name.PeerName) + return fmt.Errorf("eventhub namespace '%s' not registered", name.NamespaceName) } hubClient, err := m.getEventHubMgmtClient(cfg.SubscriptionId) diff --git a/flow/connectors/eventhub/scoped_eventhub.go b/flow/connectors/eventhub/scoped_eventhub.go index f3b5d46d4c..ecdfcf29ed 100644 --- a/flow/connectors/eventhub/scoped_eventhub.go +++ b/flow/connectors/eventhub/scoped_eventhub.go @@ -5,18 +5,18 @@ import ( "strings" ) -// Scoped eventhub is of the form peer_name.eventhub_name.partition_column.partition_key_value +// Scoped eventhub is of the form namespace.eventhub_name.partition_column.partition_key_value // partition_column is the column in the table that is used to determine // the partition key for the eventhub. Partition value is one such value of that column. type ScopedEventhub struct { - PeerName string + NamespaceName string Eventhub string PartitionKeyColumn string PartitionKeyValue string } func NewScopedEventhub(dstTableName string) (ScopedEventhub, error) { - // split by dot, the model is peername.eventhub.partition_key_column + // split by dot, the model is namespace.eventhub.partition_key_column parts := strings.Split(dstTableName, ".") if len(parts) != 3 { @@ -27,7 +27,7 @@ func NewScopedEventhub(dstTableName string) (ScopedEventhub, error) { eventhubPart := strings.Trim(parts[1], `"`) partitionPart := strings.Trim(parts[2], `"`) return ScopedEventhub{ - PeerName: parts[0], + NamespaceName: parts[0], Eventhub: eventhubPart, PartitionKeyColumn: partitionPart, }, nil @@ -38,7 +38,7 @@ func (s *ScopedEventhub) SetPartitionValue(value string) { } func (s ScopedEventhub) Equals(other ScopedEventhub) bool { - return s.PeerName == other.PeerName && + return s.NamespaceName == other.NamespaceName && s.Eventhub == other.Eventhub && s.PartitionKeyColumn == other.PartitionKeyColumn && s.PartitionKeyValue == other.PartitionKeyValue @@ -46,5 +46,5 @@ func (s ScopedEventhub) Equals(other ScopedEventhub) bool { // ToString returns the string representation of the ScopedEventhub func (s ScopedEventhub) ToString() string { - return fmt.Sprintf("%s.%s.%s.%s", s.PeerName, s.Eventhub, s.PartitionKeyColumn, s.PartitionKeyValue) + return fmt.Sprintf("%s.%s.%s.%s", s.NamespaceName, s.Eventhub, s.PartitionKeyColumn, s.PartitionKeyValue) } diff --git a/nexus/Cargo.lock b/nexus/Cargo.lock index 1af28dab14..cab0257482 100644 --- a/nexus/Cargo.lock +++ b/nexus/Cargo.lock @@ -2935,7 +2935,7 @@ dependencies = [ [[package]] name = "sqlparser" version = "0.41.0" -source = "git+https://github.com/peerdb-io/sqlparser-rs.git#9fbfb423db7fc0949dea2b1400cb5ef848426409" +source = "git+https://github.com/peerdb-io/sqlparser-rs.git#178a84c3c48123a2818298d87748033581886eae" dependencies = [ "log", "sqlparser_derive", @@ -2944,7 +2944,7 @@ dependencies = [ [[package]] name = "sqlparser_derive" version = "0.2.1" -source = "git+https://github.com/peerdb-io/sqlparser-rs.git#9fbfb423db7fc0949dea2b1400cb5ef848426409" +source = "git+https://github.com/peerdb-io/sqlparser-rs.git#178a84c3c48123a2818298d87748033581886eae" dependencies = [ "proc-macro2", "quote", diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index 0c16269cd4..1feae83ed4 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -89,15 +89,8 @@ impl<'a> StatementAnalyzer for PeerExistanceAnalyzer<'a> { /// PeerDDLAnalyzer is a statement analyzer that checks if the given /// statement is a PeerDB DDL statement. If it is, it returns the type of /// DDL statement. -pub struct PeerDDLAnalyzer<'a> { - peers: &'a HashMap, -} - -impl<'a> PeerDDLAnalyzer<'a> { - pub fn new(peers: &'a HashMap) -> Self { - Self { peers } - } -} +#[derive(Default)] +pub struct PeerDDLAnalyzer; #[derive(Debug, Clone)] pub enum PeerDDL { @@ -139,7 +132,7 @@ pub enum PeerDDL { }, } -impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> { +impl StatementAnalyzer for PeerDDLAnalyzer { type Output = Option; fn analyze(&self, statement: &Statement) -> anyhow::Result { @@ -151,7 +144,7 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> { with_options, } => { let db_type = DbType::from(peer_type.clone()); - let config = parse_db_options(self.peers, db_type, with_options)?; + let config = parse_db_options(db_type, with_options)?; let peer = Peer { name: peer_name.to_string().to_lowercase(), r#type: db_type as i32, @@ -495,11 +488,7 @@ impl StatementAnalyzer for PeerCursorAnalyzer { } } -fn parse_db_options( - peers: &HashMap, - db_type: DbType, - with_options: &[SqlOption], -) -> anyhow::Result> { +fn parse_db_options(db_type: DbType, with_options: &[SqlOption]) -> anyhow::Result> { let mut opts: HashMap<&str, &str> = HashMap::with_capacity(with_options.len()); for opt in with_options { let val = match opt.value { @@ -664,45 +653,6 @@ fn parse_db_options( }; Config::PostgresConfig(postgres_config) } - DbType::Eventhub => { - let subscription_id = opts - .get("subscription_id") - .map(|s| s.to_string()) - .unwrap_or_default(); - - // partition_count default to 3 if not set, parse as int - let partition_count = opts - .get("partition_count") - .unwrap_or(&"3") - .parse::() - .context("unable to parse partition_count as valid int")?; - - // message_retention_in_days default to 7 if not set, parse as int - let message_retention_in_days = opts - .get("message_retention_in_days") - .unwrap_or(&"7") - .parse::() - .context("unable to parse message_retention_in_days as valid int")?; - - let eventhub_config = EventHubConfig { - namespace: opts - .get("namespace") - .context("no namespace specified")? - .to_string(), - resource_group: opts - .get("resource_group") - .context("no resource group specified")? - .to_string(), - location: opts - .get("location") - .context("location not specified")? - .to_string(), - subscription_id, - partition_count, - message_retention_in_days, - }; - Config::EventhubConfig(eventhub_config) - } DbType::S3 => { let s3_config = S3Config { url: opts @@ -738,44 +688,6 @@ fn parse_db_options( }; Config::SqlserverConfig(sqlserver_config) } - DbType::EventhubGroup => { - // split comma separated list of columns and trim - let unnest_columns = opts - .get("unnest_columns") - .map(|columns| { - columns - .split(',') - .map(|column| column.trim().to_string()) - .collect::>() - }) - .unwrap_or_default(); - - let mut eventhubs: HashMap = HashMap::new(); - for (key, _) in opts { - if matches!(key, "metadata_db" | "unnest_columns") { - continue; - } - - // check if peers contains key and if it does - // then add it to the eventhubs hashmap, if not error - if let Some(peer) = peers.get(key) { - let eventhub_config = peer.config.as_ref().unwrap(); - if let Config::EventhubConfig(eventhub_config) = eventhub_config { - eventhubs.insert(key.to_string(), eventhub_config.clone()); - } else { - anyhow::bail!("Peer '{}' is not an eventhub", key); - } - } else { - anyhow::bail!("Peer '{}' does not exist", key); - } - } - - let eventhub_group_config = pt::peerdb_peers::EventHubGroupConfig { - eventhubs, - unnest_columns, - }; - Config::EventhubGroupConfig(eventhub_group_config) - } DbType::Clickhouse => { let clickhouse_config = ClickhouseConfig { host: opts.get("host").context("no host specified")?.to_string(), @@ -907,5 +819,37 @@ fn parse_db_options( }; Config::PubsubConfig(ps_config) } + DbType::Eventhubs => { + let unnest_columns = opts + .get("unnest_columns") + .map(|columns| { + columns + .split(',') + .map(|column| column.trim().to_string()) + .collect::>() + }) + .unwrap_or_default(); + + let eventhubs: Vec = serde_json::from_str( + opts.get("eventhubs") + .context("no eventhubs specified")? + .to_string() + .as_str(), + ) + .context("unable to parse eventhubs as valid json")?; + + let mut eventhubs_map: HashMap = HashMap::new(); + for eventhub in eventhubs { + eventhubs_map.insert(eventhub.namespace.clone(), eventhub); + } + + let eventhub_group_config = pt::peerdb_peers::EventHubGroupConfig { + eventhubs: eventhubs_map, + unnest_columns, + }; + + println!("eventhub_group_config: {:?}", eventhub_group_config); + Config::EventhubGroupConfig(eventhub_group_config) + } })) } diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs index 80e67aa507..3e01159f9d 100644 --- a/nexus/catalog/src/lib.rs +++ b/nexus/catalog/src/lib.rs @@ -278,11 +278,6 @@ impl Catalog { pt::peerdb_peers::MongoConfig::decode(options).with_context(err)?; Config::MongoConfig(mongo_config) } - DbType::Eventhub => { - let eventhub_config = - pt::peerdb_peers::EventHubConfig::decode(options).with_context(err)?; - Config::EventhubConfig(eventhub_config) - } DbType::Postgres => { let postgres_config = pt::peerdb_peers::PostgresConfig::decode(options).with_context(err)?; @@ -298,7 +293,7 @@ impl Catalog { pt::peerdb_peers::SqlServerConfig::decode(options).with_context(err)?; Config::SqlserverConfig(sqlserver_config) } - DbType::EventhubGroup => { + DbType::Eventhubs => { let eventhub_group_config = pt::peerdb_peers::EventHubGroupConfig::decode(options).with_context(err)?; Config::EventhubGroupConfig(eventhub_group_config) diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index 9c25337974..3425a1b29f 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -2,8 +2,7 @@ use catalog::WorkflowDetails; use pt::{ flow_model::{FlowJob, QRepFlowJob}, peerdb_flow::{QRepWriteMode, QRepWriteType}, - peerdb_route, - tonic, + peerdb_route, tonic, }; use serde_json::Value; use tonic_health::pb::health_client; diff --git a/nexus/parser/src/lib.rs b/nexus/parser/src/lib.rs index 4a305c7899..deb16af505 100644 --- a/nexus/parser/src/lib.rs +++ b/nexus/parser/src/lib.rs @@ -42,7 +42,7 @@ impl NexusStatement { stmt: &Statement, ) -> PgWireResult { let ddl = { - let pdl: PeerDDLAnalyzer = PeerDDLAnalyzer::new(&peers); + let pdl: PeerDDLAnalyzer = PeerDDLAnalyzer; pdl.analyze(stmt).map_err(|e| { PgWireError::UserError(Box::new(ErrorInfo::new( "ERROR".to_owned(), diff --git a/nexus/pt/src/lib.rs b/nexus/pt/src/lib.rs index 48a558ec0c..d04c615aee 100644 --- a/nexus/pt/src/lib.rs +++ b/nexus/pt/src/lib.rs @@ -24,11 +24,10 @@ impl From for DbType { PeerType::Mongo => DbType::Mongo, PeerType::Snowflake => DbType::Snowflake, PeerType::Postgres => DbType::Postgres, - PeerType::EventHub => DbType::Eventhub, PeerType::S3 => DbType::S3, PeerType::SQLServer => DbType::Sqlserver, - PeerType::EventHubGroup => DbType::EventhubGroup, PeerType::Kafka => DbType::Kafka, + PeerType::Eventhubs => DbType::Eventhubs, } } } diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index e237e48bf0..d5cb4c0dcc 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -147,8 +147,7 @@ impl NexusBackend { fn is_peer_validity_supported(peer_type: i32) -> bool { let unsupported_peer_types = [ - 4, // EVENTHUB - 7, // EVENTHUB_GROUP + 11, // EVENTHUBS ]; !unsupported_peer_types.contains(&peer_type) } diff --git a/protos/peers.proto b/protos/peers.proto index 4fa1978568..360e08f311 100644 --- a/protos/peers.proto +++ b/protos/peers.proto @@ -90,7 +90,7 @@ message EventHubConfig { } message EventHubGroupConfig { - // event hub peer name to event hub config + // event hub namespace name to event hub config map eventhubs = 1; repeated string unnest_columns = 3; } @@ -139,13 +139,12 @@ enum DBType { SNOWFLAKE = 1; MONGO = 2; POSTGRES = 3; - EVENTHUB = 4; S3 = 5; SQLSERVER = 6; - EVENTHUB_GROUP = 7; CLICKHOUSE = 8; KAFKA = 9; PUBSUB = 10; + EVENTHUBS = 11; } message Peer { diff --git a/ui/app/api/peers/getTruePeer.ts b/ui/app/api/peers/getTruePeer.ts index 4c29ffd280..8644ce6662 100644 --- a/ui/app/api/peers/getTruePeer.ts +++ b/ui/app/api/peers/getTruePeer.ts @@ -71,6 +71,10 @@ export const getTruePeer = (peer: CatalogPeer) => { config = PubSubConfig.decode(options); newPeer.pubsubConfig = config; break; + case 11: + config = EventHubGroupConfig.decode(options); + newPeer.eventhubGroupConfig = config; + break; default: return newPeer; } diff --git a/ui/app/api/peers/info/[peerName]/route.ts b/ui/app/api/peers/info/[peerName]/route.ts index 1a1cb6cde3..01016bb940 100644 --- a/ui/app/api/peers/info/[peerName]/route.ts +++ b/ui/app/api/peers/info/[peerName]/route.ts @@ -19,7 +19,7 @@ export async function GET( const bqConfig = peerConfig.bigqueryConfig; const s3Config = peerConfig.s3Config; const sfConfig = peerConfig.snowflakeConfig; - const ehConfig = peerConfig.eventhubConfig; + const ehConfig = peerConfig.eventhubGroupConfig; const chConfig = peerConfig.clickhouseConfig; const kaConfig = peerConfig.kafkaConfig; const psConfig = peerConfig.pubsubConfig; @@ -39,8 +39,11 @@ export async function GET( sfConfig.password = '********'; } if (ehConfig) { - ehConfig.subscriptionId = '********'; + for (const key in ehConfig.eventhubs) { + ehConfig.eventhubs[key].subscriptionId = '********'; + } } + if (chConfig) { chConfig.password = '********'; chConfig.secretAccessKey = '********'; diff --git a/ui/app/api/peers/route.ts b/ui/app/api/peers/route.ts index 398d834e74..f31fd40cf4 100644 --- a/ui/app/api/peers/route.ts +++ b/ui/app/api/peers/route.ts @@ -10,6 +10,7 @@ import { BigqueryConfig, ClickhouseConfig, DBType, + EventHubGroupConfig, KafkaConfig, Peer, PostgresConfig, @@ -77,6 +78,12 @@ const constructPeer = ( type: DBType.PUBSUB, pubsubConfig: config as PubSubConfig, }; + case 'EVENTHUBS': + return { + name, + type: DBType.EVENTHUBS, + eventhubGroupConfig: config as EventHubGroupConfig, + }; default: return; } diff --git a/ui/app/dto/PeersDTO.ts b/ui/app/dto/PeersDTO.ts index 3035ea86cb..8f44e9035c 100644 --- a/ui/app/dto/PeersDTO.ts +++ b/ui/app/dto/PeersDTO.ts @@ -1,6 +1,7 @@ import { BigqueryConfig, ClickhouseConfig, + EventHubGroupConfig, KafkaConfig, PostgresConfig, PubSubConfig, @@ -47,7 +48,8 @@ export type PeerConfig = | ClickhouseConfig | S3Config | KafkaConfig - | PubSubConfig; + | PubSubConfig + | EventHubGroupConfig; export type CatalogPeer = { id: number; name: string; diff --git a/ui/components/DropDialog.tsx b/ui/components/DropDialog.tsx index 14680900e3..9a1062d40e 100644 --- a/ui/components/DropDialog.tsx +++ b/ui/components/DropDialog.tsx @@ -127,12 +127,16 @@ export const DropDialog = ({ switch (mode) { case 'MIRROR': objectSpecificDeleteText = `mirror ${(dropArgs as dropMirrorArgs).flowJobName}`; + break; case 'PEER': objectSpecificDeleteText = `peer ${(dropArgs as dropPeerArgs).peerName}`; + break; case 'ALERT': objectSpecificDeleteText = 'this alert'; + break; case 'SCRIPT': objectSpecificDeleteText = 'this script'; + break; } return ( deletePart + objectSpecificDeleteText + '? This action cannot be reverted' @@ -143,12 +147,16 @@ export const DropDialog = ({ switch (mode) { case 'MIRROR': handleDropMirror(dropArgs as dropMirrorArgs, setLoading, setMsg); + break; case 'PEER': handleDropPeer(dropArgs as dropPeerArgs); + break; case 'ALERT': handleDeleteAlert(dropArgs as deleteAlertArgs); + break; case 'SCRIPT': handleDeleteScript(dropArgs as deleteScriptArgs); + break; } }; diff --git a/ui/components/PeerComponent.tsx b/ui/components/PeerComponent.tsx index 78c8db0d2d..589476d226 100644 --- a/ui/components/PeerComponent.tsx +++ b/ui/components/PeerComponent.tsx @@ -29,8 +29,7 @@ export const DBTypeToImageMapping = (peerType: DBType | string) => { case 'CLICKHOUSE': case DBType.CLICKHOUSE: return '/svgs/ch.svg'; - case DBType.EVENTHUB_GROUP: - case DBType.EVENTHUB: + case DBType.EVENTHUBS: return '/svgs/ms.svg'; case DBType.KAFKA: case 'KAFKA': diff --git a/ui/components/PeerTypeComponent.tsx b/ui/components/PeerTypeComponent.tsx index e1642cb001..b96abdbc6c 100644 --- a/ui/components/PeerTypeComponent.tsx +++ b/ui/components/PeerTypeComponent.tsx @@ -10,10 +10,8 @@ export const DBTypeToGoodText = (ptype: DBType) => { return 'PostgreSQL'; case DBType.SNOWFLAKE: return 'Snowflake'; - case DBType.EVENTHUB: + case DBType.EVENTHUBS: return 'Event Hubs'; - case DBType.EVENTHUB_GROUP: - return 'Event Hubs (Group)'; case DBType.BIGQUERY: return 'BigQuery'; case DBType.S3: @@ -28,7 +26,7 @@ export const DBTypeToGoodText = (ptype: DBType) => { return 'Kafka'; case DBType.PUBSUB: return 'PubSub'; - case DBType.UNRECOGNIZED: + default: return 'Unrecognised'; } };