Normalize split #1088
Merged
Conversation
Adds tests for enums for Postgres, BigQuery, and Snowflake. We want enums to be text on the destination.
`json.Marshal` does not support NaN, so we have to nullify them. But NULLs aren't allowed when inserting arrays in BigQuery, so in the MERGE statement we skip nulls. Fixes #1029
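A minimal sketch of the nullify step, assuming row values arrive as a `[]any` slice; `nullifyNaNs` is an illustrative name, not PeerDB's actual code:

```golang
package main

import (
	"encoding/json"
	"fmt"
	"math"
)

// nullifyNaNs replaces NaN float64 values with nil so json.Marshal
// does not fail with "json: unsupported value: NaN".
func nullifyNaNs(values []any) []any {
	out := make([]any, len(values))
	for i, v := range values {
		if f, ok := v.(float64); ok && math.IsNaN(f) {
			out[i] = nil // serialized as JSON null
			continue
		}
		out[i] = v
	}
	return out
}

func main() {
	b, err := json.Marshal(nullifyNaNs([]any{1.5, math.NaN(), 2.0}))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // [1.5,null,2]
}
```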
Tests were flaking because enum tests were trying to create the mood enum in parallel. Unfortunately, `CREATE TYPE IF NOT EXISTS` is not a thing, and doing `DROP TYPE IF EXISTS` before every enum creation can be flaky. The solution is to just ignore the specific unique typename constraint violation.
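A minimal sketch of that error-tolerant creation, assuming pgx/v5; the function name and the exact SQLSTATE set are illustrative:

```golang
package main

import (
	"context"
	"errors"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgconn"
)

// createMoodEnum tolerates the race between parallel tests creating the
// same enum: concurrent CREATE TYPE can fail with duplicate_object (42710)
// or a unique_violation (23505) on pg_type's name index.
func createMoodEnum(ctx context.Context, conn *pgx.Conn) error {
	_, err := conn.Exec(ctx, `CREATE TYPE mood AS ENUM ('happy', 'sad')`)
	var pgErr *pgconn.PgError
	if errors.As(err, &pgErr) && (pgErr.Code == "42710" || pgErr.Code == "23505") {
		return nil // another test already created the enum; ignore
	}
	return err
}
```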
Also moves PEERDB_CDC_IDLE_TIMEOUT_SECONDS to being specified per test
Remove force-dynamic in the UI mirrors status file
To allow better testing in the future, soft-delete now only happens when SoftDelete is set to true and a SoftDeleteColName is set. Added merge statement tests for PG, SF, and BQ. Removed some whitespace in Postgres merge statements.
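A minimal sketch of the soft-delete gating condition, with an illustrative config type; the field names follow the description above:

```golang
// MergeConfig is an illustrative stand-in for the merge-generation config.
type MergeConfig struct {
	SoftDelete        bool
	SoftDeleteColName string
}

// includeSoftDelete gates the soft-delete clause of the merge statement:
// both the flag and a column name must be set.
func includeSoftDelete(cfg MergeConfig) bool {
	return cfg.SoftDelete && cfg.SoftDeleteColName != ""
}
```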
Cursor is mutable, so the type system already knows it has exclusive access
Catalog's methods already handle synchronization, except when running migrations, which already use exclusive connections. Also queries the source/destination peer in parallel.
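A minimal sketch of the parallel peer lookup, assuming golang.org/x/sync/errgroup; `getPeer` and the `Peer` type are illustrative stand-ins, not PeerDB's actual code:

```golang
package main

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// Peer is an illustrative stand-in for PeerDB's peer config type.
type Peer struct{ Name string }

// getPeer is an illustrative stand-in for the catalog lookup.
func getPeer(ctx context.Context, name string) (*Peer, error) {
	return &Peer{Name: name}, nil
}

// fetchPeers queries source and destination concurrently instead of
// sequentially, failing fast if either lookup errors.
func fetchPeers(ctx context.Context, src, dst string) (*Peer, *Peer, error) {
	g, ctx := errgroup.WithContext(ctx)
	var srcPeer, dstPeer *Peer
	g.Go(func() error {
		var err error
		srcPeer, err = getPeer(ctx, src)
		return err
	})
	g.Go(func() error {
		var err error
		dstPeer, err = getPeer(ctx, dst)
		return err
	})
	if err := g.Wait(); err != nil {
		return nil, nil, err
	}
	return srcPeer, dstPeer, nil
}
```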
Realised we can just create a QRep execution object for many of the activities we're calling in xmin_flow.go. Fixes a typo in qrep_flow. Adds Temporal query handlers in xmin_flow.go (the same ones as in qrep_flow; these have been refactored into a common function). Without these, XMIN mirror statuses cannot be viewed in the UI.
Refactors our initial load status fetch to perform a join instead of iterating through flow names and running `len(clones)` queries. Uses `{ cache: 'no-store' }` as an alternative to forcing dynamic rendering. This is the more idiomatic way of using Next.js; force-dynamic is just for cases where the need is an easier migration from the Pages Router's earlier getServerSideProps.
 
SchemaRef was used everywhere, but pgwire doesn't name the type, so when the API wanted an `Arc<Vec<FieldInfo>>` while we had `Arc<Schema { fields: Vec<FieldInfo> }>`, it required constructing `Arc::new(schema.fields.clone())`, an unnecessary clone.
Maps HStore to JSON in BigQuery. Achieves this by transforming `'"a"=>"b"'` to `{"a":"b"}` via string functions, in both QRep/initial load and CDC. Test added. Relies on the fact that hstore keys and values must be quoted strings (although the key can be empty, and this case is supported), following the syntax in the example below:

```sql
postgres=# select 'a'::hstore;
ERROR:  Unexpected end of string
LINE 1: select 'a'::hstore;
               ^
postgres=# select 'a=>b'::hstore;
-[ RECORD 1 ]----
hstore | "a"=>"b"

postgres=# select 'a=>'::hstore;
ERROR:  Unexpected end of string
LINE 1: select 'a=>'::hstore;

postgres=# select 'a=>3434'::hstore;
-[ RECORD 1 ]-------
hstore | "a"=>"3434"
```
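A minimal Go sketch of the same transformation, naively assuming `=>` never appears inside a key or value; PeerDB does this with string functions in BigQuery SQL, not in Go:

```golang
package main

import (
	"fmt"
	"strings"
)

// hstoreToJSON turns hstore text output like `"a"=>"b", "c"=>"d"` into
// `{"a":"b", "c":"d"}`. It relies on keys and values always being quoted
// strings in hstore's text output, as shown in the examples above.
func hstoreToJSON(hstore string) string {
	return "{" + strings.ReplaceAll(hstore, "=>", ":") + "}"
}

func main() {
	fmt.Println(hstoreToJSON(`"a"=>"b"`)) // {"a":"b"}
}
```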
Go 1.19 introduced atomic types similar to uber-go/atomic's
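A minimal sketch of the stdlib replacement, counting records across goroutines; the variable name is illustrative:

```golang
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var numRecords atomic.Int64 // zero value is ready to use, no constructor needed
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			numRecords.Add(1)
		}()
	}
	wg.Wait()
	fmt.Println(numRecords.Load()) // 10
}
```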
Must specify http, or else the callback upon login fails
Add tests for QRep transform functions in Snowflake and BigQuery
If a table has neither a primary key nor a replica identity, it is now not selectable in the Create CDC UI.
![Screenshot 2024-01-11 at 3 11 17 AM](https://github.com/PeerDB-io/peerdb/assets/65964360/c589802e-437f-4f62-b351-e1def921b4f8)
For tables with many columns in general, and especially many TOAST columns, some batches can trigger a MERGE statement so complex that BigQuery is unable to process it, with errors like `The query is too large. The maximum standard SQL query length is 1024.00K characters, including comments and white space characters.` and `Error 400: Resources exceeded during query execution: The query is too complex., resourcesExceeded`. For now, the fix is splitting these complex MERGE statements into smaller ones that act on different subsets of a raw table (partitioning on the basis of `_peerdb_unchanged_toast_columns`). This can lead to tables needing 10+ MERGE statements in a single batch, but this is a compromise with our current design. Instead of sending MERGEs for all tables at once, we now do it per table and update metadata at the end, to avoid exceeding SQL query length limits.

Co-authored-by: Kaushik Iska <[email protected]>
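A minimal sketch of the grouping described above; `RawRow` and the function name are illustrative stand-ins, not PeerDB's actual code:

```golang
// RawRow is an illustrative stand-in for a raw-table record.
type RawRow struct {
	UnchangedToastColumns string // value of _peerdb_unchanged_toast_columns
	Data                  map[string]any
}

// groupByUnchangedToast partitions raw rows on their unchanged-TOAST-column
// set, so that one smaller MERGE statement can be generated per group
// instead of one oversized MERGE for the whole batch.
func groupByUnchangedToast(rows []RawRow) map[string][]RawRow {
	groups := make(map[string][]RawRow)
	for _, row := range rows {
		groups[row.UnchangedToastColumns] = append(groups[row.UnchangedToastColumns], row)
	}
	return groups
}
```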
## Eventhubs: Enable Partitioning Via Column Value

This PR aims to achieve a feature where PG to Eventhub CDC now supports partition-level routing. Suppose there is a column in the source table T1 called `id`. We want T1 to be mapped to one eventhub E1, and the rows with the same `id` value V1 in T1 to be mapped to the same partition P1 of E1, and similarly V2 of T1 to P2 of E1, and so on.

**Note**: The limit of 32 on the number of partitions is accounted for by Eventhub's internal assignment. V100 to P100 is not happening.

### Mirror definition

`ScopedEventhub`'s definition is now:

```golang
// Scoped eventhub is of the form peer_name.eventhub_name.partition_column.partition_key_value
// partition_column is the column in the table that is used to determine
// the partition key for the eventhub. Partition value is one such value of that column.
type ScopedEventhub struct {
	PeerName           string
	Eventhub           string
	PartitionKeyColumn string
	PartitionKeyValue  string
}
```

Therefore the semantics of the table mapping in the mirror command are now:

```sql
WITH TABLE MAPPING (
	-- src_table:<eventhub (not group) peer>.<eventhub name>.<column name>
	public.oss1:ehpeer1.oss1_destination_eventhub.id
)
```

### Routing

When creating a batch, we specify the partition key of the batch:

```golang
opts := &azeventhubs.EventDataBatchOptions{
	PartitionKey: &destination.PartitionKeyValue,
}
batch, err := hub.NewEventDataBatch(ctx, opts)
```

According to the azeventhubs docs:

> PartitionKey is hashed to calculate the partition assignment. Messages and message batches with the same PartitionKey are guaranteed to end up in the same partition.
Maps HStore to a JSON-compatible Variant in Snowflake - for CDC and QRep
We weren't returning the current sync batch ID as part of eventhub's SyncRecords. As a result, we weren't able to get CDC logs for Eventhub. This PR fixes that
Removes the metadata DB requirement for the Eventhub Group peer, as it can use the catalog on its own. Fixes a few logs.
Co-authored-by: Kevin Biju <[email protected]>
Refreshing on the slot page was not updating the slot table data, since we weren't disabling caching there like we are on the mirrors edit page. The same goes for the mirror errors page. Also adds `cache: 'no-store'` to a few other fetches for safety.
Partitioning based on unchanged TOAST columns wasn't being done correctly
- Remove push parallelism
- Move heartbeat routine inside processBatch
- Use atomic int for num records

Co-authored-by: Philip Dubé <[email protected]>
In the UI we get data from `information_schema.table_constraints` for primary-key-based filtering. This can be unreliable, as according to the [postgres docs](https://www.postgresql.org/docs/current/infoschema-table-constraints.html) it won't work for read-only users. This PR modifies two of the queries we use for the table picker to not use this view.
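One way to avoid the view is to read pg_catalog directly, which is visible regardless of table ownership; a sketch of such a primary-key-columns query (the exact queries changed in the PR may differ):

```golang
// primaryKeyColumnsSQL lists a table's primary-key columns from pg_index
// and pg_attribute instead of information_schema.table_constraints,
// which hides rows from users without privileges on the table.
const primaryKeyColumnsSQL = `
SELECT a.attname
FROM pg_index i
JOIN pg_attribute a
  ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = $1::regclass
  AND i.indisprimary
`
```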
`NormalizeFlowCountQuery` is stunting decoupled sync/normalize workflows, so replace it with `WaitFor`. Besides, I just don't like this `ExitAfterRecords` way of doing things: e2e tests are integration tests, and the implementation should be treated as a black box as much as possible. Temporal has a bunch of capabilities to mock activities, so we can create unit tests for the more intrusive tests that would be necessary to raise branch coverage, etc. `WaitFor` presents the ideal mechanism for testing convergent processes: update the source, then wait for the destination to reflect the change. To make this change work, however, I needed to use `env.CancelWorkflow` after completing tests, since I now want the workflow to run indefinitely. It turns out our code didn't adequately handle cancellation, so I implemented that.
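A minimal sketch of a `WaitFor`-style helper; the actual signature in PeerDB's e2e package may differ:

```golang
package e2e

import (
	"testing"
	"time"
)

// WaitFor polls condition until it holds or the timeout elapses, which
// suits convergent processes: update the source, then wait for the
// destination to reflect the change.
func WaitFor(t *testing.T, timeout time.Duration, condition func() bool) {
	t.Helper()
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if condition() {
			return
		}
		time.Sleep(time.Second)
	}
	t.Fatal("WaitFor: condition not met before timeout")
}
```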
Holding back from pgwire 0.19 since tests were hanging with it
Closing allows for multiple receivers and never blocks the sender. It's the correct choice when one wants to signal an irreversible state change.
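A minimal sketch of the close-to-signal pattern: closing a channel broadcasts to any number of receivers without ever blocking the sender, which fits one-way state changes like shutdown:

```golang
package main

import (
	"fmt"
	"sync"
)

func main() {
	done := make(chan struct{})
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-done // every receiver unblocks when done is closed
			fmt.Println("worker", id, "stopping")
		}(i)
	}
	close(done) // never blocks, regardless of receiver count
	wg.Wait()
}
```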
Split from #893
Changes necessary because of:
1. sunng87/pgwire#144
2. sunng87/pgwire#147

Tests were failing due to hanging in 0.19.0; 0.19.1 fixed the hang: sunng87/pgwire#148
A sync batch should not be considered complete until its schema changes are processed. This avoids failures after commit causing schema changes to be dropped; when decoupling normalize/sync in #893, this was causing normalization to be missing values.
Seems like `peer_flow_bq_test.go` was missing the sync import
We now hash the partition key column value obtained from the destination table name in create mirror for PG->EH. This is to reduce the number of Eventhub batches we create; we noticed that not doing so makes the mirror extremely slow. Also fixes some code in the UI Graph component.
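A minimal sketch of one way to hash a partition key value into a bounded set of keys, assuming FNV-1a and an illustrative bucket count; PeerDB's actual hash and bucketing choices may differ:

```golang
package main

import (
	"fmt"
	"hash/fnv"
)

// numBuckets is illustrative; Eventhub partitions are capped at 32.
const numBuckets = 32

// hashedPartitionKey maps arbitrary column values onto a small, stable
// set of partition keys, so fewer distinct batches are created.
func hashedPartitionKey(value string) string {
	h := fnv.New32a()
	h.Write([]byte(value))
	return fmt.Sprintf("%d", h.Sum32()%numBuckets)
}

func main() {
	fmt.Println(hashedPartitionKey("customer-42"))
}
```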
Wanting to do testing as if in a customer environment