Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flow ci: drop temporal's sdk/testsuite in favor of actual integration testing #1391

Merged
merged 68 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
796fdcc
flow ci: try running temporal, getting out of sdk/testsuite
serprex Feb 27, 2024
bf391b5
More 7132 ports outside e2e
serprex Feb 27, 2024
1d7a4f6
remove temporal sdk/testsuite
serprex Feb 28, 2024
e69edac
include temporal admin tools, seems required
serprex Feb 28, 2024
7197afb
Merge branch 'main' into remove-temporal-testsuite
serprex Feb 28, 2024
108cf50
fix Finished
serprex Feb 28, 2024
373ead1
fix unused import
serprex Feb 28, 2024
8c92f5a
try moving services out of docker-compose-ci
serprex Feb 28, 2024
91634af
remove unsupported fields, remove flow-api
serprex Feb 28, 2024
1d63d1d
remove health check
serprex Feb 28, 2024
24e4873
no depends_on
serprex Feb 28, 2024
ae275a5
try with depends_on scalar
serprex Feb 28, 2024
034b181
env is map
serprex Feb 28, 2024
cd277e5
remove depends_on again
serprex Feb 28, 2024
5aee469
remove labels
serprex Feb 28, 2024
bf784aa
remove temporal admin truffles
serprex Feb 28, 2024
72561f9
remove restart
serprex Feb 28, 2024
a01400f
remove dependency on temporal admin
serprex Feb 28, 2024
4dc9838
start temporal workers as goroutines
serprex Feb 28, 2024
73a583a
service has different hostname?
serprex Feb 28, 2024
066d95d
better error handling
serprex Feb 28, 2024
4ecab4f
fix mount path
serprex Feb 28, 2024
6177c18
can I replace this with their github action?
serprex Feb 28, 2024
d371f98
no goroutine exit now that we don't goroutine
serprex Feb 28, 2024
1085999
back to localhost now that temporal no longer a container
serprex Feb 29, 2024
b04ca20
trying to find out why cdc flow wait spams context deadline timeouts
serprex Feb 29, 2024
ddcdfb1
Log workflow not being finished
serprex Feb 29, 2024
4dca5c0
WithTimeout to cap time spent asking to describe
serprex Feb 29, 2024
462d211
log status, probably running
serprex Feb 29, 2024
c41e4a1
checking if behavior same if no workers started
serprex Feb 29, 2024
2d9f201
testing if TestMain is running
serprex Feb 29, 2024
965b96c
ffs
serprex Feb 29, 2024
3f4f4fc
15m timeout on TestMain
serprex Feb 29, 2024
bffc0a3
end tests more forcefully
serprex Feb 29, 2024
63ce714
Don't use Run in main_test
serprex Feb 29, 2024
c6f79a3
merge e2e/postgres into e2e package
serprex Feb 29, 2024
7ed994d
Merge branch 'main' into remove-temporal-testsuite
serprex Feb 29, 2024
8acedb3
theory: try init instead
serprex Feb 29, 2024
2c84c68
fix client being closed
serprex Feb 29, 2024
f3fe29f
Merge remote-tracking branch 'origin/main' into remove-temporal-tests…
serprex Feb 29, 2024
cd7e210
env.Error is not a string
serprex Feb 29, 2024
4953a2c
create MirrorName search-attribute
serprex Feb 29, 2024
a9a96b7
wanting to see what plain go test looks like in ci
serprex Feb 29, 2024
3f712d6
Revert "merge e2e/postgres into e2e package"
serprex Mar 1, 2024
117626a
comment why init
serprex Mar 1, 2024
a511c6e
test with lowered parallelism
serprex Mar 1, 2024
ed19cd5
remove -v
serprex Mar 1, 2024
8d3b64c
try running temporal-server in same step as go test
serprex Mar 1, 2024
a13bec8
Merge branch 'main' into remove-temporal-testsuite
serprex Mar 1, 2024
375b5f6
Merge branch 'main' into remove-temporal-testsuite
serprex Mar 1, 2024
0af61aa
port new test
serprex Mar 1, 2024
05edf75
Log fatal errors in temporal workers
serprex Mar 1, 2024
49995f0
try logging task queue info
serprex Mar 1, 2024
b7d768e
Merge branch 'main' into remove-temporal-testsuite
serprex Mar 1, 2024
f126b9c
post merge fix
serprex Mar 1, 2024
5ed27f4
make sure logging is setup for cases ignoring non-default logger
serprex Mar 1, 2024
d9ed3dc
Merge branch 'main' into remove-temporal-testsuite
serprex Mar 2, 2024
00456aa
explicit default
serprex Mar 2, 2024
82fa52b
fix list-partition command
serprex Mar 2, 2024
fefa5b5
logging init, not sure why no snapshot task queue
serprex Mar 2, 2024
cd6a283
try -v to see init logging
serprex Mar 2, 2024
eb80c98
list workflows
serprex Mar 2, 2024
b70b896
idea: do integration tests the usual way.. by running the actual program
serprex Mar 2, 2024
8e54ae9
split up background tasks
serprex Mar 2, 2024
d33402a
GH actions are bad & don't let background processes continue to print…
serprex Mar 2, 2024
a8e4310
cook the books
serprex Mar 2, 2024
464f61c
Merge remote-tracking branch 'origin/main' into remove-temporal-tests…
serprex Mar 3, 2024
31aa444
Merge branch 'main' into remove-temporal-testsuite
serprex Mar 3, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 19 additions & 24 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,17 @@ jobs:
matrix:
runner: [ubicloud-standard-16-ubuntu-2204-arm]
runs-on: ${{ matrix.runner }}
timeout-minutes: 30
timeout-minutes: 40
services:
pg_cdc:
catalog:
image: imresamu/postgis:15-3.4-alpine
ports:
- 7132:5432
- 5432:5432
env:
POSTGRES_USER: postgres
PGUSER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
options: >-
--name pg_cdc
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5

