Skip to content

Commit

Permalink
e2e: remove Test_Toast_Nochanges (#1182)
Browse files Browse the repository at this point in the history
This test doesn't really make sense,
since the goroutine can finish before or after the workflow runs
  • Loading branch information
serprex authored Jan 31, 2024
1 parent edf5c7b commit 50153c3
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 126 deletions.
64 changes: 0 additions & 64 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"log/slog"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -425,69 +424,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() {
e2e.RequireEqualTables(s, dstTableName, "id,t1,t2,k")
}

func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)

srcTableName := s.attachSchemaSuffix("test_toast_bq_2")
dstTableName := "test_toast_bq_2"

_, err := s.conn.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id SERIAL PRIMARY KEY,
t1 text,
t2 text,
k int
);
`, srcTableName))
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_toast_bq_2"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.bqHelper.Peer,
CdcStagingPath: "",
}

flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 0,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and execute a transaction touching toast columns
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
/* transaction updating no rows */
_, err = s.conn.Exec(context.Background(), fmt.Sprintf(`
BEGIN;
UPDATE %s SET k=102 WHERE id=1;
UPDATE %s SET t1='dummy' WHERE id=2;
END;
`, srcTableName, srcTableName))
e2e.EnvNoError(s.t, env, err)
s.t.Log("Executed a transaction touching toast columns")
wg.Done()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
wg.Wait()

// Verify workflow completes without error
require.True(s.t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()

// allow only continue as new error
require.Contains(s.t, err.Error(), "continue as new")

e2e.RequireEqualTables(s, dstTableName, "id,t1,t2,k")
}

func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
Expand Down
62 changes: 0 additions & 62 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"log/slog"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -398,67 +397,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() {
e2e.RequireEqualTables(s, "test_toast_sf_1", `id,t1,t2,k`)
}

func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)

srcTableName := s.attachSchemaSuffix("test_toast_sf_2")
dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_2")

_, err := s.conn.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id SERIAL PRIMARY KEY,
t1 text,
t2 text,
k int
);
`, srcTableName))
slog.Info(fmt.Sprintf("Creating table '%s', err: %v", srcTableName, err))
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_toast_sf_2"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.sfHelper.Peer,
}

flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 0,
MaxBatchSize: 100,
}

wg := sync.WaitGroup{}
wg.Add(1)

go func() {
defer wg.Done()
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
/* transaction updating no rows */
_, err = s.conn.Exec(context.Background(), fmt.Sprintf(`
BEGIN;
UPDATE %s SET k=102 WHERE id=1;
UPDATE %s SET t1='dummy' WHERE id=2;
END;
`, srcTableName, srcTableName))
e2e.EnvNoError(s.t, env, err)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

// Verify workflow completes without error
require.True(s.t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()

// allow only continue as new error
require.Contains(s.t, err.Error(), "continue as new")

e2e.RequireEqualTables(s, "test_toast_sf_2", `id,t1,t2,k`)
wg.Wait()
}

func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
Expand Down

0 comments on commit 50153c3

Please sign in to comment.