Skip to content

Commit

Permalink
Remove fmt.Print from non-test files
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 17, 2023
1 parent 84f84d3 commit 7c60d5b
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 278 deletions.
19 changes: 1 addition & 18 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,30 +458,13 @@ func (c *BigQueryConnector) getTableNametoUnchangedCols(flowJobName string, sync
break
}
if err != nil {
fmt.Printf("Error while iterating through results: %v\n", err)
return nil, err
return nil, fmt.Errorf("Error while iterating through results: %v", err)
}
resultMap[row.Tablename] = row.UnchangedToastColumns
}
return resultMap, nil
}

// ValueSaver interface for bqRecord
func (r StagingBQRecord) Save() (map[string]bigquery.Value, string, error) {
return map[string]bigquery.Value{
"_peerdb_uid": r.uid,
"_peerdb_timestamp": r.timestamp,
"_peerdb_timestamp_nanos": r.timestampNanos,
"_peerdb_destination_table_name": r.destinationTableName,
"_peerdb_data": r.data,
"_peerdb_record_type": r.recordType,
"_peerdb_match_data": r.matchData,
"_peerdb_batch_id": r.batchID,
"_peerdb_staging_batch_id": r.stagingBatchID,
"_peerdb_unchanged_toast_columns": r.unchangedToastColumns,
}, bigquery.NoDedupeID, nil
}

// SyncRecords pushes records to the destination.
// currently only supports inserts,updates and deletes
// more record types will be added in the future.
Expand Down
203 changes: 0 additions & 203 deletions flow/connectors/bigquery/qrecord_value_saver.go

This file was deleted.

1 change: 0 additions & 1 deletion flow/connectors/sql/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ func (g *GenericSQLQueryExecutor) CreateTable(schema *model.QRecordSchema, schem
}

command := fmt.Sprintf("CREATE TABLE %s.%s (%s)", schemaName, tableName, strings.Join(fields, ", "))
fmt.Printf("creating table %s.%s with command %s\n", schemaName, tableName, command)

_, err := g.db.ExecContext(g.ctx, command)
if err != nil {
Expand Down
20 changes: 0 additions & 20 deletions flow/model/qrecord.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package model

import (
"fmt"

"github.com/PeerDB-io/peer-flow/model/qvalue"
)

Expand All @@ -23,21 +21,3 @@ func NewQRecord(n int) *QRecord {
func (q *QRecord) Set(idx int, value qvalue.QValue) {
q.Entries[idx] = value
}

// equals checks if two QRecords are identical.
func (q *QRecord) equals(other *QRecord) bool {
// First check simple attributes
if q.NumEntries != other.NumEntries {
return false
}

for i, entry := range q.Entries {
otherEntry := other.Entries[i]
if !entry.Equals(otherEntry) {
fmt.Printf("entry %d: %v != %v\n", i, entry, otherEntry)
return false
}
}

return true
}
36 changes: 0 additions & 36 deletions flow/model/qrecord_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,42 +18,6 @@ type QRecordBatch struct {
Schema *QRecordSchema
}

// Equals checks if two QRecordBatches are identical.
func (q *QRecordBatch) Equals(other *QRecordBatch) bool {
if other == nil {
fmt.Printf("other is nil")
return q == nil
}

// First check simple attributes
if q.NumRecords != other.NumRecords {
// print num records
fmt.Printf("q.NumRecords: %d\n", q.NumRecords)
fmt.Printf("other.NumRecords: %d\n", other.NumRecords)
return false
}

// Compare column names
if !q.Schema.EqualNames(other.Schema) {
fmt.Printf("Column names are not equal\n")
fmt.Printf("Schema 1: %v\n", q.Schema.GetColumnNames())
fmt.Printf("Schema 2: %v\n", other.Schema.GetColumnNames())
return false
}

// Compare records
for i, record := range q.Records {
if !record.equals(other.Records[i]) {
fmt.Printf("Record %d is not equal\n", i)
fmt.Printf("Record 1: %v\n", record)
fmt.Printf("Record 2: %v\n", other.Records[i])
return false
}
}

return true
}

func (q *QRecordBatch) ToQRecordStream(buffer int) (*QRecordStream, error) {
stream := NewQRecordStream(buffer)

Expand Down
54 changes: 54 additions & 0 deletions flow/model/qrecord_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,60 @@ import (
"github.com/stretchr/testify/assert"
)

// Equals checks if two QRecordBatches are identical.
func (q *QRecordBatch) Equals(other *QRecordBatch) bool {
if other == nil {
fmt.Printf("other is nil")
return q == nil
}

// First check simple attributes
if q.NumRecords != other.NumRecords {
// print num records
fmt.Printf("q.NumRecords: %d\n", q.NumRecords)
fmt.Printf("other.NumRecords: %d\n", other.NumRecords)
return false
}

// Compare column names
if !q.Schema.EqualNames(other.Schema) {
fmt.Printf("Column names are not equal\n")
fmt.Printf("Schema 1: %v\n", q.Schema.GetColumnNames())
fmt.Printf("Schema 2: %v\n", other.Schema.GetColumnNames())
return false
}

// Compare records
for i, record := range q.Records {
if !record.equals(other.Records[i]) {
fmt.Printf("Record %d is not equal\n", i)
fmt.Printf("Record 1: %v\n", record)
fmt.Printf("Record 2: %v\n", other.Records[i])
return false
}
}

return true
}

// equals checks if two QRecords are identical.
func (q *QRecord) equals(other *QRecord) bool {
// First check simple attributes
if q.NumEntries != other.NumEntries {
return false
}

for i, entry := range q.Entries {
otherEntry := other.Entries[i]
if !entry.Equals(otherEntry) {
fmt.Printf("entry %d: %v != %v\n", i, entry, otherEntry)
return false
}
}

return true
}

func TestEquals(t *testing.T) {
uuidVal1, _ := uuid.NewRandom()
uuidVal2, _ := uuid.NewRandom()
Expand Down

0 comments on commit 7c60d5b

Please sign in to comment.