Skip to content

Commit

Permalink
Rename SlotSignal to SnapshotSignal and use errgroup for better error handling
Browse files Browse the repository at this point in the history

Also move errors to a channel to make it clearer to wait for errors.
  • Loading branch information
iskakaushik committed Jan 3, 2024
1 parent 4bc7879 commit d6796fc
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 51 deletions.
6 changes: 0 additions & 6 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@ type CheckConnectionResult struct {
NeedsSetupMetadataTables bool
}

// SlotSnapshotSignal is the per-flow record that SnapshotActivity keeps in its
// SnapshotConnections map (keyed by flow job name) once replication has been
// set up for that flow.
type SlotSnapshotSignal struct {
// signal holds the channels shared with the replication setup goroutine.
signal connpostgres.SlotSignal
// snapshotName is the snapshot name reported in the slot creation result.
snapshotName string
// connector is the CDC pull connector obtained for the source peer.
connector connectors.CDCPullConnector
}

type FlowableActivity struct {
CatalogPool *pgxpool.Pool
Alerter *alerting.Alerter
Expand Down
43 changes: 21 additions & 22 deletions flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,15 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/shared/alerting"
"golang.org/x/sync/errgroup"
)

// SlotSnapshotSignal is the per-flow record that SnapshotActivity keeps in its
// SnapshotConnections map (keyed by flow job name) once replication has been
// set up for that flow.
type SlotSnapshotSignal struct {
// signal holds the channels shared with the replication setup goroutine.
signal connpostgres.SnapshotSignal
// snapshotName is the snapshot name reported in the slot creation result.
snapshotName string
// connector is the CDC pull connector obtained for the source peer.
connector connectors.CDCPullConnector
}

type SnapshotActivity struct {
SnapshotConnections map[string]SlotSnapshotSignal
Alerter *alerting.Alerter
Expand Down Expand Up @@ -42,48 +49,40 @@ func (a *SnapshotActivity) SetupReplication(
return nil, nil
}

conn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig)
errGroup, errCtx := errgroup.WithContext(ctx)

conn, err := connectors.GetCDCPullConnector(errCtx, config.PeerConnectionConfig)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
}

slotSignal := connpostgres.NewSlotSignal()

replicationErr := make(chan error)
defer close(replicationErr)

// This now happens in a goroutine
go func() {
snapshotSignal := connpostgres.NewSnapshotSignal()
errGroup.Go(func() error {
pgConn := conn.(*connpostgres.PostgresConnector)
err = pgConn.SetupReplication(slotSignal, config)
if err != nil {
slog.ErrorContext(ctx, "failed to setup replication", slog.Any("error", err))
replicationErr <- err
return
}
}()
return pgConn.SetupReplication(snapshotSignal, config)
})

slog.InfoContext(ctx, "waiting for slot to be created...")
var slotInfo connpostgres.SlotCreationResult
var slotInfo connpostgres.SnapshotCreationResult
select {
case slotInfo = <-slotSignal.SlotCreated:
case slotInfo = <-snapshotSignal.SlotCreated:
slog.InfoContext(ctx, fmt.Sprintf("slot '%s' created", slotInfo.SlotName))
case err := <-replicationErr:
case err := <-snapshotSignal.Error:
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return nil, fmt.Errorf("failed to setup replication: %w", err)
}

if slotInfo.Err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, slotInfo.Err)
return nil, fmt.Errorf("slot error: %w", slotInfo.Err)
if errWait := errGroup.Wait(); errWait != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, errWait)
return nil, fmt.Errorf("failed to setup replication: %w", errWait)
}

if a.SnapshotConnections == nil {
a.SnapshotConnections = make(map[string]SlotSnapshotSignal)
}

a.SnapshotConnections[config.FlowJobName] = SlotSnapshotSignal{
signal: slotSignal,
signal: snapshotSignal,
snapshotName: slotInfo.SnapshotName,
connector: conn,
}
Expand Down
16 changes: 3 additions & 13 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func (c *PostgresConnector) GetSlotInfo(slotName string) ([]*protos.SlotInfo, er

// createSlotAndPublication creates the replication slot and publication.
func (c *PostgresConnector) createSlotAndPublication(
signal SlotSignal,
signal SnapshotSignal,
s SlotCheckResult,
slot string,
publication string,
Expand Down Expand Up @@ -353,27 +353,17 @@ func (c *PostgresConnector) createSlotAndPublication(
}

c.logger.Info(fmt.Sprintf("Created replication slot '%s'", slot))
slotDetails := SlotCreationResult{
slotDetails := SnapshotCreationResult{
SlotName: res.SlotName,
SnapshotName: res.SnapshotName,
Err: nil,
}
signal.SlotCreated <- slotDetails
c.logger.Info("Waiting for clone to complete")
<-signal.CloneComplete
c.logger.Info("Clone complete")
} else {
c.logger.Info(fmt.Sprintf("Replication slot '%s' already exists", slot))
var e error
if doInitialCopy {
e = errors.New("slot already exists")
}
slotDetails := SlotCreationResult{
SlotName: slot,
SnapshotName: "",
Err: e,
}
signal.SlotCreated <- slotDetails
signal.Error <- errors.New(fmt.Sprintf("Replication slot '%s' already exists", slot))
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@ func (c *PostgresConnector) EnsurePullability(
}

// SetupReplication sets up replication for the source connector.
func (c *PostgresConnector) SetupReplication(signal SlotSignal, req *protos.SetupReplicationInput) error {
func (c *PostgresConnector) SetupReplication(signal SnapshotSignal, req *protos.SetupReplicationInput) error {
// ensure that the flowjob name is [a-z0-9_] only
reg := regexp.MustCompile(`^[a-z0-9_]+$`)
if !reg.MatchString(req.FlowJobName) {
Expand Down
17 changes: 9 additions & 8 deletions flow/connectors/postgres/slot_signal.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
package connpostgres

type SlotCreationResult struct {
type SnapshotCreationResult struct {
SlotName string
SnapshotName string
Err error
}

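// This struct contains three signals.
// 1. SlotCreated - this can be waited on to ensure that the slot has been created.
// 2. CloneComplete - which can be waited on to ensure that the clone has completed.
// 3. Error - which can be waited on to receive an error raised while setting up the slot
//    (for example, when the replication slot already exists).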
type SlotSignal struct {
SlotCreated chan SlotCreationResult
type SnapshotSignal struct {
SlotCreated chan SnapshotCreationResult
CloneComplete chan struct{}
Error chan error
}

// NewSlotSignal returns a new SlotSignal.
func NewSlotSignal() SlotSignal {
return SlotSignal{
SlotCreated: make(chan SlotCreationResult, 1),
// NewSnapshotSignal returns a new SnapshotSignal.
func NewSnapshotSignal() SnapshotSignal {
return SnapshotSignal{
SlotCreated: make(chan SnapshotCreationResult, 1),
CloneComplete: make(chan struct{}, 1),
Error: make(chan error, 1),
}
}
2 changes: 1 addition & 1 deletion flow/e2e/postgres/qrep_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (s PeerFlowE2ETestSuitePG) TestSimpleSlotCreation() {
},
}

signal := connpostgres.NewSlotSignal()
signal := connpostgres.NewSnapshotSignal()

// Moved to a goroutine
go func() {
Expand Down

0 comments on commit d6796fc

Please sign in to comment.