Skip to content

Commit

Permalink
Merge branch 'main' into qrep-syncedpart-handling
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Mar 7, 2024
2 parents d58d657 + d8a7bd6 commit 0a29dcd
Show file tree
Hide file tree
Showing 46 changed files with 1,236 additions and 1,110 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
matrix:
runner: [ubicloud-standard-16-ubuntu-2204-arm]
runs-on: ${{ matrix.runner }}
timeout-minutes: 40
timeout-minutes: 30
services:
catalog:
image: imresamu/postgis:15-3.4-alpine
Expand Down Expand Up @@ -96,7 +96,7 @@ jobs:
temporal operator search-attribute create --name MirrorName --type Text --namespace default
./peer-flow worker &
./peer-flow snapshot-worker &
go test -p 32 ./... -timeout 1200s
go test -p 32 ./... -timeout 900s
working-directory: ./flow
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
Expand Down
5 changes: 5 additions & 0 deletions flow/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ linters-settings:
settings:
hugeParam:
sizeThreshold: 512
govet:
enable-all: true
disable:
- fieldalignment
- shadow
stylecheck:
checks:
- all
Expand Down
12 changes: 6 additions & 6 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (a *FlowableActivity) SyncFlow(
}

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return "transferring records for job - " + flowName
return "transferring records for job"
})
defer shutdown()

Expand Down Expand Up @@ -474,7 +474,7 @@ func (a *FlowableActivity) StartNormalize(
defer connectors.CloseConnector(ctx, dstConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return "normalizing records from batch for job - " + input.FlowConnectionConfigs.FlowJobName
return "normalizing records from batch for job"
})
defer shutdown()

Expand Down Expand Up @@ -542,7 +542,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
defer connectors.CloseConnector(ctx, srcConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return "getting partitions for job - " + config.FlowJobName
return "getting partitions for job"
})
defer shutdown()

Expand Down Expand Up @@ -736,7 +736,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
defer connectors.CloseConnector(ctx, dstConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return "consolidating partitions for job - " + config.FlowJobName
return "consolidating partitions for job"
})
defer shutdown()

Expand Down Expand Up @@ -863,7 +863,7 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
}

func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
rows, err := a.CatalogPool.Query(ctx, "SELECT flows.name, flows.config_proto FROM flows WHERE query_string IS NULL")
rows, err := a.CatalogPool.Query(ctx, "SELECT DISTINCT ON (name) name, config_proto FROM flows WHERE query_string IS NULL")
if err != nil {
return err
}
Expand Down Expand Up @@ -991,7 +991,7 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena
defer connectors.CloseConnector(ctx, dstConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return "renaming tables for job - " + config.FlowJobName
return "renaming tables for job"
})
defer shutdown()

Expand Down
6 changes: 3 additions & 3 deletions flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName s
connectors.CloseConnector(ctx, s.connector)
delete(a.SnapshotConnections, flowJobName)
}
a.Alerter.LogFlowEvent(ctx, flowJobName, "Ended Snapshot Flow Job - "+flowJobName)
a.Alerter.LogFlowEvent(ctx, flowJobName, "Ended Snapshot Flow Job")

return nil
}
Expand All @@ -50,7 +50,7 @@ func (a *SnapshotActivity) SetupReplication(
return nil, nil
}

a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job - "+config.FlowJobName)
a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job")

