Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WalHeartBeat across peers #708

Merged
merged 10 commits into from
Nov 24, 2023
96 changes: 79 additions & 17 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package activities

import (
"context"
"database/sql"
"errors"
"fmt"
"sync"
Expand All @@ -12,14 +13,17 @@ import (
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
"github.com/PeerDB-io/peer-flow/connectors/utils"
catalog "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5"
log "github.com/sirupsen/logrus"
"go.temporal.io/sdk/activity"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
)

// CheckConnectionResult is the result of a CheckConnection call.
Expand Down Expand Up @@ -659,31 +663,89 @@ func (a *FlowableActivity) DropFlow(ctx context.Context, config *protos.Shutdown
return nil
}

func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context, config *protos.FlowConnectionConfigs) error {
srcConn, err := connectors.GetCDCPullConnector(ctx, config.Source)
func getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) {
var peerOptions sql.RawBytes
catalogPool, catalogErr := catalog.GetCatalogConnectionPoolFromEnv()
if catalogErr != nil {
return nil, fmt.Errorf("error getting catalog connection pool: %w", catalogErr)
}
defer catalogPool.Close()

optionRows, err := catalogPool.Query(ctx,
"SELECT name, options FROM peers WHERE type=$1", protos.DBType_POSTGRES)
if err != nil {
return fmt.Errorf("failed to get destination connector: %w", err)
return nil, err
}
defer connectors.CloseConnector(srcConn)
log.WithFields(log.Fields{"flowName": config.FlowJobName}).Info("sending walheartbeat every 10 minutes")
ticker := time.NewTicker(10 * time.Minute)
defer optionRows.Close()
var peerName string
var postgresPeers []*protos.Peer
for optionRows.Next() {
err := optionRows.Scan(&peerName, &peerOptions)
if err != nil {
return nil, err
}
var pgPeerConfig protos.PostgresConfig
unmarshalErr := proto.Unmarshal(peerOptions, &pgPeerConfig)
if unmarshalErr != nil {
return nil, unmarshalErr
}
postgresPeers = append(postgresPeers, &protos.Peer{
Name: peerName,
Type: protos.DBType_POSTGRES,
Config: &protos.Peer_PostgresConfig{PostgresConfig: &pgPeerConfig},
})
}
return postgresPeers, nil
}

func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
sendTimeout := 10 * time.Minute
ticker := time.NewTicker(sendTimeout)
defer ticker.Stop()

pgPeers, err := getPostgresPeerConfigs(ctx)
if err != nil {
return fmt.Errorf("error getting postgres peer configs: %w", err)
}

