drop_flow: drop destination/source concurrently
Avoids destination issues blocking source cleanup,
along with being a bit faster

Also clean up identifier escaping in external_metadata/store.go
serprex committed Jan 18, 2024
1 parent f504b1e commit 460cf82
Showing 4 changed files with 36 additions and 46 deletions.
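
The new DropFlowWorkflow (diffed in flow/workflows/drop_flow.go below) fans the source and destination cleanups out as two independent activities and waits for both before reporting a combined error. Below is a minimal sketch of that fan-out/join pattern with the Temporal Go SDK; the package and activity names are illustrative stand-ins, not the repository's actual registrations.

package main

import (
    "context"
    "errors"
    "time"

    "go.temporal.io/sdk/workflow"
)

// Illustrative activities standing in for DropFlowSource / DropFlowDestination.
func CleanupSource(ctx context.Context) error      { return nil }
func CleanupDestination(ctx context.Context) error { return nil }

// DropBothConcurrently mirrors the shape of the new DropFlowWorkflow: start
// both cleanup activities, wait for both via a selector, then join the errors.
func DropBothConcurrently(ctx workflow.Context) error {
    ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
        StartToCloseTimeout: time.Minute,
    })

    srcFuture := workflow.ExecuteActivity(ctx, CleanupSource)
    dstFuture := workflow.ExecuteActivity(ctx, CleanupDestination)

    var srcErr, dstErr error
    selector := workflow.NewSelector(ctx)
    selector.AddFuture(srcFuture, func(f workflow.Future) { srcErr = f.Get(ctx, nil) })
    selector.AddFuture(dstFuture, func(f workflow.Future) { dstErr = f.Get(ctx, nil) })
    selector.Select(ctx) // wait for the first activity to finish
    selector.Select(ctx) // wait for the second

    // nil when both cleanups succeed; otherwise reports whichever side failed.
    return errors.Join(srcErr, dstErr)
}

Because each cleanup runs in its own activity, a failing or slow destination no longer prevents the source-side cleanup from completing, which is the behavior the commit message calls out.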
flow/activities/flowable.go (6 additions, 11 deletions)

@@ -668,29 +668,24 @@ func (a *FlowableActivity) CleanupQRepFlow(ctx context.Context, config *protos.Q
     return dst.CleanupQRepFlow(config)
 }
 
-func (a *FlowableActivity) DropFlow(ctx context.Context, config *protos.ShutdownRequest) error {
-    ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
+func (a *FlowableActivity) DropFlowSource(ctx context.Context, config *protos.ShutdownRequest) error {
     srcConn, err := connectors.GetCDCPullConnector(ctx, config.SourcePeer)
     if err != nil {
         return fmt.Errorf("failed to get source connector: %w", err)
     }
     defer connectors.CloseConnector(srcConn)
 
+    return srcConn.PullFlowCleanup(config.FlowJobName)
+}
+
+func (a *FlowableActivity) DropFlowDestination(ctx context.Context, config *protos.ShutdownRequest) error {
     dstConn, err := connectors.GetCDCSyncConnector(ctx, config.DestinationPeer)
     if err != nil {
         return fmt.Errorf("failed to get destination connector: %w", err)
     }
     defer connectors.CloseConnector(dstConn)
 
-    err = srcConn.PullFlowCleanup(config.FlowJobName)
-    if err != nil {
-        return fmt.Errorf("failed to cleanup source: %w", err)
-    }
-    err = dstConn.SyncFlowCleanup(config.FlowJobName)
-    if err != nil {
-        return fmt.Errorf("failed to cleanup destination: %w", err)
-    }
-    return nil
+    return dstConn.SyncFlowCleanup(config.FlowJobName)
 }
 
 func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) {
flow/connectors/eventhub/eventhub.go (1 addition, 5 deletions)

@@ -311,9 +311,5 @@ func (c *EventHubConnector) SetupNormalizedTables(
 }
 
 func (c *EventHubConnector) SyncFlowCleanup(jobName string) error {
-    err := c.pgMetadata.DropMetadata(jobName)
-    if err != nil {
-        return err
-    }
-    return nil
+    return c.pgMetadata.DropMetadata(jobName)
 }
flow/connectors/external_metadata/store.go (12 additions, 7 deletions)

@@ -5,6 +5,7 @@ import (
     "fmt"
     "log/slog"
 
+    connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
     "github.com/PeerDB-io/peer-flow/connectors/utils"
     cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
     "github.com/PeerDB-io/peer-flow/generated/protos"

@@ -66,6 +67,10 @@ func (p *PostgresMetadataStore) Close() error {
     return nil
 }
 
+func (p *PostgresMetadataStore) QualifyTable(table string) string {
+    return connpostgres.QuoteIdentifier(p.schemaName) + "." + connpostgres.QuoteIdentifier(table)
+}
+
 func (p *PostgresMetadataStore) Ping() error {
     if p.pool == nil {
         return fmt.Errorf("metadata db ping failed as pool does not exist")

@@ -106,7 +111,7 @@ func (p *PostgresMetadataStore) SetupMetadata() error {
 
     // create the last sync state table
     _, err = p.pool.Exec(p.ctx, `
-        CREATE TABLE IF NOT EXISTS `+p.schemaName+`.`+lastSyncStateTableName+` (
+        CREATE TABLE IF NOT EXISTS `+p.QualifyTable(lastSyncStateTableName)+` (
            job_name TEXT PRIMARY KEY NOT NULL,
            last_offset BIGINT NOT NULL,
            updated_at TIMESTAMP NOT NULL DEFAULT NOW(),

@@ -125,7 +130,7 @@ func (p *PostgresMetadataStore) SetupMetadata() error {
 func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
     rows := p.pool.QueryRow(p.ctx, `
         SELECT last_offset
-        FROM `+p.schemaName+`.`+lastSyncStateTableName+`
+        FROM `+p.QualifyTable(lastSyncStateTableName)+`
         WHERE job_name = $1
     `, jobName)
     var offset pgtype.Int8

@@ -147,7 +152,7 @@ func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
 func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) {
     rows := p.pool.QueryRow(p.ctx, `
         SELECT sync_batch_id
-        FROM `+p.schemaName+`.`+lastSyncStateTableName+`
+        FROM `+p.QualifyTable(lastSyncStateTableName)+`
         WHERE job_name = $1
     `, jobName)
 

@@ -179,10 +184,10 @@ func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) e
     // update the last offset
     p.logger.Info("updating last offset", slog.Int64("offset", offset))
     _, err = tx.Exec(p.ctx, `
-        INSERT INTO `+p.schemaName+`.`+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id)
+        INSERT INTO `+p.QualifyTable(lastSyncStateTableName)+` (job_name, last_offset, sync_batch_id)
         VALUES ($1, $2, $3)
         ON CONFLICT (job_name)
-        DO UPDATE SET last_offset = GREATEST(`+lastSyncStateTableName+`.last_offset, excluded.last_offset),
+        DO UPDATE SET last_offset = GREATEST(`+connpostgres.QuoteIdentifier(lastSyncStateTableName)+`.last_offset, excluded.last_offset),
         updated_at = NOW()
     `, jobName, offset, 0)
 

@@ -205,7 +210,7 @@ func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) e
 func (p *PostgresMetadataStore) IncrementID(jobName string) error {
     p.logger.Info("incrementing sync batch id for job")
     _, err := p.pool.Exec(p.ctx, `
-        UPDATE `+p.schemaName+`.`+lastSyncStateTableName+`
+        UPDATE `+p.QualifyTable(lastSyncStateTableName)+`
         SET sync_batch_id=sync_batch_id+1 WHERE job_name=$1
     `, jobName)
     if err != nil {

@@ -218,7 +223,7 @@ func (p *PostgresMetadataStore) IncrementID(jobName string) error {
 
 func (p *PostgresMetadataStore) DropMetadata(jobName string) error {
     _, err := p.pool.Exec(p.ctx, `
-        DELETE FROM `+p.schemaName+`.`+lastSyncStateTableName+`
+        DELETE FROM `+p.QualifyTable(lastSyncStateTableName)+`
         WHERE job_name = $1
     `, jobName)
     return err
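
The new QualifyTable helper above leans on connpostgres.QuoteIdentifier, which this commit does not include in the diff. Assuming it follows the usual Postgres quoting rule (wrap the identifier in double quotes and double any embedded quote), a minimal sketch of such a helper looks like this; the real connpostgres.QuoteIdentifier may differ in detail.

package connpostgres

import "strings"

// QuoteIdentifier double-quotes a Postgres identifier and escapes embedded
// quotes, so schema and table names survive mixed case, reserved words, and
// unusual characters. Sketch only; the actual helper may differ.
func QuoteIdentifier(identifier string) string {
    return `"` + strings.ReplaceAll(identifier, `"`, `""`) + `"`
}

With that in place, QualifyTable renders "schema_name"."table_name" rather than the bare schemaName.tableName concatenation the queries used before.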
flow/workflows/drop_flow.go (17 additions, 23 deletions)

@@ -1,43 +1,37 @@
 package peerflow
 
 import (
+    "errors"
+    "fmt"
     "time"
 
     "github.com/PeerDB-io/peer-flow/generated/protos"
     "github.com/PeerDB-io/peer-flow/shared"
-    "go.temporal.io/sdk/log"
     "go.temporal.io/sdk/workflow"
 )
 
-// DropFlowWorkflowExecution represents the state for execution of a drop flow.
-type DropFlowWorkflowExecution struct {
-    shutDownRequest *protos.ShutdownRequest
-    flowExecutionID string
-    logger          log.Logger
-}
-
-func newDropFlowWorkflowExecution(ctx workflow.Context, req *protos.ShutdownRequest) *DropFlowWorkflowExecution {
-    return &DropFlowWorkflowExecution{
-        shutDownRequest: req,
-        flowExecutionID: workflow.GetInfo(ctx).WorkflowExecution.ID,
-        logger:          workflow.GetLogger(ctx),
-    }
-}
-
 func DropFlowWorkflow(ctx workflow.Context, req *protos.ShutdownRequest) error {
-    execution := newDropFlowWorkflowExecution(ctx, req)
-    execution.logger.Info("performing cleanup for flow ", req.FlowJobName)
+    logger := workflow.GetLogger(ctx)
+    logger.Info("performing cleanup for flow ", req.FlowJobName)
 
     ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
        StartToCloseTimeout: 1 * time.Minute,
     })
 
     ctx = workflow.WithValue(ctx, shared.FlowNameKey, req.FlowJobName)
+    dropSourceFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowSource, req)
+    dropDestinationFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowDestination, req)
 
-    dropFlowFuture := workflow.ExecuteActivity(ctx, flowable.DropFlow, req)
-    if err := dropFlowFuture.Get(ctx, nil); err != nil {
-        return err
-    }
+    var sourceError, destinationError error
+    selector := workflow.NewNamedSelector(ctx, fmt.Sprintf("%s-drop", req.FlowJobName))
+    selector.AddFuture(dropSourceFuture, func(f workflow.Future) {
+        sourceError = f.Get(ctx, nil)
+    })
+    selector.AddFuture(dropDestinationFuture, func(f workflow.Future) {
+        destinationError = f.Get(ctx, nil)
+    })
+    selector.Select(ctx)
+    selector.Select(ctx)
 
-    return nil
+    return errors.Join(sourceError, destinationError)
 }
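
In the workflow above, Temporal's Selector runs exactly one matched callback per Select call, so Select is invoked twice to wait for both drop activities. errors.Join then returns nil when both cleanups succeed and otherwise reports whichever side failed, so a destination failure no longer blocks or masks the source cleanup.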
