Skip to content

Commit

Permalink
Replace gofmt with gofumpt (#835)
Browse files Browse the repository at this point in the history
gofumpt is a stricter superset of gofmt
  • Loading branch information
serprex authored Dec 18, 2023
1 parent 6a8b9ce commit 860d0a5
Show file tree
Hide file tree
Showing 45 changed files with 197 additions and 118 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/golang-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
pull-requests: write
strategy:
matrix:
runner: [ubicloud-standard-2-ubuntu-2204-arm]
runner: [ubicloud-standard-4-ubuntu-2204-arm]
runs-on: ${{ matrix.runner }}
steps:
- name: checkout
Expand Down
2 changes: 1 addition & 1 deletion flow/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ linters:
enable:
- dogsled
- dupl
- gofmt
- gofumpt
- gosec
- misspell
- nakedret
Expand Down
18 changes: 12 additions & 6 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ func (a *FlowableActivity) recordSlotSizePeriodically(

// StartFlow implements StartFlow.
func (a *FlowableActivity) StartFlow(ctx context.Context,
input *protos.StartFlowInput) (*model.SyncResponse, error) {
input *protos.StartFlowInput,
) (*model.SyncResponse, error) {
activity.RecordHeartbeat(ctx, "starting flow...")
conn := input.FlowConnectionConfigs
dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
Expand Down Expand Up @@ -658,7 +659,8 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}

func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config *protos.QRepConfig,
runUUID string) error {
runUUID string,
) error {
dstConn, err := connectors.GetQRepConsolidateConnector(ctx, config.DestinationPeer)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
return monitoring.UpdateEndTimeForQRepRun(ctx, a.CatalogPool, runUUID)
Expand Down Expand Up @@ -802,7 +804,8 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
}

func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
config *protos.QRepConfig, last *protos.QRepPartition) error {
config *protos.QRepConfig, last *protos.QRepPartition,
) error {
if config.SourcePeer.Type != protos.DBType_POSTGRES || last.Range == nil {
return nil
}
Expand Down Expand Up @@ -838,7 +841,8 @@ func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
}

func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.RenameTablesInput) (
*protos.RenameTablesOutput, error) {
*protos.RenameTablesOutput, error,
) {
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.Peer)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
Expand All @@ -862,7 +866,8 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena
}

func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *protos.CreateTablesFromExistingInput) (
*protos.CreateTablesFromExistingOutput, error) {
*protos.CreateTablesFromExistingOutput, error,
) {
dstConn, err := connectors.GetCDCSyncConnector(ctx, req.Peer)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
Expand Down Expand Up @@ -932,7 +937,8 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
Range: &protos.PartitionRange{
Range: &protos.PartitionRange_IntRange{
IntRange: &protos.IntPartitionRange{Start: 0, End: int64(numRecords)},
}},
},
},
}
}
updateErr := monitoring.InitializeQRepRun(ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics})
Expand Down
4 changes: 2 additions & 2 deletions flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func setupGRPCGatewayServer(args *APIServerParams) (*http.Server, error) {
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)

