Skip to content

Commit

Permalink
Make SnapshotFlow significantly simpler
Browse files Browse the repository at this point in the history
- remove the requirement of a consistent snapshot connection
- remove snapshot worker and all the things that result from it
  • Loading branch information
iskakaushik committed Dec 28, 2023
1 parent 7dd1f0d commit c4142f6
Show file tree
Hide file tree
Showing 23 changed files with 435 additions and 701 deletions.
15 changes: 0 additions & 15 deletions docker-bake.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ group "default" {
"peerdb",
"flow-worker",
"flow-api",
"flow-snapshot-worker",
"peerdb-ui"
]
}
Expand All @@ -37,20 +36,6 @@ target "flow-api" {
]
}

target "flow-snapshot-worker" {
context = "."
dockerfile = "stacks/flow.Dockerfile"
target = "flow-snapshot-worker"
platforms = [
"linux/amd64",
"linux/arm64",
]
tags = [
"${REGISTRY}/flow-snapshot-worker:${TAG}",
"${REGISTRY}/flow-snapshot-worker:${SHA_SHORT}",
]
}

target "flow-worker" {
context = "."
dockerfile = "stacks/flow.Dockerfile"
Expand Down
12 changes: 0 additions & 12 deletions docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,18 +126,6 @@ services:
temporal-admin-tools:
condition: service_healthy

flow-snapshot-worker:
container_name: flow-snapshot-worker
build:
context: .
dockerfile: stacks/flow.Dockerfile
target: flow-snapshot-worker
environment:
<<: [*catalog-config, *flow-worker-env]
depends_on:
temporal-admin-tools:
condition: service_healthy

flow-worker1:
container_name: flow-worker1
build:
Expand Down
9 changes: 0 additions & 9 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,6 @@ services:
temporal-admin-tools:
condition: service_healthy

flow-snapshot-worker:
container_name: flow-snapshot-worker
image: ghcr.io/peerdb-io/flow-snapshot-worker:latest-dev
environment:
<<: [*catalog-config, *flow-worker-env]
depends_on:
temporal-admin-tools:
condition: service_healthy

flow-worker1:
container_name: flow-worker1
image: ghcr.io/peerdb-io/flow-worker:latest-dev
Expand Down
6 changes: 0 additions & 6 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@ type CheckConnectionResult struct {
NeedsSetupMetadataTables bool
}

type SlotSnapshotSignal struct {
signal connpostgres.SlotSignal
snapshotName string
connector connectors.CDCPullConnector
}

type FlowableActivity struct {
CatalogPool *pgxpool.Pool
Alerter *alerting.Alerter
Expand Down
62 changes: 6 additions & 56 deletions flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,7 @@ import (
)

type SnapshotActivity struct {
SnapshotConnections map[string]SlotSnapshotSignal
Alerter *alerting.Alerter
}

// closes the slot signal
func (a *SnapshotActivity) CloseSlotKeepAlive(flowJobName string) error {
if a.SnapshotConnections == nil {
return nil
}

if s, ok := a.SnapshotConnections[flowJobName]; ok {
s.signal.CloneComplete <- struct{}{}
s.connector.Close()
}

return nil
Alerter *alerting.Alerter
}

func (a *SnapshotActivity) SetupReplication(
Expand All @@ -47,49 +32,14 @@ func (a *SnapshotActivity) SetupReplication(
return nil, fmt.Errorf("failed to get connector: %w", err)
}

slotSignal := connpostgres.NewSlotSignal()

replicationErr := make(chan error)
defer close(replicationErr)

// This now happens in a goroutine
go func() {
pgConn := conn.(*connpostgres.PostgresConnector)
err = pgConn.SetupReplication(slotSignal, config)
if err != nil {
slog.ErrorContext(ctx, "failed to setup replication", slog.Any("error", err))
replicationErr <- err
return
}
}()

slog.InfoContext(ctx, "waiting for slot to be created...")
var slotInfo connpostgres.SlotCreationResult
select {
case slotInfo = <-slotSignal.SlotCreated:
slog.InfoContext(ctx, fmt.Sprintf("slot '%s' created", slotInfo.SlotName))
case err := <-replicationErr:
pgConn := conn.(*connpostgres.PostgresConnector)
res, err := pgConn.SetupReplication(config)
if err != nil {
slog.ErrorContext(ctx, "failed to setup replication", slog.Any("error", err))
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return nil, fmt.Errorf("failed to setup replication: %w", err)
}

if slotInfo.Err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, slotInfo.Err)
return nil, fmt.Errorf("slot error: %w", slotInfo.Err)
}

if a.SnapshotConnections == nil {
a.SnapshotConnections = make(map[string]SlotSnapshotSignal)
}

a.SnapshotConnections[config.FlowJobName] = SlotSnapshotSignal{
signal: slotSignal,
snapshotName: slotInfo.SnapshotName,
connector: conn,
}

return &protos.SetupReplicationOutput{
SlotName: slotInfo.SlotName,
SnapshotName: slotInfo.SnapshotName,
}, nil
return res, nil
}
6 changes: 1 addition & 5 deletions flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,7 @@ func APIMain(args *APIServerParams) error {
return fmt.Errorf("unable to get catalog connection pool: %w", err)
}

taskQueue, err := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueueID)
if err != nil {
return err
}

