Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

S3 Peer UI #668

Merged
merged 9 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,13 @@ func (h *FlowRequestHandler) CreatePeer(
}
sqlServerConfig := sqlServerConfigObject.SqlserverConfig
encodedConfig, encodingErr = proto.Marshal(sqlServerConfig)

case protos.DBType_S3:
s3ConfigObject, ok := config.(*protos.Peer_S3Config)
if !ok {
return wrongConfigResponse, nil
}
s3Config := s3ConfigObject.S3Config
encodedConfig, encodingErr = proto.Marshal(s3Config)
default:
return wrongConfigResponse, nil
}
Expand Down
8 changes: 6 additions & 2 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,12 @@ func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) {
return nil, fmt.Errorf("missing sqlserver config for %s peer %s", peer.Type.String(), peer.Name)
}
return connsqlserver.NewSQLServerConnector(ctx, sqlServerConfig)
// case protos.DBType_S3:
// return conns3.NewS3Connector(ctx, config.GetS3Config())
case protos.DBType_S3:
s3Config := peer.GetS3Config()
if s3Config == nil {
return nil, fmt.Errorf("missing s3 config for %s peer %s", peer.Type.String(), peer.Name)
}
return conns3.NewS3Connector(ctx, s3Config)
// case protos.DBType_EVENTHUB:
// return connsqlserver.NewSQLServerConnector(ctx, config.GetSqlserverConfig())
default:
Expand Down
14 changes: 13 additions & 1 deletion flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package connmetadata

