Skip to content

Commit

Permalink
copy insert code
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Nov 14, 2023
1 parent 934852e commit c402663
Showing 1 changed file with 31 additions and 6 deletions.
37 changes: 31 additions & 6 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func TestPeerFlowE2ETestSuiteSF(t *testing.T) {
{"Test_Composite_PKey_SF", s.Test_Composite_PKey_SF},
{"Test_Composite_PKey_Toast_1_SF", s.Test_Composite_PKey_Toast_1_SF},
{"Test_Composite_PKey_Toast_2_SF", s.Test_Composite_PKey_Toast_2_SF},
{"Test_Column_Exclusion", s.Test_Column_Exclusion},
}

// assert that there are no duplicate test names
Expand Down Expand Up @@ -1346,10 +1347,12 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion(t *testing.T) {
`, srcTableName))
require.NoError(t, err)

c := e2e.FlowConnectionGenerationConfig{}
connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_exclude_flow"),
}

config := &protos.FlowConnectionConfigs{
FlowJobName: s.attachSuffix("test_exclude_flow"),
FlowJobName: connectionGen.FlowJobName,
Destination: s.sfHelper.Peer,
TableMappings: []*protos.TableMapping{
{
Expand All @@ -1359,23 +1362,44 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion(t *testing.T) {
},
},
Source: e2e.GeneratePostgresPeer(e2e.PostgresPort),
CdcSyncMode: c.CDCSyncMode,
CdcStagingPath: c.CdcStagingPath,
CdcSyncMode: connectionGen.CDCSyncMode,
CdcStagingPath: connectionGen.CdcStagingPath,
}

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)

// insert 10 rows into the source table
for i := 0; i < 10; i++ {
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(100))
`, srcTableName), i, testValue)
require.NoError(t, err)
}
fmt.Println("Inserted 10 rows into the source table")

e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
_, err = s.pool.Exec(context.Background(),
fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
require.NoError(t, err)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
require.NoError(t, err)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, &config, &limits, nil)
require.True(t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()
s.Error(err)
s.Contains(err.Error(), "continue as new")

// TODO insert rows, wait

query := fmt.Sprintf("SELECT * FROM %s.%s.test_exclude_sf ORDER BY id",
s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName)
sfRows, err := s.sfHelper.ExecuteAndProcessQuery(query)
Expand All @@ -1385,4 +1409,5 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion(t *testing.T) {
require.NotEqual(t, field.Name, "c2")
}
require.Equal(t, len(sfRows.Schema.Fields), 4)
// TODO inspect content
}

0 comments on commit c402663

Please sign in to comment.