-
Notifications
You must be signed in to change notification settings - Fork 97
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DRAFT] CDC mirrors with SCHEMA MAPPING support #420
Conversation
for _, table := range populateTableMappingFromSchemasOutput.TableMapping[srcSchema].Tables { | ||
flowConnectionConfigs.TableNameMapping[fmt.Sprintf("%s.%s", srcSchema, table)] = | ||
fmt.Sprintf("%s.%s", dstSchema, table) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🚫 [golangci] reported by reviewdog 🐶
unnecessary trailing newline (whitespace)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please let me know once these are fixed, I'll take another look.
@@ -127,6 +124,9 @@ message SetupReplicationInput { | |||
bool do_initial_copy = 5; | |||
string existing_publication_name = 6; | |||
string existing_replication_slot_name = 7; | |||
|
|||
// if length > 0, ignore table name mapping and use schemas as basis of publication creation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of this, can we pass the mapping type here?
flow/model/model.go
Outdated
@@ -282,6 +284,8 @@ type SyncResponse struct { | |||
TableSchemaDelta *protos.TableSchemaDelta | |||
// to be stored in state for future PullFlows | |||
RelationMessageMapping RelationMessageMapping | |||
// to be used to create additional tables, for MappingType SCHEMA | |||
AdditionalTableInfo *protos.AdditionalTableInfo |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we push this info to TableSchemaDelta
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
refactored into an interface of various deltas
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please let me know once these are fixed, I'll take another look.
6629834
to
7853885
Compare
|
||
// if the functions are being called outside the context of a Temporal workflow, | ||
// activity.RecordHeartbeat panics, this is a bandaid for that. | ||
func (c *PostgresConnector) recordHeartbeatWithRecover(details ...interface{}) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🚫 [golangci] reported by reviewdog 🐶
func (*PostgresConnector).recordHeartbeatWithRecover
is unused (unused)
@@ -478,7 +478,7 @@ func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model. | |||
} | |||
|
|||
func (c *SnowflakeConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest, rawTableIdentifier string, | |||
syncBatchID int64, syncRecordsTx *sql.Tx) (*model.SyncResponse, error) { | |||
syncBatchID int64, syncRecordsTx *sql.Tx) (*protos.SyncResponse, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🚫 [golangci] reported by reviewdog 🐶
unnecessary leading newline (whitespace)
+1 this feature would have helped a customer we chatted with today. |
support only for Postgres and Snowflake for now