From dcf0144b14439c0a0e49878b7557c78bb59229ea Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Mon, 25 Sep 2023 09:21:44 -0400 Subject: [PATCH] fix eventhub drop mirror --- flow/cmd/handler.go | 4 ++-- flow/connectors/eventhub/eventhub.go | 6 ------ flow/connectors/eventhub/metadata.go | 8 ++++++++ 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index afd884c0aa..b059161cc8 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -8,7 +8,7 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" - "github.com/cenkalti/backoff" + backoff "github.com/cenkalti/backoff/v4" "github.com/google/uuid" "github.com/jackc/pgx/v5/pgxpool" log "github.com/sirupsen/logrus" @@ -171,7 +171,7 @@ func (h *FlowRequestHandler) waitForWorkflowClose(ctx context.Context, workflowI err := backoff.Retry(operation, expBackoff) if err != nil { - return nil, err + return err } return nil diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 6364dd7c66..158323e1a0 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -380,12 +380,6 @@ func (c *EventHubConnector) SetupNormalizedTables( }, nil } -func (c *EventHubConnector) SyncFlowCleanup(jobName string) error { - _, err := c.pgMetadata.pool.Exec(c.ctx, fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", - metadataSchema)) - return err -} - func eventDataFromString(s string) *azeventhubs.EventData { return &azeventhubs.EventData{ Body: []byte(s), diff --git a/flow/connectors/eventhub/metadata.go b/flow/connectors/eventhub/metadata.go index 72410bce62..178ebbb076 100644 --- a/flow/connectors/eventhub/metadata.go +++ b/flow/connectors/eventhub/metadata.go @@ -226,3 +226,11 @@ func (c *EventHubConnector) incrementSyncBatchID(jobName string) error { return nil } + +func (c *EventHubConnector) SyncFlowCleanup(jobName string) error { + _, err := c.pgMetadata.pool.Exec(c.ctx, ` + DELETE FROM `+metadataSchema+`.`+lastSyncStateTableName+` + WHERE job_name = $1 + `, jobName) + return err +}