From 3e0ff70279a046353a73f3f1cd6b15422d15997e Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Fri, 24 Nov 2023 01:10:47 +0530 Subject: [PATCH] removed SQL transport for BQ/SF, removed choices --- flow/activities/flowable.go | 1 - flow/connectors/bigquery/bigquery.go | 229 +----------------- flow/connectors/bigquery/qrep.go | 13 +- flow/connectors/bigquery/qrep_sql_sync.go | 143 ----------- flow/connectors/postgres/qrep.go | 19 +- flow/connectors/snowflake/qrep.go | 52 ++-- flow/connectors/snowflake/snowflake.go | 174 +------------ flow/e2e/bigquery/peer_flow_bq_test.go | 15 -- flow/e2e/bigquery/qrep_flow_bq_test.go | 4 +- flow/e2e/congen.go | 5 +- flow/e2e/postgres/qrep_flow_pg_test.go | 1 - flow/e2e/s3/qrep_flow_s3_test.go | 3 - flow/e2e/snowflake/peer_flow_sf_test.go | 14 -- flow/e2e/snowflake/qrep_flow_sf_test.go | 6 +- .../e2e/sqlserver/qrep_flow_sqlserver_test.go | 1 - flow/e2e/test_utils.go | 3 +- flow/workflows/snapshot_flow.go | 1 - nexus/analyzer/src/lib.rs | 43 +--- nexus/analyzer/src/qrep.rs | 6 - nexus/flow-rs/src/grpc.rs | 16 -- nexus/pt/src/flow_model.rs | 11 - 21 files changed, 51 insertions(+), 709 deletions(-) delete mode 100644 flow/connectors/bigquery/qrep_sql_sync.go diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 167f15e3b5..a4b26c2775 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -269,7 +269,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, res, err := dstConn.SyncRecords(&model.SyncRecordsRequest{ Records: recordBatch, FlowJobName: input.FlowConnectionConfigs.FlowJobName, - SyncMode: input.FlowConnectionConfigs.CdcSyncMode, StagingPath: input.FlowConnectionConfigs.CdcStagingPath, PushBatchSize: input.FlowConnectionConfigs.PushBatchSize, PushParallelism: input.FlowConnectionConfigs.PushParallelism, diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index c216ac5233..434014cf10 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "math/rand" "reflect" "regexp" "strings" @@ -465,206 +464,12 @@ func (c *BigQueryConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S } syncBatchID = syncBatchID + 1 - var res *model.SyncResponse - if req.SyncMode == protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO { - res, err = c.syncRecordsViaAvro(req, rawTableName, syncBatchID) - if err != nil { - return nil, err - } - } - if req.SyncMode == protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT { - res, err = c.syncRecordsViaSQL(req, rawTableName, syncBatchID) - if err != nil { - return nil, err - } - } - - return res, nil -} - -func (c *BigQueryConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest, - rawTableName string, syncBatchID int64) (*model.SyncResponse, error) { - stagingTableName := c.getStagingTableName(req.FlowJobName) - stagingTable := c.client.Dataset(c.datasetID).Table(stagingTableName) - err := c.truncateTable(stagingTableName) + res, err := c.syncRecordsViaAvro(req, rawTableName, syncBatchID) if err != nil { - return nil, fmt.Errorf("failed to truncate staging table: %v", err) - } - // separate staging batchID which is random/unique - // to handle the case where ingestion into staging passes but raw fails - // helps avoid duplicates in the raw table - //nolint:gosec - stagingBatchID := rand.Int63() - records := make([]StagingBQRecord, 0) - tableNameRowsMapping := make(map[string]uint32) - first := true - var firstCP int64 = 0 - - // loop over req.Records - 
for record := range req.Records.GetRecords() { - switch r := record.(type) { - case *model.InsertRecord: - // create the 3 required fields - // 1. _peerdb_uid - uuid - // 2. _peerdb_timestamp - current timestamp - // 2. _peerdb_timestamp_nanos - current timestamp in nano seconds - // 3. _peerdb_data - itemsJSON of `r.Items` - itemsJSON, err := r.Items.ToJSON() - if err != nil { - return nil, fmt.Errorf("failed to create items to json: %v", err) - } - - // append the row to the records - records = append(records, StagingBQRecord{ - uid: uuid.New().String(), - timestamp: time.Now(), - timestampNanos: time.Now().UnixNano(), - destinationTableName: r.DestinationTableName, - data: itemsJSON, - recordType: 0, - matchData: "", - batchID: syncBatchID, - stagingBatchID: stagingBatchID, - unchangedToastColumns: "", - }) - tableNameRowsMapping[r.DestinationTableName] += 1 - case *model.UpdateRecord: - // create the 5 required fields - // 1. _peerdb_uid - uuid - // 2. _peerdb_timestamp - current timestamp - // 3. _peerdb_data - json of `r.NewItems` - // 4. _peerdb_record_type - 1 - // 5. _peerdb_match_data - json of `r.OldItems` - - newItemsJSON, err := r.NewItems.ToJSON() - if err != nil { - return nil, fmt.Errorf("failed to create new items to json: %v", err) - } - - oldItemsJSON, err := r.OldItems.ToJSON() - if err != nil { - return nil, fmt.Errorf("failed to create old items to json: %v", err) - } - - // append the row to the records - records = append(records, StagingBQRecord{ - uid: uuid.New().String(), - timestamp: time.Now(), - timestampNanos: time.Now().UnixNano(), - destinationTableName: r.DestinationTableName, - data: newItemsJSON, - recordType: 1, - matchData: oldItemsJSON, - batchID: syncBatchID, - stagingBatchID: stagingBatchID, - unchangedToastColumns: utils.KeysToString(r.UnchangedToastColumns), - }) - tableNameRowsMapping[r.DestinationTableName] += 1 - case *model.DeleteRecord: - // create the 4 required fields - // 1. _peerdb_uid - uuid - // 2. _peerdb_timestamp - current timestamp - // 3. _peerdb_record_type - 2 - // 4. _peerdb_match_data - json of `r.Items` - - // json.Marshal converts bytes in Hex automatically to BASE64 string. 
- itemsJSON, err := r.Items.ToJSON() - if err != nil { - return nil, fmt.Errorf("failed to create items to json: %v", err) - } - - // append the row to the records - records = append(records, StagingBQRecord{ - uid: uuid.New().String(), - timestamp: time.Now(), - timestampNanos: time.Now().UnixNano(), - destinationTableName: r.DestinationTableName, - data: itemsJSON, - recordType: 2, - matchData: itemsJSON, - batchID: syncBatchID, - stagingBatchID: stagingBatchID, - unchangedToastColumns: "", - }) - - tableNameRowsMapping[r.DestinationTableName] += 1 - default: - return nil, fmt.Errorf("record type %T not supported", r) - } - - if first { - firstCP = record.GetCheckPointID() - first = false - } - } - - numRecords := len(records) - // insert the records into the staging table - stagingInserter := stagingTable.Inserter() - stagingInserter.IgnoreUnknownValues = true - - // insert the records into the staging table in batches of size syncRecordsBatchSize - for i := 0; i < numRecords; i += SyncRecordsBatchSize { - end := i + SyncRecordsBatchSize - - if end > numRecords { - end = numRecords - } - - chunk := records[i:end] - err = stagingInserter.Put(c.ctx, chunk) - if err != nil { - return nil, fmt.Errorf("failed to insert chunked rows into staging table: %v", err) - } - } - - lastCP, err := req.Records.GetLastCheckpoint() - if err != nil { - return nil, fmt.Errorf("failed to get last checkpoint: %v", err) - } - - release, err := c.grabJobsUpdateLock() - if err != nil { - return nil, fmt.Errorf("failed to grab jobs update lock: %v", err) - } - - defer func() { - err := release() - if err != nil { - log.Errorf("failed to release jobs update lock: %v", err) - } - }() - - // we have to do the following things in a transaction - // 1. append the records in the staging table to the raw table. - // 2. execute the update metadata query to store the last committed watermark. - // 2.(contd) keep track of the last batchID that is synced. - appendStmt := c.getAppendStagingToRawStmt(rawTableName, stagingTableName, stagingBatchID) - updateMetadataStmt, err := c.getUpdateMetadataStmt(req.FlowJobName, lastCP, syncBatchID) - if err != nil { - return nil, fmt.Errorf("failed to get update metadata statement: %v", err) - } - - // execute the statements in a transaction - stmts := []string{} - stmts = append(stmts, "BEGIN TRANSACTION;") - stmts = append(stmts, appendStmt) - stmts = append(stmts, updateMetadataStmt) - stmts = append(stmts, "COMMIT TRANSACTION;") - _, err = c.client.Query(strings.Join(stmts, "\n")).Read(c.ctx) - if err != nil { - return nil, fmt.Errorf("failed to execute statements in a transaction: %v", err) + return nil, err } - log.Printf("pushed %d records to %s.%s", numRecords, c.datasetID, rawTableName) - - return &model.SyncResponse{ - FirstSyncedCheckPointID: firstCP, - LastSyncedCheckPointID: lastCP, - NumRecordsSynced: int64(numRecords), - CurrentSyncBatchID: syncBatchID, - TableNameRowsMapping: tableNameRowsMapping, - }, nil + return res, nil } func (c *BigQueryConnector) syncRecordsViaAvro( @@ -1083,17 +888,6 @@ func (c *BigQueryConnector) getUpdateMetadataStmt(jobName string, lastSyncedChec return jobStatement, nil } -// getAppendStagingToRawStmt returns the statement to append the staging table to the raw table. 
-func (c *BigQueryConnector) getAppendStagingToRawStmt( - rawTableName string, stagingTableName string, stagingBatchID int64, -) string { - return fmt.Sprintf( - `INSERT INTO %s.%s SELECT _peerdb_uid,_peerdb_timestamp,_peerdb_timestamp_nanos, - _peerdb_destination_table_name,_peerdb_data,_peerdb_record_type,_peerdb_match_data, - _peerdb_batch_id,_peerdb_unchanged_toast_columns FROM %s.%s WHERE _peerdb_staging_batch_id = %d;`, - c.datasetID, rawTableName, c.datasetID, stagingTableName, stagingBatchID) -} - // metadataHasJob checks if the metadata table has the given job. func (c *BigQueryConnector) metadataHasJob(jobName string) (bool, error) { checkStmt := fmt.Sprintf( @@ -1213,23 +1007,6 @@ func (c *BigQueryConnector) getStagingTableName(flowJobName string) string { return fmt.Sprintf("_peerdb_staging_%s", flowJobName) } -// truncateTable truncates a table. -func (c *BigQueryConnector) truncateTable(tableIdentifier string) error { - // execute DELETE FROM table where the timestamp is older than 90 mins from now. - // The timestamp is used to ensure that the streaming rows are not effected by the delete. - // column of interest is the _peerdb_timestamp column. - deleteStmt := fmt.Sprintf( - "DELETE FROM %s.%s WHERE _peerdb_timestamp < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 90 MINUTE)", - c.datasetID, tableIdentifier) - q := c.client.Query(deleteStmt) - _, err := q.Read(c.ctx) - if err != nil { - return fmt.Errorf("failed to delete rows from table %s: %w", tableIdentifier, err) - } - - return nil -} - // Bigquery doesn't allow concurrent updates to the same table. // we grab a lock on catalog to ensure that only one job is updating // bigquery tables at a time. diff --git a/flow/connectors/bigquery/qrep.go b/flow/connectors/bigquery/qrep.go index 7f4a5cdb8a..cffd5d7864 100644 --- a/flow/connectors/bigquery/qrep.go +++ b/flow/connectors/bigquery/qrep.go @@ -48,17 +48,8 @@ func (c *BigQueryConnector) SyncQRepRecords( " partition %s of destination table %s", partition.PartitionId, destTable) - syncMode := config.SyncMode - switch syncMode { - case protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT: - stagingTableSync := &QRepStagingTableSync{connector: c} - return stagingTableSync.SyncQRepRecords(config.FlowJobName, destTable, partition, tblMetadata, stream) - case protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO: - avroSync := &QRepAvroSyncMethod{connector: c, gcsBucket: config.StagingPath} - return avroSync.SyncQRepRecords(config.FlowJobName, destTable, partition, tblMetadata, stream) - default: - return 0, fmt.Errorf("unsupported sync mode: %s", syncMode) - } + avroSync := &QRepAvroSyncMethod{connector: c, gcsBucket: config.StagingPath} + return avroSync.SyncQRepRecords(config.FlowJobName, destTable, partition, tblMetadata, stream) } func (c *BigQueryConnector) replayTableSchemaDeltasQRep(config *protos.QRepConfig, partition *protos.QRepPartition, diff --git a/flow/connectors/bigquery/qrep_sql_sync.go b/flow/connectors/bigquery/qrep_sql_sync.go deleted file mode 100644 index 3580731dbc..0000000000 --- a/flow/connectors/bigquery/qrep_sql_sync.go +++ /dev/null @@ -1,143 +0,0 @@ -package connbigquery - -import ( - "fmt" - "math/rand" - "strings" - "time" - - "cloud.google.com/go/bigquery" - "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/model" - log "github.com/sirupsen/logrus" -) - -type QRepSQLSyncMethod interface { - SyncQRepRecords( - flowJobName string, - dstTableName string, - partition *protos.QRepPartition, - dstTableMetadata 
*bigquery.TableMetadata, - stream *model.QRecordStream, - ) (int, error) -} - -type QRepStagingTableSync struct { - connector *BigQueryConnector -} - -func (s *QRepStagingTableSync) SyncQRepRecords( - flowJobName string, - dstTableName string, - partition *protos.QRepPartition, - dstTableMetadata *bigquery.TableMetadata, - stream *model.QRecordStream, -) (int, error) { - partitionID := partition.PartitionId - - startTime := time.Now() - - // generate a 128 bit random runID for this run - //nolint:gosec - runID := rand.Int63() - - // create a staging table with the same schema as the destination table if it doesn't exist - stagingTable := fmt.Sprintf("%s_staging", dstTableName) - stagingBQTable := s.connector.client.Dataset(s.connector.datasetID).Table(stagingTable) - if _, err := stagingBQTable.Metadata(s.connector.ctx); err != nil { - metadata := &bigquery.TableMetadata{ - Name: stagingTable, - Schema: dstTableMetadata.Schema, - } - - // add partitionID column with string type - metadata.Schema = append(metadata.Schema, &bigquery.FieldSchema{ - Name: "partitionID", - Type: bigquery.StringFieldType, - }) - - // add runID column with integer type - metadata.Schema = append(metadata.Schema, &bigquery.FieldSchema{ - Name: "runID", - Type: bigquery.IntegerFieldType, - }) - - // create the staging table - if err := stagingBQTable.Create(s.connector.ctx, metadata); err != nil { - return 0, fmt.Errorf("failed to create staging table %s: %w", stagingTable, err) - } - } - - // get an inserter for the staging table and insert the records - inserter := stagingBQTable.Inserter() - - schema, err := stream.Schema() - if err != nil { - log.WithFields(log.Fields{ - "flowName": flowJobName, - "partitionID": partitionID, - }).Errorf("failed to get schema from stream: %v", err) - return 0, fmt.Errorf("failed to get schema from stream: %w", err) - } - - // Step 2: Insert records into the staging table. - valueSaverRecords := make([]bigquery.ValueSaver, 0) - for qRecordOrErr := range stream.Records { - if qRecordOrErr.Err != nil { - log.WithFields(log.Fields{ - "flowName": flowJobName, - "partitionID": partitionID, - }).Errorf("[bq] failed to get record from stream: %v", qRecordOrErr.Err) - return 0, fmt.Errorf("[bq] failed to get record from stream: %w", qRecordOrErr.Err) - } - - qRecord := qRecordOrErr.Record - toPut := QRecordValueSaver{ - ColumnNames: schema.GetColumnNames(), - Record: qRecord, - PartitionID: partitionID, - RunID: runID, - } - - valueSaverRecords = append(valueSaverRecords, toPut) - } - - err = inserter.Put(s.connector.ctx, valueSaverRecords) - if err != nil { - return -1, fmt.Errorf("failed to insert records into staging table: %v", err) - } - - // Copy the records into the destination table in a transaction. 
- // append all the statements to one list - stmts := []string{} - stmts = append(stmts, "BEGIN TRANSACTION;") - - // col names for the destination table joined by comma - colNames := []string{} - for _, col := range dstTableMetadata.Schema { - colNames = append(colNames, fmt.Sprintf("`%s`", col.Name)) - } - colNamesStr := strings.Join(colNames, ", ") - - paritionSelect := fmt.Sprintf("SELECT %s FROM %s.%s WHERE partitionID = '%s' AND runID = %d;", - colNamesStr, s.connector.datasetID, stagingTable, partitionID, runID) - appendStmt := fmt.Sprintf("INSERT INTO %s.%s %s", s.connector.datasetID, dstTableName, paritionSelect) - stmts = append(stmts, appendStmt) - - insertMetadataStmt, err := s.connector.createMetadataInsertStatement(partition, flowJobName, startTime) - if err != nil { - return -1, fmt.Errorf("failed to create metadata insert statement: %v", err) - } - stmts = append(stmts, insertMetadataStmt) - - stmts = append(stmts, "COMMIT TRANSACTION;") - - // execute the statements in a transaction - _, err = s.connector.client.Query(strings.Join(stmts, "\n")).Read(s.connector.ctx) - if err != nil { - return -1, fmt.Errorf("failed to execute statements in a transaction: %v", err) - } - - log.Printf("pushed %d records to %s.%s", len(valueSaverRecords), s.connector.datasetID, dstTableName) - return len(valueSaverRecords), nil -} diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index 26742b4114..66b7e9aa69 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -85,8 +85,6 @@ func (c *PostgresConnector) getNumRowsPartitions( numRowsPerPartition := int64(config.NumRowsPerPartition) quotedWatermarkColumn := fmt.Sprintf("\"%s\"", config.WatermarkColumn) if config.WatermarkColumn == "xmin" { - quotedWatermarkColumn = fmt.Sprintf("%s::text::bigint", quotedWatermarkColumn) - minVal, maxVal, err := c.getMinMaxValues(tx, config, last) if err != nil { return nil, fmt.Errorf("failed to get min max values for xmin: %w", err) @@ -98,7 +96,8 @@ func (c *PostgresConnector) getNumRowsPartitions( // we will only return 1 partition for xmin: // if there is no last partition, we will return a partition with the min and max values - // if there is a last partition, we will return a partition with the last partition's end value + 1 and the max value + // if there is a last partition, we will return a partition with the last partition's + // end value + 1 and the max value if last != nil && last.Range != nil { minValInt += 1 } @@ -538,17 +537,9 @@ func (c *PostgresConnector) SyncQRepRecords( "partition": partition.PartitionId, }).Infof("SyncRecords called and initial checks complete.") - syncMode := config.SyncMode - switch syncMode { - case protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT: - stagingTableSync := &QRepStagingTableSync{connector: c} - return stagingTableSync.SyncQRepRecords( - config.FlowJobName, dstTable, partition, stream, config.WriteMode) - case protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO: - return 0, fmt.Errorf("[postgres] SyncQRepRecords not implemented for storage avro sync mode") - default: - return 0, fmt.Errorf("unsupported sync mode: %s", syncMode) - } + stagingTableSync := &QRepStagingTableSync{connector: c} + return stagingTableSync.SyncQRepRecords( + config.FlowJobName, dstTable, partition, stream, config.WriteMode) } // SetupQRepMetadataTables function for postgres connector diff --git a/flow/connectors/snowflake/qrep.go b/flow/connectors/snowflake/qrep.go index 3fcc8720a0..95ba0aa2b3 100644 --- 
a/flow/connectors/snowflake/qrep.go +++ b/flow/connectors/snowflake/qrep.go @@ -49,16 +49,8 @@ func (c *SnowflakeConnector) SyncQRepRecords( return 0, nil } - syncMode := config.SyncMode - switch syncMode { - case protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT: - return 0, fmt.Errorf("multi-insert sync mode not supported for snowflake") - case protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO: - avroSync := NewSnowflakeAvroSyncMethod(config, c) - return avroSync.SyncQRepRecords(config, partition, tblSchema, stream) - default: - return 0, fmt.Errorf("unsupported sync mode: %s", syncMode) - } + avroSync := NewSnowflakeAvroSyncMethod(config, c) + return avroSync.SyncQRepRecords(config, partition, tblSchema, stream) } func (c *SnowflakeConnector) createMetadataInsertStatement( @@ -269,32 +261,24 @@ func (c *SnowflakeConnector) ConsolidateQRepPartitions(config *protos.QRepConfig destTable := config.DestinationTableIdentifier stageName := c.getStageNameForJob(config.FlowJobName) - syncMode := config.SyncMode - switch syncMode { - case protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT: - return fmt.Errorf("multi-insert sync mode not supported for snowflake") - case protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO: - colInfo, err := c.getColsFromTable(destTable) - if err != nil { - log.WithFields(log.Fields{ - "flowName": config.FlowJobName, - }).Errorf("failed to get columns from table %s: %v", destTable, err) - return fmt.Errorf("failed to get columns from table %s: %w", destTable, err) - } - - allCols := colInfo.Columns - err = CopyStageToDestination(c, config, destTable, stageName, allCols) - if err != nil { - log.WithFields(log.Fields{ - "flowName": config.FlowJobName, - }).Errorf("failed to copy stage to destination: %v", err) - return fmt.Errorf("failed to copy stage to destination: %w", err) - } + colInfo, err := c.getColsFromTable(destTable) + if err != nil { + log.WithFields(log.Fields{ + "flowName": config.FlowJobName, + }).Errorf("failed to get columns from table %s: %v", destTable, err) + return fmt.Errorf("failed to get columns from table %s: %w", destTable, err) + } - return nil - default: - return fmt.Errorf("unsupported sync mode: %s", syncMode) + allCols := colInfo.Columns + err = CopyStageToDestination(c, config, destTable, stageName, allCols) + if err != nil { + log.WithFields(log.Fields{ + "flowName": config.FlowJobName, + }).Errorf("failed to copy stage to destination: %v", err) + return fmt.Errorf("failed to copy stage to destination: %w", err) } + + return nil } // CleanupQRepFlow function for snowflake connector diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 4c4433cdec..f04b9be387 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -16,7 +16,6 @@ import ( "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" util "github.com/PeerDB-io/peer-flow/utils" - "github.com/google/uuid" log "github.com/sirupsen/logrus" "github.com/snowflakedb/gosnowflake" "go.temporal.io/sdk/activity" @@ -76,8 +75,6 @@ const ( dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s" deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=?" checkSchemaExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME=?" 
- - syncRecordsChunkSize = 1024 ) type tableNameComponents struct { @@ -92,17 +89,6 @@ type SnowflakeConnector struct { metadataSchema string } -type snowflakeRawRecord struct { - uid string - timestamp int64 - destinationTableName string - data string - recordType int - matchData string - batchID int64 - unchangedToastColumns string -} - // creating this to capture array results from snowflake. type ArrayString []string @@ -505,13 +491,10 @@ func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model. } syncBatchID = syncBatchID + 1 - var res *model.SyncResponse - if req.SyncMode == protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO { - log.Infof("sync mode for flow %s is AVRO", req.FlowJobName) - res, err = c.syncRecordsViaAvro(req, rawTableIdentifier, syncBatchID) - if err != nil { - return nil, err - } + log.Infof("sync mode for flow %s is AVRO", req.FlowJobName) + res, err := c.syncRecordsViaAvro(req, rawTableIdentifier, syncBatchID) + if err != nil { + return nil, err } // transaction for SyncRecords @@ -530,14 +513,6 @@ func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model. } }() - if req.SyncMode == protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT { - log.Infof("sync mode for flow %s is MULTI_INSERT", req.FlowJobName) - res, err = c.syncRecordsViaSQL(req, rawTableIdentifier, syncBatchID, syncRecordsTx) - if err != nil { - return nil, err - } - } - // updating metadata with new offset and syncBatchID err = c.updateSyncMetadata(req.FlowJobName, res.LastSyncedCheckPointID, syncBatchID, syncRecordsTx) if err != nil { @@ -552,112 +527,6 @@ func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model. return res, nil } -func (c *SnowflakeConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest, rawTableIdentifier string, - syncBatchID int64, syncRecordsTx *sql.Tx) (*model.SyncResponse, error) { - records := make([]snowflakeRawRecord, 0) - tableNameRowsMapping := make(map[string]uint32) - - first := true - var firstCP int64 = 0 - - for record := range req.Records.GetRecords() { - switch typedRecord := record.(type) { - case *model.InsertRecord: - // json.Marshal converts bytes in Hex automatically to BASE64 string. 
- itemsJSON, err := typedRecord.Items.ToJSON() - if err != nil { - return nil, fmt.Errorf("failed to serialize insert record items to JSON: %w", err) - } - - // add insert record to the raw table - records = append(records, snowflakeRawRecord{ - uid: uuid.New().String(), - timestamp: time.Now().UnixNano(), - destinationTableName: typedRecord.DestinationTableName, - data: itemsJSON, - recordType: 0, - matchData: "", - batchID: syncBatchID, - unchangedToastColumns: "", - }) - tableNameRowsMapping[typedRecord.DestinationTableName] += 1 - case *model.UpdateRecord: - newItemsJSON, err := typedRecord.NewItems.ToJSON() - if err != nil { - return nil, fmt.Errorf("failed to serialize update record new items to JSON: %w", err) - } - oldItemsJSON, err := typedRecord.OldItems.ToJSON() - if err != nil { - return nil, fmt.Errorf("failed to serialize update record old items to JSON: %w", err) - } - - // add update record to the raw table - records = append(records, snowflakeRawRecord{ - uid: uuid.New().String(), - timestamp: time.Now().UnixNano(), - destinationTableName: typedRecord.DestinationTableName, - data: newItemsJSON, - recordType: 1, - matchData: oldItemsJSON, - batchID: syncBatchID, - unchangedToastColumns: utils.KeysToString(typedRecord.UnchangedToastColumns), - }) - tableNameRowsMapping[typedRecord.DestinationTableName] += 1 - case *model.DeleteRecord: - itemsJSON, err := typedRecord.Items.ToJSON() - if err != nil { - return nil, fmt.Errorf("failed to serialize delete record items to JSON: %w", err) - } - - // append delete record to the raw table - records = append(records, snowflakeRawRecord{ - uid: uuid.New().String(), - timestamp: time.Now().UnixNano(), - destinationTableName: typedRecord.DestinationTableName, - data: itemsJSON, - recordType: 2, - matchData: itemsJSON, - batchID: syncBatchID, - unchangedToastColumns: "", - }) - tableNameRowsMapping[typedRecord.DestinationTableName] += 1 - default: - return nil, fmt.Errorf("record type %T not supported in Snowflake flow connector", typedRecord) - } - - if first { - firstCP = record.GetCheckPointID() - first = false - } - } - - // inserting records into raw table. 
- numRecords := len(records) - for begin := 0; begin < numRecords; begin += syncRecordsChunkSize { - end := begin + syncRecordsChunkSize - if end > numRecords { - end = numRecords - } - err := c.insertRecordsInRawTable(rawTableIdentifier, records[begin:end], syncRecordsTx) - if err != nil { - return nil, err - } - } - - lastCheckpoint, err := req.Records.GetLastCheckpoint() - if err != nil { - return nil, err - } - - return &model.SyncResponse{ - FirstSyncedCheckPointID: firstCP, - LastSyncedCheckPointID: lastCheckpoint, - NumRecordsSynced: int64(len(records)), - CurrentSyncBatchID: syncBatchID, - TableNameRowsMapping: tableNameRowsMapping, - }, nil -} - func (c *SnowflakeConnector) syncRecordsViaAvro( req *model.SyncRecordsRequest, rawTableIdentifier string, @@ -807,12 +676,10 @@ func (c *SnowflakeConnector) CreateRawTable(req *protos.CreateRawTableInput) (*p return nil, fmt.Errorf("unable to commit transaction for creation of raw table: %w", err) } - if req.CdcSyncMode == protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO { - stage := c.getStageNameForJob(req.FlowJobName) - err = c.createStage(stage, &protos.QRepConfig{}) - if err != nil { - return nil, err - } + stage := c.getStageNameForJob(req.FlowJobName) + err = c.createStage(stage, &protos.QRepConfig{}) + if err != nil { + return nil, err } return &protos.CreateRawTableOutput{ @@ -928,36 +795,11 @@ func generateCreateTableSQLForNormalizedTable( strings.TrimSuffix(strings.Join(createTableSQLArray, ""), ",")) } -func generateMultiValueInsertSQL(metadataSchema string, tableIdentifier string, chunkSize int) string { - // inferring the width of the raw table from the create table statement - rawTableWidth := strings.Count(createRawTableSQL, ",") + 1 - - return fmt.Sprintf(rawTableMultiValueInsertSQL, metadataSchema, tableIdentifier, - strings.TrimSuffix(strings.Repeat(fmt.Sprintf("(%s),", - strings.TrimSuffix(strings.Repeat("?,", rawTableWidth), ",")), chunkSize), ",")) -} - func getRawTableIdentifier(jobName string) string { jobName = regexp.MustCompile("[^a-zA-Z0-9]+").ReplaceAllString(jobName, "_") return fmt.Sprintf("%s_%s", rawTablePrefix, jobName) } -func (c *SnowflakeConnector) insertRecordsInRawTable(rawTableIdentifier string, - snowflakeRawRecords []snowflakeRawRecord, syncRecordsTx *sql.Tx) error { - rawRecordsData := make([]any, 0) - - for _, record := range snowflakeRawRecords { - rawRecordsData = append(rawRecordsData, record.uid, record.timestamp, record.destinationTableName, - record.data, record.recordType, record.matchData, record.batchID, record.unchangedToastColumns) - } - _, err := syncRecordsTx.ExecContext(c.ctx, - generateMultiValueInsertSQL(c.metadataSchema, rawTableIdentifier, len(snowflakeRawRecords)), rawRecordsData...) 
- if err != nil { - return fmt.Errorf("failed to insert record into raw table: %w", err) - } - return nil -} - func (c *SnowflakeConnector) generateAndExecuteMergeStatement( ctx context.Context, destinationTableIdentifier string, diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 90fbb552dc..1618e84326 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -8,7 +8,6 @@ import ( "time" "github.com/PeerDB-io/peer-flow/e2e" - "github.com/PeerDB-io/peer-flow/generated/protos" util "github.com/PeerDB-io/peer-flow/utils" peerflow "github.com/PeerDB-io/peer-flow/workflows" "github.com/jackc/pgx/v5/pgxpool" @@ -150,7 +149,6 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, CdcStagingPath: "", } @@ -196,7 +194,6 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, CdcStagingPath: "", } @@ -245,7 +242,6 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, CdcStagingPath: "", } @@ -315,7 +311,6 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, CdcStagingPath: "", } @@ -385,7 +380,6 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, CdcStagingPath: "", } @@ -448,7 +442,6 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, CdcStagingPath: "", } @@ -523,7 +516,6 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, CdcStagingPath: "", } @@ -593,7 +585,6 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, CdcStagingPath: "", } @@ -662,7 +653,6 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, CdcStagingPath: "", } @@ -740,7 +730,6 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { TableNameMapping: map[string]string{srcTable1Name: dstTable1Name, srcTable2Name: dstTable2Name}, PostgresPort: e2e.PostgresPort, Destination: 
s.bqHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, CdcStagingPath: "", } @@ -803,7 +792,6 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, CdcStagingPath: "", } @@ -908,7 +896,6 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, CdcStagingPath: "", } @@ -984,7 +971,6 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, CdcStagingPath: "", } @@ -1063,7 +1049,6 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, CdcStagingPath: "", } diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go index 8a97c3b85b..79e4201852 100644 --- a/flow/e2e/bigquery/qrep_flow_bq_test.go +++ b/flow/e2e/bigquery/qrep_flow_bq_test.go @@ -6,7 +6,6 @@ import ( connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/e2e" - "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/stretchr/testify/require" ) @@ -64,9 +63,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { fmt.Sprintf("e2e_test_%s.%s", s.bqSuffix, tblName), tblName, query, - protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, s.bqHelper.Peer, - "peerdb_staging") + "") s.NoError(err) e2e.RunQrepFlowWorkflow(env, qrepConfig) diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index 72a756e531..4b026c072e 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -151,7 +151,6 @@ type FlowConnectionGenerationConfig struct { TableNameMapping map[string]string PostgresPort int Destination *protos.Peer - CDCSyncMode protos.QRepSyncMode CdcStagingPath string } @@ -182,7 +181,6 @@ func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs() (*proto ret.TableMappings = tblMappings ret.Source = GeneratePostgresPeer(c.PostgresPort) ret.Destination = c.Destination - ret.CdcSyncMode = c.CDCSyncMode ret.CdcStagingPath = c.CdcStagingPath ret.SoftDeleteColName = "_PEERDB_IS_DELETED" ret.SyncedAtColName = "_PEERDB_SYNCED_AT" @@ -200,7 +198,7 @@ type QRepFlowConnectionGenerationConfig struct { // GenerateQRepConfig generates a qrep config for testing. 
func (c *QRepFlowConnectionGenerationConfig) GenerateQRepConfig( - query string, watermark string, syncMode protos.QRepSyncMode) (*protos.QRepConfig, error) { + query string, watermark string) (*protos.QRepConfig, error) { ret := &protos.QRepConfig{} ret.FlowJobName = c.FlowJobName ret.WatermarkTable = c.WatermarkTable @@ -214,7 +212,6 @@ func (c *QRepFlowConnectionGenerationConfig) GenerateQRepConfig( ret.Query = query ret.WatermarkColumn = watermark - ret.SyncMode = syncMode ret.StagingPath = c.StagingPath ret.WriteMode = &protos.QRepWriteMode{ WriteType: protos.QRepWriteType_QREP_WRITE_MODE_APPEND, diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index df1653b992..f8658a9825 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -165,7 +165,6 @@ func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { srcSchemaQualified, dstSchemaQualified, query, - protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT, postgresPeer, "", ) diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go index 2fca18a700..d053e14372 100644 --- a/flow/e2e/s3/qrep_flow_s3_test.go +++ b/flow/e2e/s3/qrep_flow_s3_test.go @@ -7,7 +7,6 @@ import ( "time" "github.com/PeerDB-io/peer-flow/e2e" - "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" log "github.com/sirupsen/logrus" @@ -107,7 +106,6 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { schemaQualifiedName, "e2e_dest_1", query, - protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, s.s3Helper.GetPeer(), "stage", ) @@ -154,7 +152,6 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { schemaQualifiedName, "e2e_dest_ctid", query, - protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, s.s3Helper.GetPeer(), "stage", ) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 3c62490271..e489545e3f 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -143,7 +143,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() @@ -220,7 +219,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() @@ -308,7 +306,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() @@ -379,7 +376,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() @@ -442,7 +438,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() 
{ TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() @@ -517,7 +512,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() @@ -587,7 +581,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() @@ -657,7 +650,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() @@ -733,7 +725,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { TableNameMapping: map[string]string{srcTable1Name: dstTable1Name, srcTable2Name: dstTable2Name}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() @@ -793,7 +784,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() @@ -958,7 +948,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() @@ -1034,7 +1023,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() @@ -1112,7 +1100,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() @@ -1196,7 +1183,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { }, }, Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), - CdcSyncMode: connectionGen.CDCSyncMode, CdcStagingPath: connectionGen.CdcStagingPath, } diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index 82901beac2..9795412667 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -75,7 +75,6 @@ func (s *PeerFlowE2ETestSuiteSF) 
Test_Complete_QRep_Flow_Avro_SF() { fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tblName), dstSchemaQualified, query, - protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, s.sfHelper.Peer, "", ) @@ -116,7 +115,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tblName), dstSchemaQualified, query, - protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, s.sfHelper.Peer, "", ) @@ -161,7 +159,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { s.attachSchemaSuffix(tblName), dstSchemaQualified, query, - protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, s.sfHelper.Peer, "", ) @@ -201,7 +198,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tblName), dstSchemaQualified, query, - protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, s.sfHelper.Peer, "", ) @@ -249,7 +245,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration( s.attachSchemaSuffix(tblName), dstSchemaQualified, query, - protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + sfPeer, "", ) diff --git a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go index 9c2b27bcb0..dbc48e577d 100644 --- a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go +++ b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go @@ -163,7 +163,6 @@ func (s *PeerFlowE2ETestSuiteSQLServer) Test_Complete_QRep_Flow_SqlServer_Append WatermarkColumn: "v_from", NumRowsPerPartition: 5, InitialCopyOnly: true, - SyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT, MaxParallelWorkers: 1, WaitBetweenBatchesSeconds: 5, } diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index b57ad72214..ac668ae2f5 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -274,7 +274,6 @@ func CreateQRepWorkflowConfig( sourceTable string, dstTable string, query string, - syncMode protos.QRepSyncMode, dest *protos.Peer, stagingPath string, ) (*protos.QRepConfig, error) { @@ -289,7 +288,7 @@ func CreateQRepWorkflowConfig( watermark := "updated_at" - qrepConfig, err := connectionGen.GenerateQRepConfig(query, watermark, syncMode) + qrepConfig, err := connectionGen.GenerateQRepConfig(query, watermark) if err != nil { return nil, err } diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index 26f8475e12..eb9eac8126 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -169,7 +169,6 @@ func (s *SnapshotFlowExecution) cloneTable( InitialCopyOnly: true, DestinationTableIdentifier: dstName, NumRowsPerPartition: numRowsPerPartition, - SyncMode: s.config.SnapshotSyncMode, MaxParallelWorkers: numWorkers, StagingPath: s.config.SnapshotStagingPath, WriteMode: &protos.QRepWriteMode{ diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index 2e64012a35..49784c2ff4 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -8,7 +8,7 @@ use std::{ use anyhow::Context; use pt::{ - flow_model::{FlowJob, FlowJobTableMapping, FlowSyncMode, QRepFlowJob}, + flow_model::{FlowJob, FlowJobTableMapping, QRepFlowJob}, peerdb_peers::{ peer::Config, BigqueryConfig, DbType, EventHubConfig, MongoConfig, Peer, PostgresConfig, S3Config, SnowflakeConfig, SqlServerConfig, @@ -181,7 +181,7 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> { .exclude .as_ref() .map(|ss| ss.iter().map(|s| s.to_string()).collect()) - .unwrap_or_default() + .unwrap_or_default(), }); } @@ -248,28 +248,12 @@ impl<'a> 
StatementAnalyzer for PeerDDLAnalyzer<'a> { Some(sqlparser::ast::Value::Number(n, _)) => Some(n.parse::()?), _ => None, }; - let snapshot_sync_mode: Option = - match raw_options.remove("snapshot_sync_mode") { - Some(sqlparser::ast::Value::SingleQuotedString(s)) => { - let s = s.to_lowercase(); - FlowSyncMode::parse_string(&s).ok() - } - _ => None, - }; let snapshot_staging_path = match raw_options .remove("snapshot_staging_path") { Some(sqlparser::ast::Value::SingleQuotedString(s)) => Some(s.clone()), - _ => None, + _ => Some("".to_string()), }; - let cdc_sync_mode: Option = - match raw_options.remove("cdc_sync_mode") { - Some(sqlparser::ast::Value::SingleQuotedString(s)) => { - let s = s.to_lowercase(); - FlowSyncMode::parse_string(&s).ok() - } - _ => None, - }; let snapshot_max_parallel_workers: Option = match raw_options .remove("snapshot_max_parallel_workers") @@ -280,7 +264,7 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> { let cdc_staging_path = match raw_options.remove("cdc_staging_path") { Some(sqlparser::ast::Value::SingleQuotedString(s)) => Some(s.clone()), - _ => None, + _ => Some("".to_string()), }; let soft_delete = match raw_options.remove("soft_delete") { @@ -333,9 +317,7 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> { snapshot_num_rows_per_partition, snapshot_max_parallel_workers, snapshot_num_tables_in_parallel, - snapshot_sync_mode, snapshot_staging_path, - cdc_sync_mode, cdc_staging_path, soft_delete, replication_slot_name, @@ -347,15 +329,6 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> { synced_at_col_name, }; - // Error reporting - if Some(FlowSyncMode::Avro) == flow_job.snapshot_sync_mode - && flow_job.snapshot_staging_path.is_none() - { - return Err(anyhow::anyhow!( - "snapshot_staging_path must be set for AVRO snapshot mode." 
- )); - } - Ok(Some(PeerDDL::CreateMirrorForCDC { if_not_exists: *if_not_exists, flow_job, @@ -516,7 +489,13 @@ fn parse_db_options( let val = match opt.value { sqlparser::ast::Value::SingleQuotedString(ref str) => str, sqlparser::ast::Value::Number(ref v, _) => v, - sqlparser::ast::Value::Boolean(v) => if v { "true" } else { "false" }, + sqlparser::ast::Value::Boolean(v) => { + if v { + "true" + } else { + "false" + } + } _ => panic!("invalid option type for peer"), }; opts.insert(&opt.name.value, val); diff --git a/nexus/analyzer/src/qrep.rs b/nexus/analyzer/src/qrep.rs index 88af3da66b..4e09757117 100644 --- a/nexus/analyzer/src/qrep.rs +++ b/nexus/analyzer/src/qrep.rs @@ -54,12 +54,6 @@ const QREP_OPTIONS: &[QRepOptionType] = &[ QRepOptionType::StringArray { name: "unique_key_columns", }, - QRepOptionType::String { - name: "sync_data_format", - default_val: Some("default"), - required: false, - accepted_values: Some(&["default", "avro"]), - }, QRepOptionType::String { name: "staging_path", default_val: None, diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index 4e09f093fb..df6a642d86 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -186,17 +186,7 @@ impl FlowGrpcClient { snapshot_num_rows_per_partition: snapshot_num_rows_per_partition.unwrap_or(0), snapshot_max_parallel_workers: snapshot_max_parallel_workers.unwrap_or(0), snapshot_num_tables_in_parallel: snapshot_num_tables_in_parallel.unwrap_or(0), - snapshot_sync_mode: job - .snapshot_sync_mode - .clone() - .map(|s| s.as_proto_sync_mode()) - .unwrap_or(0), snapshot_staging_path: job.snapshot_staging_path.clone().unwrap_or_default(), - cdc_sync_mode: job - .cdc_sync_mode - .clone() - .map(|s| s.as_proto_sync_mode()) - .unwrap_or(0), cdc_staging_path: job.cdc_staging_path.clone().unwrap_or_default(), soft_delete: job.soft_delete, replication_slot_name: replication_slot_name.unwrap_or_default(), @@ -232,12 +222,6 @@ impl FlowGrpcClient { "destination_table_name" => cfg.destination_table_identifier = s.clone(), "watermark_column" => cfg.watermark_column = s.clone(), "watermark_table_name" => cfg.watermark_table = s.clone(), - "sync_data_format" => { - cfg.sync_mode = match s.as_str() { - "avro" => pt::peerdb_flow::QRepSyncMode::QrepSyncModeStorageAvro as i32, - _ => pt::peerdb_flow::QRepSyncMode::QrepSyncModeMultiInsert as i32, - } - } "mode" => { let mut wm = QRepWriteMode { write_type: QRepWriteType::QrepWriteModeAppend as i32, diff --git a/nexus/pt/src/flow_model.rs b/nexus/pt/src/flow_model.rs index a6cfe4a68f..5c8b1e3857 100644 --- a/nexus/pt/src/flow_model.rs +++ b/nexus/pt/src/flow_model.rs @@ -3,8 +3,6 @@ use std::collections::HashMap; use serde::{Deserialize, Serialize}; use serde_json::Value; -use crate::peerdb_flow; - #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] pub struct FlowJobTableMapping { pub source_table_identifier: String, @@ -27,13 +25,6 @@ impl FlowSyncMode { _ => Err(format!("{} is not a valid FlowSyncMode", s)), } } - - pub fn as_proto_sync_mode(&self) -> i32 { - match self { - FlowSyncMode::Avro => peerdb_flow::QRepSyncMode::QrepSyncModeStorageAvro as i32, - FlowSyncMode::SQL => peerdb_flow::QRepSyncMode::QrepSyncModeMultiInsert as i32, - } - } } impl std::str::FromStr for FlowSyncMode { @@ -69,9 +60,7 @@ pub struct FlowJob { pub snapshot_num_rows_per_partition: Option, pub snapshot_max_parallel_workers: Option, pub snapshot_num_tables_in_parallel: Option, - pub snapshot_sync_mode: Option, pub snapshot_staging_path: Option, - pub cdc_sync_mode: Option, 
pub cdc_staging_path: Option<String>, pub soft_delete: bool, pub replication_slot_name: Option<String>,