diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 4cdf44df8e..d9a6e0a2c5 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -500,6 +500,13 @@ func (h *FlowRequestHandler) CreatePeer( } sfConfig := sfConfigObject.SnowflakeConfig encodedConfig, encodingErr = proto.Marshal(sfConfig) + case protos.DBType_BIGQUERY: + bqConfigObject, ok := config.(*protos.Peer_BigqueryConfig) + if !ok { + return wrongConfigResponse, nil + } + bqConfig := bqConfigObject.BigqueryConfig + encodedConfig, encodingErr = proto.Marshal(bqConfig) case protos.DBType_SQLSERVER: sqlServerConfigObject, ok := config.(*protos.Peer_SqlserverConfig) if !ok { diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 7383bac2a6..1a3b7fab6d 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -165,6 +165,13 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (* return nil, fmt.Errorf("failed to create BigQuery client: %v", err) } + datasetID := config.GetDatasetId() + _, checkErr := client.Dataset(datasetID).Metadata(ctx) + if checkErr != nil { + log.Errorf("failed to get dataset metadata: %v", checkErr) + return nil, fmt.Errorf("failed to get dataset metadata: %v", checkErr) + } + storageClient, err := bqsa.CreateStorageClient(ctx) if err != nil { return nil, fmt.Errorf("failed to create Storage client: %v", err) @@ -174,7 +181,7 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (* ctx: ctx, bqConfig: config, client: client, - datasetID: config.GetDatasetId(), + datasetID: datasetID, storageClient: storageClient, }, nil } diff --git a/ui/app/api/peers/route.ts b/ui/app/api/peers/route.ts index dc1292c98f..b1d1359e19 100644 --- a/ui/app/api/peers/route.ts +++ b/ui/app/api/peers/route.ts @@ -46,6 +46,12 @@ const constructPeer = ( type: DBType.SNOWFLAKE, snowflakeConfig: config as SnowflakeConfig, }; + case 'BIGQUERY': + return { + name, + type: DBType.BIGQUERY, + bigqueryConfig: config as BigqueryConfig, + }; default: return; } @@ -76,6 +82,7 @@ export async function POST(request: Request) { return new Response(JSON.stringify(response)); } else if (mode === 'create') { const req: CreatePeerRequest = { peer }; + console.log('/peer/create req:', req); const createStatus: CreatePeerResponse = await fetch( `${flowServiceAddr}/v1/peers/create`, { diff --git a/ui/app/dto/PeersDTO.ts b/ui/app/dto/PeersDTO.ts index 271d598226..9954aceaa7 100644 --- a/ui/app/dto/PeersDTO.ts +++ b/ui/app/dto/PeersDTO.ts @@ -1,4 +1,8 @@ -import { PostgresConfig, SnowflakeConfig } from '@/grpc_generated/peers'; +import { + BigqueryConfig, + PostgresConfig, + SnowflakeConfig, +} from '@/grpc_generated/peers'; export type UValidatePeerResponse = { valid: boolean; @@ -27,7 +31,7 @@ export type UDropPeerResponse = { errorMessage: string; }; -export type PeerConfig = PostgresConfig | SnowflakeConfig; +export type PeerConfig = PostgresConfig | SnowflakeConfig | BigqueryConfig; export type CatalogPeer = { id: number; name: string; diff --git a/ui/app/mirrors/create/cdc.tsx b/ui/app/mirrors/create/cdc.tsx index 226167b2d6..98311238da 100644 --- a/ui/app/mirrors/create/cdc.tsx +++ b/ui/app/mirrors/create/cdc.tsx @@ -1,13 +1,13 @@ 'use client'; import { RequiredIndicator } from '@/components/RequiredIndicator'; import { QRepSyncMode } from '@/grpc_generated/flow'; -import { DBType, Peer } from '@/grpc_generated/peers'; +import { DBType, dBTypeToJSON } from '@/grpc_generated/peers'; import { Label } from '@/lib/Label'; import { RowWithSelect, RowWithSwitch, RowWithTextField } from '@/lib/Layout'; import { Select, SelectItem } from '@/lib/Select'; import { Switch } from '@/lib/Switch'; import { TextField } from '@/lib/TextField'; -import { Dispatch, SetStateAction } from 'react'; +import { Dispatch, SetStateAction, useEffect } from 'react'; import { InfoPopover } from '../../../components/InfoPopover'; import { CDCConfig, MirrorSetter, TableMapRow } from '../../dto/MirrorsDTO'; import { MirrorSetting } from './helpers/common'; @@ -16,12 +16,12 @@ import TableMapping from './tablemapping'; interface MirrorConfigProps { settings: MirrorSetting[]; mirrorConfig: CDCConfig; - peers: Peer[]; setter: MirrorSetter; rows: TableMapRow[]; setRows: Dispatch<SetStateAction<TableMapRow[]>>; schema: string; setSchema: Dispatch<SetStateAction<string>>; + setValidSource: Dispatch<SetStateAction<boolean>>; } export const defaultSyncMode = (dtype: DBType | undefined) => { @@ -35,9 +35,18 @@ export const defaultSyncMode = (dtype: DBType | undefined) => { } }; -export default function CDCConfigForm(props: MirrorConfigProps) { +export default function CDCConfigForm({ + settings, + mirrorConfig, + setter, + rows, + setRows, + schema, + setSchema, + setValidSource, +}: MirrorConfigProps) { const setToDefault = (setting: MirrorSetting) => { - const destinationPeerType = props.mirrorConfig.destination?.type; + const destinationPeerType = mirrorConfig.destination?.type; return ( setting.label.includes('Sync') && (destinationPeerType === DBType.POSTGRES || @@ -52,148 +61,167 @@ export default function CDCConfigForm(props: MirrorConfigProps) { ? QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO : QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT; } - setting.stateHandler(stateVal, props.setter); + setting.stateHandler(stateVal, setter); }; const paramDisplayCondition = (setting: MirrorSetting) => { const label = setting.label.toLowerCase(); if ( - (label.includes('snapshot') && - props.mirrorConfig.doInitialCopy !== true) || + (label.includes('snapshot') && mirrorConfig.doInitialCopy !== true) || (label.includes('snapshot staging') && - props.mirrorConfig.snapshotSyncMode?.toString() !== '1') || + mirrorConfig.snapshotSyncMode?.toString() !== '1') || (label.includes('cdc staging') && - props.mirrorConfig.cdcSyncMode?.toString() !== '1') + mirrorConfig.cdcSyncMode?.toString() !== '1') ) { return false; } return true; }; - return ( - <> - {props.mirrorConfig.source && ( + useEffect(() => { + if ( + mirrorConfig.source != undefined && + dBTypeToJSON(mirrorConfig.source?.type) === 'POSTGRES' + ) + setValidSource(true); + else { + setValidSource(false); + setRows([]); + } + }, [mirrorConfig.source, setValidSource, setRows]); + + if ( + mirrorConfig.source != undefined && + dBTypeToJSON(mirrorConfig.source?.type) === 'POSTGRES' + ) + return ( + <> <TableMapping - sourcePeerName={props.mirrorConfig.source.name} - rows={props.rows} - setRows={props.setRows} - setSchema={props.setSchema} - schema={props.schema} + sourcePeerName={mirrorConfig.source?.name} + rows={rows} + setRows={setRows} + setSchema={setSchema} + schema={schema} + peerType={mirrorConfig.destination?.type} /> - )} - {props.settings.map((setting, id) => { - return ( - paramDisplayCondition(setting) && - (setting.type === 'switch' ? ( - <RowWithSwitch - key={id} - label={<Label>{setting.label}</Label>} - action={ - <div - style={{ - display: 'flex', - flexDirection: 'row', - alignItems: 'center', - }} - > - <Switch - onCheckedChange={(state: boolean) => - handleChange(state, setting) - } - /> - {setting.tips && ( - <InfoPopover - tips={setting.tips} - link={setting.helpfulLink} - /> - )} - </div> - } - /> - ) : setting.type === 'select' ? ( - <RowWithSelect - key={id} - label={ - <Label> - {setting.label} - {RequiredIndicator(setting.required)} - </Label> - } - action={ - <div - style={{ - display: 'flex', - flexDirection: 'row', - alignItems: 'center', - }} - > - <Select - placeholder={`Select ${ - setting.label.includes('Peer') - ? 'a destination peer' - : 'a sync mode' - }`} - onValueChange={(val) => handleChange(val, setting)} - disabled={setToDefault(setting)} - value={ - setToDefault(setting) - ? defaultSyncMode(props.mirrorConfig.destination?.type) - : undefined - } + {settings.map((setting, id) => { + return ( + paramDisplayCondition(setting) && + (setting.type === 'switch' ? ( + <RowWithSwitch + key={id} + label={<Label>{setting.label}</Label>} + action={ + <div + style={{ + display: 'flex', + flexDirection: 'row', + alignItems: 'center', + }} > - {['AVRO', 'Copy with Binary'].map((item, id) => { - return ( - <SelectItem key={id} value={item.toString()}> - {item.toString()} - </SelectItem> - ); - })} - </Select> - {setting.tips && ( - <InfoPopover - tips={setting.tips} - link={setting.helpfulLink} + <Switch + onCheckedChange={(state: boolean) => + handleChange(state, setting) + } /> - )} - </div> - } - /> - ) : ( - <RowWithTextField - key={id} - label={ - <Label> - {setting.label} - {RequiredIndicator(setting.required)} - </Label> - } - action={ - <div - style={{ - display: 'flex', - flexDirection: 'row', - alignItems: 'center', - }} - > - <TextField - variant='simple' - type={setting.type} - defaultValue={setting.default} - onChange={(e: React.ChangeEvent<HTMLInputElement>) => - handleChange(e.target.value, setting) - } - /> - {setting.tips && ( - <InfoPopover - tips={setting.tips} - link={setting.helpfulLink} + {setting.tips && ( + <InfoPopover + tips={setting.tips} + link={setting.helpfulLink} + /> + )} + </div> + } + /> + ) : setting.type === 'select' ? ( + <RowWithSelect + key={id} + label={ + <Label> + {setting.label} + {RequiredIndicator(setting.required)} + </Label> + } + action={ + <div + style={{ + display: 'flex', + flexDirection: 'row', + alignItems: 'center', + }} + > + <Select + placeholder={`Select a sync mode`} + onValueChange={(val) => handleChange(val, setting)} + disabled={setToDefault(setting)} + value={ + setToDefault(setting) + ? defaultSyncMode(mirrorConfig.destination?.type) + : undefined + } + > + {['AVRO', 'Copy with Binary'].map((item, id) => { + return ( + <SelectItem key={id} value={item.toString()}> + {item.toString()} + </SelectItem> + ); + })} + </Select> + {setting.tips && ( + <InfoPopover + tips={setting.tips} + link={setting.helpfulLink} + /> + )} + </div> + } + /> + ) : ( + <RowWithTextField + key={id} + label={ + <Label> + {setting.label} + {RequiredIndicator(setting.required)} + </Label> + } + action={ + <div + style={{ + display: 'flex', + flexDirection: 'row', + alignItems: 'center', + }} + > + <TextField + variant='simple' + type={setting.type} + defaultValue={setting.default} + onChange={(e: React.ChangeEvent<HTMLInputElement>) => + handleChange(e.target.value, setting) + } /> - )} - </div> - } - /> - )) - ); - })} - </> - ); + {setting.tips && ( + <InfoPopover + tips={setting.tips} + link={setting.helpfulLink} + /> + )} + </div> + } + /> + )) + ); + })} + </> + ); + else { + return mirrorConfig.source ? ( + <Label> + Only PostgreSQL source peers are currently supported via UI. + </Label> + ) : ( + <></> + ); + } } diff --git a/ui/app/mirrors/create/page.tsx b/ui/app/mirrors/create/page.tsx index b689bffbb9..0609815b8a 100644 --- a/ui/app/mirrors/create/page.tsx +++ b/ui/app/mirrors/create/page.tsx @@ -42,6 +42,7 @@ export default function CreateMirrors() { const [config, setConfig] = useState<CDCConfig | QRepConfig>(blankCDCSetting); const [peers, setPeers] = useState<Peer[]>([]); const [rows, setRows] = useState<TableMapRow[]>([]); + const [validSource, setValidSource] = useState<boolean>(false); const [sourceSchema, setSourceSchema] = useState('public'); const [qrepQuery, setQrepQuery] = useState<string>(`-- Here's a sample template: @@ -167,6 +168,7 @@ export default function CreateMirrors() { </div> <Label as={Link} + target='_blank' style={{ color: 'teal', cursor: 'pointer' }} href='https://docs.peerdb.io/usecases/Real-time%20CDC/overview' > @@ -208,6 +210,7 @@ export default function CreateMirrors() { </div> <Label as={Link} + target='_blank' style={{ color: 'teal', cursor: 'pointer' }} href='https://docs.peerdb.io/usecases/Streaming%20Query%20Replication/overview' > @@ -243,6 +246,7 @@ export default function CreateMirrors() { </Label> <Label as={Link} + target='_blank' style={{ color: 'teal', cursor: 'pointer' }} href='https://docs.peerdb.io/sql/commands/create-mirror#xmin-query-replication' > @@ -386,12 +390,12 @@ export default function CreateMirrors() { <CDCConfigForm settings={cdcSettings} mirrorConfig={config as CDCConfig} - peers={peers} setter={setConfig} rows={rows} setRows={setRows} setSchema={setSourceSchema} schema={sourceSchema} + setValidSource={setValidSource} /> ) : ( <QRepConfigForm @@ -410,6 +414,7 @@ export default function CreateMirrors() { Cancel </Button> <Button + disabled={mirrorType === 'CDC' && !validSource} variant='normalSolid' onClick={() => mirrorType === 'CDC' diff --git a/ui/app/mirrors/create/tablemapping.tsx b/ui/app/mirrors/create/tablemapping.tsx index 65a77a8d8b..ac1a18f79a 100644 --- a/ui/app/mirrors/create/tablemapping.tsx +++ b/ui/app/mirrors/create/tablemapping.tsx @@ -1,5 +1,6 @@ 'use client'; import { RequiredIndicator } from '@/components/RequiredIndicator'; +import { DBType, dBTypeToJSON } from '@/grpc_generated/peers'; import { Label } from '@/lib/Label'; import { RowWithSelect, RowWithTextField } from '@/lib/Layout'; import { SearchField } from '@/lib/SearchField'; @@ -24,6 +25,7 @@ interface TableMappingProps { setRows: Dispatch<SetStateAction<TableMapRow[]>>; schema: string; setSchema: Dispatch<SetStateAction<string>>; + peerType?: DBType; } const TableMapping = ({ @@ -32,6 +34,7 @@ const TableMapping = ({ setRows, schema, setSchema, + peerType, }: TableMappingProps) => { const [allSchemas, setAllSchemas] = useState<string[]>(); const [tableColumns, setTableColumns] = useState< @@ -88,34 +91,60 @@ const TableMapping = ({ const getTablesOfSchema = useCallback( (schemaName: string) => { - fetchTables(sourcePeerName, schemaName, setLoading).then((tableNames) => - setRows((curr) => { - const newRows = [...curr]; - tableNames.forEach((tableName) => { - const row = newRows.find( - (row) => row.source === `${schemaName}.${tableName}` - ); - if (!row) { - newRows.push({ - source: `${schemaName}.${tableName}`, - destination: `${schemaName}.${tableName}`, - partitionKey: '', - selected: false, - }); - } - }); - return newRows; - }) - ); + fetchTables(sourcePeerName, schemaName, setLoading).then((tableNames) => { + if (tableNames) { + const newRows = []; + for (const tableName of tableNames) { + const dstName = + peerType != undefined && dBTypeToJSON(peerType) == 'BIGQUERY' + ? tableName + : `${schemaName}.${tableName}`; + newRows.push({ + source: `${schemaName}.${tableName}`, + destination: dstName, + partitionKey: '', + selected: false, + }); + } + setRows(newRows); + } + }); }, - [sourcePeerName, setRows] + [sourcePeerName, setRows, peerType] ); const [searchQuery, setSearchQuery] = useState(''); - const filteredRows = rows?.filter((row) => { - return row.source.toLowerCase().includes(searchQuery.toLowerCase()); - }); + useEffect(() => { + console.log('peertype and schema in useeffect:', peerType, schema); + if (peerType != undefined && dBTypeToJSON(peerType) == 'BIGQUERY') { + setRows((rows) => { + const newRows = [...rows]; + newRows.forEach((_, i) => { + const row = newRows[i]; + newRows[i] = { + ...row, + destination: row.destination?.split('.')[1], + }; + }); + return newRows; + }); + } else { + setRows((rows) => { + const newRows = [...rows]; + newRows.forEach((_, i) => { + const row = newRows[i]; + newRows[i] = { + ...row, + destination: `${schema}.${ + row.destination?.split('.')[1] || row.destination + }`, + }; + }); + return newRows; + }); + } + }, [peerType, setRows, schema]); useEffect(() => { fetchSchemas(sourcePeerName, setLoading).then((res) => setAllSchemas(res)); @@ -174,134 +203,141 @@ const TableMapping = ({ </div> </div> <div style={{ maxHeight: '40vh', overflow: 'scroll' }}> - {filteredRows ? ( - filteredRows.map((row, index) => ( - <div - key={index} - style={{ - width: '100%', - marginTop: '0.5rem', - padding: '0.5rem', - display: 'flex', - flexDirection: 'column', - border: '1px solid #e9ecf2', - boxShadow: '0px 2px 4px rgba(0, 0, 0, 0.1)', - borderRadius: '0.8rem', - background: 'linear-gradient(135deg, #FFFFFF 40%, #F5F5F5 60%)', - }} - > + {rows ? ( + rows + ?.filter((row) => { + return row.source + .toLowerCase() + .includes(searchQuery.toLowerCase()); + }) + .map((row, index) => ( <div + key={index} style={{ + width: '100%', + marginTop: '0.5rem', + padding: '0.5rem', display: 'flex', - justifyContent: 'space-between', - alignItems: 'start', + flexDirection: 'column', + border: '1px solid #e9ecf2', + boxShadow: '0px 2px 4px rgba(0, 0, 0, 0.1)', + borderRadius: '0.8rem', + background: + 'linear-gradient(135deg, #FFFFFF 40%, #F5F5F5 60%)', }} > - <div> - <div style={{ display: 'flex', alignItems: 'center' }}> - <Switch - checked={row.selected} - onCheckedChange={(state: boolean) => - handleSwitch(state, row.source) - } - /> - <div - style={{ - fontSize: 14, - overflow: 'hidden', - fontWeight: 'bold', - color: 'rgba(0,0,0,0.7)', - textOverflow: 'ellipsis', - whiteSpace: 'nowrap', - }} - > - {row.source} - </div> - </div> - {row.selected && ( - <div style={{ padding: '0.5rem' }}> - <RowWithTextField - key={row.source} - label={ - <div - style={{ - marginTop: '0.5rem', - fontSize: 14, - }} - > - Destination Table Name - {RequiredIndicator(true)} - </div> - } - action={ - <div - style={{ - marginTop: '0.5rem', - display: 'flex', - flexDirection: 'row', - alignItems: 'center', - }} - > - <TextField - variant='simple' - defaultValue={row.destination} - onChange={( - e: React.ChangeEvent<HTMLInputElement> - ) => - updateDestination(row.source, e.target.value) - } - /> - </div> - } - /> - <RowWithTextField - label={ - <div - style={{ - marginTop: '0.5rem', - fontSize: 14, - }} - > - Partition Key - </div> - } - action={ - <div - style={{ - marginTop: '0.5rem', - display: 'flex', - flexDirection: 'row', - alignItems: 'center', - }} - > - <TextField - variant='simple' - onChange={( - e: React.ChangeEvent<HTMLInputElement> - ) => - updatePartitionKey(row.source, e.target.value) - } - /> - </div> + <div + style={{ + display: 'flex', + justifyContent: 'space-between', + alignItems: 'start', + }} + > + <div> + <div style={{ display: 'flex', alignItems: 'center' }}> + <Switch + checked={row.selected} + onCheckedChange={(state: boolean) => + handleSwitch(state, row.source) } /> - <div style={{ fontSize: 14 }}> - This is used only if you enable initial load, and - specifies its watermark. + <div + style={{ + fontSize: 14, + overflow: 'hidden', + fontWeight: 'bold', + color: 'rgba(0,0,0,0.7)', + textOverflow: 'ellipsis', + whiteSpace: 'nowrap', + }} + > + {row.source} </div> </div> - )} + {row.selected && ( + <div style={{ padding: '0.5rem' }}> + <RowWithTextField + key={row.source} + label={ + <div + style={{ + marginTop: '0.5rem', + fontSize: 14, + }} + > + Destination Table Name + {RequiredIndicator(true)} + </div> + } + action={ + <div + style={{ + marginTop: '0.5rem', + display: 'flex', + flexDirection: 'row', + alignItems: 'center', + }} + > + <TextField + variant='simple' + defaultValue={row.destination} + onChange={( + e: React.ChangeEvent<HTMLInputElement> + ) => + updateDestination(row.source, e.target.value) + } + /> + </div> + } + /> + <RowWithTextField + label={ + <div + style={{ + marginTop: '0.5rem', + fontSize: 14, + }} + > + Partition Key + </div> + } + action={ + <div + style={{ + marginTop: '0.5rem', + display: 'flex', + flexDirection: 'row', + alignItems: 'center', + }} + > + <TextField + variant='simple' + onChange={( + e: React.ChangeEvent<HTMLInputElement> + ) => + updatePartitionKey(row.source, e.target.value) + } + /> + </div> + } + /> + <div style={{ fontSize: 14 }}> + This is used only if you enable initial load, and + specifies its watermark. + </div> + </div> + )} + </div> + <ColumnsDisplay + peerName={sourcePeerName} + schemaName={schema} + tableName={row.source.split('.')[1]} + setColumns={setTableColumns} + columns={tableColumns} + /> </div> - <ColumnsDisplay - peerName={sourcePeerName} - schemaName={schema} - tableName={row.source.split('.')[1]} - setColumns={setTableColumns} - columns={tableColumns} - /> </div> - </div> - )) + )) ) : ( <div style={{ diff --git a/ui/app/mirrors/page.tsx b/ui/app/mirrors/page.tsx index 89368719a5..3bb5c66db6 100644 --- a/ui/app/mirrors/page.tsx +++ b/ui/app/mirrors/page.tsx @@ -57,18 +57,9 @@ export default async function Mirrors() { return false; }); - let snapshotFlows = flows.filter((flow) => { - if (flow.config_proto && flow.query_string) { - let config = QRepConfig.decode(flow.config_proto); - return config.watermarkColumn.toLowerCase() === 'ctid'; - } - return false; - }); - stringifyConfig(cdcFlows); stringifyConfig(qrepFlows); stringifyConfig(xminFlows); - stringifyConfig(snapshotFlows); return ( <LayoutMain alignSelf='flex-start' justifySelf='flex-start' width='full'> @@ -105,9 +96,6 @@ export default async function Mirrors() { <Panel className='mt-10'> <QRepFlows title='XMIN Mirrors' qrepFlows={xminFlows} /> </Panel> - <Panel className='mt-10'> - <QRepFlows title='Snapshot Mirrors' qrepFlows={snapshotFlows} /> - </Panel> </LayoutMain> ); } diff --git a/ui/app/peers/create/[peerType]/handlers.ts b/ui/app/peers/create/[peerType]/handlers.ts index 0d09255528..f51e44aff7 100644 --- a/ui/app/peers/create/[peerType]/handlers.ts +++ b/ui/app/peers/create/[peerType]/handlers.ts @@ -4,7 +4,7 @@ import { UValidatePeerResponse, } from '@/app/dto/PeersDTO'; import { Dispatch, SetStateAction } from 'react'; -import { pgSchema, sfSchema } from './schema'; +import { bqSchema, pgSchema, sfSchema } from './schema'; // Frontend form validation const validateFields = ( @@ -27,6 +27,10 @@ const validateFields = ( const sfConfig = sfSchema.safeParse(config); if (!sfConfig.success) validationErr = sfConfig.error.issues[0].message; break; + case 'BIGQUERY': + const bqConfig = bqSchema.safeParse(config); + if (!bqConfig.success) validationErr = bqConfig.error.issues[0].message; + break; default: validationErr = 'Unsupported peer type ' + type; } diff --git a/ui/app/peers/create/[peerType]/helpers/bq.ts b/ui/app/peers/create/[peerType]/helpers/bq.ts new file mode 100644 index 0000000000..a38b3cb3ad --- /dev/null +++ b/ui/app/peers/create/[peerType]/helpers/bq.ts @@ -0,0 +1,15 @@ +import { BigqueryConfig } from '@/grpc_generated/peers'; + +export const blankBigquerySetting: BigqueryConfig = { + authType: 'service_account', + projectId: '', + privateKeyId: '', + privateKey: '', + clientEmail: '', + clientId: '', + authUri: '', + tokenUri: '', + authProviderX509CertUrl: '', + clientX509CertUrl: '', + datasetId: '', +}; diff --git a/ui/app/peers/create/[peerType]/helpers/common.ts b/ui/app/peers/create/[peerType]/helpers/common.ts index 8aad61812e..c08b0035ee 100644 --- a/ui/app/peers/create/[peerType]/helpers/common.ts +++ b/ui/app/peers/create/[peerType]/helpers/common.ts @@ -1,5 +1,6 @@ import { PeerConfig } from '@/app/dto/PeersDTO'; import { PeerSetter } from '@/components/ConfigForm'; +import { blankBigquerySetting } from './bq'; import { blankPostgresSetting } from './pg'; import { blankSnowflakeSetting } from './sf'; @@ -19,6 +20,8 @@ export const getBlankSetting = (dbType: string): PeerConfig => { return blankPostgresSetting; case 'SNOWFLAKE': return blankSnowflakeSetting; + case 'BIGQUERY': + return blankBigquerySetting; default: return blankPostgresSetting; } diff --git a/ui/app/peers/create/[peerType]/helpers/sf.ts b/ui/app/peers/create/[peerType]/helpers/sf.ts index 44cbde7fe8..e6707d7627 100644 --- a/ui/app/peers/create/[peerType]/helpers/sf.ts +++ b/ui/app/peers/create/[peerType]/helpers/sf.ts @@ -71,7 +71,7 @@ export const snowflakeSetting: PeerSetting[] = [ if (!value.length) { // remove password key from state if empty setter((curr) => { - delete curr['password']; + delete (curr as SnowflakeConfig)['password']; return curr; }); } else setter((curr) => ({ ...curr, password: value })); diff --git a/ui/app/peers/create/[peerType]/page.tsx b/ui/app/peers/create/[peerType]/page.tsx index 01af2b0dd1..6bceac8b18 100644 --- a/ui/app/peers/create/[peerType]/page.tsx +++ b/ui/app/peers/create/[peerType]/page.tsx @@ -1,5 +1,6 @@ 'use client'; import { PeerConfig } from '@/app/dto/PeersDTO'; +import BQConfig from '@/components/BigqueryConfig'; import { Button } from '@/lib/Button'; import { ButtonGroup } from '@/lib/ButtonGroup'; import { Label } from '@/lib/Label'; @@ -42,6 +43,8 @@ export default function CreateConfig({ return configForm(postgresSetting); case 'SNOWFLAKE': return configForm(snowflakeSetting); + case 'BIGQUERY': + return <BQConfig setter={setConfig} />; default: return <></>; } diff --git a/ui/app/peers/create/[peerType]/schema.ts b/ui/app/peers/create/[peerType]/schema.ts index 4134f08f1e..69da7b1c1b 100644 --- a/ui/app/peers/create/[peerType]/schema.ts +++ b/ui/app/peers/create/[peerType]/schema.ts @@ -6,7 +6,7 @@ export const pgSchema = z.object({ required_error: 'Host is required', invalid_type_error: 'Host must be a string', }) - .nonempty({ message: 'Host cannot be empty' }) + .min(1, { message: 'Host cannot be empty' }) .max(255, 'Host must be less than 255 characters'), port: z .number({ @@ -49,40 +49,40 @@ export const sfSchema = z.object({ required_error: 'Account ID is required', invalid_type_error: 'Account ID must be a string', }) - .nonempty({ message: 'Account ID must be non-empty' }) + .min(1, { message: 'Account ID must be non-empty' }) .max(255, 'Account ID must be less than 255 characters'), privateKey: z .string({ required_error: 'Private Key is required', invalid_type_error: 'Private Key must be a string', }) - .nonempty({ message: 'Private Key must be non-empty' }), + .min(1, { message: 'Private Key must be non-empty' }), username: z .string({ required_error: 'Username is required', invalid_type_error: 'Username must be a string', }) - .nonempty({ message: 'Username must be non-empty' }) + .min(1, { message: 'Username must be non-empty' }) .max(255, 'Username must be less than 255 characters'), database: z .string({ required_error: 'Database is required', invalid_type_error: 'Database must be a string', }) - .nonempty({ message: 'Database must be non-empty' }) + .min(1, { message: 'Database must be non-empty' }) .max(255, 'Database must be less than 100 characters'), warehouse: z .string({ required_error: 'Warehouse is required', invalid_type_error: 'Warehouse must be a string', }) - .nonempty({ message: 'Warehouse must be non-empty' }) + .min(1, { message: 'Warehouse must be non-empty' }) .max(255, 'Warehouse must be less than 64 characters'), role: z .string({ invalid_type_error: 'Role must be a string', }) - .nonempty({ message: 'Role must be non-empty' }) + .min(1, { message: 'Role must be non-empty' }) .max(255, 'Role must be below 255 characters'), queryTimeout: z .number({ @@ -106,3 +106,82 @@ export const sfSchema = z.object({ .max(255, 's3Integration must be less than 255 characters') .optional(), }); + +export const bqSchema = z.object({ + authType: z + .string({ + required_error: 'Auth Type is required', + invalid_type_error: 'Auth Type must be a string', + }) + .min(1, { message: 'Auth Type must be non-empty' }) + .max(255, 'Auth Type must be less than 255 characters'), + projectId: z + .string({ + required_error: 'Project ID is required', + invalid_type_error: 'Project ID must be a string', + }) + .min(1, { message: 'Project ID must be non-empty' }), + privateKeyId: z + .string({ + required_error: 'Private Key ID is required', + invalid_type_error: 'Private Key ID must be a string', + }) + .min(1, { message: 'Private Key must be non-empty' }), + privateKey: z + .string({ + required_error: 'Private Key is required', + invalid_type_error: 'Private Key must be a string', + }) + .min(1, { message: 'Private Key must be non-empty' }), + clientId: z + .string({ + required_error: 'Client ID is required', + invalid_type_error: 'Client ID must be a string', + }) + .min(1, { message: 'Client ID must be non-empty' }), + clientEmail: z + .string({ + required_error: 'Client Email is required', + invalid_type_error: 'Client Email must be a string', + }) + .min(1, { message: 'Client Email must be non-empty' }), + authUri: z + .string({ + required_error: 'Auth URI is required', + invalid_type_error: 'Auth URI must be a string', + }) + .url({ message: 'Invalid auth URI' }) + .min(1, { message: 'Auth URI must be non-empty' }), + tokenUri: z + .string({ + required_error: 'Token URI is required', + invalid_type_error: 'Token URI must be a string', + }) + .url({ message: 'Invalid token URI' }) + .min(1, { message: 'Token URI must be non-empty' }), + authProviderX509CertUrl: z + .string({ + invalid_type_error: 'Auth Cert URL must be a string', + required_error: 'Auth Cert URL is required', + }) + .url({ message: 'Invalid auth cert URL' }) + .min(1, { message: 'Auth Cert URL must be non-empty' }), + clientX509CertUrl: z + .string({ + invalid_type_error: 'Client Cert URL must be a string', + required_error: 'Client Cert URL is required', + }) + .url({ message: 'Invalid client cert URL' }) + .min(1, { message: 'Client Cert URL must be non-empty' }), + datasetId: z + .string({ + invalid_type_error: 'Dataset ID must be a string', + required_error: 'Dataset ID is required', + }) + .min(1, { message: 'Dataset ID must be non-empty' }) + .max(1024, 'DatasetID must be less than 1025 characters') + .regex( + /^[\w]+$/, + 'Dataset ID must only contain numbers, letters, and underscores' + ), +}); diff --git a/ui/app/peers/peersTable.tsx b/ui/app/peers/peersTable.tsx index 24630cc0ae..5a3099603a 100644 --- a/ui/app/peers/peersTable.tsx +++ b/ui/app/peers/peersTable.tsx @@ -30,7 +30,7 @@ function PeerRow({ peer }: { peer: Peer }) { } function PeersTable({ title, peers }: { title: string; peers: Peer[] }) { - const [rows, setRows] = useState<Peer[]>([]); + const [rows, setRows] = useState<Peer[]>(peers); return ( <Table diff --git a/ui/components/BigqueryConfig.tsx b/ui/components/BigqueryConfig.tsx new file mode 100644 index 0000000000..d58def1d16 --- /dev/null +++ b/ui/components/BigqueryConfig.tsx @@ -0,0 +1,132 @@ +'use client'; +import { blankBigquerySetting } from '@/app/peers/create/[peerType]/helpers/bq'; +import { BigqueryConfig } from '@/grpc_generated/peers'; +import { Label } from '@/lib/Label'; +import { RowWithTextField } from '@/lib/Layout'; +import { TextField } from '@/lib/TextField'; +import { Tooltip } from '@/lib/Tooltip'; +import Link from 'next/link'; +import { useState } from 'react'; +import { PeerSetter } from './ConfigForm'; +import { InfoPopover } from './InfoPopover'; + +interface BQProps { + setter: PeerSetter; +} +export default function BQConfig(props: BQProps) { + const [datasetID, setDatasetID] = useState<string>(''); + const handleJSONFile = (file: File) => { + if (file) { + const reader = new FileReader(); + reader.readAsText(file); + reader.onload = () => { + // Read the file as JSON + let bqJson; + try { + bqJson = JSON.parse(reader.result as string); + } catch (err) { + props.setter(blankBigquerySetting); + return; + } + const bqConfig: BigqueryConfig = { + authType: bqJson.type, + projectId: bqJson.project_id, + privateKeyId: bqJson.private_key_id, + privateKey: bqJson.private_key, + clientEmail: bqJson.client_email, + clientId: bqJson.client_id, + authUri: bqJson.auth_uri, + tokenUri: bqJson.token_uri, + authProviderX509CertUrl: bqJson.auth_provider_x509_cert_url, + clientX509CertUrl: bqJson.client_x509_cert_url, + datasetId: datasetID, + }; + props.setter(bqConfig); + }; + reader.onerror = (error) => { + console.log(error); + }; + } + }; + + return ( + <> + <Label> + A service account JSON file in BigQuery is a file that contains + information which allows PeerDB to securely access BigQuery resources. + </Label> + <Label + as={Link} + style={{ color: 'teal', textDecoration: 'underline' }} + href={`https://cloud.google.com/bigquery/docs/authentication/service-account-file`} + > + Creating a service account file + </Label> + <RowWithTextField + label={ + <Label> + Service Account JSON + <Tooltip + style={{ width: '100%' }} + content={'This is a required field.'} + > + <Label colorName='lowContrast' colorSet='destructive'> + * + </Label> + </Tooltip> + </Label> + } + action={ + <TextField + variant='simple' + type='file' + style={{ border: 'none', height: 'auto' }} + onChange={(e: React.ChangeEvent<HTMLInputElement>) => + e.target.files && handleJSONFile(e.target.files[0]) + } + /> + } + /> + + <RowWithTextField + label={ + <Label> + Dataset ID + <Tooltip + style={{ width: '100%' }} + content={'This is a required field.'} + > + <Label colorName='lowContrast' colorSet='destructive'> + * + </Label> + </Tooltip> + </Label> + } + action={ + <div + style={{ + display: 'flex', + flexDirection: 'row', + alignItems: 'center', + }} + > + <TextField + variant='simple' + onChange={(e: React.ChangeEvent<HTMLInputElement>) => { + setDatasetID(e.target.value); + props.setter((curr) => ({ + ...curr, + datasetId: e.target.value, + })); + }} + /> + <InfoPopover + tips={'ID of the dataset containing the tables you want to sync.'} + link={`https://cloud.google.com/bigquery/docs/datasets`} + /> + </div> + } + /> + </> + ); +} diff --git a/ui/components/PeerComponent.tsx b/ui/components/PeerComponent.tsx index e6a2f49b28..af88fd9252 100644 --- a/ui/components/PeerComponent.tsx +++ b/ui/components/PeerComponent.tsx @@ -4,16 +4,18 @@ import { Button } from '@/lib/Button'; import { Label } from '@/lib/Label'; import Image from 'next/image'; import { useRouter } from 'next/navigation'; -export const DBTypeToImageMapping = (peerType: DBType) => { +export const DBTypeToImageMapping = (peerType: DBType | string) => { switch (peerType) { case DBType.POSTGRES: + case 'POSTGRES': return '/svgs/pg.svg'; case DBType.SNOWFLAKE: + case 'SNOWFLAKE': return '/svgs/sf.svg'; case DBType.BIGQUERY: + case 'BIGQUERY': return '/svgs/bq.svg'; case DBType.EVENTHUB_GROUP: - return '/svgs/ms.svg'; case DBType.EVENTHUB: return '/svgs/ms.svg'; default: diff --git a/ui/components/SelectSource.tsx b/ui/components/SelectSource.tsx index 955cb09160..49fde97c73 100644 --- a/ui/components/SelectSource.tsx +++ b/ui/components/SelectSource.tsx @@ -1,7 +1,9 @@ 'use client'; import { DBType } from '@/grpc_generated/peers'; import { Select, SelectItem } from '@/lib/Select'; +import Image from 'next/image'; import { Dispatch, SetStateAction } from 'react'; +import { DBTypeToImageMapping } from './PeerComponent'; interface SelectSourceProps { peerType: string; @@ -12,8 +14,9 @@ export default function SelectSource({ setPeerType }: SelectSourceProps) { const dbTypes: string[] = Object.values(DBType).filter( (value): value is string => typeof value === 'string' && - (value === 'POSTGRES' || value === 'SNOWFLAKE') + (value === 'POSTGRES' || value === 'SNOWFLAKE' || value === 'BIGQUERY') ); + return ( <Select placeholder='Select a source' @@ -21,9 +24,14 @@ export default function SelectSource({ setPeerType }: SelectSourceProps) { onValueChange={(val) => setPeerType(val)} > {dbTypes.map((dbType, id) => { + const peerLogo = DBTypeToImageMapping(dbType); return ( <SelectItem key={id} value={dbType}> - {dbType} + <div style={{ display: 'flex', alignItems: 'center' }}> + <Image src={peerLogo} alt='peer' height={15} width={15} /> + + <div style={{ marginLeft: 10 }}>{dbType}</div> + </div> </SelectItem> ); })}