From 9917a362cf643ac77e3b7825be37e747025246ac Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 16 Nov 2023 09:15:50 +0530 Subject: [PATCH 1/9] s3 peer boilerplate --- flow/cmd/handler.go | 8 +- ui/app/dto/PeersDTO.ts | 7 +- ui/app/peers/create/[peerType]/handlers.ts | 6 +- ui/app/peers/create/[peerType]/helpers/s3.ts | 59 +++++++++ ui/app/peers/create/[peerType]/page.tsx | 3 + ui/app/peers/create/[peerType]/schema.ts | 40 ++++++ ui/components/PeerComponent.tsx | 3 + ui/components/S3Form.tsx | 132 +++++++++++++++++++ ui/components/SelectSource.tsx | 7 +- 9 files changed, 260 insertions(+), 5 deletions(-) create mode 100644 ui/app/peers/create/[peerType]/helpers/s3.ts create mode 100644 ui/components/S3Form.tsx diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 416800f02f..13ce8115b1 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -559,7 +559,13 @@ func (h *FlowRequestHandler) CreatePeer( } sqlServerConfig := sqlServerConfigObject.SqlserverConfig encodedConfig, encodingErr = proto.Marshal(sqlServerConfig) - + case protos.DBType_S3: + s3ConfigObject, ok := config.(*protos.Peer_S3Config) + if !ok { + return wrongConfigResponse, nil + } + s3Config := s3ConfigObject.S3Config + encodedConfig, encodingErr = proto.Marshal(s3Config) default: return wrongConfigResponse, nil } diff --git a/ui/app/dto/PeersDTO.ts b/ui/app/dto/PeersDTO.ts index 9954aceaa7..0e6dfef818 100644 --- a/ui/app/dto/PeersDTO.ts +++ b/ui/app/dto/PeersDTO.ts @@ -1,6 +1,7 @@ import { BigqueryConfig, PostgresConfig, + S3Config, SnowflakeConfig, } from '@/grpc_generated/peers'; @@ -31,7 +32,11 @@ export type UDropPeerResponse = { errorMessage: string; }; -export type PeerConfig = PostgresConfig | SnowflakeConfig | BigqueryConfig; +export type PeerConfig = + | PostgresConfig + | SnowflakeConfig + | BigqueryConfig + | S3Config; export type CatalogPeer = { id: number; name: string; diff --git a/ui/app/peers/create/[peerType]/handlers.ts b/ui/app/peers/create/[peerType]/handlers.ts index 21c0a63b64..0579854fdd 100644 --- a/ui/app/peers/create/[peerType]/handlers.ts +++ b/ui/app/peers/create/[peerType]/handlers.ts @@ -4,7 +4,7 @@ import { UValidatePeerResponse, } from '@/app/dto/PeersDTO'; import { Dispatch, SetStateAction } from 'react'; -import { bqSchema, peerNameSchema, pgSchema, sfSchema } from './schema'; +import { bqSchema, peerNameSchema, pgSchema, s3Schema, sfSchema } from './schema'; const validateFields = ( type: string, @@ -33,6 +33,10 @@ const validateFields = ( const bqConfig = bqSchema.safeParse(config); if (!bqConfig.success) validationErr = bqConfig.error.issues[0].message; break; + case 'S3': + const s3Config = s3Schema.safeParse(config); + if (!s3Config.success) validationErr = s3Config.error.issues[0].message; + break; default: validationErr = 'Unsupported peer type ' + type; } diff --git a/ui/app/peers/create/[peerType]/helpers/s3.ts b/ui/app/peers/create/[peerType]/helpers/s3.ts new file mode 100644 index 0000000000..6469fa6231 --- /dev/null +++ b/ui/app/peers/create/[peerType]/helpers/s3.ts @@ -0,0 +1,59 @@ +import { S3Config } from '@/grpc_generated/peers'; +import { PeerSetting } from './common'; + +export const s3Setting: PeerSetting[] = [ + { + label: 'Bucket URL', + stateHandler: (value, setter) => + setter((curr) => ({ ...curr, url: value })), + tips: 'The URL of the S3/GCS bucket. It begins with s3://', + }, + { + label: 'Access Key ID', + stateHandler: (value, setter) => + setter((curr) => ({ ...curr, accessKeyID: parseInt(value, 10) })), + tips: 'Specifies the TCP/IP port or local Unix domain socket file extension on which postgres is listening for connections from client applications.', + }, + { + label: 'Secret Access Key', + stateHandler: (value, setter) => + setter((curr) => ({ ...curr, secretAccessKey: value })), + tips: 'Specify the user that we should use to connect to this host.', + helpfulLink: 'https://www.postgresql.org/docs/8.0/user-manag.html', + }, + { + label: 'Role ARN', + stateHandler: (value, setter) => + setter((curr) => ({ ...curr, roleArn: value })), + type: 'password', + tips: 'Password associated with the user you provided.', + helpfulLink: 'https://www.postgresql.org/docs/current/auth-password.html', + optional: true, + }, + { + label: 'Region', + stateHandler: (value, setter) => + setter((curr) => ({ ...curr, region: value })), + tips: 'Specify which database to associate with this peer.', + helpfulLink: + 'https://www.postgresql.org/docs/current/sql-createdatabase.html', + optional: true, + }, + { + label: 'Endpoint', + stateHandler: (value, setter) => + setter((curr) => ({ ...curr, endpoint: value })), + optional: true, + tips: '', + }, +]; + +export const blankS3Config: S3Config = { + url: '', + accessKeyId: undefined, + secretAccessKey: undefined, + roleArn: undefined, + region: undefined, + endpoint: undefined, + metadataDb: undefined, +}; diff --git a/ui/app/peers/create/[peerType]/page.tsx b/ui/app/peers/create/[peerType]/page.tsx index 6bceac8b18..9f86c9d80e 100644 --- a/ui/app/peers/create/[peerType]/page.tsx +++ b/ui/app/peers/create/[peerType]/page.tsx @@ -1,6 +1,7 @@ 'use client'; import { PeerConfig } from '@/app/dto/PeersDTO'; import BQConfig from '@/components/BigqueryConfig'; +import S3ConfigForm from '@/components/S3Form'; import { Button } from '@/lib/Button'; import { ButtonGroup } from '@/lib/ButtonGroup'; import { Label } from '@/lib/Label'; @@ -45,6 +46,8 @@ export default function CreateConfig({ return configForm(snowflakeSetting); case 'BIGQUERY': return ; + case 'S3': + return ; default: return <>; } diff --git a/ui/app/peers/create/[peerType]/schema.ts b/ui/app/peers/create/[peerType]/schema.ts index 3f92aef257..01a6a08dad 100644 --- a/ui/app/peers/create/[peerType]/schema.ts +++ b/ui/app/peers/create/[peerType]/schema.ts @@ -196,3 +196,43 @@ export const bqSchema = z.object({ 'Dataset ID must only contain numbers, letters, and underscores' ), }); + +export const s3Schema = z.object({ + url: z + .string({ + invalid_type_error: 'URL must be a string', + required_error: 'URL is required', + }) + .min(1, { message: 'URL must be non-empty' }) + .refine((url) => url.startsWith('s3:/'), { + message: 'URL must start with s3:/', + }), + accessKeyId: z + .string({ + invalid_type_error: 'Access Key ID must be a string', + required_error: 'Access Key ID is required', + }) + .min(1, { message: 'Access Key ID must be non-empty' }), + secretAccessKey: z + .string({ + invalid_type_error: 'Secret Access Key must be a string', + required_error: 'Secret Access Key is required', + }) + .min(1, { message: 'Secret Access Key must be non-empty' }), + roleArn: z + .string({ + invalid_type_error: 'Role ARN must be a string', + }) + .optional(), + region: z + .string({ + invalid_type_error: 'Region must be a string', + }) + .optional(), + endpoint: z + .string({ + invalid_type_error: 'Endpoint must be a string', + }) + .optional(), + metadataDb: pgSchema, +}); diff --git a/ui/components/PeerComponent.tsx b/ui/components/PeerComponent.tsx index ab956992e2..7378e91635 100644 --- a/ui/components/PeerComponent.tsx +++ b/ui/components/PeerComponent.tsx @@ -15,6 +15,9 @@ export const DBTypeToImageMapping = (peerType: DBType | string) => { case DBType.BIGQUERY: case 'BIGQUERY': return '/svgs/bq.svg'; + case DBType.S3: + case 'S3': + return '/svgs/aws.svg'; case DBType.EVENTHUB_GROUP: case DBType.EVENTHUB: return '/svgs/ms.svg'; diff --git a/ui/components/S3Form.tsx b/ui/components/S3Form.tsx new file mode 100644 index 0000000000..4ca83214e5 --- /dev/null +++ b/ui/components/S3Form.tsx @@ -0,0 +1,132 @@ +'use client'; +import { PeerConfig } from '@/app/dto/PeersDTO'; +import { + blankPostgresSetting, + postgresSetting, +} from '@/app/peers/create/[peerType]/helpers/pg'; +import { s3Setting } from '@/app/peers/create/[peerType]/helpers/s3'; +import { PostgresConfig } from '@/grpc_generated/peers'; +import { Label } from '@/lib/Label'; +import { RowWithTextField } from '@/lib/Layout'; +import { TextField } from '@/lib/TextField'; +import { Tooltip } from '@/lib/Tooltip'; +import { useEffect, useState } from 'react'; +import { PeerSetter } from './ConfigForm'; +import { InfoPopover } from './InfoPopover'; + +interface S3Props { + setter: PeerSetter; +} +const S3ConfigForm = ({ setter }: S3Props) => { + const [metadataDB, setMetadataDB] = + useState(blankPostgresSetting); + useEffect(() => { + setter((prev) => { + return { + ...prev, + metadataDb: metadataDB as PostgresConfig, + }; + }); + }, [metadataDB]); + return ( + <> + {s3Setting.map((setting, index) => { + return ( + + {setting.label}{' '} + {!setting.optional && ( + + + + )} + + } + action={ +
+ ) => + setting.stateHandler(e.target.value, setter) + } + /> + {setting.tips && ( + + )} +
+ } + /> + ); + })} + + + + {postgresSetting.map((setting, index) => { + + {setting.label}{' '} + + + + + } + action={ +
+ ) => + setting.stateHandler(e.target.value, setMetadataDB) + } + /> + {setting.tips && ( + + )} +
+ } + />; + })} + + ); +}; + +export default S3ConfigForm; diff --git a/ui/components/SelectSource.tsx b/ui/components/SelectSource.tsx index 695174a2fe..bc911f8fa5 100644 --- a/ui/components/SelectSource.tsx +++ b/ui/components/SelectSource.tsx @@ -28,7 +28,10 @@ export default function SelectSource({ .filter( (value): value is string => typeof value === 'string' && - (value === 'POSTGRES' || value === 'SNOWFLAKE' || value === 'BIGQUERY') + (value === 'POSTGRES' || + value === 'SNOWFLAKE' || + value === 'BIGQUERY' || + value === 'S3') ) .map((value) => ({ label: value, value })); @@ -37,7 +40,7 @@ export default function SelectSource({ placeholder='Select a source' options={dbTypes} defaultValue={dbTypes.find((opt) => opt.value === peerType)} - onChange={(val, action) => val && setPeerType(val.value)} + onChange={(val, _) => val && setPeerType(val.value)} formatOptionLabel={SourceLabel} /> ); From 314a16436b899c7f30b000eb5d8e4d86450608b7 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 16 Nov 2023 13:32:17 +0530 Subject: [PATCH 2/9] s3 peer ui v0 --- flow/connectors/core.go | 8 +- ui/app/api/peers/route.ts | 6 + ui/app/mirrors/create/helpers/common.ts | 2 +- ui/app/mirrors/create/page.tsx | 1 - ui/app/peers/create/[peerType]/handlers.ts | 16 ++ .../peers/create/[peerType]/helpers/common.ts | 3 + ui/app/peers/create/[peerType]/helpers/s3.ts | 54 ++-- ui/app/peers/create/[peerType]/page.tsx | 14 +- ui/app/peers/create/[peerType]/schema.ts | 4 +- ui/components/S3Form.tsx | 260 +++++++++++------- 10 files changed, 242 insertions(+), 126 deletions(-) diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 0612514dd7..4c393ca48d 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -230,8 +230,12 @@ func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) { return nil, fmt.Errorf("missing sqlserver config for %s peer %s", peer.Type.String(), peer.Name) } return connsqlserver.NewSQLServerConnector(ctx, sqlServerConfig) - // case protos.DBType_S3: - // return conns3.NewS3Connector(ctx, config.GetS3Config()) + case protos.DBType_S3: + s3Config := peer.GetS3Config() + if s3Config == nil { + return nil, fmt.Errorf("missing s3 config for %s peer %s", peer.Type.String(), peer.Name) + } + return conns3.NewS3Connector(ctx, s3Config) // case protos.DBType_EVENTHUB: // return connsqlserver.NewSQLServerConnector(ctx, config.GetSqlserverConfig()) default: diff --git a/ui/app/api/peers/route.ts b/ui/app/api/peers/route.ts index ff790da762..ca74ba0127 100644 --- a/ui/app/api/peers/route.ts +++ b/ui/app/api/peers/route.ts @@ -52,6 +52,12 @@ const constructPeer = ( type: DBType.BIGQUERY, bigqueryConfig: config as BigqueryConfig, }; + case 'S3': + return { + name, + type: DBType.S3, + s3Config: config as S3Config, + }; default: return; } diff --git a/ui/app/mirrors/create/helpers/common.ts b/ui/app/mirrors/create/helpers/common.ts index 86a8e04430..02f548eeba 100644 --- a/ui/app/mirrors/create/helpers/common.ts +++ b/ui/app/mirrors/create/helpers/common.ts @@ -59,7 +59,7 @@ export const blankQRepSetting = { waitBetweenBatchesSeconds: 30, writeMode: undefined, stagingPath: '', - numRowsPerPartition: 0, + numRowsPerPartition: 100000, setupWatermarkTableOnDestination: false, dstTableFullResync: false, }; diff --git a/ui/app/mirrors/create/page.tsx b/ui/app/mirrors/create/page.tsx index d2027f1dd8..b443cdd760 100644 --- a/ui/app/mirrors/create/page.tsx +++ b/ui/app/mirrors/create/page.tsx @@ -63,7 +63,6 @@ export default function CreateMirrors() { const [config, setConfig] = useState(blankCDCSetting); const [peers, setPeers] = useState([]); const [rows, setRows] = useState([]); - const [validSource, setValidSource] = useState(false); const [sourceSchema, setSourceSchema] = useState('public'); const [qrepQuery, setQrepQuery] = useState(`-- Here's a sample template: diff --git a/ui/app/peers/create/[peerType]/handlers.ts b/ui/app/peers/create/[peerType]/handlers.ts index 0579854fdd..424200dc7d 100644 --- a/ui/app/peers/create/[peerType]/handlers.ts +++ b/ui/app/peers/create/[peerType]/handlers.ts @@ -3,6 +3,7 @@ import { UCreatePeerResponse, UValidatePeerResponse, } from '@/app/dto/PeersDTO'; +import { S3Config } from '@/grpc_generated/peers'; import { Dispatch, SetStateAction } from 'react'; import { bqSchema, peerNameSchema, pgSchema, s3Schema, sfSchema } from './schema'; @@ -19,6 +20,14 @@ const validateFields = ( return false; } + if (type === 'S3') { + const s3Valid = S3Validation(config as S3Config); + if (s3Valid.length > 0) { + setMessage({ ok: false, msg: s3Valid }); + return false; + } + } + let validationErr: string | undefined; switch (type) { case 'POSTGRES': @@ -76,6 +85,13 @@ export const handleValidate = async ( setLoading(false); }; +const S3Validation = (config: S3Config): string => { + if (!config.secretAccessKey && !config.accessKeyId && !config.roleArn) { + return 'Either both access key and secret or role ARN is required'; + } + return ''; +}; + // API call to create peer export const handleCreate = async ( type: string, diff --git a/ui/app/peers/create/[peerType]/helpers/common.ts b/ui/app/peers/create/[peerType]/helpers/common.ts index c08b0035ee..62b2bb26c2 100644 --- a/ui/app/peers/create/[peerType]/helpers/common.ts +++ b/ui/app/peers/create/[peerType]/helpers/common.ts @@ -2,6 +2,7 @@ import { PeerConfig } from '@/app/dto/PeersDTO'; import { PeerSetter } from '@/components/ConfigForm'; import { blankBigquerySetting } from './bq'; import { blankPostgresSetting } from './pg'; +import { blankS3Setting } from './s3'; import { blankSnowflakeSetting } from './sf'; export interface PeerSetting { @@ -22,6 +23,8 @@ export const getBlankSetting = (dbType: string): PeerConfig => { return blankSnowflakeSetting; case 'BIGQUERY': return blankBigquerySetting; + case 'S3': + return blankS3Setting; default: return blankPostgresSetting; } diff --git a/ui/app/peers/create/[peerType]/helpers/s3.ts b/ui/app/peers/create/[peerType]/helpers/s3.ts index 6469fa6231..c085d3df74 100644 --- a/ui/app/peers/create/[peerType]/helpers/s3.ts +++ b/ui/app/peers/create/[peerType]/helpers/s3.ts @@ -6,54 +6,60 @@ export const s3Setting: PeerSetting[] = [ label: 'Bucket URL', stateHandler: (value, setter) => setter((curr) => ({ ...curr, url: value })), - tips: 'The URL of the S3/GCS bucket. It begins with s3://', + tips: 'The URL of your existing S3/GCS bucket along with a prefix of your choice. It begins with s3://', + helpfulLink: + 'https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-bucket-intro.html#accessing-a-bucket-using-S3-format', + default: 's3:///', }, { label: 'Access Key ID', stateHandler: (value, setter) => - setter((curr) => ({ ...curr, accessKeyID: parseInt(value, 10) })), - tips: 'Specifies the TCP/IP port or local Unix domain socket file extension on which postgres is listening for connections from client applications.', + setter((curr) => ({ ...curr, accessKeyId: value })), + optional: true, + tips: 'The AWS access key ID associated with your account. In case of GCS, this is the HMAC access key ID.', + helpfulLink: + 'https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html', }, { label: 'Secret Access Key', stateHandler: (value, setter) => setter((curr) => ({ ...curr, secretAccessKey: value })), - tips: 'Specify the user that we should use to connect to this host.', - helpfulLink: 'https://www.postgresql.org/docs/8.0/user-manag.html', - }, - { - label: 'Role ARN', - stateHandler: (value, setter) => - setter((curr) => ({ ...curr, roleArn: value })), - type: 'password', - tips: 'Password associated with the user you provided.', - helpfulLink: 'https://www.postgresql.org/docs/current/auth-password.html', + tips: 'The AWS secret access key associated with your account. In case of GCS, this is the HMAC secret.', + helpfulLink: + 'https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html', optional: true, }, { label: 'Region', stateHandler: (value, setter) => setter((curr) => ({ ...curr, region: value })), - tips: 'Specify which database to associate with this peer.', - helpfulLink: - 'https://www.postgresql.org/docs/current/sql-createdatabase.html', - optional: true, + tips: 'The region where your bucket is located. For example, us-east-1. In case of GCS, this will be set to auto, which detects where your bucket it.', }, { - label: 'Endpoint', + label: 'Role ARN', stateHandler: (value, setter) => - setter((curr) => ({ ...curr, endpoint: value })), + setter((curr) => ({ ...curr, roleArn: value })), + type: 'password', + tips: 'You may set this instead of the access key ID and secret.', + helpfulLink: + 'https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_identifiers.html#identifiers-arns', optional: true, - tips: '', }, ]; -export const blankS3Config: S3Config = { - url: '', +export const blankS3Setting: S3Config = { + url: 's3:///', accessKeyId: undefined, secretAccessKey: undefined, roleArn: undefined, region: undefined, - endpoint: undefined, - metadataDb: undefined, + endpoint: '', + metadataDb: { + host: '', + port: 5432, + user: 'postgres', + password: '', + database: 'postgres', + transactionSnapshot: '', + }, }; diff --git a/ui/app/peers/create/[peerType]/page.tsx b/ui/app/peers/create/[peerType]/page.tsx index 9f86c9d80e..cbefbfad8b 100644 --- a/ui/app/peers/create/[peerType]/page.tsx +++ b/ui/app/peers/create/[peerType]/page.tsx @@ -5,7 +5,7 @@ import S3ConfigForm from '@/components/S3Form'; import { Button } from '@/lib/Button'; import { ButtonGroup } from '@/lib/ButtonGroup'; import { Label } from '@/lib/Label'; -import { LayoutMain, RowWithTextField } from '@/lib/Layout'; +import { RowWithTextField } from '@/lib/Layout'; import { Panel } from '@/lib/Panel'; import { TextField } from '@/lib/TextField'; import { Tooltip } from '@/lib/Tooltip'; @@ -58,7 +58,15 @@ export default function CreateConfig({ }; return ( - +
- +
); } diff --git a/ui/app/peers/create/[peerType]/schema.ts b/ui/app/peers/create/[peerType]/schema.ts index 01a6a08dad..ecbe5b63c3 100644 --- a/ui/app/peers/create/[peerType]/schema.ts +++ b/ui/app/peers/create/[peerType]/schema.ts @@ -204,8 +204,8 @@ export const s3Schema = z.object({ required_error: 'URL is required', }) .min(1, { message: 'URL must be non-empty' }) - .refine((url) => url.startsWith('s3:/'), { - message: 'URL must start with s3:/', + .refine((url) => url.startsWith('s3://'), { + message: 'URL must start with s3://', }), accessKeyId: z .string({ diff --git a/ui/components/S3Form.tsx b/ui/components/S3Form.tsx index 4ca83214e5..cbd78ff7ce 100644 --- a/ui/components/S3Form.tsx +++ b/ui/components/S3Form.tsx @@ -1,13 +1,14 @@ 'use client'; import { PeerConfig } from '@/app/dto/PeersDTO'; +import { postgresSetting } from '@/app/peers/create/[peerType]/helpers/pg'; import { - blankPostgresSetting, - postgresSetting, -} from '@/app/peers/create/[peerType]/helpers/pg'; -import { s3Setting } from '@/app/peers/create/[peerType]/helpers/s3'; + blankS3Setting, + s3Setting, +} from '@/app/peers/create/[peerType]/helpers/s3'; import { PostgresConfig } from '@/grpc_generated/peers'; import { Label } from '@/lib/Label'; -import { RowWithTextField } from '@/lib/Layout'; +import { RowWithRadiobutton, RowWithTextField } from '@/lib/Layout'; +import { RadioButton, RadioButtonGroup } from '@/lib/RadioButtonGroup'; import { TextField } from '@/lib/TextField'; import { Tooltip } from '@/lib/Tooltip'; import { useEffect, useState } from 'react'; @@ -18,65 +19,116 @@ interface S3Props { setter: PeerSetter; } const S3ConfigForm = ({ setter }: S3Props) => { - const [metadataDB, setMetadataDB] = - useState(blankPostgresSetting); + const [metadataDB, setMetadataDB] = useState( + blankS3Setting.metadataDb! + ); + const [storageType, setStorageType] = useState<'S3' | 'GCS'>('S3'); + const displayCondition = (label: string) => { + return !( + (label === 'Region' || label === 'Role ARN') && + storageType === 'GCS' + ); + }; useEffect(() => { + const endpoint = storageType === 'S3' ? '' : 'storage.googleapis.com'; setter((prev) => { return { ...prev, metadataDb: metadataDB as PostgresConfig, + endpoint, }; }); - }, [metadataDB]); + + if (storageType === 'GCS') { + setter((prev) => { + return { + ...prev, + region: 'auto', + }; + }); + } + }, [metadataDB, storageType, setter]); + return ( - <> +
+ + setStorageType(val as 'S3' | 'GCS')} + > + S3} + action={} + /> + GCS} + action={} + /> + {s3Setting.map((setting, index) => { - return ( - - {setting.label}{' '} - {!setting.optional && ( - - - - )} - - } - action={ -
- ) => - setting.stateHandler(e.target.value, setter) - } - /> - {setting.tips && ( - - )} -
- } - /> - ); + if (displayCondition(setting.label)) + return ( + + {setting.label}{' '} + {!setting.optional && ( + + + + )} + + } + action={ +
+ ) => + setting.stateHandler(e.target.value, setter) + } + /> + {setting.tips && ( + + )} +
+ } + /> + ); })} - - {postgresSetting.map((setting, index) => { - - {setting.label}{' '} - - - - - } - action={ -
- ) => - setting.stateHandler(e.target.value, setMetadataDB) - } - /> - {setting.tips && ( - - )} -
- } - />; - })} - + } + action={ +
+ ) => + pgSetting.stateHandler(e.target.value, setMetadataDB) + } + defaultValue={ + (metadataDB as PostgresConfig)[ + pgSetting.label.toLowerCase() as keyof PostgresConfig + ] || '' + } + /> + {pgSetting.tips && ( + + )} +
+ } + /> + ) + )} +
); }; From 408d20d021642e062080d0acb1a0558658d86f9b Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 16 Nov 2023 21:10:40 +0530 Subject: [PATCH 3/9] adds more peer validation for s3 --- flow/connectors/external_metadata/store.go | 15 ++++++- flow/connectors/s3/s3.go | 50 ++++++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index 6ec7c07142..25ed960fee 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -2,6 +2,7 @@ package connmetadata import ( "context" + "fmt" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -23,7 +24,7 @@ type PostgresMetadataStore struct { func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConfig, schemaName string) (*PostgresMetadataStore, error) { connectionString := utils.GetPGConnectionString(pgConfig) - + log.Info("connection string in external metadata: ", connectionString) pool, err := pgxpool.New(ctx, connectionString) if err != nil { log.Errorf("failed to create connection pool: %v", err) @@ -47,6 +48,18 @@ func (p *PostgresMetadataStore) Close() error { return nil } +func (p *PostgresMetadataStore) Ping() error { + if p.pool == nil { + return fmt.Errorf("metadata db ping failed as pool does not exist") + } + pingErr := p.pool.Ping(p.ctx) + if pingErr != nil { + return fmt.Errorf("metadata db ping failed: %w", pingErr) + } + + return nil +} + func (p *PostgresMetadataStore) NeedsSetupMetadata() bool { // check if schema exists rows := p.pool.QueryRow(p.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", p.schemaName) diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index fb6ed77d4b..f8f0898585 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -3,11 +3,13 @@ package conns3 import ( "context" "fmt" + "strings" metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3" log "github.com/sirupsen/logrus" ) @@ -61,6 +63,12 @@ func NewS3Connector(ctx context.Context, return nil, err } + validErr := ValidCheck(s3Client, config.Url, pgMetadata) + if validErr != nil { + log.Errorf("failed to validate s3 connector: %v", validErr) + return nil, validErr + } + return &S3Connector{ ctx: ctx, url: config.Url, @@ -85,6 +93,48 @@ func (c *S3Connector) Close() error { return nil } +func ValidCheck(s3Client *s3.S3, bucketUrl string, metadataDB *metadataStore.PostgresMetadataStore) error { + _, listErr := s3Client.ListBuckets(nil) + if listErr != nil { + return fmt.Errorf("failed to list buckets: %w", listErr) + } + + reader := strings.NewReader("hello world") + + bucketPrefix, parseErr := utils.NewS3BucketAndPrefix(bucketUrl) + if parseErr != nil { + return fmt.Errorf("failed to parse bucket url: %w", parseErr) + } + + // Write an empty file and then delete it + // to check if we have write permissions + bucketName := aws.String(bucketPrefix.Bucket) + _, putErr := s3Client.PutObject(&s3.PutObjectInput{ + Bucket: bucketName, + Key: aws.String("peerdb_check"), + Body: reader, + }) + if putErr != nil { + return fmt.Errorf("failed to write to bucket: %w", putErr) + } + + _, delErr := s3Client.DeleteObject(&s3.DeleteObjectInput{ + Bucket: bucketName, + Key: aws.String("peerdb_check"), + }) + if delErr != nil { + return fmt.Errorf("failed to delete from bucket: %w", delErr) + } + + // check if we can ping external metadata + err := metadataDB.Ping() + if err != nil { + return fmt.Errorf("failed to ping external metadata: %w", err) + } + + return nil +} + func (c *S3Connector) ConnectionActive() bool { _, err := c.client.ListBuckets(nil) return err == nil From ceb40f60640d9c09eda879f486e22f5555a584cc Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 16 Nov 2023 23:04:52 +0530 Subject: [PATCH 4/9] fix log and style --- flow/connectors/external_metadata/store.go | 1 - flow/connectors/s3/s3.go | 5 ++--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index 25ed960fee..8693dabb58 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -24,7 +24,6 @@ type PostgresMetadataStore struct { func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConfig, schemaName string) (*PostgresMetadataStore, error) { connectionString := utils.GetPGConnectionString(pgConfig) - log.Info("connection string in external metadata: ", connectionString) pool, err := pgxpool.New(ctx, connectionString) if err != nil { log.Errorf("failed to create connection pool: %v", err) diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index f8f0898585..c9fcf5a47a 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -93,15 +93,14 @@ func (c *S3Connector) Close() error { return nil } -func ValidCheck(s3Client *s3.S3, bucketUrl string, metadataDB *metadataStore.PostgresMetadataStore) error { +func ValidCheck(s3Client *s3.S3, bucketURL string, metadataDB *metadataStore.PostgresMetadataStore) error { _, listErr := s3Client.ListBuckets(nil) if listErr != nil { return fmt.Errorf("failed to list buckets: %w", listErr) } reader := strings.NewReader("hello world") - - bucketPrefix, parseErr := utils.NewS3BucketAndPrefix(bucketUrl) + bucketPrefix, parseErr := utils.NewS3BucketAndPrefix(bucketURL) if parseErr != nil { return fmt.Errorf("failed to parse bucket url: %w", parseErr) } From 15537e45d06485517cedba472dc8664a85e3ec95 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 16 Nov 2023 23:28:28 +0530 Subject: [PATCH 5/9] change required params --- ui/app/peers/create/[peerType]/helpers/s3.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/ui/app/peers/create/[peerType]/helpers/s3.ts b/ui/app/peers/create/[peerType]/helpers/s3.ts index c085d3df74..caf93acc22 100644 --- a/ui/app/peers/create/[peerType]/helpers/s3.ts +++ b/ui/app/peers/create/[peerType]/helpers/s3.ts @@ -15,7 +15,6 @@ export const s3Setting: PeerSetting[] = [ label: 'Access Key ID', stateHandler: (value, setter) => setter((curr) => ({ ...curr, accessKeyId: value })), - optional: true, tips: 'The AWS access key ID associated with your account. In case of GCS, this is the HMAC access key ID.', helpfulLink: 'https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html', @@ -27,7 +26,6 @@ export const s3Setting: PeerSetting[] = [ tips: 'The AWS secret access key associated with your account. In case of GCS, this is the HMAC secret.', helpfulLink: 'https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html', - optional: true, }, { label: 'Region', @@ -40,7 +38,7 @@ export const s3Setting: PeerSetting[] = [ stateHandler: (value, setter) => setter((curr) => ({ ...curr, roleArn: value })), type: 'password', - tips: 'You may set this instead of the access key ID and secret.', + tips: 'If set, the role ARN will be used to assume the role before accessing the bucket.', helpfulLink: 'https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_identifiers.html#identifiers-arns', optional: true, From d766599f726a326a975732a06c7d2ef50b3ec1db Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 17 Nov 2023 00:52:10 +0530 Subject: [PATCH 6/9] changes --- flow/connectors/s3/s3.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index c9fcf5a47a..b9afc886d0 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "time" metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/connectors/utils" @@ -14,6 +15,10 @@ import ( log "github.com/sirupsen/logrus" ) +const ( + CHECK_OBJECT_KEY = "peerdb_check" +) + type S3Connector struct { ctx context.Context url string @@ -93,14 +98,15 @@ func (c *S3Connector) Close() error { return nil } -func ValidCheck(s3Client *s3.S3, bucketURL string, metadataDB *metadataStore.PostgresMetadataStore) error { +func ValidCheck(s3Client *s3.S3, bucketUrl string, metadataDB *metadataStore.PostgresMetadataStore) error { _, listErr := s3Client.ListBuckets(nil) if listErr != nil { return fmt.Errorf("failed to list buckets: %w", listErr) } - reader := strings.NewReader("hello world") - bucketPrefix, parseErr := utils.NewS3BucketAndPrefix(bucketURL) + reader := strings.NewReader(time.Now().Format(time.RFC3339)) + + bucketPrefix, parseErr := utils.NewS3BucketAndPrefix(bucketUrl) if parseErr != nil { return fmt.Errorf("failed to parse bucket url: %w", parseErr) } @@ -110,7 +116,7 @@ func ValidCheck(s3Client *s3.S3, bucketURL string, metadataDB *metadataStore.Pos bucketName := aws.String(bucketPrefix.Bucket) _, putErr := s3Client.PutObject(&s3.PutObjectInput{ Bucket: bucketName, - Key: aws.String("peerdb_check"), + Key: aws.String(CHECK_OBJECT_KEY), Body: reader, }) if putErr != nil { @@ -119,7 +125,7 @@ func ValidCheck(s3Client *s3.S3, bucketURL string, metadataDB *metadataStore.Pos _, delErr := s3Client.DeleteObject(&s3.DeleteObjectInput{ Bucket: bucketName, - Key: aws.String("peerdb_check"), + Key: aws.String(CHECK_OBJECT_KEY), }) if delErr != nil { return fmt.Errorf("failed to delete from bucket: %w", delErr) From b3535e1fad3f50eba1f4f297010cc03863b61711 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 17 Nov 2023 01:02:32 +0530 Subject: [PATCH 7/9] case change --- flow/connectors/s3/s3.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index b9afc886d0..8472a3a855 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -16,7 +16,7 @@ import ( ) const ( - CHECK_OBJECT_KEY = "peerdb_check" + _peerDBCheck = "peerdb_check" ) type S3Connector struct { @@ -116,7 +116,7 @@ func ValidCheck(s3Client *s3.S3, bucketUrl string, metadataDB *metadataStore.Pos bucketName := aws.String(bucketPrefix.Bucket) _, putErr := s3Client.PutObject(&s3.PutObjectInput{ Bucket: bucketName, - Key: aws.String(CHECK_OBJECT_KEY), + Key: aws.String(_peerDBCheck), Body: reader, }) if putErr != nil { @@ -125,7 +125,7 @@ func ValidCheck(s3Client *s3.S3, bucketUrl string, metadataDB *metadataStore.Pos _, delErr := s3Client.DeleteObject(&s3.DeleteObjectInput{ Bucket: bucketName, - Key: aws.String(CHECK_OBJECT_KEY), + Key: aws.String(_peerDBCheck), }) if delErr != nil { return fmt.Errorf("failed to delete from bucket: %w", delErr) From c2f406c1965facd6c63fb056973241530bedbc5c Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 17 Nov 2023 02:44:47 +0530 Subject: [PATCH 8/9] bucketUrl case change --- flow/connectors/s3/s3.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index 8472a3a855..32113fb456 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -98,7 +98,7 @@ func (c *S3Connector) Close() error { return nil } -func ValidCheck(s3Client *s3.S3, bucketUrl string, metadataDB *metadataStore.PostgresMetadataStore) error { +func ValidCheck(s3Client *s3.S3, bucketURL string, metadataDB *metadataStore.PostgresMetadataStore) error { _, listErr := s3Client.ListBuckets(nil) if listErr != nil { return fmt.Errorf("failed to list buckets: %w", listErr) @@ -106,7 +106,7 @@ func ValidCheck(s3Client *s3.S3, bucketUrl string, metadataDB *metadataStore.Pos reader := strings.NewReader(time.Now().Format(time.RFC3339)) - bucketPrefix, parseErr := utils.NewS3BucketAndPrefix(bucketUrl) + bucketPrefix, parseErr := utils.NewS3BucketAndPrefix(bucketURL) if parseErr != nil { return fmt.Errorf("failed to parse bucket url: %w", parseErr) } From 4b831cd4e0d18ce2bf066178cc9abb117679fb4c Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 17 Nov 2023 10:59:29 +0530 Subject: [PATCH 9/9] prettier handlers.ts --- ui/app/peers/create/[peerType]/handlers.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/ui/app/peers/create/[peerType]/handlers.ts b/ui/app/peers/create/[peerType]/handlers.ts index 424200dc7d..4b2858ae4e 100644 --- a/ui/app/peers/create/[peerType]/handlers.ts +++ b/ui/app/peers/create/[peerType]/handlers.ts @@ -5,7 +5,13 @@ import { } from '@/app/dto/PeersDTO'; import { S3Config } from '@/grpc_generated/peers'; import { Dispatch, SetStateAction } from 'react'; -import { bqSchema, peerNameSchema, pgSchema, s3Schema, sfSchema } from './schema'; +import { + bqSchema, + peerNameSchema, + pgSchema, + s3Schema, + sfSchema, +} from './schema'; const validateFields = ( type: string,