Skip to content

Commit

Permalink
Merge branch 'main' into qrep-syncedpart-handling
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Mar 4, 2024
2 parents 85958be + 67fd5d3 commit d58d657
Show file tree
Hide file tree
Showing 88 changed files with 2,776 additions and 2,625 deletions.
43 changes: 19 additions & 24 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,17 @@ jobs:
matrix:
runner: [ubicloud-standard-16-ubuntu-2204-arm]
runs-on: ${{ matrix.runner }}
timeout-minutes: 30
timeout-minutes: 40
services:
pg_cdc:
catalog:
image: imresamu/postgis:15-3.4-alpine
ports:
- 7132:5432
- 5432:5432
env:
POSTGRES_USER: postgres
PGUSER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
options: >-
--name pg_cdc
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5

steps:
- name: checkout sources
uses: actions/checkout@v4
Expand All @@ -43,18 +38,12 @@ jobs:
go-version: "1.22"
cache-dependency-path: flow/go.sum

- name: install gotestsum
run: |
go install gotest.tools/gotestsum@latest
- name: install lib-geos
run: |
sudo apt-get update
sudo apt-get install libgeos-dev
- name: download go modules
run: |
go mod download
- run: go mod download
working-directory: ./flow

- name: setup gcp service account
Expand Down Expand Up @@ -87,21 +76,27 @@ jobs:

- name: create hstore extension, increase logical replication limits, and setup catalog database
run: >
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "CREATE EXTENSION hstore;"
docker exec "${{ job.services.catalog.id }}" psql -U postgres -c "CREATE EXTENSION hstore;"
-c "ALTER SYSTEM SET wal_level=logical;"
-c "ALTER SYSTEM SET max_replication_slots=192;"
-c "ALTER SYSTEM SET max_wal_senders=256;"
-c "ALTER SYSTEM SET max_connections=2048;" &&
(cat ../nexus/catalog/migrations/V{?,??}__* | docker exec -i pg_cdc psql -h localhost -p 5432 -U postgres) &&
docker restart pg_cdc
working-directory: ./flow
(cat ./nexus/catalog/migrations/V{?,??}__* | docker exec -i "${{ job.services.catalog.id }}" psql -U postgres) &&
docker restart "${{ job.services.catalog.id }}"
env:
PG_CDC: empty
PGPASSWORD: postgres

- name: Install Temporal CLI
uses: temporalio/setup-temporal@v0

- name: run tests
run: |
gotestsum --format testname -- -p 24 ./... -timeout 1200s
temporal server start-dev --namespace default --headless &
go build -ldflags="-s -w" -o peer-flow
temporal operator search-attribute create --name MirrorName --type Text --namespace default
./peer-flow worker &
./peer-flow snapshot-worker &
go test -p 32 ./... -timeout 1200s
working-directory: ./flow
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
Expand All @@ -122,7 +117,7 @@ jobs:
SQLSERVER_PASSWORD: ${{ secrets.SQLSERVER_PASSWORD }}
SQLSERVER_DB: ${{ secrets.SQLSERVER_DB }}
PEERDB_CATALOG_HOST: localhost
PEERDB_CATALOG_PORT: 7132
PEERDB_CATALOG_PORT: 5432
PEERDB_CATALOG_USER: postgres
PEERDB_CATALOG_PASSWORD: postgres
PEERDB_CATALOG_DATABASE: postgres
2 changes: 1 addition & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"

"github.com/PeerDB-io/peer-flow/alerting"
"github.com/PeerDB-io/peer-flow/connectors"
connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
Expand All @@ -26,7 +27,6 @@ import (
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/shared/alerting"
)

// CheckConnectionResult is the result of a CheckConnection call.
Expand Down
5 changes: 4 additions & 1 deletion flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (

"go.temporal.io/sdk/activity"

"github.com/PeerDB-io/peer-flow/alerting"
"github.com/PeerDB-io/peer-flow/connectors"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/shared/alerting"
)

