diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index e8cdebbcc3..a0b2107436 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -668,29 +668,24 @@ func (a *FlowableActivity) CleanupQRepFlow(ctx context.Context, config *protos.Q
 	return dst.CleanupQRepFlow(config)
 }
 
-func (a *FlowableActivity) DropFlow(ctx context.Context, config *protos.ShutdownRequest) error {
-	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
+func (a *FlowableActivity) DropFlowSource(ctx context.Context, config *protos.ShutdownRequest) error {
 	srcConn, err := connectors.GetCDCPullConnector(ctx, config.SourcePeer)
 	if err != nil {
 		return fmt.Errorf("failed to get source connector: %w", err)
 	}
 	defer connectors.CloseConnector(srcConn)
 
+	return srcConn.PullFlowCleanup(config.FlowJobName)
+}
+
+func (a *FlowableActivity) DropFlowDestination(ctx context.Context, config *protos.ShutdownRequest) error {
 	dstConn, err := connectors.GetCDCSyncConnector(ctx, config.DestinationPeer)
 	if err != nil {
 		return fmt.Errorf("failed to get destination connector: %w", err)
 	}
 	defer connectors.CloseConnector(dstConn)
 
-	err = srcConn.PullFlowCleanup(config.FlowJobName)
-	if err != nil {
-		return fmt.Errorf("failed to cleanup source: %w", err)
-	}
-
-	err = dstConn.SyncFlowCleanup(config.FlowJobName)
-	if err != nil {
-		return fmt.Errorf("failed to cleanup destination: %w", err)
-	}
-	return nil
+	return dstConn.SyncFlowCleanup(config.FlowJobName)
 }
 
 func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) {
diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go
index 9250943918..8783f21403 100644
--- a/flow/connectors/eventhub/eventhub.go
+++ b/flow/connectors/eventhub/eventhub.go
@@ -311,9 +311,5 @@ func (c *EventHubConnector) SetupNormalizedTables(
 }
 
 func (c *EventHubConnector) SyncFlowCleanup(jobName string) error {
-	err := c.pgMetadata.DropMetadata(jobName)
-	if err != nil {
-		return err
-	}
-	return nil
+	return c.pgMetadata.DropMetadata(jobName)
 }
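The activity split above only pays off if both new methods are visible to the worker. With the Temporal Go SDK, registering the FlowableActivity struct registers every exported method as an activity, so DropFlowSource and DropFlowDestination are picked up without listing them individually. Below is a minimal sketch of that registration pattern, assuming a worker setup along these lines exists elsewhere in the repo; the task-queue name and the empty struct literal are illustrative, not taken from this PR.

package main

import (
	"log"

	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/worker"

	"github.com/PeerDB-io/peer-flow/activities"
)

func main() {
	// Illustrative wiring only: the real worker configures the client and the
	// FlowableActivity dependencies before registration.
	c, err := client.Dial(client.Options{})
	if err != nil {
		log.Fatalln("unable to create Temporal client:", err)
	}
	defer c.Close()

	w := worker.New(c, "peer-flow-task-queue", worker.Options{})
	// Registering the struct exposes every exported method as an activity,
	// so both DropFlowSource and DropFlowDestination become callable by name.
	w.RegisterActivity(&activities.FlowableActivity{})

	if err := w.Run(worker.InterruptCh()); err != nil {
		log.Fatalln("worker exited:", err)
	}
}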
diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go
index b26aacf637..9d221f4774 100644
--- a/flow/connectors/external_metadata/store.go
+++ b/flow/connectors/external_metadata/store.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"log/slog"
 
+	connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
 	cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
@@ -66,6 +67,10 @@ func (p *PostgresMetadataStore) Close() error {
 	return nil
 }
 
+func (p *PostgresMetadataStore) QualifyTable(table string) string {
+	return connpostgres.QuoteIdentifier(p.schemaName) + "." + connpostgres.QuoteIdentifier(table)
+}
+
 func (p *PostgresMetadataStore) Ping() error {
 	if p.pool == nil {
 		return fmt.Errorf("metadata db ping failed as pool does not exist")
@@ -106,7 +111,7 @@ func (p *PostgresMetadataStore) SetupMetadata() error {
 
 	// create the last sync state table
 	_, err = p.pool.Exec(p.ctx, `
-		CREATE TABLE IF NOT EXISTS `+p.schemaName+`.`+lastSyncStateTableName+` (
+		CREATE TABLE IF NOT EXISTS `+p.QualifyTable(lastSyncStateTableName)+` (
 			job_name TEXT PRIMARY KEY NOT NULL,
 			last_offset BIGINT NOT NULL,
 			updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
@@ -125,7 +130,7 @@ func (p *PostgresMetadataStore) SetupMetadata() error {
 func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
 	rows := p.pool.QueryRow(p.ctx, `
 		SELECT last_offset
-		FROM `+p.schemaName+`.`+lastSyncStateTableName+`
+		FROM `+p.QualifyTable(lastSyncStateTableName)+`
 		WHERE job_name = $1
 	`, jobName)
 	var offset pgtype.Int8
@@ -147,7 +152,7 @@ func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
 func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) {
 	rows := p.pool.QueryRow(p.ctx, `
 		SELECT sync_batch_id
-		FROM `+p.schemaName+`.`+lastSyncStateTableName+`
+		FROM `+p.QualifyTable(lastSyncStateTableName)+`
 		WHERE job_name = $1
 	`, jobName)
 
@@ -179,10 +184,10 @@ func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) e
 	// update the last offset
 	p.logger.Info("updating last offset", slog.Int64("offset", offset))
 	_, err = tx.Exec(p.ctx, `
-		INSERT INTO `+p.schemaName+`.`+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id)
+		INSERT INTO `+p.QualifyTable(lastSyncStateTableName)+` (job_name, last_offset, sync_batch_id)
 		VALUES ($1, $2, $3)
 		ON CONFLICT (job_name)
-		DO UPDATE SET last_offset = GREATEST(`+lastSyncStateTableName+`.last_offset, excluded.last_offset),
+		DO UPDATE SET last_offset = GREATEST(`+connpostgres.QuoteIdentifier(lastSyncStateTableName)+`.last_offset, excluded.last_offset),
 		updated_at = NOW()
 	`, jobName, offset, 0)
 
@@ -205,7 +210,7 @@ func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) e
 func (p *PostgresMetadataStore) IncrementID(jobName string) error {
 	p.logger.Info("incrementing sync batch id for job")
 	_, err := p.pool.Exec(p.ctx, `
-		UPDATE `+p.schemaName+`.`+lastSyncStateTableName+`
+		UPDATE `+p.QualifyTable(lastSyncStateTableName)+`
 		SET sync_batch_id=sync_batch_id+1 WHERE job_name=$1
 	`, jobName)
 	if err != nil {
@@ -218,7 +223,7 @@ func (p *PostgresMetadataStore) IncrementID(jobName string) error {
 
 func (p *PostgresMetadataStore) DropMetadata(jobName string) error {
 	_, err := p.pool.Exec(p.ctx, `
-		DELETE FROM `+p.schemaName+`.`+lastSyncStateTableName+`
+		DELETE FROM `+p.QualifyTable(lastSyncStateTableName)+`
 		WHERE job_name = $1
 	`, jobName)
 	return err
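The new QualifyTable helper replaces the ad-hoc p.schemaName+"."+table concatenation with quoted identifiers, so the generated SQL stays valid even if a schema or table name contains uppercase letters, dots, or other characters that need quoting; the GREATEST(...) reference in UpdateLastOffset gets the same treatment. connpostgres.QuoteIdentifier itself is not shown in this diff, so the sketch below uses a stand-in implementing the usual Postgres rule (wrap in double quotes and double any embedded quote), with illustrative schema and table names.

package main

import (
	"fmt"
	"strings"
)

// quoteIdentifier is a stand-in for connpostgres.QuoteIdentifier; the real
// helper may differ in detail.
func quoteIdentifier(ident string) string {
	return `"` + strings.ReplaceAll(ident, `"`, `""`) + `"`
}

// qualifyTable mirrors the new PostgresMetadataStore.QualifyTable helper.
func qualifyTable(schema, table string) string {
	return quoteIdentifier(schema) + "." + quoteIdentifier(table)
}

func main() {
	// Quoting a plain lowercase name is a no-op for Postgres name resolution,
	// but it protects names with mixed case, dots, or spaces.
	fmt.Println(qualifyTable("peerdb_metadata", "last_sync_state"))
	// Output: "peerdb_metadata"."last_sync_state"
}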
diff --git a/flow/workflows/drop_flow.go b/flow/workflows/drop_flow.go
index b21d89f198..6f314de721 100644
--- a/flow/workflows/drop_flow.go
+++ b/flow/workflows/drop_flow.go
@@ -1,43 +1,37 @@
 package peerflow
 
 import (
+	"errors"
+	"fmt"
 	"time"
 
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/shared"
 
-	"go.temporal.io/sdk/log"
 	"go.temporal.io/sdk/workflow"
 )
 
-// DropFlowWorkflowExecution represents the state for execution of a drop flow.
-type DropFlowWorkflowExecution struct {
-	shutDownRequest *protos.ShutdownRequest
-	flowExecutionID string
-	logger          log.Logger
-}
-
-func newDropFlowWorkflowExecution(ctx workflow.Context, req *protos.ShutdownRequest) *DropFlowWorkflowExecution {
-	return &DropFlowWorkflowExecution{
-		shutDownRequest: req,
-		flowExecutionID: workflow.GetInfo(ctx).WorkflowExecution.ID,
-		logger:          workflow.GetLogger(ctx),
-	}
-}
-
 func DropFlowWorkflow(ctx workflow.Context, req *protos.ShutdownRequest) error {
-	execution := newDropFlowWorkflowExecution(ctx, req)
-	execution.logger.Info("performing cleanup for flow ", req.FlowJobName)
+	logger := workflow.GetLogger(ctx)
+	logger.Info("performing cleanup for flow ", req.FlowJobName)
 
 	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
 		StartToCloseTimeout: 1 * time.Minute,
 	})
 	ctx = workflow.WithValue(ctx, shared.FlowNameKey, req.FlowJobName)
+	dropSourceFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowSource, req)
+	dropDestinationFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowDestination, req)
 
-	dropFlowFuture := workflow.ExecuteActivity(ctx, flowable.DropFlow, req)
-	if err := dropFlowFuture.Get(ctx, nil); err != nil {
-		return err
-	}
+	var sourceError, destinationError error
+	selector := workflow.NewNamedSelector(ctx, fmt.Sprintf("%s-drop", req.FlowJobName))
+	selector.AddFuture(dropSourceFuture, func(f workflow.Future) {
+		sourceError = f.Get(ctx, nil)
+	})
+	selector.AddFuture(dropDestinationFuture, func(f workflow.Future) {
+		destinationError = f.Get(ctx, nil)
+	})
+	selector.Select(ctx)
+	selector.Select(ctx)
 
-	return nil
+	return errors.Join(sourceError, destinationError)
 }
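The reworked DropFlowWorkflow starts both drop activities up front and then calls Select twice: each Select fires exactly one ready callback, so two calls drain both futures before the workflow returns. errors.Join (Go 1.20+) folds the two results, returning nil only when both inputs are nil and otherwise wrapping every non-nil error so callers can still match them with errors.Is. A small plain-Go illustration of that folding, with error values that are not from the PR:

package main

import (
	"errors"
	"fmt"
)

func main() {
	var sourceError, destinationError error

	// Both cleanups succeeded: Join returns nil, so the workflow reports success.
	fmt.Println(errors.Join(sourceError, destinationError)) // <nil>

	// One side failed: the joined error is non-nil, its message lists every
	// non-nil input, and errors.Is still matches the wrapped error.
	destinationError = errors.New("failed to cleanup destination")
	joined := errors.Join(sourceError, destinationError)
	fmt.Println(joined)                              // failed to cleanup destination
	fmt.Println(errors.Is(joined, destinationError)) // true
}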