Skip to content

Commit

Permalink
refinements and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 11, 2023
1 parent 27b6351 commit ebfbdc0
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 7 deletions.
2 changes: 1 addition & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
PushParallelism: input.FlowConnectionConfigs.PushParallelism,
})
if err != nil {
slog.Warn(fmt.Sprintf("failed to push records %v", err))
slog.Warn("failed to push records", slog.Any("error", err))
return nil, fmt.Errorf("failed to push records: %w", err)
}

Expand Down
10 changes: 5 additions & 5 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,12 +305,12 @@ func (c *BigQueryConnector) GetLastOffset(jobName string) (*protos.LastSyncState
var row []bigquery.Value
err = it.Next(&row)
if err != nil {
c.logger.Info(fmt.Sprintf("no row found for job %s, returning nil", jobName))
c.logger.Info("no row found, returning nil")
return nil, nil
}

if row[0] == nil {
c.logger.Info(fmt.Sprintf("no offset found for job %s, returning nil", jobName))
c.logger.Info("no offset found, returning nil")
return nil, nil
} else {
return &protos.LastSyncState{
Expand All @@ -332,12 +332,12 @@ func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) {
var row []bigquery.Value
err = it.Next(&row)
if err != nil {
c.logger.Info(fmt.Sprintf("no row found for job %s", jobName))
c.logger.Info("no row found")
return 0, nil
}

if row[0] == nil {
c.logger.Info(fmt.Sprintf("no sync_batch_id found for job %s, returning 0", jobName))
c.logger.Info("no sync_batch_id found, returning 0")
return 0, nil
} else {
return row[0].(int64), nil
Expand Down Expand Up @@ -733,7 +733,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
// if job is not yet found in the peerdb_mirror_jobs_table
// OR sync is lagging end normalize
if !hasJob || normalizeBatchID == syncBatchID {
c.logger.Info(fmt.Sprintf("waiting for sync to catch up for job %s, so finishing", req.FlowJobName))
c.logger.Info("waiting for sync to catch up, so finishing")
return &model.NormalizeResponse{
Done: false,
StartBatchID: normalizeBatchID,
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (c *PostgresConnector) GetLastOffset(jobName string) (*protos.LastSyncState
return nil, fmt.Errorf("error while reading result row: %w", err)
}
if result.Int64 == 0 {
c.logger.Warn("Assuming zero offset means no sync has happened for job %s, returning nil", jobName)
c.logger.Warn("Assuming zero offset means no sync has happened, returning nil")
return nil, nil
}

Expand Down

0 comments on commit ebfbdc0

Please sign in to comment.