Skip to content

Commit

Permalink
fixing tests pt.4
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Sep 15, 2023
1 parent 3a48fa6 commit dde625a
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 24 deletions.
57 changes: 36 additions & 21 deletions flow/connectors/postgres/postgres_cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,23 @@ func (suite *PostgresCDCTestSuite) insertSimpleRecords(srcTableName string) {
func (suite *PostgresCDCTestSuite) validateInsertedSimpleRecords(records []model.Record, srcTableName string,
dstTableName string) {
suite.Equal(3, len(records))
model.NewRecordItemWithData([]string{"id", "name"},
[]*qvalue.QValue{
{Kind: qvalue.QValueKindInt32, Value: int32(2)},
{Kind: qvalue.QValueKindString, Value: "quick"}})
matchData := []*model.RecordItems{
model.NewRecordItemWithData(map[string]*qvalue.QValue{
"id": {Kind: qvalue.QValueKindInt32, Value: int32(2)},
"name": {Kind: qvalue.QValueKindString, Value: "quick"},
}),
model.NewRecordItemWithData(map[string]*qvalue.QValue{
"id": {Kind: qvalue.QValueKindInt32, Value: int32(4)},
"name": {Kind: qvalue.QValueKindString, Value: "brown"},
}),
model.NewRecordItemWithData(map[string]*qvalue.QValue{
"id": {Kind: qvalue.QValueKindInt32, Value: int32(8)},
"name": {Kind: qvalue.QValueKindString, Value: "fox"},
}),
model.NewRecordItemWithData([]string{"id", "name"},
[]*qvalue.QValue{
{Kind: qvalue.QValueKindInt32, Value: int32(2)},
{Kind: qvalue.QValueKindString, Value: "quick"}}),
model.NewRecordItemWithData([]string{"id", "name"},
[]*qvalue.QValue{
{Kind: qvalue.QValueKindInt32, Value: int32(4)},
{Kind: qvalue.QValueKindString, Value: "brown"}}),
model.NewRecordItemWithData([]string{"id", "name"},
[]*qvalue.QValue{
{Kind: qvalue.QValueKindInt32, Value: int32(8)},
{Kind: qvalue.QValueKindString, Value: "fox"}}),
}
for idx, record := range records {
suite.IsType(&model.InsertRecord{}, record)
Expand Down Expand Up @@ -90,22 +94,22 @@ func (suite *PostgresCDCTestSuite) validateSimpleMutatedRecords(records []model.
updateRecord := records[0].(*model.UpdateRecord)
suite.Equal(srcTableName, updateRecord.SourceTableName)
suite.Equal(dstTableName, updateRecord.DestinationTableName)
suite.Equal(model.RecordItems{}, updateRecord.OldItems)
suite.Equal(model.NewRecordItemWithData([]string{}, []*qvalue.QValue{}), updateRecord.OldItems)

items := model.NewRecordItemWithData(map[string]*qvalue.QValue{
"id": {Kind: qvalue.QValueKindInt32, Value: int32(2)},
"name": {Kind: qvalue.QValueKindString, Value: "slow"},
})
items := model.NewRecordItemWithData([]string{"id", "name"},
[]*qvalue.QValue{
{Kind: qvalue.QValueKindInt32, Value: int32(2)},
{Kind: qvalue.QValueKindString, Value: "slow"}})
suite.Equal(items, updateRecord.NewItems)

suite.IsType(&model.DeleteRecord{}, records[1])
deleteRecord := records[1].(*model.DeleteRecord)
suite.Equal(srcTableName, deleteRecord.SourceTableName)
suite.Equal(dstTableName, deleteRecord.DestinationTableName)
items = model.NewRecordItemWithData(map[string]*qvalue.QValue{
"id": {Kind: qvalue.QValueKindInt32, Value: int32(8)},
"name": {Kind: qvalue.QValueKindInvalid, Value: nil},
})
items = model.NewRecordItemWithData([]string{"id", "name"},
[]*qvalue.QValue{
{Kind: qvalue.QValueKindInt32, Value: int32(8)},
{Kind: qvalue.QValueKindInvalid, Value: nil}})
suite.Equal(items, deleteRecord.Items)
}

Expand Down Expand Up @@ -780,6 +784,17 @@ func (suite *PostgresCDCTestSuite) TestToastHappyFlow() {
RelationMessageMapping: relationMessageMapping,
})
suite.failTestError(err)
recordsWithSchemaDelta, err = suite.connector.PullRecords(&model.PullRecordsRequest{
FlowJobName: toastHappyFlowName,
LastSyncState: nil,
IdleTimeout: 10 * time.Second,
MaxBatchSize: 100,
SrcTableIDNameMapping: relIDTableNameMapping,
TableNameMapping: tableNameMapping,
TableNameSchemaMapping: tableNameSchemaMapping,
RelationMessageMapping: relationMessageMapping,
})
suite.failTestError(err)
suite.Nil(recordsWithSchemaDelta.TableSchemaDelta)
suite.validateInsertedToastRecords(recordsWithSchemaDelta.RecordBatch.Records,
toastHappyFlowSrcTableName, toastHappyFlowDstTableName)
Expand Down
6 changes: 3 additions & 3 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ func NewRecordItems() *RecordItems {
}
}

func NewRecordItemWithData(data map[string]*qvalue.QValue) *RecordItems {
func NewRecordItemWithData(cols []string, val []*qvalue.QValue) *RecordItems {
recordItem := NewRecordItems()
for col, val := range data {
for i, col := range cols {
recordItem.colToValIdx[col] = len(recordItem.values)
recordItem.values = append(recordItem.values, val)
recordItem.values = append(recordItem.values, val[i])
}
return recordItem
}
Expand Down

0 comments on commit dde625a

Please sign in to comment.