Handling redundant metrics for indexing
Vishnu Challa committed Nov 1, 2023
1 parent dc2da49 commit b0dead6
Showing 5 changed files with 62 additions and 2 deletions.
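This change deduplicates documents before they reach the bulk indexer: each document's JSON encoding is hashed with SHA-256, the hex digest becomes the document ID, and any document whose digest has already been seen is counted and skipped rather than re-indexed. Below is a minimal, self-contained sketch of that content-hash dedup pattern; the dedupeByHash helper and the sample metrics are illustrative only and are not code from this repository (the commit itself performs the same check inline inside Index, as shown in the diffs).

package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"log"
)

// dedupeByHash keeps the first occurrence of each document, keyed by the
// SHA-256 digest of its JSON encoding, and reports how many copies it skipped.
func dedupeByHash(documents []interface{}) ([]interface{}, int, error) {
	seen := make(map[string]bool)
	unique := make([]interface{}, 0, len(documents))
	skipped := 0
	for _, doc := range documents {
		j, err := json.Marshal(doc)
		if err != nil {
			return nil, 0, fmt.Errorf("cannot encode document %v: %w", doc, err)
		}
		sum := sha256.Sum256(j)
		id := hex.EncodeToString(sum[:])
		if seen[id] {
			skipped++ // redundant copy: same digest as an earlier document
			continue
		}
		seen[id] = true
		unique = append(unique, doc)
	}
	return unique, skipped, nil
}

func main() {
	docs := []interface{}{
		map[string]string{"metric": "podLatency", "value": "10ms"},
		map[string]string{"metric": "podLatency", "value": "10ms"}, // exact duplicate
		map[string]string{"metric": "throughput", "value": "5k"},
	}
	unique, skipped, err := dedupeByHash(docs)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("kept=%d redundantskipped=%d\n", len(unique), skipped) // kept=2 redundantskipped=1
}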
17 changes: 16 additions & 1 deletion indexers/elastic.go
@@ -85,6 +85,10 @@ func (esIndexer *Elastic) Index(documents []interface{}, opts IndexingOpts) (str
var statString string
var indexerStatsLock sync.Mutex
indexerStats := make(map[string]int)

if len(documents) <= 0 {
return fmt.Sprintf("Indexing skipped due to %v docs", len(documents)), nil
}
hasher := sha256.New()
bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: ESClient,
@@ -97,18 +101,25 @@ func (esIndexer *Elastic) Index(documents []interface{}, opts IndexingOpts) (str
return "", fmt.Errorf("Error creating the indexer: %s", err)
}
start := time.Now().UTC()
docHash := make(map[string]bool)
redundantSkipped := 0
for _, document := range documents {
j, err := json.Marshal(document)
if err != nil {
return "", fmt.Errorf("Cannot encode document %s: %s", document, err)
}
hasher.Write(j)
docId := hex.EncodeToString(hasher.Sum(nil))
if _, exists := docHash[docId]; exists {
redundantSkipped += 1
continue
}
err = bi.Add(
context.Background(),
esutil.BulkIndexerItem{
Action: "index",
Body: bytes.NewReader(j),
DocumentID: hex.EncodeToString(hasher.Sum(nil)),
DocumentID: docId,
OnSuccess: func(c context.Context, bii esutil.BulkIndexerItem, biri esutil.BulkIndexerResponseItem) {
indexerStatsLock.Lock()
defer indexerStatsLock.Unlock()
@@ -119,6 +130,7 @@ func (esIndexer *Elastic) Index(documents []interface{}, opts IndexingOpts) (str
if err != nil {
return "", fmt.Errorf("Unexpected ES indexing error: %s", err)
}
docHash[docId] = true
hasher.Reset()
}
if err := bi.Close(context.Background()); err != nil {
@@ -128,5 +140,8 @@ func (esIndexer *Elastic) Index(documents []interface{}, opts IndexingOpts) (str
for stat, val := range indexerStats {
statString += fmt.Sprintf(" %s=%d", stat, val)
}
if redundantSkipped > 0 {
statString += fmt.Sprintf(" redundantskipped=%d", redundantSkipped)
}
return fmt.Sprintf("Indexing finished in %v:%v", dur.Truncate(time.Millisecond), statString), nil
}
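With this change, the summary string returned by Index also carries a redundantskipped count whenever duplicates were dropped. As a purely hypothetical illustration (the other stat keys depend on the bulk responses handled in the elided OnSuccess/OnFailure callbacks and are not shown in this diff), a run over ten documents containing two exact duplicates might return a string along the lines of:

Indexing finished in 1.254s: successful=8 redundantskipped=2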
12 changes: 12 additions & 0 deletions indexers/elastic_test.go
@@ -105,6 +105,18 @@ var _ = Describe("Tests for elastic.go", func() {
Expect(err).To(BeNil())
})

It("Test empty list of docs", func() {
_, err := indexer.Index([]interface{}{}, testcase.opts)
Expect(err).To(BeNil())
})

It("Redundant list of docs", func() {
lastDoc := testcase.documents[len(testcase.documents)-1]
testcase.documents = append(testcase.documents, lastDoc)
_, err := indexer.Index(testcase.documents, testcase.opts)
Expect(err).To(BeNil())
})

It("err returned docs not processed", func() {
testcase.documents = append(testcase.documents, make(chan string))
_, err := indexer.Index(testcase.documents, testcase.opts)
6 changes: 6 additions & 0 deletions indexers/local_test.go
@@ -72,6 +72,12 @@ var _ = Describe("Tests for local.go", func() {
},
}
})
AfterEach(func() {
err := os.RemoveAll(indexer.metricsDirectory)
if err != nil {
log.Fatal(err)
}
})

It("No err is returned", func() {
_, err := indexer.Index(testcase.documents, testcase.opts)
17 changes: 16 additions & 1 deletion indexers/opensearch.go
@@ -85,6 +85,10 @@ func (OpenSearchIndexer *OpenSearch) Index(documents []interface{}, opts Indexin
var statString string
var indexerStatsLock sync.Mutex
indexerStats := make(map[string]int)

if len(documents) <= 0 {
return fmt.Sprintf("Indexing skipped due to %v docs", len(documents)), nil
}
hasher := sha256.New()
bi, err := opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{
Client: OSClient,
@@ -97,18 +101,25 @@ func (OpenSearchIndexer *OpenSearch) Index(documents []interface{}, opts Indexin
return "", fmt.Errorf("Error creating the indexer: %s", err)
}
start := time.Now().UTC()
docHash := make(map[string]bool)
redundantSkipped := 0
for _, document := range documents {
j, err := json.Marshal(document)
if err != nil {
return "", fmt.Errorf("Cannot encode document %s: %s", document, err)
}
hasher.Write(j)
docId := hex.EncodeToString(hasher.Sum(nil))
if _, exists := docHash[docId]; exists {
redundantSkipped += 1
continue
}
err = bi.Add(
context.Background(),
opensearchutil.BulkIndexerItem{
Action: "index",
Body: bytes.NewReader(j),
DocumentID: hex.EncodeToString(hasher.Sum(nil)),
DocumentID: docId,
OnSuccess: func(c context.Context, bii opensearchutil.BulkIndexerItem, biri opensearchutil.BulkIndexerResponseItem) {
indexerStatsLock.Lock()
defer indexerStatsLock.Unlock()
@@ -119,6 +130,7 @@ func (OpenSearchIndexer *OpenSearch) Index(documents []interface{}, opts Indexin
if err != nil {
return "", fmt.Errorf("Unexpected OpenSearch indexing error: %s", err)
}
docHash[docId] = true
hasher.Reset()
}
if err := bi.Close(context.Background()); err != nil {
@@ -128,5 +140,8 @@ func (OpenSearchIndexer *OpenSearch) Index(documents []interface{}, opts Indexin
for stat, val := range indexerStats {
statString += fmt.Sprintf(" %s=%d", stat, val)
}
if redundantSkipped > 0 {
statString += fmt.Sprintf(" redundantskipped=%d", redundantSkipped)
}
return fmt.Sprintf("Indexing finished in %v:%v", dur.Truncate(time.Millisecond), statString), nil
}
12 changes: 12 additions & 0 deletions indexers/opensearch_test.go
@@ -103,6 +103,18 @@ var _ = Describe("Tests for opensearch.go", func() {
Expect(err).To(BeNil())
})

It("Test empty list of docs", func() {
_, err := indexer.Index([]interface{}{}, testcase.opts)
Expect(err).To(BeNil())
})

It("Redundant list of docs", func() {
lastDoc := testcase.documents[len(testcase.documents)-1]
testcase.documents = append(testcase.documents, lastDoc)
_, err := indexer.Index(testcase.documents, testcase.opts)
Expect(err).To(BeNil())
})

It("err returned docs not processed", func() {
testcase.documents = append(testcase.documents, make(chan string))
_, err := indexer.Index(testcase.documents, testcase.opts)
