Skip to content

Commit

Permalink
S3 Peer UI (#668)
Browse files Browse the repository at this point in the history
<img width="1728" alt="Screenshot 2023-11-16 at 1 33 12 PM"
src="https://github.com/PeerDB-io/peerdb/assets/65964360/7b6a0f3e-29e3-451f-8c8e-c16fab9509f1">

- Upon choosing the GCS option, the `Region` and `Role ARN` fields will
not be shown as they have fixed values for GCS
  • Loading branch information
Amogh-Bharadwaj authored Nov 17, 2023
1 parent 0d12f09 commit 4a72fea
Show file tree
Hide file tree
Showing 16 changed files with 455 additions and 13 deletions.
8 changes: 7 additions & 1 deletion flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,13 @@ func (h *FlowRequestHandler) CreatePeer(
}
sqlServerConfig := sqlServerConfigObject.SqlserverConfig
encodedConfig, encodingErr = proto.Marshal(sqlServerConfig)

case protos.DBType_S3:
s3ConfigObject, ok := config.(*protos.Peer_S3Config)
if !ok {
return wrongConfigResponse, nil
}
s3Config := s3ConfigObject.S3Config
encodedConfig, encodingErr = proto.Marshal(s3Config)
default:
return wrongConfigResponse, nil
}
Expand Down
8 changes: 6 additions & 2 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,12 @@ func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) {
return nil, fmt.Errorf("missing sqlserver config for %s peer %s", peer.Type.String(), peer.Name)
}
return connsqlserver.NewSQLServerConnector(ctx, sqlServerConfig)
// case protos.DBType_S3:
// return conns3.NewS3Connector(ctx, config.GetS3Config())
case protos.DBType_S3:
s3Config := peer.GetS3Config()
if s3Config == nil {
return nil, fmt.Errorf("missing s3 config for %s peer %s", peer.Type.String(), peer.Name)
}
return conns3.NewS3Connector(ctx, s3Config)
// case protos.DBType_EVENTHUB:
// return connsqlserver.NewSQLServerConnector(ctx, config.GetSqlserverConfig())
default:
Expand Down
14 changes: 13 additions & 1 deletion flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package connmetadata