import (
"context"
"fmt"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
Expand All @@ -23,7 +24,6 @@ type PostgresMetadataStore struct {
func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConfig,
schemaName string) (*PostgresMetadataStore, error) {
connectionString := utils.GetPGConnectionString(pgConfig)

pool, err := pgxpool.New(ctx, connectionString)
if err != nil {
log.Errorf("failed to create connection pool: %v", err)
Expand All @@ -47,6 +47,18 @@ func (p *PostgresMetadataStore) Close() error {
return nil
}

func (p *PostgresMetadataStore) Ping() error {
if p.pool == nil {
return fmt.Errorf("metadata db ping failed as pool does not exist")
}
pingErr := p.pool.Ping(p.ctx)
if pingErr != nil {
return fmt.Errorf("metadata db ping failed: %w", pingErr)
}

return nil
}

func (p *PostgresMetadataStore) NeedsSetupMetadata() bool {
// check if schema exists
rows := p.pool.QueryRow(p.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", p.schemaName)
Expand Down
55 changes: 55 additions & 0 deletions flow/connectors/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,22 @@ package conns3
import (
"context"
"fmt"
"strings"
"time"

metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
log "github.com/sirupsen/logrus"
)

const (
_peerDBCheck = "peerdb_check"
)

type S3Connector struct {
ctx context.Context
url string
Expand Down Expand Up @@ -61,6 +68,12 @@ func NewS3Connector(ctx context.Context,
return nil, err
}

validErr := ValidCheck(s3Client, config.Url, pgMetadata)
if validErr != nil {
log.Errorf("failed to validate s3 connector: %v", validErr)
return nil, validErr
}

return &S3Connector{
ctx: ctx,
url: config.Url,
Expand All @@ -85,6 +98,48 @@ func (c *S3Connector) Close() error {
return nil
}

func ValidCheck(s3Client *s3.S3, bucketURL string, metadataDB *metadataStore.PostgresMetadataStore) error {
_, listErr := s3Client.ListBuckets(nil)
if listErr != nil {
return fmt.Errorf("failed to list buckets: %w", listErr)
}

reader := strings.NewReader(time.Now().Format(time.RFC3339))

bucketPrefix, parseErr := utils.NewS3BucketAndPrefix(bucketURL)
if parseErr != nil {
return fmt.Errorf("failed to parse bucket url: %w", parseErr)
}

// Write an empty file and then delete it
// to check if we have write permissions
bucketName := aws.String(bucketPrefix.Bucket)
_, putErr := s3Client.PutObject(&s3.PutObjectInput{
Bucket: bucketName,
Key: aws.String(_peerDBCheck),
Body: reader,
})
if putErr != nil {
return fmt.Errorf("failed to write to bucket: %w", putErr)
}

_, delErr := s3Client.DeleteObject(&s3.DeleteObjectInput{
Bucket: bucketName,
Key: aws.String(_peerDBCheck),
})
if delErr != nil {
return fmt.Errorf("failed to delete from bucket: %w", delErr)
}

// check if we can ping external metadata
err := metadataDB.Ping()
if err != nil {
return fmt.Errorf("failed to ping external metadata: %w", err)
}

return nil
}

func (c *S3Connector) ConnectionActive() bool {
_, err := c.client.ListBuckets(nil)
return err == nil
Expand Down
6 changes: 6 additions & 0 deletions ui/app/api/peers/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ const constructPeer = (
type: DBType.BIGQUERY,
bigqueryConfig: config as BigqueryConfig,
};
case 'S3':
return {
name,
type: DBType.S3,
s3Config: config as S3Config,
};
default:
return;
}
Expand Down
7 changes: 6 additions & 1 deletion ui/app/dto/PeersDTO.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
BigqueryConfig,
PostgresConfig,
S3Config,
SnowflakeConfig,
} from '@/grpc_generated/peers';

Expand Down Expand Up @@ -31,7 +32,11 @@ export type UDropPeerResponse = {
errorMessage: string;
};

export type PeerConfig = PostgresConfig | SnowflakeConfig | BigqueryConfig;
export type PeerConfig =
| PostgresConfig
| SnowflakeConfig
| BigqueryConfig
| S3Config;
export type CatalogPeer = {
id: number;
name: string;
Expand Down
2 changes: 1 addition & 1 deletion ui/app/mirrors/create/helpers/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export const blankQRepSetting = {
waitBetweenBatchesSeconds: 30,
writeMode: undefined,
stagingPath: '',
numRowsPerPartition: 0,
numRowsPerPartition: 100000,
setupWatermarkTableOnDestination: false,
dstTableFullResync: false,
};
1 change: 0 additions & 1 deletion ui/app/mirrors/create/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ export default function CreateMirrors() {
const [config, setConfig] = useState<CDCConfig | QRepConfig>(blankCDCSetting);
const [peers, setPeers] = useState<Peer[]>([]);
const [rows, setRows] = useState<TableMapRow[]>([]);
const [validSource, setValidSource] = useState<boolean>(false);
const [sourceSchema, setSourceSchema] = useState('public');
const [qrepQuery, setQrepQuery] =
useState<string>(`-- Here's a sample template:
Expand Down
28 changes: 27 additions & 1 deletion ui/app/peers/create/[peerType]/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,15 @@ import {
UCreatePeerResponse,
UValidatePeerResponse,
} from '@/app/dto/PeersDTO';
import { S3Config } from '@/grpc_generated/peers';
import { Dispatch, SetStateAction } from 'react';
import { bqSchema, peerNameSchema, pgSchema, sfSchema } from './schema';
import {
bqSchema,
peerNameSchema,
pgSchema,
s3Schema,
sfSchema,
} from './schema';

const validateFields = (
type: string,
Expand All @@ -19,6 +26,14 @@ const validateFields = (
return false;
}

if (type === 'S3') {
const s3Valid = S3Validation(config as S3Config);
if (s3Valid.length > 0) {
setMessage({ ok: false, msg: s3Valid });
return false;
}
}

let validationErr: string | undefined;
switch (type) {
case 'POSTGRES':
Expand All @@ -33,6 +48,10 @@ const validateFields = (
const bqConfig = bqSchema.safeParse(config);
if (!bqConfig.success) validationErr = bqConfig.error.issues[0].message;
break;
case 'S3':
const s3Config = s3Schema.safeParse(config);
if (!s3Config.success) validationErr = s3Config.error.issues[0].message;
break;
default:
validationErr = 'Unsupported peer type ' + type;
}
Expand Down Expand Up @@ -72,6 +91,13 @@ export const handleValidate = async (
setLoading(false);
};

const S3Validation = (config: S3Config): string => {
if (!config.secretAccessKey && !config.accessKeyId && !config.roleArn) {
return 'Either both access key and secret or role ARN is required';
}
return '';
};

// API call to create peer
export const handleCreate = async (
type: string,
Expand Down
3 changes: 3 additions & 0 deletions ui/app/peers/create/[peerType]/helpers/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { PeerConfig } from '@/app/dto/PeersDTO';
import { PeerSetter } from '@/components/ConfigForm';
import { blankBigquerySetting } from './bq';
import { blankPostgresSetting } from './pg';
import { blankS3Setting } from './s3';
import { blankSnowflakeSetting } from './sf';

export interface PeerSetting {
Expand All @@ -22,6 +23,8 @@ export const getBlankSetting = (dbType: string): PeerConfig => {
return blankSnowflakeSetting;
case 'BIGQUERY':
return blankBigquerySetting;
case 'S3':
return blankS3Setting;
default:
return blankPostgresSetting;
}
Expand Down
63 changes: 63 additions & 0 deletions ui/app/peers/create/[peerType]/helpers/s3.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { S3Config } from '@/grpc_generated/peers';
import { PeerSetting } from './common';

export const s3Setting: PeerSetting[] = [
{
label: 'Bucket URL',
stateHandler: (value, setter) =>
setter((curr) => ({ ...curr, url: value })),
tips: 'The URL of your existing S3/GCS bucket along with a prefix of your choice. It begins with s3://',
helpfulLink:
'https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-bucket-intro.html#accessing-a-bucket-using-S3-format',
default: 's3://<bucket_name>/<prefix_name>',
},
{
label: 'Access Key ID',
stateHandler: (value, setter) =>
setter((curr) => ({ ...curr, accessKeyId: value })),
tips: 'The AWS access key ID associated with your account. In case of GCS, this is the HMAC access key ID.',
helpfulLink:
'https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html',
},
{
label: 'Secret Access Key',
stateHandler: (value, setter) =>
setter((curr) => ({ ...curr, secretAccessKey: value })),
tips: 'The AWS secret access key associated with your account. In case of GCS, this is the HMAC secret.',
helpfulLink:
'https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html',
},
{
label: 'Region',
stateHandler: (value, setter) =>
setter((curr) => ({ ...curr, region: value })),
tips: 'The region where your bucket is located. For example, us-east-1. In case of GCS, this will be set to auto, which detects where your bucket it.',
},
{
label: 'Role ARN',
stateHandler: (value, setter) =>
setter((curr) => ({ ...curr, roleArn: value })),
type: 'password',
tips: 'If set, the role ARN will be used to assume the role before accessing the bucket.',
helpfulLink:
'https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_identifiers.html#identifiers-arns',
optional: true,
},
];

export const blankS3Setting: S3Config = {
url: 's3://<bucket_name>/<prefix_name>',
accessKeyId: undefined,
secretAccessKey: undefined,
roleArn: undefined,
region: undefined,
endpoint: '',
metadataDb: {
host: '',
port: 5432,
user: 'postgres',
password: '',
database: 'postgres',
transactionSnapshot: '',
},
Comment on lines +55 to +62
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't expose metadata db to the user, just use catalog.

};
17 changes: 14 additions & 3 deletions ui/app/peers/create/[peerType]/page.tsx
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
'use client';
import { PeerConfig } from '@/app/dto/PeersDTO';
import BQConfig from '@/components/BigqueryConfig';
import S3ConfigForm from '@/components/S3Form';
import { Button } from '@/lib/Button';
import { ButtonGroup } from '@/lib/ButtonGroup';
import { Label } from '@/lib/Label';
import { LayoutMain, RowWithTextField } from '@/lib/Layout';
import { RowWithTextField } from '@/lib/Layout';
import { Panel } from '@/lib/Panel';
import { TextField } from '@/lib/TextField';
import { Tooltip } from '@/lib/Tooltip';
Expand Down Expand Up @@ -45,6 +46,8 @@ export default function CreateConfig({
return configForm(snowflakeSetting);
case 'BIGQUERY':
return <BQConfig setter={setConfig} />;
case 'S3':
return <S3ConfigForm setter={setConfig} />;
default:
return <></>;
}
Expand All @@ -55,7 +58,15 @@ export default function CreateConfig({
};

return (
<LayoutMain alignSelf='center' justifySelf='center' width='xxLarge'>
<div
style={{
display: 'flex',
flexDirection: 'column',
alignSelf: 'center',
justifySelf: 'center',
width: '45%',
}}
>
<Panel>
<Label variant='title3'>
Setup a new{' '}
Expand Down Expand Up @@ -144,6 +155,6 @@ export default function CreateConfig({
)}
</Panel>
</Panel>
</LayoutMain>
</div>
);
}
Loading