From d407f9e8ba3b5ea2a8b6e2319dcc2cef49b60385 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 25 Dec 2023 13:09:21 +0000 Subject: [PATCH 1/5] BQ/SF: always use GREATEST when setting offset (#896) While adding SetLastOffset, missed updating previous metadata update sql to use GREATEST too --- flow/connectors/bigquery/bigquery.go | 2 +- flow/connectors/snowflake/snowflake.go | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 64d12057f6..3c0787527e 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -927,7 +927,7 @@ func (c *BigQueryConnector) getUpdateMetadataStmt(jobName string, lastSyncedChec c.datasetID, MirrorJobsTable, jobName, lastSyncedCheckpointID, batchID) if hasJob { jobStatement = fmt.Sprintf( - "UPDATE %s.%s SET offset = %d,sync_batch_id=%d WHERE mirror_job_name = '%s';", + "UPDATE %s.%s SET offset=GREATEST(offset,%d),sync_batch_id=%d WHERE mirror_job_name = '%s';", c.datasetID, MirrorJobsTable, lastSyncedCheckpointID, batchID, jobName) } diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index e47c27a5f4..db13e188b8 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -39,11 +39,10 @@ const ( rawTableMultiValueInsertSQL = "INSERT INTO %s.%s VALUES%s" createNormalizedTableSQL = "CREATE TABLE IF NOT EXISTS %s(%s)" toVariantColumnName = "VAR_COLS" - mergeStatementSQL = `MERGE INTO %s TARGET USING (WITH VARIANT_CONVERTED AS (SELECT _PEERDB_UID, - _PEERDB_TIMESTAMP, - TO_VARIANT(PARSE_JSON(_PEERDB_DATA)) %s,_PEERDB_RECORD_TYPE,_PEERDB_MATCH_DATA,_PEERDB_BATCH_ID, - _PEERDB_UNCHANGED_TOAST_COLUMNS FROM - _PEERDB_INTERNAL.%s WHERE _PEERDB_BATCH_ID > %d AND _PEERDB_BATCH_ID <= %d AND + mergeStatementSQL = `MERGE INTO %s TARGET USING (WITH VARIANT_CONVERTED AS ( + SELECT _PEERDB_UID,_PEERDB_TIMESTAMP,TO_VARIANT(PARSE_JSON(_PEERDB_DATA)) %s,_PEERDB_RECORD_TYPE, + _PEERDB_MATCH_DATA,_PEERDB_BATCH_ID,_PEERDB_UNCHANGED_TOAST_COLUMNS + FROM _PEERDB_INTERNAL.%s WHERE _PEERDB_BATCH_ID > %d AND _PEERDB_BATCH_ID <= %d AND _PEERDB_DESTINATION_TABLE_NAME = ? ), FLATTENED AS (SELECT _PEERDB_UID,_PEERDB_TIMESTAMP,_PEERDB_RECORD_TYPE,_PEERDB_MATCH_DATA,_PEERDB_BATCH_ID, _PEERDB_UNCHANGED_TOAST_COLUMNS,%s @@ -66,7 +65,8 @@ const ( insertJobMetadataSQL = "INSERT INTO %s.%s VALUES (?,?,?,?)" - updateMetadataForSyncRecordsSQL = "UPDATE %s.%s SET OFFSET=?, SYNC_BATCH_ID=? WHERE MIRROR_JOB_NAME=?" + updateMetadataForSyncRecordsSQL = `UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?), SYNC_BATCH_ID=? + WHERE MIRROR_JOB_NAME=?` updateMetadataForNormalizeRecordsSQL = "UPDATE %s.%s SET NORMALIZE_BATCH_ID=? WHERE MIRROR_JOB_NAME=?" checkIfTableExistsSQL = `SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.TABLES From ac5bbb2b851ffa033c9ddf3815f706fa2cc7b254 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Mon, 25 Dec 2023 18:39:56 +0530 Subject: [PATCH 2/5] UI: Create Mirror And Mirror Overview Improvements (#890) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## UI Improvement Features These are some features which I thought would be nice to get in. At the end of the day, these are just proposals from my end. ### Mirror Create Pages: Error Toasts And Floating Button We now show errors as toasts and have the Create Mirror button with a fixed position on the bottom-right. 
Users now don't have to do a lot of scrolling up and down to look at the error message, come back, and click create. Screenshot 2023-12-23 at 10 26 14 PM ### QRep Mirror: Upsert Columns Selection of Unique Key Columns for QRep Upsert mode now looks like this, saving users from having to type out columns. Also added validation for the columns being an empty array. Screenshot 2023-12-23 at 9 49 35 PM ### Better Tabs UI for Mirror Overview I thought the tabs we have there look unpolished so used Tremor to come up with this. This also achieves significant code reduction in that file. Screenshot 2023-12-23 at 11 37 58 PM ### Wiring Status in Mirror Overview Page Wires in the Status we show in the mirror overview page. This is a follow-up to #883 Screenshot 2023-12-23 at 10 28 23 PM ### Others - Removes 'Authentication failed' message in login landing page. - Makes the source-destination table list in Mirror Overview page have scrollable height and sticky headers - Error table now has time column before message column and the rows are sorted by timestamp (latest first) --------- Co-authored-by: Kaushik Iska --- ui/app/login/page.tsx | 4 +- ui/app/mirrors/create/handlers.ts | 75 +++++-------- ui/app/mirrors/create/helpers/qrep.ts | 3 +- ui/app/mirrors/create/page.tsx | 93 ++++++++-------- ui/app/mirrors/create/qrep/qrep.tsx | 13 ++- ui/app/mirrors/create/qrep/upsertcols.tsx | 95 +++++++++++++++++ ui/app/mirrors/edit/[mirrorId]/cdc.tsx | 87 ++++----------- ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx | 13 +-- ui/app/mirrors/edit/[mirrorId]/tablePairs.tsx | 100 ++++++++++-------- ui/app/mirrors/errors/[mirrorName]/page.tsx | 19 ++-- ui/app/mirrors/mirror-status.tsx | 15 ++- ui/app/mirrors/tables.tsx | 2 +- ui/package-lock.json | 21 ++++ ui/package.json | 1 + 14 files changed, 308 insertions(+), 233 deletions(-) create mode 100644 ui/app/mirrors/create/qrep/upsertcols.tsx diff --git a/ui/app/login/page.tsx b/ui/app/login/page.tsx index 41c00c0dd8..1172b0e811 100644 --- a/ui/app/login/page.tsx +++ b/ui/app/login/page.tsx @@ -13,9 +13,7 @@ export default function Login() { const searchParams = useSearchParams(); const [pass, setPass] = useState(''); const [show, setShow] = useState(false); - const [error, setError] = useState(() => - searchParams.has('reject') ? 
'Authentication failed, please login' : '' - ); + const [error, setError] = useState(() => ''); const login = () => { fetch('/api/login', { method: 'POST', diff --git a/ui/app/mirrors/create/handlers.ts b/ui/app/mirrors/create/handlers.ts index 039766171d..81f36bd739 100644 --- a/ui/app/mirrors/create/handlers.ts +++ b/ui/app/mirrors/create/handlers.ts @@ -73,45 +73,33 @@ const validateCDCFields = ( } | undefined )[], - setMsg: Dispatch>, config: CDCConfig -): boolean => { +): string | undefined => { let validationErr: string | undefined; const tablesValidity = tableMappingSchema.safeParse(tableMapping); if (!tablesValidity.success) { validationErr = tablesValidity.error.issues[0].message; - setMsg({ ok: false, msg: validationErr }); - return false; } const configValidity = cdcSchema.safeParse(config); if (!configValidity.success) { validationErr = configValidity.error.issues[0].message; - setMsg({ ok: false, msg: validationErr }); - return false; } - setMsg({ ok: true, msg: '' }); - return true; + return validationErr; }; const validateQRepFields = ( query: string, - setMsg: Dispatch>, config: QRepConfig -): boolean => { +): string | undefined => { if (query.length < 5) { - setMsg({ ok: false, msg: 'Query is invalid' }); - return false; + return 'Query is invalid'; } - let validationErr: string | undefined; const configValidity = qrepSchema.safeParse(config); if (!configValidity.success) { validationErr = configValidity.error.issues[0].message; - setMsg({ ok: false, msg: validationErr }); - return false; } - setMsg({ ok: true, msg: '' }); - return true; + return validationErr; }; interface TableMapping { @@ -140,25 +128,23 @@ export const handleCreateCDC = async ( flowJobName: string, rows: TableMapRow[], config: CDCConfig, - setMsg: Dispatch< - SetStateAction<{ - ok: boolean; - msg: string; - }> - >, + notify: (msg: string) => void, setLoading: Dispatch>, route: RouteCallback ) => { const flowNameValid = flowNameSchema.safeParse(flowJobName); if (!flowNameValid.success) { const flowNameErr = flowNameValid.error.issues[0].message; - setMsg({ ok: false, msg: flowNameErr }); + notify(flowNameErr); return; } const tableNameMapping = reformattedTableMapping(rows); - const isValid = validateCDCFields(tableNameMapping, setMsg, config); - if (!isValid) return; + const fieldErr = validateCDCFields(tableNameMapping, config); + if (fieldErr) { + notify(fieldErr); + return; + } config['tableMappings'] = tableNameMapping as TableMapping[]; config['flowJobName'] = flowJobName; @@ -172,10 +158,7 @@ export const handleCreateCDC = async ( } if (config.doInitialCopy == false && config.initialCopyOnly == true) { - setMsg({ - ok: false, - msg: 'Initial Copy Only cannot be true if Initial Copy is false.', - }); + notify('Initial Copy Only cannot be true if Initial Copy is false.'); return; } @@ -187,11 +170,11 @@ export const handleCreateCDC = async ( }), }).then((res) => res.json()); if (!statusMessage.created) { - setMsg({ ok: false, msg: 'unable to create mirror.' 
}); + notify('unable to create mirror.'); setLoading(false); return; } - setMsg({ ok: true, msg: 'CDC Mirror created successfully' }); + notify('CDC Mirror created successfully'); route(); setLoading(false); }; @@ -209,12 +192,7 @@ export const handleCreateQRep = async ( flowJobName: string, query: string, config: QRepConfig, - setMsg: Dispatch< - SetStateAction<{ - ok: boolean; - msg: string; - }> - >, + notify: (msg: string) => void, setLoading: Dispatch>, route: RouteCallback, xmin?: boolean @@ -222,7 +200,7 @@ export const handleCreateQRep = async ( const flowNameValid = flowNameSchema.safeParse(flowJobName); if (!flowNameValid.success) { const flowNameErr = flowNameValid.error.issues[0].message; - setMsg({ ok: false, msg: flowNameErr }); + notify(flowNameErr); return; } @@ -237,16 +215,17 @@ export const handleCreateQRep = async ( if ( config.writeMode?.writeType == QRepWriteType.QREP_WRITE_MODE_UPSERT && - !config.writeMode?.upsertKeyColumns + (!config.writeMode?.upsertKeyColumns || + config.writeMode?.upsertKeyColumns.length == 0) ) { - setMsg({ - ok: false, - msg: 'For upsert mode, unique key columns cannot be empty.', - }); + notify('For upsert mode, unique key columns cannot be empty.'); + return; + } + const fieldErr = validateQRepFields(query, config); + if (fieldErr) { + notify(fieldErr); return; } - const isValid = validateQRepFields(query, setMsg, config); - if (!isValid) return; config.flowJobName = flowJobName; config.query = query; @@ -267,11 +246,11 @@ export const handleCreateQRep = async ( } ).then((res) => res.json()); if (!statusMessage.created) { - setMsg({ ok: false, msg: 'unable to create mirror.' }); + notify('unable to create mirror.'); setLoading(false); return; } - setMsg({ ok: true, msg: 'Query Replication Mirror created successfully' }); + notify('Query Replication Mirror created successfully'); route(); setLoading(false); }; diff --git a/ui/app/mirrors/create/helpers/qrep.ts b/ui/app/mirrors/create/helpers/qrep.ts index 654d5c7ff3..fca1f9fcb2 100644 --- a/ui/app/mirrors/create/helpers/qrep.ts +++ b/ui/app/mirrors/create/helpers/qrep.ts @@ -112,8 +112,9 @@ export const qrepSettings: MirrorSetting[] = [ writeMode: currWriteMode, }; }), - tips: `Comma separated string column names. Needed when write mode is set to UPSERT. + tips: `Needed when write mode is set to UPSERT. 
These columns need to be unique and are used for updates.`, + type: 'select', }, { label: 'Initial Copy Only', diff --git a/ui/app/mirrors/create/page.tsx b/ui/app/mirrors/create/page.tsx index a075432bb3..497d3aea4b 100644 --- a/ui/app/mirrors/create/page.tsx +++ b/ui/app/mirrors/create/page.tsx @@ -4,17 +4,18 @@ import { RequiredIndicator } from '@/components/RequiredIndicator'; import { QRepConfig } from '@/grpc_generated/flow'; import { DBType, Peer } from '@/grpc_generated/peers'; import { Button } from '@/lib/Button'; -import { ButtonGroup } from '@/lib/ButtonGroup'; +import { Icon } from '@/lib/Icon'; import { Label } from '@/lib/Label'; import { RowWithSelect, RowWithTextField } from '@/lib/Layout'; import { Panel } from '@/lib/Panel'; import { TextField } from '@/lib/TextField'; import { Divider } from '@tremor/react'; import Image from 'next/image'; -import Link from 'next/link'; import { useRouter } from 'next/navigation'; import { useEffect, useState } from 'react'; import ReactSelect from 'react-select'; +import { ToastContainer, toast } from 'react-toastify'; +import 'react-toastify/dist/ReactToastify.css'; import { InfoPopover } from '../../../components/InfoPopover'; import { CDCConfig, TableMapRow } from '../../dto/MirrorsDTO'; import CDCConfigForm from './cdc/cdc'; @@ -47,22 +48,23 @@ function getPeerLabel(peer: Peer) { ); } +const notifyErr = (errMsg: string) => { + toast.error(errMsg, { + position: toast.POSITION.BOTTOM_CENTER, + }); +}; export default function CreateMirrors() { const router = useRouter(); const [mirrorName, setMirrorName] = useState(''); const [mirrorType, setMirrorType] = useState(''); - const [formMessage, setFormMessage] = useState<{ ok: boolean; msg: string }>({ - ok: true, - msg: '', - }); const [loading, setLoading] = useState(false); const [config, setConfig] = useState(blankCDCSetting); const [peers, setPeers] = useState([]); const [rows, setRows] = useState([]); const [qrepQuery, setQrepQuery] = useState(`-- Here's a sample template: - SELECT * FROM - WHERE + SELECT * FROM + WHERE BETWEEN {{.start}} AND {{.end}}`); useEffect(() => { @@ -183,15 +185,7 @@ export default function CreateMirrors() { Configuration )} - {!loading && formMessage.msg.length > 0 && ( - - )} + {!loading && } {mirrorType === '' ? ( <> ) : mirrorType === 'CDC' ? 
( @@ -213,36 +207,41 @@ export default function CreateMirrors() { {mirrorType && ( - - - - + )} diff --git a/ui/app/mirrors/create/qrep/qrep.tsx b/ui/app/mirrors/create/qrep/qrep.tsx index 6b5b7b4b35..9dd7943b80 100644 --- a/ui/app/mirrors/create/qrep/qrep.tsx +++ b/ui/app/mirrors/create/qrep/qrep.tsx @@ -14,6 +14,7 @@ import { MirrorSetter } from '../../../dto/MirrorsDTO'; import { defaultSyncMode } from '../cdc/cdc'; import { fetchAllTables, fetchColumns } from '../handlers'; import { MirrorSetting, blankQRepSetting } from '../helpers/common'; +import UpsertColsDisplay from './upsertcols'; interface QRepConfigProps { settings: MirrorSetting[]; @@ -29,10 +30,6 @@ interface QRepConfigProps { xmin?: boolean; } -const SyncModes = ['AVRO', 'Copy with Binary'].map((value) => ({ - label: value, - value, -})); const WriteModes = ['Append', 'Upsert', 'Overwrite'].map((value) => ({ label: value, value, @@ -50,6 +47,7 @@ export default function QRepConfigForm({ const [watermarkColumns, setWatermarkColumns] = useState< { value: string; label: string }[] >([]); + const [loading, setLoading] = useState(false); const handleChange = (val: string | boolean, setting: MirrorSetting) => { @@ -220,6 +218,13 @@ export default function QRepConfigForm({ } options={WriteModes} /> + ) : setting.label === 'Upsert Key Columns' ? ( + ) : ( { + const [uniqueColumnsSet, setUniqueColumnsSet] = useState>( + new Set() + ); + + const handleUniqueColumns = ( + col: string, + setting: MirrorSetting, + action: 'add' | 'remove' + ) => { + if (action === 'add') setUniqueColumnsSet((prev) => new Set(prev).add(col)); + else if (action === 'remove') { + setUniqueColumnsSet((prev) => { + const newSet = new Set(prev); + newSet.delete(col); + return newSet; + }); + } + const uniqueColsArr = Array.from(uniqueColumnsSet); + setting.stateHandler(uniqueColsArr, setter); + }; + + useEffect(() => { + const uniqueColsArr = Array.from(uniqueColumnsSet); + setter((curr) => { + let defaultMode: QRepWriteMode = { + writeType: QRepWriteType.QREP_WRITE_MODE_APPEND, + upsertKeyColumns: [], + }; + let currWriteMode = (curr as QRepConfig).writeMode || defaultMode; + currWriteMode.upsertKeyColumns = uniqueColsArr as string[]; + return { + ...curr, + writeMode: currWriteMode, + }; + }); + }, [uniqueColumnsSet, setter]); + return ( + <> + { + val && handleUniqueColumns(val.value, setting, 'add'); + }} + isLoading={loading} + options={columns} + /> +
+ {Array.from(uniqueColumnsSet).map((col: string) => { + return ( + + {col} + + + ); + })} +
+ + ); +}; + +export default UpsertColsDisplay; diff --git a/ui/app/mirrors/edit/[mirrorId]/cdc.tsx b/ui/app/mirrors/edit/[mirrorId]/cdc.tsx index 7f54d227ab..8c33fa7c9b 100644 --- a/ui/app/mirrors/edit/[mirrorId]/cdc.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/cdc.tsx @@ -13,12 +13,11 @@ import { Label } from '@/lib/Label'; import { ProgressBar } from '@/lib/ProgressBar'; import { SearchField } from '@/lib/SearchField'; import { Table, TableCell, TableRow } from '@/lib/Table'; -import * as Tabs from '@radix-ui/react-tabs'; +import { Tab, TabGroup, TabList, TabPanel, TabPanels } from '@tremor/react'; import moment, { Duration, Moment } from 'moment'; import Link from 'next/link'; import { useEffect, useMemo, useState } from 'react'; import ReactSelect from 'react-select'; -import styled from 'styled-components'; import CdcDetails from './cdcDetails'; class TableCloneSummary { @@ -264,21 +263,6 @@ export const SnapshotStatusTable = ({ status }: SnapshotStatusProps) => { ); }; -const Trigger = styled( - ({ isActive, ...props }: { isActive?: boolean } & Tabs.TabsTriggerProps) => ( - - ) -)<{ isActive?: boolean }>` - background-color: ${({ theme, isActive }) => - isActive ? theme.colors.accent.surface.selected : 'white'}; - - font-weight: ${({ isActive }) => (isActive ? 'bold' : 'normal')}; - - &:hover { - color: ${({ theme }) => theme.colors.accent.text.highContrast}; - } -`; - type CDCMirrorStatusProps = { cdc: CDCMirrorStatus; rows: SyncStatusRow[]; @@ -298,11 +282,6 @@ export function CDCMirror({ snapshot = ; } - const handleTab = (tabVal: string) => { - localStorage.setItem('mirrortab', tabVal); - setSelectedTab(tabVal); - }; - useEffect(() => { if (typeof window !== 'undefined') { setSelectedTab(localStorage?.getItem('mirrortab') || 'tab1'); @@ -310,48 +289,26 @@ export function CDCMirror({ }, []); return ( - handleTab(val)} - style={{ marginTop: '2rem' }} - > - - - Overview - - - Sync Status - - - Initial Copy - - - - - - - {syncStatusChild} - - - {snapshot} - - + + + Overview + Sync Status + Initial Copy + + + + + + {syncStatusChild} + {snapshot} + + ); } diff --git a/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx b/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx index 32992f871d..e7729c487d 100644 --- a/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx @@ -5,10 +5,9 @@ import PeerButton from '@/components/PeerComponent'; import TimeLabel from '@/components/TimeComponent'; import { FlowConnectionConfigs } from '@/grpc_generated/flow'; import { dBTypeFromJSON } from '@/grpc_generated/peers'; -import { Badge } from '@/lib/Badge'; -import { Icon } from '@/lib/Icon'; import { Label } from '@/lib/Label'; import moment from 'moment'; +import { MirrorError } from '../../mirror-status'; import MirrorValues from './configValues'; import TablePairs from './tablePairs'; @@ -33,12 +32,10 @@ function CdcDetails({ syncs, createdAt, mirrorConfig }: props) {
- +
diff --git a/ui/app/mirrors/edit/[mirrorId]/tablePairs.tsx b/ui/app/mirrors/edit/[mirrorId]/tablePairs.tsx index 5289e77a04..516b8dd3ce 100644 --- a/ui/app/mirrors/edit/[mirrorId]/tablePairs.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/tablePairs.tsx @@ -15,7 +15,7 @@ const TablePairs = ({ tables }: { tables?: TableMapping[] }) => { }, [tables, searchQuery]); if (tables) return ( - <> +
{ }} />
- - - - - + - Destination Table - - - - - {shownTables?.map((table) => ( - - - + + - ))} - -
- Source Table - +
- {table.sourceTableIdentifier} - - {table.destinationTableIdentifier} - + Source Table + + Destination Table +
- + + + {shownTables?.map((table) => ( + + + {table.sourceTableIdentifier} + + + {table.destinationTableIdentifier} + + + ))} + + +
+
); }; diff --git a/ui/app/mirrors/errors/[mirrorName]/page.tsx b/ui/app/mirrors/errors/[mirrorName]/page.tsx index 75e8c91d2b..97efca644a 100644 --- a/ui/app/mirrors/errors/[mirrorName]/page.tsx +++ b/ui/app/mirrors/errors/[mirrorName]/page.tsx @@ -15,6 +15,9 @@ const MirrorError = async ({ params: { mirrorName } }: MirrorErrorProps) => { error_type: 'error', }, distinct: ['error_message'], + orderBy: { + error_timestamp: 'desc', + }, }); return ( @@ -40,12 +43,10 @@ const MirrorError = async ({ params: { mirrorName } }: MirrorErrorProps) => { header={ Type + Message - - - } > @@ -54,15 +55,15 @@ const MirrorError = async ({ params: { mirrorName } }: MirrorErrorProps) => { {mirrorError.error_type.toUpperCase()} - - {mirrorError.error_message} - - + + + {mirrorError.error_message} + ))} diff --git a/ui/app/mirrors/mirror-status.tsx b/ui/app/mirrors/mirror-status.tsx index 2a68b7b3d2..27d797e389 100644 --- a/ui/app/mirrors/mirror-status.tsx +++ b/ui/app/mirrors/mirror-status.tsx @@ -26,7 +26,13 @@ export const ErrorModal = ({ flowName }: { flowName: string }) => { ); }; -export const MirrorError = ({ flowName }: { flowName: string }) => { +export const MirrorError = ({ + flowName, + detailed, +}: { + flowName: string; + detailed: boolean; +}) => { const [flowStatus, setFlowStatus] = useState(); const [isLoading, setIsLoading] = useState(true); const [error, setError] = useState(null); @@ -76,6 +82,13 @@ export const MirrorError = ({ flowName }: { flowName: string }) => { } if (flowStatus == 'healthy') { + if (detailed) + return ( +
+ + +
+ ); return ; } diff --git a/ui/app/mirrors/tables.tsx b/ui/app/mirrors/tables.tsx index 6c1289befc..106c7cd22d 100644 --- a/ui/app/mirrors/tables.tsx +++ b/ui/app/mirrors/tables.tsx @@ -90,7 +90,7 @@ export function CDCFlows({ cdcFlows }: { cdcFlows: any }) {
- + =16", + "react-dom": ">=16" + } + }, + "node_modules/react-toastify/node_modules/clsx": { + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/clsx/-/clsx-1.2.1.tgz", + "integrity": "sha512-EcR6r5a8bj6pu3ycsa/E/cKVGuTgZJZdsyUYHOksG/UHIiKfjxzRxYJpyVBwYaQeOvghal9fcc4PidlgzugAQg==", + "engines": { + "node": ">=6" + } + }, "node_modules/react-transition-group": { "version": "4.4.5", "resolved": "https://registry.npmjs.org/react-transition-group/-/react-transition-group-4.4.5.tgz", diff --git a/ui/package.json b/ui/package.json index 35fdd5f17d..1ff4f487ca 100644 --- a/ui/package.json +++ b/ui/package.json @@ -49,6 +49,7 @@ "react-dom": "18.2.0", "react-select": "^5.8.0", "react-spinners": "^0.13.8", + "react-toastify": "^9.1.3", "styled-components": "^6.1.1", "swr": "^2.2.4", "zod": "^3.22.4", From 0806d33a47cd7375f98e3d17ce3487e92256d308 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 25 Dec 2023 14:19:07 +0000 Subject: [PATCH 3/5] reqwest: use rustls (#899) Consolidates dependency tree flow-rs wasn't using its reqwest dependency, nor was it using serde, serde_yaml, or tokio --- nexus/Cargo.lock | 77 --------------------------------- nexus/flow-rs/Cargo.toml | 10 ----- nexus/peer-snowflake/Cargo.toml | 2 +- 3 files changed, 1 insertion(+), 88 deletions(-) diff --git a/nexus/Cargo.lock b/nexus/Cargo.lock index 652a9ebd11..59914b0952 100644 --- a/nexus/Cargo.lock +++ b/nexus/Cargo.lock @@ -897,11 +897,7 @@ dependencies = [ "anyhow", "catalog", "pt", - "reqwest", - "serde", "serde_json", - "serde_yaml", - "tokio", "tonic", "tonic-health", "tracing", @@ -1286,19 +1282,6 @@ dependencies = [ "tokio-io-timeout", ] -[[package]] -name = "hyper-tls" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" -dependencies = [ - "bytes", - "hyper", - "native-tls", - "tokio", - "tokio-native-tls", -] - [[package]] name = "iana-time-zone" version = "0.1.58" @@ -1574,24 +1557,6 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" -[[package]] -name = "native-tls" -version = "0.2.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" -dependencies = [ - "lazy_static", - "libc", - "log", - "openssl", - "openssl-probe", - "openssl-sys", - "schannel", - "security-framework", - "security-framework-sys", - "tempfile", -] - [[package]] name = "nom" version = "7.1.3" @@ -1737,15 +1702,6 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" -[[package]] -name = "openssl-src" -version = "300.2.1+3.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fe476c29791a5ca0d1273c697e96085bbabbbea2ef7afd5617e78a4b40332d3" -dependencies = [ - "cc", -] - [[package]] name = "openssl-sys" version = "0.9.97" @@ -1754,7 +1710,6 @@ checksum = "c3eaad34cdd97d81de97964fc7f29e2d104f483840d906ef56daa1912338460b" dependencies = [ "cc", "libc", - "openssl-src", "pkg-config", "vcpkg", ] @@ -2603,12 +2558,10 @@ dependencies = [ "http-body", "hyper", "hyper-rustls", - "hyper-tls", "ipnet", "js-sys", "log", "mime", - "native-tls", "once_cell", "percent-encoding", "pin-project-lite", @@ -2619,7 +2572,6 @@ dependencies = [ "serde_urlencoded", 
"system-configuration", "tokio", - "tokio-native-tls", "tokio-rustls 0.24.1", "tokio-util", "tower-service", @@ -2970,19 +2922,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serde_yaml" -version = "0.9.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cc7a1570e38322cfe4154732e5110f887ea57e22b76f4bfd32b5bdd3368666c" -dependencies = [ - "indexmap 2.1.0", - "itoa", - "ryu", - "serde", - "unsafe-libyaml", -] - [[package]] name = "sha2" version = "0.10.8" @@ -3400,16 +3339,6 @@ dependencies = [ "syn 2.0.41", ] -[[package]] -name = "tokio-native-tls" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" -dependencies = [ - "native-tls", - "tokio", -] - [[package]] name = "tokio-openssl" version = "0.6.4" @@ -3756,12 +3685,6 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e51733f11c9c4f72aa0c160008246859e340b00807569a0da0e7a1079b27ba85" -[[package]] -name = "unsafe-libyaml" -version = "0.2.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab4c90930b95a82d00dc9e9ac071b4991924390d46cbd0dfe566148667605e4b" - [[package]] name = "untrusted" version = "0.9.0" diff --git a/nexus/flow-rs/Cargo.toml b/nexus/flow-rs/Cargo.toml index fc989a378f..2fd4bb1cf9 100644 --- a/nexus/flow-rs/Cargo.toml +++ b/nexus/flow-rs/Cargo.toml @@ -3,18 +3,8 @@ name = "flow-rs" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] -tokio = { version = "1", features = ["full"] } -serde = { version = "1.0", features = ["derive"] } -serde_yaml = "0.9" serde_json = "1.0" -reqwest = { version = "0.11", features = [ - "blocking", - "json", - "native-tls-vendored", -] } anyhow = "1.0" tracing = "0.1" tonic = "0.10" diff --git a/nexus/peer-snowflake/Cargo.toml b/nexus/peer-snowflake/Cargo.toml index 6e16ff63cf..45b3fe413d 100644 --- a/nexus/peer-snowflake/Cargo.toml +++ b/nexus/peer-snowflake/Cargo.toml @@ -25,7 +25,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" futures = "0.3" ureq = { version = "2", features = ["json", "charset"] } -reqwest = { version = "0.11", features = ["json", "gzip"] } +reqwest = { version = "0.11", default-features = false, features = ["json", "gzip", "rustls-tls"] } anyhow = "1.0" tokio = { version = "1.21", features = ["full"] } hex = "0.4" From 79e9ce381f0a3adab40c8789e151c35e436a6e2c Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Mon, 25 Dec 2023 10:12:09 -0500 Subject: [PATCH 4/5] Never update consumed xlogpos even when num records is zero (#900) This could potentially cause the WAL to build up but based on reports in the field this seems like the safer alternative for now. 
--- flow/connectors/postgres/cdc.go | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index f2eda2e5f4..b3686f4d09 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -197,7 +197,6 @@ func (p *PostgresCDCSource) consumeStream( return fmt.Errorf("[initial-flush] SendStandbyStatusUpdate failed: %w", err) } } - proposedConsumedXLogPos := consumedXLogPos var standByLastLogged time.Time cdcRecordsStorage := cdc_records.NewCDCRecordsStore(p.flowJobName) @@ -254,17 +253,6 @@ func (p *PostgresCDCSource) consumeStream( for { if pkmRequiresResponse { - // Update XLogPos to the last processed position, we can only confirm - // that this is the last row committed on the destination. - if proposedConsumedXLogPos > consumedXLogPos { - p.logger.Info(fmt.Sprintf("Heartbeat adjusting lsn from %d to %d", consumedXLogPos, proposedConsumedXLogPos)) - consumedXLogPos = proposedConsumedXLogPos - err := p.SetLastOffset(int64(consumedXLogPos)) - if err != nil { - return fmt.Errorf("storing updated LSN failed: %w", err) - } - } - err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn, pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos}) if err != nil { @@ -477,13 +465,6 @@ func (p *PostgresCDCSource) consumeStream( if xld.WALStart > clientXLogPos { clientXLogPos = xld.WALStart } - - if cdcRecordsStorage.IsEmpty() { - // given that we have no records it is safe to update the flush wal position - // to the clientXLogPos. clientXLogPos can be moved forward due to PKM messages. - proposedConsumedXLogPos = clientXLogPos - records.UpdateLatestCheckpoint(int64(clientXLogPos)) - } } } } From ed67a63ec9c0bfee98dc8501ac6194acce781837 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 25 Dec 2023 15:28:19 +0000 Subject: [PATCH 5/5] BQ merge: don't be transactional (#895) Discussed with Kevin BQ lock contention, he made #889 to remove temp table Merge is idempotent, so no need to have transaction, which removes need to have advisory lock on catalog --- flow/connectors/bigquery/bigquery.go | 63 ++-------------------- flow/connectors/bigquery/qrep_avro_sync.go | 26 ++++----- 2 files changed, 18 insertions(+), 71 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 3c0787527e..0a220ef424 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -781,25 +781,11 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err) } - stmts := []string{} + stmts := make([]string, 0, len(distinctTableNames)+1) // append all the statements to one list c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v", c.datasetID, rawTableName, distinctTableNames)) - release, err := c.grabJobsUpdateLock() - if err != nil { - return nil, fmt.Errorf("failed to grab lock: %v", err) - } - - defer func() { - err := release() - if err != nil { - c.logger.Error("failed to release lock", slog.Any("error", err)) - } - }() - - stmts = append(stmts, "BEGIN TRANSACTION;") - for _, tableName := range distinctTableNames { mergeGen := &mergeStmtGenerator{ Dataset: c.datasetID, @@ -824,11 +810,11 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) "UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name='%s';", c.datasetID, MirrorJobsTable, syncBatchID, 
req.FlowJobName) stmts = append(stmts, updateMetadataStmt) - stmts = append(stmts, "COMMIT TRANSACTION;") - _, err = c.client.Query(strings.Join(stmts, "\n")).Read(c.ctx) + query := strings.Join(stmts, "\n") + _, err = c.client.Query(query).Read(c.ctx) if err != nil { - return nil, fmt.Errorf("failed to execute statements %s in a transaction: %v", strings.Join(stmts, "\n"), err) + return nil, fmt.Errorf("failed to execute statements %s: %v", query, err) } return &model.NormalizeResponse{ @@ -1023,21 +1009,9 @@ func (c *BigQueryConnector) SetupNormalizedTables( } func (c *BigQueryConnector) SyncFlowCleanup(jobName string) error { - release, err := c.grabJobsUpdateLock() - if err != nil { - return fmt.Errorf("failed to grab lock: %w", err) - } - - defer func() { - err := release() - if err != nil { - c.logger.Error("failed to release lock", slog.Any("error", err)) - } - }() - dataset := c.client.Dataset(c.datasetID) // deleting PeerDB specific tables - err = dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx) + err := dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx) if err != nil { return fmt.Errorf("failed to delete raw table: %w", err) } @@ -1069,33 +1043,6 @@ func (c *BigQueryConnector) getStagingTableName(flowJobName string) string { return fmt.Sprintf("_peerdb_staging_%s", flowJobName) } -// Bigquery doesn't allow concurrent updates to the same table. -// we grab a lock on catalog to ensure that only one job is updating -// bigquery tables at a time. -// returns a function to release the lock. -func (c *BigQueryConnector) grabJobsUpdateLock() (func() error, error) { - tx, err := c.catalogPool.Begin(c.ctx) - if err != nil { - return nil, fmt.Errorf("failed to begin transaction: %w", err) - } - - // grab an advisory lock based on the mirror jobs table hash - mjTbl := fmt.Sprintf("%s.%s", c.datasetID, MirrorJobsTable) - _, err = tx.Exec(c.ctx, "SELECT pg_advisory_xact_lock(hashtext($1))", mjTbl) - if err != nil { - err = tx.Rollback(c.ctx) - return nil, fmt.Errorf("failed to grab lock on %s: %w", mjTbl, err) - } - - return func() error { - err = tx.Commit(c.ctx) - if err != nil { - return fmt.Errorf("failed to commit transaction: %w", err) - } - return nil - }, nil -} - func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) { for _, renameRequest := range req.RenameTableOptions { src := renameRequest.CurrentName diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index d52e7c42e3..7ed87b0c06 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -74,12 +74,12 @@ func (s *QRepAvroSyncMethod) SyncRecords( flowJobName, dstTableName, syncBatchID), ) - // execute the statements in a transaction - stmts := []string{} - stmts = append(stmts, "BEGIN TRANSACTION;") - stmts = append(stmts, insertStmt) - stmts = append(stmts, updateMetadataStmt) - stmts = append(stmts, "COMMIT TRANSACTION;") + stmts := []string{ + "BEGIN TRANSACTION;", + insertStmt, + updateMetadataStmt, + "COMMIT TRANSACTION;", + } _, err = bqClient.Query(strings.Join(stmts, "\n")).Read(s.connector.ctx) if err != nil { return -1, fmt.Errorf("failed to execute statements in a transaction: %v", err) @@ -136,8 +136,6 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( ) bqClient := s.connector.client datasetID := s.connector.datasetID - // Start a transaction - stmts := []string{"BEGIN TRANSACTION;"} selector := "*" if softDeleteCol != "" { // PeerDB column @@ -150,16 +148,18 
@@ func (s *QRepAvroSyncMethod) SyncQRepRecords( insertStmt := fmt.Sprintf("INSERT INTO `%s.%s` SELECT %s FROM `%s.%s`;", datasetID, dstTableName, selector, datasetID, stagingTable) - stmts = append(stmts, insertStmt) - insertMetadataStmt, err := s.connector.createMetadataInsertStatement(partition, flowJobName, startTime) if err != nil { return -1, fmt.Errorf("failed to create metadata insert statement: %v", err) } slog.Info("Performing transaction inside QRep sync function", flowLog) - stmts = append(stmts, insertMetadataStmt) - stmts = append(stmts, "COMMIT TRANSACTION;") - // Execute the statements in a transaction + + stmts := []string{ + "BEGIN TRANSACTION;", + insertStmt, + insertMetadataStmt, + "COMMIT TRANSACTION;", + } _, err = bqClient.Query(strings.Join(stmts, "\n")).Read(s.connector.ctx) if err != nil { return -1, fmt.Errorf("failed to execute statements in a transaction: %v", err)
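
As an aside to this last patch, a sketch with assumed names (not the connector code): because each per-table MERGE is idempotent, the normalize step can send the merges plus the metadata UPDATE as one plain multi-statement script, with no BEGIN/COMMIT wrapper and no advisory lock, and simply retry the whole script on failure:

```go
package bqsketch

import (
	"context"
	"strings"

	"cloud.google.com/go/bigquery"
)

// runNormalizeScript joins the per-table MERGE statements and the metadata
// UPDATE into a single script and runs it without a surrounding transaction.
// Query(...).Read waits for the script job to finish, so on error the whole
// script can be re-run.
func runNormalizeScript(ctx context.Context, client *bigquery.Client,
	mergeStmts []string, updateMetadataStmt string,
) error {
	stmts := make([]string, 0, len(mergeStmts)+1)
	stmts = append(stmts, mergeStmts...)
	stmts = append(stmts, updateMetadataStmt)

	_, err := client.Query(strings.Join(stmts, "\n")).Read(ctx)
	return err
}
```

(Purely illustrative; note that the qrep Avro sync path above still wraps its INSERT and metadata statements in an explicit BEGIN/COMMIT transaction.)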