Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace gofmt with gofumpt #835

Merged
merged 3 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/golang-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
pull-requests: write
strategy:
matrix:
runner: [ubicloud-standard-2-ubuntu-2204-arm]
runner: [ubicloud-standard-4-ubuntu-2204-arm]
runs-on: ${{ matrix.runner }}
steps:
- name: checkout
Expand Down
2 changes: 1 addition & 1 deletion flow/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ linters:
enable:
- dogsled
- dupl
- gofmt
- gofumpt
- gosec
- misspell
- nakedret
Expand Down
18 changes: 12 additions & 6 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ func (a *FlowableActivity) recordSlotSizePeriodically(

// StartFlow implements StartFlow.
func (a *FlowableActivity) StartFlow(ctx context.Context,
input *protos.StartFlowInput) (*model.SyncResponse, error) {
input *protos.StartFlowInput,
) (*model.SyncResponse, error) {
activity.RecordHeartbeat(ctx, "starting flow...")
conn := input.FlowConnectionConfigs
dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
Expand Down Expand Up @@ -658,7 +659,8 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}

func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config *protos.QRepConfig,
runUUID string) error {
runUUID string,
) error {
dstConn, err := connectors.GetQRepConsolidateConnector(ctx, config.DestinationPeer)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
return monitoring.UpdateEndTimeForQRepRun(ctx, a.CatalogPool, runUUID)
Expand Down Expand Up @@ -802,7 +804,8 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
}

func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
config *protos.QRepConfig, last *protos.QRepPartition) error {
config *protos.QRepConfig, last *protos.QRepPartition,
) error {
if config.SourcePeer.Type != protos.DBType_POSTGRES || last.Range == nil {
return nil
}
Expand Down Expand Up @@ -838,7 +841,8 @@ func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
}

func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.RenameTablesInput) (
*protos.RenameTablesOutput, error) {
*protos.RenameTablesOutput, error,
) {
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.Peer)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
Expand All @@ -862,7 +866,8 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena
}

func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *protos.CreateTablesFromExistingInput) (
*protos.CreateTablesFromExistingOutput, error) {
*protos.CreateTablesFromExistingOutput, error,
) {
dstConn, err := connectors.GetCDCSyncConnector(ctx, req.Peer)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
Expand Down Expand Up @@ -932,7 +937,8 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
Range: &protos.PartitionRange{
Range: &protos.PartitionRange_IntRange{
IntRange: &protos.IntPartitionRange{Start: 0, End: int64(numRecords)},
}},
},
},
}
}
updateErr := monitoring.InitializeQRepRun(ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics})
Expand Down
4 changes: 2 additions & 2 deletions flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func setupGRPCGatewayServer(args *APIServerParams) (*http.Server, error) {
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)

