diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index c4a286098c..a11d5f48b0 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -574,7 +574,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, tmp, err := pgConn.PullQRepRecordStream(config, partition, stream) numRecords := int64(tmp) if err != nil { - slog.Error(fmt.Sprintf("failed to pull records: %v", err)) + slog.Error("failed to pull records", slog.Any("error", err)) goroutineErr = err } else { err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(ctx, runUUID, partition, numRecords) diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index 06feddfa47..48e86df3c3 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -53,7 +53,7 @@ func (a *SnapshotActivity) SetupReplication( pgConn := conn.(*connpostgres.PostgresConnector) err = pgConn.SetupReplication(slotSignal, config) if err != nil { - slog.ErrorContext(ctx, fmt.Sprintf("failed to setup replication: %v", err)) + slog.ErrorContext(ctx, "failed to setup replication", slog.Any("error", err)) replicationErr <- err return } diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 5d10eca74f..0fafd328ef 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -40,7 +40,7 @@ func (h *FlowRequestHandler) getPeerID(ctx context.Context, peerName string) (in var peerType int32 err := h.pool.QueryRow(ctx, "SELECT id,type FROM peers WHERE name = $1", peerName).Scan(&id, &peerType) if err != nil { - slog.Error(fmt.Sprintf("unable to query peer id for peer %s: %s", peerName, err.Error())) + slog.Error("unable to query peer id for peer "+peerName, slog.Any("error", err)) return -1, -1, fmt.Errorf("unable to query peer id for peer %s: %s", peerName, err) } return id, peerType, nil @@ -164,7 +164,7 @@ func (h *FlowRequestHandler) CreateCDCFlow( if req.CreateCatalogEntry { err := h.createCdcJobEntry(ctx, req, workflowID) if err != nil { - slog.Error(fmt.Sprintf("unable to create flow job entry: %v", err)) + slog.Error("unable to create flow job entry", slog.Any("error", err)) return nil, fmt.Errorf("unable to create flow job entry: %w", err) } } @@ -172,7 +172,7 @@ func (h *FlowRequestHandler) CreateCDCFlow( var err error err = h.updateFlowConfigInCatalog(cfg) if err != nil { - slog.Error(fmt.Sprintf("unable to update flow config in catalog: %v", err)) + slog.Error("unable to update flow config in catalog", slog.Any("error", err)) return nil, fmt.Errorf("unable to update flow config in catalog: %w", err) } @@ -186,7 +186,7 @@ func (h *FlowRequestHandler) CreateCDCFlow( state, // workflow state ) if err != nil { - slog.Error(fmt.Sprintf("unable to start PeerFlow workflow: %v", err)) + slog.Error("unable to start PeerFlow workflow", slog.Any("error", err)) return nil, fmt.Errorf("unable to start PeerFlow workflow: %w", err) } diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index db44a7dc1b..9b133c85ea 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -87,7 +87,7 @@ func (s *QRepAvroSyncMethod) SyncRecords( // drop the staging table if err := bqClient.Dataset(datasetID).Table(stagingTable).Delete(s.connector.ctx); err != nil { // just log the error this isn't fatal. - slog.Error(fmt.Sprintf("failed to delete staging table %s", stagingTable), + slog.Error("failed to delete staging table "+stagingTable, slog.Any("error", err), slog.String("syncBatchID", fmt.Sprint(syncBatchID)), slog.String("destinationTable", dstTableName)) @@ -156,7 +156,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( // drop the staging table if err := bqClient.Dataset(datasetID).Table(stagingTable).Delete(s.connector.ctx); err != nil { // just log the error this isn't fatal. - slog.Error(fmt.Sprintf("failed to delete staging table %s", stagingTable), + slog.Error("failed to delete staging table "+stagingTable, slog.Any("error", err), flowLog) } diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 0a57a71ab7..b2563d5638 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -32,7 +32,8 @@ func NewEventHubConnector( ) (*EventHubConnector, error) { defaultAzureCreds, err := azidentity.NewDefaultAzureCredential(nil) if err != nil { - slog.ErrorContext(ctx, fmt.Sprintf("failed to get default azure credentials: %v", err)) + slog.ErrorContext(ctx, "failed to get default azure credentials", + slog.Any("error", err)) return nil, err } @@ -41,7 +42,8 @@ func NewEventHubConnector( pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, config.GetMetadataDb(), metadataSchemaName) if err != nil { - slog.ErrorContext(ctx, fmt.Sprintf("failed to create postgres metadata store: %v", err)) + slog.ErrorContext(ctx, "failed to create postgres metadata store", + slog.Any("error", err)) return nil, err } diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage.go b/flow/connectors/utils/cdc_records/cdc_records_storage.go index 025618fa6e..8f353bbff2 100644 --- a/flow/connectors/utils/cdc_records/cdc_records_storage.go +++ b/flow/connectors/utils/cdc_records/cdc_records_storage.go @@ -84,7 +84,8 @@ func (c *cdcRecordsStore) Set(key model.TableWithPkey, rec model.Record) error { c.inMemoryRecords[key] = rec } else { if c.pebbleDB == nil { - slog.Info("more than %d primary keys read, spilling to disk", c.numRecordsSwitchThreshold, + slog.Info(fmt.Sprintf("more than %d primary keys read, spilling to disk", + c.numRecordsSwitchThreshold), slog.String("flowName", c.flowJobName)) err := c.initPebbleDB() if err != nil {