if err != nil {
return nil, fmt.Errorf("unable to dial grpc server: %w", err)
}
Expand All @@ -68,7 +67,8 @@ func killExistingHeartbeatFlows(
ctx context.Context,
tc client.Client,
namespace string,
taskQueue string) error {
taskQueue string,
) error {
listRes, err := tc.ListWorkflow(ctx,
&workflowservice.ListWorkflowExecutionsRequest{
Namespace: namespace,
Expand Down
13 changes: 8 additions & 5 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ func schemaForTableIdentifier(tableIdentifier string, peerDBType int32) string {
}

func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context,
req *protos.CreateCDCFlowRequest, workflowID string) error {
req *protos.CreateCDCFlowRequest, workflowID string,
) error {
sourcePeerID, sourePeerType, srcErr := h.getPeerID(ctx, req.ConnectionConfigs.Source.Name)
if srcErr != nil {
return fmt.Errorf("unable to get peer id for source peer %s: %w",
Expand Down Expand Up @@ -86,7 +87,8 @@ func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context,
}

func (h *FlowRequestHandler) createQrepJobEntry(ctx context.Context,
req *protos.CreateQRepFlowRequest, workflowID string) error {
req *protos.CreateQRepFlowRequest, workflowID string,
) error {
sourcePeerName := req.QrepConfig.SourcePeer.Name
sourcePeerID, _, srcErr := h.getPeerID(ctx, sourcePeerName)
if srcErr != nil {
Expand Down Expand Up @@ -117,7 +119,8 @@ func (h *FlowRequestHandler) createQrepJobEntry(ctx context.Context,
}

func (h *FlowRequestHandler) CreateCDCFlow(
ctx context.Context, req *protos.CreateCDCFlowRequest) (*protos.CreateCDCFlowResponse, error) {
ctx context.Context, req *protos.CreateCDCFlowRequest,
) (*protos.CreateCDCFlowResponse, error) {
cfg := req.ConnectionConfigs
workflowID := fmt.Sprintf("%s-peerflow-%s", cfg.FlowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
Expand Down Expand Up @@ -224,7 +227,8 @@ func (h *FlowRequestHandler) removeFlowEntryInCatalog(
}

func (h *FlowRequestHandler) CreateQRepFlow(
ctx context.Context, req *protos.CreateQRepFlowRequest) (*protos.CreateQRepFlowResponse, error) {
ctx context.Context, req *protos.CreateQRepFlowRequest,
) (*protos.CreateQRepFlowResponse, error) {
cfg := req.QrepConfig
workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
Expand Down Expand Up @@ -697,5 +701,4 @@ func (h *FlowRequestHandler) DropPeer(
return &protos.DropPeerResponse{
Ok: true,
}, nil

}
1 change: 0 additions & 1 deletion flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func setupPyroscope(opts *WorkerOptions) {
pyroscope.ProfileBlockDuration,
},
})

if err != nil {
log.Fatal(err)
}
Expand Down
17 changes: 11 additions & 6 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ func (c *BigQueryConnector) WaitForTableReady(tblName string) error {
// ReplayTableSchemaDeltas changes a destination table to match the schema at source
// This could involve adding or dropping multiple columns.
func (c *BigQueryConnector) ReplayTableSchemaDeltas(flowJobName string,
schemaDeltas []*protos.TableSchemaDelta) error {
schemaDeltas []*protos.TableSchemaDelta,
) error {
for _, schemaDelta := range schemaDeltas {
if schemaDelta == nil || len(schemaDelta.AddedColumns) == 0 {
continue
Expand Down Expand Up @@ -390,7 +391,8 @@ func (c *BigQueryConnector) GetLastNormalizeBatchID(jobName string) (int64, erro
}

func (c *BigQueryConnector) getDistinctTableNamesInBatch(flowJobName string, syncBatchID int64,
normalizeBatchID int64) ([]string, error) {
normalizeBatchID int64,
) ([]string, error) {
rawTableName := c.getRawTableName(flowJobName)

// Prepare the query to retrieve distinct tables in that batch
Expand Down Expand Up @@ -426,7 +428,8 @@ func (c *BigQueryConnector) getDistinctTableNamesInBatch(flowJobName string, syn
}

func (c *BigQueryConnector) getTableNametoUnchangedCols(flowJobName string, syncBatchID int64,
normalizeBatchID int64) (map[string][]string, error) {
normalizeBatchID int64,
) (map[string][]string, error) {
rawTableName := c.getRawTableName(flowJobName)

// Prepare the query to retrieve distinct tables in that batch
Expand Down Expand Up @@ -795,7 +798,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
mergeStmts := mergeGen.generateMergeStmts()
stmts = append(stmts, mergeStmts...)
}
//update metadata to make the last normalized batch id to the recent last sync batch id.
// update metadata to make the last normalized batch id to the recent last sync batch id.
updateMetadataStmt := fmt.Sprintf(
"UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name = '%s';",
c.datasetID, MirrorJobsTable, syncBatchID, req.FlowJobName)
Expand Down Expand Up @@ -894,7 +897,8 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr

// getUpdateMetadataStmt updates the metadata tables for a given job.
func (c *BigQueryConnector) getUpdateMetadataStmt(jobName string, lastSyncedCheckpointID int64,
batchID int64) (string, error) {
batchID int64,
) (string, error) {
hasJob, err := c.metadataHasJob(jobName)
if err != nil {
return "", fmt.Errorf("failed to check if job exists: %w", err)
Expand Down Expand Up @@ -1089,7 +1093,8 @@ func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos
}

func (c *BigQueryConnector) CreateTablesFromExisting(req *protos.CreateTablesFromExistingInput) (
*protos.CreateTablesFromExistingOutput, error) {
*protos.CreateTablesFromExistingOutput, error,
) {
for newTable, existingTable := range req.NewToExistingTableMapping {
c.logger.Info(fmt.Sprintf("creating table '%s' similar to '%s'", newTable, existingTable))

Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/merge_statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {

switch qvalue.QValueKind(colType) {
case qvalue.QValueKindJSON:
//if the type is JSON, then just extract JSON
// if the type is JSON, then just extract JSON
castStmt = fmt.Sprintf("CAST(JSON_VALUE(_peerdb_data, '$.%s') AS %s) AS `%s`",
colName, bqType, colName)
// expecting data in BASE64 format
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/bigquery/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ func (c *BigQueryConnector) SyncQRepRecords(
}

func (c *BigQueryConnector) replayTableSchemaDeltasQRep(config *protos.QRepConfig, partition *protos.QRepPartition,
srcSchema *model.QRecordSchema) (*bigquery.TableMetadata, error) {
srcSchema *model.QRecordSchema,
) (*bigquery.TableMetadata, error) {
destTable := config.DestinationTableIdentifier
bqTable := c.client.Dataset(c.datasetID).Table(destTable)
dstTableMetadata, err := bqTable.Metadata(c.ctx)
Expand Down
6 changes: 4 additions & 2 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ type QRepAvroSyncMethod struct {
}

func NewQRepAvroSyncMethod(connector *BigQueryConnector, gcsBucket string,
flowJobName string) *QRepAvroSyncMethod {
flowJobName string,
) *QRepAvroSyncMethod {
return &QRepAvroSyncMethod{
connector: connector,
gcsBucket: gcsBucket,
Expand Down Expand Up @@ -179,7 +180,8 @@ type AvroSchema struct {
}

func DefineAvroSchema(dstTableName string,
dstTableMetadata *bigquery.TableMetadata) (*model.QRecordAvroSchemaDefinition, error) {
dstTableMetadata *bigquery.TableMetadata,
) (*model.QRecordAvroSchemaDefinition, error) {
avroFields := []AvroField{}
nullableFields := make(map[string]struct{})

Expand Down
6 changes: 4 additions & 2 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ func GetCDCSyncConnector(ctx context.Context, config *protos.Peer) (CDCSyncConne
}

func GetCDCNormalizeConnector(ctx context.Context,
config *protos.Peer) (CDCNormalizeConnector, error) {
config *protos.Peer,
) (CDCNormalizeConnector, error) {
inner := config.Config
switch inner.(type) {
case *protos.Peer_PostgresConfig:
Expand Down Expand Up @@ -246,7 +247,8 @@ func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) {
}

func GetQRepConsolidateConnector(ctx context.Context,
config *protos.Peer) (QRepConsolidateConnector, error) {
config *protos.Peer,
) (QRepConsolidateConnector, error) {
inner := config.Config
switch inner.(type) {
case *protos.Peer_SnowflakeConfig:
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr

func (c *EventHubConnector) SetupNormalizedTables(
req *protos.SetupNormalizedTableBatchInput) (
*protos.SetupNormalizedTableBatchOutput, error) {
*protos.SetupNormalizedTableBatchOutput, error,
) {
c.logger.Info("normalization for event hub is a no-op")
return &protos.SetupNormalizedTableBatchOutput{
TableExistsMapping: nil,
Expand Down
6 changes: 4 additions & 2 deletions flow/connectors/eventhub/hubmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func NewEventHubManager(
}

func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedEventhub) (
*azeventhubs.ProducerClient, error) {
*azeventhubs.ProducerClient, error,
) {
ehConfig, ok := m.peerConfig.Get(name.PeerName)
if !ok {
return nil, fmt.Errorf("eventhub '%s' not configured", name)
Expand Down Expand Up @@ -118,7 +119,8 @@ func (m *EventHubManager) Close(ctx context.Context) error {
}

func (m *EventHubManager) CreateEventDataBatch(ctx context.Context, name ScopedEventhub) (
*azeventhubs.EventDataBatch, error) {
*azeventhubs.EventDataBatch, error,
) {
hub, err := m.GetOrCreateHubClient(ctx, name)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ type PostgresMetadataStore struct {
}

func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConfig,
schemaName string) (*PostgresMetadataStore, error) {
schemaName string,
) (*PostgresMetadataStore, error) {
var storePool *pgxpool.Pool
var poolErr error
if pgConfig == nil {
Expand Down Expand Up @@ -222,7 +223,6 @@ func (p *PostgresMetadataStore) IncrementID(jobName string) error {
UPDATE `+p.schemaName+`.`+lastSyncStateTableName+`
SET sync_batch_id=sync_batch_id+1 WHERE job_name=$1
`, jobName)

if err != nil {
p.logger.Error("failed to increment sync batch id", slog.Any("error", err))
return err
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,6 @@ func (p *PostgresCDCSource) consumeStream(
p.logger.Debug(fmt.Sprintf("XLogData => WALStart %s ServerWALEnd %s ServerTime %s\n",
xld.WALStart, xld.ServerWALEnd, xld.ServerTime))
rec, err := p.processMessage(records, xld)

if err != nil {
return fmt.Errorf("error processing message: %w", err)
}
Expand Down Expand Up @@ -812,7 +811,8 @@ func (p *PostgresCDCSource) processRelationMessage(
}

func (p *PostgresCDCSource) recToTablePKey(req *model.PullRecordsRequest,
rec model.Record) (*model.TableWithPkey, error) {
rec model.Record,
) (*model.TableWithPkey, error) {
tableName := rec.GetTableName()
pkeyColsMerged := make([]byte, 0)

Expand Down
Loading

0 comments on commit 860d0a5

Please sign in to comment.