Skip to content

Commit

Permalink
Merge branch 'main' into datatypes/support-interval
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj authored Mar 21, 2024
2 parents f75d9fa + 9e28839 commit 8d1f9e7
Show file tree
Hide file tree
Showing 11 changed files with 156 additions and 50 deletions.
12 changes: 12 additions & 0 deletions flow/connectors/sql/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,18 @@ func (g *GenericSQLQueryExecutor) CountNonNullRows(
return count.Int64, err
}

func (g *GenericSQLQueryExecutor) CountSRIDs(
ctx context.Context,
schemaName string,
tableName string,
columnName string,
) (int64, error) {
var count pgtype.Int8
err := g.db.QueryRowxContext(ctx, "SELECT COUNT(CASE WHEN ST_SRID("+columnName+
") <> 0 THEN 1 END) AS not_zero FROM "+schemaName+"."+tableName).Scan(&count)
return count.Int64, err
}

func (g *GenericSQLQueryExecutor) columnTypeToQField(ct *sql.ColumnType) (model.QField, error) {
qvKind, ok := g.dbtypeToQValueKind[ct.DatabaseTypeName()]
if !ok {
Expand Down
18 changes: 15 additions & 3 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() {
for range 6 {
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s (line,poly) VALUES ($1,$2)
`, srcTableName), "010200000002000000000000000000F03F000000000000004000000000000008400000000000001040",
`, srcTableName), "SRID=5678;010200000002000000000000000000F03F000000000000004000000000000008400000000000001040",
"010300000001000000050000000000000000000000000000000000000000000000"+
"00000000000000000000f03f000000000000f03f000000000000f03f0000000000"+
"00f03f000000000000000000000000000000000000000000000000")
Expand All @@ -143,6 +143,13 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() {
return false
}

// Make sure SRIDs are set
sridCount, err := s.sfHelper.CountSRIDs("test_invalid_geo_sf_avro_cdc", "line")
if err != nil {
s.t.Log(err)
return false
}

polyCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "poly")
if err != nil {
return false
Expand All @@ -151,9 +158,14 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() {
if lineCount != 6 || polyCount != 6 {
s.t.Logf("wrong counts, expect 6 lines 6 polies, not %d lines %d polies", lineCount, polyCount)
return false
} else {
return true
}

if sridCount != 6 {
s.t.Logf("there are some srids that are 0, expected 6 non-zero srids, got %d non-zero srids", sridCount)
return false
}

return true
})
env.Cancel()

Expand Down
9 changes: 9 additions & 0 deletions flow/e2e/snowflake/snowflake_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,15 @@ func (s *SnowflakeTestHelper) CountNonNullRows(tableName string, columnName stri
return int(res), nil
}

func (s *SnowflakeTestHelper) CountSRIDs(tableName string, columnName string) (int, error) {
res, err := s.testClient.CountSRIDs(context.Background(), s.testSchemaName, tableName, columnName)
if err != nil {
return 0, err
}

return int(res), nil
}

func (s *SnowflakeTestHelper) CheckNull(tableName string, colNames []string) (bool, error) {
return s.testClient.CheckNull(context.Background(), s.testSchemaName, tableName, colNames)
}
Expand Down
4 changes: 4 additions & 0 deletions flow/geo/geo.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ func GeoValidate(hexWkb string) (string, error) {
}

wkt := geometryObject.ToWKT()

if SRID := geometryObject.SRID(); SRID != 0 {
wkt = fmt.Sprintf("SRID=%d;%s", geometryObject.SRID(), wkt)
}
return wkt, nil
}

Expand Down
13 changes: 11 additions & 2 deletions flow/model/qrecord_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"log/slog"
"strings"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -237,9 +238,17 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) {
return nil, src.err
}

wkb, err := geo.GeoToWKB(v)
geoWkt := v
if strings.HasPrefix(v, "SRID=") {
_, wkt, found := strings.Cut(v, ";")
if found {
geoWkt = wkt
}
}