conn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig)
if err != nil {
Expand Down Expand Up @@ -84,7 +84,7 @@ func (a *SnapshotActivity) SetupReplication(
var slotInfo connpostgres.SlotCreationResult
select {
case slotInfo = <-slotSignal.SlotCreated:
logger.Info("slot created", slotInfo.SlotName)
logger.Info("slot created", slog.String("SlotName", slotInfo.SlotName))
case err := <-replicationErr:
closeConnectionForError(err)
return nil, fmt.Errorf("failed to setup replication: %w", err)
Expand Down
4 changes: 2 additions & 2 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,8 @@ func (h *FlowRequestHandler) ShutdownFlow(
}

if req.RemoveFlowEntry {
delErr := h.removeFlowEntryInCatalog(ctx, req.FlowJobName)
if delErr != nil {
err := h.removeFlowEntryInCatalog(ctx, req.FlowJobName)
if err != nil {
slog.Error("unable to remove flow job entry",
slog.String(string(shared.FlowNameKey), req.FlowJobName),
slog.Any("error", err),
Expand Down
3 changes: 2 additions & 1 deletion flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ func (h *FlowRequestHandler) cloneTableSummary(

rows, err := h.pool.Query(ctx, q, "clone_"+flowJobName+"_%")
if err != nil {
slog.Error("unable to query initial load partition - "+flowJobName, slog.Any("error", err))
slog.Error("unable to query initial load partition",
slog.String(string(shared.FlowNameKey), flowJobName), slog.Any("error", err))
return nil, fmt.Errorf("unable to query initial load partition - %s: %w", flowJobName, err)
}

Expand Down
4 changes: 2 additions & 2 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ func (h *FlowRequestHandler) getPGPeerConfig(ctx context.Context, peerName strin
return nil, err
}

unmarshalErr := proto.Unmarshal(pgPeerOptions, &pgPeerConfig)
err = proto.Unmarshal(pgPeerOptions, &pgPeerConfig)
if err != nil {
return nil, unmarshalErr
return nil, err
}

return &pgPeerConfig, nil
Expand Down
9 changes: 3 additions & 6 deletions flow/connectors/clickhouse/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ func (s *ClickhouseAvroSyncMethod) CopyStageToDestination(ctx context.Context, a
avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket,
s.connector.creds.Region, avroFile.FilePath)

if err != nil {
return err
}
//nolint:gosec
query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s', 'Avro')",
s.config.DestinationTableIdentifier, avroFileUrl,
Expand Down Expand Up @@ -133,12 +130,12 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
continue
}

selector = append(selector, colName)
selector = append(selector, "`"+colName+"`")
}
selectorStr := strings.Join(selector, ",")
//nolint:gosec
query := fmt.Sprintf("INSERT INTO %s(%s) SELECT * FROM s3('%s','%s','%s', 'Avro')",
config.DestinationTableIdentifier, selectorStr, avroFileUrl,
query := fmt.Sprintf("INSERT INTO %s(%s) SELECT %s FROM s3('%s','%s','%s', 'Avro')",
config.DestinationTableIdentifier, selectorStr, selectorStr, avroFileUrl,
s.connector.creds.AccessKeyID, s.connector.creds.SecretAccessKey)

_, err = s.connector.database.ExecContext(ctx, query)
Expand Down
40 changes: 21 additions & 19 deletions flow/connectors/postgres/postgres_schema_delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,23 +196,25 @@ func (s PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() {
}

func TestPostgresSchemaDeltaTestSuite(t *testing.T) {
e2eshared.RunSuite(t, SetupSuite, func(s PostgresSchemaDeltaTestSuite) {
teardownTx, err := s.connector.conn.Begin(context.Background())
require.NoError(s.t, err)
defer func() {
err := teardownTx.Rollback(context.Background())
if err != pgx.ErrTxClosed {
require.NoError(s.t, err)
}
}()
_, err = teardownTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE",
s.schema))
require.NoError(s.t, err)
err = teardownTx.Commit(context.Background())
require.NoError(s.t, err)

require.NoError(s.t, s.connector.ConnectionActive(context.Background()))
require.NoError(s.t, s.connector.Close())
require.Error(s.t, s.connector.ConnectionActive(context.Background()))
})
e2eshared.RunSuite(t, SetupSuite)
}

// Teardown drops the suite's schema inside a transaction and then closes the
// connector, asserting that the connection is reported active before Close
// and inactive afterwards.
func (s PostgresSchemaDeltaTestSuite) Teardown() {
	ctx := context.Background()

	tx, err := s.connector.conn.Begin(ctx)
	require.NoError(s.t, err)
	defer func() {
		// Rolling back after a successful commit yields pgx.ErrTxClosed,
		// which is the expected outcome and must not fail the test.
		if rollbackErr := tx.Rollback(ctx); rollbackErr != pgx.ErrTxClosed {
			require.NoError(s.t, rollbackErr)
		}
	}()

	dropSchema := fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE",
		s.schema)
	_, err = tx.Exec(ctx, dropSchema)
	require.NoError(s.t, err)

	err = tx.Commit(ctx)
	require.NoError(s.t, err)

	// The connector must be usable right up until it is closed.
	require.NoError(s.t, s.connector.ConnectionActive(ctx))
	require.NoError(s.t, s.connector.Close())
	require.Error(s.t, s.connector.ConnectionActive(ctx))
}
102 changes: 102 additions & 0 deletions flow/e2e/bigquery/bigquery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package e2e_bigquery

import (
"fmt"
"strings"
"testing"
"time"

"github.com/jackc/pgx/v5"

connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
)

// PeerFlowE2ETestSuiteBQ carries the per-test state for the BigQuery
// end-to-end tests. Instances are built by SetupSuite and released by
// Teardown.
type PeerFlowE2ETestSuiteBQ struct {
	// t is the test handle returned by T().
	t *testing.T

	// bqSuffix is the suffix passed to e2e.SetupPostgres; returned by Suffix().
	bqSuffix string
	// conn is the Postgres connector obtained from e2e.SetupPostgres.
	conn *connpostgres.PostgresConnector
	// bqHelper executes BigQuery queries; its Config.DatasetId names the
	// dataset that Teardown drops.
	bqHelper *BigQueryTestHelper
}

// T returns the *testing.T this suite was constructed with.
func (s PeerFlowE2ETestSuiteBQ) T() *testing.T {
	t := s.t
	return t
}

// Conn returns the *pgx.Conn exposed by the suite's Postgres connector.
func (s PeerFlowE2ETestSuiteBQ) Conn() *pgx.Conn {
	connector := s.conn
	return connector.Conn()
}

// Connector returns the suite's Postgres connector itself, as opposed to
// Conn, which returns the connection the connector exposes.
func (s PeerFlowE2ETestSuiteBQ) Connector() *connpostgres.PostgresConnector {
	connector := s.conn
	return connector
}

// Suffix returns the suffix string stored on the suite by SetupSuite.
func (s PeerFlowE2ETestSuiteBQ) Suffix() string {
	suffix := s.bqSuffix
	return suffix
}

// Peer returns the Peer held by the suite's BigQuery test helper.
func (s PeerFlowE2ETestSuiteBQ) Peer() *protos.Peer {
	helper := s.bqHelper
	return helper.Peer
}

// DestinationTable returns the destination table identifier for table.
// This suite applies no qualification: the name is returned unchanged.
func (s PeerFlowE2ETestSuiteBQ) DestinationTable(table string) string {
	destination := table
	return destination
}

// GetRows selects colsString from tableName in the helper's configured
// dataset, ordered by id, and returns the processed result batch.
// The generated query is logged through the test handle before execution.
func (s PeerFlowE2ETestSuiteBQ) GetRows(tableName, colsString string) (*model.QRecordBatch, error) {
	s.t.Helper()

	dataset := s.bqHelper.Config.DatasetId
	// Backtick-quote the dataset-qualified table name.
	source := fmt.Sprintf("`%s.%s`", dataset, tableName)
	query := fmt.Sprintf("SELECT %s FROM %s ORDER BY id", colsString, source)

	s.t.Logf("running query on bigquery: %s", query)
	return s.bqHelper.ExecuteAndProcessQuery(query)
}

// GetRowsWhere behaves like GetRows but restricts the result to rows matching
// the where expression, which is spliced verbatim after the WHERE keyword.
// The generated query is logged through the test handle before execution.
func (s PeerFlowE2ETestSuiteBQ) GetRowsWhere(tableName, colsString, where string) (*model.QRecordBatch, error) {
	s.t.Helper()

	dataset := s.bqHelper.Config.DatasetId
	// Backtick-quote the dataset-qualified table name.
	source := fmt.Sprintf("`%s.%s`", dataset, tableName)
	query := fmt.Sprintf("SELECT %s FROM %s WHERE %s ORDER BY id", colsString, source, where)

	s.t.Logf("running query on bigquery: %s", query)
	return s.bqHelper.ExecuteAndProcessQuery(query)
}

// Teardown releases the suite's resources: it first runs the shared Postgres
// teardown, then drops the BigQuery dataset named in the helper's config.
// A failure to drop the dataset fails the test immediately.
func (s PeerFlowE2ETestSuiteBQ) Teardown() {
	e2e.TearDownPostgres(s)

	dataset := s.bqHelper.Config.DatasetId
	if dropErr := s.bqHelper.DropDataset(dataset); dropErr != nil {
		s.t.Fatalf("failed to tear down bigquery: %v", dropErr)
	}
}

// SetupSuite builds a PeerFlowE2ETestSuiteBQ for a single test.
//
// It derives a suffix of the form "bq_<random>_<timestamp>" (the random part
// lowercased, the timestamp formatted as yyyymmddhhmmss), sets up Postgres
// with that suffix, creates the BigQuery test helper and recreates its
// dataset. Any failure aborts the test via t.Fatal/t.Fatalf.
func SetupSuite(t *testing.T) PeerFlowE2ETestSuiteBQ {
	t.Helper()

	suffix := shared.RandomString(8)
	tsSuffix := time.Now().Format("20060102150405")
	bqSuffix := fmt.Sprintf("bq_%s_%s", strings.ToLower(suffix), tsSuffix)

	conn, err := e2e.SetupPostgres(t, bqSuffix)
	if err != nil {
		t.Fatalf("failed to setup postgres: %v", err)
	}
	// Reported separately so a nil connector without an error does not
	// surface as the misleading "failed to setup postgres: <nil>".
	if conn == nil {
		t.Fatal("failed to setup postgres: connector is nil")
	}

	bqHelper, err := NewBigQueryTestHelper()
	if err != nil {
		t.Fatalf("Failed to create helper: %v", err)
	}

	err = bqHelper.RecreateDataset()
	if err != nil {
		t.Fatalf("Failed to recreate dataset: %v", err)
	}

	return PeerFlowE2ETestSuiteBQ{
		t:        t,
		bqSuffix: bqSuffix,
		conn:     conn,
		bqHelper: bqHelper,
	}
}
Loading

0 comments on commit 0a29dcd

Please sign in to comment.