if err != nil {
return nil, fmt.Errorf("unable to dial grpc server: %w", err)
}
Expand All @@ -68,7 +67,8 @@ func killExistingHeartbeatFlows(
ctx context.Context,
tc client.Client,
namespace string,
taskQueue string) error {
taskQueue string,
) error {
listRes, err := tc.ListWorkflow(ctx,
&workflowservice.ListWorkflowExecutionsRequest{
Namespace: namespace,
Expand Down
13 changes: 8 additions & 5 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ func schemaForTableIdentifier(tableIdentifier string, peerDBType int32) string {
}

func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context,
req *protos.CreateCDCFlowRequest, workflowID string) error {
req *protos.CreateCDCFlowRequest, workflowID string,
) error {
sourcePeerID, sourePeerType, srcErr := h.getPeerID(ctx, req.ConnectionConfigs.Source.Name)
if srcErr != nil {
return fmt.Errorf("unable to get peer id for source peer %s: %w",
Expand Down Expand Up @@ -86,7 +87,8 @@ func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context,
}

func (h *FlowRequestHandler) createQrepJobEntry(ctx context.Context,
req *protos.CreateQRepFlowRequest, workflowID string) error {
req *protos.CreateQRepFlowRequest, workflowID string,
) error {
sourcePeerName := req.QrepConfig.SourcePeer.Name
sourcePeerID, _, srcErr := h.getPeerID(ctx, sourcePeerName)
if srcErr != nil {
Expand Down Expand Up @@ -117,7 +119,8 @@ func (h *FlowRequestHandler) createQrepJobEntry(ctx context.Context,
}

func (h *FlowRequestHandler) CreateCDCFlow(
ctx context.Context, req *protos.CreateCDCFlowRequest) (*protos.CreateCDCFlowResponse, error) {
ctx context.Context, req *protos.CreateCDCFlowRequest,
) (*protos.CreateCDCFlowResponse, error) {
cfg := req.ConnectionConfigs
workflowID := fmt.Sprintf("%s-peerflow-%s", cfg.FlowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
Expand Down Expand Up @@ -224,7 +227,8 @@ func (h *FlowRequestHandler) removeFlowEntryInCatalog(
}

func (h *FlowRequestHandler) CreateQRepFlow(
ctx context.Context, req *protos.CreateQRepFlowRequest) (*protos.CreateQRepFlowResponse, error) {
ctx context.Context, req *protos.CreateQRepFlowRequest,
) (*protos.CreateQRepFlowResponse, error) {
cfg := req.QrepConfig
workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
Expand Down Expand Up @@ -697,5 +701,4 @@ func (h *FlowRequestHandler) DropPeer(
return &protos.DropPeerResponse{
Ok: true,
}, nil

}
1 change: 0 additions & 1 deletion flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func setupPyroscope(opts *WorkerOptions) {
pyroscope.ProfileBlockDuration,
},
})

if err != nil {
log.Fatal(err)
}
Expand Down
17 changes: 11 additions & 6 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ func (c *BigQueryConnector) WaitForTableReady(tblName string) error {
// ReplayTableSchemaDeltas changes a destination table to match the schema at source
// This could involve adding or dropping multiple columns.
func (c *BigQueryConnector) ReplayTableSchemaDeltas(flowJobName string,
schemaDeltas []*protos.TableSchemaDelta) error {
schemaDeltas []*protos.TableSchemaDelta,
) error {
for _, schemaDelta := range schemaDeltas {
if schemaDelta == nil || len(schemaDelta.AddedColumns) == 0 {
continue
Expand Down Expand Up @@ -390,7 +391,8 @@ func (c *BigQueryConnector) GetLastNormalizeBatchID(jobName string) (int64, erro
}

func (c *BigQueryConnector) getDistinctTableNamesInBatch(flowJobName string, syncBatchID int64,
normalizeBatchID int64) ([]string, error) {
normalizeBatchID int64,
) ([]string, error) {
rawTableName := c.getRawTableName(flowJobName)

// Prepare the query to retrieve distinct tables in that batch
Expand Down Expand Up @@ -426,7 +428,8 @@ func (c *BigQueryConnector) getDistinctTableNamesInBatch(flowJobName string, syn
}

func (c *BigQueryConnector) getTableNametoUnchangedCols(flowJobName string, syncBatchID int64,
normalizeBatchID int64) (map[string][]string, error) {
normalizeBatchID int64,
) (map[string][]string, error) {
rawTableName := c.getRawTableName(flowJobName)

// Prepare the query to retrieve distinct tables in that batch
Expand Down Expand Up @@ -795,7 +798,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
mergeStmts := mergeGen.generateMergeStmts()
stmts = append(stmts, mergeStmts...)
}
//update metadata to make the last normalized batch id to the recent last sync batch id.
// update metadata to make the last normalized batch id to the recent last sync batch id.
updateMetadataStmt := fmt.Sprintf(
"UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name = '%s';",
c.datasetID, MirrorJobsTable, syncBatchID, req.FlowJobName)
Expand Down Expand Up @@ -894,7 +897,8 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr

// getUpdateMetadataStmt updates the metadata tables for a given job.
func (c *BigQueryConnector) getUpdateMetadataStmt(jobName string, lastSyncedCheckpointID int64,
batchID int64) (string, error) {
batchID int64,
) (string, error) {
hasJob, err := c.metadataHasJob(jobName)
if err != nil {
return "", fmt.Errorf("failed to check if job exists: %w", err)
Expand Down Expand Up @@ -1089,7 +1093,8 @@ func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos
}

func (c *BigQueryConnector) CreateTablesFromExisting(req *protos.CreateTablesFromExistingInput) (
*protos.CreateTablesFromExistingOutput, error) {
*protos.CreateTablesFromExistingOutput, error,
) {
for newTable, existingTable := range req.NewToExistingTableMapping {
c.logger.Info(fmt.Sprintf("creating table '%s' similar to '%s'", newTable, existingTable))

Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/merge_statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {

switch qvalue.QValueKind(colType) {
case qvalue.QValueKindJSON:
//if the type is JSON, then just extract JSON
// if the type is JSON, then just extract JSON
castStmt = fmt.Sprintf("CAST(JSON_VALUE(_peerdb_data, '$.%s') AS %s) AS `%s`",
colName, bqType, colName)
// expecting data in BASE64 format
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/bigquery/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ func (c *BigQueryConnector) SyncQRepRecords(
}

func (c *BigQueryConnector) replayTableSchemaDeltasQRep(config *protos.QRepConfig, partition *protos.QRepPartition,
srcSchema *model.QRecordSchema) (*bigquery.TableMetadata, error) {
srcSchema *model.QRecordSchema,
) (*bigquery.TableMetadata, error) {
destTable := config.DestinationTableIdentifier
bqTable := c.client.Dataset(c.datasetID).Table(destTable)
dstTableMetadata, err := bqTable.Metadata(c.ctx)
Expand Down
6 changes: 4 additions & 2 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ type QRepAvroSyncMethod struct {
}

func NewQRepAvroSyncMethod(connector *BigQueryConnector, gcsBucket string,
flowJobName string) *QRepAvroSyncMethod {
flowJobName string,
) *QRepAvroSyncMethod {
return &QRepAvroSyncMethod{
connector: connector,
gcsBucket: gcsBucket,
Expand Down Expand Up @@ -179,7 +180,8 @@ type AvroSchema struct {
}

func DefineAvroSchema(dstTableName string,
dstTableMetadata *bigquery.TableMetadata) (*model.QRecordAvroSchemaDefinition, error) {
dstTableMetadata *bigquery.TableMetadata,
) (*model.QRecordAvroSchemaDefinition, error) {
avroFields := []AvroField{}
nullableFields := make(map[string]struct{})

Expand Down
6 changes: 4 additions & 2 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ func GetCDCSyncConnector(ctx context.Context, config *protos.Peer) (CDCSyncConne
}

func GetCDCNormalizeConnector(ctx context.Context,
config *protos.Peer) (CDCNormalizeConnector, error) {
config *protos.Peer,
) (CDCNormalizeConnector, error) {
inner := config.Config
switch inner.(type) {
case *protos.Peer_PostgresConfig:
Expand Down Expand Up @@ -246,7 +247,8 @@ func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) {
}

func GetQRepConsolidateConnector(ctx context.Context,
config *protos.Peer) (QRepConsolidateConnector, error) {
config *protos.Peer,
) (QRepConsolidateConnector, error) {
inner := config.Config
switch inner.(type) {
case *protos.Peer_SnowflakeConfig:
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr

func (c *EventHubConnector) SetupNormalizedTables(
req *protos.SetupNormalizedTableBatchInput) (
*protos.SetupNormalizedTableBatchOutput, error) {
*protos.SetupNormalizedTableBatchOutput, error,
) {
c.logger.Info("normalization for event hub is a no-op")
return &protos.SetupNormalizedTableBatchOutput{
TableExistsMapping: nil,
Expand Down
6 changes: 4 additions & 2 deletions flow/connectors/eventhub/hubmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func NewEventHubManager(
}

func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedEventhub) (
*azeventhubs.ProducerClient, error) {
*azeventhubs.ProducerClient, error,
) {
ehConfig, ok := m.peerConfig.Get(name.PeerName)
if !ok {
return nil, fmt.Errorf("eventhub '%s' not configured", name)
Expand Down Expand Up @@ -118,7 +119,8 @@ func (m *EventHubManager) Close(ctx context.Context) error {
}

func (m *EventHubManager) CreateEventDataBatch(ctx context.Context, name ScopedEventhub) (
*azeventhubs.EventDataBatch, error) {
*azeventhubs.EventDataBatch, error,
) {
hub, err := m.GetOrCreateHubClient(ctx, name)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ type PostgresMetadataStore struct {
}

func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConfig,
schemaName string) (*PostgresMetadataStore, error) {
schemaName string,
) (*PostgresMetadataStore, error) {
var storePool *pgxpool.Pool
var poolErr error
if pgConfig == nil {
Expand Down Expand Up @@ -222,7 +223,6 @@ func (p *PostgresMetadataStore) IncrementID(jobName string) error {
UPDATE `+p.schemaName+`.`+lastSyncStateTableName+`
SET sync_batch_id=sync_batch_id+1 WHERE job_name=$1
`, jobName)

if err != nil {
p.logger.Error("failed to increment sync batch id", slog.Any("error", err))
return err
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,6 @@ func (p *PostgresCDCSource) consumeStream(
p.logger.Debug(fmt.Sprintf("XLogData => WALStart %s ServerWALEnd %s ServerTime %s\n",
xld.WALStart, xld.ServerWALEnd, xld.ServerTime))
rec, err := p.processMessage(records, xld)

if err != nil {
return fmt.Errorf("error processing message: %w", err)
}
Expand Down Expand Up @@ -812,7 +811,8 @@ func (p *PostgresCDCSource) processRelationMessage(
}

func (p *PostgresCDCSource) recToTablePKey(req *model.PullRecordsRequest,
rec model.Record) (*model.TableWithPkey, error) {
rec model.Record,
) (*model.TableWithPkey, error) {
tableName := rec.GetTableName()
pkeyColsMerged := make([]byte, 0)

Expand Down
Loading
Loading