Skip to content

Commit

Permalink
explicit logging id
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Apr 22, 2024
1 parent ac99c5b commit 2388f10
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 7 deletions.
10 changes: 7 additions & 3 deletions flow/connectors/connelasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,17 +159,21 @@ func (esc *ElasticsearchConnector) SyncQRepRecords(ctx context.Context, config *
Body: bytes.NewReader(qRecordJsonBytes),

// OnFailure is called for each failed operation, log and let parent handle
OnFailure: func(ctx context.Context, _ esutil.BulkIndexerItem,
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem,
res esutil.BulkIndexerResponseItem, err error,
) {
bulkIndexMutex.Lock()
defer bulkIndexMutex.Unlock()
if err != nil {
bulkIndexErrors = append(bulkIndexErrors, err)
} else {
causeString := ""
if res.Error.Cause.Type != "" || res.Error.Cause.Reason != "" {
causeString = fmt.Sprintf("(caused by type:%s reason:%s)", res.Error.Cause.Type, res.Error.Cause.Reason)
}
bulkIndexErrors = append(bulkIndexErrors,
fmt.Errorf("type:%s reason:%s caused by:(%v)", res.Error.Type,
res.Error.Reason, res.Error.Cause))
fmt.Errorf("id:%s type:%s reason:%s %s", item.DocumentID, res.Error.Type,
res.Error.Reason, causeString))
}
},
})
Expand Down
5 changes: 1 addition & 4 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,7 @@ func (c *PostgresConnector) getNumRowsPartitions(
}

// Calculate the number of partitions
numPartitions := totalRows.Int64 / numRowsPerPartition
if totalRows.Int64%numRowsPerPartition != 0 {
numPartitions++
}
numPartitions := shared.DivCeil(totalRows.Int64, numRowsPerPartition)
c.logger.Info(fmt.Sprintf("total rows: %d, num partitions: %d, num rows per partition: %d",
totalRows.Int64, numPartitions, numRowsPerPartition))

Expand Down

0 comments on commit 2388f10

Please sign in to comment.