Skip to content

Commit

Permalink
More initial copy only support (#938)
Browse files Browse the repository at this point in the history
When InitialCopyOnly is set, we don't need to check for primary keys or
replica identity.
We also do not need to create a replication slot.

---------

Co-authored-by: Amogh-Bharadwaj <[email protected]>
  • Loading branch information
iskakaushik and Amogh-Bharadwaj authored Dec 30, 2023
1 parent 8aec70a commit 0287c21
Show file tree
Hide file tree
Showing 11 changed files with 423 additions and 342 deletions.
26 changes: 17 additions & 9 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ func (c *PostgresConnector) GetTableSchema(
) (*protos.GetTableSchemaBatchOutput, error) {
res := make(map[string]*protos.TableSchema)
for _, tableName := range req.TableIdentifiers {
tableSchema, err := c.getTableSchemaForTable(tableName)
tableSchema, err := c.getTableSchemaForTable(tableName, req.SkipPkeyAndReplicaCheck)
if err != nil {
return nil, err
}
Expand All @@ -567,20 +567,27 @@ func (c *PostgresConnector) GetTableSchema(

func (c *PostgresConnector) getTableSchemaForTable(
tableName string,
skipPkeyAndReplicaCheck bool,
) (*protos.TableSchema, error) {
schemaTable, err := utils.ParseSchemaTable(tableName)
if err != nil {
return nil, err
}

replicaIdentityType, replErr := c.getReplicaIdentityType(schemaTable)
if replErr != nil {
return nil, fmt.Errorf("error getting replica identity for table %s: %w", schemaTable, replErr)
}
var pKeyCols []string
var replicaIdentityType ReplicaIdentityType
if !skipPkeyAndReplicaCheck {
var replErr error
replicaIdentityType, replErr = c.getReplicaIdentityType(schemaTable)
if replErr != nil {
return nil, fmt.Errorf("[getTableSchema]:error getting replica identity for table %s: %w", schemaTable, replErr)
}

pKeyCols, err := c.getPrimaryKeyColumns(replicaIdentityType, schemaTable)
if err != nil {
return nil, fmt.Errorf("error getting primary key column for table %s: %w", schemaTable, err)
var err error
pKeyCols, err = c.getPrimaryKeyColumns(replicaIdentityType, schemaTable)
if err != nil {
return nil, fmt.Errorf("[getTableSchema]:error getting primary key column for table %s: %w", schemaTable, err)
}
}

// Get the column names and types
Expand Down Expand Up @@ -731,7 +738,8 @@ func (c *PostgresConnector) ReplayTableSchemaDeltas(flowJobName string,
}

// EnsurePullability ensures that a table is pullable, implementing the Connector interface.
func (c *PostgresConnector) EnsurePullability(req *protos.EnsurePullabilityBatchInput,
func (c *PostgresConnector) EnsurePullability(
req *protos.EnsurePullabilityBatchInput,
) (*protos.EnsurePullabilityBatchOutput, error) {
tableIdentifierMapping := make(map[string]*protos.TableIdentifier)
for _, tableName := range req.SourceTableIdentifiers {
Expand Down
594 changes: 303 additions & 291 deletions flow/generated/protos/flow.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ func CDCFlowWorkflowWithConfig(

state.SnapshotComplete = true
state.Progress = append(state.Progress, "executed setup flow and snapshot flow")

// if initial_copy_only is opted for, we end the flow here.
if cfg.InitialCopyOnly {
return nil, nil
Expand Down
7 changes: 4 additions & 3 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,10 @@ func (q *QRepFlowExecution) SetupWatermarkTableOnDestination(ctx workflow.Contex
})

tableSchemaInput := &protos.GetTableSchemaBatchInput{
PeerConnectionConfig: q.config.SourcePeer,
TableIdentifiers: []string{q.config.WatermarkTable},
FlowName: q.config.FlowJobName,
PeerConnectionConfig: q.config.SourcePeer,
TableIdentifiers: []string{q.config.WatermarkTable},
FlowName: q.config.FlowJobName,
SkipPkeyAndReplicaCheck: true,
}

future := workflow.ExecuteActivity(ctx, flowable.GetTableSchema, tableSchemaInput)
Expand Down
25 changes: 15 additions & 10 deletions flow/workflows/setup_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,10 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables(
sort.Strings(sourceTables)

tableSchemaInput := &protos.GetTableSchemaBatchInput{
PeerConnectionConfig: flowConnectionConfigs.Source,
TableIdentifiers: sourceTables,
FlowName: s.CDCFlowName,
PeerConnectionConfig: flowConnectionConfigs.Source,
TableIdentifiers: sourceTables,
FlowName: s.CDCFlowName,
SkipPkeyAndReplicaCheck: flowConnectionConfigs.InitialCopyOnly,
}

future := workflow.ExecuteActivity(ctx, flowable.GetTableSchema, tableSchemaInput)
Expand Down Expand Up @@ -260,14 +261,18 @@ func (s *SetupFlowExecution) executeSetupFlow(
return nil, fmt.Errorf("failed to check connections and setup metadata tables: %w", err)
}

// then ensure pullability
if err := s.ensurePullability(ctx, config); err != nil {
return nil, fmt.Errorf("failed to ensure pullability: %w", err)
}
// for initial copy only flows, we don't need to ensure pullability or create the raw table
// as we don't need the primary key requirement.
if !config.InitialCopyOnly {
// then ensure pullability
if err := s.ensurePullability(ctx, config); err != nil {
return nil, fmt.Errorf("failed to ensure pullability: %w", err)
}

// then create the raw table
if err := s.createRawTable(ctx, config); err != nil {
return nil, fmt.Errorf("failed to create raw table: %w", err)
// then create the raw table
if err := s.createRawTable(ctx, config); err != nil {
return nil, fmt.Errorf("failed to create raw table: %w", err)
}
}

// then fetch the table schema and setup the normalized tables
Expand Down
66 changes: 41 additions & 25 deletions flow/workflows/snapshot_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (s *SnapshotFlowExecution) cloneTables(
ctx workflow.Context,
slotInfo *protos.SetupReplicationOutput,
maxParallelClones int,
) {
) error {
slog.Info(fmt.Sprintf("cloning tables for slot name %s and snapshotName %s",
slotInfo.SlotName, slotInfo.SnapshotName))

Expand All @@ -216,10 +216,11 @@ func (s *SnapshotFlowExecution) cloneTables(

if err := boundSelector.Wait(); err != nil {
s.logger.Error("failed to clone some tables", "error", err)
return
return err
}

s.logger.Info("finished cloning tables")
return nil
}

func SnapshotFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigs) error {
Expand All @@ -230,24 +231,48 @@ func SnapshotFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionCon
logger: logger,
}

numTablesInParallel := int(config.SnapshotNumTablesInParallel)
if numTablesInParallel <= 0 {
numTablesInParallel = 1
}

replCtx := ctx
if config.DoInitialCopy {
sessionOpts := &workflow.SessionOptions{
CreationTimeout: 5 * time.Minute,
ExecutionTimeout: time.Hour * 24 * 365 * 100, // 100 years
HeartbeatTimeout: time.Hour * 24 * 365 * 100, // 100 years
}

sessionCtx, err := workflow.CreateSession(ctx, sessionOpts)
if !config.DoInitialCopy {
_, err := se.setupReplication(replCtx)
if err != nil {
return fmt.Errorf("failed to create session: %w", err)
return fmt.Errorf("failed to setup replication: %w", err)
}

if err := se.closeSlotKeepAlive(replCtx); err != nil {
return fmt.Errorf("failed to close slot keep alive: %w", err)
}

return nil
}

if config.InitialCopyOnly {
slotInfo := &protos.SetupReplicationOutput{
SlotName: "peerdb_initial_copy_only",
SnapshotName: "", // empty snapshot name indicates that we should not use a snapshot
}
defer workflow.CompleteSession(sessionCtx)
se.cloneTables(ctx, slotInfo, int(config.SnapshotNumTablesInParallel))
return nil
}

sessionOpts := &workflow.SessionOptions{
CreationTimeout: 5 * time.Minute,
ExecutionTimeout: time.Hour * 24 * 365 * 100, // 100 years
HeartbeatTimeout: time.Hour * 24 * 365 * 100, // 100 years
}

replCtx = sessionCtx
sessionCtx, err := workflow.CreateSession(ctx, sessionOpts)
if err != nil {
return fmt.Errorf("failed to create session: %w", err)
}
defer workflow.CompleteSession(sessionCtx)

slotInfo, err := se.setupReplication(replCtx)
slotInfo, err := se.setupReplication(sessionCtx)
if err != nil {
return fmt.Errorf("failed to setup replication: %w", err)
}
Expand All @@ -257,19 +282,10 @@ func SnapshotFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionCon
return nil
}

if config.DoInitialCopy {
numTablesInParallel := int(config.SnapshotNumTablesInParallel)
if numTablesInParallel <= 0 {
numTablesInParallel = 1
}

logger.Info("cloning tables in parallel: ", numTablesInParallel)
se.cloneTables(ctx, slotInfo, numTablesInParallel)
} else {
logger.Info("skipping initial copy as 'doInitialCopy' is false")
}
logger.Info("cloning tables in parallel: ", numTablesInParallel)
se.cloneTables(ctx, slotInfo, numTablesInParallel)

if err := se.closeSlotKeepAlive(replCtx); err != nil {
if err := se.closeSlotKeepAlive(sessionCtx); err != nil {
return fmt.Errorf("failed to close slot keep alive: %w", err)
}

Expand Down
7 changes: 4 additions & 3 deletions flow/workflows/xmin_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ func (q *XminFlowExecution) SetupWatermarkTableOnDestination(ctx workflow.Contex
})

tableSchemaInput := &protos.GetTableSchemaBatchInput{
PeerConnectionConfig: q.config.SourcePeer,
TableIdentifiers: []string{q.config.WatermarkTable},
FlowName: q.config.FlowJobName,
PeerConnectionConfig: q.config.SourcePeer,
TableIdentifiers: []string{q.config.WatermarkTable},
FlowName: q.config.FlowJobName,
SkipPkeyAndReplicaCheck: true,
}

future := workflow.ExecuteActivity(ctx, flowable.GetTableSchema, tableSchemaInput)
Expand Down
2 changes: 2 additions & 0 deletions nexus/pt/src/peerdb_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,8 @@ pub struct GetTableSchemaBatchInput {
pub table_identifiers: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
#[prost(string, tag="3")]
pub flow_name: ::prost::alloc::string::String,
#[prost(bool, tag="4")]
pub skip_pkey_and_replica_check: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
18 changes: 18 additions & 0 deletions nexus/pt/src/peerdb_flow.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1957,6 +1957,9 @@ impl serde::Serialize for GetTableSchemaBatchInput {
if !self.flow_name.is_empty() {
len += 1;
}
if self.skip_pkey_and_replica_check {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("peerdb_flow.GetTableSchemaBatchInput", len)?;
if let Some(v) = self.peer_connection_config.as_ref() {
struct_ser.serialize_field("peerConnectionConfig", v)?;
Expand All @@ -1967,6 +1970,9 @@ impl serde::Serialize for GetTableSchemaBatchInput {
if !self.flow_name.is_empty() {
struct_ser.serialize_field("flowName", &self.flow_name)?;
}
if self.skip_pkey_and_replica_check {
struct_ser.serialize_field("skipPkeyAndReplicaCheck", &self.skip_pkey_and_replica_check)?;
}
struct_ser.end()
}
}
Expand All @@ -1983,13 +1989,16 @@ impl<'de> serde::Deserialize<'de> for GetTableSchemaBatchInput {
"tableIdentifiers",
"flow_name",
"flowName",
"skip_pkey_and_replica_check",
"skipPkeyAndReplicaCheck",
];

#[allow(clippy::enum_variant_names)]
enum GeneratedField {
PeerConnectionConfig,
TableIdentifiers,
FlowName,
SkipPkeyAndReplicaCheck,
__SkipField__,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
Expand All @@ -2015,6 +2024,7 @@ impl<'de> serde::Deserialize<'de> for GetTableSchemaBatchInput {
"peerConnectionConfig" | "peer_connection_config" => Ok(GeneratedField::PeerConnectionConfig),
"tableIdentifiers" | "table_identifiers" => Ok(GeneratedField::TableIdentifiers),
"flowName" | "flow_name" => Ok(GeneratedField::FlowName),
"skipPkeyAndReplicaCheck" | "skip_pkey_and_replica_check" => Ok(GeneratedField::SkipPkeyAndReplicaCheck),
_ => Ok(GeneratedField::__SkipField__),
}
}
Expand All @@ -2037,6 +2047,7 @@ impl<'de> serde::Deserialize<'de> for GetTableSchemaBatchInput {
let mut peer_connection_config__ = None;
let mut table_identifiers__ = None;
let mut flow_name__ = None;
let mut skip_pkey_and_replica_check__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::PeerConnectionConfig => {
Expand All @@ -2057,6 +2068,12 @@ impl<'de> serde::Deserialize<'de> for GetTableSchemaBatchInput {
}
flow_name__ = Some(map.next_value()?);
}
GeneratedField::SkipPkeyAndReplicaCheck => {
if skip_pkey_and_replica_check__.is_some() {
return Err(serde::de::Error::duplicate_field("skipPkeyAndReplicaCheck"));
}
skip_pkey_and_replica_check__ = Some(map.next_value()?);
}
GeneratedField::__SkipField__ => {
let _ = map.next_value::<serde::de::IgnoredAny>()?;
}
Expand All @@ -2066,6 +2083,7 @@ impl<'de> serde::Deserialize<'de> for GetTableSchemaBatchInput {
peer_connection_config: peer_connection_config__,
table_identifiers: table_identifiers__.unwrap_or_default(),
flow_name: flow_name__.unwrap_or_default(),
skip_pkey_and_replica_check: skip_pkey_and_replica_check__.unwrap_or_default(),
})
}
}
Expand Down
1 change: 1 addition & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ message GetTableSchemaBatchInput {
peerdb_peers.Peer peer_connection_config = 1;
repeated string table_identifiers = 2;
string flow_name = 3;
bool skip_pkey_and_replica_check = 4;
}

message GetTableSchemaBatchOutput {
Expand Down
18 changes: 17 additions & 1 deletion ui/grpc_generated/flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ export interface GetTableSchemaBatchInput {
peerConnectionConfig: Peer | undefined;
tableIdentifiers: string[];
flowName: string;
skipPkeyAndReplicaCheck: boolean;
}

export interface GetTableSchemaBatchOutput {
Expand Down Expand Up @@ -4136,7 +4137,7 @@ export const TableSchema_ColumnsEntry = {
};

function createBaseGetTableSchemaBatchInput(): GetTableSchemaBatchInput {
return { peerConnectionConfig: undefined, tableIdentifiers: [], flowName: "" };
return { peerConnectionConfig: undefined, tableIdentifiers: [], flowName: "", skipPkeyAndReplicaCheck: false };
}

export const GetTableSchemaBatchInput = {
Expand All @@ -4150,6 +4151,9 @@ export const GetTableSchemaBatchInput = {
if (message.flowName !== "") {
writer.uint32(26).string(message.flowName);
}
if (message.skipPkeyAndReplicaCheck === true) {
writer.uint32(32).bool(message.skipPkeyAndReplicaCheck);
}
return writer;
},

Expand Down Expand Up @@ -4181,6 +4185,13 @@ export const GetTableSchemaBatchInput = {

message.flowName = reader.string();
continue;
case 4:
if (tag !== 32) {
break;
}

message.skipPkeyAndReplicaCheck = reader.bool();
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
Expand All @@ -4197,6 +4208,7 @@ export const GetTableSchemaBatchInput = {
? object.tableIdentifiers.map((e: any) => String(e))
: [],
flowName: isSet(object.flowName) ? String(object.flowName) : "",
skipPkeyAndReplicaCheck: isSet(object.skipPkeyAndReplicaCheck) ? Boolean(object.skipPkeyAndReplicaCheck) : false,
};
},

Expand All @@ -4211,6 +4223,9 @@ export const GetTableSchemaBatchInput = {
if (message.flowName !== "") {
obj.flowName = message.flowName;
}
if (message.skipPkeyAndReplicaCheck === true) {
obj.skipPkeyAndReplicaCheck = message.skipPkeyAndReplicaCheck;
}
return obj;
},

Expand All @@ -4224,6 +4239,7 @@ export const GetTableSchemaBatchInput = {
: undefined;
message.tableIdentifiers = object.tableIdentifiers?.map((e) => e) || [];
message.flowName = object.flowName ?? "";
message.skipPkeyAndReplicaCheck = object.skipPkeyAndReplicaCheck ?? false;
return message;
},
};
Expand Down

0 comments on commit 0287c21

Please sign in to comment.