Skip to content

Commit

Permalink
added partial mixed case test for SF
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Dec 28, 2023
1 parent 682fa0a commit 8a304af
Showing 1 changed file with 82 additions and 8 deletions.
90 changes: 82 additions & 8 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,21 @@ type PeerFlowE2ETestSuiteSF struct {

func TestPeerFlowE2ETestSuiteSF(t *testing.T) {
e2eshared.GotSuite(t, SetupSuite, func(s PeerFlowE2ETestSuiteSF) {
err := e2e.TearDownPostgres(s.pool, s.pgSuffix)
if err != nil {
slog.Error("failed to tear down Postgres", slog.Any("error", err))
s.FailNow()
}
// err := e2e.TearDownPostgres(s.pool, s.pgSuffix)
// if err != nil {
// slog.Error("failed to tear down Postgres", slog.Any("error", err))
// s.FailNow()
// }

if s.sfHelper != nil {
err = s.sfHelper.Cleanup()
err := s.sfHelper.Cleanup()
if err != nil {
slog.Error("failed to tear down Snowflake", slog.Any("error", err))
s.FailNow()
}
}

err = s.connector.Close()

err := s.connector.Close()
if err != nil {
slog.Error("failed to close Snowflake connector", slog.Any("error", err))
s.FailNow()
Expand Down Expand Up @@ -1610,3 +1609,78 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() {
require.NoError(s.t, err)
s.Equal(0, numNewRows)
}

func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)

srcTableName := s.attachSchemaSuffix("testMixedCase")
dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_mixed_case")

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS e2e_test_%s."%s" (
"pulseArmor" SERIAL PRIMARY KEY,
"highGold" TEXT NOT NULL,
"eVe" TEXT NOT NULL
);
`, s.pgSuffix, "testMixedCase"))
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_mixed_case"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.sfHelper.Peer,
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 20,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert 20 rows into the source table
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
// insert 20 rows into the source table
for i := 0; i < 20; i++ {
testKey := fmt.Sprintf("test_key_%d", i)
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO e2e_test_%s."%s"("highGold","eVe") VALUES ($1, $2)
`, s.pgSuffix, "testMixedCase"), testKey, testValue)
require.NoError(s.t, err)
}
fmt.Println("Inserted 20 rows into the source table")
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())
err = env.GetWorkflowError()

// allow only continue as new error
require.Contains(s.t, err.Error(), "continue as new")

count, err := s.sfHelper.CountRows("test_mixed_case")
require.NoError(s.t, err)
s.Equal(20, count)

// check the number of rows where _PEERDB_SYNCED_AT is newer than 5 mins ago
// it should match the count.
newerSyncedAtQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM %s WHERE _PEERDB_SYNCED_AT > CURRENT_TIMESTAMP() - INTERVAL '30 MINUTE'
`, dstTableName)
numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery)
require.NoError(s.t, err)
s.Equal(20, numNewRows)

// TODO: verify that the data is correctly synced to the destination table
// on the Snowflake side

env.AssertExpectations(s.t)
}

0 comments on commit 8a304af

Please sign in to comment.