
Merge remote-tracking branch 'origin/main' into golang-ci-enable-all
serprex committed Dec 18, 2023
2 parents 221b907 + 52a013c commit 429d730
Showing 23 changed files with 639 additions and 612 deletions.
6 changes: 3 additions & 3 deletions flow/connectors/bigquery/bigquery.go
@@ -498,7 +498,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
tableNameRowsMapping := make(map[string]uint32)
recordStream := model.NewQRecordStream(1 << 20)
err := recordStream.SetSchema(&model.QRecordSchema{
-Fields: []*model.QField{
+Fields: []model.QField{
{
Name: "_peerdb_uid",
Type: qvalue.QValueKindString,
@@ -673,8 +673,8 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
Kind: qvalue.QValueKindInt64,
Value: syncBatchID,
}
-recordStream.Records <- &model.QRecordOrError{
-Record: &model.QRecord{
+recordStream.Records <- model.QRecordOrError{
+Record: model.QRecord{
NumEntries: 10,
Entries: entries[:],
},
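The common thread in this file and the ones below is dropping a level of indirection: fields, records, and channel payloads become plain values (`[]model.QField`, `model.QRecordOrError`) instead of pointers. A minimal sketch of the before/after shape, using simplified stand-in structs rather than PeerDB's actual model types:

```go
package main

import "fmt"

// Simplified stand-ins for model.QField and model.QRecordOrError;
// the field sets here are illustrative, not PeerDB's real definitions.
type QField struct {
	Name     string
	Nullable bool
}

type QRecordOrError struct {
	Record []any
	Err    error
}

func main() {
	// Before: slices carried pointers, so each element was a separate
	// heap allocation reached through an extra indirection.
	oldFields := []*QField{{Name: "_peerdb_uid"}}

	// After: the same data as plain values, stored contiguously in the slice.
	newFields := []QField{{Name: "_peerdb_uid"}}

	// Channel sends follow the same pattern: the struct value is copied
	// into the channel instead of sharing a pointer with the producer.
	records := make(chan QRecordOrError, 1)
	records <- QRecordOrError{Record: []any{"uid", int64(42)}}

	fmt.Println(oldFields[0].Name, newFields[0].Name, (<-records).Record)
}
```

With value elements the slice owns its structs contiguously and a channel send copies the payload, so producers and consumers no longer share mutable state through a pointer.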
30 changes: 15 additions & 15 deletions flow/connectors/postgres/qrep_query_executor.go
@@ -101,7 +101,7 @@ func (qe *QRepQueryExecutor) executeQueryInTx(tx pgx.Tx, cursorName string, fetc

// FieldDescriptionsToSchema converts a slice of pgconn.FieldDescription to a QRecordSchema.
func (qe *QRepQueryExecutor) fieldDescriptionsToSchema(fds []pgconn.FieldDescription) *model.QRecordSchema {
-qfields := make([]*model.QField, len(fds))
+qfields := make([]model.QField, len(fds))
for i, fd := range fds {
cname := fd.Name
ctype := postgresOIDToQValueKind(fd.DataTypeOID)
@@ -119,7 +119,7 @@ func (qe *QRepQueryExecutor) fieldDescriptionsToSchema(fds []pgconn.FieldDescrip
// there isn't a way to know if a column is nullable or not
// TODO fix this.
cnullable := true
-qfields[i] = &model.QField{
+qfields[i] = model.QField{
Name: cname,
Type: ctype,
Nullable: cnullable,
@@ -133,7 +133,7 @@ func (qe *QRepQueryExecutor) ProcessRows(
fieldDescriptions []pgconn.FieldDescription,
) (*model.QRecordBatch, error) {
// Initialize the record slice
-records := make([]*model.QRecord, 0)
+records := make([]model.QRecord, 0)
qe.logger.Info("Processing rows")
// Iterate over the rows
for rows.Next() {
@@ -173,13 +173,13 @@ func (qe *QRepQueryExecutor) processRowsStream(
for rows.Next() {
record, err := mapRowToQRecord(rows, fieldDescriptions, qe.customTypeMap)
if err != nil {
-stream.Records <- &model.QRecordOrError{
+stream.Records <- model.QRecordOrError{
Err: fmt.Errorf("failed to map row to QRecord: %w", err),
}
return 0, fmt.Errorf("failed to map row to QRecord: %w", err)
}

-stream.Records <- &model.QRecordOrError{
+stream.Records <- model.QRecordOrError{
Record: record,
Err: nil,
}
@@ -214,7 +214,7 @@ func (qe *QRepQueryExecutor) processFetchedRows(
) (int, error) {
rows, err := qe.executeQueryInTx(tx, cursorName, fetchSize)
if err != nil {
-stream.Records <- &model.QRecordOrError{
+stream.Records <- model.QRecordOrError{
Err: err,
}
qe.logger.Error("[pg_query_executor] failed to execute query in tx",
@@ -239,7 +239,7 @@ func (qe *QRepQueryExecutor) processFetchedRows(
rows.Close()

if rows.Err() != nil {
-stream.Records <- &model.QRecordOrError{
+stream.Records <- model.QRecordOrError{
Err: rows.Err(),
}
qe.logger.Error("[pg_query_executor] row iteration failed",
@@ -275,7 +275,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQuery(
}
batch := &model.QRecordBatch{
NumRecords: 0,
-Records: make([]*model.QRecord, 0),
+Records: make([]model.QRecord, 0),
Schema: schema.Schema,
}
for record := range stream.Records {
@@ -356,7 +356,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(
if qe.snapshot != "" {
_, err = tx.Exec(qe.ctx, fmt.Sprintf("SET TRANSACTION SNAPSHOT '%s'", qe.snapshot))
if err != nil {
-stream.Records <- &model.QRecordOrError{
+stream.Records <- model.QRecordOrError{
Err: fmt.Errorf("failed to set snapshot: %w", err),
}
qe.logger.Error("[pg_query_executor] failed to set snapshot",
@@ -367,7 +367,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(

randomUint, err := shared.RandomUInt64()
if err != nil {
-stream.Records <- &model.QRecordOrError{
+stream.Records <- model.QRecordOrError{
Err: fmt.Errorf("failed to generate random uint: %w", err),
}
return 0, fmt.Errorf("[pg_query_executor] failed to generate random uint: %w", err)
@@ -379,7 +379,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(
qe.logger.Info(fmt.Sprintf("[pg_query_executor] executing cursor declaration for %v with args %v", cursorQuery, args))
_, err = tx.Exec(qe.ctx, cursorQuery, args...)
if err != nil {
-stream.Records <- &model.QRecordOrError{
+stream.Records <- model.QRecordOrError{
Err: fmt.Errorf("failed to declare cursor: %w", err),
}
qe.logger.Info("[pg_query_executor] failed to declare cursor",
@@ -411,7 +411,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(
qe.logger.Info("Committing transaction")
err = tx.Commit(qe.ctx)
if err != nil {
-stream.Records <- &model.QRecordOrError{
+stream.Records <- model.QRecordOrError{
Err: fmt.Errorf("failed to commit transaction: %w", err),
}
return 0, fmt.Errorf("[pg_query_executor] failed to commit transaction: %w", err)
@@ -424,13 +424,13 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(

func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription,
customTypeMap map[uint32]string,
-) (*model.QRecord, error) {
+) (model.QRecord, error) {
// make vals an empty array of QValue of size len(fds)
record := model.NewQRecord(len(fds))

values, err := row.Values()
if err != nil {
return nil, fmt.Errorf("failed to scan row: %w", err)
return model.QRecord{}, fmt.Errorf("failed to scan row: %w", err)
}

for i, fd := range fds {
@@ -439,7 +439,7 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription,
if !ok {
tmp, err := parseFieldFromPostgresOID(fd.DataTypeOID, values[i])
if err != nil {
return nil, fmt.Errorf("failed to parse field: %w", err)
return model.QRecord{}, fmt.Errorf("failed to parse field: %w", err)
}
record.Set(i, tmp)
} else {
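Because `mapRowToQRecord` now returns `model.QRecord` by value, its error paths can no longer return `nil`; they hand back the zero value `model.QRecord{}` alongside the error, the usual Go convention for value-typed results. A short illustration of that shift, using a simplified stand-in type rather than the real one:

```go
package main

import (
	"errors"
	"fmt"
)

// QRecord is a simplified stand-in for model.QRecord.
type QRecord struct {
	NumEntries int
	Entries    []any
}

// Before: a pointer result could signal failure with nil.
func mapRowOld(ok bool) (*QRecord, error) {
	if !ok {
		return nil, errors.New("failed to scan row")
	}
	return &QRecord{NumEntries: 1, Entries: []any{"x"}}, nil
}

// After: with a value result there is no nil, so error paths return
// the zero value QRecord{} next to the error.
func mapRowNew(ok bool) (QRecord, error) {
	if !ok {
		return QRecord{}, errors.New("failed to scan row")
	}
	return QRecord{NumEntries: 1, Entries: []any{"x"}}, nil
}

func main() {
	if _, err := mapRowOld(false); err != nil {
		fmt.Println("old:", err)
	}
	if rec, err := mapRowNew(false); err != nil {
		fmt.Println("new:", rec, err)
	}
}
```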
8 changes: 4 additions & 4 deletions flow/connectors/snowflake/avro_file_writer_test.go
@@ -86,18 +86,18 @@ func generateRecords(
numKinds := len(allQValueKinds)

schema := &model.QRecordSchema{
-Fields: make([]*model.QField, numKinds),
+Fields: make([]model.QField, numKinds),
}

// Create sample records
records := &model.QRecordBatch{
NumRecords: numRows,
-Records: make([]*model.QRecord, numRows),
+Records: make([]model.QRecord, numRows),
Schema: schema,
}

for i, kind := range allQValueKinds {
-schema.Fields[i] = &model.QField{
+schema.Fields[i] = model.QField{
Name: string(kind),
Type: kind,
Nullable: nullable,
@@ -115,7 +115,7 @@ func generateRecords(
}
}

-records.Records[row] = &model.QRecord{
+records.Records[row] = model.QRecord{
Entries: entries,
}
}
10 changes: 5 additions & 5 deletions flow/connectors/sql/query_executor.go
@@ -140,15 +140,15 @@ func (g *GenericSQLQueryExecutor) CountNonNullRows(
return count.Int64, err
}

-func (g *GenericSQLQueryExecutor) columnTypeToQField(ct *sql.ColumnType) (*model.QField, error) {
+func (g *GenericSQLQueryExecutor) columnTypeToQField(ct *sql.ColumnType) (model.QField, error) {
qvKind, ok := g.dbtypeToQValueKind[ct.DatabaseTypeName()]
if !ok {
return nil, fmt.Errorf("unsupported database type %s", ct.DatabaseTypeName())
return model.QField{}, fmt.Errorf("unsupported database type %s", ct.DatabaseTypeName())
}

nullable, ok := ct.Nullable()

-return &model.QField{
+return model.QField{
Name: ct.Name(),
Type: qvKind,
Nullable: ok && nullable,
@@ -162,7 +162,7 @@ func (g *GenericSQLQueryExecutor) processRows(rows *sqlx.Rows) (*model.QRecordBa
}

// Convert dbColTypes to QFields
-qfields := make([]*model.QField, len(dbColTypes))
+qfields := make([]model.QField, len(dbColTypes))
for i, ct := range dbColTypes {
qfield, err := g.columnTypeToQField(ct)
if err != nil {
@@ -173,7 +173,7 @@ func (g *GenericSQLQueryExecutor) processRows(rows *sqlx.Rows) (*model.QRecordBa
qfields[i] = qfield
}

-var records []*model.QRecord
+var records []model.QRecord
totalRowsProcessed := 0
const heartBeatNumRows = 25000

3 changes: 1 addition & 2 deletions flow/connectors/utils/avro/avro_writer.go
@@ -148,9 +148,8 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) (
return 0, fmt.Errorf("[avro] failed to get record from stream: %w", qRecordOrErr.Err)
}

-qRecord := qRecordOrErr.Record
avroConverter := model.NewQRecordAvroConverter(
-qRecord,
+qRecordOrErr.Record,
p.targetDWH,
p.avroSchema.NullableFields,
colNames,
18 changes: 9 additions & 9 deletions flow/connectors/utils/stream.go
@@ -12,7 +12,7 @@ import (
func RecordsToRawTableStream(req *model.RecordsToStreamRequest) (*model.RecordsToStreamResponse, error) {
recordStream := model.NewQRecordStream(1 << 16)
err := recordStream.SetSchema(&model.QRecordSchema{
-Fields: []*model.QField{
+Fields: []model.QField{
{
Name: "_peerdb_uid",
Type: qvalue.QValueKindString,
@@ -73,14 +73,14 @@ func RecordsToRawTableStream(req *model.RecordsToStreamRequest) (*model.RecordsT
}, nil
}

-func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, record model.Record) *model.QRecordOrError {
+func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, record model.Record) model.QRecordOrError {
var entries [8]qvalue.QValue
switch typedRecord := record.(type) {
case *model.InsertRecord:
// json.Marshal converts bytes in Hex automatically to BASE64 string.
itemsJSON, err := typedRecord.Items.ToJSON()
if err != nil {
-return &model.QRecordOrError{
+return model.QRecordOrError{
Err: fmt.Errorf("failed to serialize insert record items to JSON: %w", err),
}
}
@@ -110,13 +110,13 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
case *model.UpdateRecord:
newItemsJSON, err := typedRecord.NewItems.ToJSON()
if err != nil {
-return &model.QRecordOrError{
+return model.QRecordOrError{
Err: fmt.Errorf("failed to serialize update record new items to JSON: %w", err),
}
}
oldItemsJSON, err := typedRecord.OldItems.ToJSON()
if err != nil {
-return &model.QRecordOrError{
+return model.QRecordOrError{
Err: fmt.Errorf("failed to serialize update record old items to JSON: %w", err),
}
}
@@ -145,7 +145,7 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
case *model.DeleteRecord:
itemsJSON, err := typedRecord.Items.ToJSON()
if err != nil {
-return &model.QRecordOrError{
+return model.QRecordOrError{
Err: fmt.Errorf("failed to serialize delete record items to JSON: %w", err),
}
}
@@ -173,7 +173,7 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
}
tableMapping[typedRecord.DestinationTableName] += 1
default:
-return &model.QRecordOrError{
+return model.QRecordOrError{
Err: fmt.Errorf("unknown record type: %T", typedRecord),
}
}
@@ -191,8 +191,8 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
Value: batchID,
}

-return &model.QRecordOrError{
-Record: &model.QRecord{
+return model.QRecordOrError{
+Record: model.QRecord{
NumEntries: 8,
Entries: entries[:],
},
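In `recordToQRecordOrError`, each row is assembled into a fixed-size `[8]qvalue.QValue` array and handed back through `entries[:]` inside a value-typed `model.QRecordOrError`. A rough sketch of that array-to-slice step, again with placeholder types rather than the real ones:

```go
package main

import "fmt"

// Placeholder stand-ins for qvalue.QValue and model.QRecord;
// the real types carry more information than shown here.
type QValue struct {
	Kind  string
	Value any
}

type QRecord struct {
	NumEntries int
	Entries    []QValue
}

func buildRecord(uid string, batchID int64) QRecord {
	// A fixed-size array is filled in place; no slice growth is needed
	// because the number of raw-table columns is known up front.
	var entries [8]QValue
	entries[0] = QValue{Kind: "string", Value: uid}
	entries[7] = QValue{Kind: "int64", Value: batchID}

	// entries[:] takes a slice over the array's storage, which is what the
	// value-typed record carries back to the caller.
	return QRecord{NumEntries: 8, Entries: entries[:]}
}

func main() {
	rec := buildRecord("uid-1", 42)
	fmt.Println(rec.NumEntries, rec.Entries[0].Value, rec.Entries[7].Value)
}
```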
10 changes: 5 additions & 5 deletions flow/e2e/bigquery/bigquery_helper.go
@@ -270,13 +270,13 @@ func toQValue(bqValue bigquery.Value) (qvalue.QValue, error) {
}
}

-func bqFieldSchemaToQField(fieldSchema *bigquery.FieldSchema) (*model.QField, error) {
+func bqFieldSchemaToQField(fieldSchema *bigquery.FieldSchema) (model.QField, error) {
qValueKind, err := peer_bq.BigQueryTypeToQValueKind(fieldSchema.Type)
if err != nil {
-return nil, err
+return model.QField{}, err
}

-return &model.QField{
+return model.QField{
Name: fieldSchema.Name,
Type: qValueKind,
Nullable: !fieldSchema.Required,
@@ -285,7 +285,7 @@ func bqFieldSchemaToQField(fieldSchema *bigquery.FieldSchema) (*model.QField, er

// bqSchemaToQRecordSchema converts a bigquery schema to a QRecordSchema.
func bqSchemaToQRecordSchema(schema bigquery.Schema) (*model.QRecordSchema, error) {
-var fields []*model.QField
+var fields []model.QField
for _, fieldSchema := range schema {
qField, err := bqFieldSchemaToQField(fieldSchema)
if err != nil {
@@ -306,7 +306,7 @@ func (b *BigQueryTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecor
return nil, fmt.Errorf("failed to run command: %w", err)
}

-var records []*model.QRecord
+var records []model.QRecord
for {
var row []bigquery.Value
err := it.Next(&row)
2 changes: 1 addition & 1 deletion flow/e2e/sqlserver/qrep_flow_sqlserver_test.go
@@ -120,7 +120,7 @@ func (s *PeerFlowE2ETestSuiteSQLServer) setupPGDestinationTable(tableName string

func getSimpleTableSchema() *model.QRecordSchema {
return &model.QRecordSchema{
-Fields: []*model.QField{
+Fields: []model.QField{
{Name: "id", Type: qvalue.QValueKindString, Nullable: true},
{Name: "card_id", Type: qvalue.QValueKindString, Nullable: true},
{Name: "v_from", Type: qvalue.QValueKindTimestamp, Nullable: true},
2 changes: 1 addition & 1 deletion flow/e2e/test_utils.go
@@ -323,7 +323,7 @@ func RunXminFlowWorkflow(env *testsuite.TestWorkflowEnvironment, config *protos.

func GetOwnersSchema() *model.QRecordSchema {
return &model.QRecordSchema{
-Fields: []*model.QField{
+Fields: []model.QField{
{Name: "id", Type: qvalue.QValueKindString, Nullable: true},
{Name: "card_id", Type: qvalue.QValueKindString, Nullable: true},
{Name: "from", Type: qvalue.QValueKindTimestamp, Nullable: true},
(Diffs for the remaining changed files were not loaded in this view.)