type SnapshotActivity struct {
Expand All @@ -32,6 +32,7 @@ func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName s
connectors.CloseConnector(ctx, s.connector)
delete(a.SnapshotConnections, flowJobName)
}
a.Alerter.LogFlowEvent(ctx, flowJobName, "Ended Snapshot Flow Job - "+flowJobName)

return nil
}
Expand All @@ -49,6 +50,8 @@ func (a *SnapshotActivity) SetupReplication(
return nil, nil
}

a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job - "+config.FlowJobName)

conn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
Expand Down
48 changes: 41 additions & 7 deletions flow/shared/alerting/alerting.go → flow/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package alerting
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"time"
Expand All @@ -15,11 +14,13 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared/telemetry"
)

// alerting service, no cool name :(
type Alerter struct {
catalogPool *pgxpool.Pool
catalogPool *pgxpool.Pool
telemetrySender telemetry.Sender
}

func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]*slackAlertSender, error) {
Expand Down Expand Up @@ -51,14 +52,26 @@ func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]*slackAlertSen
}

// doesn't take care of closing pool, needs to be done externally.
func NewAlerter(catalogPool *pgxpool.Pool) (*Alerter, error) {
func NewAlerter(ctx context.Context, catalogPool *pgxpool.Pool) *Alerter {
if catalogPool == nil {
return nil, errors.New("catalog pool is nil for Alerter")
panic("catalog pool is nil for Alerter")
}
// Telemetry is optional: a sender is only created when an SNS topic ARN is
// configured; otherwise snsMessageSender stays nil.
snsTopic := peerdbenv.PeerDBTelemetryAWSSNSTopicArn()
var snsMessageSender telemetry.Sender
if snsTopic != "" {
	var err error
	snsMessageSender, err = telemetry.NewSNSMessageSenderWithNewClient(ctx, &telemetry.SNSMessageSenderConfig{
		Topic: snsTopic,
	})
	// Check the error before reporting success, so the success line is never
	// logged for a sender that failed to initialise.
	if err != nil {
		panic(fmt.Sprintf("unable to setup telemetry for Alerter: %+v", err))
	}
	logger.LoggerFromCtx(ctx).Info("Successfully registered telemetry sender")
}

return &Alerter{
catalogPool: catalogPool,
}, nil
catalogPool: catalogPool,
telemetrySender: snsMessageSender,
}
}

func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo *protos.SlotInfo) {
Expand Down Expand Up @@ -194,6 +207,22 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string
return false
}

// sendTelemetryMessage formats "[flowName] more" and hands it to the Alerter's
// telemetry sender, tagged with the given level, the deployment UID and the
// flow name. The same formatted string is passed as both the second and third
// argument of SendMessage.
//
// It does nothing when no telemetry sender is configured. A send failure is
// logged as a warning and is not returned to the caller.
func (a *Alerter) sendTelemetryMessage(ctx context.Context, flowName string, more string, level telemetry.Level) {
	sender := a.telemetrySender
	if sender == nil {
		return
	}

	message := fmt.Sprintf("[%s] %s", flowName, more)
	attrs := telemetry.Attributes{
		Level:         level,
		DeploymentUID: peerdbenv.PeerDBDeploymentUID(),
		Tags:          []string{flowName},
		Type:          flowName,
	}

	if _, sendErr := sender.SendMessage(ctx, message, message, attrs); sendErr != nil {
		logger.LoggerFromCtx(ctx).Warn("failed to send message to telemetrySender", slog.Any("error", sendErr))
	}
}

func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) {
errorWithStack := fmt.Sprintf("%+v", err)
_, err = a.catalogPool.Exec(ctx,
Expand All @@ -203,6 +232,11 @@ func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error)
logger.LoggerFromCtx(ctx).Warn("failed to insert flow error", slog.Any("error", err))
return
}
a.sendTelemetryMessage(ctx, flowName, errorWithStack, telemetry.ERROR)
}

// LogFlowEvent reports an informational event for the given flow by passing
// info to sendTelemetryMessage at telemetry.INFO level. It returns nothing;
// this method itself does not touch the catalog pool.
func (a *Alerter) LogFlowEvent(ctx context.Context, flowName string, info string) {
a.sendTelemetryMessage(ctx, flowName, info, telemetry.INFO)
}

