Skip to content

Commit

Permalink
dynamically add new tables to CDC mirrors (#1084)
Browse files Browse the repository at this point in the history
1. Call an activity to add the new tables to the publication.
2. Kick off a child `CDCFlow` with only the new tables as input along
with the `InitialCopyOnly` flag set. This runs enough of the `SetupFlow`
and `SnapshotFlow` to be fine, but also skips over some checks regarding
primary keys and replica identities. The current idea is to have them in
the UI instead, just like in the create mirror interface.
3. Patch the config of the parent workflow with the new tables and then
resume.

Currently there is no way to interface with this feature, will add a way
to signal this in a different PR. UI interface will be added
subsequently.

---------

Co-authored-by: Kaushik Iska <[email protected]>
  • Loading branch information
heavycrystal and iskakaushik authored Jan 18, 2024
1 parent f504b1e commit bc2dcdc
Show file tree
Hide file tree
Showing 18 changed files with 324 additions and 136 deletions.
21 changes: 21 additions & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -986,3 +986,24 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,

return currentSnapshotXmin, nil
}

func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, cfg *protos.FlowConnectionConfigs,
additionalTableMappings []*protos.TableMapping,
) error {
ctx = context.WithValue(ctx, shared.FlowNameKey, cfg.FlowJobName)
srcConn, err := connectors.GetCDCPullConnector(ctx, cfg.Source)
if err != nil {
return fmt.Errorf("failed to get source connector: %w", err)
}
defer connectors.CloseConnector(srcConn)

err = srcConn.AddTablesToPublication(&protos.AddTablesToPublicationInput{
FlowJobName: cfg.FlowJobName,
PublicationName: cfg.PublicationName,
AdditionalTables: additionalTableMappings,
})
if err != nil {
a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err)
}
return err
}
15 changes: 8 additions & 7 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (h *FlowRequestHandler) ShutdownFlow(
ctx,
req.WorkflowId,
"",
shared.CDCFlowSignalName,
shared.FlowSignalName,
shared.ShutdownSignal,
)
if err != nil {
Expand Down Expand Up @@ -442,8 +442,9 @@ func (h *FlowRequestHandler) FlowStateChange(
if err != nil {
return nil, err
}

if req.RequestedFlowState == protos.FlowStatus_STATUS_PAUSED &&
*currState == protos.FlowStatus_STATUS_RUNNING {
currState == protos.FlowStatus_STATUS_RUNNING {
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_PAUSING)
if err != nil {
return nil, err
Expand All @@ -452,20 +453,20 @@ func (h *FlowRequestHandler) FlowStateChange(
ctx,
workflowID,
"",
shared.CDCFlowSignalName,
shared.FlowSignalName,
shared.PauseSignal,
)
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_RUNNING &&
*currState == protos.FlowStatus_STATUS_PAUSED {
currState == protos.FlowStatus_STATUS_PAUSED {
err = h.temporalClient.SignalWorkflow(
ctx,
workflowID,
"",
shared.CDCFlowSignalName,
shared.FlowSignalName,
shared.NoopSignal,
)
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATED &&
(*currState == protos.FlowStatus_STATUS_RUNNING || *currState == protos.FlowStatus_STATUS_PAUSED) {
(currState == protos.FlowStatus_STATUS_RUNNING || currState == protos.FlowStatus_STATUS_PAUSED) {
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_TERMINATING)
if err != nil {
return nil, err
Expand All @@ -482,7 +483,7 @@ func (h *FlowRequestHandler) FlowStateChange(
req.RequestedFlowState, currState)
}
if err != nil {
return nil, fmt.Errorf("unable to signal CDCFlow workflow: %w", err)
return nil, fmt.Errorf("unable to signal workflow: %w", err)
}

return &protos.FlowStateChangeResponse{
Expand Down
14 changes: 8 additions & 6 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (h *FlowRequestHandler) MirrorStatus(
Status: &protos.MirrorStatusResponse_CdcStatus{
CdcStatus: cdcStatus,
},
CurrentFlowState: *currState,
CurrentFlowState: currState,
}, nil
} else {
qrepStatus, err := h.QRepFlowStatus(ctx, req)
Expand All @@ -66,7 +66,7 @@ func (h *FlowRequestHandler) MirrorStatus(
Status: &protos.MirrorStatusResponse_QrepStatus{
QrepStatus: qrepStatus,
},
CurrentFlowState: *currState,
CurrentFlowState: currState,
}, nil
}
}
Expand Down Expand Up @@ -334,17 +334,19 @@ func (h *FlowRequestHandler) isCDCFlow(ctx context.Context, flowJobName string)
return false, nil
}

func (h *FlowRequestHandler) getWorkflowStatus(ctx context.Context, workflowID string) (*protos.FlowStatus, error) {
func (h *FlowRequestHandler) getWorkflowStatus(ctx context.Context, workflowID string) (protos.FlowStatus, error) {
res, err := h.temporalClient.QueryWorkflow(ctx, workflowID, "", shared.FlowStatusQuery)
if err != nil {
slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, err.Error()))
return nil, fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
return protos.FlowStatus_STATUS_UNKNOWN,
fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
}
var state *protos.FlowStatus
var state protos.FlowStatus
err = res.Get(&state)
if err != nil {
slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, err.Error()))
return nil, fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
return protos.FlowStatus_STATUS_UNKNOWN,
fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
}
return state, nil
}
Expand Down
3 changes: 3 additions & 0 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ type CDCPullConnector interface {

// GetOpenConnectionsForUser returns the number of open connections for the user configured in the peer.
GetOpenConnectionsForUser() (*protos.GetOpenConnectionsForUserResult, error)

// AddTablesToPublication adds additional tables added to a mirror to the publication also
AddTablesToPublication(req *protos.AddTablesToPublicationInput) error
}

type CDCSyncConnector interface {
Expand Down
4 changes: 4 additions & 0 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,3 +604,7 @@ func (c *PostgresConnector) getCurrentLSN() (pglogrepl.LSN, error) {
}
return pglogrepl.ParseLSN(result.String)
}

func (c *PostgresConnector) getDefaultPublicationName(jobName string) string {
return fmt.Sprintf("peerflow_pub_%s", jobName)
}
63 changes: 54 additions & 9 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log/slog"
"regexp"
"strings"
"time"

"github.com/PeerDB-io/peer-flow/connectors/utils"
Expand Down Expand Up @@ -205,8 +206,7 @@ func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.Pu
slotName = req.OverrideReplicationSlotName
}

// Publication name would be the job name prefixed with "peerflow_pub_"
publicationName := fmt.Sprintf("peerflow_pub_%s", req.FlowJobName)
publicationName := c.getDefaultPublicationName(req.FlowJobName)
if req.OverridePublicationName != "" {
publicationName = req.OverridePublicationName
}
Expand Down Expand Up @@ -788,6 +788,16 @@ func (c *PostgresConnector) EnsurePullability(
return nil, err
}

tableIdentifierMapping[tableName] = &protos.PostgresTableIdentifier{
RelId: relID,
}

if !req.CheckConstraints {
msg := fmt.Sprintf("[no-constraints] ensured pullability table %s", tableName)
utils.RecordHeartbeatWithRecover(c.ctx, msg)
continue
}

replicaIdentity, replErr := c.getReplicaIdentityType(schemaTable)
if replErr != nil {
return nil, fmt.Errorf("error getting replica identity for table %s: %w", schemaTable, replErr)
Expand All @@ -799,13 +809,11 @@ func (c *PostgresConnector) EnsurePullability(
}

// we only allow no primary key if the table has REPLICA IDENTITY FULL
// this is ok for replica identity index as we populate the primary key columns
if len(pKeyCols) == 0 && !(replicaIdentity == ReplicaIdentityFull) {
return nil, fmt.Errorf("table %s has no primary keys and does not have REPLICA IDENTITY FULL", schemaTable)
}

tableIdentifierMapping[tableName] = &protos.PostgresTableIdentifier{
RelId: relID,
}
utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("ensured pullability table %s", tableName))
}