wkb, err := geo.GeoToWKB(geoWkt)
if err != nil {
src.err = errors.New("failed to convert Geospatial value to wkb")
src.err = fmt.Errorf("failed to convert Geospatial value to wkb: %v", err)
return nil, src.err
}

Expand Down
10 changes: 9 additions & 1 deletion flow/model/qvalue/qvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,15 @@ func compareGeometry(value1, value2 interface{}) bool {
case *geom.Geom:
return v1.Equals(geo2)
case string:
geo1, err := geom.NewGeomFromWKT(v1)
geoWkt := v1
if strings.HasPrefix(geoWkt, "SRID=") {
_, wkt, found := strings.Cut(geoWkt, ";")
if found {
geoWkt = wkt
}
}

geo1, err := geom.NewGeomFromWKT(geoWkt)
if err != nil {
panic(err)
}
Expand Down
44 changes: 17 additions & 27 deletions ui/app/mirrors/[mirrorId]/page.tsx
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { SyncStatusRow } from '@/app/dto/MirrorsDTO';
import prisma from '@/app/utils/prisma';
import EditButton from '@/components/EditButton';
import { ResyncDialog } from '@/components/ResyncDialog';
import MirrorActions from '@/components/MirrorActionsDropdown';
import { FlowConnectionConfigs, FlowStatus } from '@/grpc_generated/flow';
import { DBType } from '@/grpc_generated/peers';
import { MirrorStatusResponse } from '@/grpc_generated/route';
Expand Down Expand Up @@ -77,8 +76,7 @@ export default async function ViewMirror({
}

let syncStatusChild = null;
let resyncComponent = null;
let editButtonHTML = null;
let actionsDropdown = null;

if (mirrorStatus.cdcStatus) {
let rowsSynced = syncs.reduce((acc, sync) => {
Expand All @@ -88,32 +86,27 @@ export default async function ViewMirror({
return acc;
}, 0);
const mirrorConfig = FlowConnectionConfigs.decode(mirrorInfo.config_proto!);
syncStatusChild = (
<SyncStatus rowsSynced={rowsSynced} rows={rows} flowJobName={mirrorId} />
);

const dbType = mirrorConfig.destination!.type;
const canResync =
dbType.valueOf() === DBType.BIGQUERY.valueOf() ||
dbType.valueOf() === DBType.SNOWFLAKE.valueOf();
if (canResync) {
resyncComponent = (
<ResyncDialog
mirrorConfig={mirrorConfig}
workflowId={mirrorInfo.workflow_id || ''}
/>
);
}

syncStatusChild = (
<SyncStatus rowsSynced={rowsSynced} rows={rows} flowJobName={mirrorId} />
);
const isNotPaused =
mirrorStatus.currentFlowState.toString() !==
FlowStatus[FlowStatus.STATUS_PAUSED];
editButtonHTML = (
<div style={{ display: 'flex', alignItems: 'center' }}>
<EditButton
toLink={`/mirrors/${mirrorId}/edit`}
disabled={isNotPaused}
/>
</div>

actionsDropdown = (
<MirrorActions
mirrorConfig={mirrorConfig}
workflowId={mirrorInfo.workflow_id || ''}
editLink={`/mirrors/${mirrorId}/edit`}
canResync={canResync}
isNotPaused={isNotPaused}
/>
);
} else {
redirect(`/mirrors/status/qrep/${mirrorId}`);
Expand All @@ -129,11 +122,8 @@ export default async function ViewMirror({
paddingRight: '2rem',
}}
>
<div style={{ display: 'flex', alignItems: 'center' }}>
<Header variant='title2'>{mirrorId}</Header>
{editButtonHTML}
</div>
{resyncComponent}
<Header variant='title2'>{mirrorId}</Header>
{actionsDropdown}
</div>
<CDCMirror
rows={rows}
Expand Down
4 changes: 0 additions & 4 deletions ui/components/AlertDropdown.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ const AlertDropdown = ({
setOpen((prevOpen) => !prevOpen);
};

const handleClose = () => {
setOpen(false);
};

return (
<DropdownMenu.Root>
<DropdownMenu.Trigger>
Expand Down
16 changes: 5 additions & 11 deletions ui/components/EditButton.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
'use client';
import { Button } from '@/lib/Button';
import { Icon } from '@/lib/Icon';
import { Label } from '@/lib/Label';
import { ProgressCircle } from '@/lib/ProgressCircle';
import { useRouter } from 'next/navigation';
Expand All @@ -25,22 +24,17 @@ const EditButton = ({
className='IconButton'
onClick={handleEdit}
aria-label='sort up'
variant='normal'
style={{
display: 'flex',
marginLeft: '1rem',
alignItems: 'center',
backgroundColor: 'whitesmoke',
border: '1px solid rgba(0,0,0,0.1)',
borderRadius: '0.5rem',
alignItems: 'flex-start',
columnGap: '0.3rem',
width: '100%',
}}
disabled={disabled}
>
<Label>Edit Mirror</Label>
{loading ? (
<ProgressCircle variant='determinate_progress_circle' />
) : (
<Icon name='edit' />
)}
{loading && <ProgressCircle variant='determinate_progress_circle' />}
</Button>
);
};
Expand Down
72 changes: 72 additions & 0 deletions ui/components/MirrorActionsDropdown.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
'use client';
import EditButton from '@/components/EditButton';
import { ResyncDialog } from '@/components/ResyncDialog';
import { FlowConnectionConfigs } from '@/grpc_generated/flow';
import { Button } from '@/lib/Button/Button';
import { Icon } from '@/lib/Icon';
import { Label } from '@/lib/Label/Label';
import * as DropdownMenu from '@radix-ui/react-dropdown-menu';
import { useEffect, useState } from 'react';

const MirrorActions = ({
mirrorConfig,
workflowId,
editLink,
canResync,
isNotPaused,
}: {
mirrorConfig: FlowConnectionConfigs;
workflowId: string;
editLink: string;
canResync: boolean;
isNotPaused: boolean;
}) => {
const [mounted, setMounted] = useState(false);
const [open, setOpen] = useState(false);
const handleToggle = () => {
setOpen((prevOpen) => !prevOpen);
};
useEffect(() => setMounted(true), []);
if (mounted)
return (
<DropdownMenu.Root>
<DropdownMenu.Trigger>
<Button
aria-controls={open ? 'menu-list-grow' : undefined}
aria-haspopup='true'
variant='normal'
onClick={handleToggle}
style={{
boxShadow: '0px 1px 1px rgba(0,0,0,0.1)',
border: '1px solid rgba(0,0,0,0.1)',
}}
>
<Label>Actions</Label>
<Icon name='arrow_downward_alt' />
</Button>
</DropdownMenu.Trigger>

<DropdownMenu.Portal>
<DropdownMenu.Content
style={{
border: '1px solid rgba(0,0,0,0.1)',
borderRadius: '0.5rem',
backgroundColor: 'white',
}}
>
<EditButton toLink={editLink} disabled={isNotPaused} />

{canResync && (
<ResyncDialog
mirrorConfig={mirrorConfig}
workflowId={workflowId}
/>
)}
</DropdownMenu.Content>
</DropdownMenu.Portal>
</DropdownMenu.Root>
);
return <></>;
};

export default MirrorActions;
4 changes: 2 additions & 2 deletions ui/components/ResyncDialog.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ export const ResyncDialog = ({
noInteract={true}
size='xLarge'
triggerButton={
<Button variant='blue' style={{ height: '2em', width: '8em' }}>
Resync
<Button variant='normal' style={{ width: '100%' }}>
<Label as='label'>Resync</Label>
</Button>
}
>
Expand Down

0 comments on commit 8d1f9e7

Please sign in to comment.