
Commit

Merge remote-tracking branch 'origin/main' into bq-multi-merge-without-tx
iskakaushik committed Jan 11, 2024
2 parents 2312916 + 05c32fa commit 7dcfb23
Showing 76 changed files with 2,060 additions and 1,255 deletions.
2 changes: 2 additions & 0 deletions docker-compose-dev.yml
@@ -209,6 +209,8 @@ services:
DATABASE_URL: postgres://postgres:postgres@catalog:5432/postgres
PEERDB_FLOW_SERVER_HTTP: http://flow_api:8113
PEERDB_PASSWORD:
NEXTAUTH_SECRET: __changeme__
NEXTAUTH_URL: http://localhost:3000
depends_on:
- flow-api

2 changes: 2 additions & 0 deletions docker-compose.yml
@@ -174,6 +174,8 @@ services:
<<: *catalog-config
DATABASE_URL: postgres://postgres:postgres@catalog:5432/postgres
PEERDB_FLOW_SERVER_HTTP: http://flow_api:8113
NEXTAUTH_SECRET: __changeme__
NEXTAUTH_URL: http://localhost:3000
depends_on:
- flow-api

1 change: 1 addition & 0 deletions flow/activities/flowable.go
@@ -54,6 +54,7 @@ func (a *FlowableActivity) CheckConnection(
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.Peer)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowName, err)
return nil, fmt.Errorf("failed to get connector: %w", err)
}
defer connectors.CloseConnector(dstConn)
16 changes: 13 additions & 3 deletions flow/activities/snapshot_activity.go
@@ -52,12 +52,22 @@ func (a *SnapshotActivity) SetupReplication(
replicationErr := make(chan error)
defer close(replicationErr)

closeConnectionForError := func(err error) {
slog.ErrorContext(ctx, "failed to setup replication", slog.Any("error", err))
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
// it is important to close the connection here as it is not closed in CloseSlotKeepAlive
connCloseErr := conn.Close()
if connCloseErr != nil {
slog.ErrorContext(ctx, "failed to close connection", slog.Any("error", connCloseErr))
}
}

// This now happens in a goroutine
go func() {
pgConn := conn.(*connpostgres.PostgresConnector)
err = pgConn.SetupReplication(slotSignal, config)
if err != nil {
slog.ErrorContext(ctx, "failed to setup replication", slog.Any("error", err))
closeConnectionForError(err)
replicationErr <- err
return
}
@@ -69,12 +79,12 @@ func (a *SnapshotActivity) SetupReplication(
case slotInfo = <-slotSignal.SlotCreated:
slog.InfoContext(ctx, fmt.Sprintf("slot '%s' created", slotInfo.SlotName))
case err := <-replicationErr:
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
closeConnectionForError(err)
return nil, fmt.Errorf("failed to setup replication: %w", err)
}

if slotInfo.Err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, slotInfo.Err)
closeConnectionForError(slotInfo.Err)
return nil, fmt.Errorf("slot error: %w", slotInfo.Err)
}

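The snapshot_activity.go hunk above routes every failure path through a single closeConnectionForError closure so the replication connection is always closed and the alerter is notified once. The following is a minimal sketch of that cleanup-closure pattern, not PeerDB's actual connector API: the Conn type, setup function, and error message are hypothetical stand-ins.

package main

import (
	"errors"
	"fmt"
	"log/slog"
)

// Conn stands in for a real connector; only Close matters for the pattern.
type Conn struct{ closed bool }

func (c *Conn) Close() error {
	c.closed = true
	return nil
}

func setup(conn *Conn, fail bool) error {
	// Centralize error-path cleanup so every failure branch logs the error
	// and closes the connection in exactly the same way.
	closeConnectionForError := func(err error) {
		slog.Error("failed to setup replication", slog.Any("error", err))
		if closeErr := conn.Close(); closeErr != nil {
			slog.Error("failed to close connection", slog.Any("error", closeErr))
		}
	}

	if fail {
		err := errors.New("slot creation failed")
		closeConnectionForError(err)
		return fmt.Errorf("failed to setup replication: %w", err)
	}
	return nil
}

func main() {
	conn := &Conn{}
	fmt.Println(setup(conn, true), "closed:", conn.closed)
}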
150 changes: 80 additions & 70 deletions flow/cmd/mirror_status.go
@@ -17,8 +17,10 @@ func (h *FlowRequestHandler) MirrorStatus(
ctx context.Context,
req *protos.MirrorStatusRequest,
) (*protos.MirrorStatusResponse, error) {
slog.Info("Mirror status endpoint called", slog.String(string(shared.FlowNameKey), req.FlowJobName))
cdcFlow, err := h.isCDCFlow(ctx, req.FlowJobName)
if err != nil {
slog.Error(fmt.Sprintf("unable to query flow: %s", err.Error()))
return &protos.MirrorStatusResponse{
ErrorMessage: fmt.Sprintf("unable to query flow: %s", err.Error()),
}, nil
@@ -73,28 +75,19 @@ func (h *FlowRequestHandler) CDCFlowStatus(
ctx context.Context,
req *protos.MirrorStatusRequest,
) (*protos.CDCMirrorStatus, error) {
slog.Info("CDC mirror status endpoint called", slog.String(string(shared.FlowNameKey), req.FlowJobName))
config, err := h.getFlowConfigFromCatalog(req.FlowJobName)
if err != nil {
return nil, err
}

var initialCopyStatus *protos.SnapshotStatus

cloneJobNames, err := h.getCloneTableFlowNames(ctx, req.FlowJobName)
cloneStatuses, err := h.cloneTableSummary(ctx, req.FlowJobName)
if err != nil {
return nil, err
}

cloneStatuses := []*protos.CloneTableSummary{}
for _, cloneJobName := range cloneJobNames {
cloneStatus, err := h.cloneTableSummary(ctx, cloneJobName)
if err != nil {
return nil, err
}

cloneStatuses = append(cloneStatuses, cloneStatus)
}

initialCopyStatus = &protos.SnapshotStatus{
Clones: cloneStatuses,
}
@@ -108,70 +101,103 @@ func (h *FlowRequestHandler) CDCFlowStatus(
func (h *FlowRequestHandler) cloneTableSummary(
ctx context.Context,
flowJobName string,
) (*protos.CloneTableSummary, error) {
cfg := h.getQRepConfigFromCatalog(flowJobName)
res := &protos.CloneTableSummary{
FlowJobName: flowJobName,
TableName: cfg.DestinationTableIdentifier,
}

) ([]*protos.CloneTableSummary, error) {
q := `
SELECT
MIN(start_time) AS StartTime,
qp.flow_name,
qr.config_proto,
MIN(qp.start_time) AS StartTime,
COUNT(*) AS NumPartitionsTotal,
COUNT(CASE WHEN end_time IS NOT NULL THEN 1 END) AS NumPartitionsCompleted,
SUM(rows_in_partition) FILTER (WHERE end_time IS NOT NULL) AS NumRowsSynced,
AVG(EXTRACT(EPOCH FROM (end_time - start_time)) * 1000) FILTER (WHERE end_time IS NOT NULL) AS AvgTimePerPartitionMs
COUNT(CASE WHEN qp.end_time IS NOT NULL THEN 1 END) AS NumPartitionsCompleted,
SUM(qp.rows_in_partition) FILTER (WHERE qp.end_time IS NOT NULL) AS NumRowsSynced,
AVG(EXTRACT(EPOCH FROM (qp.end_time - qp.start_time)) * 1000) FILTER (WHERE qp.end_time IS NOT NULL) AS AvgTimePerPartitionMs
FROM
peerdb_stats.qrep_partitions
peerdb_stats.qrep_partitions qp
JOIN
peerdb_stats.qrep_runs qr
ON
qp.flow_name = qr.flow_name
WHERE
flow_name = $1;
qp.flow_name ILIKE $1
GROUP BY
qp.flow_name, qr.config_proto;
`

var flowName pgtype.Text
var configBytes []byte
var startTime pgtype.Timestamp
var numPartitionsTotal pgtype.Int8
var numPartitionsCompleted pgtype.Int8
var numRowsSynced pgtype.Int8
var avgTimePerPartitionMs pgtype.Float8

err := h.pool.QueryRow(ctx, q, flowJobName).Scan(
&startTime,
&numPartitionsTotal,
&numPartitionsCompleted,
&numRowsSynced,
&avgTimePerPartitionMs,
)
rows, err := h.pool.Query(ctx, q, "clone_"+flowJobName+"_%")
if err != nil {
return nil, fmt.Errorf("unable to query qrep partition - %s: %w", flowJobName, err)
slog.Error(fmt.Sprintf("unable to query initial load partition - %s: %s", flowJobName, err.Error()))
return nil, fmt.Errorf("unable to query initial load partition - %s: %w", flowJobName, err)
}

if startTime.Valid {
res.StartTime = timestamppb.New(startTime.Time)
}
defer rows.Close()

if numPartitionsTotal.Valid {
res.NumPartitionsTotal = int32(numPartitionsTotal.Int64)
}
cloneStatuses := []*protos.CloneTableSummary{}
for rows.Next() {
if err := rows.Scan(
&flowName,
&configBytes,
&startTime,
&numPartitionsTotal,
&numPartitionsCompleted,
&numRowsSynced,
&avgTimePerPartitionMs,
); err != nil {
return nil, fmt.Errorf("unable to scan initial load partition - %s: %w", flowJobName, err)
}

if numPartitionsCompleted.Valid {
res.NumPartitionsCompleted = int32(numPartitionsCompleted.Int64)
}
var res protos.CloneTableSummary

if numRowsSynced.Valid {
res.NumRowsSynced = numRowsSynced.Int64
}
if flowName.Valid {
res.FlowJobName = flowName.String
}
if startTime.Valid {
res.StartTime = timestamppb.New(startTime.Time)
}

if avgTimePerPartitionMs.Valid {
res.AvgTimePerPartitionMs = int64(avgTimePerPartitionMs.Float64)
}
if numPartitionsTotal.Valid {
res.NumPartitionsTotal = int32(numPartitionsTotal.Int64)
}

return res, nil
if numPartitionsCompleted.Valid {
res.NumPartitionsCompleted = int32(numPartitionsCompleted.Int64)
}

if numRowsSynced.Valid {
res.NumRowsSynced = numRowsSynced.Int64
}

if avgTimePerPartitionMs.Valid {
res.AvgTimePerPartitionMs = int64(avgTimePerPartitionMs.Float64)
}

if configBytes != nil {
var config protos.QRepConfig
if err := proto.Unmarshal(configBytes, &config); err != nil {
slog.Error(fmt.Sprintf("unable to unmarshal config: %s", err.Error()))
return nil, fmt.Errorf("unable to unmarshal config: %w", err)
}
res.TableName = config.DestinationTableIdentifier
}

cloneStatuses = append(cloneStatuses, &res)

}
return cloneStatuses, nil
}

func (h *FlowRequestHandler) QRepFlowStatus(
ctx context.Context,
req *protos.MirrorStatusRequest,
) (*protos.QRepMirrorStatus, error) {
slog.Info("QRep Flow status endpoint called", slog.String(string(shared.FlowNameKey), req.FlowJobName))
partitionStatuses, err := h.getPartitionStatuses(ctx, req.FlowJobName)
if err != nil {
slog.Error(fmt.Sprintf("unable to query qrep partition - %s: %s", req.FlowJobName, err.Error()))
@@ -240,11 +266,13 @@ func (h *FlowRequestHandler) getFlowConfigFromCatalog(
err = h.pool.QueryRow(context.Background(),
"SELECT config_proto FROM flows WHERE name = $1", flowJobName).Scan(&configBytes)
if err != nil {
slog.Error(fmt.Sprintf("unable to query flow config from catalog: %s", err.Error()))
return nil, fmt.Errorf("unable to query flow config from catalog: %w", err)
}

err = proto.Unmarshal(configBytes, &config)
if err != nil {
slog.Error(fmt.Sprintf("unable to unmarshal flow config: %s", err.Error()))
return nil, fmt.Errorf("unable to unmarshal flow config: %w", err)
}

@@ -296,6 +324,7 @@ func (h *FlowRequestHandler) isCDCFlow(ctx context.Context, flowJobName string)
var query pgtype.Text
err := h.pool.QueryRow(ctx, "SELECT query_string FROM flows WHERE name = $1", flowJobName).Scan(&query)
if err != nil {
slog.Error(fmt.Sprintf("unable to query flow: %s", err.Error()))
return false, fmt.Errorf("unable to query flow: %w", err)
}

@@ -306,36 +335,16 @@ func (h *FlowRequestHandler) isCDCFlow(ctx context.Context, flowJobName string)
return false, nil
}

func (h *FlowRequestHandler) getCloneTableFlowNames(ctx context.Context, flowJobName string) ([]string, error) {
q := "SELECT flow_name FROM peerdb_stats.qrep_runs WHERE flow_name ILIKE $1"
rows, err := h.pool.Query(ctx, q, "clone_"+flowJobName+"_%")
if err != nil {
return nil, fmt.Errorf("unable to getCloneTableFlowNames: %w", err)
}
defer rows.Close()

flowNames := []string{}
for rows.Next() {
var name pgtype.Text
if err := rows.Scan(&name); err != nil {
return nil, fmt.Errorf("unable to scan flow row: %w", err)
}
if name.Valid {
flowNames = append(flowNames, name.String)
}
}

return flowNames, nil
}

func (h *FlowRequestHandler) getWorkflowStatus(ctx context.Context, workflowID string) (*protos.FlowStatus, error) {
res, err := h.temporalClient.QueryWorkflow(ctx, workflowID, "", shared.FlowStatusQuery)
if err != nil {
slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, err.Error()))
return nil, fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
}
var state *protos.FlowStatus
err = res.Get(&state)
if err != nil {
slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, err.Error()))
return nil, fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
}
return state, nil
@@ -348,6 +357,7 @@ func (h *FlowRequestHandler) updateWorkflowStatus(
) error {
_, err := h.temporalClient.UpdateWorkflow(ctx, workflowID, "", shared.FlowStatusUpdate, state)
if err != nil {
slog.Error(fmt.Sprintf("failed to update state in workflow with ID %s: %s", workflowID, err.Error()))
return fmt.Errorf("failed to update state in workflow with ID %s: %w", workflowID, err)
}
return nil
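The cloneTableSummary rewrite above drops the old getCloneTableFlowNames loop (one catalog query per clone flow) in favor of a single grouped query that joins peerdb_stats.qrep_partitions to peerdb_stats.qrep_runs and matches flow names with ILIKE 'clone_<mirror>_%'. Below is a minimal sketch of running a query of that shape with pgx/v5 and scanning its nullable aggregates; the catalog table and column names follow the diff, while the DATABASE_URL, the mirror name, and the reduced column list are illustrative assumptions.

package main

import (
	"context"
	"fmt"
	"os"

	"github.com/jackc/pgx/v5/pgtype"
	"github.com/jackc/pgx/v5/pgxpool"
)

func main() {
	ctx := context.Background()
	// DATABASE_URL is assumed to point at the PeerDB catalog, e.g. the
	// postgres://postgres:postgres@catalog:5432/postgres URL from docker-compose.
	pool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
	if err != nil {
		panic(err)
	}
	defer pool.Close()

	// One grouped query per mirror instead of one query per clone flow,
	// mirroring the shape of the query in cloneTableSummary.
	q := `SELECT qp.flow_name,
	             MIN(qp.start_time) AS start_time,
	             COUNT(*) AS total,
	             COUNT(CASE WHEN qp.end_time IS NOT NULL THEN 1 END) AS completed
	      FROM peerdb_stats.qrep_partitions qp
	      JOIN peerdb_stats.qrep_runs qr ON qp.flow_name = qr.flow_name
	      WHERE qp.flow_name ILIKE $1
	      GROUP BY qp.flow_name`

	flowJobName := "my_mirror" // hypothetical mirror name
	rows, err := pool.Query(ctx, q, "clone_"+flowJobName+"_%")
	if err != nil {
		panic(err)
	}
	defer rows.Close()

	for rows.Next() {
		var flowName pgtype.Text
		var startTime pgtype.Timestamp
		var total, completed pgtype.Int8
		if err := rows.Scan(&flowName, &startTime, &total, &completed); err != nil {
			panic(err)
		}
		// Aggregates can be NULL, so the pgtype wrappers should be checked
		// via their Valid fields before trusting the values.
		fmt.Printf("%s: %d/%d partitions completed\n",
			flowName.String, completed.Int64, total.Int64)
	}
}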
34 changes: 28 additions & 6 deletions flow/cmd/peer_data.go
@@ -84,23 +84,45 @@ func (h *FlowRequestHandler) GetTablesInSchema(
}

defer peerPool.Close()
rows, err := peerPool.Query(ctx, "SELECT table_name "+
"FROM information_schema.tables "+
"WHERE table_schema = $1 AND table_type = 'BASE TABLE';", req.SchemaName)
rows, err := peerPool.Query(ctx, `SELECT DISTINCT ON (t.relname)
t.relname,
CASE
WHEN c.constraint_type = 'PRIMARY KEY' OR t.relreplident = 'i' OR t.relreplident = 'f' THEN true
ELSE false
END AS can_mirror
FROM
information_schema.table_constraints c
RIGHT JOIN
pg_class t ON c.table_name = t.relname
WHERE
t.relnamespace::regnamespace::text = $1
AND
t.relkind = 'r'
ORDER BY
t.relname,
can_mirror DESC;`, req.SchemaName)
if err != nil {
return &protos.SchemaTablesResponse{Tables: nil}, err
}

defer rows.Close()
var tables []string
var tables []*protos.TableResponse
for rows.Next() {
var table pgtype.Text
err := rows.Scan(&table)
var hasPkeyOrReplica pgtype.Bool
err := rows.Scan(&table, &hasPkeyOrReplica)
if err != nil {
return &protos.SchemaTablesResponse{Tables: nil}, err
}
canMirror := false
if hasPkeyOrReplica.Valid && hasPkeyOrReplica.Bool {
canMirror = true
}

tables = append(tables, table.String)
tables = append(tables, &protos.TableResponse{
TableName: table.String,
CanMirror: canMirror,
})
}
return &protos.SchemaTablesResponse{Tables: tables}, nil
}
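The peer_data.go change above makes GetTablesInSchema report, per table, whether it can be mirrored: true when the table has a primary key or its replica identity is 'i' (index) or 'f' (full). Here is a hedged sketch of checking that same condition for a single table, assuming pgx/v5 and a reachable source Postgres; the canMirror helper, schema, and table names are illustrative, and the EXISTS-based query is a simplification of the diff's join.

package main

import (
	"context"
	"fmt"
	"os"

	"github.com/jackc/pgx/v5"
)

// canMirror reports whether a table has a primary key or a replica identity
// of 'i' (index) or 'f' (full), i.e. the same condition the diff uses to set
// can_mirror.
func canMirror(ctx context.Context, conn *pgx.Conn, schema, table string) (bool, error) {
	const q = `
		SELECT EXISTS (
			SELECT 1 FROM information_schema.table_constraints c
			WHERE c.table_schema = $1 AND c.table_name = $2
			  AND c.constraint_type = 'PRIMARY KEY'
		) OR EXISTS (
			SELECT 1 FROM pg_class t
			WHERE t.relnamespace::regnamespace::text = $1
			  AND t.relname = $2 AND t.relreplident IN ('i', 'f')
		)`
	var ok bool
	if err := conn.QueryRow(ctx, q, schema, table).Scan(&ok); err != nil {
		return false, err
	}
	return ok, nil
}

func main() {
	ctx := context.Background()
	// DATABASE_URL is assumed to point at the source peer, not the catalog.
	conn, err := pgx.Connect(ctx, os.Getenv("DATABASE_URL"))
	if err != nil {
		panic(err)
	}
	defer conn.Close(ctx)

	ok, err := canMirror(ctx, conn, "public", "my_table") // illustrative names
	if err != nil {
		panic(err)
	}
	fmt.Println("can mirror:", ok)
}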
