Skip to content

Commit

Permalink
Merge branch 'main' into ssh-sql
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj authored May 9, 2024
2 parents 5290d66 + a2e9ab8 commit f07ce13
Showing 1 changed file with 4 additions and 10 deletions.
14 changes: 4 additions & 10 deletions flow/connectors/connelasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,25 +121,23 @@ func (esc *ElasticsearchConnector) SyncRecords(ctx context.Context,
req *model.SyncRecordsRequest[model.RecordItems],
) (*model.SyncResponse, error) {
tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)
// atomics for counts will be unnecessary in other destinations, using a mutex instead
var recordCountsUpdateMutex sync.Mutex
// we're taking a mutex anyway, avoid atomic
var lastSeenLSN atomic.Int64
var numRecords atomic.Int64
var numRecords int64

// no I don't like this either
esBulkIndexerCache := make(map[string]esutil.BulkIndexer)
bulkIndexersHaveShutdown := false
// true if we saw errors while closing
cacheCloser := func() bool {
closeHasErrors := false
if bulkIndexersHaveShutdown {
if !bulkIndexersHaveShutdown {
for _, esBulkIndexer := range maps.Values(esBulkIndexerCache) {
err := esBulkIndexer.Close(context.Background())
if err != nil {
esc.logger.Error("[es] failed to close bulk indexer", slog.Any("error", err))
closeHasErrors = true
}
numRecords += int64(esBulkIndexer.Stats().NumFlushed)
}
bulkIndexersHaveShutdown = true
}
Expand Down Expand Up @@ -237,9 +235,6 @@ func (esc *ElasticsearchConnector) SyncRecords(ctx context.Context,

OnSuccess: func(_ context.Context, _ esutil.BulkIndexerItem, _ esutil.BulkIndexerResponseItem) {
shared.AtomicInt64Max(&lastSeenLSN, record.GetCheckpointID())
numRecords.Add(1)
recordCountsUpdateMutex.Lock()
defer recordCountsUpdateMutex.Unlock()
record.PopulateCountMap(tableNameRowsMapping)
},
// OnFailure is called for each failed operation, log and let parent handle
Expand Down Expand Up @@ -284,7 +279,6 @@ func (esc *ElasticsearchConnector) SyncRecords(ctx context.Context,
esc.logger.Error("[es] failed to close bulk indexer(s)")
return nil, errors.New("[es] failed to close bulk indexer(s)")
}
bulkIndexersHaveShutdown = true
if len(bulkIndexErrors) > 0 {
for _, err := range bulkIndexErrors {
esc.logger.Error("[es] failed to index record", slog.Any("err", err))
Expand All @@ -299,7 +293,7 @@ func (esc *ElasticsearchConnector) SyncRecords(ctx context.Context,
return &model.SyncResponse{
CurrentSyncBatchID: req.SyncBatchID,
LastSyncedCheckpointID: lastCheckpoint,
NumRecordsSynced: numRecords.Load(),
NumRecordsSynced: numRecords,
TableNameRowsMapping: tableNameRowsMapping,
TableSchemaDeltas: req.Records.SchemaDeltas,
}, nil
Expand Down

0 comments on commit f07ce13

Please sign in to comment.