Skip to content

Commit

Permalink
Merge branch 'main' into create-peer-apis
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Sep 28, 2023
2 parents 46b0cb9 + 8598d15 commit beda19b
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 21 deletions.
4 changes: 2 additions & 2 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"reflect"
"time"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
Expand All @@ -16,7 +17,6 @@ import (
"github.com/jackc/pgx/v5/pgxpool"
"github.com/lib/pq/oid"
log "github.com/sirupsen/logrus"
"go.temporal.io/sdk/activity"
)

type PostgresCDCSource struct {
Expand Down Expand Up @@ -157,7 +157,7 @@ func (p *PostgresCDCSource) consumeStream(
}

numRowsProcessedMessage := fmt.Sprintf("processed %d rows", len(records.Records))
activity.RecordHeartbeat(p.ctx, numRowsProcessedMessage)
utils.RecordHeartbeatWithRecover(p.ctx, numRowsProcessedMessage)
log.Infof("Sent Standby status message. %s", numRowsProcessedMessage)
nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
}
Expand Down
19 changes: 3 additions & 16 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus"
"go.temporal.io/sdk/activity"
"golang.org/x/exp/maps"
)

Expand Down Expand Up @@ -545,7 +544,7 @@ func (c *PostgresConnector) GetTableSchema(
return nil, err
}
res[tableName] = tableSchema
c.recordHeartbeatWithRecover(fmt.Sprintf("fetched schema for table %s", tableName))
utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("fetched schema for table %s", tableName))
}

return &protos.GetTableSchemaBatchOutput{
Expand Down Expand Up @@ -647,7 +646,7 @@ func (c *PostgresConnector) SetupNormalizedTables(req *protos.SetupNormalizedTab

tableExistsMapping[tableIdentifier] = false
log.Printf("created table %s", tableIdentifier)
c.recordHeartbeatWithRecover(fmt.Sprintf("created table %s", tableIdentifier))
utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("created table %s", tableIdentifier))
}

err = createNormalizedTablesTx.Commit(c.ctx)
Expand Down Expand Up @@ -747,7 +746,7 @@ func (c *PostgresConnector) EnsurePullability(req *protos.EnsurePullabilityBatch
RelId: relID},
},
}
c.recordHeartbeatWithRecover(fmt.Sprintf("ensured pullability table %s", tableName))
utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("ensured pullability table %s", tableName))
}

return &protos.EnsurePullabilityBatchOutput{TableIdentifierMapping: tableIdentifierMapping}, nil
Expand Down Expand Up @@ -871,15 +870,3 @@ func parseSchemaTable(tableName string) (*SchemaTable, error) {
Table: parts[1],
}, nil
}

// if the functions are being called outside the context of a Temporal workflow,
// activity.RecordHeartbeat panics, this is a bandaid for that.
func (c *PostgresConnector) recordHeartbeatWithRecover(details ...interface{}) {
defer func() {
if r := recover(); r != nil {
log.Warnln("ignoring panic from activity.RecordHeartbeat")
log.Warnln("this can happen when function is invoked outside of a Temporal workflow")
}
}()
activity.RecordHeartbeat(c.ctx, details...)
}
8 changes: 5 additions & 3 deletions flow/connectors/postgres/postgres_cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,9 +448,11 @@ func (suite *PostgresCDCTestSuite) TestErrorForTableNotExist() {
TableNameSchemaMapping: tableNameSchemaMapping,
RelationMessageMapping: relationMessageMapping,
})
suite.Equal(0, len(recordsWithSchemaDelta.RecordBatch.Records))
suite.Nil(recordsWithSchemaDelta.TableSchemaDelta)
suite.Nil(err)
suite.Nil(recordsWithSchemaDelta)
suite.Errorf(
err,
"error while closing statement batch: ERROR: relation \"%s\" does not exist (SQLSTATE 42P01)",
nonExistentFlowSrcTableName)

err = suite.connector.PullFlowCleanup(nonExistentFlowName)
suite.failTestError(err)
Expand Down
13 changes: 13 additions & 0 deletions flow/connectors/utils/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

log "github.com/sirupsen/logrus"
"go.temporal.io/sdk/activity"
)

Expand All @@ -30,3 +31,15 @@ func HeartbeatRoutine(
}()
return shutdown
}

// if the functions are being called outside the context of a Temporal workflow,
// activity.RecordHeartbeat panics, this is a bandaid for that.
func RecordHeartbeatWithRecover(ctx context.Context, details ...interface{}) {
defer func() {
if r := recover(); r != nil {
log.Warnln("ignoring panic from activity.RecordHeartbeat")
log.Warnln("this can happen when function is invoked outside of a Temporal workflow")
}
}()
activity.RecordHeartbeat(ctx, details...)
}
3 changes: 3 additions & 0 deletions flow/workflows/snapshot_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ func (s *SnapshotFlowExecution) cloneTable(
SyncMode: s.config.SnapshotSyncMode,
MaxParallelWorkers: numWorkers,
StagingPath: s.config.SnapshotStagingPath,
WriteMode: &protos.QRepWriteMode{
WriteType: protos.QRepWriteType_QREP_WRITE_MODE_APPEND,
},
}

numPartitionsProcessed := 0
Expand Down

0 comments on commit beda19b

Please sign in to comment.