import (
"context"
"fmt"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
Expand All @@ -23,7 +24,6 @@ type PostgresMetadataStore struct {
func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConfig,
schemaName string) (*PostgresMetadataStore, error) {
connectionString := utils.GetPGConnectionString(pgConfig)

pool, err := pgxpool.New(ctx, connectionString)
if err != nil {
log.Errorf("failed to create connection pool: %v", err)
Expand All @@ -47,6 +47,18 @@ func (p *PostgresMetadataStore) Close() error {
return nil
}

func (p *PostgresMetadataStore) Ping() error {
if p.pool == nil {
return fmt.Errorf("metadata db ping failed as pool does not exist")
}
pingErr := p.pool.Ping(p.ctx)
if pingErr != nil {
return fmt.Errorf("metadata db ping failed: %w", pingErr)
}

return nil
}

func (p *PostgresMetadataStore) NeedsSetupMetadata() bool {
// check if schema exists
rows := p.pool.QueryRow(p.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", p.schemaName)
Expand Down
55 changes: 55 additions & 0 deletions flow/connectors/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,22 @@ package conns3
import (
"context"
"fmt"
"strings"
"time"

metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
log "github.com/sirupsen/logrus"
)

const (
_peerDBCheck = "peerdb_check"
)

type S3Connector struct {
ctx context.Context
url string
Expand Down Expand Up @@ -61,6 +68,12 @@ func NewS3Connector(ctx context.Context,
return nil, err
}

validErr := ValidCheck(s3Client, config.Url, pgMetadata)
if validErr != nil {
log.Errorf("failed to validate s3 connector: %v", validErr)
return nil, validErr
}

return &S3Connector{
ctx: ctx,
url: config.Url,
Expand All @@ -85,6 +98,48 @@ func (c *S3Connector) Close() error {
return nil
}

func ValidCheck(s3Client *s3.S3, bucketURL string, metadataDB *metadataStore.PostgresMetadataStore) error {
_, listErr := s3Client.ListBuckets(nil)
if listErr != nil {
return fmt.Errorf("failed to list buckets: %w", listErr)
}

reader := strings.NewReader(time.Now().Format(time.RFC3339))

bucketPrefix, parseErr := utils.NewS3BucketAndPrefix(bucketURL)
if parseErr != nil {
return fmt.Errorf("failed to parse bucket url: %w", parseErr)
}

// Write an empty file and then delete it
// to check if we have write permissions
bucketName := aws.String(bucketPrefix.Bucket)
_, putErr := s3Client.PutObject(&s3.PutObjectInput{
Bucket: bucketName,
Key: aws.String(_peerDBCheck),
Body: reader,
})
if putErr != nil {
return fmt.Errorf("failed to write to bucket: %w", putErr)
}

_, delErr := s3Client.DeleteObject(&s3.DeleteObjectInput{
Bucket: bucketName,
Key: aws.String(_peerDBCheck),
})
if delErr != nil {
return fmt.Errorf("failed to delete from bucket: %w", delErr)
}

// check if we can ping external metadata
err := metadataDB.Ping()
if err != nil {
return fmt.Errorf("failed to ping external metadata: %w", err)
}

return nil
}

func (c *S3Connector) ConnectionActive() bool {
_, err := c.client.ListBuckets(nil)
return err == nil
Expand Down
6 changes: 6 additions & 0 deletions ui/app/api/peers/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ const constructPeer = (
type: DBType.BIGQUERY,
bigqueryConfig: config as BigqueryConfig,
};
case 'S3':
return {
name,
type: DBType.S3,
s3Config: config as S3Config,
};
default:
return;
}
Expand Down
7 changes: 6 additions & 1 deletion ui/app/dto/PeersDTO.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
BigqueryConfig,
PostgresConfig,
S3Config,
SnowflakeConfig,
} from '@/grpc_generated/peers';

Expand Down Expand Up @@ -31,7 +32,11 @@ export type UDropPeerResponse = {
errorMessage: string;
};

export type PeerConfig = PostgresConfig | SnowflakeConfig | BigqueryConfig;
export type PeerConfig =
| PostgresConfig
| SnowflakeConfig
| BigqueryConfig
| S3Config;
export type CatalogPeer = {
id: number;
name: string;
Expand Down
2 changes: 1 addition & 1 deletion ui/app/mirrors/create/helpers/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export const blankQRepSetting = {
waitBetweenBatchesSeconds: 30,
writeMode: undefined,
stagingPath: '',
numRowsPerPartition: 0,
numRowsPerPartition: 100000,
setupWatermarkTableOnDestination: false,
dstTableFullResync: false,
};
1 change: 0 additions & 1 deletion ui/app/mirrors/create/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ export default function CreateMirrors() {
const [config, setConfig] = useState<CDCConfig | QRepConfig>(blankCDCSetting);
const [peers, setPeers] = useState<Peer[]>([]);
const [rows, setRows] = useState<TableMapRow[]>([]);
const [validSource, setValidSource] = useState<boolean>(false);
const [sourceSchema, setSourceSchema] = useState('public');
const [qrepQuery, setQrepQuery] =
useState<string>(`-- Here's a sample template:
Expand Down
28 changes: 27 additions & 1 deletion ui/app/peers/create/[peerType]/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,15 @@ import {
UCreatePeerResponse,
UValidatePeerResponse,
} from '@/app/dto/PeersDTO';
import { S3Config } from '@/grpc_generated/peers';
import { Dispatch, SetStateAction } from 'react';
import { bqSchema, peerNameSchema, pgSchema, sfSchema } from './schema';
import {
bqSchema,
peerNameSchema,
pgSchema,
s3Schema,
sfSchema,
} from './schema';

const validateFields = (
type: string,
Expand All @@ -19,6 +26,14 @@ const validateFields = (
return false;
}

if (type === 'S3') {
const s3Valid = S3Validation(config as S3Config);
if (s3Valid.length > 0) {
setMessage({ ok: false, msg: s3Valid });
return false;
}
}

let validationErr: string | undefined;
switch (type) {
case 'POSTGRES':
Expand All @@ -33,6 +48,10 @@ const validateFields = (
const bqConfig = bqSchema.safeParse(config);
if (!bqConfig.success) validationErr = bqConfig.error.issues[0].message;
break;
case 'S3':
const s3Config = s3Schema.safeParse(config);
if (!s3Config.success) validationErr = s3Config.error.issues[0].message;
break;
default:
validationErr = 'Unsupported peer type ' + type;
}
Expand Down Expand Up @@ -72,6 +91,13 @@ export const handleValidate = async (
setLoading(false);
};

const S3Validation = (config: S3Config): string => {
if (!config.secretAccessKey && !config.accessKeyId && !config.roleArn) {
return 'Either both access key and secret or role ARN is required';
}
return '';
};

// API call to create peer
export const handleCreate = async (
type: string,
Expand Down
3 changes: 3 additions & 0 deletions ui/app/peers/create/[peerType]/helpers/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { PeerConfig } from '@/app/dto/PeersDTO';
import { PeerSetter } from '@/components/ConfigForm';
import { blankBigquerySetting } from './bq';
import { blankPostgresSetting } from './pg';
import { blankS3Setting } from './s3';
import { blankSnowflakeSetting } from './sf';

export interface PeerSetting {
Expand All @@ -22,6 +23,8 @@ export const getBlankSetting = (dbType: string): PeerConfig => {
return blankSnowflakeSetting;
case 'BIGQUERY':
return blankBigquerySetting;
case 'S3':
return blankS3Setting;
default:
return blankPostgresSetting;
}
Expand Down
63 changes: 63 additions & 0 deletions ui/app/peers/create/[peerType]/helpers/s3.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { S3Config } from '@/grpc_generated/peers';
import { PeerSetting } from './common';

export const s3Setting: PeerSetting[] = [
{
label: 'Bucket URL',
stateHandler: (value, setter) =>
setter((curr) => ({ ...curr, url: value })),
tips: 'The URL of your existing S3/GCS bucket along with a prefix of your choice. It begins with s3://',
helpfulLink:
'https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-bucket-intro.html#accessing-a-bucket-using-S3-format',
default: 's3://<bucket_name>/<prefix_name>',
},
{
label: 'Access Key ID',
stateHandler: (value, setter) =>
setter((curr) => ({ ...curr, accessKeyId: value })),
tips: 'The AWS access key ID associated with your account. In case of GCS, this is the HMAC access key ID.',
helpfulLink:
'https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html',
},
{
label: 'Secret Access Key',
stateHandler: (value, setter) =>
setter((curr) => ({ ...curr, secretAccessKey: value })),
tips: 'The AWS secret access key associated with your account. In case of GCS, this is the HMAC secret.',
helpfulLink:
'https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html',
},
{
label: 'Region',
stateHandler: (value, setter) =>
setter((curr) => ({ ...curr, region: value })),
tips: 'The region where your bucket is located. For example, us-east-1. In case of GCS, this will be set to auto, which detects where your bucket it.',
},
{
label: 'Role ARN',
stateHandler: (value, setter) =>
setter((curr) => ({ ...curr, roleArn: value })),
type: 'password',
tips: 'If set, the role ARN will be used to assume the role before accessing the bucket.',
helpfulLink:
'https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_identifiers.html#identifiers-arns',
optional: true,
},
];

export const blankS3Setting: S3Config = {
url: 's3://<bucket_name>/<prefix_name>',
accessKeyId: undefined,
secretAccessKey: undefined,
roleArn: undefined,
region: undefined,
endpoint: '',
metadataDb: {
host: '',
port: 5432,
user: 'postgres',
password: '',
database: 'postgres',
transactionSnapshot: '',
},
};
17 changes: 14 additions & 3 deletions ui/app/peers/create/[peerType]/page.tsx
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
'use client';
import { PeerConfig } from '@/app/dto/PeersDTO';
import BQConfig from '@/components/BigqueryConfig';
import S3ConfigForm from '@/components/S3Form';
import { Button } from '@/lib/Button';
import { ButtonGroup } from '@/lib/ButtonGroup';
import { Label } from '@/lib/Label';
import { LayoutMain, RowWithTextField } from '@/lib/Layout';
import { RowWithTextField } from '@/lib/Layout';
import { Panel } from '@/lib/Panel';
import { TextField } from '@/lib/TextField';
import { Tooltip } from '@/lib/Tooltip';
Expand Down Expand Up @@ -45,6 +46,8 @@ export default function CreateConfig({
return configForm(snowflakeSetting);
case 'BIGQUERY':
return <BQConfig setter={setConfig} />;
case 'S3':
return <S3ConfigForm setter={setConfig} />;
default:
return <></>;
}
Expand All @@ -55,7 +58,15 @@ export default function CreateConfig({
};

return (
<LayoutMain alignSelf='center' justifySelf='center' width='xxLarge'>
<div
style={{
display: 'flex',
flexDirection: 'column',
alignSelf: 'center',
justifySelf: 'center',
width: '45%',
}}
>
<Panel>
<Label variant='title3'>
Setup a new{' '}
Expand Down Expand Up @@ -144,6 +155,6 @@ export default function CreateConfig({
)}
</Panel>
</Panel>
</LayoutMain>
</div>
);
}
Loading

0 comments on commit 4a72fea

Please sign in to comment.