Skip to content

Commit

Permalink
Flatten QRecordSchema Fields (#836)
Browse files Browse the repository at this point in the history
Reduces pointer indirection
  • Loading branch information
serprex authored Dec 18, 2023
1 parent 860d0a5 commit 5fc57e6
Show file tree
Hide file tree
Showing 9 changed files with 18 additions and 18 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
tableNameRowsMapping := make(map[string]uint32)
recordStream := model.NewQRecordStream(1 << 20)
err := recordStream.SetSchema(&model.QRecordSchema{
Fields: []*model.QField{
Fields: []model.QField{
{
Name: "_peerdb_uid",
Type: qvalue.QValueKindString,
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (qe *QRepQueryExecutor) executeQueryInTx(tx pgx.Tx, cursorName string, fetc

// FieldDescriptionsToSchema converts a slice of pgconn.FieldDescription to a QRecordSchema.
func (qe *QRepQueryExecutor) fieldDescriptionsToSchema(fds []pgconn.FieldDescription) *model.QRecordSchema {
qfields := make([]*model.QField, len(fds))
qfields := make([]model.QField, len(fds))
for i, fd := range fds {
cname := fd.Name
ctype := postgresOIDToQValueKind(fd.DataTypeOID)
Expand All @@ -119,7 +119,7 @@ func (qe *QRepQueryExecutor) fieldDescriptionsToSchema(fds []pgconn.FieldDescrip
// there isn't a way to know if a column is nullable or not
// TODO fix this.
cnullable := true
qfields[i] = &model.QField{
qfields[i] = model.QField{
Name: cname,
Type: ctype,
Nullable: cnullable,
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/snowflake/avro_file_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func generateRecords(
numKinds := len(allQValueKinds)

schema := &model.QRecordSchema{
Fields: make([]*model.QField, numKinds),
Fields: make([]model.QField, numKinds),
}

// Create sample records
Expand All @@ -97,7 +97,7 @@ func generateRecords(
}

for i, kind := range allQValueKinds {
schema.Fields[i] = &model.QField{
schema.Fields[i] = model.QField{
Name: string(kind),
Type: kind,
Nullable: nullable,
Expand Down
8 changes: 4 additions & 4 deletions flow/connectors/sql/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,15 @@ func (g *GenericSQLQueryExecutor) CountNonNullRows(
return count.Int64, err
}

func (g *GenericSQLQueryExecutor) columnTypeToQField(ct *sql.ColumnType) (*model.QField, error) {
func (g *GenericSQLQueryExecutor) columnTypeToQField(ct *sql.ColumnType) (model.QField, error) {
qvKind, ok := g.dbtypeToQValueKind[ct.DatabaseTypeName()]
if !ok {
return nil, fmt.Errorf("unsupported database type %s", ct.DatabaseTypeName())
return model.QField{}, fmt.Errorf("unsupported database type %s", ct.DatabaseTypeName())
}

nullable, ok := ct.Nullable()

return &model.QField{
return model.QField{
Name: ct.Name(),
Type: qvKind,
Nullable: ok && nullable,
Expand All @@ -163,7 +163,7 @@ func (g *GenericSQLQueryExecutor) processRows(rows *sqlx.Rows) (*model.QRecordBa
}

// Convert dbColTypes to QFields
qfields := make([]*model.QField, len(dbColTypes))
qfields := make([]model.QField, len(dbColTypes))
for i, ct := range dbColTypes {
qfield, err := g.columnTypeToQField(ct)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/utils/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func RecordsToRawTableStream(req *model.RecordsToStreamRequest) (*model.RecordsToStreamResponse, error) {
recordStream := model.NewQRecordStream(1 << 16)
err := recordStream.SetSchema(&model.QRecordSchema{
Fields: []*model.QField{
Fields: []model.QField{
{
Name: "_peerdb_uid",
Type: qvalue.QValueKindString,
Expand Down
8 changes: 4 additions & 4 deletions flow/e2e/bigquery/bigquery_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,13 +270,13 @@ func toQValue(bqValue bigquery.Value) (qvalue.QValue, error) {
}
}

func bqFieldSchemaToQField(fieldSchema *bigquery.FieldSchema) (*model.QField, error) {
func bqFieldSchemaToQField(fieldSchema *bigquery.FieldSchema) (model.QField, error) {
qValueKind, err := peer_bq.BigQueryTypeToQValueKind(fieldSchema.Type)
if err != nil {
return nil, err
return model.QField{}, err
}

return &model.QField{
return model.QField{
Name: fieldSchema.Name,
Type: qValueKind,
Nullable: !fieldSchema.Required,
Expand All @@ -285,7 +285,7 @@ func bqFieldSchemaToQField(fieldSchema *bigquery.FieldSchema) (*model.QField, er

// bqSchemaToQRecordSchema converts a bigquery schema to a QRecordSchema.
func bqSchemaToQRecordSchema(schema bigquery.Schema) (*model.QRecordSchema, error) {
var fields []*model.QField
var fields []model.QField
for _, fieldSchema := range schema {
qField, err := bqFieldSchemaToQField(fieldSchema)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/sqlserver/qrep_flow_sqlserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (s *PeerFlowE2ETestSuiteSQLServer) setupPGDestinationTable(tableName string

func getSimpleTableSchema() *model.QRecordSchema {
return &model.QRecordSchema{
Fields: []*model.QField{
Fields: []model.QField{
{Name: "id", Type: qvalue.QValueKindString, Nullable: true},
{Name: "card_id", Type: qvalue.QValueKindString, Nullable: true},
{Name: "v_from", Type: qvalue.QValueKindTimestamp, Nullable: true},
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func RunXminFlowWorkflow(env *testsuite.TestWorkflowEnvironment, config *protos.

func GetOwnersSchema() *model.QRecordSchema {
return &model.QRecordSchema{
Fields: []*model.QField{
Fields: []model.QField{
{Name: "id", Type: qvalue.QValueKindString, Nullable: true},
{Name: "card_id", Type: qvalue.QValueKindString, Nullable: true},
{Name: "from", Type: qvalue.QValueKindTimestamp, Nullable: true},
Expand Down
4 changes: 2 additions & 2 deletions flow/model/qschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ type QField struct {
}

type QRecordSchema struct {
Fields []*QField
Fields []QField
}

// NewQRecordSchema creates a new QRecordSchema.
func NewQRecordSchema(fields []*QField) *QRecordSchema {
func NewQRecordSchema(fields []QField) *QRecordSchema {
return &QRecordSchema{
Fields: fields,
}
Expand Down

0 comments on commit 5fc57e6

Please sign in to comment.