-
Notifications
You must be signed in to change notification settings - Fork 97
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
supporting Snowflake as a source for full table QRep #1368
Conversation
flow/activities/flowable.go
Outdated
|
||
stream, err = recordBatch.ToQRecordStream(bufferSize) | ||
go func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should split out a preceding PR to make this use errgroup instead of waitgroup+goroutine+goroutineErr
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
e78a04c
to
ec32b34
Compare
4aa074a
to
1d8f164
Compare
Should SQL Server tests and QRep support be removed in a preceding PR? @heavycrystal @iskakaushik |
@@ -141,6 +141,21 @@ func (a *FlowableActivity) GetTableSchema( | |||
return srcConn.GetTableSchema(ctx, config) | |||
} | |||
|
|||
// GetTableSchema returns the schema of a table. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// GetTableSchema returns the schema of a table. | |
// GetTableSchemaQRep returns the schema of a table. |
Better off moving GetTableSchema to its own connector interface & having both workflows use same activity method
|
||
tables = append(tables, table.String) | ||
} | ||
return &protos.AllTablesResponse{Tables: tables}, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's surprising, but you also need to check rows.Err()
after loop. Which is why the pgx Collect methods are nice, because they handle this all
flow/connectors/core.go
Outdated
@@ -136,11 +136,15 @@ type CDCNormalizeConnector interface { | |||
type QRepPullConnector interface { | |||
Connector | |||
|
|||
// GetTableSchema returns the schema of a table. | |||
GetTableSchema(ctx context.Context, req *protos.GetTableSchemaBatchInput) (*protos.GetTableSchemaBatchOutput, error) | |||
|
|||
// GetQRepPartitions returns the partitions for a given table that haven't been synced yet. | |||
GetQRepPartitions(ctx context.Context, config *protos.QRepConfig, last *protos.QRepPartition) ([]*protos.QRepPartition, error) | |||
|
|||
// PullQRepRecords returns the records for a given partition. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// PullQRepRecords returns the records for a given partition. | |
// PullQRepRecordStream returns the records for a given partition. |
Need to update rest of comment too
@@ -255,7 +259,8 @@ var ( | |||
_ NormalizedTablesConnector = &connclickhouse.ClickhouseConnector{} | |||
|
|||
_ QRepPullConnector = &connpostgres.PostgresConnector{} | |||
_ QRepPullConnector = &connsqlserver.SQLServerConnector{} | |||
// _ QRepPullConnector = &connsqlserver.SQLServerConnector{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to implement GetTableSchema for SQLServerConnector
Or push to have support for it removed
defer func() { | ||
rows.Close() | ||
|
||
// description of .Close() says it should only be called once rows are closed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// description of .Close() says it should only be called once rows are closed | |
// description of .Err() says it should only be called once rows are closed |
Seems like it'd be more straightforward to explicitly call rows.Close before the rows.Err check before. Fine to close multiple times
Replaces a usage waitgroup with errgroup in replicateQRepPartitions in flowable.go This is a preceding PR to #1368
// GetQRepPartitions returns the partitions for a given table that haven't been synced yet. | ||
GetQRepPartitions(ctx context.Context, config *protos.QRepConfig, last *protos.QRepPartition) ([]*protos.QRepPartition, error) | ||
|
||
// PullQRepRecords returns the records for a given partition. | ||
PullQRepRecords(ctx context.Context, config *protos.QRepConfig, partition *protos.QRepPartition) (*model.QRecordBatch, error) | ||
PullQRepRecordStream(ctx context.Context, config *protos.QRepConfig, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can create something like a QRepPullStreamConnector interface instead, prefer that, fallback to QRepPullConnector when ErrUnsupportedFunctionality
1d8f164
to
10adad0
Compare
not mainlining this for now, will reraise PR and address issues then |
sample command for reference:
UI:
QRepPullConnector
, SQL Server is now not a validQRepPullConnector