diff --git a/.github/workflows/customer-docker.yml b/.github/workflows/customer-docker.yml index f8c6ae2781..faede44f28 100644 --- a/.github/workflows/customer-docker.yml +++ b/.github/workflows/customer-docker.yml @@ -26,7 +26,7 @@ jobs: - uses: depot/setup-action@v1 - name: Login to GitHub Container Registry - uses: docker/login-action@v2.1.0 + uses: docker/login-action@v3 with: registry: ghcr.io username: ${{github.actor}} @@ -34,12 +34,12 @@ jobs: - name: Set Short Commit Hash id: vars - run: echo "::set-output name=sha_short::$(git rev-parse --short HEAD)" + run: echo "sha_short=$(git rev-parse --short HEAD)" >> $GITHUB_OUTPUT - name: extract branch name suffix for customer id: branch run: | - echo "::set-output name=branch::$(echo $GITHUB_REF | sed -e 's/.*customer-//')" + echo "branch=$(echo $GITHUB_REF | sed -e 's/.*customer-//')" >> $GITHUB_OUTPUT - name: Build (optionally publish) PeerDB Images uses: depot/bake-action@v1 diff --git a/.github/workflows/dev-docker.yml b/.github/workflows/dev-docker.yml index 8d4352004e..d5f1a2dc88 100644 --- a/.github/workflows/dev-docker.yml +++ b/.github/workflows/dev-docker.yml @@ -24,7 +24,7 @@ jobs: - uses: depot/setup-action@v1 - name: Login to GitHub Container Registry - uses: docker/login-action@v2.1.0 + uses: docker/login-action@v3 with: registry: ghcr.io username: ${{github.actor}} @@ -32,7 +32,7 @@ jobs: - name: Set Short Commit Hash id: vars - run: echo "::set-output name=sha_short::$(git rev-parse --short HEAD)" + run: echo "sha_short=$(git rev-parse --short HEAD)" >> $GITHUB_OUTPUT - name: Build (optionally publish) PeerDB Images uses: depot/bake-action@v1 diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index 54887adeac..d1a61dbcb7 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -33,9 +33,10 @@ jobs: - name: checkout sources uses: actions/checkout@v4 - - uses: actions/setup-go@v3 + - uses: actions/setup-go@v4 with: go-version: ">=1.21.0" + cache-dependency-path: flow/go.sum - name: install gotestsum run: | diff --git a/.github/workflows/stable-docker.yml b/.github/workflows/stable-docker.yml index 48f9faa113..e1a1f0c4ff 100644 --- a/.github/workflows/stable-docker.yml +++ b/.github/workflows/stable-docker.yml @@ -23,7 +23,7 @@ jobs: - uses: depot/setup-action@v1 - name: Login to GitHub Container Registry - uses: docker/login-action@v2.1.0 + uses: docker/login-action@v3 with: registry: ghcr.io username: ${{github.actor}} diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 578c0f5f4f..d6a80c52a7 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{collections::{HashMap, HashSet}, sync::Arc, time::Duration}; use analyzer::{PeerDDL, QueryAssocation}; use async_trait::async_trait; @@ -439,6 +439,25 @@ impl NexusBackend { let catalog = self.catalog.lock().await; let mirror_details = Self::check_for_mirror(&catalog, &flow_job.name).await?; if mirror_details.is_none() { + // reject duplicate source tables or duplicate target tables + let table_mappings_count = flow_job.table_mappings.len(); + if table_mappings_count > 1 { + let mut sources = HashSet::with_capacity(table_mappings_count); + let mut destinations = HashSet::with_capacity(table_mappings_count); + for tm in flow_job.table_mappings.iter() { + if !sources.insert(tm.source_table_identifier.as_str()) { + return Err(PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!("Duplicate source table identifier {}", tm.source_table_identifier), + }))) + } + if !destinations.insert(tm.destination_table_identifier.as_str()) { + return Err(PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!("Duplicate destination table identifier {}", tm.destination_table_identifier), + }))) + } + } + } + catalog .create_cdc_flow_job_entry(flow_job) .await diff --git a/ui/app/mirrors/edit/[mirrorId]/aggregatedCountsByInterval.ts b/ui/app/mirrors/edit/[mirrorId]/aggregatedCountsByInterval.ts new file mode 100644 index 0000000000..b3721873a9 --- /dev/null +++ b/ui/app/mirrors/edit/[mirrorId]/aggregatedCountsByInterval.ts @@ -0,0 +1,92 @@ +import moment from 'moment'; + +type timestampType = { + timestamp: Date; + count: number; +}; + +function aggregateCountsByInterval( + timestamps: timestampType[], + interval: string +): [string, number][] { + let timeUnit; + switch (interval) { + case 'hour': + timeUnit = 'YYYY-MM-DD HH:00:00'; + break; + case '15min': + timeUnit = 'YYYY-MM-DD HH:mm'; + break; + case 'month': + timeUnit = 'YYYY-MM'; + break; + case 'day': + timeUnit = 'YYYY-MM-DD'; + break; + default: + throw new Error('Invalid interval provided'); + } + + // Create an object to store the aggregated counts + const aggregatedCounts: { [key: string]: number } = {}; + + // Iterate through the timestamps and populate the aggregatedCounts object + for (let { timestamp, count } of timestamps) { + const date = roundUpToNearest15Minutes(timestamp); + const formattedTimestamp = moment(date).format(timeUnit); + + if (!aggregatedCounts[formattedTimestamp]) { + aggregatedCounts[formattedTimestamp] = 0; + } + + aggregatedCounts[formattedTimestamp] += count; + } + + // Create an array of intervals between the start and end timestamps + const intervals = []; + + let currentTimestamp = new Date(); + + if (interval === '15min') { + currentTimestamp = roundUpToNearest15Minutes(currentTimestamp); + } + + while (intervals.length < 30) { + intervals.push(moment(currentTimestamp).format(timeUnit)); + if (interval === 'hour') { + currentTimestamp.setHours(currentTimestamp.getHours() - 1); + } else if (interval === '15min') { + currentTimestamp.setMinutes(currentTimestamp.getMinutes() - 15); + } else if (interval === 'month') { + currentTimestamp.setMonth(currentTimestamp.getMonth() - 1); + } else if (interval === 'day') { + currentTimestamp.setDate(currentTimestamp.getDate() - 1); + } + } + + // Populate the result array with intervals and counts + const resultArray: [string, number][] = intervals.map((interval) => [ + interval, + aggregatedCounts[interval] || 0, + ]); + + return resultArray; +} + +function roundUpToNearest15Minutes(date: Date) { + const minutes = date.getMinutes(); + const remainder = minutes % 15; + + if (remainder > 0) { + // Round up to the nearest 15 minutes + date.setMinutes(minutes + (15 - remainder)); + } + + // Reset seconds and milliseconds to zero to maintain the same time + date.setSeconds(0); + date.setMilliseconds(0); + + return date; +} + +export default aggregateCountsByInterval; diff --git a/ui/app/mirrors/edit/[mirrorId]/cdc.tsx b/ui/app/mirrors/edit/[mirrorId]/cdc.tsx index ff8263dc6b..7ef56f193e 100644 --- a/ui/app/mirrors/edit/[mirrorId]/cdc.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/cdc.tsx @@ -18,7 +18,6 @@ import { useQueryState } from 'next-usequerystate'; import Link from 'next/link'; import { useState } from 'react'; import styled from 'styled-components'; -import CDCDetails from './cdcDetails'; class TableCloneSummary { flowJobName: string; @@ -243,7 +242,7 @@ export function CDCMirror({ cdc, syncStatusChild }: CDCMirrorStatusProps) { - + {/* */} {syncStatusChild} diff --git a/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx b/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx index bd20ad1851..fcdf7dca83 100644 --- a/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx @@ -1,35 +1,117 @@ +'use client'; import { FlowConnectionConfigs } from '@/grpc_generated/flow'; +import { Badge } from '@/lib/Badge'; +import { Icon } from '@/lib/Icon'; +import { Label } from '@/lib/Label'; +import moment from 'moment'; +import CdcGraph from './cdcGraph'; -type CDCDetailsProps = { - config: FlowConnectionConfigs | undefined; +import PeerButton from '@/components/PeerComponent'; +import { dBTypeFromJSON } from '@/grpc_generated/peers'; + +type SyncStatusRow = { + batchId: number; + startTime: Date; + endTime: Date | null; + numRows: number; }; -export default function CDCDetails({ config }: CDCDetailsProps) { - if (!config) { - return
No configuration provided
; - } +type props = { + syncs: SyncStatusRow[]; + mirrorConfig: FlowConnectionConfigs | undefined; +}; +function CdcDetails({ syncs, mirrorConfig }: props) { + let lastSyncedAt = moment(syncs[0]?.startTime).fromNow(); + let rowsSynced = syncs.reduce((acc, sync) => acc + sync.numRows, 0); return ( -
-

CDC Details

-
- - - - - - - - - - - - - - - -
Source{config.source?.name || '-'}
Destination{config.destination?.name || '-'}
Flow Job Name{config.flowJobName}
+ <> +
+
+
+
+ +
+
+ +
+
+
+
+ +
+
+ +
+
+
+
+ +
+
+ +
+
+
+
+ +
+
+ +
+
+
+
+
+
+ +
+
+ +
+
+
+
+ +
+
+ +
+
+
+
+
+
-
+ ); } + +function numberWithCommas(x: Number): string { + return x.toString().replace(/\B(?=(\d{3})+(?!\d))/g, ','); +} + +export default CdcDetails; diff --git a/ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx b/ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx new file mode 100644 index 0000000000..8a8901a836 --- /dev/null +++ b/ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx @@ -0,0 +1,135 @@ +import { Label } from '@/lib/Label'; +import moment from 'moment'; +import { useEffect, useState } from 'react'; + +type SyncStatusRow = { + batchId: number; + startTime: Date; + endTime: Date | null; + numRows: number; +}; + +import aggregateCountsByInterval from './aggregatedCountsByInterval'; + +const aggregateTypeMap: { [key: string]: string } = { + '15min': ' 15 mins', + hour: 'Hour', + day: 'Day', + month: 'Month', +}; + +function CdcGraph({ syncs }: { syncs: SyncStatusRow[] }) { + let [aggregateType, setAggregateType] = useState('hour'); + const initialCount: [string, number][] = []; + let [counts, setCounts] = useState(initialCount); + + let rows = syncs.map((sync) => ({ + timestamp: sync.startTime, + count: sync.numRows, + })); + + useEffect(() => { + let counts = aggregateCountsByInterval(rows, aggregateType); + counts = counts.slice(0, 29); + counts = counts.reverse(); + setCounts(counts); + }, [aggregateType, rows]); + + return ( +
+
+ {['15min', 'hour', 'day', 'month'].map((type) => { + return ( + + ); + })} +
+
+ +
+
+ {counts.map((count, i) => ( + + ))} +
+
+ ); +} + +type filterButtonProps = { + aggregateType: string; + selectedAggregateType: string; + setAggregateType: Function; +}; +function FilterButton({ + aggregateType, + selectedAggregateType, + setAggregateType, +}: filterButtonProps): React.ReactNode { + return ( + + ); +} + +function formatGraphLabel(date: Date, aggregateType: String): string { + switch (aggregateType) { + case '15min': + return moment(date).format('MMM Do HH:mm'); + case 'hour': + return moment(date).format('MMM Do HH:mm'); + case 'day': + return moment(date).format('MMM Do'); + case 'month': + return moment(date).format('MMM yy'); + default: + return 'Unknown aggregate type: ' + aggregateType; + } +} + +type GraphBarProps = { + count: number; + label: string; +}; + +function GraphBar({ label, count }: GraphBarProps) { + let color = + count && count > 0 ? 'bg-positive-fill-normal' : 'bg-base-border-subtle'; + let classNames = `relative w-10 h-24 rounded ${color}`; + return ( +
+
+
+
{label}
+
{numberWithCommas(count)}
+
+
+
+ ); +} + +function numberWithCommas(x: number): string { + return x.toString().replace(/\B(?=(\d{3})+(?!\d))/g, ','); +} + +export default CdcGraph; diff --git a/ui/app/mirrors/edit/[mirrorId]/page.tsx b/ui/app/mirrors/edit/[mirrorId]/page.tsx index e2c5e6596a..0bd23dddb0 100644 --- a/ui/app/mirrors/edit/[mirrorId]/page.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/page.tsx @@ -1,10 +1,11 @@ +import prisma from '@/app/utils/prisma'; import { MirrorStatusResponse } from '@/grpc_generated/route'; import { Header } from '@/lib/Header'; import { LayoutMain } from '@/lib/Layout'; import { GetFlowHttpAddressFromEnv } from '@/rpc/http'; import { redirect } from 'next/navigation'; import { Suspense } from 'react'; -import { CDCMirror } from './cdc'; +import CdcDetails from './cdcDetails'; import SyncStatus from './syncStatus'; export const dynamic = 'force-dynamic'; @@ -44,16 +45,36 @@ export default async function EditMirror({ redirect(`/mirrors/status/qrep/${mirrorId}`); } + let syncs = await prisma.cdc_batches.findMany({ + where: { + flow_name: mirrorId, + start_time: { + not: undefined, + }, + }, + orderBy: { + start_time: 'desc', + }, + }); + + const rows = syncs.map((sync) => ({ + batchId: sync.id, + startTime: sync.start_time, + endTime: sync.end_time, + numRows: sync.rows_in_batch, + })); + return (
{mirrorId}
}> {mirrorStatus.cdcStatus && ( - )} +
{syncStatusChild}
); diff --git a/ui/app/mirrors/edit/[mirrorId]/syncStatus.tsx b/ui/app/mirrors/edit/[mirrorId]/syncStatus.tsx index 80cb35701c..f1fea81f17 100644 --- a/ui/app/mirrors/edit/[mirrorId]/syncStatus.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/syncStatus.tsx @@ -29,5 +29,9 @@ export default async function SyncStatus({ flowJobName }: SyncStatusProps) { numRows: sync.rows_in_batch, })); - return ; + return ( +
+ +
+ ); } diff --git a/ui/tailwind.config.ts b/ui/tailwind.config.ts index 49e081636c..afde1d67cf 100644 --- a/ui/tailwind.config.ts +++ b/ui/tailwind.config.ts @@ -16,6 +16,7 @@ module.exports = { current: 'currentColor', extend: { colors: { + ...appThemeColors, blue: { 500: appThemeColors.accent.fill.normal, },