diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 233f319a02..473fa4a5c1 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -458,30 +458,14 @@ func (c *BigQueryConnector) getTableNametoUnchangedCols(flowJobName string, sync break } if err != nil { - fmt.Printf("Error while iterating through results: %v\n", err) - return nil, err + // Wrap with %w so callers can still match the underlying iterator error via errors.Is/errors.As. + return nil, fmt.Errorf("error while iterating through results: %w", err) } resultMap[row.Tablename] = row.UnchangedToastColumns } return resultMap, nil } -// ValueSaver interface for bqRecord -func (r StagingBQRecord) Save() (map[string]bigquery.Value, string, error) { - return map[string]bigquery.Value{ - "_peerdb_uid": r.uid, - "_peerdb_timestamp": r.timestamp, - "_peerdb_timestamp_nanos": r.timestampNanos, - "_peerdb_destination_table_name": r.destinationTableName, - "_peerdb_data": r.data, - "_peerdb_record_type": r.recordType, - "_peerdb_match_data": r.matchData, - "_peerdb_batch_id": r.batchID, - "_peerdb_staging_batch_id": r.stagingBatchID, - "_peerdb_unchanged_toast_columns": r.unchangedToastColumns, - }, bigquery.NoDedupeID, nil -} - // SyncRecords pushes records to the destination. // currently only supports inserts,updates and deletes // more record types will be added in the future. 
diff --git a/flow/connectors/bigquery/qrecord_value_saver.go b/flow/connectors/bigquery/qrecord_value_saver.go deleted file mode 100644 index 202ac3df4d..0000000000 --- a/flow/connectors/bigquery/qrecord_value_saver.go +++ /dev/null @@ -1,203 +0,0 @@ -package connbigquery - -import ( - "fmt" - "math/big" - - "cloud.google.com/go/bigquery" - "github.com/PeerDB-io/peer-flow/model" - "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/google/uuid" -) - -type QRecordValueSaver struct { - ColumnNames []string - Record *model.QRecord - PartitionID string - RunID int64 -} - -// RatToBigQueryNumeric converts a *big.Rat to a decimal string compatible with -// BigQuery's NUMERIC type. -// -// BigQuery's NUMERIC type supports large-scale fixed-point numbers with up to -// 38 digits of precision and 9 digits of scale. This function converts a *big.Rat -// to a decimal string that respects these limits. -// -// The function uses *big.Rat's FloatString method with 9 as the argument, which -// converts the *big.Rat to a string that represents a floating-point number with -// 9 digits after the decimal point. The resulting string can be inserted into a -// NUMERIC field in BigQuery. -// -// Parameters: -// rat: The *big.Rat to convert. This should represent a decimal number with up to -// -// 38 digits of precision and 9 digits of scale. -// -// Returns: -// A string representing the *big.Rat as a decimal number with up to 38 digits -// of precision and 9 digits of scale. This string can be inserted into a NUMERIC -// field in BigQuery. 
-func RatToBigQueryNumeric(rat *big.Rat) string { - // Convert the *big.Rat to a decimal string with 9 digits of scale - return rat.FloatString(9) // 9 is the scale of the NUMERIC type -} - -func (q QRecordValueSaver) Save() (map[string]bigquery.Value, string, error) { - bqValues := make(map[string]bigquery.Value, q.Record.NumEntries) - - for i, v := range q.Record.Entries { - k := q.ColumnNames[i] - if v.Value == nil { - if v.Kind.IsArray() { - bqValues[k] = make([]interface{}, 0) - } else { - bqValues[k] = nil - } - continue - } - - switch v.Kind { - case qvalue.QValueKindFloat32: - val, ok := v.Value.(float32) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to float64", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindFloat64: - val, ok := v.Value.(float64) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to float64", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindInt16: - switch v.Value.(type) { - case int16: - bqValues[k] = v.Value - case int32: - bqValues[k] = int16(v.Value.(int32)) - case int64: - bqValues[k] = int16(v.Value.(int64)) - default: - return nil, "", fmt.Errorf("failed to convert %v to int16", v.Value) - } - - case qvalue.QValueKindInt32: - val, ok := v.Value.(int32) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to int32", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindInt64: - val, ok := v.Value.(int64) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to int64", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindBoolean: - val, ok := v.Value.(bool) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to bool", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindString: - val, ok := v.Value.(string) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to string", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindTimestamp, qvalue.QValueKindDate, qvalue.QValueKindTime: - var err error - bqValues[k], err = 
v.GoTimeConvert() - if err != nil { - return nil, "", fmt.Errorf("failed to convert parse %v into time.Time", v) - } - - case qvalue.QValueKindNumeric: - val, ok := v.Value.(*big.Rat) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to *big.Rat", v.Value) - } - - bqValues[k] = RatToBigQueryNumeric(val) - - case qvalue.QValueKindBytes, qvalue.QValueKindBit: - val, ok := v.Value.([]byte) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to []byte", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindUUID: - val, ok := v.Value.([16]byte) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to string", v.Value) - } - uuidVal := uuid.UUID(val) - bqValues[k] = uuidVal.String() - - case qvalue.QValueKindJSON: - val, ok := v.Value.(string) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to string", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindArrayFloat32: - val, ok := v.Value.([]float32) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to []float32", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindArrayFloat64: - val, ok := v.Value.([]float64) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to []float64", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindArrayInt32: - val, ok := v.Value.([]int32) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to []int32", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindArrayInt64: - val, ok := v.Value.([]int64) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to []int64", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindArrayString: - val, ok := v.Value.([]string) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to []string", v.Value) - } - bqValues[k] = val - - default: - // Skip invalid QValueKind, but log the type for debugging - fmt.Printf("[bigquery] Invalid QValueKind: %v\n", v.Kind) - } - } - - // add partition id to the map - bqValues["PartitionID"] = 
q.PartitionID - - // add run id to the map - bqValues["RunID"] = q.RunID - - // log the bigquery values - // fmt.Printf("BigQuery Values: %v\n", bqValues) - - return bqValues, "", nil -} diff --git a/flow/connectors/sql/query_executor.go b/flow/connectors/sql/query_executor.go index 82a13b691f..d062440b3c 100644 --- a/flow/connectors/sql/query_executor.go +++ b/flow/connectors/sql/query_executor.go @@ -114,7 +114,6 @@ func (g *GenericSQLQueryExecutor) CreateTable(schema *model.QRecordSchema, schem } command := fmt.Sprintf("CREATE TABLE %s.%s (%s)", schemaName, tableName, strings.Join(fields, ", ")) - fmt.Printf("creating table %s.%s with command %s\n", schemaName, tableName, command) _, err := g.db.ExecContext(g.ctx, command) if err != nil { diff --git a/flow/model/qrecord.go b/flow/model/qrecord.go index 5b44808be7..a1483b6e1d 100644 --- a/flow/model/qrecord.go +++ b/flow/model/qrecord.go @@ -1,8 +1,6 @@ package model import ( - "fmt" - "github.com/PeerDB-io/peer-flow/model/qvalue" ) @@ -23,21 +21,3 @@ func NewQRecord(n int) *QRecord { func (q *QRecord) Set(idx int, value qvalue.QValue) { q.Entries[idx] = value } - -// equals checks if two QRecords are identical. -func (q *QRecord) equals(other *QRecord) bool { - // First check simple attributes - if q.NumEntries != other.NumEntries { - return false - } - - for i, entry := range q.Entries { - otherEntry := other.Entries[i] - if !entry.Equals(otherEntry) { - fmt.Printf("entry %d: %v != %v\n", i, entry, otherEntry) - return false - } - } - - return true -} diff --git a/flow/model/qrecord_batch.go b/flow/model/qrecord_batch.go index e2990a28ae..447ac210c9 100644 --- a/flow/model/qrecord_batch.go +++ b/flow/model/qrecord_batch.go @@ -18,42 +18,6 @@ type QRecordBatch struct { Schema *QRecordSchema } -// Equals checks if two QRecordBatches are identical. 
-func (q *QRecordBatch) Equals(other *QRecordBatch) bool { - if other == nil { - fmt.Printf("other is nil") - return q == nil - } - - // First check simple attributes - if q.NumRecords != other.NumRecords { - // print num records - fmt.Printf("q.NumRecords: %d\n", q.NumRecords) - fmt.Printf("other.NumRecords: %d\n", other.NumRecords) - return false - } - - // Compare column names - if !q.Schema.EqualNames(other.Schema) { - fmt.Printf("Column names are not equal\n") - fmt.Printf("Schema 1: %v\n", q.Schema.GetColumnNames()) - fmt.Printf("Schema 2: %v\n", other.Schema.GetColumnNames()) - return false - } - - // Compare records - for i, record := range q.Records { - if !record.equals(other.Records[i]) { - fmt.Printf("Record %d is not equal\n", i) - fmt.Printf("Record 1: %v\n", record) - fmt.Printf("Record 2: %v\n", other.Records[i]) - return false - } - } - - return true -} - func (q *QRecordBatch) ToQRecordStream(buffer int) (*QRecordStream, error) { stream := NewQRecordStream(buffer) diff --git a/flow/model/qrecord_test.go b/flow/model/qrecord_test.go index 50ca08a6a7..11499430cb 100644 --- a/flow/model/qrecord_test.go +++ b/flow/model/qrecord_test.go @@ -9,6 +9,60 @@ import ( "github.com/stretchr/testify/assert" ) +// Equals checks if two QRecordBatches are identical. 
+func (q *QRecordBatch) Equals(other *QRecordBatch) bool { + if q == nil || other == nil { + fmt.Printf("at least one batch is nil\n") + return q == other + } + + // Counts must agree, and so must the slice lengths, since other.Records is indexed below + if q.NumRecords != other.NumRecords || len(q.Records) != len(other.Records) { + // report the declared record counts to help debug the mismatch + fmt.Printf("q.NumRecords: %d\n", q.NumRecords) + fmt.Printf("other.NumRecords: %d\n", other.NumRecords) + return false + } + + // Compare column names + if !q.Schema.EqualNames(other.Schema) { + fmt.Printf("Column names are not equal\n") + fmt.Printf("Schema 1: %v\n", q.Schema.GetColumnNames()) + fmt.Printf("Schema 2: %v\n", other.Schema.GetColumnNames()) + return false + } + + // Compare records + for i, record := range q.Records { + if !record.equals(other.Records[i]) { + fmt.Printf("Record %d is not equal\n", i) + fmt.Printf("Record 1: %v\n", record) + fmt.Printf("Record 2: %v\n", other.Records[i]) + return false + } + } + + return true +} + +// equals reports whether q and other have matching entry counts and pairwise-equal entries; both must be non-nil. +func (q *QRecord) equals(other *QRecord) bool { + // Entry counts and slice lengths must agree, since other.Entries is indexed below + if q.NumEntries != other.NumEntries || len(q.Entries) != len(other.Entries) { + return false + } + + for i, entry := range q.Entries { + otherEntry := other.Entries[i] + if !entry.Equals(otherEntry) { + fmt.Printf("entry %d: %v != %v\n", i, entry, otherEntry) + return false + } + } + + return true +} + func TestEquals(t *testing.T) { uuidVal1, _ := uuid.NewRandom() uuidVal2, _ := uuid.NewRandom()