From d6796fc0163ce1acefa25aee4378698705da1104 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 3 Jan 2024 12:28:14 -0500 Subject: [PATCH] Rename SlotSignal to SnapshotSignal and use errgroup for better error handling Also move errors to a channel to make it clearer to wait for errors. --- flow/activities/flowable.go | 6 ---- flow/activities/snapshot_activity.go | 43 ++++++++++++------------- flow/connectors/postgres/client.go | 16 ++------- flow/connectors/postgres/postgres.go | 2 +- flow/connectors/postgres/slot_signal.go | 17 +++++----- flow/e2e/postgres/qrep_flow_pg_test.go | 2 +- 6 files changed, 35 insertions(+), 51 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index d84c060f29..c3685ed915 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -35,12 +35,6 @@ type CheckConnectionResult struct { NeedsSetupMetadataTables bool } -type SlotSnapshotSignal struct { - signal connpostgres.SlotSignal - snapshotName string - connector connectors.CDCPullConnector -} - type FlowableActivity struct { CatalogPool *pgxpool.Pool Alerter *alerting.Alerter diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index 3abd39c588..08673fc0e4 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -10,8 +10,15 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" "github.com/PeerDB-io/peer-flow/shared/alerting" + "golang.org/x/sync/errgroup" ) +type SlotSnapshotSignal struct { + signal connpostgres.SnapshotSignal + snapshotName string + connector connectors.CDCPullConnector +} + type SnapshotActivity struct { SnapshotConnections map[string]SlotSnapshotSignal Alerter *alerting.Alerter @@ -42,40 +49,32 @@ func (a *SnapshotActivity) SetupReplication( return nil, nil } - conn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig) + errGroup, errCtx := errgroup.WithContext(ctx) + + conn, err := connectors.GetCDCPullConnector(errCtx, config.PeerConnectionConfig) if err != nil { return nil, fmt.Errorf("failed to get connector: %w", err) } - slotSignal := connpostgres.NewSlotSignal() - - replicationErr := make(chan error) - defer close(replicationErr) - - // This now happens in a goroutine - go func() { + snapshotSignal := connpostgres.NewSnapshotSignal() + errGroup.Go(func() error { pgConn := conn.(*connpostgres.PostgresConnector) - err = pgConn.SetupReplication(slotSignal, config) - if err != nil { - slog.ErrorContext(ctx, "failed to setup replication", slog.Any("error", err)) - replicationErr <- err - return - } - }() + return pgConn.SetupReplication(snapshotSignal, config) + }) slog.InfoContext(ctx, "waiting for slot to be created...") - var slotInfo connpostgres.SlotCreationResult + var slotInfo connpostgres.SnapshotCreationResult select { - case slotInfo = <-slotSignal.SlotCreated: + case slotInfo = <-snapshotSignal.SlotCreated: slog.InfoContext(ctx, fmt.Sprintf("slot '%s' created", slotInfo.SlotName)) - case err := <-replicationErr: + case err := <-snapshotSignal.Error: a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return nil, fmt.Errorf("failed to setup replication: %w", err) } - if slotInfo.Err != nil { - a.Alerter.LogFlowError(ctx, config.FlowJobName, slotInfo.Err) - return nil, fmt.Errorf("slot error: %w", slotInfo.Err) + if errWait := errGroup.Wait(); errWait != nil { + a.Alerter.LogFlowError(ctx, config.FlowJobName, errWait) + return nil, fmt.Errorf("failed to setup replication: %w", errWait) } if a.SnapshotConnections == nil { @@ -83,7 +82,7 @@ func (a *SnapshotActivity) SetupReplication( } a.SnapshotConnections[config.FlowJobName] = SlotSnapshotSignal{ - signal: slotSignal, + signal: snapshotSignal, snapshotName: slotInfo.SnapshotName, connector: conn, } diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index b51741aff6..1fc93966d6 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -291,7 +291,7 @@ func (c *PostgresConnector) GetSlotInfo(slotName string) ([]*protos.SlotInfo, er // createSlotAndPublication creates the replication slot and publication. func (c *PostgresConnector) createSlotAndPublication( - signal SlotSignal, + signal SnapshotSignal, s SlotCheckResult, slot string, publication string, @@ -353,10 +353,9 @@ func (c *PostgresConnector) createSlotAndPublication( } c.logger.Info(fmt.Sprintf("Created replication slot '%s'", slot)) - slotDetails := SlotCreationResult{ + slotDetails := SnapshotCreationResult{ SlotName: res.SlotName, SnapshotName: res.SnapshotName, - Err: nil, } signal.SlotCreated <- slotDetails c.logger.Info("Waiting for clone to complete") @@ -364,16 +363,7 @@ func (c *PostgresConnector) createSlotAndPublication( c.logger.Info("Clone complete") } else { c.logger.Info(fmt.Sprintf("Replication slot '%s' already exists", slot)) - var e error - if doInitialCopy { - e = errors.New("slot already exists") - } - slotDetails := SlotCreationResult{ - SlotName: slot, - SnapshotName: "", - Err: e, - } - signal.SlotCreated <- slotDetails + signal.Error <- errors.New(fmt.Sprintf("Replication slot '%s' already exists", slot)) } return nil diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index d869b22153..481a520814 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -794,7 +794,7 @@ func (c *PostgresConnector) EnsurePullability( } // SetupReplication sets up replication for the source connector. -func (c *PostgresConnector) SetupReplication(signal SlotSignal, req *protos.SetupReplicationInput) error { +func (c *PostgresConnector) SetupReplication(signal SnapshotSignal, req *protos.SetupReplicationInput) error { // ensure that the flowjob name is [a-z0-9_] only reg := regexp.MustCompile(`^[a-z0-9_]+$`) if !reg.MatchString(req.FlowJobName) { diff --git a/flow/connectors/postgres/slot_signal.go b/flow/connectors/postgres/slot_signal.go index 1dd9f8d0d7..8bf8d53f14 100644 --- a/flow/connectors/postgres/slot_signal.go +++ b/flow/connectors/postgres/slot_signal.go @@ -1,23 +1,24 @@ package connpostgres -type SlotCreationResult struct { +type SnapshotCreationResult struct { SlotName string SnapshotName string - Err error } // This struct contains two signals. // 1. SlotCreated - this can be waited on to ensure that the slot has been created. // 2. CloneComplete - which can be waited on to ensure that the clone has completed. -type SlotSignal struct { - SlotCreated chan SlotCreationResult +type SnapshotSignal struct { + SlotCreated chan SnapshotCreationResult CloneComplete chan struct{} + Error chan error } -// NewSlotSignal returns a new SlotSignal. -func NewSlotSignal() SlotSignal { - return SlotSignal{ - SlotCreated: make(chan SlotCreationResult, 1), +// NewSnapshotSignal returns a new SlotSignal. +func NewSnapshotSignal() SnapshotSignal { + return SnapshotSignal{ + SlotCreated: make(chan SnapshotCreationResult, 1), CloneComplete: make(chan struct{}, 1), + Error: make(chan error, 1), } } diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 064c28d16c..417c4aafa1 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -195,7 +195,7 @@ func (s PeerFlowE2ETestSuitePG) TestSimpleSlotCreation() { }, } - signal := connpostgres.NewSlotSignal() + signal := connpostgres.NewSnapshotSignal() // Moved to a go routine go func() {