diff --git a/esutil/bulk_indexer.go b/esutil/bulk_indexer.go index 64812726fa..806ad06e00 100644 --- a/esutil/bulk_indexer.go +++ b/esutil/bulk_indexer.go @@ -95,6 +95,7 @@ type BulkIndexerStats struct { NumDeleted uint64 NumRequests uint64 FlushedBytes uint64 + FlushedMs uint64 } // BulkIndexerItem represents an indexer item. @@ -276,6 +277,7 @@ type bulkIndexerStats struct { numDeleted uint64 numRequests uint64 flushedBytes uint64 + flushedMs uint64 } // NewBulkIndexer creates a new bulk indexer. @@ -365,6 +367,7 @@ func (bi *bulkIndexer) Stats() BulkIndexerStats { NumDeleted: atomic.LoadUint64(&bi.stats.numDeleted), NumRequests: atomic.LoadUint64(&bi.stats.numRequests), FlushedBytes: atomic.LoadUint64(&bi.stats.flushedBytes), + FlushedMs: atomic.LoadUint64(&bi.stats.flushedMs), } } @@ -568,6 +571,7 @@ func (w *worker) flushBuffer(ctx context.Context) error { } req.Header.Set(elasticsearch.HeaderClientMeta, "h=bp") + t := time.Now() res, err := req.Do(ctx, w.bi.config.Client) if err != nil { atomic.AddUint64(&w.bi.stats.numFailed, uint64(len(w.items))) @@ -576,6 +580,9 @@ func (w *worker) flushBuffer(ctx context.Context) error { } return fmt.Errorf("flush: %s", err) } + elapsed := time.Since(t) + atomic.AddUint64(&w.bi.stats.flushedMs, uint64(elapsed.Milliseconds())) + if res.Body != nil { defer res.Body.Close() }