steps:
- name: checkout sources
uses: actions/checkout@v4
Expand All @@ -43,18 +38,12 @@ jobs:
go-version: "1.22"
cache-dependency-path: flow/go.sum

- name: install gotestsum
run: |
go install gotest.tools/gotestsum@latest

- name: install lib-geos
run: |
sudo apt-get update
sudo apt-get install libgeos-dev

- name: download go modules
run: |
go mod download
- run: go mod download
working-directory: ./flow

- name: setup gcp service account
Expand Down Expand Up @@ -87,21 +76,27 @@ jobs:

- name: create hstore extension, increase logical replication limits, and setup catalog database
run: >
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "CREATE EXTENSION hstore;"
docker exec "${{ job.services.catalog.id }}" psql -U postgres -c "CREATE EXTENSION hstore;"
-c "ALTER SYSTEM SET wal_level=logical;"
-c "ALTER SYSTEM SET max_replication_slots=192;"
-c "ALTER SYSTEM SET max_wal_senders=256;"
-c "ALTER SYSTEM SET max_connections=2048;" &&
(cat ../nexus/catalog/migrations/V{?,??}__* | docker exec -i pg_cdc psql -h localhost -p 5432 -U postgres) &&
docker restart pg_cdc
working-directory: ./flow
(cat ./nexus/catalog/migrations/V{?,??}__* | docker exec -i "${{ job.services.catalog.id }}" psql -U postgres) &&
docker restart "${{ job.services.catalog.id }}"
env:
PG_CDC: empty
PGPASSWORD: postgres

- name: Install Temporal CLI
uses: temporalio/setup-temporal@v0

- name: run tests
run: |
gotestsum --format testname -- -p 24 ./... -timeout 1200s
temporal server start-dev --namespace default --headless &
go build -ldflags="-s -w" -o peer-flow
temporal operator search-attribute create --name MirrorName --type Text --namespace default
./peer-flow worker &
./peer-flow snapshot-worker &
go test -p 32 ./... -timeout 1200s
working-directory: ./flow
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
Expand All @@ -122,7 +117,7 @@ jobs:
SQLSERVER_PASSWORD: ${{ secrets.SQLSERVER_PASSWORD }}
SQLSERVER_DB: ${{ secrets.SQLSERVER_DB }}
PEERDB_CATALOG_HOST: localhost
PEERDB_CATALOG_PORT: 7132
PEERDB_CATALOG_PORT: 5432
PEERDB_CATALOG_USER: postgres
PEERDB_CATALOG_PASSWORD: postgres
PEERDB_CATALOG_DATABASE: postgres
7 changes: 3 additions & 4 deletions flow/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package alerting
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"time"
Expand Down Expand Up @@ -51,14 +50,14 @@ func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]*slackAlertSen
}

