-
Notifications
You must be signed in to change notification settings - Fork 97
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WaitFor #980
WaitFor #980
Changes from 74 commits
de3c2c4
0379219
a8e200c
6677efb
d84f3f6
454fcb7
855e9db
c510262
fa6891c
18c3a8d
17a505b
d01b24e
8c52d8b
950e9f4
6b635f3
46f9d28
3dd64e2
68cdc87
6a92239
b95a132
6f591db
cd620b9
d47371a
448f9fe
028d0fd
e574110
d9b68ed
86ca8b9
758b587
fbcc64c
c1ddff7
1b989fa
e6c9a27
544f59c
12cbe0a
39dcf17
53a181d
6809ab3
d88f76d
8eae927
d7c329a
a9ced64
a64df48
3b28eff
7897009
e435887
1ab2f23
cafacfd
4af0d01
aea6c65
9a3d50d
a20f178
d9ca763
2250165
8a696b0
a77938b
ae10042
6b783ff
c95180e
2ea6dbd
4056b62
4ce1f80
21fd480
446df16
6b411e7
ffaa86f
79542ac
838302a
9458da2
9f00855
afc9367
b9843c7
f9f0d0b
54d4bc4
7ad14d6
5b70e27
00d0085
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -297,17 +297,19 @@ func (p *PostgresCDCSource) consumeStream( | |
} | ||
} | ||
|
||
if (cdcRecordsStorage.Len() >= int(req.MaxBatchSize)) && !p.commitLock { | ||
return nil | ||
} | ||
if !p.commitLock { | ||
if cdcRecordsStorage.Len() >= int(req.MaxBatchSize) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cleaned up this to potentially add activity.GetWorkerStopChannel handling, but this ended up not helping for e2e testing & so integrating that can be left as something for the future |
||
return nil | ||
} | ||
|
||
if waitingForCommit && !p.commitLock { | ||
p.logger.Info(fmt.Sprintf( | ||
"[%s] commit received, returning currently accumulated records - %d", | ||
p.flowJobName, | ||
cdcRecordsStorage.Len()), | ||
) | ||
return nil | ||
if waitingForCommit { | ||
p.logger.Info(fmt.Sprintf( | ||
"[%s] commit received, returning currently accumulated records - %d", | ||
p.flowJobName, | ||
cdcRecordsStorage.Len()), | ||
) | ||
return nil | ||
} | ||
} | ||
|
||
// if we are past the next standby deadline (?) | ||
|
@@ -340,9 +342,15 @@ func (p *PostgresCDCSource) consumeStream( | |
} else { | ||
ctx, cancel = context.WithDeadline(p.ctx, nextStandbyMessageDeadline) | ||
} | ||
|
||
rawMsg, err := conn.ReceiveMessage(ctx) | ||
cancel() | ||
|
||
utils.RecordHeartbeatWithRecover(p.ctx, "consumeStream ReceiveMessage") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is put here to improve speed at which we react to context cancellation, since the error handling below is looking at |
||
ctxErr := p.ctx.Err() | ||
if ctxErr != nil { | ||
return fmt.Errorf("consumeStream preempted: %w", ctxErr) | ||
} | ||
|
||
if err != nil && !p.commitLock { | ||
if pgconn.Timeout(err) { | ||
p.logger.Info(fmt.Sprintf("Stand-by deadline reached, returning currently accumulated records - %d", | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -248,8 +248,7 @@ func (c *SnowflakeConnector) getTableSchemaForTable(tableName string) (*protos.T | |
return nil, fmt.Errorf("error querying Snowflake peer for schema of table %s: %w", tableName, err) | ||
} | ||
defer func() { | ||
// not sure if the errors these two return are same or different? | ||
err = errors.Join(rows.Close(), rows.Err()) | ||
err = rows.Close() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I checked the code, they're the same |
||
if err != nil { | ||
c.logger.Error("error while closing rows for reading schema of table", | ||
slog.String("tableName", tableName), | ||
|
@@ -289,8 +288,7 @@ func (c *SnowflakeConnector) GetLastOffset(jobName string) (int64, error) { | |
return 0, fmt.Errorf("error querying Snowflake peer for last syncedID: %w", err) | ||
} | ||
defer func() { | ||
// not sure if the errors these two return are same or different? | ||
err = errors.Join(rows.Close(), rows.Err()) | ||
err = rows.Close() | ||
if err != nil { | ||
c.logger.Error("error while closing rows for reading last offset", slog.Any("error", err)) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved to make sure we start heartbeats before a potential
errGroup.Wait()