Skip to content

Commit

Permalink
Merge branch 'main' into remove-testify-suite
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Dec 27, 2023
2 parents a398110 + e4a71dc commit 2af4c60
Show file tree
Hide file tree
Showing 101 changed files with 2,846 additions and 2,221 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ jobs:

- name: run tests
run: |
gotestsum --format testname -- -p 16 ./... -timeout 2400s
gotestsum --format testname -- -p 16 ./... -timeout 1200s
working-directory: ./flow
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
Expand Down
29 changes: 22 additions & 7 deletions dev-peerdb.sh
Original file line number Diff line number Diff line change
@@ -1,12 +1,27 @@
#!/bin/bash
set -Eeuo pipefail
#!/bin/sh
if test -z "$USE_PODMAN"
then
if ! command -v docker &> /dev/null
then
if command -v podman-compose
then
echo "docker could not be found on PATH, using podman-compose"
USE_PODMAN=1
else
echo "docker could not be found on PATH"
exit 1
fi
fi
fi

if ! command -v docker &> /dev/null
if test -z "$USE_PODMAN"
then
echo "docker could not be found on PATH"
exit 1
DOCKER="docker compose"
EXTRA_ARGS="--no-attach temporal --no-attach pyroscope --no-attach temporal-ui"
else
DOCKER="podman-compose --podman-run-args=--replace"
EXTRA_ARGS=""
fi

export PEERDB_VERSION_SHA_SHORT=local-$(git rev-parse --short HEAD)
docker compose -f docker-compose-dev.yml up --build \
--no-attach temporal --no-attach pyroscope --no-attach temporal-ui
exec $DOCKER -f docker-compose-dev.yml up --build $EXTRA_ARGS
4 changes: 3 additions & 1 deletion docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ services:
volumes:
- pgdata:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready", "-d", "postgres", "-U", "postgres"]
test: ["CMD", "pg_isready", "-d", "postgres", "-U", "postgres"]
interval: 10s
timeout: 30s
retries: 5
Expand Down Expand Up @@ -209,6 +209,8 @@ services:
DATABASE_URL: postgres://postgres:postgres@catalog:5432/postgres
PEERDB_FLOW_SERVER_HTTP: http://flow_api:8113
PEERDB_PASSWORD:
depends_on:
- flow-api

volumes:
pgdata:
4 changes: 3 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ services:
volumes:
- pgdata:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready", "-d", "postgres", "-U", "postgres"]
test: ["CMD", "pg_isready", "-d", "postgres", "-U", "postgres"]
interval: 10s
timeout: 30s
retries: 5
Expand Down Expand Up @@ -174,6 +174,8 @@ services:
<<: *catalog-config
DATABASE_URL: postgres://postgres:postgres@catalog:5432/postgres
PEERDB_FLOW_SERVER_HTTP: http://flow_api:8113
depends_on:
- flow-api

volumes:
pgdata:
Expand Down
16 changes: 12 additions & 4 deletions flow/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,31 @@ run:
linters:
enable:
- dogsled
- dupl
- durationcheck
- errcheck
- gofumpt
- gosec
- gosimple
- misspell
- nakedret
- nolintlint
- staticcheck
- stylecheck
- sqlclosecheck
- unconvert
- unparam
- whitespace
- errcheck
- prealloc
- staticcheck
- thelper
- ineffassign
- unparam
- unused
- lll
linters-settings:
stylecheck:
checks:
- all
- '-ST1003'
lll:
line-length: 120
line-length: 144
tab-width: 4
23 changes: 16 additions & 7 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *prot
}
defer connectors.CloseConnector(dstConn)

flowName, _ := ctx.Value(shared.FlowNameKey).(string)
if err := dstConn.SetupMetadataTables(); err != nil {
a.Alerter.LogFlowError(ctx, config.Name, err)
a.Alerter.LogFlowError(ctx, flowName, err)
return fmt.Errorf("failed to setup metadata tables: %w", err)
}

