Skip to content

Commit

Permalink
Add gRPC method to get the status of a mirror
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Oct 12, 2023
1 parent 4d42a0e commit ad87da5
Show file tree
Hide file tree
Showing 14 changed files with 3,026 additions and 156 deletions.
8 changes: 3 additions & 5 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ x-catalog-config: &catalog-config
x-flow-worker-env: &flow-worker-env
TEMPORAL_HOST_PORT: temporal:7233
# For GCS, these will be your HMAC keys instead
# For more information:
# For more information:
# https://cloud.google.com/storage/docs/authentication/managing-hmackeys
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-}
Expand Down Expand Up @@ -59,7 +59,7 @@ services:
- POSTGRES_PWD=postgres
- POSTGRES_SEEDS=catalog
- DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development-sql.yaml
image: temporalio/auto-setup:1.21.3
image: temporalio/auto-setup:1.22.2
ports:
- 7233:7233
volumes:
Expand All @@ -80,7 +80,7 @@ services:
environment:
- TEMPORAL_ADDRESS=temporal:7233
- TEMPORAL_CLI_ADDRESS=temporal:7233
image: temporalio/admin-tools:1.21.3
image: temporalio/admin-tools:1.22.2
stdin_open: true
tty: true
healthcheck:
Expand Down Expand Up @@ -208,7 +208,6 @@ services:
- multi-metrics
- metrics


peerdb-grafana:
container_name: peerdb-grafana
build:
Expand All @@ -223,7 +222,6 @@ services:
- multi-metrics
- metrics


peerdb-ui:
container_name: peerdb-ui
build:
Expand Down
8 changes: 6 additions & 2 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,12 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
return nil, fmt.Errorf("failed to get partitions from source: %w", err)
}
if len(partitions) > 0 {
err = a.CatalogMirrorMonitor.InitializeQRepRun(ctx, config.FlowJobName,
runUUID, startTime)
err = a.CatalogMirrorMonitor.InitializeQRepRun(
ctx,
config.FlowJobName,
runUUID,
startTime,
)
if err != nil {
return nil, err
}
Expand Down
56 changes: 55 additions & 1 deletion flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,14 @@ func (h *FlowRequestHandler) CreateCDCFlow(
}
}

var err error
err = h.updateFlowConfigInCatalog(cfg)
if err != nil {
return nil, fmt.Errorf("unable to update flow config in catalog: %w", err)
}

