Skip to content

Commit

Permalink
elasticsearch - fix issue with bulk indexer closing (#1705)
Browse files Browse the repository at this point in the history
Co-authored-by: Philip Dubé <[email protected]>
  • Loading branch information
heavycrystal and serprex authored May 8, 2024
1 parent d9bdb10 commit a2e9ab8
Showing 1 changed file with 4 additions and 10 deletions.
14 changes: 4 additions & 10 deletions flow/connectors/connelasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,25 +121,23 @@ func (esc *ElasticsearchConnector) SyncRecords(ctx context.Context,
req *model.SyncRecordsRequest[model.RecordItems],
) (*model.SyncResponse, error) {
tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)
// atomics for counts will be unnecessary in other destinations, using a mutex instead
var recordCountsUpdateMutex sync.Mutex
// we're taking a mutex anyway, avoid atomic
var lastSeenLSN atomic.Int64
var numRecords atomic.Int64
var numRecords int64

// no I don't like this either
esBulkIndexerCache := make(map[string]esutil.BulkIndexer)
bulkIndexersHaveShutdown := false
// true if we saw errors while closing
cacheCloser := func() bool {
closeHasErrors := false
if bulkIndexersHaveShutdown {
if !bulkIndexersHaveShutdown {
for _, esBulkIndexer := range maps.Values(esBulkIndexerCache) {
err := esBulkIndexer.Close(context.Background())
if err != nil {
esc.logger.Error("[es] failed to close bulk indexer", slog.Any("error", err))
closeHasErrors = true
}
numRecords += int64(esBulkIndexer.Stats().NumFlushed)
}
bulkIndexersHaveShutdown = true
}
Expand Down Expand Up @@ -237,9 +235,6 @@ func (esc *ElasticsearchConnector) SyncRecords(ctx context.Context,

OnSuccess: func(_ context.Context, _ esutil.BulkIndexerItem, _ esutil.BulkIndexerResponseItem) {
shared.AtomicInt64Max(&lastSeenLSN, record.GetCheckpointID())
numRecords.Add(1)
recordCountsUpdateMutex.Lock()
defer recordCountsUpdateMutex.Unlock()
record.PopulateCountMap(tableNameRowsMapping)
},
// OnFailure is called for each failed operation, log and let parent handle
Expand Down Expand Up @@ -284,7 +279,6 @@ func (esc *ElasticsearchConnector) SyncRecords(ctx context.Context,
esc.logger.Error("[es] failed to close bulk indexer(s)")
return nil, errors.New("[es] failed to close bulk indexer(s)")
}
bulkIndexersHaveShutdown = true
if len(bulkIndexErrors) > 0 {
for _, err := range bulkIndexErrors {
esc.logger.Error("[es] failed to index record", slog.Any("err", err))
Expand All @@ -299,7 +293,7 @@ func (esc *ElasticsearchConnector) SyncRecords(ctx context.Context,
return &model.SyncResponse{
CurrentSyncBatchID: req.SyncBatchID,
LastSyncedCheckpointID: lastCheckpoint,
NumRecordsSynced: numRecords.Load(),
NumRecordsSynced: numRecords,
TableNameRowsMapping: tableNameRowsMapping,
TableSchemaDeltas: req.Records.SchemaDeltas,
}, nil
Expand Down

0 comments on commit a2e9ab8

Please sign in to comment.