Expand Down Expand Up @@ -112,7 +113,7 @@ func (a *FlowableActivity) EnsurePullability(

output, err := srcConn.EnsurePullability(config)
if err != nil {
a.Alerter.LogFlowError(ctx, config.PeerConnectionConfig.Name, err)
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return nil, fmt.Errorf("failed to ensure pullability: %w", err)
}

Expand Down Expand Up @@ -169,14 +170,14 @@ func (a *FlowableActivity) CreateNormalizedTable(

setupNormalizedTablesOutput, err := conn.SetupNormalizedTables(config)
if err != nil {
a.Alerter.LogFlowError(ctx, config.PeerConnectionConfig.Name, err)
flowName, _ := ctx.Value(shared.FlowNameKey).(string)
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, fmt.Errorf("failed to setup normalized tables: %w", err)
}

return setupNormalizedTablesOutput, nil
}

// StartFlow implements StartFlow.
func (a *FlowableActivity) StartFlow(ctx context.Context,
input *protos.StartFlowInput,
) (*model.SyncResponse, error) {
Expand Down Expand Up @@ -581,7 +582,8 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
slog.Error("failed to pull records", slog.Any("error", err))
goroutineErr = err
} else {
err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, numRecords)
err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx,
a.CatalogPool, runUUID, partition, numRecords)
if err != nil {
slog.Error(fmt.Sprintf("%v", err))
goroutineErr = err
Expand Down Expand Up @@ -742,6 +744,11 @@ func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*proto
}

func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
if !peerdbenv.PeerDBEnableWALHeartbeat() {
slog.InfoContext(ctx, "wal heartbeat is disabled")
return nil
}

sendTimeout := 10 * time.Minute
ticker := time.NewTicker(sendTimeout)
defer ticker.Stop()
Expand Down Expand Up @@ -931,7 +938,8 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
},
}
}
updateErr := monitoring.InitializeQRepRun(ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics})
updateErr := monitoring.InitializeQRepRun(
ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics})
if updateErr != nil {
return updateErr
}
Expand All @@ -941,7 +949,8 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
return fmt.Errorf("failed to update start time for partition: %w", err)
}

err = monitoring.UpdatePullEndTimeAndRowsForPartition(errCtx, a.CatalogPool, runUUID, partition, int64(numRecords))
err = monitoring.UpdatePullEndTimeAndRowsForPartition(
errCtx, a.CatalogPool, runUUID, partition, int64(numRecords))
if err != nil {
slog.Error(fmt.Sprintf("%v", err))
return err
Expand Down
5 changes: 5 additions & 0 deletions flow/activities/slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ func (a *FlowableActivity) handleSlotInfo(
return err
}

if len(slotInfo) == 0 {
slog.WarnContext(ctx, "warning: unable to get slot info", slog.Any("slotName", slotName))
return nil
}

deploymentUIDPrefix := ""
if peerdbenv.PeerDBDeploymentUID() != "" {
deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID())
Expand Down
5 changes: 4 additions & 1 deletion flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,10 @@ func APIMain(args *APIServerParams) error {
}

connOptions := client.ConnectionOptions{
TLS: &tls.Config{Certificates: certs},
TLS: &tls.Config{
Certificates: certs,
MinVersion: tls.VersionTLS13,
},
}
clientOptions.ConnectionOptions = connOptions
}
Expand Down
11 changes: 6 additions & 5 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,9 @@ func (h *FlowRequestHandler) CreateCDCFlow(
}

limits := &peerflow.CDCFlowLimits{
TotalSyncFlows: 0,
ExitAfterRecords: -1,
TotalNormalizeFlows: 0,
MaxBatchSize: maxBatchSize,
TotalSyncFlows: 0,
ExitAfterRecords: -1,
MaxBatchSize: maxBatchSize,
}

