fix tests and increase capacity
Amogh-Bharadwaj committed Dec 18, 2023
1 parent beb55b5 commit 11add23
Showing 7 changed files with 20 additions and 16 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/bigquery.go
@@ -966,7 +966,7 @@ func (c *BigQueryConnector) SetupNormalizedTables(
}

// convert the column names and types to bigquery types
-columns := make([]*bigquery.FieldSchema, len(tableSchema.Columns))
+columns := make([]*bigquery.FieldSchema, len(tableSchema.Columns), len(tableSchema.Columns)+2)
idx := 0
for colName, genericColType := range tableSchema.Columns {
columns[idx] = &bigquery.FieldSchema{
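Note on the capacity bump in SetupNormalizedTables: the connector later appends the PeerDB metadata fields (_PEERDB_IS_DELETED and _PEERDB_SYNCED_AT, the column names exercised by the tests in this commit), so allocating len(Columns)+2 capacity up front lets those appends happen without growing the slice. A minimal sketch of the pattern, assuming the two metadata fields are appended after the loop; the field types used for them here are illustrative, not the connector's exact code:

package main

import (
	"fmt"

	"cloud.google.com/go/bigquery"
)

func main() {
	// Stand-in for tableSchema.Columns: three user-defined columns.
	userColumns := []string{"id", "t1", "k"}

	// The length covers the user columns (filled by index in the loop); the +2
	// capacity reserves room for the two PeerDB metadata fields appended later.
	columns := make([]*bigquery.FieldSchema, len(userColumns), len(userColumns)+2)
	for idx, colName := range userColumns {
		columns[idx] = &bigquery.FieldSchema{Name: colName, Type: bigquery.StringFieldType}
	}

	columns = append(columns,
		&bigquery.FieldSchema{Name: "_PEERDB_IS_DELETED", Type: bigquery.BooleanFieldType},
		&bigquery.FieldSchema{Name: "_PEERDB_SYNCED_AT", Type: bigquery.TimestampFieldType},
	)

	fmt.Println(len(columns), cap(columns)) // prints "5 5": both appends fit in the reserved capacity
}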
2 changes: 1 addition & 1 deletion flow/connectors/postgres/client.go
@@ -352,7 +352,7 @@ func generateCreateTableSQLForNormalizedTable(
softDeleteColName string,
syncedAtColName string,
) string {
-createTableSQLArray := make([]string, 0, len(sourceTableSchema.Columns))
+createTableSQLArray := make([]string, 0, len(sourceTableSchema.Columns)+2)
for columnName, genericColumnType := range sourceTableSchema.Columns {
createTableSQLArray = append(createTableSQLArray, fmt.Sprintf("\"%s\" %s,", columnName,
qValueKindToPostgresType(genericColumnType)))
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/snowflake.go
@@ -751,7 +751,7 @@ func generateCreateTableSQLForNormalizedTable(
softDeleteColName string,
syncedAtColName string,
) string {
-createTableSQLArray := make([]string, 0, len(sourceTableSchema.Columns))
+createTableSQLArray := make([]string, 0, len(sourceTableSchema.Columns)+2)
for columnName, genericColumnType := range sourceTableSchema.Columns {
columnNameUpper := strings.ToUpper(columnName)
sfColType, err := qValueKindToSnowflakeType(qvalue.QValueKind(genericColumnType))
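The Postgres and Snowflake connectors above use the other slice idiom: the DDL array starts at length 0 and is filled with append, so here it is the capacity, not the length, that needs the +2 headroom for the soft-delete and synced-at column definitions. A rough self-contained sketch; the DDL text for the two PeerDB columns is illustrative only:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Stand-in for sourceTableSchema.Columns.
	columns := map[string]string{"id": "BIGINT", "t1": "TEXT"}

	// Zero length, capacity len+2: every append below, including the two
	// PeerDB columns, lands in the backing array allocated here.
	createTableSQLArray := make([]string, 0, len(columns)+2)
	for columnName, genericColumnType := range columns {
		createTableSQLArray = append(createTableSQLArray,
			fmt.Sprintf("\"%s\" %s,", columnName, genericColumnType))
	}
	createTableSQLArray = append(createTableSQLArray,
		"\"_PEERDB_IS_DELETED\" BOOLEAN DEFAULT FALSE,",
		"\"_PEERDB_SYNCED_AT\" TIMESTAMP,",
	)
	fmt.Println(strings.Join(createTableSQLArray, " "))
}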
15 changes: 7 additions & 8 deletions flow/e2e/bigquery/peer_flow_bq_test.go
@@ -52,9 +52,10 @@ func (s PeerFlowE2ETestSuiteBQ) attachSuffix(input string) string {
return fmt.Sprintf("%s_%s", input, s.bqSuffix)
}

-func (s *PeerFlowE2ETestSuiteBQ) checkPeerdbColumns(dstSchemaQualified string, rowID int8) error {
+func (s *PeerFlowE2ETestSuiteBQ) checkPeerdbColumns(dstQualified string, rowID int8) error {
+qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, dstQualified)
query := fmt.Sprintf("SELECT `_PEERDB_IS_DELETED`,`_PEERDB_SYNCED_AT` FROM %s WHERE id = %d",
-dstSchemaQualified, rowID)
+qualifiedTableName, rowID)

recordBatch, err := s.bqHelper.ExecuteAndProcessQuery(query)
if err != nil {
@@ -456,7 +457,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() {
// allow only continue as new error
require.Contains(s.t, err.Error(), "continue as new")

-s.compareTableContentsBQ(dstTableName, "id,t1,t2,k,_PEERDB_IS_DELETED, _PEERDB_SYNCED_AT")
+s.compareTableContentsBQ(dstTableName, "id,t1,t2,k")
env.AssertExpectations(s.t)
<-done
}
@@ -1133,13 +1134,12 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
env.AssertExpectations(s.t)
}

-func (s *PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns() {
+func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env, s.t)

srcTableName := s.attachSchemaSuffix("test_peerdb_cols")
-dstTableName := s.attachSchemaSuffix("test_peerdb_cols_dst")
-
+dstTableName := "test_peerdb_cols_dst"
_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id SERIAL PRIMARY KEY,
@@ -1154,6 +1154,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns() {
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.bqHelper.Peer,
+SoftDelete: true,
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
@@ -1179,7 +1180,6 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns() {
DELETE FROM %s WHERE id=1
`, srcTableName))
require.NoError(s.t, err)
fmt.Println("Inserted 10 rows into the source table")
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
@@ -1189,7 +1189,6 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns() {
err = env.GetWorkflowError()

// allow only continue as new error
-s.Error(err)
require.Contains(s.t, err.Error(), "continue as new")

err = s.checkPeerdbColumns(dstTableName, 1)
7 changes: 5 additions & 2 deletions flow/e2e/congen.go
@@ -171,6 +171,7 @@ type FlowConnectionGenerationConfig struct {
PostgresPort int
Destination *protos.Peer
CdcStagingPath string
+SoftDelete bool
}

// GenerateSnowflakePeer generates a snowflake peer config for testing.
@@ -201,8 +202,10 @@ func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs() (*proto
ret.Source = GeneratePostgresPeer(c.PostgresPort)
ret.Destination = c.Destination
ret.CdcStagingPath = c.CdcStagingPath
-ret.SoftDelete = true
-ret.SoftDeleteColName = "_PEERDB_IS_DELETED"
+ret.SoftDelete = c.SoftDelete
+if ret.SoftDelete {
+ret.SoftDeleteColName = "_PEERDB_IS_DELETED"
+}
ret.SyncedAtColName = "_PEERDB_SYNCED_AT"
return ret, nil
}
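Net effect of the congen.go change: soft delete is now opt-in per test through the new SoftDelete field, and _PEERDB_IS_DELETED is only configured when a test asks for it, while _PEERDB_SYNCED_AT stays on for every generated flow config. A stripped-down sketch of that conditional, using stand-in types rather than the repo's protos:

package main

import "fmt"

// flowConfig stands in for the generated flow connection config; only the
// fields this commit touches are modeled here.
type flowConfig struct {
	SoftDelete        bool
	SoftDeleteColName string
	SyncedAtColName   string
}

// generate mirrors the changed tail of GenerateFlowConnectionConfigs.
func generate(softDelete bool) flowConfig {
	ret := flowConfig{SoftDelete: softDelete}
	if ret.SoftDelete {
		ret.SoftDeleteColName = "_PEERDB_IS_DELETED"
	}
	ret.SyncedAtColName = "_PEERDB_SYNCED_AT" // always set, matching the diff
	return ret
}

func main() {
	fmt.Printf("%+v\n", generate(true))  // tests that opt in with SoftDelete: true
	fmt.Printf("%+v\n", generate(false)) // all other flows: no soft-delete column name
}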
1 change: 1 addition & 0 deletions flow/e2e/postgres/peer_flow_pg_test.go
@@ -518,6 +518,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() {
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.peer,
+SoftDelete: true,
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
7 changes: 4 additions & 3 deletions flow/e2e/snowflake/peer_flow_sf_test.go
@@ -1176,8 +1176,9 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() {
Exclude: []string{"c2"},
},
},
-Source: e2e.GeneratePostgresPeer(e2e.PostgresPort),
-CdcStagingPath: connectionGen.CdcStagingPath,
+Source: e2e.GeneratePostgresPeer(e2e.PostgresPort),
+CdcStagingPath: connectionGen.CdcStagingPath,
+SyncedAtColName: "_PEERDB_SYNCED_AT",
}

limits := peerflow.CDCFlowLimits{
@@ -1221,7 +1222,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() {
for _, field := range sfRows.Schema.Fields {
require.NotEqual(s.t, field.Name, "c2")
}
-s.Equal(4, len(sfRows.Schema.Fields))
+s.Equal(5, len(sfRows.Schema.Fields))
s.Equal(10, len(sfRows.Records))
}

