From 6309d6267343579376f3e9390b043caaede73c55 Mon Sep 17 00:00:00 2001
From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com>
Date: Wed, 7 Feb 2024 20:15:42 +0530
Subject: [PATCH] tests for mixed case + fixes/cleanup (#1212)

1) Added a test for mixed-case column/pkey/table names in PG.
2) BatchSize is now taken directly from the config instead of from Limits
   (which was itself derived from the config); Limits is now empty, so it
   has been removed.
3) The table name in AddTablesToPublication now uses `SchemaTable.String()`
   instead of `QuoteIdentifier`.

The dynamic properties test isn't working in CI; will debug and raise it
separately.
---
 flow/activities/flowable.go             |   7 +-
 flow/cmd/handler.go                     |  10 -
 flow/connectors/postgres/postgres.go    |   8 +-
 flow/e2e/bigquery/peer_flow_bq_test.go  | 153 +++-------
 flow/e2e/postgres/peer_flow_pg_test.go  | 336 +++++++++++++++++++-----
 flow/e2e/s3/cdc_s3_test.go              |   7 +-
 flow/e2e/snowflake/peer_flow_sf_test.go | 133 +++-------
 flow/e2e/test_utils.go                  |   4 +-
 flow/workflows/cdc_flow.go              |  18 +-
 9 files changed, 374 insertions(+), 302 deletions(-)

diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index 26a6bbe061..0d8a796ef5 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -225,6 +225,11 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 	})
 	defer shutdown()
 
+	batchSize := input.SyncFlowOptions.BatchSize
+	if batchSize <= 0 {
+		batchSize = 1_000_000
+	}
+
 	// start a goroutine to pull records from the source
 	recordBatch := model.NewCDCRecordStream()
 	startTime := time.Now()
@@ -235,7 +240,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 		SrcTableIDNameMapping: input.SrcTableIdNameMapping,
 		TableNameMapping:      tblNameMapping,
 		LastOffset:            input.LastSyncState.Checkpoint,
-		MaxBatchSize:          input.SyncFlowOptions.BatchSize,
+		MaxBatchSize:          batchSize,
 		IdleTimeout: peerdbenv.PeerDBCDCIdleTimeoutSeconds(
 			int(input.SyncFlowOptions.IdleTimeoutSeconds),
 		),
diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go
index c55b122af0..0838653540 100644
--- a/flow/cmd/handler.go
+++ b/flow/cmd/handler.go
@@ -136,15 +136,6 @@ func (h *FlowRequestHandler) CreateCDCFlow(
 		},
 	}
 
-	maxBatchSize := cfg.MaxBatchSize
-	if maxBatchSize == 0 {
-		maxBatchSize = 1_000_000
-	}
-
-	limits := &peerflow.CDCFlowLimits{
-		MaxBatchSize: maxBatchSize,
-	}
-
 	if req.ConnectionConfigs.SoftDeleteColName == "" {
 		req.ConnectionConfigs.SoftDeleteColName = "_PEERDB_IS_DELETED"
 	} else {
@@ -178,7 +169,6 @@ func (h *FlowRequestHandler) CreateCDCFlow(
 		workflowOptions,                    // workflow start options
 		peerflow.CDCFlowWorkflowWithConfig, // workflow function
 		cfg,                                // workflow input
-		limits,                             // workflow limits
 		nil,                                // workflow state
 	)
 	if err != nil {
diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go
index dc51546070..ada5f60e84 100644
--- a/flow/connectors/postgres/postgres.go
+++ b/flow/connectors/postgres/postgres.go
@@ -1014,9 +1014,13 @@ func (c *PostgresConnector) AddTablesToPublication(req *protos.AddTablesToPublic
 		}
 	} else {
 		for _, additionalSrcTable := range additionalSrcTables {
-			_, err := c.conn.Exec(c.ctx, fmt.Sprintf("ALTER PUBLICATION %s ADD TABLE %s",
+			schemaTable, err := utils.ParseSchemaTable(additionalSrcTable)
+			if err != nil {
+				return err
+			}
+			_, err = c.conn.Exec(c.ctx, fmt.Sprintf("ALTER PUBLICATION %s ADD TABLE %s",
 				utils.QuoteIdentifier(c.getDefaultPublicationName(req.FlowJobName)),
-				utils.QuoteIdentifier(additionalSrcTable)))
+				schemaTable.String()))
 			// don't error out if table is already added to our publication
 			if err != nil &&
!strings.Contains(err.Error(), "SQLSTATE 42710") { return fmt.Errorf("failed to alter publication: %w", err) diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index d33f9a3f5c..3c3cddf548 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -191,11 +191,7 @@ func setupSuite(t *testing.T) PeerFlowE2ETestSuiteBQ { func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { env := e2e.NewTemporalTestWorkflowEnvironment(s.t) - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 1, - } - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, nil, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, nil, nil) // Verify workflow completes require.True(s.t, env.IsWorkflowCompleted()) @@ -229,10 +225,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 1, - } + flowConnConfig.MaxBatchSize = 1 go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) @@ -240,7 +233,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -268,10 +261,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 1, - } + flowConnConfig.MaxBatchSize = 1 go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) @@ -279,7 +269,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -310,10 +300,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) @@ -332,7 +319,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -361,10 +348,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns @@ -391,7 +375,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -420,10 +404,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + 
flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns @@ -456,7 +437,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -484,10 +465,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns @@ -513,7 +491,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -542,10 +520,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns @@ -570,7 +545,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -606,10 +581,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns @@ -670,7 +642,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -693,10 +665,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_NaN_Doubles_BQ() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns @@ -715,7 +684,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_NaN_Doubles_BQ() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -743,10 +712,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert 10 rows into the source table @@ -800,7 +766,7 @@ func (s 
PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -827,10 +793,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns @@ -860,7 +823,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -888,10 +851,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert and mutate schema repeatedly. @@ -947,7 +907,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -977,10 +937,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. @@ -1010,7 +967,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -1041,10 +998,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. @@ -1075,7 +1029,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -1106,10 +1060,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. 
@@ -1137,7 +1088,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -1164,10 +1115,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) @@ -1191,7 +1139,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -1222,10 +1170,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns @@ -1254,7 +1199,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) require.NoError(s.t, s.bqHelper.DropDataset(secondDataset)) @@ -1295,10 +1240,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { SoftDelete: true, SoftDeleteColName: "_PEERDB_IS_DELETED", SyncedAtColName: "_PEERDB_SYNCED_AT", - } - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -1332,7 +1274,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil) e2e.RequireEnvCanceled(s.t, env) newerSyncedAtQuery := fmt.Sprintf( @@ -1378,10 +1320,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { SoftDelete: true, SoftDeleteColName: "_PEERDB_IS_DELETED", SyncedAtColName: "_PEERDB_SYNCED_AT", - } - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -1420,7 +1359,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -1459,10 +1398,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { SoftDelete: true, SoftDeleteColName: "_PEERDB_IS_DELETED", SyncedAtColName: "_PEERDB_SYNCED_AT", - } - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -1501,7 +1437,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + 
env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil) e2e.RequireEnvCanceled(s.t, env) newerSyncedAtQuery := fmt.Sprintf( @@ -1546,10 +1482,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { SoftDelete: true, SoftDeleteColName: "_PEERDB_IS_DELETED", SyncedAtColName: "_PEERDB_SYNCED_AT", - } - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -1583,7 +1516,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil) e2e.RequireEnvCanceled(s.t, env) newerSyncedAtQuery := fmt.Sprintf( diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 74f6a39c6d..7847759d1e 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -99,10 +99,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert 10 rows into the source table @@ -125,7 +122,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -152,10 +149,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Geospatial_PG() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) @@ -172,7 +166,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Geospatial_PG() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -201,10 +195,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Types_PG() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) @@ -239,7 +230,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Types_PG() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -271,10 +262,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Enums_PG() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) @@ -290,7 +278,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Enums_PG() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -316,10 +304,7 @@ func (s 
PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 1, - } + flowConnConfig.MaxBatchSize = 1 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert and mutate schema repeatedly. @@ -477,7 +462,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -506,10 +491,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. @@ -540,7 +522,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -574,10 +556,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. @@ -611,7 +590,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -645,10 +624,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. 
@@ -681,7 +657,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -709,10 +685,7 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) @@ -737,7 +710,7 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -776,10 +749,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Basic() { SoftDelete: true, SoftDeleteColName: "_PEERDB_IS_DELETED", SyncedAtColName: "_PEERDB_SYNCED_AT", - } - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -814,7 +784,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Basic() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil) e2e.RequireEnvCanceled(s.t, env) // verify our updates and delete happened @@ -865,10 +835,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_IUD_Same_Batch() { SoftDelete: true, SoftDeleteColName: "_PEERDB_IS_DELETED", SyncedAtColName: "_PEERDB_SYNCED_AT", - } - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -908,7 +875,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_IUD_Same_Batch() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -947,10 +914,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_UD_Same_Batch() { SoftDelete: true, SoftDeleteColName: "_PEERDB_IS_DELETED", SyncedAtColName: "_PEERDB_SYNCED_AT", - } - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -979,13 +943,14 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_UD_Same_Batch() { e2e.EnvNoError(s.t, env, insertTx.Commit(context.Background())) e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize transaction", func() bool { - return s.comparePGTables(srcTableName, dstTableName+` WHERE NOT "_PEERDB_IS_DELETED"`, "id,c1,c2,t") == nil + return s.comparePGTables(srcTableName, + dstTableName+` WHERE NOT "_PEERDB_IS_DELETED"`, "id,c1,c2,t") == nil }) env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil) e2e.RequireEnvCanceled(s.t, env) // verify our updates and delete happened @@ -1033,10 +998,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Insert_After_Delete() { SoftDelete: true, SoftDeleteColName: "_PEERDB_IS_DELETED", SyncedAtColName: "_PEERDB_SYNCED_AT", - } - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, + 
MaxBatchSize:      100,
 	}
 
 	// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
@@ -1066,7 +1028,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Insert_After_Delete() {
 		env.CancelWorkflow()
 	}()
 
-	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
+	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil)
 	e2e.RequireEnvCanceled(s.t, env)
 
 	softDeleteQuery := fmt.Sprintf(`
@@ -1076,3 +1038,245 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Insert_After_Delete() {
 	require.NoError(s.t, err)
 	require.Equal(s.t, int64(0), numRows)
 }
+
+func (s PeerFlowE2ETestSuitePG) Test_Supported_Mixed_Case_Table() {
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
+
+	stmtSrcTableName := fmt.Sprintf(`e2e_test_%s."%s"`, s.suffix, "testMixedCase")
+	srcTableName := s.attachSchemaSuffix("testMixedCase")
+	stmtDstTableName := fmt.Sprintf(`e2e_test_%s."%s"`, s.suffix, "testMixedCaseDst")
+	dstTableName := s.attachSchemaSuffix("testMixedCaseDst")
+
+	_, err := s.conn.Exec(context.Background(), fmt.Sprintf(`
+	CREATE TABLE IF NOT EXISTS %s (
+		"pulseArmor" SERIAL PRIMARY KEY,
+		"highGold" TEXT NOT NULL,
+		"eVe" TEXT NOT NULL,
+		id SERIAL
+	);
+	`, stmtSrcTableName))
+	require.NoError(s.t, err)
+
+	connectionGen := e2e.FlowConnectionGenerationConfig{
+		FlowJobName: s.attachSuffix("test_mixed_case"),
+	}
+
+	config := &protos.FlowConnectionConfigs{
+		FlowJobName: connectionGen.FlowJobName,
+		Destination: s.peer,
+		TableMappings: []*protos.TableMapping{
+			{
+				SourceTableIdentifier:      srcTableName,
+				DestinationTableIdentifier: dstTableName,
+			},
+		},
+		Source:         e2e.GeneratePostgresPeer(e2e.PostgresPort),
+		CdcStagingPath: connectionGen.CdcStagingPath,
+		MaxBatchSize:   100,
+	}
+
+	// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
+	// and then insert and delete rows in the table.
+	go func() {
+		e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
+		// insert 20 rows into the source table
+		for i := 0; i < 20; i++ {
+			testKey := fmt.Sprintf("test_key_%d", i)
+			testValue := fmt.Sprintf("test_value_%d", i)
+			_, err = s.conn.Exec(context.Background(), fmt.Sprintf(`
+			INSERT INTO %s ("highGold","eVe") VALUES ($1, $2)
+			`, stmtSrcTableName), testKey, testValue)
+			e2e.EnvNoError(s.t, env, err)
+		}
+		s.t.Log("Inserted 20 rows into the source table")
+
+		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize mixed case", func() bool {
+			return s.comparePGTables(stmtSrcTableName, stmtDstTableName,
+				"id,\"pulseArmor\",\"highGold\",\"eVe\"") == nil
+		})
+
+		env.CancelWorkflow()
+	}()
+
+	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil)
+}
+
+// this test doesn't work yet; make it work later
+
+// func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
+// 	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
+// 	// needed, otherwise the test errors out
+// 	workerOptions := worker.Options{
+// 		EnableSessionWorker: true,
+// 	}
+// 	env.SetWorkerOptions(workerOptions)
+
+// 	srcTable1Name := s.attachSchemaSuffix("test_dynconfig_1")
+// 	srcTable2Name := s.attachSchemaSuffix("test_dynconfig_2")
+// 	dstTable1Name := s.attachSchemaSuffix("test_dynconfig_1_dst")
+// 	dstTable2Name := s.attachSchemaSuffix("test_dynconfig_2_dst")
+// 	sentPause := false
+// 	sentUpdate := false
+
+// 	// signals in tests are weird: you need to register them before starting the workflow,
+// 	// otherwise it errors out. really don't like this.
+// // too short of a gap between signals also causes issues +// // might have something to do with how test workflows handle fast-forwarding time. +// env.RegisterDelayedCallback(func() { +// env.SignalWorkflow(shared.FlowSignalName, shared.PauseSignal) +// s.t.Log("Sent pause signal") +// sentPause = true +// }, 28*time.Second) +// env.RegisterDelayedCallback(func() { +// env.SignalWorkflow(shared.CDCDynamicPropertiesSignalName, &protos.CDCFlowConfigUpdate{ +// IdleTimeout: 14, +// BatchSize: 12, +// AdditionalTables: []*protos.TableMapping{ +// { +// SourceTableIdentifier: srcTable2Name, +// DestinationTableIdentifier: dstTable2Name, +// }, +// }, +// }) +// s.t.Log("Sent update signal") +// sentUpdate = true +// }, 56*time.Second) +// env.RegisterDelayedCallback(func() { +// env.SignalWorkflow(shared.FlowSignalName, shared.NoopSignal) +// s.t.Log("Sent resume signal") +// }, 84*time.Second) + +// _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` +// CREATE TABLE IF NOT EXISTS %s ( +// id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, +// t TEXT DEFAULT md5(random()::text)); +// CREATE TABLE IF NOT EXISTS %s ( +// id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, +// t TEXT DEFAULT md5(random()::text)); +// `, srcTable1Name, srcTable2Name)) +// require.NoError(s.t, err) + +// connectionGen := e2e.FlowConnectionGenerationConfig{ +// FlowJobName: s.attachSuffix("test_dynconfig"), +// } + +// config := &protos.FlowConnectionConfigs{ +// FlowJobName: connectionGen.FlowJobName, +// Destination: s.peer, +// TableMappings: []*protos.TableMapping{ +// { +// SourceTableIdentifier: srcTable1Name, +// DestinationTableIdentifier: dstTable1Name, +// }, +// }, +// Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), +// CdcStagingPath: connectionGen.CdcStagingPath, +// MaxBatchSize: 6, +// IdleTimeoutSeconds: 7, +// DoInitialSnapshot: true, +// SnapshotNumRowsPerPartition: 1000, +// SnapshotMaxParallelWorkers: 1, +// SnapshotNumTablesInParallel: 1, +// } + +// addRows := func(numRows int) { +// for i := 0; i < numRows; i++ { +// _, err = s.conn.Exec(context.Background(), +// fmt.Sprintf(`INSERT INTO %s DEFAULT VALUES`, srcTable1Name)) +// e2e.EnvNoError(s.t, env, err) +// _, err = s.conn.Exec(context.Background(), +// fmt.Sprintf(`INSERT INTO %s DEFAULT VALUES`, srcTable2Name)) +// e2e.EnvNoError(s.t, env, err) +// } +// s.t.Logf("Inserted %d rows into the source table", numRows) +// } + +// getWorkFlowState := func() peerflow.CDCFlowWorkflowState { +// var workflowState peerflow.CDCFlowWorkflowState +// val, err := env.QueryWorkflow(shared.CDCFlowStateQuery) +// e2e.EnvNoError(s.t, env, err) +// err = val.Get(&workflowState) +// e2e.EnvNoError(s.t, env, err) + +// return workflowState +// } + +// getFlowStatus := func() protos.FlowStatus { +// var flowStatus protos.FlowStatus +// val, err := env.QueryWorkflow(shared.FlowStatusQuery) +// e2e.EnvNoError(s.t, env, err) +// err = val.Get(&flowStatus) +// e2e.EnvNoError(s.t, env, err) + +// return flowStatus +// } + +// // add before to test initial load too. 
+// 	addRows(18)
+// 	go func() {
+// 		e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
+// 		// insert 18 rows into the source tables, exactly 3 batches
+// 		addRows(18)
+
+// 		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool {
+// 			return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
+// 		})
+
+// 		workflowState := getWorkFlowState()
+// 		require.EqualValues(s.t, 7, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
+// 		require.EqualValues(s.t, 6, workflowState.SyncFlowOptions.BatchSize)
+// 		require.EqualValues(s.t, 1, len(workflowState.TableMappings))
+// 		require.EqualValues(s.t, 1, len(workflowState.SrcTableIdNameMapping))
+// 		require.EqualValues(s.t, 1, len(workflowState.TableNameSchemaMapping))
+// 		// we have limited batch size to 6, so at least 3 syncs are needed
+// 		require.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3)
+
+// 		// wait for first RegisterDelayedCallback to hit.
+// 		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent pause signal", func() bool {
+// 			return sentPause
+// 		})
+// 		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool {
+// 			// keep adding 1 more row - guarantees finishing another sync
+// 			addRows(1)
+// 			flowStatus := getFlowStatus()
+// 			return flowStatus == protos.FlowStatus_STATUS_PAUSED
+// 		})
+// 		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 1 record - first table", func() bool {
+// 			return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
+// 		})
+
+// 		// we have a paused mirror, wait for the second signal to hit.
+// 		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "sent updates signal", func() bool {
+// 			return sentUpdate
+// 		})
+
+// 		// add rows to both tables before resuming - the mirror should handle both
+// 		addRows(18)
+
+// 		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool {
+// 			flowStatus := getFlowStatus()
+// 			return flowStatus == protos.FlowStatus_STATUS_RUNNING
+// 		})
+// 		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "normalize 18 records - first table", func() bool {
+// 			return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
+// 		})
+// 		e2e.EnvWaitFor(s.t, env, 1*time.Minute, "initial load + normalize 18 records - second table", func() bool {
+// 			return s.comparePGTables(srcTable2Name, dstTable2Name, "id,t") == nil
+// 		})
+
+// 		workflowState = getWorkFlowState()
+// 		require.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
+// 		require.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize)
+// 		require.EqualValues(s.t, 2, len(workflowState.TableMappings))
+// 		require.EqualValues(s.t, 2, len(workflowState.SrcTableIdNameMapping))
+// 		require.EqualValues(s.t, 2, len(workflowState.TableNameSchemaMapping))
+// 		// 3 from the first insert of 18 rows into 1 table
+// 		// 1 from pre-pause
+// 		// 3 from the second insert of 18 rows into 2 tables, batch size updated
+// 		require.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3+1+3)
+
+// 		env.CancelWorkflow()
+// 	}()
+
+// 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil)
+// }
diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go
index c9a537bdf0..7295273597 100644
--- a/flow/e2e/s3/cdc_s3_test.go
+++ b/flow/e2e/s3/cdc_s3_test.go
@@ -41,10 +41,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
 	}
 
 	flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()
-
-	limits := peerflow.CDCFlowLimits{
-		MaxBatchSize: 5,
-	}
+	flowConnConfig.MaxBatchSize = 5
 
 	go func() {
 		e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
@@ -71,6 +68,6 @@ func (s
PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 64dd662268..9fa22f2b2e 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -145,10 +145,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert 20 rows into the source table @@ -169,7 +166,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) // check the number of rows where _PEERDB_SYNCED_AT is newer than 5 mins ago @@ -209,10 +206,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert 20 rows into the source table @@ -236,7 +230,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -264,10 +258,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert 10 rows into the source table @@ -320,7 +311,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -348,10 +339,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns @@ -377,7 +365,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -405,10 +393,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + 
flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns @@ -440,7 +425,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -467,10 +452,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns @@ -496,7 +478,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -524,10 +506,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns @@ -553,7 +532,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -588,10 +567,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns @@ -652,7 +628,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -678,10 +654,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns @@ -710,7 +683,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -736,10 +709,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert and mutate schema repeatedly. 
@@ -928,7 +898,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -957,10 +927,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. @@ -988,7 +955,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -1018,10 +985,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. @@ -1053,7 +1017,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -1084,10 +1048,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. 
@@ -1115,7 +1076,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -1155,10 +1116,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), CdcStagingPath: connectionGen.CdcStagingPath, SyncedAtColName: "_PEERDB_SYNCED_AT", - } - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -1187,7 +1145,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil) e2e.RequireEnvCanceled(s.t, env) sfRows, err := s.GetRows(tableName, "*") @@ -1235,10 +1193,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { SoftDelete: true, SoftDeleteColName: "_PEERDB_IS_DELETED", SyncedAtColName: "_PEERDB_SYNCED_AT", - } - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -1269,7 +1224,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil) e2e.RequireEnvCanceled(s.t, env) newerSyncedAtQuery := fmt.Sprintf(` @@ -1314,10 +1269,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { SoftDelete: true, SoftDeleteColName: "_PEERDB_IS_DELETED", SyncedAtColName: "_PEERDB_SYNCED_AT", - } - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -1356,7 +1308,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -1396,10 +1348,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { SoftDelete: true, SoftDeleteColName: "_PEERDB_IS_DELETED", SyncedAtColName: "_PEERDB_SYNCED_AT", - } - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -1444,7 +1393,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil) e2e.RequireEnvCanceled(s.t, env) } @@ -1483,10 +1432,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { SoftDelete: true, SoftDeleteColName: "_PEERDB_IS_DELETED", SyncedAtColName: "_PEERDB_SYNCED_AT", - } - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -1518,7 +1464,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + 
env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil) e2e.RequireEnvCanceled(s.t, env) newerSyncedAtQuery := fmt.Sprintf(` @@ -1552,10 +1498,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - limits := peerflow.CDCFlowLimits{ - MaxBatchSize: 100, - } + flowConnConfig.MaxBatchSize = 100 // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert 20 rows into the source table @@ -1583,6 +1526,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() { env.CancelWorkflow() }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil) e2e.RequireEnvCanceled(s.t, env) } diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 3baa05b3ee..4bf2d9e3f0 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -64,7 +64,9 @@ func RegisterWorkflowsAndActivities(t *testing.T, env *testsuite.TestWorkflowEnv CatalogPool: conn, Alerter: alerter, }) - env.RegisterActivity(&activities.SnapshotActivity{}) + env.RegisterActivity(&activities.SnapshotActivity{ + Alerter: alerter, + }) } // Helper function to assert errors in go routines running concurrent to workflows diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 7f859814e9..5244e9e4f1 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -24,11 +24,6 @@ const ( maxSyncFlowsPerCDCFlow = 32 ) -type CDCFlowLimits struct { - // Maximum number of rows in a sync flow batch. - MaxBatchSize uint32 -} - type CDCFlowWorkflowState struct { // Progress events for the peer flow. Progress []string @@ -150,14 +145,15 @@ type CDCFlowWorkflowResult = CDCFlowWorkflowState func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Context, cfg *protos.FlowConnectionConfigs, state *CDCFlowWorkflowState, - limits *CDCFlowLimits, mirrorNameSearch map[string]interface{}, + mirrorNameSearch map[string]interface{}, ) error { for _, flowConfigUpdate := range state.FlowConfigUpdates { if len(flowConfigUpdate.AdditionalTables) == 0 { continue } if shared.AdditionalTablesHasOverlap(state.TableMappings, flowConfigUpdate.AdditionalTables) { - return fmt.Errorf("duplicate source/destination tables found in additionalTables") + w.logger.Warn("duplicate source/destination tables found in additionalTables") + continue } alterPublicationAddAdditionalTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ @@ -199,7 +195,6 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont CDCFlowWorkflowWithConfig, additionalTablesWorkflowCfg, nil, - limits, ) var res *CDCFlowWorkflowResult if err := childAdditionalTablesCDCFlowFuture.Get(childAdditionalTablesCDCFlowCtx, &res); err != nil { @@ -223,7 +218,6 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont func CDCFlowWorkflowWithConfig( ctx workflow.Context, cfg *protos.FlowConnectionConfigs, - limits *CDCFlowLimits, state *CDCFlowWorkflowState, ) (*CDCFlowWorkflowResult, error) { if cfg == nil { @@ -369,7 +363,7 @@ func CDCFlowWorkflowWithConfig( } state.SyncFlowOptions = &protos.SyncFlowOptions{ - BatchSize: limits.MaxBatchSize, + BatchSize: cfg.MaxBatchSize, // this means the env variable assignment path is never hit IdleTimeoutSeconds: cfg.IdleTimeoutSeconds, SrcTableIdNameMapping: state.SrcTableIdNameMapping, @@ -479,7 +473,7 
@@ func CDCFlowWorkflowWithConfig( // only place we block on receive, so signal processing is immediate mainLoopSelector.Select(ctx) if state.ActiveSignal == shared.NoopSignal { - err = w.processCDCFlowConfigUpdates(ctx, cfg, state, limits, mirrorNameSearch) + err = w.processCDCFlowConfigUpdates(ctx, cfg, state, mirrorNameSearch) if err != nil { return state, err } @@ -609,5 +603,5 @@ func CDCFlowWorkflowWithConfig( return nil, err } - return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, cfg, limits, state) + return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, cfg, state) }
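
Note on the AddTablesToPublication fix: `utils.QuoteIdentifier` quotes its entire argument as one identifier, so a schema-qualified name such as `e2e_test_xyz.testMixedCase` would come out as a single identifier with a dot inside it, and the ALTER PUBLICATION would target a nonexistent table. Parsing the name with `utils.ParseSchemaTable` and rendering it with `SchemaTable.String()` quotes schema and table separately, which also preserves mixed-case names like `testMixedCase`. Below is a minimal, self-contained sketch of the difference; `quoteIdentifier` and `schemaTable` are simplified stand-ins for the real utils helpers, whose exact behavior (standard Postgres quote-doubling) is assumed here.

    package main

    import (
    	"fmt"
    	"strings"
    )

    // quoteIdentifier mimics standard Postgres identifier quoting: wrap the
    // whole string in double quotes and double any embedded quote.
    // (Simplified stand-in for utils.QuoteIdentifier.)
    func quoteIdentifier(ident string) string {
    	return `"` + strings.ReplaceAll(ident, `"`, `""`) + `"`
    }

    // schemaTable is a simplified stand-in for utils.SchemaTable.
    type schemaTable struct{ Schema, Table string }

    // String quotes schema and table independently, preserving mixed case.
    func (st schemaTable) String() string {
    	return quoteIdentifier(st.Schema) + "." + quoteIdentifier(st.Table)
    }

    func main() {
    	// Broken: the qualified name becomes one identifier, so Postgres
    	// would look for a table literally named "e2e_test_xyz.testMixedCase".
    	fmt.Println("ALTER PUBLICATION pub ADD TABLE " +
    		quoteIdentifier("e2e_test_xyz.testMixedCase"))
    	// => ALTER PUBLICATION pub ADD TABLE "e2e_test_xyz.testMixedCase"

    	// Fixed: schema and table are quoted separately, so mixed-case names
    	// like testMixedCase survive.
    	st := schemaTable{Schema: "e2e_test_xyz", Table: "testMixedCase"}
    	fmt.Println("ALTER PUBLICATION pub ADD TABLE " + st.String())
    	// => ALTER PUBLICATION pub ADD TABLE "e2e_test_xyz"."testMixedCase"
    }

On the batch-size cleanup: with `CDCFlowLimits` gone, the batch size travels in `SyncFlowOptions.BatchSize`, and `StartFlow` now applies the 1,000,000 fallback itself whenever the config leaves `MaxBatchSize` unset (<= 0), so `CreateCDCFlow` no longer needs to apply the default up front.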