diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 416800f02f..13ce8115b1 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -559,7 +559,13 @@ func (h *FlowRequestHandler) CreatePeer( } sqlServerConfig := sqlServerConfigObject.SqlserverConfig encodedConfig, encodingErr = proto.Marshal(sqlServerConfig) - + case protos.DBType_S3: + s3ConfigObject, ok := config.(*protos.Peer_S3Config) + if !ok { + return wrongConfigResponse, nil + } + s3Config := s3ConfigObject.S3Config + encodedConfig, encodingErr = proto.Marshal(s3Config) default: return wrongConfigResponse, nil } diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 0612514dd7..4c393ca48d 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -230,8 +230,12 @@ func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) { return nil, fmt.Errorf("missing sqlserver config for %s peer %s", peer.Type.String(), peer.Name) } return connsqlserver.NewSQLServerConnector(ctx, sqlServerConfig) - // case protos.DBType_S3: - // return conns3.NewS3Connector(ctx, config.GetS3Config()) + case protos.DBType_S3: + s3Config := peer.GetS3Config() + if s3Config == nil { + return nil, fmt.Errorf("missing s3 config for %s peer %s", peer.Type.String(), peer.Name) + } + return conns3.NewS3Connector(ctx, s3Config) // case protos.DBType_EVENTHUB: // return connsqlserver.NewSQLServerConnector(ctx, config.GetSqlserverConfig()) default: diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index 6ec7c07142..8693dabb58 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -2,6 +2,7 @@ package connmetadata import ( "context" + "fmt" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -23,7 +24,6 @@ type PostgresMetadataStore struct { func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConfig, schemaName string) (*PostgresMetadataStore, error) { connectionString := utils.GetPGConnectionString(pgConfig) - pool, err := pgxpool.New(ctx, connectionString) if err != nil { log.Errorf("failed to create connection pool: %v", err) @@ -47,6 +47,18 @@ func (p *PostgresMetadataStore) Close() error { return nil } +func (p *PostgresMetadataStore) Ping() error { + if p.pool == nil { + return fmt.Errorf("metadata db ping failed as pool does not exist") + } + pingErr := p.pool.Ping(p.ctx) + if pingErr != nil { + return fmt.Errorf("metadata db ping failed: %w", pingErr) + } + + return nil +} + func (p *PostgresMetadataStore) NeedsSetupMetadata() bool { // check if schema exists rows := p.pool.QueryRow(p.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", p.schemaName) diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index fb6ed77d4b..32113fb456 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -3,15 +3,22 @@ package conns3 import ( "context" "fmt" + "strings" + "time" metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3" log "github.com/sirupsen/logrus" ) +const ( + _peerDBCheck = "peerdb_check" +) + type S3Connector struct { ctx context.Context url string @@ -61,6 +68,12 @@ func NewS3Connector(ctx context.Context, return nil, err } + validErr := ValidCheck(s3Client, config.Url, pgMetadata) + if validErr != nil { + log.Errorf("failed to validate s3 connector: %v", validErr) + return nil, validErr + } + return &S3Connector{ ctx: ctx, url: config.Url, @@ -85,6 +98,48 @@ func (c *S3Connector) Close() error { return nil } +func ValidCheck(s3Client *s3.S3, bucketURL string, metadataDB *metadataStore.PostgresMetadataStore) error { + _, listErr := s3Client.ListBuckets(nil) + if listErr != nil { + return fmt.Errorf("failed to list buckets: %w", listErr) + } + + reader := strings.NewReader(time.Now().Format(time.RFC3339)) + + bucketPrefix, parseErr := utils.NewS3BucketAndPrefix(bucketURL) + if parseErr != nil { + return fmt.Errorf("failed to parse bucket url: %w", parseErr) + } + + // Write an empty file and then delete it + // to check if we have write permissions + bucketName := aws.String(bucketPrefix.Bucket) + _, putErr := s3Client.PutObject(&s3.PutObjectInput{ + Bucket: bucketName, + Key: aws.String(_peerDBCheck), + Body: reader, + }) + if putErr != nil { + return fmt.Errorf("failed to write to bucket: %w", putErr) + } + + _, delErr := s3Client.DeleteObject(&s3.DeleteObjectInput{ + Bucket: bucketName, + Key: aws.String(_peerDBCheck), + }) + if delErr != nil { + return fmt.Errorf("failed to delete from bucket: %w", delErr) + } + + // check if we can ping external metadata + err := metadataDB.Ping() + if err != nil { + return fmt.Errorf("failed to ping external metadata: %w", err) + } + + return nil +} + func (c *S3Connector) ConnectionActive() bool { _, err := c.client.ListBuckets(nil) return err == nil diff --git a/ui/app/api/peers/route.ts b/ui/app/api/peers/route.ts index ff790da762..ca74ba0127 100644 --- a/ui/app/api/peers/route.ts +++ b/ui/app/api/peers/route.ts @@ -52,6 +52,12 @@ const constructPeer = ( type: DBType.BIGQUERY, bigqueryConfig: config as BigqueryConfig, }; + case 'S3': + return { + name, + type: DBType.S3, + s3Config: config as S3Config, + }; default: return; } diff --git a/ui/app/dto/PeersDTO.ts b/ui/app/dto/PeersDTO.ts index 9954aceaa7..0e6dfef818 100644 --- a/ui/app/dto/PeersDTO.ts +++ b/ui/app/dto/PeersDTO.ts @@ -1,6 +1,7 @@ import { BigqueryConfig, PostgresConfig, + S3Config, SnowflakeConfig, } from '@/grpc_generated/peers'; @@ -31,7 +32,11 @@ export type UDropPeerResponse = { errorMessage: string; }; -export type PeerConfig = PostgresConfig | SnowflakeConfig | BigqueryConfig; +export type PeerConfig = + | PostgresConfig + | SnowflakeConfig + | BigqueryConfig + | S3Config; export type CatalogPeer = { id: number; name: string; diff --git a/ui/app/mirrors/create/helpers/common.ts b/ui/app/mirrors/create/helpers/common.ts index 86a8e04430..02f548eeba 100644 --- a/ui/app/mirrors/create/helpers/common.ts +++ b/ui/app/mirrors/create/helpers/common.ts @@ -59,7 +59,7 @@ export const blankQRepSetting = { waitBetweenBatchesSeconds: 30, writeMode: undefined, stagingPath: '', - numRowsPerPartition: 0, + numRowsPerPartition: 100000, setupWatermarkTableOnDestination: false, dstTableFullResync: false, }; diff --git a/ui/app/mirrors/create/page.tsx b/ui/app/mirrors/create/page.tsx index d2027f1dd8..b443cdd760 100644 --- a/ui/app/mirrors/create/page.tsx +++ b/ui/app/mirrors/create/page.tsx @@ -63,7 +63,6 @@ export default function CreateMirrors() { const [config, setConfig] = useState(blankCDCSetting); const [peers, setPeers] = useState([]); const [rows, setRows] = useState([]); - const [validSource, setValidSource] = useState(false); const [sourceSchema, setSourceSchema] = useState('public'); const [qrepQuery, setQrepQuery] = useState(`-- Here's a sample template: diff --git a/ui/app/peers/create/[peerType]/handlers.ts b/ui/app/peers/create/[peerType]/handlers.ts index 21c0a63b64..4b2858ae4e 100644 --- a/ui/app/peers/create/[peerType]/handlers.ts +++ b/ui/app/peers/create/[peerType]/handlers.ts @@ -3,8 +3,15 @@ import { UCreatePeerResponse, UValidatePeerResponse, } from '@/app/dto/PeersDTO'; +import { S3Config } from '@/grpc_generated/peers'; import { Dispatch, SetStateAction } from 'react'; -import { bqSchema, peerNameSchema, pgSchema, sfSchema } from './schema'; +import { + bqSchema, + peerNameSchema, + pgSchema, + s3Schema, + sfSchema, +} from './schema'; const validateFields = ( type: string, @@ -19,6 +26,14 @@ const validateFields = ( return false; } + if (type === 'S3') { + const s3Valid = S3Validation(config as S3Config); + if (s3Valid.length > 0) { + setMessage({ ok: false, msg: s3Valid }); + return false; + } + } + let validationErr: string | undefined; switch (type) { case 'POSTGRES': @@ -33,6 +48,10 @@ const validateFields = ( const bqConfig = bqSchema.safeParse(config); if (!bqConfig.success) validationErr = bqConfig.error.issues[0].message; break; + case 'S3': + const s3Config = s3Schema.safeParse(config); + if (!s3Config.success) validationErr = s3Config.error.issues[0].message; + break; default: validationErr = 'Unsupported peer type ' + type; } @@ -72,6 +91,13 @@ export const handleValidate = async ( setLoading(false); }; +const S3Validation = (config: S3Config): string => { + if (!config.secretAccessKey && !config.accessKeyId && !config.roleArn) { + return 'Either both access key and secret or role ARN is required'; + } + return ''; +}; + // API call to create peer export const handleCreate = async ( type: string, diff --git a/ui/app/peers/create/[peerType]/helpers/common.ts b/ui/app/peers/create/[peerType]/helpers/common.ts index c08b0035ee..62b2bb26c2 100644 --- a/ui/app/peers/create/[peerType]/helpers/common.ts +++ b/ui/app/peers/create/[peerType]/helpers/common.ts @@ -2,6 +2,7 @@ import { PeerConfig } from '@/app/dto/PeersDTO'; import { PeerSetter } from '@/components/ConfigForm'; import { blankBigquerySetting } from './bq'; import { blankPostgresSetting } from './pg'; +import { blankS3Setting } from './s3'; import { blankSnowflakeSetting } from './sf'; export interface PeerSetting { @@ -22,6 +23,8 @@ export const getBlankSetting = (dbType: string): PeerConfig => { return blankSnowflakeSetting; case 'BIGQUERY': return blankBigquerySetting; + case 'S3': + return blankS3Setting; default: return blankPostgresSetting; } diff --git a/ui/app/peers/create/[peerType]/helpers/s3.ts b/ui/app/peers/create/[peerType]/helpers/s3.ts new file mode 100644 index 0000000000..caf93acc22 --- /dev/null +++ b/ui/app/peers/create/[peerType]/helpers/s3.ts @@ -0,0 +1,63 @@ +import { S3Config } from '@/grpc_generated/peers'; +import { PeerSetting } from './common'; + +export const s3Setting: PeerSetting[] = [ + { + label: 'Bucket URL', + stateHandler: (value, setter) => + setter((curr) => ({ ...curr, url: value })), + tips: 'The URL of your existing S3/GCS bucket along with a prefix of your choice. It begins with s3://', + helpfulLink: + 'https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-bucket-intro.html#accessing-a-bucket-using-S3-format', + default: 's3:///', + }, + { + label: 'Access Key ID', + stateHandler: (value, setter) => + setter((curr) => ({ ...curr, accessKeyId: value })), + tips: 'The AWS access key ID associated with your account. In case of GCS, this is the HMAC access key ID.', + helpfulLink: + 'https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html', + }, + { + label: 'Secret Access Key', + stateHandler: (value, setter) => + setter((curr) => ({ ...curr, secretAccessKey: value })), + tips: 'The AWS secret access key associated with your account. In case of GCS, this is the HMAC secret.', + helpfulLink: + 'https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html', + }, + { + label: 'Region', + stateHandler: (value, setter) => + setter((curr) => ({ ...curr, region: value })), + tips: 'The region where your bucket is located. For example, us-east-1. In case of GCS, this will be set to auto, which detects where your bucket it.', + }, + { + label: 'Role ARN', + stateHandler: (value, setter) => + setter((curr) => ({ ...curr, roleArn: value })), + type: 'password', + tips: 'If set, the role ARN will be used to assume the role before accessing the bucket.', + helpfulLink: + 'https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_identifiers.html#identifiers-arns', + optional: true, + }, +]; + +export const blankS3Setting: S3Config = { + url: 's3:///', + accessKeyId: undefined, + secretAccessKey: undefined, + roleArn: undefined, + region: undefined, + endpoint: '', + metadataDb: { + host: '', + port: 5432, + user: 'postgres', + password: '', + database: 'postgres', + transactionSnapshot: '', + }, +}; diff --git a/ui/app/peers/create/[peerType]/page.tsx b/ui/app/peers/create/[peerType]/page.tsx index 6bceac8b18..cbefbfad8b 100644 --- a/ui/app/peers/create/[peerType]/page.tsx +++ b/ui/app/peers/create/[peerType]/page.tsx @@ -1,10 +1,11 @@ 'use client'; import { PeerConfig } from '@/app/dto/PeersDTO'; import BQConfig from '@/components/BigqueryConfig'; +import S3ConfigForm from '@/components/S3Form'; import { Button } from '@/lib/Button'; import { ButtonGroup } from '@/lib/ButtonGroup'; import { Label } from '@/lib/Label'; -import { LayoutMain, RowWithTextField } from '@/lib/Layout'; +import { RowWithTextField } from '@/lib/Layout'; import { Panel } from '@/lib/Panel'; import { TextField } from '@/lib/TextField'; import { Tooltip } from '@/lib/Tooltip'; @@ -45,6 +46,8 @@ export default function CreateConfig({ return configForm(snowflakeSetting); case 'BIGQUERY': return ; + case 'S3': + return ; default: return <>; } @@ -55,7 +58,15 @@ export default function CreateConfig({ }; return ( - +
- +
); } diff --git a/ui/app/peers/create/[peerType]/schema.ts b/ui/app/peers/create/[peerType]/schema.ts index 3f92aef257..ecbe5b63c3 100644 --- a/ui/app/peers/create/[peerType]/schema.ts +++ b/ui/app/peers/create/[peerType]/schema.ts @@ -196,3 +196,43 @@ export const bqSchema = z.object({ 'Dataset ID must only contain numbers, letters, and underscores' ), }); + +export const s3Schema = z.object({ + url: z + .string({ + invalid_type_error: 'URL must be a string', + required_error: 'URL is required', + }) + .min(1, { message: 'URL must be non-empty' }) + .refine((url) => url.startsWith('s3://'), { + message: 'URL must start with s3://', + }), + accessKeyId: z + .string({ + invalid_type_error: 'Access Key ID must be a string', + required_error: 'Access Key ID is required', + }) + .min(1, { message: 'Access Key ID must be non-empty' }), + secretAccessKey: z + .string({ + invalid_type_error: 'Secret Access Key must be a string', + required_error: 'Secret Access Key is required', + }) + .min(1, { message: 'Secret Access Key must be non-empty' }), + roleArn: z + .string({ + invalid_type_error: 'Role ARN must be a string', + }) + .optional(), + region: z + .string({ + invalid_type_error: 'Region must be a string', + }) + .optional(), + endpoint: z + .string({ + invalid_type_error: 'Endpoint must be a string', + }) + .optional(), + metadataDb: pgSchema, +}); diff --git a/ui/components/PeerComponent.tsx b/ui/components/PeerComponent.tsx index ab956992e2..7378e91635 100644 --- a/ui/components/PeerComponent.tsx +++ b/ui/components/PeerComponent.tsx @@ -15,6 +15,9 @@ export const DBTypeToImageMapping = (peerType: DBType | string) => { case DBType.BIGQUERY: case 'BIGQUERY': return '/svgs/bq.svg'; + case DBType.S3: + case 'S3': + return '/svgs/aws.svg'; case DBType.EVENTHUB_GROUP: case DBType.EVENTHUB: return '/svgs/ms.svg'; diff --git a/ui/components/S3Form.tsx b/ui/components/S3Form.tsx new file mode 100644 index 0000000000..cbd78ff7ce --- /dev/null +++ b/ui/components/S3Form.tsx @@ -0,0 +1,206 @@ +'use client'; +import { PeerConfig } from '@/app/dto/PeersDTO'; +import { postgresSetting } from '@/app/peers/create/[peerType]/helpers/pg'; +import { + blankS3Setting, + s3Setting, +} from '@/app/peers/create/[peerType]/helpers/s3'; +import { PostgresConfig } from '@/grpc_generated/peers'; +import { Label } from '@/lib/Label'; +import { RowWithRadiobutton, RowWithTextField } from '@/lib/Layout'; +import { RadioButton, RadioButtonGroup } from '@/lib/RadioButtonGroup'; +import { TextField } from '@/lib/TextField'; +import { Tooltip } from '@/lib/Tooltip'; +import { useEffect, useState } from 'react'; +import { PeerSetter } from './ConfigForm'; +import { InfoPopover } from './InfoPopover'; + +interface S3Props { + setter: PeerSetter; +} +const S3ConfigForm = ({ setter }: S3Props) => { + const [metadataDB, setMetadataDB] = useState( + blankS3Setting.metadataDb! + ); + const [storageType, setStorageType] = useState<'S3' | 'GCS'>('S3'); + const displayCondition = (label: string) => { + return !( + (label === 'Region' || label === 'Role ARN') && + storageType === 'GCS' + ); + }; + useEffect(() => { + const endpoint = storageType === 'S3' ? '' : 'storage.googleapis.com'; + setter((prev) => { + return { + ...prev, + metadataDb: metadataDB as PostgresConfig, + endpoint, + }; + }); + + if (storageType === 'GCS') { + setter((prev) => { + return { + ...prev, + region: 'auto', + }; + }); + } + }, [metadataDB, storageType, setter]); + + return ( +
+ + setStorageType(val as 'S3' | 'GCS')} + > + S3} + action={} + /> + GCS} + action={} + /> + + {s3Setting.map((setting, index) => { + if (displayCondition(setting.label)) + return ( + + {setting.label}{' '} + {!setting.optional && ( + + + + )} + + } + action={ +
+ ) => + setting.stateHandler(e.target.value, setter) + } + /> + {setting.tips && ( + + )} +
+ } + /> + ); + })} + + + + {postgresSetting.map( + (pgSetting, index) => + pgSetting.label !== 'Transaction Snapshot' && ( + + {pgSetting.label}{' '} + + + + + } + action={ +
+ ) => + pgSetting.stateHandler(e.target.value, setMetadataDB) + } + defaultValue={ + (metadataDB as PostgresConfig)[ + pgSetting.label.toLowerCase() as keyof PostgresConfig + ] || '' + } + /> + {pgSetting.tips && ( + + )} +
+ } + /> + ) + )} +
+ ); +}; + +export default S3ConfigForm; diff --git a/ui/components/SelectSource.tsx b/ui/components/SelectSource.tsx index 695174a2fe..bc911f8fa5 100644 --- a/ui/components/SelectSource.tsx +++ b/ui/components/SelectSource.tsx @@ -28,7 +28,10 @@ export default function SelectSource({ .filter( (value): value is string => typeof value === 'string' && - (value === 'POSTGRES' || value === 'SNOWFLAKE' || value === 'BIGQUERY') + (value === 'POSTGRES' || + value === 'SNOWFLAKE' || + value === 'BIGQUERY' || + value === 'S3') ) .map((value) => ({ label: value, value })); @@ -37,7 +40,7 @@ export default function SelectSource({ placeholder='Select a source' options={dbTypes} defaultValue={dbTypes.find((opt) => opt.value === peerType)} - onChange={(val, action) => val && setPeerType(val.value)} + onChange={(val, _) => val && setPeerType(val.value)} formatOptionLabel={SourceLabel} /> );