diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 6aba3994f9..b6ff159792 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -55,6 +55,7 @@ func TestPeerFlowE2ETestSuiteSF(t *testing.T) { {"Test_Composite_PKey_SF", s.Test_Composite_PKey_SF}, {"Test_Composite_PKey_Toast_1_SF", s.Test_Composite_PKey_Toast_1_SF}, {"Test_Composite_PKey_Toast_2_SF", s.Test_Composite_PKey_Toast_2_SF}, + {"Test_Column_Exclusion", s.Test_Column_Exclusion}, } // assert that there are no duplicate test names @@ -1346,10 +1347,12 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion(t *testing.T) { `, srcTableName)) require.NoError(t, err) - c := e2e.FlowConnectionGenerationConfig{} + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("test_exclude_flow"), + } config := &protos.FlowConnectionConfigs{ - FlowJobName: s.attachSuffix("test_exclude_flow"), + FlowJobName: connectionGen.FlowJobName, Destination: s.sfHelper.Peer, TableMappings: []*protos.TableMapping{ { @@ -1359,8 +1362,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion(t *testing.T) { }, }, Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), - CdcSyncMode: c.CDCSyncMode, - CdcStagingPath: c.CdcStagingPath, + CdcSyncMode: connectionGen.CDCSyncMode, + CdcStagingPath: connectionGen.CdcStagingPath, } limits := peerflow.CDCFlowLimits{ @@ -1368,14 +1371,35 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion(t *testing.T) { MaxBatchSize: 100, } + // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup + // and then insert, update and delete rows in the table. + go func() { + e2e.SetupCDCFlowStatusQuery(env, connectionGen) + + // insert 10 rows into the source table + for i := 0; i < 10; i++ { + testValue := fmt.Sprintf("test_value_%d", i) + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(100)) + `, srcTableName), i, testValue) + require.NoError(t, err) + } + fmt.Println("Inserted 10 rows into the source table") + + e2e.NormalizeFlowCountQuery(env, connectionGen, 2) + _, err = s.pool.Exec(context.Background(), + fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) + require.NoError(t, err) + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) + require.NoError(t, err) + }() + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, &config, &limits, nil) require.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() s.Error(err) s.Contains(err.Error(), "continue as new") - // TODO insert rows, wait - query := fmt.Sprintf("SELECT * FROM %s.%s.test_exclude_sf ORDER BY id", s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName) sfRows, err := s.sfHelper.ExecuteAndProcessQuery(query) @@ -1385,4 +1409,5 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion(t *testing.T) { require.NotEqual(t, field.Name, "c2") } require.Equal(t, len(sfRows.Schema.Fields), 4) + // TODO inspect content }