Skip to content

Commit

Permalink
Merge branch 'main' into bigquery/schema-change-float-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj authored Mar 19, 2024
2 parents 4e84728 + 9abd6b6 commit dc2ab47
Show file tree
Hide file tree
Showing 12 changed files with 234 additions and 88 deletions.
6 changes: 0 additions & 6 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@ type CheckConnectionResult struct {
NeedsSetupMetadataTables bool
}

type SlotSnapshotSignal struct {
signal connpostgres.SlotSignal
snapshotName string
connector connectors.CDCPullConnector
}

type FlowableActivity struct {
CatalogPool *pgxpool.Pool
Alerter *alerting.Alerter
Expand Down
70 changes: 44 additions & 26 deletions flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,33 @@ import (
"github.com/PeerDB-io/peer-flow/shared"
)

type SlotSnapshotState struct {
snapshotName string
signal connpostgres.SlotSignal
connector connectors.CDCPullConnector
}

type TxSnapshotState struct {
SnapshotName string
SupportsTIDScans bool
}

type SnapshotActivity struct {
SnapshotConnectionsMutex sync.Mutex
SnapshotConnections map[string]SlotSnapshotSignal
Alerter *alerting.Alerter
SnapshotStatesMutex sync.Mutex
SlotSnapshotStates map[string]SlotSnapshotState
TxSnapshotStates map[string]TxSnapshotState
Alerter *alerting.Alerter
}

// closes the slot signal
func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName string) error {
a.SnapshotConnectionsMutex.Lock()
defer a.SnapshotConnectionsMutex.Unlock()
a.SnapshotStatesMutex.Lock()
defer a.SnapshotStatesMutex.Unlock()

if s, ok := a.SnapshotConnections[flowJobName]; ok {
if s, ok := a.SlotSnapshotStates[flowJobName]; ok {
close(s.signal.CloneComplete)
connectors.CloseConnector(ctx, s.connector)
delete(a.SnapshotConnections, flowJobName)
delete(a.SlotSnapshotStates, flowJobName)
}
a.Alerter.LogFlowEvent(ctx, flowJobName, "Ended Snapshot Flow Job")

Expand Down Expand Up @@ -94,18 +106,19 @@ func (a *SnapshotActivity) SetupReplication(
return nil, fmt.Errorf("slot error: %w", slotInfo.Err)
}

a.SnapshotConnectionsMutex.Lock()
defer a.SnapshotConnectionsMutex.Unlock()
a.SnapshotStatesMutex.Lock()
defer a.SnapshotStatesMutex.Unlock()

a.SnapshotConnections[config.FlowJobName] = SlotSnapshotSignal{
a.SlotSnapshotStates[config.FlowJobName] = SlotSnapshotState{
signal: slotSignal,
snapshotName: slotInfo.SnapshotName,
connector: conn,
}

return &protos.SetupReplicationOutput{
SlotName: slotInfo.SlotName,
SnapshotName: slotInfo.SnapshotName,
SlotName: slotInfo.SlotName,
SnapshotName: slotInfo.SnapshotName,
SupportsTidScans: slotInfo.SupportsTIDScans,
}, nil
}

Expand All @@ -116,49 +129,54 @@ func (a *SnapshotActivity) MaintainTx(ctx context.Context, sessionID string, pee
}
defer connectors.CloseConnector(ctx, conn)

snapshotName, tx, err := conn.ExportSnapshot(ctx)
exportSnapshotOutput, tx, err := conn.ExportTxSnapshot(ctx)
if err != nil {
return err
}

sss := SlotSnapshotSignal{snapshotName: snapshotName}
a.SnapshotConnectionsMutex.Lock()
a.SnapshotConnections[sessionID] = sss
a.SnapshotConnectionsMutex.Unlock()
a.SnapshotStatesMutex.Lock()
a.TxSnapshotStates[sessionID] = TxSnapshotState{
SnapshotName: exportSnapshotOutput.SnapshotName,
SupportsTIDScans: exportSnapshotOutput.SupportsTidScans,
}
a.SnapshotStatesMutex.Unlock()

logger := activity.GetLogger(ctx)
start := time.Now()
for {
msg := fmt.Sprintf("maintaining export snapshot transaction %s", time.Since(start).Round(time.Second))
logger.Info(msg)
// this function relies on context cancellation to exit
// context is not explicitly cancelled, but workflow exit triggers an implicit cancel
// from activity.RecordBeat
activity.RecordHeartbeat(ctx, msg)
if ctx.Err() != nil {
a.SnapshotConnectionsMutex.Lock()
delete(a.SnapshotConnections, sessionID)
a.SnapshotConnectionsMutex.Unlock()
a.SnapshotStatesMutex.Lock()
delete(a.TxSnapshotStates, sessionID)
a.SnapshotStatesMutex.Unlock()
return conn.FinishExport(tx)
}
time.Sleep(time.Minute)
}
}

func (a *SnapshotActivity) WaitForExportSnapshot(ctx context.Context, sessionID string) (string, error) {
func (a *SnapshotActivity) WaitForExportSnapshot(ctx context.Context, sessionID string) (*TxSnapshotState, error) {
logger := activity.GetLogger(ctx)
attempt := 0
for {
a.SnapshotConnectionsMutex.Lock()
sss, ok := a.SnapshotConnections[sessionID]
a.SnapshotConnectionsMutex.Unlock()
a.SnapshotStatesMutex.Lock()
tsc, ok := a.TxSnapshotStates[sessionID]
a.SnapshotStatesMutex.Unlock()
if ok {
return sss.snapshotName, nil
return &tsc, nil
}
activity.RecordHeartbeat(ctx, "wait another second for snapshot export")
attempt += 1
if attempt > 2 {
logger.Info("waiting on snapshot export", slog.Int("attempt", attempt))
}
if err := ctx.Err(); err != nil {
return "", err
return nil, err
}
time.Sleep(time.Second)
}
Expand Down
6 changes: 4 additions & 2 deletions flow/cmd/snapshot_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,11 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) (client.Client, worker.Work
})

w.RegisterWorkflow(peerflow.SnapshotFlowWorkflow)
// explicitly not initializing mutex, in line with design
w.RegisterActivity(&activities.SnapshotActivity{
SnapshotConnections: make(map[string]activities.SlotSnapshotSignal),
Alerter: alerting.NewAlerter(context.Background(), conn),
SlotSnapshotStates: make(map[string]activities.SlotSnapshotState),
TxSnapshotStates: make(map[string]activities.TxSnapshotState),
Alerter: alerting.NewAlerter(context.Background(), conn),
})

return c, w, nil
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type CDCPullConnector interface {

// For InitialSnapshotOnly correctness without replication slot
// `any` is for returning transaction if necessary
ExportSnapshot(context.Context) (string, any, error)
ExportTxSnapshot(context.Context) (*protos.ExportTxSnapshotOutput, any, error)

// `any` from ExportSnapshot passed here when done, allowing transaction to commit
FinishExport(any) error
Expand Down
23 changes: 15 additions & 8 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,12 +369,12 @@ func (c *PostgresConnector) createSlotAndPublication(

c.logger.Warn(fmt.Sprintf("Creating replication slot '%s'", slot))

_, err = conn.Exec(ctx, "SET idle_in_transaction_session_timeout = 0")
_, err = conn.Exec(ctx, "SET LOCAL idle_in_transaction_session_timeout=0")
if err != nil {
return fmt.Errorf("[slot] error setting idle_in_transaction_session_timeout: %w", err)
}

_, err = conn.Exec(ctx, "SET lock_timeout = 0")
_, err = conn.Exec(ctx, "SET LOCAL lock_timeout=0")
if err != nil {
return fmt.Errorf("[slot] error setting lock_timeout: %w", err)
}
Expand All @@ -388,11 +388,17 @@ func (c *PostgresConnector) createSlotAndPublication(
return fmt.Errorf("[slot] error creating replication slot: %w", err)
}

ok, _, err := c.MajorVersionCheck(ctx, POSTGRES_13)
if err != nil {
return fmt.Errorf("[slot] error getting PG version: %w", err)
}

c.logger.Info(fmt.Sprintf("Created replication slot '%s'", slot))
slotDetails := SlotCreationResult{
SlotName: res.SlotName,
SnapshotName: res.SnapshotName,
Err: nil,
SlotName: res.SlotName,
SnapshotName: res.SnapshotName,
Err: nil,
SupportsTIDScans: ok,
}
signal.SlotCreated <- slotDetails
c.logger.Info("Waiting for clone to complete")
Expand All @@ -405,9 +411,10 @@ func (c *PostgresConnector) createSlotAndPublication(
e = errors.New("slot already exists")
}
slotDetails := SlotCreationResult{
SlotName: slot,
SnapshotName: "",
Err: e,
SlotName: slot,
SnapshotName: "",
Err: e,
SupportsTIDScans: false,
}
signal.SlotCreated <- slotDetails
}
Expand Down
39 changes: 29 additions & 10 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,30 +941,49 @@ func (c *PostgresConnector) EnsurePullability(
return &protos.EnsurePullabilityBatchOutput{TableIdentifierMapping: tableIdentifierMapping}, nil
}

func (c *PostgresConnector) ExportSnapshot(ctx context.Context) (string, any, error) {
func (c *PostgresConnector) ExportTxSnapshot(ctx context.Context) (*protos.ExportTxSnapshotOutput, any, error) {
var snapshotName string
tx, err := c.conn.Begin(ctx)
if err != nil {
return "", nil, err
return nil, nil, err
}
txNeedsRollback := true
defer func() {
if txNeedsRollback {
rollbackCtx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancelFunc()
err := tx.Rollback(rollbackCtx)
if err != pgx.ErrTxClosed {
c.logger.Error("error while rolling back transaction for snapshot export")
}
}
}()

_, err = tx.Exec(ctx, "SET LOCAL idle_in_transaction_session_timeout=0")
if err != nil {
return nil, nil, fmt.Errorf("[export-snapshot] error setting idle_in_transaction_session_timeout: %w", err)
}

_, err = tx.Exec(ctx, "SET idle_in_transaction_session_timeout = 0")
_, err = tx.Exec(ctx, "SET LOCAL lock_timeout=0")
if err != nil {
return "", nil, fmt.Errorf("[export-snapshot] error setting idle_in_transaction_session_timeout: %w", err)
return nil, nil, fmt.Errorf("[export-snapshot] error setting lock_timeout: %w", err)
}

_, err = tx.Exec(ctx, "SET lock_timeout = 0")
ok, _, err := c.MajorVersionCheck(ctx, POSTGRES_13)
if err != nil {
return "", nil, fmt.Errorf("[export-snapshot] error setting lock_timeout: %w", err)
return nil, nil, fmt.Errorf("[export-snapshot] error getting PG version: %w", err)
}

err = tx.QueryRow(ctx, "select pg_export_snapshot()").Scan(&snapshotName)
err = tx.QueryRow(ctx, "SELECT pg_export_snapshot()").Scan(&snapshotName)
if err != nil {
_ = tx.Rollback(ctx)
return "", nil, err
return nil, nil, err
}
txNeedsRollback = false

return snapshotName, tx, err
return &protos.ExportTxSnapshotOutput{
SnapshotName: snapshotName,
SupportsTidScans: ok,
}, tx, err
}

func (c *PostgresConnector) FinishExport(tx any) error {
Expand Down
7 changes: 4 additions & 3 deletions flow/connectors/postgres/slot_signal.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package connpostgres

type SlotCreationResult struct {
SlotName string
SnapshotName string
Err error
SlotName string
SnapshotName string
SupportsTIDScans bool
Err error
}

// This struct contains two signals.
Expand Down
Loading

0 comments on commit dc2ab47

Please sign in to comment.