From 1dcfb097d2c497e609ad03b9e53afcc66eb86996 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 10 Nov 2023 00:00:37 +0000 Subject: [PATCH 1/3] nexus/server: reject duplicate source tables or duplicate target tables (#632) Not sure how to write tests for this crate, but this updates psql interface to reject ```sql CREATE MIRROR m FROM source TO target WITH (s.t1:s.t2, s.t1:s.t3) CREATE MIRROR m FROM source TO target WITH (s.t1:s.t3, s.t2:s.t3) ``` As this kind of mirror doesn't work today & is support is not prioritized This leaves capability to create these kinds of broken mirrors over grpc open, that should be alright since this change's goal is to fail fast for users to know what's allowed --- nexus/server/src/main.rs | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 578c0f5f4f..d6a80c52a7 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{collections::{HashMap, HashSet}, sync::Arc, time::Duration}; use analyzer::{PeerDDL, QueryAssocation}; use async_trait::async_trait; @@ -439,6 +439,25 @@ impl NexusBackend { let catalog = self.catalog.lock().await; let mirror_details = Self::check_for_mirror(&catalog, &flow_job.name).await?; if mirror_details.is_none() { + // reject duplicate source tables or duplicate target tables + let table_mappings_count = flow_job.table_mappings.len(); + if table_mappings_count > 1 { + let mut sources = HashSet::with_capacity(table_mappings_count); + let mut destinations = HashSet::with_capacity(table_mappings_count); + for tm in flow_job.table_mappings.iter() { + if !sources.insert(tm.source_table_identifier.as_str()) { + return Err(PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!("Duplicate source table identifier {}", tm.source_table_identifier), + }))) + } + if !destinations.insert(tm.destination_table_identifier.as_str()) { + return Err(PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!("Duplicate destination table identifier {}", tm.destination_table_identifier), + }))) + } + } + } + catalog .create_cdc_flow_job_entry(flow_job) .await From de62965b97517a220f2cc9103edd0f43ff510841 Mon Sep 17 00:00:00 2001 From: pankaj-peerdb <149565017+pankaj-peerdb@users.noreply.github.com> Date: Fri, 10 Nov 2023 20:48:55 +0530 Subject: [PATCH 2/3] feature: add screen to view mirror activity (#627) --- .../[mirrorId]/aggregatedCountsByInterval.ts | 92 ++++++++++++ ui/app/mirrors/edit/[mirrorId]/cdc.tsx | 3 +- ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx | 134 +++++++++++++---- ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx | 135 ++++++++++++++++++ ui/app/mirrors/edit/[mirrorId]/page.tsx | 29 +++- ui/app/mirrors/edit/[mirrorId]/syncStatus.tsx | 6 +- ui/tailwind.config.ts | 1 + 7 files changed, 367 insertions(+), 33 deletions(-) create mode 100644 ui/app/mirrors/edit/[mirrorId]/aggregatedCountsByInterval.ts create mode 100644 ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx diff --git a/ui/app/mirrors/edit/[mirrorId]/aggregatedCountsByInterval.ts b/ui/app/mirrors/edit/[mirrorId]/aggregatedCountsByInterval.ts new file mode 100644 index 0000000000..b3721873a9 --- /dev/null +++ b/ui/app/mirrors/edit/[mirrorId]/aggregatedCountsByInterval.ts @@ -0,0 +1,92 @@ +import moment from 'moment'; + +type timestampType = { + timestamp: Date; + count: number; +}; + +function aggregateCountsByInterval( + timestamps: timestampType[], + interval: string +): [string, number][] { + let timeUnit; + switch (interval) { + case 'hour': + timeUnit = 'YYYY-MM-DD HH:00:00'; + break; + case '15min': + timeUnit = 'YYYY-MM-DD HH:mm'; + break; + case 'month': + timeUnit = 'YYYY-MM'; + break; + case 'day': + timeUnit = 'YYYY-MM-DD'; + break; + default: + throw new Error('Invalid interval provided'); + } + + // Create an object to store the aggregated counts + const aggregatedCounts: { [key: string]: number } = {}; + + // Iterate through the timestamps and populate the aggregatedCounts object + for (let { timestamp, count } of timestamps) { + const date = roundUpToNearest15Minutes(timestamp); + const formattedTimestamp = moment(date).format(timeUnit); + + if (!aggregatedCounts[formattedTimestamp]) { + aggregatedCounts[formattedTimestamp] = 0; + } + + aggregatedCounts[formattedTimestamp] += count; + } + + // Create an array of intervals between the start and end timestamps + const intervals = []; + + let currentTimestamp = new Date(); + + if (interval === '15min') { + currentTimestamp = roundUpToNearest15Minutes(currentTimestamp); + } + + while (intervals.length < 30) { + intervals.push(moment(currentTimestamp).format(timeUnit)); + if (interval === 'hour') { + currentTimestamp.setHours(currentTimestamp.getHours() - 1); + } else if (interval === '15min') { + currentTimestamp.setMinutes(currentTimestamp.getMinutes() - 15); + } else if (interval === 'month') { + currentTimestamp.setMonth(currentTimestamp.getMonth() - 1); + } else if (interval === 'day') { + currentTimestamp.setDate(currentTimestamp.getDate() - 1); + } + } + + // Populate the result array with intervals and counts + const resultArray: [string, number][] = intervals.map((interval) => [ + interval, + aggregatedCounts[interval] || 0, + ]); + + return resultArray; +} + +function roundUpToNearest15Minutes(date: Date) { + const minutes = date.getMinutes(); + const remainder = minutes % 15; + + if (remainder > 0) { + // Round up to the nearest 15 minutes + date.setMinutes(minutes + (15 - remainder)); + } + + // Reset seconds and milliseconds to zero to maintain the same time + date.setSeconds(0); + date.setMilliseconds(0); + + return date; +} + +export default aggregateCountsByInterval; diff --git a/ui/app/mirrors/edit/[mirrorId]/cdc.tsx b/ui/app/mirrors/edit/[mirrorId]/cdc.tsx index ff8263dc6b..7ef56f193e 100644 --- a/ui/app/mirrors/edit/[mirrorId]/cdc.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/cdc.tsx @@ -18,7 +18,6 @@ import { useQueryState } from 'next-usequerystate'; import Link from 'next/link'; import { useState } from 'react'; import styled from 'styled-components'; -import CDCDetails from './cdcDetails'; class TableCloneSummary { flowJobName: string; @@ -243,7 +242,7 @@ export function CDCMirror({ cdc, syncStatusChild }: CDCMirrorStatusProps) { - + {/* */} {syncStatusChild} diff --git a/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx b/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx index bd20ad1851..fcdf7dca83 100644 --- a/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx @@ -1,35 +1,117 @@ +'use client'; import { FlowConnectionConfigs } from '@/grpc_generated/flow'; +import { Badge } from '@/lib/Badge'; +import { Icon } from '@/lib/Icon'; +import { Label } from '@/lib/Label'; +import moment from 'moment'; +import CdcGraph from './cdcGraph'; -type CDCDetailsProps = { - config: FlowConnectionConfigs | undefined; +import PeerButton from '@/components/PeerComponent'; +import { dBTypeFromJSON } from '@/grpc_generated/peers'; + +type SyncStatusRow = { + batchId: number; + startTime: Date; + endTime: Date | null; + numRows: number; }; -export default function CDCDetails({ config }: CDCDetailsProps) { - if (!config) { - return
No configuration provided
; - } +type props = { + syncs: SyncStatusRow[]; + mirrorConfig: FlowConnectionConfigs | undefined; +}; +function CdcDetails({ syncs, mirrorConfig }: props) { + let lastSyncedAt = moment(syncs[0]?.startTime).fromNow(); + let rowsSynced = syncs.reduce((acc, sync) => acc + sync.numRows, 0); return ( -
-

CDC Details

-
- - - - - - - - - - - - - - - -
Source{config.source?.name || '-'}
Destination{config.destination?.name || '-'}
Flow Job Name{config.flowJobName}
+ <> +
+
+
+
+ +
+
+ +
+
+
+
+ +
+
+ +
+
+
+
+ +
+
+ +
+
+
+
+ +
+
+ +
+
+
+
+
+
+ +
+
+ +
+
+
+
+ +
+
+ +
+
+
+
+
+
-
+ ); } + +function numberWithCommas(x: Number): string { + return x.toString().replace(/\B(?=(\d{3})+(?!\d))/g, ','); +} + +export default CdcDetails; diff --git a/ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx b/ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx new file mode 100644 index 0000000000..8a8901a836 --- /dev/null +++ b/ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx @@ -0,0 +1,135 @@ +import { Label } from '@/lib/Label'; +import moment from 'moment'; +import { useEffect, useState } from 'react'; + +type SyncStatusRow = { + batchId: number; + startTime: Date; + endTime: Date | null; + numRows: number; +}; + +import aggregateCountsByInterval from './aggregatedCountsByInterval'; + +const aggregateTypeMap: { [key: string]: string } = { + '15min': ' 15 mins', + hour: 'Hour', + day: 'Day', + month: 'Month', +}; + +function CdcGraph({ syncs }: { syncs: SyncStatusRow[] }) { + let [aggregateType, setAggregateType] = useState('hour'); + const initialCount: [string, number][] = []; + let [counts, setCounts] = useState(initialCount); + + let rows = syncs.map((sync) => ({ + timestamp: sync.startTime, + count: sync.numRows, + })); + + useEffect(() => { + let counts = aggregateCountsByInterval(rows, aggregateType); + counts = counts.slice(0, 29); + counts = counts.reverse(); + setCounts(counts); + }, [aggregateType, rows]); + + return ( +
+
+ {['15min', 'hour', 'day', 'month'].map((type) => { + return ( + + ); + })} +
+
+ +
+
+ {counts.map((count, i) => ( + + ))} +
+
+ ); +} + +type filterButtonProps = { + aggregateType: string; + selectedAggregateType: string; + setAggregateType: Function; +}; +function FilterButton({ + aggregateType, + selectedAggregateType, + setAggregateType, +}: filterButtonProps): React.ReactNode { + return ( + + ); +} + +function formatGraphLabel(date: Date, aggregateType: String): string { + switch (aggregateType) { + case '15min': + return moment(date).format('MMM Do HH:mm'); + case 'hour': + return moment(date).format('MMM Do HH:mm'); + case 'day': + return moment(date).format('MMM Do'); + case 'month': + return moment(date).format('MMM yy'); + default: + return 'Unknown aggregate type: ' + aggregateType; + } +} + +type GraphBarProps = { + count: number; + label: string; +}; + +function GraphBar({ label, count }: GraphBarProps) { + let color = + count && count > 0 ? 'bg-positive-fill-normal' : 'bg-base-border-subtle'; + let classNames = `relative w-10 h-24 rounded ${color}`; + return ( +
+
+
+
{label}
+
{numberWithCommas(count)}
+
+
+
+ ); +} + +function numberWithCommas(x: number): string { + return x.toString().replace(/\B(?=(\d{3})+(?!\d))/g, ','); +} + +export default CdcGraph; diff --git a/ui/app/mirrors/edit/[mirrorId]/page.tsx b/ui/app/mirrors/edit/[mirrorId]/page.tsx index e2c5e6596a..0bd23dddb0 100644 --- a/ui/app/mirrors/edit/[mirrorId]/page.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/page.tsx @@ -1,10 +1,11 @@ +import prisma from '@/app/utils/prisma'; import { MirrorStatusResponse } from '@/grpc_generated/route'; import { Header } from '@/lib/Header'; import { LayoutMain } from '@/lib/Layout'; import { GetFlowHttpAddressFromEnv } from '@/rpc/http'; import { redirect } from 'next/navigation'; import { Suspense } from 'react'; -import { CDCMirror } from './cdc'; +import CdcDetails from './cdcDetails'; import SyncStatus from './syncStatus'; export const dynamic = 'force-dynamic'; @@ -44,16 +45,36 @@ export default async function EditMirror({ redirect(`/mirrors/status/qrep/${mirrorId}`); } + let syncs = await prisma.cdc_batches.findMany({ + where: { + flow_name: mirrorId, + start_time: { + not: undefined, + }, + }, + orderBy: { + start_time: 'desc', + }, + }); + + const rows = syncs.map((sync) => ({ + batchId: sync.id, + startTime: sync.start_time, + endTime: sync.end_time, + numRows: sync.rows_in_batch, + })); + return (
{mirrorId}
}> {mirrorStatus.cdcStatus && ( - )} +
{syncStatusChild}
); diff --git a/ui/app/mirrors/edit/[mirrorId]/syncStatus.tsx b/ui/app/mirrors/edit/[mirrorId]/syncStatus.tsx index 80cb35701c..f1fea81f17 100644 --- a/ui/app/mirrors/edit/[mirrorId]/syncStatus.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/syncStatus.tsx @@ -29,5 +29,9 @@ export default async function SyncStatus({ flowJobName }: SyncStatusProps) { numRows: sync.rows_in_batch, })); - return ; + return ( +
+ +
+ ); } diff --git a/ui/tailwind.config.ts b/ui/tailwind.config.ts index 49e081636c..afde1d67cf 100644 --- a/ui/tailwind.config.ts +++ b/ui/tailwind.config.ts @@ -16,6 +16,7 @@ module.exports = { current: 'currentColor', extend: { colors: { + ...appThemeColors, blue: { 500: appThemeColors.accent.fill.normal, }, From 86d9e721dcd830d899e97114411e1e15670d7462 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 10 Nov 2023 17:15:20 +0000 Subject: [PATCH 3/3] Remove use of deprecated github ci command set-output (#635) Also update Actions setup-go & login-action https://github.blog/changelog/2022-10-11-github-actions-deprecating-save-state-and-set-output-commands --- .github/workflows/customer-docker.yml | 6 +++--- .github/workflows/dev-docker.yml | 4 ++-- .github/workflows/flow.yml | 3 ++- .github/workflows/stable-docker.yml | 2 +- 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/.github/workflows/customer-docker.yml b/.github/workflows/customer-docker.yml index f8c6ae2781..faede44f28 100644 --- a/.github/workflows/customer-docker.yml +++ b/.github/workflows/customer-docker.yml @@ -26,7 +26,7 @@ jobs: - uses: depot/setup-action@v1 - name: Login to GitHub Container Registry - uses: docker/login-action@v2.1.0 + uses: docker/login-action@v3 with: registry: ghcr.io username: ${{github.actor}} @@ -34,12 +34,12 @@ jobs: - name: Set Short Commit Hash id: vars - run: echo "::set-output name=sha_short::$(git rev-parse --short HEAD)" + run: echo "sha_short=$(git rev-parse --short HEAD)" >> $GITHUB_OUTPUT - name: extract branch name suffix for customer id: branch run: | - echo "::set-output name=branch::$(echo $GITHUB_REF | sed -e 's/.*customer-//')" + echo "branch=$(echo $GITHUB_REF | sed -e 's/.*customer-//')" >> $GITHUB_OUTPUT - name: Build (optionally publish) PeerDB Images uses: depot/bake-action@v1 diff --git a/.github/workflows/dev-docker.yml b/.github/workflows/dev-docker.yml index 8d4352004e..d5f1a2dc88 100644 --- a/.github/workflows/dev-docker.yml +++ b/.github/workflows/dev-docker.yml @@ -24,7 +24,7 @@ jobs: - uses: depot/setup-action@v1 - name: Login to GitHub Container Registry - uses: docker/login-action@v2.1.0 + uses: docker/login-action@v3 with: registry: ghcr.io username: ${{github.actor}} @@ -32,7 +32,7 @@ jobs: - name: Set Short Commit Hash id: vars - run: echo "::set-output name=sha_short::$(git rev-parse --short HEAD)" + run: echo "sha_short=$(git rev-parse --short HEAD)" >> $GITHUB_OUTPUT - name: Build (optionally publish) PeerDB Images uses: depot/bake-action@v1 diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index 54887adeac..d1a61dbcb7 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -33,9 +33,10 @@ jobs: - name: checkout sources uses: actions/checkout@v4 - - uses: actions/setup-go@v3 + - uses: actions/setup-go@v4 with: go-version: ">=1.21.0" + cache-dependency-path: flow/go.sum - name: install gotestsum run: | diff --git a/.github/workflows/stable-docker.yml b/.github/workflows/stable-docker.yml index 48f9faa113..e1a1f0c4ff 100644 --- a/.github/workflows/stable-docker.yml +++ b/.github/workflows/stable-docker.yml @@ -23,7 +23,7 @@ jobs: - uses: depot/setup-action@v1 - name: Login to GitHub Container Registry - uses: docker/login-action@v2.1.0 + uses: docker/login-action@v3 with: registry: ghcr.io username: ${{github.actor}}