Skip to content

Commit

Permalink
mainlining forage changes (#1635)
Browse files (browse the repository at this point in the history)
better logs mostly

DO NOT DELETE THIS BRANCH AFTER MERGING
  • Loading branch information
heavycrystal authored Apr 22, 2024
1 parent 42e838c commit e3fbc6e
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 9 deletions.
16 changes: 11 additions & 5 deletions flow/connectors/connelasticsearch/elasticsearch.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -144,7 +144,7 @@ func (esc *ElasticsearchConnector) SyncQRepRecords(ctx context.Context, config *
qRecordJsonMap[field.Name] = json.RawMessage(shared.
UnsafeFastStringToReadOnlyBytes(r.Val))
default:
qRecordJsonMap[field.Name] = qRecord[i].Value()
qRecordJsonMap[field.Name] = r.Value()
}
}
qRecordJsonBytes, err := json.Marshal(qRecordJsonMap)
Expand All @@ -159,16 +159,21 @@ func (esc *ElasticsearchConnector) SyncQRepRecords(ctx context.Context, config *
Body: bytes.NewReader(qRecordJsonBytes),

// OnFailure is called for each failed operation, log and let parent handle
OnFailure: func(ctx context.Context, _ esutil.BulkIndexerItem,
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem,
res esutil.BulkIndexerResponseItem, err error,
) {
bulkIndexMutex.Lock()
defer bulkIndexMutex.Unlock()
if err != nil {
bulkIndexErrors = append(bulkIndexErrors, err)
} else {
causeString := ""
if res.Error.Cause.Type != "" || res.Error.Cause.Reason != "" {
causeString = fmt.Sprintf("(caused by type:%s reason:%s)", res.Error.Cause.Type, res.Error.Cause.Reason)
}
bulkIndexErrors = append(bulkIndexErrors,
fmt.Errorf("%s %s %v", res.Error.Type, res.Error.Reason, res.Error.Cause))
fmt.Errorf("id:%s type:%s reason:%s %s", item.DocumentID, res.Error.Type,
res.Error.Reason, causeString))
}
},
})
Expand All @@ -191,8 +196,9 @@ func (esc *ElasticsearchConnector) SyncQRepRecords(ctx context.Context, config *
}
bulkIndexerHasShutdown = true
if len(bulkIndexErrors) > 0 {
esc.logger.Error("[es] failed to bulk index records", slog.Any("errors", bulkIndexErrors))
return 0, fmt.Errorf("[es] failed to bulk index records: %v", bulkIndexErrors)
for _, err := range bulkIndexErrors {
esc.logger.Error("[es] failed to index record", slog.Any("err", err))
}
}

err = esc.FinishQRepPartition(ctx, partition, config.FlowJobName, startTime)
Expand Down
5 changes: 1 addition & 4 deletions flow/connectors/postgres/qrep.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -123,10 +123,7 @@ func (c *PostgresConnector) getNumRowsPartitions(
}

// Calculate the number of partitions
numPartitions := totalRows.Int64 / numRowsPerPartition
if totalRows.Int64%numRowsPerPartition != 0 {
numPartitions++
}
numPartitions := shared.DivCeil(totalRows.Int64, numRowsPerPartition)
c.logger.Info(fmt.Sprintf("total rows: %d, num partitions: %d, num rows per partition: %d",
totalRows.Int64, numPartitions, numRowsPerPartition))

Expand Down

0 comments on commit e3fbc6e

Please sign in to comment.