Merge branch 'main' into schema-changes-diff-from-schema
heavycrystal authored Mar 8, 2024
2 parents 39214ae + 0496eb5 commit 6acbb86
Showing 35 changed files with 593 additions and 569 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/flow.yml
@@ -12,7 +12,7 @@ jobs:
matrix:
runner: [ubicloud-standard-16-ubuntu-2204-arm]
runs-on: ${{ matrix.runner }}
- timeout-minutes: 40
+ timeout-minutes: 30
services:
catalog:
image: imresamu/postgis:15-3.4-alpine
@@ -96,7 +96,7 @@ jobs:
temporal operator search-attribute create --name MirrorName --type Text --namespace default
./peer-flow worker &
./peer-flow snapshot-worker &
- go test -p 32 ./... -timeout 1200s
+ go test -p 32 ./... -timeout 900s
working-directory: ./flow
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
37 changes: 24 additions & 13 deletions flow/activities/flowable.go
@@ -301,7 +301,7 @@ func (a *FlowableActivity) SyncFlow(
}

shutdown := utils.HeartbeatRoutine(ctx, func() string {
- return "transferring records for job - " + flowName
+ return "transferring records for job"
})
defer shutdown()

@@ -467,7 +467,7 @@ func (a *FlowableActivity) StartNormalize(
defer connectors.CloseConnector(ctx, dstConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
- return "normalizing records from batch for job - " + input.FlowConnectionConfigs.FlowJobName
+ return "normalizing records from batch for job"
})
defer shutdown()

@@ -535,7 +535,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
defer connectors.CloseConnector(ctx, srcConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
- return "getting partitions for job - " + config.FlowJobName
+ return "getting partitions for job"
})
defer shutdown()

@@ -604,12 +604,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := activity.GetLogger(ctx)

- err := monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition, time.Now())
- if err != nil {
- a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
- return fmt.Errorf("failed to update start time for partition: %w", err)
- }
-
srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
@@ -624,6 +618,25 @@
}
defer connectors.CloseConnector(ctx, dstConn)

+ done, err := dstConn.IsQRepPartitionSynced(ctx, &protos.IsQRepPartitionSyncedInput{
+ FlowJobName: config.FlowJobName,
+ PartitionId: partition.PartitionId,
+ })
+ if err != nil {
+ a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
+ return fmt.Errorf("failed to get fetch status of partition: %w", err)
+ }
+ if done {
+ logger.Info("no records to push for partition " + partition.PartitionId)
+ return nil
+ }
+
+ err = monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition, time.Now())
+ if err != nil {
+ a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
+ return fmt.Errorf("failed to update start time for partition: %w", err)
+ }
+
logger.Info("replicating partition " + partition.PartitionId)
shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total)
@@ -698,8 +711,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
if err != nil {
return err
}
- } else {
- logger.Info("no records to push for partition " + partition.PartitionId)
}

err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition)
@@ -718,7 +729,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
defer connectors.CloseConnector(ctx, dstConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
- return "consolidating partitions for job - " + config.FlowJobName
+ return "consolidating partitions for job"
})
defer shutdown()

@@ -973,7 +984,7 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena
defer connectors.CloseConnector(ctx, dstConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
- return "renaming tables for job - " + config.FlowJobName
+ return "renaming tables for job"
})
defer shutdown()

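The flowable.go changes above are the heart of this commit: the "partition already synced?" check moves out of each destination connector's SyncQRepRecords and up into the activity, which now asks the destination before recording a start time or pulling any rows. A condensed sketch of the new control flow — not part of the commit; the partitionChecker interface below is a simplified stand-in for the real QRepSyncConnector, whose method takes a *protos.IsQRepPartitionSyncedInput:

package sketch

import (
	"context"
	"fmt"
	"log"
)

// Stand-in for the QRepSyncConnector method used here; the real interface
// lives in flow/connectors/core.go.
type partitionChecker interface {
	IsQRepPartitionSynced(ctx context.Context, flowJobName, partitionID string) (bool, error)
}

// replicatePartition mirrors the reordered replicateQRepPartition: consult
// the destination first, and only then record a start time and move records.
func replicatePartition(ctx context.Context, dst partitionChecker, flowJobName, partitionID string) error {
	done, err := dst.IsQRepPartitionSynced(ctx, flowJobName, partitionID)
	if err != nil {
		return fmt.Errorf("failed to get fetch status of partition: %w", err)
	}
	if done {
		// Previously each connector performed this early exit internally,
		// after the activity had already logged a partition start time.
		log.Println("no records to push for partition " + partitionID)
		return nil
	}
	// ... UpdateStartTimeForPartition, pull records, sync, UpdateEndTimeForPartition ...
	return nil
}
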
6 changes: 3 additions & 3 deletions flow/activities/snapshot_activity.go
@@ -32,7 +32,7 @@ func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName s
connectors.CloseConnector(ctx, s.connector)
delete(a.SnapshotConnections, flowJobName)
}
- a.Alerter.LogFlowEvent(ctx, flowJobName, "Ended Snapshot Flow Job - "+flowJobName)
+ a.Alerter.LogFlowEvent(ctx, flowJobName, "Ended Snapshot Flow Job")

return nil
}
@@ -50,7 +50,7 @@ func (a *SnapshotActivity) SetupReplication(
return nil, nil
}

- a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job - "+config.FlowJobName)
+ a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job")

conn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig)
if err != nil {
@@ -84,7 +84,7 @@ func (a *SnapshotActivity) SetupReplication(
var slotInfo connpostgres.SlotCreationResult
select {
case slotInfo = <-slotSignal.SlotCreated:
- logger.Info("slot created", slotInfo.SlotName)
+ logger.Info("slot created", slog.String("SlotName", slotInfo.SlotName))
case err := <-replicationErr:
closeConnectionForError(err)
return nil, fmt.Errorf("failed to setup replication: %w", err)
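The slot-created log line also gets a correctness fix: log/slog treats trailing variadic arguments as alternating key/value pairs, so the old logger.Info("slot created", slotInfo.SlotName) passed a value with no key, which slog renders as a !BADKEY attribute. A small, self-contained illustration:

package main

import (
	"log/slog"
	"os"
)

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

	// Before: a bare value with no key; slog logs it as !BADKEY=my_slot.
	logger.Info("slot created", "my_slot")

	// After (as in this commit): an explicit key/value attribute.
	logger.Info("slot created", slog.String("SlotName", "my_slot"))
}
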
3 changes: 2 additions & 1 deletion flow/cmd/mirror_status.go
@@ -144,7 +144,8 @@ func (h *FlowRequestHandler) cloneTableSummary(

rows, err := h.pool.Query(ctx, q, "clone_"+flowJobName+"_%")
if err != nil {
- slog.Error("unable to query initial load partition - "+flowJobName, slog.Any("error", err))
+ slog.Error("unable to query initial load partition",
+ slog.String(string(shared.FlowNameKey), flowJobName), slog.Any("error", err))
return nil, fmt.Errorf("unable to query initial load partition - %s: %w", flowJobName, err)
}

15 changes: 6 additions & 9 deletions flow/connectors/bigquery/qrep.go
@@ -30,15 +30,6 @@ func (c *BigQueryConnector) SyncQRepRecords(
return 0, err
}

- done, err := c.pgMetadata.IsQrepPartitionSynced(ctx, config.FlowJobName, partition.PartitionId)
- if err != nil {
- return 0, fmt.Errorf("failed to check if partition %s is synced: %w", partition.PartitionId, err)
- }
-
- if done {
- c.logger.Info(fmt.Sprintf("Partition %s has already been synced", partition.PartitionId))
- return 0, nil
- }
c.logger.Info(fmt.Sprintf("QRep sync function called and partition existence checked for"+
" partition %s of destination table %s",
partition.PartitionId, destTable))
@@ -111,3 +102,9 @@ func (c *BigQueryConnector) SetupQRepMetadataTables(ctx context.Context, config

return nil
}

+ func (c *BigQueryConnector) IsQRepPartitionSynced(ctx context.Context,
+ req *protos.IsQRepPartitionSyncedInput,
+ ) (bool, error) {
+ return c.pgMetadata.IsQrepPartitionSynced(ctx, req.FlowJobName, req.PartitionId)
+ }
16 changes: 4 additions & 12 deletions flow/connectors/clickhouse/qrep.go
@@ -33,16 +33,6 @@ func (c *ClickhouseConnector) SyncQRepRecords(
slog.String("destinationTable", destTable),
)

- done, err := c.isPartitionSynced(ctx, partition.PartitionId)
- if err != nil {
- return 0, fmt.Errorf("failed to check if partition %s is synced: %w", partition.PartitionId, err)
- }
-
- if done {
- c.logger.Info("Partition has already been synced", flowLog)
- return 0, nil
- }
-
tblSchema, err := c.getTableSchema(destTable)
if err != nil {
return 0, fmt.Errorf("failed to get schema of table %s: %w", destTable, err)
@@ -96,9 +86,11 @@ func (c *ClickhouseConnector) getTableSchema(tableName string) ([]*sql.ColumnTyp
return columnTypes, nil
}

- func (c *ClickhouseConnector) isPartitionSynced(ctx context.Context, partitionID string) (bool, error) {
+ func (c *ClickhouseConnector) IsQRepPartitionSynced(ctx context.Context,
+ req *protos.IsQRepPartitionSyncedInput,
+ ) (bool, error) {
//nolint:gosec
- queryString := fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE partitionID = '%s'`, qRepMetadataTableName, partitionID)
+ queryString := fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE partitionID = '%s'`, qRepMetadataTableName, req.PartitionId)

row := c.database.QueryRowContext(ctx, queryString)

6 changes: 3 additions & 3 deletions flow/connectors/clickhouse/qrep_avro_sync.go
@@ -130,12 +130,12 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
continue
}

- selector = append(selector, colName)
+ selector = append(selector, "`"+colName+"`")
}
selectorStr := strings.Join(selector, ",")
//nolint:gosec
- query := fmt.Sprintf("INSERT INTO %s(%s) SELECT * FROM s3('%s','%s','%s', 'Avro')",
- config.DestinationTableIdentifier, selectorStr, avroFileUrl,
+ query := fmt.Sprintf("INSERT INTO %s(%s) SELECT %s FROM s3('%s','%s','%s', 'Avro')",
+ config.DestinationTableIdentifier, selectorStr, selectorStr, avroFileUrl,
s.connector.creds.AccessKeyID, s.connector.creds.SecretAccessKey)

_, err = s.connector.database.ExecContext(ctx, query)
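The qrep_avro_sync.go hunk packs two fixes into one query: column names are now backtick-quoted, so identifiers containing spaces or reserved words survive, and the SELECT projects the same explicit column list instead of *, so the column order in the staged Avro file no longer has to line up with the destination table. A sketch of the resulting statement, using hypothetical table, URL, and credential values:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Hypothetical column names; "user name" shows why the quoting matters.
	cols := []string{"id", "user name"}
	selector := make([]string, 0, len(cols))
	for _, colName := range cols {
		selector = append(selector, "`"+colName+"`")
	}
	selectorStr := strings.Join(selector, ",")

	// Same shape as the query built in this diff, with placeholder values.
	query := fmt.Sprintf("INSERT INTO %s(%s) SELECT %s FROM s3('%s','%s','%s', 'Avro')",
		"dst_table", selectorStr, selectorStr,
		"https://bucket.s3.amazonaws.com/file.avro.zst", "ACCESS_KEY", "SECRET_KEY")
	fmt.Println(query)
	// INSERT INTO dst_table(`id`,`user name`) SELECT `id`,`user name` FROM s3(...)
}
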
4 changes: 4 additions & 0 deletions flow/connectors/core.go
@@ -146,6 +146,9 @@ type QRepPullConnector interface {
type QRepSyncConnector interface {
Connector

+ // IsQRepPartitionSynced returns true if a partition has already been synced
+ IsQRepPartitionSynced(ctx context.Context, req *protos.IsQRepPartitionSyncedInput) (bool, error)
+
// SetupQRepMetadataTables sets up the metadata tables for QRep.
SetupQRepMetadataTables(ctx context.Context, config *protos.QRepConfig) error

@@ -261,6 +264,7 @@
_ QRepSyncConnector = &connbigquery.BigQueryConnector{}
_ QRepSyncConnector = &connsnowflake.SnowflakeConnector{}
_ QRepSyncConnector = &connclickhouse.ClickhouseConnector{}
+ _ QRepSyncConnector = &conns3.S3Connector{}

_ QRepConsolidateConnector = &connsnowflake.SnowflakeConnector{}
_ QRepConsolidateConnector = &connclickhouse.ClickhouseConnector{}
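The var _ QRepSyncConnector = ... lines are Go's standard compile-time conformance check: with IsQRepPartitionSynced added to the interface, every listed connector — including the newly registered S3 one — must implement it or the package fails to build. A trimmed-down, self-contained version of the pattern (the real method takes a *protos.IsQRepPartitionSyncedInput rather than a bare partition ID):

package sketch

import "context"

// Simplified from flow/connectors/core.go.
type QRepSyncConnector interface {
	IsQRepPartitionSynced(ctx context.Context, partitionID string) (bool, error)
}

type S3Connector struct{}

// S3 overwrites same-named files, so it never reports a partition as synced.
func (c *S3Connector) IsQRepPartitionSynced(_ context.Context, _ string) (bool, error) {
	return false, nil
}

// Compilation breaks here if *S3Connector stops satisfying the interface.
var _ QRepSyncConnector = &S3Connector{}
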
40 changes: 21 additions & 19 deletions flow/connectors/postgres/postgres_schema_delta_test.go
@@ -196,23 +196,25 @@ func (s PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() {
}

func TestPostgresSchemaDeltaTestSuite(t *testing.T) {
- e2eshared.RunSuite(t, SetupSuite, func(s PostgresSchemaDeltaTestSuite) {
- teardownTx, err := s.connector.conn.Begin(context.Background())
- require.NoError(s.t, err)
- defer func() {
- err := teardownTx.Rollback(context.Background())
- if err != pgx.ErrTxClosed {
- require.NoError(s.t, err)
- }
- }()
- _, err = teardownTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE",
- s.schema))
- require.NoError(s.t, err)
- err = teardownTx.Commit(context.Background())
- require.NoError(s.t, err)
-
- require.NoError(s.t, s.connector.ConnectionActive(context.Background()))
- require.NoError(s.t, s.connector.Close())
- require.Error(s.t, s.connector.ConnectionActive(context.Background()))
- })
+ e2eshared.RunSuite(t, SetupSuite)
}

+ func (s PostgresSchemaDeltaTestSuite) Teardown() {
+ teardownTx, err := s.connector.conn.Begin(context.Background())
+ require.NoError(s.t, err)
+ defer func() {
+ err := teardownTx.Rollback(context.Background())
+ if err != pgx.ErrTxClosed {
+ require.NoError(s.t, err)
+ }
+ }()
+ _, err = teardownTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE",
+ s.schema))
+ require.NoError(s.t, err)
+ err = teardownTx.Commit(context.Background())
+ require.NoError(s.t, err)
+
+ require.NoError(s.t, s.connector.ConnectionActive(context.Background()))
+ require.NoError(s.t, s.connector.Close())
+ require.Error(s.t, s.connector.ConnectionActive(context.Background()))
+ }
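The teardown logic moves from an inline closure into a Teardown method on the suite, and the RunSuite call drops its second argument — presumably the shared harness now discovers and invokes Teardown itself. The real e2eshared.RunSuite signature isn't shown in this diff, so the following is only a guess at its shape:

package e2eshared

import "testing"

// Hypothetical: a constraint requiring suites to clean up after themselves.
type suiteWithTeardown interface {
	Teardown()
}

// Hypothetical RunSuite: build the suite, register teardown, run its tests.
func RunSuite[S suiteWithTeardown](t *testing.T, setup func(t *testing.T) S) {
	t.Helper()
	s := setup(t)
	t.Cleanup(s.Teardown) // runs after all of the suite's subtests finish
	// ... reflectively discover and run the suite's Test* methods ...
}
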
19 changes: 6 additions & 13 deletions flow/connectors/postgres/qrep.go
@@ -458,15 +458,6 @@ func (c *PostgresConnector) SyncQRepRecords(
return 0, fmt.Errorf("table %s does not exist, used schema: %s", dstTable.Table, dstTable.Schema)
}

- done, err := c.isPartitionSynced(ctx, partition.PartitionId)
- if err != nil {
- return 0, fmt.Errorf("failed to check if partition is synced: %w", err)
- }
-
- if done {
- c.logger.Info(fmt.Sprintf("partition %s already synced", partition.PartitionId))
- return 0, nil
- }
c.logger.Info("SyncRecords called and initial checks complete.")

stagingTableSync := &QRepStagingTableSync{connector: c}
Expand Down Expand Up @@ -573,18 +564,20 @@ func BuildQuery(logger log.Logger, query string, flowJobName string) (string, er
return res, nil
}

- // isPartitionSynced checks whether a specific partition is synced
- func (c *PostgresConnector) isPartitionSynced(ctx context.Context, partitionID string) (bool, error) {
+ // IsQRepPartitionSynced checks whether a specific partition is synced
+ func (c *PostgresConnector) IsQRepPartitionSynced(ctx context.Context,
+ req *protos.IsQRepPartitionSyncedInput,
+ ) (bool, error) {
// setup the query string
metadataTableIdentifier := pgx.Identifier{c.metadataSchema, qRepMetadataTableName}
queryString := fmt.Sprintf(
- "SELECT COUNT(*)>0 FROM %s WHERE partitionID = $1;",
+ "SELECT COUNT(*)>0 FROM %s WHERE partitionID=$1;",
metadataTableIdentifier.Sanitize(),
)

// prepare and execute the query
var result bool
- err := c.conn.QueryRow(ctx, queryString, partitionID).Scan(&result)
+ err := c.conn.QueryRow(ctx, queryString, req.PartitionId).Scan(&result)
if err != nil {
return false, fmt.Errorf("failed to execute query: %w", err)
}
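Note how the Postgres implementation keeps both inputs out of raw SQL: the metadata table name goes through pgx.Identifier's Sanitize(), which double-quotes each part, while the partition ID travels as a $1 bind parameter. A quick illustration of the quoting (the schema and table names here are hypothetical):

package main

import (
	"fmt"

	"github.com/jackc/pgx/v5"
)

func main() {
	// pgx.Identifier quotes each element and joins them with dots.
	ident := pgx.Identifier{"peerdb_metadata", "qrep_partitions"}
	fmt.Println(ident.Sanitize()) // "peerdb_metadata"."qrep_partitions"
}
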
7 changes: 7 additions & 0 deletions flow/connectors/s3/qrep.go
@@ -81,3 +81,10 @@ func (c *S3Connector) SetupQRepMetadataTables(_ context.Context, config *protos.
c.logger.Info("QRep metadata setup not needed for S3.")
return nil
}

+ // S3 doesn't check whether a partition has already been synced; a file with the same name is simply overwritten
+ func (c *S3Connector) IsQRepPartitionSynced(_ context.Context,
+ config *protos.IsQRepPartitionSyncedInput,
+ ) (bool, error) {
+ return false, nil
+ }
16 changes: 6 additions & 10 deletions flow/connectors/snowflake/qrep.go
@@ -35,16 +35,6 @@ func (c *SnowflakeConnector) SyncQRepRecords(
}
c.logger.Info("Called QRep sync function and obtained table schema", flowLog)

- done, err := c.pgMetadata.IsQrepPartitionSynced(ctx, config.FlowJobName, partition.PartitionId)
- if err != nil {
- return 0, fmt.Errorf("failed to check if partition %s is synced: %w", partition.PartitionId, err)
- }
-
- if done {
- c.logger.Info("Partition has already been synced", flowLog)
- return 0, nil
- }
-
avroSync := NewSnowflakeAvroSyncHandler(config, c)
return avroSync.SyncQRepRecords(ctx, config, partition, tblSchema, stream)
}
@@ -282,3 +272,9 @@ func (c *SnowflakeConnector) dropStage(ctx context.Context, stagingPath string,
func (c *SnowflakeConnector) getStageNameForJob(job string) string {
return fmt.Sprintf("%s.peerdb_stage_%s", c.rawSchema, job)
}

+ func (c *SnowflakeConnector) IsQRepPartitionSynced(ctx context.Context,
+ req *protos.IsQRepPartitionSyncedInput,
+ ) (bool, error) {
+ return c.pgMetadata.IsQrepPartitionSynced(ctx, req.FlowJobName, req.PartitionId)
+ }