Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CDC to S3/GCS #507

Merged
merged 5 commits into from
Oct 13, 2023
Merged

CDC to S3/GCS #507

merged 5 commits into from
Oct 13, 2023

Conversation

Amogh-Bharadwaj
Copy link
Contributor

  • Mandatory Metadata DB field added for the S3/GCS peer just like in Eventhub.
CREATE PEER gcs_peer FROM S3 WITH
(
    url = 's3://<gcs/s3_bucket_name>/<prefix>', -- bucket should exist
    access_key_id = '<hmac_key>', -- or AWS equivalent
    secret_access_key = '<hmac_secret>', -- or AWS equivalent
    region = 'auto', -- change this for S3
    endpoint = 'https://storage.googleapis.com', -- or empty for S3
    metadata_db = 'host=<host> port=<port> user=<user> password=<password> database=<database>'
);
  • Destination File Structure:
    Initial Load: One folder per table
    CDC is a separate folder

Tests added for both S3 and GCS versions of the peer (CDC)

pool *pgxpool.Pool
}

func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConfig) (*PostgresMetadataStore, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we share this impl with eventhubs somehow?

var firstCP int64 = 0
tableNameRowsMapping := make(map[string]uint32)

for _, record := range req.Records.Records {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we do this elsewhere too? can we refactor to share the code?

@@ -40,7 +41,9 @@ func NewEventHubConnector(
}

hubManager := NewEventHubManager(ctx, defaultAzureCreds, config)
pgMetadata, err := NewPostgresMetadataStore(ctx, config.GetMetadataDb())
metadataSchemaName := "peerdb_eventhub_metadata"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
G101: Potential hardcoded credentials (gosec)

@@ -167,11 +161,10 @@ func (c *EventHubConnector) GetLastSyncBatchID(jobName string) (int64, error) {
}

// update offset for a job
func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error {
ms := c.pgMetadata
func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
unnecessary leading newline (whitespace)

@@ -206,14 +199,13 @@ func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error
}

// update offset for a job
func (c *EventHubConnector) incrementSyncBatchID(jobName string) error {
ms := c.pgMetadata
func (p *PostgresMetadataStore) IncrementID(jobName string) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
unnecessary leading newline (whitespace)

}

first := true
var firstCP int64 = req.CP
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
ST1023: should omit type int64 from declaration; it will be inferred from the right-hand side (stylecheck)

tableSchemas map[string]*protos.TableSchema
creds *azidentity.DefaultAzureCredential
hubManager *EventHubManager
}

const (
metadataSchemaName = "peerdb_eventhub_metadata"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
G101: Potential hardcoded credentials (gosec)

@Amogh-Bharadwaj Amogh-Bharadwaj merged commit d295566 into main Oct 13, 2023
11 checks passed
@Amogh-Bharadwaj Amogh-Bharadwaj deleted the s3-cdc branch October 13, 2023 21:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants