CDC to S3/GCS (#507)
- A mandatory `metadata_db` field is added for the S3/GCS peer, just like in Eventhub:
```sql
CREATE PEER gcs_peer FROM S3 WITH
(
    url = 's3://<gcs/s3_bucket_name>/<prefix>', -- bucket should exist
    access_key_id = '<hmac_key>', -- or AWS equivalent
    secret_access_key = '<hmac_secret>', -- or AWS equivalent
    region = 'auto', -- change this for S3
    endpoint = 'https://storage.googleapis.com', -- or empty for S3
    metadata_db = 'host=<host> port=<port> user=<user> password=<password> database=<database>'
);
```
- Destination file structure:
  - Initial load: one folder per table
  - CDC: written to a separate folder (a hypothetical layout is sketched below)

- Tests added for both the S3 and GCS versions of the peer (CDC)
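
For illustration only, a hypothetical sketch of the resulting bucket layout; the folder names below are placeholders, not the connector's actual naming scheme:

```
s3://<bucket>/<prefix>/<table_1>/...       -- initial load: one folder per table
s3://<bucket>/<prefix>/<table_2>/...
s3://<bucket>/<prefix>/<cdc_folder>/...    -- CDC files land in a separate folder
```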
Amogh-Bharadwaj authored Oct 13, 2023
1 parent b48507e commit d295566
Showing 18 changed files with 852 additions and 349 deletions.
16 changes: 16 additions & 0 deletions .github/workflows/flow.yml
@@ -59,6 +59,20 @@ jobs:
with:
name: "snowflake_creds.json"
json: ${{ secrets.SNOWFLAKE_GH_CI_PKEY }}

- name: setup S3 credentials
id: s3-credentials
uses: jsdaniell/[email protected]
with:
name: "s3_creds.json"
json: ${{ secrets.S3_CREDS }}

- name: setup GCS credentials
id: gcs-credentials
uses: jsdaniell/[email protected]
with:
name: "gcs_creds.json"
json: ${{ secrets.GCS_CREDS }}

- name: create hstore extension and increase logical replication limits
run: |
@@ -82,6 +96,8 @@ jobs:
AWS_REGION: ${{ secrets.AWS_REGION }}
TEST_BQ_CREDS: ${{ github.workspace }}/bq_service_account.json
TEST_SF_CREDS: ${{ github.workspace }}/snowflake_creds.json
TEST_S3_CREDS: ${{ github.workspace }}/s3_creds.json
TEST_GCS_CREDS: ${{ github.workspace }}/gcs_creds.json
AZURE_TENANT_ID: ${{ secrets.AZURE_TENANT_ID }}
AZURE_CLIENT_ID: ${{ secrets.AZURE_CLIENT_ID }}
AZURE_CLIENT_SECRET: ${{ secrets.AZURE_CLIENT_SECRET }}
2 changes: 2 additions & 0 deletions flow/connectors/core.go
@@ -148,6 +148,8 @@ func GetCDCSyncConnector(ctx context.Context, config *protos.Peer) (CDCSyncConne
return nil, fmt.Errorf("use eventhub group config instead")
case *protos.Peer_EventhubGroupConfig:
return conneventhub.NewEventHubConnector(ctx, config.GetEventhubGroupConfig())
case *protos.Peer_S3Config:
return conns3.NewS3Connector(ctx, config.GetS3Config())
default:
return nil, ErrUnsupportedFunctionality
}
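
For orientation, a minimal caller-side sketch of the dispatch this hunk enables. This is not code from the commit: the `connectors` import path and the exported `CDCSyncConnector` interface name are assumptions inferred from the hunk header above (the return type is truncated there).

```go
// Hypothetical caller-side sketch, not code from this commit.
package example

import (
	"context"

	connectors "github.com/PeerDB-io/peer-flow/connectors" // assumed import path
	"github.com/PeerDB-io/peer-flow/generated/protos"
)

// cdcSyncConnectorFor routes a peer to its CDC sync connector. After this
// commit, a peer carrying *protos.Peer_S3Config (S3 or GCS) is handled by
// conns3.NewS3Connector instead of falling through to ErrUnsupportedFunctionality.
func cdcSyncConnectorFor(ctx context.Context, peer *protos.Peer) (connectors.CDCSyncConnector, error) {
	return connectors.GetCDCSyncConnector(ctx, peer)
}
```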
59 changes: 56 additions & 3 deletions flow/connectors/eventhub/eventhub.go
@@ -10,6 +10,7 @@ import (

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
"github.com/PeerDB-io/peer-flow/generated/protos"
@@ -22,7 +23,7 @@ import (
type EventHubConnector struct {
ctx context.Context
config *protos.EventHubGroupConfig
pgMetadata *PostgresMetadataStore
pgMetadata *metadataStore.PostgresMetadataStore
tableSchemas map[string]*protos.TableSchema
creds *azidentity.DefaultAzureCredential
hubManager *EventHubManager
@@ -40,7 +41,9 @@ func NewEventHubConnector(
}

hubManager := NewEventHubManager(ctx, defaultAzureCreds, config)
pgMetadata, err := NewPostgresMetadataStore(ctx, config.GetMetadataDb())
metadataSchemaName := "peerdb_eventhub_metadata" // #nosec G101
pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, config.GetMetadataDb(),
metadataSchemaName)
if err != nil {
log.Errorf("failed to create postgres metadata store: %v", err)
return nil, err
@@ -84,6 +87,48 @@ func (c *EventHubConnector) InitializeTableSchema(req map[string]*protos.TableSc
return nil
}

func (c *EventHubConnector) NeedsSetupMetadataTables() bool {
return c.pgMetadata.NeedsSetupMetadata()
}

func (c *EventHubConnector) SetupMetadataTables() error {
err := c.pgMetadata.SetupMetadata()
if err != nil {
log.Errorf("failed to setup metadata tables: %v", err)
return err
}

return nil
}

func (c *EventHubConnector) GetLastSyncBatchID(jobName string) (int64, error) {
syncBatchID, err := c.pgMetadata.GetLastBatchID(jobName)
if err != nil {
return 0, err
}

return syncBatchID, nil
}

func (c *EventHubConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) {
res, err := c.pgMetadata.FetchLastOffset(jobName)
if err != nil {
return nil, err
}

return res, nil
}

func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error {
err := c.pgMetadata.UpdateLastOffset(jobName, offset)
if err != nil {
log.Errorf("failed to update last offset: %v", err)
return err
}

return nil
}

func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string {
return fmt.Sprintf("syncing records to eventhub with"+
@@ -177,7 +222,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
log.Errorf("failed to update last offset: %v", err)
return nil, err
}
err = c.incrementSyncBatchID(req.FlowJobName)
err = c.pgMetadata.IncrementID(req.FlowJobName)
if err != nil {
log.Errorf("%v", err)
return nil, err
@@ -311,3 +356,11 @@ func (c *EventHubConnector) SetupNormalizedTables(
TableExistsMapping: nil,
}, nil
}

func (c *EventHubConnector) SyncFlowCleanup(jobName string) error {
err := c.pgMetadata.DropMetadata(jobName)
if err != nil {
return err
}
return nil
}
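
For context on how this refactor is meant to be reused, here is a minimal sketch (not code from this commit) of how another connector, such as the new S3/GCS one whose source is not shown in this excerpt, could wire itself to the shared `external_metadata` store following the same pattern as the EventHub connector above. The struct name and schema name are illustrative; only constructors and methods visible in this diff are used.

```go
// Sketch only: delegating a connector's metadata bookkeeping to the shared store.
package example

import (
	"context"

	metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
	"github.com/PeerDB-io/peer-flow/generated/protos"
)

type exampleConnector struct {
	ctx        context.Context
	pgMetadata *metadataStore.PostgresMetadataStore
}

func newExampleConnector(ctx context.Context, metadataDb *protos.PostgresConfig) (*exampleConnector, error) {
	// Each connector keeps its sync state in its own schema of the metadata DB.
	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, metadataDb, "peerdb_example_metadata")
	if err != nil {
		return nil, err
	}
	return &exampleConnector{ctx: ctx, pgMetadata: pgMetadata}, nil
}

// The CDC interface methods become thin wrappers over the store.
func (c *exampleConnector) NeedsSetupMetadataTables() bool { return c.pgMetadata.NeedsSetupMetadata() }
func (c *exampleConnector) SetupMetadataTables() error     { return c.pgMetadata.SetupMetadata() }
func (c *exampleConnector) GetLastSyncBatchID(job string) (int64, error) {
	return c.pgMetadata.GetLastBatchID(job)
}
func (c *exampleConnector) GetLastOffset(job string) (*protos.LastSyncState, error) {
	return c.pgMetadata.FetchLastOffset(job)
}
func (c *exampleConnector) SyncFlowCleanup(job string) error { return c.pgMetadata.DropMetadata(job) }
```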
@@ -1,4 +1,4 @@
package conneventhub
package connmetadata

import (
"context"
@@ -10,30 +10,32 @@ import (
)

const (
// schema for the peerdb metadata
metadataSchema = "peerdb_eventhub_metadata"
// The name of the table that stores the last sync state.
lastSyncStateTableName = "last_sync_state"
)

type PostgresMetadataStore struct {
config *protos.PostgresConfig
pool *pgxpool.Pool
ctx context.Context
config *protos.PostgresConfig
pool *pgxpool.Pool
schemaName string
}

func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConfig) (*PostgresMetadataStore, error) {
func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConfig,
schemaName string) (*PostgresMetadataStore, error) {
connectionString := utils.GetPGConnectionString(pgConfig)

pool, err := pgxpool.New(ctx, connectionString)
if err != nil {
log.Errorf("failed to create connection pool: %v", err)
return nil, err
}
log.Info("created connection pool for eventhub metadata store")
log.Info("created connection pool for metadata store")

return &PostgresMetadataStore{
config: pgConfig,
pool: pool,
ctx: ctx,
config: pgConfig,
pool: pool,
schemaName: schemaName,
}, nil
}

@@ -45,11 +47,9 @@ func (p *PostgresMetadataStore) Close() error {
return nil
}

func (c *EventHubConnector) NeedsSetupMetadataTables() bool {
ms := c.pgMetadata

func (p *PostgresMetadataStore) NeedsSetupMetadata() bool {
// check if schema exists
rows := ms.pool.QueryRow(c.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", metadataSchema)
rows := p.pool.QueryRow(p.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", p.schemaName)

var exists int64
err := rows.Scan(&exists)
@@ -65,26 +65,24 @@ func (c *EventHubConnector) NeedsSetupMetadataTables() bool {
return true
}

func (c *EventHubConnector) SetupMetadataTables() error {
ms := c.pgMetadata

func (p *PostgresMetadataStore) SetupMetadata() error {
// start a transaction
tx, err := ms.pool.Begin(c.ctx)
tx, err := p.pool.Begin(p.ctx)
if err != nil {
log.Errorf("failed to start transaction: %v", err)
return err
}

// create the schema
_, err = tx.Exec(c.ctx, "CREATE SCHEMA IF NOT EXISTS "+metadataSchema)
_, err = tx.Exec(p.ctx, "CREATE SCHEMA IF NOT EXISTS "+p.schemaName)
if err != nil {
log.Errorf("failed to create schema: %v", err)
return err
}

// create the last sync state table
_, err = tx.Exec(c.ctx, `
CREATE TABLE IF NOT EXISTS `+metadataSchema+`.`+lastSyncStateTableName+` (
_, err = tx.Exec(p.ctx, `
CREATE TABLE IF NOT EXISTS `+p.schemaName+`.`+lastSyncStateTableName+` (
job_name TEXT PRIMARY KEY NOT NULL,
last_offset BIGINT NOT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
@@ -95,9 +93,10 @@ func (c *EventHubConnector) SetupMetadataTables() error {
log.Errorf("failed to create last sync state table: %v", err)
return err
}
log.Infof("created external metadata table %s.%s", p.schemaName, lastSyncStateTableName)

// commit the transaction
err = tx.Commit(c.ctx)
err = tx.Commit(p.ctx)
if err != nil {
log.Errorf("failed to commit transaction: %v", err)
return err
@@ -106,15 +105,12 @@ func (c *EventHubConnector) SetupMetadataTables() error {
return nil
}

func (c *EventHubConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) {
ms := c.pgMetadata

rows := ms.pool.QueryRow(c.ctx, `
func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (*protos.LastSyncState, error) {
rows := p.pool.QueryRow(p.ctx, `
SELECT last_offset
FROM `+metadataSchema+`.`+lastSyncStateTableName+`
FROM `+p.schemaName+`.`+lastSyncStateTableName+`
WHERE job_name = $1
`, jobName)

var offset int64
err := rows.Scan(&offset)
if err != nil {
@@ -138,12 +134,10 @@ func (c *EventHubConnector) GetLastOffset(jobName string) (*protos.LastSyncState
}, nil
}

func (c *EventHubConnector) GetLastSyncBatchID(jobName string) (int64, error) {
ms := c.pgMetadata

rows := ms.pool.QueryRow(c.ctx, `
func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) {
rows := p.pool.QueryRow(p.ctx, `
SELECT sync_batch_id
FROM `+metadataSchema+`.`+lastSyncStateTableName+`
FROM `+p.schemaName+`.`+lastSyncStateTableName+`
WHERE job_name = $1
`, jobName)

@@ -167,11 +161,9 @@ func (c *EventHubConnector) GetLastSyncBatchID(jobName string) (int64, error) {
}

// update offset for a job
func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error {
ms := c.pgMetadata

func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) error {
// start a transaction
tx, err := ms.pool.Begin(c.ctx)
tx, err := p.pool.Begin(p.ctx)
if err != nil {
log.Errorf("failed to start transaction: %v", err)
return err
@@ -181,8 +173,8 @@ func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error
log.WithFields(log.Fields{
"flowName": jobName,
}).Infof("updating last offset for job `%s` to `%d`", jobName, offset)
_, err = tx.Exec(c.ctx, `
INSERT INTO `+metadataSchema+`.`+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id)
_, err = tx.Exec(p.ctx, `
INSERT INTO `+p.schemaName+`.`+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id)
VALUES ($1, $2, $3)
ON CONFLICT (job_name)
DO UPDATE SET last_offset = $2, updated_at = NOW()
@@ -196,7 +188,7 @@ func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error
}

// commit the transaction
err = tx.Commit(c.ctx)
err = tx.Commit(p.ctx)
if err != nil {
log.Errorf("failed to commit transaction: %v", err)
return err
@@ -206,14 +198,12 @@ func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error
}

// update offset for a job
func (c *EventHubConnector) incrementSyncBatchID(jobName string) error {
ms := c.pgMetadata

func (p *PostgresMetadataStore) IncrementID(jobName string) error {
log.WithFields(log.Fields{
"flowName": jobName,
}).Infof("incrementing sync batch id for job `%s`", jobName)
_, err := ms.pool.Exec(c.ctx, `
UPDATE `+metadataSchema+`.`+lastSyncStateTableName+`
_, err := p.pool.Exec(p.ctx, `
UPDATE `+p.schemaName+`.`+lastSyncStateTableName+`
SET sync_batch_id=sync_batch_id+1 WHERE job_name=$1
`, jobName)

@@ -227,9 +217,9 @@ func (c *EventHubConnector) incrementSyncBatchID(jobName string) error {
return nil
}

func (c *EventHubConnector) SyncFlowCleanup(jobName string) error {
_, err := c.pgMetadata.pool.Exec(c.ctx, `
DELETE FROM `+metadataSchema+`.`+lastSyncStateTableName+`
func (p *PostgresMetadataStore) DropMetadata(jobName string) error {
_, err := p.pool.Exec(p.ctx, `
DELETE FROM `+p.schemaName+`.`+lastSyncStateTableName+`
WHERE job_name = $1
`, jobName)
return err
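
To make the generalized store's contract concrete, here is a minimal usage sketch of its lifecycle around one sync batch. It is an illustration under stated assumptions, not code from the commit: the schema name, job name, and offset value are placeholders, and only methods visible in the diff above are called.

```go
// Sketch of the metadata store's lifecycle around one sync batch.
package example

import (
	"context"
	"log"

	metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
	"github.com/PeerDB-io/peer-flow/generated/protos"
)

func trackSyncBatch(ctx context.Context, cfg *protos.PostgresConfig) error {
	store, err := metadataStore.NewPostgresMetadataStore(ctx, cfg, "peerdb_s3_metadata") // schema name is illustrative
	if err != nil {
		return err
	}
	defer func() {
		if err := store.Close(); err != nil {
			log.Printf("failed to close metadata store: %v", err)
		}
	}()

	// One-time setup: create the schema and the last_sync_state table if missing.
	if store.NeedsSetupMetadata() {
		if err := store.SetupMetadata(); err != nil {
			return err
		}
	}

	const jobName = "example_flow" // hypothetical job name

	// After pushing a batch of records, record how far we got and bump the batch id.
	if err := store.UpdateLastOffset(jobName, 42); err != nil { // 42 is a placeholder offset
		return err
	}
	return store.IncrementID(jobName)
}
```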