Sync batch pagination (#2207)
Some customers now have enough batches that the response size was exceeding the gRPC message size limit.

Changing this meant loading batches page by page and aggregating graph data on the server.

Removed searching for a specific batch ID and changing the sort column from the UI.
serprex authored Nov 4, 2024
1 parent f1a94ea commit fc59d94
Showing 14 changed files with 264 additions and 204 deletions.
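
For context on the new paging model: instead of returning every batch in one response, the `CDCBatches` RPC now takes a `limit` plus keyset bounds (`before_id`/`after_id`). Below is a minimal client-side sketch of walking batches newest-to-oldest; the generated `FlowServiceClient`, the import path, the mirror name, the page size, and the convention of passing -1 for an unused bound are assumptions for illustration, not part of this commit.

```go
package example

import (
	"context"
	"fmt"

	// Assumed import path for the generated protobuf/gRPC code.
	"github.com/PeerDB-io/peer-flow/generated/protos"
)

// listAllBatches pages through a mirror's CDC batches newest-first using the
// keyset fields added in this commit (hypothetical helper, for illustration).
func listAllBatches(ctx context.Context, client protos.FlowServiceClient, mirror string) error {
	req := &protos.GetCDCBatchesRequest{
		FlowJobName: mirror,
		Limit:       50, // page size (assumed)
		BeforeId:    -1, // -1 marks an unused bound
		AfterId:     -1,
	}
	for {
		resp, err := client.CDCBatches(ctx, req)
		if err != nil {
			return err
		}
		for _, b := range resp.CdcBatches {
			fmt.Printf("batch %d: %d rows\n", b.BatchId, b.NumRows)
		}
		if uint32(len(resp.CdcBatches)) < req.Limit {
			return nil // short page: nothing older is left
		}
		// Next page: batches strictly older than the oldest one just shown.
		req.BeforeId = resp.CdcBatches[len(resp.CdcBatches)-1].BatchId
	}
}
```

The response also carries `total` and `page`, computed on the server, so the UI can render pagination controls without ever holding the full batch list.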
124 changes: 106 additions & 18 deletions flow/cmd/mirror_status.go
@@ -175,11 +175,19 @@ func (h *FlowRequestHandler) cdcFlowStatus(
return nil, err
}

cdcBatchesResponse, err := h.GetCDCBatches(ctx, &protos.GetCDCBatchesRequest{
FlowJobName: req.FlowJobName,
Limit: 0,
})
if err != nil {
var cdcBatches []*protos.CDCBatch
if !req.ExcludeBatches {
cdcBatchesResponse, err := h.GetCDCBatches(ctx, &protos.GetCDCBatchesRequest{FlowJobName: req.FlowJobName})
if err != nil {
return nil, err
}
cdcBatches = cdcBatchesResponse.CdcBatches
}

var rowsSynced int64
if err := h.pool.QueryRow(ctx,
"select coalesce(sum(rows_in_batch), 0) from peerdb_stats.cdc_batches where flow_name=$1", req.FlowJobName,
).Scan(&rowsSynced); err != nil {
return nil, err
}

@@ -190,10 +198,43 @@ func (h *FlowRequestHandler) cdcFlowStatus(
SnapshotStatus: &protos.SnapshotStatus{
Clones: initialLoadResponse.TableSummaries,
},
CdcBatches: cdcBatchesResponse.CdcBatches,
CdcBatches: cdcBatches,
RowsSynced: rowsSynced,
}, nil
}

func (h *FlowRequestHandler) CDCGraph(ctx context.Context, req *protos.GraphRequest) (*protos.GraphResponse, error) {
truncField := "minute"
switch req.AggregateType {
case "1hour":
truncField = "hour"
case "1day":
truncField = "day"
case "1month":
truncField = "month"
}
rows, err := h.pool.Query(ctx, `select tm, coalesce(sum(rows_in_batch), 0)
from generate_series(date_trunc($2, now() - $1::INTERVAL * 30), now(), $1::INTERVAL) tm
left join peerdb_stats.cdc_batches on start_time >= tm and start_time < tm + $1::INTERVAL
group by 1 order by 1`, req.AggregateType, truncField)
if err != nil {
return nil, err
}
data, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.GraphResponseItem, error) {
var t time.Time
var r int64
if err := row.Scan(&t, &r); err != nil {
return nil, err
}
return &protos.GraphResponseItem{Time: float64(t.UnixMilli()), Rows: float64(r)}, nil
})
if err != nil {
return nil, err
}

return &protos.GraphResponse{Data: data}, nil
}

func (h *FlowRequestHandler) InitialLoadSummary(
ctx context.Context,
req *protos.InitialLoadSummaryRequest,
@@ -455,18 +496,39 @@ func (h *FlowRequestHandler) getMirrorCreatedAt(ctx context.Context, flowJobName
}

func (h *FlowRequestHandler) GetCDCBatches(ctx context.Context, req *protos.GetCDCBatchesRequest) (*protos.GetCDCBatchesResponse, error) {
mirrorName := req.FlowJobName
limit := req.Limit
return h.CDCBatches(ctx, req)
}

func (h *FlowRequestHandler) CDCBatches(ctx context.Context, req *protos.GetCDCBatchesRequest) (*protos.GetCDCBatchesResponse, error) {
limitClause := ""
if limit > 0 {
limitClause = fmt.Sprintf(" LIMIT %d", limit)
if req.Limit > 0 {
limitClause = fmt.Sprintf(" LIMIT %d", req.Limit)
}
q := `SELECT DISTINCT ON(batch_id) batch_id,start_time,end_time,rows_in_batch,batch_start_lsn,batch_end_lsn FROM peerdb_stats.cdc_batches
WHERE flow_name=$1 AND start_time IS NOT NULL ORDER BY batch_id DESC, start_time DESC` + limitClause
rows, err := h.pool.Query(ctx, q, mirrorName)

whereExpr := ""
queryArgs := append(make([]any, 0, 2), req.FlowJobName)

sortOrderBy := "desc"
if req.BeforeId != 0 || req.AfterId != 0 {
if req.BeforeId != -1 {
queryArgs = append(queryArgs, req.BeforeId)
whereExpr = fmt.Sprintf(" AND batch_id < $%d", len(queryArgs))
} else if req.AfterId != -1 {
queryArgs = append(queryArgs, req.AfterId)
whereExpr = fmt.Sprintf(" AND batch_id > $%d", len(queryArgs))
sortOrderBy = "asc"
}
}

q := fmt.Sprintf(`SELECT DISTINCT ON(batch_id)
batch_id,start_time,end_time,rows_in_batch,batch_start_lsn,batch_end_lsn
FROM peerdb_stats.cdc_batches
WHERE flow_name=$1 AND start_time IS NOT NULL%s
ORDER BY batch_id %s%s`, whereExpr, sortOrderBy, limitClause)
rows, err := h.pool.Query(ctx, q, queryArgs...)
if err != nil {
slog.Error(fmt.Sprintf("unable to query cdc batches - %s: %s", mirrorName, err.Error()))
return nil, fmt.Errorf("unable to query cdc batches - %s: %w", mirrorName, err)
slog.Error(fmt.Sprintf("unable to query cdc batches - %s: %s", req.FlowJobName, err.Error()))
return nil, fmt.Errorf("unable to query cdc batches - %s: %w", req.FlowJobName, err)
}

batches, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.CDCBatch, error) {
Expand All @@ -477,8 +539,8 @@ func (h *FlowRequestHandler) GetCDCBatches(ctx context.Context, req *protos.GetC
var startLSN pgtype.Numeric
var endLSN pgtype.Numeric
if err := rows.Scan(&batchID, &startTime, &endTime, &numRows, &startLSN, &endLSN); err != nil {
slog.Error(fmt.Sprintf("unable to scan cdc batches - %s: %s", mirrorName, err.Error()))
return nil, fmt.Errorf("unable to scan cdc batches - %s: %w", mirrorName, err)
slog.Error(fmt.Sprintf("unable to scan cdc batches - %s: %s", req.FlowJobName, err.Error()))
return nil, fmt.Errorf("unable to scan cdc batches - %s: %w", req.FlowJobName, err)
}

var batch protos.CDCBatch
@@ -511,9 +573,35 @@ func (h *FlowRequestHandler) GetCDCBatches(ctx context.Context, req *protos.GetC
if batches == nil {
batches = []*protos.CDCBatch{}
}
if req.Ascending != (sortOrderBy == "asc") {
slices.Reverse(batches)
}

var total int32
var rowsBehind int32
if len(batches) > 0 {
op := '>'
if req.Ascending {
op = '<'
}
firstId := batches[0].BatchId
if err := h.pool.QueryRow(ctx, fmt.Sprintf(`select count(distinct batch_id), count(distinct batch_id) filter (where batch_id%c$2)
from peerdb_stats.cdc_batches where flow_name=$1 and start_time is not null`, op), req.FlowJobName, firstId,
).Scan(&total, &rowsBehind); err != nil {
return nil, err
}
} else if err := h.pool.QueryRow(
ctx,
"select count(distinct batch_id) from peerdb_stats.cdc_batches where flow_name=$1 and start_time is not null",
req.FlowJobName,
).Scan(&total); err != nil {
return nil, err
}

return &protos.GetCDCBatchesResponse{
CdcBatches: batches,
Total: total,
Page: rowsBehind/int32(req.Limit) + 1,
}, nil
}
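
As a worked example of the `Page` arithmetic above (it assumes a non-zero `Limit`; `rowsBehind` counts the distinct batches that sort ahead of the first row returned):

```go
package main

import "fmt"

func main() {
	// Hypothetical values for illustration: page size 10, and 25 batches
	// sorting ahead of the first batch on the current page.
	limit := int32(10)
	rowsBehind := int32(25)
	page := rowsBehind/limit + 1
	fmt.Println(page) // 3 — rows 1-10 are page 1, 11-20 page 2, 21-30 page 3
}
```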

@@ -602,7 +690,7 @@ func (h *FlowRequestHandler) ListMirrorLogs(
}

sortOrderBy := "desc"
if req.BeforeId != 0 && req.AfterId != 0 {
if req.BeforeId != 0 || req.AfterId != 0 {
if req.BeforeId != -1 {
whereArgs = append(whereArgs, req.BeforeId)
whereExprs = append(whereExprs, fmt.Sprintf("id < $%d", len(whereArgs)))
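For readability, this is the query `CDCGraph` above effectively runs when `req.AggregateType` is `"1hour"` (so `truncField` is `"hour"`), with the two bind parameters inlined; a sketch for illustration only. `generate_series` emits 30 hour-wide buckets ending at `now()`, and the left join keeps empty buckets in the result as zero-row points.

```go
package main

import "fmt"

// hourlyGraphSQL is the CDCGraph query with the "1hour" parameters inlined
// (illustration only; the handler binds them as $1/$2 at runtime).
const hourlyGraphSQL = `select tm, coalesce(sum(rows_in_batch), 0)
from generate_series(date_trunc('hour', now() - '1hour'::INTERVAL * 30), now(), '1hour'::INTERVAL) tm
left join peerdb_stats.cdc_batches on start_time >= tm and start_time < tm + '1hour'::INTERVAL
group by 1 order by 1`

func main() { fmt.Println(hourlyGraphSQL) }
```
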
33 changes: 31 additions & 2 deletions protos/route.proto
@@ -145,6 +145,7 @@ message CreatePeerResponse {
message MirrorStatusRequest {
string flow_job_name = 1;
bool include_flow_info = 2;
bool exclude_batches = 3;
}

message PartitionStatus {
@@ -320,6 +321,7 @@ message CDCMirrorStatus {
repeated CDCBatch cdc_batches = 3;
peerdb_peers.DBType source_type = 4;
peerdb_peers.DBType destination_type = 5;
int64 rows_synced = 6;
}

message MirrorStatusResponse {
@@ -343,10 +345,29 @@ message InitialLoadSummaryResponse {
message GetCDCBatchesRequest {
string flow_job_name = 1;
uint32 limit = 2;
bool ascending = 3;
int64 before_id = 4;
int64 after_id = 5;
}

message GetCDCBatchesResponse {
repeated CDCBatch cdc_batches = 1;
int32 total = 2;
int32 page = 3;
}

message GraphRequest {
string flow_job_name = 1;
string aggregate_type = 2; // TODO name?
}

message GraphResponseItem {
double time = 1;
double rows = 2;
}

message GraphResponse {
repeated GraphResponseItem data = 1;
}

message MirrorLog {
@@ -545,11 +566,19 @@ service FlowService {
}

rpc GetCDCBatches(GetCDCBatchesRequest) returns (GetCDCBatchesResponse) {
option (google.api.http) = { get: "/v1/mirrors/cdc/batches/{flow_job_name}"};
option (google.api.http) = { get: "/v1/mirrors/cdc/batches/{flow_job_name}" };
}

rpc CDCBatches(GetCDCBatchesRequest) returns (GetCDCBatchesResponse) {
option (google.api.http) = { post: "/v1/mirrors/cdc/batches", body: "*" };
}

rpc CDCGraph(GraphRequest) returns (GraphResponse) {
option (google.api.http) = { post: "/v1/mirrors/cdc/graph", body: "*" };
}

rpc InitialLoadSummary(InitialLoadSummaryRequest) returns (InitialLoadSummaryResponse) {
option (google.api.http) = { get: "/v1/mirrors/cdc/initial_load/{parent_mirror_name}"};
option (google.api.http) = { get: "/v1/mirrors/cdc/initial_load/{parent_mirror_name}" };
}

rpc GetPeerInfo(PeerInfoRequest) returns (PeerInfoResponse) {
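Unlike the existing GET route, the new `CDCBatches` and `CDCGraph` RPCs are mapped as POSTs with `body: "*"`, so the whole request message travels in the JSON body. Below is a hedged sketch of hitting the graph endpoint through the gateway; the base URL is a placeholder, and protojson accepts either camelCase or the original snake_case field names.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

func main() {
	// Placeholder: wherever the flow API's HTTP gateway is exposed.
	apiBase := "http://localhost:8113"
	body := strings.NewReader(`{"flowJobName": "my_mirror", "aggregateType": "1hour"}`)
	resp, err := http.Post(apiBase+"/v1/mirrors/cdc/graph", "application/json", body)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	data, _ := io.ReadAll(resp.Body)
	fmt.Println(string(data)) // e.g. {"data":[{"time":1730700000000,"rows":42}, ...]}
}
```
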
4 changes: 1 addition & 3 deletions ui/app/mirrors/[mirrorId]/aggregatedCountsByInterval.ts
@@ -7,7 +7,7 @@ type timestampType = {
count: number;
};

function aggregateCountsByInterval(
export default function aggregateCountsByInterval(
timestamps: timestampType[],
interval: TimeAggregateTypes
): [string, number][] {
@@ -83,5 +83,3 @@ function aggregateCountsByInterval(

return resultArray;
}

export default aggregateCountsByInterval;
10 changes: 2 additions & 8 deletions ui/app/mirrors/[mirrorId]/cdc.tsx
@@ -1,5 +1,5 @@
'use client';
import { CDCBatch, MirrorStatusResponse } from '@/grpc_generated/route';
import { MirrorStatusResponse } from '@/grpc_generated/route';
import { Label } from '@/lib/Label';
import { ProgressCircle } from '@/lib/ProgressCircle';
import { Tab, TabGroup, TabList, TabPanel, TabPanels } from '@tremor/react';
@@ -10,14 +10,9 @@ import { SnapshotStatusTable } from './snapshot';

type CDCMirrorStatusProps = {
status: MirrorStatusResponse;
rows: CDCBatch[];
syncStatusChild?: React.ReactNode;
};
export function CDCMirror({
status,
rows,
syncStatusChild,
}: CDCMirrorStatusProps) {
export function CDCMirror({ status, syncStatusChild }: CDCMirrorStatusProps) {
const LocalStorageTabKey = 'cdctab';
const [selectedTab, setSelectedTab] = useLocalStorage(LocalStorageTabKey, 0);
const [mounted, setMounted] = useState(false);
@@ -60,7 +55,6 @@ export function CDCMirror({
<TabPanels>
<TabPanel>
<CdcDetails
syncs={rows}
createdAt={status.createdAt}
mirrorConfig={status.cdcStatus!}
mirrorStatus={status.currentFlowState}
40 changes: 17 additions & 23 deletions ui/app/mirrors/[mirrorId]/cdcDetails.tsx
@@ -5,7 +5,7 @@ import PeerButton from '@/components/PeerComponent';
import TimeLabel from '@/components/TimeComponent';
import { FlowStatus } from '@/grpc_generated/flow';
import { dBTypeFromJSON } from '@/grpc_generated/peers';
import { CDCBatch, CDCMirrorStatus } from '@/grpc_generated/route';
import { CDCMirrorStatus } from '@/grpc_generated/route';
import { Label } from '@/lib/Label';
import { ProgressCircle } from '@/lib/ProgressCircle';
import Link from 'next/link';
@@ -16,27 +16,22 @@ import { RowDataFormatter } from './rowsDisplay';
import TablePairs from './tablePairs';

type props = {
syncs: CDCBatch[];
mirrorConfig: CDCMirrorStatus;
createdAt?: Date;
mirrorStatus: FlowStatus;
};
function CdcDetails({ syncs, createdAt, mirrorConfig, mirrorStatus }: props) {
const [syncInterval, getSyncInterval] = useState<number>();

let rowsSynced = syncs.reduce((acc, sync) => {
if (sync.endTime !== null) {
return acc + Number(sync.numRows);
}
return acc;
}, 0);
export default function CdcDetails({
createdAt,
mirrorConfig,
mirrorStatus,
}: props) {
const [syncInterval, setSyncInterval] = useState<number>();

const tablesSynced = mirrorConfig.config?.tableMappings;
useEffect(() => {
getCurrentIdleTimeout(mirrorConfig.config?.flowJobName ?? '').then(
(res) => {
getSyncInterval(res);
}
getCurrentIdleTimeout(mirrorConfig.config?.flowJobName ?? '').then((res) =>
setSyncInterval(res)
);
}, [mirrorConfig.config?.flowJobName]);
return (
@@ -82,8 +77,8 @@ function CdcDetails({ syncs, createdAt, mirrorConfig, mirrorStatus }: props) {
</div>
<div>
<PeerButton
peerName={mirrorConfig?.config?.sourceName ?? ''}
peerType={dBTypeFromJSON(mirrorConfig?.sourceType)}
peerName={mirrorConfig.config?.sourceName ?? ''}
peerType={dBTypeFromJSON(mirrorConfig.sourceType)}
/>
</div>
</div>
@@ -95,8 +90,8 @@ </div>
</div>
<div>
<PeerButton
peerName={mirrorConfig?.config?.destinationName ?? ''}
peerType={dBTypeFromJSON(mirrorConfig?.destinationType)}
peerName={mirrorConfig.config?.destinationName ?? ''}
peerType={dBTypeFromJSON(mirrorConfig.destinationType)}
/>
</div>
</div>
@@ -129,7 +124,7 @@ </Label>
</Label>
</div>
<div>
<Label variant='body'>{RowDataFormatter(rowsSynced)}</Label>
<Label variant='body'>
{RowDataFormatter(mirrorConfig.rowsSynced)}
</Label>
</div>
</div>

Expand All @@ -151,8 +148,7 @@ const SyncIntervalLabel: React.FC<{ syncInterval?: number }> = ({

if (!syncInterval) {
return <ProgressCircle variant='determinate_progress_circle' />;
}
if (syncInterval >= 3600) {
} else if (syncInterval >= 3600) {
const hours = Math.floor(syncInterval / 3600);
formattedInterval = `${hours} hour${hours !== 1 ? 's' : ''}`;
} else if (syncInterval >= 60) {
@@ -164,5 +160,3 @@

return <Label>{formattedInterval}</Label>;
};

export default CdcDetails;