Skip to content

Commit

Permalink
Merge branch 'main' into update-go
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Nov 10, 2023
2 parents 82798ef + 86d9e72 commit 0b48538
Show file tree
Hide file tree
Showing 12 changed files with 395 additions and 41 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/customer-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@ jobs:
- uses: depot/setup-action@v1

- name: Login to GitHub Container Registry
uses: docker/login-action@v2.1.0
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{github.actor}}
password: ${{secrets.GITHUB_TOKEN}}

- name: Set Short Commit Hash
id: vars
run: echo "::set-output name=sha_short::$(git rev-parse --short HEAD)"
run: echo "sha_short=$(git rev-parse --short HEAD)" >> $GITHUB_OUTPUT

- name: extract branch name suffix for customer
id: branch
run: |
echo "::set-output name=branch::$(echo $GITHUB_REF | sed -e 's/.*customer-//')"
echo "branch=$(echo $GITHUB_REF | sed -e 's/.*customer-//')" >> $GITHUB_OUTPUT
- name: Build (optionally publish) PeerDB Images
uses: depot/bake-action@v1
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/dev-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ jobs:
- uses: depot/setup-action@v1

- name: Login to GitHub Container Registry
uses: docker/login-action@v2.1.0
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{github.actor}}
password: ${{secrets.GITHUB_TOKEN}}

- name: Set Short Commit Hash
id: vars
run: echo "::set-output name=sha_short::$(git rev-parse --short HEAD)"
run: echo "sha_short=$(git rev-parse --short HEAD)" >> $GITHUB_OUTPUT

- name: Build (optionally publish) PeerDB Images
uses: depot/bake-action@v1
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ jobs:
- name: checkout sources
uses: actions/checkout@v4

- uses: actions/setup-go@v3
- uses: actions/setup-go@v4
with:
go-version: ">=1.21.0"
cache-dependency-path: flow/go.sum

- name: install gotestsum
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/stable-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
- uses: depot/setup-action@v1

- name: Login to GitHub Container Registry
uses: docker/login-action@v2.1.0
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{github.actor}}
Expand Down
21 changes: 20 additions & 1 deletion nexus/server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use std::{collections::{HashMap, HashSet}, sync::Arc, time::Duration};

use analyzer::{PeerDDL, QueryAssocation};
use async_trait::async_trait;
Expand Down Expand Up @@ -439,6 +439,25 @@ impl NexusBackend {
let catalog = self.catalog.lock().await;
let mirror_details = Self::check_for_mirror(&catalog, &flow_job.name).await?;
if mirror_details.is_none() {
// reject duplicate source tables or duplicate target tables
let table_mappings_count = flow_job.table_mappings.len();
if table_mappings_count > 1 {
let mut sources = HashSet::with_capacity(table_mappings_count);
let mut destinations = HashSet::with_capacity(table_mappings_count);
for tm in flow_job.table_mappings.iter() {
if !sources.insert(tm.source_table_identifier.as_str()) {
return Err(PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("Duplicate source table identifier {}", tm.source_table_identifier),
})))
}
if !destinations.insert(tm.destination_table_identifier.as_str()) {
return Err(PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("Duplicate destination table identifier {}", tm.destination_table_identifier),
})))
}
}
}

catalog
.create_cdc_flow_job_entry(flow_job)
.await
Expand Down
92 changes: 92 additions & 0 deletions ui/app/mirrors/edit/[mirrorId]/aggregatedCountsByInterval.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import moment from 'moment';

type timestampType = {
timestamp: Date;
count: number;
};

function aggregateCountsByInterval(
timestamps: timestampType[],
interval: string
): [string, number][] {
let timeUnit;
switch (interval) {
case 'hour':
timeUnit = 'YYYY-MM-DD HH:00:00';
break;
case '15min':
timeUnit = 'YYYY-MM-DD HH:mm';
break;
case 'month':
timeUnit = 'YYYY-MM';
break;
case 'day':
timeUnit = 'YYYY-MM-DD';
break;
default:
throw new Error('Invalid interval provided');
}

// Create an object to store the aggregated counts
const aggregatedCounts: { [key: string]: number } = {};

// Iterate through the timestamps and populate the aggregatedCounts object
for (let { timestamp, count } of timestamps) {
const date = roundUpToNearest15Minutes(timestamp);
const formattedTimestamp = moment(date).format(timeUnit);

if (!aggregatedCounts[formattedTimestamp]) {
aggregatedCounts[formattedTimestamp] = 0;
}

aggregatedCounts[formattedTimestamp] += count;
}

// Create an array of intervals between the start and end timestamps
const intervals = [];

let currentTimestamp = new Date();

if (interval === '15min') {
currentTimestamp = roundUpToNearest15Minutes(currentTimestamp);
}

while (intervals.length < 30) {
intervals.push(moment(currentTimestamp).format(timeUnit));
if (interval === 'hour') {
currentTimestamp.setHours(currentTimestamp.getHours() - 1);
} else if (interval === '15min') {
currentTimestamp.setMinutes(currentTimestamp.getMinutes() - 15);
} else if (interval === 'month') {
currentTimestamp.setMonth(currentTimestamp.getMonth() - 1);
} else if (interval === 'day') {
currentTimestamp.setDate(currentTimestamp.getDate() - 1);
}
}

// Populate the result array with intervals and counts
const resultArray: [string, number][] = intervals.map((interval) => [
interval,
aggregatedCounts[interval] || 0,
]);

return resultArray;
}

