diff --git a/flow/connectors/connelasticsearch/elasticsearch.go b/flow/connectors/connelasticsearch/elasticsearch.go index cd68f49879..01fa27d7e7 100644 --- a/flow/connectors/connelasticsearch/elasticsearch.go +++ b/flow/connectors/connelasticsearch/elasticsearch.go @@ -144,7 +144,7 @@ func (esc *ElasticsearchConnector) SyncQRepRecords(ctx context.Context, config * qRecordJsonMap[field.Name] = json.RawMessage(shared. UnsafeFastStringToReadOnlyBytes(r.Val)) default: - qRecordJsonMap[field.Name] = qRecord[i].Value() + qRecordJsonMap[field.Name] = r.Value() } } qRecordJsonBytes, err := json.Marshal(qRecordJsonMap) @@ -159,7 +159,7 @@ func (esc *ElasticsearchConnector) SyncQRepRecords(ctx context.Context, config * Body: bytes.NewReader(qRecordJsonBytes), // OnFailure is called for each failed operation, log and let parent handle - OnFailure: func(ctx context.Context, _ esutil.BulkIndexerItem, + OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error, ) { bulkIndexMutex.Lock() @@ -167,8 +167,13 @@ func (esc *ElasticsearchConnector) SyncQRepRecords(ctx context.Context, config * if err != nil { bulkIndexErrors = append(bulkIndexErrors, err) } else { + causeString := "" + if res.Error.Cause.Type != "" || res.Error.Cause.Reason != "" { + causeString = fmt.Sprintf("(caused by type:%s reason:%s)", res.Error.Cause.Type, res.Error.Cause.Reason) + } bulkIndexErrors = append(bulkIndexErrors, - fmt.Errorf("%s %s %v", res.Error.Type, res.Error.Reason, res.Error.Cause)) + fmt.Errorf("id:%s type:%s reason:%s %s", item.DocumentID, res.Error.Type, + res.Error.Reason, causeString)) } }, }) @@ -191,8 +196,9 @@ func (esc *ElasticsearchConnector) SyncQRepRecords(ctx context.Context, config * } bulkIndexerHasShutdown = true if len(bulkIndexErrors) > 0 { - esc.logger.Error("[es] failed to bulk index records", slog.Any("errors", bulkIndexErrors)) - return 0, fmt.Errorf("[es] failed to bulk index records: %v", bulkIndexErrors) + for _, err := range bulkIndexErrors { + esc.logger.Error("[es] failed to index record", slog.Any("err", err)) + } } err = esc.FinishQRepPartition(ctx, partition, config.FlowJobName, startTime) diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index 75d78188e5..8ec936d2ea 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -123,10 +123,7 @@ func (c *PostgresConnector) getNumRowsPartitions( } // Calculate the number of partitions - numPartitions := totalRows.Int64 / numRowsPerPartition - if totalRows.Int64%numRowsPerPartition != 0 { - numPartitions++ - } + numPartitions := shared.DivCeil(totalRows.Int64, numRowsPerPartition) c.logger.Info(fmt.Sprintf("total rows: %d, num partitions: %d, num rows per partition: %d", totalRows.Int64, numPartitions, numRowsPerPartition))