Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make SnapshotFlow significantly simpler #922

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 0 additions & 15 deletions docker-bake.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ group "default" {
"peerdb",
"flow-worker",
"flow-api",
"flow-snapshot-worker",
"peerdb-ui"
]
}
Expand All @@ -37,20 +36,6 @@ target "flow-api" {
]
}

target "flow-snapshot-worker" {
context = "."
dockerfile = "stacks/flow.Dockerfile"
target = "flow-snapshot-worker"
platforms = [
"linux/amd64",
"linux/arm64",
]
tags = [
"${REGISTRY}/flow-snapshot-worker:${TAG}",
"${REGISTRY}/flow-snapshot-worker:${SHA_SHORT}",
]
}

target "flow-worker" {
context = "."
dockerfile = "stacks/flow.Dockerfile"
Expand Down
12 changes: 0 additions & 12 deletions docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,18 +126,6 @@ services:
temporal-admin-tools:
condition: service_healthy

flow-snapshot-worker:
container_name: flow-snapshot-worker
build:
context: .
dockerfile: stacks/flow.Dockerfile
target: flow-snapshot-worker
environment:
<<: [*catalog-config, *flow-worker-env]
depends_on:
temporal-admin-tools:
condition: service_healthy

flow-worker1:
container_name: flow-worker1
build:
Expand Down
9 changes: 0 additions & 9 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,6 @@ services:
temporal-admin-tools:
condition: service_healthy

flow-snapshot-worker:
container_name: flow-snapshot-worker
image: ghcr.io/peerdb-io/flow-snapshot-worker:latest-dev
environment:
<<: [*catalog-config, *flow-worker-env]
depends_on:
temporal-admin-tools:
condition: service_healthy

flow-worker1:
container_name: flow-worker1
image: ghcr.io/peerdb-io/flow-worker:latest-dev
Expand Down
6 changes: 0 additions & 6 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@ type CheckConnectionResult struct {
NeedsSetupMetadataTables bool
}

type SlotSnapshotSignal struct {
signal connpostgres.SlotSignal
snapshotName string
connector connectors.CDCPullConnector
}

type FlowableActivity struct {
CatalogPool *pgxpool.Pool
Alerter *alerting.Alerter
Expand Down
62 changes: 6 additions & 56 deletions flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,7 @@ import (
)

type SnapshotActivity struct {
SnapshotConnections map[string]SlotSnapshotSignal
Alerter *alerting.Alerter
}

// closes the slot signal
func (a *SnapshotActivity) CloseSlotKeepAlive(flowJobName string) error {
if a.SnapshotConnections == nil {
return nil
}

if s, ok := a.SnapshotConnections[flowJobName]; ok {
s.signal.CloneComplete <- struct{}{}
s.connector.Close()
}

return nil
Alerter *alerting.Alerter
}

func (a *SnapshotActivity) SetupReplication(
Expand All @@ -47,49 +32,14 @@ func (a *SnapshotActivity) SetupReplication(
return nil, fmt.Errorf("failed to get connector: %w", err)
}

slotSignal := connpostgres.NewSlotSignal()

replicationErr := make(chan error)
defer close(replicationErr)

// This now happens in a goroutine
go func() {
pgConn := conn.(*connpostgres.PostgresConnector)
err = pgConn.SetupReplication(slotSignal, config)
if err != nil {
slog.ErrorContext(ctx, "failed to setup replication", slog.Any("error", err))
replicationErr <- err
return
}
}()

slog.InfoContext(ctx, "waiting for slot to be created...")
var slotInfo connpostgres.SlotCreationResult
select {
case slotInfo = <-slotSignal.SlotCreated:
slog.InfoContext(ctx, fmt.Sprintf("slot '%s' created", slotInfo.SlotName))
case err := <-replicationErr:
pgConn := conn.(*connpostgres.PostgresConnector)
res, err := pgConn.SetupReplication(config)
if err != nil {
slog.ErrorContext(ctx, "failed to setup replication", slog.Any("error", err))
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return nil, fmt.Errorf("failed to setup replication: %w", err)
}

if slotInfo.Err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, slotInfo.Err)
return nil, fmt.Errorf("slot error: %w", slotInfo.Err)
}

if a.SnapshotConnections == nil {
a.SnapshotConnections = make(map[string]SlotSnapshotSignal)
}

a.SnapshotConnections[config.FlowJobName] = SlotSnapshotSignal{
signal: slotSignal,
snapshotName: slotInfo.SnapshotName,
connector: conn,
}

return &protos.SetupReplicationOutput{
SlotName: slotInfo.SlotName,
SnapshotName: slotInfo.SnapshotName,
}, nil
return res, nil
}
6 changes: 1 addition & 5 deletions flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,7 @@ func APIMain(args *APIServerParams) error {
return fmt.Errorf("unable to get catalog connection pool: %w", err)
}

taskQueue, err := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueueID)
if err != nil {
return err
}

