Commit 6b30e4c
Merge remote-tracking branch 'origin/wait-for' into normalize-wait-for
serprex committed Jan 6, 2024
2 parents 759c21e + 21fd480 commit 6b30e4c
Showing 17 changed files with 619 additions and 666 deletions.
1 change: 0 additions & 1 deletion .github/workflows/flow.yml
@@ -126,4 +126,3 @@ jobs:
       PEERDB_CATALOG_USER: postgres
       PEERDB_CATALOG_PASSWORD: postgres
       PEERDB_CATALOG_DATABASE: postgres
-      PEERDB_CDC_IDLE_TIMEOUT_SECONDS: 10
12 changes: 6 additions & 6 deletions flow/activities/flowable.go
@@ -222,6 +222,12 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,

     go a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, input.FlowConnectionConfigs.Source.Name)

+    shutdown := utils.HeartbeatRoutine(ctx, 10*time.Second, func() string {
+        jobName := input.FlowConnectionConfigs.FlowJobName
+        return fmt.Sprintf("transferring records for job - %s", jobName)
+    })
+    defer shutdown()
+
     // start a goroutine to pull records from the source
     recordBatch := model.NewCDCRecordStream()
     startTime := time.Now()
@@ -282,12 +288,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
         return syncResponse, nil
     }

-    shutdown := utils.HeartbeatRoutine(ctx, 10*time.Second, func() string {
-        jobName := input.FlowConnectionConfigs.FlowJobName
-        return fmt.Sprintf("pushing records for job - %s", jobName)
-    })
-    defer shutdown()
-
     syncStartTime := time.Now()
     res, err := dstConn.SyncRecords(&model.SyncRecordsRequest{
         Records: recordBatch,
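Note: the net effect of this file's change is that the heartbeat now covers the record-pulling phase as well as the push, instead of starting only once pulling had finished. For readers unfamiliar with the helper, below is a minimal sketch of what a routine like utils.HeartbeatRoutine plausibly looks like, inferred only from its call shape here; the repository's actual implementation may differ.

// Sketch only: a ticker-based heartbeat helper matching the call shape
// HeartbeatRoutine(ctx, interval, message) seen in the diff above.
func HeartbeatRoutine(
    ctx context.Context,
    interval time.Duration,
    message func() string,
) func() {
    shutdown := make(chan struct{})
    go func() {
        ticker := time.NewTicker(interval)
        defer ticker.Stop()
        for {
            // Assumed to delegate to Temporal's activity.RecordHeartbeat;
            // see the sketch after the cdc.go diff below.
            RecordHeartbeatWithRecover(ctx, message())
            select {
            case <-shutdown:
                return
            case <-ctx.Done():
                return
            case <-ticker.C:
            }
        }
    }()
    return func() { close(shutdown) }
}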
30 changes: 19 additions & 11 deletions flow/connectors/postgres/cdc.go
@@ -297,17 +297,19 @@ func (p *PostgresCDCSource) consumeStream(
             }
         }

-        if (cdcRecordsStorage.Len() >= int(req.MaxBatchSize)) && !p.commitLock {
-            return nil
-        }
+        if !p.commitLock {
+            if cdcRecordsStorage.Len() >= int(req.MaxBatchSize) {
+                return nil
+            }

-        if waitingForCommit && !p.commitLock {
-            p.logger.Info(fmt.Sprintf(
-                "[%s] commit received, returning currently accumulated records - %d",
-                p.flowJobName,
-                cdcRecordsStorage.Len()),
-            )
-            return nil
-        }
+            if waitingForCommit {
+                p.logger.Info(fmt.Sprintf(
+                    "[%s] commit received, returning currently accumulated records - %d",
+                    p.flowJobName,
+                    cdcRecordsStorage.Len()),
+                )
+                return nil
+            }
+        }

         // if we are past the next standby deadline (?)
@@ -340,9 +342,15 @@ func (p *PostgresCDCSource) consumeStream(
         } else {
             ctx, cancel = context.WithDeadline(p.ctx, nextStandbyMessageDeadline)
         }

         rawMsg, err := conn.ReceiveMessage(ctx)
         cancel()
+
+        utils.RecordHeartbeatWithRecover(p.ctx, "consumeStream ReceiveMessage")
+        ctxErr := p.ctx.Err()
+        if ctxErr != nil {
+            return fmt.Errorf("consumeStream preempted: %w", ctxErr)
+        }

         if err != nil && !p.commitLock {
             if pgconn.Timeout(err) {
                 p.logger.Info(fmt.Sprintf("Stand-by deadline reached, returning currently accumulated records - %d",
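Note: utils.RecordHeartbeatWithRecover is presumably a panic-safe wrapper around Temporal's activity.RecordHeartbeat, which panics when invoked outside an activity context (for example, in tests that drive consumeStream directly). A plausible sketch, assuming the go.temporal.io/sdk/activity package; the repository's actual helper may differ:

import (
    "context"
    "log/slog"

    "go.temporal.io/sdk/activity"
)

// Sketch only: record a heartbeat, but recover rather than crash
// when no activity context is present.
func RecordHeartbeatWithRecover(ctx context.Context, details ...interface{}) {
    defer func() {
        if r := recover(); r != nil {
            slog.Warn("ignoring panic from RecordHeartbeat", slog.Any("panic", r))
        }
    }()
    activity.RecordHeartbeat(ctx, details...)
}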
6 changes: 2 additions & 4 deletions flow/connectors/snowflake/snowflake.go
@@ -248,8 +248,7 @@ func (c *SnowflakeConnector) getTableSchemaForTable(tableName string) (*protos.T
         return nil, fmt.Errorf("error querying Snowflake peer for schema of table %s: %w", tableName, err)
     }
     defer func() {
-        // not sure if the errors these two return are same or different?
-        err = errors.Join(rows.Close(), rows.Err())
+        err = rows.Close()
         if err != nil {
             c.logger.Error("error while closing rows for reading schema of table",
                 slog.String("tableName", tableName),
@@ -289,8 +288,7 @@ func (c *SnowflakeConnector) GetLastOffset(jobName string) (int64, error) {
         return 0, fmt.Errorf("error querying Snowflake peer for last syncedID: %w", err)
     }
     defer func() {
-        // not sure if the errors these two return are same or different?
-        err = errors.Join(rows.Close(), rows.Err())
+        err = rows.Close()
        if err != nil {
             c.logger.Error("error while closing rows for reading last offset", slog.Any("error", err))
         }
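Background for this change, assuming the connector follows Go's database/sql contract: Rows.Close returns the error from releasing the result set, while Rows.Err reports an error encountered during iteration, and the two are distinct values (Close does not affect the result of Err). Iteration errors already surface where Next and Scan are used, so the deferred cleanup now keeps only the close error, which answers the question in the deleted comment. A generic sketch of the usual division of labor (db, the query, and the table name are placeholders):

// Sketch only; requires import "database/sql".
func lastOffset(db *sql.DB) (int64, error) {
    rows, err := db.Query("SELECT offset FROM sync_state") // placeholder query
    if err != nil {
        return 0, err
    }
    defer rows.Close() // close error: releasing the result set

    var offset int64
    for rows.Next() {
        if err := rows.Scan(&offset); err != nil {
            return 0, err
        }
    }
    return offset, rows.Err() // iteration error, distinct from Close's error
}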
25 changes: 11 additions & 14 deletions flow/e2e/bigquery/bigquery_helper.go
@@ -29,8 +29,6 @@ type BigQueryTestHelper struct {
     Peer *protos.Peer
     // client to talk to BigQuery
     client *bigquery.Client
-    // dataset to use for testing.
-    datasetName string
 }

@@ -51,7 +49,7 @@ func NewBigQueryTestHelper() (*BigQueryTestHelper, error) {
         return nil, fmt.Errorf("failed to read file: %w", err)
     }

-    var config protos.BigqueryConfig
+    var config *protos.BigqueryConfig
     err = json.Unmarshal(content, &config)
     if err != nil {
         return nil, fmt.Errorf("failed to unmarshal json: %w", err)
@@ -60,7 +58,7 @@ func NewBigQueryTestHelper() (*BigQueryTestHelper, error) {
     // suffix the dataset with the runID to namespace stateful schemas.
     config.DatasetId = fmt.Sprintf("%s_%d", config.DatasetId, runID)

-    bqsa, err := peer_bq.NewBigQueryServiceAccount(&config)
+    bqsa, err := peer_bq.NewBigQueryServiceAccount(config)
     if err != nil {
         return nil, fmt.Errorf("failed to create BigQueryServiceAccount: %v", err)
     }
@@ -70,14 +68,13 @@ func NewBigQueryTestHelper() (*BigQueryTestHelper, error) {
         return nil, fmt.Errorf("failed to create helper BigQuery client: %v", err)
     }

-    peer := generateBQPeer(&config)
+    peer := generateBQPeer(config)

     return &BigQueryTestHelper{
-        runID:       runID,
-        Config:      &config,
-        client:      client,
-        datasetName: config.DatasetId,
-        Peer:        peer,
+        runID:  runID,
+        Config: config,
+        client: client,
+        Peer:   peer,
     }, nil
 }

@@ -115,12 +112,12 @@ func (b *BigQueryTestHelper) datasetExists(datasetName string) (bool, error) {

 // RecreateDataset recreates the dataset, i.e, deletes it if exists and creates it again.
 func (b *BigQueryTestHelper) RecreateDataset() error {
-    exists, err := b.datasetExists(b.datasetName)
+    exists, err := b.datasetExists(b.Config.DatasetId)
     if err != nil {
         return fmt.Errorf("failed to check if dataset %s exists: %w", b.Config.DatasetId, err)
     }

-    dataset := b.client.Dataset(b.datasetName)
+    dataset := b.client.Dataset(b.Config.DatasetId)
     if exists {
         err := dataset.DeleteWithContents(context.Background())
         if err != nil {
@@ -168,7 +165,7 @@ func (b *BigQueryTestHelper) RunCommand(command string) error {

 // countRows(tableName) returns the number of rows in the given table.
 func (b *BigQueryTestHelper) countRows(tableName string) (int, error) {
-    return b.countRowsWithDataset(b.datasetName, tableName, "")
+    return b.countRowsWithDataset(b.Config.DatasetId, tableName, "")
 }

 func (b *BigQueryTestHelper) countRowsWithDataset(dataset, tableName string, nonNullCol string) (int, error) {
@@ -445,7 +442,7 @@ func (b *BigQueryTestHelper) CreateTable(tableName string, schema *model.QRecord
         fields = append(fields, fmt.Sprintf("`%s` %s", field.Name, bqType))
     }

-    command := fmt.Sprintf("CREATE TABLE %s.%s (%s)", b.datasetName, tableName, strings.Join(fields, ", "))
+    command := fmt.Sprintf("CREATE TABLE %s.%s (%s)", b.Config.DatasetId, tableName, strings.Join(fields, ", "))

     err := b.RunCommand(command)
     if err != nil {
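Note: the recurring edit in this file drops the redundant datasetName field in favor of Config.DatasetId and makes config a pointer from the start. json.Unmarshal allocates the value itself when handed a pointer to a nil pointer, so the pointer form behaves identically while removing the &config noise at every later call site. A self-contained sketch with a stand-in type (bigqueryConfig substitutes for protos.BigqueryConfig):

package main

import (
    "encoding/json"
    "fmt"
)

// bigqueryConfig stands in for protos.BigqueryConfig.
type bigqueryConfig struct {
    DatasetId string `json:"datasetId"`
}

func main() {
    content := []byte(`{"datasetId": "peerdb_test"}`)

    var config *bigqueryConfig // nil; json.Unmarshal allocates it
    if err := json.Unmarshal(content, &config); err != nil {
        panic(err)
    }
    fmt.Println(config.DatasetId) // prints: peerdb_test
}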