func (a *Alerter) LogFlowInfo(ctx context.Context, flowName string, info string) {
Expand Down
File renamed without changes.
8 changes: 2 additions & 6 deletions flow/cmd/api.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"context"
Expand Down Expand Up @@ -124,11 +124,7 @@ func APIMain(ctx context.Context, args *APIServerParams) error {
return fmt.Errorf("unable to get catalog connection pool: %w", err)
}

taskQueue, err := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueueID)
if err != nil {
return err
}

taskQueue := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueue)
flowHandler := NewFlowRequestHandler(tc, catalogConn, taskQueue)

err = killExistingScheduleFlows(ctx, tc, args.TemporalNamespace, taskQueue)
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/cert.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"crypto/tls"
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/handler.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"context"
Expand Down
42 changes: 15 additions & 27 deletions flow/cmd/snapshot_worker.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"context"
Expand All @@ -11,10 +11,10 @@ import (
"go.temporal.io/sdk/worker"

"github.com/PeerDB-io/peer-flow/activities"
"github.com/PeerDB-io/peer-flow/alerting"
utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/shared/alerting"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
)

Expand All @@ -25,7 +25,7 @@ type SnapshotWorkerOptions struct {
TemporalKey string
}

func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error {
func SnapshotWorkerMain(opts *SnapshotWorkerOptions) (client.Client, worker.Worker, error) {
clientOptions := client.Options{
HostPort: opts.TemporalHostPort,
Namespace: opts.TemporalNamespace,
Expand All @@ -35,7 +35,7 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error {
if opts.TemporalCert != "" && opts.TemporalKey != "" {
certs, err := Base64DecodeCertAndKey(opts.TemporalCert, opts.TemporalKey)
if err != nil {
return fmt.Errorf("unable to process certificate and key: %w", err)
return nil, nil, fmt.Errorf("unable to process certificate and key: %w", err)
}

connOptions := client.ConnectionOptions{
Expand All @@ -47,41 +47,29 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error {
clientOptions.ConnectionOptions = connOptions
}

c, err := client.Dial(clientOptions)
conn, err := utils.GetCatalogConnectionPoolFromEnv(context.Background())
if err != nil {
return fmt.Errorf("unable to create Temporal client: %w", err)
return nil, nil, fmt.Errorf("unable to create catalog connection pool: %w", err)
}
defer c.Close()

taskQueue, queueErr := shared.GetPeerFlowTaskQueueName(shared.SnapshotFlowTaskQueueID)
if queueErr != nil {
return queueErr
c, err := client.Dial(clientOptions)
if err != nil {
return nil, nil, fmt.Errorf("unable to create Temporal client: %w", err)
}

taskQueue := shared.GetPeerFlowTaskQueueName(shared.SnapshotFlowTaskQueue)
w := worker.New(c, taskQueue, worker.Options{
EnableSessionWorker: true,
OnFatalError: func(err error) {
slog.Error("Snapshot Worker failed", slog.Any("error", err))
},
})

conn, err := utils.GetCatalogConnectionPoolFromEnv(context.Background())
if err != nil {
return fmt.Errorf("unable to create catalog connection pool: %w", err)
}

alerter, err := alerting.NewAlerter(conn)
if err != nil {
return fmt.Errorf("unable to create alerter: %w", err)
}

w.RegisterWorkflow(peerflow.SnapshotFlowWorkflow)
w.RegisterActivity(&activities.SnapshotActivity{
SnapshotConnections: make(map[string]activities.SlotSnapshotSignal),
Alerter: alerter,
Alerter: alerting.NewAlerter(context.Background(), conn),
})

err = w.Run(worker.InterruptCh())
if err != nil {
return fmt.Errorf("worker run error: %w", err)
}

return nil
return c, w, nil
}
2 changes: 1 addition & 1 deletion flow/cmd/validate_mirror.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/validate_peer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/version.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"context"
Expand Down
Loading

0 comments on commit d58d657

Please sign in to comment.