taskQueue := shared.GetPeerFlowTaskQueueName()
flowHandler := NewFlowRequestHandler(tc, catalogConn, taskQueue)

err = killExistingHeartbeatFlows(ctx, tc, args.TemporalNamespace, taskQueue)
Expand Down
18 changes: 0 additions & 18 deletions flow/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,24 +90,6 @@ func main() {
&temporalKeyFlag,
},
},
{
Name: "snapshot-worker",
Action: func(ctx *cli.Context) error {
temporalHostPort := ctx.String("temporal-host-port")
return SnapshotWorkerMain(&SnapshotWorkerOptions{
TemporalHostPort: temporalHostPort,
TemporalNamespace: ctx.String("temporal-namespace"),
TemporalCert: ctx.String("temporal-cert"),
TemporalKey: ctx.String("temporal-key"),
})
},
Flags: []cli.Flag{
temporalHostPortFlag,
temporalNamespaceFlag,
&temporalCertFlag,
&temporalKeyFlag,
},
},
{
Name: "api",
Flags: []cli.Flag{
Expand Down
79 changes: 0 additions & 79 deletions flow/cmd/snapshot_worker.go

This file was deleted.

8 changes: 4 additions & 4 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,7 @@ func WorkerMain(opts *WorkerOptions) error {
slog.Info("Created temporal client")
defer c.Close()

taskQueue, queueErr := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueueID)
if queueErr != nil {
return queueErr
}
taskQueue := shared.GetPeerFlowTaskQueueName()

w := worker.New(c, taskQueue, worker.Options{})
w.RegisterWorkflow(peerflow.CDCFlowWorkflowWithConfig)
Expand All @@ -145,6 +142,9 @@ func WorkerMain(opts *WorkerOptions) error {
CatalogPool: conn,
Alerter: alerter,
})
w.RegisterActivity(&activities.SnapshotActivity{
Alerter: alerter,
})

err = w.Run(worker.InterruptCh())
if err != nil {
Expand Down
30 changes: 5 additions & 25 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package connpostgres

import (
"errors"
"fmt"
"log"
"regexp"
Expand Down Expand Up @@ -289,8 +288,7 @@ func (c *PostgresConnector) GetSlotInfo(slotName string) ([]*protos.SlotInfo, er

// createSlotAndPublication creates the replication slot and publication.
func (c *PostgresConnector) createSlotAndPublication(
signal SlotSignal,
s SlotCheckResult,
checkRes SlotCheckResult,
slot string,
publication string,
tableNameMapping map[string]model.NameAndExclude,
Expand All @@ -310,7 +308,7 @@ func (c *PostgresConnector) createSlotAndPublication(
}
tableNameString := strings.Join(srcTableNames, ", ")

if !s.PublicationExists {
if !checkRes.PublicationExists {
// Create the publication to help filter changes only for the given tables
stmt := fmt.Sprintf("CREATE PUBLICATION %s FOR TABLE %s", publication, tableNameString)
_, err := c.pool.Exec(c.ctx, stmt)
Expand All @@ -321,7 +319,7 @@ func (c *PostgresConnector) createSlotAndPublication(
}

// create slot only after we succeeded in creating publication.
if !s.SlotExists {
if !checkRes.SlotExists {
conn, err := c.replPool.Acquire(c.ctx)
if err != nil {
return fmt.Errorf("[slot] error acquiring connection: %w", err)
Expand All @@ -340,33 +338,15 @@ func (c *PostgresConnector) createSlotAndPublication(
Temporary: false,
Mode: pglogrepl.LogicalReplication,
}
res, err := pglogrepl.CreateReplicationSlot(c.ctx, conn.Conn().PgConn(), slot, "pgoutput", opts)

_, err = pglogrepl.CreateReplicationSlot(c.ctx, conn.Conn().PgConn(), slot, "pgoutput", opts)
if err != nil {
return fmt.Errorf("[slot] error creating replication slot: %w", err)
}

c.logger.Info(fmt.Sprintf("Created replication slot '%s'", slot))
slotDetails := SlotCreationResult{
SlotName: res.SlotName,
SnapshotName: res.SnapshotName,
Err: nil,
}
signal.SlotCreated <- slotDetails
c.logger.Info("Waiting for clone to complete")
<-signal.CloneComplete
c.logger.Info("Clone complete")
} else {
c.logger.Info(fmt.Sprintf("Replication slot '%s' already exists", slot))
var e error
if doInitialCopy {
e = errors.New("slot already exists")
}
slotDetails := SlotCreationResult{
SlotName: slot,
SnapshotName: "",
Err: e,
}
signal.SlotCreated <- slotDetails
}

return nil
Expand Down
Loading

0 comments on commit c4142f6

Please sign in to comment.