From d2955661231cb386d39a8378ef75cb3e8e469fa1 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Sat, 14 Oct 2023 02:32:42 +0530 Subject: [PATCH] CDC to S3/GCS (#507) - Mandatory Metadata DB field added for the S3/GCS peer just like in Eventhub. ```sql CREATE PEER gcs_peer FROM S3 WITH ( url = 's3:///', -- bucket should exist access_key_id = '', -- or AWS equivalent secret_access_key = '', -- or AWS equivalent region = 'auto', -- change this for S3 endpoint = 'https://storage.googleapis.com', -- or empty for S3 metadata_db = 'host= port= user= password= database=' ); ``` - Destination File Structure: Initial Load: One folder per table CDC is a separate folder Tests added for both S3 and GCS versions of the peer (CDC) --- .github/workflows/flow.yml | 16 ++ flow/connectors/core.go | 2 + flow/connectors/eventhub/eventhub.go | 59 +++++- .../store.go} | 86 ++++---- flow/connectors/s3/s3.go | 181 ++++++++++++++-- flow/connectors/snowflake/snowflake.go | 184 +--------------- flow/connectors/utils/stream.go | 189 +++++++++++++++++ flow/e2e/congen.go | 6 + flow/e2e/s3/cdc_s3_test.go | 157 ++++++++++++++ flow/e2e/s3/qrep_flow_s3_test.go | 10 +- flow/e2e/s3/s3_helper.go | 54 ++++- flow/generated/protos/peers.pb.go | 199 ++++++++++-------- flow/model/qrecord_stream.go | 12 ++ nexus/analyzer/src/lib.rs | 6 + nexus/pt/src/peerdb_peers.rs | 2 + nexus/pt/src/peerdb_peers.serde.rs | 18 ++ protos/peers.proto | 1 + ui/grpc_generated/peers.ts | 19 ++ 18 files changed, 852 insertions(+), 349 deletions(-) rename flow/connectors/{eventhub/metadata.go => external_metadata/store.go} (66%) create mode 100644 flow/connectors/utils/stream.go create mode 100644 flow/e2e/s3/cdc_s3_test.go diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index 04698de4d..ffceae7c5 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -59,6 +59,20 @@ jobs: with: name: "snowflake_creds.json" json: ${{ secrets.SNOWFLAKE_GH_CI_PKEY }} + + - name: setup S3 credentials + id: s3-credentials + uses: jsdaniell/create-json@v1.2.2 + with: + name: "s3_creds.json" + json: ${{ secrets.S3_CREDS }} + + - name: setup GCS credentials + id: gcs-credentials + uses: jsdaniell/create-json@v1.2.2 + with: + name: "gcs_creds.json" + json: ${{ secrets.GCS_CREDS }} - name: create hstore extension and increase logical replication limits run: | @@ -82,6 +96,8 @@ jobs: AWS_REGION: ${{ secrets.AWS_REGION }} TEST_BQ_CREDS: ${{ github.workspace }}/bq_service_account.json TEST_SF_CREDS: ${{ github.workspace }}/snowflake_creds.json + TEST_S3_CREDS: ${{ github.workspace }}/s3_creds.json + TEST_GCS_CREDS: ${{ github.workspace }}/gcs_creds.json AZURE_TENANT_ID: ${{ secrets.AZURE_TENANT_ID }} AZURE_CLIENT_ID: ${{ secrets.AZURE_CLIENT_ID }} AZURE_CLIENT_SECRET: ${{ secrets.AZURE_CLIENT_SECRET }} diff --git a/flow/connectors/core.go b/flow/connectors/core.go index a67f9e346..3d1c22673 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -148,6 +148,8 @@ func GetCDCSyncConnector(ctx context.Context, config *protos.Peer) (CDCSyncConne return nil, fmt.Errorf("use eventhub group config instead") case *protos.Peer_EventhubGroupConfig: return conneventhub.NewEventHubConnector(ctx, config.GetEventhubGroupConfig()) + case *protos.Peer_S3Config: + return conns3.NewS3Connector(ctx, config.GetS3Config()) default: return nil, ErrUnsupportedFunctionality } diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index b614ec329..51dba78b7 100644 --- a/flow/connectors/eventhub/eventhub.go +++ 
b/flow/connectors/eventhub/eventhub.go @@ -10,6 +10,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azidentity" azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" + metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/connectors/utils/metrics" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -22,7 +23,7 @@ import ( type EventHubConnector struct { ctx context.Context config *protos.EventHubGroupConfig - pgMetadata *PostgresMetadataStore + pgMetadata *metadataStore.PostgresMetadataStore tableSchemas map[string]*protos.TableSchema creds *azidentity.DefaultAzureCredential hubManager *EventHubManager @@ -40,7 +41,9 @@ func NewEventHubConnector( } hubManager := NewEventHubManager(ctx, defaultAzureCreds, config) - pgMetadata, err := NewPostgresMetadataStore(ctx, config.GetMetadataDb()) + metadataSchemaName := "peerdb_eventhub_metadata" // #nosec G101 + pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, config.GetMetadataDb(), + metadataSchemaName) if err != nil { log.Errorf("failed to create postgres metadata store: %v", err) return nil, err @@ -84,6 +87,48 @@ func (c *EventHubConnector) InitializeTableSchema(req map[string]*protos.TableSc return nil } +func (c *EventHubConnector) NeedsSetupMetadataTables() bool { + return c.pgMetadata.NeedsSetupMetadata() +} + +func (c *EventHubConnector) SetupMetadataTables() error { + err := c.pgMetadata.SetupMetadata() + if err != nil { + log.Errorf("failed to setup metadata tables: %v", err) + return err + } + + return nil +} + +func (c *EventHubConnector) GetLastSyncBatchID(jobName string) (int64, error) { + syncBatchID, err := c.pgMetadata.GetLastBatchID(jobName) + if err != nil { + return 0, err + } + + return syncBatchID, nil +} + +func (c *EventHubConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) { + res, err := c.pgMetadata.FetchLastOffset(jobName) + if err != nil { + return nil, err + } + + return res, nil +} + +func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error { + err := c.pgMetadata.UpdateLastOffset(jobName, offset) + if err != nil { + log.Errorf("failed to update last offset: %v", err) + return err + } + + return nil +} + func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) { shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string { return fmt.Sprintf("syncing records to eventhub with"+ @@ -177,7 +222,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S log.Errorf("failed to update last offset: %v", err) return nil, err } - err = c.incrementSyncBatchID(req.FlowJobName) + err = c.pgMetadata.IncrementID(req.FlowJobName) if err != nil { log.Errorf("%v", err) return nil, err @@ -311,3 +356,11 @@ func (c *EventHubConnector) SetupNormalizedTables( TableExistsMapping: nil, }, nil } + +func (c *EventHubConnector) SyncFlowCleanup(jobName string) error { + err := c.pgMetadata.DropMetadata(jobName) + if err != nil { + return err + } + return nil +} diff --git a/flow/connectors/eventhub/metadata.go b/flow/connectors/external_metadata/store.go similarity index 66% rename from flow/connectors/eventhub/metadata.go rename to flow/connectors/external_metadata/store.go index 178ebbb07..6ec7c0714 100644 --- a/flow/connectors/eventhub/metadata.go +++ b/flow/connectors/external_metadata/store.go @@ -1,4 +1,4 @@ -package conneventhub +package connmetadata import ( "context" @@ 
-10,18 +10,18 @@ import ( ) const ( - // schema for the peerdb metadata - metadataSchema = "peerdb_eventhub_metadata" - // The name of the table that stores the last sync state. lastSyncStateTableName = "last_sync_state" ) type PostgresMetadataStore struct { - config *protos.PostgresConfig - pool *pgxpool.Pool + ctx context.Context + config *protos.PostgresConfig + pool *pgxpool.Pool + schemaName string } -func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConfig) (*PostgresMetadataStore, error) { +func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConfig, + schemaName string) (*PostgresMetadataStore, error) { connectionString := utils.GetPGConnectionString(pgConfig) pool, err := pgxpool.New(ctx, connectionString) @@ -29,11 +29,13 @@ func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConf log.Errorf("failed to create connection pool: %v", err) return nil, err } - log.Info("created connection pool for eventhub metadata store") + log.Info("created connection pool for metadata store") return &PostgresMetadataStore{ - config: pgConfig, - pool: pool, + ctx: ctx, + config: pgConfig, + pool: pool, + schemaName: schemaName, }, nil } @@ -45,11 +47,9 @@ func (p *PostgresMetadataStore) Close() error { return nil } -func (c *EventHubConnector) NeedsSetupMetadataTables() bool { - ms := c.pgMetadata - +func (p *PostgresMetadataStore) NeedsSetupMetadata() bool { // check if schema exists - rows := ms.pool.QueryRow(c.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", metadataSchema) + rows := p.pool.QueryRow(p.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", p.schemaName) var exists int64 err := rows.Scan(&exists) @@ -65,26 +65,24 @@ func (c *EventHubConnector) NeedsSetupMetadataTables() bool { return true } -func (c *EventHubConnector) SetupMetadataTables() error { - ms := c.pgMetadata - +func (p *PostgresMetadataStore) SetupMetadata() error { // start a transaction - tx, err := ms.pool.Begin(c.ctx) + tx, err := p.pool.Begin(p.ctx) if err != nil { log.Errorf("failed to start transaction: %v", err) return err } // create the schema - _, err = tx.Exec(c.ctx, "CREATE SCHEMA IF NOT EXISTS "+metadataSchema) + _, err = tx.Exec(p.ctx, "CREATE SCHEMA IF NOT EXISTS "+p.schemaName) if err != nil { log.Errorf("failed to create schema: %v", err) return err } // create the last sync state table - _, err = tx.Exec(c.ctx, ` - CREATE TABLE IF NOT EXISTS `+metadataSchema+`.`+lastSyncStateTableName+` ( + _, err = tx.Exec(p.ctx, ` + CREATE TABLE IF NOT EXISTS `+p.schemaName+`.`+lastSyncStateTableName+` ( job_name TEXT PRIMARY KEY NOT NULL, last_offset BIGINT NOT NULL, updated_at TIMESTAMP NOT NULL DEFAULT NOW(), @@ -95,9 +93,10 @@ func (c *EventHubConnector) SetupMetadataTables() error { log.Errorf("failed to create last sync state table: %v", err) return err } + log.Infof("created external metadata table %s.%s", p.schemaName, lastSyncStateTableName) // commit the transaction - err = tx.Commit(c.ctx) + err = tx.Commit(p.ctx) if err != nil { log.Errorf("failed to commit transaction: %v", err) return err @@ -106,15 +105,12 @@ func (c *EventHubConnector) SetupMetadataTables() error { return nil } -func (c *EventHubConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) { - ms := c.pgMetadata - - rows := ms.pool.QueryRow(c.ctx, ` +func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (*protos.LastSyncState, error) { + rows := p.pool.QueryRow(p.ctx, ` SELECT last_offset - FROM 
`+metadataSchema+`.`+lastSyncStateTableName+` + FROM `+p.schemaName+`.`+lastSyncStateTableName+` WHERE job_name = $1 `, jobName) - var offset int64 err := rows.Scan(&offset) if err != nil { @@ -138,12 +134,10 @@ func (c *EventHubConnector) GetLastOffset(jobName string) (*protos.LastSyncState }, nil } -func (c *EventHubConnector) GetLastSyncBatchID(jobName string) (int64, error) { - ms := c.pgMetadata - - rows := ms.pool.QueryRow(c.ctx, ` +func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) { + rows := p.pool.QueryRow(p.ctx, ` SELECT sync_batch_id - FROM `+metadataSchema+`.`+lastSyncStateTableName+` + FROM `+p.schemaName+`.`+lastSyncStateTableName+` WHERE job_name = $1 `, jobName) @@ -167,11 +161,9 @@ func (c *EventHubConnector) GetLastSyncBatchID(jobName string) (int64, error) { } // update offset for a job -func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error { - ms := c.pgMetadata - +func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) error { // start a transaction - tx, err := ms.pool.Begin(c.ctx) + tx, err := p.pool.Begin(p.ctx) if err != nil { log.Errorf("failed to start transaction: %v", err) return err @@ -181,8 +173,8 @@ func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error log.WithFields(log.Fields{ "flowName": jobName, }).Infof("updating last offset for job `%s` to `%d`", jobName, offset) - _, err = tx.Exec(c.ctx, ` - INSERT INTO `+metadataSchema+`.`+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id) + _, err = tx.Exec(p.ctx, ` + INSERT INTO `+p.schemaName+`.`+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id) VALUES ($1, $2, $3) ON CONFLICT (job_name) DO UPDATE SET last_offset = $2, updated_at = NOW() @@ -196,7 +188,7 @@ func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error } // commit the transaction - err = tx.Commit(c.ctx) + err = tx.Commit(p.ctx) if err != nil { log.Errorf("failed to commit transaction: %v", err) return err @@ -206,14 +198,12 @@ func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error } // update offset for a job -func (c *EventHubConnector) incrementSyncBatchID(jobName string) error { - ms := c.pgMetadata - +func (p *PostgresMetadataStore) IncrementID(jobName string) error { log.WithFields(log.Fields{ "flowName": jobName, }).Infof("incrementing sync batch id for job `%s`", jobName) - _, err := ms.pool.Exec(c.ctx, ` - UPDATE `+metadataSchema+`.`+lastSyncStateTableName+` + _, err := p.pool.Exec(p.ctx, ` + UPDATE `+p.schemaName+`.`+lastSyncStateTableName+` SET sync_batch_id=sync_batch_id+1 WHERE job_name=$1 `, jobName) @@ -227,9 +217,9 @@ func (c *EventHubConnector) incrementSyncBatchID(jobName string) error { return nil } -func (c *EventHubConnector) SyncFlowCleanup(jobName string) error { - _, err := c.pgMetadata.pool.Exec(c.ctx, ` - DELETE FROM `+metadataSchema+`.`+lastSyncStateTableName+` +func (p *PostgresMetadataStore) DropMetadata(jobName string) error { + _, err := p.pool.Exec(p.ctx, ` + DELETE FROM `+p.schemaName+`.`+lastSyncStateTableName+` WHERE job_name = $1 `, jobName) return err diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index 9ef8fb528..51e163f13 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -3,41 +3,46 @@ package conns3 import ( "context" "fmt" + "time" + metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/connectors/utils" + 
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" "github.com/aws/aws-sdk-go/service/s3" log "github.com/sirupsen/logrus" ) type S3Connector struct { - ctx context.Context - url string - client s3.S3 - creds utils.S3PeerCredentials + ctx context.Context + url string + pgMetadata *metadataStore.PostgresMetadataStore + client s3.S3 + creds utils.S3PeerCredentials } func NewS3Connector(ctx context.Context, - s3ProtoConfig *protos.S3Config) (*S3Connector, error) { + config *protos.S3Config) (*S3Connector, error) { keyID := "" - if s3ProtoConfig.AccessKeyId != nil { - keyID = *s3ProtoConfig.AccessKeyId + if config.AccessKeyId != nil { + keyID = *config.AccessKeyId } secretKey := "" - if s3ProtoConfig.SecretAccessKey != nil { - secretKey = *s3ProtoConfig.SecretAccessKey + if config.SecretAccessKey != nil { + secretKey = *config.SecretAccessKey } roleArn := "" - if s3ProtoConfig.RoleArn != nil { - roleArn = *s3ProtoConfig.RoleArn + if config.RoleArn != nil { + roleArn = *config.RoleArn } region := "" - if s3ProtoConfig.Region != nil { - region = *s3ProtoConfig.Region + if config.Region != nil { + region = *config.Region } endpoint := "" - if s3ProtoConfig.Endpoint != nil { - endpoint = *s3ProtoConfig.Endpoint + if config.Endpoint != nil { + endpoint = *config.Endpoint } s3PeerCreds := utils.S3PeerCredentials{ AccessKeyID: keyID, @@ -50,14 +55,33 @@ func NewS3Connector(ctx context.Context, if err != nil { return nil, fmt.Errorf("failed to create S3 client: %w", err) } + metadataSchemaName := "peerdb_s3_metadata" // #nosec G101 + pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, + config.GetMetadataDb(), metadataSchemaName) + if err != nil { + log.Errorf("failed to create postgres metadata store: %v", err) + return nil, err + } + return &S3Connector{ - ctx: ctx, - url: s3ProtoConfig.Url, - client: *s3Client, - creds: s3PeerCreds, + ctx: ctx, + url: config.Url, + pgMetadata: pgMetadata, + client: *s3Client, + creds: s3PeerCreds, }, nil } +func (c *S3Connector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) { + log.Infof("CreateRawTable for S3 is a no-op") + return nil, nil +} + +func (c *S3Connector) InitializeTableSchema(req map[string]*protos.TableSchema) error { + log.Infof("InitializeTableSchema for S3 is a no-op") + return nil +} + func (c *S3Connector) Close() error { log.Debugf("Closing s3 connector is a noop") return nil @@ -67,3 +91,122 @@ func (c *S3Connector) ConnectionActive() bool { _, err := c.client.ListBuckets(nil) return err == nil } + +func (c *S3Connector) NeedsSetupMetadataTables() bool { + return c.pgMetadata.NeedsSetupMetadata() +} + +func (c *S3Connector) SetupMetadataTables() error { + err := c.pgMetadata.SetupMetadata() + if err != nil { + log.Errorf("failed to setup metadata tables: %v", err) + return err + } + + return nil +} + +func (c *S3Connector) GetLastSyncBatchID(jobName string) (int64, error) { + syncBatchID, err := c.pgMetadata.GetLastBatchID(jobName) + if err != nil { + return 0, err + } + + return syncBatchID, nil +} + +func (c *S3Connector) GetLastOffset(jobName string) (*protos.LastSyncState, error) { + res, err := c.pgMetadata.FetchLastOffset(jobName) + if err != nil { + return nil, err + } + + return res, nil +} + +// update offset for a job +func (c *S3Connector) updateLastOffset(jobName string, offset int64) error { + err := c.pgMetadata.UpdateLastOffset(jobName, offset) + if err != nil { + log.Errorf("failed 
to update last offset: %v", err) + return err + } + + return nil +} + +func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) { + if len(req.Records.Records) == 0 { + return &model.SyncResponse{ + FirstSyncedCheckPointID: 0, + LastSyncedCheckPointID: 0, + NumRecordsSynced: 0, + }, nil + } + + syncBatchID, err := c.GetLastSyncBatchID(req.FlowJobName) + if err != nil { + return nil, fmt.Errorf("failed to get previous syncBatchID: %w", err) + } + syncBatchID = syncBatchID + 1 + lastCP := req.Records.LastCheckPointID + + tableNameRowsMapping := make(map[string]uint32) + streamRes, err := utils.RecordsToRawTableStream(model.RecordsToStreamRequest{ + Records: req.Records.Records, + TableMapping: tableNameRowsMapping, + CP: 0, + BatchID: syncBatchID, + }) + if err != nil { + return nil, fmt.Errorf("failed to convert records to raw table stream: %w", err) + } + firstCP := streamRes.CP + recordStream := streamRes.Stream + qrepConfig := &protos.QRepConfig{ + FlowJobName: req.FlowJobName, + DestinationTableIdentifier: fmt.Sprintf("raw_table_%s", req.FlowJobName), + } + partition := &protos.QRepPartition{ + PartitionId: fmt.Sprint(syncBatchID), + } + startTime := time.Now() + close(recordStream.Records) + numRecords, err := c.SyncQRepRecords(qrepConfig, partition, recordStream) + if err != nil { + return nil, err + } + + err = c.updateLastOffset(req.FlowJobName, lastCP) + if err != nil { + log.Errorf("failed to update last offset for s3 cdc: %v", err) + return nil, err + } + err = c.pgMetadata.IncrementID(req.FlowJobName) + if err != nil { + log.Errorf("%v", err) + return nil, err + } + metrics.LogSyncMetrics(c.ctx, req.FlowJobName, int64(numRecords), time.Since(startTime)) + return &model.SyncResponse{ + FirstSyncedCheckPointID: firstCP, + LastSyncedCheckPointID: lastCP, + NumRecordsSynced: int64(numRecords), + TableNameRowsMapping: tableNameRowsMapping, + }, nil +} + +func (c *S3Connector) SetupNormalizedTables(req *protos.SetupNormalizedTableBatchInput) ( + *protos.SetupNormalizedTableBatchOutput, + error) { + log.Infof("SetupNormalizedTables for S3 is a no-op") + return nil, nil +} + +func (c *S3Connector) SyncFlowCleanup(jobName string) error { + err := c.pgMetadata.DropMetadata(jobName) + if err != nil { + return err + } + return nil +} diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 4335ee85e..7046746ec 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -645,183 +645,20 @@ func (c *SnowflakeConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest, ra func (c *SnowflakeConnector) syncRecordsViaAvro(req *model.SyncRecordsRequest, rawTableIdentifier string, syncBatchID int64) (*model.SyncResponse, error) { - recordStream := model.NewQRecordStream(len(req.Records.Records)) - - err := recordStream.SetSchema(&model.QRecordSchema{ - Fields: []*model.QField{ - { - Name: "_peerdb_uid", - Type: qvalue.QValueKindString, - Nullable: false, - }, - { - Name: "_peerdb_timestamp", - Type: qvalue.QValueKindInt64, - Nullable: false, - }, - { - Name: "_peerdb_destination_table_name", - Type: qvalue.QValueKindString, - Nullable: false, - }, - { - Name: "_peerdb_data", - Type: qvalue.QValueKindString, - Nullable: false, - }, - { - Name: "_peerdb_record_type", - Type: qvalue.QValueKindInt64, - Nullable: true, - }, - { - Name: "_peerdb_match_data", - Type: qvalue.QValueKindString, - Nullable: true, - }, - { - Name: "_peerdb_batch_id", - Type: qvalue.QValueKindInt64, - Nullable: 
true, - }, - { - Name: "_peerdb_unchanged_toast_columns", - Type: qvalue.QValueKindString, - Nullable: true, - }, - }, - }) - if err != nil { - return nil, err - } - first := true - var firstCP int64 = 0 lastCP := req.Records.LastCheckPointID tableNameRowsMapping := make(map[string]uint32) - - for _, record := range req.Records.Records { - var entries [8]qvalue.QValue - switch typedRecord := record.(type) { - case *model.InsertRecord: - // json.Marshal converts bytes in Hex automatically to BASE64 string. - itemsJSON, err := typedRecord.Items.ToJSON() - if err != nil { - return nil, fmt.Errorf("failed to serialize insert record items to JSON: %w", err) - } - - // add insert record to the raw table - entries[2] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: typedRecord.DestinationTableName, - } - entries[3] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: itemsJSON, - } - entries[4] = qvalue.QValue{ - Kind: qvalue.QValueKindInt64, - Value: 0, - } - entries[5] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: "", - } - entries[7] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: utils.KeysToString(typedRecord.UnchangedToastColumns), - } - tableNameRowsMapping[typedRecord.DestinationTableName] += 1 - case *model.UpdateRecord: - newItemsJSON, err := typedRecord.NewItems.ToJSON() - if err != nil { - return nil, fmt.Errorf("failed to serialize update record new items to JSON: %w", err) - } - oldItemsJSON, err := typedRecord.OldItems.ToJSON() - if err != nil { - return nil, fmt.Errorf("failed to serialize update record old items to JSON: %w", err) - } - - // add update record to the raw table - entries[2] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: typedRecord.DestinationTableName, - } - entries[3] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: newItemsJSON, - } - entries[4] = qvalue.QValue{ - Kind: qvalue.QValueKindInt64, - Value: 1, - } - entries[5] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: oldItemsJSON, - } - entries[7] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: utils.KeysToString(typedRecord.UnchangedToastColumns), - } - tableNameRowsMapping[typedRecord.DestinationTableName] += 1 - case *model.DeleteRecord: - itemsJSON, err := typedRecord.Items.ToJSON() - if err != nil { - return nil, fmt.Errorf("failed to serialize delete record items to JSON: %w", err) - } - - // append delete record to the raw table - entries[2] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: typedRecord.DestinationTableName, - } - entries[3] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: itemsJSON, - } - entries[4] = qvalue.QValue{ - Kind: qvalue.QValueKindInt64, - Value: 2, - } - entries[5] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: itemsJSON, - } - entries[7] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: utils.KeysToString(typedRecord.UnchangedToastColumns), - } - tableNameRowsMapping[typedRecord.DestinationTableName] += 1 - default: - return nil, fmt.Errorf("record type %T not supported in Snowflake flow connector", typedRecord) - } - - if first { - firstCP = record.GetCheckPointID() - first = false - } - - entries[0] = qvalue.QValue{ - Kind: qvalue.QValueKindString, - Value: uuid.New().String(), - } - entries[1] = qvalue.QValue{ - Kind: qvalue.QValueKindInt64, - Value: time.Now().UnixNano(), - } - entries[6] = qvalue.QValue{ - Kind: qvalue.QValueKindInt64, - Value: syncBatchID, - } - - recordStream.Records <- &model.QRecordOrError{ - Record: &model.QRecord{ - 
NumEntries: 8, - Entries: entries[:], - }, - } + streamRes, err := utils.RecordsToRawTableStream(model.RecordsToStreamRequest{ + Records: req.Records.Records, + TableMapping: tableNameRowsMapping, + CP: 0, + BatchID: syncBatchID, + }) + if err != nil { + return nil, fmt.Errorf("failed to convert records to raw table stream: %w", err) } - + firstCP := streamRes.CP + recordStream := streamRes.Stream qrepConfig := &protos.QRepConfig{ StagingPath: "", FlowJobName: req.FlowJobName, @@ -841,7 +678,6 @@ func (c *SnowflakeConnector) syncRecordsViaAvro(req *model.SyncRecordsRequest, r return nil, err } metrics.LogSyncMetrics(c.ctx, req.FlowJobName, int64(numRecords), time.Since(startTime)) - return &model.SyncResponse{ FirstSyncedCheckPointID: firstCP, LastSyncedCheckPointID: lastCP, diff --git a/flow/connectors/utils/stream.go b/flow/connectors/utils/stream.go new file mode 100644 index 000000000..422106ea5 --- /dev/null +++ b/flow/connectors/utils/stream.go @@ -0,0 +1,189 @@ +package utils + +import ( + "fmt" + "time" + + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/model/qvalue" + "github.com/google/uuid" +) + +func RecordsToRawTableStream(req model.RecordsToStreamRequest) (*model.RecordsToStreamResponse, error) { + recordStream := model.NewQRecordStream(len(req.Records)) + err := recordStream.SetSchema(&model.QRecordSchema{ + Fields: []*model.QField{ + { + Name: "_peerdb_uid", + Type: qvalue.QValueKindString, + Nullable: false, + }, + { + Name: "_peerdb_timestamp", + Type: qvalue.QValueKindInt64, + Nullable: false, + }, + { + Name: "_peerdb_destination_table_name", + Type: qvalue.QValueKindString, + Nullable: false, + }, + { + Name: "_peerdb_data", + Type: qvalue.QValueKindString, + Nullable: false, + }, + { + Name: "_peerdb_record_type", + Type: qvalue.QValueKindInt64, + Nullable: true, + }, + { + Name: "_peerdb_match_data", + Type: qvalue.QValueKindString, + Nullable: true, + }, + { + Name: "_peerdb_batch_id", + Type: qvalue.QValueKindInt64, + Nullable: true, + }, + { + Name: "_peerdb_unchanged_toast_columns", + Type: qvalue.QValueKindString, + Nullable: true, + }, + }, + }) + if err != nil { + return nil, err + } + + first := true + firstCP := req.CP + for _, record := range req.Records { + var entries [8]qvalue.QValue + switch typedRecord := record.(type) { + case *model.InsertRecord: + // json.Marshal converts bytes in Hex automatically to BASE64 string. 
+ itemsJSON, err := typedRecord.Items.ToJSON() + if err != nil { + return nil, fmt.Errorf("failed to serialize insert record items to JSON: %w", err) + } + + // add insert record to the raw table + entries[2] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: typedRecord.DestinationTableName, + } + entries[3] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: itemsJSON, + } + entries[4] = qvalue.QValue{ + Kind: qvalue.QValueKindInt64, + Value: 0, + } + entries[5] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: "", + } + entries[7] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: KeysToString(typedRecord.UnchangedToastColumns), + } + req.TableMapping[typedRecord.DestinationTableName] += 1 + case *model.UpdateRecord: + newItemsJSON, err := typedRecord.NewItems.ToJSON() + if err != nil { + return nil, fmt.Errorf("failed to serialize update record new items to JSON: %w", err) + } + oldItemsJSON, err := typedRecord.OldItems.ToJSON() + if err != nil { + return nil, fmt.Errorf("failed to serialize update record old items to JSON: %w", err) + } + + entries[2] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: typedRecord.DestinationTableName, + } + entries[3] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: newItemsJSON, + } + entries[4] = qvalue.QValue{ + Kind: qvalue.QValueKindInt64, + Value: 1, + } + entries[5] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: oldItemsJSON, + } + entries[7] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: KeysToString(typedRecord.UnchangedToastColumns), + } + req.TableMapping[typedRecord.DestinationTableName] += 1 + case *model.DeleteRecord: + itemsJSON, err := typedRecord.Items.ToJSON() + if err != nil { + return nil, fmt.Errorf("failed to serialize delete record items to JSON: %w", err) + } + + // append delete record to the raw table + entries[2] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: typedRecord.DestinationTableName, + } + entries[3] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: itemsJSON, + } + entries[4] = qvalue.QValue{ + Kind: qvalue.QValueKindInt64, + Value: 2, + } + entries[5] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: itemsJSON, + } + entries[7] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: KeysToString(typedRecord.UnchangedToastColumns), + } + req.TableMapping[typedRecord.DestinationTableName] += 1 + default: + return nil, fmt.Errorf("record type %T not supported in Snowflake flow connector", typedRecord) + } + + if first { + firstCP = record.GetCheckPointID() + first = false + } + + entries[0] = qvalue.QValue{ + Kind: qvalue.QValueKindString, + Value: uuid.New().String(), + } + entries[1] = qvalue.QValue{ + Kind: qvalue.QValueKindInt64, + Value: time.Now().UnixNano(), + } + entries[6] = qvalue.QValue{ + Kind: qvalue.QValueKindInt64, + Value: req.BatchID, + } + + recordStream.Records <- &model.QRecordOrError{ + Record: &model.QRecord{ + NumEntries: 8, + Entries: entries[:], + }, + } + } + + return &model.RecordsToStreamResponse{ + Stream: recordStream, + CP: firstCP, + }, nil +} diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index 80903eb55..02f5b2194 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -34,6 +34,12 @@ func cleanPostgres(pool *pgxpool.Pool, suffix string) error { return fmt.Errorf("failed to drop e2e_test schema: %w", err) } + // drop the S3 metadata database if it exists + _, err = pool.Exec(context.Background(), "DROP SCHEMA IF EXISTS peerdb_s3_metadata CASCADE") + if err != nil { + 
return fmt.Errorf("failed to drop metadata schema: %w", err) + } + // drop all open slots with the given suffix _, err = pool.Exec( context.Background(), diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go new file mode 100644 index 000000000..aaca2a125 --- /dev/null +++ b/flow/e2e/s3/cdc_s3_test.go @@ -0,0 +1,157 @@ +package e2e_s3 + +import ( + "context" + "fmt" + "time" + + "github.com/PeerDB-io/peer-flow/e2e" + peerflow "github.com/PeerDB-io/peer-flow/workflows" + "github.com/stretchr/testify/require" +) + +func (s *PeerFlowE2ETestSuiteS3) attachSchemaSuffix(tableName string) string { + return fmt.Sprintf("e2e_test_%s.%s", s3Suffix, tableName) +} + +func (s *PeerFlowE2ETestSuiteS3) attachSuffix(input string) string { + return fmt.Sprintf("%s_%s", input, s3Suffix) +} + +func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { + env := s.NewTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env) + + srcTableName := s.attachSchemaSuffix("test_simple_flow_s3") + dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_s3", "test_simple_flow_s3") + flowJobName := s.attachSuffix("test_simple_flow") + _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE %s ( + id SERIAL PRIMARY KEY, + key TEXT NOT NULL, + value TEXT NOT NULL + ); + `, srcTableName)) + s.NoError(err) + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: flowJobName, + TableNameMapping: map[string]string{srcTableName: dstTableName}, + PostgresPort: e2e.PostgresPort, + Destination: s.s3Helper.GetPeer(), + } + + flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() + s.NoError(err) + + limits := peerflow.CDCFlowLimits{ + TotalSyncFlows: 5, + MaxBatchSize: 5, + } + + go func() { + e2e.SetupCDCFlowStatusQuery(env, connectionGen) + s.NoError(err) + //insert 20 rows + for i := 1; i <= 20; i++ { + testKey := fmt.Sprintf("test_key_%d", i) + testValue := fmt.Sprintf("test_value_%d", i) + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s (key, value) VALUES ($1, $2) + `, srcTableName), testKey, testValue) + s.NoError(err) + } + s.NoError(err) + }() + + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + + // Verify workflow completes without error + s.True(env.IsWorkflowCompleted()) + err = env.GetWorkflowError() + + // allow only continue as new error + s.Error(err) + s.Contains(err.Error(), "continue as new") + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + fmt.Println("JobName: ", flowJobName) + files, err := s.s3Helper.ListAllFiles(ctx, flowJobName) + fmt.Println("Files in Test_Complete_Simple_Flow_S3: ", len(files)) + require.NoError(s.T(), err) + + require.Equal(s.T(), 4, len(files)) + + env.AssertExpectations(s.T()) +} + +func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { + env := s.NewTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env) + setupErr := s.setupS3("gcs") + if setupErr != nil { + s.Fail("failed to setup S3", setupErr) + } + + srcTableName := s.attachSchemaSuffix("test_simple_flow_gcs_interop") + dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_gcs_interop", "test_simple_flow_gcs_interop") + flowJobName := s.attachSuffix("test_simple_flow") + _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE %s ( + id SERIAL PRIMARY KEY, + key TEXT NOT NULL, + value TEXT NOT NULL + ); + `, srcTableName)) + s.NoError(err) + connectionGen := e2e.FlowConnectionGenerationConfig{ + 
FlowJobName: flowJobName, + TableNameMapping: map[string]string{srcTableName: dstTableName}, + PostgresPort: e2e.PostgresPort, + Destination: s.s3Helper.GetPeer(), + } + + flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() + s.NoError(err) + + limits := peerflow.CDCFlowLimits{ + TotalSyncFlows: 5, + MaxBatchSize: 5, + } + + go func() { + e2e.SetupCDCFlowStatusQuery(env, connectionGen) + s.NoError(err) + //insert 20 rows + for i := 1; i <= 20; i++ { + testKey := fmt.Sprintf("test_key_%d", i) + testValue := fmt.Sprintf("test_value_%d", i) + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s (key, value) VALUES ($1, $2) + `, srcTableName), testKey, testValue) + s.NoError(err) + } + s.NoError(err) + }() + + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + + // Verify workflow completes without error + s.True(env.IsWorkflowCompleted()) + err = env.GetWorkflowError() + + // allow only continue as new error + s.Error(err) + s.Contains(err.Error(), "continue as new") + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + fmt.Println("JobName: ", flowJobName) + files, err := s.s3Helper.ListAllFiles(ctx, flowJobName) + fmt.Println("Files in Test_Complete_Simple_Flow_GCS: ", len(files)) + require.NoError(s.T(), err) + + require.Equal(s.T(), 4, len(files)) + + env.AssertExpectations(s.T()) +} diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go index e47d23592..2fca18a70 100644 --- a/flow/e2e/s3/qrep_flow_s3_test.go +++ b/flow/e2e/s3/qrep_flow_s3_test.go @@ -37,8 +37,12 @@ func (s *PeerFlowE2ETestSuiteS3) setupSourceTable(tableName string, rowCount int s.NoError(err) } -func (s *PeerFlowE2ETestSuiteS3) setupS3() error { - helper, err := NewS3TestHelper() +func (s *PeerFlowE2ETestSuiteS3) setupS3(mode string) error { + switchToGCS := false + if mode == "gcs" { + switchToGCS = true + } + helper, err := NewS3TestHelper(switchToGCS) if err != nil { return err } @@ -63,7 +67,7 @@ func (s *PeerFlowE2ETestSuiteS3) SetupSuite() { } s.pool = pool - err = s.setupS3() + err = s.setupS3("s3") if err != nil { s.Fail("failed to setup S3", err) } diff --git a/flow/e2e/s3/s3_helper.go b/flow/e2e/s3/s3_helper.go index b02752b02..5d40d9a0e 100644 --- a/flow/e2e/s3/s3_helper.go +++ b/flow/e2e/s3/s3_helper.go @@ -2,9 +2,12 @@ package e2e_s3 import ( "context" + "encoding/json" "fmt" + "os" "github.com/PeerDB-io/peer-flow/connectors/utils" + "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3" @@ -13,26 +16,58 @@ import ( const ( peerName string = "test_s3_peer" - bucketName string = "peerdb-test-bucket" prefixName string = "test-s3" ) type S3TestHelper struct { - client *s3.S3 - s3Config *protos.S3Config + client *s3.S3 + s3Config *protos.S3Config + bucketName string } -func NewS3TestHelper() (*S3TestHelper, error) { - client, err := utils.CreateS3Client(utils.S3PeerCredentials{}) +func NewS3TestHelper(switchToGCS bool) (*S3TestHelper, error) { + credsPath := os.Getenv("TEST_S3_CREDS") + bucketName := "peerdb-test-bucket" + if switchToGCS { + credsPath = os.Getenv("TEST_GCS_CREDS") + bucketName = "peerdb_staging" + } + + content, err := e2e.ReadFileToBytes(credsPath) + if err != nil { + return nil, fmt.Errorf("failed to read file: %w", err) + } + + var config utils.S3PeerCredentials + err = json.Unmarshal(content, &config) + if err != nil { + return nil, 
fmt.Errorf("failed to unmarshal json: %w", err) + } + endpoint := "" + if switchToGCS { + endpoint = "https://storage.googleapis.com" + } + client, err := utils.CreateS3Client(config) if err != nil { return nil, err } - log.Infof("S3 client obtained") return &S3TestHelper{ client, &protos.S3Config{ - Url: fmt.Sprintf("s3://%s/%s", bucketName, prefixName), + Url: fmt.Sprintf("s3://%s/%s", bucketName, prefixName), + AccessKeyId: &config.AccessKeyID, + SecretAccessKey: &config.SecretAccessKey, + Region: &config.Region, + Endpoint: &endpoint, + MetadataDb: &protos.PostgresConfig{ + Host: "localhost", + Port: 7132, + Password: "postgres", + User: "postgres", + Database: "postgres", + }, }, + bucketName, }, nil } @@ -52,7 +87,8 @@ func (h *S3TestHelper) ListAllFiles( ctx context.Context, jobName string, ) ([]*s3.Object, error) { - Bucket := bucketName + + Bucket := h.bucketName Prefix := fmt.Sprintf("%s/%s/", prefixName, jobName) files, err := h.client.ListObjects(&s3.ListObjectsInput{ Bucket: &Bucket, @@ -68,7 +104,7 @@ func (h *S3TestHelper) ListAllFiles( // Delete all generated objects during the test func (h *S3TestHelper) CleanUp() error { - Bucket := bucketName + Bucket := h.bucketName Prefix := prefixName files, err := h.client.ListObjects(&s3.ListObjectsInput{ Bucket: &Bucket, diff --git a/flow/generated/protos/peers.pb.go b/flow/generated/protos/peers.pb.go index 2a8377ea7..7f8bb4a37 100644 --- a/flow/generated/protos/peers.pb.go +++ b/flow/generated/protos/peers.pb.go @@ -656,12 +656,13 @@ type S3Config struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Url string `protobuf:"bytes,1,opt,name=url,proto3" json:"url,omitempty"` - AccessKeyId *string `protobuf:"bytes,2,opt,name=access_key_id,json=accessKeyId,proto3,oneof" json:"access_key_id,omitempty"` - SecretAccessKey *string `protobuf:"bytes,3,opt,name=secret_access_key,json=secretAccessKey,proto3,oneof" json:"secret_access_key,omitempty"` - RoleArn *string `protobuf:"bytes,4,opt,name=role_arn,json=roleArn,proto3,oneof" json:"role_arn,omitempty"` - Region *string `protobuf:"bytes,5,opt,name=region,proto3,oneof" json:"region,omitempty"` - Endpoint *string `protobuf:"bytes,6,opt,name=endpoint,proto3,oneof" json:"endpoint,omitempty"` + Url string `protobuf:"bytes,1,opt,name=url,proto3" json:"url,omitempty"` + AccessKeyId *string `protobuf:"bytes,2,opt,name=access_key_id,json=accessKeyId,proto3,oneof" json:"access_key_id,omitempty"` + SecretAccessKey *string `protobuf:"bytes,3,opt,name=secret_access_key,json=secretAccessKey,proto3,oneof" json:"secret_access_key,omitempty"` + RoleArn *string `protobuf:"bytes,4,opt,name=role_arn,json=roleArn,proto3,oneof" json:"role_arn,omitempty"` + Region *string `protobuf:"bytes,5,opt,name=region,proto3,oneof" json:"region,omitempty"` + Endpoint *string `protobuf:"bytes,6,opt,name=endpoint,proto3,oneof" json:"endpoint,omitempty"` + MetadataDb *PostgresConfig `protobuf:"bytes,7,opt,name=metadata_db,json=metadataDb,proto3" json:"metadata_db,omitempty"` } func (x *S3Config) Reset() { @@ -738,6 +739,13 @@ func (x *S3Config) GetEndpoint() string { return "" } +func (x *S3Config) GetMetadataDb() *PostgresConfig { + if x != nil { + return x.MetadataDb + } + return nil +} + type SqlServerConfig struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1108,7 +1116,7 @@ var file_peers_proto_rawDesc = []byte{ 0x79, 0x12, 0x32, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 
0x65, 0x72, 0x73, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x48, 0x75, 0x62, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x05, - 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xa1, 0x02, 0x0a, 0x08, 0x53, 0x33, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xe0, 0x02, 0x0a, 0x08, 0x53, 0x33, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6c, 0x12, 0x27, 0x0a, 0x0d, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x5f, 0x6b, 0x65, 0x79, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, @@ -1122,77 +1130,81 @@ var file_peers_proto_rawDesc = []byte{ 0x28, 0x09, 0x48, 0x03, 0x52, 0x06, 0x72, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x88, 0x01, 0x01, 0x12, 0x1f, 0x0a, 0x08, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x48, 0x04, 0x52, 0x08, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x88, 0x01, 0x01, - 0x42, 0x10, 0x0a, 0x0e, 0x5f, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x5f, 0x6b, 0x65, 0x79, 0x5f, - 0x69, 0x64, 0x42, 0x14, 0x0a, 0x12, 0x5f, 0x73, 0x65, 0x63, 0x72, 0x65, 0x74, 0x5f, 0x61, 0x63, - 0x63, 0x65, 0x73, 0x73, 0x5f, 0x6b, 0x65, 0x79, 0x42, 0x0b, 0x0a, 0x09, 0x5f, 0x72, 0x6f, 0x6c, - 0x65, 0x5f, 0x61, 0x72, 0x6e, 0x42, 0x09, 0x0a, 0x07, 0x5f, 0x72, 0x65, 0x67, 0x69, 0x6f, 0x6e, - 0x42, 0x0b, 0x0a, 0x09, 0x5f, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x22, 0x89, 0x01, - 0x0a, 0x0f, 0x53, 0x71, 0x6c, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, - 0x67, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x06, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x6f, 0x72, - 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x12, 0x0a, - 0x04, 0x75, 0x73, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x75, 0x73, 0x65, - 0x72, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x18, 0x04, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x12, 0x1a, 0x0a, - 0x08, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x08, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x22, 0x91, 0x05, 0x0a, 0x04, 0x50, 0x65, - 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x28, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0e, 0x32, 0x14, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, - 0x65, 0x72, 0x73, 0x2e, 0x44, 0x42, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, - 0x12, 0x4a, 0x0a, 0x10, 0x73, 0x6e, 0x6f, 0x77, 0x66, 0x6c, 0x61, 0x6b, 0x65, 0x5f, 0x63, 0x6f, - 0x6e, 0x66, 0x69, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x70, 0x65, 0x65, - 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x53, 0x6e, 0x6f, 0x77, 0x66, 0x6c, - 0x61, 0x6b, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0f, 0x73, 0x6e, 0x6f, - 0x77, 0x66, 0x6c, 0x61, 0x6b, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x47, 0x0a, 0x0f, - 0x62, 0x69, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, - 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, - 0x65, 0x65, 0x72, 0x73, 0x2e, 0x42, 0x69, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x43, 0x6f, 0x6e, - 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0e, 0x62, 
0x69, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x43, - 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x3e, 0x0a, 0x0c, 0x6d, 0x6f, 0x6e, 0x67, 0x6f, 0x5f, 0x63, - 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x70, 0x65, - 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x4d, 0x6f, 0x6e, 0x67, 0x6f, - 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0b, 0x6d, 0x6f, 0x6e, 0x67, 0x6f, 0x43, - 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x47, 0x0a, 0x0f, 0x70, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, - 0x73, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, - 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x50, 0x6f, - 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0e, - 0x70, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x47, - 0x0a, 0x0f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x75, 0x62, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, - 0x67, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, - 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x48, 0x75, 0x62, 0x43, - 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x75, - 0x62, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x35, 0x0a, 0x09, 0x73, 0x33, 0x5f, 0x63, 0x6f, - 0x6e, 0x66, 0x69, 0x67, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x70, 0x65, 0x65, - 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x53, 0x33, 0x43, 0x6f, 0x6e, 0x66, - 0x69, 0x67, 0x48, 0x00, 0x52, 0x08, 0x73, 0x33, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x4a, - 0x0a, 0x10, 0x73, 0x71, 0x6c, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x6e, 0x66, - 0x69, 0x67, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, - 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x53, 0x71, 0x6c, 0x53, 0x65, 0x72, 0x76, 0x65, - 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0f, 0x73, 0x71, 0x6c, 0x73, 0x65, - 0x72, 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x57, 0x0a, 0x15, 0x65, 0x76, - 0x65, 0x6e, 0x74, 0x68, 0x75, 0x62, 0x5f, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x5f, 0x63, 0x6f, 0x6e, - 0x66, 0x69, 0x67, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x70, 0x65, 0x65, 0x72, - 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x48, 0x75, - 0x62, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x13, - 0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x75, 0x62, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x43, 0x6f, 0x6e, - 0x66, 0x69, 0x67, 0x42, 0x08, 0x0a, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2a, 0x77, 0x0a, - 0x06, 0x44, 0x42, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0c, 0x0a, 0x08, 0x42, 0x49, 0x47, 0x51, 0x55, - 0x45, 0x52, 0x59, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x4e, 0x4f, 0x57, 0x46, 0x4c, 0x41, - 0x4b, 0x45, 0x10, 0x01, 0x12, 0x09, 0x0a, 0x05, 0x4d, 0x4f, 0x4e, 0x47, 0x4f, 0x10, 0x02, 0x12, - 0x0c, 0x0a, 0x08, 0x50, 0x4f, 0x53, 0x54, 0x47, 0x52, 0x45, 0x53, 0x10, 0x03, 0x12, 0x0c, 0x0a, - 0x08, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x48, 0x55, 0x42, 0x10, 0x04, 0x12, 0x06, 0x0a, 0x02, 0x53, - 0x33, 0x10, 0x05, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x51, 0x4c, 0x53, 0x45, 0x52, 0x56, 0x45, 0x52, - 0x10, 0x06, 0x12, 0x12, 0x0a, 0x0e, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x48, 0x55, 0x42, 0x5f, 0x47, - 0x52, 0x4f, 0x55, 0x50, 0x10, 0x07, 0x42, 0x7c, 0x0a, 0x10, 0x63, 0x6f, 
0x6d, 0x2e, 0x70, 0x65, - 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x42, 0x0a, 0x50, 0x65, 0x65, 0x72, - 0x73, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x10, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, - 0x74, 0x65, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0xa2, 0x02, 0x03, 0x50, 0x58, 0x58, - 0xaa, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x50, 0x65, 0x65, 0x72, 0x73, 0xca, 0x02, - 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x50, 0x65, 0x65, 0x72, 0x73, 0xe2, 0x02, 0x17, 0x50, - 0x65, 0x65, 0x72, 0x64, 0x62, 0x50, 0x65, 0x65, 0x72, 0x73, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, - 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x50, - 0x65, 0x65, 0x72, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x12, 0x3d, 0x0a, 0x0b, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x64, 0x62, 0x18, + 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, + 0x65, 0x65, 0x72, 0x73, 0x2e, 0x50, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x43, 0x6f, 0x6e, + 0x66, 0x69, 0x67, 0x52, 0x0a, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x44, 0x62, 0x42, + 0x10, 0x0a, 0x0e, 0x5f, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x5f, 0x6b, 0x65, 0x79, 0x5f, 0x69, + 0x64, 0x42, 0x14, 0x0a, 0x12, 0x5f, 0x73, 0x65, 0x63, 0x72, 0x65, 0x74, 0x5f, 0x61, 0x63, 0x63, + 0x65, 0x73, 0x73, 0x5f, 0x6b, 0x65, 0x79, 0x42, 0x0b, 0x0a, 0x09, 0x5f, 0x72, 0x6f, 0x6c, 0x65, + 0x5f, 0x61, 0x72, 0x6e, 0x42, 0x09, 0x0a, 0x07, 0x5f, 0x72, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x42, + 0x0b, 0x0a, 0x09, 0x5f, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x22, 0x89, 0x01, 0x0a, + 0x0f, 0x53, 0x71, 0x6c, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, + 0x12, 0x16, 0x0a, 0x06, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x06, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x6f, 0x72, 0x74, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x12, 0x0a, 0x04, + 0x75, 0x73, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x75, 0x73, 0x65, 0x72, + 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x08, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x12, 0x1a, 0x0a, 0x08, + 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, + 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x22, 0x91, 0x05, 0x0a, 0x04, 0x50, 0x65, 0x65, + 0x72, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x28, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0e, 0x32, 0x14, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, + 0x72, 0x73, 0x2e, 0x44, 0x42, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, + 0x4a, 0x0a, 0x10, 0x73, 0x6e, 0x6f, 0x77, 0x66, 0x6c, 0x61, 0x6b, 0x65, 0x5f, 0x63, 0x6f, 0x6e, + 0x66, 0x69, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, + 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x53, 0x6e, 0x6f, 0x77, 0x66, 0x6c, 0x61, + 0x6b, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0f, 0x73, 0x6e, 0x6f, 0x77, + 0x66, 0x6c, 0x61, 0x6b, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x47, 0x0a, 0x0f, 0x62, + 0x69, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x0b, 
0x32, 0x1c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, + 0x65, 0x72, 0x73, 0x2e, 0x42, 0x69, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x43, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x48, 0x00, 0x52, 0x0e, 0x62, 0x69, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x43, 0x6f, + 0x6e, 0x66, 0x69, 0x67, 0x12, 0x3e, 0x0a, 0x0c, 0x6d, 0x6f, 0x6e, 0x67, 0x6f, 0x5f, 0x63, 0x6f, + 0x6e, 0x66, 0x69, 0x67, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x70, 0x65, 0x65, + 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x4d, 0x6f, 0x6e, 0x67, 0x6f, 0x43, + 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0b, 0x6d, 0x6f, 0x6e, 0x67, 0x6f, 0x43, 0x6f, + 0x6e, 0x66, 0x69, 0x67, 0x12, 0x47, 0x0a, 0x0f, 0x70, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, + 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, + 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x50, 0x6f, 0x73, + 0x74, 0x67, 0x72, 0x65, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0e, 0x70, + 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x47, 0x0a, + 0x0f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x75, 0x62, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, + 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, + 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x48, 0x75, 0x62, 0x43, 0x6f, + 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x75, 0x62, + 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x35, 0x0a, 0x09, 0x73, 0x33, 0x5f, 0x63, 0x6f, 0x6e, + 0x66, 0x69, 0x67, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x70, 0x65, 0x65, 0x72, + 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x53, 0x33, 0x43, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x48, 0x00, 0x52, 0x08, 0x73, 0x33, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x4a, 0x0a, + 0x10, 0x73, 0x71, 0x6c, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, + 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x53, 0x71, 0x6c, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, + 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x0f, 0x73, 0x71, 0x6c, 0x73, 0x65, 0x72, + 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x57, 0x0a, 0x15, 0x65, 0x76, 0x65, + 0x6e, 0x74, 0x68, 0x75, 0x62, 0x5f, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x5f, 0x63, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, + 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x48, 0x75, 0x62, + 0x47, 0x72, 0x6f, 0x75, 0x70, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x13, 0x65, + 0x76, 0x65, 0x6e, 0x74, 0x68, 0x75, 0x62, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x43, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x42, 0x08, 0x0a, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2a, 0x77, 0x0a, 0x06, + 0x44, 0x42, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0c, 0x0a, 0x08, 0x42, 0x49, 0x47, 0x51, 0x55, 0x45, + 0x52, 0x59, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x4e, 0x4f, 0x57, 0x46, 0x4c, 0x41, 0x4b, + 0x45, 0x10, 0x01, 0x12, 0x09, 0x0a, 0x05, 0x4d, 0x4f, 0x4e, 0x47, 0x4f, 0x10, 0x02, 0x12, 0x0c, + 0x0a, 0x08, 0x50, 0x4f, 0x53, 0x54, 0x47, 0x52, 0x45, 0x53, 0x10, 0x03, 0x12, 0x0c, 0x0a, 0x08, + 0x45, 0x56, 0x45, 0x4e, 0x54, 0x48, 0x55, 0x42, 0x10, 0x04, 0x12, 0x06, 0x0a, 0x02, 0x53, 0x33, + 0x10, 0x05, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x51, 
0x4c, 0x53, 0x45, 0x52, 0x56, 0x45, 0x52, 0x10, + 0x06, 0x12, 0x12, 0x0a, 0x0e, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x48, 0x55, 0x42, 0x5f, 0x47, 0x52, + 0x4f, 0x55, 0x50, 0x10, 0x07, 0x42, 0x7c, 0x0a, 0x10, 0x63, 0x6f, 0x6d, 0x2e, 0x70, 0x65, 0x65, + 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x42, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x73, + 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x10, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, + 0x65, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0xa2, 0x02, 0x03, 0x50, 0x58, 0x58, 0xaa, + 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x50, 0x65, 0x65, 0x72, 0x73, 0xca, 0x02, 0x0b, + 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x50, 0x65, 0x65, 0x72, 0x73, 0xe2, 0x02, 0x17, 0x50, 0x65, + 0x65, 0x72, 0x64, 0x62, 0x50, 0x65, 0x65, 0x72, 0x73, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, + 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x50, 0x65, + 0x65, 0x72, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1226,21 +1238,22 @@ var file_peers_proto_depIdxs = []int32{ 4, // 0: peerdb_peers.EventHubConfig.metadata_db:type_name -> peerdb_peers.PostgresConfig 10, // 1: peerdb_peers.EventHubGroupConfig.eventhubs:type_name -> peerdb_peers.EventHubGroupConfig.EventhubsEntry 4, // 2: peerdb_peers.EventHubGroupConfig.metadata_db:type_name -> peerdb_peers.PostgresConfig - 0, // 3: peerdb_peers.Peer.type:type_name -> peerdb_peers.DBType - 1, // 4: peerdb_peers.Peer.snowflake_config:type_name -> peerdb_peers.SnowflakeConfig - 2, // 5: peerdb_peers.Peer.bigquery_config:type_name -> peerdb_peers.BigqueryConfig - 3, // 6: peerdb_peers.Peer.mongo_config:type_name -> peerdb_peers.MongoConfig - 4, // 7: peerdb_peers.Peer.postgres_config:type_name -> peerdb_peers.PostgresConfig - 5, // 8: peerdb_peers.Peer.eventhub_config:type_name -> peerdb_peers.EventHubConfig - 7, // 9: peerdb_peers.Peer.s3_config:type_name -> peerdb_peers.S3Config - 8, // 10: peerdb_peers.Peer.sqlserver_config:type_name -> peerdb_peers.SqlServerConfig - 6, // 11: peerdb_peers.Peer.eventhub_group_config:type_name -> peerdb_peers.EventHubGroupConfig - 5, // 12: peerdb_peers.EventHubGroupConfig.EventhubsEntry.value:type_name -> peerdb_peers.EventHubConfig - 13, // [13:13] is the sub-list for method output_type - 13, // [13:13] is the sub-list for method input_type - 13, // [13:13] is the sub-list for extension type_name - 13, // [13:13] is the sub-list for extension extendee - 0, // [0:13] is the sub-list for field type_name + 4, // 3: peerdb_peers.S3Config.metadata_db:type_name -> peerdb_peers.PostgresConfig + 0, // 4: peerdb_peers.Peer.type:type_name -> peerdb_peers.DBType + 1, // 5: peerdb_peers.Peer.snowflake_config:type_name -> peerdb_peers.SnowflakeConfig + 2, // 6: peerdb_peers.Peer.bigquery_config:type_name -> peerdb_peers.BigqueryConfig + 3, // 7: peerdb_peers.Peer.mongo_config:type_name -> peerdb_peers.MongoConfig + 4, // 8: peerdb_peers.Peer.postgres_config:type_name -> peerdb_peers.PostgresConfig + 5, // 9: peerdb_peers.Peer.eventhub_config:type_name -> peerdb_peers.EventHubConfig + 7, // 10: peerdb_peers.Peer.s3_config:type_name -> peerdb_peers.S3Config + 8, // 11: peerdb_peers.Peer.sqlserver_config:type_name -> peerdb_peers.SqlServerConfig + 6, // 12: peerdb_peers.Peer.eventhub_group_config:type_name -> peerdb_peers.EventHubGroupConfig + 5, // 13: peerdb_peers.EventHubGroupConfig.EventhubsEntry.value:type_name -> peerdb_peers.EventHubConfig + 14, // [14:14] is the sub-list for method output_type + 14, // [14:14] is the sub-list for method 
input_type
+	14, // [14:14] is the sub-list for extension type_name
+	14, // [14:14] is the sub-list for extension extendee
+	0,  // [0:14] is the sub-list for field type_name
 }
 
 func init() { file_peers_proto_init() }
diff --git a/flow/model/qrecord_stream.go b/flow/model/qrecord_stream.go
index 1fb22826e..a338ace4f 100644
--- a/flow/model/qrecord_stream.go
+++ b/flow/model/qrecord_stream.go
@@ -19,6 +19,18 @@ type QRecordStream struct {
 	schemaCache *QRecordSchema
 }
 
+type RecordsToStreamRequest struct {
+	Records      []Record
+	TableMapping map[string]uint32
+	CP           int64
+	BatchID      int64
+}
+
+type RecordsToStreamResponse struct {
+	Stream *QRecordStream
+	CP     int64
+}
+
 func NewQRecordStream(buffer int) *QRecordStream {
 	return &QRecordStream{
 		schema: make(chan *QRecordSchemaOrError, 1),
diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs
index 4da17cd4d..6a727d19f 100644
--- a/nexus/analyzer/src/lib.rs
+++ b/nexus/analyzer/src/lib.rs
@@ -619,6 +619,11 @@ fn parse_db_options(
             Some(config)
         }
         DbType::S3 => {
+            let s3_conn_str: String = opts
+                .get("metadata_db")
+                .map(|s| s.to_string())
+                .unwrap_or_default();
+            let metadata_db = parse_metadata_db_info(&s3_conn_str)?;
             let s3_config = S3Config {
                 url: opts
                     .get("url")
@@ -629,6 +634,7 @@
                 region: opts.get("region").map(|s| s.to_string()),
                 role_arn: opts.get("role_arn").map(|s| s.to_string()),
                 endpoint: opts.get("endpoint").map(|s| s.to_string()),
+                metadata_db,
             };
             let config = Config::S3Config(s3_config);
             Some(config)
diff --git a/nexus/pt/src/peerdb_peers.rs b/nexus/pt/src/peerdb_peers.rs
index 562c29d7b..f8a304b14 100644
--- a/nexus/pt/src/peerdb_peers.rs
+++ b/nexus/pt/src/peerdb_peers.rs
@@ -125,6 +125,8 @@ pub struct S3Config {
     pub region: ::core::option::Option<::prost::alloc::string::String>,
     #[prost(string, optional, tag="6")]
     pub endpoint: ::core::option::Option<::prost::alloc::string::String>,
+    #[prost(message, optional, tag="7")]
+    pub metadata_db: ::core::option::Option<PostgresConfig>,
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/nexus/pt/src/peerdb_peers.serde.rs b/nexus/pt/src/peerdb_peers.serde.rs
index cde121a45..e03dc494e 100644
--- a/nexus/pt/src/peerdb_peers.serde.rs
+++ b/nexus/pt/src/peerdb_peers.serde.rs
@@ -1314,6 +1314,9 @@ impl serde::Serialize for S3Config {
         if self.endpoint.is_some() {
             len += 1;
         }
+        if self.metadata_db.is_some() {
+            len += 1;
+        }
         let mut struct_ser = serializer.serialize_struct("peerdb_peers.S3Config", len)?;
         if !self.url.is_empty() {
             struct_ser.serialize_field("url", &self.url)?;
@@ -1333,6 +1336,9 @@ impl serde::Serialize for S3Config {
         if let Some(v) = self.endpoint.as_ref() {
             struct_ser.serialize_field("endpoint", v)?;
         }
+        if let Some(v) = self.metadata_db.as_ref() {
+            struct_ser.serialize_field("metadataDb", v)?;
+        }
         struct_ser.end()
     }
 }
@@ -1352,6 +1358,8 @@ impl<'de> serde::Deserialize<'de> for S3Config {
             "roleArn",
             "region",
             "endpoint",
+            "metadata_db",
+            "metadataDb",
         ];
 
         #[allow(clippy::enum_variant_names)]
@@ -1362,6 +1370,7 @@ impl<'de> serde::Deserialize<'de> for S3Config {
             RoleArn,
             Region,
             Endpoint,
+            MetadataDb,
             __SkipField__,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
@@ -1390,6 +1399,7 @@ impl<'de> serde::Deserialize<'de> for S3Config {
                             "roleArn" | "role_arn" => Ok(GeneratedField::RoleArn),
                             "region" => Ok(GeneratedField::Region),
                             "endpoint" => Ok(GeneratedField::Endpoint),
+                            "metadataDb" | "metadata_db" => Ok(GeneratedField::MetadataDb),
                             _ => Ok(GeneratedField::__SkipField__),
                         }
                     }
@@ -1415,6 +1425,7 @@ impl<'de> serde::Deserialize<'de> for S3Config {
                 let mut role_arn__ = None;
                 let mut region__ = None;
                 let mut endpoint__ = None;
+                let mut metadata_db__ = None;
                 while let Some(k) = map.next_key()? {
                     match k {
                         GeneratedField::Url => {
@@ -1453,6 +1464,12 @@ impl<'de> serde::Deserialize<'de> for S3Config {
                             }
                             endpoint__ = map.next_value()?;
                         }
+                        GeneratedField::MetadataDb => {
+                            if metadata_db__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("metadataDb"));
+                            }
+                            metadata_db__ = map.next_value()?;
+                        }
                         GeneratedField::__SkipField__ => {
                             let _ = map.next_value::<serde::de::IgnoredAny>()?;
                         }
@@ -1465,6 +1482,7 @@ impl<'de> serde::Deserialize<'de> for S3Config {
                     role_arn: role_arn__,
                     region: region__,
                     endpoint: endpoint__,
+                    metadata_db: metadata_db__,
                 })
             }
         }
diff --git a/protos/peers.proto b/protos/peers.proto
index b15d09dc4..162a63f3a 100644
--- a/protos/peers.proto
+++ b/protos/peers.proto
@@ -74,6 +74,7 @@ message S3Config {
   optional string role_arn = 4;
   optional string region = 5;
   optional string endpoint = 6;
+  PostgresConfig metadata_db = 7;
 }
 
 message SqlServerConfig {
diff --git a/ui/grpc_generated/peers.ts b/ui/grpc_generated/peers.ts
index a5acf3217..6d9bb4b30 100644
--- a/ui/grpc_generated/peers.ts
+++ b/ui/grpc_generated/peers.ts
@@ -151,6 +151,7 @@ export interface S3Config {
   roleArn?: string | undefined;
   region?: string | undefined;
   endpoint?: string | undefined;
+  metadataDb: PostgresConfig | undefined;
 }
 
 export interface SqlServerConfig {
@@ -1197,6 +1198,7 @@ function createBaseS3Config(): S3Config {
     roleArn: undefined,
     region: undefined,
     endpoint: undefined,
+    metadataDb: undefined,
   };
 }
 
@@ -1220,6 +1222,9 @@ export const S3Config = {
     if (message.endpoint !== undefined) {
      writer.uint32(50).string(message.endpoint);
     }
+    if (message.metadataDb !== undefined) {
+      PostgresConfig.encode(message.metadataDb, writer.uint32(58).fork()).ldelim();
+    }
     return writer;
   },
 
@@ -1272,6 +1277,13 @@ export const S3Config = {
 
           message.endpoint = reader.string();
           continue;
+        case 7:
+          if (tag !== 58) {
+            break;
+          }
+
+          message.metadataDb = PostgresConfig.decode(reader, reader.uint32());
+          continue;
       }
       if ((tag & 7) === 4 || tag === 0) {
         break;
@@ -1289,6 +1301,7 @@ export const S3Config = {
       roleArn: isSet(object.roleArn) ? String(object.roleArn) : undefined,
       region: isSet(object.region) ? String(object.region) : undefined,
       endpoint: isSet(object.endpoint) ? String(object.endpoint) : undefined,
+      metadataDb: isSet(object.metadataDb) ? PostgresConfig.fromJSON(object.metadataDb) : undefined,
     };
   },
 
@@ -1312,6 +1325,9 @@ export const S3Config = {
     if (message.endpoint !== undefined) {
       obj.endpoint = message.endpoint;
     }
+    if (message.metadataDb !== undefined) {
+      obj.metadataDb = PostgresConfig.toJSON(message.metadataDb);
+    }
     return obj;
   },
 
@@ -1326,6 +1342,9 @@ export const S3Config = {
     message.roleArn = object.roleArn ?? undefined;
     message.region = object.region ?? undefined;
     message.endpoint = object.endpoint ?? undefined;
+    message.metadataDb = (object.metadataDb !== undefined && object.metadataDb !== null)
+      ? PostgresConfig.fromPartial(object.metadataDb)
+      : undefined;
     return message;
   },
 };
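The new `RecordsToStreamRequest`/`RecordsToStreamResponse` types in `flow/model/qrecord_stream.go` bundle a batch of CDC records with checkpoint and batch bookkeeping so they can be turned into a `QRecordStream` for the S3/GCS writer. The sketch below shows how a sync connector might use them; the conversion helper is passed in as a parameter because the actual helper is not part of this hunk, so its name and signature here are assumptions.

```go
package example

import (
	"github.com/PeerDB-io/peer-flow/model"
)

// syncToStream is an illustrative use of the new request/response types:
// wrap the pulled CDC records plus batch bookkeeping, hand them to a
// conversion helper, and return the stream together with the checkpoint
// that should be persisted in the external metadata store afterwards.
func syncToStream(
	records []model.Record,
	tableMapping map[string]uint32,
	lastCheckpoint int64,
	batchID int64,
	// convertRecordsToStream stands in for the repo's real conversion helper.
	convertRecordsToStream func(*model.RecordsToStreamRequest) (*model.RecordsToStreamResponse, error),
) (*model.QRecordStream, int64, error) {
	req := &model.RecordsToStreamRequest{
		Records:      records,        // CDC records pulled for this sync batch
		TableMapping: tableMapping,   // destination table -> row count
		CP:           lastCheckpoint, // highest checkpoint/LSN seen so far
		BatchID:      batchID,        // sync batch being written out
	}
	resp, err := convertRecordsToStream(req)
	if err != nil {
		return nil, 0, err
	}
	return resp.Stream, resp.CP, nil
}
```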
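On the analyzer side, the `metadata_db` peer option is a space-separated `key=value` connection string that `parse_metadata_db_info` turns into a Postgres config before it is attached to the `S3Config`. The Go sketch below only illustrates that expected parsing behaviour; the real implementation is the Rust function in `nexus/analyzer`, which is not shown in this diff and may handle more cases.

```go
package example

import (
	"fmt"
	"strconv"
	"strings"
)

// metadataDB mirrors the fields the metadata_db option is expected to carry.
type metadataDB struct {
	Host     string
	Port     uint32
	User     string
	Password string
	Database string
}

// parseMetadataDB splits "host=... port=... user=... password=... database=..."
// into its parts, rejecting malformed pairs and unknown keys.
func parseMetadataDB(connStr string) (*metadataDB, error) {
	out := &metadataDB{}
	for _, kv := range strings.Fields(connStr) {
		key, value, ok := strings.Cut(kv, "=")
		if !ok {
			return nil, fmt.Errorf("malformed key-value pair: %q", kv)
		}
		switch key {
		case "host":
			out.Host = value
		case "port":
			port, err := strconv.ParseUint(value, 10, 32)
			if err != nil {
				return nil, fmt.Errorf("invalid port: %w", err)
			}
			out.Port = uint32(port)
		case "user":
			out.User = value
		case "password":
			out.Password = value
		case "database":
			out.Database = value
		default:
			return nil, fmt.Errorf("unknown key in metadata_db: %q", key)
		}
	}
	return out, nil
}
```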
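With `metadata_db` added as field 7 of `S3Config` across the proto, Rust, and TypeScript bindings, a populated S3/GCS peer config now carries a nested `PostgresConfig`. A minimal Go sketch of what such a config could look like, assuming the generated field names in `github.com/PeerDB-io/peer-flow/generated/protos` and using placeholder values throughout:

```go
package example

import (
	"github.com/PeerDB-io/peer-flow/generated/protos"
	"google.golang.org/protobuf/proto"
)

// gcsPeerConfig builds an S3Config pointed at a GCS bucket. All values are
// placeholders; optional proto3 string fields map to *string in Go, hence
// the proto.String wrappers.
func gcsPeerConfig() *protos.S3Config {
	return &protos.S3Config{
		Url:             "s3://my-bucket/prefix",
		AccessKeyId:     proto.String("HMAC_ACCESS_KEY"),
		SecretAccessKey: proto.String("HMAC_SECRET"),
		Region:          proto.String("auto"),
		Endpoint:        proto.String("https://storage.googleapis.com"),
		// Postgres instance used as the external metadata store for this peer.
		MetadataDb: &protos.PostgresConfig{
			Host:     "localhost",
			Port:     5432,
			User:     "postgres",
			Password: "postgres",
			Database: "postgres",
		},
	}
}
```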