Skip to content

Commit

Permalink
Close replication connection faster (#1549)
Browse files Browse the repository at this point in the history
Kevin noticed MaintainPull was outliving parent workflow for some time,
temporal contexts only detect cancelation with RecordHeartbeat,
in order to reduce shutdown latency implement UnmaintainPull
  • Loading branch information
serprex authored Mar 27, 2024
1 parent 857bc5d commit 00ac0c2
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 7 deletions.
29 changes: 25 additions & 4 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,15 @@ type CheckConnectionResult struct {
NeedsSetupMetadataTables bool
}

type CdcCacheEntry struct {
connector connectors.CDCPullConnector
done chan struct{}
}

type FlowableActivity struct {
CatalogPool *pgxpool.Pool
Alerter *alerting.Alerter
CdcCache map[string]connectors.CDCPullConnector
CdcCache map[string]CdcCacheEntry
CdcCacheRw sync.RWMutex
}

Expand Down Expand Up @@ -217,8 +222,12 @@ func (a *FlowableActivity) MaintainPull(
return err
}

done := make(chan struct{})
a.CdcCacheRw.Lock()
a.CdcCache[sessionID] = srcConn
a.CdcCache[sessionID] = CdcCacheEntry{
connector: srcConn,
done: done,
}
a.CdcCacheRw.Unlock()

ticker := time.NewTicker(15 * time.Second)
Expand All @@ -234,6 +243,8 @@ func (a *FlowableActivity) MaintainPull(
a.CdcCacheRw.Unlock()
return temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", err)
}
case <-done:
return nil
case <-ctx.Done():
a.CdcCacheRw.Lock()
delete(a.CdcCache, sessionID)
Expand All @@ -243,15 +254,25 @@ func (a *FlowableActivity) MaintainPull(
}
}

func (a *FlowableActivity) UnmaintainPull(ctx context.Context, sessionID string) error {
a.CdcCacheRw.Lock()
if entry, ok := a.CdcCache[sessionID]; ok {
close(entry.done)
delete(a.CdcCache, sessionID)
}
a.CdcCacheRw.Unlock()
return nil
}

func (a *FlowableActivity) waitForCdcCache(ctx context.Context, sessionID string) (connectors.CDCPullConnector, error) {
logger := activity.GetLogger(ctx)
attempt := 0
for {
a.CdcCacheRw.RLock()
conn, ok := a.CdcCache[sessionID]
entry, ok := a.CdcCache[sessionID]
a.CdcCacheRw.RUnlock()
if ok {
return conn, nil
return entry.connector, nil
}
activity.RecordHeartbeat(ctx, "wait another second for source connector")
attempt += 1
Expand Down
3 changes: 1 addition & 2 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/PeerDB-io/peer-flow/activities"
"github.com/PeerDB-io/peer-flow/alerting"
"github.com/PeerDB-io/peer-flow/connectors"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
Expand Down Expand Up @@ -132,7 +131,7 @@ func WorkerMain(opts *WorkerOptions) (client.Client, worker.Worker, error) {
w.RegisterActivity(&activities.FlowableActivity{
CatalogPool: conn,
Alerter: alerting.NewAlerter(context.Background(), conn),
CdcCache: make(map[string]connectors.CDCPullConnector),
CdcCache: make(map[string]activities.CdcCacheEntry),
})

return c, w, nil
Expand Down
22 changes: 21 additions & 1 deletion flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
"golang.org/x/exp/maps"

Expand Down Expand Up @@ -212,10 +213,29 @@ func SyncFlowWorkflow(
break
}
}

if err := ctx.Err(); err != nil {
logger.Info("sync canceled", slog.Any("error", err))
return err
} else if stop {
}

if fMaintain != nil {
unmaintainCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{
RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 1},
StartToCloseTimeout: time.Minute,
HeartbeatTimeout: time.Minute,
WaitForCancellation: true,
})
if err := workflow.ExecuteActivity(
unmaintainCtx,
flowable.UnmaintainPull,
sessionID,
).Get(unmaintainCtx, nil); err != nil {
logger.Warn("UnmaintainPull failed", slog.Any("error", err))
}
}

if stop {
return nil
}
return workflow.NewContinueAsNewError(ctx, SyncFlowWorkflow, config, options)
Expand Down

0 comments on commit 00ac0c2

Please sign in to comment.