From d407f9e8ba3b5ea2a8b6e2319dcc2cef49b60385 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 25 Dec 2023 13:09:21 +0000 Subject: [PATCH 1/9] BQ/SF: always use GREATEST when setting offset (#896) While adding SetLastOffset, missed updating previous metadata update sql to use GREATEST too --- flow/connectors/bigquery/bigquery.go | 2 +- flow/connectors/snowflake/snowflake.go | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 64d12057f6..3c0787527e 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -927,7 +927,7 @@ func (c *BigQueryConnector) getUpdateMetadataStmt(jobName string, lastSyncedChec c.datasetID, MirrorJobsTable, jobName, lastSyncedCheckpointID, batchID) if hasJob { jobStatement = fmt.Sprintf( - "UPDATE %s.%s SET offset = %d,sync_batch_id=%d WHERE mirror_job_name = '%s';", + "UPDATE %s.%s SET offset=GREATEST(offset,%d),sync_batch_id=%d WHERE mirror_job_name = '%s';", c.datasetID, MirrorJobsTable, lastSyncedCheckpointID, batchID, jobName) } diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index e47c27a5f4..db13e188b8 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -39,11 +39,10 @@ const ( rawTableMultiValueInsertSQL = "INSERT INTO %s.%s VALUES%s" createNormalizedTableSQL = "CREATE TABLE IF NOT EXISTS %s(%s)" toVariantColumnName = "VAR_COLS" - mergeStatementSQL = `MERGE INTO %s TARGET USING (WITH VARIANT_CONVERTED AS (SELECT _PEERDB_UID, - _PEERDB_TIMESTAMP, - TO_VARIANT(PARSE_JSON(_PEERDB_DATA)) %s,_PEERDB_RECORD_TYPE,_PEERDB_MATCH_DATA,_PEERDB_BATCH_ID, - _PEERDB_UNCHANGED_TOAST_COLUMNS FROM - _PEERDB_INTERNAL.%s WHERE _PEERDB_BATCH_ID > %d AND _PEERDB_BATCH_ID <= %d AND + mergeStatementSQL = `MERGE INTO %s TARGET USING (WITH VARIANT_CONVERTED AS ( + SELECT _PEERDB_UID,_PEERDB_TIMESTAMP,TO_VARIANT(PARSE_JSON(_PEERDB_DATA)) %s,_PEERDB_RECORD_TYPE, + _PEERDB_MATCH_DATA,_PEERDB_BATCH_ID,_PEERDB_UNCHANGED_TOAST_COLUMNS + FROM _PEERDB_INTERNAL.%s WHERE _PEERDB_BATCH_ID > %d AND _PEERDB_BATCH_ID <= %d AND _PEERDB_DESTINATION_TABLE_NAME = ? ), FLATTENED AS (SELECT _PEERDB_UID,_PEERDB_TIMESTAMP,_PEERDB_RECORD_TYPE,_PEERDB_MATCH_DATA,_PEERDB_BATCH_ID, _PEERDB_UNCHANGED_TOAST_COLUMNS,%s @@ -66,7 +65,8 @@ const ( insertJobMetadataSQL = "INSERT INTO %s.%s VALUES (?,?,?,?)" - updateMetadataForSyncRecordsSQL = "UPDATE %s.%s SET OFFSET=?, SYNC_BATCH_ID=? WHERE MIRROR_JOB_NAME=?" + updateMetadataForSyncRecordsSQL = `UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?), SYNC_BATCH_ID=? + WHERE MIRROR_JOB_NAME=?` updateMetadataForNormalizeRecordsSQL = "UPDATE %s.%s SET NORMALIZE_BATCH_ID=? WHERE MIRROR_JOB_NAME=?" checkIfTableExistsSQL = `SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.TABLES From ac5bbb2b851ffa033c9ddf3815f706fa2cc7b254 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Mon, 25 Dec 2023 18:39:56 +0530 Subject: [PATCH 2/9] UI: Create Mirror And Mirror Overview Improvements (#890) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## UI Improvement Features These are some features which I thought would be nice to get in. At the end of the day, these are just proposals from my end. ### Mirror Create Pages: Error Toasts And Floating Button We now show errors as toasts and have the Create Mirror button with a fixed position on the bottom-right. 
Users now don't have to do a lot of scrolling up and down to look at the error message, come back, and click create. Screenshot 2023-12-23 at 10 26 14 PM ### QRep Mirror: Upsert Columns Selection of Unique Key Columns for QRep Upsert mode now looks like this, saving users from having to type out columns. Also added validation for the columns being an empty array. Screenshot 2023-12-23 at 9 49 35 PM ### Better Tabs UI for Mirror Overview I thought the tabs we have there look unpolished so used Tremor to come up with this. This also achieves significant code reduction in that file. Screenshot 2023-12-23 at 11 37 58 PM ### Wiring Status in Mirror Overview Page Wires in the Status we show in the mirror overview page. This is a follow-up to #883 Screenshot 2023-12-23 at 10 28 23 PM ### Others - Removes 'Authentication failed' message in login landing page. - Makes the source-destination table list in Mirror Overview page have scrollable height and sticky headers - Error table now has time column before message column and the rows are sorted by timestamp (latest first) --------- Co-authored-by: Kaushik Iska --- ui/app/login/page.tsx | 4 +- ui/app/mirrors/create/handlers.ts | 75 +++++-------- ui/app/mirrors/create/helpers/qrep.ts | 3 +- ui/app/mirrors/create/page.tsx | 93 ++++++++-------- ui/app/mirrors/create/qrep/qrep.tsx | 13 ++- ui/app/mirrors/create/qrep/upsertcols.tsx | 95 +++++++++++++++++ ui/app/mirrors/edit/[mirrorId]/cdc.tsx | 87 ++++----------- ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx | 13 +-- ui/app/mirrors/edit/[mirrorId]/tablePairs.tsx | 100 ++++++++++-------- ui/app/mirrors/errors/[mirrorName]/page.tsx | 19 ++-- ui/app/mirrors/mirror-status.tsx | 15 ++- ui/app/mirrors/tables.tsx | 2 +- ui/package-lock.json | 21 ++++ ui/package.json | 1 + 14 files changed, 308 insertions(+), 233 deletions(-) create mode 100644 ui/app/mirrors/create/qrep/upsertcols.tsx diff --git a/ui/app/login/page.tsx b/ui/app/login/page.tsx index 41c00c0dd8..1172b0e811 100644 --- a/ui/app/login/page.tsx +++ b/ui/app/login/page.tsx @@ -13,9 +13,7 @@ export default function Login() { const searchParams = useSearchParams(); const [pass, setPass] = useState(''); const [show, setShow] = useState(false); - const [error, setError] = useState(() => - searchParams.has('reject') ? 
'Authentication failed, please login' : '' - ); + const [error, setError] = useState(() => ''); const login = () => { fetch('/api/login', { method: 'POST', diff --git a/ui/app/mirrors/create/handlers.ts b/ui/app/mirrors/create/handlers.ts index 039766171d..81f36bd739 100644 --- a/ui/app/mirrors/create/handlers.ts +++ b/ui/app/mirrors/create/handlers.ts @@ -73,45 +73,33 @@ const validateCDCFields = ( } | undefined )[], - setMsg: Dispatch>, config: CDCConfig -): boolean => { +): string | undefined => { let validationErr: string | undefined; const tablesValidity = tableMappingSchema.safeParse(tableMapping); if (!tablesValidity.success) { validationErr = tablesValidity.error.issues[0].message; - setMsg({ ok: false, msg: validationErr }); - return false; } const configValidity = cdcSchema.safeParse(config); if (!configValidity.success) { validationErr = configValidity.error.issues[0].message; - setMsg({ ok: false, msg: validationErr }); - return false; } - setMsg({ ok: true, msg: '' }); - return true; + return validationErr; }; const validateQRepFields = ( query: string, - setMsg: Dispatch>, config: QRepConfig -): boolean => { +): string | undefined => { if (query.length < 5) { - setMsg({ ok: false, msg: 'Query is invalid' }); - return false; + return 'Query is invalid'; } - let validationErr: string | undefined; const configValidity = qrepSchema.safeParse(config); if (!configValidity.success) { validationErr = configValidity.error.issues[0].message; - setMsg({ ok: false, msg: validationErr }); - return false; } - setMsg({ ok: true, msg: '' }); - return true; + return validationErr; }; interface TableMapping { @@ -140,25 +128,23 @@ export const handleCreateCDC = async ( flowJobName: string, rows: TableMapRow[], config: CDCConfig, - setMsg: Dispatch< - SetStateAction<{ - ok: boolean; - msg: string; - }> - >, + notify: (msg: string) => void, setLoading: Dispatch>, route: RouteCallback ) => { const flowNameValid = flowNameSchema.safeParse(flowJobName); if (!flowNameValid.success) { const flowNameErr = flowNameValid.error.issues[0].message; - setMsg({ ok: false, msg: flowNameErr }); + notify(flowNameErr); return; } const tableNameMapping = reformattedTableMapping(rows); - const isValid = validateCDCFields(tableNameMapping, setMsg, config); - if (!isValid) return; + const fieldErr = validateCDCFields(tableNameMapping, config); + if (fieldErr) { + notify(fieldErr); + return; + } config['tableMappings'] = tableNameMapping as TableMapping[]; config['flowJobName'] = flowJobName; @@ -172,10 +158,7 @@ export const handleCreateCDC = async ( } if (config.doInitialCopy == false && config.initialCopyOnly == true) { - setMsg({ - ok: false, - msg: 'Initial Copy Only cannot be true if Initial Copy is false.', - }); + notify('Initial Copy Only cannot be true if Initial Copy is false.'); return; } @@ -187,11 +170,11 @@ export const handleCreateCDC = async ( }), }).then((res) => res.json()); if (!statusMessage.created) { - setMsg({ ok: false, msg: 'unable to create mirror.' 
}); + notify('unable to create mirror.'); setLoading(false); return; } - setMsg({ ok: true, msg: 'CDC Mirror created successfully' }); + notify('CDC Mirror created successfully'); route(); setLoading(false); }; @@ -209,12 +192,7 @@ export const handleCreateQRep = async ( flowJobName: string, query: string, config: QRepConfig, - setMsg: Dispatch< - SetStateAction<{ - ok: boolean; - msg: string; - }> - >, + notify: (msg: string) => void, setLoading: Dispatch>, route: RouteCallback, xmin?: boolean @@ -222,7 +200,7 @@ export const handleCreateQRep = async ( const flowNameValid = flowNameSchema.safeParse(flowJobName); if (!flowNameValid.success) { const flowNameErr = flowNameValid.error.issues[0].message; - setMsg({ ok: false, msg: flowNameErr }); + notify(flowNameErr); return; } @@ -237,16 +215,17 @@ export const handleCreateQRep = async ( if ( config.writeMode?.writeType == QRepWriteType.QREP_WRITE_MODE_UPSERT && - !config.writeMode?.upsertKeyColumns + (!config.writeMode?.upsertKeyColumns || + config.writeMode?.upsertKeyColumns.length == 0) ) { - setMsg({ - ok: false, - msg: 'For upsert mode, unique key columns cannot be empty.', - }); + notify('For upsert mode, unique key columns cannot be empty.'); + return; + } + const fieldErr = validateQRepFields(query, config); + if (fieldErr) { + notify(fieldErr); return; } - const isValid = validateQRepFields(query, setMsg, config); - if (!isValid) return; config.flowJobName = flowJobName; config.query = query; @@ -267,11 +246,11 @@ export const handleCreateQRep = async ( } ).then((res) => res.json()); if (!statusMessage.created) { - setMsg({ ok: false, msg: 'unable to create mirror.' }); + notify('unable to create mirror.'); setLoading(false); return; } - setMsg({ ok: true, msg: 'Query Replication Mirror created successfully' }); + notify('Query Replication Mirror created successfully'); route(); setLoading(false); }; diff --git a/ui/app/mirrors/create/helpers/qrep.ts b/ui/app/mirrors/create/helpers/qrep.ts index 654d5c7ff3..fca1f9fcb2 100644 --- a/ui/app/mirrors/create/helpers/qrep.ts +++ b/ui/app/mirrors/create/helpers/qrep.ts @@ -112,8 +112,9 @@ export const qrepSettings: MirrorSetting[] = [ writeMode: currWriteMode, }; }), - tips: `Comma separated string column names. Needed when write mode is set to UPSERT. + tips: `Needed when write mode is set to UPSERT. 
These columns need to be unique and are used for updates.`, + type: 'select', }, { label: 'Initial Copy Only', diff --git a/ui/app/mirrors/create/page.tsx b/ui/app/mirrors/create/page.tsx index a075432bb3..497d3aea4b 100644 --- a/ui/app/mirrors/create/page.tsx +++ b/ui/app/mirrors/create/page.tsx @@ -4,17 +4,18 @@ import { RequiredIndicator } from '@/components/RequiredIndicator'; import { QRepConfig } from '@/grpc_generated/flow'; import { DBType, Peer } from '@/grpc_generated/peers'; import { Button } from '@/lib/Button'; -import { ButtonGroup } from '@/lib/ButtonGroup'; +import { Icon } from '@/lib/Icon'; import { Label } from '@/lib/Label'; import { RowWithSelect, RowWithTextField } from '@/lib/Layout'; import { Panel } from '@/lib/Panel'; import { TextField } from '@/lib/TextField'; import { Divider } from '@tremor/react'; import Image from 'next/image'; -import Link from 'next/link'; import { useRouter } from 'next/navigation'; import { useEffect, useState } from 'react'; import ReactSelect from 'react-select'; +import { ToastContainer, toast } from 'react-toastify'; +import 'react-toastify/dist/ReactToastify.css'; import { InfoPopover } from '../../../components/InfoPopover'; import { CDCConfig, TableMapRow } from '../../dto/MirrorsDTO'; import CDCConfigForm from './cdc/cdc'; @@ -47,22 +48,23 @@ function getPeerLabel(peer: Peer) { ); } +const notifyErr = (errMsg: string) => { + toast.error(errMsg, { + position: toast.POSITION.BOTTOM_CENTER, + }); +}; export default function CreateMirrors() { const router = useRouter(); const [mirrorName, setMirrorName] = useState(''); const [mirrorType, setMirrorType] = useState(''); - const [formMessage, setFormMessage] = useState<{ ok: boolean; msg: string }>({ - ok: true, - msg: '', - }); const [loading, setLoading] = useState(false); const [config, setConfig] = useState(blankCDCSetting); const [peers, setPeers] = useState([]); const [rows, setRows] = useState([]); const [qrepQuery, setQrepQuery] = useState(`-- Here's a sample template: - SELECT * FROM - WHERE + SELECT * FROM + WHERE BETWEEN {{.start}} AND {{.end}}`); useEffect(() => { @@ -183,15 +185,7 @@ export default function CreateMirrors() { Configuration )} - {!loading && formMessage.msg.length > 0 && ( - - )} + {!loading && } {mirrorType === '' ? ( <> ) : mirrorType === 'CDC' ? 
( @@ -213,36 +207,41 @@ export default function CreateMirrors() { {mirrorType && ( - - - - + )} diff --git a/ui/app/mirrors/create/qrep/qrep.tsx b/ui/app/mirrors/create/qrep/qrep.tsx index 6b5b7b4b35..9dd7943b80 100644 --- a/ui/app/mirrors/create/qrep/qrep.tsx +++ b/ui/app/mirrors/create/qrep/qrep.tsx @@ -14,6 +14,7 @@ import { MirrorSetter } from '../../../dto/MirrorsDTO'; import { defaultSyncMode } from '../cdc/cdc'; import { fetchAllTables, fetchColumns } from '../handlers'; import { MirrorSetting, blankQRepSetting } from '../helpers/common'; +import UpsertColsDisplay from './upsertcols'; interface QRepConfigProps { settings: MirrorSetting[]; @@ -29,10 +30,6 @@ interface QRepConfigProps { xmin?: boolean; } -const SyncModes = ['AVRO', 'Copy with Binary'].map((value) => ({ - label: value, - value, -})); const WriteModes = ['Append', 'Upsert', 'Overwrite'].map((value) => ({ label: value, value, @@ -50,6 +47,7 @@ export default function QRepConfigForm({ const [watermarkColumns, setWatermarkColumns] = useState< { value: string; label: string }[] >([]); + const [loading, setLoading] = useState(false); const handleChange = (val: string | boolean, setting: MirrorSetting) => { @@ -220,6 +218,13 @@ export default function QRepConfigForm({ } options={WriteModes} /> + ) : setting.label === 'Upsert Key Columns' ? ( + ) : ( { + const [uniqueColumnsSet, setUniqueColumnsSet] = useState>( + new Set() + ); + + const handleUniqueColumns = ( + col: string, + setting: MirrorSetting, + action: 'add' | 'remove' + ) => { + if (action === 'add') setUniqueColumnsSet((prev) => new Set(prev).add(col)); + else if (action === 'remove') { + setUniqueColumnsSet((prev) => { + const newSet = new Set(prev); + newSet.delete(col); + return newSet; + }); + } + const uniqueColsArr = Array.from(uniqueColumnsSet); + setting.stateHandler(uniqueColsArr, setter); + }; + + useEffect(() => { + const uniqueColsArr = Array.from(uniqueColumnsSet); + setter((curr) => { + let defaultMode: QRepWriteMode = { + writeType: QRepWriteType.QREP_WRITE_MODE_APPEND, + upsertKeyColumns: [], + }; + let currWriteMode = (curr as QRepConfig).writeMode || defaultMode; + currWriteMode.upsertKeyColumns = uniqueColsArr as string[]; + return { + ...curr, + writeMode: currWriteMode, + }; + }); + }, [uniqueColumnsSet, setter]); + return ( + <> + { + val && handleUniqueColumns(val.value, setting, 'add'); + }} + isLoading={loading} + options={columns} + /> +
+ {Array.from(uniqueColumnsSet).map((col: string) => { + return ( + + {col} + + + ); + })} +
+ + ); +}; + +export default UpsertColsDisplay; diff --git a/ui/app/mirrors/edit/[mirrorId]/cdc.tsx b/ui/app/mirrors/edit/[mirrorId]/cdc.tsx index 7f54d227ab..8c33fa7c9b 100644 --- a/ui/app/mirrors/edit/[mirrorId]/cdc.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/cdc.tsx @@ -13,12 +13,11 @@ import { Label } from '@/lib/Label'; import { ProgressBar } from '@/lib/ProgressBar'; import { SearchField } from '@/lib/SearchField'; import { Table, TableCell, TableRow } from '@/lib/Table'; -import * as Tabs from '@radix-ui/react-tabs'; +import { Tab, TabGroup, TabList, TabPanel, TabPanels } from '@tremor/react'; import moment, { Duration, Moment } from 'moment'; import Link from 'next/link'; import { useEffect, useMemo, useState } from 'react'; import ReactSelect from 'react-select'; -import styled from 'styled-components'; import CdcDetails from './cdcDetails'; class TableCloneSummary { @@ -264,21 +263,6 @@ export const SnapshotStatusTable = ({ status }: SnapshotStatusProps) => { ); }; -const Trigger = styled( - ({ isActive, ...props }: { isActive?: boolean } & Tabs.TabsTriggerProps) => ( - - ) -)<{ isActive?: boolean }>` - background-color: ${({ theme, isActive }) => - isActive ? theme.colors.accent.surface.selected : 'white'}; - - font-weight: ${({ isActive }) => (isActive ? 'bold' : 'normal')}; - - &:hover { - color: ${({ theme }) => theme.colors.accent.text.highContrast}; - } -`; - type CDCMirrorStatusProps = { cdc: CDCMirrorStatus; rows: SyncStatusRow[]; @@ -298,11 +282,6 @@ export function CDCMirror({ snapshot = ; } - const handleTab = (tabVal: string) => { - localStorage.setItem('mirrortab', tabVal); - setSelectedTab(tabVal); - }; - useEffect(() => { if (typeof window !== 'undefined') { setSelectedTab(localStorage?.getItem('mirrortab') || 'tab1'); @@ -310,48 +289,26 @@ export function CDCMirror({ }, []); return ( - handleTab(val)} - style={{ marginTop: '2rem' }} - > - - - Overview - - - Sync Status - - - Initial Copy - - - - - - - {syncStatusChild} - - - {snapshot} - - + + + Overview + Sync Status + Initial Copy + + + + + + {syncStatusChild} + {snapshot} + + ); } diff --git a/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx b/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx index 32992f871d..e7729c487d 100644 --- a/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx @@ -5,10 +5,9 @@ import PeerButton from '@/components/PeerComponent'; import TimeLabel from '@/components/TimeComponent'; import { FlowConnectionConfigs } from '@/grpc_generated/flow'; import { dBTypeFromJSON } from '@/grpc_generated/peers'; -import { Badge } from '@/lib/Badge'; -import { Icon } from '@/lib/Icon'; import { Label } from '@/lib/Label'; import moment from 'moment'; +import { MirrorError } from '../../mirror-status'; import MirrorValues from './configValues'; import TablePairs from './tablePairs'; @@ -33,12 +32,10 @@ function CdcDetails({ syncs, createdAt, mirrorConfig }: props) {
- +
diff --git a/ui/app/mirrors/edit/[mirrorId]/tablePairs.tsx b/ui/app/mirrors/edit/[mirrorId]/tablePairs.tsx index 5289e77a04..516b8dd3ce 100644 --- a/ui/app/mirrors/edit/[mirrorId]/tablePairs.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/tablePairs.tsx @@ -15,7 +15,7 @@ const TablePairs = ({ tables }: { tables?: TableMapping[] }) => { }, [tables, searchQuery]); if (tables) return ( - <> +
{ }} />
- - - - - + - Destination Table - - - - - {shownTables?.map((table) => ( - - - + + - ))} - -
- Source Table - +
- {table.sourceTableIdentifier} - - {table.destinationTableIdentifier} - + Source Table + + Destination Table +
- + + + {shownTables?.map((table) => ( + + + {table.sourceTableIdentifier} + + + {table.destinationTableIdentifier} + + + ))} + + +
+
); }; diff --git a/ui/app/mirrors/errors/[mirrorName]/page.tsx b/ui/app/mirrors/errors/[mirrorName]/page.tsx index 75e8c91d2b..97efca644a 100644 --- a/ui/app/mirrors/errors/[mirrorName]/page.tsx +++ b/ui/app/mirrors/errors/[mirrorName]/page.tsx @@ -15,6 +15,9 @@ const MirrorError = async ({ params: { mirrorName } }: MirrorErrorProps) => { error_type: 'error', }, distinct: ['error_message'], + orderBy: { + error_timestamp: 'desc', + }, }); return ( @@ -40,12 +43,10 @@ const MirrorError = async ({ params: { mirrorName } }: MirrorErrorProps) => { header={ Type + Message - - - } > @@ -54,15 +55,15 @@ const MirrorError = async ({ params: { mirrorName } }: MirrorErrorProps) => { {mirrorError.error_type.toUpperCase()} - - {mirrorError.error_message} - - + + + {mirrorError.error_message} + ))} diff --git a/ui/app/mirrors/mirror-status.tsx b/ui/app/mirrors/mirror-status.tsx index 2a68b7b3d2..27d797e389 100644 --- a/ui/app/mirrors/mirror-status.tsx +++ b/ui/app/mirrors/mirror-status.tsx @@ -26,7 +26,13 @@ export const ErrorModal = ({ flowName }: { flowName: string }) => { ); }; -export const MirrorError = ({ flowName }: { flowName: string }) => { +export const MirrorError = ({ + flowName, + detailed, +}: { + flowName: string; + detailed: boolean; +}) => { const [flowStatus, setFlowStatus] = useState(); const [isLoading, setIsLoading] = useState(true); const [error, setError] = useState(null); @@ -76,6 +82,13 @@ export const MirrorError = ({ flowName }: { flowName: string }) => { } if (flowStatus == 'healthy') { + if (detailed) + return ( +
+ + +
+ ); return ; } diff --git a/ui/app/mirrors/tables.tsx b/ui/app/mirrors/tables.tsx index 6c1289befc..106c7cd22d 100644 --- a/ui/app/mirrors/tables.tsx +++ b/ui/app/mirrors/tables.tsx @@ -90,7 +90,7 @@ export function CDCFlows({ cdcFlows }: { cdcFlows: any }) {
- + =16", + "react-dom": ">=16" + } + }, + "node_modules/react-toastify/node_modules/clsx": { + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/clsx/-/clsx-1.2.1.tgz", + "integrity": "sha512-EcR6r5a8bj6pu3ycsa/E/cKVGuTgZJZdsyUYHOksG/UHIiKfjxzRxYJpyVBwYaQeOvghal9fcc4PidlgzugAQg==", + "engines": { + "node": ">=6" + } + }, "node_modules/react-transition-group": { "version": "4.4.5", "resolved": "https://registry.npmjs.org/react-transition-group/-/react-transition-group-4.4.5.tgz", diff --git a/ui/package.json b/ui/package.json index 35fdd5f17d..1ff4f487ca 100644 --- a/ui/package.json +++ b/ui/package.json @@ -49,6 +49,7 @@ "react-dom": "18.2.0", "react-select": "^5.8.0", "react-spinners": "^0.13.8", + "react-toastify": "^9.1.3", "styled-components": "^6.1.1", "swr": "^2.2.4", "zod": "^3.22.4", From 0806d33a47cd7375f98e3d17ce3487e92256d308 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 25 Dec 2023 14:19:07 +0000 Subject: [PATCH 3/9] reqwest: use rustls (#899) Consolidates dependency tree flow-rs wasn't using its reqwest dependency, nor was it using serde, serde_yaml, or tokio --- nexus/Cargo.lock | 77 --------------------------------- nexus/flow-rs/Cargo.toml | 10 ----- nexus/peer-snowflake/Cargo.toml | 2 +- 3 files changed, 1 insertion(+), 88 deletions(-) diff --git a/nexus/Cargo.lock b/nexus/Cargo.lock index 652a9ebd11..59914b0952 100644 --- a/nexus/Cargo.lock +++ b/nexus/Cargo.lock @@ -897,11 +897,7 @@ dependencies = [ "anyhow", "catalog", "pt", - "reqwest", - "serde", "serde_json", - "serde_yaml", - "tokio", "tonic", "tonic-health", "tracing", @@ -1286,19 +1282,6 @@ dependencies = [ "tokio-io-timeout", ] -[[package]] -name = "hyper-tls" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" -dependencies = [ - "bytes", - "hyper", - "native-tls", - "tokio", - "tokio-native-tls", -] - [[package]] name = "iana-time-zone" version = "0.1.58" @@ -1574,24 +1557,6 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" -[[package]] -name = "native-tls" -version = "0.2.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" -dependencies = [ - "lazy_static", - "libc", - "log", - "openssl", - "openssl-probe", - "openssl-sys", - "schannel", - "security-framework", - "security-framework-sys", - "tempfile", -] - [[package]] name = "nom" version = "7.1.3" @@ -1737,15 +1702,6 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" -[[package]] -name = "openssl-src" -version = "300.2.1+3.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fe476c29791a5ca0d1273c697e96085bbabbbea2ef7afd5617e78a4b40332d3" -dependencies = [ - "cc", -] - [[package]] name = "openssl-sys" version = "0.9.97" @@ -1754,7 +1710,6 @@ checksum = "c3eaad34cdd97d81de97964fc7f29e2d104f483840d906ef56daa1912338460b" dependencies = [ "cc", "libc", - "openssl-src", "pkg-config", "vcpkg", ] @@ -2603,12 +2558,10 @@ dependencies = [ "http-body", "hyper", "hyper-rustls", - "hyper-tls", "ipnet", "js-sys", "log", "mime", - "native-tls", "once_cell", "percent-encoding", "pin-project-lite", @@ -2619,7 +2572,6 @@ dependencies = [ "serde_urlencoded", 
"system-configuration", "tokio", - "tokio-native-tls", "tokio-rustls 0.24.1", "tokio-util", "tower-service", @@ -2970,19 +2922,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serde_yaml" -version = "0.9.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cc7a1570e38322cfe4154732e5110f887ea57e22b76f4bfd32b5bdd3368666c" -dependencies = [ - "indexmap 2.1.0", - "itoa", - "ryu", - "serde", - "unsafe-libyaml", -] - [[package]] name = "sha2" version = "0.10.8" @@ -3400,16 +3339,6 @@ dependencies = [ "syn 2.0.41", ] -[[package]] -name = "tokio-native-tls" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" -dependencies = [ - "native-tls", - "tokio", -] - [[package]] name = "tokio-openssl" version = "0.6.4" @@ -3756,12 +3685,6 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e51733f11c9c4f72aa0c160008246859e340b00807569a0da0e7a1079b27ba85" -[[package]] -name = "unsafe-libyaml" -version = "0.2.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab4c90930b95a82d00dc9e9ac071b4991924390d46cbd0dfe566148667605e4b" - [[package]] name = "untrusted" version = "0.9.0" diff --git a/nexus/flow-rs/Cargo.toml b/nexus/flow-rs/Cargo.toml index fc989a378f..2fd4bb1cf9 100644 --- a/nexus/flow-rs/Cargo.toml +++ b/nexus/flow-rs/Cargo.toml @@ -3,18 +3,8 @@ name = "flow-rs" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] -tokio = { version = "1", features = ["full"] } -serde = { version = "1.0", features = ["derive"] } -serde_yaml = "0.9" serde_json = "1.0" -reqwest = { version = "0.11", features = [ - "blocking", - "json", - "native-tls-vendored", -] } anyhow = "1.0" tracing = "0.1" tonic = "0.10" diff --git a/nexus/peer-snowflake/Cargo.toml b/nexus/peer-snowflake/Cargo.toml index 6e16ff63cf..45b3fe413d 100644 --- a/nexus/peer-snowflake/Cargo.toml +++ b/nexus/peer-snowflake/Cargo.toml @@ -25,7 +25,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" futures = "0.3" ureq = { version = "2", features = ["json", "charset"] } -reqwest = { version = "0.11", features = ["json", "gzip"] } +reqwest = { version = "0.11", default-features = false, features = ["json", "gzip", "rustls-tls"] } anyhow = "1.0" tokio = { version = "1.21", features = ["full"] } hex = "0.4" From 79e9ce381f0a3adab40c8789e151c35e436a6e2c Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Mon, 25 Dec 2023 10:12:09 -0500 Subject: [PATCH 4/9] Never update consumed xlogpos even when num records is zero (#900) This could potentially cause the WAL to build up but based on reports in the field this seems like the safer alternative for now. 
--- flow/connectors/postgres/cdc.go | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index f2eda2e5f4..b3686f4d09 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -197,7 +197,6 @@ func (p *PostgresCDCSource) consumeStream( return fmt.Errorf("[initial-flush] SendStandbyStatusUpdate failed: %w", err) } } - proposedConsumedXLogPos := consumedXLogPos var standByLastLogged time.Time cdcRecordsStorage := cdc_records.NewCDCRecordsStore(p.flowJobName) @@ -254,17 +253,6 @@ func (p *PostgresCDCSource) consumeStream( for { if pkmRequiresResponse { - // Update XLogPos to the last processed position, we can only confirm - // that this is the last row committed on the destination. - if proposedConsumedXLogPos > consumedXLogPos { - p.logger.Info(fmt.Sprintf("Heartbeat adjusting lsn from %d to %d", consumedXLogPos, proposedConsumedXLogPos)) - consumedXLogPos = proposedConsumedXLogPos - err := p.SetLastOffset(int64(consumedXLogPos)) - if err != nil { - return fmt.Errorf("storing updated LSN failed: %w", err) - } - } - err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn, pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos}) if err != nil { @@ -477,13 +465,6 @@ func (p *PostgresCDCSource) consumeStream( if xld.WALStart > clientXLogPos { clientXLogPos = xld.WALStart } - - if cdcRecordsStorage.IsEmpty() { - // given that we have no records it is safe to update the flush wal position - // to the clientXLogPos. clientXLogPos can be moved forward due to PKM messages. - proposedConsumedXLogPos = clientXLogPos - records.UpdateLatestCheckpoint(int64(clientXLogPos)) - } } } } From ed67a63ec9c0bfee98dc8501ac6194acce781837 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 25 Dec 2023 15:28:19 +0000 Subject: [PATCH 5/9] BQ merge: don't be transactional (#895) Discussed with Kevin BQ lock contention, he made #889 to remove temp table Merge is idempotent, so no need to have transaction, which removes need to have advisory lock on catalog --- flow/connectors/bigquery/bigquery.go | 63 ++-------------------- flow/connectors/bigquery/qrep_avro_sync.go | 26 ++++----- 2 files changed, 18 insertions(+), 71 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 3c0787527e..0a220ef424 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -781,25 +781,11 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err) } - stmts := []string{} + stmts := make([]string, 0, len(distinctTableNames)+1) // append all the statements to one list c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v", c.datasetID, rawTableName, distinctTableNames)) - release, err := c.grabJobsUpdateLock() - if err != nil { - return nil, fmt.Errorf("failed to grab lock: %v", err) - } - - defer func() { - err := release() - if err != nil { - c.logger.Error("failed to release lock", slog.Any("error", err)) - } - }() - - stmts = append(stmts, "BEGIN TRANSACTION;") - for _, tableName := range distinctTableNames { mergeGen := &mergeStmtGenerator{ Dataset: c.datasetID, @@ -824,11 +810,11 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) "UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name='%s';", c.datasetID, MirrorJobsTable, syncBatchID, 
req.FlowJobName) stmts = append(stmts, updateMetadataStmt) - stmts = append(stmts, "COMMIT TRANSACTION;") - _, err = c.client.Query(strings.Join(stmts, "\n")).Read(c.ctx) + query := strings.Join(stmts, "\n") + _, err = c.client.Query(query).Read(c.ctx) if err != nil { - return nil, fmt.Errorf("failed to execute statements %s in a transaction: %v", strings.Join(stmts, "\n"), err) + return nil, fmt.Errorf("failed to execute statements %s: %v", query, err) } return &model.NormalizeResponse{ @@ -1023,21 +1009,9 @@ func (c *BigQueryConnector) SetupNormalizedTables( } func (c *BigQueryConnector) SyncFlowCleanup(jobName string) error { - release, err := c.grabJobsUpdateLock() - if err != nil { - return fmt.Errorf("failed to grab lock: %w", err) - } - - defer func() { - err := release() - if err != nil { - c.logger.Error("failed to release lock", slog.Any("error", err)) - } - }() - dataset := c.client.Dataset(c.datasetID) // deleting PeerDB specific tables - err = dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx) + err := dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx) if err != nil { return fmt.Errorf("failed to delete raw table: %w", err) } @@ -1069,33 +1043,6 @@ func (c *BigQueryConnector) getStagingTableName(flowJobName string) string { return fmt.Sprintf("_peerdb_staging_%s", flowJobName) } -// Bigquery doesn't allow concurrent updates to the same table. -// we grab a lock on catalog to ensure that only one job is updating -// bigquery tables at a time. -// returns a function to release the lock. -func (c *BigQueryConnector) grabJobsUpdateLock() (func() error, error) { - tx, err := c.catalogPool.Begin(c.ctx) - if err != nil { - return nil, fmt.Errorf("failed to begin transaction: %w", err) - } - - // grab an advisory lock based on the mirror jobs table hash - mjTbl := fmt.Sprintf("%s.%s", c.datasetID, MirrorJobsTable) - _, err = tx.Exec(c.ctx, "SELECT pg_advisory_xact_lock(hashtext($1))", mjTbl) - if err != nil { - err = tx.Rollback(c.ctx) - return nil, fmt.Errorf("failed to grab lock on %s: %w", mjTbl, err) - } - - return func() error { - err = tx.Commit(c.ctx) - if err != nil { - return fmt.Errorf("failed to commit transaction: %w", err) - } - return nil - }, nil -} - func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) { for _, renameRequest := range req.RenameTableOptions { src := renameRequest.CurrentName diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index d52e7c42e3..7ed87b0c06 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -74,12 +74,12 @@ func (s *QRepAvroSyncMethod) SyncRecords( flowJobName, dstTableName, syncBatchID), ) - // execute the statements in a transaction - stmts := []string{} - stmts = append(stmts, "BEGIN TRANSACTION;") - stmts = append(stmts, insertStmt) - stmts = append(stmts, updateMetadataStmt) - stmts = append(stmts, "COMMIT TRANSACTION;") + stmts := []string{ + "BEGIN TRANSACTION;", + insertStmt, + updateMetadataStmt, + "COMMIT TRANSACTION;", + } _, err = bqClient.Query(strings.Join(stmts, "\n")).Read(s.connector.ctx) if err != nil { return -1, fmt.Errorf("failed to execute statements in a transaction: %v", err) @@ -136,8 +136,6 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( ) bqClient := s.connector.client datasetID := s.connector.datasetID - // Start a transaction - stmts := []string{"BEGIN TRANSACTION;"} selector := "*" if softDeleteCol != "" { // PeerDB column @@ -150,16 +148,18 
@@ func (s *QRepAvroSyncMethod) SyncQRepRecords( insertStmt := fmt.Sprintf("INSERT INTO `%s.%s` SELECT %s FROM `%s.%s`;", datasetID, dstTableName, selector, datasetID, stagingTable) - stmts = append(stmts, insertStmt) - insertMetadataStmt, err := s.connector.createMetadataInsertStatement(partition, flowJobName, startTime) if err != nil { return -1, fmt.Errorf("failed to create metadata insert statement: %v", err) } slog.Info("Performing transaction inside QRep sync function", flowLog) - stmts = append(stmts, insertMetadataStmt) - stmts = append(stmts, "COMMIT TRANSACTION;") - // Execute the statements in a transaction + + stmts := []string{ + "BEGIN TRANSACTION;", + insertStmt, + insertMetadataStmt, + "COMMIT TRANSACTION;", + } _, err = bqClient.Query(strings.Join(stmts, "\n")).Read(s.connector.ctx) if err != nil { return -1, fmt.Errorf("failed to execute statements in a transaction: %v", err) From f1038ba4613edc66850e8b14e5610dc9c00595e2 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Tue, 26 Dec 2023 01:09:10 +0530 Subject: [PATCH 6/9] Errors UI: Acknowledge functionality (#901) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Users can now mark errors as 'Acknowledged'. If there are no unacknowledged errors for a mirror, it's status is said to be 'Active' Screenshot 2023-12-25 at 10 51 03 PM Users can click on the status and be taken to the page above. Screenshot 2023-12-25 at 10 22 34 PM - Loading indicator for acknowledge button - Error toast incase acknowledge operation fails --- ui/app/api/mirrors/alerts/route.ts | 19 +++ ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx | 7 +- .../mirrors/errors/[mirrorName]/ackbutton.tsx | 57 +++++++++ ui/app/mirrors/errors/[mirrorName]/page.tsx | 119 +++++++++++------- ui/app/mirrors/mirror-status.tsx | 41 +++--- ui/app/mirrors/tables.tsx | 2 +- 6 files changed, 174 insertions(+), 71 deletions(-) create mode 100644 ui/app/mirrors/errors/[mirrorName]/ackbutton.tsx diff --git a/ui/app/api/mirrors/alerts/route.ts b/ui/app/api/mirrors/alerts/route.ts index ecb9891cbd..13cc612503 100644 --- a/ui/app/api/mirrors/alerts/route.ts +++ b/ui/app/api/mirrors/alerts/route.ts @@ -19,3 +19,22 @@ export async function POST(request: Request) { } return new Response(JSON.stringify(mirrorStatus)); } + +// We accept a list here in preparation for a Select All feature in UI +export async function PUT(request: Request) { + const { mirrorIDStringList } = await request.json(); + const mirrorIDList: bigint[] = mirrorIDStringList.map((id: string) => + BigInt(id) + ); + const success = await prisma.flow_errors.updateMany({ + where: { + id: { + in: mirrorIDList, + }, + }, + data: { + ack: true, + }, + }); + return new Response(JSON.stringify(success.count)); +} diff --git a/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx b/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx index e7729c487d..738218c64f 100644 --- a/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx @@ -32,10 +32,9 @@ function CdcDetails({ syncs, createdAt, mirrorConfig }: props) {
- +
diff --git a/ui/app/mirrors/errors/[mirrorName]/ackbutton.tsx b/ui/app/mirrors/errors/[mirrorName]/ackbutton.tsx new file mode 100644 index 0000000000..b6d14eed89 --- /dev/null +++ b/ui/app/mirrors/errors/[mirrorName]/ackbutton.tsx @@ -0,0 +1,57 @@ +'use client'; +import { Button } from '@/lib/Button'; +import { Label } from '@/lib/Label'; +import { ProgressCircle } from '@/lib/ProgressCircle'; +import { useState } from 'react'; +import { toast } from 'react-toastify'; + +const notifyErr = (errMsg: string) => { + toast.error(errMsg, { + position: toast.POSITION.BOTTOM_CENTER, + }); +}; + +const AckButton = ({ ack, id }: { ack: boolean; id: number | bigint }) => { + const [loading, setLoading] = useState(false); + const [updated, setUpdated] = useState(false); + // handleAck updates ack to true for the given mirrorID + const handleAck = async (mirrorID: bigint | number) => { + setLoading(true); + const updateResResult = await fetch('/api/mirrors/alerts', { + method: 'PUT', + body: JSON.stringify({ + mirrorIDStringList: [mirrorID.toString()], + }), + }); + const updateRes = await updateResResult.json(); + setLoading(false); + if (!updateRes) { + notifyErr('Something went wrong when trying to acknowledge'); + return; + } + setUpdated(true); + }; + return ( + <> + {ack !== true && updated !== true ? ( + + ) : ( + + )} + + ); +}; + +export default AckButton; diff --git a/ui/app/mirrors/errors/[mirrorName]/page.tsx b/ui/app/mirrors/errors/[mirrorName]/page.tsx index 97efca644a..899b25a496 100644 --- a/ui/app/mirrors/errors/[mirrorName]/page.tsx +++ b/ui/app/mirrors/errors/[mirrorName]/page.tsx @@ -3,6 +3,9 @@ import prisma from '@/app/utils/prisma'; import TimeLabel from '@/components/TimeComponent'; import { Label } from '@/lib/Label'; import { Table, TableCell, TableRow } from '@/lib/Table'; +import { ToastContainer } from 'react-toastify'; +import 'react-toastify/dist/ReactToastify.css'; +import AckButton from './ackbutton'; type MirrorErrorProps = { params: { mirrorName: string }; @@ -14,62 +17,84 @@ const MirrorError = async ({ params: { mirrorName } }: MirrorErrorProps) => { flow_name: mirrorName, error_type: 'error', }, - distinct: ['error_message'], orderBy: { error_timestamp: 'desc', }, }); return ( -
- -
-
- - -
- - Type - - Message - - } + <> +
+ +
+
+ + + +
+ +
+ +
- {mirrorErrors.map((mirrorError) => ( - - - {mirrorError.error_type.toUpperCase()} - - - - - - {mirrorError.error_message} - - - ))} -
+ + Type + + + + Message + + + } + > + {mirrorErrors.map((mirrorError) => ( + + + {mirrorError.error_type.toUpperCase()} + + + + + + {mirrorError.error_message} + + + + + + ))} +
+
-
+ + ); }; diff --git a/ui/app/mirrors/mirror-status.tsx b/ui/app/mirrors/mirror-status.tsx index 27d797e389..b64d87ff23 100644 --- a/ui/app/mirrors/mirror-status.tsx +++ b/ui/app/mirrors/mirror-status.tsx @@ -13,26 +13,21 @@ export const ErrorModal = ({ flowName }: { flowName: string }) => { ); }; -export const MirrorError = ({ - flowName, - detailed, -}: { - flowName: string; - detailed: boolean; -}) => { +export const MirrorError = ({ flowName }: { flowName: string }) => { const [flowStatus, setFlowStatus] = useState(); const [isLoading, setIsLoading] = useState(true); const [error, setError] = useState(null); @@ -81,15 +76,23 @@ export const MirrorError = ({ ); } - if (flowStatus == 'healthy') { - if (detailed) - return ( -
+ if (flowStatus === 'healthy') { + return ( + +
- ); - return ; + + + + ); } return ; diff --git a/ui/app/mirrors/tables.tsx b/ui/app/mirrors/tables.tsx index 106c7cd22d..6c1289befc 100644 --- a/ui/app/mirrors/tables.tsx +++ b/ui/app/mirrors/tables.tsx @@ -90,7 +90,7 @@ export function CDCFlows({ cdcFlows }: { cdcFlows: any }) {
- + Date: Tue, 26 Dec 2023 01:37:18 +0530 Subject: [PATCH 7/9] Fix flow err log and cleanup flowable.go (#887) A few places we were logging peer names instead of flow name for the `peerdb_stats.flow_errors` table. Also some long lines have been split in this PR Co-authored-by: Kaushik Iska --- flow/activities/flowable.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 9591cab251..66a4a2033f 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -72,8 +72,9 @@ func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *prot } defer connectors.CloseConnector(dstConn) + flowName, _ := ctx.Value(shared.FlowNameKey).(string) if err := dstConn.SetupMetadataTables(); err != nil { - a.Alerter.LogFlowError(ctx, config.Name, err) + a.Alerter.LogFlowError(ctx, flowName, err) return fmt.Errorf("failed to setup metadata tables: %w", err) } @@ -112,7 +113,7 @@ func (a *FlowableActivity) EnsurePullability( output, err := srcConn.EnsurePullability(config) if err != nil { - a.Alerter.LogFlowError(ctx, config.PeerConnectionConfig.Name, err) + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return nil, fmt.Errorf("failed to ensure pullability: %w", err) } @@ -169,7 +170,8 @@ func (a *FlowableActivity) CreateNormalizedTable( setupNormalizedTablesOutput, err := conn.SetupNormalizedTables(config) if err != nil { - a.Alerter.LogFlowError(ctx, config.PeerConnectionConfig.Name, err) + flowName, _ := ctx.Value(shared.FlowNameKey).(string) + a.Alerter.LogFlowError(ctx, flowName, err) return nil, fmt.Errorf("failed to setup normalized tables: %w", err) } @@ -580,7 +582,8 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, slog.Error("failed to pull records", slog.Any("error", err)) goroutineErr = err } else { - err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, numRecords) + err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, + a.CatalogPool, runUUID, partition, numRecords) if err != nil { slog.Error(fmt.Sprintf("%v", err)) goroutineErr = err @@ -935,7 +938,8 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, }, } } - updateErr := monitoring.InitializeQRepRun(ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics}) + updateErr := monitoring.InitializeQRepRun( + ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics}) if updateErr != nil { return updateErr } @@ -945,7 +949,8 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, return fmt.Errorf("failed to update start time for partition: %w", err) } - err = monitoring.UpdatePullEndTimeAndRowsForPartition(errCtx, a.CatalogPool, runUUID, partition, int64(numRecords)) + err = monitoring.UpdatePullEndTimeAndRowsForPartition( + errCtx, a.CatalogPool, runUUID, partition, int64(numRecords)) if err != nil { slog.Error(fmt.Sprintf("%v", err)) return err From 93d754a405cd672baa2ca3ea3c9388cb575fbb15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 25 Dec 2023 20:45:46 +0000 Subject: [PATCH 8/9] Fix golangci-lint failures (#902) --- flow/.golangci.yml | 8 ++++++-- flow/activities/slot.go | 2 +- flow/cmd/api.go | 5 ++++- flow/cmd/handler.go | 4 +++- flow/cmd/peer_data.go | 16 ++++++++-------- flow/cmd/snapshot_worker.go | 5 ++++- flow/cmd/worker.go | 5 ++++- flow/connectors/postgres/cdc.go | 4 ++-- flow/connectors/postgres/client.go | 6 +++--- 
.../snowflake/avro_file_writer_test.go | 1 + flow/connectors/utils/ssh.go | 5 +++-- flow/e2e/snowflake/qrep_flow_sf_test.go | 1 + 12 files changed, 40 insertions(+), 22 deletions(-) diff --git a/flow/.golangci.yml b/flow/.golangci.yml index 2c8032f91d..ab612fb893 100644 --- a/flow/.golangci.yml +++ b/flow/.golangci.yml @@ -4,7 +4,6 @@ run: linters: enable: - dogsled - - dupl - gofumpt - gosec - gosimple @@ -18,9 +17,14 @@ linters: - prealloc - staticcheck - ineffassign + - unparam - unused - lll linters-settings: + stylecheck: + checks: + - all + - '-ST1003' lll: - line-length: 120 + line-length: 144 tab-width: 4 diff --git a/flow/activities/slot.go b/flow/activities/slot.go index 117dbecea3..baa0fbc0fa 100644 --- a/flow/activities/slot.go +++ b/flow/activities/slot.go @@ -23,7 +23,7 @@ func (a *FlowableActivity) handleSlotInfo( return err } - if slotInfo == nil || len(slotInfo) == 0 { + if len(slotInfo) == 0 { slog.WarnContext(ctx, "warning: unable to get slot info", slog.Any("slotName", slotName)) return nil } diff --git a/flow/cmd/api.go b/flow/cmd/api.go index 09185a0fc1..b16034bf20 100644 --- a/flow/cmd/api.go +++ b/flow/cmd/api.go @@ -104,7 +104,10 @@ func APIMain(args *APIServerParams) error { } connOptions := client.ConnectionOptions{ - TLS: &tls.Config{Certificates: certs}, + TLS: &tls.Config{ + Certificates: certs, + MinVersion: tls.VersionTLS13, + }, } clientOptions.ConnectionOptions = connOptions } diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 7b03c9de67..dd922ba1f5 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -261,7 +261,9 @@ func (h *FlowRequestHandler) CreateQRepFlow( slog.Any("error", err), slog.String("flowName", cfg.FlowJobName)) return nil, fmt.Errorf("invalid xmin txid for xmin rep: %w", err) } - state.LastPartition.Range = &protos.PartitionRange{Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: txid}}} + state.LastPartition.Range = &protos.PartitionRange{ + Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: txid}}, + } } workflowFn = peerflow.XminFlowWorkflow diff --git a/flow/cmd/peer_data.go b/flow/cmd/peer_data.go index 34f31219ed..f9383d8c5e 100644 --- a/flow/cmd/peer_data.go +++ b/flow/cmd/peer_data.go @@ -31,24 +31,24 @@ func (h *FlowRequestHandler) getPGPeerConfig(ctx context.Context, peerName strin return &pgPeerConfig, nil } -func (h *FlowRequestHandler) getPoolForPGPeer(ctx context.Context, peerName string) (*pgxpool.Pool, string, error) { +func (h *FlowRequestHandler) getPoolForPGPeer(ctx context.Context, peerName string) (*pgxpool.Pool, error) { pgPeerConfig, err := h.getPGPeerConfig(ctx, peerName) if err != nil { - return nil, "", err + return nil, err } connStr := utils.GetPGConnectionString(pgPeerConfig) peerPool, err := pgxpool.New(ctx, connStr) if err != nil { - return nil, "", err + return nil, err } - return peerPool, pgPeerConfig.User, nil + return peerPool, nil } func (h *FlowRequestHandler) GetSchemas( ctx context.Context, req *protos.PostgresPeerActivityInfoRequest, ) (*protos.PeerSchemasResponse, error) { - peerPool, _, err := h.getPoolForPGPeer(ctx, req.PeerName) + peerPool, err := h.getPoolForPGPeer(ctx, req.PeerName) if err != nil { return &protos.PeerSchemasResponse{Schemas: nil}, err } @@ -78,7 +78,7 @@ func (h *FlowRequestHandler) GetTablesInSchema( ctx context.Context, req *protos.SchemaTablesRequest, ) (*protos.SchemaTablesResponse, error) { - peerPool, _, err := h.getPoolForPGPeer(ctx, req.PeerName) + peerPool, err := h.getPoolForPGPeer(ctx, 
req.PeerName) if err != nil { return &protos.SchemaTablesResponse{Tables: nil}, err } @@ -110,7 +110,7 @@ func (h *FlowRequestHandler) GetAllTables( ctx context.Context, req *protos.PostgresPeerActivityInfoRequest, ) (*protos.AllTablesResponse, error) { - peerPool, _, err := h.getPoolForPGPeer(ctx, req.PeerName) + peerPool, err := h.getPoolForPGPeer(ctx, req.PeerName) if err != nil { return &protos.AllTablesResponse{Tables: nil}, err } @@ -140,7 +140,7 @@ func (h *FlowRequestHandler) GetColumns( ctx context.Context, req *protos.TableColumnsRequest, ) (*protos.TableColumnsResponse, error) { - peerPool, _, err := h.getPoolForPGPeer(ctx, req.PeerName) + peerPool, err := h.getPoolForPGPeer(ctx, req.PeerName) if err != nil { return &protos.TableColumnsResponse{Columns: nil}, err } diff --git a/flow/cmd/snapshot_worker.go b/flow/cmd/snapshot_worker.go index 16008cc6a5..c68d44d925 100644 --- a/flow/cmd/snapshot_worker.go +++ b/flow/cmd/snapshot_worker.go @@ -32,7 +32,10 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error { } connOptions := client.ConnectionOptions{ - TLS: &tls.Config{Certificates: certs}, + TLS: &tls.Config{ + Certificates: certs, + MinVersion: tls.VersionTLS13, + }, } clientOptions.ConnectionOptions = connOptions } diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index eea0e9184f..f060230b63 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -100,7 +100,10 @@ func WorkerMain(opts *WorkerOptions) error { return fmt.Errorf("unable to process certificate and key: %w", err) } connOptions := client.ConnectionOptions{ - TLS: &tls.Config{Certificates: certs}, + TLS: &tls.Config{ + Certificates: certs, + MinVersion: tls.VersionTLS13, + }, } clientOptions.ConnectionOptions = connOptions } diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index b3686f4d09..2be3fcb2a5 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -360,7 +360,6 @@ func (p *PostgresCDCSource) consumeStream( p.logger.Debug(fmt.Sprintf("XLogData => WALStart %s ServerWALEnd %s ServerTime %s\n", xld.WALStart, xld.ServerWALEnd, xld.ServerTime)) rec, err := p.processMessage(records, xld, clientXLogPos) - if err != nil { return fmt.Errorf("error processing message: %w", err) } @@ -470,7 +469,8 @@ func (p *PostgresCDCSource) consumeStream( } func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pglogrepl.XLogData, - currentClientXlogPos pglogrepl.LSN) (model.Record, error) { + currentClientXlogPos pglogrepl.LSN, +) (model.Record, error) { logicalMsg, err := pglogrepl.Parse(xld.WALData) if err != nil { return nil, fmt.Errorf("error parsing logical message: %w", err) diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index dc604d5631..e48c71b29d 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -84,9 +84,9 @@ type ReplicaIdentityType rune const ( ReplicaIdentityDefault ReplicaIdentityType = 'd' - ReplicaIdentityFull = 'f' - ReplicaIdentityIndex = 'i' - ReplicaIdentityNothing = 'n' + ReplicaIdentityFull ReplicaIdentityType = 'f' + ReplicaIdentityIndex ReplicaIdentityType = 'i' + ReplicaIdentityNothing ReplicaIdentityType = 'n' ) // getRelIDForTable returns the relation ID for a table. 
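A note on the client.go hunk above: in a Go const block, an explicit type applies only to the constant it is written on, so previously only ReplicaIdentityDefault had type ReplicaIdentityType while the remaining values were untyped rune constants. A small self-contained illustration of the difference, using hypothetical names rather than repository code:

```go
package main

import "fmt"

type ReplicaIdentityType rune

const (
	// Explicitly typed, as all four constants are after this patch.
	ReplicaIdentityDefault ReplicaIdentityType = 'd'
	// Without the annotation this is an untyped rune constant; its
	// default type is rune (int32), not ReplicaIdentityType.
	replicaIdentityFullUntyped = 'f'
)

func main() {
	typed := ReplicaIdentityDefault
	inferred := replicaIdentityFullUntyped
	fmt.Printf("%T\n", typed)    // main.ReplicaIdentityType
	fmt.Printf("%T\n", inferred) // int32
}
```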
diff --git a/flow/connectors/snowflake/avro_file_writer_test.go b/flow/connectors/snowflake/avro_file_writer_test.go index 46f18aaa3f..f08b66a6c8 100644 --- a/flow/connectors/snowflake/avro_file_writer_test.go +++ b/flow/connectors/snowflake/avro_file_writer_test.go @@ -55,6 +55,7 @@ func createQValue(t *testing.T, kind qvalue.QValueKind, placeHolder int) qvalue. } } +// nolint:unparam func generateRecords( t *testing.T, nullable bool, diff --git a/flow/connectors/utils/ssh.go b/flow/connectors/utils/ssh.go index 7bd8ed141f..511eea672a 100644 --- a/flow/connectors/utils/ssh.go +++ b/flow/connectors/utils/ssh.go @@ -41,8 +41,9 @@ func GetSSHClientConfig(user, password, privateKeyString string) (*ssh.ClientCon } return &ssh.ClientConfig{ - User: user, - Auth: authMethods, + User: user, + Auth: authMethods, + //nolint:gosec HostKeyCallback: ssh.InsecureIgnoreHostKey(), }, nil } diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index 3ac7fee713..b3cd9b9c2a 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" ) +// nolint:unparam func (s PeerFlowE2ETestSuiteSF) setupSourceTable(tableName string, numRows int) { err := e2e.CreateTableForQRep(s.pool, s.pgSuffix, tableName) require.NoError(s.t, err) From eb63a7685f85f4162430b9b33bbeffd3ed0de5f5 Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Tue, 26 Dec 2023 18:44:21 +0530 Subject: [PATCH 9/9] added capability for BQ CDC across datasets (#904) 1) Just like Snowflake and Postgres, now BigQuery takes tables in the form of `.`. If dataset is omitted then it defaults to using the dataset specified at the time of peer creation. 2) If the dataset doesn't exist at the time of mirror creation, it is created during `SetupNormalizedTables` before the tables in the dataset. 3) A check has also been added so that two source tables cannot point to the same destination table specified in 2 different formats. --- flow/connectors/bigquery/bigquery.go | 175 ++++++++++-------- .../bigquery/merge_statement_generator.go | 46 ++--- flow/connectors/bigquery/qrep.go | 10 +- flow/connectors/bigquery/qrep_avro_sync.go | 53 +++--- flow/connectors/eventhub/eventhub.go | 2 +- flow/connectors/postgres/cdc.go | 4 +- flow/e2e/bigquery/bigquery_helper.go | 23 ++- flow/e2e/bigquery/peer_flow_bq_test.go | 70 ++++++- flow/model/model.go | 12 +- 9 files changed, 248 insertions(+), 147 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 0a220ef424..3da34f99d7 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -29,9 +29,7 @@ import ( const ( /* Different batch Ids in code/BigQuery - 1. batchID - identifier in raw/staging tables on target to depict which batch a row was inserted. - 2. stagingBatchID - the random batch id we generate before ingesting into staging table. - helps filter rows in the current batch before inserting into raw table. + 1. batchID - identifier in raw table on target to depict which batch a row was inserted. 3. syncBatchID - batch id that was last synced or will be synced 4. normalizeBatchID - batch id that was last normalized or will be normalized. 
*/ @@ -233,8 +231,8 @@ func (c *BigQueryConnector) InitializeTableSchema(req map[string]*protos.TableSc return nil } -func (c *BigQueryConnector) waitForTableReady(tblName string) error { - table := c.client.Dataset(c.datasetID).Table(tblName) +func (c *BigQueryConnector) waitForTableReady(datasetTable *datasetTable) error { + table := c.client.Dataset(datasetTable.dataset).Table(datasetTable.table) maxDuration := 5 * time.Minute deadline := time.Now().Add(maxDuration) sleepInterval := 5 * time.Second @@ -242,7 +240,7 @@ func (c *BigQueryConnector) waitForTableReady(tblName string) error { for { if time.Now().After(deadline) { - return fmt.Errorf("timeout reached while waiting for table %s to be ready", tblName) + return fmt.Errorf("timeout reached while waiting for table %s to be ready", datasetTable) } _, err := table.Metadata(c.ctx) @@ -250,7 +248,8 @@ func (c *BigQueryConnector) waitForTableReady(tblName string) error { return nil } - slog.Info("waiting for table to be ready", slog.String("table", tblName), slog.Int("attempt", attempt)) + slog.Info("waiting for table to be ready", + slog.String("table", datasetTable.table), slog.Int("attempt", attempt)) attempt++ time.Sleep(sleepInterval) } @@ -267,9 +266,10 @@ func (c *BigQueryConnector) ReplayTableSchemaDeltas(flowJobName string, } for _, addedColumn := range schemaDelta.AddedColumns { + dstDatasetTable, _ := c.convertToDatasetTable(schemaDelta.DstTableName) _, err := c.client.Query(fmt.Sprintf( - "ALTER TABLE %s.%s ADD COLUMN IF NOT EXISTS `%s` %s", c.datasetID, - schemaDelta.DstTableName, addedColumn.ColumnName, + "ALTER TABLE %s.%s ADD COLUMN IF NOT EXISTS `%s` %s", dstDatasetTable.dataset, + dstDatasetTable.table, addedColumn.ColumnName, qValueKindToBigQueryType(addedColumn.ColumnType))).Read(c.ctx) if err != nil { return fmt.Errorf("failed to add column %s for table %s: %w", addedColumn.ColumnName, @@ -593,16 +593,11 @@ func (c *BigQueryConnector) syncRecordsViaAvro( var entries [10]qvalue.QValue switch r := record.(type) { case *model.InsertRecord: - itemsJSON, err := r.Items.ToJSON() if err != nil { return nil, fmt.Errorf("failed to create items to json: %v", err) } - entries[3] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: r.DestinationTableName, - } entries[4] = qvalue.QValue{ Kind: qvalue.QValueKindString, Value: itemsJSON, @@ -626,16 +621,11 @@ func (c *BigQueryConnector) syncRecordsViaAvro( if err != nil { return nil, fmt.Errorf("failed to create new items to json: %v", err) } - oldItemsJSON, err := r.OldItems.ToJSON() if err != nil { return nil, fmt.Errorf("failed to create old items to json: %v", err) } - entries[3] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: r.DestinationTableName, - } entries[4] = qvalue.QValue{ Kind: qvalue.QValueKindString, Value: newItemsJSON, @@ -660,10 +650,6 @@ func (c *BigQueryConnector) syncRecordsViaAvro( return nil, fmt.Errorf("failed to create items to json: %v", err) } - entries[3] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: r.DestinationTableName, - } entries[4] = qvalue.QValue{ Kind: qvalue.QValueKindString, Value: itemsJSON, @@ -698,6 +684,10 @@ func (c *BigQueryConnector) syncRecordsViaAvro( Kind: qvalue.QValueKindInt64, Value: time.Now().UnixNano(), } + entries[3] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: record.GetDestinationTableName(), + } entries[7] = qvalue.QValue{ Kind: qvalue.QValueKindInt64, Value: syncBatchID, @@ -787,14 +777,18 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) 
c.datasetID, rawTableName, distinctTableNames)) for _, tableName := range distinctTableNames { + dstDatasetTable, _ := c.convertToDatasetTable(tableName) mergeGen := &mergeStmtGenerator{ - Dataset: c.datasetID, - NormalizedTable: tableName, - RawTable: rawTableName, - NormalizedTableSchema: c.tableNameSchemaMapping[tableName], - SyncBatchID: syncBatchID, - NormalizeBatchID: normalizeBatchID, - UnchangedToastColumns: tableNametoUnchangedToastCols[tableName], + rawDatasetTable: &datasetTable{ + dataset: c.datasetID, + table: rawTableName, + }, + dstTableName: tableName, + dstDatasetTable: dstDatasetTable, + normalizedTableSchema: c.tableNameSchemaMapping[tableName], + syncBatchID: syncBatchID, + normalizeBatchID: normalizeBatchID, + unchangedToastColumns: tableNametoUnchangedToastCols[tableName], peerdbCols: &protos.PeerDBColumns{ SoftDeleteColName: req.SoftDeleteColName, SyncedAtColName: req.SyncedAtColName, @@ -846,19 +840,6 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr {Name: "_peerdb_unchanged_toast_columns", Type: bigquery.StringFieldType}, } - stagingSchema := bigquery.Schema{ - {Name: "_peerdb_uid", Type: bigquery.StringFieldType}, - {Name: "_peerdb_timestamp", Type: bigquery.TimestampFieldType}, - {Name: "_peerdb_timestamp_nanos", Type: bigquery.IntegerFieldType}, - {Name: "_peerdb_destination_table_name", Type: bigquery.StringFieldType}, - {Name: "_peerdb_data", Type: bigquery.StringFieldType}, - {Name: "_peerdb_record_type", Type: bigquery.IntegerFieldType}, - {Name: "_peerdb_match_data", Type: bigquery.StringFieldType}, - {Name: "_peerdb_batch_id", Type: bigquery.IntegerFieldType}, - {Name: "_peerdb_staging_batch_id", Type: bigquery.IntegerFieldType}, - {Name: "_peerdb_unchanged_toast_columns", Type: bigquery.StringFieldType}, - } - // create the table table := c.client.Dataset(c.datasetID).Table(rawTableName) @@ -883,16 +864,6 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr return nil, fmt.Errorf("failed to create table %s.%s: %w", c.datasetID, rawTableName, err) } - // also create a staging table for this raw table - stagingTableName := c.getStagingTableName(req.FlowJobName) - stagingTable := c.client.Dataset(c.datasetID).Table(stagingTableName) - err = stagingTable.Create(c.ctx, &bigquery.TableMetadata{ - Schema: stagingSchema, - }) - if err != nil { - return nil, fmt.Errorf("failed to create table %s.%s: %w", c.datasetID, stagingTableName, err) - } - return &protos.CreateRawTableOutput{ TableIdentifier: rawTableName, }, nil @@ -952,14 +923,41 @@ func (c *BigQueryConnector) SetupNormalizedTables( req *protos.SetupNormalizedTableBatchInput, ) (*protos.SetupNormalizedTableBatchOutput, error) { tableExistsMapping := make(map[string]bool) + datasetTablesSet := make(map[datasetTable]struct{}) for tableIdentifier, tableSchema := range req.TableNameSchemaMapping { - table := c.client.Dataset(c.datasetID).Table(tableIdentifier) + // only place where we check for parsing errors + datasetTable, err := c.convertToDatasetTable(tableIdentifier) + if err != nil { + return nil, err + } + _, ok := datasetTablesSet[*datasetTable] + if ok { + return nil, fmt.Errorf("invalid mirror: two tables mirror to the same BigQuery table %s", + datasetTable.string()) + } + dataset := c.client.Dataset(datasetTable.dataset) + _, err = dataset.Metadata(c.ctx) + // just assume this means dataset don't exist, and create it + if err != nil { + // if err message does not contain `notFound`, then other error happened. 
+ if !strings.Contains(err.Error(), "notFound") { + return nil, fmt.Errorf("error while checking metadata for BigQuery dataset %s: %w", + datasetTable.dataset, err) + } + c.logger.InfoContext(c.ctx, fmt.Sprintf("creating dataset %s...", dataset.DatasetID)) + err = dataset.Create(c.ctx, nil) + if err != nil { + return nil, fmt.Errorf("failed to create BigQuery dataset %s: %w", dataset.DatasetID, err) + } + } + table := dataset.Table(datasetTable.table) // check if the table exists - _, err := table.Metadata(c.ctx) + _, err = table.Metadata(c.ctx) if err == nil { // table exists, go to next table tableExistsMapping[tableIdentifier] = true + datasetTablesSet[*datasetTable] = struct{}{} continue } @@ -999,6 +997,7 @@ func (c *BigQueryConnector) SetupNormalizedTables( } tableExistsMapping[tableIdentifier] = false + datasetTablesSet[*datasetTable] = struct{}{} // log that table was created c.logger.Info(fmt.Sprintf("created table %s", tableIdentifier)) } @@ -1015,10 +1014,6 @@ func (c *BigQueryConnector) SyncFlowCleanup(jobName string) error { if err != nil { return fmt.Errorf("failed to delete raw table: %w", err) } - err = dataset.Table(c.getStagingTableName(jobName)).Delete(c.ctx) - if err != nil { - return fmt.Errorf("failed to delete staging table: %w", err) - } // deleting job from metadata table query := fmt.Sprintf("DELETE FROM %s.%s WHERE mirror_job_name = '%s'", c.datasetID, MirrorJobsTable, jobName) @@ -1036,35 +1031,33 @@ func (c *BigQueryConnector) getRawTableName(flowJobName string) string { return fmt.Sprintf("_peerdb_raw_%s", flowJobName) } -// getStagingTableName returns the staging table name for the given table identifier. -func (c *BigQueryConnector) getStagingTableName(flowJobName string) string { - // replace all non-alphanumeric characters with _ - flowJobName = regexp.MustCompile("[^a-zA-Z0-9]+").ReplaceAllString(flowJobName, "_") - return fmt.Sprintf("_peerdb_staging_%s", flowJobName) -} - func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) { for _, renameRequest := range req.RenameTableOptions { - src := renameRequest.CurrentName - dst := renameRequest.NewName - c.logger.Info(fmt.Sprintf("renaming table '%s' to '%s'...", src, dst)) + srcDatasetTable, _ := c.convertToDatasetTable(renameRequest.CurrentName) + dstDatasetTable, _ := c.convertToDatasetTable(renameRequest.NewName) + c.logger.Info(fmt.Sprintf("renaming table '%s' to '%s'...", srcDatasetTable.string(), + dstDatasetTable.string())) - activity.RecordHeartbeat(c.ctx, fmt.Sprintf("renaming table '%s' to '%s'...", src, dst)) + activity.RecordHeartbeat(c.ctx, fmt.Sprintf("renaming table '%s' to '%s'...", srcDatasetTable.string(), + dstDatasetTable.string())) // drop the dst table if exists - _, err := c.client.Query(fmt.Sprintf("DROP TABLE IF EXISTS %s.%s", c.datasetID, dst)).Run(c.ctx) + _, err := c.client.Query(fmt.Sprintf("DROP TABLE IF EXISTS %s.%s", + dstDatasetTable.dataset, dstDatasetTable.table)).Run(c.ctx) if err != nil { - return nil, fmt.Errorf("unable to drop table %s: %w", dst, err) + return nil, fmt.Errorf("unable to drop table %s: %w", dstDatasetTable.string(), err) } // rename the src table to dst _, err = c.client.Query(fmt.Sprintf("ALTER TABLE %s.%s RENAME TO %s", - c.datasetID, src, dst)).Run(c.ctx) + srcDatasetTable.dataset, srcDatasetTable.table, dstDatasetTable.table)).Run(c.ctx) if err != nil { - return nil, fmt.Errorf("unable to rename table %s to %s: %w", src, dst, err) + return nil, fmt.Errorf("unable to rename table %s to %s: %w", 
srcDatasetTable.string(), + dstDatasetTable.string(), err) } - c.logger.Info(fmt.Sprintf("successfully renamed table '%s' to '%s'", src, dst)) + c.logger.Info(fmt.Sprintf("successfully renamed table '%s' to '%s'", srcDatasetTable.string(), + dstDatasetTable.string())) } return &protos.RenameTablesOutput{ @@ -1076,13 +1069,15 @@ func (c *BigQueryConnector) CreateTablesFromExisting(req *protos.CreateTablesFro *protos.CreateTablesFromExistingOutput, error, ) { for newTable, existingTable := range req.NewToExistingTableMapping { + newDatasetTable, _ := c.convertToDatasetTable(newTable) + existingDatasetTable, _ := c.convertToDatasetTable(existingTable) c.logger.Info(fmt.Sprintf("creating table '%s' similar to '%s'", newTable, existingTable)) activity.RecordHeartbeat(c.ctx, fmt.Sprintf("creating table '%s' similar to '%s'", newTable, existingTable)) // rename the src table to dst - _, err := c.client.Query(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s.%s LIKE %s.%s", - c.datasetID, newTable, c.datasetID, existingTable)).Run(c.ctx) + _, err := c.client.Query(fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s` LIKE `%s`", + newDatasetTable.string(), existingDatasetTable.string())).Run(c.ctx) if err != nil { return nil, fmt.Errorf("unable to create table %s: %w", newTable, err) } @@ -1094,3 +1089,29 @@ func (c *BigQueryConnector) CreateTablesFromExisting(req *protos.CreateTablesFro FlowJobName: req.FlowJobName, }, nil } + +type datasetTable struct { + dataset string + table string +} + +func (d *datasetTable) string() string { + return fmt.Sprintf("%s.%s", d.dataset, d.table) +} + +func (c *BigQueryConnector) convertToDatasetTable(tableName string) (*datasetTable, error) { + parts := strings.Split(tableName, ".") + if len(parts) == 1 { + return &datasetTable{ + dataset: c.datasetID, + table: parts[0], + }, nil + } else if len(parts) == 2 { + return &datasetTable{ + dataset: parts[0], + table: parts[1], + }, nil + } else { + return nil, fmt.Errorf("invalid BigQuery table name: %s", tableName) + } +} diff --git a/flow/connectors/bigquery/merge_statement_generator.go b/flow/connectors/bigquery/merge_statement_generator.go index 22f876b8c3..e9a71b06cd 100644 --- a/flow/connectors/bigquery/merge_statement_generator.go +++ b/flow/connectors/bigquery/merge_statement_generator.go @@ -11,20 +11,20 @@ import ( ) type mergeStmtGenerator struct { - // dataset of all the tables - Dataset string - // the table to merge into - NormalizedTable string - // the table where the data is currently staged. - RawTable string + // dataset + raw table + rawDatasetTable *datasetTable + // destination table name, used to retrieve records from raw table + dstTableName string + // dataset + destination table + dstDatasetTable *datasetTable // last synced batchID. - SyncBatchID int64 + syncBatchID int64 // last normalized batchID. - NormalizeBatchID int64 + normalizeBatchID int64 // the schema of the table to merge into - NormalizedTableSchema *protos.TableSchema + normalizedTableSchema *protos.TableSchema // array of toast column combinations that are unchanged - UnchangedToastColumns []string + unchangedToastColumns []string // _PEERDB_IS_DELETED and _SYNCED_AT columns peerdbCols *protos.PeerDBColumns } @@ -34,7 +34,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string { // for each column in the normalized table, generate CAST + JSON_EXTRACT_SCALAR // statement. 
flattenedProjs := make([]string, 0) - for colName, colType := range m.NormalizedTableSchema.Columns { + for colName, colType := range m.normalizedTableSchema.Columns { bqType := qValueKindToBigQueryType(colType) // CAST doesn't work for FLOAT, so rewrite it to FLOAT64. if bqType == bigquery.FloatFieldType { @@ -87,10 +87,10 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string { // normalize anything between last normalized batch id to last sync batchid return fmt.Sprintf(`WITH _peerdb_flattened AS - (SELECT %s FROM %s.%s WHERE _peerdb_batch_id > %d and _peerdb_batch_id <= %d and + (SELECT %s FROM %s WHERE _peerdb_batch_id > %d and _peerdb_batch_id <= %d and _peerdb_destination_table_name='%s')`, - strings.Join(flattenedProjs, ", "), m.Dataset, m.RawTable, m.NormalizeBatchID, - m.SyncBatchID, m.NormalizedTable) + strings.Join(flattenedProjs, ", "), m.rawDatasetTable.string(), m.normalizeBatchID, + m.syncBatchID, m.dstTableName) } // generateDeDupedCTE generates a de-duped CTE. @@ -104,7 +104,7 @@ func (m *mergeStmtGenerator) generateDeDupedCTE() string { ) _peerdb_ranked WHERE _peerdb_rank = 1 ) SELECT * FROM _peerdb_de_duplicated_data_res` - pkeyColsStr := fmt.Sprintf("(CONCAT(%s))", strings.Join(m.NormalizedTableSchema.PrimaryKeyColumns, + pkeyColsStr := fmt.Sprintf("(CONCAT(%s))", strings.Join(m.normalizedTableSchema.PrimaryKeyColumns, ", '_peerdb_concat_', ")) return fmt.Sprintf(cte, pkeyColsStr) } @@ -112,9 +112,9 @@ func (m *mergeStmtGenerator) generateDeDupedCTE() string { // generateMergeStmt generates a merge statement. func (m *mergeStmtGenerator) generateMergeStmt() string { // comma separated list of column names - backtickColNames := make([]string, 0, len(m.NormalizedTableSchema.Columns)) - pureColNames := make([]string, 0, len(m.NormalizedTableSchema.Columns)) - for colName := range m.NormalizedTableSchema.Columns { + backtickColNames := make([]string, 0, len(m.normalizedTableSchema.Columns)) + pureColNames := make([]string, 0, len(m.normalizedTableSchema.Columns)) + for colName := range m.normalizedTableSchema.Columns { backtickColNames = append(backtickColNames, fmt.Sprintf("`%s`", colName)) pureColNames = append(pureColNames, colName) } @@ -123,7 +123,7 @@ func (m *mergeStmtGenerator) generateMergeStmt() string { insertValuesSQL := csep + ",CURRENT_TIMESTAMP" updateStatementsforToastCols := m.generateUpdateStatements(pureColNames, - m.UnchangedToastColumns, m.peerdbCols) + m.unchangedToastColumns, m.peerdbCols) if m.peerdbCols.SoftDelete { softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(", `%s`", m.peerdbCols.SoftDeleteColName) softDeleteInsertValuesSQL := insertValuesSQL + ", TRUE" @@ -134,8 +134,8 @@ func (m *mergeStmtGenerator) generateMergeStmt() string { } updateStringToastCols := strings.Join(updateStatementsforToastCols, " ") - pkeySelectSQLArray := make([]string, 0, len(m.NormalizedTableSchema.PrimaryKeyColumns)) - for _, pkeyColName := range m.NormalizedTableSchema.PrimaryKeyColumns { + pkeySelectSQLArray := make([]string, 0, len(m.normalizedTableSchema.PrimaryKeyColumns)) + for _, pkeyColName := range m.normalizedTableSchema.PrimaryKeyColumns { pkeySelectSQLArray = append(pkeySelectSQLArray, fmt.Sprintf("_peerdb_target.%s = _peerdb_deduped.%s", pkeyColName, pkeyColName)) } @@ -153,14 +153,14 @@ func (m *mergeStmtGenerator) generateMergeStmt() string { } return fmt.Sprintf(` - MERGE %s.%s _peerdb_target USING (%s,%s) _peerdb_deduped + MERGE %s _peerdb_target USING (%s,%s) _peerdb_deduped ON %s WHEN NOT MATCHED and 
(_peerdb_deduped._peerdb_record_type != 2) THEN INSERT (%s) VALUES (%s) %s WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type = 2) THEN %s; - `, m.Dataset, m.NormalizedTable, m.generateFlattenedCTE(), m.generateDeDupedCTE(), + `, m.dstDatasetTable.string(), m.generateFlattenedCTE(), m.generateDeDupedCTE(), pkeySelectSQL, insertColumnsSQL, insertValuesSQL, updateStringToastCols, deletePart) } diff --git a/flow/connectors/bigquery/qrep.go b/flow/connectors/bigquery/qrep.go index df771e50a2..305bab01eb 100644 --- a/flow/connectors/bigquery/qrep.go +++ b/flow/connectors/bigquery/qrep.go @@ -45,7 +45,7 @@ func (c *BigQueryConnector) SyncQRepRecords( " partition %s of destination table %s", partition.PartitionId, destTable)) - avroSync := &QRepAvroSyncMethod{connector: c, gcsBucket: config.StagingPath} + avroSync := NewQRepAvroSyncMethod(c, config.StagingPath, config.FlowJobName) return avroSync.SyncQRepRecords(config.FlowJobName, destTable, partition, tblMetadata, stream, config.SyncedAtColName, config.SoftDeleteColName) } @@ -53,11 +53,11 @@ func (c *BigQueryConnector) SyncQRepRecords( func (c *BigQueryConnector) replayTableSchemaDeltasQRep(config *protos.QRepConfig, partition *protos.QRepPartition, srcSchema *model.QRecordSchema, ) (*bigquery.TableMetadata, error) { - destTable := config.DestinationTableIdentifier - bqTable := c.client.Dataset(c.datasetID).Table(destTable) + destDatasetTable, _ := c.convertToDatasetTable(config.DestinationTableIdentifier) + bqTable := c.client.Dataset(destDatasetTable.dataset).Table(destDatasetTable.table) dstTableMetadata, err := bqTable.Metadata(c.ctx) if err != nil { - return nil, fmt.Errorf("failed to get metadata of table %s: %w", destTable, err) + return nil, fmt.Errorf("failed to get metadata of table %s: %w", destDatasetTable, err) } tableSchemaDelta := &protos.TableSchemaDelta{ @@ -92,7 +92,7 @@ func (c *BigQueryConnector) replayTableSchemaDeltasQRep(config *protos.QRepConfi } dstTableMetadata, err = bqTable.Metadata(c.ctx) if err != nil { - return nil, fmt.Errorf("failed to get metadata of table %s: %w", destTable, err) + return nil, fmt.Errorf("failed to get metadata of table %s: %w", destDatasetTable, err) } return dstTableMetadata, nil } diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index 7ed87b0c06..8e600d5279 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -35,7 +35,7 @@ func NewQRepAvroSyncMethod(connector *BigQueryConnector, gcsBucket string, } func (s *QRepAvroSyncMethod) SyncRecords( - dstTableName string, + rawTableName string, flowJobName string, lastCP int64, dstTableMetadata *bigquery.TableMetadata, @@ -45,16 +45,20 @@ func (s *QRepAvroSyncMethod) SyncRecords( activity.RecordHeartbeat(s.connector.ctx, time.Minute, fmt.Sprintf("Flow job %s: Obtaining Avro schema"+ " for destination table %s and sync batch ID %d", - flowJobName, dstTableName, syncBatchID), + flowJobName, rawTableName, syncBatchID), ) // You will need to define your Avro schema as a string - avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata, "", "") + avroSchema, err := DefineAvroSchema(rawTableName, dstTableMetadata, "", "") if err != nil { return 0, fmt.Errorf("failed to define Avro schema: %w", err) } - stagingTable := fmt.Sprintf("%s_%s_staging", dstTableName, fmt.Sprint(syncBatchID)) - numRecords, err := s.writeToStage(fmt.Sprint(syncBatchID), dstTableName, avroSchema, stagingTable, stream) + stagingTable := 
fmt.Sprintf("%s_%s_staging", rawTableName, fmt.Sprint(syncBatchID)) + numRecords, err := s.writeToStage(fmt.Sprint(syncBatchID), rawTableName, avroSchema, + &datasetTable{ + dataset: s.connector.datasetID, + table: stagingTable, + }, stream) if err != nil { return -1, fmt.Errorf("failed to push to avro stage: %v", err) } @@ -62,7 +66,7 @@ func (s *QRepAvroSyncMethod) SyncRecords( bqClient := s.connector.client datasetID := s.connector.datasetID insertStmt := fmt.Sprintf("INSERT INTO `%s.%s` SELECT * FROM `%s.%s`;", - datasetID, dstTableName, datasetID, stagingTable) + datasetID, rawTableName, datasetID, stagingTable) updateMetadataStmt, err := s.connector.getUpdateMetadataStmt(flowJobName, lastCP, syncBatchID) if err != nil { return -1, fmt.Errorf("failed to update metadata: %v", err) @@ -71,7 +75,7 @@ func (s *QRepAvroSyncMethod) SyncRecords( activity.RecordHeartbeat(s.connector.ctx, time.Minute, fmt.Sprintf("Flow job %s: performing insert and update transaction"+ " for destination table %s and sync batch ID %d", - flowJobName, dstTableName, syncBatchID), + flowJobName, rawTableName, syncBatchID), ) stmts := []string{ @@ -91,12 +95,12 @@ func (s *QRepAvroSyncMethod) SyncRecords( slog.Error("failed to delete staging table "+stagingTable, slog.Any("error", err), slog.String("syncBatchID", fmt.Sprint(syncBatchID)), - slog.String("destinationTable", dstTableName)) + slog.String("destinationTable", rawTableName)) } - slog.Info(fmt.Sprintf("loaded stage into %s.%s", datasetID, dstTableName), + slog.Info(fmt.Sprintf("loaded stage into %s.%s", datasetID, rawTableName), slog.String(string(shared.FlowNameKey), flowJobName), - slog.String("dstTableName", dstTableName)) + slog.String("dstTableName", rawTableName)) return numRecords, nil } @@ -124,8 +128,14 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( slog.Info("Obtained Avro schema for destination table", flowLog) slog.Info(fmt.Sprintf("Avro schema: %v\n", avroSchema), flowLog) // create a staging table name with partitionID replace hyphens with underscores - stagingTable := fmt.Sprintf("%s_%s_staging", dstTableName, strings.ReplaceAll(partition.PartitionId, "-", "_")) - numRecords, err := s.writeToStage(partition.PartitionId, flowJobName, avroSchema, stagingTable, stream) + dstDatasetTable, _ := s.connector.convertToDatasetTable(dstTableName) + stagingDatasetTable := &datasetTable{ + dataset: dstDatasetTable.dataset, + table: fmt.Sprintf("%s_%s_staging", dstDatasetTable.table, + strings.ReplaceAll(partition.PartitionId, "-", "_")), + } + numRecords, err := s.writeToStage(partition.PartitionId, flowJobName, avroSchema, + stagingDatasetTable, stream) if err != nil { return -1, fmt.Errorf("failed to push to avro stage: %v", err) } @@ -135,7 +145,6 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( flowJobName, dstTableName, partition.PartitionId), ) bqClient := s.connector.client - datasetID := s.connector.datasetID selector := "*" if softDeleteCol != "" { // PeerDB column @@ -145,8 +154,8 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( selector += ", CURRENT_TIMESTAMP" } // Insert the records from the staging table into the destination table - insertStmt := fmt.Sprintf("INSERT INTO `%s.%s` SELECT %s FROM `%s.%s`;", - datasetID, dstTableName, selector, datasetID, stagingTable) + insertStmt := fmt.Sprintf("INSERT INTO `%s` SELECT %s FROM `%s`;", + dstDatasetTable.string(), selector, stagingDatasetTable.string()) insertMetadataStmt, err := s.connector.createMetadataInsertStatement(partition, flowJobName, startTime) if err != nil { @@ -166,14 +175,15 
@@ func (s *QRepAvroSyncMethod) SyncQRepRecords( } // drop the staging table - if err := bqClient.Dataset(datasetID).Table(stagingTable).Delete(s.connector.ctx); err != nil { + if err := bqClient.Dataset(stagingDatasetTable.dataset). + Table(stagingDatasetTable.table).Delete(s.connector.ctx); err != nil { // just log the error this isn't fatal. - slog.Error("failed to delete staging table "+stagingTable, + slog.Error("failed to delete staging table "+stagingDatasetTable.string(), slog.Any("error", err), flowLog) } - slog.Info(fmt.Sprintf("loaded stage into %s.%s", datasetID, dstTableName), flowLog) + slog.Info(fmt.Sprintf("loaded stage into %s", dstDatasetTable.string()), flowLog) return numRecords, nil } @@ -323,7 +333,7 @@ func (s *QRepAvroSyncMethod) writeToStage( syncID string, objectFolder string, avroSchema *model.QRecordAvroSchemaDefinition, - stagingTable string, + stagingTable *datasetTable, stream *model.QRecordStream, ) (int, error) { shutdown := utils.HeartbeatRoutine(s.connector.ctx, time.Minute, @@ -379,7 +389,6 @@ func (s *QRepAvroSyncMethod) writeToStage( slog.Info(fmt.Sprintf("wrote %d records", avroFile.NumRecords), idLog) bqClient := s.connector.client - datasetID := s.connector.datasetID var avroRef bigquery.LoadSource if s.gcsBucket != "" { gcsRef := bigquery.NewGCSReference(fmt.Sprintf("gs://%s/%s", s.gcsBucket, avroFile.FilePath)) @@ -396,7 +405,7 @@ func (s *QRepAvroSyncMethod) writeToStage( avroRef = localRef } - loader := bqClient.Dataset(datasetID).Table(stagingTable).LoaderFrom(avroRef) + loader := bqClient.Dataset(stagingTable.dataset).Table(stagingTable.table).LoaderFrom(avroRef) loader.UseAvroLogicalTypes = true loader.WriteDisposition = bigquery.WriteTruncate job, err := loader.Run(s.connector.ctx) @@ -412,7 +421,7 @@ func (s *QRepAvroSyncMethod) writeToStage( if err := status.Err(); err != nil { return 0, fmt.Errorf("failed to load Avro file into BigQuery table: %w", err) } - slog.Info(fmt.Sprintf("Pushed into %s/%s", avroFile.FilePath, syncID)) + slog.Info(fmt.Sprintf("Pushed into %s", avroFile.FilePath)) err = s.connector.waitForTableReady(stagingTable) if err != nil { diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 05347a4263..c8ba3dad41 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -164,7 +164,7 @@ func (c *EventHubConnector) processBatch( return 0, err } - topicName, err := NewScopedEventhub(record.GetTableName()) + topicName, err := NewScopedEventhub(record.GetDestinationTableName()) if err != nil { c.logger.Error("failed to get topic name", slog.Any("error", err)) return 0, err diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 2be3fcb2a5..4c5693f292 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -365,7 +365,7 @@ func (p *PostgresCDCSource) consumeStream( } if rec != nil { - tableName := rec.GetTableName() + tableName := rec.GetDestinationTableName() switch r := rec.(type) { case *model.UpdateRecord: // tableName here is destination tableName. 
@@ -843,7 +843,7 @@ func (p *PostgresCDCSource) processRelationMessage( func (p *PostgresCDCSource) recToTablePKey(req *model.PullRecordsRequest, rec model.Record, ) (*model.TableWithPkey, error) { - tableName := rec.GetTableName() + tableName := rec.GetDestinationTableName() pkeyColsMerged := make([]byte, 0) for _, pkeyCol := range req.TableNameSchemaMapping[tableName].PrimaryKeyColumns { diff --git a/flow/e2e/bigquery/bigquery_helper.go b/flow/e2e/bigquery/bigquery_helper.go index fb9dadb9ba..21bd3b5c75 100644 --- a/flow/e2e/bigquery/bigquery_helper.go +++ b/flow/e2e/bigquery/bigquery_helper.go @@ -94,12 +94,11 @@ func generateBQPeer(bigQueryConfig *protos.BigqueryConfig) *protos.Peer { } // datasetExists checks if the dataset exists. -func (b *BigQueryTestHelper) datasetExists() (bool, error) { - dataset := b.client.Dataset(b.Config.DatasetId) +func (b *BigQueryTestHelper) datasetExists(datasetName string) (bool, error) { + dataset := b.client.Dataset(datasetName) meta, err := dataset.Metadata(context.Background()) if err != nil { // if err message contains `notFound` then dataset does not exist. - // first we cast the error to a bigquery.Error if strings.Contains(err.Error(), "notFound") { fmt.Printf("dataset %s does not exist\n", b.Config.DatasetId) return false, nil @@ -117,12 +116,12 @@ func (b *BigQueryTestHelper) datasetExists() (bool, error) { // RecreateDataset recreates the dataset, i.e, deletes it if exists and creates it again. func (b *BigQueryTestHelper) RecreateDataset() error { - exists, err := b.datasetExists() + exists, err := b.datasetExists(b.datasetName) if err != nil { return fmt.Errorf("failed to check if dataset %s exists: %w", b.Config.DatasetId, err) } - dataset := b.client.Dataset(b.Config.DatasetId) + dataset := b.client.Dataset(b.datasetName) if exists { err := dataset.DeleteWithContents(context.Background()) if err != nil { @@ -135,13 +134,13 @@ func (b *BigQueryTestHelper) RecreateDataset() error { return fmt.Errorf("failed to create dataset: %w", err) } - fmt.Printf("created dataset %s successfully\n", b.Config.DatasetId) + fmt.Printf("created dataset %s successfully\n", b.datasetName) return nil } // DropDataset drops the dataset. -func (b *BigQueryTestHelper) DropDataset() error { - exists, err := b.datasetExists() +func (b *BigQueryTestHelper) DropDataset(datasetName string) error { + exists, err := b.datasetExists(datasetName) if err != nil { return fmt.Errorf("failed to check if dataset %s exists: %w", b.Config.DatasetId, err) } @@ -150,7 +149,7 @@ func (b *BigQueryTestHelper) DropDataset() error { return nil } - dataset := b.client.Dataset(b.Config.DatasetId) + dataset := b.client.Dataset(datasetName) err = dataset.DeleteWithContents(context.Background()) if err != nil { return fmt.Errorf("failed to delete dataset: %w", err) @@ -171,7 +170,11 @@ func (b *BigQueryTestHelper) RunCommand(command string) error { // countRows(tableName) returns the number of rows in the given table. 
func (b *BigQueryTestHelper) countRows(tableName string) (int, error) { - command := fmt.Sprintf("SELECT COUNT(*) FROM `%s.%s`", b.Config.DatasetId, tableName) + return b.countRowsWithDataset(b.datasetName, tableName) +} + +func (b *BigQueryTestHelper) countRowsWithDataset(dataset, tableName string) (int, error) { + command := fmt.Sprintf("SELECT COUNT(*) FROM `%s.%s`", dataset, tableName) it, err := b.client.Query(command).Read(context.Background()) if err != nil { return 0, fmt.Errorf("failed to run command: %w", err) diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index b28577f4d3..c76688f79b 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -150,7 +150,7 @@ func (s PeerFlowE2ETestSuiteBQ) tearDownSuite() { s.FailNow() } - err = s.bqHelper.DropDataset() + err = s.bqHelper.DropDataset(s.bqHelper.datasetName) if err != nil { slog.Error("failed to tear down bigquery", slog.Any("error", err)) s.FailNow() @@ -1203,3 +1203,71 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { env.AssertExpectations(s.t) } + +func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) + + srcTable1Name := s.attachSchemaSuffix("test1_bq") + dstTable1Name := "test1_bq" + secondDataset := fmt.Sprintf("%s_2", s.bqHelper.datasetName) + srcTable2Name := s.attachSchemaSuffix("test2_bq") + dstTable2Name := "test2_bq" + + _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE %s(id serial primary key, c1 int, c2 text); + CREATE TABLE %s(id serial primary key, c1 int, c2 text); + `, srcTable1Name, srcTable2Name)) + require.NoError(s.t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("test_multi_table_multi_dataset_bq"), + TableNameMapping: map[string]string{ + srcTable1Name: dstTable1Name, + srcTable2Name: fmt.Sprintf("%s.%s", secondDataset, dstTable2Name), + }, + PostgresPort: e2e.PostgresPort, + Destination: s.bqHelper.Peer, + CdcStagingPath: "", + } + + flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() + require.NoError(s.t, err) + + limits := peerflow.CDCFlowLimits{ + ExitAfterRecords: 2, + MaxBatchSize: 100, + } + + // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup + // and execute a transaction touching toast columns + go func() { + e2e.SetupCDCFlowStatusQuery(env, connectionGen) + /* inserting across multiple tables*/ + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); + INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1'); + `, srcTable1Name, srcTable2Name)) + require.NoError(s.t, err) + fmt.Println("Executed an insert on two tables") + }() + + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + + // Verify workflow completes without error + require.True(s.t, env.IsWorkflowCompleted()) + err = env.GetWorkflowError() + + count1, err := s.bqHelper.countRows(dstTable1Name) + require.NoError(s.t, err) + count2, err := s.bqHelper.countRowsWithDataset(secondDataset, dstTable2Name) + require.NoError(s.t, err) + + s.Equal(1, count1) + s.Equal(1, count2) + + err = s.bqHelper.DropDataset(secondDataset) + require.NoError(s.t, err) + + env.AssertExpectations(s.t) +} diff --git a/flow/model/model.go b/flow/model/model.go index 581b57178b..fc2c12d849 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -58,7 +58,7 @@ type Record 
interface { // GetCheckPointID returns the ID of the record. GetCheckPointID() int64 // get table name - GetTableName() string + GetDestinationTableName() string // get columns and values for the record GetItems() *RecordItems } @@ -244,7 +244,7 @@ func (r *InsertRecord) GetCheckPointID() int64 { return r.CheckPointID } -func (r *InsertRecord) GetTableName() string { +func (r *InsertRecord) GetDestinationTableName() string { return r.DestinationTableName } @@ -273,7 +273,7 @@ func (r *UpdateRecord) GetCheckPointID() int64 { } // Implement Record interface for UpdateRecord. -func (r *UpdateRecord) GetTableName() string { +func (r *UpdateRecord) GetDestinationTableName() string { return r.DestinationTableName } @@ -299,7 +299,7 @@ func (r *DeleteRecord) GetCheckPointID() int64 { return r.CheckPointID } -func (r *DeleteRecord) GetTableName() string { +func (r *DeleteRecord) GetDestinationTableName() string { return r.DestinationTableName } @@ -470,8 +470,8 @@ func (r *RelationRecord) GetCheckPointID() int64 { return r.CheckPointID } -func (r *RelationRecord) GetTableName() string { - return r.TableSchemaDelta.SrcTableName +func (r *RelationRecord) GetDestinationTableName() string { + return r.TableSchemaDelta.DstTableName } func (r *RelationRecord) GetItems() *RecordItems {
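
As a rough illustration of the behaviour PATCH 9/9 describes above — destination tables written as `dataset.table`, falling back to the peer's default dataset when only a table name is given, and rejecting mirrors where two source tables resolve to the same BigQuery table — the following is a minimal, self-contained Go sketch. The helper name `parseDatasetTable`, the default dataset `peerdb_dataset`, and the sample destinations are hypothetical stand-ins that mirror the patch's `convertToDatasetTable` and `SetupNormalizedTables` logic; they are not the connector code itself.

package main

import (
	"fmt"
	"strings"
)

// datasetTable mirrors the dataset-qualified identifier the patch introduces:
// a destination is either "table" (peer's default dataset) or "dataset.table".
type datasetTable struct {
	dataset string
	table   string
}

func (d datasetTable) String() string {
	return d.dataset + "." + d.table
}

// parseDatasetTable is a hypothetical stand-in for the patch's convertToDatasetTable:
// no dot means the default dataset, one dot means an explicit dataset, and anything
// else is an invalid BigQuery table name.
func parseDatasetTable(defaultDataset, name string) (datasetTable, error) {
	parts := strings.Split(name, ".")
	switch len(parts) {
	case 1:
		return datasetTable{dataset: defaultDataset, table: parts[0]}, nil
	case 2:
		return datasetTable{dataset: parts[0], table: parts[1]}, nil
	default:
		return datasetTable{}, fmt.Errorf("invalid BigQuery table name: %s", name)
	}
}

func main() {
	defaultDataset := "peerdb_dataset" // stand-in for the dataset chosen at peer creation

	// Destinations as a user might spell them in a table mapping; the last one
	// is the same table as the first, written in qualified form.
	destinations := []string{"test1_bq", "peerdb_dataset_2.test2_bq", "peerdb_dataset.test1_bq"}

	// Like the duplicate check added to SetupNormalizedTables, track resolved
	// destinations so two source tables cannot map to the same BigQuery table.
	seen := make(map[datasetTable]struct{})
	for _, dst := range destinations {
		dt, err := parseDatasetTable(defaultDataset, dst)
		if err != nil {
			fmt.Println("rejected:", err)
			continue
		}
		if _, dup := seen[dt]; dup {
			fmt.Printf("invalid mirror: %q resolves to already-used table %s\n", dst, dt)
			continue
		}
		seen[dt] = struct{}{}
		fmt.Printf("%q -> %s\n", dst, dt)
	}
}

Run as written, the sketch resolves "test1_bq" to peerdb_dataset.test1_bq, accepts the explicitly qualified peerdb_dataset_2.test2_bq, and flags "peerdb_dataset.test1_bq" as a duplicate of the unqualified first entry — the same two-spellings collision the patch guards against.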