Skip to content

Commit

Permalink
Eventhub peer refactor (#1563)
Browse files Browse the repository at this point in the history
## BREAKING CHANGE FOR EVENTHUBS ❗❗
### Eventhub Peer Redesign
This PR performs a redesign of our Eventhubs peer architecture on the
core engine side. Create Peer UI is going to be a follow up PR to this.

`Eventhub` and `Eventhub Group` peers are **removed.** However
`EventhubConfig` and `EventhubGroupConfig` are still in use.
A new peer - `Eventhubs` is the only peer related to this queue.
#### Current structure of Eventhubs peer
The `Eventhubs` peer is an extension of the earlier `EventhubGroup`
peer, with it now having a list of `EventhubConfig`s.

You can create an `Eventhubs` peer via the SQL Layer as follows:
```sql
CREATE PEER eventhubs_peer_x FROM EVENTHUBS WITH (
eventhubs = '[{"subscription_id":"my-sub-id",
"resource_group":"myresource",
"namespace":"mynamespace-1",
"location":"eastus",
"partition_count":5,
"message_retention_in_days":2
},
{"subscription_id":"my-sub-id",
"resource_group":"myresource",
"namespace":"mynamespace-2",
"location":"eastus",
"partition_count":3,
"message_retention_in_days":1
}
]',
unnest_columns = [] -- can omit this field too
);
```

The UI has had changes made to it to reflect the above redesign, with no
displays showing the earlier two peers.

<img width="1185" alt="Screenshot 2024-04-02 at 6 59 14 PM"
src="https://github.com/PeerDB-io/peerdb/assets/65964360/36d9dbfc-b271-4340-87c0-3a31a03abccf">

#### Functional testing of this PR
- Create peer command for `Eventhubs` tested via SQL Layer
- Create mirror with the redesigned peer tested via UI
  • Loading branch information
Amogh-Bharadwaj authored Apr 2, 2024
1 parent 9b926ee commit c1d10e5
Show file tree
Hide file tree
Showing 19 changed files with 94 additions and 137 deletions.
2 changes: 1 addition & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ func (a *FlowableActivity) SyncFlow(
var res *model.SyncResponse
errGroup.Go(func() error {
syncBatchID, err := dstConn.GetLastSyncBatchID(errCtx, flowName)
if err != nil && config.Destination.Type != protos.DBType_EVENTHUB {
if err != nil && config.Destination.Type != protos.DBType_EVENTHUBS {
return err
}
syncBatchID += 1
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func NewEventHubConnector(
config: config,
creds: defaultAzureCreds,
hubManager: hubManager,
logger: logger,
}, nil
}

Expand Down Expand Up @@ -135,7 +136,7 @@ func (c *EventHubConnector) processBatch(
return 0, err
}

ehConfig, ok := c.hubManager.peerConfig.Get(destination.PeerName)
ehConfig, ok := c.hubManager.namespaceToEventhubMap.Get(destination.NamespaceName)
if !ok {
c.logger.Error("failed to get eventhub config", slog.Any("error", err))
return 0, err
Expand Down
18 changes: 9 additions & 9 deletions flow/connectors/eventhub/hubmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (

type EventHubManager struct {
creds *azidentity.DefaultAzureCredential
// eventhub peer name -> config
peerConfig cmap.ConcurrentMap[string, *protos.EventHubConfig]
// eventhub namespace name -> config
namespaceToEventhubMap cmap.ConcurrentMap[string, *protos.EventHubConfig]
// eventhub name -> client
hubs sync.Map
}
Expand All @@ -32,22 +32,22 @@ func NewEventHubManager(
creds *azidentity.DefaultAzureCredential,
groupConfig *protos.EventHubGroupConfig,
) *EventHubManager {
peerConfig := cmap.New[*protos.EventHubConfig]()
namespaceToEventhubMap := cmap.New[*protos.EventHubConfig]()

for name, config := range groupConfig.Eventhubs {
peerConfig.Set(name, config)
namespaceToEventhubMap.Set(name, config)
}

return &EventHubManager{
creds: creds,
peerConfig: peerConfig,
creds: creds,
namespaceToEventhubMap: namespaceToEventhubMap,
}
}

func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedEventhub) (
*azeventhubs.ProducerClient, error,
) {
ehConfig, ok := m.peerConfig.Get(name.PeerName)
ehConfig, ok := m.namespaceToEventhubMap.Get(name.NamespaceName)
if !ok {
return nil, fmt.Errorf("eventhub '%s' not configured", name.Eventhub)
}
Expand Down Expand Up @@ -152,9 +152,9 @@ func (m *EventHubManager) CreateEventDataBatch(ctx context.Context, destination

// EnsureEventHubExists ensures that the eventhub exists.
func (m *EventHubManager) EnsureEventHubExists(ctx context.Context, name ScopedEventhub) error {
cfg, ok := m.peerConfig.Get(name.PeerName)
cfg, ok := m.namespaceToEventhubMap.Get(name.NamespaceName)
if !ok {
return fmt.Errorf("eventhub peer '%s' not configured", name.PeerName)
return fmt.Errorf("eventhub namespace '%s' not registered", name.NamespaceName)
}

hubClient, err := m.getEventHubMgmtClient(cfg.SubscriptionId)
Expand Down
12 changes: 6 additions & 6 deletions flow/connectors/eventhub/scoped_eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@ import (
"strings"
)

// Scoped eventhub is of the form peer_name.eventhub_name.partition_column.partition_key_value
// Scoped eventhub is of the form namespace.eventhub_name.partition_column.partition_key_value
// partition_column is the column in the table that is used to determine
// the partition key for the eventhub. Partition value is one such value of that column.
type ScopedEventhub struct {
PeerName string
NamespaceName string
Eventhub string
PartitionKeyColumn string
PartitionKeyValue string
}

func NewScopedEventhub(dstTableName string) (ScopedEventhub, error) {
// split by dot, the model is peername.eventhub.partition_key_column
// split by dot, the model is namespace.eventhub.partition_key_column
parts := strings.Split(dstTableName, ".")

if len(parts) != 3 {
Expand All @@ -27,7 +27,7 @@ func NewScopedEventhub(dstTableName string) (ScopedEventhub, error) {
eventhubPart := strings.Trim(parts[1], `"`)
partitionPart := strings.Trim(parts[2], `"`)
return ScopedEventhub{
PeerName: parts[0],
NamespaceName: parts[0],
Eventhub: eventhubPart,
PartitionKeyColumn: partitionPart,
}, nil
Expand All @@ -38,13 +38,13 @@ func (s *ScopedEventhub) SetPartitionValue(value string) {
}

func (s ScopedEventhub) Equals(other ScopedEventhub) bool {
return s.PeerName == other.PeerName &&
return s.NamespaceName == other.NamespaceName &&
s.Eventhub == other.Eventhub &&
s.PartitionKeyColumn == other.PartitionKeyColumn &&
s.PartitionKeyValue == other.PartitionKeyValue
}

// ToString returns the string representation of the ScopedEventhub
func (s ScopedEventhub) ToString() string {
return fmt.Sprintf("%s.%s.%s.%s", s.PeerName, s.Eventhub, s.PartitionKeyColumn, s.PartitionKeyValue)
return fmt.Sprintf("%s.%s.%s.%s", s.NamespaceName, s.Eventhub, s.PartitionKeyColumn, s.PartitionKeyValue)
}
4 changes: 2 additions & 2 deletions nexus/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

130 changes: 37 additions & 93 deletions nexus/analyzer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,8 @@ impl<'a> StatementAnalyzer for PeerExistanceAnalyzer<'a> {
/// PeerDDLAnalyzer is a statement analyzer that checks if the given
/// statement is a PeerDB DDL statement. If it is, it returns the type of
/// DDL statement.
pub struct PeerDDLAnalyzer<'a> {
peers: &'a HashMap<String, Peer>,
}

impl<'a> PeerDDLAnalyzer<'a> {
pub fn new(peers: &'a HashMap<String, Peer>) -> Self {
Self { peers }
}
}
#[derive(Default)]
pub struct PeerDDLAnalyzer;

#[derive(Debug, Clone)]
pub enum PeerDDL {
Expand Down Expand Up @@ -139,7 +132,7 @@ pub enum PeerDDL {
},
}

impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> {
impl StatementAnalyzer for PeerDDLAnalyzer {
type Output = Option<PeerDDL>;

fn analyze(&self, statement: &Statement) -> anyhow::Result<Self::Output> {
Expand All @@ -151,7 +144,7 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> {
with_options,
} => {
let db_type = DbType::from(peer_type.clone());
let config = parse_db_options(self.peers, db_type, with_options)?;
let config = parse_db_options(db_type, with_options)?;
let peer = Peer {
name: peer_name.to_string().to_lowercase(),
r#type: db_type as i32,
Expand Down Expand Up @@ -495,11 +488,7 @@ impl StatementAnalyzer for PeerCursorAnalyzer {
}
}

fn parse_db_options(
peers: &HashMap<String, Peer>,
db_type: DbType,
with_options: &[SqlOption],
) -> anyhow::Result<Option<Config>> {
fn parse_db_options(db_type: DbType, with_options: &[SqlOption]) -> anyhow::Result<Option<Config>> {
let mut opts: HashMap<&str, &str> = HashMap::with_capacity(with_options.len());
for opt in with_options {
let val = match opt.value {
Expand Down Expand Up @@ -664,45 +653,6 @@ fn parse_db_options(
};
Config::PostgresConfig(postgres_config)
}
DbType::Eventhub => {
let subscription_id = opts
.get("subscription_id")
.map(|s| s.to_string())
.unwrap_or_default();

// partition_count default to 3 if not set, parse as int
let partition_count = opts
.get("partition_count")
.unwrap_or(&"3")
.parse::<u32>()
.context("unable to parse partition_count as valid int")?;

// message_retention_in_days default to 7 if not set, parse as int
let message_retention_in_days = opts
.get("message_retention_in_days")
.unwrap_or(&"7")
.parse::<u32>()
.context("unable to parse message_retention_in_days as valid int")?;

let eventhub_config = EventHubConfig {
namespace: opts
.get("namespace")
.context("no namespace specified")?
.to_string(),
resource_group: opts
.get("resource_group")
.context("no resource group specified")?
.to_string(),
location: opts
.get("location")
.context("location not specified")?
.to_string(),
subscription_id,
partition_count,
message_retention_in_days,
};
Config::EventhubConfig(eventhub_config)
}
DbType::S3 => {
let s3_config = S3Config {
url: opts
Expand Down Expand Up @@ -738,44 +688,6 @@ fn parse_db_options(
};
Config::SqlserverConfig(sqlserver_config)
}
DbType::EventhubGroup => {
// split comma separated list of columns and trim
let unnest_columns = opts
.get("unnest_columns")
.map(|columns| {
columns
.split(',')
.map(|column| column.trim().to_string())
.collect::<Vec<_>>()
})
.unwrap_or_default();

let mut eventhubs: HashMap<String, EventHubConfig> = HashMap::new();
for (key, _) in opts {
if matches!(key, "metadata_db" | "unnest_columns") {
continue;
}

// check if peers contains key and if it does
// then add it to the eventhubs hashmap, if not error
if let Some(peer) = peers.get(key) {
let eventhub_config = peer.config.as_ref().unwrap();
if let Config::EventhubConfig(eventhub_config) = eventhub_config {
eventhubs.insert(key.to_string(), eventhub_config.clone());
} else {
anyhow::bail!("Peer '{}' is not an eventhub", key);
}
} else {
anyhow::bail!("Peer '{}' does not exist", key);
}
}

let eventhub_group_config = pt::peerdb_peers::EventHubGroupConfig {
eventhubs,
unnest_columns,
};
Config::EventhubGroupConfig(eventhub_group_config)
}
DbType::Clickhouse => {
let clickhouse_config = ClickhouseConfig {
host: opts.get("host").context("no host specified")?.to_string(),
Expand Down Expand Up @@ -907,5 +819,37 @@ fn parse_db_options(
};
Config::PubsubConfig(ps_config)
}
DbType::Eventhubs => {
let unnest_columns = opts
.get("unnest_columns")
.map(|columns| {
columns
.split(',')
.map(|column| column.trim().to_string())
.collect::<Vec<_>>()
})
.unwrap_or_default();

let eventhubs: Vec<EventHubConfig> = serde_json::from_str(
opts.get("eventhubs")
.context("no eventhubs specified")?
.to_string()
.as_str(),
)
.context("unable to parse eventhubs as valid json")?;

let mut eventhubs_map: HashMap<String, EventHubConfig> = HashMap::new();
for eventhub in eventhubs {
eventhubs_map.insert(eventhub.namespace.clone(), eventhub);
}

let eventhub_group_config = pt::peerdb_peers::EventHubGroupConfig {
eventhubs: eventhubs_map,
unnest_columns,
};

println!("eventhub_group_config: {:?}", eventhub_group_config);
Config::EventhubGroupConfig(eventhub_group_config)
}
}))
}
7 changes: 1 addition & 6 deletions nexus/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,11 +278,6 @@ impl Catalog {
pt::peerdb_peers::MongoConfig::decode(options).with_context(err)?;
Config::MongoConfig(mongo_config)
}
DbType::Eventhub => {
let eventhub_config =
pt::peerdb_peers::EventHubConfig::decode(options).with_context(err)?;
Config::EventhubConfig(eventhub_config)
}
DbType::Postgres => {
let postgres_config =
pt::peerdb_peers::PostgresConfig::decode(options).with_context(err)?;
Expand All @@ -298,7 +293,7 @@ impl Catalog {
pt::peerdb_peers::SqlServerConfig::decode(options).with_context(err)?;
Config::SqlserverConfig(sqlserver_config)
}
DbType::EventhubGroup => {
DbType::Eventhubs => {
let eventhub_group_config =
pt::peerdb_peers::EventHubGroupConfig::decode(options).with_context(err)?;
Config::EventhubGroupConfig(eventhub_group_config)
Expand Down
3 changes: 1 addition & 2 deletions nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ use catalog::WorkflowDetails;
use pt::{
flow_model::{FlowJob, QRepFlowJob},
peerdb_flow::{QRepWriteMode, QRepWriteType},
peerdb_route,
tonic,
peerdb_route, tonic,
};
use serde_json::Value;
use tonic_health::pb::health_client;
Expand Down
2 changes: 1 addition & 1 deletion nexus/parser/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl NexusStatement {
stmt: &Statement,
) -> PgWireResult<Self> {
let ddl = {
let pdl: PeerDDLAnalyzer = PeerDDLAnalyzer::new(&peers);
let pdl: PeerDDLAnalyzer = PeerDDLAnalyzer;
pdl.analyze(stmt).map_err(|e| {
PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
Expand Down
3 changes: 1 addition & 2 deletions nexus/pt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ impl From<PeerType> for DbType {
PeerType::Mongo => DbType::Mongo,
PeerType::Snowflake => DbType::Snowflake,
PeerType::Postgres => DbType::Postgres,
PeerType::EventHub => DbType::Eventhub,
PeerType::S3 => DbType::S3,
PeerType::SQLServer => DbType::Sqlserver,
PeerType::EventHubGroup => DbType::EventhubGroup,
PeerType::Kafka => DbType::Kafka,
PeerType::Eventhubs => DbType::Eventhubs,
}
}
}
3 changes: 1 addition & 2 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,7 @@ impl NexusBackend {

fn is_peer_validity_supported(peer_type: i32) -> bool {
let unsupported_peer_types = [
4, // EVENTHUB
7, // EVENTHUB_GROUP
11, // EVENTHUBS
];
!unsupported_peer_types.contains(&peer_type)
}
Expand Down
Loading

0 comments on commit c1d10e5

Please sign in to comment.