Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add gRPC method to get the status of a mirror #506

Merged
merged 2 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ x-catalog-config: &catalog-config
x-flow-worker-env: &flow-worker-env
TEMPORAL_HOST_PORT: temporal:7233
# For GCS, these will be your HMAC keys instead
# For more information:
# For more information:
# https://cloud.google.com/storage/docs/authentication/managing-hmackeys
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-}
Expand Down Expand Up @@ -59,7 +59,7 @@ services:
- POSTGRES_PWD=postgres
- POSTGRES_SEEDS=catalog
- DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development-sql.yaml
image: temporalio/auto-setup:1.21.3
image: temporalio/auto-setup:1.22.2
ports:
- 7233:7233
volumes:
Expand All @@ -80,7 +80,7 @@ services:
environment:
- TEMPORAL_ADDRESS=temporal:7233
- TEMPORAL_CLI_ADDRESS=temporal:7233
image: temporalio/admin-tools:1.21.3
image: temporalio/admin-tools:1.22.2
stdin_open: true
tty: true
healthcheck:
Expand Down Expand Up @@ -208,7 +208,6 @@ services:
- multi-metrics
- metrics


peerdb-grafana:
container_name: peerdb-grafana
build:
Expand All @@ -223,7 +222,6 @@ services:
- multi-metrics
- metrics


peerdb-ui:
container_name: peerdb-ui
build:
Expand Down
8 changes: 6 additions & 2 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,12 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
return nil, fmt.Errorf("failed to get partitions from source: %w", err)
}
if len(partitions) > 0 {
err = a.CatalogMirrorMonitor.InitializeQRepRun(ctx, config.FlowJobName,
runUUID, startTime)
err = a.CatalogMirrorMonitor.InitializeQRepRun(
ctx,
config.FlowJobName,
runUUID,
startTime,
)
if err != nil {
return nil, err
}
Expand Down
56 changes: 55 additions & 1 deletion flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,14 @@ func (h *FlowRequestHandler) CreateCDCFlow(
}
}

var err error
err = h.updateFlowConfigInCatalog(cfg)
if err != nil {
return nil, fmt.Errorf("unable to update flow config in catalog: %w", err)
}