state := peerflow.NewCDCFlowState()
_, err := h.temporalClient.ExecuteWorkflow(
_, err = h.temporalClient.ExecuteWorkflow(
ctx, // context
workflowOptions, // workflow start options
peerflow.CDCFlowWorkflowWithConfig, // workflow function
Expand All @@ -135,6 +141,27 @@ func (h *FlowRequestHandler) CreateCDCFlow(
}, nil
}

func (h *FlowRequestHandler) updateFlowConfigInCatalog(
cfg *protos.FlowConnectionConfigs,
) error {
var cfgBytes []byte
var err error

cfgBytes, err = proto.Marshal(cfg)
if err != nil {
return fmt.Errorf("unable to marshal flow config: %w", err)
}

_, err = h.pool.Exec(context.Background(),
"UPDATE flows SET config_proto = $1 WHERE name = $2",
cfgBytes, cfg.FlowJobName)
if err != nil {
return fmt.Errorf("unable to update flow config in catalog: %w", err)
}

return nil
}

func (h *FlowRequestHandler) CreateQRepFlow(
ctx context.Context, req *protos.CreateQRepFlowRequest) (*protos.CreateQRepFlowResponse, error) {
lastPartition := &protos.QRepPartition{
Expand Down Expand Up @@ -162,11 +189,38 @@ func (h *FlowRequestHandler) CreateQRepFlow(
return nil, fmt.Errorf("unable to start QRepFlow workflow: %w", err)
}

err = h.updateQRepConfigInCatalog(cfg)
if err != nil {
return nil, fmt.Errorf("unable to update qrep config in catalog: %w", err)
}

return &protos.CreateQRepFlowResponse{
WorflowId: workflowID,
}, nil
}

// updateQRepConfigInCatalog updates the qrep config in the catalog
func (h *FlowRequestHandler) updateQRepConfigInCatalog(
cfg *protos.QRepConfig,
) error {
var cfgBytes []byte
var err error

cfgBytes, err = proto.Marshal(cfg)
if err != nil {
return fmt.Errorf("unable to marshal qrep config: %w", err)
}

_, err = h.pool.Exec(context.Background(),
"UPDATE flows SET config_proto = $1 WHERE name = $2",
cfgBytes, cfg.FlowJobName)
if err != nil {
return fmt.Errorf("unable to update qrep config in catalog: %w", err)
}

return nil
}

func (h *FlowRequestHandler) ShutdownFlow(
ctx context.Context,
req *protos.ShutdownRequest,
Expand Down
256 changes: 256 additions & 0 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
package main

import (
"context"
"database/sql"
"fmt"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/jackc/pgx/v5/pgtype"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)

func (h *FlowRequestHandler) MirrorStatus(
ctx context.Context,
req *protos.MirrorStatusRequest,
) (*protos.MirrorStatusResponse, error) {
cdcFlow, err := h.isCDCFlow(ctx, req.FlowJobName)
if err != nil {
return &protos.MirrorStatusResponse{
ErrorMessage: fmt.Sprintf("unable to query flow: %s", err.Error()),
}, nil
}

if cdcFlow {
cdcStatus, err := h.CDCFlowStatus(ctx, req)
if err != nil {
return &protos.MirrorStatusResponse{
ErrorMessage: fmt.Sprintf("unable to query flow: %s", err.Error()),
}, nil
}

return &protos.MirrorStatusResponse{
FlowJobName: req.FlowJobName,
Status: &protos.MirrorStatusResponse_CdcStatus{
CdcStatus: cdcStatus,
},
}, nil
} else {
qrepStatus, err := h.QRepFlowStatus(ctx, req)
if err != nil {
return &protos.MirrorStatusResponse{
ErrorMessage: fmt.Sprintf("unable to query flow: %s", err.Error()),
}, nil
}

return &protos.MirrorStatusResponse{
FlowJobName: req.FlowJobName,
Status: &protos.MirrorStatusResponse_QrepStatus{
QrepStatus: qrepStatus,
},
}, nil
}
}

func (h *FlowRequestHandler) CDCFlowStatus(
ctx context.Context,
req *protos.MirrorStatusRequest,
) (*protos.CDCMirrorStatus, error) {
config, err := h.getFlowConfigFromCatalog(req.FlowJobName)
if err != nil {
return nil, err
}

var initialCopyStatus *protos.SnapshotStatus
if config.DoInitialCopy {
cloneJobNames, err := h.getCloneTableFlowNames(ctx, req.FlowJobName)
if err != nil {
return nil, err
}

cloneStatuses := []*protos.QRepMirrorStatus{}
for _, cloneJobName := range cloneJobNames {
cloneStatus, err := h.QRepFlowStatus(ctx, &protos.MirrorStatusRequest{
FlowJobName: cloneJobName,
})
if err != nil {
return nil, err
}
cloneStatuses = append(cloneStatuses, cloneStatus)
}

initialCopyStatus = &protos.SnapshotStatus{
Clones: cloneStatuses,
}
}

return &protos.CDCMirrorStatus{
Config: config,
SnapshotStatus: initialCopyStatus,
}, nil
}

func (h *FlowRequestHandler) QRepFlowStatus(
ctx context.Context,
req *protos.MirrorStatusRequest,
) (*protos.QRepMirrorStatus, error) {
parts, err := h.getPartitionUUIDs(ctx, req.FlowJobName)
if err != nil {
return nil, err
}

partitionStatuses := []*protos.PartitionStatus{}
for _, part := range parts {
partitionStatus, err := h.getPartitionStatus(ctx, part)
if err != nil {
return nil, err
}

partitionStatuses = append(partitionStatuses, partitionStatus)
}

return &protos.QRepMirrorStatus{
// The clone table jobs that are children of the CDC snapshot flow
// do not have a config entry, so allow this to be nil.
Config: h.getQRepConfigFromCatalog(req.FlowJobName),
Partitions: partitionStatuses,
}, nil
}

// getPartitionStatus returns the status of a partition uuid.
func (h *FlowRequestHandler) getPartitionStatus(
ctx context.Context,
partitionUUID string,
) (*protos.PartitionStatus, error) {
partitionStatus := &protos.PartitionStatus{
PartitionId: partitionUUID,
}

var startTime pgtype.Timestamp
var endTime pgtype.Timestamp
var numRows pgtype.Int4

q := "SELECT start_time, end_time, rows_in_partition FROM peerdb_stats.qrep_partitions WHERE partition_uuid = $1"
err := h.pool.QueryRow(ctx, q, partitionUUID).Scan(&startTime, &endTime, &numRows)
if err != nil {
return nil, fmt.Errorf("unable to query qrep partition - %s: %w", partitionUUID, err)
}

if startTime.Valid {
partitionStatus.StartTime = timestamppb.New(startTime.Time)
}

if endTime.Valid {
partitionStatus.EndTime = timestamppb.New(endTime.Time)
}

if numRows.Valid {
partitionStatus.NumRows = numRows.Int32
}

return partitionStatus, nil
}

func (h *FlowRequestHandler) getPartitionUUIDs(
ctx context.Context,
flowJobName string,
) ([]string, error) {
rows, err := h.pool.Query(ctx,
"SELECT partition_uuid FROM peerdb_stats.qrep_partitions WHERE flow_name = $1", flowJobName)
if err != nil {
return nil, fmt.Errorf("unable to query qrep partitions: %w", err)
}
defer rows.Close()

partitionUUIDs := []string{}
for rows.Next() {
var partitionUUID pgtype.Text
if err := rows.Scan(&partitionUUID); err != nil {
return nil, fmt.Errorf("unable to scan partition row: %w", err)
}
partitionUUIDs = append(partitionUUIDs, partitionUUID.String)
}

return partitionUUIDs, nil
}

func (h *FlowRequestHandler) getFlowConfigFromCatalog(
flowJobName string,
) (*protos.FlowConnectionConfigs, error) {
var configBytes sql.RawBytes
var err error
var config protos.FlowConnectionConfigs

err = h.pool.QueryRow(context.Background(),
"SELECT config_proto FROM flows WHERE name = $1", flowJobName).Scan(&configBytes)
if err != nil {
return nil, fmt.Errorf("unable to query flow config from catalog: %w", err)
}

err = proto.Unmarshal(configBytes, &config)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal flow config: %w", err)
}

return &config, nil
}

func (h *FlowRequestHandler) getQRepConfigFromCatalog(
flowJobName string,
) *protos.QRepConfig {
var configBytes []byte
var err error
var config protos.QRepConfig

err = h.pool.QueryRow(context.Background(),
"SELECT config_proto FROM flows WHERE name = $1", flowJobName).Scan(&configBytes)
if err != nil {
logrus.Warnf("unable to query qrep config from catalog: %s", err.Error())
return nil
}

err = proto.Unmarshal(configBytes, &config)
if err != nil {
return nil
}

return &config
}

func (h *FlowRequestHandler) isCDCFlow(ctx context.Context, flowJobName string) (bool, error) {
var query pgtype.Text
err := h.pool.QueryRow(ctx, "SELECT query_string FROM flows WHERE name = $1", flowJobName).Scan(&query)
if err != nil {
return false, fmt.Errorf("unable to query flow: %w", err)
}

if !query.Valid || len(query.String) == 0 {
return true, nil
}

return false, nil
}

func (h *FlowRequestHandler) getCloneTableFlowNames(ctx context.Context, flowJobName string) ([]string, error) {
q := "SELECT flow_name FROM peerdb_stats.qrep_runs WHERE flow_name ILIKE $1"
rows, err := h.pool.Query(ctx, q, "clone_"+flowJobName+"_%")
if err != nil {
return nil, fmt.Errorf("unable to getCloneTableFlowNames: %w", err)
}
defer rows.Close()

flowNames := []string{}
for rows.Next() {
var name pgtype.Text
if err := rows.Scan(&name); err != nil {
return nil, fmt.Errorf("unable to scan flow row: %w", err)
}
if name.Valid {
flowNames = append(flowNames, name.String)
}
}

return flowNames, nil
}
Loading

0 comments on commit ad87da5

Please sign in to comment.