activity.RecordHeartbeat(ctx, "sending walheartbeat every 10 minutes")
for {
select {
case <-ctx.Done():
log.WithFields(
log.Fields{
"flowName": config.FlowJobName,
}).Info("context is done, exiting wal heartbeat send loop")
log.Info("context is done, exiting wal heartbeat send loop")
return nil
case <-ticker.C:
err = srcConn.SendWALHeartbeat()
if err != nil {
return fmt.Errorf("failed to send WAL heartbeat: %w", err)
command := `
BEGIN;
DROP aggregate IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4);
CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4);
DROP aggregate PEERDB_EPHEMERAL_HEARTBEAT(float4);
END;
`
// run above command for each Postgres peer
for _, pgPeer := range pgPeers {
pgConfig := pgPeer.GetPostgresConfig()
peerConn, peerErr := pgx.Connect(ctx, utils.GetPGConnectionString(pgConfig))
if peerErr != nil {
return fmt.Errorf("error creating pool for postgres peer %v with host %v: %w",
pgPeer.Name, pgConfig.Host, peerErr)
}

_, err := peerConn.Exec(ctx, command)
if err != nil {
log.Warnf("warning: could not send walheartbeat to peer %v: %v", pgPeer.Name, err)
}

closeErr := peerConn.Close(ctx)
if closeErr != nil {
return fmt.Errorf("error closing postgres connection for peer %v with host %v: %w",
pgPeer.Name, pgConfig.Host, closeErr)
}
log.Infof("sent walheartbeat to peer %v", pgPeer.Name)
}
log.WithFields(
log.Fields{
"flowName": config.FlowJobName,
}).Info("sent wal heartbeat")
ticker.Stop()
ticker = time.NewTicker(sendTimeout)

}
}
}
Expand Down
45 changes: 45 additions & 0 deletions flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@ import (

utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
"github.com/google/uuid"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/reflection"

"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
Expand Down Expand Up @@ -58,6 +62,27 @@ func setupGRPCGatewayServer(args *APIServerParams) (*http.Server, error) {
return server, nil
}

func killExistingHeartbeatFlows(ctx context.Context, tc client.Client, namespace string) error {
listRes, err := tc.ListWorkflow(ctx,
&workflowservice.ListWorkflowExecutionsRequest{
Namespace: namespace,
Query: "WorkflowType = 'HeartbeatFlowWorkflow'",
})
if err != nil {
return fmt.Errorf("unable to list workflows: %w", err)
}
log.Info("Requesting cancellation of pre-existing heartbeat flows")
for _, workflow := range listRes.Executions {
log.Info("Cancelling workflow: ", workflow.Execution.WorkflowId)
err := tc.CancelWorkflow(ctx,
workflow.Execution.WorkflowId, workflow.Execution.RunId)
if err != nil && err.Error() != "workflow execution already completed" {
return fmt.Errorf("unable to terminate workflow: %w", err)
}
}
return nil
}

func APIMain(args *APIServerParams) error {
ctx := args.ctx
clientOptions := client.Options{
Expand Down Expand Up @@ -91,6 +116,26 @@ func APIMain(args *APIServerParams) error {
flowHandler := NewFlowRequestHandler(tc, catalogConn)
defer flowHandler.Close()

err = killExistingHeartbeatFlows(ctx, tc, args.TemporalNamespace)
if err != nil {
return fmt.Errorf("unable to kill existing heartbeat flows: %w", err)
}

workflowID := fmt.Sprintf("heartbeatflow-%s", uuid.New())
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: shared.PeerFlowTaskQueue,
}

_, err = flowHandler.temporalClient.ExecuteWorkflow(
ctx, // context
workflowOptions, // workflow start options
peerflow.HeartbeatFlowWorkflow, // workflow function
)
if err != nil {
return fmt.Errorf("unable to start heartbeat workflow: %w", err)
}

protos.RegisterFlowServiceServer(grpcServer, flowHandler)
grpc_health_v1.RegisterHealthServer(grpcServer, health.NewServer())
reflection.Register(grpcServer)
Expand Down
1 change: 1 addition & 0 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func WorkerMain(opts *WorkerOptions) error {
w.RegisterWorkflow(peerflow.QRepFlowWorkflow)
w.RegisterWorkflow(peerflow.QRepPartitionWorkflow)
w.RegisterWorkflow(peerflow.DropFlowWorkflow)
w.RegisterWorkflow(peerflow.HeartbeatFlowWorkflow)
w.RegisterActivity(&activities.FlowableActivity{
CatalogMirrorMonitor: catalogMirrorMonitor,
})
Expand Down
22 changes: 22 additions & 0 deletions flow/workflows/heartbeat_flow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package peerflow

import (
"time"

"go.temporal.io/sdk/workflow"
)

// HeartbeatFlowWorkflow is the workflow that sets up heartbeat sending.
func HeartbeatFlowWorkflow(ctx workflow.Context) error {

ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 7 * 24 * time.Hour,
})

heartbeatFuture := workflow.ExecuteActivity(ctx, flowable.SendWALHeartbeat)
if err := heartbeatFuture.Get(ctx, nil); err != nil {
return err
}

return nil
}
12 changes: 6 additions & 6 deletions ui/app/mirrors/create/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,24 +58,24 @@ export const cdcSchema = z.object({
.optional(),
snapshotNumRowsPerPartition: z
.number({
invalid_type_error: 'Snapshow rows per partition must be a number',
invalid_type_error: 'Snapshot rows per partition must be a number',
})
.int()
.min(1, 'Snapshow rows per partition must be a positive integer')
.min(1, 'Snapshot rows per partition must be a positive integer')
.optional(),
snapshotMaxParallelWorkers: z
.number({
invalid_type_error: 'Snapshow max workers must be a number',
invalid_type_error: 'Snapshot max workers must be a number',
})
.int()
.min(1, 'Snapshow max workers must be a positive integer')
.min(1, 'Snapshot max workers must be a positive integer')
.optional(),
snapshotNumTablesInParallel: z
.number({
invalid_type_error: 'Snapshow parallel tables must be a number',
invalid_type_error: 'Snapshot parallel tables must be a number',
})
.int()
.min(1, 'Snapshow parallel tables must be a positive integer')
.min(1, 'Snapshot parallel tables must be a positive integer')
.optional(),
snapshotStagingPath: z
.string({
Expand Down
Loading