Skip to content

Commit

Permalink
Merge branch 'main' into update-dependencies
Browse files: browse the repository at this point in the history
  • Loading branch information
serprex authored Dec 1, 2023
2 parents f282176 + 505e524 commit dc098a9
Show file tree
Hide file tree
Showing 29 changed files with 600 additions and 320 deletions.
6 changes: 4 additions & 2 deletions docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ x-flow-worker-env: &flow-worker-env
PEERDB_TEMPORAL_NAMESPACE: default
# For the below 2 cert and key variables,
# paste as base64 encoded strings.
# use yml multiline syntax with '|'
TEMPORAL_CLIENT_CERT:
TEMPORAL_CLIENT_KEY:
# For GCS, these will be your HMAC keys instead
Expand Down Expand Up @@ -57,7 +56,7 @@ services:
catalog:
condition: service_healthy
environment:
- DB=postgresql
- DB=postgres12
- DB_PORT=5432
- POSTGRES_USER=postgres
- POSTGRES_PWD=postgres
Expand Down Expand Up @@ -87,11 +86,14 @@ services:
image: temporalio/admin-tools:1.22
stdin_open: true
tty: true
entrypoint: ["bash", "/etc/temporal/entrypoint.sh"]
healthcheck:
test: ["CMD", "tctl", "workflow", "list"]
interval: 1s
timeout: 5s
retries: 30
volumes:
- ./scripts/mirror-name-search.sh:/etc/temporal/entrypoint.sh

temporal-ui:
container_name: temporal-ui
Expand Down
5 changes: 4 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ services:
catalog:
condition: service_healthy
environment:
- DB=postgresql
- DB=postgres12
- DB_PORT=5432
- POSTGRES_USER=postgres
- POSTGRES_PWD=postgres
Expand All @@ -73,11 +73,14 @@ services:
image: temporalio/admin-tools:1.22
stdin_open: true
tty: true
entrypoint: ["bash", "/etc/temporal/entrypoint.sh"]
healthcheck:
test: ["CMD", "tctl", "workflow", "list"]
interval: 1s
timeout: 5s
retries: 30
volumes:
- ./scripts/mirror-name-search.sh:/etc/temporal/entrypoint.sh

temporal-ui:
container_name: temporal-ui
Expand Down
21 changes: 15 additions & 6 deletions flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,15 @@ func setupGRPCGatewayServer(args *APIServerParams) (*http.Server, error) {
return server, nil
}

