supporting Snowflake as a source for full table QRep #1368

Closed
wants to merge 13 commits into from

@heavycrystal heavycrystal commented Feb 23, 2024

sample command for reference:

CREATE MIRROR watermark3 FROM sf_target TO catalog FOR
$$
  SELECT * FROM public.retwal1
$$
WITH (
        destination_table_name = 'public.retwal2',
        watermark_column = '', -- has to be empty
        watermark_table_name = 'public.retwal1',
        mode = 'overwrite',
        parallelism = 2, -- no effect
        refresh_interval = 10, -- no effect
        num_rows_per_partition = 500, -- no effect
        setup_watermark_table_on_destination = true,
        initial_copy_only=true
);

UI: (screenshot, 2024-02-26 6:24:33 PM)

  1. Some refactors for QRepPullConnector; SQL Server is no longer a valid QRepPullConnector.

@heavycrystal heavycrystal marked this pull request as draft February 23, 2024 20:21

stream, err = recordBatch.ToQRecordStream(bufferSize)
go func() {

should split out a preceding PR to make this use errgroup instead of waitgroup+goroutine+goroutineErr

@Amogh-Bharadwaj
Amogh-Bharadwaj commented Feb 27, 2024

Should SQL Server tests and QRep support be removed in a preceding PR? @heavycrystal @iskakaushik
Those tests are failing since we've removed SQL Server as a QRep pull connector here

@@ -141,6 +141,21 @@ func (a *FlowableActivity) GetTableSchema(
return srcConn.GetTableSchema(ctx, config)
}

// GetTableSchema returns the schema of a table.

@serprex serprex Feb 27, 2024

Suggested change
- // GetTableSchema returns the schema of a table.
+ // GetTableSchemaQRep returns the schema of a table.

Better off moving GetTableSchema to its own connector interface and having both workflows use the same activity method.


tables = append(tables, table.String)
}
return &protos.AllTablesResponse{Tables: tables}, nil

@serprex serprex Feb 27, 2024

It's surprising, but you also need to check rows.Err() after the loop. That is why the pgx Collect methods are nice: they handle all of this.

@@ -136,11 +136,15 @@ type CDCNormalizeConnector interface {
type QRepPullConnector interface {
Connector

// GetTableSchema returns the schema of a table.
GetTableSchema(ctx context.Context, req *protos.GetTableSchemaBatchInput) (*protos.GetTableSchemaBatchOutput, error)

// GetQRepPartitions returns the partitions for a given table that haven't been synced yet.
GetQRepPartitions(ctx context.Context, config *protos.QRepConfig, last *protos.QRepPartition) ([]*protos.QRepPartition, error)

// PullQRepRecords returns the records for a given partition.

Suggested change
- // PullQRepRecords returns the records for a given partition.
+ // PullQRepRecordStream returns the records for a given partition.

Need to update rest of comment too

@@ -255,7 +259,8 @@ var (
_ NormalizedTablesConnector = &connclickhouse.ClickhouseConnector{}

_ QRepPullConnector = &connpostgres.PostgresConnector{}
- _ QRepPullConnector = &connsqlserver.SQLServerConnector{}
+ // _ QRepPullConnector = &connsqlserver.SQLServerConnector{}

Need to implement GetTableSchema for SQLServerConnector

Or push to have support for it removed

defer func() {
rows.Close()

// description of .Close() says it should only be called once rows are closed

@serprex serprex Feb 27, 2024

Suggested change
- // description of .Close() says it should only be called once rows are closed
+ // description of .Err() says it should only be called once rows are closed

Seems like it'd be more straightforward to explicitly call rows.Close before the rows.Err check. Fine to close multiple times.

Amogh-Bharadwaj added a commit that referenced this pull request Feb 28, 2024
Replaces a usage of waitgroup with errgroup in replicateQRepPartitions in flowable.go.
This is a preceding PR to #1368.

// GetQRepPartitions returns the partitions for a given table that haven't been synced yet.
GetQRepPartitions(ctx context.Context, config *protos.QRepConfig, last *protos.QRepPartition) ([]*protos.QRepPartition, error)

// PullQRepRecords returns the records for a given partition.
PullQRepRecords(ctx context.Context, config *protos.QRepConfig, partition *protos.QRepPartition) (*model.QRecordBatch, error)
PullQRepRecordStream(ctx context.Context, config *protos.QRepConfig,

Could create something like a QRepPullStreamConnector interface instead: prefer that, and fall back to QRepPullConnector on ErrUnsupportedFunctionality.
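A rough sketch of that prefer-then-fall-back shape, under loud assumptions: the interfaces are cut down to one method each, records are plain strings, and `asStream`, `asBatch`, `pullPartition`, `streamConn`, and `batchConn` are hypothetical names. `ErrUnsupportedFunctionality` is named in the comment above, but its definition here is assumed.

```go
package main

import (
	"errors"
	"fmt"
)

// Assumed definition of the sentinel named in the review comment.
var ErrUnsupportedFunctionality = errors.New("requested connector does not support functionality")

// Hypothetical, heavily simplified connector interfaces.
type Connector interface{ Close() error }

type QRepPullConnector interface {
	Connector
	PullQRepRecords(partition string) ([]string, error)
}

type QRepPullStreamConnector interface {
	Connector
	PullQRepRecordStream(partition string, out chan<- string) error
}

// asStream and asBatch stand in for connector lookup helpers.
func asStream(c Connector) (QRepPullStreamConnector, error) {
	if sc, ok := c.(QRepPullStreamConnector); ok {
		return sc, nil
	}
	return nil, ErrUnsupportedFunctionality
}

func asBatch(c Connector) (QRepPullConnector, error) {
	if bc, ok := c.(QRepPullConnector); ok {
		return bc, nil
	}
	return nil, ErrUnsupportedFunctionality
}

// pullPartition prefers the streaming interface and falls back to batch.
func pullPartition(c Connector, partition string) ([]string, error) {
	sc, err := asStream(c)
	if errors.Is(err, ErrUnsupportedFunctionality) {
		bc, err := asBatch(c)
		if err != nil {
			return nil, err
		}
		return bc.PullQRepRecords(partition)
	}
	if err != nil {
		return nil, err
	}

	out := make(chan string)
	errCh := make(chan error, 1)
	go func() {
		defer close(out) // closing ends the range loop below
		errCh <- sc.PullQRepRecordStream(partition, out)
	}()
	var records []string
	for r := range out {
		records = append(records, r)
	}
	return records, <-errCh
}

type streamConn struct{}

func (streamConn) Close() error { return nil }
func (streamConn) PullQRepRecordStream(partition string, out chan<- string) error {
	out <- "stream:" + partition
	return nil
}

type batchConn struct{}

func (batchConn) Close() error { return nil }
func (batchConn) PullQRepRecords(partition string) ([]string, error) {
	return []string{"batch:" + partition}, nil
}

func main() {
	for _, c := range []Connector{streamConn{}, batchConn{}} {
		records, err := pullPartition(c, "p1")
		fmt.Println(records, err)
	}
}
```

Keeping the two interfaces separate would let a source such as SQL Server stay a valid batch connector without having to implement the streaming method.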

@heavycrystal

not mainlining this for now, will reraise PR and address issues then

@serprex serprex deleted the sf-full-table-qrep branch July 19, 2024 15:27
3 participants