From 8598d1568819f9072a909785d2c8e316ab5cdbf5 Mon Sep 17 00:00:00 2001 From: Kevin K Biju <52661649+heavycrystal@users.noreply.github.com> Date: Thu, 28 Sep 2023 18:24:43 +0000 Subject: [PATCH] added WriteMode for Snapshot config (#447) --- flow/connectors/postgres/cdc.go | 4 ++-- flow/connectors/postgres/postgres.go | 19 +++---------------- flow/connectors/postgres/postgres_cdc_test.go | 8 +++++--- flow/connectors/utils/heartbeat.go | 13 +++++++++++++ flow/workflows/snapshot_flow.go | 3 +++ 5 files changed, 26 insertions(+), 21 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 641d5f8543..65e75acfc0 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -6,6 +6,7 @@ import ( "reflect" "time" + "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" @@ -16,7 +17,6 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/lib/pq/oid" log "github.com/sirupsen/logrus" - "go.temporal.io/sdk/activity" ) type PostgresCDCSource struct { @@ -157,7 +157,7 @@ func (p *PostgresCDCSource) consumeStream( } numRowsProcessedMessage := fmt.Sprintf("processed %d rows", len(records.Records)) - activity.RecordHeartbeat(p.ctx, numRowsProcessedMessage) + utils.RecordHeartbeatWithRecover(p.ctx, numRowsProcessedMessage) log.Infof("Sent Standby status message. %s", numRowsProcessedMessage) nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) } diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 46f946e77b..f37fc88bdf 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -20,7 +20,6 @@ import ( "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" log "github.com/sirupsen/logrus" - "go.temporal.io/sdk/activity" "golang.org/x/exp/maps" ) @@ -545,7 +544,7 @@ func (c *PostgresConnector) GetTableSchema( return nil, err } res[tableName] = tableSchema - c.recordHeartbeatWithRecover(fmt.Sprintf("fetched schema for table %s", tableName)) + utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("fetched schema for table %s", tableName)) } return &protos.GetTableSchemaBatchOutput{ @@ -647,7 +646,7 @@ func (c *PostgresConnector) SetupNormalizedTables(req *protos.SetupNormalizedTab tableExistsMapping[tableIdentifier] = false log.Printf("created table %s", tableIdentifier) - c.recordHeartbeatWithRecover(fmt.Sprintf("created table %s", tableIdentifier)) + utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("created table %s", tableIdentifier)) } err = createNormalizedTablesTx.Commit(c.ctx) @@ -747,7 +746,7 @@ func (c *PostgresConnector) EnsurePullability(req *protos.EnsurePullabilityBatch RelId: relID}, }, } - c.recordHeartbeatWithRecover(fmt.Sprintf("ensured pullability table %s", tableName)) + utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("ensured pullability table %s", tableName)) } return &protos.EnsurePullabilityBatchOutput{TableIdentifierMapping: tableIdentifierMapping}, nil @@ -871,15 +870,3 @@ func parseSchemaTable(tableName string) (*SchemaTable, error) { Table: parts[1], }, nil } - -// if the functions are being called outside the context of a Temporal workflow, -// activity.RecordHeartbeat panics, this is a bandaid for that. -func (c *PostgresConnector) recordHeartbeatWithRecover(details ...interface{}) { - defer func() { - if r := recover(); r != nil { - log.Warnln("ignoring panic from activity.RecordHeartbeat") - log.Warnln("this can happen when function is invoked outside of a Temporal workflow") - } - }() - activity.RecordHeartbeat(c.ctx, details...) -} diff --git a/flow/connectors/postgres/postgres_cdc_test.go b/flow/connectors/postgres/postgres_cdc_test.go index ddc9dc3d82..2735c5ff49 100644 --- a/flow/connectors/postgres/postgres_cdc_test.go +++ b/flow/connectors/postgres/postgres_cdc_test.go @@ -448,9 +448,11 @@ func (suite *PostgresCDCTestSuite) TestErrorForTableNotExist() { TableNameSchemaMapping: tableNameSchemaMapping, RelationMessageMapping: relationMessageMapping, }) - suite.Equal(0, len(recordsWithSchemaDelta.RecordBatch.Records)) - suite.Nil(recordsWithSchemaDelta.TableSchemaDelta) - suite.Nil(err) + suite.Nil(recordsWithSchemaDelta) + suite.Errorf( + err, + "error while closing statement batch: ERROR: relation \"%s\" does not exist (SQLSTATE 42P01)", + nonExistentFlowSrcTableName) err = suite.connector.PullFlowCleanup(nonExistentFlowName) suite.failTestError(err) diff --git a/flow/connectors/utils/heartbeat.go b/flow/connectors/utils/heartbeat.go index 78f5503876..9809a41b6b 100644 --- a/flow/connectors/utils/heartbeat.go +++ b/flow/connectors/utils/heartbeat.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + log "github.com/sirupsen/logrus" "go.temporal.io/sdk/activity" ) @@ -30,3 +31,15 @@ func HeartbeatRoutine( }() return shutdown } + +// if the functions are being called outside the context of a Temporal workflow, +// activity.RecordHeartbeat panics, this is a bandaid for that. +func RecordHeartbeatWithRecover(ctx context.Context, details ...interface{}) { + defer func() { + if r := recover(); r != nil { + log.Warnln("ignoring panic from activity.RecordHeartbeat") + log.Warnln("this can happen when function is invoked outside of a Temporal workflow") + } + }() + activity.RecordHeartbeat(ctx, details...) +} diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index 8d4d5faf90..585ae0bc25 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -148,6 +148,9 @@ func (s *SnapshotFlowExecution) cloneTable( SyncMode: s.config.SnapshotSyncMode, MaxParallelWorkers: numWorkers, StagingPath: s.config.SnapshotStagingPath, + WriteMode: &protos.QRepWriteMode{ + WriteType: protos.QRepWriteType_QREP_WRITE_MODE_APPEND, + }, } numPartitionsProcessed := 0