// doesn't take care of closing pool, needs to be done externally.
func NewAlerter(catalogPool *pgxpool.Pool) (*Alerter, error) {
func NewAlerter(catalogPool *pgxpool.Pool) *Alerter {
if catalogPool == nil {
return nil, errors.New("catalog pool is nil for Alerter")
panic("catalog pool is nil for Alerter")
}

return &Alerter{
catalogPool: catalogPool,
}, nil
}
}

func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo *protos.SlotInfo) {
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/api.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/cert.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"crypto/tls"
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/handler.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"context"
Expand Down
36 changes: 14 additions & 22 deletions flow/cmd/snapshot_worker.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"context"
Expand All @@ -25,7 +25,7 @@ type SnapshotWorkerOptions struct {
TemporalKey string
}

func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error {
func SnapshotWorkerMain(opts *SnapshotWorkerOptions) (client.Client, worker.Worker, error) {
clientOptions := client.Options{
HostPort: opts.TemporalHostPort,
Namespace: opts.TemporalNamespace,
Expand All @@ -35,7 +35,7 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error {
if opts.TemporalCert != "" && opts.TemporalKey != "" {
certs, err := Base64DecodeCertAndKey(opts.TemporalCert, opts.TemporalKey)
if err != nil {
return fmt.Errorf("unable to process certificate and key: %w", err)
return nil, nil, fmt.Errorf("unable to process certificate and key: %w", err)
}

connOptions := client.ConnectionOptions{
Expand All @@ -47,37 +47,29 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error {
clientOptions.ConnectionOptions = connOptions
}

conn, err := utils.GetCatalogConnectionPoolFromEnv(context.Background())
if err != nil {
return nil, nil, fmt.Errorf("unable to create catalog connection pool: %w", err)
}

c, err := client.Dial(clientOptions)
if err != nil {
return fmt.Errorf("unable to create Temporal client: %w", err)
return nil, nil, fmt.Errorf("unable to create Temporal client: %w", err)
}
defer c.Close()

taskQueue := shared.GetPeerFlowTaskQueueName(shared.SnapshotFlowTaskQueue)
w := worker.New(c, taskQueue, worker.Options{
EnableSessionWorker: true,
OnFatalError: func(err error) {
slog.Error("Snapshot Worker failed", slog.Any("error", err))
},
})

conn, err := utils.GetCatalogConnectionPoolFromEnv(context.Background())
if err != nil {
return fmt.Errorf("unable to create catalog connection pool: %w", err)
}

alerter, err := alerting.NewAlerter(conn)
if err != nil {
return fmt.Errorf("unable to create alerter: %w", err)
}

w.RegisterWorkflow(peerflow.SnapshotFlowWorkflow)
w.RegisterActivity(&activities.SnapshotActivity{
SnapshotConnections: make(map[string]activities.SlotSnapshotSignal),
Alerter: alerter,
Alerter: alerting.NewAlerter(conn),
})

err = w.Run(worker.InterruptCh())
if err != nil {
return fmt.Errorf("worker run error: %w", err)
}

return nil
return c, w, nil
}
2 changes: 1 addition & 1 deletion flow/cmd/validate_mirror.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/validate_peer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/version.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"context"
Expand Down
41 changes: 10 additions & 31 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"context"
Expand All @@ -7,9 +7,7 @@ import (
"log"
"log/slog"
"os"
"os/signal"
"runtime"
"syscall"

"github.com/grafana/pyroscope-go"
"go.temporal.io/sdk/client"
Expand Down Expand Up @@ -74,22 +72,11 @@ func setupPyroscope(opts *WorkerOptions) {
}
}

func WorkerMain(opts *WorkerOptions) error {
func WorkerMain(opts *WorkerOptions) (client.Client, worker.Worker, error) {
if opts.EnableProfiling {
setupPyroscope(opts)
}

go func() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGQUIT)
buf := make([]byte, 1<<20)
for {
<-sigs
stacklen := runtime.Stack(buf, true)
log.Printf("=== received SIGQUIT ===\n*** goroutine dump...\n%s\n*** end\n", buf[:stacklen])
}
}()

clientOptions := client.Options{
HostPort: opts.TemporalHostPort,
Namespace: opts.TemporalNamespace,
Expand All @@ -100,7 +87,7 @@ func WorkerMain(opts *WorkerOptions) error {
slog.Info("Using temporal certificate/key for authentication")
certs, err := Base64DecodeCertAndKey(opts.TemporalCert, opts.TemporalKey)
if err != nil {
return fmt.Errorf("unable to process certificate and key: %w", err)
return nil, nil, fmt.Errorf("unable to process certificate and key: %w", err)
}
connOptions := client.ConnectionOptions{
TLS: &tls.Config{
Expand All @@ -113,37 +100,29 @@ func WorkerMain(opts *WorkerOptions) error {

conn, err := utils.GetCatalogConnectionPoolFromEnv(context.Background())
if err != nil {
return fmt.Errorf("unable to create catalog connection pool: %w", err)
return nil, nil, fmt.Errorf("unable to create catalog connection pool: %w", err)
}

c, err := client.Dial(clientOptions)
if err != nil {
return fmt.Errorf("unable to create Temporal client: %w", err)
return nil, nil, fmt.Errorf("unable to create Temporal client: %w", err)
}
slog.Info("Created temporal client")
defer c.Close()

taskQueue := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueue)
w := worker.New(c, taskQueue, worker.Options{
EnableSessionWorker: true,
OnFatalError: func(err error) {
slog.Error("Peerflow Worker failed", slog.Any("error", err))
},
})
peerflow.RegisterFlowWorkerWorkflows(w)

alerter, err := alerting.NewAlerter(conn)
if err != nil {
return fmt.Errorf("unable to create alerter: %w", err)
}

w.RegisterActivity(&activities.FlowableActivity{
CatalogPool: conn,
Alerter: alerter,
Alerter: alerting.NewAlerter(conn),
CdcCache: make(map[string]connectors.CDCPullConnector),
})

err = w.Run(worker.InterruptCh())
if err != nil {
return fmt.Errorf("worker run error: %w", err)
}

return nil
return c, w, nil
}
9 changes: 2 additions & 7 deletions flow/connectors/postgres/postgres_schema_delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/jackc/pgx/v5"
"github.com/stretchr/testify/require"

"github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
Expand All @@ -24,13 +25,7 @@ type PostgresSchemaDeltaTestSuite struct {
func SetupSuite(t *testing.T) PostgresSchemaDeltaTestSuite {
t.Helper()

connector, err := NewPostgresConnector(context.Background(), &protos.PostgresConfig{
Host: "localhost",
Port: 7132,
User: "postgres",
Password: "postgres",
Database: "postgres",
})
connector, err := NewPostgresConnector(context.Background(), utils.GetCatalogPostgresConfigFromEnv())
require.NoError(t, err)

setupTx, err := connector.conn.Begin(context.Background())
Expand Down
11 changes: 2 additions & 9 deletions flow/connectors/postgres/qrep_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,14 @@ import (
"context"
"testing"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
)

func BenchmarkQRepQueryExecutor(b *testing.B) {
query := "SELECT * FROM bench.large_table"

ctx := context.Background()
connector, err := NewPostgresConnector(ctx,
&protos.PostgresConfig{
Host: "localhost",
Port: 7132,
User: "postgres",
Password: "postgres",
Database: "postgres",
})
connector, err := NewPostgresConnector(ctx, utils.GetCatalogPostgresConfigFromEnv())
if err != nil {
b.Fatalf("failed to create connection: %v", err)
}
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/postgres/qrep_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/assert"
"go.temporal.io/sdk/log"

"github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
)
Expand Down Expand Up @@ -63,7 +64,7 @@ func newTestCaseForCTID(schema string, name string, rows uint32, expectedNum int
}

func TestGetQRepPartitions(t *testing.T) {
const connStr = "postgres://postgres:postgres@localhost:7132/postgres"
connStr := utils.GetCatalogConnectionStringFromEnv()

// Setup the DB
config, err := pgx.ParseConfig(connStr)
Expand Down
Loading
Loading