Expand All @@ -826,8 +834,7 @@ func (c *PostgresConnector) SetupReplication(signal SlotSignal, req *protos.Setu
slotName = req.ExistingReplicationSlotName
}

// Publication name would be the job name prefixed with "peerflow_pub_"
publicationName := fmt.Sprintf("peerflow_pub_%s", req.FlowJobName)
publicationName := c.getDefaultPublicationName(req.FlowJobName)
if req.ExistingPublicationName != "" {
publicationName = req.ExistingPublicationName
}
Expand Down Expand Up @@ -859,8 +866,7 @@ func (c *PostgresConnector) PullFlowCleanup(jobName string) error {
// Slotname would be the job name prefixed with "peerflow_slot_"
slotName := fmt.Sprintf("peerflow_slot_%s", jobName)

// Publication name would be the job name prefixed with "peerflow_pub_"
publicationName := fmt.Sprintf("peerflow_pub_%s", jobName)
publicationName := c.getDefaultPublicationName(jobName)

pullFlowCleanupTx, err := c.pool.Begin(c.ctx)
if err != nil {
Expand Down Expand Up @@ -938,3 +944,42 @@ func (c *PostgresConnector) GetOpenConnectionsForUser() (*protos.GetOpenConnecti
CurrentOpenConnections: result.Int64,
}, nil
}

func (c *PostgresConnector) AddTablesToPublication(req *protos.AddTablesToPublicationInput) error {
// don't modify custom publications
if req == nil || len(req.AdditionalTables) == 0 {
return nil
}

additionalSrcTables := make([]string, 0, len(req.AdditionalTables))
for _, additionalTableMapping := range req.AdditionalTables {
additionalSrcTables = append(additionalSrcTables, additionalTableMapping.SourceTableIdentifier)
}

// just check if we have all the tables already in the publication
if req.PublicationName != "" {
rows, err := c.pool.Query(c.ctx,
"SELECT tablename FROM pg_publication_tables WHERE pubname=$1", req.PublicationName)
if err != nil {
return fmt.Errorf("failed to check tables in publication: %w", err)
}

tableNames, err := pgx.CollectRows[string](rows, pgx.RowTo)
if err != nil {
return fmt.Errorf("failed to check tables in publication: %w", err)
}
notPresentTables := utils.ArrayMinus(tableNames, additionalSrcTables)
if len(notPresentTables) > 0 {
return fmt.Errorf("some additional tables not present in custom publication: %s",
strings.Join(notPresentTables, ", "))
}
}

additionalSrcTablesString := strings.Join(additionalSrcTables, ",")
_, err := c.pool.Exec(c.ctx, fmt.Sprintf("ALTER PUBLICATION %s ADD TABLE %s",
c.getDefaultPublicationName(req.FlowJobName), additionalSrcTablesString))
if err != nil {
return fmt.Errorf("failed to alter publication: %w", err)
}
return nil
}
23 changes: 20 additions & 3 deletions flow/connectors/utils/array.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package utils

func ArrayMinus(first []string, second []string) []string {
lookup := make(map[string]struct{}, len(second))
// first - second
func ArrayMinus[T comparable](first, second []T) []T {
lookup := make(map[T]struct{}, len(second))
// Add elements from arrayB to the lookup map
for _, element := range second {
lookup[element] = struct{}{}
}
// Iterate over arrayA and check if the element is present in the lookup map
var result []string
var result []T
for _, element := range first {
_, exists := lookup[element]
if !exists {
Expand All @@ -29,3 +30,19 @@ func ArrayChunks[T any](slice []T, size int) [][]T {

return partitions
}

func ArraysHaveOverlap[T comparable](first, second []T) bool {
lookup := make(map[T]struct{})

for _, element := range second {
lookup[element] = struct{}{}
}

for _, element := range first {
if _, exists := lookup[element]; exists {
return true
}
}

return false
}
2 changes: 1 addition & 1 deletion flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func SetupCDCFlowStatusQuery(t *testing.T, env *testsuite.TestWorkflowEnvironmen
err = response.Get(&state)
if err != nil {
slog.Error(err.Error())
} else if state.CurrentFlowState == protos.FlowStatus_STATUS_RUNNING {
} else if state.CurrentFlowStatus == protos.FlowStatus_STATUS_RUNNING {
return
}
} else if counter > 15 {
Expand Down
27 changes: 27 additions & 0 deletions flow/shared/additional_tables.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package shared

import (
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
)

func AdditionalTablesHasOverlap(currentTableMappings []*protos.TableMapping,
additionalTableMappings []*protos.TableMapping,
) bool {
currentSrcTables := make([]string, 0, len(currentTableMappings))
currentDstTables := make([]string, 0, len(currentTableMappings))
additionalSrcTables := make([]string, 0, len(additionalTableMappings))
additionalDstTables := make([]string, 0, len(additionalTableMappings))

for _, currentTableMapping := range currentTableMappings {
currentSrcTables = append(currentSrcTables, currentTableMapping.SourceTableIdentifier)
currentDstTables = append(currentDstTables, currentTableMapping.DestinationTableIdentifier)
}
for _, additionalTableMapping := range additionalTableMappings {
currentSrcTables = append(currentSrcTables, additionalTableMapping.SourceTableIdentifier)
currentDstTables = append(currentDstTables, additionalTableMapping.DestinationTableIdentifier)
}

return utils.ArraysHaveOverlap(currentSrcTables, additionalSrcTables) ||
utils.ArraysHaveOverlap(currentDstTables, additionalDstTables)
}
2 changes: 1 addition & 1 deletion flow/shared/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const (
snapshotFlowTaskQueue = "snapshot-flow-task-queue"

// Signals
CDCFlowSignalName = "peer-flow-signal"
FlowSignalName = "peer-flow-signal"
CDCDynamicPropertiesSignalName = "cdc-dynamic-properties"

// Queries
Expand Down
Loading

0 comments on commit bc2dcdc

Please sign in to comment.