caLHNqi{>~JQ96RI(GT}=JCVgoYML54+c>A7sZ(;|jFH{uOm~FO&g%HP
zOSD=C7H6uSX2f=9h9GGW$%Rw%fhr0wE+5iPdUw*@#t8mZbjj3%F||YJJMyCt5}72z
zPf2>gmabI^E&rglty{ji+iJ=k%0H+p~&e^|^1nHwWLex;D{h|XKYtpZ=zh;NE6k`4pxNfw6Q~uO5t=l*Iop%q))xCM0nS_ty7AG=3+m;us>DT$CC)0ydmalQsrFgR28nbP9MM%pgff^n~JO9lDZ
zX1r?61}7VJneD^X!wr7yt^V?{2f8D@n)>)t-TN)uKKLu}RRuo*juNghDNzm!JMs)vR$LfMRqw)GI8{VYV8@#22+Y16BFjQm#XTe+s0{z`>|3>D|>jR
zL?ea_Jlz5gr`rs~02P5>+cA9Ov#D*L!r!L^#?;tfiK6M=f~~)>!@l(z=G_(gi4w!A
z5tn{DIFZ7z=g@>|cT}H<`iL#;e_ZU)>U;)rXV(#Xp`Rewrh27#A4RhCPfDgyB1EUM
z@O7N)>VLDsO5NLp4DNHS>T)^#rHn5@bN)fbtj-@inMMxOjVti`hq
gdtzcD@ksF5cbVOS_U*<~zaAj>SxKr&0uu8700L0;!vFvP
literal 0
HcmV?d00001
From ad65c28496553337182e65de14ab1a0de3b11292 Mon Sep 17 00:00:00 2001
From: Kaushik Iska
Date: Tue, 12 Mar 2024 15:54:23 -0400
Subject: [PATCH 2/7] For replica identity full don't create pkey on snowflake
(#1478)
We create primary key indexes on the destination table in Snowflake CDC.
For Replica Identity Full on a table, we assume all columns to be
primary keys and replay that to the destination.
But if one of those columns on the source has null values, our
normalize records step fails with `NULL result in a non-nullable
column`.
This PR fixes that by not creating primary keys on the target for
replica identity full tables.
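For illustration, a minimal sketch of the added guard, using a simplified
stand-in struct rather than the real generated protos type:

    package main

    import (
        "fmt"
        "strings"
    )

    // tableSchema is a simplified stand-in for the generated table schema type.
    type tableSchema struct {
        PrimaryKeyColumns     []string
        IsReplicaIdentityFull bool
    }

    // primaryKeyClause returns the composite PRIMARY KEY clause, or "" for
    // replica identity full, where "all columns are primary keys" would make
    // every destination column non-nullable.
    func primaryKeyClause(s tableSchema) string {
        if len(s.PrimaryKeyColumns) == 0 || s.IsReplicaIdentityFull {
            return ""
        }
        return fmt.Sprintf(", PRIMARY KEY(%s)", strings.Join(s.PrimaryKeyColumns, ","))
    }

    func main() {
        full := tableSchema{PrimaryKeyColumns: []string{"id", "note"}, IsReplicaIdentityFull: true}
        def := tableSchema{PrimaryKeyColumns: []string{"id"}}
        fmt.Printf("replica identity full: %q\n", primaryKeyClause(full)) // ""
        fmt.Printf("default identity:      %q\n", primaryKeyClause(def))  // ", PRIMARY KEY(id)"
    }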
Co-authored-by: Amogh Bharadwaj
---
flow/connectors/postgres/client.go | 2 +-
flow/connectors/snowflake/snowflake.go | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go
index 22ac244aa9..898f962aec 100644
--- a/flow/connectors/postgres/client.go
+++ b/flow/connectors/postgres/client.go
@@ -458,7 +458,7 @@ func generateCreateTableSQLForNormalizedTable(
}
// add composite primary key to the table
- if len(sourceTableSchema.PrimaryKeyColumns) > 0 {
+ if len(sourceTableSchema.PrimaryKeyColumns) > 0 && !sourceTableSchema.IsReplicaIdentityFull {
primaryKeyColsQuoted := make([]string, 0, len(sourceTableSchema.PrimaryKeyColumns))
for _, primaryKeyCol := range sourceTableSchema.PrimaryKeyColumns {
primaryKeyColsQuoted = append(primaryKeyColsQuoted, QuoteIdentifier(primaryKeyCol))
diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go
index 775c130665..4ba626a053 100644
--- a/flow/connectors/snowflake/snowflake.go
+++ b/flow/connectors/snowflake/snowflake.go
@@ -727,7 +727,7 @@ func generateCreateTableSQLForNormalizedTable(
}
// add composite primary key to the table
- if len(sourceTableSchema.PrimaryKeyColumns) > 0 {
+ if len(sourceTableSchema.PrimaryKeyColumns) > 0 && !sourceTableSchema.IsReplicaIdentityFull {
normalizedPrimaryKeyCols := make([]string, 0, len(sourceTableSchema.PrimaryKeyColumns))
for _, primaryKeyCol := range sourceTableSchema.PrimaryKeyColumns {
normalizedPrimaryKeyCols = append(normalizedPrimaryKeyCols,
From d9995a7347502997dba6f87ffe3913926b4847c4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Philip=20Dub=C3=A9?=
Date: Wed, 13 Mar 2024 21:14:10 +0000
Subject: [PATCH 3/7] PullRecords: update replState.Offset in defer (#1481)
When an error happens we must still update replState.Offset,
because with activity retries it's possible to end up in this scenario:
replState.Offset is 1, the last offset in the catalog is 1, but the replication connection is at 3.
PullRecords would then go ahead reading from 4, skipping LSNs 2 & 3.
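A minimal self-contained sketch of the bookkeeping, with simplified local
types rather than the real connector (in the real code, LastOffset is the
offset confirmed to the destination and Offset is advanced in the defer):

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    type replState struct {
        Offset     int64        // where the next PullRecords attempt resumes reading
        LastOffset atomic.Int64 // last offset confirmed flushed to the destination
    }

    func pullRecords(s *replState, lastCheckpoint int64, fail bool) error {
        // Updating Offset in a defer means even a failed attempt records how far
        // the replication connection has already read, so a retry resumes from
        // there instead of skipping those LSNs.
        defer func() {
            s.Offset = lastCheckpoint
        }()
        if fail {
            return fmt.Errorf("simulated pull failure")
        }
        return nil
    }

    func main() {
        s := &replState{Offset: 1}
        s.LastOffset.Store(1)       // catalog and destination confirmed up to 1
        _ = pullRecords(s, 3, true) // the connection had already consumed up to LSN 3
        fmt.Println("resume offset after failed attempt:", s.Offset) // 3, not 1
    }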
---
flow/activities/flowable.go | 1 +
flow/connectors/core.go | 3 +++
flow/connectors/postgres/postgres.go | 18 ++++++++++++++----
3 files changed, 18 insertions(+), 4 deletions(-)
diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index 7ad57fd12d..55710ae642 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -410,6 +410,7 @@ func (a *FlowableActivity) SyncFlow(
logger.Info(fmt.Sprintf("pushed %d records in %d seconds", numRecords, int(syncDuration.Seconds())))
lastCheckpoint := recordBatch.GetLastCheckpoint()
+ srcConn.UpdateReplStateLastOffset(lastCheckpoint)
err = monitoring.UpdateNumRowsAndEndLSNForCDCBatch(
ctx,
diff --git a/flow/connectors/core.go b/flow/connectors/core.go
index b374ff70e4..a8e39b67b1 100644
--- a/flow/connectors/core.go
+++ b/flow/connectors/core.go
@@ -54,6 +54,9 @@ type CDCPullConnector interface {
// This method should be idempotent, and should be able to be called multiple times with the same request.
PullRecords(ctx context.Context, catalogPool *pgxpool.Pool, req *model.PullRecordsRequest) error
+ // Called when offset has been confirmed to destination
+ UpdateReplStateLastOffset(lastOffset int64)
+
// PullFlowCleanup drops both the Postgres publication and replication slot, as a part of DROP MIRROR
PullFlowCleanup(ctx context.Context, jobName string) error
diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go
index f82b867f2b..1bba4e8eca 100644
--- a/flow/connectors/postgres/postgres.go
+++ b/flow/connectors/postgres/postgres.go
@@ -8,6 +8,7 @@ import (
"regexp"
"strings"
"sync"
+ "sync/atomic"
"time"
"github.com/google/uuid"
@@ -49,6 +50,7 @@ type ReplState struct {
Slot string
Publication string
Offset int64
+ LastOffset atomic.Int64
}
func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) (*PostgresConnector, error) {
@@ -133,7 +135,7 @@ func (c *PostgresConnector) ReplPing(ctx context.Context) error {
return pglogrepl.SendStandbyStatusUpdate(
ctx,
c.replConn.PgConn(),
- pglogrepl.StandbyStatusUpdate{WALWritePosition: pglogrepl.LSN(c.replState.Offset)},
+ pglogrepl.StandbyStatusUpdate{WALWritePosition: pglogrepl.LSN(c.replState.LastOffset.Load())},
)
}
}
@@ -184,7 +186,9 @@ func (c *PostgresConnector) MaybeStartReplication(
Slot: slotName,
Publication: publicationName,
Offset: req.LastOffset,
+ LastOffset: atomic.Int64{},
}
+ c.replState.LastOffset.Store(req.LastOffset)
}
return nil
}
@@ -308,6 +312,9 @@ func (c *PostgresConnector) SetLastOffset(ctx context.Context, jobName string, l
func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpool.Pool, req *model.PullRecordsRequest) error {
defer func() {
req.RecordStream.Close()
+ if c.replState != nil {
+ c.replState.Offset = req.RecordStream.GetLastCheckpoint()
+ }
}()
// Slotname would be the job name prefixed with "peerflow_slot_"
@@ -371,9 +378,6 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo
return err
}
- req.RecordStream.Close()
- c.replState.Offset = req.RecordStream.GetLastCheckpoint()
-
latestLSN, err := c.getCurrentLSN(ctx)
if err != nil {
c.logger.Error("error getting current LSN", slog.Any("error", err))
@@ -389,6 +393,12 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo
return nil
}
+func (c *PostgresConnector) UpdateReplStateLastOffset(lastOffset int64) {
+ if c.replState != nil {
+ c.replState.LastOffset.Store(lastOffset)
+ }
+}
+
// SyncRecords pushes records to the destination.
func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
rawTableIdentifier := getRawTableIdentifier(req.FlowJobName)
From 864978f462c137ff7cbc829b37c13b13e1e3c71f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Philip=20Dub=C3=A9?=
Date: Wed, 13 Mar 2024 21:49:56 +0000
Subject: [PATCH 4/7] SyncFlow: fix NonRetryableError handling from PullRecords
(#1482)
Wrapping an application error prevents it from preventing retries: once
wrapped, a non-retryable error is no longer recognized as such and is
retried anyway.
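A self-contained sketch of why the unwrapped return matters, using a local
error type as a stand-in for Temporal's ApplicationError and a caller that
classifies by concrete error type:

    package main

    import (
        "errors"
        "fmt"
    )

    // appError stands in for Temporal's ApplicationError in this sketch.
    type appError struct {
        msg          string
        nonRetryable bool
    }

    func (e *appError) Error() string { return e.msg }

    // classify mimics a caller that type-switches on the concrete error value
    // it receives (no unwrapping), which is the assumption behind returning
    // application errors untouched instead of wrapping them.
    func classify(err error) string {
        switch e := err.(type) {
        case *appError:
            if e.nonRetryable {
                return "non-retryable"
            }
            return "retryable"
        default:
            return "retryable (unknown error type)"
        }
    }

    func main() {
        src := &appError{msg: "slot does not exist", nonRetryable: true}
        wrapped := fmt.Errorf("failed to pull records: %w", src)

        fmt.Println(classify(src))     // non-retryable
        fmt.Println(classify(wrapped)) // retryable (unknown error type)

        // errors.As still finds the inner error, which is how the guard in
        // this patch decides to pass it through unwrapped in the first place.
        var ae *appError
        fmt.Println(errors.As(wrapped, &ae)) // true
    }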
---
flow/activities/flowable.go | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)
diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index 55710ae642..17187a35c6 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -346,7 +346,11 @@ func (a *FlowableActivity) SyncFlow(
err = errGroup.Wait()
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
- return nil, fmt.Errorf("failed in pull records when: %w", err)
+ if temporal.IsApplicationError(err) {
+ return nil, err
+ } else {
+ return nil, fmt.Errorf("failed in pull records when: %w", err)
+ }
}
logger.Info("no records to push")
@@ -401,7 +405,11 @@ func (a *FlowableActivity) SyncFlow(
err = errGroup.Wait()
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
- return nil, fmt.Errorf("failed to pull records: %w", err)
+ if temporal.IsApplicationError(err) {
+ return nil, err
+ } else {
+ return nil, fmt.Errorf("failed to pull records: %w", err)
+ }
}
numRecords := res.NumRecordsSynced
From 0759db5237efc6255ca32da65e749cc81e9027f1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Philip=20Dub=C3=A9?=
Date: Wed, 13 Mar 2024 22:03:15 +0000
Subject: [PATCH 5/7] Log LastOffset when logging start of SyncFlow (#1483)
---
flow/activities/flowable.go | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index 17187a35c6..afd7b3704e 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -287,7 +287,6 @@ func (a *FlowableActivity) SyncFlow(
}
defer connectors.CloseConnector(ctx, dstConn)
- logger.Info("pulling records...")
tblNameMapping := make(map[string]model.NameAndExclude, len(options.TableMappings))
for _, v := range options.TableMappings {
tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
@@ -315,6 +314,7 @@ func (a *FlowableActivity) SyncFlow(
if err != nil {
return nil, err
}
+ logger.Info("pulling records...", slog.Int64("LastOffset", lastOffset))
// start a goroutine to pull records from the source
recordBatch := model.NewCDCRecordStream()
From 22d652839cb98303b27d4ffff6af9bb6c0e4f812 Mon Sep 17 00:00:00 2001
From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com>
Date: Thu, 14 Mar 2024 08:53:48 +0530
Subject: [PATCH 6/7] fix(alerting): update usage (#1484)
---
ui/app/alert-config/page.tsx | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/ui/app/alert-config/page.tsx b/ui/app/alert-config/page.tsx
index 25a92338b9..f67bb21b0e 100644
--- a/ui/app/alert-config/page.tsx
+++ b/ui/app/alert-config/page.tsx
@@ -83,8 +83,8 @@ const AlertConfigPage: React.FC = () => {
From 578b1ed6f01260a3a92e8b65ed32665c9e9d4098 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Philip=20Dub=C3=A9?=
Date: Thu, 14 Mar 2024 04:27:14 +0000
Subject: [PATCH 7/7] Move PG/SF simple schema changes to generic test (#1477)
BQ would need to implement the GetTableSchemaConnector interface
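For illustration, a self-contained sketch (toy types, not the real connectors
package) of the pattern the generic test relies on: destinations implementing
the schema-fetching interface get their schema checked, others are skipped,
which is why BQ needs GetTableSchema before it can use generic_test:

    package main

    import "fmt"

    // getTableSchemaConnector mirrors the role of connectors.GetTableSchemaConnector.
    type getTableSchemaConnector interface {
        GetTableSchema(table string) ([]string, error)
    }

    type snowflakeLike struct{}

    func (snowflakeLike) GetTableSchema(table string) ([]string, error) {
        return []string{"ID", "C1", "_PEERDB_SYNCED_AT"}, nil
    }

    type bigqueryLike struct{} // no GetTableSchema yet, like the BQ connector here

    func checkSchema(dest any, table string) {
        // Same shape as the generic test: type-assert and skip when unsupported.
        conn, ok := dest.(getTableSchemaConnector)
        if !ok {
            fmt.Println("skipping schema check: destination has no GetTableSchema")
            return
        }
        cols, _ := conn.GetTableSchema(table)
        fmt.Println("destination columns:", cols)
    }

    func main() {
        checkSchema(snowflakeLike{}, "test_simple_schema_changes_dst")
        checkSchema(bigqueryLike{}, "test_simple_schema_changes_dst")
    }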
---
flow/connectors/core.go | 10 +-
.../snowflake/get_schema_for_tests.go | 2 -
flow/e2e/bigquery/bigquery.go | 6 +
flow/e2e/bigquery/peer_flow_bq_test.go | 3 +-
flow/e2e/generic/generic_test.go | 304 ++++++++++++++++++
flow/e2e/generic/peer_flow_test.go | 82 -----
flow/e2e/postgres/peer_flow_pg_test.go | 209 ------------
flow/e2e/postgres/postgres.go | 5 +
flow/e2e/snowflake/peer_flow_sf_test.go | 214 ------------
flow/e2e/snowflake/snowflake.go | 5 +
flow/e2e/test_utils.go | 23 +-
11 files changed, 352 insertions(+), 511 deletions(-)
create mode 100644 flow/e2e/generic/generic_test.go
delete mode 100644 flow/e2e/generic/peer_flow_test.go
diff --git a/flow/connectors/core.go b/flow/connectors/core.go
index a8e39b67b1..6adb3a2429 100644
--- a/flow/connectors/core.go
+++ b/flow/connectors/core.go
@@ -27,11 +27,16 @@ type Connector interface {
ConnectionActive(context.Context) error
}
-type CDCPullConnector interface {
+type GetTableSchemaConnector interface {
Connector
// GetTableSchema returns the schema of a table.
GetTableSchema(ctx context.Context, req *protos.GetTableSchemaBatchInput) (*protos.GetTableSchemaBatchOutput, error)
+}
+
+type CDCPullConnector interface {
+ Connector
+ GetTableSchemaConnector
// EnsurePullability ensures that the connector is pullable.
EnsurePullability(ctx context.Context, req *protos.EnsurePullabilityBatchInput) (
@@ -255,6 +260,9 @@ var (
_ CDCNormalizeConnector = &connsnowflake.SnowflakeConnector{}
_ CDCNormalizeConnector = &connclickhouse.ClickhouseConnector{}
+ _ GetTableSchemaConnector = &connpostgres.PostgresConnector{}
+ _ GetTableSchemaConnector = &connsnowflake.SnowflakeConnector{}
+
_ NormalizedTablesConnector = &connpostgres.PostgresConnector{}
_ NormalizedTablesConnector = &connbigquery.BigQueryConnector{}
_ NormalizedTablesConnector = &connsnowflake.SnowflakeConnector{}
diff --git a/flow/connectors/snowflake/get_schema_for_tests.go b/flow/connectors/snowflake/get_schema_for_tests.go
index 05631e635f..476f16f165 100644
--- a/flow/connectors/snowflake/get_schema_for_tests.go
+++ b/flow/connectors/snowflake/get_schema_for_tests.go
@@ -3,7 +3,6 @@ package connsnowflake
import (
"context"
- "github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
)
@@ -47,7 +46,6 @@ func (c *SnowflakeConnector) GetTableSchema(
return nil, err
}
res[tableName] = tableSchema
- utils.RecordHeartbeat(ctx, "fetched schema for table "+tableName)
}
return &protos.GetTableSchemaBatchOutput{
diff --git a/flow/e2e/bigquery/bigquery.go b/flow/e2e/bigquery/bigquery.go
index 73f5c38d6e..1e2a3842ee 100644
--- a/flow/e2e/bigquery/bigquery.go
+++ b/flow/e2e/bigquery/bigquery.go
@@ -8,6 +8,7 @@ import (
"github.com/jackc/pgx/v5"
+ "github.com/PeerDB-io/peer-flow/connectors"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/generated/protos"
@@ -35,6 +36,11 @@ func (s PeerFlowE2ETestSuiteBQ) Connector() *connpostgres.PostgresConnector {
return s.conn
}
+func (s PeerFlowE2ETestSuiteBQ) DestinationConnector() connectors.Connector {
+ // TODO have BQ connector
+ return nil
+}
+
func (s PeerFlowE2ETestSuiteBQ) Suffix() string {
return s.bqSuffix
}
diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go
index ec28b5f97b..3dff6e310e 100644
--- a/flow/e2e/bigquery/peer_flow_bq_test.go
+++ b/flow/e2e/bigquery/peer_flow_bq_test.go
@@ -643,7 +643,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() {
e2e.RequireEnvCanceled(s.t, env)
}
-// TODO: not checking schema exactly, add later
+// TODO: not checking schema exactly
+// write a GetTableSchemaConnector for BQ to enable generic_test
func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
tc := e2e.NewTemporalClient(s.t)
diff --git a/flow/e2e/generic/generic_test.go b/flow/e2e/generic/generic_test.go
new file mode 100644
index 0000000000..97e64f5ed3
--- /dev/null
+++ b/flow/e2e/generic/generic_test.go
@@ -0,0 +1,304 @@
+package e2e_generic
+
+import (
+ "context"
+ "fmt"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/PeerDB-io/peer-flow/connectors"
+ "github.com/PeerDB-io/peer-flow/e2e"
+ "github.com/PeerDB-io/peer-flow/e2e/bigquery"
+ "github.com/PeerDB-io/peer-flow/e2e/postgres"
+ "github.com/PeerDB-io/peer-flow/e2e/snowflake"
+ "github.com/PeerDB-io/peer-flow/e2eshared"
+ "github.com/PeerDB-io/peer-flow/generated/protos"
+ "github.com/PeerDB-io/peer-flow/model/qvalue"
+ peerflow "github.com/PeerDB-io/peer-flow/workflows"
+)
+
+func TestGenericPG(t *testing.T) {
+ e2eshared.RunSuite(t, SetupGenericSuite(e2e_postgres.SetupSuite))
+}
+
+func TestGenericSF(t *testing.T) {
+ e2eshared.RunSuite(t, SetupGenericSuite(e2e_snowflake.SetupSuite))
+}
+
+func TestGenericBQ(t *testing.T) {
+ e2eshared.RunSuite(t, SetupGenericSuite(e2e_bigquery.SetupSuite))
+}
+
+type Generic struct {
+ e2e.GenericSuite
+}
+
+func SetupGenericSuite[T e2e.GenericSuite](f func(t *testing.T) T) func(t *testing.T) Generic {
+ return func(t *testing.T) Generic {
+ t.Helper()
+ return Generic{f(t)}
+ }
+}
+
+func (s Generic) Test_Simple_Flow() {
+ t := s.T()
+ srcTable := "test_simple"
+ dstTable := "test_simple_dst"
+ srcSchemaTable := e2e.AttachSchema(s, srcTable)
+
+ _, err := s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(`
+ CREATE TABLE IF NOT EXISTS %s (
+ id SERIAL PRIMARY KEY,
+ key TEXT NOT NULL,
+ value TEXT NOT NULL,
+ myh HSTORE NOT NULL
+ );
+ `, srcSchemaTable))
+ require.NoError(t, err)
+
+ connectionGen := e2e.FlowConnectionGenerationConfig{
+ FlowJobName: e2e.AddSuffix(s, "test_simple"),
+ TableMappings: e2e.TableMappings(s, srcTable, dstTable),
+ Destination: s.Peer(),
+ }
+ flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()
+
+ tc := e2e.NewTemporalClient(t)
+ env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
+
+ e2e.SetupCDCFlowStatusQuery(t, env, connectionGen)
+ // insert 10 rows into the source table
+ for i := range 10 {
+ testKey := fmt.Sprintf("test_key_%d", i)
+ testValue := fmt.Sprintf("test_value_%d", i)
+ _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(`
+ INSERT INTO %s(key, value, myh) VALUES ($1, $2, '"a"=>"b"')
+ `, srcSchemaTable), testKey, testValue)
+ e2e.EnvNoError(t, env, err)
+ }
+ t.Log("Inserted 10 rows into the source table")
+
+ e2e.EnvWaitForEqualTablesWithNames(env, s, "normalizing 10 rows", srcTable, dstTable, `id,key,value,myh`)
+ env.Cancel()
+ e2e.RequireEnvCanceled(t, env)
+}
+
+func (s Generic) Test_Simple_Schema_Changes() {
+ t := s.T()
+
+ destinationSchemaConnector, ok := s.DestinationConnector().(connectors.GetTableSchemaConnector)
+ if !ok {
+ t.SkipNow()
+ }
+
+ srcTable := "test_simple_schema_changes"
+ dstTable := "test_simple_schema_changes_dst"
+ srcTableName := e2e.AttachSchema(s, srcTable)
+ dstTableName := s.DestinationTable(dstTable)
+
+ _, err := s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(`
+ CREATE TABLE IF NOT EXISTS %s (
+ id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
+ c1 BIGINT
+ );
+ `, srcTableName))
+ require.NoError(t, err)
+
+ connectionGen := e2e.FlowConnectionGenerationConfig{
+ FlowJobName: e2e.AddSuffix(s, srcTable),
+ TableMappings: e2e.TableMappings(s, srcTable, dstTable),
+ Destination: s.Peer(),
+ }
+
+ flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()
+
+ // wait for PeerFlowStatusQuery to finish setup
+ // and then insert and mutate schema repeatedly.
+ tc := e2e.NewTemporalClient(t)
+ env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
+ e2e.SetupCDCFlowStatusQuery(t, env, connectionGen)
+ _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(`
+ INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1)
+ e2e.EnvNoError(t, env, err)
+ t.Log("Inserted initial row in the source table")
+
+ e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize reinsert", srcTable, dstTable, "id,c1")
+
+ expectedTableSchema := &protos.TableSchema{
+ TableIdentifier: e2e.ExpectedDestinationTableName(s, dstTable),
+ Columns: []*protos.FieldDescription{
+ {
+ Name: e2e.ExpectedDestinationIdentifier(s, "id"),
+ Type: string(qvalue.QValueKindNumeric),
+ TypeModifier: -1,
+ },
+ {
+ Name: e2e.ExpectedDestinationIdentifier(s, "c1"),
+ Type: string(qvalue.QValueKindNumeric),
+ TypeModifier: -1,
+ },
+ {
+ Name: "_PEERDB_IS_DELETED",
+ Type: string(qvalue.QValueKindBoolean),
+ TypeModifier: -1,
+ },
+ {
+ Name: "_PEERDB_SYNCED_AT",
+ Type: string(qvalue.QValueKindTimestamp),
+ TypeModifier: -1,
+ },
+ },
+ }
+ output, err := destinationSchemaConnector.GetTableSchema(context.Background(), &protos.GetTableSchemaBatchInput{
+ TableIdentifiers: []string{dstTableName},
+ })
+ e2e.EnvNoError(t, env, err)
+ e2e.EnvTrue(t, env, e2e.CompareTableSchemas(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]))
+
+ // alter source table, add column c2 and insert another row.
+ _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(`
+ ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName))
+ e2e.EnvNoError(t, env, err)
+ t.Log("Altered source table, added column c2")
+ _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(`
+ INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2)
+ e2e.EnvNoError(t, env, err)
+ t.Log("Inserted row with added c2 in the source table")
+
+ // verify we got our two rows, if schema did not match up it will error.
+ e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize altered row", srcTable, dstTable, "id,c1,c2")
+ expectedTableSchema = &protos.TableSchema{
+ TableIdentifier: e2e.ExpectedDestinationTableName(s, dstTable),
+ Columns: []*protos.FieldDescription{
+ {
+ Name: e2e.ExpectedDestinationIdentifier(s, "id"),
+ Type: string(qvalue.QValueKindNumeric),
+ TypeModifier: -1,
+ },
+ {
+ Name: e2e.ExpectedDestinationIdentifier(s, "c1"),
+ Type: string(qvalue.QValueKindNumeric),
+ TypeModifier: -1,
+ },
+ {
+ Name: "_PEERDB_SYNCED_AT",
+ Type: string(qvalue.QValueKindTimestamp),
+ TypeModifier: -1,
+ },
+ {
+ Name: e2e.ExpectedDestinationIdentifier(s, "c2"),
+ Type: string(qvalue.QValueKindNumeric),
+ TypeModifier: -1,
+ },
+ },
+ }
+ output, err = destinationSchemaConnector.GetTableSchema(context.Background(), &protos.GetTableSchemaBatchInput{
+ TableIdentifiers: []string{dstTableName},
+ })
+ e2e.EnvNoError(t, env, err)
+ e2e.EnvTrue(t, env, e2e.CompareTableSchemas(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]))
+ e2e.EnvEqualTablesWithNames(env, s, srcTable, dstTable, "id,c1,c2")
+
+ // alter source table, add column c3, drop column c2 and insert another row.
+ _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(`
+ ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName))
+ e2e.EnvNoError(t, env, err)
+ t.Log("Altered source table, dropped column c2 and added column c3")
+ _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(`
+ INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3)
+ e2e.EnvNoError(t, env, err)
+ t.Log("Inserted row with added c3 in the source table")
+
+ // verify we got our two rows, if schema did not match up it will error.
+ e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize dropped c2 column", srcTable, dstTable, "id,c1,c3")
+ expectedTableSchema = &protos.TableSchema{
+ TableIdentifier: e2e.ExpectedDestinationTableName(s, dstTable),
+ Columns: []*protos.FieldDescription{
+ {
+ Name: e2e.ExpectedDestinationIdentifier(s, "id"),
+ Type: string(qvalue.QValueKindNumeric),
+ TypeModifier: -1,
+ },
+ {
+ Name: e2e.ExpectedDestinationIdentifier(s, "c1"),
+ Type: string(qvalue.QValueKindNumeric),
+ TypeModifier: -1,
+ },
+ {
+ Name: "_PEERDB_SYNCED_AT",
+ Type: string(qvalue.QValueKindTimestamp),
+ TypeModifier: -1,
+ },
+ {
+ Name: e2e.ExpectedDestinationIdentifier(s, "c2"),
+ Type: string(qvalue.QValueKindNumeric),
+ TypeModifier: -1,
+ },
+ {
+ Name: e2e.ExpectedDestinationIdentifier(s, "c3"),
+ Type: string(qvalue.QValueKindNumeric),
+ TypeModifier: -1,
+ },
+ },
+ }
+ output, err = destinationSchemaConnector.GetTableSchema(context.Background(), &protos.GetTableSchemaBatchInput{
+ TableIdentifiers: []string{dstTableName},
+ })
+ e2e.EnvNoError(t, env, err)
+ e2e.EnvTrue(t, env, e2e.CompareTableSchemas(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]))
+ e2e.EnvEqualTablesWithNames(env, s, srcTable, dstTable, "id,c1,c3")
+
+ // alter source table, drop column c3 and insert another row.
+ _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(`
+ ALTER TABLE %s DROP COLUMN c3`, srcTableName))
+ e2e.EnvNoError(t, env, err)
+ t.Log("Altered source table, dropped column c3")
+ _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(`
+ INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4)
+ e2e.EnvNoError(t, env, err)
+ t.Log("Inserted row after dropping all columns in the source table")
+
+ // verify we got our two rows, if schema did not match up it will error.
+ e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize dropped c3 column", srcTable, dstTable, "id,c1")
+ expectedTableSchema = &protos.TableSchema{
+ TableIdentifier: e2e.ExpectedDestinationTableName(s, dstTable),
+ Columns: []*protos.FieldDescription{
+ {
+ Name: e2e.ExpectedDestinationIdentifier(s, "id"),
+ Type: string(qvalue.QValueKindNumeric),
+ TypeModifier: -1,
+ },
+ {
+ Name: e2e.ExpectedDestinationIdentifier(s, "c1"),
+ Type: string(qvalue.QValueKindNumeric),
+ TypeModifier: -1,
+ },
+ {
+ Name: "_PEERDB_SYNCED_AT",
+ Type: string(qvalue.QValueKindTimestamp),
+ TypeModifier: -1,
+ },
+ {
+ Name: e2e.ExpectedDestinationIdentifier(s, "c2"),
+ Type: string(qvalue.QValueKindNumeric),
+ TypeModifier: -1,
+ },
+ {
+ Name: e2e.ExpectedDestinationIdentifier(s, "c3"),
+ Type: string(qvalue.QValueKindNumeric),
+ TypeModifier: -1,
+ },
+ },
+ }
+ output, err = destinationSchemaConnector.GetTableSchema(context.Background(), &protos.GetTableSchemaBatchInput{
+ TableIdentifiers: []string{dstTableName},
+ })
+ e2e.EnvNoError(t, env, err)
+ e2e.EnvTrue(t, env, e2e.CompareTableSchemas(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]))
+ e2e.EnvEqualTablesWithNames(env, s, srcTable, dstTable, "id,c1")
+
+ env.Cancel()
+
+ e2e.RequireEnvCanceled(t, env)
+}
diff --git a/flow/e2e/generic/peer_flow_test.go b/flow/e2e/generic/peer_flow_test.go
deleted file mode 100644
index 20c5847df4..0000000000
--- a/flow/e2e/generic/peer_flow_test.go
+++ /dev/null
@@ -1,82 +0,0 @@
-package e2e_generic
-
-import (
- "context"
- "fmt"
- "testing"
-
- "github.com/stretchr/testify/require"
-
- "github.com/PeerDB-io/peer-flow/e2e"
- "github.com/PeerDB-io/peer-flow/e2e/bigquery"
- "github.com/PeerDB-io/peer-flow/e2e/postgres"
- "github.com/PeerDB-io/peer-flow/e2e/snowflake"
- "github.com/PeerDB-io/peer-flow/e2eshared"
- peerflow "github.com/PeerDB-io/peer-flow/workflows"
-)
-
-func TestGenericPG(t *testing.T) {
- e2eshared.RunSuite(t, SetupGenericSuite(e2e_postgres.SetupSuite))
-}
-
-func TestGenericSF(t *testing.T) {
- e2eshared.RunSuite(t, SetupGenericSuite(e2e_snowflake.SetupSuite))
-}
-
-func TestGenericBQ(t *testing.T) {
- e2eshared.RunSuite(t, SetupGenericSuite(e2e_bigquery.SetupSuite))
-}
-
-type Generic struct {
- e2e.GenericSuite
-}
-
-func SetupGenericSuite[T e2e.GenericSuite](f func(t *testing.T) T) func(t *testing.T) Generic {
- return func(t *testing.T) Generic {
- t.Helper()
- return Generic{f(t)}
- }
-}
-
-func (s Generic) Test_Simple_Flow() {
- t := s.T()
- srcTable := "test_simple"
- dstTable := "test_simple_dst"
- srcSchemaTable := e2e.AttachSchema(s, srcTable)
-
- _, err := s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(`
- CREATE TABLE IF NOT EXISTS %s (
- id SERIAL PRIMARY KEY,
- key TEXT NOT NULL,
- value TEXT NOT NULL,
- myh HSTORE NOT NULL
- );
- `, srcSchemaTable))
- require.NoError(t, err)
-
- connectionGen := e2e.FlowConnectionGenerationConfig{
- FlowJobName: e2e.AddSuffix(s, "test_simple"),
- TableMappings: e2e.TableMappings(s, srcTable, dstTable),
- Destination: s.Peer(),
- }
- flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()
-
- tc := e2e.NewTemporalClient(t)
- env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
-
- e2e.SetupCDCFlowStatusQuery(t, env, connectionGen)
- // insert 10 rows into the source table
- for i := range 10 {
- testKey := fmt.Sprintf("test_key_%d", i)
- testValue := fmt.Sprintf("test_value_%d", i)
- _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(`
- INSERT INTO %s(key, value, myh) VALUES ($1, $2, '"a"=>"b"')
- `, srcSchemaTable), testKey, testValue)
- e2e.EnvNoError(t, env, err)
- }
- t.Log("Inserted 10 rows into the source table")
-
- e2e.EnvWaitForEqualTablesWithNames(env, s, "normalizing 10 rows", srcTable, dstTable, `id,key,value,myh`)
- env.Cancel()
- e2e.RequireEnvCanceled(t, env)
-}
diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go
index 2e69376b01..eeec5e373d 100644
--- a/flow/e2e/postgres/peer_flow_pg_test.go
+++ b/flow/e2e/postgres/peer_flow_pg_test.go
@@ -17,7 +17,6 @@ import (
"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
- "github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
)
@@ -51,32 +50,6 @@ func (s PeerFlowE2ETestSuitePG) checkPeerdbColumns(dstSchemaQualified string, ro
return nil
}
-func (s PeerFlowE2ETestSuitePG) WaitForSchema(
- env e2e.WorkflowRun,
- reason string,
- srcTableName string,
- dstTableName string,
- cols string,
- expectedSchema *protos.TableSchema,
-) {
- s.t.Helper()
- e2e.EnvWaitFor(s.t, env, 3*time.Minute, reason, func() bool {
- s.t.Helper()
- output, err := s.conn.GetTableSchema(context.Background(), &protos.GetTableSchemaBatchInput{
- TableIdentifiers: []string{dstTableName},
- })
- if err != nil {
- return false
- }
- tableSchema := output.TableNameSchemaMapping[dstTableName]
- if !e2e.CompareTableSchemas(expectedSchema, tableSchema) {
- s.t.Log("schemas unequal", expectedSchema, tableSchema)
- return false
- }
- return s.comparePGTables(srcTableName, dstTableName, cols) == nil
- })
-}
-
func (s PeerFlowE2ETestSuitePG) Test_Geospatial_PG() {
srcTableName := s.attachSchemaSuffix("test_geospatial_pg")
dstTableName := s.attachSchemaSuffix("test_geospatial_pg_dst")
@@ -224,188 +197,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Enums_PG() {
e2e.RequireEnvCanceled(s.t, env)
}
-func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
- tc := e2e.NewTemporalClient(s.t)
-
- srcTableName := s.attachSchemaSuffix("test_simple_schema_changes")
- dstTableName := s.attachSchemaSuffix("test_simple_schema_changes_dst")
-
- _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(`
- CREATE TABLE IF NOT EXISTS %s (
- id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
- c1 BIGINT
- );
- `, srcTableName))
- require.NoError(s.t, err)
-
- connectionGen := e2e.FlowConnectionGenerationConfig{
- FlowJobName: s.attachSuffix("test_simple_schema_changes"),
- TableNameMapping: map[string]string{srcTableName: dstTableName},
- Destination: s.peer,
- }
-
- flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()
- flowConnConfig.MaxBatchSize = 1
-
- // wait for PeerFlowStatusQuery to finish setup
- // and then insert and mutate schema repeatedly.
- env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
- e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
-
- // insert first row.
- _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
- INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1)
- e2e.EnvNoError(s.t, env, err)
- s.t.Log("Inserted initial row in the source table")
-
- s.WaitForSchema(env, "normalizing first row", srcTableName, dstTableName, "id,c1", &protos.TableSchema{
- TableIdentifier: dstTableName,
- PrimaryKeyColumns: []string{"id"},
- Columns: []*protos.FieldDescription{
- {
- Name: "id",
- Type: string(qvalue.QValueKindInt64),
- TypeModifier: -1,
- },
- {
- Name: "c1",
- Type: string(qvalue.QValueKindInt64),
- TypeModifier: -1,
- },
- {
- Name: "_PEERDB_SYNCED_AT",
- Type: string(qvalue.QValueKindTimestamp),
- TypeModifier: -1,
- },
- },
- })
-
- // alter source table, add column c2 and insert another row.
- _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
- ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName))
- e2e.EnvNoError(s.t, env, err)
- s.t.Log("Altered source table, added column c2")
- _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
- INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2)
- e2e.EnvNoError(s.t, env, err)
- s.t.Log("Inserted row with added c2 in the source table")
-
- s.WaitForSchema(env, "normalizing altered row", srcTableName, dstTableName, "id,c1,c2", &protos.TableSchema{
- TableIdentifier: dstTableName,
- PrimaryKeyColumns: []string{"id"},
- Columns: []*protos.FieldDescription{
- {
- Name: "id",
- Type: string(qvalue.QValueKindInt64),
- TypeModifier: -1,
- },
- {
- Name: "c1",
- Type: string(qvalue.QValueKindInt64),
- TypeModifier: -1,
- },
- {
- Name: "_PEERDB_SYNCED_AT",
- Type: string(qvalue.QValueKindTimestamp),
- TypeModifier: -1,
- },
- {
- Name: "c2",
- Type: string(qvalue.QValueKindInt64),
- TypeModifier: -1,
- },
- },
- })
-
- // alter source table, add column c3, drop column c2 and insert another row.
- _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
- ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName))
- e2e.EnvNoError(s.t, env, err)
- s.t.Log("Altered source table, dropped column c2 and added column c3")
- _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
- INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3)
- e2e.EnvNoError(s.t, env, err)
- s.t.Log("Inserted row with added c3 in the source table")
-
- s.WaitForSchema(env, "normalizing dropped column row", srcTableName, dstTableName, "id,c1,c3", &protos.TableSchema{
- TableIdentifier: dstTableName,
- PrimaryKeyColumns: []string{"id"},
- Columns: []*protos.FieldDescription{
- {
- Name: "id",
- Type: string(qvalue.QValueKindInt64),
- TypeModifier: -1,
- },
- {
- Name: "c1",
- Type: string(qvalue.QValueKindInt64),
- TypeModifier: -1,
- },
- {
- Name: "c2",
- Type: string(qvalue.QValueKindInt64),
- TypeModifier: -1,
- },
- {
- Name: "_PEERDB_SYNCED_AT",
- Type: string(qvalue.QValueKindTimestamp),
- TypeModifier: -1,
- },
- {
- Name: "c3",
- Type: string(qvalue.QValueKindInt64),
- TypeModifier: -1,
- },
- },
- })
-
- // alter source table, drop column c3 and insert another row.
- _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
- ALTER TABLE %s DROP COLUMN c3`, srcTableName))
- e2e.EnvNoError(s.t, env, err)
- s.t.Log("Altered source table, dropped column c3")
- _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
- INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4)
- e2e.EnvNoError(s.t, env, err)
- s.t.Log("Inserted row after dropping all columns in the source table")
-
- s.WaitForSchema(env, "normalizing 2nd dropped column row", srcTableName, dstTableName, "id,c1", &protos.TableSchema{
- TableIdentifier: dstTableName,
- PrimaryKeyColumns: []string{"id"},
- Columns: []*protos.FieldDescription{
- {
- Name: "id",
- Type: string(qvalue.QValueKindInt64),
- TypeModifier: -1,
- },
- {
- Name: "c1",
- Type: string(qvalue.QValueKindInt64),
- TypeModifier: -1,
- },
- {
- Name: "_PEERDB_SYNCED_AT",
- Type: string(qvalue.QValueKindTimestamp),
- TypeModifier: -1,
- },
- {
- Name: "c2",
- Type: string(qvalue.QValueKindInt64),
- TypeModifier: -1,
- },
- {
- Name: "c3",
- Type: string(qvalue.QValueKindInt64),
- TypeModifier: -1,
- },
- },
- })
-
- env.Cancel()
-
- e2e.RequireEnvCanceled(s.t, env)
-}
-
func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() {
tc := e2e.NewTemporalClient(s.t)
diff --git a/flow/e2e/postgres/postgres.go b/flow/e2e/postgres/postgres.go
index 23ca778c8d..8eafd6ade0 100644
--- a/flow/e2e/postgres/postgres.go
+++ b/flow/e2e/postgres/postgres.go
@@ -9,6 +9,7 @@ import (
"github.com/jackc/pgx/v5"
"github.com/stretchr/testify/require"
+ "github.com/PeerDB-io/peer-flow/connectors"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/generated/protos"
@@ -32,6 +33,10 @@ func (s PeerFlowE2ETestSuitePG) Connector() *connpostgres.PostgresConnector {
return s.conn
}
+func (s PeerFlowE2ETestSuitePG) DestinationConnector() connectors.Connector {
+ return s.conn
+}
+
func (s PeerFlowE2ETestSuitePG) Conn() *pgx.Conn {
return s.conn.Conn()
}
diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go
index 525d2c7256..56084a1a27 100644
--- a/flow/e2e/snowflake/peer_flow_sf_test.go
+++ b/flow/e2e/snowflake/peer_flow_sf_test.go
@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
- "strings"
"testing"
"time"
@@ -16,7 +15,6 @@ import (
"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
- "github.com/PeerDB-io/peer-flow/model/qvalue"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
)
@@ -516,218 +514,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() {
e2e.RequireEnvCanceled(s.t, env)
}
-func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() {
- tc := e2e.NewTemporalClient(s.t)
-
- srcTableName := s.attachSchemaSuffix("test_simple_schema_changes")
- dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_simple_schema_changes")
-
- _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(`
- CREATE TABLE IF NOT EXISTS %s (
- id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
- c1 BIGINT
- );
- `, srcTableName))
- require.NoError(s.t, err)
-
- connectionGen := e2e.FlowConnectionGenerationConfig{
- FlowJobName: s.attachSuffix("test_simple_schema_changes"),
- TableNameMapping: map[string]string{srcTableName: dstTableName},
- Destination: s.sfHelper.Peer,
- }
-
- flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()
- flowConnConfig.MaxBatchSize = 100
-
- // wait for PeerFlowStatusQuery to finish setup
- // and then insert and mutate schema repeatedly.
- env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
- e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
- _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
- INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1)
- e2e.EnvNoError(s.t, env, err)
- s.t.Log("Inserted initial row in the source table")
-
- e2e.EnvWaitForEqualTables(env, s, "normalize reinsert", "test_simple_schema_changes", "id,c1")
-
- expectedTableSchema := &protos.TableSchema{
- TableIdentifier: strings.ToUpper(dstTableName),
- Columns: []*protos.FieldDescription{
- {
- Name: "ID",
- Type: string(qvalue.QValueKindNumeric),
- TypeModifier: -1,
- },
- {
- Name: "C1",
- Type: string(qvalue.QValueKindNumeric),
- TypeModifier: -1,
- },
- {
- Name: "_PEERDB_IS_DELETED",
- Type: string(qvalue.QValueKindBoolean),
- TypeModifier: -1,
- },
- {
- Name: "_PEERDB_SYNCED_AT",
- Type: string(qvalue.QValueKindTimestamp),
- TypeModifier: -1,
- },
- },
- }
- output, err := s.connector.GetTableSchema(context.Background(), &protos.GetTableSchemaBatchInput{
- TableIdentifiers: []string{dstTableName},
- })
- e2e.EnvNoError(s.t, env, err)
- e2e.EnvTrue(s.t, env, e2e.CompareTableSchemas(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]))
-
- // alter source table, add column c2 and insert another row.
- _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
- ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName))
- e2e.EnvNoError(s.t, env, err)
- s.t.Log("Altered source table, added column c2")
- _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
- INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2)
- e2e.EnvNoError(s.t, env, err)
- s.t.Log("Inserted row with added c2 in the source table")
-
- // verify we got our two rows, if schema did not match up it will error.
- e2e.EnvWaitForEqualTables(env, s, "normalize altered row", "test_simple_schema_changes", "id,c1,c2")
- expectedTableSchema = &protos.TableSchema{
- TableIdentifier: strings.ToUpper(dstTableName),
- Columns: []*protos.FieldDescription{
- {
- Name: "ID",
- Type: string(qvalue.QValueKindNumeric),
- TypeModifier: -1,
- },
- {
- Name: "C1",
- Type: string(qvalue.QValueKindNumeric),
- TypeModifier: -1,
- },
- {
- Name: "_PEERDB_SYNCED_AT",
- Type: string(qvalue.QValueKindTimestamp),
- TypeModifier: -1,
- },
- {
- Name: "C2",
- Type: string(qvalue.QValueKindNumeric),
- TypeModifier: -1,
- },
- },
- }
- output, err = s.connector.GetTableSchema(context.Background(), &protos.GetTableSchemaBatchInput{
- TableIdentifiers: []string{dstTableName},
- })
- e2e.EnvNoError(s.t, env, err)
- e2e.EnvTrue(s.t, env, e2e.CompareTableSchemas(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]))
- e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1,c2")
-
- // alter source table, add column c3, drop column c2 and insert another row.
- _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
- ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName))
- e2e.EnvNoError(s.t, env, err)
- s.t.Log("Altered source table, dropped column c2 and added column c3")
- _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
- INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3)
- e2e.EnvNoError(s.t, env, err)
- s.t.Log("Inserted row with added c3 in the source table")
-
- // verify we got our two rows, if schema did not match up it will error.
- e2e.EnvWaitForEqualTables(env, s, "normalize dropped c2 column", "test_simple_schema_changes", "id,c1,c3")
- expectedTableSchema = &protos.TableSchema{
- TableIdentifier: strings.ToUpper(dstTableName),
- Columns: []*protos.FieldDescription{
- {
- Name: "ID",
- Type: string(qvalue.QValueKindNumeric),
- TypeModifier: -1,
- },
- {
- Name: "C1",
- Type: string(qvalue.QValueKindNumeric),
- TypeModifier: -1,
- },
- {
- Name: "_PEERDB_SYNCED_AT",
- Type: string(qvalue.QValueKindTimestamp),
- TypeModifier: -1,
- },
- {
- Name: "C2",
- Type: string(qvalue.QValueKindNumeric),
- TypeModifier: -1,
- },
- {
- Name: "C3",
- Type: string(qvalue.QValueKindNumeric),
- TypeModifier: -1,
- },
- },
- }
- output, err = s.connector.GetTableSchema(context.Background(), &protos.GetTableSchemaBatchInput{
- TableIdentifiers: []string{dstTableName},
- })
- e2e.EnvNoError(s.t, env, err)
- e2e.EnvTrue(s.t, env, e2e.CompareTableSchemas(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]))
- e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1,c3")
-
- // alter source table, drop column c3 and insert another row.
- _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
- ALTER TABLE %s DROP COLUMN c3`, srcTableName))
- e2e.EnvNoError(s.t, env, err)
- s.t.Log("Altered source table, dropped column c3")
- _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
- INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4)
- e2e.EnvNoError(s.t, env, err)
- s.t.Log("Inserted row after dropping all columns in the source table")
-
- // verify we got our two rows, if schema did not match up it will error.
- e2e.EnvWaitForEqualTables(env, s, "normalize dropped c3 column", "test_simple_schema_changes", "id,c1")
- expectedTableSchema = &protos.TableSchema{
- TableIdentifier: strings.ToUpper(dstTableName),
- Columns: []*protos.FieldDescription{
- {
- Name: "ID",
- Type: string(qvalue.QValueKindNumeric),
- TypeModifier: -1,
- },
- {
- Name: "C1",
- Type: string(qvalue.QValueKindNumeric),
- TypeModifier: -1,
- },
- {
- Name: "_PEERDB_SYNCED_AT",
- Type: string(qvalue.QValueKindTimestamp),
- TypeModifier: -1,
- },
- {
- Name: "C2",
- Type: string(qvalue.QValueKindNumeric),
- TypeModifier: -1,
- },
- {
- Name: "C3",
- Type: string(qvalue.QValueKindNumeric),
- TypeModifier: -1,
- },
- },
- }
- output, err = s.connector.GetTableSchema(context.Background(), &protos.GetTableSchemaBatchInput{
- TableIdentifiers: []string{dstTableName},
- })
- e2e.EnvNoError(s.t, env, err)
- e2e.EnvTrue(s.t, env, e2e.CompareTableSchemas(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]))
- e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1")
-
- env.Cancel()
-
- e2e.RequireEnvCanceled(s.t, env)
-}
-
func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() {
tc := e2e.NewTemporalClient(s.t)
diff --git a/flow/e2e/snowflake/snowflake.go b/flow/e2e/snowflake/snowflake.go
index 45132ef601..06c46d1046 100644
--- a/flow/e2e/snowflake/snowflake.go
+++ b/flow/e2e/snowflake/snowflake.go
@@ -10,6 +10,7 @@ import (
"github.com/jackc/pgx/v5"
"github.com/stretchr/testify/require"
+ "github.com/PeerDB-io/peer-flow/connectors"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
"github.com/PeerDB-io/peer-flow/e2e"
@@ -35,6 +36,10 @@ func (s PeerFlowE2ETestSuiteSF) Connector() *connpostgres.PostgresConnector {
return s.conn
}
+func (s PeerFlowE2ETestSuiteSF) DestinationConnector() connectors.Connector {
+ return s.connector
+}
+
func (s PeerFlowE2ETestSuiteSF) Conn() *pgx.Conn {
return s.Connector().Conn()
}
diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go
index c018f32df2..dcaef74291 100644
--- a/flow/e2e/test_utils.go
+++ b/flow/e2e/test_utils.go
@@ -22,6 +22,7 @@ import (
"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/temporal"
+ "github.com/PeerDB-io/peer-flow/connectors"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
"github.com/PeerDB-io/peer-flow/connectors/utils"
@@ -55,6 +56,7 @@ type RowSource interface {
type GenericSuite interface {
RowSource
Peer() *protos.Peer
+ DestinationConnector() connectors.Connector
DestinationTable(table string) string
}
@@ -112,13 +114,17 @@ func RequireEqualTables(suite RowSource, table string, cols string) {
}
func EnvEqualTables(env WorkflowRun, suite RowSource, table string, cols string) {
+ EnvEqualTablesWithNames(env, suite, table, table, cols)
+}
+
+func EnvEqualTablesWithNames(env WorkflowRun, suite RowSource, srcTable string, dstTable string, cols string) {
t := suite.T()
t.Helper()
- pgRows, err := GetPgRows(suite.Connector(), suite.Suffix(), table, cols)
+ pgRows, err := GetPgRows(suite.Connector(), suite.Suffix(), srcTable, cols)
EnvNoError(t, env, err)
- rows, err := suite.GetRows(table, cols)
+ rows, err := suite.GetRows(dstTable, cols)
EnvNoError(t, env, err)
EnvEqualRecordBatches(t, env, pgRows, rows)
@@ -519,6 +525,19 @@ func GetOwnersSelectorStringsSF() [2]string {
return [2]string{strings.Join(pgFields, ","), strings.Join(sfFields, ",")}
}
+func ExpectedDestinationIdentifier(s GenericSuite, ident string) string {
+ switch s.DestinationConnector().(type) {
+ case *connsnowflake.SnowflakeConnector:
+ return strings.ToUpper(ident)
+ default:
+ return ident
+ }
+}
+
+func ExpectedDestinationTableName(s GenericSuite, table string) string {
+ return ExpectedDestinationIdentifier(s, s.DestinationTable(table))
+}
+
type testWriter struct {
*testing.T
}