Skip to content

Commit

Permalink
flow ci: drop temporal's sdk/testsuite in favor of actual integration…
Browse files Browse the repository at this point in the history
… testing (#1391)

temporalio/sdk-go#805 cements temporal's position that sdk/testsuite is for unit testing, not integration testing

See also simplification of dynamic signals test where before Kevin had to work out some precise delayed callback timing to get signals right

Instead workflows run in another process &
tests are only using temporal client to kick off workflows &
query workflow state or check databases in a waitfor loop
  • Loading branch information
serprex authored Mar 4, 2024
1 parent 72cfd9d commit 5853e4e
Show file tree
Hide file tree
Showing 29 changed files with 1,785 additions and 2,048 deletions.
43 changes: 19 additions & 24 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,17 @@ jobs:
matrix:
runner: [ubicloud-standard-16-ubuntu-2204-arm]
runs-on: ${{ matrix.runner }}
timeout-minutes: 30
timeout-minutes: 40
services:
pg_cdc:
catalog:
image: imresamu/postgis:15-3.4-alpine
ports:
- 7132:5432
- 5432:5432
env:
POSTGRES_USER: postgres
PGUSER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
options: >-
--name pg_cdc
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5

steps:
- name: checkout sources
uses: actions/checkout@v4
Expand All @@ -43,18 +38,12 @@ jobs:
go-version: "1.22"
cache-dependency-path: flow/go.sum

- name: install gotestsum
run: |
go install gotest.tools/gotestsum@latest
- name: install lib-geos
run: |
sudo apt-get update
sudo apt-get install libgeos-dev
- name: download go modules
run: |
go mod download
- run: go mod download
working-directory: ./flow

- name: setup gcp service account
Expand Down Expand Up @@ -87,21 +76,27 @@ jobs:

- name: create hstore extension, increase logical replication limits, and setup catalog database
run: >
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "CREATE EXTENSION hstore;"
docker exec "${{ job.services.catalog.id }}" psql -U postgres -c "CREATE EXTENSION hstore;"
-c "ALTER SYSTEM SET wal_level=logical;"
-c "ALTER SYSTEM SET max_replication_slots=192;"
-c "ALTER SYSTEM SET max_wal_senders=256;"
-c "ALTER SYSTEM SET max_connections=2048;" &&
(cat ../nexus/catalog/migrations/V{?,??}__* | docker exec -i pg_cdc psql -h localhost -p 5432 -U postgres) &&
docker restart pg_cdc
working-directory: ./flow
(cat ./nexus/catalog/migrations/V{?,??}__* | docker exec -i "${{ job.services.catalog.id }}" psql -U postgres) &&
docker restart "${{ job.services.catalog.id }}"
env:
PG_CDC: empty
PGPASSWORD: postgres

- name: Install Temporal CLI
uses: temporalio/setup-temporal@v0

- name: run tests
run: |
gotestsum --format testname -- -p 24 ./... -timeout 1200s
temporal server start-dev --namespace default --headless &
go build -ldflags="-s -w" -o peer-flow
temporal operator search-attribute create --name MirrorName --type Text --namespace default
./peer-flow worker &
./peer-flow snapshot-worker &
go test -p 32 ./... -timeout 1200s
working-directory: ./flow
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
Expand All @@ -122,7 +117,7 @@ jobs:
SQLSERVER_PASSWORD: ${{ secrets.SQLSERVER_PASSWORD }}
SQLSERVER_DB: ${{ secrets.SQLSERVER_DB }}
PEERDB_CATALOG_HOST: localhost
PEERDB_CATALOG_PORT: 7132
PEERDB_CATALOG_PORT: 5432
PEERDB_CATALOG_USER: postgres
PEERDB_CATALOG_PASSWORD: postgres
PEERDB_CATALOG_DATABASE: postgres
7 changes: 3 additions & 4 deletions flow/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package alerting
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"time"
Expand Down Expand Up @@ -51,14 +50,14 @@ func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]*slackAlertSen
}

