diff --git a/.github/workflows/golang-lint.yml b/.github/workflows/golang-lint.yml index 803a9e09b2..48a0510b20 100644 --- a/.github/workflows/golang-lint.yml +++ b/.github/workflows/golang-lint.yml @@ -13,7 +13,7 @@ jobs: pull-requests: write strategy: matrix: - runner: [ubicloud-standard-2-ubuntu-2204-arm] + runner: [ubicloud-standard-4-ubuntu-2204-arm] runs-on: ${{ matrix.runner }} steps: - name: checkout diff --git a/flow/.golangci.yml b/flow/.golangci.yml index a359e70310..fa679ccf56 100644 --- a/flow/.golangci.yml +++ b/flow/.golangci.yml @@ -5,7 +5,7 @@ linters: enable: - dogsled - dupl - - gofmt + - gofumpt - gosec - misspell - nakedret diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index a4a8b21205..69628c28d5 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -211,7 +211,8 @@ func (a *FlowableActivity) recordSlotSizePeriodically( // StartFlow implements StartFlow. func (a *FlowableActivity) StartFlow(ctx context.Context, - input *protos.StartFlowInput) (*model.SyncResponse, error) { + input *protos.StartFlowInput, +) (*model.SyncResponse, error) { activity.RecordHeartbeat(ctx, "starting flow...") conn := input.FlowConnectionConfigs dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination) @@ -658,7 +659,8 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, } func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config *protos.QRepConfig, - runUUID string) error { + runUUID string, +) error { dstConn, err := connectors.GetQRepConsolidateConnector(ctx, config.DestinationPeer) if errors.Is(err, connectors.ErrUnsupportedFunctionality) { return monitoring.UpdateEndTimeForQRepRun(ctx, a.CatalogPool, runUUID) @@ -802,7 +804,8 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error { } func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context, - config *protos.QRepConfig, last *protos.QRepPartition) error { + config *protos.QRepConfig, last *protos.QRepPartition, +) error { if config.SourcePeer.Type != protos.DBType_POSTGRES || last.Range == nil { return nil } @@ -838,7 +841,8 @@ func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context, } func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.RenameTablesInput) ( - *protos.RenameTablesOutput, error) { + *protos.RenameTablesOutput, error, +) { dstConn, err := connectors.GetCDCSyncConnector(ctx, config.Peer) if err != nil { return nil, fmt.Errorf("failed to get connector: %w", err) @@ -862,7 +866,8 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena } func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *protos.CreateTablesFromExistingInput) ( - *protos.CreateTablesFromExistingOutput, error) { + *protos.CreateTablesFromExistingOutput, error, +) { dstConn, err := connectors.GetCDCSyncConnector(ctx, req.Peer) if err != nil { return nil, fmt.Errorf("failed to get connector: %w", err) @@ -932,7 +937,8 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, Range: &protos.PartitionRange{ Range: &protos.PartitionRange_IntRange{ IntRange: &protos.IntPartitionRange{Start: 0, End: int64(numRecords)}, - }}, + }, + }, } } updateErr := monitoring.InitializeQRepRun(ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics}) diff --git a/flow/cmd/api.go b/flow/cmd/api.go index e008299109..09185a0fc1 100644 --- a/flow/cmd/api.go +++ b/flow/cmd/api.go @@ -45,7 +45,6 @@ func 
setupGRPCGatewayServer(args *APIServerParams) (*http.Server, error) { grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials()), ) - if err != nil { return nil, fmt.Errorf("unable to dial grpc server: %w", err) } @@ -68,7 +67,8 @@ func killExistingHeartbeatFlows( ctx context.Context, tc client.Client, namespace string, - taskQueue string) error { + taskQueue string, +) error { listRes, err := tc.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{ Namespace: namespace, diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index b12623ec55..9efdc59fb5 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -55,7 +55,8 @@ func schemaForTableIdentifier(tableIdentifier string, peerDBType int32) string { } func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context, - req *protos.CreateCDCFlowRequest, workflowID string) error { + req *protos.CreateCDCFlowRequest, workflowID string, +) error { sourcePeerID, sourePeerType, srcErr := h.getPeerID(ctx, req.ConnectionConfigs.Source.Name) if srcErr != nil { return fmt.Errorf("unable to get peer id for source peer %s: %w", @@ -86,7 +87,8 @@ func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context, } func (h *FlowRequestHandler) createQrepJobEntry(ctx context.Context, - req *protos.CreateQRepFlowRequest, workflowID string) error { + req *protos.CreateQRepFlowRequest, workflowID string, +) error { sourcePeerName := req.QrepConfig.SourcePeer.Name sourcePeerID, _, srcErr := h.getPeerID(ctx, sourcePeerName) if srcErr != nil { @@ -117,7 +119,8 @@ func (h *FlowRequestHandler) createQrepJobEntry(ctx context.Context, } func (h *FlowRequestHandler) CreateCDCFlow( - ctx context.Context, req *protos.CreateCDCFlowRequest) (*protos.CreateCDCFlowResponse, error) { + ctx context.Context, req *protos.CreateCDCFlowRequest, +) (*protos.CreateCDCFlowResponse, error) { cfg := req.ConnectionConfigs workflowID := fmt.Sprintf("%s-peerflow-%s", cfg.FlowJobName, uuid.New()) workflowOptions := client.StartWorkflowOptions{ @@ -224,7 +227,8 @@ func (h *FlowRequestHandler) removeFlowEntryInCatalog( } func (h *FlowRequestHandler) CreateQRepFlow( - ctx context.Context, req *protos.CreateQRepFlowRequest) (*protos.CreateQRepFlowResponse, error) { + ctx context.Context, req *protos.CreateQRepFlowRequest, +) (*protos.CreateQRepFlowResponse, error) { cfg := req.QrepConfig workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New()) workflowOptions := client.StartWorkflowOptions{ @@ -697,5 +701,4 @@ func (h *FlowRequestHandler) DropPeer( return &protos.DropPeerResponse{ Ok: true, }, nil - } diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index 40d2486277..ba6e0d0e18 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -66,7 +66,6 @@ func setupPyroscope(opts *WorkerOptions) { pyroscope.ProfileBlockDuration, }, }) - if err != nil { log.Fatal(err) } diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 9b32e30b43..233f319a02 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -259,7 +259,8 @@ func (c *BigQueryConnector) WaitForTableReady(tblName string) error { // ReplayTableSchemaDeltas changes a destination table to match the schema at source // This could involve adding or dropping multiple columns. 
func (c *BigQueryConnector) ReplayTableSchemaDeltas(flowJobName string, - schemaDeltas []*protos.TableSchemaDelta) error { + schemaDeltas []*protos.TableSchemaDelta, +) error { for _, schemaDelta := range schemaDeltas { if schemaDelta == nil || len(schemaDelta.AddedColumns) == 0 { continue @@ -390,7 +391,8 @@ func (c *BigQueryConnector) GetLastNormalizeBatchID(jobName string) (int64, erro } func (c *BigQueryConnector) getDistinctTableNamesInBatch(flowJobName string, syncBatchID int64, - normalizeBatchID int64) ([]string, error) { + normalizeBatchID int64, +) ([]string, error) { rawTableName := c.getRawTableName(flowJobName) // Prepare the query to retrieve distinct tables in that batch @@ -426,7 +428,8 @@ func (c *BigQueryConnector) getDistinctTableNamesInBatch(flowJobName string, syn } func (c *BigQueryConnector) getTableNametoUnchangedCols(flowJobName string, syncBatchID int64, - normalizeBatchID int64) (map[string][]string, error) { + normalizeBatchID int64, +) (map[string][]string, error) { rawTableName := c.getRawTableName(flowJobName) // Prepare the query to retrieve distinct tables in that batch @@ -795,7 +798,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) mergeStmts := mergeGen.generateMergeStmts() stmts = append(stmts, mergeStmts...) } - //update metadata to make the last normalized batch id to the recent last sync batch id. + // update metadata to make the last normalized batch id to the recent last sync batch id. updateMetadataStmt := fmt.Sprintf( "UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name = '%s';", c.datasetID, MirrorJobsTable, syncBatchID, req.FlowJobName) @@ -894,7 +897,8 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr // getUpdateMetadataStmt updates the metadata tables for a given job. 
func (c *BigQueryConnector) getUpdateMetadataStmt(jobName string, lastSyncedCheckpointID int64, - batchID int64) (string, error) { + batchID int64, +) (string, error) { hasJob, err := c.metadataHasJob(jobName) if err != nil { return "", fmt.Errorf("failed to check if job exists: %w", err) @@ -1089,7 +1093,8 @@ func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos } func (c *BigQueryConnector) CreateTablesFromExisting(req *protos.CreateTablesFromExistingInput) ( - *protos.CreateTablesFromExistingOutput, error) { + *protos.CreateTablesFromExistingOutput, error, +) { for newTable, existingTable := range req.NewToExistingTableMapping { c.logger.Info(fmt.Sprintf("creating table '%s' similar to '%s'", newTable, existingTable)) diff --git a/flow/connectors/bigquery/merge_statement_generator.go b/flow/connectors/bigquery/merge_statement_generator.go index 7e35aadc44..2a37ef5ecb 100644 --- a/flow/connectors/bigquery/merge_statement_generator.go +++ b/flow/connectors/bigquery/merge_statement_generator.go @@ -61,7 +61,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string { switch qvalue.QValueKind(colType) { case qvalue.QValueKindJSON: - //if the type is JSON, then just extract JSON + // if the type is JSON, then just extract JSON castStmt = fmt.Sprintf("CAST(JSON_VALUE(_peerdb_data, '$.%s') AS %s) AS `%s`", colName, bqType, colName) // expecting data in BASE64 format diff --git a/flow/connectors/bigquery/qrep.go b/flow/connectors/bigquery/qrep.go index 90bb871a75..a353d432eb 100644 --- a/flow/connectors/bigquery/qrep.go +++ b/flow/connectors/bigquery/qrep.go @@ -50,7 +50,8 @@ func (c *BigQueryConnector) SyncQRepRecords( } func (c *BigQueryConnector) replayTableSchemaDeltasQRep(config *protos.QRepConfig, partition *protos.QRepPartition, - srcSchema *model.QRecordSchema) (*bigquery.TableMetadata, error) { + srcSchema *model.QRecordSchema, +) (*bigquery.TableMetadata, error) { destTable := config.DestinationTableIdentifier bqTable := c.client.Dataset(c.datasetID).Table(destTable) dstTableMetadata, err := bqTable.Metadata(c.ctx) diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index d60577ec74..9bb01157fe 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -25,7 +25,8 @@ type QRepAvroSyncMethod struct { } func NewQRepAvroSyncMethod(connector *BigQueryConnector, gcsBucket string, - flowJobName string) *QRepAvroSyncMethod { + flowJobName string, +) *QRepAvroSyncMethod { return &QRepAvroSyncMethod{ connector: connector, gcsBucket: gcsBucket, @@ -179,7 +180,8 @@ type AvroSchema struct { } func DefineAvroSchema(dstTableName string, - dstTableMetadata *bigquery.TableMetadata) (*model.QRecordAvroSchemaDefinition, error) { + dstTableMetadata *bigquery.TableMetadata, +) (*model.QRecordAvroSchemaDefinition, error) { avroFields := []AvroField{} nullableFields := make(map[string]struct{}) diff --git a/flow/connectors/core.go b/flow/connectors/core.go index f7a518a6a5..707a7f0b11 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -161,7 +161,8 @@ func GetCDCSyncConnector(ctx context.Context, config *protos.Peer) (CDCSyncConne } func GetCDCNormalizeConnector(ctx context.Context, - config *protos.Peer) (CDCNormalizeConnector, error) { + config *protos.Peer, +) (CDCNormalizeConnector, error) { inner := config.Config switch inner.(type) { case *protos.Peer_PostgresConfig: @@ -246,7 +247,8 @@ func GetConnector(ctx context.Context, peer *protos.Peer) 
(Connector, error) { } func GetQRepConsolidateConnector(ctx context.Context, - config *protos.Peer) (QRepConsolidateConnector, error) { + config *protos.Peer, +) (QRepConsolidateConnector, error) { inner := config.Config switch inner.(type) { case *protos.Peer_SnowflakeConfig: diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 01b8510e42..4be57309f2 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -281,7 +281,8 @@ func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr func (c *EventHubConnector) SetupNormalizedTables( req *protos.SetupNormalizedTableBatchInput) ( - *protos.SetupNormalizedTableBatchOutput, error) { + *protos.SetupNormalizedTableBatchOutput, error, +) { c.logger.Info("normalization for event hub is a no-op") return &protos.SetupNormalizedTableBatchOutput{ TableExistsMapping: nil, diff --git a/flow/connectors/eventhub/hubmanager.go b/flow/connectors/eventhub/hubmanager.go index 725414b287..803816b819 100644 --- a/flow/connectors/eventhub/hubmanager.go +++ b/flow/connectors/eventhub/hubmanager.go @@ -42,7 +42,8 @@ func NewEventHubManager( } func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedEventhub) ( - *azeventhubs.ProducerClient, error) { + *azeventhubs.ProducerClient, error, +) { ehConfig, ok := m.peerConfig.Get(name.PeerName) if !ok { return nil, fmt.Errorf("eventhub '%s' not configured", name) @@ -118,7 +119,8 @@ func (m *EventHubManager) Close(ctx context.Context) error { } func (m *EventHubManager) CreateEventDataBatch(ctx context.Context, name ScopedEventhub) ( - *azeventhubs.EventDataBatch, error) { + *azeventhubs.EventDataBatch, error, +) { hub, err := m.GetOrCreateHubClient(ctx, name) if err != nil { return nil, err diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index e2127b3099..ef2cf5e45b 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -26,7 +26,8 @@ type PostgresMetadataStore struct { } func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConfig, - schemaName string) (*PostgresMetadataStore, error) { + schemaName string, +) (*PostgresMetadataStore, error) { var storePool *pgxpool.Pool var poolErr error if pgConfig == nil { @@ -222,7 +223,6 @@ func (p *PostgresMetadataStore) IncrementID(jobName string) error { UPDATE `+p.schemaName+`.`+lastSyncStateTableName+` SET sync_batch_id=sync_batch_id+1 WHERE job_name=$1 `, jobName) - if err != nil { p.logger.Error("failed to increment sync batch id", slog.Any("error", err)) return err diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 20b5a2f2e5..031ae5a8e3 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -349,7 +349,6 @@ func (p *PostgresCDCSource) consumeStream( p.logger.Debug(fmt.Sprintf("XLogData => WALStart %s ServerWALEnd %s ServerTime %s\n", xld.WALStart, xld.ServerWALEnd, xld.ServerTime)) rec, err := p.processMessage(records, xld) - if err != nil { return fmt.Errorf("error processing message: %w", err) } @@ -812,7 +811,8 @@ func (p *PostgresCDCSource) processRelationMessage( } func (p *PostgresCDCSource) recToTablePKey(req *model.PullRecordsRequest, - rec model.Record) (*model.TableWithPkey, error) { + rec model.Record, +) (*model.TableWithPkey, error) { tableName := rec.GetTableName() pkeyColsMerged := make([]byte, 0) diff --git a/flow/connectors/postgres/client.go 
b/flow/connectors/postgres/client.go index 1eec3c5bf7..dbae60b9e6 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -347,7 +347,8 @@ func getRawTableIdentifier(jobName string) string { } func generateCreateTableSQLForNormalizedTable(sourceTableIdentifier string, - sourceTableSchema *protos.TableSchema) string { + sourceTableSchema *protos.TableSchema, +) string { createTableSQLArray := make([]string, 0, len(sourceTableSchema.Columns)) for columnName, genericColumnType := range sourceTableSchema.Columns { createTableSQLArray = append(createTableSQLArray, fmt.Sprintf("\"%s\" %s,", columnName, @@ -443,7 +444,8 @@ func (c *PostgresConnector) majorVersionCheck(majorVersion int) (bool, error) { } func (c *PostgresConnector) updateSyncMetadata(flowJobName string, lastCP int64, syncBatchID int64, - syncRecordsTx pgx.Tx) error { + syncRecordsTx pgx.Tx, +) error { jobMetadataExists, err := c.jobMetadataExistsTx(syncRecordsTx, flowJobName) if err != nil { return fmt.Errorf("failed to get sync status for flow job: %w", err) @@ -469,7 +471,8 @@ func (c *PostgresConnector) updateSyncMetadata(flowJobName string, lastCP int64, } func (c *PostgresConnector) updateNormalizeMetadata(flowJobName string, normalizeBatchID int64, - normalizeRecordsTx pgx.Tx) error { + normalizeRecordsTx pgx.Tx, +) error { jobMetadataExists, err := c.jobMetadataExistsTx(normalizeRecordsTx, flowJobName) if err != nil { return fmt.Errorf("failed to get sync status for flow job: %w", err) @@ -489,7 +492,8 @@ func (c *PostgresConnector) updateNormalizeMetadata(flowJobName string, normaliz } func (c *PostgresConnector) getTableNametoUnchangedCols(flowJobName string, syncBatchID int64, - normalizeBatchID int64) (map[string][]string, error) { + normalizeBatchID int64, +) (map[string][]string, error) { rawTableIdentifier := getRawTableIdentifier(flowJobName) rows, err := c.pool.Query(c.ctx, fmt.Sprintf(getTableNameToUnchangedToastColsSQL, c.metadataSchema, @@ -518,7 +522,8 @@ func (c *PostgresConnector) getTableNametoUnchangedCols(flowJobName string, sync } func (c *PostgresConnector) generateNormalizeStatements(destinationTableIdentifier string, - unchangedToastColumns []string, rawTableIdentifier string, supportsMerge bool) []string { + unchangedToastColumns []string, rawTableIdentifier string, supportsMerge bool, +) []string { if supportsMerge { return []string{c.generateMergeStatement(destinationTableIdentifier, unchangedToastColumns, rawTableIdentifier)} } @@ -528,7 +533,8 @@ func (c *PostgresConnector) generateNormalizeStatements(destinationTableIdentifi } func (c *PostgresConnector) generateFallbackStatements(destinationTableIdentifier string, - rawTableIdentifier string) []string { + rawTableIdentifier string, +) []string { normalizedTableSchema := c.tableSchemaMapping[destinationTableIdentifier] columnNames := make([]string, 0, len(normalizedTableSchema.Columns)) flattenedCastsSQLArray := make([]string, 0, len(normalizedTableSchema.Columns)) @@ -576,7 +582,8 @@ func (c *PostgresConnector) generateFallbackStatements(destinationTableIdentifie } func (c *PostgresConnector) generateMergeStatement(destinationTableIdentifier string, unchangedToastColumns []string, - rawTableIdentifier string) string { + rawTableIdentifier string, +) string { normalizedTableSchema := c.tableSchemaMapping[destinationTableIdentifier] columnNames := maps.Keys(normalizedTableSchema.Columns) for i, columnName := range columnNames { diff --git a/flow/connectors/postgres/postgres.go 
b/flow/connectors/postgres/postgres.go index cf2a92b5b2..da65b09c0d 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -352,8 +352,10 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S }() syncedRecordsCount, err := syncRecordsTx.CopyFrom(c.ctx, pgx.Identifier{c.metadataSchema, rawTableIdentifier}, - []string{"_peerdb_uid", "_peerdb_timestamp", "_peerdb_destination_table_name", "_peerdb_data", - "_peerdb_record_type", "_peerdb_match_data", "_peerdb_batch_id", "_peerdb_unchanged_toast_columns"}, + []string{ + "_peerdb_uid", "_peerdb_timestamp", "_peerdb_destination_table_name", "_peerdb_data", + "_peerdb_record_type", "_peerdb_match_data", "_peerdb_batch_id", "_peerdb_unchanged_toast_columns", + }, pgx.CopyFromRows(records)) if err != nil { return nil, fmt.Errorf("error syncing records: %w", err) @@ -523,7 +525,8 @@ func (c *PostgresConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr // GetTableSchema returns the schema for a table, implementing the Connector interface. func (c *PostgresConnector) GetTableSchema( - req *protos.GetTableSchemaBatchInput) (*protos.GetTableSchemaBatchOutput, error) { + req *protos.GetTableSchemaBatchInput, +) (*protos.GetTableSchemaBatchOutput, error) { res := make(map[string]*protos.TableSchema) for _, tableName := range req.TableIdentifiers { tableSchema, err := c.getTableSchemaForTable(tableName) @@ -596,7 +599,8 @@ func (c *PostgresConnector) getTableSchemaForTable( // SetupNormalizedTable sets up a normalized table, implementing the Connector interface. func (c *PostgresConnector) SetupNormalizedTables(req *protos.SetupNormalizedTableBatchInput) ( - *protos.SetupNormalizedTableBatchOutput, error) { + *protos.SetupNormalizedTableBatchOutput, error, +) { tableExistsMapping := make(map[string]bool) // Postgres is cool and supports transactional DDL. So we use a transaction. createNormalizedTablesTx, err := c.pool.Begin(c.ctx) @@ -657,7 +661,8 @@ func (c *PostgresConnector) InitializeTableSchema(req map[string]*protos.TableSc // ReplayTableSchemaDelta changes a destination table to match the schema at source // This could involve adding or dropping multiple columns. func (c *PostgresConnector) ReplayTableSchemaDeltas(flowJobName string, - schemaDeltas []*protos.TableSchemaDelta) error { + schemaDeltas []*protos.TableSchemaDelta, +) error { // Postgres is cool and supports transactional DDL. So we use a transaction. 
tableSchemaModifyTx, err := c.pool.Begin(c.ctx) if err != nil { @@ -736,7 +741,8 @@ func (c *PostgresConnector) EnsurePullability(req *protos.EnsurePullabilityBatch tableIdentifierMapping[tableName] = &protos.TableIdentifier{ TableIdentifier: &protos.TableIdentifier_PostgresTableIdentifier{ PostgresTableIdentifier: &protos.PostgresTableIdentifier{ - RelId: relID}, + RelId: relID, + }, }, } utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("ensured pullability table %s", tableName)) diff --git a/flow/connectors/postgres/postgres_repl_test.go b/flow/connectors/postgres/postgres_repl_test.go index 07ddd2743c..b50a1f89fc 100644 --- a/flow/connectors/postgres/postgres_repl_test.go +++ b/flow/connectors/postgres/postgres_repl_test.go @@ -105,7 +105,6 @@ func (suite *PostgresReplicationSnapshotTestSuite) TearDownSuite() { } func (suite *PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() { - tables := map[string]string{ "pgpeer_repl_test.test_1": "test_1_dst", } diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index 69641a7415..be8daa903d 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -268,7 +268,8 @@ func (c *PostgresConnector) getMinMaxValues( } func (c *PostgresConnector) CheckForUpdatedMaxValue(config *protos.QRepConfig, - last *protos.QRepPartition) (bool, error) { + last *protos.QRepPartition, +) (bool, error) { tx, err := c.pool.Begin(c.ctx) if err != nil { return false, fmt.Errorf("unable to begin transaction for getting max value: %w", err) @@ -303,7 +304,8 @@ func (c *PostgresConnector) CheckForUpdatedMaxValue(config *protos.QRepConfig, func (c *PostgresConnector) PullQRepRecords( config *protos.QRepConfig, - partition *protos.QRepPartition) (*model.QRecordBatch, error) { + partition *protos.QRepPartition, +) (*model.QRecordBatch, error) { partitionIdLog := slog.String(string(shared.PartitionIDKey), partition.PartitionId) if partition.FullTablePartition { c.logger.Info("pulling full table partition", partitionIdLog) diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index 08f9f93488..2118cec70a 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -30,7 +30,8 @@ type QRepQueryExecutor struct { } func NewQRepQueryExecutor(pool *pgxpool.Pool, ctx context.Context, - flowJobName string, partitionID string) *QRepQueryExecutor { + flowJobName string, partitionID string, +) *QRepQueryExecutor { return &QRepQueryExecutor{ pool: pool, ctx: ctx, @@ -44,7 +45,8 @@ func NewQRepQueryExecutor(pool *pgxpool.Pool, ctx context.Context, } func NewQRepQueryExecutorSnapshot(pool *pgxpool.Pool, ctx context.Context, snapshot string, - flowJobName string, partitionID string) (*QRepQueryExecutor, error) { + flowJobName string, partitionID string, +) (*QRepQueryExecutor, error) { qrepLog := slog.Group("qrep-metadata", slog.String(string(shared.FlowNameKey), flowJobName), slog.String(string(shared.PartitionIDKey), partitionID)) slog.Info("Declared new qrep executor for snapshot", qrepLog) @@ -421,7 +423,8 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx( } func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription, - customTypeMap map[uint32]string) (*model.QRecord, error) { + customTypeMap map[uint32]string, +) (*model.QRecord, error) { // make vals an empty array of QValue of size len(fds) record := model.NewQRecord(len(fds)) diff --git a/flow/connectors/postgres/qvalue_convert.go 
b/flow/connectors/postgres/qvalue_convert.go index c4f2931e06..4b27f9c689 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -348,8 +348,10 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( case qvalue.QValueKindPoint: xCoord := value.(pgtype.Point).P.X yCoord := value.(pgtype.Point).P.Y - val = qvalue.QValue{Kind: qvalue.QValueKindPoint, - Value: fmt.Sprintf("POINT(%f %f)", xCoord, yCoord)} + val = qvalue.QValue{ + Kind: qvalue.QValueKindPoint, + Value: fmt.Sprintf("POINT(%f %f)", xCoord, yCoord), + } default: // log.Warnf("unhandled QValueKind => %v, parsing as string", qvalueKind) textVal, ok := value.(string) diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index 6707d96200..96d16930cc 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -30,7 +30,8 @@ type S3Connector struct { } func NewS3Connector(ctx context.Context, - config *protos.S3Config) (*S3Connector, error) { + config *protos.S3Config, +) (*S3Connector, error) { keyID := "" if config.AccessKeyId != nil { keyID = *config.AccessKeyId @@ -237,7 +238,8 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes func (c *S3Connector) SetupNormalizedTables(req *protos.SetupNormalizedTableBatchInput) ( *protos.SetupNormalizedTableBatchOutput, - error) { + error, +) { c.logger.Info("SetupNormalizedTables for S3 is a no-op") return nil, nil } diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go index a765ac9e60..8c926acb94 100644 --- a/flow/connectors/snowflake/qrep_avro_sync.go +++ b/flow/connectors/snowflake/qrep_avro_sync.go @@ -30,7 +30,8 @@ type SnowflakeAvroSyncMethod struct { func NewSnowflakeAvroSyncMethod( config *protos.QRepConfig, - connector *SnowflakeConnector) *SnowflakeAvroSyncMethod { + connector *SnowflakeConnector, +) *SnowflakeAvroSyncMethod { return &SnowflakeAvroSyncMethod{ config: config, connector: connector, @@ -422,7 +423,8 @@ func NewSnowflakeAvroWriteHandler( } func (s *SnowflakeAvroWriteHandler) HandleAppendMode( - copyInfo *CopyInfo) error { + copyInfo *CopyInfo, +) error { //nolint:gosec copyCmd := fmt.Sprintf("COPY INTO %s(%s) FROM (SELECT %s FROM @%s) %s", s.dstTableName, copyInfo.columnsSQL, copyInfo.transformationSQL, s.stage, strings.Join(s.copyOpts, ",")) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 9920cdad36..90b29e4765 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -113,7 +113,8 @@ type UnchangedToastColumnResult struct { } func NewSnowflakeConnector(ctx context.Context, - snowflakeProtoConfig *protos.SnowflakeConfig) (*SnowflakeConnector, error) { + snowflakeProtoConfig *protos.SnowflakeConfig, +) (*SnowflakeConnector, error) { PrivateKeyRSA, err := shared.DecodePKCS8PrivateKey([]byte(snowflakeProtoConfig.PrivateKey), snowflakeProtoConfig.Password) if err != nil { @@ -225,7 +226,8 @@ func (c *SnowflakeConnector) SetupMetadataTables() error { // only used for testing atm. doesn't return info about pkey or ReplicaIdentity [which is PG specific anyway]. 
func (c *SnowflakeConnector) GetTableSchema( - req *protos.GetTableSchemaBatchInput) (*protos.GetTableSchemaBatchOutput, error) { + req *protos.GetTableSchemaBatchInput, +) (*protos.GetTableSchemaBatchOutput, error) { res := make(map[string]*protos.TableSchema) for _, tableName := range req.TableIdentifiers { tableSchema, err := c.getTableSchemaForTable(strings.ToUpper(tableName)) @@ -352,7 +354,8 @@ func (c *SnowflakeConnector) GetLastNormalizeBatchID(jobName string) (int64, err } func (c *SnowflakeConnector) getDistinctTableNamesInBatch(flowJobName string, syncBatchID int64, - normalizeBatchID int64) ([]string, error) { + normalizeBatchID int64, +) ([]string, error) { rawTableIdentifier := getRawTableIdentifier(flowJobName) rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getDistinctDestinationTableNames, c.metadataSchema, @@ -374,7 +377,8 @@ func (c *SnowflakeConnector) getDistinctTableNamesInBatch(flowJobName string, sy } func (c *SnowflakeConnector) getTableNametoUnchangedCols(flowJobName string, syncBatchID int64, - normalizeBatchID int64) (map[string][]string, error) { + normalizeBatchID int64, +) (map[string][]string, error) { rawTableIdentifier := getRawTableIdentifier(flowJobName) rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getTableNametoUnchangedColsSQL, c.metadataSchema, @@ -400,7 +404,8 @@ func (c *SnowflakeConnector) getTableNametoUnchangedCols(flowJobName string, syn } func (c *SnowflakeConnector) SetupNormalizedTables( - req *protos.SetupNormalizedTableBatchInput) (*protos.SetupNormalizedTableBatchOutput, error) { + req *protos.SetupNormalizedTableBatchInput, +) (*protos.SetupNormalizedTableBatchOutput, error) { tableExistsMapping := make(map[string]bool) for tableIdentifier, tableSchema := range req.TableNameSchemaMapping { normalizedTableNameComponents, err := parseTableName(tableIdentifier) @@ -439,7 +444,8 @@ func (c *SnowflakeConnector) InitializeTableSchema(req map[string]*protos.TableS // ReplayTableSchemaDeltas changes a destination table to match the schema at source // This could involve adding or dropping multiple columns. func (c *SnowflakeConnector) ReplayTableSchemaDeltas(flowJobName string, - schemaDeltas []*protos.TableSchemaDelta) error { + schemaDeltas []*protos.TableSchemaDelta, +) error { tableSchemaModifyTx, err := c.database.Begin() if err != nil { return fmt.Errorf("error starting transaction for schema modification: %w", @@ -927,6 +933,7 @@ func (c *SnowflakeConnector) jobMetadataExists(jobName string) (bool, error) { } return result.Bool, nil } + func (c *SnowflakeConnector) jobMetadataExistsTx(tx *sql.Tx, jobName string) (bool, error) { var result pgtype.Bool err := tx.QueryRowContext(c.ctx, @@ -938,7 +945,8 @@ func (c *SnowflakeConnector) jobMetadataExistsTx(tx *sql.Tx, jobName string) (bo } func (c *SnowflakeConnector) updateSyncMetadata(flowJobName string, lastCP int64, - syncBatchID int64, syncRecordsTx *sql.Tx) error { + syncBatchID int64, syncRecordsTx *sql.Tx, +) error { jobMetadataExists, err := c.jobMetadataExistsTx(syncRecordsTx, flowJobName) if err != nil { return fmt.Errorf("failed to get sync status for flow job: %w", err) @@ -1015,7 +1023,8 @@ and updating the other columns. 
*/ func (c *SnowflakeConnector) generateUpdateStatements( syncedAtCol string, softDeleteCol string, softDelete bool, - allCols []string, unchangedToastCols []string) []string { + allCols []string, unchangedToastCols []string, +) []string { updateStmts := make([]string, 0, len(unchangedToastCols)) for _, cols := range unchangedToastCols { @@ -1138,7 +1147,8 @@ func (c *SnowflakeConnector) RenameTables(req *protos.RenameTablesInput) (*proto } func (c *SnowflakeConnector) CreateTablesFromExisting(req *protos.CreateTablesFromExistingInput) ( - *protos.CreateTablesFromExistingOutput, error) { + *protos.CreateTablesFromExistingOutput, error, +) { createTablesFromExistingTx, err := c.database.BeginTx(c.ctx, nil) if err != nil { return nil, fmt.Errorf("unable to begin transaction for rename tables: %w", err) diff --git a/flow/connectors/sql/query_executor.go b/flow/connectors/sql/query_executor.go index 02bc14bbfa..82a13b691f 100644 --- a/flow/connectors/sql/query_executor.go +++ b/flow/connectors/sql/query_executor.go @@ -133,7 +133,8 @@ func (g *GenericSQLQueryExecutor) CountRows(schemaName string, tableName string) func (g *GenericSQLQueryExecutor) CountNonNullRows( schemaName string, tableName string, - columnName string) (int64, error) { + columnName string, +) (int64, error) { var count pgtype.Int8 err := g.db.QueryRowx("SELECT COUNT(CASE WHEN " + columnName + " IS NOT NULL THEN 1 END) AS non_null_count FROM " + schemaName + "." + tableName).Scan(&count) @@ -265,7 +266,8 @@ func (g *GenericSQLQueryExecutor) processRows(rows *sqlx.Rows) (*model.QRecordBa } func (g *GenericSQLQueryExecutor) ExecuteAndProcessQuery( - query string, args ...interface{}) (*model.QRecordBatch, error) { + query string, args ...interface{}, +) (*model.QRecordBatch, error) { rows, err := g.db.QueryxContext(g.ctx, query, args...) 
if err != nil { return nil, err @@ -276,7 +278,8 @@ func (g *GenericSQLQueryExecutor) ExecuteAndProcessQuery( } func (g *GenericSQLQueryExecutor) NamedExecuteAndProcessQuery( - query string, arg interface{}) (*model.QRecordBatch, error) { + query string, arg interface{}, +) (*model.QRecordBatch, error) { rows, err := g.db.NamedQueryContext(g.ctx, query, arg) if err != nil { return nil, err diff --git a/flow/connectors/sqlserver/qrep.go b/flow/connectors/sqlserver/qrep.go index fbebd4f201..cd91b1fc9a 100644 --- a/flow/connectors/sqlserver/qrep.go +++ b/flow/connectors/sqlserver/qrep.go @@ -15,7 +15,8 @@ import ( ) func (c *SQLServerConnector) GetQRepPartitions( - config *protos.QRepConfig, last *protos.QRepPartition) ([]*protos.QRepPartition, error) { + config *protos.QRepConfig, last *protos.QRepPartition, +) ([]*protos.QRepPartition, error) { if config.WatermarkTable == "" { c.logger.Info("watermark table is empty, doing full table refresh") return []*protos.QRepPartition{ @@ -152,7 +153,8 @@ func (c *SQLServerConnector) GetQRepPartitions( } func (c *SQLServerConnector) PullQRepRecords( - config *protos.QRepConfig, partition *protos.QRepPartition) (*model.QRecordBatch, error) { + config *protos.QRepConfig, partition *protos.QRepPartition, +) (*model.QRecordBatch, error) { // Build the query to pull records within the range from the source table // Be sure to order the results by the watermark column to ensure consistency across pulls query, err := BuildQuery(config.Query) diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go index 1f44f4b7ae..ce70801658 100644 --- a/flow/connectors/utils/avro/avro_writer.go +++ b/flow/connectors/utils/avro/avro_writer.go @@ -22,8 +22,10 @@ import ( uber_atomic "go.uber.org/atomic" ) -type AvroCompressionCodec int64 -type AvroStorageLocation int64 +type ( + AvroCompressionCodec int64 + AvroStorageLocation int64 +) const ( CompressNone AvroCompressionCodec = iota @@ -214,7 +216,6 @@ func (p *peerDBOCFWriter) WriteRecordsToS3(bucketName, key string, s3Creds utils Key: aws.String(key), Body: r, }) - if err != nil { slog.Error("failed to upload file: ", slog.Any("error", err)) return nil, fmt.Errorf("failed to upload file: %w", err) diff --git a/flow/connectors/utils/catalog/env.go b/flow/connectors/utils/catalog/env.go index ec99905092..cdd85535b9 100644 --- a/flow/connectors/utils/catalog/env.go +++ b/flow/connectors/utils/catalog/env.go @@ -11,8 +11,10 @@ import ( "github.com/jackc/pgx/v5/pgxpool" ) -var poolMutex = &sync.Mutex{} -var pool *pgxpool.Pool +var ( + poolMutex = &sync.Mutex{} + pool *pgxpool.Pool +) func GetCatalogConnectionPoolFromEnv() (*pgxpool.Pool, error) { var err error diff --git a/flow/connectors/utils/monitoring/monitoring.go b/flow/connectors/utils/monitoring/monitoring.go index 857a33c398..7815372277 100644 --- a/flow/connectors/utils/monitoring/monitoring.go +++ b/flow/connectors/utils/monitoring/monitoring.go @@ -34,7 +34,8 @@ func InitializeCDCFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName stri } func UpdateLatestLSNAtSourceForCDCFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string, - latestLSNAtSource pglogrepl.LSN) error { + latestLSNAtSource pglogrepl.LSN, +) error { _, err := pool.Exec(ctx, "UPDATE peerdb_stats.cdc_flows SET latest_lsn_at_source=$1 WHERE flow_name=$2", uint64(latestLSNAtSource), flowJobName) @@ -45,7 +46,8 @@ func UpdateLatestLSNAtSourceForCDCFlow(ctx context.Context, pool *pgxpool.Pool, } func UpdateLatestLSNAtTargetForCDCFlow(ctx 
context.Context, pool *pgxpool.Pool, flowJobName string, - latestLSNAtTarget pglogrepl.LSN) error { + latestLSNAtTarget pglogrepl.LSN, +) error { _, err := pool.Exec(ctx, "UPDATE peerdb_stats.cdc_flows SET latest_lsn_at_target=$1 WHERE flow_name=$2", uint64(latestLSNAtTarget), flowJobName) @@ -56,7 +58,8 @@ func UpdateLatestLSNAtTargetForCDCFlow(ctx context.Context, pool *pgxpool.Pool, } func AddCDCBatchForFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string, - batchInfo CDCBatchInfo) error { + batchInfo CDCBatchInfo, +) error { _, err := pool.Exec(ctx, `INSERT INTO peerdb_stats.cdc_batches(flow_name,batch_id,rows_in_batch,batch_start_lsn,batch_end_lsn, start_time) VALUES($1,$2,$3,$4,$5,$6) ON CONFLICT DO NOTHING`, @@ -102,7 +105,8 @@ func UpdateEndTimeForCDCBatch( } func AddCDCBatchTablesForFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string, - batchID int64, tableNameRowsMapping map[string]uint32) error { + batchID int64, tableNameRowsMapping map[string]uint32, +) error { insertBatchTablesTx, err := pool.Begin(ctx) if err != nil { return fmt.Errorf("error while beginning transaction for inserting statistics into cdc_batch_table: %w", err) @@ -217,7 +221,8 @@ func AppendSlotSizeInfo( } func addPartitionToQRepRun(ctx context.Context, pool *pgxpool.Pool, flowJobName string, - runUUID string, partition *protos.QRepPartition) error { + runUUID string, partition *protos.QRepPartition, +) error { if partition.Range == nil && partition.FullTablePartition { slog.Info("partition"+partition.PartitionId+ " is a full table partition. Metrics logging is skipped.", @@ -286,7 +291,8 @@ func UpdateStartTimeForPartition( } func UpdatePullEndTimeAndRowsForPartition(ctx context.Context, pool *pgxpool.Pool, runUUID string, - partition *protos.QRepPartition, rowsInPartition int64) error { + partition *protos.QRepPartition, rowsInPartition int64, +) error { _, err := pool.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET pull_end_time=$1,rows_in_partition=$2 WHERE run_uuid=$3 AND partition_uuid=$4`, time.Now(), rowsInPartition, runUUID, partition.PartitionId) if err != nil { @@ -296,7 +302,8 @@ func UpdatePullEndTimeAndRowsForPartition(ctx context.Context, pool *pgxpool.Poo } func UpdateEndTimeForPartition(ctx context.Context, pool *pgxpool.Pool, runUUID string, - partition *protos.QRepPartition) error { + partition *protos.QRepPartition, +) error { _, err := pool.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET end_time=$1 WHERE run_uuid=$2 AND partition_uuid=$3`, time.Now(), runUUID, partition.PartitionId) if err != nil { @@ -306,7 +313,8 @@ func UpdateEndTimeForPartition(ctx context.Context, pool *pgxpool.Pool, runUUID } func UpdateRowsSyncedForPartition(ctx context.Context, pool *pgxpool.Pool, rowsSynced int, runUUID string, - partition *protos.QRepPartition) error { + partition *protos.QRepPartition, +) error { _, err := pool.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET rows_synced=$1 WHERE run_uuid=$2 AND partition_uuid=$3`, rowsSynced, runUUID, partition.PartitionId) if err != nil { diff --git a/flow/connectors/utils/partition/partition.go b/flow/connectors/utils/partition/partition.go index 1c4f4381aa..cb2f326a66 100644 --- a/flow/connectors/utils/partition/partition.go +++ b/flow/connectors/utils/partition/partition.go @@ -54,7 +54,7 @@ func compareValues(prevEnd interface{}, start interface{}) int { return 0 } } - case uint32: //xmin + case uint32: // xmin if prevEnd.(uint32) < v { return -1 } else if prevEnd.(uint32) > v { diff --git 
a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 70a27b52da..de3ddae7e5 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -461,7 +461,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { // and execute a transaction touching toast columns go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) - //complex transaction with random DMLs on a table with toast columns + // complex transaction with random DMLs on a table with toast columns _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` BEGIN; INSERT INTO %s(t1,t2,k) SELECT random_string(9000),random_string(9000), @@ -534,7 +534,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { // and execute a transaction touching toast columns go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) - //complex transaction with random DMLs on a table with toast columns + // complex transaction with random DMLs on a table with toast columns _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` BEGIN; INSERT INTO %s(t1,k) SELECT random_string(9000), @@ -661,7 +661,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ - ExitAfterRecords: 1, MaxBatchSize: 100, } @@ -697,10 +696,12 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { // allow only continue as new error require.Contains(s.t, err.Error(), "continue as new") - noNulls, err := s.bqHelper.CheckNull(dstTableName, []string{"c41", "c1", "c2", "c3", "c4", + noNulls, err := s.bqHelper.CheckNull(dstTableName, []string{ + "c41", "c1", "c2", "c3", "c4", "c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c21", "c22", "c23", "c24", "c28", "c29", "c30", "c31", "c33", "c34", "c35", "c36", - "c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44"}) + "c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44", + }) if err != nil { fmt.Println("error %w", err) } diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index 3355571057..ac28879f45 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -217,7 +217,8 @@ type QRepFlowConnectionGenerationConfig struct { // GenerateQRepConfig generates a qrep config for testing. 
func (c *QRepFlowConnectionGenerationConfig) GenerateQRepConfig( - query string, watermark string) (*protos.QRepConfig, error) { + query string, watermark string, +) (*protos.QRepConfig, error) { ret := &protos.QRepConfig{} ret.FlowJobName = c.FlowJobName ret.WatermarkTable = c.WatermarkTable diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go index 478877ef4b..bfa19f866b 100644 --- a/flow/e2e/s3/cdc_s3_test.go +++ b/flow/e2e/s3/cdc_s3_test.go @@ -52,7 +52,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) s.NoError(err) - //insert 20 rows + // insert 20 rows for i := 1; i <= 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) @@ -124,7 +124,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) s.NoError(err) - //insert 20 rows + // insert 20 rows for i := 1; i <= 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) diff --git a/flow/e2e/s3/s3_helper.go b/flow/e2e/s3/s3_helper.go index e6a4fc7ad7..249745dd3c 100644 --- a/flow/e2e/s3/s3_helper.go +++ b/flow/e2e/s3/s3_helper.go @@ -90,7 +90,6 @@ func (h *S3TestHelper) ListAllFiles( ctx context.Context, jobName string, ) ([]*s3.Object, error) { - Bucket := h.bucketName Prefix := fmt.Sprintf("%s/%s/", h.prefix, jobName) files, err := h.client.ListObjects(&s3.ListObjectsInput{ diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 037bd10eda..3e6f0c2bc0 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -455,7 +455,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { // and execute a transaction touching toast columns go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) - //complex transaction with random DMLs on a table with toast columns + // complex transaction with random DMLs on a table with toast columns _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` BEGIN; INSERT INTO %s (t1,t2,k) SELECT random_string(9000),random_string(9000), @@ -527,7 +527,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { // and execute a transaction touching toast columns go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) - //complex transaction with random DMLs on a table with toast columns + // complex transaction with random DMLs on a table with toast columns _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` BEGIN; INSERT INTO %s (t1,k) SELECT random_string(9000), @@ -688,10 +688,12 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { // allow only continue as new error require.Contains(s.t, err.Error(), "continue as new") - noNulls, err := s.sfHelper.CheckNull("test_types_sf", []string{"c41", "c1", "c2", "c3", "c4", + noNulls, err := s.sfHelper.CheckNull("test_types_sf", []string{ + "c41", "c1", "c2", "c3", "c4", "c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c21", "c22", "c23", "c24", "c28", "c29", "c30", "c31", "c33", "c34", "c35", "c36", - "c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44", "c45", "c46"}) + "c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44", "c45", "c46", + }) if err != nil { fmt.Println("error %w", err) } diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index 456541e2ca..49ed3614b9 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go 
+++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -22,7 +22,6 @@ func (s PeerFlowE2ETestSuiteSF) setupSourceTable(tableName string, numRows int) func (s PeerFlowE2ETestSuiteSF) setupSFDestinationTable(dstTable string) { schema := e2e.GetOwnersSchema() err := s.sfHelper.CreateTable(dstTable, schema) - // fail if table creation fails if err != nil { require.FailNow(s.t, "unable to create table on snowflake", err) diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 0d6e434457..fea19ab808 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -64,7 +64,8 @@ func RegisterWorkflowsAndActivities(env *testsuite.TestWorkflowEnvironment, t *t } func SetupCDCFlowStatusQuery(env *testsuite.TestWorkflowEnvironment, - connectionGen FlowConnectionGenerationConfig) { + connectionGen FlowConnectionGenerationConfig, +) { // wait for PeerFlowStatusQuery to finish setup // sleep for 5 second to allow the workflow to start time.Sleep(5 * time.Second) @@ -93,7 +94,8 @@ func SetupCDCFlowStatusQuery(env *testsuite.TestWorkflowEnvironment, func NormalizeFlowCountQuery(env *testsuite.TestWorkflowEnvironment, connectionGen FlowConnectionGenerationConfig, - minCount int) { + minCount int, +) { // wait for PeerFlowStatusQuery to finish setup // sleep for 5 second to allow the workflow to start time.Sleep(5 * time.Second) diff --git a/flow/shared/constants.go b/flow/shared/constants.go index 8379b6718f..8589b55487 100644 --- a/flow/shared/constants.go +++ b/flow/shared/constants.go @@ -15,8 +15,10 @@ const ( const MirrorNameSearchAttribute = "MirrorName" -type CDCFlowSignal int64 -type ContextKey string +type ( + CDCFlowSignal int64 + ContextKey string +) const ( NoopSignal CDCFlowSignal = iota diff --git a/flow/shared/random.go b/flow/shared/random.go index 9565c34c4d..7ef3c8e5dc 100644 --- a/flow/shared/random.go +++ b/flow/shared/random.go @@ -30,7 +30,7 @@ func RandomUInt64() (uint64, error) { func RandomString(n int) string { const alphanum = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" - var bytes = make([]byte, n) + bytes := make([]byte, n) _, err := rand.Read(bytes) if err != nil { return "temp" diff --git a/flow/shared/signals.go b/flow/shared/signals.go index 91e6303e90..2097ba95c5 100644 --- a/flow/shared/signals.go +++ b/flow/shared/signals.go @@ -5,7 +5,8 @@ import ( ) func FlowSignalHandler(activeSignal CDCFlowSignal, - v CDCFlowSignal, logger log.Logger) CDCFlowSignal { + v CDCFlowSignal, logger log.Logger, +) CDCFlowSignal { if v == ShutdownSignal { logger.Info("received shutdown signal") return v diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 3e6cce1030..a2cb9a388e 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -428,8 +428,7 @@ func CDCFlowWorkflowWithConfig( state.SyncFlowErrors = append(state.SyncFlowErrors, err.Error()) } else { for i := range modifiedSrcTables { - cfg.TableNameSchemaMapping[modifiedDstTables[i]] = - getModifiedSchemaRes.TableNameSchemaMapping[modifiedSrcTables[i]] + cfg.TableNameSchemaMapping[modifiedDstTables[i]] = getModifiedSchemaRes.TableNameSchemaMapping[modifiedSrcTables[i]] } } } diff --git a/flow/workflows/heartbeat_flow.go b/flow/workflows/heartbeat_flow.go index 80e89745d9..1c99900b71 100644 --- a/flow/workflows/heartbeat_flow.go +++ b/flow/workflows/heartbeat_flow.go @@ -8,7 +8,6 @@ import ( // HeartbeatFlowWorkflow is the workflow that sets up heartbeat sending. 
func HeartbeatFlowWorkflow(ctx workflow.Context) error { - ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 7 * 24 * time.Hour, }) diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 5f267079e1..2373427c8e 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -72,7 +72,8 @@ func NewQRepFlowExecution(ctx workflow.Context, config *protos.QRepConfig, runUU // NewQRepFlowExecution creates a new instance of QRepFlowExecution. func NewQRepPartitionFlowExecution(ctx workflow.Context, - config *protos.QRepConfig, runUUID string) *QRepPartitionFlowExecution { + config *protos.QRepConfig, runUUID string, +) *QRepPartitionFlowExecution { return &QRepPartitionFlowExecution{ config: config, flowExecutionID: workflow.GetInfo(ctx).WorkflowExecution.ID, @@ -161,7 +162,8 @@ func (q *QRepFlowExecution) GetPartitions( // ReplicatePartitions replicates the partition batch. func (q *QRepPartitionFlowExecution) ReplicatePartitions(ctx workflow.Context, - partitions *protos.QRepPartitionBatch) error { + partitions *protos.QRepPartitionBatch, +) error { ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 24 * 5 * time.Hour, HeartbeatTimeout: 5 * time.Minute, @@ -194,7 +196,8 @@ func (q *QRepFlowExecution) getPartitionWorkflowID(ctx workflow.Context) (string // startChildWorkflow starts a single child workflow. func (q *QRepFlowExecution) startChildWorkflow( ctx workflow.Context, - partitions *protos.QRepPartitionBatch) (workflow.ChildWorkflowFuture, error) { + partitions *protos.QRepPartitionBatch, +) (workflow.ChildWorkflowFuture, error) { wid, err := q.getPartitionWorkflowID(ctx) if err != nil { return nil, fmt.Errorf("failed to get child workflow ID: %w", err) diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index 9604efd0fa..ab8168191c 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -164,7 +164,8 @@ func (s *SetupFlowExecution) createRawTable( // fetchTableSchemaAndSetupNormalizedTables fetches the table schema for the source table and // sets up the normalized tables on the destination peer. func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables( - ctx workflow.Context, flowConnectionConfigs *protos.FlowConnectionConfigs) (map[string]*protos.TableSchema, error) { + ctx workflow.Context, flowConnectionConfigs *protos.FlowConnectionConfigs, +) (map[string]*protos.TableSchema, error) { s.logger.Info("fetching table schema for peer flow - ", s.CDCFlowName) ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ @@ -270,7 +271,8 @@ func (s *SetupFlowExecution) executeSetupFlow( // SetupFlowWorkflow is the workflow that sets up the flow. 
func SetupFlowWorkflow(ctx workflow.Context, - config *protos.FlowConnectionConfigs) (*protos.FlowConnectionConfigs, error) { + config *protos.FlowConnectionConfigs, +) (*protos.FlowConnectionConfigs, error) { tblNameMapping := make(map[string]string) for _, v := range config.TableMappings { tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index 8765bf7f6c..527fde5720 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -88,7 +88,6 @@ func (s *SnapshotFlowExecution) cloneTable( snapshotName string, mapping *protos.TableMapping, ) error { - flowName := s.config.FlowJobName cloneLog := slog.Group("clone-log", slog.String(string(shared.FlowNameKey), flowName), @@ -229,7 +228,7 @@ func SnapshotFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionCon logger: logger, } - var replCtx = ctx + replCtx := ctx replCtx = workflow.WithValue(replCtx, shared.FlowNameKey, config.FlowJobName) if config.DoInitialCopy { sessionOpts := &workflow.SessionOptions{
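
The patch above is mechanical: `.github/workflows/golang-lint.yml` moves the lint job from a `ubicloud-standard-2` to a `ubicloud-standard-4` ARM runner, `flow/.golangci.yml` swaps the `gofmt` linter for the stricter `gofumpt`, and the remaining hunks reformat the Go sources to satisfy it with no intended behavior change. As a rough sketch of the conventions the diff applies — using made-up identifiers, not code from the PeerDB repository — gofumpt-style output looks like this:

```go
// Illustrative sketch only; the identifiers below are hypothetical and do not
// exist in the PeerDB codebase.
package example

import "fmt"

// Adjacent single-line type declarations are grouped into one block,
// mirroring the changes to flow/shared/constants.go and avro_writer.go above.
type (
	BatchID int64
	JobName string
)

// When a parameter list wraps, the closing parenthesis and the return types
// move to their own line, as in the many signature hunks in this diff.
func describeBatch(job JobName, batch BatchID,
	rows int,
) (string, error) {
	if rows < 0 {
		return "", fmt.Errorf("invalid row count %d for job %s", rows, job)
	}
	// Comments get a space after "//", and short variable declarations are
	// preferred over "var msg = ..." (compare flow/shared/random.go above).
	msg := fmt.Sprintf("job %s: batch %d synced %d rows", job, batch, rows)
	return msg, nil
}
```

Assuming that reading is correct, running `gofumpt -w .` inside `flow/` (or `golangci-lint run` with the linter enabled) should reproduce this style on future changes.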