state := peerflow.NewCDCFlowState()
_, err := h.temporalClient.ExecuteWorkflow(
_, err = h.temporalClient.ExecuteWorkflow(
ctx, // context
workflowOptions, // workflow start options
peerflow.CDCFlowWorkflowWithConfig, // workflow function
Expand All @@ -135,6 +141,27 @@ func (h *FlowRequestHandler) CreateCDCFlow(
}, nil
}

func (h *FlowRequestHandler) updateFlowConfigInCatalog(
cfg *protos.FlowConnectionConfigs,
) error {
var cfgBytes []byte
var err error

cfgBytes, err = proto.Marshal(cfg)
if err != nil {
return fmt.Errorf("unable to marshal flow config: %w", err)
}

_, err = h.pool.Exec(context.Background(),
"UPDATE flows SET config_proto = $1 WHERE name = $2",
cfgBytes, cfg.FlowJobName)
if err != nil {
return fmt.Errorf("unable to update flow config in catalog: %w", err)
}

return nil
}

func (h *FlowRequestHandler) CreateQRepFlow(
ctx context.Context, req *protos.CreateQRepFlowRequest) (*protos.CreateQRepFlowResponse, error) {
lastPartition := &protos.QRepPartition{
Expand Down Expand Up @@ -162,11 +189,38 @@ func (h *FlowRequestHandler) CreateQRepFlow(
return nil, fmt.Errorf("unable to start QRepFlow workflow: %w", err)
}

err = h.updateQRepConfigInCatalog(cfg)
if err != nil {
return nil, fmt.Errorf("unable to update qrep config in catalog: %w", err)
}

return &protos.CreateQRepFlowResponse{
WorflowId: workflowID,
}, nil
}

// updateQRepConfigInCatalog updates the qrep config in the catalog
func (h *FlowRequestHandler) updateQRepConfigInCatalog(
cfg *protos.QRepConfig,
) error {
var cfgBytes []byte
var err error

cfgBytes, err = proto.Marshal(cfg)
if err != nil {
return fmt.Errorf("unable to marshal qrep config: %w", err)
}

_, err = h.pool.Exec(context.Background(),
"UPDATE flows SET config_proto = $1 WHERE name = $2",
cfgBytes, cfg.FlowJobName)
if err != nil {
return fmt.Errorf("unable to update qrep config in catalog: %w", err)
}

return nil
}

func (h *FlowRequestHandler) ShutdownFlow(
ctx context.Context,
req *protos.ShutdownRequest,
Expand Down
256 changes: 256 additions & 0 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
package main

import (
"context"
"database/sql"
"fmt"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/jackc/pgx/v5/pgtype"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)

func (h *FlowRequestHandler) MirrorStatus(
ctx context.Context,
req *protos.MirrorStatusRequest,
) (*protos.MirrorStatusResponse, error) {
cdcFlow, err := h.isCDCFlow(ctx, req.FlowJobName)
if err != nil {
return &protos.MirrorStatusResponse{
ErrorMessage: fmt.Sprintf("unable to query flow: %s", err.Error()),
}, nil
}

if cdcFlow {
cdcStatus, err := h.CDCFlowStatus(ctx, req)
if err != nil {
return &protos.MirrorStatusResponse{
ErrorMessage: fmt.Sprintf("unable to query flow: %s", err.Error()),
}, nil
}

return &protos.MirrorStatusResponse{
FlowJobName: req.FlowJobName,
Status: &protos.MirrorStatusResponse_CdcStatus{
CdcStatus: cdcStatus,
},
}, nil
} else {
qrepStatus, err := h.QRepFlowStatus(ctx, req)
if err != nil {
return &protos.MirrorStatusResponse{
ErrorMessage: fmt.Sprintf("unable to query flow: %s", err.Error()),
}, nil
}

return &protos.MirrorStatusResponse{
FlowJobName: req.FlowJobName,
Status: &protos.MirrorStatusResponse_QrepStatus{
QrepStatus: qrepStatus,
},
}, nil
}
}

func (h *FlowRequestHandler) CDCFlowStatus(
ctx context.Context,
req *protos.MirrorStatusRequest,
) (*protos.CDCMirrorStatus, error) {
config, err := h.getFlowConfigFromCatalog(req.FlowJobName)
if err != nil {
return nil, err
}

var initialCopyStatus *protos.SnapshotStatus
if config.DoInitialCopy {
cloneJobNames, err := h.getCloneTableFlowNames(ctx, req.FlowJobName)
if err != nil {
return nil, err
}

cloneStatuses := []*protos.QRepMirrorStatus{}
for _, cloneJobName := range cloneJobNames {
cloneStatus, err := h.QRepFlowStatus(ctx, &protos.MirrorStatusRequest{
FlowJobName: cloneJobName,
})
if err != nil {
return nil, err
}
cloneStatuses = append(cloneStatuses, cloneStatus)
}

initialCopyStatus = &protos.SnapshotStatus{
Clones: cloneStatuses,
}
}

return &protos.CDCMirrorStatus{
Config: config,
SnapshotStatus: initialCopyStatus,
}, nil
}

func (h *FlowRequestHandler) QRepFlowStatus(
ctx context.Context,
req *protos.MirrorStatusRequest,
) (*protos.QRepMirrorStatus, error) {
parts, err := h.getPartitionUUIDs(ctx, req.FlowJobName)
if err != nil {
return nil, err
}

partitionStatuses := []*protos.PartitionStatus{}
for _, part := range parts {
partitionStatus, err := h.getPartitionStatus(ctx, part)
if err != nil {
return nil, err
}

partitionStatuses = append(partitionStatuses, partitionStatus)
}

return &protos.QRepMirrorStatus{
// The clone table jobs that are children of the CDC snapshot flow
// do not have a config entry, so allow this to be nil.
Config: h.getQRepConfigFromCatalog(req.FlowJobName),
Partitions: partitionStatuses,
}, nil
}

// getPartitionStatus returns the status of a partition uuid.
func (h *FlowRequestHandler) getPartitionStatus(
ctx context.Context,
partitionUUID string,
) (*protos.PartitionStatus, error) {
partitionStatus := &protos.PartitionStatus{
PartitionId: partitionUUID,
}

var startTime pgtype.Timestamp
var endTime pgtype.Timestamp
var numRows pgtype.Int4

q := "SELECT start_time, end_time, rows_in_partition FROM peerdb_stats.qrep_partitions WHERE partition_uuid = $1"
err := h.pool.QueryRow(ctx, q, partitionUUID).Scan(&startTime, &endTime, &numRows)
if err != nil {
return nil, fmt.Errorf("unable to query qrep partition - %s: %w", partitionUUID, err)
}

if startTime.Valid {
partitionStatus.StartTime = timestamppb.New(startTime.Time)
}

if endTime.Valid {
partitionStatus.EndTime = timestamppb.New(endTime.Time)
}

if numRows.Valid {
partitionStatus.NumRows = numRows.Int32
}

return partitionStatus, nil
}

func (h *FlowRequestHandler) getPartitionUUIDs(
ctx context.Context,
flowJobName string,
) ([]string, error) {
rows, err := h.pool.Query(ctx,
"SELECT partition_uuid FROM peerdb_stats.qrep_partitions WHERE flow_name = $1", flowJobName)
if err != nil {
return nil, fmt.Errorf("unable to query qrep partitions: %w", err)
}
defer rows.Close()

partitionUUIDs := []string{}
for rows.Next() {
var partitionUUID pgtype.Text
if err := rows.Scan(&partitionUUID); err != nil {
return nil, fmt.Errorf("unable to scan partition row: %w", err)
}
partitionUUIDs = append(partitionUUIDs, partitionUUID.String)
}

return partitionUUIDs, nil
}

func (h *FlowRequestHandler) getFlowConfigFromCatalog(
flowJobName string,
) (*protos.FlowConnectionConfigs, error) {
var configBytes sql.RawBytes
var err error
var config protos.FlowConnectionConfigs

err = h.pool.QueryRow(context.Background(),
"SELECT config_proto FROM flows WHERE name = $1", flowJobName).Scan(&configBytes)
if err != nil {
return nil, fmt.Errorf("unable to query flow config from catalog: %w", err)
}

err = proto.Unmarshal(configBytes, &config)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal flow config: %w", err)
}

return &config, nil
}

func (h *FlowRequestHandler) getQRepConfigFromCatalog(
flowJobName string,
) *protos.QRepConfig {
var configBytes []byte
var err error
var config protos.QRepConfig

err = h.pool.QueryRow(context.Background(),
"SELECT config_proto FROM flows WHERE name = $1", flowJobName).Scan(&configBytes)
if err != nil {
logrus.Warnf("unable to query qrep config from catalog: %s", err.Error())
return nil
}

err = proto.Unmarshal(configBytes, &config)
if err != nil {
return nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why we don't want to return an error here like in the above function?

if err != nil {
    return nil, fmt.Errorf("unable to unmarshal qrep config: %w", err)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in case where initial load is done for cdc we do not add the entry to flows table

}

return &config
}

func (h *FlowRequestHandler) isCDCFlow(ctx context.Context, flowJobName string) (bool, error) {
var query pgtype.Text
err := h.pool.QueryRow(ctx, "SELECT query_string FROM flows WHERE name = $1", flowJobName).Scan(&query)
if err != nil {
return false, fmt.Errorf("unable to query flow: %w", err)
}

if !query.Valid || len(query.String) == 0 {
return true, nil
}

return false, nil
}

func (h *FlowRequestHandler) getCloneTableFlowNames(ctx context.Context, flowJobName string) ([]string, error) {
q := "SELECT flow_name FROM peerdb_stats.qrep_runs WHERE flow_name ILIKE $1"
rows, err := h.pool.Query(ctx, q, "clone_"+flowJobName+"_%")
if err != nil {
return nil, fmt.Errorf("unable to getCloneTableFlowNames: %w", err)
}
defer rows.Close()

flowNames := []string{}
for rows.Next() {
var name pgtype.Text
if err := rows.Scan(&name); err != nil {
return nil, fmt.Errorf("unable to scan flow row: %w", err)
}
if name.Valid {
flowNames = append(flowNames, name.String)
}
}

return flowNames, nil
}
Loading
Loading