func killExistingHeartbeatFlows(ctx context.Context, tc client.Client, namespace string) error {
func killExistingHeartbeatFlows(
ctx context.Context,
tc client.Client,
namespace string,
taskQueue string) error {
listRes, err := tc.ListWorkflow(ctx,
&workflowservice.ListWorkflowExecutionsRequest{
Namespace: namespace,
Query: "WorkflowType = 'HeartbeatFlowWorkflow'",
Query: "WorkflowType = 'HeartbeatFlowWorkflow' AND TaskQueue = '" + taskQueue + "'",
})
if err != nil {
return fmt.Errorf("unable to list workflows: %w", err)
Expand Down Expand Up @@ -94,7 +98,7 @@ func APIMain(args *APIServerParams) error {

certs, err := Base64DecodeCertAndKey(args.TemporalCert, args.TemporalKey)
if err != nil {
return fmt.Errorf("unable to process certificate and key: %w", err)
return fmt.Errorf("unable to base64 decode certificate and key: %w", err)
}

connOptions := client.ConnectionOptions{
Expand All @@ -115,18 +119,23 @@ func APIMain(args *APIServerParams) error {
return fmt.Errorf("unable to get catalog connection pool: %w", err)
}

flowHandler := NewFlowRequestHandler(tc, catalogConn)
taskQueue, err := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueueID)
if err != nil {
return err
}

flowHandler := NewFlowRequestHandler(tc, catalogConn, taskQueue)
defer flowHandler.Close()

err = killExistingHeartbeatFlows(ctx, tc, args.TemporalNamespace)
err = killExistingHeartbeatFlows(ctx, tc, args.TemporalNamespace, taskQueue)
if err != nil {
return fmt.Errorf("unable to kill existing heartbeat flows: %w", err)
}

workflowID := fmt.Sprintf("heartbeatflow-%s", uuid.New())
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: shared.PeerFlowTaskQueue,
TaskQueue: taskQueue,
}

_, err = flowHandler.temporalClient.ExecuteWorkflow(
Expand Down
31 changes: 23 additions & 8 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@ import (

// grpc server implementation
type FlowRequestHandler struct {
temporalClient client.Client
pool *pgxpool.Pool
temporalClient client.Client
pool *pgxpool.Pool
peerflowTaskQueueID string
protos.UnimplementedFlowServiceServer
}

func NewFlowRequestHandler(temporalClient client.Client, pool *pgxpool.Pool) *FlowRequestHandler {
func NewFlowRequestHandler(temporalClient client.Client, pool *pgxpool.Pool, taskQueue string) *FlowRequestHandler {
return &FlowRequestHandler{
temporalClient: temporalClient,
pool: pool,
temporalClient: temporalClient,
pool: pool,
peerflowTaskQueueID: taskQueue,
}
}

Expand Down Expand Up @@ -126,7 +128,10 @@ func (h *FlowRequestHandler) CreateCDCFlow(
workflowID := fmt.Sprintf("%s-peerflow-%s", cfg.FlowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: shared.PeerFlowTaskQueue,
TaskQueue: h.peerflowTaskQueueID,
SearchAttributes: map[string]interface{}{
shared.MirrorNameSearchAttribute: cfg.FlowJobName,
},
}

maxBatchSize := int(cfg.MaxBatchSize)
Expand All @@ -137,6 +142,7 @@ func (h *FlowRequestHandler) CreateCDCFlow(

limits := &peerflow.CDCFlowLimits{
TotalSyncFlows: 0,
ExitAfterRecords: -1,
TotalNormalizeFlows: 0,
MaxBatchSize: maxBatchSize,
}
Expand All @@ -158,13 +164,15 @@ func (h *FlowRequestHandler) CreateCDCFlow(
if req.CreateCatalogEntry {
err := h.createCdcJobEntry(ctx, req, workflowID)
if err != nil {
log.Errorf("unable to create flow job entry: %v", err)
return nil, fmt.Errorf("unable to create flow job entry: %w", err)
}
}

var err error
err = h.updateFlowConfigInCatalog(cfg)
if err != nil {
log.Errorf("unable to update flow config in catalog: %v", err)
return nil, fmt.Errorf("unable to update flow config in catalog: %w", err)
}

Expand All @@ -178,6 +186,7 @@ func (h *FlowRequestHandler) CreateCDCFlow(
state, // workflow state
)
if err != nil {
log.Errorf("unable to start PeerFlow workflow: %v", err)
return nil, fmt.Errorf("unable to start PeerFlow workflow: %w", err)
}

Expand Down Expand Up @@ -226,7 +235,10 @@ func (h *FlowRequestHandler) CreateQRepFlow(
workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: shared.PeerFlowTaskQueue,
TaskQueue: h.peerflowTaskQueueID,
SearchAttributes: map[string]interface{}{
shared.MirrorNameSearchAttribute: cfg.FlowJobName,
},
}
if req.CreateCatalogEntry {
err := h.createQrepJobEntry(ctx, req, workflowID)
Expand Down Expand Up @@ -308,7 +320,10 @@ func (h *FlowRequestHandler) ShutdownFlow(
workflowID := fmt.Sprintf("%s-dropflow-%s", req.FlowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: shared.PeerFlowTaskQueue,
TaskQueue: h.peerflowTaskQueueID,
SearchAttributes: map[string]interface{}{
shared.MirrorNameSearchAttribute: req.FlowJobName,
},
}
dropFlowHandle, err := h.temporalClient.ExecuteWorkflow(
ctx, // context
Expand Down
4 changes: 2 additions & 2 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (h *FlowRequestHandler) GetSchemas(

defer peerPool.Close()
rows, err := peerPool.Query(ctx, "SELECT schema_name"+
" FROM information_schema.schemata;")
" FROM information_schema.schemata WHERE schema_name !~ '^pg_' AND schema_name <> 'information_schema';")
if err != nil {
return &protos.PeerSchemasResponse{Schemas: nil}, err
}
Expand Down Expand Up @@ -106,7 +106,7 @@ func (h *FlowRequestHandler) GetAllTables(

defer peerPool.Close()
rows, err := peerPool.Query(ctx, "SELECT table_schema || '.' || table_name AS schema_table "+
"FROM information_schema.tables;")
"FROM information_schema.tables WHERE table_schema !~ '^pg_' AND table_schema <> 'information_schema'")
if err != nil {
return &protos.AllTablesResponse{Tables: nil}, err
}
Expand Down
7 changes: 6 additions & 1 deletion flow/cmd/snapshot_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,12 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error {
}
defer c.Close()

w := worker.New(c, shared.SnapshotFlowTaskQueue, worker.Options{
taskQueue, queueErr := shared.GetPeerFlowTaskQueueName(shared.SnapshotFlowTaskQueueID)
if queueErr != nil {
return queueErr
}

w := worker.New(c, taskQueue, worker.Options{
EnableSessionWorker: true,
})
w.RegisterWorkflow(peerflow.SnapshotFlowWorkflow)
Expand Down
7 changes: 6 additions & 1 deletion flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,12 @@ func WorkerMain(opts *WorkerOptions) error {
log.Info("Created temporal client")
defer c.Close()

w := worker.New(c, shared.PeerFlowTaskQueue, worker.Options{})
taskQueue, queueErr := shared.GetPeerFlowTaskQueueName(shared.PeerFlowTaskQueueID)
if queueErr != nil {
return queueErr
}

w := worker.New(c, taskQueue, worker.Options{})
w.RegisterWorkflow(peerflow.CDCFlowWorkflowWithConfig)
w.RegisterWorkflow(peerflow.SyncFlowWorkflow)
w.RegisterWorkflow(peerflow.SetupFlowWorkflow)
Expand Down
19 changes: 10 additions & 9 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,15 +211,6 @@ func (c *EventHubConnector) processBatch(
}

func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string {
return fmt.Sprintf("syncing records to eventhub with"+
" push parallelism %d and push batch size %d",
req.PushParallelism, req.PushBatchSize)
})
defer func() {
shutdown <- true
}()

maxParallelism := req.PushParallelism
if maxParallelism <= 0 {
maxParallelism = 10
Expand All @@ -229,6 +220,16 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
batch := req.Records
var numRecords uint32

shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string {
return fmt.Sprintf(
"processed %d records for flow %s",
numRecords, req.FlowJobName,
)
})
defer func() {
shutdown <- true
}()

// if env var PEERDB_BETA_EVENTHUB_PUSH_ASYNC=true
// we kick off processBatch in a goroutine and return immediately.
// otherwise, we block until processBatch is done.
Expand Down
Loading

0 comments on commit dc098a9

Please sign in to comment.