Skip to content

Commit

Permalink
Merge branch 'main' into refactor-sync-request
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Jan 23, 2024
2 parents 0659afe + 5b3fe7b commit e019829
Show file tree
Hide file tree
Showing 85 changed files with 348 additions and 270 deletions.
8 changes: 8 additions & 0 deletions flow/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,19 @@ linters:
- durationcheck
- errcheck
- forbidigo
- gci
- gocritic
- gofumpt
- gosec
- gosimple
- ineffassign
- lll
- misspell
- musttag
- nakedret
- nolintlint
- nonamedreturns
- perfsprint
- prealloc
- staticcheck
- stylecheck
Expand All @@ -30,6 +33,11 @@ linters:
- wastedassign
- whitespace
linters-settings:
gci:
sections:
- standard
- 'prefix(github.com/PeerDB-io)'
- default
gocritic:
disabled-checks:
- ifElseChain
Expand Down
19 changes: 10 additions & 9 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ import (
"sync"
"time"

"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"go.temporal.io/sdk/activity"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"

"github.com/PeerDB-io/peer-flow/connectors"
connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
Expand All @@ -20,13 +28,6 @@ import (
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/shared/alerting"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"go.temporal.io/sdk/activity"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
)

// CheckConnectionResult is the result of a CheckConnection call.
Expand Down Expand Up @@ -578,7 +579,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx,
a.CatalogPool, runUUID, partition, numRecords)
if err != nil {
slog.ErrorContext(ctx, fmt.Sprintf("%v", err))
slog.ErrorContext(ctx, err.Error())
goroutineErr = err
}
}
Expand Down Expand Up @@ -967,7 +968,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
err = monitoring.UpdatePullEndTimeAndRowsForPartition(
errCtx, a.CatalogPool, runUUID, partition, int64(numRecords))
if err != nil {
slog.Error(fmt.Sprintf("%v", err))
slog.Error(err.Error())
return err
}

Expand Down
17 changes: 8 additions & 9 deletions flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,20 @@ import (
"net/http"
"time"

utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
"github.com/google/uuid"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/reflection"

"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"

utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
)

