diff --git a/flow/connectors/connelasticsearch/elasticsearch.go b/flow/connectors/connelasticsearch/elasticsearch.go index db324ea0f5..01fa27d7e7 100644 --- a/flow/connectors/connelasticsearch/elasticsearch.go +++ b/flow/connectors/connelasticsearch/elasticsearch.go @@ -159,7 +159,7 @@ func (esc *ElasticsearchConnector) SyncQRepRecords(ctx context.Context, config * Body: bytes.NewReader(qRecordJsonBytes), // OnFailure is called for each failed operation, log and let parent handle - OnFailure: func(ctx context.Context, _ esutil.BulkIndexerItem, + OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error, ) { bulkIndexMutex.Lock() @@ -167,9 +167,13 @@ func (esc *ElasticsearchConnector) SyncQRepRecords(ctx context.Context, config * if err != nil { bulkIndexErrors = append(bulkIndexErrors, err) } else { + causeString := "" + if res.Error.Cause.Type != "" || res.Error.Cause.Reason != "" { + causeString = fmt.Sprintf("(caused by type:%s reason:%s)", res.Error.Cause.Type, res.Error.Cause.Reason) + } bulkIndexErrors = append(bulkIndexErrors, - fmt.Errorf("type:%s reason:%s caused by:(%v)", res.Error.Type, - res.Error.Reason, res.Error.Cause)) + fmt.Errorf("id:%s type:%s reason:%s %s", item.DocumentID, res.Error.Type, + res.Error.Reason, causeString)) } }, }) diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index 75d78188e5..8ec936d2ea 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -123,10 +123,7 @@ func (c *PostgresConnector) getNumRowsPartitions( } // Calculate the number of partitions - numPartitions := totalRows.Int64 / numRowsPerPartition - if totalRows.Int64%numRowsPerPartition != 0 { - numPartitions++ - } + numPartitions := shared.DivCeil(totalRows.Int64, numRowsPerPartition) c.logger.Info(fmt.Sprintf("total rows: %d, num partitions: %d, num rows per partition: %d", totalRows.Int64, numPartitions, numRowsPerPartition))