Skip to content

Commit

Permalink
refinements pt.2
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 8, 2023
1 parent bf7cc1b commit 38a15eb
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 11 deletions.
2 changes: 1 addition & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
tmp, err := pgConn.PullQRepRecordStream(config, partition, stream)
numRecords := int64(tmp)
if err != nil {
slog.Error(fmt.Sprintf("failed to pull records: %v", err))
slog.Error("failed to pull records", slog.Any("error", err))
goroutineErr = err
} else {
err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(ctx, runUUID, partition, numRecords)
Expand Down
2 changes: 1 addition & 1 deletion flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (a *SnapshotActivity) SetupReplication(
pgConn := conn.(*connpostgres.PostgresConnector)
err = pgConn.SetupReplication(slotSignal, config)
if err != nil {
slog.ErrorContext(ctx, fmt.Sprintf("failed to setup replication: %v", err))
slog.ErrorContext(ctx, "failed to setup replication", slog.Any("error", err))
replicationErr <- err
return
}
Expand Down
8 changes: 4 additions & 4 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (h *FlowRequestHandler) getPeerID(ctx context.Context, peerName string) (in
var peerType int32
err := h.pool.QueryRow(ctx, "SELECT id,type FROM peers WHERE name = $1", peerName).Scan(&id, &peerType)
if err != nil {
slog.Error(fmt.Sprintf("unable to query peer id for peer %s: %s", peerName, err.Error()))
slog.Error("unable to query peer id for peer "+peerName, slog.Any("error", err))
return -1, -1, fmt.Errorf("unable to query peer id for peer %s: %s", peerName, err)
}
return id, peerType, nil
Expand Down Expand Up @@ -164,15 +164,15 @@ func (h *FlowRequestHandler) CreateCDCFlow(
if req.CreateCatalogEntry {
err := h.createCdcJobEntry(ctx, req, workflowID)
if err != nil {
slog.Error(fmt.Sprintf("unable to create flow job entry: %v", err))
slog.Error("unable to create flow job entry", slog.Any("error", err))
return nil, fmt.Errorf("unable to create flow job entry: %w", err)
}
}

var err error
err = h.updateFlowConfigInCatalog(cfg)
if err != nil {
slog.Error(fmt.Sprintf("unable to update flow config in catalog: %v", err))
slog.Error("unable to update flow config in catalog", slog.Any("error", err))
return nil, fmt.Errorf("unable to update flow config in catalog: %w", err)
}

Expand All @@ -186,7 +186,7 @@ func (h *FlowRequestHandler) CreateCDCFlow(
state, // workflow state
)
if err != nil {
slog.Error(fmt.Sprintf("unable to start PeerFlow workflow: %v", err))
slog.Error("unable to start PeerFlow workflow", slog.Any("error", err))
return nil, fmt.Errorf("unable to start PeerFlow workflow: %w", err)
}

Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
// drop the staging table
if err := bqClient.Dataset(datasetID).Table(stagingTable).Delete(s.connector.ctx); err != nil {
// just log the error this isn't fatal.
slog.Error(fmt.Sprintf("failed to delete staging table %s", stagingTable),
slog.Error("failed to delete staging table "+stagingTable,
slog.Any("error", err),
slog.String("syncBatchID", fmt.Sprint(syncBatchID)),
slog.String("destinationTable", dstTableName))
Expand Down Expand Up @@ -156,7 +156,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
// drop the staging table
if err := bqClient.Dataset(datasetID).Table(stagingTable).Delete(s.connector.ctx); err != nil {
// just log the error this isn't fatal.
slog.Error(fmt.Sprintf("failed to delete staging table %s", stagingTable),
slog.Error("failed to delete staging table "+stagingTable,
slog.Any("error", err),
flowLog)
}
Expand Down
6 changes: 4 additions & 2 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ func NewEventHubConnector(
) (*EventHubConnector, error) {
defaultAzureCreds, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
slog.ErrorContext(ctx, fmt.Sprintf("failed to get default azure credentials: %v", err))
slog.ErrorContext(ctx, "failed to get default azure credentials",
slog.Any("error", err))
return nil, err
}

Expand All @@ -41,7 +42,8 @@ func NewEventHubConnector(
pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, config.GetMetadataDb(),
metadataSchemaName)
if err != nil {
slog.ErrorContext(ctx, fmt.Sprintf("failed to create postgres metadata store: %v", err))
slog.ErrorContext(ctx, "failed to create postgres metadata store",
slog.Any("error", err))
return nil, err
}

Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/utils/cdc_records/cdc_records_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ func (c *cdcRecordsStore) Set(key model.TableWithPkey, rec model.Record) error {
c.inMemoryRecords[key] = rec
} else {
if c.pebbleDB == nil {
slog.Info("more than %d primary keys read, spilling to disk", c.numRecordsSwitchThreshold,
slog.Info(fmt.Sprintf("more than %d primary keys read, spilling to disk",
c.numRecordsSwitchThreshold),
slog.String("flowName", c.flowJobName))
err := c.initPebbleDB()
if err != nil {
Expand Down

0 comments on commit 38a15eb

Please sign in to comment.