type APIServerParams struct {
Expand Down
7 changes: 4 additions & 3 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@ import (
"strings"
"time"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
backoff "github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"go.temporal.io/sdk/client"
"google.golang.org/protobuf/proto"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
)

// grpc server implementation
Expand Down
3 changes: 2 additions & 1 deletion flow/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (
"os/signal"
"syscall"

"github.com/PeerDB-io/peer-flow/logger"
"github.com/urfave/cli/v3"
_ "go.uber.org/automaxprocs"

"github.com/PeerDB-io/peer-flow/logger"
)

func main() {
Expand Down
5 changes: 3 additions & 2 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (
"fmt"
"log/slog"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/jackc/pgx/v5/pgtype"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
)

func (h *FlowRequestHandler) MirrorStatus(
Expand Down
7 changes: 4 additions & 3 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import (
"fmt"
"log/slog"

connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"google.golang.org/protobuf/proto"

connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
)

func (h *FlowRequestHandler) getPGPeerConfig(ctx context.Context, peerName string) (*protos.PostgresConfig, error) {
Expand Down
6 changes: 3 additions & 3 deletions flow/cmd/snapshot_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import (
"crypto/tls"
"fmt"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"

"github.com/PeerDB-io/peer-flow/activities"
utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/shared/alerting"
peerflow "github.com/PeerDB-io/peer-flow/workflows"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)

type SnapshotWorkerOptions struct {
Expand Down
9 changes: 4 additions & 5 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,15 @@ import (
"runtime"
"syscall"

"github.com/grafana/pyroscope-go"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"

"github.com/PeerDB-io/peer-flow/activities"
utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/shared/alerting"
peerflow "github.com/PeerDB-io/peer-flow/workflows"

"github.com/grafana/pyroscope-go"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)

type WorkerOptions struct {
Expand Down
10 changes: 5 additions & 5 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ import (

"cloud.google.com/go/bigquery"
"cloud.google.com/go/storage"
"github.com/jackc/pgx/v5/pgxpool"
"go.temporal.io/sdk/activity"
"google.golang.org/api/iterator"
"google.golang.org/api/option"

"github.com/PeerDB-io/peer-flow/connectors/utils"
cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/jackc/pgx/v5/pgxpool"

"go.temporal.io/sdk/activity"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
)

const (
Expand Down
1 change: 1 addition & 0 deletions flow/connectors/bigquery/merge_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strings"

"cloud.google.com/go/bigquery"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
Expand Down
6 changes: 3 additions & 3 deletions flow/connectors/bigquery/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import (
"time"

"cloud.google.com/go/bigquery"
"google.golang.org/api/iterator"
"google.golang.org/protobuf/encoding/protojson"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"

"google.golang.org/api/iterator"
"google.golang.org/protobuf/encoding/protojson"
)

func (c *BigQueryConnector) SyncQRepRecords(
Expand Down
12 changes: 7 additions & 5 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@ import (
"fmt"
"log/slog"
"os"
"strconv"
"strings"
"time"

"cloud.google.com/go/bigquery"
"go.temporal.io/sdk/activity"

"github.com/PeerDB-io/peer-flow/connectors/utils"
avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
"go.temporal.io/sdk/activity"
)

type QRepAvroSyncMethod struct {
Expand Down Expand Up @@ -53,8 +55,8 @@ func (s *QRepAvroSyncMethod) SyncRecords(
return nil, fmt.Errorf("failed to define Avro schema: %w", err)
}

stagingTable := fmt.Sprintf("%s_%s_staging", rawTableName, fmt.Sprint(syncBatchID))
numRecords, err := s.writeToStage(fmt.Sprint(syncBatchID), rawTableName, avroSchema,
stagingTable := fmt.Sprintf("%s_%s_staging", rawTableName, strconv.FormatInt(syncBatchID, 10))
numRecords, err := s.writeToStage(strconv.FormatInt(syncBatchID, 10), rawTableName, avroSchema,
&datasetTable{
dataset: s.connector.datasetID,
table: stagingTable,
Expand Down Expand Up @@ -105,7 +107,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
// just log the error this isn't fatal.
slog.Error("failed to delete staging table "+stagingTable,
slog.Any("error", err),
slog.String("syncBatchID", fmt.Sprint(syncBatchID)),
slog.Int64("syncBatchID", syncBatchID),
slog.String("destinationTable", rawTableName))
}

Expand All @@ -114,7 +116,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
slog.String("dstTableName", rawTableName))

return &model.SyncResponse{
LastSyncedCheckPointID: lastCP,
LastSyncedCheckpointID: lastCP,
NumRecordsSynced: int64(numRecords),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
Expand Down
1 change: 1 addition & 0 deletions flow/connectors/bigquery/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"cloud.google.com/go/bigquery"

"github.com/PeerDB-io/peer-flow/model/qvalue"
)

Expand Down
1 change: 1 addition & 0 deletions flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

_ "github.com/ClickHouse/clickhouse-go/v2"
_ "github.com/ClickHouse/clickhouse-go/v2/lib/driver"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
)
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/clickhouse/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"context"
"fmt"

"github.com/jmoiron/sqlx"

peersql "github.com/PeerDB-io/peer-flow/connectors/sql"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/jmoiron/sqlx"
)

type ClickhouseClient struct {
Expand Down
7 changes: 4 additions & 3 deletions flow/connectors/clickhouse/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ import (
"strings"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"google.golang.org/protobuf/encoding/protojson"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"google.golang.org/protobuf/encoding/protojson"
)

const qRepMetadataTableName = "_peerdb_query_replication_metadata"
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/clickhouse/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import (
"log/slog"
"time"

"go.temporal.io/sdk/activity"

"github.com/PeerDB-io/peer-flow/connectors/utils"
avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
"go.temporal.io/sdk/activity"
)

type ClickhouseAvroSyncMethod struct {
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"log/slog"

"github.com/jackc/pgx/v5/pgxpool"

connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery"
connclickhouse "github.com/PeerDB-io/peer-flow/connectors/clickhouse"
conneventhub "github.com/PeerDB-io/peer-flow/connectors/eventhub"
Expand All @@ -15,7 +17,6 @@ import (
connsqlserver "github.com/PeerDB-io/peer-flow/connectors/sqlserver"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/jackc/pgx/v5/pgxpool"
)

var ErrUnsupportedFunctionality = errors.New("requested connector does not support functionality")
Expand Down
Loading

0 comments on commit e019829

Please sign in to comment.