function roundUpToNearest15Minutes(date: Date) {
const minutes = date.getMinutes();
const remainder = minutes % 15;

if (remainder > 0) {
// Round up to the nearest 15 minutes
date.setMinutes(minutes + (15 - remainder));
}

// Reset seconds and milliseconds to zero to maintain the same time
date.setSeconds(0);
date.setMilliseconds(0);

return date;
}

export default aggregateCountsByInterval;
3 changes: 1 addition & 2 deletions ui/app/mirrors/edit/[mirrorId]/cdc.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import { useQueryState } from 'next-usequerystate';
import Link from 'next/link';
import { useState } from 'react';
import styled from 'styled-components';
import CDCDetails from './cdcDetails';

class TableCloneSummary {
flowJobName: string;
Expand Down Expand Up @@ -243,7 +242,7 @@ export function CDCMirror({ cdc, syncStatusChild }: CDCMirrorStatusProps) {
</Trigger>
</Tabs.List>
<Tabs.Content className='p-5 rounded-b-md' value='tab1'>
<CDCDetails config={cdc.config} />
{/* <CDCDetails config={cdc.config} /> */}
</Tabs.Content>
<Tabs.Content className='p-5 rounded-b-md' value='tab2'>
{syncStatusChild}
Expand Down
134 changes: 108 additions & 26 deletions ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx
Original file line number Diff line number Diff line change
@@ -1,35 +1,117 @@
'use client';
import { FlowConnectionConfigs } from '@/grpc_generated/flow';
import { Badge } from '@/lib/Badge';
import { Icon } from '@/lib/Icon';
import { Label } from '@/lib/Label';
import moment from 'moment';
import CdcGraph from './cdcGraph';

type CDCDetailsProps = {
config: FlowConnectionConfigs | undefined;
import PeerButton from '@/components/PeerComponent';
import { dBTypeFromJSON } from '@/grpc_generated/peers';

type SyncStatusRow = {
batchId: number;
startTime: Date;
endTime: Date | null;
numRows: number;
};

export default function CDCDetails({ config }: CDCDetailsProps) {
if (!config) {
return <div className='text-red-500'>No configuration provided</div>;
}
type props = {
syncs: SyncStatusRow[];
mirrorConfig: FlowConnectionConfigs | undefined;
};
function CdcDetails({ syncs, mirrorConfig }: props) {
let lastSyncedAt = moment(syncs[0]?.startTime).fromNow();
let rowsSynced = syncs.reduce((acc, sync) => acc + sync.numRows, 0);

return (
<div className='p-4 rounded-md'>
<h2 className='text-xl font-semibold mb-4'>CDC Details</h2>
<div className='overflow-x-auto'>
<table className='min-w-full divide-y divide-gray-300'>
<tbody>
<tr>
<td className='px-4 py-2 font-medium'>Source</td>
<td className='px-4 py-2'>{config.source?.name || '-'}</td>
</tr>
<tr>
<td className='px-4 py-2 font-medium'>Destination</td>
<td className='px-4 py-2'>{config.destination?.name || '-'}</td>
</tr>
<tr>
<td className='px-4 py-2 font-medium'>Flow Job Name</td>
<td className='px-4 py-2'>{config.flowJobName}</td>
</tr>
</tbody>
</table>
<>
<div className='mt-10'>
<div className='flex flex-row'>
<div className='basis-1/4 md:basis-1/3'>
<div>
<Label variant='subheadline' colorName='lowContrast'>
Status
</Label>
</div>
<div>
<Label variant='body'>
<Badge variant='positive' key={1}>
<Icon name='play_circle' />
Active
</Badge>
</Label>
</div>
</div>
<div className='basis-1/4 md:basis-1/3'>
<div>
<Label variant='subheadline' colorName='lowContrast'>
Mirror Type
</Label>
</div>
<div>
<Label variant='body'>CDC</Label>
</div>
</div>
<div className='basis-1/4 md:basis-1/3'>
<div>
<Label variant='subheadline' colorName='lowContrast'>
Source
</Label>
</div>
<div>
<PeerButton
peerName={mirrorConfig?.source?.name ?? ''}
peerType={dBTypeFromJSON(mirrorConfig?.source?.type)}
/>
</div>
</div>
<div className='basis-1/4 md:basis-1/3'>
<div>
<Label variant='subheadline' colorName='lowContrast'>
Destination
</Label>
</div>
<div>
<PeerButton
peerName={mirrorConfig?.destination?.name ?? ''}
peerType={dBTypeFromJSON(mirrorConfig?.destination?.type)}
/>
</div>
</div>
</div>
<div className='flex flex-row mt-10'>
<div className='basis-1/4'>
<div>
<Label variant='subheadline' colorName='lowContrast'>
Last Sync
</Label>
</div>
<div>
<Label variant='body'>{lastSyncedAt}</Label>
</div>
</div>
<div className='basis-1/4'>
<div>
<Label variant='subheadline' colorName='lowContrast'>
Rows synced
</Label>
</div>
<div>
<Label variant='body'>{numberWithCommas(rowsSynced)}</Label>
</div>
</div>
</div>
</div>
<div className='mt-10'>
<CdcGraph syncs={syncs} />
</div>
</div>
</>
);
}

function numberWithCommas(x: Number): string {
return x.toString().replace(/\B(?=(\d{3})+(?!\d))/g, ',');
}

export default CdcDetails;
Loading

0 comments on commit 0b48538

Please sign in to comment.