Skip to content

Commit

Permalink
fix eventhub drop mirror
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Sep 25, 2023
1 parent f335140 commit dcf0144
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 8 deletions.
4 changes: 2 additions & 2 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
"github.com/cenkalti/backoff"
backoff "github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -171,7 +171,7 @@ func (h *FlowRequestHandler) waitForWorkflowClose(ctx context.Context, workflowI

err := backoff.Retry(operation, expBackoff)
if err != nil {
return nil, err
return err
}

return nil
Expand Down
6 changes: 0 additions & 6 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,12 +380,6 @@ func (c *EventHubConnector) SetupNormalizedTables(
}, nil
}

func (c *EventHubConnector) SyncFlowCleanup(jobName string) error {
_, err := c.pgMetadata.pool.Exec(c.ctx, fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE",
metadataSchema))
return err
}

func eventDataFromString(s string) *azeventhubs.EventData {
return &azeventhubs.EventData{
Body: []byte(s),
Expand Down
8 changes: 8 additions & 0 deletions flow/connectors/eventhub/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,3 +226,11 @@ func (c *EventHubConnector) incrementSyncBatchID(jobName string) error {

return nil
}

func (c *EventHubConnector) SyncFlowCleanup(jobName string) error {
_, err := c.pgMetadata.pool.Exec(c.ctx, `
DELETE FROM `+metadataSchema+`.`+lastSyncStateTableName+`
WHERE job_name = $1
`, jobName)
return err
}

0 comments on commit dcf0144

Please sign in to comment.