diff --git a/flow/.golangci.yml b/flow/.golangci.yml index eee69b73a1..638fab22f9 100644 --- a/flow/.golangci.yml +++ b/flow/.golangci.yml @@ -7,6 +7,7 @@ linters: - durationcheck - errcheck - forbidigo + - gci - gocritic - gofumpt - gosec @@ -14,9 +15,11 @@ linters: - ineffassign - lll - misspell + - musttag - nakedret - nolintlint - nonamedreturns + - perfsprint - prealloc - staticcheck - stylecheck @@ -30,6 +33,11 @@ linters: - wastedassign - whitespace linters-settings: + gci: + sections: + - standard + - 'prefix(github.com/PeerDB-io)' + - default gocritic: disabled-checks: - ifElseChain diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index f9830d479d..b4c8911ed3 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -9,6 +9,14 @@ import ( "sync" "time" + "github.com/jackc/pglogrepl" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" + "github.com/jackc/pgx/v5/pgxpool" + "go.temporal.io/sdk/activity" + "golang.org/x/sync/errgroup" + "google.golang.org/protobuf/proto" + "github.com/PeerDB-io/peer-flow/connectors" connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" @@ -20,13 +28,6 @@ import ( "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" "github.com/PeerDB-io/peer-flow/shared/alerting" - "github.com/jackc/pglogrepl" - "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgtype" - "github.com/jackc/pgx/v5/pgxpool" - "go.temporal.io/sdk/activity" - "golang.org/x/sync/errgroup" - "google.golang.org/protobuf/proto" ) // CheckConnectionResult is the result of a CheckConnection call. @@ -577,7 +578,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, numRecords) if err != nil { - slog.ErrorContext(ctx, fmt.Sprintf("%v", err)) + slog.ErrorContext(ctx, err.Error()) goroutineErr = err } } @@ -966,7 +967,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, err = monitoring.UpdatePullEndTimeAndRowsForPartition( errCtx, a.CatalogPool, runUUID, partition, int64(numRecords)) if err != nil { - slog.Error(fmt.Sprintf("%v", err)) + slog.Error(err.Error()) return err } diff --git a/flow/cmd/api.go b/flow/cmd/api.go index f466a6b9f9..1e6d3c6d17 100644 --- a/flow/cmd/api.go +++ b/flow/cmd/api.go @@ -10,21 +10,20 @@ import ( "net/http" "time" - utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" - "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/shared" - peerflow "github.com/PeerDB-io/peer-flow/workflows" "github.com/google/uuid" "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" - - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/reflection" - "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/health" "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/reflection" + + utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/shared" + peerflow "github.com/PeerDB-io/peer-flow/workflows" ) type APIServerParams struct { diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 1cee1fd5fc..93d5ff1ea6 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -8,15 +8,16 @@ import ( "strings" "time" - "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/shared" - peerflow "github.com/PeerDB-io/peer-flow/workflows" backoff "github.com/cenkalti/backoff/v4" "github.com/google/uuid" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "go.temporal.io/sdk/client" "google.golang.org/protobuf/proto" + + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/shared" + peerflow "github.com/PeerDB-io/peer-flow/workflows" ) // grpc server implementation diff --git a/flow/cmd/main.go b/flow/cmd/main.go index 79741669f4..1d924e3ccc 100644 --- a/flow/cmd/main.go +++ b/flow/cmd/main.go @@ -8,9 +8,10 @@ import ( "os/signal" "syscall" - "github.com/PeerDB-io/peer-flow/logger" "github.com/urfave/cli/v3" _ "go.uber.org/automaxprocs" + + "github.com/PeerDB-io/peer-flow/logger" ) func main() { diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index 1e17495085..df7862d8fe 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -6,11 +6,12 @@ import ( "fmt" "log/slog" - "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/shared" "github.com/jackc/pgx/v5/pgtype" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/shared" ) func (h *FlowRequestHandler) MirrorStatus( diff --git a/flow/cmd/peer_data.go b/flow/cmd/peer_data.go index f91f37bf8c..110b9b5a7f 100644 --- a/flow/cmd/peer_data.go +++ b/flow/cmd/peer_data.go @@ -6,13 +6,14 @@ import ( "fmt" "log/slog" - connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" - "github.com/PeerDB-io/peer-flow/connectors/utils" - "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "google.golang.org/protobuf/proto" + + connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" + "github.com/PeerDB-io/peer-flow/connectors/utils" + "github.com/PeerDB-io/peer-flow/generated/protos" ) func (h *FlowRequestHandler) getPGPeerConfig(ctx context.Context, peerName string) (*protos.PostgresConfig, error) { diff --git a/flow/cmd/snapshot_worker.go b/flow/cmd/snapshot_worker.go index 9239cf7276..d76a095ace 100644 --- a/flow/cmd/snapshot_worker.go +++ b/flow/cmd/snapshot_worker.go @@ -4,14 +4,14 @@ import ( "crypto/tls" "fmt" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" + "github.com/PeerDB-io/peer-flow/activities" utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/PeerDB-io/peer-flow/shared" "github.com/PeerDB-io/peer-flow/shared/alerting" peerflow "github.com/PeerDB-io/peer-flow/workflows" - - "go.temporal.io/sdk/client" - "go.temporal.io/sdk/worker" ) type SnapshotWorkerOptions struct { diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index f060230b63..aacc7d85bc 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -10,16 +10,15 @@ import ( "runtime" "syscall" + "github.com/grafana/pyroscope-go" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" + "github.com/PeerDB-io/peer-flow/activities" utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/PeerDB-io/peer-flow/shared" "github.com/PeerDB-io/peer-flow/shared/alerting" peerflow "github.com/PeerDB-io/peer-flow/workflows" - - "github.com/grafana/pyroscope-go" - - "go.temporal.io/sdk/client" - "go.temporal.io/sdk/worker" ) type WorkerOptions struct { diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 3d7e27d863..3e6499d513 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -12,17 +12,17 @@ import ( "cloud.google.com/go/bigquery" "cloud.google.com/go/storage" + "github.com/jackc/pgx/v5/pgxpool" + "go.temporal.io/sdk/activity" + "google.golang.org/api/iterator" + "google.golang.org/api/option" + "github.com/PeerDB-io/peer-flow/connectors/utils" cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" - "github.com/jackc/pgx/v5/pgxpool" - - "go.temporal.io/sdk/activity" - "google.golang.org/api/iterator" - "google.golang.org/api/option" ) const ( diff --git a/flow/connectors/bigquery/merge_stmt_generator.go b/flow/connectors/bigquery/merge_stmt_generator.go index e93a139a73..d87a83a290 100644 --- a/flow/connectors/bigquery/merge_stmt_generator.go +++ b/flow/connectors/bigquery/merge_stmt_generator.go @@ -5,6 +5,7 @@ import ( "strings" "cloud.google.com/go/bigquery" + "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" diff --git a/flow/connectors/bigquery/qrep.go b/flow/connectors/bigquery/qrep.go index 305bab01eb..4720e60cee 100644 --- a/flow/connectors/bigquery/qrep.go +++ b/flow/connectors/bigquery/qrep.go @@ -8,12 +8,12 @@ import ( "time" "cloud.google.com/go/bigquery" + "google.golang.org/api/iterator" + "google.golang.org/protobuf/encoding/protojson" + "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/shared" - - "google.golang.org/api/iterator" - "google.golang.org/protobuf/encoding/protojson" ) func (c *BigQueryConnector) SyncQRepRecords( diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index ce51ed1e2c..ab20343626 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -5,17 +5,19 @@ import ( "fmt" "log/slog" "os" + "strconv" "strings" "time" "cloud.google.com/go/bigquery" + "go.temporal.io/sdk/activity" + "github.com/PeerDB-io/peer-flow/connectors/utils" avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" - "go.temporal.io/sdk/activity" ) type QRepAvroSyncMethod struct { @@ -53,8 +55,8 @@ func (s *QRepAvroSyncMethod) SyncRecords( return nil, fmt.Errorf("failed to define Avro schema: %w", err) } - stagingTable := fmt.Sprintf("%s_%s_staging", rawTableName, fmt.Sprint(syncBatchID)) - numRecords, err := s.writeToStage(fmt.Sprint(syncBatchID), rawTableName, avroSchema, + stagingTable := fmt.Sprintf("%s_%s_staging", rawTableName, strconv.FormatInt(syncBatchID, 10)) + numRecords, err := s.writeToStage(strconv.FormatInt(syncBatchID, 10), rawTableName, avroSchema, &datasetTable{ dataset: s.connector.datasetID, table: stagingTable, @@ -105,7 +107,7 @@ func (s *QRepAvroSyncMethod) SyncRecords( // just log the error this isn't fatal. slog.Error("failed to delete staging table "+stagingTable, slog.Any("error", err), - slog.String("syncBatchID", fmt.Sprint(syncBatchID)), + slog.Int64("syncBatchID", syncBatchID), slog.String("destinationTable", rawTableName)) } @@ -114,7 +116,7 @@ func (s *QRepAvroSyncMethod) SyncRecords( slog.String("dstTableName", rawTableName)) return &model.SyncResponse{ - LastSyncedCheckPointID: lastCP, + LastSyncedCheckpointID: lastCP, NumRecordsSynced: int64(numRecords), CurrentSyncBatchID: syncBatchID, TableNameRowsMapping: tableNameRowsMapping, diff --git a/flow/connectors/bigquery/qvalue_convert.go b/flow/connectors/bigquery/qvalue_convert.go index d4e5032182..75fa2971b5 100644 --- a/flow/connectors/bigquery/qvalue_convert.go +++ b/flow/connectors/bigquery/qvalue_convert.go @@ -4,6 +4,7 @@ import ( "fmt" "cloud.google.com/go/bigquery" + "github.com/PeerDB-io/peer-flow/model/qvalue" ) diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index 59f58b1379..6b3740935a 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -8,6 +8,7 @@ import ( _ "github.com/ClickHouse/clickhouse-go/v2" _ "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" ) diff --git a/flow/connectors/clickhouse/client.go b/flow/connectors/clickhouse/client.go index 9aa14cd57c..8bd5a0221e 100644 --- a/flow/connectors/clickhouse/client.go +++ b/flow/connectors/clickhouse/client.go @@ -4,10 +4,11 @@ import ( "context" "fmt" + "github.com/jmoiron/sqlx" + peersql "github.com/PeerDB-io/peer-flow/connectors/sql" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/jmoiron/sqlx" ) type ClickhouseClient struct { diff --git a/flow/connectors/clickhouse/qrep.go b/flow/connectors/clickhouse/qrep.go index 57f2f94f26..0695d955bf 100644 --- a/flow/connectors/clickhouse/qrep.go +++ b/flow/connectors/clickhouse/qrep.go @@ -7,13 +7,14 @@ import ( "strings" "time" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "google.golang.org/protobuf/encoding/protojson" + "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/shared" - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/service/s3" - "google.golang.org/protobuf/encoding/protojson" ) const qRepMetadataTableName = "_peerdb_query_replication_metadata" diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go index 2adb391457..84c5d2eb89 100644 --- a/flow/connectors/clickhouse/qrep_avro_sync.go +++ b/flow/connectors/clickhouse/qrep_avro_sync.go @@ -6,13 +6,14 @@ import ( "log/slog" "time" + "go.temporal.io/sdk/activity" + "github.com/PeerDB-io/peer-flow/connectors/utils" avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" - "go.temporal.io/sdk/activity" ) type ClickhouseAvroSyncMethod struct { diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 8abc6f3bf8..1e28822181 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -6,6 +6,8 @@ import ( "fmt" "log/slog" + "github.com/jackc/pgx/v5/pgxpool" + connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery" connclickhouse "github.com/PeerDB-io/peer-flow/connectors/clickhouse" conneventhub "github.com/PeerDB-io/peer-flow/connectors/eventhub" @@ -15,7 +17,6 @@ import ( connsqlserver "github.com/PeerDB-io/peer-flow/connectors/sqlserver" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" - "github.com/jackc/pgx/v5/pgxpool" ) var ErrUnsupportedFunctionality = errors.New("requested connector does not support functionality") diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index f60cb547c7..1a2ecb4a58 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -9,6 +9,7 @@ import ( "time" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" + metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -155,7 +156,7 @@ func (c *EventHubConnector) processBatch( numRecords.Add(1) - recordLSN := record.GetCheckPointID() + recordLSN := record.GetCheckpointID() if recordLSN > lastSeenLSN { lastSeenLSN = recordLSN } @@ -257,7 +258,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S return &model.SyncResponse{ CurrentSyncBatchID: syncBatchID, - LastSyncedCheckPointID: lastCheckpoint, + LastSyncedCheckpointID: lastCheckpoint, NumRecordsSynced: rowsSynced, TableNameRowsMapping: make(map[string]uint32), TableSchemaDeltas: req.Records.WaitForSchemaDeltas(req.TableMappings), diff --git a/flow/connectors/eventhub/hub_batches.go b/flow/connectors/eventhub/hub_batches.go index 5634173faf..e96d0e6896 100644 --- a/flow/connectors/eventhub/hub_batches.go +++ b/flow/connectors/eventhub/hub_batches.go @@ -9,6 +9,7 @@ import ( "time" azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" + "github.com/PeerDB-io/peer-flow/shared" ) diff --git a/flow/connectors/eventhub/hubmanager.go b/flow/connectors/eventhub/hubmanager.go index 3635544d3c..7f6d206728 100644 --- a/flow/connectors/eventhub/hubmanager.go +++ b/flow/connectors/eventhub/hubmanager.go @@ -12,9 +12,10 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub" + cmap "github.com/orcaman/concurrent-map/v2" + "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" - cmap "github.com/orcaman/concurrent-map/v2" ) type EventHubManager struct { diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index b26aacf637..4bcfcaa583 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -5,12 +5,13 @@ import ( "fmt" "log/slog" + "github.com/jackc/pgx/v5/pgtype" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/PeerDB-io/peer-flow/connectors/utils" cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" - "github.com/jackc/pgx/v5/pgtype" - "github.com/jackc/pgx/v5/pgxpool" ) const ( diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 73250c3a31..7959e5ff67 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -3,19 +3,11 @@ package connpostgres import ( "context" "crypto/sha256" - "encoding/json" "fmt" "log/slog" "regexp" "time" - "github.com/PeerDB-io/peer-flow/connectors/utils" - "github.com/PeerDB-io/peer-flow/connectors/utils/cdc_records" - "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/geo" - "github.com/PeerDB-io/peer-flow/model" - "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/PeerDB-io/peer-flow/shared" "github.com/jackc/pglogrepl" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" @@ -24,6 +16,14 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/lib/pq/oid" "go.temporal.io/sdk/activity" + + "github.com/PeerDB-io/peer-flow/connectors/utils" + "github.com/PeerDB-io/peer-flow/connectors/utils/cdc_records" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/geo" + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/model/qvalue" + "github.com/PeerDB-io/peer-flow/shared" ) const maxRetriesForWalSegmentRemoved = 5 @@ -599,7 +599,7 @@ func (p *PostgresCDCSource) processInsertMessage( } return &model.InsertRecord{ - CheckPointID: int64(lsn), + CheckpointID: int64(lsn), Items: items, DestinationTableName: p.TableNameMapping[tableName].Name, SourceTableName: tableName, @@ -640,7 +640,7 @@ func (p *PostgresCDCSource) processUpdateMessage( } return &model.UpdateRecord{ - CheckPointID: int64(lsn), + CheckpointID: int64(lsn), OldItems: oldItems, NewItems: newItems, DestinationTableName: p.TableNameMapping[tableName].Name, @@ -677,7 +677,7 @@ func (p *PostgresCDCSource) processDeleteMessage( } return &model.DeleteRecord{ - CheckPointID: int64(lsn), + CheckpointID: int64(lsn), Items: items, DestinationTableName: p.TableNameMapping[tableName].Name, SourceTableName: tableName, @@ -809,16 +809,12 @@ func (p *PostgresCDCSource) auditSchemaDelta(flowJobName string, rec *model.Rela activityInfo := activity.GetInfo(p.ctx) workflowID := activityInfo.WorkflowExecution.ID runID := activityInfo.WorkflowExecution.RunID - recJSON, err := json.Marshal(rec) - if err != nil { - return fmt.Errorf("failed to marshal schema delta to JSON: %w", err) - } - _, err = p.catalogPool.Exec(p.ctx, + _, err := p.catalogPool.Exec(p.ctx, `INSERT INTO peerdb_stats.schema_deltas_audit_log(flow_job_name,workflow_id,run_id,delta_info) VALUES($1,$2,$3,$4)`, - flowJobName, workflowID, runID, recJSON) + flowJobName, workflowID, runID, rec) if err != nil { return fmt.Errorf("failed to insert row into table: %w", err) } @@ -885,7 +881,7 @@ func (p *PostgresCDCSource) processRelationMessage( p.relationMessageMapping[currRel.RelationId] = currRel rec := &model.RelationRecord{ TableSchemaDelta: schemaDelta, - CheckPointID: int64(lsn), + CheckpointID: int64(lsn), } return rec, p.auditSchemaDelta(p.flowJobName, rec) } diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 2527a560c8..eb0b3e7619 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -7,13 +7,14 @@ import ( "strconv" "strings" - "github.com/PeerDB-io/peer-flow/connectors/utils" - "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/model" "github.com/jackc/pglogrepl" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/lib/pq/oid" + + "github.com/PeerDB-io/peer-flow/connectors/utils" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" ) type PGVersion int diff --git a/flow/connectors/postgres/normalize_stmt_generator.go b/flow/connectors/postgres/normalize_stmt_generator.go index 083021926a..3792c188af 100644 --- a/flow/connectors/postgres/normalize_stmt_generator.go +++ b/flow/connectors/postgres/normalize_stmt_generator.go @@ -6,10 +6,11 @@ import ( "slices" "strings" + "golang.org/x/exp/maps" + "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" - "golang.org/x/exp/maps" ) type normalizeStmtGenerator struct { diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 98b4382df6..c7c7dd7feb 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -8,17 +8,18 @@ import ( "strings" "time" + "github.com/google/uuid" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" + "github.com/jackc/pgx/v5/pgtype" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/connectors/utils/monitoring" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" - "github.com/google/uuid" - "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgconn" - "github.com/jackc/pgx/v5/pgtype" - "github.com/jackc/pgx/v5/pgxpool" ) // PostgresConnector is a Connector implementation for Postgres. @@ -362,7 +363,7 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S if len(records) == 0 { return &model.SyncResponse{ - LastSyncedCheckPointID: 0, + LastSyncedCheckpointID: 0, NumRecordsSynced: 0, }, nil } @@ -412,7 +413,7 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S } return &model.SyncResponse{ - LastSyncedCheckPointID: lastCP, + LastSyncedCheckpointID: lastCP, NumRecordsSynced: int64(len(records)), CurrentSyncBatchID: syncBatchID, TableNameRowsMapping: tableNameRowsMapping, diff --git a/flow/connectors/postgres/postgres_schema_delta_test.go b/flow/connectors/postgres/postgres_schema_delta_test.go index e912201533..4c3b012243 100644 --- a/flow/connectors/postgres/postgres_schema_delta_test.go +++ b/flow/connectors/postgres/postgres_schema_delta_test.go @@ -6,13 +6,14 @@ import ( "strings" "testing" + "github.com/jackc/pgx/v5" + "github.com/stretchr/testify/require" + "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" - "github.com/jackc/pgx/v5" - "github.com/stretchr/testify/require" ) type PostgresSchemaDeltaTestSuite struct { diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index f34e0a13bd..51cb6e30e9 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -8,14 +8,15 @@ import ( "text/template" "time" + "github.com/google/uuid" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" + "github.com/PeerDB-io/peer-flow/connectors/utils" partition_utils "github.com/PeerDB-io/peer-flow/connectors/utils/partition" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/shared" - "github.com/google/uuid" - "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgtype" ) const qRepMetadataTableName = "_peerdb_query_replication_metadata" diff --git a/flow/connectors/postgres/qrep_partition_test.go b/flow/connectors/postgres/qrep_partition_test.go index b18f9c2574..f99b248ec4 100644 --- a/flow/connectors/postgres/qrep_partition_test.go +++ b/flow/connectors/postgres/qrep_partition_test.go @@ -7,10 +7,11 @@ import ( "testing" "time" - "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/shared" "github.com/jackc/pgx/v5/pgxpool" "github.com/stretchr/testify/assert" + + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/shared" ) type testCase struct { diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index 506668c9be..52648249e3 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -5,17 +5,17 @@ import ( "fmt" "log/slog" - "github.com/PeerDB-io/peer-flow/connectors/utils" - "github.com/PeerDB-io/peer-flow/geo" - "github.com/PeerDB-io/peer-flow/model" - "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/PeerDB-io/peer-flow/shared" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" - "go.temporal.io/sdk/activity" + + "github.com/PeerDB-io/peer-flow/connectors/utils" + "github.com/PeerDB-io/peer-flow/geo" + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/model/qvalue" + "github.com/PeerDB-io/peer-flow/shared" ) type QRepQueryExecutor struct { diff --git a/flow/connectors/postgres/qrep_sql_sync.go b/flow/connectors/postgres/qrep_sql_sync.go index b8bcf38687..e75e38a970 100644 --- a/flow/connectors/postgres/qrep_sql_sync.go +++ b/flow/connectors/postgres/qrep_sql_sync.go @@ -7,14 +7,14 @@ import ( "strings" "time" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "google.golang.org/protobuf/encoding/protojson" + "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/shared" - "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgxpool" - - "google.golang.org/protobuf/encoding/protojson" ) type QRepSyncMethod interface { diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index d357495810..d80baf69b1 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -10,9 +10,10 @@ import ( "strings" "time" - "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/jackc/pgx/v5/pgtype" "github.com/lib/pq/oid" + + "github.com/PeerDB-io/peer-flow/model/qvalue" ) func postgresOIDToQValueKind(recvOID uint32) qvalue.QValueKind { @@ -262,8 +263,7 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( val = qvalue.QValue{Kind: qvalue.QValueKindFloat64, Value: floatVal} case qvalue.QValueKindString: // handling all unsupported types with strings as well for now. - textVal := value - val = qvalue.QValue{Kind: qvalue.QValueKindString, Value: fmt.Sprint(textVal)} + val = qvalue.QValue{Kind: qvalue.QValueKindString, Value: fmt.Sprint(value)} case qvalue.QValueKindUUID: switch value.(type) { case string: diff --git a/flow/connectors/postgres/ssh_wrapped_pool.go b/flow/connectors/postgres/ssh_wrapped_pool.go index 1c5511fc49..4f17116ea4 100644 --- a/flow/connectors/postgres/ssh_wrapped_pool.go +++ b/flow/connectors/postgres/ssh_wrapped_pool.go @@ -8,10 +8,11 @@ import ( "sync" "time" - "github.com/PeerDB-io/peer-flow/connectors/utils" - "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/jackc/pgx/v5/pgxpool" "golang.org/x/crypto/ssh" + + "github.com/PeerDB-io/peer-flow/connectors/utils" + "github.com/PeerDB-io/peer-flow/generated/protos" ) type SSHWrappedPostgresPool struct { diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index d24175ed45..2a3cf640f9 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -4,16 +4,18 @@ import ( "context" "fmt" "log/slog" + "strconv" "strings" "time" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/shared" - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/service/s3" ) const ( @@ -200,7 +202,7 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes DestinationTableIdentifier: fmt.Sprintf("raw_table_%s", req.FlowJobName), } partition := &protos.QRepPartition{ - PartitionId: fmt.Sprint(syncBatchID), + PartitionId: strconv.FormatInt(syncBatchID, 10), } numRecords, err := c.SyncQRepRecords(qrepConfig, partition, recordStream) if err != nil { @@ -225,7 +227,7 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes } return &model.SyncResponse{ - LastSyncedCheckPointID: lastCheckpoint, + LastSyncedCheckpointID: lastCheckpoint, NumRecordsSynced: int64(numRecords), TableNameRowsMapping: tableNameRowsMapping, TableSchemaDeltas: req.Records.WaitForSchemaDeltas(req.TableMappings), diff --git a/flow/connectors/snowflake/avro_file_writer_test.go b/flow/connectors/snowflake/avro_file_writer_test.go index 8a5753680c..1e531ca3ac 100644 --- a/flow/connectors/snowflake/avro_file_writer_test.go +++ b/flow/connectors/snowflake/avro_file_writer_test.go @@ -8,11 +8,12 @@ import ( "testing" "time" + "github.com/google/uuid" + "github.com/stretchr/testify/require" + avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/google/uuid" - "github.com/stretchr/testify/require" ) // createQValue creates a QValue of the appropriate kind for a given placeholder. diff --git a/flow/connectors/snowflake/client.go b/flow/connectors/snowflake/client.go index 089965d573..82a09cb1ed 100644 --- a/flow/connectors/snowflake/client.go +++ b/flow/connectors/snowflake/client.go @@ -6,6 +6,7 @@ import ( "strings" "time" + "github.com/jackc/pgx/v5/pgtype" "github.com/jmoiron/sqlx" "github.com/snowflakedb/gosnowflake" @@ -14,7 +15,6 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" - "github.com/jackc/pgx/v5/pgtype" ) type SnowflakeClient struct { diff --git a/flow/connectors/snowflake/qrep.go b/flow/connectors/snowflake/qrep.go index 215fdeab41..b4c2642f57 100644 --- a/flow/connectors/snowflake/qrep.go +++ b/flow/connectors/snowflake/qrep.go @@ -7,14 +7,15 @@ import ( "strings" "time" - "github.com/PeerDB-io/peer-flow/connectors/utils" - "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/model" - "github.com/PeerDB-io/peer-flow/shared" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/jackc/pgx/v5/pgtype" "google.golang.org/protobuf/encoding/protojson" + + "github.com/PeerDB-io/peer-flow/connectors/utils" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/shared" ) const qRepMetadataTableName = "_peerdb_query_replication_metadata" diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go index 8355ab8bb2..3c330d636c 100644 --- a/flow/connectors/snowflake/qrep_avro_sync.go +++ b/flow/connectors/snowflake/qrep_avro_sync.go @@ -8,14 +8,15 @@ import ( "strings" "time" + _ "github.com/snowflakedb/gosnowflake" + "go.temporal.io/sdk/activity" + "github.com/PeerDB-io/peer-flow/connectors/utils" avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" - _ "github.com/snowflakedb/gosnowflake" - "go.temporal.io/sdk/activity" ) type SnowflakeAvroSyncHandler struct { diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 23b335f8f8..da6affa827 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -12,15 +12,16 @@ import ( "sync/atomic" "time" + "github.com/jackc/pgx/v5/pgtype" + "github.com/snowflakedb/gosnowflake" + "go.temporal.io/sdk/activity" + "golang.org/x/sync/errgroup" + "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" - "github.com/jackc/pgx/v5/pgtype" - "github.com/snowflakedb/gosnowflake" - "go.temporal.io/sdk/activity" - "golang.org/x/sync/errgroup" ) const ( @@ -510,7 +511,7 @@ func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model. }() // updating metadata with new offset and syncBatchID - err = c.updateSyncMetadata(req.FlowJobName, res.LastSyncedCheckPointID, syncBatchID, syncRecordsTx) + err = c.updateSyncMetadata(req.FlowJobName, res.LastSyncedCheckpointID, syncBatchID, syncRecordsTx) if err != nil { return nil, err } @@ -564,7 +565,7 @@ func (c *SnowflakeConnector) syncRecordsViaAvro( } return &model.SyncResponse{ - LastSyncedCheckPointID: lastCheckpoint, + LastSyncedCheckpointID: lastCheckpoint, NumRecordsSynced: int64(numRecords), CurrentSyncBatchID: syncBatchID, TableNameRowsMapping: tableNameRowsMapping, diff --git a/flow/connectors/sql/query_executor.go b/flow/connectors/sql/query_executor.go index 751230a643..b5e699e067 100644 --- a/flow/connectors/sql/query_executor.go +++ b/flow/connectors/sql/query_executor.go @@ -9,14 +9,14 @@ import ( "math/big" "strings" - "github.com/PeerDB-io/peer-flow/model" - "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/PeerDB-io/peer-flow/shared" "github.com/google/uuid" "github.com/jackc/pgx/v5/pgtype" "github.com/jmoiron/sqlx" - "go.temporal.io/sdk/activity" + + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/model/qvalue" + "github.com/PeerDB-io/peer-flow/shared" ) type SQLQueryExecutor interface { diff --git a/flow/connectors/sqlserver/qrep.go b/flow/connectors/sqlserver/qrep.go index 6a75373597..33474ef490 100644 --- a/flow/connectors/sqlserver/qrep.go +++ b/flow/connectors/sqlserver/qrep.go @@ -6,12 +6,13 @@ import ( "log/slog" "text/template" - utils "github.com/PeerDB-io/peer-flow/connectors/utils/partition" - "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/model" "github.com/google/uuid" "github.com/jackc/pgx/v5/pgtype" "github.com/jmoiron/sqlx" + + utils "github.com/PeerDB-io/peer-flow/connectors/utils/partition" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" ) func (c *SQLServerConnector) GetQRepPartitions( diff --git a/flow/connectors/sqlserver/sqlserver.go b/flow/connectors/sqlserver/sqlserver.go index 6e9235431b..805dcfb849 100644 --- a/flow/connectors/sqlserver/sqlserver.go +++ b/flow/connectors/sqlserver/sqlserver.go @@ -5,11 +5,12 @@ import ( "fmt" "log/slog" + "github.com/jmoiron/sqlx" + _ "github.com/microsoft/go-mssqldb" + peersql "github.com/PeerDB-io/peer-flow/connectors/sql" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" - "github.com/jmoiron/sqlx" - _ "github.com/microsoft/go-mssqldb" ) type SQLServerConnector struct { diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go index e96a642dae..6148ae2a6a 100644 --- a/flow/connectors/utils/avro/avro_writer.go +++ b/flow/connectors/utils/avro/avro_writer.go @@ -9,10 +9,6 @@ import ( "os" "sync/atomic" - "github.com/PeerDB-io/peer-flow/connectors/utils" - "github.com/PeerDB-io/peer-flow/model" - "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/feature/s3/manager" "github.com/aws/aws-sdk-go-v2/service/s3" @@ -20,6 +16,10 @@ import ( "github.com/klauspost/compress/snappy" "github.com/klauspost/compress/zstd" "github.com/linkedin/goavro/v2" + + "github.com/PeerDB-io/peer-flow/connectors/utils" + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/model/qvalue" ) type ( diff --git a/flow/connectors/utils/aws.go b/flow/connectors/utils/aws.go index 6e6cbed022..63adfa8330 100644 --- a/flow/connectors/utils/aws.go +++ b/flow/connectors/utils/aws.go @@ -22,11 +22,11 @@ type AWSSecrets struct { } type S3PeerCredentials struct { - AccessKeyID string - SecretAccessKey string - AwsRoleArn string - Region string - Endpoint string + AccessKeyID string `json:"accessKeyId"` + SecretAccessKey string `json:"secretAccessKey"` + AwsRoleArn string `json:"awsRoleArn"` + Region string `json:"region"` + Endpoint string `json:"endpoint"` } func GetAWSSecrets(creds S3PeerCredentials) (*AWSSecrets, error) { diff --git a/flow/connectors/utils/catalog/env.go b/flow/connectors/utils/catalog/env.go index f5c8e0507d..5a12172022 100644 --- a/flow/connectors/utils/catalog/env.go +++ b/flow/connectors/utils/catalog/env.go @@ -5,10 +5,11 @@ import ( "fmt" "sync" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/peerdbenv" - "github.com/jackc/pgx/v5/pgxpool" ) var ( diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage.go b/flow/connectors/utils/cdc_records/cdc_records_storage.go index c6dece9eb9..a1bde8d614 100644 --- a/flow/connectors/utils/cdc_records/cdc_records_storage.go +++ b/flow/connectors/utils/cdc_records/cdc_records_storage.go @@ -10,10 +10,11 @@ import ( "os" "time" + "github.com/cockroachdb/pebble" + "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" - "github.com/cockroachdb/pebble" ) func encVal(val any) ([]byte, error) { diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go index 1d7afae08c..9f0b7a9f6e 100644 --- a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go +++ b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go @@ -6,9 +6,10 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/stretchr/testify/require" ) func getTimeForTesting(t *testing.T) time.Time { @@ -47,7 +48,7 @@ func genKeyAndRec(t *testing.T) (model.TableWithPkey, model.Record) { rec := &model.InsertRecord{ SourceTableName: "test_src_tbl", DestinationTableName: "test_dst_tbl", - CheckPointID: 1, + CheckpointID: 1, CommitID: 2, Items: &model.RecordItems{ ColToValIdx: map[string]int{ diff --git a/flow/connectors/utils/monitoring/monitoring.go b/flow/connectors/utils/monitoring/monitoring.go index 7e9263cf26..5412323bfc 100644 --- a/flow/connectors/utils/monitoring/monitoring.go +++ b/flow/connectors/utils/monitoring/monitoring.go @@ -4,16 +4,17 @@ import ( "context" "fmt" "log/slog" + "strconv" "time" - "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/shared" "github.com/jackc/pglogrepl" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" - "google.golang.org/protobuf/proto" + + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/shared" ) type CDCBatchInfo struct { @@ -233,8 +234,8 @@ func addPartitionToQRepRun(ctx context.Context, pool *pgxpool.Pool, flowJobName var rangeStart, rangeEnd string switch x := partition.Range.Range.(type) { case *protos.PartitionRange_IntRange: - rangeStart = fmt.Sprint(x.IntRange.Start) - rangeEnd = fmt.Sprint(x.IntRange.End) + rangeStart = strconv.FormatInt(x.IntRange.Start, 10) + rangeEnd = strconv.FormatInt(x.IntRange.End, 10) case *protos.PartitionRange_TimestampRange: rangeStart = x.TimestampRange.Start.AsTime().String() rangeEnd = x.TimestampRange.End.AsTime().String() diff --git a/flow/connectors/utils/partition/partition.go b/flow/connectors/utils/partition/partition.go index 37ecd15f37..7d4d2c4298 100644 --- a/flow/connectors/utils/partition/partition.go +++ b/flow/connectors/utils/partition/partition.go @@ -5,11 +5,11 @@ import ( "log/slog" "time" - "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/google/uuid" "github.com/jackc/pgx/v5/pgtype" - "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/PeerDB-io/peer-flow/generated/protos" ) // Function to compare two values diff --git a/flow/connectors/utils/partition_hash.go b/flow/connectors/utils/partition_hash.go index 14de3ae943..e22b1a27d3 100644 --- a/flow/connectors/utils/partition_hash.go +++ b/flow/connectors/utils/partition_hash.go @@ -1,8 +1,8 @@ package utils import ( - "fmt" "hash/fnv" + "strconv" ) func hashString(s string) uint32 { @@ -14,5 +14,5 @@ func hashString(s string) uint32 { func HashedPartitionKey(s string, numPartitions uint32) string { hashValue := hashString(s) partition := hashValue % numPartitions - return fmt.Sprintf("%d", partition) + return strconv.FormatUint(uint64(partition), 10) } diff --git a/flow/connectors/utils/postgres.go b/flow/connectors/utils/postgres.go index c3cc48659f..ed97364417 100644 --- a/flow/connectors/utils/postgres.go +++ b/flow/connectors/utils/postgres.go @@ -6,12 +6,13 @@ import ( "fmt" "net/url" - "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/jackc/pgerrcode" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" + + "github.com/PeerDB-io/peer-flow/generated/protos" ) func IsUniqueError(err error) bool { diff --git a/flow/connectors/utils/ssh.go b/flow/connectors/utils/ssh.go index d960b20488..c4580e870f 100644 --- a/flow/connectors/utils/ssh.go +++ b/flow/connectors/utils/ssh.go @@ -4,8 +4,9 @@ import ( "encoding/base64" "fmt" - "github.com/PeerDB-io/peer-flow/generated/protos" "golang.org/x/crypto/ssh" + + "github.com/PeerDB-io/peer-flow/generated/protos" ) // getSSHClientConfig returns an *ssh.ClientConfig based on provided credentials. diff --git a/flow/connectors/utils/stream.go b/flow/connectors/utils/stream.go index bbfe0a2fa6..59602b676a 100644 --- a/flow/connectors/utils/stream.go +++ b/flow/connectors/utils/stream.go @@ -4,9 +4,10 @@ import ( "fmt" "time" + "github.com/google/uuid" + "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/google/uuid" ) func RecordsToRawTableStream(req *model.RecordsToStreamRequest) (*model.RecordsToStreamResponse, error) { diff --git a/flow/dynamicconf/dynamicconf.go b/flow/dynamicconf/dynamicconf.go index 5a22ba4058..d08ece7078 100644 --- a/flow/dynamicconf/dynamicconf.go +++ b/flow/dynamicconf/dynamicconf.go @@ -6,9 +6,10 @@ import ( "strconv" "time" - utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" + + utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" ) func dynamicConfKeyExists(ctx context.Context, conn *pgxpool.Pool, key string) bool { diff --git a/flow/e2e/bigquery/bigquery_helper.go b/flow/e2e/bigquery/bigquery_helper.go index 7d2c5bce6b..0d024f9dc7 100644 --- a/flow/e2e/bigquery/bigquery_helper.go +++ b/flow/e2e/bigquery/bigquery_helper.go @@ -11,13 +11,14 @@ import ( "cloud.google.com/go/bigquery" "cloud.google.com/go/civil" + "google.golang.org/api/iterator" + peer_bq "github.com/PeerDB-io/peer-flow/connectors/bigquery" "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" - "google.golang.org/api/iterator" ) type BigQueryTestHelper struct { diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index efa47366db..c195ab5e56 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -10,6 +10,12 @@ import ( "testing" "time" + "github.com/jackc/pgerrcode" + "github.com/jackc/pgx/v5/pgconn" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/joho/godotenv" + "github.com/stretchr/testify/require" + "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/e2eshared" @@ -18,11 +24,6 @@ import ( "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" - "github.com/jackc/pgerrcode" - "github.com/jackc/pgx/v5/pgconn" - "github.com/jackc/pgx/v5/pgxpool" - "github.com/joho/godotenv" - "github.com/stretchr/testify/require" ) type PeerFlowE2ETestSuiteBQ struct { diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go index 6ac9f3be5d..4302b222e5 100644 --- a/flow/e2e/bigquery/qrep_flow_bq_test.go +++ b/flow/e2e/bigquery/qrep_flow_bq_test.go @@ -3,8 +3,9 @@ package e2e_bigquery import ( "fmt" - "github.com/PeerDB-io/peer-flow/e2e" "github.com/stretchr/testify/require" + + "github.com/PeerDB-io/peer-flow/e2e" ) func (s PeerFlowE2ETestSuiteBQ) setupSourceTable(tableName string, rowCount int) { diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index f324a8e8f5..cb3c35bc29 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -6,13 +6,14 @@ import ( "log/slog" "time" - "github.com/PeerDB-io/peer-flow/connectors/utils" - "github.com/PeerDB-io/peer-flow/e2eshared" - "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "github.com/stretchr/testify/require" + + "github.com/PeerDB-io/peer-flow/connectors/utils" + "github.com/PeerDB-io/peer-flow/e2eshared" + "github.com/PeerDB-io/peer-flow/generated/protos" ) const ( diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index b02d03df05..27be975acb 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -7,16 +7,17 @@ import ( "strings" "time" - "github.com/PeerDB-io/peer-flow/connectors/utils" - "github.com/PeerDB-io/peer-flow/e2e" - "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/model/qvalue" - peerflow "github.com/PeerDB-io/peer-flow/workflows" "github.com/jackc/pgerrcode" "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgtype" "github.com/stretchr/testify/require" "go.temporal.io/sdk/testsuite" + + "github.com/PeerDB-io/peer-flow/connectors/utils" + "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model/qvalue" + peerflow "github.com/PeerDB-io/peer-flow/workflows" ) func (s PeerFlowE2ETestSuitePG) attachSchemaSuffix(tableName string) string { diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 2943e201b5..dcb32c7fd9 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -8,16 +8,17 @@ import ( "testing" "time" - connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" - "github.com/PeerDB-io/peer-flow/e2e" - "github.com/PeerDB-io/peer-flow/e2eshared" - "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/shared" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" "github.com/stretchr/testify/require" + + connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" + "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2eshared" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/shared" ) type PeerFlowE2ETestSuitePG struct { diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go index 26eb42d38e..69c26b6809 100644 --- a/flow/e2e/s3/cdc_s3_test.go +++ b/flow/e2e/s3/cdc_s3_test.go @@ -5,9 +5,10 @@ import ( "fmt" "time" + "github.com/stretchr/testify/require" + "github.com/PeerDB-io/peer-flow/e2e" peerflow "github.com/PeerDB-io/peer-flow/workflows" - "github.com/stretchr/testify/require" ) func (s PeerFlowE2ETestSuiteS3) attachSchemaSuffix(tableName string) string { diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go index 5a2f266ab7..919994b570 100644 --- a/flow/e2e/s3/qrep_flow_s3_test.go +++ b/flow/e2e/s3/qrep_flow_s3_test.go @@ -8,12 +8,13 @@ import ( "testing" "time" - "github.com/PeerDB-io/peer-flow/e2e" - "github.com/PeerDB-io/peer-flow/e2eshared" - "github.com/PeerDB-io/peer-flow/shared" "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" "github.com/stretchr/testify/require" + + "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2eshared" + "github.com/PeerDB-io/peer-flow/shared" ) type PeerFlowE2ETestSuiteS3 struct { diff --git a/flow/e2e/s3/s3_helper.go b/flow/e2e/s3/s3_helper.go index ecca066ea5..1be1765927 100644 --- a/flow/e2e/s3/s3_helper.go +++ b/flow/e2e/s3/s3_helper.go @@ -8,12 +8,13 @@ import ( "os" "time" + "github.com/aws/aws-sdk-go-v2/service/s3" + s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" - "github.com/aws/aws-sdk-go-v2/service/s3" - s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" ) const ( diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 95340c74f3..8f3cf46e7a 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -10,6 +10,12 @@ import ( "testing" "time" + "github.com/jackc/pgerrcode" + "github.com/jackc/pgx/v5/pgconn" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/joho/godotenv" + "github.com/stretchr/testify/require" + connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/e2e" @@ -19,11 +25,6 @@ import ( "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" - "github.com/jackc/pgerrcode" - "github.com/jackc/pgx/v5/pgconn" - "github.com/jackc/pgx/v5/pgxpool" - "github.com/joho/godotenv" - "github.com/stretchr/testify/require" ) type PeerFlowE2ETestSuiteSF struct { diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index b23ec8bb67..574869ac82 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -3,10 +3,11 @@ package e2e_snowflake import ( "fmt" - "github.com/PeerDB-io/peer-flow/e2e" - "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/google/uuid" "github.com/stretchr/testify/require" + + "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/generated/protos" ) //nolint:unparam diff --git a/flow/e2e/snowflake/snowflake_schema_delta_test.go b/flow/e2e/snowflake/snowflake_schema_delta_test.go index f9f0c1ceca..52f02b005e 100644 --- a/flow/e2e/snowflake/snowflake_schema_delta_test.go +++ b/flow/e2e/snowflake/snowflake_schema_delta_test.go @@ -6,12 +6,13 @@ import ( "log/slog" "testing" + "github.com/stretchr/testify/require" + connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/stretchr/testify/require" ) const schemaDeltaTestSchemaName = "PUBLIC" diff --git a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go index f7378ea2ae..3cb9cda650 100644 --- a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go +++ b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go @@ -5,20 +5,22 @@ import ( "fmt" "log/slog" "os" + "strconv" "strings" "testing" "time" + "github.com/jackc/pgx/v5/pgtype" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/joho/godotenv" + "github.com/stretchr/testify/require" + "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" - "github.com/jackc/pgx/v5/pgtype" - "github.com/jackc/pgx/v5/pgxpool" - "github.com/joho/godotenv" - "github.com/stretchr/testify/require" ) type PeerFlowE2ETestSuiteSQLServer struct { @@ -95,8 +97,8 @@ func (s PeerFlowE2ETestSuiteSQLServer) insertRowsIntoSQLServerTable(tableName st schemaQualified := fmt.Sprintf("%s.%s", s.sqlsHelper.SchemaName, tableName) for i := 0; i < numRows; i++ { params := make(map[string]interface{}) - params["id"] = "test_id_" + fmt.Sprintf("%d", i) - params["card_id"] = "test_card_id_" + fmt.Sprintf("%d", i) + params["id"] = "test_id_" + strconv.Itoa(i) + params["card_id"] = "test_card_id_" + strconv.Itoa(i) params["v_from"] = time.Now() params["price"] = 100.00 params["status"] = 1 diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 0aa5aaebbb..94ee47a8e2 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -13,6 +13,13 @@ import ( "testing" "time" + "github.com/google/uuid" + "github.com/jackc/pgerrcode" + "github.com/jackc/pgx/v5/pgconn" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/stretchr/testify/require" + "go.temporal.io/sdk/testsuite" + "github.com/PeerDB-io/peer-flow/activities" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" @@ -25,12 +32,6 @@ import ( "github.com/PeerDB-io/peer-flow/shared" "github.com/PeerDB-io/peer-flow/shared/alerting" peerflow "github.com/PeerDB-io/peer-flow/workflows" - "github.com/google/uuid" - "github.com/jackc/pgerrcode" - "github.com/jackc/pgx/v5/pgconn" - "github.com/jackc/pgx/v5/pgxpool" - "github.com/stretchr/testify/require" - "go.temporal.io/sdk/testsuite" ) func RegisterWorkflowsAndActivities(t *testing.T, env *testsuite.TestWorkflowEnvironment) { diff --git a/flow/e2eshared/e2eshared.go b/flow/e2eshared/e2eshared.go index 176564d342..9ac22762f8 100644 --- a/flow/e2eshared/e2eshared.go +++ b/flow/e2eshared/e2eshared.go @@ -8,8 +8,9 @@ import ( "strings" "testing" - "github.com/PeerDB-io/peer-flow/model" "github.com/jackc/pgx/v5/pgxpool" + + "github.com/PeerDB-io/peer-flow/model" ) type Suite interface { diff --git a/flow/model/model.go b/flow/model/model.go index c2ecd7a3b7..7e56a402e4 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -58,8 +58,8 @@ type PullRecordsRequest struct { } type Record interface { - // GetCheckPointID returns the ID of the record. - GetCheckPointID() int64 + // GetCheckpointID returns the ID of the record. + GetCheckpointID() int64 // get table name GetDestinationTableName() string // get columns and values for the record @@ -339,8 +339,8 @@ type InsertRecord struct { SourceTableName string // Name of the destination table DestinationTableName string - // CheckPointID is the ID of the record. - CheckPointID int64 + // CheckpointID is the ID of the record. + CheckpointID int64 // CommitID is the ID of the commit corresponding to this record. CommitID int64 // Items is a map of column name to value. @@ -348,8 +348,8 @@ type InsertRecord struct { } // Implement Record interface for InsertRecord. -func (r *InsertRecord) GetCheckPointID() int64 { - return r.CheckPointID +func (r *InsertRecord) GetCheckpointID() int64 { + return r.CheckpointID } func (r *InsertRecord) GetDestinationTableName() string { @@ -363,8 +363,8 @@ func (r *InsertRecord) GetItems() *RecordItems { type UpdateRecord struct { // Name of the source table SourceTableName string - // CheckPointID is the ID of the record. - CheckPointID int64 + // CheckpointID is the ID of the record. + CheckpointID int64 // Name of the destination table DestinationTableName string // OldItems is a map of column name to value. @@ -376,8 +376,8 @@ type UpdateRecord struct { } // Implement Record interface for UpdateRecord. -func (r *UpdateRecord) GetCheckPointID() int64 { - return r.CheckPointID +func (r *UpdateRecord) GetCheckpointID() int64 { + return r.CheckpointID } // Implement Record interface for UpdateRecord. @@ -394,8 +394,8 @@ type DeleteRecord struct { SourceTableName string // Name of the destination table DestinationTableName string - // CheckPointID is the ID of the record. - CheckPointID int64 + // CheckpointID is the ID of the record. + CheckpointID int64 // Items is a map of column name to value. Items *RecordItems // unchanged toast columns, filled from latest UpdateRecord @@ -403,8 +403,8 @@ type DeleteRecord struct { } // Implement Record interface for DeleteRecord. -func (r *DeleteRecord) GetCheckPointID() int64 { - return r.CheckPointID +func (r *DeleteRecord) GetCheckpointID() int64 { + return r.CheckpointID } func (r *DeleteRecord) GetDestinationTableName() string { @@ -428,8 +428,8 @@ type CDCRecordStream struct { SchemaDeltas chan *protos.TableSchemaDelta // Indicates if the last checkpoint has been set. lastCheckpointSet bool - // lastCheckPointID is the last ID of the commit that corresponds to this batch. - lastCheckPointID atomic.Int64 + // lastCheckpointID is the last ID of the commit that corresponds to this batch. + lastCheckpointID atomic.Int64 // empty signal to indicate if the records are going to be empty or not. emptySignal chan bool } @@ -442,16 +442,16 @@ func NewCDCRecordStream() *CDCRecordStream { SchemaDeltas: make(chan *protos.TableSchemaDelta, 1<<10), emptySignal: make(chan bool, 1), lastCheckpointSet: false, - lastCheckPointID: atomic.Int64{}, + lastCheckpointID: atomic.Int64{}, } } func (r *CDCRecordStream) UpdateLatestCheckpoint(val int64) { // TODO update with https://github.com/golang/go/issues/63999 once implemented - // r.lastCheckPointID.Max(val) - oldLast := r.lastCheckPointID.Load() - for oldLast < val && !r.lastCheckPointID.CompareAndSwap(oldLast, val) { - oldLast = r.lastCheckPointID.Load() + // r.lastCheckpointID.Max(val) + oldLast := r.lastCheckpointID.Load() + for oldLast < val && !r.lastCheckpointID.CompareAndSwap(oldLast, val) { + oldLast = r.lastCheckpointID.Load() } } @@ -459,7 +459,7 @@ func (r *CDCRecordStream) GetLastCheckpoint() (int64, error) { if !r.lastCheckpointSet { return 0, errors.New("last checkpoint not set, stream is still active") } - return r.lastCheckPointID.Load(), nil + return r.lastCheckpointID.Load(), nil } func (r *CDCRecordStream) AddRecord(record Record) { @@ -546,8 +546,8 @@ type NormalizeRecordsRequest struct { } type SyncResponse struct { - // LastSyncedCheckPointID is the last ID that was synced. - LastSyncedCheckPointID int64 + // LastSyncedCheckpointID is the last ID that was synced. + LastSyncedCheckpointID int64 // NumRecordsSynced is the number of records that were synced. NumRecordsSynced int64 // CurrentSyncBatchID is the ID of the currently synced batch. @@ -569,13 +569,13 @@ type NormalizeResponse struct { // being clever and passing the delta back as a regular record instead of heavy CDC refactoring. type RelationRecord struct { - CheckPointID int64 - TableSchemaDelta *protos.TableSchemaDelta + CheckpointID int64 `json:"checkpointId"` + TableSchemaDelta *protos.TableSchemaDelta `json:"tableSchemaDelta"` } // Implement Record interface for RelationRecord. -func (r *RelationRecord) GetCheckPointID() int64 { - return r.CheckPointID +func (r *RelationRecord) GetCheckpointID() int64 { + return r.CheckpointID } func (r *RelationRecord) GetDestinationTableName() string { diff --git a/flow/model/qrecord_batch.go b/flow/model/qrecord_batch.go index 4729e04baa..9b18dfbbc0 100644 --- a/flow/model/qrecord_batch.go +++ b/flow/model/qrecord_batch.go @@ -6,10 +6,11 @@ import ( "math/big" "time" - "github.com/PeerDB-io/peer-flow/geo" - "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/google/uuid" "github.com/jackc/pgx/v5/pgtype" + + "github.com/PeerDB-io/peer-flow/geo" + "github.com/PeerDB-io/peer-flow/model/qvalue" ) // QRecordBatch holds a batch of QRecord objects. diff --git a/flow/model/qrecord_test.go b/flow/model/qrecord_test.go index 40aead66c9..775d0c81fd 100644 --- a/flow/model/qrecord_test.go +++ b/flow/model/qrecord_test.go @@ -4,11 +4,12 @@ import ( "math/big" "testing" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/google/uuid" - "github.com/stretchr/testify/assert" ) func TestEquals(t *testing.T) { diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index f404df7e73..b95cde93b8 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -4,12 +4,14 @@ import ( "fmt" "log/slog" "math/big" + "strconv" "time" - hstore_util "github.com/PeerDB-io/peer-flow/hstore" - "github.com/PeerDB-io/peer-flow/model/numeric" "github.com/google/uuid" "github.com/linkedin/goavro/v2" + + hstore_util "github.com/PeerDB-io/peer-flow/hstore" + "github.com/PeerDB-io/peer-flow/model/numeric" ) // https://avro.apache.org/docs/1.11.0/spec.html @@ -275,7 +277,7 @@ func (c *QValueAvroConverter) processGoTime() (interface{}, error) { // Snowflake has issues with avro timestamp types, returning as string form of the int64 // See: https://stackoverflow.com/questions/66104762/snowflake-date-column-have-incorrect-date-from-avro-file if c.TargetDWH == QDWHTypeSnowflake { - return fmt.Sprint(ret), nil + return strconv.FormatInt(ret, 10), nil } return ret, nil } @@ -294,7 +296,7 @@ func (c *QValueAvroConverter) processGoDate() (interface{}, error) { // See: https://stackoverflow.com/questions/66104762/snowflake-date-column-have-incorrect-date-from-avro-file if c.TargetDWH == QDWHTypeSnowflake { ret := t.UnixMicro() - return fmt.Sprint(ret), nil + return strconv.FormatInt(ret, 10), nil } return t, nil } diff --git a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go index 786065227b..7abe9b5ef4 100644 --- a/flow/model/qvalue/qvalue.go +++ b/flow/model/qvalue/qvalue.go @@ -10,9 +10,10 @@ import ( "time" "cloud.google.com/go/civil" + "github.com/google/uuid" + "github.com/PeerDB-io/peer-flow/geo" hstore_util "github.com/PeerDB-io/peer-flow/hstore" - "github.com/google/uuid" ) // if new types are added, register them in gob - cdc_records_storage.go diff --git a/flow/shared/alerting/alerting.go b/flow/shared/alerting/alerting.go index 18c6cc2b9b..98922d22bc 100644 --- a/flow/shared/alerting/alerting.go +++ b/flow/shared/alerting/alerting.go @@ -7,9 +7,10 @@ import ( "log/slog" "time" - "github.com/PeerDB-io/peer-flow/dynamicconf" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" + + "github.com/PeerDB-io/peer-flow/dynamicconf" ) // alerting service, no cool name :( diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 1dd453931b..e4f4559b0a 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -6,15 +6,16 @@ import ( "strings" "time" - "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/model" - "github.com/PeerDB-io/peer-flow/shared" "github.com/google/uuid" "go.temporal.io/api/enums/v1" "go.temporal.io/sdk/log" "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" "google.golang.org/protobuf/proto" + + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/shared" ) const ( diff --git a/flow/workflows/drop_flow.go b/flow/workflows/drop_flow.go index b21d89f198..b50da66334 100644 --- a/flow/workflows/drop_flow.go +++ b/flow/workflows/drop_flow.go @@ -3,10 +3,11 @@ package peerflow import ( "time" - "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/shared" "go.temporal.io/sdk/log" "go.temporal.io/sdk/workflow" + + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/shared" ) // DropFlowWorkflowExecution represents the state for execution of a drop flow. diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index 720cbba06f..70f6463aef 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -4,10 +4,11 @@ import ( "fmt" "time" - "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/model" "go.temporal.io/sdk/log" "go.temporal.io/sdk/workflow" + + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" ) type NormalizeFlowState struct { diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 47354a148b..1ae1518e21 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -6,13 +6,14 @@ import ( "strings" "time" - "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/shared" "github.com/google/uuid" "go.temporal.io/api/enums/v1" "go.temporal.io/sdk/log" "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" + + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/shared" ) type QRepFlowExecution struct { diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index 9b9b168a89..8959c07edf 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -6,13 +6,13 @@ import ( "sort" "time" + "go.temporal.io/sdk/log" + "go.temporal.io/sdk/workflow" + "golang.org/x/exp/maps" + "github.com/PeerDB-io/peer-flow/activities" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" - "golang.org/x/exp/maps" - - "go.temporal.io/sdk/log" - "go.temporal.io/sdk/workflow" ) // SetupFlow is the workflow that is responsible for ensuring all the diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index 8830d0feec..d38801b599 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -7,15 +7,15 @@ import ( "strings" "time" - "github.com/PeerDB-io/peer-flow/concurrency" - "github.com/PeerDB-io/peer-flow/connectors/utils" - "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/shared" "github.com/google/uuid" - "go.temporal.io/sdk/log" "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" + + "github.com/PeerDB-io/peer-flow/concurrency" + "github.com/PeerDB-io/peer-flow/connectors/utils" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/shared" ) type SnapshotFlowExecution struct { diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index d2b0e72136..4c930ebf66 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -4,10 +4,11 @@ import ( "fmt" "time" - "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/model" "go.temporal.io/sdk/log" "go.temporal.io/sdk/workflow" + + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" ) type SyncFlowState struct { diff --git a/flow/workflows/xmin_flow.go b/flow/workflows/xmin_flow.go index 32b276ee96..c6885253df 100644 --- a/flow/workflows/xmin_flow.go +++ b/flow/workflows/xmin_flow.go @@ -5,11 +5,12 @@ import ( "fmt" "time" - "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/shared" "github.com/google/uuid" "go.temporal.io/sdk/log" "go.temporal.io/sdk/workflow" + + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/shared" ) type XminFlowExecution struct {