Skip to content

Commit

Permalink
First stab at snowflake
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 5, 2024
1 parent cf0b228 commit c4b65d5
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 15 deletions.
27 changes: 12 additions & 15 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,16 +858,19 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() {

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert and mutate schema repeatedly.
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
// insert first row.
defer wg.Done()

e2e.SetupCDCFlowStatusQuery(env, connectionGen)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1)
e2e.EnvNoError(s.t, env, err)
s.t.Log("Inserted initial row in the source table")

// verify we got our first row.
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
e2e.EnvWaitForEqualTables(env, s, "normalize reinsert", "test_simple_schema_changes", "id,c1")

expectedTableSchema := &protos.TableSchema{
TableIdentifier: strings.ToUpper(dstTableName),
ColumnNames: []string{
Expand Down Expand Up @@ -901,7 +904,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() {
s.t.Log("Inserted row with added c2 in the source table")

// verify we got our two rows, if schema did not match up it will error.
e2e.NormalizeFlowCountQuery(env, connectionGen, 4)
e2e.EnvWaitForEqualTables(env, s, "normalize altered row", "test_simple_schema_changes", "id,c1,c2")
expectedTableSchema = &protos.TableSchema{
TableIdentifier: strings.ToUpper(dstTableName),
ColumnNames: []string{
Expand Down Expand Up @@ -937,7 +940,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() {
s.t.Log("Inserted row with added c3 in the source table")

// verify we got our two rows, if schema did not match up it will error.
e2e.NormalizeFlowCountQuery(env, connectionGen, 6)
e2e.EnvWaitForEqualTables(env, s, "normalize dropped c2 column", "test_simple_schema_changes", "id,c1,c3")
expectedTableSchema = &protos.TableSchema{
TableIdentifier: strings.ToUpper(dstTableName),
ColumnNames: []string{
Expand Down Expand Up @@ -975,7 +978,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() {
s.t.Log("Inserted row after dropping all columns in the source table")

// verify we got our two rows, if schema did not match up it will error.
e2e.NormalizeFlowCountQuery(env, connectionGen, 8)
e2e.EnvWaitForEqualTables(env, s, "normalize dropped c3 column", "test_simple_schema_changes", "id,c1")
expectedTableSchema = &protos.TableSchema{
TableIdentifier: strings.ToUpper(dstTableName),
ColumnNames: []string{
Expand All @@ -1001,18 +1004,12 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() {
e2e.EnvNoError(s.t, env, err)
e2e.EnvEqual(s.t, env, expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1")

env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

// Verify workflow completes without error
require.True(s.t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()

// allow only continue as new error
require.Contains(s.t, err.Error(), "continue as new")

env.AssertExpectations(s.t)
wg.Wait()
}

func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() {
Expand Down
25 changes: 25 additions & 0 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,31 @@ func EnvEqualTables(env *testsuite.TestWorkflowEnvironment, suite e2eshared.RowS
EnvEqualRecordBatches(t, env, pgRows, rows)
}

func EnvWaitForEqualTables(
env *testsuite.TestWorkflowEnvironment,
suite e2eshared.RowSource,
reason string,
table string,
cols string,
) {
t := suite.T()
EnvWaitFor(t, env, time.Minute, reason, func(ctx context.Context) bool {
suffix := suite.Suffix()
pool := suite.Pool()
pgRows, err := GetPgRows(pool, suffix, table, cols)
if err != nil {
return false
}

rows, err := suite.GetRows(table, cols)
if err != nil {
return false
}

return e2eshared.CheckEqualRecordBatches(t, pgRows, rows)
})
}

func SetupCDCFlowStatusQuery(env *testsuite.TestWorkflowEnvironment,
connectionGen FlowConnectionGenerationConfig,
) {
Expand Down

0 comments on commit c4b65d5

Please sign in to comment.