if req.ConnectionConfigs.SoftDeleteColName == "" {
Expand Down Expand Up @@ -261,7 +260,9 @@ func (h *FlowRequestHandler) CreateQRepFlow(
slog.Any("error", err), slog.String("flowName", cfg.FlowJobName))
return nil, fmt.Errorf("invalid xmin txid for xmin rep: %w", err)
}
state.LastPartition.Range = &protos.PartitionRange{Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: txid}}}
state.LastPartition.Range = &protos.PartitionRange{
Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: txid}},
}
}

workflowFn = peerflow.XminFlowWorkflow
Expand Down
16 changes: 8 additions & 8 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,24 @@ func (h *FlowRequestHandler) getPGPeerConfig(ctx context.Context, peerName strin
return &pgPeerConfig, nil
}

func (h *FlowRequestHandler) getPoolForPGPeer(ctx context.Context, peerName string) (*pgxpool.Pool, string, error) {
func (h *FlowRequestHandler) getPoolForPGPeer(ctx context.Context, peerName string) (*pgxpool.Pool, error) {
pgPeerConfig, err := h.getPGPeerConfig(ctx, peerName)
if err != nil {
return nil, "", err
return nil, err
}
connStr := utils.GetPGConnectionString(pgPeerConfig)
peerPool, err := pgxpool.New(ctx, connStr)
if err != nil {
return nil, "", err
return nil, err
}
return peerPool, pgPeerConfig.User, nil
return peerPool, nil
}

func (h *FlowRequestHandler) GetSchemas(
ctx context.Context,
req *protos.PostgresPeerActivityInfoRequest,
) (*protos.PeerSchemasResponse, error) {
peerPool, _, err := h.getPoolForPGPeer(ctx, req.PeerName)
peerPool, err := h.getPoolForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.PeerSchemasResponse{Schemas: nil}, err
}
Expand Down Expand Up @@ -78,7 +78,7 @@ func (h *FlowRequestHandler) GetTablesInSchema(
ctx context.Context,
req *protos.SchemaTablesRequest,
) (*protos.SchemaTablesResponse, error) {
peerPool, _, err := h.getPoolForPGPeer(ctx, req.PeerName)
peerPool, err := h.getPoolForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.SchemaTablesResponse{Tables: nil}, err
}
Expand Down Expand Up @@ -110,7 +110,7 @@ func (h *FlowRequestHandler) GetAllTables(
ctx context.Context,
req *protos.PostgresPeerActivityInfoRequest,
) (*protos.AllTablesResponse, error) {
peerPool, _, err := h.getPoolForPGPeer(ctx, req.PeerName)
peerPool, err := h.getPoolForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.AllTablesResponse{Tables: nil}, err
}
Expand Down Expand Up @@ -140,7 +140,7 @@ func (h *FlowRequestHandler) GetColumns(
ctx context.Context,
req *protos.TableColumnsRequest,
) (*protos.TableColumnsResponse, error) {
peerPool, _, err := h.getPoolForPGPeer(ctx, req.PeerName)
peerPool, err := h.getPoolForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.TableColumnsResponse{Columns: nil}, err
}
Expand Down
5 changes: 4 additions & 1 deletion flow/cmd/snapshot_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error {
}

connOptions := client.ConnectionOptions{
TLS: &tls.Config{Certificates: certs},
TLS: &tls.Config{
Certificates: certs,
MinVersion: tls.VersionTLS13,
},
}
clientOptions.ConnectionOptions = connOptions
}
Expand Down
5 changes: 4 additions & 1 deletion flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ func WorkerMain(opts *WorkerOptions) error {
return fmt.Errorf("unable to process certificate and key: %w", err)
}
connOptions := client.ConnectionOptions{
TLS: &tls.Config{Certificates: certs},
TLS: &tls.Config{
Certificates: certs,
MinVersion: tls.VersionTLS13,
},
}
clientOptions.ConnectionOptions = connOptions
}
Expand Down
Loading

0 comments on commit 2af4c60

Please sign in to comment.