filter kill heartbeat by task queue
Amogh-Bharadwaj committed Nov 28, 2023
1 parent 0603b49 commit c98f7ea
Showing 4 changed files with 29 additions and 41 deletions.
1 change: 0 additions & 1 deletion docker-compose-dev.yml
@@ -14,7 +14,6 @@ x-flow-worker-env: &flow-worker-env
   PEERDB_TEMPORAL_NAMESPACE: default
   # For the below 2 cert and key variables,
   # paste as base64 encoded strings.
-  # use yml multiline syntax with '|'
   TEMPORAL_CLIENT_CERT:
   TEMPORAL_CLIENT_KEY:
   # For GCS, these will be your HMAC keys instead
24 changes: 14 additions & 10 deletions flow/cmd/api.go
@@ -62,11 +62,15 @@ func setupGRPCGatewayServer(args *APIServerParams) (*http.Server, error) {
     return server, nil
 }
 
-func killExistingHeartbeatFlows(ctx context.Context, tc client.Client, namespace string) error {
+func killExistingHeartbeatFlows(
+    ctx context.Context,
+    tc client.Client,
+    namespace string,
+    taskQueue string) error {
     listRes, err := tc.ListWorkflow(ctx,
         &workflowservice.ListWorkflowExecutionsRequest{
             Namespace: namespace,
-            Query:     "WorkflowType = 'HeartbeatFlowWorkflow'",
+            Query:     "WorkflowType = 'HeartbeatFlowWorkflow' AND TaskQueue = '" + taskQueue + "'",
         })
     if err != nil {
         return fmt.Errorf("unable to list workflows: %w", err)
@@ -94,7 +98,7 @@ func APIMain(args *APIServerParams) error {
 
     certs, err := Base64DecodeCertAndKey(args.TemporalCert, args.TemporalKey)
     if err != nil {
-        return fmt.Errorf("unable to process certificate and key: %w", err)
+        return fmt.Errorf("unable to base64 decode certificate and key: %w", err)
     }
 
     connOptions := client.ConnectionOptions{
@@ -115,17 +119,17 @@ func APIMain(args *APIServerParams) error {
         return fmt.Errorf("unable to get catalog connection pool: %w", err)
     }
 
-    flowHandler := NewFlowRequestHandler(tc, catalogConn)
-    defer flowHandler.Close()
-
-    err = killExistingHeartbeatFlows(ctx, tc, args.TemporalNamespace)
+    taskQueue, err := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueueID)
     if err != nil {
-        return fmt.Errorf("unable to kill existing heartbeat flows: %w", err)
+        return err
     }
 
-    taskQueue, err := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueueID)
+    flowHandler := NewFlowRequestHandler(tc, catalogConn, taskQueue)
+    defer flowHandler.Close()
+
+    err = killExistingHeartbeatFlows(ctx, tc, args.TemporalNamespace, taskQueue)
     if err != nil {
-        return err
+        return fmt.Errorf("unable to kill existing heartbeat flows: %w", err)
     }
 
     workflowID := fmt.Sprintf("heartbeatflow-%s", uuid.New())
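The diff above only shows the listing half of killExistingHeartbeatFlows; the rest of the hunk is truncated. For context, here is a minimal sketch of how a list-then-terminate routine could look with the new task-queue filter, using the Temporal Go SDK. The termination loop and the termination reason are assumptions for illustration, not the repository's actual code.

```go
package cmd

import (
    "context"
    "fmt"

    "go.temporal.io/api/workflowservice/v1"
    "go.temporal.io/sdk/client"
)

// Sketch: list only heartbeat workflows started on this deployment's task
// queue, then terminate them. Filtering by TaskQueue is what keeps one
// deployment from killing another deployment's heartbeat flows.
func killExistingHeartbeatFlowsSketch(
    ctx context.Context,
    tc client.Client,
    namespace string,
    taskQueue string,
) error {
    listRes, err := tc.ListWorkflow(ctx,
        &workflowservice.ListWorkflowExecutionsRequest{
            Namespace: namespace,
            Query:     "WorkflowType = 'HeartbeatFlowWorkflow' AND TaskQueue = '" + taskQueue + "'",
        })
    if err != nil {
        return fmt.Errorf("unable to list workflows: %w", err)
    }

    // Assumed continuation (not shown in the diff): terminate each match.
    for _, info := range listRes.Executions {
        err := tc.TerminateWorkflow(ctx,
            info.Execution.WorkflowId,
            info.Execution.RunId,
            "superseded by a new heartbeat flow")
        if err != nil {
            return fmt.Errorf("unable to terminate workflow: %w", err)
        }
    }
    return nil
}
```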
33 changes: 10 additions & 23 deletions flow/cmd/handler.go
@@ -20,15 +20,17 @@ import (
 
 // grpc server implementation
 type FlowRequestHandler struct {
-    temporalClient client.Client
-    pool           *pgxpool.Pool
+    temporalClient      client.Client
+    pool                *pgxpool.Pool
+    peerflowTaskQueueID string
     protos.UnimplementedFlowServiceServer
 }
 
-func NewFlowRequestHandler(temporalClient client.Client, pool *pgxpool.Pool) *FlowRequestHandler {
+func NewFlowRequestHandler(temporalClient client.Client, pool *pgxpool.Pool, taskQueue string) *FlowRequestHandler {
     return &FlowRequestHandler{
-        temporalClient: temporalClient,
-        pool:           pool,
+        temporalClient:      temporalClient,
+        pool:                pool,
+        peerflowTaskQueueID: taskQueue,
     }
 }

@@ -124,14 +126,9 @@ func (h *FlowRequestHandler) CreateCDCFlow(
     ctx context.Context, req *protos.CreateCDCFlowRequest) (*protos.CreateCDCFlowResponse, error) {
     cfg := req.ConnectionConfigs
     workflowID := fmt.Sprintf("%s-peerflow-%s", cfg.FlowJobName, uuid.New())
-    taskQueue, queueErr := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueueID)
-    if queueErr != nil {
-        return nil, queueErr
-    }
-
     workflowOptions := client.StartWorkflowOptions{
         ID:        workflowID,
-        TaskQueue: taskQueue,
+        TaskQueue: h.peerflowTaskQueueID,
     }
 
     maxBatchSize := int(cfg.MaxBatchSize)
@@ -229,14 +226,9 @@ func (h *FlowRequestHandler) CreateQRepFlow(
     ctx context.Context, req *protos.CreateQRepFlowRequest) (*protos.CreateQRepFlowResponse, error) {
     cfg := req.QrepConfig
     workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New())
-    taskQueue, queueErr := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueueID)
-    if queueErr != nil {
-        return nil, queueErr
-    }
-
     workflowOptions := client.StartWorkflowOptions{
         ID:        workflowID,
-        TaskQueue: taskQueue,
+        TaskQueue: h.peerflowTaskQueueID,
     }
     if req.CreateCatalogEntry {
         err := h.createQrepJobEntry(ctx, req, workflowID)
@@ -316,14 +308,9 @@ func (h *FlowRequestHandler) ShutdownFlow(
     }
 
     workflowID := fmt.Sprintf("%s-dropflow-%s", req.FlowJobName, uuid.New())
-    taskQueue, queueErr := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueueID)
-    if queueErr != nil {
-        return nil, queueErr
-    }
-
     workflowOptions := client.StartWorkflowOptions{
         ID:        workflowID,
-        TaskQueue: taskQueue,
+        TaskQueue: h.peerflowTaskQueueID,
     }
     dropFlowHandle, err := h.temporalClient.ExecuteWorkflow(
         ctx, // context
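Taken together, the handler changes replace three per-request calls to shared.GetPeerFlowTaskQueueName, each with its own error path, with a single queue name resolved at startup and stored on the handler. A condensed sketch of the resulting pattern follows; the startPeerFlow helper and its signature are illustrative only, not the repository's exact code, and the pgx pool and gRPC server embedding are omitted for brevity.

```go
package cmd

import (
    "context"
    "fmt"

    "github.com/google/uuid"
    "go.temporal.io/sdk/client"
)

// FlowRequestHandler caches the task queue name once at construction time.
type FlowRequestHandler struct {
    temporalClient      client.Client
    peerflowTaskQueueID string
}

func NewFlowRequestHandler(tc client.Client, taskQueue string) *FlowRequestHandler {
    return &FlowRequestHandler{temporalClient: tc, peerflowTaskQueueID: taskQueue}
}

// startPeerFlow shows the shared shape of CreateCDCFlow, CreateQRepFlow and
// ShutdownFlow after this commit: every workflow start reuses the cached
// h.peerflowTaskQueueID instead of re-deriving the queue name per request.
func (h *FlowRequestHandler) startPeerFlow(
    ctx context.Context, flowJobName string, workflow interface{}, args ...interface{},
) (client.WorkflowRun, error) {
    workflowOptions := client.StartWorkflowOptions{
        ID:        fmt.Sprintf("%s-peerflow-%s", flowJobName, uuid.New()),
        TaskQueue: h.peerflowTaskQueueID,
    }
    return h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflow, args...)
}
```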
12 changes: 5 additions & 7 deletions flow/shared/constants.go
@@ -31,16 +31,14 @@ const (
 
 const FetchAndChannelSize = 256 * 1024
 
-func GetPeerFlowTaskQueueName(taskQueueId TaskQueueID) (string, error) {
-    // read deployment uid from env
+func GetPeerFlowTaskQueueName(taskQueueID TaskQueueID) (string, error) {
     deploymentUID := os.Getenv("PEERDB_DEPLOYMENT_UID")
-
-    switch taskQueueId {
+    switch taskQueueID {
     case PeerFlowTaskQueueID:
-        return deploymentUID + peerFlowTaskQueue, nil
+        return deploymentUID + "-" + peerFlowTaskQueue, nil
     case SnapshotFlowTaskQueueID:
-        return deploymentUID + snapshotFlowTaskQueue, nil
+        return deploymentUID + "-" + snapshotFlowTaskQueue, nil
     default:
-        return "", fmt.Errorf("unknown task queue id %d", taskQueueId)
+        return "", fmt.Errorf("unknown task queue id %d", taskQueueID)
     }
 }
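The change in constants.go adds a "-" separator between the deployment UID and the base queue name, and renames taskQueueId to taskQueueID. A tiny standalone illustration of the resulting names is below; the peerFlowTaskQueue and snapshotFlowTaskQueue values are placeholders, since the real constants are defined elsewhere in this file and are not part of the hunk.

```go
package main

import "fmt"

// Placeholder values for illustration only; the actual constants live in
// flow/shared/constants.go and are not shown in this commit.
const (
    peerFlowTaskQueue     = "peer-flow-task-queue"
    snapshotFlowTaskQueue = "snapshot-flow-task-queue"
)

func main() {
    deploymentUID := "prod-eu-1" // normally read from PEERDB_DEPLOYMENT_UID

    // Before this commit the UID and queue name ran together:
    fmt.Println(deploymentUID + peerFlowTaskQueue) // prod-eu-1peer-flow-task-queue

    // After this commit a "-" separator keeps the name readable, which also
    // makes the TaskQueue filter in the heartbeat query easier to reason about:
    fmt.Println(deploymentUID + "-" + peerFlowTaskQueue)     // prod-eu-1-peer-flow-task-queue
    fmt.Println(deploymentUID + "-" + snapshotFlowTaskQueue) // prod-eu-1-snapshot-flow-task-queue
}
```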
