Skip to content

Commit

Permalink
Snowflake Avro sync method (#123)
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Jun 12, 2023
1 parent 9a21113 commit eb8f8fe
Show file tree
Hide file tree
Showing 26 changed files with 1,488 additions and 199 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,17 @@ jobs:
name: "bq_service_account.json"
json: ${{ secrets.GCP_GH_CI_PKEY }}

- name: setup snowflake credentials
id: sf-credentials
uses: jsdaniell/[email protected]
with:
name: "snowflake_creds.json"
json: ${{ secrets.SNOWFLAKE_GH_CI_PKEY }}

- name: run tests
run: |
gotestsum --format testname
working-directory: ./flow
env:
TEST_BQ_CREDS: ${{ github.workspace }}/bq_service_account.json
TEST_SF_CREDS: ${{ github.workspace }}/snowflake_creds.json
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(

// Write each QRecord to the OCF file
for _, qRecord := range records.Records {
avroMap, err := qRecord.ToAvroCompatibleMap(&nullable, records.ColumnNames)
avroMap, err := qRecord.ToAvroCompatibleMap(model.QDBTypeBigQuery, &nullable, records.Schema.GetColumnNames())
if err != nil {
return 0, fmt.Errorf("failed to convert QRecord to Avro compatible map: %w", err)
}
Expand Down Expand Up @@ -205,7 +205,7 @@ func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
case bigquery.TimestampFieldType:
return map[string]string{
"type": "long",
"logicalType": "timestamp-millis",
"logicalType": "timestamp-micros",
}, nil
case bigquery.DateFieldType:
return map[string]string{
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/qrep_sync_method.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
numRowsInserted := 0
for _, qRecord := range records.Records {
toPut := QRecordValueSaver{
ColumnNames: records.ColumnNames,
ColumnNames: records.Schema.GetColumnNames(),
Record: qRecord,
PartitionID: partitionID,
RunID: runID,
Expand Down
61 changes: 52 additions & 9 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,55 @@ func (qe *QRepQueryExecutor) ExecuteQuery(query string, args ...interface{}) (pg
return rows, nil
}

func fieldDescriptionToQValueKind(fd pgconn.FieldDescription) model.QValueKind {
switch fd.DataTypeOID {
case pgtype.BoolOID:
return model.QValueKindBoolean
case pgtype.Int2OID:
return model.QValueKindInt16
case pgtype.Int4OID:
return model.QValueKindInt32
case pgtype.Int8OID:
return model.QValueKindInt64
case pgtype.Float4OID:
return model.QValueKindFloat32
case pgtype.Float8OID:
return model.QValueKindFloat64
case pgtype.TextOID, pgtype.VarcharOID:
return model.QValueKindString
case pgtype.ByteaOID:
return model.QValueKindBytes
case pgtype.JSONOID, pgtype.JSONBOID:
return model.QValueKindJSON
case pgtype.UUIDOID:
return model.QValueKindUUID
case pgtype.TimestampOID, pgtype.TimestamptzOID, pgtype.DateOID, pgtype.TimeOID:
return model.QValueKindETime
case pgtype.NumericOID:
return model.QValueKindNumeric
default:
return model.QValueKindInvalid
}
}

// FieldDescriptionsToSchema converts a slice of pgconn.FieldDescription to a QRecordSchema.
func fieldDescriptionsToSchema(fds []pgconn.FieldDescription) *model.QRecordSchema {
qfields := make([]*model.QField, len(fds))
for i, fd := range fds {
cname := fd.Name
ctype := fieldDescriptionToQValueKind(fd)
// there isn't a way to know if a column is nullable or not
// TODO fix this.
cnullable := true
qfields[i] = &model.QField{
Name: cname,
Type: ctype,
Nullable: cnullable,
}
}
return model.NewQRecordSchema(qfields)
}

func (qe *QRepQueryExecutor) ProcessRows(
rows pgx.Rows,
fieldDescriptions []pgconn.FieldDescription,
Expand All @@ -57,16 +106,10 @@ func (qe *QRepQueryExecutor) ProcessRows(
return nil, fmt.Errorf("row iteration failed: %w", rows.Err())
}

// get col names from fieldDescriptions
colNames := make([]string, len(fieldDescriptions))
for i, fd := range fieldDescriptions {
colNames[i] = fd.Name
}

batch := &model.QRecordBatch{
NumRecords: uint32(len(records)),
Records: records,
ColumnNames: colNames,
NumRecords: uint32(len(records)),
Records: records,
Schema: fieldDescriptionsToSchema(fieldDescriptions),
}

return batch, nil
Expand Down
Loading

0 comments on commit eb8f8fe

Please sign in to comment.