Normalize split #1088

Merged
58 commits merged into customer-docker on Jan 16, 2024
Conversation

@serprex (Contributor) commented Jan 16, 2024

Wanting to do testing as if in a customer environment.

serprex and others added 30 commits January 6, 2024 09:32
#1020 reverted #997

Reimplement so that only the func declaration moves into HeartbeatRoutine
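For context, a minimal sketch of what a heartbeat routine like this can look like with the Temporal Go SDK; the `message` callback and package layout here are assumptions for illustration, not PeerDB's actual code:

```golang
package shared

import (
	"context"
	"time"

	"go.temporal.io/sdk/activity"
)

// HeartbeatRoutine periodically records an activity heartbeat until the
// returned cancel func is called. message is a hypothetical callback
// supplying the current progress string.
func HeartbeatRoutine(
	ctx context.Context,
	interval time.Duration,
	message func() string,
) context.CancelFunc {
	ctx, cancel := context.WithCancel(ctx)
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				activity.RecordHeartbeat(ctx, message())
			}
		}
	}()
	return cancel
}
```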
Adds enum tests for Postgres, BigQuery and Snowflake. We want enums to be text on the destination.
json.Marshal does not support NaN, so we have to nullify them; but NULLs aren't allowed when inserting arrays in BigQuery, so in the MERGE statement we skip NULLs.
Fixes #1029
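A minimal sketch of the nullification step, assuming rows are held as `[]any` slices (the real record types differ):

```golang
package main

import (
	"encoding/json"
	"fmt"
	"math"
)

// nullifyNaN replaces NaN floats with nil, since json.Marshal returns
// an error ("unsupported value: NaN") when it encounters one.
func nullifyNaN(values []any) {
	for i, v := range values {
		if f, ok := v.(float64); ok && math.IsNaN(f) {
			values[i] = nil
		}
	}
}

func main() {
	row := []any{1.5, math.NaN(), "ok"}
	nullifyNaN(row)
	b, _ := json.Marshal(row)
	fmt.Println(string(b)) // [1.5,null,"ok"]
}
```

The resulting NULLs are then the ones skipped by the MERGE statement, since BigQuery arrays cannot contain them.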
Tests were flaking because enum tests were trying to create the mood enum in parallel.
Unfortunately, `CREATE TYPE IF NOT EXISTS` is not a thing,
and doing `DROP TYPE IF EXISTS` before every enum creation can be flaky.
The solution is to just ignore the specific unique typename constraint violation.
Also move PEERDB_CDC_IDLE_TIMEOUT_SECONDS to being specified by tests
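A sketch of that error-swallowing approach with pgx; the SQLSTATE checks (42710 duplicate_object, and 23505 on pg_type's typname index for the concurrent-creation race) are our reading of the constraint violation described above, not the exact test helper:

```golang
package e2e

import (
	"context"
	"errors"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgconn"
)

// createMoodEnum creates the enum and ignores "type already exists"
// errors so parallel tests can race to create it safely.
func createMoodEnum(ctx context.Context, conn *pgx.Conn) error {
	_, err := conn.Exec(ctx, "CREATE TYPE mood AS ENUM ('happy','sad','angry')")
	var pgErr *pgconn.PgError
	if errors.As(err, &pgErr) &&
		(pgErr.Code == "42710" || // duplicate_object: type already exists
			(pgErr.Code == "23505" && pgErr.ConstraintName == "pg_type_typname_nsp_index")) {
		return nil // another parallel test created it first
	}
	return err
}
```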
Remove force-dynamic in the UI mirrors status file
to allow better testing in the future. Also, soft-delete now only happens
when SoftDelete is set to true and a SoftDeleteColName is set.

Added merge statement tests for PG, SF and BQ.
Removed some whitespace in Postgres merge statements.
Cursor is mutable, so the type system already knows it has exclusive access.
Catalog's methods already handle synchronization,
except when running migrations, which already use exclusive connections

Also query source/destination peer in parallel
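A sketch of the parallel peer lookup using errgroup; `Peer` and the `fetchPeer` callback are hypothetical stand-ins for the actual catalog query:

```golang
package connectors

import (
	"context"

	"golang.org/x/sync/errgroup"
)

type Peer struct{ Name string }

// fetchPeers queries the source and destination peers concurrently and
// fails fast if either lookup errors.
func fetchPeers(
	ctx context.Context,
	fetchPeer func(context.Context, string) (*Peer, error),
	srcName, dstName string,
) (*Peer, *Peer, error) {
	var src, dst *Peer
	g, gctx := errgroup.WithContext(ctx)
	g.Go(func() error {
		var err error
		src, err = fetchPeer(gctx, srcName)
		return err
	})
	g.Go(func() error {
		var err error
		dst, err = fetchPeer(gctx, dstName)
		return err
	})
	if err := g.Wait(); err != nil {
		return nil, nil, err
	}
	return src, dst, nil
}
```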
Realised we can just create a qrep execution object for many of the
activities we're calling in xmin_flow.go.
Fixes a typo in qrep_flow.
Adds temporal query handlers in xmin_flow.go (the same ones as in
qrep_flow; these have been refactored into a common function). Without
these, XMIN mirror statuses cannot be viewed in the UI
Refactors our initial load status fetch to now perform a join instead of
iterating through flow names and running `len(clones)` number of
queries.

Uses `{cache: 'no-store'}` as an alternative to forcing dynamic
rendering. This is the more idiomatic way of using NextJS;
force-dynamic is mainly for easing migration from the Pages Router's
earlier getServerSideProps.
SchemaRef was used everywhere, but pgwire doesn't name the type,
so when the API wanted an `Arc<Vec<FieldInfo>>` while we had
`Arc<Schema { fields: Vec<FieldInfo> }>`, it required constructing
`Arc::new(schema.fields.clone())`, an unnecessary clone
Maps HStore to JSON in BigQuery.
Achieves this by transforming `'"a"=>"b"'` to `{"a":"b"}` via string
functions, in both qrep/initial load and CDC.
Test added.

Relies on the fact that hstore keys and values must be quoted strings
(although the key can be empty, and this case is supported),
and follows the syntax in the examples below:

```sql
postgres=# select 'a'::hstore;
ERROR:  Unexpected end of string
LINE 1: select 'a'::hstore;
               ^
postgres=# select 'a=>b'::hstore;
-[ RECORD 1 ]----
hstore | "a"=>"b"

postgres=# select 'a=>'::hstore;
ERROR:  Unexpected end of string
LINE 1: select 'a=>'::hstore;

postgres=# select 'a=>3434'::hstore;
-[ RECORD 1 ]-------
hstore | "a"=>"3434"
```
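For illustration, the same transform expressed in Go; the real conversion happens via SQL string functions on the warehouse side, and this sketch ignores edge cases like `=>` appearing inside a quoted value:

```golang
package qvalue

import "strings"

// hstoreToJSON relies on the property above: since hstore keys and
// values are quoted strings, replacing "=>" with ":" and wrapping the
// result in braces yields valid JSON.
func hstoreToJSON(hstore string) string {
	if hstore == "" {
		return "{}"
	}
	return "{" + strings.ReplaceAll(hstore, "=>", ":") + "}"
}
```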
Go 1.19 introduced atomic types similar to uber-go/atomic
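For example, the typed API removes the footgun of mixing atomic and plain access on a bare integer:

```golang
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	// before Go 1.19: a bare int64 plus atomic.AddInt64, which is easy
	// to misuse by accidentally reading the variable non-atomically
	var old int64
	atomic.AddInt64(&old, 1)

	// Go 1.19: the typed API makes every access atomic by construction
	var numRecords atomic.Int64
	numRecords.Add(1)

	fmt.Println(atomic.LoadInt64(&old), numRecords.Load())
}
```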
Must specify http or else the callback upon login fails
Add tests for QRep transform functions in Snowflake and BigQuery
If a table has neither a primary key nor a replica identity, it is
now not selectable in the Create CDC UI.

![Screenshot 2024-01-11 at 3 11 17 AM](https://github.com/PeerDB-io/peerdb/assets/65964360/c589802e-437f-4f62-b351-e1def921b4f8)
For tables with many columns, and especially many TOAST columns, some
batches can trigger MERGE statements so complex that BigQuery is unable
to process them, with errors like

`The query is too large. The maximum standard SQL query length is
1024.00K characters, including comments and white space characters.`
`Error 400: Resources exceeded during query execution: The query is too
complex., resourcesExceeded`

For now, the fix is splitting these complex MERGE statements into
smaller ones that act on different subsets of the raw table
(partitioning on the basis of `_peerdb_unchanged_toast_columns`). This
can lead to tables needing 10+ MERGE statements in a single batch, but
this is a compromise with our current design. Instead of sending MERGEs
for all tables at once, we now do it per table and update metadata at
the end, to avoid exceeding SQL query length limits.

---------

Co-authored-by: Kaushik Iska <[email protected]>
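A sketch of the splitting idea: group the raw table by its distinct `_peerdb_unchanged_toast_columns` values and emit one smaller MERGE per group. The SQL fragment here is illustrative, not the generated statement:

```golang
package connbigquery

import "fmt"

// mergeStatements emits one MERGE per distinct unchanged-toast-columns
// value instead of a single statement covering every combination.
func mergeStatements(rawTable string, toastGroups []string) []string {
	stmts := make([]string, 0, len(toastGroups))
	for _, cols := range toastGroups {
		// each statement only touches raw rows whose unchanged TOAST
		// column set matches this group
		stmts = append(stmts, fmt.Sprintf(
			`MERGE INTO target USING (
  SELECT * FROM %s WHERE _peerdb_unchanged_toast_columns = '%s'
) src ON ...`,
			rawTable, cols))
	}
	return stmts
}
```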
## Eventhubs: Enable Partitioning Via Column Value
This PR aims to achieve a feature where PG to Eventhub CDC now supports
partition-level routing. Suppose there is a column in the source table
T1 called `id`. We want T1 to be mapped to one eventhub E1, and rows
with the same `id` value V1 in T1 to be mapped to the same partition P1
of E1. Similarly, V2 of T1 goes to P2 of E1, and so on.

**Note**: The limit of 32 on the number of partitions is accounted
for by Eventhub's internal assignment; V100 does not map literally to P100.

### Mirror definition
- `ScopedEventhub`'s definition is now:
```golang
// Scoped eventhub is of the form peer_name.eventhub_name.partition_column.partition_key_value
// partition_column is the column in the table that is used to determine
// the partition key for the eventhub. Partition value is one such value of that column.
type ScopedEventhub struct {
	PeerName           string
	Eventhub           string
	PartitionKeyColumn string
	PartitionKeyValue  string
}
```

Therefore the semantics of the table mapping in the mirror command are
now:
```sql
WITH TABLE MAPPING (
-- src_table:<eventhub (not group) peer>.<eventhub name>.<column name>
public.oss1:ehpeer1.oss1_destination_eventhub.id 
)
```
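A sketch of how that three-part identifier might be parsed into the struct above; the actual parsing code may differ:

```golang
package model

import (
	"fmt"
	"strings"
)

// ScopedEventhub as shown above; PartitionKeyValue is filled in later,
// per row, from the partition column.
type ScopedEventhub struct {
	PeerName           string
	Eventhub           string
	PartitionKeyColumn string
	PartitionKeyValue  string
}

// ParseScopedEventhub splits "peer.eventhub.column" into its parts.
func ParseScopedEventhub(raw string) (ScopedEventhub, error) {
	parts := strings.SplitN(raw, ".", 3)
	if len(parts) != 3 {
		return ScopedEventhub{}, fmt.Errorf(
			"invalid destination %q, expected peer.eventhub.column", raw)
	}
	return ScopedEventhub{
		PeerName:           parts[0],
		Eventhub:           parts[1],
		PartitionKeyColumn: parts[2],
	}, nil
}
```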

### Routing
When creating a batch, we specify the partition key of the batch:
```golang
opts := &azeventhubs.EventDataBatchOptions{
	PartitionKey: &destination.PartitionKeyValue,
}
batch, err := hub.NewEventDataBatch(ctx, opts)
```

According to azeventhubs docs:
```
PartitionKey is hashed to calculate the partition assignment. Messages and message
batches with the same PartitionKey are guaranteed to end up in the same partition.
```
Amogh-Bharadwaj and others added 28 commits January 11, 2024 09:27
Maps HStore to a JSON-compatible Variant in Snowflake - for CDC and QRep
We weren't returning the current sync batch ID as part of Eventhub's
SyncRecords. As a result, we weren't able to get CDC logs for Eventhub.
This PR fixes that.
Removes metadatadb requirement for Eventhub Group peer as it can use
catalog on its own
Fixes a few logs
Refreshing the slot page was not updating the slot table data, since
we weren't disabling caching there like we are in the mirrors edit page.
Same goes for the mirror errors page. Also adds `cache: 'no-store'` in a
few other fetches for safety
Partitioning based on unchanged TOAST columns wasn't being done right
- Remove push parallelism
- Move heartbeat routine inside processBatch
- use atomic int for num records

---------

Co-authored-by: Philip Dubé <[email protected]>
In the UI we get data from `information_schema.table_constraints` for
primary-key-based filtering.
This can be unreliable: according to the [postgres
docs](https://www.postgresql.org/docs/current/infoschema-table-constraints.html),
it won't work for read-only users.

This PR modifies two of the queries we use for the table picker to not
use this view
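A hedged sketch of a pg_catalog-based replacement query; the actual queries in this PR may differ:

```golang
package peers

// selectableTablesQuery lists regular tables that have either a replica
// identity (FULL or USING INDEX) or a primary key. Unlike
// information_schema.table_constraints, pg_class and pg_index are
// visible to read-only users.
const selectableTablesQuery = `
SELECT n.nspname, c.relname
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
  AND (
    c.relreplident IN ('f', 'i')
    OR EXISTS (
      SELECT 1 FROM pg_index i
      WHERE i.indrelid = c.oid AND i.indisprimary
    )
  )`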
`NormalizeFlowCountQuery` is stunting decoupled sync/normalize workflows,
so replace it with `WaitFor`.

Besides, I just don't like this `ExitAfterRecords` way of doing things:
e2e tests are integration tests, so the implementation should be treated as a black box as much as possible.
Temporal has a bunch of capabilities to mock activities, so we can write unit tests for the more intrusive checks that'd be necessary to raise branch coverage, etc.

`WaitFor` presents the ideal mechanism for testing convergent processes:
update source, wait for destination to reflect change

In order to make this change work, however,
I needed to use `env.CancelWorkflow` after completing tests,
since I now want the workflow running indefinitely.
It turns out our code didn't adequately handle cancellation,
so I implemented that
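A minimal sketch of the WaitFor pattern, assuming a poll-based helper; the suite's actual helper may be shaped differently:

```golang
package e2e

import (
	"testing"
	"time"
)

// EnvWaitFor polls until the condition holds or the timeout elapses,
// failing the test with the given reason on timeout.
func EnvWaitFor(t *testing.T, timeout time.Duration, reason string, condition func() bool) {
	t.Helper()
	deadline := time.Now().Add(timeout)
	for !condition() {
		if time.Now().After(deadline) {
			t.Fatalf("timed out waiting for: %s", reason)
		}
		time.Sleep(time.Second)
	}
}
```

Typical usage then reads: update the source, then `EnvWaitFor(t, time.Minute, "row normalized", func() bool { return destRowCount() == 1 })`, where `destRowCount` is a hypothetical destination check.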
Holding back from pgwire 0.19 since tests were hanging with it
Closing allows for multiple receivers and never blocks the sender.
It's the correct choice when one wants to signal an irreversible state change
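A small self-contained illustration of why close fits this use:

```golang
package main

import "fmt"

func main() {
	done := make(chan struct{})

	// closing is a broadcast-style, irreversible signal: every receiver
	// unblocks, and close never blocks the sender the way a send on an
	// unbuffered channel can
	go func() {
		close(done)
	}()

	<-done // first receiver unblocks
	<-done // reads on a closed channel never block, so this works too
	fmt.Println("both receivers observed the signal")
}
```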
Changes necessary because of
1. sunng87/pgwire#144
2. sunng87/pgwire#147

Tests were failing due to hanging in 0.19.0,
0.19.1 fixed hang: sunng87/pgwire#148
A sync batch should not be considered complete until its schema changes are processed.
This avoids failures after commit causing schema changes to be dropped,
and it was causing normalization to be missing values when decoupling normalize/sync in #893
Seems like `peer_flow_bq_test.go` was missing the sync import
We now hash the partition key column value obtained from the destination
table name in create mirror for PG->EH.
This is to reduce the number of Eventhub batches we create; we noticed
that not doing so makes the mirror extremely slow.
Also fixes some code in the UI Graph component
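A sketch of the hashing idea: fold arbitrary column values into a bounded set of partition keys so fewer distinct Eventhub batches are created. `numBuckets` is an assumed tuning knob, not a real config value:

```golang
package conneventhub

import (
	"fmt"
	"hash/fnv"
)

// hashedPartitionKey maps a column value into one of numBuckets
// partition keys, bounding how many in-flight batches exist.
func hashedPartitionKey(value string, numBuckets uint32) string {
	h := fnv.New32a()
	_, _ = h.Write([]byte(value)) // fnv's Write never returns an error
	return fmt.Sprintf("%d", h.Sum32()%numBuckets)
}
```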
@serprex serprex requested a review from iskakaushik January 16, 2024 23:32
@serprex serprex merged commit 4101934 into customer-docker Jan 16, 2024
7 of 8 checks passed