WaitFor #980

Merged: 77 commits into main from wait-for, Jan 12, 2024

Commits (all by serprex)
de3c2c4  WaitFor (Jan 4, 2024)
0379219  debug waitfor (Jan 4, 2024)
a8e200c  fix test? (Jan 4, 2024)
6677efb  experiment: workflow only exits when we cancel it (Jan 4, 2024)
d84f3f6  fix columns (Jan 4, 2024)
454fcb7  handle cancel in cdc_flow (Jan 4, 2024)
855e9db  Don't use reflect.DeepEqual (Jan 4, 2024)
c510262  idea: need to sleep to avoid test ending before slot closed (Jan 4, 2024)
fa6891c  think activity context needs heartbeat to find out workflow was cance… (Jan 4, 2024)
18c3a8d  theory: need to specify WaitForCancellation:true (Jan 4, 2024)
17a505b  hoping to use existing heartbeat (Jan 4, 2024)
d01b24e  utils.HeartbeatRoutine: use context.WithCancel (Jan 4, 2024)
8c52d8b  Try waiting a long time for cancellation (Jan 4, 2024)
950e9f4  Wait for activity to cancel in a waitfor loop in teardown (Jan 4, 2024)
6b635f3  Convert rest of e2e/postgres to WaitFor (Jan 5, 2024)
46f9d28  First stab at snowflake (Jan 5, 2024)
3dd64e2  log cancel (Jan 5, 2024)
68cdc87  -1 (Jan 5, 2024)
6a92239  compare schema helper, more logging (Jan 5, 2024)
b95a132  fix up expected schema (Jan 5, 2024)
6f591db  fix comparisons (Jan 5, 2024)
cd620b9  don't need waitgroup (Jan 5, 2024)
d47371a  rest of snowflake no longer using normalize flow count (Jan 5, 2024)
448f9fe  remove env.AssertExpectations (Jan 5, 2024)
028d0fd  try fixing soft delete mixup (Jan 5, 2024)
e574110  clean up non longer needed cmpTableName (Jan 5, 2024)
d9b68ed  equaltables already handles schema (Jan 5, 2024)
86ca8b9  more tablename cleanup (Jan 5, 2024)
758b587  fix softdel_iad, convert mixedcase (Jan 5, 2024)
fbcc64c  more table name fixing (Jan 5, 2024)
c1ddff7  fix error reporting, fix flow names (Jan 5, 2024)
1b989fa  remove no longer used parameter (Jan 5, 2024)
e6c9a27  fix flowjobname (Jan 5, 2024)
544f59c  fix missed waitfor fix (Jan 5, 2024)
12cbe0a  more fix (Jan 6, 2024)
39dcf17  Merge remote-tracking branch 'origin/main' into wait-for (Jan 6, 2024)
53a181d  bq (Jan 6, 2024)
6809ab3  fix sf exclusion (Jan 6, 2024)
d88f76d  more helper, missed a CancelWorkflow (Jan 6, 2024)
8eae927  improve error messages causing test failures (Jan 6, 2024)
d7c329a  fixes (Jan 6, 2024)
a9ced64  column exclusion: include checking c1. debug logging this last test (Jan 6, 2024)
a64df48  remove datasetName from bqhelper (Jan 6, 2024)
3b28eff  more debug (Jan 6, 2024)
7897009  fix (Jan 6, 2024)
e435887  more logging (Jan 6, 2024)
1ab2f23  err != nil (Jan 6, 2024)
cafacfd  I've managed to once again burn an hour of my time over != vs == (Jan 6, 2024)
4af0d01  remove leftover logging (Jan 6, 2024)
aea6c65  reduce noise from waitfor (Jan 6, 2024)
9a3d50d  Merge remote-tracking branch 'origin/main' into wait-for (Jan 6, 2024)
a20f178  2 minute timeout (Jan 6, 2024)
d9ca763  skip 5 second sleep at start of SetupCDCFlowStatusQuery (Jan 6, 2024)
2250165  log count (Jan 6, 2024)
8a696b0  Merge remote-tracking branch 'origin/main' into wait-for (Jan 6, 2024)
a77938b  cleanup code, move cdc timeout into code, raise to 30, fix teardown (Jan 6, 2024)
ae10042  test reverting WaitForCancellation: true (Jan 6, 2024)
6b783ff  fix type errors (Jan 6, 2024)
c95180e  adjust timeouts, refactor WaitFuncSchema into WaitForSchema (Jan 6, 2024)
2ea6dbd  Revert "test reverting WaitForCancellation: true" (Jan 6, 2024)
4056b62  30->15, raise pg timeout to 2m (Jan 6, 2024)
4ce1f80  not sure why this delete is finnicky (Jan 6, 2024)
21fd480  remove logging, move count check out of wait for (Jan 6, 2024)
446df16  Merge branch 'main' into wait-for (Jan 7, 2024)
6b411e7  Merge branch 'main' into wait-for (Jan 8, 2024)
ffaa86f  Merge branch 'main' into wait-for (Jan 9, 2024)
79542ac  Merge branch 'main' into wait-for (Jan 11, 2024)
838302a  Fix 2 tests (Jan 11, 2024)
9458da2  Merge remote-tracking branch 'origin/main' into wait-for (Jan 11, 2024)
9f00855  more logging around TearDownPostgres (Jan 11, 2024)
afc9367  replace reviewdog action with official golangci action (Jan 11, 2024)
b9843c7  Merge remote-tracking branch 'origin/main' into wait-for (Jan 12, 2024)
f9f0d0b  Increase wait for margins, long term need to make heartbeats not be 2… (Jan 12, 2024)
54d4bc4  Merge remote-tracking branch 'origin/main' into wait-for (Jan 12, 2024)
7ad14d6  cleanup SetupCDCFlowStatusQuery control flow (Jan 12, 2024)
5b70e27  bq is slow (Jan 12, 2024)
00d0085  Theory: big query isn't slow, we're just getting hung up on caching (Jan 12, 2024)
12 changes: 6 additions & 6 deletions flow/activities/flowable.go
@@ -223,6 +223,12 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 	go a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, input.FlowConnectionConfigs.Source.Name)
 
+	shutdown := utils.HeartbeatRoutine(ctx, 10*time.Second, func() string {
+		jobName := input.FlowConnectionConfigs.FlowJobName
+		return fmt.Sprintf("transferring records for job - %s", jobName)
+	})
+	defer shutdown()
+
 	// start a goroutine to pull records from the source
 	recordBatch := model.NewCDCRecordStream()
 	startTime := time.Now()

serprex (author): Moved to make sure we start heartbeats before a potential errGroup.Wait()

@@ -283,12 +289,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 		return syncResponse, nil
 	}
 
-	shutdown := utils.HeartbeatRoutine(ctx, 10*time.Second, func() string {
-		jobName := input.FlowConnectionConfigs.FlowJobName
-		return fmt.Sprintf("pushing records for job - %s", jobName)
-	})
-	defer shutdown()
-
 	syncStartTime := time.Now()
 	res, err := dstConn.SyncRecords(&model.SyncRecordsRequest{
 		Records: recordBatch,
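The review note explains the move: the heartbeat must already be running before any call that can block, or Temporal sees a silent activity. A minimal, self-contained sketch of that ordering, using a stand-in for the repo's utils.HeartbeatRoutine:

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// startHeartbeat is a stand-in for utils.HeartbeatRoutine: it ticks until the
// returned shutdown func is called or ctx ends.
func startHeartbeat(ctx context.Context, interval time.Duration, msg func() string) func() {
	done := make(chan struct{})
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-ctx.Done():
				return
			case <-ticker.C:
				fmt.Println("heartbeat:", msg()) // the real routine records a Temporal heartbeat
			}
		}
	}()
	return func() { close(done) }
}

func startFlow(ctx context.Context) error {
	// Start heartbeating before anything that can block; if errGroup.Wait()
	// came first, the activity would sit there emitting no heartbeats at all.
	shutdown := startHeartbeat(ctx, 10*time.Millisecond, func() string { return "transferring records" })
	defer shutdown()

	errGroup, errCtx := errgroup.WithContext(ctx)
	errGroup.Go(func() error {
		<-errCtx.Done() // stand-in for the record-pulling goroutine
		return errCtx.Err()
	})
	return errGroup.Wait() // heartbeats keep flowing while we block here
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 35*time.Millisecond)
	defer cancel()
	fmt.Println(startFlow(ctx)) // a few heartbeats, then: context deadline exceeded
}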
30 changes: 19 additions & 11 deletions flow/connectors/postgres/cdc.go
@@ -297,17 +297,19 @@ func (p *PostgresCDCSource) consumeStream(
 		}
 	}
 
-	if (cdcRecordsStorage.Len() >= int(req.MaxBatchSize)) && !p.commitLock {
-		return nil
-	}
+	if !p.commitLock {
+		if cdcRecordsStorage.Len() >= int(req.MaxBatchSize) {
+			return nil
+		}
 
-	if waitingForCommit && !p.commitLock {
-		p.logger.Info(fmt.Sprintf(
-			"[%s] commit received, returning currently accumulated records - %d",
-			p.flowJobName,
-			cdcRecordsStorage.Len()),
-		)
-		return nil
-	}
+		if waitingForCommit {
+			p.logger.Info(fmt.Sprintf(
+				"[%s] commit received, returning currently accumulated records - %d",
+				p.flowJobName,
+				cdcRecordsStorage.Len()),
+			)
+			return nil
+		}
+	}
 
 	// if we are past the next standby deadline (?)

serprex (author, Jan 6, 2024): Cleaned up this to potentially add activity.GetWorkerStopChannel handling, but this ended up not helping for e2e testing & so integrating that can be left as something for the future

@@ -340,9 +342,15 @@ func (p *PostgresCDCSource) consumeStream(
 	} else {
 		ctx, cancel = context.WithDeadline(p.ctx, nextStandbyMessageDeadline)
 	}
 
 	rawMsg, err := conn.ReceiveMessage(ctx)
 	cancel()
 
+	utils.RecordHeartbeatWithRecover(p.ctx, "consumeStream ReceiveMessage")
+	ctxErr := p.ctx.Err()
+	if ctxErr != nil {
+		return fmt.Errorf("consumeStream preempted: %w", ctxErr)
+	}
+
 	if err != nil && !p.commitLock {
 		if pgconn.Timeout(err) {
 			p.logger.Info(fmt.Sprintf("Stand-by deadline reached, returning currently accumulated records - %d",

serprex (author): This is put here to improve speed at which we react to context cancellation, since the error handling below is looking at p.commitLock but our context being cancelled isn't something we can ignore
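The pattern in that second hunk, checking the long-lived context before interpreting the error from the per-receive call, is worth isolating: deadline timeouts on the per-receive context are routine and tolerated, but cancellation of the whole stream must always win. A small runnable sketch with stand-in names (blockingReceive is hypothetical, not the pgconn API):

package main

import (
	"context"
	"fmt"
	"time"
)

// blockingReceive stands in for conn.ReceiveMessage(ctx): it blocks until the
// per-call deadline or a cancellation fires, then reports ctx.Err().
func blockingReceive(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

func consumeOnce(parent context.Context) error {
	ctx, cancel := context.WithTimeout(parent, 10*time.Millisecond)
	err := blockingReceive(ctx)
	cancel()

	// Check the parent context before interpreting err: per-receive deadline
	// timeouts are routine and tolerated below, but a cancelled parent must
	// end the stream immediately, whatever the error handling would do.
	if ctxErr := parent.Err(); ctxErr != nil {
		return fmt.Errorf("consumeStream preempted: %w", ctxErr)
	}
	if err != nil {
		// In the real code, pgconn.Timeout(err) means: return the records
		// accumulated so far and carry on.
		return nil
	}
	return nil
}

func main() {
	parent, cancel := context.WithCancel(context.Background())
	fmt.Println(consumeOnce(parent)) // <nil>: the deadline timeout is tolerated
	cancel()
	fmt.Println(consumeOnce(parent)) // consumeStream preempted: context canceled
}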
6 changes: 2 additions & 4 deletions flow/connectors/snowflake/snowflake.go
@@ -248,8 +248,7 @@ func (c *SnowflakeConnector) getTableSchemaForTable(tableName string) (*protos.T
 		return nil, fmt.Errorf("error querying Snowflake peer for schema of table %s: %w", tableName, err)
 	}
 	defer func() {
-		// not sure if the errors these two return are same or different?
-		err = errors.Join(rows.Close(), rows.Err())
+		err = rows.Close()
 		if err != nil {
 			c.logger.Error("error while closing rows for reading schema of table",
 				slog.String("tableName", tableName),

serprex (author): I checked the code, they're the same

@@ -289,8 +288,7 @@ func (c *SnowflakeConnector) GetLastOffset(jobName string) (int64, error) {
 		return 0, fmt.Errorf("error querying Snowflake peer for last syncedID: %w", err)
 	}
 	defer func() {
-		// not sure if the errors these two return are same or different?
-		err = errors.Join(rows.Close(), rows.Err())
+		err = rows.Close()
 		if err != nil {
 			c.logger.Error("error while closing rows for reading last offset", slog.Any("error", err))
 		}
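For reference, errors.Join (Go 1.20+) discards nil arguments and returns nil when everything is nil, so the removed Join(rows.Close(), rows.Err()) only differed from plain rows.Close() if Err() could surface something Close() does not, which the author verified is not the case here. A quick demonstration of Join's semantics:

package main

import (
	"errors"
	"fmt"
)

func main() {
	closeErr := errors.New("close failed")

	// errors.Join drops nil arguments and returns nil when all are nil,
	// so joining Close() and Err() only adds value when they can report
	// different errors.
	fmt.Println(errors.Join(nil, nil))           // <nil>
	fmt.Println(errors.Join(closeErr, nil))      // close failed
	fmt.Println(errors.Join(closeErr, closeErr)) // close failed (twice, newline-separated)
}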
2 changes: 1 addition & 1 deletion flow/connectors/utils/heartbeat.go
@@ -30,7 +30,7 @@ func HeartbeatRoutine(
 			}
 		}
 	}()
-	return func() { shutdown <- struct{}{} }
+	return func() { close(shutdown) }
 }
 
 // if the functions are being called outside the context of a Temporal workflow,
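Switching from a send to close(shutdown) is a standard Go idiom: a send on an unbuffered channel blocks forever if the receiving goroutine has already exited (here, presumably, if the heartbeat loop already returned because its context ended), while close never blocks and wakes every receiver. A toy program showing the semantics:

package main

import (
	"fmt"
	"time"
)

func main() {
	shutdown := make(chan struct{})

	go func() {
		for {
			select {
			case <-shutdown: // a closed channel is always ready to receive
				fmt.Println("worker: shutting down")
				return
			case <-time.After(20 * time.Millisecond):
				fmt.Println("worker: heartbeat")
			}
		}
	}()

	time.Sleep(50 * time.Millisecond)

	// close(shutdown) never blocks, even if the worker already exited for
	// some other reason; an unbuffered send `shutdown <- struct{}{}` would
	// hang the caller forever in that case.
	close(shutdown)
	time.Sleep(10 * time.Millisecond)
}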
25 changes: 11 additions & 14 deletions flow/e2e/bigquery/bigquery_helper.go
@@ -29,8 +29,6 @@ type BigQueryTestHelper struct {
 	Peer *protos.Peer
 	// client to talk to BigQuery
 	client *bigquery.Client
-	// dataset to use for testing.
-	datasetName string
 }
 
 // NewBigQueryTestHelper creates a new BigQueryTestHelper.

@@ -51,7 +49,7 @@ func NewBigQueryTestHelper() (*BigQueryTestHelper, error) {
 		return nil, fmt.Errorf("failed to read file: %w", err)
 	}
 
-	var config protos.BigqueryConfig
+	var config *protos.BigqueryConfig
 	err = json.Unmarshal(content, &config)
 	if err != nil {
 		return nil, fmt.Errorf("failed to unmarshal json: %w", err)

@@ -60,7 +58,7 @@ func NewBigQueryTestHelper() (*BigQueryTestHelper, error) {
 	// suffix the dataset with the runID to namespace stateful schemas.
 	config.DatasetId = fmt.Sprintf("%s_%d", config.DatasetId, runID)
 
-	bqsa, err := peer_bq.NewBigQueryServiceAccount(&config)
+	bqsa, err := peer_bq.NewBigQueryServiceAccount(config)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create BigQueryServiceAccount: %v", err)
 	}

@@ -70,14 +68,13 @@ func NewBigQueryTestHelper() (*BigQueryTestHelper, error) {
 		return nil, fmt.Errorf("failed to create helper BigQuery client: %v", err)
 	}
 
-	peer := generateBQPeer(&config)
+	peer := generateBQPeer(config)
 
 	return &BigQueryTestHelper{
-		runID:       runID,
-		Config:      &config,
-		client:      client,
-		datasetName: config.DatasetId,
-		Peer:        peer,
+		runID:  runID,
+		Config: config,
+		client: client,
+		Peer:   peer,
 	}, nil
 }

@@ -115,12 +112,12 @@ func (b *BigQueryTestHelper) datasetExists(datasetName string) (bool, error) {
 
 // RecreateDataset recreates the dataset, i.e, deletes it if exists and creates it again.
 func (b *BigQueryTestHelper) RecreateDataset() error {
-	exists, err := b.datasetExists(b.datasetName)
+	exists, err := b.datasetExists(b.Config.DatasetId)
 	if err != nil {
 		return fmt.Errorf("failed to check if dataset %s exists: %w", b.Config.DatasetId, err)
 	}
 
-	dataset := b.client.Dataset(b.datasetName)
+	dataset := b.client.Dataset(b.Config.DatasetId)
 	if exists {
 		err := dataset.DeleteWithContents(context.Background())
 		if err != nil {

@@ -168,7 +165,7 @@ func (b *BigQueryTestHelper) RunCommand(command string) error {
 
 // countRows(tableName) returns the number of rows in the given table.
 func (b *BigQueryTestHelper) countRows(tableName string) (int, error) {
-	return b.countRowsWithDataset(b.datasetName, tableName, "")
+	return b.countRowsWithDataset(b.Config.DatasetId, tableName, "")
 }
 
 func (b *BigQueryTestHelper) countRowsWithDataset(dataset, tableName string, nonNullCol string) (int, error) {

@@ -474,7 +471,7 @@ func (b *BigQueryTestHelper) CreateTable(tableName string, schema *model.QRecord
 		fields = append(fields, fmt.Sprintf("`%s` %s", field.Name, bqType))
 	}
 
-	command := fmt.Sprintf("CREATE TABLE %s.%s (%s)", b.Config.DatasetId, tableName, strings.Join(fields, ", "))
+	command := fmt.Sprintf("CREATE TABLE %s.%s (%s)", b.Config.DatasetId, tableName, strings.Join(fields, ", "))
 
 	err := b.RunCommand(command)
 	if err != nil {
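One subtlety in the helper change: config is now declared as a pointer, so json.Unmarshal is handed a **BigqueryConfig, which the json package handles by allocating the struct itself; the duplicated datasetName field then disappears in favor of reading Config.DatasetId. A small sketch with a stand-in config type (field names hypothetical):

package main

import (
	"encoding/json"
	"fmt"
)

// bigqueryConfig is a stand-in for protos.BigqueryConfig.
type bigqueryConfig struct {
	ProjectId string `json:"project_id"`
	DatasetId string `json:"dataset_id"`
}

func main() {
	content := []byte(`{"project_id":"my-project","dataset_id":"e2e_dataset"}`)

	// Handing json.Unmarshal a **bigqueryConfig makes it allocate the struct,
	// so config is already a pointer and call sites no longer need &config.
	var config *bigqueryConfig
	if err := json.Unmarshal(content, &config); err != nil {
		panic(err)
	}
	fmt.Println(config.ProjectId, config.DatasetId) // my-project e2e_dataset
}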