// doesn't take care of closing pool, needs to be done externally.
func NewAlerter(catalogPool *pgxpool.Pool) (*Alerter, error) {
func NewAlerter(catalogPool *pgxpool.Pool) *Alerter {
if catalogPool == nil {
return nil, errors.New("catalog pool is nil for Alerter")
panic("catalog pool is nil for Alerter")
}

return &Alerter{
catalogPool: catalogPool,
}, nil
}
}

func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo *protos.SlotInfo) {
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/api.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/cert.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"crypto/tls"
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/handler.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"context"
Expand Down
36 changes: 14 additions & 22 deletions flow/cmd/snapshot_worker.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"context"
Expand All @@ -25,7 +25,7 @@ type SnapshotWorkerOptions struct {
TemporalKey string
}

func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error {
func SnapshotWorkerMain(opts *SnapshotWorkerOptions) (client.Client, worker.Worker, error) {
clientOptions := client.Options{
HostPort: opts.TemporalHostPort,
Namespace: opts.TemporalNamespace,
Expand All @@ -35,7 +35,7 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error {
if opts.TemporalCert != "" && opts.TemporalKey != "" {
certs, err := Base64DecodeCertAndKey(opts.TemporalCert, opts.TemporalKey)
if err != nil {
return fmt.Errorf("unable to process certificate and key: %w", err)
return nil, nil, fmt.Errorf("unable to process certificate and key: %w", err)
}

connOptions := client.ConnectionOptions{
Expand All @@ -47,37 +47,29 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error {
clientOptions.ConnectionOptions = connOptions
}

conn, err := utils.GetCatalogConnectionPoolFromEnv(context.Background())
if err != nil {
return nil, nil, fmt.Errorf("unable to create catalog connection pool: %w", err)
}

c, err := client.Dial(clientOptions)
if err != nil {
return fmt.Errorf("unable to create Temporal client: %w", err)
return nil, nil, fmt.Errorf("unable to create Temporal client: %w", err)
}
defer c.Close()

taskQueue := shared.GetPeerFlowTaskQueueName(shared.SnapshotFlowTaskQueue)
w := worker.New(c, taskQueue, worker.Options{
EnableSessionWorker: true,
OnFatalError: func(err error) {
slog.Error("Snapshot Worker failed", slog.Any("error", err))
},
})

conn, err := utils.GetCatalogConnectionPoolFromEnv(context.Background())
if err != nil {
return fmt.Errorf("unable to create catalog connection pool: %w", err)
}

alerter, err := alerting.NewAlerter(conn)
if err != nil {
return fmt.Errorf("unable to create alerter: %w", err)
}

w.RegisterWorkflow(peerflow.SnapshotFlowWorkflow)
w.RegisterActivity(&activities.SnapshotActivity{
SnapshotConnections: make(map[string]activities.SlotSnapshotSignal),
Alerter: alerter,
Alerter: alerting.NewAlerter(conn),
})

err = w.Run(worker.InterruptCh())
if err != nil {
return fmt.Errorf("worker run error: %w", err)
}

return nil
return c, w, nil
}
2 changes: 1 addition & 1 deletion flow/cmd/validate_mirror.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/validate_peer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/version.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"context"
Expand Down
41 changes: 10 additions & 31 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"context"
Expand All @@ -7,9 +7,7 @@ import (
"log"
"log/slog"
"os"
"os/signal"
"runtime"
"syscall"

"github.com/grafana/pyroscope-go"
"go.temporal.io/sdk/client"
Expand Down Expand Up @@ -74,22 +72,11 @@ func setupPyroscope(opts *WorkerOptions) {
}
}

func WorkerMain(opts *WorkerOptions) error {
func WorkerMain(opts *WorkerOptions) (client.Client, worker.Worker, error) {
if opts.EnableProfiling {
setupPyroscope(opts)
}

go func() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGQUIT)
buf := make([]byte, 1<<20)
for {
<-sigs
stacklen := runtime.Stack(buf, true)
log.Printf("=== received SIGQUIT ===\n*** goroutine dump...\n%s\n*** end\n", buf[:stacklen])
}
}()

clientOptions := client.Options{
HostPort: opts.TemporalHostPort,
Namespace: opts.TemporalNamespace,
Expand All @@ -100,7 +87,7 @@ func WorkerMain(opts *WorkerOptions) error {
slog.Info("Using temporal certificate/key for authentication")
certs, err := Base64DecodeCertAndKey(opts.TemporalCert, opts.TemporalKey)
if err != nil {
return fmt.Errorf("unable to process certificate and key: %w", err)
return nil, nil, fmt.Errorf("unable to process certificate and key: %w", err)
}
connOptions := client.ConnectionOptions{
TLS: &tls.Config{
Expand All @@ -113,37 +100,29 @@ func WorkerMain(opts *WorkerOptions) error {

conn, err := utils.GetCatalogConnectionPoolFromEnv(context.Background())
if err != nil {
return fmt.Errorf("unable to create catalog connection pool: %w", err)
return nil, nil, fmt.Errorf("unable to create catalog connection pool: %w", err)
}

c, err := client.Dial(clientOptions)
if err != nil {
return fmt.Errorf("unable to create Temporal client: %w", err)
return nil, nil, fmt.Errorf("unable to create Temporal client: %w", err)
}
slog.Info("Created temporal client")
defer c.Close()

taskQueue := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueue)
w := worker.New(c, taskQueue, worker.Options{
EnableSessionWorker: true,
OnFatalError: func(err error) {
slog.Error("Peerflow Worker failed", slog.Any("error", err))
},
})
peerflow.RegisterFlowWorkerWorkflows(w)

alerter, err := alerting.NewAlerter(conn)
if err != nil {
return fmt.Errorf("unable to create alerter: %w", err)
}

w.RegisterActivity(&activities.FlowableActivity{
CatalogPool: conn,
Alerter: alerter,
Alerter: alerting.NewAlerter(conn),
CdcCache: make(map[string]connectors.CDCPullConnector),
})

err = w.Run(worker.InterruptCh())
if err != nil {
return fmt.Errorf("worker run error: %w", err)
}

return nil
return c, w, nil
}
9 changes: 2 additions & 7 deletions flow/connectors/postgres/postgres_schema_delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/jackc/pgx/v5"
"github.com/stretchr/testify/require"

"github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
Expand All @@ -24,13 +25,7 @@ type PostgresSchemaDeltaTestSuite struct {
func SetupSuite(t *testing.T) PostgresSchemaDeltaTestSuite {
t.Helper()

connector, err := NewPostgresConnector(context.Background(), &protos.PostgresConfig{
Host: "localhost",
Port: 7132,
User: "postgres",
Password: "postgres",
Database: "postgres",
})
connector, err := NewPostgresConnector(context.Background(), utils.GetCatalogPostgresConfigFromEnv())
require.NoError(t, err)

setupTx, err := connector.conn.Begin(context.Background())
Expand Down
11 changes: 2 additions & 9 deletions flow/connectors/postgres/qrep_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,14 @@ import (
"context"
"testing"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
)

func BenchmarkQRepQueryExecutor(b *testing.B) {
query := "SELECT * FROM bench.large_table"

ctx := context.Background()
connector, err := NewPostgresConnector(ctx,
&protos.PostgresConfig{
Host: "localhost",
Port: 7132,
User: "postgres",
Password: "postgres",
Database: "postgres",
})
connector, err := NewPostgresConnector(ctx, utils.GetCatalogPostgresConfigFromEnv())
if err != nil {
b.Fatalf("failed to create connection: %v", err)
}
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/postgres/qrep_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/assert"
"go.temporal.io/sdk/log"

"github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
)
Expand Down Expand Up @@ -63,7 +64,7 @@ func newTestCaseForCTID(schema string, name string, rows uint32, expectedNum int
}

func TestGetQRepPartitions(t *testing.T) {
const connStr = "postgres://postgres:postgres@localhost:7132/postgres"
connStr := utils.GetCatalogConnectionStringFromEnv()

// Setup the DB
config, err := pgx.ParseConfig(connStr)
Expand Down
Loading

0 comments on commit 5853e4e

Please sign in to comment.