diff --git a/flow/connectors/postgres/postgres_schema_delta_test.go b/flow/connectors/postgres/postgres_schema_delta_test.go index 56fab2c918..a95cc7ef24 100644 --- a/flow/connectors/postgres/postgres_schema_delta_test.go +++ b/flow/connectors/postgres/postgres_schema_delta_test.go @@ -96,9 +96,9 @@ func (suite *PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() { suite.failTestError(err) suite.Equal(&protos.TableSchema{ TableIdentifier: tableName, - Columns: map[string]string{ - "id": string(qvalue.QValueKindInt32), - "hi": string(qvalue.QValueKindInt64), + ColumnNameType: []string{ + "id", string(qvalue.QValueKindInt32), + "hi", string(qvalue.QValueKindInt64), }, PrimaryKeyColumns: []string{"id"}, }, output.TableNameSchemaMapping[tableName]) @@ -113,24 +113,24 @@ func (suite *PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() { expectedTableSchema := &protos.TableSchema{ TableIdentifier: tableName, // goal is to test all types we're currently mapping to, not all QValue types - Columns: map[string]string{ - "id": string(qvalue.QValueKindInt32), - "c1": string(qvalue.QValueKindBit), - "c2": string(qvalue.QValueKindBoolean), - "c3": string(qvalue.QValueKindBytes), - "c4": string(qvalue.QValueKindDate), - "c5": string(qvalue.QValueKindFloat32), - "c6": string(qvalue.QValueKindFloat64), - "c7": string(qvalue.QValueKindInt16), - "c8": string(qvalue.QValueKindInt32), - "c9": string(qvalue.QValueKindInt64), - "c10": string(qvalue.QValueKindJSON), - "c11": string(qvalue.QValueKindNumeric), - "c12": string(qvalue.QValueKindString), - "c13": string(qvalue.QValueKindTime), - "c14": string(qvalue.QValueKindTimestamp), - "c15": string(qvalue.QValueKindTimestampTZ), - "c16": string(qvalue.QValueKindUUID), + ColumnNameType: []string{ + "id", string(qvalue.QValueKindInt32), + "c1", string(qvalue.QValueKindBit), + "c2", string(qvalue.QValueKindBoolean), + "c3", string(qvalue.QValueKindBytes), + "c4", string(qvalue.QValueKindDate), + "c5", string(qvalue.QValueKindFloat32), + "c6", string(qvalue.QValueKindFloat64), + "c7", string(qvalue.QValueKindInt16), + "c8", string(qvalue.QValueKindInt32), + "c9", string(qvalue.QValueKindInt64), + "c10", string(qvalue.QValueKindJSON), + "c11", string(qvalue.QValueKindNumeric), + "c12", string(qvalue.QValueKindString), + "c13", string(qvalue.QValueKindTime), + "c14", string(qvalue.QValueKindTimestamp), + "c15", string(qvalue.QValueKindTimestampTZ), + "c16", string(qvalue.QValueKindUUID), }, PrimaryKeyColumns: []string{"id"}, } @@ -166,17 +166,17 @@ func (suite *PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() { expectedTableSchema := &protos.TableSchema{ TableIdentifier: tableName, - Columns: map[string]string{ - "id": string(qvalue.QValueKindInt32), - "c1": string(qvalue.QValueKindString), - "C1": string(qvalue.QValueKindString), - "C 1": string(qvalue.QValueKindString), - "right": string(qvalue.QValueKindString), - "select": string(qvalue.QValueKindString), - "XMIN": string(qvalue.QValueKindString), - "Cariño": string(qvalue.QValueKindString), - "±ªþ³§": string(qvalue.QValueKindString), - "カラム": string(qvalue.QValueKindString), + ColumnNameType: []string{ + "id", string(qvalue.QValueKindInt32), + "c1", string(qvalue.QValueKindString), + "C1", string(qvalue.QValueKindString), + "C 1", string(qvalue.QValueKindString), + "right", string(qvalue.QValueKindString), + "select", string(qvalue.QValueKindString), + "XMIN", string(qvalue.QValueKindString), + "Cariño", string(qvalue.QValueKindString), + "±ªþ³§", string(qvalue.QValueKindString), + "カラム", string(qvalue.QValueKindString), }, PrimaryKeyColumns: []string{"id"}, } @@ -212,11 +212,11 @@ func (suite *PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() { expectedTableSchema := &protos.TableSchema{ TableIdentifier: tableName, - Columns: map[string]string{ - " ": string(qvalue.QValueKindInt32), - " ": string(qvalue.QValueKindString), - " ": string(qvalue.QValueKindInt64), - " ": string(qvalue.QValueKindDate), + ColumnNameType: []string{ + " ", string(qvalue.QValueKindInt32), + " ", string(qvalue.QValueKindString), + " ", string(qvalue.QValueKindInt64), + " ", string(qvalue.QValueKindDate), }, PrimaryKeyColumns: []string{" "}, } diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 53658bf6ff..f2a35c6271 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -147,9 +147,9 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { e2e.NormalizeFlowCountQuery(env, connectionGen, 2) expectedTableSchema := &protos.TableSchema{ TableIdentifier: dstTableName, - Columns: map[string]string{ - "id": string(qvalue.QValueKindInt64), - "c1": string(qvalue.QValueKindInt64), + ColumnNameType: []string{ + "id", string(qvalue.QValueKindInt64), + "c1", string(qvalue.QValueKindInt64), }, PrimaryKeyColumns: []string{"id"}, } @@ -175,10 +175,10 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { e2e.NormalizeFlowCountQuery(env, connectionGen, 4) expectedTableSchema = &protos.TableSchema{ TableIdentifier: dstTableName, - Columns: map[string]string{ - "id": string(qvalue.QValueKindInt64), - "c1": string(qvalue.QValueKindInt64), - "c2": string(qvalue.QValueKindInt64), + ColumnNameType: []string{ + "id", string(qvalue.QValueKindInt64), + "c1", string(qvalue.QValueKindInt64), + "c2", string(qvalue.QValueKindInt64), }, PrimaryKeyColumns: []string{"id"}, } @@ -204,11 +204,11 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { e2e.NormalizeFlowCountQuery(env, connectionGen, 6) expectedTableSchema = &protos.TableSchema{ TableIdentifier: dstTableName, - Columns: map[string]string{ - "id": string(qvalue.QValueKindInt64), - "c1": string(qvalue.QValueKindInt64), - "c2": string(qvalue.QValueKindInt64), - "c3": string(qvalue.QValueKindInt64), + ColumnNameType: []string{ + "id", string(qvalue.QValueKindInt64), + "c1", string(qvalue.QValueKindInt64), + "c2", string(qvalue.QValueKindInt64), + "c3", string(qvalue.QValueKindInt64), }, PrimaryKeyColumns: []string{"id"}, } @@ -234,11 +234,11 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { e2e.NormalizeFlowCountQuery(env, connectionGen, 8) expectedTableSchema = &protos.TableSchema{ TableIdentifier: dstTableName, - Columns: map[string]string{ - "id": string(qvalue.QValueKindInt64), - "c1": string(qvalue.QValueKindInt64), - "c2": string(qvalue.QValueKindInt64), - "c3": string(qvalue.QValueKindInt64), + ColumnNameType: []string{ + "id", string(qvalue.QValueKindInt64), + "c1", string(qvalue.QValueKindInt64), + "c2", string(qvalue.QValueKindInt64), + "c3", string(qvalue.QValueKindInt64), }, PrimaryKeyColumns: []string{"id"}, } diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 314c92d2b5..04cc3de955 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -858,11 +858,11 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { e2e.NormalizeFlowCountQuery(env, connectionGen, 2) expectedTableSchema := &protos.TableSchema{ TableIdentifier: strings.ToUpper(dstTableName), - Columns: map[string]string{ - "ID": string(qvalue.QValueKindNumeric), - "C1": string(qvalue.QValueKindNumeric), - "_PEERDB_IS_DELETED": string(qvalue.QValueKindBoolean), - "_PEERDB_SYNCED_AT": string(qvalue.QValueKindTimestamp), + ColumnNameType: []string{ + "ID", string(qvalue.QValueKindNumeric), + "C1", string(qvalue.QValueKindNumeric), + "_PEERDB_IS_DELETED", string(qvalue.QValueKindBoolean), + "_PEERDB_SYNCED_AT", string(qvalue.QValueKindTimestamp), }, } output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ @@ -886,12 +886,12 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { e2e.NormalizeFlowCountQuery(env, connectionGen, 4) expectedTableSchema = &protos.TableSchema{ TableIdentifier: strings.ToUpper(dstTableName), - Columns: map[string]string{ - "ID": string(qvalue.QValueKindNumeric), - "C1": string(qvalue.QValueKindNumeric), - "C2": string(qvalue.QValueKindNumeric), - "_PEERDB_IS_DELETED": string(qvalue.QValueKindBoolean), - "_PEERDB_SYNCED_AT": string(qvalue.QValueKindTimestamp), + ColumnNameType: []string{ + "ID", string(qvalue.QValueKindNumeric), + "C1", string(qvalue.QValueKindNumeric), + "C2", string(qvalue.QValueKindNumeric), + "_PEERDB_IS_DELETED", string(qvalue.QValueKindBoolean), + "_PEERDB_SYNCED_AT", string(qvalue.QValueKindTimestamp), }, } output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ @@ -915,13 +915,13 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { e2e.NormalizeFlowCountQuery(env, connectionGen, 6) expectedTableSchema = &protos.TableSchema{ TableIdentifier: strings.ToUpper(dstTableName), - Columns: map[string]string{ - "ID": string(qvalue.QValueKindNumeric), - "C1": string(qvalue.QValueKindNumeric), - "C2": string(qvalue.QValueKindNumeric), - "C3": string(qvalue.QValueKindNumeric), - "_PEERDB_IS_DELETED": string(qvalue.QValueKindBoolean), - "_PEERDB_SYNCED_AT": string(qvalue.QValueKindTimestamp), + ColumnNameType: []string{ + "ID", string(qvalue.QValueKindNumeric), + "C1", string(qvalue.QValueKindNumeric), + "C2", string(qvalue.QValueKindNumeric), + "C3", string(qvalue.QValueKindNumeric), + "_PEERDB_IS_DELETED", string(qvalue.QValueKindBoolean), + "_PEERDB_SYNCED_AT", string(qvalue.QValueKindTimestamp), }, } output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ @@ -945,13 +945,13 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { e2e.NormalizeFlowCountQuery(env, connectionGen, 8) expectedTableSchema = &protos.TableSchema{ TableIdentifier: strings.ToUpper(dstTableName), - Columns: map[string]string{ - "ID": string(qvalue.QValueKindNumeric), - "C1": string(qvalue.QValueKindNumeric), - "C2": string(qvalue.QValueKindNumeric), - "C3": string(qvalue.QValueKindNumeric), - "_PEERDB_IS_DELETED": string(qvalue.QValueKindBoolean), - "_PEERDB_SYNCED_AT": string(qvalue.QValueKindTimestamp), + ColumnNameType: []string{ + "ID", string(qvalue.QValueKindNumeric), + "C1", string(qvalue.QValueKindNumeric), + "C2", string(qvalue.QValueKindNumeric), + "C3", string(qvalue.QValueKindNumeric), + "_PEERDB_IS_DELETED", string(qvalue.QValueKindBoolean), + "_PEERDB_SYNCED_AT", string(qvalue.QValueKindTimestamp), }, } output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ @@ -1655,7 +1655,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() { `, s.pgSuffix, "testMixedCase"), testKey, testValue) require.NoError(s.t, err) } - fmt.Println("Inserted 20 rows into the source table") + s.t.Log("Inserted 20 rows into the source table") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) diff --git a/flow/e2e/snowflake/snowflake_schema_delta_test.go b/flow/e2e/snowflake/snowflake_schema_delta_test.go index e4aa99cf08..0a7c2451c6 100644 --- a/flow/e2e/snowflake/snowflake_schema_delta_test.go +++ b/flow/e2e/snowflake/snowflake_schema_delta_test.go @@ -81,9 +81,9 @@ func (suite SnowflakeSchemaDeltaTestSuite) TestSimpleAddColumn() { suite.failTestError(err) suite.Equal(&protos.TableSchema{ TableIdentifier: tableName, - Columns: map[string]string{ - "ID": string(qvalue.QValueKindString), - "HI": string(qvalue.QValueKindJSON), + ColumnNameType: []string{ + "ID", string(qvalue.QValueKindString), + "HI", string(qvalue.QValueKindJSON), }, }, output.TableNameSchemaMapping[tableName]) } @@ -96,18 +96,18 @@ func (suite SnowflakeSchemaDeltaTestSuite) TestAddAllColumnTypes() { expectedTableSchema := &protos.TableSchema{ TableIdentifier: tableName, // goal is to test all types we're currently mapping to, not all QValue types - Columns: map[string]string{ - "ID": string(qvalue.QValueKindString), - "C1": string(qvalue.QValueKindBoolean), - "C2": string(qvalue.QValueKindBytes), - "C3": string(qvalue.QValueKindDate), - "C4": string(qvalue.QValueKindFloat64), - "C5": string(qvalue.QValueKindJSON), - "C6": string(qvalue.QValueKindNumeric), - "C7": string(qvalue.QValueKindString), - "C8": string(qvalue.QValueKindTime), - "C9": string(qvalue.QValueKindTimestamp), - "C10": string(qvalue.QValueKindTimestampTZ), + ColumnNameType: []string{ + "ID", string(qvalue.QValueKindString), + "C1", string(qvalue.QValueKindBoolean), + "C2", string(qvalue.QValueKindBytes), + "C3", string(qvalue.QValueKindDate), + "C4", string(qvalue.QValueKindFloat64), + "C5", string(qvalue.QValueKindJSON), + "C6", string(qvalue.QValueKindNumeric), + "C7", string(qvalue.QValueKindString), + "C8", string(qvalue.QValueKindTime), + "C9", string(qvalue.QValueKindTimestamp), + "C10", string(qvalue.QValueKindTimestampTZ), }, } addedColumns := make([]*protos.DeltaAddedColumn, 0) @@ -142,16 +142,16 @@ func (suite SnowflakeSchemaDeltaTestSuite) TestAddTrickyColumnNames() { expectedTableSchema := &protos.TableSchema{ TableIdentifier: tableName, // strings.ToUpper also does Unicode uppercasing :) - Columns: map[string]string{ - "ID": string(qvalue.QValueKindString), - "C1": string(qvalue.QValueKindString), - "C 1": string(qvalue.QValueKindString), - "RIGHT": string(qvalue.QValueKindString), - "SELECT": string(qvalue.QValueKindString), - "XMIN": string(qvalue.QValueKindString), - "CARIÑO": string(qvalue.QValueKindString), - "±ªÞ³§": string(qvalue.QValueKindString), - "カラム": string(qvalue.QValueKindString), + ColumnNameType: []string{ + "ID", string(qvalue.QValueKindString), + "C1", string(qvalue.QValueKindString), + "C 1", string(qvalue.QValueKindString), + "RIGHT", string(qvalue.QValueKindString), + "SELECT", string(qvalue.QValueKindString), + "XMIN", string(qvalue.QValueKindString), + "CARIÑO", string(qvalue.QValueKindString), + "±ªÞ³§", string(qvalue.QValueKindString), + "カラム", string(qvalue.QValueKindString), }, } addedColumns := make([]*protos.DeltaAddedColumn, 0) @@ -185,11 +185,11 @@ func (suite SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() { expectedTableSchema := &protos.TableSchema{ TableIdentifier: tableName, - Columns: map[string]string{ - " ": string(qvalue.QValueKindString), - " ": string(qvalue.QValueKindString), - " ": string(qvalue.QValueKindTime), - " ": string(qvalue.QValueKindDate), + ColumnNameType: []string{ + " ", string(qvalue.QValueKindString), + " ", string(qvalue.QValueKindString), + " ", string(qvalue.QValueKindTime), + " ", string(qvalue.QValueKindDate), }, } addedColumns := make([]*protos.DeltaAddedColumn, 0)