From 6da675e5824675a284b713a3549cac20b0267d18 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Thu, 28 Sep 2023 22:03:17 +0530 Subject: [PATCH 1/4] added WriteMode for Snapshot config, since we assert that for overwrite now --- flow/workflows/snapshot_flow.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index 8d4d5faf90..585ae0bc25 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -148,6 +148,9 @@ func (s *SnapshotFlowExecution) cloneTable( SyncMode: s.config.SnapshotSyncMode, MaxParallelWorkers: numWorkers, StagingPath: s.config.SnapshotStagingPath, + WriteMode: &protos.QRepWriteMode{ + WriteType: protos.QRepWriteType_QREP_WRITE_MODE_APPEND, + }, } numPartitionsProcessed := 0 From f505cc4c09e97541370e64ba4a5d66eb5b1b4df0 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Thu, 28 Sep 2023 22:55:52 +0530 Subject: [PATCH 2/4] fixed heartbeat panic in cdc.go --- flow/connectors/postgres/cdc.go | 4 ++-- flow/connectors/postgres/postgres.go | 19 +++---------------- flow/connectors/utils/heartbeat.go | 13 +++++++++++++ 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 641d5f8543..65e75acfc0 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -6,6 +6,7 @@ import ( "reflect" "time" + "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" @@ -16,7 +17,6 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/lib/pq/oid" log "github.com/sirupsen/logrus" - "go.temporal.io/sdk/activity" ) type PostgresCDCSource struct { @@ -157,7 +157,7 @@ func (p *PostgresCDCSource) consumeStream( } numRowsProcessedMessage := fmt.Sprintf("processed %d rows", len(records.Records)) - activity.RecordHeartbeat(p.ctx, numRowsProcessedMessage) + 
utils.RecordHeartbeatWithRecover(p.ctx, numRowsProcessedMessage) log.Infof("Sent Standby status message. %s", numRowsProcessedMessage) nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) } diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 46f946e77b..f37fc88bdf 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -20,7 +20,6 @@ import ( "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" log "github.com/sirupsen/logrus" - "go.temporal.io/sdk/activity" "golang.org/x/exp/maps" ) @@ -545,7 +544,7 @@ func (c *PostgresConnector) GetTableSchema( return nil, err } res[tableName] = tableSchema - c.recordHeartbeatWithRecover(fmt.Sprintf("fetched schema for table %s", tableName)) + utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("fetched schema for table %s", tableName)) } return &protos.GetTableSchemaBatchOutput{ @@ -647,7 +646,7 @@ func (c *PostgresConnector) SetupNormalizedTables(req *protos.SetupNormalizedTab tableExistsMapping[tableIdentifier] = false log.Printf("created table %s", tableIdentifier) - c.recordHeartbeatWithRecover(fmt.Sprintf("created table %s", tableIdentifier)) + utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("created table %s", tableIdentifier)) } err = createNormalizedTablesTx.Commit(c.ctx) @@ -747,7 +746,7 @@ func (c *PostgresConnector) EnsurePullability(req *protos.EnsurePullabilityBatch RelId: relID}, }, } - c.recordHeartbeatWithRecover(fmt.Sprintf("ensured pullability table %s", tableName)) + utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("ensured pullability table %s", tableName)) } return &protos.EnsurePullabilityBatchOutput{TableIdentifierMapping: tableIdentifierMapping}, nil @@ -871,15 +870,3 @@ func parseSchemaTable(tableName string) (*SchemaTable, error) { Table: parts[1], }, nil } - -// if the functions are being called outside the context of a Temporal workflow, -// activity.RecordHeartbeat panics, this is a 
bandaid for that. -func (c *PostgresConnector) recordHeartbeatWithRecover(details ...interface{}) { - defer func() { - if r := recover(); r != nil { - log.Warnln("ignoring panic from activity.RecordHeartbeat") - log.Warnln("this can happen when function is invoked outside of a Temporal workflow") - } - }() - activity.RecordHeartbeat(c.ctx, details...) -} diff --git a/flow/connectors/utils/heartbeat.go b/flow/connectors/utils/heartbeat.go index 78f5503876..3494399f1a 100644 --- a/flow/connectors/utils/heartbeat.go +++ b/flow/connectors/utils/heartbeat.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/prometheus/common/log" "go.temporal.io/sdk/activity" ) @@ -30,3 +31,15 @@ func HeartbeatRoutine( }() return shutdown } + +// RecordHeartbeatWithRecover calls activity.RecordHeartbeat, logging and ignoring any panic it raises. +// This is a band-aid: activity.RecordHeartbeat panics when invoked outside of a Temporal workflow. +func RecordHeartbeatWithRecover(ctx context.Context, details ...interface{}) { + defer func() { + if r := recover(); r != nil { + log.Warnln("ignoring panic from activity.RecordHeartbeat") + log.Warnln("this can happen when function is invoked outside of a Temporal workflow") + } + }() + activity.RecordHeartbeat(ctx, details...) 
+} From f47ff9a1e8d0955553673d75fd316bec1cd92017 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Thu, 28 Sep 2023 22:59:58 +0530 Subject: [PATCH 3/4] imported wrong log oops --- flow/connectors/utils/heartbeat.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/connectors/utils/heartbeat.go b/flow/connectors/utils/heartbeat.go index 3494399f1a..9809a41b6b 100644 --- a/flow/connectors/utils/heartbeat.go +++ b/flow/connectors/utils/heartbeat.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - "github.com/prometheus/common/log" + log "github.com/sirupsen/logrus" "go.temporal.io/sdk/activity" ) From 0ddfc5c486bb8d877c8ef29bbdeeb7875fbdef0e Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Thu, 28 Sep 2023 23:41:11 +0530 Subject: [PATCH 4/4] fixing non-existing table test, errors now --- flow/connectors/postgres/postgres_cdc_test.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/flow/connectors/postgres/postgres_cdc_test.go b/flow/connectors/postgres/postgres_cdc_test.go index ddc9dc3d82..2735c5ff49 100644 --- a/flow/connectors/postgres/postgres_cdc_test.go +++ b/flow/connectors/postgres/postgres_cdc_test.go @@ -448,9 +448,11 @@ func (suite *PostgresCDCTestSuite) TestErrorForTableNotExist() { TableNameSchemaMapping: tableNameSchemaMapping, RelationMessageMapping: relationMessageMapping, }) - suite.Equal(0, len(recordsWithSchemaDelta.RecordBatch.Records)) - suite.Nil(recordsWithSchemaDelta.TableSchemaDelta) - suite.Nil(err) + suite.Nil(recordsWithSchemaDelta) + suite.Errorf( + err, + "error while closing statement batch: ERROR: relation \"%s\" does not exist (SQLSTATE 42P01)", + nonExistentFlowSrcTableName) err = suite.connector.PullFlowCleanup(nonExistentFlowName) suite.failTestError(err)