Skip to content

Commit

Permalink
improve logs pagination (#2202)
Browse files Browse the repository at this point in the history
using LIMIT/OFFSET is inaccurate when new logs come it while logs being viewed

this makes a minor improvement by instead being based on records before/after an id from previous page,
which is how I'd like to also add pagination to sync batches in mirror status
  • Loading branch information
serprex authored Oct 30, 2024
1 parent 7d63502 commit 9b4d837
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 126 deletions.
66 changes: 57 additions & 9 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"fmt"
"log/slog"
"slices"
"strings"
"time"

Expand Down Expand Up @@ -581,8 +582,8 @@ func (h *FlowRequestHandler) ListMirrorLogs(
ctx context.Context,
req *protos.ListMirrorLogsRequest,
) (*protos.ListMirrorLogsResponse, error) {
whereExprs := make([]string, 0, 2)
whereArgs := make([]interface{}, 0, 2)
whereExprs := make([]string, 0, 3)
whereArgs := make([]any, 0, 4)
if req.FlowJobName != "" {
whereArgs = append(whereArgs, req.FlowJobName)
whereExprs = append(whereExprs, "position($1 in flow_name) > 0")
Expand All @@ -593,23 +594,47 @@ func (h *FlowRequestHandler) ListMirrorLogs(
whereExprs = append(whereExprs, fmt.Sprintf("error_type = $%d", len(whereArgs)))
}

// count query doesn't want paging
countWhereArgs := slices.Clone(whereArgs)
var countWhereClause string
if len(whereExprs) != 0 {
countWhereClause = " WHERE " + strings.Join(whereExprs, " AND ")
}

sortOrderBy := "desc"
if req.BeforeId != 0 && req.AfterId != 0 {
if req.BeforeId != -1 {
whereArgs = append(whereArgs, req.BeforeId)
whereExprs = append(whereExprs, fmt.Sprintf("id < $%d", len(whereArgs)))
} else if req.AfterId != -1 {
whereArgs = append(whereArgs, req.AfterId)
whereExprs = append(whereExprs, fmt.Sprintf("id > $%d", len(whereArgs)))
sortOrderBy = ""
}
}

var whereClause string
if len(whereExprs) != 0 {
whereClause = " WHERE " + strings.Join(whereExprs, " AND ")
}

skip := (req.Page - 1) * req.NumPerPage
rows, err := h.pool.Query(ctx, fmt.Sprintf(`select flow_name, error_message, error_type, error_timestamp
from peerdb_stats.flow_errors %s
order by error_timestamp desc
limit %d offset %d`, whereClause, req.NumPerPage, skip), whereArgs...)
// page is deprecated
var offsetClause string
if req.Page != 0 {
offsetClause = fmt.Sprintf(" offset %d", (req.Page-1)*req.NumPerPage)
}

rows, err := h.pool.Query(ctx, fmt.Sprintf(`select id, flow_name, error_message, error_type, error_timestamp
from peerdb_stats.flow_errors%s
order by id %s
limit %d%s`, whereClause, sortOrderBy, req.NumPerPage, offsetClause), whereArgs...)
if err != nil {
return nil, err
}
mirrorErrors, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.MirrorLog, error) {
var log protos.MirrorLog
var errorTimestamp time.Time
if err := rows.Scan(&log.FlowName, &log.ErrorMessage, &log.ErrorType, &errorTimestamp); err != nil {
if err := rows.Scan(&log.Id, &log.FlowName, &log.ErrorMessage, &log.ErrorType, &errorTimestamp); err != nil {
return nil, err
}
log.ErrorTimestamp = float64(errorTimestamp.UnixMilli())
Expand All @@ -618,14 +643,37 @@ func (h *FlowRequestHandler) ListMirrorLogs(
if err != nil {
return nil, err
}
if sortOrderBy == "" {
slices.Reverse(mirrorErrors)
}

var total int32
if err := h.pool.QueryRow(ctx, "select count(*) from peerdb_stats.flow_errors"+whereClause, whereArgs...).Scan(&total); err != nil {
var rowsBehind int32
if len(mirrorErrors) > 0 {
firstId := mirrorErrors[0].Id
countWhereArgs = append(countWhereArgs, firstId)
if err := h.pool.QueryRow(
ctx,
fmt.Sprintf("select count(*), count(*) filter (where id > $%d) from peerdb_stats.flow_errors%s",
len(countWhereArgs), countWhereClause),
countWhereArgs...,
).Scan(&total, &rowsBehind); err != nil {
return nil, err
}
} else if err := h.pool.QueryRow(
ctx, "select count(*) from peerdb_stats.flow_errors"+countWhereClause, countWhereArgs...,
).Scan(&total); err != nil {
return nil, err
}

page := req.Page
if page == 0 {
page = rowsBehind/req.NumPerPage + 1
}

return &protos.ListMirrorLogsResponse{
Errors: mirrorErrors,
Total: total,
Page: page,
}, nil
}
4 changes: 4 additions & 0 deletions protos/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -354,16 +354,20 @@ message MirrorLog {
string error_message = 2;
string error_type = 3;
double error_timestamp = 4;
int32 id = 5;
}
message ListMirrorLogsRequest {
string flow_job_name = 1;
string level = 2;
int32 page = 3;
int32 num_per_page = 4;
int32 before_id = 5;
int32 after_id = 6;
}
message ListMirrorLogsResponse {
repeated MirrorLog errors = 1;
int32 total = 2;
int32 page = 3;
}

message ValidateCDCMirrorResponse{
Expand Down
52 changes: 2 additions & 50 deletions ui/app/mirror-logs/table.tsx
Original file line number Diff line number Diff line change
@@ -1,22 +1,15 @@
'use client';

import LogsTable from '@/components/LogsTable';
import {
ListMirrorLogsRequest,
ListMirrorLogsResponse,
ListMirrorNamesResponse,
MirrorLog,
} from '@/grpc_generated/route';
import { ListMirrorNamesResponse } from '@/grpc_generated/route';
import { ProgressCircle } from '@/lib/ProgressCircle';
import { useEffect, useState } from 'react';
import ReactSelect from 'react-select';
import 'react-toastify/dist/ReactToastify.css';
import useSWR from 'swr';
import { useLocalStorage } from 'usehooks-ts';
import { fetcher } from '../utils/swr';

export default function LogsView() {
const [logs, setLogs] = useState<MirrorLog[]>([]);
const [mirrorName, setMirrorName] = useLocalStorage<string>(
'peerdbMirrorNameFilterForLogs',
''
Expand All @@ -25,45 +18,9 @@ export default function LogsView() {
'peerdbLogTypeFilterForLogs',
'all'
);
const [currentPage, setCurrentPage] = useState(1);
const [totalPages, setTotalPages] = useState(1);
const { data: mirrors }: { data: ListMirrorNamesResponse; error: any } =
useSWR('/api/v1/mirrors/names', fetcher);

useEffect(() => {
setCurrentPage(1);
}, [mirrorName]);

useEffect(() => {
const req: ListMirrorLogsRequest = {
level: logLevel,
flowJobName: mirrorName,
page: currentPage,
numPerPage: 15,
};

const fetchData = async () => {
try {
const response = await fetch('/api/v1/mirrors/logs', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
cache: 'no-store',
body: JSON.stringify(req),
});
const data: ListMirrorLogsResponse = await response.json();
const numPages = Math.ceil(data.total / req.numPerPage);
setLogs(data.errors);
setTotalPages(numPages);
} catch (error) {
console.error('Error fetching mirror logs:', error);
}
};

fetchData();
}, [currentPage, mirrorName, logLevel]);

if (!mirrors) {
return <ProgressCircle variant='determinate_progress_circle' />;
}
Expand Down Expand Up @@ -107,12 +64,7 @@ export default function LogsView() {
/>
</div>
</div>
<LogsTable
logs={logs}
currentPage={currentPage}
totalPages={totalPages}
setCurrentPage={setCurrentPage}
/>
<LogsTable numPerPage={15} mirrorName={mirrorName} logLevel={logLevel} />
</div>
);
}
50 changes: 3 additions & 47 deletions ui/app/mirrors/errors/[mirrorName]/page.tsx
Original file line number Diff line number Diff line change
@@ -1,56 +1,13 @@
'use client';

import LogsTable from '@/components/LogsTable';
import {
ListMirrorLogsRequest,
ListMirrorLogsResponse,
MirrorLog,
} from '@/grpc_generated/route';
import { Label } from '@/lib/Label';
import { useParams } from 'next/navigation';
import { useEffect, useState } from 'react';
import { ToastContainer } from 'react-toastify';
import 'react-toastify/dist/ReactToastify.css';

export default function MirrorError() {
const params = useParams<{ mirrorName: string }>();
const [mirrorErrors, setMirrorErrors] = useState<MirrorLog[]>([]);
const [currentPage, setCurrentPage] = useState(1);
const [totalPages, setTotalPages] = useState(1);

useEffect(() => {
setCurrentPage(1);
}, [params.mirrorName]);

useEffect(() => {
const req: ListMirrorLogsRequest = {
flowJobName: params.mirrorName,
page: currentPage,
numPerPage: 10,
level: 'all',
};

const fetchData = async () => {
try {
const response = await fetch('/api/v1/mirrors/logs', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
cache: 'no-store',
body: JSON.stringify(req),
});
const data: ListMirrorLogsResponse = await response.json();
const numPages = Math.ceil(data.total / req.numPerPage);
setMirrorErrors(data.errors);
setTotalPages(numPages);
} catch (error) {
console.error('Error fetching mirror errors:', error);
}
};

fetchData();
}, [currentPage, params.mirrorName]);

return (
<>
Expand All @@ -72,10 +29,9 @@ export default function MirrorError() {
</div>

<LogsTable
logs={mirrorErrors}
currentPage={currentPage}
totalPages={totalPages}
setCurrentPage={setCurrentPage}
numPerPage={10}
logLevel='all'
mirrorName={params.mirrorName}
/>
</div>
</div>
Expand Down
Loading

0 comments on commit 9b4d837

Please sign in to comment.