From 5fc57e6d2f18706067057ac5b0fb32670d2631af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 18 Dec 2023 15:26:45 +0000 Subject: [PATCH] Flatten QRecordSchema Fields (#836) Reduces pointer indirection --- flow/connectors/bigquery/bigquery.go | 2 +- flow/connectors/postgres/qrep_query_executor.go | 4 ++-- flow/connectors/snowflake/avro_file_writer_test.go | 4 ++-- flow/connectors/sql/query_executor.go | 8 ++++---- flow/connectors/utils/stream.go | 2 +- flow/e2e/bigquery/bigquery_helper.go | 8 ++++---- flow/e2e/sqlserver/qrep_flow_sqlserver_test.go | 2 +- flow/e2e/test_utils.go | 2 +- flow/model/qschema.go | 4 ++-- 9 files changed, 18 insertions(+), 18 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 233f319a02..7c2d37a489 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -515,7 +515,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro( tableNameRowsMapping := make(map[string]uint32) recordStream := model.NewQRecordStream(1 << 20) err := recordStream.SetSchema(&model.QRecordSchema{ - Fields: []*model.QField{ + Fields: []model.QField{ { Name: "_peerdb_uid", Type: qvalue.QValueKindString, diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index 2118cec70a..e25ceef09f 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -101,7 +101,7 @@ func (qe *QRepQueryExecutor) executeQueryInTx(tx pgx.Tx, cursorName string, fetc // FieldDescriptionsToSchema converts a slice of pgconn.FieldDescription to a QRecordSchema. func (qe *QRepQueryExecutor) fieldDescriptionsToSchema(fds []pgconn.FieldDescription) *model.QRecordSchema { - qfields := make([]*model.QField, len(fds)) + qfields := make([]model.QField, len(fds)) for i, fd := range fds { cname := fd.Name ctype := postgresOIDToQValueKind(fd.DataTypeOID) @@ -119,7 +119,7 @@ func (qe *QRepQueryExecutor) fieldDescriptionsToSchema(fds []pgconn.FieldDescrip // there isn't a way to know if a column is nullable or not // TODO fix this. cnullable := true - qfields[i] = &model.QField{ + qfields[i] = model.QField{ Name: cname, Type: ctype, Nullable: cnullable, diff --git a/flow/connectors/snowflake/avro_file_writer_test.go b/flow/connectors/snowflake/avro_file_writer_test.go index 76b70f478f..e4733cebec 100644 --- a/flow/connectors/snowflake/avro_file_writer_test.go +++ b/flow/connectors/snowflake/avro_file_writer_test.go @@ -86,7 +86,7 @@ func generateRecords( numKinds := len(allQValueKinds) schema := &model.QRecordSchema{ - Fields: make([]*model.QField, numKinds), + Fields: make([]model.QField, numKinds), } // Create sample records @@ -97,7 +97,7 @@ func generateRecords( } for i, kind := range allQValueKinds { - schema.Fields[i] = &model.QField{ + schema.Fields[i] = model.QField{ Name: string(kind), Type: kind, Nullable: nullable, diff --git a/flow/connectors/sql/query_executor.go b/flow/connectors/sql/query_executor.go index 82a13b691f..0d7fd177d2 100644 --- a/flow/connectors/sql/query_executor.go +++ b/flow/connectors/sql/query_executor.go @@ -141,15 +141,15 @@ func (g *GenericSQLQueryExecutor) CountNonNullRows( return count.Int64, err } -func (g *GenericSQLQueryExecutor) columnTypeToQField(ct *sql.ColumnType) (*model.QField, error) { +func (g *GenericSQLQueryExecutor) columnTypeToQField(ct *sql.ColumnType) (model.QField, error) { qvKind, ok := g.dbtypeToQValueKind[ct.DatabaseTypeName()] if !ok { - return nil, fmt.Errorf("unsupported database type %s", ct.DatabaseTypeName()) + return model.QField{}, fmt.Errorf("unsupported database type %s", ct.DatabaseTypeName()) } nullable, ok := ct.Nullable() - return &model.QField{ + return model.QField{ Name: ct.Name(), Type: qvKind, Nullable: ok && nullable, @@ -163,7 +163,7 @@ func (g *GenericSQLQueryExecutor) processRows(rows *sqlx.Rows) (*model.QRecordBa } // Convert dbColTypes to QFields - qfields := make([]*model.QField, len(dbColTypes)) + qfields := make([]model.QField, len(dbColTypes)) for i, ct := range dbColTypes { qfield, err := g.columnTypeToQField(ct) if err != nil { diff --git a/flow/connectors/utils/stream.go b/flow/connectors/utils/stream.go index 2ef78d33ce..e753001934 100644 --- a/flow/connectors/utils/stream.go +++ b/flow/connectors/utils/stream.go @@ -12,7 +12,7 @@ import ( func RecordsToRawTableStream(req *model.RecordsToStreamRequest) (*model.RecordsToStreamResponse, error) { recordStream := model.NewQRecordStream(1 << 16) err := recordStream.SetSchema(&model.QRecordSchema{ - Fields: []*model.QField{ + Fields: []model.QField{ { Name: "_peerdb_uid", Type: qvalue.QValueKindString, diff --git a/flow/e2e/bigquery/bigquery_helper.go b/flow/e2e/bigquery/bigquery_helper.go index f3508d033e..a89de8dfc3 100644 --- a/flow/e2e/bigquery/bigquery_helper.go +++ b/flow/e2e/bigquery/bigquery_helper.go @@ -270,13 +270,13 @@ func toQValue(bqValue bigquery.Value) (qvalue.QValue, error) { } } -func bqFieldSchemaToQField(fieldSchema *bigquery.FieldSchema) (*model.QField, error) { +func bqFieldSchemaToQField(fieldSchema *bigquery.FieldSchema) (model.QField, error) { qValueKind, err := peer_bq.BigQueryTypeToQValueKind(fieldSchema.Type) if err != nil { - return nil, err + return model.QField{}, err } - return &model.QField{ + return model.QField{ Name: fieldSchema.Name, Type: qValueKind, Nullable: !fieldSchema.Required, @@ -285,7 +285,7 @@ func bqFieldSchemaToQField(fieldSchema *bigquery.FieldSchema) (*model.QField, er // bqSchemaToQRecordSchema converts a bigquery schema to a QRecordSchema. func bqSchemaToQRecordSchema(schema bigquery.Schema) (*model.QRecordSchema, error) { - var fields []*model.QField + var fields []model.QField for _, fieldSchema := range schema { qField, err := bqFieldSchemaToQField(fieldSchema) if err != nil { diff --git a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go index ab15c0862d..2d327458af 100644 --- a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go +++ b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go @@ -120,7 +120,7 @@ func (s *PeerFlowE2ETestSuiteSQLServer) setupPGDestinationTable(tableName string func getSimpleTableSchema() *model.QRecordSchema { return &model.QRecordSchema{ - Fields: []*model.QField{ + Fields: []model.QField{ {Name: "id", Type: qvalue.QValueKindString, Nullable: true}, {Name: "card_id", Type: qvalue.QValueKindString, Nullable: true}, {Name: "v_from", Type: qvalue.QValueKindTimestamp, Nullable: true}, diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index fea19ab808..abd0076d54 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -323,7 +323,7 @@ func RunXminFlowWorkflow(env *testsuite.TestWorkflowEnvironment, config *protos. func GetOwnersSchema() *model.QRecordSchema { return &model.QRecordSchema{ - Fields: []*model.QField{ + Fields: []model.QField{ {Name: "id", Type: qvalue.QValueKindString, Nullable: true}, {Name: "card_id", Type: qvalue.QValueKindString, Nullable: true}, {Name: "from", Type: qvalue.QValueKindTimestamp, Nullable: true}, diff --git a/flow/model/qschema.go b/flow/model/qschema.go index 10de9664a9..5081b10d12 100644 --- a/flow/model/qschema.go +++ b/flow/model/qschema.go @@ -13,11 +13,11 @@ type QField struct { } type QRecordSchema struct { - Fields []*QField + Fields []QField } // NewQRecordSchema creates a new QRecordSchema. -func NewQRecordSchema(fields []*QField) *QRecordSchema { +func NewQRecordSchema(fields []QField) *QRecordSchema { return &QRecordSchema{ Fields: fields, }