taskQueue := shared.GetPeerFlowTaskQueueName()
flowHandler := NewFlowRequestHandler(tc, catalogConn, taskQueue)

err = killExistingHeartbeatFlows(ctx, tc, args.TemporalNamespace, taskQueue)
Expand Down
18 changes: 0 additions & 18 deletions flow/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,24 +90,6 @@ func main() {
&temporalKeyFlag,
},
},
{
Name: "snapshot-worker",
Action: func(ctx *cli.Context) error {
temporalHostPort := ctx.String("temporal-host-port")
return SnapshotWorkerMain(&SnapshotWorkerOptions{
TemporalHostPort: temporalHostPort,
TemporalNamespace: ctx.String("temporal-namespace"),
TemporalCert: ctx.String("temporal-cert"),
TemporalKey: ctx.String("temporal-key"),
})
},
Flags: []cli.Flag{
temporalHostPortFlag,
temporalNamespaceFlag,
&temporalCertFlag,
&temporalKeyFlag,
},
},
{
Name: "api",
Flags: []cli.Flag{
Expand Down
79 changes: 0 additions & 79 deletions flow/cmd/snapshot_worker.go

This file was deleted.

8 changes: 4 additions & 4 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,7 @@ func WorkerMain(opts *WorkerOptions) error {
slog.Info("Created temporal client")
defer c.Close()

taskQueue, queueErr := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueueID)
if queueErr != nil {
return queueErr
}
taskQueue := shared.GetPeerFlowTaskQueueName()

w := worker.New(c, taskQueue, worker.Options{})
w.RegisterWorkflow(peerflow.CDCFlowWorkflowWithConfig)
Expand All @@ -145,6 +142,9 @@ func WorkerMain(opts *WorkerOptions) error {
CatalogPool: conn,
Alerter: alerter,
})
w.RegisterActivity(&activities.SnapshotActivity{
Alerter: alerter,
})

err = w.Run(worker.InterruptCh())
if err != nil {
Expand Down
30 changes: 5 additions & 25 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package connpostgres

import (
"errors"
"fmt"
"log"
"regexp"
Expand Down Expand Up @@ -289,8 +288,7 @@ func (c *PostgresConnector) GetSlotInfo(slotName string) ([]*protos.SlotInfo, er

// createSlotAndPublication creates the replication slot and publication.
func (c *PostgresConnector) createSlotAndPublication(
signal SlotSignal,
s SlotCheckResult,
checkRes SlotCheckResult,
slot string,
publication string,
tableNameMapping map[string]model.NameAndExclude,
Expand All @@ -310,7 +308,7 @@ func (c *PostgresConnector) createSlotAndPublication(
}
tableNameString := strings.Join(srcTableNames, ", ")

if !s.PublicationExists {
if !checkRes.PublicationExists {
// Create the publication to help filter changes only for the given tables
stmt := fmt.Sprintf("CREATE PUBLICATION %s FOR TABLE %s", publication, tableNameString)
_, err := c.pool.Exec(c.ctx, stmt)
Expand All @@ -321,7 +319,7 @@ func (c *PostgresConnector) createSlotAndPublication(
}

// create slot only after we succeeded in creating publication.
if !s.SlotExists {
if !checkRes.SlotExists {
conn, err := c.replPool.Acquire(c.ctx)
if err != nil {
return fmt.Errorf("[slot] error acquiring connection: %w", err)
Expand All @@ -340,33 +338,15 @@ func (c *PostgresConnector) createSlotAndPublication(
Temporary: false,
Mode: pglogrepl.LogicalReplication,
}
res, err := pglogrepl.CreateReplicationSlot(c.ctx, conn.Conn().PgConn(), slot, "pgoutput", opts)

_, err = pglogrepl.CreateReplicationSlot(c.ctx, conn.Conn().PgConn(), slot, "pgoutput", opts)
if err != nil {
return fmt.Errorf("[slot] error creating replication slot: %w", err)
}

c.logger.Info(fmt.Sprintf("Created replication slot '%s'", slot))
slotDetails := SlotCreationResult{
SlotName: res.SlotName,
SnapshotName: res.SnapshotName,
Err: nil,
}
signal.SlotCreated <- slotDetails
c.logger.Info("Waiting for clone to complete")
<-signal.CloneComplete
c.logger.Info("Clone complete")
} else {
c.logger.Info(fmt.Sprintf("Replication slot '%s' already exists", slot))
var e error
if doInitialCopy {
e = errors.New("slot already exists")
}
slotDetails := SlotCreationResult{
SlotName: slot,
SnapshotName: "",
Err: e,
}
signal.SlotCreated <- slotDetails
}

return nil
Expand Down
Loading
Loading