Clickhouse peer: specify S3 fields #1205

Merged: 4 commits, Feb 5, 2024
flow/connectors/clickhouse/cdc.go (1 addition, 1 deletion)

@@ -85,7 +85,7 @@ func (c *ClickhouseConnector) syncRecordsViaAvro(
     }

     qrepConfig := &protos.QRepConfig{
-        StagingPath:                c.config.S3Integration,
+        StagingPath:                c.config.S3Path,
         FlowJobName:                req.FlowJobName,
         DestinationTableIdentifier: strings.ToLower(rawTableIdentifier),
     }
flow/connectors/clickhouse/clickhouse.go (33 additions, 3 deletions)

@@ -10,6 +10,8 @@ import (
     _ "github.com/ClickHouse/clickhouse-go/v2/lib/driver"

     metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
+    conns3 "github.com/PeerDB-io/peer-flow/connectors/s3"
+    "github.com/PeerDB-io/peer-flow/connectors/utils"
     "github.com/PeerDB-io/peer-flow/generated/protos"
     "github.com/PeerDB-io/peer-flow/shared"
 )

@@ -21,12 +23,28 @@ type ClickhouseConnector struct {
     tableSchemaMapping map[string]*protos.TableSchema
     logger             slog.Logger
     config             *protos.ClickhouseConfig
+    creds              utils.S3PeerCredentials
 }

+func ValidateS3(ctx context.Context, bucketUrl string, creds utils.S3PeerCredentials) error {
+    // for validation purposes
+    s3Client, err := utils.CreateS3Client(creds)
+    if err != nil {
+        return fmt.Errorf("failed to create S3 client: %w", err)
+    }
+
+    validErr := conns3.ValidCheck(ctx, s3Client, bucketUrl, nil)
+    if validErr != nil {
+        return validErr
+    }
+
+    return nil
+}
+
 func NewClickhouseConnector(ctx context.Context,
-    clickhouseProtoConfig *protos.ClickhouseConfig,
+    config *protos.ClickhouseConfig,
 ) (*ClickhouseConnector, error) {
-    database, err := connect(ctx, clickhouseProtoConfig)
+    database, err := connect(ctx, config)
     if err != nil {
         return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err)
     }

@@ -37,14 +55,26 @@ func NewClickhouseConnector(ctx context.Context,
         return nil, err
     }

+    s3PeerCreds := utils.S3PeerCredentials{
+        AccessKeyID:     config.AccessKeyId,
+        SecretAccessKey: config.SecretAccessKey,
+        Region:          config.Region,
+    }
+
+    validateErr := ValidateS3(ctx, config.S3Path, s3PeerCreds)
+    if validateErr != nil {
+        return nil, fmt.Errorf("failed to validate S3 bucket: %w", validateErr)
+    }
+
     flowName, _ := ctx.Value(shared.FlowNameKey).(string)
     return &ClickhouseConnector{
         ctx:                ctx,
         database:           database,
         pgMetadata:         pgMetadata,
         tableSchemaMapping: nil,
         logger:             *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
-        config:             clickhouseProtoConfig,
+        config:             config,
+        creds:              s3PeerCreds,
     }, nil
 }
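The effect of this change is that peer creation now fails fast when the staging bucket is unreachable with the supplied credentials, instead of surfacing the error mid-mirror. The exact probes live in conns3.ValidCheck; as a rough sketch of this kind of pre-flight check using aws-sdk-go-v2 directly (the function name and the HeadBucket probe are illustrative assumptions, not PeerDB's implementation):

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// validateBucket is an illustrative stand-in for ValidateS3 above: build a
// client from the peer's static credentials and confirm the staging bucket
// is reachable before any Avro files are written to it.
func validateBucket(ctx context.Context, accessKeyID, secretAccessKey, region, bucket string) error {
	cfg, err := config.LoadDefaultConfig(ctx,
		config.WithRegion(region),
		config.WithCredentialsProvider(
			credentials.NewStaticCredentialsProvider(accessKeyID, secretAccessKey, "")),
	)
	if err != nil {
		return fmt.Errorf("failed to create S3 client: %w", err)
	}

	// HeadBucket is one cheap reachability probe; the real ValidCheck may do
	// more, e.g. writing and deleting a test object.
	client := s3.NewFromConfig(cfg)
	if _, err := client.HeadBucket(ctx, &s3.HeadBucketInput{Bucket: aws.String(bucket)}); err != nil {
		return fmt.Errorf("failed to validate S3 bucket: %w", err)
	}
	return nil
}

func main() {
	// Hypothetical values; in PeerDB these come from the peer's options.
	err := validateBucket(context.Background(), "AKIA...", "...", "us-east-1", "avro-clickhouse")
	if err != nil {
		log.Fatal(err)
	}
}
```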
flow/connectors/clickhouse/qrep_avro_sync.go (6 additions, 6 deletions)

@@ -35,13 +35,13 @@ func NewClickhouseAvroSyncMethod(
 func (s *ClickhouseAvroSyncMethod) CopyStageToDestination(avroFile *avro.AvroFile) error {
     stagingPath := s.config.StagingPath
     if stagingPath == "" {
-        stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Integration // "s3://avro-clickhouse"
+        stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Path // "s3://avro-clickhouse"
     }
     s3o, err := utils.NewS3BucketAndPrefix(stagingPath)
     if err != nil {
         return err
     }
-    awsCreds, err := utils.GetAWSSecrets(utils.S3PeerCredentials{})
+    awsCreds, err := utils.GetAWSSecrets(s.connector.creds)
     avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, awsCreds.Region, avroFile.FilePath)

     if err != nil {

@@ -100,7 +100,7 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
     startTime := time.Now()
     dstTableName := config.DestinationTableIdentifier

-    stagingPath := s.config.DestinationPeer.GetClickhouseConfig().S3Integration
+    stagingPath := s.config.DestinationPeer.GetClickhouseConfig().S3Path

     schema, err := stream.Schema()
     if err != nil {

@@ -120,7 +120,7 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
     if err != nil {
         return 0, err
     }
-    awsCreds, err := utils.GetAWSSecrets(utils.S3PeerCredentials{})
+    awsCreds, err := utils.GetAWSSecrets(s.connector.creds)
     avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, awsCreds.Region, avroFile.FilePath)

     if err != nil {

@@ -166,7 +166,7 @@ func (s *ClickhouseAvroSyncMethod) writeToAvroFile(
 ) (*avro.AvroFile, error) {
     stagingPath := s.config.StagingPath // "s3://avro-clickhouse"
     if stagingPath == "" {
-        stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Integration // "s3://avro-clickhouse"
+        stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Path // "s3://avro-clickhouse"
     }
     ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema, avro.CompressZstd,
         qvalue.QDWHTypeClickhouse)

@@ -178,7 +178,7 @@ func (s *ClickhouseAvroSyncMethod) writeToAvroFile(
     s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, flowJobName, partitionID) // s.config.FlowJobName
     s3AvroFileKey = strings.Trim(s3AvroFileKey, "/")

-    avroFile, err := ocfWriter.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, utils.S3PeerCredentials{}) ///utils.S3PeerCredentials{})
+    avroFile, err := ocfWriter.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, s.connector.creds)
     if err != nil {
         return nil, fmt.Errorf("failed to write records to S3: %w", err)
     }
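These call sites now resolve the staging location the same way: an explicit QRep StagingPath wins, otherwise the peer's s3_path is used, and the Avro object key is expanded into the virtual-hosted-style HTTPS URL seen in the fmt.Sprintf calls above. A minimal sketch of that resolution, with splitBucketAndPrefix as a hypothetical stand-in for utils.NewS3BucketAndPrefix:

```go
package main

import (
	"fmt"
	"strings"
)

// splitBucketAndPrefix is a hypothetical stand-in for utils.NewS3BucketAndPrefix:
// "s3://bucket/some/prefix" -> ("bucket", "some/prefix").
func splitBucketAndPrefix(s3Path string) (bucket, prefix string) {
	trimmed := strings.TrimPrefix(s3Path, "s3://")
	bucket, prefix, _ = strings.Cut(trimmed, "/")
	return bucket, strings.Trim(prefix, "/")
}

func resolveStagingPath(qrepStagingPath, peerS3Path string) string {
	if qrepStagingPath != "" {
		return qrepStagingPath // explicit QRep staging path wins
	}
	return peerS3Path // fall back to the ClickHouse peer's s3_path
}

func main() {
	// Hypothetical values; in PeerDB these come from the flow config and peer.
	staging := resolveStagingPath("", "s3://avro-clickhouse/staging")
	bucket, prefix := splitBucketAndPrefix(staging)

	// Same shape as the keys and URLs built in qrep_avro_sync.go above.
	key := strings.Trim(fmt.Sprintf("%s/%s/%s.avro.zst", prefix, "my_flow", "partition-0"), "/")
	url := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", bucket, "us-east-1", key)
	fmt.Println(url)
}
```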
flow/connectors/s3/s3.go (5 additions, 8 deletions)

@@ -124,20 +124,17 @@ func ValidCheck(ctx context.Context, s3Client *s3.Client, bucketURL string, meta
     }

     // check if we can ping external metadata
-    err := metadataDB.Ping()
-    if err != nil {
-        return fmt.Errorf("failed to ping external metadata: %w", err)
+    if metadataDB != nil {
+        err := metadataDB.Ping()
+        if err != nil {
+            return fmt.Errorf("failed to ping external metadata: %w", err)
+        }
     }

     return nil
 }

 func (c *S3Connector) ConnectionActive() error {
-    _, listErr := c.client.ListBuckets(c.ctx, nil)
-    if listErr != nil {
-        return listErr
-    }
-
     validErr := ValidCheck(c.ctx, &c.client, c.url, c.pgMetadata)
     if validErr != nil {
         c.logger.Error("failed to validate s3 connector:", slog.Any("error", validErr))
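Note: with metadataDB now optional, ValidCheck doubles as a bucket-only probe. The ClickHouse peer's ValidateS3 passes nil for metadata, while the S3 connector's ConnectionActive keeps passing its catalog handle; the separate ListBuckets call is dropped, presumably because ValidCheck already exercises bucket access.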
nexus/analyzer/src/lib.rs (16 additions, 6 deletions)

@@ -779,11 +779,6 @@ fn parse_db_options(
             Some(config)
         }
         DbType::Clickhouse => {
-            let s3_int = opts
-                .get("s3_integration")
-                .map(|s| s.to_string())
-                .unwrap_or_default();
-
             let clickhouse_config = ClickhouseConfig {
                 host: opts.get("host").context("no host specified")?.to_string(),
                 port: opts

@@ -803,7 +798,22 @@ fn parse_db_options(
                     .get("database")
                     .context("no default database specified")?
                     .to_string(),
-                s3_integration: s3_int,
+                s3_path: opts
+                    .get("s3_path")
+                    .context("no s3 path specified")?
+                    .to_string(),
+                access_key_id: opts
+                    .get("access_key_id")
+                    .context("no access key id specified")?
+                    .to_string(),
+                secret_access_key: opts
+                    .get("secret_access_key")
+                    .context("no secret access key specified")?
+                    .to_string(),
+                region: opts
+                    .get("region")
+                    .context("no region specified")?
+                    .to_string(),
             };
             let config = Config::ClickhouseConfig(clickhouse_config);
             Some(config)
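Since the four new options are required (each .context(...) turns a missing key into an error, where the old s3_integration silently defaulted to empty), creating a ClickHouse peer through nexus must now spell out the S3 staging fields. A hypothetical example with illustrative values (consult PeerDB's docs for the exact CREATE PEER syntax):

```sql
CREATE PEER ch_peer FROM CLICKHOUSE WITH (
    host = 'my-clickhouse.internal',
    port = '9440',
    user = 'peerdb',
    password = '...',
    database = 'default',
    s3_path = 's3://avro-clickhouse/staging',
    access_key_id = '...',
    secret_access_key = '...',
    region = 'us-east-1'
);
```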
nexus/catalog/src/lib.rs (22 additions, 9 deletions; rustfmt-only changes)

@@ -3,7 +3,7 @@ use std::collections::HashMap;
 use anyhow::{anyhow, Context};
 use peer_cursor::{QueryExecutor, QueryOutput, Schema};
 use peer_postgres::{self, ast};
-use pgwire::error::{PgWireResult};
+use pgwire::error::PgWireResult;
 use postgres_connection::{connect_postgres, get_pg_connection_string};
 use prost::Message;
 use pt::{

@@ -165,7 +165,10 @@ impl Catalog {
     pub async fn get_peer_id_i32(&self, peer_name: &str) -> anyhow::Result<i32> {
         let stmt = self
             .pg
-            .prepare_typed("SELECT id FROM public.peers WHERE name = $1", &[types::Type::TEXT])
+            .prepare_typed(
+                "SELECT id FROM public.peers WHERE name = $1",
+                &[types::Type::TEXT],
+            )
             .await?;

         self.pg

@@ -179,7 +182,10 @@ impl Catalog {
     pub async fn get_peer_type_for_id(&self, peer_id: i32) -> anyhow::Result<DbType> {
         let stmt = self
             .pg
-            .prepare_typed("SELECT type FROM public.peers WHERE id = $1", &[types::Type::INT4])
+            .prepare_typed(
+                "SELECT type FROM public.peers WHERE id = $1",
+                &[types::Type::INT4],
+            )
             .await?;

         self.pg

@@ -251,7 +257,10 @@ impl Catalog {
     pub async fn get_peer_by_id(&self, peer_id: i32) -> anyhow::Result<Peer> {
         let stmt = self
             .pg
-            .prepare_typed("SELECT name, type, options FROM public.peers WHERE id = $1", &[])
+            .prepare_typed(
+                "SELECT name, type, options FROM public.peers WHERE id = $1",
+                &[],
+            )
             .await?;

         let rows = self.pg.query(&stmt, &[&peer_id]).await?;

@@ -557,7 +566,10 @@ impl Catalog {
     pub async fn delete_flow_job_entry(&self, flow_job_name: &str) -> anyhow::Result<()> {
         let rows = self
             .pg
-            .execute("DELETE FROM public.flows WHERE name = $1", &[&flow_job_name])
+            .execute(
+                "DELETE FROM public.flows WHERE name = $1",
+                &[&flow_job_name],
+            )
             .await?;
         if rows == 0 {
             return Err(anyhow!("unable to delete flow job metadata"));

@@ -568,7 +580,10 @@ impl Catalog {
     pub async fn check_peer_entry(&self, peer_name: &str) -> anyhow::Result<i64> {
         let peer_check = self
             .pg
-            .query_one("SELECT COUNT(*) FROM public.peers WHERE name = $1", &[&peer_name])
+            .query_one(
+                "SELECT COUNT(*) FROM public.peers WHERE name = $1",
+                &[&peer_name],
+            )
             .await?;
         let peer_count: i64 = peer_check.get(0);
         Ok(peer_count)

@@ -599,9 +614,7 @@ impl Catalog {
 impl QueryExecutor for Catalog {
     #[tracing::instrument(skip(self, stmt), fields(stmt = %stmt))]
     async fn execute(&self, stmt: &Statement) -> PgWireResult<QueryOutput> {
-        peer_postgres::pg_execute(&self.pg, ast::PostgresAst {
-            peername: None,
-        }, stmt).await
+        peer_postgres::pg_execute(&self.pg, ast::PostgresAst { peername: None }, stmt).await
     }

     async fn describe(&self, stmt: &Statement) -> PgWireResult<Option<Schema>> {