diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index d7e3e4315b..c9215ef509 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -35,22 +35,21 @@ type PeerFlowE2ETestSuiteSF struct { func TestPeerFlowE2ETestSuiteSF(t *testing.T) { e2eshared.GotSuite(t, SetupSuite, func(s PeerFlowE2ETestSuiteSF) { - err := e2e.TearDownPostgres(s.pool, s.pgSuffix) - if err != nil { - slog.Error("failed to tear down Postgres", slog.Any("error", err)) - s.FailNow() - } + // err := e2e.TearDownPostgres(s.pool, s.pgSuffix) + // if err != nil { + // slog.Error("failed to tear down Postgres", slog.Any("error", err)) + // s.FailNow() + // } if s.sfHelper != nil { - err = s.sfHelper.Cleanup() + err := s.sfHelper.Cleanup() if err != nil { slog.Error("failed to tear down Snowflake", slog.Any("error", err)) s.FailNow() } } - err = s.connector.Close() - + err := s.connector.Close() if err != nil { slog.Error("failed to close Snowflake connector", slog.Any("error", err)) s.FailNow() @@ -1610,3 +1609,78 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { require.NoError(s.t, err) s.Equal(0, numNewRows) } + +func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.t, env) + + srcTableName := s.attachSchemaSuffix("testMixedCase") + dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_mixed_case") + + _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS e2e_test_%s."%s" ( + "pulseArmor" SERIAL PRIMARY KEY, + "highGold" TEXT NOT NULL, + "eVe" TEXT NOT NULL + ); + `, s.pgSuffix, "testMixedCase")) + require.NoError(s.t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("test_mixed_case"), + TableNameMapping: map[string]string{srcTableName: dstTableName}, + PostgresPort: e2e.PostgresPort, + Destination: s.sfHelper.Peer, + } + + flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() + require.NoError(s.t, err) + + limits := peerflow.CDCFlowLimits{ + ExitAfterRecords: 20, + MaxBatchSize: 100, + } + + // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup + // and then insert 20 rows into the source table + go func() { + e2e.SetupCDCFlowStatusQuery(env, connectionGen) + // insert 20 rows into the source table + for i := 0; i < 20; i++ { + testKey := fmt.Sprintf("test_key_%d", i) + testValue := fmt.Sprintf("test_value_%d", i) + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + INSERT INTO e2e_test_%s."%s"("highGold","eVe") VALUES ($1, $2) + `, s.pgSuffix, "testMixedCase"), testKey, testValue) + require.NoError(s.t, err) + } + fmt.Println("Inserted 20 rows into the source table") + }() + + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + + // Verify workflow completes without error + s.True(env.IsWorkflowCompleted()) + err = env.GetWorkflowError() + + // allow only continue as new error + require.Contains(s.t, err.Error(), "continue as new") + + count, err := s.sfHelper.CountRows("test_mixed_case") + require.NoError(s.t, err) + s.Equal(20, count) + + // check the number of rows where _PEERDB_SYNCED_AT is newer than 5 mins ago + // it should match the count. + newerSyncedAtQuery := fmt.Sprintf(` + SELECT COUNT(*) FROM %s WHERE _PEERDB_SYNCED_AT > CURRENT_TIMESTAMP() - INTERVAL '30 MINUTE' + `, dstTableName) + numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery) + require.NoError(s.t, err) + s.Equal(20, numNewRows) + + // TODO: verify that the data is correctly synced to the destination table + // on the Snowflake side + + env.AssertExpectations(s.t) +}