Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flatten QRecordSchema Fields #836

Merged
merged 2 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
tableNameRowsMapping := make(map[string]uint32)
recordStream := model.NewQRecordStream(1 << 20)
err := recordStream.SetSchema(&model.QRecordSchema{
Fields: []*model.QField{
Fields: []model.QField{
{
Name: "_peerdb_uid",
Type: qvalue.QValueKindString,
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (qe *QRepQueryExecutor) executeQueryInTx(tx pgx.Tx, cursorName string, fetc

// FieldDescriptionsToSchema converts a slice of pgconn.FieldDescription to a QRecordSchema.
func (qe *QRepQueryExecutor) fieldDescriptionsToSchema(fds []pgconn.FieldDescription) *model.QRecordSchema {
qfields := make([]*model.QField, len(fds))
qfields := make([]model.QField, len(fds))
for i, fd := range fds {
cname := fd.Name
ctype := postgresOIDToQValueKind(fd.DataTypeOID)
Expand All @@ -119,7 +119,7 @@ func (qe *QRepQueryExecutor) fieldDescriptionsToSchema(fds []pgconn.FieldDescrip
// there isn't a way to know if a column is nullable or not
// TODO fix this.
cnullable := true
qfields[i] = &model.QField{
qfields[i] = model.QField{
Name: cname,
Type: ctype,
Nullable: cnullable,
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/snowflake/avro_file_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func generateRecords(
numKinds := len(allQValueKinds)

schema := &model.QRecordSchema{
Fields: make([]*model.QField, numKinds),
Fields: make([]model.QField, numKinds),
}

// Create sample records
Expand All @@ -97,7 +97,7 @@ func generateRecords(
}

for i, kind := range allQValueKinds {
schema.Fields[i] = &model.QField{
schema.Fields[i] = model.QField{
Name: string(kind),
Type: kind,
Nullable: nullable,
Expand Down
8 changes: 4 additions & 4 deletions flow/connectors/sql/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,15 @@ func (g *GenericSQLQueryExecutor) CountNonNullRows(
return count.Int64, err
}

func (g *GenericSQLQueryExecutor) columnTypeToQField(ct *sql.ColumnType) (*model.QField, error) {
func (g *GenericSQLQueryExecutor) columnTypeToQField(ct *sql.ColumnType) (model.QField, error) {
qvKind, ok := g.dbtypeToQValueKind[ct.DatabaseTypeName()]
if !ok {
return nil, fmt.Errorf("unsupported database type %s", ct.DatabaseTypeName())
return model.QField{}, fmt.Errorf("unsupported database type %s", ct.DatabaseTypeName())
}

nullable, ok := ct.Nullable()

return &model.QField{
return model.QField{
Name: ct.Name(),
Type: qvKind,
Nullable: ok && nullable,
Expand All @@ -163,7 +163,7 @@ func (g *GenericSQLQueryExecutor) processRows(rows *sqlx.Rows) (*model.QRecordBa
}

// Convert dbColTypes to QFields
qfields := make([]*model.QField, len(dbColTypes))
qfields := make([]model.QField, len(dbColTypes))
for i, ct := range dbColTypes {
qfield, err := g.columnTypeToQField(ct)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/utils/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func RecordsToRawTableStream(req *model.RecordsToStreamRequest) (*model.RecordsToStreamResponse, error) {
recordStream := model.NewQRecordStream(1 << 16)
err := recordStream.SetSchema(&model.QRecordSchema{
Fields: []*model.QField{
Fields: []model.QField{
{
Name: "_peerdb_uid",
Type: qvalue.QValueKindString,
Expand Down
8 changes: 4 additions & 4 deletions flow/e2e/bigquery/bigquery_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,13 +270,13 @@ func toQValue(bqValue bigquery.Value) (qvalue.QValue, error) {
}
}

func bqFieldSchemaToQField(fieldSchema *bigquery.FieldSchema) (*model.QField, error) {
func bqFieldSchemaToQField(fieldSchema *bigquery.FieldSchema) (model.QField, error) {
qValueKind, err := peer_bq.BigQueryTypeToQValueKind(fieldSchema.Type)
if err != nil {
return nil, err
return model.QField{}, err
}

return &model.QField{
return model.QField{
Name: fieldSchema.Name,
Type: qValueKind,
Nullable: !fieldSchema.Required,
Expand All @@ -285,7 +285,7 @@ func bqFieldSchemaToQField(fieldSchema *bigquery.FieldSchema) (*model.QField, er

// bqSchemaToQRecordSchema converts a bigquery schema to a QRecordSchema.
func bqSchemaToQRecordSchema(schema bigquery.Schema) (*model.QRecordSchema, error) {
var fields []*model.QField
var fields []model.QField
for _, fieldSchema := range schema {
qField, err := bqFieldSchemaToQField(fieldSchema)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/sqlserver/qrep_flow_sqlserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (s *PeerFlowE2ETestSuiteSQLServer) setupPGDestinationTable(tableName string

func getSimpleTableSchema() *model.QRecordSchema {
return &model.QRecordSchema{
Fields: []*model.QField{
Fields: []model.QField{
{Name: "id", Type: qvalue.QValueKindString, Nullable: true},
{Name: "card_id", Type: qvalue.QValueKindString, Nullable: true},
{Name: "v_from", Type: qvalue.QValueKindTimestamp, Nullable: true},
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func RunXminFlowWorkflow(env *testsuite.TestWorkflowEnvironment, config *protos.

func GetOwnersSchema() *model.QRecordSchema {
return &model.QRecordSchema{
Fields: []*model.QField{
Fields: []model.QField{
{Name: "id", Type: qvalue.QValueKindString, Nullable: true},
{Name: "card_id", Type: qvalue.QValueKindString, Nullable: true},
{Name: "from", Type: qvalue.QValueKindTimestamp, Nullable: true},
Expand Down
4 changes: 2 additions & 2 deletions flow/model/qschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ type QField struct {
}

type QRecordSchema struct {
Fields []*QField
Fields []QField
}

// NewQRecordSchema creates a new QRecordSchema.
func NewQRecordSchema(fields []*QField) *QRecordSchema {
func NewQRecordSchema(fields []QField) *QRecordSchema {
return &QRecordSchema{
Fields: fields,
}
Expand Down
Loading