diff --git a/flow/connectors/postgres/postgres_cdc_test.go b/flow/connectors/postgres/postgres_cdc_test.go index 11659b43d5..cacc46ec17 100644 --- a/flow/connectors/postgres/postgres_cdc_test.go +++ b/flow/connectors/postgres/postgres_cdc_test.go @@ -40,19 +40,23 @@ func (suite *PostgresCDCTestSuite) insertSimpleRecords(srcTableName string) { func (suite *PostgresCDCTestSuite) validateInsertedSimpleRecords(records []model.Record, srcTableName string, dstTableName string) { suite.Equal(3, len(records)) + model.NewRecordItemWithData([]string{"id", "name"}, + []*qvalue.QValue{ + {Kind: qvalue.QValueKindInt32, Value: int32(2)}, + {Kind: qvalue.QValueKindString, Value: "quick"}}) matchData := []*model.RecordItems{ - model.NewRecordItemWithData(map[string]*qvalue.QValue{ - "id": {Kind: qvalue.QValueKindInt32, Value: int32(2)}, - "name": {Kind: qvalue.QValueKindString, Value: "quick"}, - }), - model.NewRecordItemWithData(map[string]*qvalue.QValue{ - "id": {Kind: qvalue.QValueKindInt32, Value: int32(4)}, - "name": {Kind: qvalue.QValueKindString, Value: "brown"}, - }), - model.NewRecordItemWithData(map[string]*qvalue.QValue{ - "id": {Kind: qvalue.QValueKindInt32, Value: int32(8)}, - "name": {Kind: qvalue.QValueKindString, Value: "fox"}, - }), + model.NewRecordItemWithData([]string{"id", "name"}, + []*qvalue.QValue{ + {Kind: qvalue.QValueKindInt32, Value: int32(2)}, + {Kind: qvalue.QValueKindString, Value: "quick"}}), + model.NewRecordItemWithData([]string{"id", "name"}, + []*qvalue.QValue{ + {Kind: qvalue.QValueKindInt32, Value: int32(4)}, + {Kind: qvalue.QValueKindString, Value: "brown"}}), + model.NewRecordItemWithData([]string{"id", "name"}, + []*qvalue.QValue{ + {Kind: qvalue.QValueKindInt32, Value: int32(8)}, + {Kind: qvalue.QValueKindString, Value: "fox"}}), } for idx, record := range records { suite.IsType(&model.InsertRecord{}, record) @@ -90,22 +94,22 @@ func (suite *PostgresCDCTestSuite) validateSimpleMutatedRecords(records []model. updateRecord := records[0].(*model.UpdateRecord) suite.Equal(srcTableName, updateRecord.SourceTableName) suite.Equal(dstTableName, updateRecord.DestinationTableName) - suite.Equal(model.RecordItems{}, updateRecord.OldItems) + suite.Equal(model.NewRecordItemWithData([]string{}, []*qvalue.QValue{}), updateRecord.OldItems) - items := model.NewRecordItemWithData(map[string]*qvalue.QValue{ - "id": {Kind: qvalue.QValueKindInt32, Value: int32(2)}, - "name": {Kind: qvalue.QValueKindString, Value: "slow"}, - }) + items := model.NewRecordItemWithData([]string{"id", "name"}, + []*qvalue.QValue{ + {Kind: qvalue.QValueKindInt32, Value: int32(2)}, + {Kind: qvalue.QValueKindString, Value: "slow"}}) suite.Equal(items, updateRecord.NewItems) suite.IsType(&model.DeleteRecord{}, records[1]) deleteRecord := records[1].(*model.DeleteRecord) suite.Equal(srcTableName, deleteRecord.SourceTableName) suite.Equal(dstTableName, deleteRecord.DestinationTableName) - items = model.NewRecordItemWithData(map[string]*qvalue.QValue{ - "id": {Kind: qvalue.QValueKindInt32, Value: int32(8)}, - "name": {Kind: qvalue.QValueKindInvalid, Value: nil}, - }) + items = model.NewRecordItemWithData([]string{"id", "name"}, + []*qvalue.QValue{ + {Kind: qvalue.QValueKindInt32, Value: int32(8)}, + {Kind: qvalue.QValueKindInvalid, Value: nil}}) suite.Equal(items, deleteRecord.Items) } @@ -780,6 +784,17 @@ func (suite *PostgresCDCTestSuite) TestToastHappyFlow() { RelationMessageMapping: relationMessageMapping, }) suite.failTestError(err) + recordsWithSchemaDelta, err = suite.connector.PullRecords(&model.PullRecordsRequest{ + FlowJobName: toastHappyFlowName, + LastSyncState: nil, + IdleTimeout: 10 * time.Second, + MaxBatchSize: 100, + SrcTableIDNameMapping: relIDTableNameMapping, + TableNameMapping: tableNameMapping, + TableNameSchemaMapping: tableNameSchemaMapping, + RelationMessageMapping: relationMessageMapping, + }) + suite.failTestError(err) suite.Nil(recordsWithSchemaDelta.TableSchemaDelta) suite.validateInsertedToastRecords(recordsWithSchemaDelta.RecordBatch.Records, toastHappyFlowSrcTableName, toastHappyFlowDstTableName) diff --git a/flow/model/model.go b/flow/model/model.go index 3984ad2848..64582127a9 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -57,11 +57,11 @@ func NewRecordItems() *RecordItems { } } -func NewRecordItemWithData(data map[string]*qvalue.QValue) *RecordItems { +func NewRecordItemWithData(cols []string, val []*qvalue.QValue) *RecordItems { recordItem := NewRecordItems() - for col, val := range data { + for i, col := range cols { recordItem.colToValIdx[col] = len(recordItem.values) - recordItem.values = append(recordItem.values, val) + recordItem.values = append(recordItem.values, val[i]) } return recordItem }