From 61fc1a35b35dad77b0f129b9ce7418a71db7a845 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Mon, 5 Feb 2024 15:55:37 +0530 Subject: [PATCH 1/4] refactor clickhouse peer --- flow/connectors/clickhouse/cdc.go | 2 +- flow/connectors/clickhouse/clickhouse.go | 38 ++++- flow/connectors/clickhouse/qrep_avro_sync.go | 12 +- flow/connectors/s3/s3.go | 13 +- nexus/analyzer/src/lib.rs | 17 ++- nexus/catalog/src/lib.rs | 31 ++-- nexus/peer-postgres/src/lib.rs | 140 +++++++++--------- nexus/postgres-connection/src/lib.rs | 4 +- protos/peers.proto | 5 +- ui/app/peers/create/[peerType]/helpers/ch.ts | 34 ++++- ui/app/peers/create/[peerType]/schema.ts | 62 ++++---- ui/components/PeerForms/ClickhouseConfig.tsx | 148 ++++++++----------- 12 files changed, 289 insertions(+), 217 deletions(-) diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 422502e7b5..61ffa7b232 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -85,7 +85,7 @@ func (c *ClickhouseConnector) syncRecordsViaAvro( } qrepConfig := &protos.QRepConfig{ - StagingPath: c.config.S3Integration, + StagingPath: c.config.S3Path, FlowJobName: req.FlowJobName, DestinationTableIdentifier: strings.ToLower(rawTableIdentifier), } diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index 43b8d028d3..c006ddb5a4 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -10,6 +10,8 @@ import ( _ "github.com/ClickHouse/clickhouse-go/v2/lib/driver" metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" + conns3 "github.com/PeerDB-io/peer-flow/connectors/s3" + "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" ) @@ -21,12 +23,30 @@ type ClickhouseConnector struct { tableSchemaMapping map[string]*protos.TableSchema logger slog.Logger config *protos.ClickhouseConfig + creds utils.S3PeerCredentials +} + +func ValidateS3(ctx context.Context, bucketUrl string, creds utils.S3PeerCredentials) error { + slog.Info("Validating S3 bucket", slog.String("bucketUrl", bucketUrl)) + slog.Info("Validating S3 credentials", slog.Any("creds", creds)) + // for validation purposes + s3Client, err := utils.CreateS3Client(creds) + if err != nil { + return fmt.Errorf("failed to create S3 client: %w", err) + } + + validErr := conns3.ValidCheck(ctx, s3Client, bucketUrl, nil) + if validErr != nil { + return validErr + } + + return nil } func NewClickhouseConnector(ctx context.Context, - clickhouseProtoConfig *protos.ClickhouseConfig, + config *protos.ClickhouseConfig, ) (*ClickhouseConnector, error) { - database, err := connect(ctx, clickhouseProtoConfig) + database, err := connect(ctx, config) if err != nil { return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err) } @@ -37,6 +57,17 @@ func NewClickhouseConnector(ctx context.Context, return nil, err } + s3PeerCreds := utils.S3PeerCredentials{ + AccessKeyID: config.AccessKeyId, + SecretAccessKey: config.SecretAccessKey, + Region: config.Region, + } + + validateErr := ValidateS3(ctx, config.S3Path, s3PeerCreds) + if validateErr != nil { + return nil, fmt.Errorf("failed to validate S3 bucket: %w", validateErr) + } + flowName, _ := ctx.Value(shared.FlowNameKey).(string) return &ClickhouseConnector{ ctx: ctx, @@ -44,7 +75,8 @@ func NewClickhouseConnector(ctx context.Context, pgMetadata: pgMetadata, tableSchemaMapping: nil, logger: 
*slog.With(slog.String(string(shared.FlowNameKey), flowName)), - config: clickhouseProtoConfig, + config: config, + creds: s3PeerCreds, }, nil } diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go index 3d2f179bca..0c5cafbc2a 100644 --- a/flow/connectors/clickhouse/qrep_avro_sync.go +++ b/flow/connectors/clickhouse/qrep_avro_sync.go @@ -35,13 +35,13 @@ func NewClickhouseAvroSyncMethod( func (s *ClickhouseAvroSyncMethod) CopyStageToDestination(avroFile *avro.AvroFile) error { stagingPath := s.config.StagingPath if stagingPath == "" { - stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Integration // "s3://avro-clickhouse" + stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Path // "s3://avro-clickhouse" } s3o, err := utils.NewS3BucketAndPrefix(stagingPath) if err != nil { return err } - awsCreds, err := utils.GetAWSSecrets(utils.S3PeerCredentials{}) + awsCreds, err := utils.GetAWSSecrets(s.connector.creds) avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, awsCreds.Region, avroFile.FilePath) if err != nil { @@ -100,7 +100,7 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( startTime := time.Now() dstTableName := config.DestinationTableIdentifier - stagingPath := s.config.DestinationPeer.GetClickhouseConfig().S3Integration + stagingPath := s.config.DestinationPeer.GetClickhouseConfig().S3Path schema, err := stream.Schema() if err != nil { @@ -120,7 +120,7 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( if err != nil { return 0, err } - awsCreds, err := utils.GetAWSSecrets(utils.S3PeerCredentials{}) + awsCreds, err := utils.GetAWSSecrets(s.connector.creds) avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, awsCreds.Region, avroFile.FilePath) if err != nil { @@ -166,7 +166,7 @@ func (s *ClickhouseAvroSyncMethod) writeToAvroFile( ) (*avro.AvroFile, error) { stagingPath := s.config.StagingPath // "s3://avro-clickhouse" if stagingPath == "" { - stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Integration // "s3://avro-clickhouse" + stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Path // "s3://avro-clickhouse" } ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema, avro.CompressZstd, qvalue.QDWHTypeClickhouse) @@ -178,7 +178,7 @@ func (s *ClickhouseAvroSyncMethod) writeToAvroFile( s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, flowJobName, partitionID) // s.config.FlowJobName s3AvroFileKey = strings.Trim(s3AvroFileKey, "/") - avroFile, err := ocfWriter.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, utils.S3PeerCredentials{}) ///utils.S3PeerCredentials{}) + avroFile, err := ocfWriter.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, s.connector.creds) if err != nil { return nil, fmt.Errorf("failed to write records to S3: %w", err) } diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index 9a1877ca32..59803d21b7 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -124,20 +124,17 @@ func ValidCheck(ctx context.Context, s3Client *s3.Client, bucketURL string, meta } // check if we can ping external metadata - err := metadataDB.Ping() - if err != nil { - return fmt.Errorf("failed to ping external metadata: %w", err) + if metadataDB != nil { + err := metadataDB.Ping() + if err != nil { + return fmt.Errorf("failed to ping external metadata: %w", err) + } } return nil } func (c *S3Connector) ConnectionActive() error { - _, listErr := c.client.ListBuckets(c.ctx, nil) - if listErr 
!= nil {
-		return listErr
-	}
-
 	validErr := ValidCheck(c.ctx, &c.client, c.url, c.pgMetadata)
 	if validErr != nil {
 		c.logger.Error("failed to validate s3 connector:", slog.Any("error", validErr))
diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs
index 64c28d1693..24b54c4d3e 100644
--- a/nexus/analyzer/src/lib.rs
+++ b/nexus/analyzer/src/lib.rs
@@ -803,7 +803,22 @@ fn parse_db_options(
                 .get("database")
                 .context("no default database specified")?
                 .to_string(),
-            s3_integration: s3_int,
+            s3_path: opts
+                .get("s3_path")
+                .context("no s3 path specified")?
+                .to_string(),
+            access_key_id: opts
+                .get("access_key_id")
+                .context("no access key id specified")?
+                .to_string(),
+            secret_access_key: opts
+                .get("secret_access_key")
+                .context("no secret access key specified")?
+                .to_string(),
+            region: opts
+                .get("region")
+                .context("no region specified")?
+                .to_string(),
         };
         let config = Config::ClickhouseConfig(clickhouse_config);
         Some(config)
diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs
index 51571957da..123e29a0d9 100644
--- a/nexus/catalog/src/lib.rs
+++ b/nexus/catalog/src/lib.rs
@@ -3,7 +3,7 @@ use std::collections::HashMap;
 use anyhow::{anyhow, Context};
 use peer_cursor::{QueryExecutor, QueryOutput, Schema};
 use peer_postgres::{self, ast};
-use pgwire::error::{PgWireResult};
+use pgwire::error::PgWireResult;
 use postgres_connection::{connect_postgres, get_pg_connection_string};
 use prost::Message;
 use pt::{
@@ -165,7 +165,10 @@ impl Catalog {
     pub async fn get_peer_id_i32(&self, peer_name: &str) -> anyhow::Result<i32> {
         let stmt = self
             .pg
-            .prepare_typed("SELECT id FROM public.peers WHERE name = $1", &[types::Type::TEXT])
+            .prepare_typed(
+                "SELECT id FROM public.peers WHERE name = $1",
+                &[types::Type::TEXT],
+            )
             .await?;
 
         self.pg
@@ -179,7 +182,10 @@ impl Catalog {
     pub async fn get_peer_type_for_id(&self, peer_id: i32) -> anyhow::Result<i32> {
         let stmt = self
             .pg
-            .prepare_typed("SELECT type FROM public.peers WHERE id = $1", &[types::Type::INT4])
+            .prepare_typed(
+                "SELECT type FROM public.peers WHERE id = $1",
+                &[types::Type::INT4],
+            )
             .await?;
 
         self.pg
@@ -251,7 +257,10 @@ impl Catalog {
     pub async fn get_peer_by_id(&self, peer_id: i32) -> anyhow::Result<Peer> {
         let stmt = self
             .pg
-            .prepare_typed("SELECT name, type, options FROM public.peers WHERE id = $1", &[])
+            .prepare_typed(
+                "SELECT name, type, options FROM public.peers WHERE id = $1",
+                &[],
+            )
             .await?;
 
         let rows = self.pg.query(&stmt, &[&peer_id]).await?;
@@ -557,7 +566,10 @@ impl Catalog {
     pub async fn delete_flow_job_entry(&self, flow_job_name: &str) -> anyhow::Result<()> {
         let rows = self
             .pg
-            .execute("DELETE FROM public.flows WHERE name = $1", &[&flow_job_name])
+            .execute(
+                "DELETE FROM public.flows WHERE name = $1",
+                &[&flow_job_name],
+            )
             .await?;
         if rows == 0 {
             return Err(anyhow!("unable to delete flow job metadata"));
@@ -568,7 +580,10 @@ impl Catalog {
     pub async fn check_peer_entry(&self, peer_name: &str) -> anyhow::Result<i64> {
         let peer_check = self
             .pg
-            .query_one("SELECT COUNT(*) FROM public.peers WHERE name = $1", &[&peer_name])
+            .query_one(
+                "SELECT COUNT(*) FROM public.peers WHERE name = $1",
+                &[&peer_name],
+            )
             .await?;
         let peer_count: i64 = peer_check.get(0);
         Ok(peer_count)
@@ -599,9 +614,7 @@ impl Catalog {
 impl QueryExecutor for Catalog {
     #[tracing::instrument(skip(self, stmt), fields(stmt = %stmt))]
     async fn execute(&self, stmt: &Statement) -> PgWireResult<QueryOutput> {
-        peer_postgres::pg_execute(&self.pg, ast::PostgresAst {
-            peername: None,
-        }, stmt).await
+        peer_postgres::pg_execute(&self.pg, ast::PostgresAst { peername: None }, stmt).await
     }
 
     async fn describe(&self, stmt: &Statement) -> PgWireResult<Option<Schema>> {
diff --git a/nexus/peer-postgres/src/lib.rs b/nexus/peer-postgres/src/lib.rs
index 0ae519c7c8..cb29104d87 100644
--- a/nexus/peer-postgres/src/lib.rs
+++ b/nexus/peer-postgres/src/lib.rs
@@ -44,88 +44,92 @@ async fn schema_from_query(client: &Client, query: &str) -> anyhow::Result<Schema> {
-pub async fn pg_execute(client: &Client, ast: ast::PostgresAst, stmt: &Statement) -> PgWireResult<QueryOutput> {
-        // if the query is a select statement, we need to fetch the rows
-        // and return them as a QueryOutput::Stream, else we return the
-        // number of affected rows.
-        match stmt {
-            Statement::Query(query) => {
-                let mut query = query.clone();
-                ast.rewrite_query(&mut query);
-                let rewritten_query = query.to_string();
-
-                // first fetch the schema as this connection will be
-                // short lived, only then run the query as the query
-                // could hold the pin on the connection for a long time.
-                let schema = schema_from_query(client, &rewritten_query)
-                    .await
-                    .map_err(|e| {
-                        tracing::error!("error getting schema: {}", e);
-                        PgWireError::ApiError(format!("error getting schema: {}", e).into())
-                    })?;
+pub async fn pg_execute(
+    client: &Client,
+    ast: ast::PostgresAst,
+    stmt: &Statement,
+) -> PgWireResult<QueryOutput> {
+    // if the query is a select statement, we need to fetch the rows
+    // and return them as a QueryOutput::Stream, else we return the
+    // number of affected rows.
+    match stmt {
+        Statement::Query(query) => {
+            let mut query = query.clone();
+            ast.rewrite_query(&mut query);
+            let rewritten_query = query.to_string();
+
+            // first fetch the schema as this connection will be
+            // short lived, only then run the query as the query
+            // could hold the pin on the connection for a long time.
+            let schema = schema_from_query(client, &rewritten_query)
+                .await
+                .map_err(|e| {
+                    tracing::error!("error getting schema: {}", e);
+                    PgWireError::ApiError(format!("error getting schema: {}", e).into())
+                })?;
 
-                tracing::info!("[peer-postgres] rewritten query: {}", rewritten_query);
-                // given that there could be a lot of rows returned, we
-                // need to use a cursor to stream the rows back to the
-                // client.
-                let stream = client
-                    .query_raw(&rewritten_query, std::iter::empty::<&str>())
-                    .await
-                    .map_err(|e| {
-                        tracing::error!("error executing query: {}", e);
-                        PgWireError::ApiError(format!("error executing query: {}", e).into())
-                    })?;
+            tracing::info!("[peer-postgres] rewritten query: {}", rewritten_query);
+            // given that there could be a lot of rows returned, we
+            // need to use a cursor to stream the rows back to the
+            // client.
+            let stream = client
+                .query_raw(&rewritten_query, std::iter::empty::<&str>())
+                .await
+                .map_err(|e| {
+                    tracing::error!("error executing query: {}", e);
+                    PgWireError::ApiError(format!("error executing query: {}", e).into())
+                })?;
 
-                // log that raw query execution has completed
-                tracing::info!("[peer-postgres] raw query execution completed");
+            // log that raw query execution has completed
+            tracing::info!("[peer-postgres] raw query execution completed");
 
-                let cursor = stream::PgRecordStream::new(stream, schema);
-                Ok(QueryOutput::Stream(Box::pin(cursor)))
-            }
-            _ => {
-                let mut rewritten_stmt = stmt.clone();
-                ast.rewrite_statement(&mut rewritten_stmt).map_err(|e| {
-                    tracing::error!("error rewriting statement: {}", e);
-                    PgWireError::ApiError(format!("error rewriting statement: {}", e).into())
-                })?;
-                let rewritten_query = rewritten_stmt.to_string();
-                tracing::info!("[peer-postgres] rewritten statement: {}", rewritten_query);
-                let rows_affected =
-                    client
-                        .execute(&rewritten_query, &[])
-                        .await
-                        .map_err(|e| {
-                            tracing::error!("error executing query: {}", e);
-                            PgWireError::ApiError(format!("error executing query: {}", e).into())
-                        })?;
-                Ok(QueryOutput::AffectedRows(rows_affected as usize))
-            }
-        }
-
+            let cursor = stream::PgRecordStream::new(stream, schema);
+            Ok(QueryOutput::Stream(Box::pin(cursor)))
+        }
+        _ => {
+            let mut rewritten_stmt = stmt.clone();
+            ast.rewrite_statement(&mut rewritten_stmt).map_err(|e| {
+                tracing::error!("error rewriting statement: {}", e);
+                PgWireError::ApiError(format!("error rewriting statement: {}", e).into())
+            })?;
+            let rewritten_query = rewritten_stmt.to_string();
+            tracing::info!("[peer-postgres] rewritten statement: {}", rewritten_query);
+            let rows_affected = client.execute(&rewritten_query, &[]).await.map_err(|e| {
+                tracing::error!("error executing query: {}", e);
+                PgWireError::ApiError(format!("error executing query: {}", e).into())
+            })?;
+            Ok(QueryOutput::AffectedRows(rows_affected as usize))
+        }
+    }
 }
 
 pub async fn pg_describe(client: &Client, stmt: &Statement) -> PgWireResult<Option<Schema>> {
-        match stmt {
-            Statement::Query(_query) => {
-                let schema = schema_from_query(client, &stmt.to_string())
-                    .await
-                    .map_err(|e| {
-                        tracing::error!("error getting schema: {}", e);
-                        PgWireError::ApiError(format!("error getting schema: {}", e).into())
-                    })?;
-                Ok(Some(schema))
-            }
-            _ => Ok(None),
-        }
+    match stmt {
+        Statement::Query(_query) => {
+            let schema = schema_from_query(client, &stmt.to_string())
+                .await
+                .map_err(|e| {
+                    tracing::error!("error getting schema: {}", e);
+                    PgWireError::ApiError(format!("error getting schema: {}", e).into())
+                })?;
+            Ok(Some(schema))
+        }
+        _ => Ok(None),
+    }
 }
 
 #[async_trait::async_trait]
 impl QueryExecutor for PostgresQueryExecutor {
     #[tracing::instrument(skip(self, stmt), fields(stmt = %stmt))]
     async fn execute(&self, stmt: &Statement) -> PgWireResult<QueryOutput> {
-        pg_execute(&self.client, ast::PostgresAst {
-            peername: Some(self.peername.clone()),
-        }, stmt).await
+        pg_execute(
+            &self.client,
+            ast::PostgresAst {
+                peername: Some(self.peername.clone()),
+            },
+            stmt,
+        )
+        .await
     }
 
     async fn describe(&self, stmt: &Statement) -> PgWireResult<Option<Schema>> {
diff --git a/nexus/postgres-connection/src/lib.rs b/nexus/postgres-connection/src/lib.rs
index 5053177d66..4328978adf 100644
--- a/nexus/postgres-connection/src/lib.rs
+++ b/nexus/postgres-connection/src/lib.rs
@@ -68,7 +68,9 @@ pub fn get_pg_connection_string(config: &PostgresConfig) -> String {
     write!(
         connection_string,
         "@{}:{}/{}?connect_timeout=15&application_name=peerdb_nexus",
-        config.host, config.port, urlencoding::encode(&config.database)
+        config.host,
+        config.port,
+        urlencoding::encode(&config.database)
     )
     .ok();
 
diff --git a/protos/peers.proto b/protos/peers.proto
index aac2627a19..811fdfc2c8 100644
--- a/protos/peers.proto
+++ b/protos/peers.proto
@@ -93,7 +93,10 @@ message ClickhouseConfig{
   string user = 3;
   string password = 4;
   string database = 5;
-  string s3_integration = 6; // staging to store avro files
+  string s3_path = 6; // path to the S3 bucket which will store Avro files
+  string access_key_id = 7;
+  string secret_access_key = 8;
+  string region = 9;
 }
 
 message SqlServerConfig {
diff --git a/ui/app/peers/create/[peerType]/helpers/ch.ts b/ui/app/peers/create/[peerType]/helpers/ch.ts
index 57eee3c39a..3fc272261b 100644
--- a/ui/app/peers/create/[peerType]/helpers/ch.ts
+++ b/ui/app/peers/create/[peerType]/helpers/ch.ts
@@ -40,13 +40,32 @@ export const clickhouseSetting: PeerSetting[] = [
       'https://www.postgresql.org/docs/current/sql-createdatabase.html',
   },
   {
-    label: 'S3 Integration',
+    label: 'S3 Path',
     stateHandler: (value, setter) =>
-      setter((curr) => ({ ...curr, s3Integration: value })),
-    optional: true,
-    tips: `This is needed only if you plan to run a mirror and you wish to stage AVRO files on S3.`,
+      setter((curr) => ({ ...curr, s3Path: value })),
+    tips: `An S3 bucket or object URL. This bucket will be used as an intermediate stage for CDC.`,
+  },
+  {
+    label: 'Access Key ID',
+    stateHandler: (value, setter) =>
+      setter((curr) => ({ ...curr, accessKeyId: value })),
+    tips: 'The AWS access key ID associated with your account.',
+    helpfulLink:
+      'https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html',
+  },
+  {
+    label: 'Secret Access Key',
+    stateHandler: (value, setter) =>
+      setter((curr) => ({ ...curr, secretAccessKey: value })),
+    tips: 'The AWS secret access key associated with the above access key ID.',
     helpfulLink:
-      'https://docs.snowflake.com/en/user-guide/data-load-s3-config-storage-integration',
+      'https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html',
+  },
+  {
+    label: 'Region',
+    stateHandler: (value, setter) =>
+      setter((curr) => ({ ...curr, region: value })),
+    tips: 'The region where your bucket is located. 
For example, us-east-1.', }, ]; @@ -56,5 +75,8 @@ export const blankClickhouseSetting: ClickhouseConfig = { user: '', password: '', database: '', - s3Integration: '', + s3Path: '', + accessKeyId: '', + secretAccessKey: '', + region: '', }; diff --git a/ui/app/peers/create/[peerType]/schema.ts b/ui/app/peers/create/[peerType]/schema.ts index fed2119b12..290eb81b35 100644 --- a/ui/app/peers/create/[peerType]/schema.ts +++ b/ui/app/peers/create/[peerType]/schema.ts @@ -229,6 +229,34 @@ export const bqSchema = z.object({ .max(1024, 'DatasetID must be less than 1025 characters'), }); +const urlSchema = z + .string({ + invalid_type_error: 'URL must be a string', + required_error: 'URL is required', + }) + .min(1, { message: 'URL must be non-empty' }) + .refine((url) => url.startsWith('s3://'), { + message: 'URL must start with s3://', + }); + +const accessKeySchema = z + .string({ + invalid_type_error: 'Access Key ID must be a string', + required_error: 'Access Key ID is required', + }) + .min(1, { message: 'Access Key ID must be non-empty' }); + +const secretKeySchema = z + .string({ + invalid_type_error: 'Secret Access Key must be a string', + required_error: 'Secret Access Key is required', + }) + .min(1, { message: 'Secret Access Key must be non-empty' }); + +const regionSchema = z.string({ + invalid_type_error: 'Region must be a string', +}); + export const chSchema = z.object({ host: z .string({ @@ -266,40 +294,22 @@ export const chSchema = z.object({ }) .min(1, 'Password must be non-empty') .max(100, 'Password must be less than 100 characters'), + s3Path: urlSchema, + accessKeyId: accessKeySchema, + secretAccessKey: secretKeySchema, + region: regionSchema.min(1, { message: 'Region must be non-empty' }), }); export const s3Schema = z.object({ - url: z - .string({ - invalid_type_error: 'URL must be a string', - required_error: 'URL is required', - }) - .min(1, { message: 'URL must be non-empty' }) - .refine((url) => url.startsWith('s3://'), { - message: 'URL must start with s3://', - }), - accessKeyId: z - .string({ - invalid_type_error: 'Access Key ID must be a string', - required_error: 'Access Key ID is required', - }) - .min(1, { message: 'Access Key ID must be non-empty' }), - secretAccessKey: z - .string({ - invalid_type_error: 'Secret Access Key must be a string', - required_error: 'Secret Access Key is required', - }) - .min(1, { message: 'Secret Access Key must be non-empty' }), + url: urlSchema, + accessKeyId: accessKeySchema, + secretAccessKey: secretKeySchema, roleArn: z .string({ invalid_type_error: 'Role ARN must be a string', }) .optional(), - region: z - .string({ - invalid_type_error: 'Region must be a string', - }) - .optional(), + region: regionSchema.optional(), endpoint: z .string({ invalid_type_error: 'Endpoint must be a string', diff --git a/ui/components/PeerForms/ClickhouseConfig.tsx b/ui/components/PeerForms/ClickhouseConfig.tsx index 3e589c5045..9045c23a97 100644 --- a/ui/components/PeerForms/ClickhouseConfig.tsx +++ b/ui/components/PeerForms/ClickhouseConfig.tsx @@ -1,17 +1,10 @@ 'use client'; import { PeerSetter } from '@/app/dto/PeersDTO'; import { PeerSetting } from '@/app/peers/create/[peerType]/helpers/common'; -import { - blankSSHConfig, - sshSetting, -} from '@/app/peers/create/[peerType]/helpers/pg'; -import { SSHConfig } from '@/grpc_generated/peers'; import { Label } from '@/lib/Label'; import { RowWithTextField } from '@/lib/Layout'; -import { Switch } from '@/lib/Switch'; import { TextField } from '@/lib/TextField'; import { Tooltip } from 
'@/lib/Tooltip'; -import { useEffect, useState } from 'react'; import { InfoPopover } from '../InfoPopover'; interface ConfigProps { settings: PeerSetting[]; @@ -19,9 +12,7 @@ interface ConfigProps { } export default function ClickhouseForm({ settings, setter }: ConfigProps) { - const [showSSH, setShowSSH] = useState(false); - const [sshConfig, setSSHConfig] = useState(blankSSHConfig); - + const S3Labels = ['S3 Path', 'Access Key ID', 'Secret Access Key', 'Region']; const handleChange = ( e: React.ChangeEvent, setting: PeerSetting @@ -29,85 +20,77 @@ export default function ClickhouseForm({ settings, setter }: ConfigProps) { setting.stateHandler(e.target.value, setter); }; - useEffect(() => { - setter((prev) => { - return { - ...prev, - sshConfig: showSSH ? sshConfig : undefined, - }; - }); - }, [sshConfig, setter, showSSH]); - return ( <> - {settings.map((setting, id) => { - return ( - - {setting.label}{' '} - {!setting.optional && ( - - - - )} - - } - action={ -
- ) => - handleChange(e, setting) - } - /> - {setting.tips && ( - - )} -
- } - /> - ); - })} - + {settings + .filter((setting) => !S3Labels.includes(setting.label)) + .map((setting, id) => { + return ( + + {setting.label}{' '} + {!setting.optional && ( + + + + )} + + } + action={ +
+ ) => + handleChange(e, setting) + } + /> + {setting.tips && ( + + )} +
+ } + /> + ); + })} -
- - setShowSSH(state)} /> -
- {showSSH && - sshSetting.map((sshParam, index) => ( + {settings + .filter((setting) => S3Labels.includes(setting.label)) + .map((setting, id) => ( - {sshParam.label}{' '} - {!sshParam.optional && ( + {setting.label}{' '} + {!setting.optional && ( ) => - sshParam.stateHandler(e.target.value, setSSHConfig) - } - type={sshParam.type} - defaultValue={ - (sshConfig as SSHConfig)[ - sshParam.label === 'BASE64 Private Key' - ? 'privateKey' - : sshParam.label === 'Host Key' - ? 'hostKey' - : (sshParam.label.toLowerCase() as keyof SSHConfig) - ] || '' + handleChange(e, setting) } + type={setting.type} /> - {sshParam.tips && } + {setting.tips && } } /> From 0e883f6c8da1448383dd6a596dba005c64f7b4e1 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Mon, 5 Feb 2024 15:56:17 +0530 Subject: [PATCH 2/4] remove logs --- flow/connectors/clickhouse/clickhouse.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index c006ddb5a4..1da16c8c9f 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -27,8 +27,6 @@ type ClickhouseConnector struct { } func ValidateS3(ctx context.Context, bucketUrl string, creds utils.S3PeerCredentials) error { - slog.Info("Validating S3 bucket", slog.String("bucketUrl", bucketUrl)) - slog.Info("Validating S3 credentials", slog.Any("creds", creds)) // for validation purposes s3Client, err := utils.CreateS3Client(creds) if err != nil { From eeb8702f24702cc1491835d8520538686c2ca3c5 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Mon, 5 Feb 2024 17:31:15 +0530 Subject: [PATCH 3/4] lint --- nexus/analyzer/src/lib.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index 24b54c4d3e..17863c9e1d 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -779,11 +779,6 @@ fn parse_db_options( Some(config) } DbType::Clickhouse => { - let s3_int = opts - .get("s3_integration") - .map(|s| s.to_string()) - .unwrap_or_default(); - let clickhouse_config = ClickhouseConfig { host: opts.get("host").context("no host specified")?.to_string(), port: opts From fb0ac23c536ecfe3018b507f7a235f44082529c1 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Mon, 5 Feb 2024 17:32:21 +0530 Subject: [PATCH 4/4] minor change --- ui/components/PeerForms/ClickhouseConfig.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ui/components/PeerForms/ClickhouseConfig.tsx b/ui/components/PeerForms/ClickhouseConfig.tsx index 9045c23a97..9614870a83 100644 --- a/ui/components/PeerForms/ClickhouseConfig.tsx +++ b/ui/components/PeerForms/ClickhouseConfig.tsx @@ -76,7 +76,7 @@ export default function ClickhouseForm({ settings, setter }: ConfigProps) { variant='subheadline' colorName='lowContrast' > - Transient Stage + Transient S3 Stage
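
Note: after this series, the four S3 staging fields are required when a
ClickHouse peer is created, and NewClickhouseConnector validates the bucket
via ValidateS3 before returning. Below is a minimal sketch of how the
refactored constructor is exercised; the connclickhouse import alias, the
standalone main(), and every host and credential value are illustrative
assumptions, not part of these patches.

    package main

    import (
        "context"
        "log"

        connclickhouse "github.com/PeerDB-io/peer-flow/connectors/clickhouse"
        "github.com/PeerDB-io/peer-flow/generated/protos"
    )

    func main() {
        // Placeholder values only; real peers supply these through
        // CREATE PEER options or the UI form fields added above.
        config := &protos.ClickhouseConfig{
            Host:            "localhost",
            Port:            9000,
            User:            "clickhouse",
            Password:        "secret",
            Database:        "default",
            S3Path:          "s3://my-staging-bucket/peerdb", // replaces the old s3_integration
            AccessKeyId:     "AKIAIOSFODNN7EXAMPLE",
            SecretAccessKey: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
            Region:          "us-east-1",
        }

        // The constructor now runs ValidateS3 against the staging bucket, so
        // bad credentials or a missing bucket fail at peer creation time
        // instead of partway through a mirror.
        connector, err := connclickhouse.NewClickhouseConnector(context.Background(), config)
        if err != nil {
            log.Fatalf("clickhouse peer validation failed: %v", err)
        }
        _ = connector
    }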