diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 5d9c4e690f..e2c1731174 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -611,12 +611,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) logger := activity.GetLogger(ctx) - err := monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition, time.Now()) - if err != nil { - a.Alerter.LogFlowError(ctx, config.FlowJobName, err) - return fmt.Errorf("failed to update start time for partition: %w", err) - } - srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer) if err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) @@ -631,6 +625,25 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, } defer connectors.CloseConnector(ctx, dstConn) + done, err := dstConn.IsQRepPartitionSynced(ctx, &protos.IsQRepPartitionSyncedInput{ + FlowJobName: config.FlowJobName, + PartitionId: partition.PartitionId, + }) + if err != nil { + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + return fmt.Errorf("failed to get sync status of partition: %w", err) + } + if done { + logger.Info("no records to push for partition " + partition.PartitionId) + return nil + } + + err = monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition, time.Now()) + if err != nil { + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + return fmt.Errorf("failed to update start time for partition: %w", err) + } + logger.Info("replicating partition " + partition.PartitionId) shutdown := utils.HeartbeatRoutine(ctx, func() string { return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total) @@ -705,8 +718,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, if err != nil { return err } - } else { - logger.Info("no records to push for partition " + partition.PartitionId) } 
err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition) diff --git a/flow/connectors/bigquery/qrep.go b/flow/connectors/bigquery/qrep.go index 744a20cdec..e2c2ef58e5 100644 --- a/flow/connectors/bigquery/qrep.go +++ b/flow/connectors/bigquery/qrep.go @@ -30,15 +30,6 @@ func (c *BigQueryConnector) SyncQRepRecords( return 0, err } - done, err := c.pgMetadata.IsQrepPartitionSynced(ctx, config.FlowJobName, partition.PartitionId) - if err != nil { - return 0, fmt.Errorf("failed to check if partition %s is synced: %w", partition.PartitionId, err) - } - - if done { - c.logger.Info(fmt.Sprintf("Partition %s has already been synced", partition.PartitionId)) - return 0, nil - } c.logger.Info(fmt.Sprintf("QRep sync function called and partition existence checked for"+ " partition %s of destination table %s", partition.PartitionId, destTable)) @@ -111,3 +102,9 @@ func (c *BigQueryConnector) SetupQRepMetadataTables(ctx context.Context, config return nil } + +func (c *BigQueryConnector) IsQRepPartitionSynced(ctx context.Context, + req *protos.IsQRepPartitionSyncedInput, +) (bool, error) { + return c.pgMetadata.IsQrepPartitionSynced(ctx, req.FlowJobName, req.PartitionId) +} diff --git a/flow/connectors/clickhouse/qrep.go b/flow/connectors/clickhouse/qrep.go index 5efcfaefb8..642c987962 100644 --- a/flow/connectors/clickhouse/qrep.go +++ b/flow/connectors/clickhouse/qrep.go @@ -33,16 +33,6 @@ func (c *ClickhouseConnector) SyncQRepRecords( slog.String("destinationTable", destTable), ) - done, err := c.isPartitionSynced(ctx, partition.PartitionId) - if err != nil { - return 0, fmt.Errorf("failed to check if partition %s is synced: %w", partition.PartitionId, err) - } - - if done { - c.logger.Info("Partition has already been synced", flowLog) - return 0, nil - } - tblSchema, err := c.getTableSchema(destTable) if err != nil { return 0, fmt.Errorf("failed to get schema of table %s: %w", destTable, err) @@ -96,9 +86,11 @@ func (c *ClickhouseConnector) 
getTableSchema(tableName string) ([]*sql.ColumnTyp return columnTypes, nil } -func (c *ClickhouseConnector) isPartitionSynced(ctx context.Context, partitionID string) (bool, error) { +func (c *ClickhouseConnector) IsQRepPartitionSynced(ctx context.Context, + req *protos.IsQRepPartitionSyncedInput, +) (bool, error) { //nolint:gosec - queryString := fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE partitionID = '%s'`, qRepMetadataTableName, partitionID) + queryString := fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE partitionID = '%s'`, qRepMetadataTableName, req.PartitionId) row := c.database.QueryRowContext(ctx, queryString) diff --git a/flow/connectors/core.go b/flow/connectors/core.go index ed306f73bb..b374ff70e4 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -146,6 +146,9 @@ type QRepPullConnector interface { type QRepSyncConnector interface { Connector + // IsQRepPartitionSynced returns true if a partition has already been synced + IsQRepPartitionSynced(ctx context.Context, req *protos.IsQRepPartitionSyncedInput) (bool, error) + // SetupQRepMetadataTables sets up the metadata tables for QRep. 
SetupQRepMetadataTables(ctx context.Context, config *protos.QRepConfig) error @@ -261,6 +264,7 @@ var ( _ QRepSyncConnector = &connbigquery.BigQueryConnector{} _ QRepSyncConnector = &connsnowflake.SnowflakeConnector{} _ QRepSyncConnector = &connclickhouse.ClickhouseConnector{} + _ QRepSyncConnector = &conns3.S3Connector{} _ QRepConsolidateConnector = &connsnowflake.SnowflakeConnector{} _ QRepConsolidateConnector = &connclickhouse.ClickhouseConnector{} diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index df82fb5bf4..ce9521ff4c 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -458,15 +458,6 @@ func (c *PostgresConnector) SyncQRepRecords( return 0, fmt.Errorf("table %s does not exist, used schema: %s", dstTable.Table, dstTable.Schema) } - done, err := c.isPartitionSynced(ctx, partition.PartitionId) - if err != nil { - return 0, fmt.Errorf("failed to check if partition is synced: %w", err) - } - - if done { - c.logger.Info(fmt.Sprintf("partition %s already synced", partition.PartitionId)) - return 0, nil - } c.logger.Info("SyncRecords called and initial checks complete.") stagingTableSync := &QRepStagingTableSync{connector: c} @@ -573,18 +564,20 @@ func BuildQuery(logger log.Logger, query string, flowJobName string) (string, er return res, nil } -// isPartitionSynced checks whether a specific partition is synced -func (c *PostgresConnector) isPartitionSynced(ctx context.Context, partitionID string) (bool, error) { +// IsQRepPartitionSynced checks whether a specific partition is synced +func (c *PostgresConnector) IsQRepPartitionSynced(ctx context.Context, + req *protos.IsQRepPartitionSyncedInput, +) (bool, error) { // setup the query string metadataTableIdentifier := pgx.Identifier{c.metadataSchema, qRepMetadataTableName} queryString := fmt.Sprintf( - "SELECT COUNT(*)>0 FROM %s WHERE partitionID = $1;", + "SELECT COUNT(*)>0 FROM %s WHERE partitionID=$1;", metadataTableIdentifier.Sanitize(), ) // 
prepare and execute the query var result bool - err := c.conn.QueryRow(ctx, queryString, partitionID).Scan(&result) + err := c.conn.QueryRow(ctx, queryString, req.PartitionId).Scan(&result) if err != nil { return false, fmt.Errorf("failed to execute query: %w", err) } diff --git a/flow/connectors/s3/qrep.go b/flow/connectors/s3/qrep.go index 7a9432c1d3..1d5684d060 100644 --- a/flow/connectors/s3/qrep.go +++ b/flow/connectors/s3/qrep.go @@ -81,3 +81,10 @@ func (c *S3Connector) SetupQRepMetadataTables(_ context.Context, config *protos. c.logger.Info("QRep metadata setup not needed for S3.") return nil } + +// IsQRepPartitionSynced always returns false: S3 doesn't track synced partitions; a file with the same name is simply overwritten on re-sync. +func (c *S3Connector) IsQRepPartitionSynced(_ context.Context, + _ *protos.IsQRepPartitionSyncedInput, +) (bool, error) { + return false, nil +} diff --git a/flow/connectors/snowflake/qrep.go b/flow/connectors/snowflake/qrep.go index d5ef2e32db..04bfd590fe 100644 --- a/flow/connectors/snowflake/qrep.go +++ b/flow/connectors/snowflake/qrep.go @@ -35,16 +35,6 @@ func (c *SnowflakeConnector) SyncQRepRecords( } c.logger.Info("Called QRep sync function and obtained table schema", flowLog) - done, err := c.pgMetadata.IsQrepPartitionSynced(ctx, config.FlowJobName, partition.PartitionId) - if err != nil { - return 0, fmt.Errorf("failed to check if partition %s is synced: %w", partition.PartitionId, err) - } - - if done { - c.logger.Info("Partition has already been synced", flowLog) - return 0, nil - } - avroSync := NewSnowflakeAvroSyncHandler(config, c) return avroSync.SyncQRepRecords(ctx, config, partition, tblSchema, stream) } @@ -282,3 +272,9 @@ func (c *SnowflakeConnector) dropStage(ctx context.Context, stagingPath string, func (c *SnowflakeConnector) getStageNameForJob(job string) string { return fmt.Sprintf("%s.peerdb_stage_%s", c.rawSchema, job) } + +func (c *SnowflakeConnector) IsQRepPartitionSynced(ctx context.Context, + req *protos.IsQRepPartitionSyncedInput, +) 
(bool, error) { + return c.pgMetadata.IsQrepPartitionSynced(ctx, req.FlowJobName, req.PartitionId) +} diff --git a/protos/flow.proto b/protos/flow.proto index 0ab9b94a35..08b87fb9d9 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -380,9 +380,14 @@ message SetupFlowOutput { map table_name_schema_mapping = 2; } -message AddTablesToPublicationInput{ +message AddTablesToPublicationInput { string flow_job_name = 1; string publication_name = 2; repeated TableMapping additional_tables = 3; } +message IsQRepPartitionSyncedInput { + string flow_job_name = 1; + string partition_id = 2; +} +