From ffa556d5abd0da8124371239c9c21b186c29fd69 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Thu, 7 Sep 2023 10:07:49 +0530 Subject: [PATCH] fixing Eventhub fpr CDC sync --- flow/connectors/core.go | 3 +++ flow/connectors/eventhub/eventhub.go | 12 ++++++++++++ 2 files changed, 15 insertions(+) diff --git a/flow/connectors/core.go b/flow/connectors/core.go index dbc8171265..9810fe6672 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -5,6 +5,7 @@ import ( "errors" connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery" + conneventhub "github.com/PeerDB-io/peer-flow/connectors/eventhub" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" conns3 "github.com/PeerDB-io/peer-flow/connectors/s3" connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" @@ -139,6 +140,8 @@ func GetCDCSyncConnector(ctx context.Context, config *protos.Peer) (CDCSyncConne return connbigquery.NewBigQueryConnector(ctx, config.GetBigqueryConfig()) case *protos.Peer_SnowflakeConfig: return connsnowflake.NewSnowflakeConnector(ctx, config.GetSnowflakeConfig()) + case *protos.Peer_EventhubConfig: + return conneventhub.NewEventHubConnector(ctx, config.GetEventhubConfig()) default: return nil, ErrUnsupportedFunctionality } diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 6ca63c0251..4376036cba 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -323,3 +323,15 @@ func (c *EventHubConnector) getEventHubMgmtClient() (*armeventhub.EventHubsClien return hubClient, nil } + +func (c *EventHubConnector) SetupNormalizedTables( + req *protos.SetupNormalizedTableBatchInput) ( + *protos.SetupNormalizedTableBatchOutput, error) { + log.Infof("setting up tables for Eventhub is a no-op") + return nil, nil +} + +func (c *EventHubConnector) SyncFlowCleanup(jobName string) error { + // TODO (kaushik): this has to be implemented for DROP PEER support. + panic("sync flow cleanup not implemented for event hub") +}