diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index 4924627fd9..68f764a890 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -35,12 +35,6 @@ type CheckConnectionResult struct {
 	NeedsSetupMetadataTables bool
 }
 
-type SlotSnapshotSignal struct {
-	signal       connpostgres.SlotSignal
-	snapshotName string
-	connector    connectors.CDCPullConnector
-}
-
 type FlowableActivity struct {
 	CatalogPool *pgxpool.Pool
 	Alerter     *alerting.Alerter
diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go
index 1cd4dac3ef..d5aeb50ed7 100644
--- a/flow/activities/snapshot_activity.go
+++ b/flow/activities/snapshot_activity.go
@@ -16,21 +16,33 @@ import (
 	"github.com/PeerDB-io/peer-flow/shared"
 )
 
+type SlotSnapshotState struct {
+	snapshotName string
+	signal       connpostgres.SlotSignal
+	connector    connectors.CDCPullConnector
+}
+
+type TxSnapshotState struct {
+	SnapshotName     string
+	SupportsTIDScans bool
+}
+
 type SnapshotActivity struct {
-	SnapshotConnectionsMutex sync.Mutex
-	SnapshotConnections      map[string]SlotSnapshotSignal
-	Alerter                  *alerting.Alerter
+	SnapshotStatesMutex sync.Mutex
+	SlotSnapshotStates  map[string]SlotSnapshotState
+	TxSnapshotStates    map[string]TxSnapshotState
+	Alerter             *alerting.Alerter
 }
 
 // closes the slot signal
 func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName string) error {
-	a.SnapshotConnectionsMutex.Lock()
-	defer a.SnapshotConnectionsMutex.Unlock()
+	a.SnapshotStatesMutex.Lock()
+	defer a.SnapshotStatesMutex.Unlock()
 
-	if s, ok := a.SnapshotConnections[flowJobName]; ok {
+	if s, ok := a.SlotSnapshotStates[flowJobName]; ok {
 		close(s.signal.CloneComplete)
 		connectors.CloseConnector(ctx, s.connector)
-		delete(a.SnapshotConnections, flowJobName)
+		delete(a.SlotSnapshotStates, flowJobName)
 	}
 
 	a.Alerter.LogFlowEvent(ctx, flowJobName, "Ended Snapshot Flow Job")
@@ -94,18 +106,19 @@ func (a *SnapshotActivity) SetupReplication(
 		return nil, fmt.Errorf("slot error: %w", slotInfo.Err)
 	}
 
-	a.SnapshotConnectionsMutex.Lock()
-	defer a.SnapshotConnectionsMutex.Unlock()
+	a.SnapshotStatesMutex.Lock()
+	defer a.SnapshotStatesMutex.Unlock()
 
-	a.SnapshotConnections[config.FlowJobName] = SlotSnapshotSignal{
+	a.SlotSnapshotStates[config.FlowJobName] = SlotSnapshotState{
 		signal:       slotSignal,
 		snapshotName: slotInfo.SnapshotName,
 		connector:    conn,
 	}
 
 	return &protos.SetupReplicationOutput{
-		SlotName:     slotInfo.SlotName,
-		SnapshotName: slotInfo.SnapshotName,
+		SlotName:         slotInfo.SlotName,
+		SnapshotName:     slotInfo.SnapshotName,
+		SupportsTidScans: slotInfo.SupportsTIDScans,
 	}, nil
 }
 
@@ -116,41 +129,46 @@ func (a *SnapshotActivity) MaintainTx(ctx context.Context, sessionID string, pee
 	}
 	defer connectors.CloseConnector(ctx, conn)
 
-	snapshotName, tx, err := conn.ExportSnapshot(ctx)
+	exportSnapshotOutput, tx, err := conn.ExportTxSnapshot(ctx)
 	if err != nil {
 		return err
 	}
 
-	sss := SlotSnapshotSignal{snapshotName: snapshotName}
-	a.SnapshotConnectionsMutex.Lock()
-	a.SnapshotConnections[sessionID] = sss
-	a.SnapshotConnectionsMutex.Unlock()
+	a.SnapshotStatesMutex.Lock()
+	a.TxSnapshotStates[sessionID] = TxSnapshotState{
+		SnapshotName:     exportSnapshotOutput.SnapshotName,
+		SupportsTIDScans: exportSnapshotOutput.SupportsTidScans,
+	}
+	a.SnapshotStatesMutex.Unlock()
 
 	logger := activity.GetLogger(ctx)
 	start := time.Now()
 	for {
 		msg := fmt.Sprintf("maintaining export snapshot transaction %s", time.Since(start).Round(time.Second))
 		logger.Info(msg)
+		// this function relies on context cancellation to exit
+		// context is not explicitly cancelled, but workflow exit triggers an implicit cancel
+		// from activity.RecordHeartbeat
 		activity.RecordHeartbeat(ctx, msg)
 		if ctx.Err() != nil {
-			a.SnapshotConnectionsMutex.Lock()
-			delete(a.SnapshotConnections, sessionID)
-			a.SnapshotConnectionsMutex.Unlock()
+			a.SnapshotStatesMutex.Lock()
+			delete(a.TxSnapshotStates, sessionID)
+			a.SnapshotStatesMutex.Unlock()
 			return conn.FinishExport(tx)
 		}
 		time.Sleep(time.Minute)
 	}
 }
 
-func (a *SnapshotActivity) WaitForExportSnapshot(ctx context.Context, sessionID string) (string, error) {
+func (a *SnapshotActivity) WaitForExportSnapshot(ctx context.Context, sessionID string) (*TxSnapshotState, error) {
 	logger := activity.GetLogger(ctx)
 	attempt := 0
 	for {
-		a.SnapshotConnectionsMutex.Lock()
-		sss, ok := a.SnapshotConnections[sessionID]
-		a.SnapshotConnectionsMutex.Unlock()
+		a.SnapshotStatesMutex.Lock()
+		tsc, ok := a.TxSnapshotStates[sessionID]
+		a.SnapshotStatesMutex.Unlock()
 		if ok {
-			return sss.snapshotName, nil
+			return &tsc, nil
 		}
 		activity.RecordHeartbeat(ctx, "wait another second for snapshot export")
 		attempt += 1
@@ -158,7 +176,7 @@ func (a *SnapshotActivity) WaitForExportSnapshot(ctx context.Context, sessionID
 			logger.Info("waiting on snapshot export", slog.Int("attempt", attempt))
 		}
 		if err := ctx.Err(); err != nil {
-			return "", err
+			return nil, err
 		}
 		time.Sleep(time.Second)
 	}
diff --git a/flow/cmd/snapshot_worker.go b/flow/cmd/snapshot_worker.go
index d5b9d4b51f..b7418e9ceb 100644
--- a/flow/cmd/snapshot_worker.go
+++ b/flow/cmd/snapshot_worker.go
@@ -66,9 +66,11 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) (client.Client, worker.Work
 	})
 
 	w.RegisterWorkflow(peerflow.SnapshotFlowWorkflow)
+	// explicitly not initializing mutex, in line with design
 	w.RegisterActivity(&activities.SnapshotActivity{
-		SnapshotConnections: make(map[string]activities.SlotSnapshotSignal),
-		Alerter:             alerting.NewAlerter(context.Background(), conn),
+		SlotSnapshotStates: make(map[string]activities.SlotSnapshotState),
+		TxSnapshotStates:   make(map[string]activities.TxSnapshotState),
+		Alerter:            alerting.NewAlerter(context.Background(), conn),
 	})
 
 	return c, w, nil
diff --git a/flow/connectors/core.go b/flow/connectors/core.go
index 6adb3a2429..988b8e3f28 100644
--- a/flow/connectors/core.go
+++ b/flow/connectors/core.go
@@ -44,7 +44,7 @@ type CDCPullConnector interface {
 
 	// For InitialSnapshotOnly correctness without replication slot
 	// `any` is for returning transaction if necessary
-	ExportSnapshot(context.Context) (string, any, error)
+	ExportTxSnapshot(context.Context) (*protos.ExportTxSnapshotOutput, any, error)
 
 	// `any` from ExportSnapshot passed here when done, allowing transaction to commit
 	FinishExport(any) error
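
A note on the interface above: ExportTxSnapshot hands the open snapshot transaction back through an untyped `any` so the connector interface does not leak pgx types, and FinishExport later receives the same handle once all clones are done. A minimal sketch of what the finishing side can look like, assuming the handle is a pgx.Tx that is simply committed — the real FinishExport body is not part of this diff:

```go
package snapshotexport

import (
	"context"
	"fmt"
	"time"

	"github.com/jackc/pgx/v5"
)

// finishExport is an illustrative counterpart to FinishExport(any): assert the
// opaque handle back to pgx.Tx and commit it, which also releases the exported
// snapshot. Hypothetical helper, not PeerDB's implementation.
func finishExport(tx any) error {
	pgtx, ok := tx.(pgx.Tx)
	if !ok {
		return fmt.Errorf("expected pgx.Tx, got %T", tx)
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	return pgtx.Commit(ctx)
}
```

Committing (or rolling back) only after every clone has finished matters because the exported snapshot disappears as soon as the exporting transaction ends.
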
diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go
index 898f962aec..f667107bb2 100644
--- a/flow/connectors/postgres/client.go
+++ b/flow/connectors/postgres/client.go
@@ -369,12 +369,12 @@ func (c *PostgresConnector) createSlotAndPublication(
 		c.logger.Warn(fmt.Sprintf("Creating replication slot '%s'", slot))
 
-		_, err = conn.Exec(ctx, "SET idle_in_transaction_session_timeout = 0")
+		_, err = conn.Exec(ctx, "SET LOCAL idle_in_transaction_session_timeout=0")
 		if err != nil {
 			return fmt.Errorf("[slot] error setting idle_in_transaction_session_timeout: %w", err)
 		}
 
-		_, err = conn.Exec(ctx, "SET lock_timeout = 0")
+		_, err = conn.Exec(ctx, "SET LOCAL lock_timeout=0")
 		if err != nil {
 			return fmt.Errorf("[slot] error setting lock_timeout: %w", err)
 		}
 
@@ -388,11 +388,17 @@ func (c *PostgresConnector) createSlotAndPublication(
 			return fmt.Errorf("[slot] error creating replication slot: %w", err)
 		}
 
+		ok, _, err := c.MajorVersionCheck(ctx, POSTGRES_13)
+		if err != nil {
+			return fmt.Errorf("[slot] error getting PG version: %w", err)
+		}
+
 		c.logger.Info(fmt.Sprintf("Created replication slot '%s'", slot))
 		slotDetails := SlotCreationResult{
-			SlotName:     res.SlotName,
-			SnapshotName: res.SnapshotName,
-			Err:          nil,
+			SlotName:         res.SlotName,
+			SnapshotName:     res.SnapshotName,
+			Err:              nil,
+			SupportsTIDScans: ok,
 		}
 		signal.SlotCreated <- slotDetails
 		c.logger.Info("Waiting for clone to complete")
@@ -405,9 +411,10 @@ func (c *PostgresConnector) createSlotAndPublication(
 			e = errors.New("slot already exists")
 		}
 		slotDetails := SlotCreationResult{
-			SlotName:     slot,
-			SnapshotName: "",
-			Err:          e,
+			SlotName:         slot,
+			SnapshotName:     "",
+			Err:              e,
+			SupportsTIDScans: false,
 		}
 		signal.SlotCreated <- slotDetails
 	}
diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go
index 1bba4e8eca..bd6721c076 100644
--- a/flow/connectors/postgres/postgres.go
+++ b/flow/connectors/postgres/postgres.go
@@ -941,30 +941,49 @@ func (c *PostgresConnector) EnsurePullability(
 	return &protos.EnsurePullabilityBatchOutput{TableIdentifierMapping: tableIdentifierMapping}, nil
 }
 
-func (c *PostgresConnector) ExportSnapshot(ctx context.Context) (string, any, error) {
+func (c *PostgresConnector) ExportTxSnapshot(ctx context.Context) (*protos.ExportTxSnapshotOutput, any, error) {
 	var snapshotName string
 	tx, err := c.conn.Begin(ctx)
 	if err != nil {
-		return "", nil, err
+		return nil, nil, err
+	}
+	txNeedsRollback := true
+	defer func() {
+		if txNeedsRollback {
+			rollbackCtx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Minute)
+			defer cancelFunc()
+			err := tx.Rollback(rollbackCtx)
+			if err != pgx.ErrTxClosed {
+				c.logger.Error("error while rolling back transaction for snapshot export")
+			}
+		}
+	}()
+
+	_, err = tx.Exec(ctx, "SET LOCAL idle_in_transaction_session_timeout=0")
+	if err != nil {
+		return nil, nil, fmt.Errorf("[export-snapshot] error setting idle_in_transaction_session_timeout: %w", err)
 	}
 
-	_, err = tx.Exec(ctx, "SET idle_in_transaction_session_timeout = 0")
+	_, err = tx.Exec(ctx, "SET LOCAL lock_timeout=0")
 	if err != nil {
-		return "", nil, fmt.Errorf("[export-snapshot] error setting idle_in_transaction_session_timeout: %w", err)
+		return nil, nil, fmt.Errorf("[export-snapshot] error setting lock_timeout: %w", err)
 	}
 
-	_, err = tx.Exec(ctx, "SET lock_timeout = 0")
+	ok, _, err := c.MajorVersionCheck(ctx, POSTGRES_13)
 	if err != nil {
-		return "", nil, fmt.Errorf("[export-snapshot] error setting lock_timeout: %w", err)
+		return nil, nil, fmt.Errorf("[export-snapshot] error getting PG version: %w", err)
 	}
 
-	err = tx.QueryRow(ctx, "select pg_export_snapshot()").Scan(&snapshotName)
+	err = tx.QueryRow(ctx, "SELECT pg_export_snapshot()").Scan(&snapshotName)
 	if err != nil {
-		_ = tx.Rollback(ctx)
-		return "", nil, err
+		return nil, nil, err
 	}
+	txNeedsRollback = false
 
-	return snapshotName, tx, err
+	return &protos.ExportTxSnapshotOutput{
+		SnapshotName:     snapshotName,
+		SupportsTidScans: ok,
+	}, tx, err
 }
 
 func (c *PostgresConnector) FinishExport(tx any) error {
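
The snapshot name returned by pg_export_snapshot() above is only useful to other sessions: a second connection can adopt exactly the same visibility with SET TRANSACTION SNAPSHOT while the exporting transaction stays open. A small pgx sketch of that consuming side — connection handling and the table name are illustrative, not taken from this diff:

```go
package snapshotexport

import (
	"context"

	"github.com/jackc/pgx/v5"
)

// useExportedSnapshot runs a query against the point-in-time state captured by
// pg_export_snapshot() on another connection. SET TRANSACTION SNAPSHOT must run
// before the transaction takes its own snapshot, and the exporting transaction
// has to remain open while this executes.
func useExportedSnapshot(ctx context.Context, conn *pgx.Conn, snapshotName string) (int64, error) {
	tx, err := conn.BeginTx(ctx, pgx.TxOptions{IsoLevel: pgx.RepeatableRead})
	if err != nil {
		return 0, err
	}
	defer func() { _ = tx.Rollback(ctx) }()

	// The snapshot name comes from Postgres itself, so formatting it into the
	// utility statement is acceptable here.
	if _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT '"+snapshotName+"'"); err != nil {
		return 0, err
	}

	var n int64
	// 'public.events' is a placeholder table name.
	if err := tx.QueryRow(ctx, "SELECT count(*) FROM public.events").Scan(&n); err != nil {
		return 0, err
	}
	return n, tx.Commit(ctx)
}
```
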
diff --git a/flow/connectors/postgres/slot_signal.go b/flow/connectors/postgres/slot_signal.go
index 386b3af36b..be6395917b 100644
--- a/flow/connectors/postgres/slot_signal.go
+++ b/flow/connectors/postgres/slot_signal.go
@@ -1,9 +1,10 @@
 package connpostgres
 
 type SlotCreationResult struct {
-	SlotName     string
-	SnapshotName string
-	Err          error
+	SlotName         string
+	SnapshotName     string
+	SupportsTIDScans bool
+	Err              error
 }
 
 // This struct contains two signals.
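
SupportsTIDScans in this result is populated from MajorVersionCheck(ctx, POSTGRES_13) in the connector code above; that check's implementation is not shown in this diff. As a rough sketch, a major-version gate like this is commonly derived from server_version_num — the helper below is an assumption for illustration, not necessarily how MajorVersionCheck is written:

```go
package snapshotexport

import (
	"context"

	"github.com/jackc/pgx/v5"
)

// majorVersionAtLeast reports whether the connected server's major version is
// at least the given value. server_version_num is an integer such as 130011
// for 13.11, so dividing by 10000 yields the major version.
func majorVersionAtLeast(ctx context.Context, conn *pgx.Conn, major int) (bool, error) {
	var versionNum int
	err := conn.QueryRow(ctx,
		"SELECT current_setting('server_version_num')::int").Scan(&versionNum)
	if err != nil {
		return false, err
	}
	return versionNum/10000 >= major, nil
}
```
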
diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go
index ce9ab27d78..8ec844cc4b 100644
--- a/flow/workflows/snapshot_flow.go
+++ b/flow/workflows/snapshot_flow.go
@@ -12,6 +12,7 @@ import (
 	"go.temporal.io/sdk/temporal"
 	"go.temporal.io/sdk/workflow"
 
+	"github.com/PeerDB-io/peer-flow/activities"
 	"github.com/PeerDB-io/peer-flow/concurrency"
 	connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
@@ -19,12 +20,28 @@ import (
 	"github.com/PeerDB-io/peer-flow/shared"
 )
 
+type snapshotType int
+
+const (
+	SNAPSHOT_TYPE_UNKNOWN snapshotType = iota
+	SNAPSHOT_TYPE_SLOT
+	SNAPSHOT_TYPE_TX
+)
+
 type SnapshotFlowExecution struct {
 	config                 *protos.FlowConnectionConfigs
 	logger                 log.Logger
 	tableNameSchemaMapping map[string]*protos.TableSchema
 }
 
+type cloneTablesInput struct {
+	snapshotType      snapshotType
+	slotName          string
+	snapshotName      string
+	supportsTIDScans  bool
+	maxParallelClones int
+}
+
 // ensurePullability ensures that the source peer is pullable.
 func (s *SnapshotFlowExecution) setupReplication(
 	ctx workflow.Context,
@@ -117,11 +134,6 @@ func (s *SnapshotFlowExecution) cloneTable(
 	sourcePostgres := s.config.Source
 	sourcePostgres.GetPostgresConfig().TransactionSnapshot = snapshotName
 
-	partitionCol := "ctid"
-	if mapping.PartitionKey != "" {
-		partitionCol = mapping.PartitionKey
-	}
-
 	parsedSrcTable, err := utils.ParseSchemaTable(srcName)
 	if err != nil {
 		s.logger.Error("unable to parse source table", slog.Any("error", err), cloneLog)
@@ -142,9 +154,13 @@ func (s *SnapshotFlowExecution) cloneTable(
 			}
 		}
 	}
-
-	query := fmt.Sprintf("SELECT %s FROM %s WHERE %s BETWEEN {{.start}} AND {{.end}}",
-		from, parsedSrcTable.String(), partitionCol)
+	var query string
+	if mapping.PartitionKey == "" {
+		query = fmt.Sprintf("SELECT %s FROM %s", from, parsedSrcTable.String())
+	} else {
+		query = fmt.Sprintf("SELECT %s FROM %s WHERE %s BETWEEN {{.start}} AND {{.end}}",
+			from, parsedSrcTable.String(), mapping.PartitionKey)
+	}
 
 	numWorkers := uint32(8)
 	if s.config.SnapshotMaxParallelWorkers > 0 {
@@ -161,7 +177,7 @@ func (s *SnapshotFlowExecution) cloneTable(
 		SourcePeer:                 sourcePostgres,
 		DestinationPeer:            s.config.Destination,
 		Query:                      query,
-		WatermarkColumn:            partitionCol,
+		WatermarkColumn:            mapping.PartitionKey,
 		WatermarkTable:             srcName,
 		InitialCopyOnly:            true,
 		DestinationTableIdentifier: dstName,
@@ -182,23 +198,36 @@ func (s *SnapshotFlowExecution) cloneTable(
 
 func (s *SnapshotFlowExecution) cloneTables(
 	ctx workflow.Context,
-	slotInfo *protos.SetupReplicationOutput,
-	maxParallelClones int,
+	cloneTablesInput *cloneTablesInput,
 ) error {
-	s.logger.Info(fmt.Sprintf("cloning tables for slot name %s and snapshotName %s",
-		slotInfo.SlotName, slotInfo.SnapshotName))
+	if cloneTablesInput.snapshotType == SNAPSHOT_TYPE_SLOT {
+		s.logger.Info(fmt.Sprintf("cloning tables for slot name %s and snapshotName %s",
+			cloneTablesInput.slotName, cloneTablesInput.snapshotName))
+	} else if cloneTablesInput.snapshotType == SNAPSHOT_TYPE_TX {
+		s.logger.Info("cloning tables in txn snapshot mode with snapshotName " +
+			cloneTablesInput.snapshotName)
+	}
 
-	boundSelector := concurrency.NewBoundSelector(maxParallelClones)
+	boundSelector := concurrency.NewBoundSelector(cloneTablesInput.maxParallelClones)
 
+	defaultPartitionCol := "ctid"
+	if !cloneTablesInput.supportsTIDScans {
+		s.logger.Info("Postgres version too old for TID scans, might use full table partitions!")
+		defaultPartitionCol = ""
+	}
+
+	snapshotName := cloneTablesInput.snapshotName
 	for _, v := range s.config.TableMappings {
 		source := v.SourceTableIdentifier
 		destination := v.DestinationTableIdentifier
-		snapshotName := slotInfo.SnapshotName
 		s.logger.Info(fmt.Sprintf(
 			"Cloning table with source table %s and destination table name %s",
 			source, destination),
 			slog.String("snapshotName", snapshotName),
 		)
+		if v.PartitionKey == "" {
+			v.PartitionKey = defaultPartitionCol
+		}
 		err := s.cloneTable(ctx, boundSelector, snapshotName, v)
 		if err != nil {
 			s.logger.Error("failed to start clone child workflow: ", err)
@@ -227,7 +256,13 @@ func (s *SnapshotFlowExecution) cloneTablesWithSlot(
 	}
 
 	logger.Info(fmt.Sprintf("cloning %d tables in parallel", numTablesInParallel))
-	if err := s.cloneTables(ctx, slotInfo, numTablesInParallel); err != nil {
+	if err := s.cloneTables(ctx, &cloneTablesInput{
+		snapshotType:      SNAPSHOT_TYPE_SLOT,
+		slotName:          slotInfo.SlotName,
+		snapshotName:      slotInfo.SnapshotName,
+		supportsTIDScans:  slotInfo.SupportsTidScans,
+		maxParallelClones: numTablesInParallel,
+	}); err != nil {
 		return fmt.Errorf("failed to clone tables: %w", err)
 	}
 
@@ -246,7 +281,8 @@ func SnapshotFlowWorkflow(
 	se := &SnapshotFlowExecution{
 		config:                 config,
 		tableNameSchemaMapping: tableNameSchemaMapping,
-		logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName)),
+		logger: log.With(workflow.GetLogger(ctx),
+			slog.String(string(shared.FlowNameKey), config.FlowJobName)),
 	}
 
 	numTablesInParallel := int(max(config.SnapshotNumTablesInParallel, 1))
@@ -299,7 +335,7 @@ func SnapshotFlowWorkflow(
 	)
 
 	var sessionError error
-	var snapshotName string
+	var txnSnapshotState *activities.TxSnapshotState
 	sessionSelector := workflow.NewNamedSelector(ctx, "ExportSnapshotSetup")
 	sessionSelector.AddFuture(fMaintain, func(f workflow.Future) {
 		// MaintainTx should never exit without an error before this point
@@ -307,7 +343,7 @@ func SnapshotFlowWorkflow(
 	})
 	sessionSelector.AddFuture(fExportSnapshot, func(f workflow.Future) {
 		// Happy path is waiting for this to return without error
-		sessionError = f.Get(exportCtx, &snapshotName)
+		sessionError = f.Get(exportCtx, &txnSnapshotState)
 	})
 	sessionSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {
 		sessionError = ctx.Err()
@@ -317,11 +353,13 @@ func SnapshotFlowWorkflow(
 			return sessionError
 		}
 
-		slotInfo := &protos.SetupReplicationOutput{
-			SlotName:     "peerdb_initial_copy_only",
-			SnapshotName: snapshotName,
-		}
-		if err := se.cloneTables(ctx, slotInfo, int(config.SnapshotNumTablesInParallel)); err != nil {
+		if err := se.cloneTables(ctx, &cloneTablesInput{
+			snapshotType:      SNAPSHOT_TYPE_TX,
+			slotName:          "",
+			snapshotName:      txnSnapshotState.SnapshotName,
+			supportsTIDScans:  txnSnapshotState.SupportsTIDScans,
+			maxParallelClones: numTablesInParallel,
+		}); err != nil {
 			return fmt.Errorf("failed to clone tables: %w", err)
 		}
 	} else if err := se.cloneTablesWithSlot(ctx, sessionCtx, numTablesInParallel); err != nil {
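
With the cloneTables/cloneTable changes above, a table mapping without a PartitionKey is now cloned with a plain `SELECT ... FROM table` (a single full-table partition), while mappings that keep the default ctid key still use the templated `WHERE ctid BETWEEN {{.start}} AND {{.end}}` query. For ctid, the start/end values are tid literals; the sketch below shows one way block-based tid bounds can be derived — illustrative only, the actual partitioning lives in the QRep code, which this diff does not touch:

```go
package snapshotexport

import "fmt"

// tidRange holds inclusive ctid bounds for one clone partition.
type tidRange struct {
	Start string // e.g. '(0,0)'
	End   string // e.g. '(8192,0)'
}

// blockRanges splits a table's heap blocks into contiguous ranges and renders
// them as tid literals usable with "WHERE ctid BETWEEN start AND end".
// totalBlocks would come from pg_relation_size(rel) / current_setting('block_size').
func blockRanges(totalBlocks, numPartitions uint32) []tidRange {
	if numPartitions == 0 || totalBlocks == 0 {
		return nil
	}
	per := (totalBlocks + numPartitions - 1) / numPartitions
	ranges := make([]tidRange, 0, numPartitions)
	for start := uint32(0); start < totalBlocks; start += per {
		end := start + per
		if end > totalBlocks {
			end = totalBlocks
		}
		ranges = append(ranges, tidRange{
			Start: fmt.Sprintf("'(%d,0)'", start),
			End:   fmt.Sprintf("'(%d,0)'", end),
		})
	}
	return ranges
}
```

When SupportsTIDScans is false the default key is cleared, so such tables degrade to the single full-table query rather than relying on ctid ranges.
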
diff --git a/protos/flow.proto b/protos/flow.proto
index e872b0fa17..f33b0696db 100644
--- a/protos/flow.proto
+++ b/protos/flow.proto
@@ -144,6 +144,7 @@ message SetupReplicationInput {
 message SetupReplicationOutput {
   string slot_name = 1;
   string snapshot_name = 2;
+  bool supports_tid_scans = 3;
 }
 
 message CreateRawTableInput {
@@ -394,3 +395,8 @@ message IsQRepPartitionSyncedInput {
   string partition_id = 2;
 }
 
+message ExportTxSnapshotOutput {
+  string snapshot_name = 1;
+  bool supports_tid_scans = 2;
+}
+
diff --git a/ui/app/peers/page.tsx b/ui/app/peers/page.tsx
index d7812e54c0..2170712950 100644
--- a/ui/app/peers/page.tsx
+++ b/ui/app/peers/page.tsx
@@ -39,6 +39,10 @@ export default function Peers() {
         >
           Peers
+
       {isLoading && (
@@ -46,12 +50,61 @@
       )}
-      {!isLoading && (
-          peer)}
-        />
-      )}
+      {!isLoading &&
+        (peers && peers.length == 0 ? (
+
+
+
+        ) : (
+          peer)} />
+        ))}
   );
diff --git a/ui/app/peers/peersTable.tsx b/ui/app/peers/peersTable.tsx
index a49cbd2e53..a592d232f3 100644
--- a/ui/app/peers/peersTable.tsx
+++ b/ui/app/peers/peersTable.tsx
@@ -33,7 +33,7 @@ function PeerRow({ peer }: { peer: Peer }) {
   );
 }
 
-function PeersTable({ title, peers }: { title: string; peers: Peer[] }) {
+function PeersTable({ peers }: { peers: Peer[] }) {
   const [searchQuery, setSearchQuery] = useState('');
   const [filteredType, setFilteredType] = useState(
     undefined
@@ -65,7 +65,6 @@ function PeersTable({ title, peers }: { title: string; peers: Peer[] }) {
 
   return (
-      {title}}
       toolbar={{
         left: <>,
         right: (
diff --git a/ui/next.config.js b/ui/next.config.js
index a42a10c727..5212d4780c 100644
--- a/ui/next.config.js
+++ b/ui/next.config.js
@@ -3,6 +3,15 @@ const nextConfig = {
   compiler: {
     styledComponents: true,
   },
+  async redirects() {
+    return [
+      {
+        source: '/',
+        destination: '/peers',
+        permanent: false,
+      },
+    ];
+  },
   reactStrictMode: true,
   swcMinify: true,
   output: 'standalone',