diff --git a/indexers/elastic.go b/indexers/elastic.go index c7b4872..724d861 100644 --- a/indexers/elastic.go +++ b/indexers/elastic.go @@ -85,6 +85,10 @@ func (esIndexer *Elastic) Index(documents []interface{}, opts IndexingOpts) (str var statString string var indexerStatsLock sync.Mutex indexerStats := make(map[string]int) + + if len(documents) <= 0 { + return fmt.Sprintf("Indexing skipped due to %v docs", len(documents)), nil + } hasher := sha256.New() bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ Client: ESClient, @@ -97,18 +101,25 @@ func (esIndexer *Elastic) Index(documents []interface{}, opts IndexingOpts) (str return "", fmt.Errorf("Error creating the indexer: %s", err) } start := time.Now().UTC() + docHash := make(map[string]bool) + redundantSkipped := 0 for _, document := range documents { j, err := json.Marshal(document) if err != nil { return "", fmt.Errorf("Cannot encode document %s: %s", document, err) } hasher.Write(j) + docId := hex.EncodeToString(hasher.Sum(nil)) + if _, exists := docHash[docId]; exists { + redundantSkipped += 1 + continue + } err = bi.Add( context.Background(), esutil.BulkIndexerItem{ Action: "index", Body: bytes.NewReader(j), - DocumentID: hex.EncodeToString(hasher.Sum(nil)), + DocumentID: docId, OnSuccess: func(c context.Context, bii esutil.BulkIndexerItem, biri esutil.BulkIndexerResponseItem) { indexerStatsLock.Lock() defer indexerStatsLock.Unlock() @@ -119,6 +130,7 @@ func (esIndexer *Elastic) Index(documents []interface{}, opts IndexingOpts) (str if err != nil { return "", fmt.Errorf("Unexpected ES indexing error: %s", err) } + docHash[docId] = true hasher.Reset() } if err := bi.Close(context.Background()); err != nil { @@ -128,5 +140,8 @@ func (esIndexer *Elastic) Index(documents []interface{}, opts IndexingOpts) (str for stat, val := range indexerStats { statString += fmt.Sprintf(" %s=%d", stat, val) } + if(redundantSkipped > 0){ + statString += fmt.Sprintf(" redundantskipped=%d", redundantSkipped) + } return fmt.Sprintf("Indexing finished in %v:%v", dur.Truncate(time.Millisecond), statString), nil } diff --git a/indexers/elastic_test.go b/indexers/elastic_test.go index 66a2925..c72a097 100644 --- a/indexers/elastic_test.go +++ b/indexers/elastic_test.go @@ -105,6 +105,18 @@ var _ = Describe("Tests for elastic.go", func() { Expect(err).To(BeNil()) }) + It("Test empty list of docs", func() { + _, err := indexer.Index([]interface{}{}, testcase.opts) + Expect(err).To(BeNil()) + }) + + It("Redundant list of docs", func() { + lastDoc := testcase.documents[len(testcase.documents)-1] + testcase.documents = append(testcase.documents, lastDoc) + _, err := indexer.Index(testcase.documents, testcase.opts) + Expect(err).To(BeNil()) + }) + It("err returned docs not processed", func() { testcase.documents = append(testcase.documents, make(chan string)) _, err := indexer.Index(testcase.documents, testcase.opts) diff --git a/indexers/local_test.go b/indexers/local_test.go index 8070b90..762249e 100644 --- a/indexers/local_test.go +++ b/indexers/local_test.go @@ -72,6 +72,12 @@ var _ = Describe("Tests for local.go", func() { }, } }) + AfterEach(func() { + err := os.RemoveAll(indexer.metricsDirectory) + if err != nil { + log.Fatal(err) + } + }) It("No err is returned", func() { _, err := indexer.Index(testcase.documents, testcase.opts) diff --git a/indexers/opensearch.go b/indexers/opensearch.go index d8e17bd..40e227a 100644 --- a/indexers/opensearch.go +++ b/indexers/opensearch.go @@ -85,6 +85,10 @@ func (OpenSearchIndexer *OpenSearch) Index(documents []interface{}, opts Indexin var statString string var indexerStatsLock sync.Mutex indexerStats := make(map[string]int) + + if len(documents) <= 0 { + return fmt.Sprintf("Indexing skipped due to %v docs", len(documents)), nil + } hasher := sha256.New() bi, err := opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{ Client: OSClient, @@ -97,18 +101,25 @@ func (OpenSearchIndexer *OpenSearch) Index(documents []interface{}, opts Indexin return "", fmt.Errorf("Error creating the indexer: %s", err) } start := time.Now().UTC() + docHash := make(map[string]bool) + redundantSkipped := 0 for _, document := range documents { j, err := json.Marshal(document) if err != nil { return "", fmt.Errorf("Cannot encode document %s: %s", document, err) } hasher.Write(j) + docId := hex.EncodeToString(hasher.Sum(nil)) + if _, exists := docHash[docId]; exists { + redundantSkipped += 1 + continue + } err = bi.Add( context.Background(), opensearchutil.BulkIndexerItem{ Action: "index", Body: bytes.NewReader(j), - DocumentID: hex.EncodeToString(hasher.Sum(nil)), + DocumentID: docId, OnSuccess: func(c context.Context, bii opensearchutil.BulkIndexerItem, biri opensearchutil.BulkIndexerResponseItem) { indexerStatsLock.Lock() defer indexerStatsLock.Unlock() @@ -119,6 +130,7 @@ func (OpenSearchIndexer *OpenSearch) Index(documents []interface{}, opts Indexin if err != nil { return "", fmt.Errorf("Unexpected OpenSearch indexing error: %s", err) } + docHash[docId] = true hasher.Reset() } if err := bi.Close(context.Background()); err != nil { @@ -128,5 +140,8 @@ func (OpenSearchIndexer *OpenSearch) Index(documents []interface{}, opts Indexin for stat, val := range indexerStats { statString += fmt.Sprintf(" %s=%d", stat, val) } + if(redundantSkipped > 0){ + statString += fmt.Sprintf(" redundantskipped=%d", redundantSkipped) + } return fmt.Sprintf("Indexing finished in %v:%v", dur.Truncate(time.Millisecond), statString), nil } diff --git a/indexers/opensearch_test.go b/indexers/opensearch_test.go index 688c051..3a4ed3c 100644 --- a/indexers/opensearch_test.go +++ b/indexers/opensearch_test.go @@ -103,6 +103,18 @@ var _ = Describe("Tests for opensearch.go", func() { Expect(err).To(BeNil()) }) + It("Test empty list of docs", func() { + _, err := indexer.Index([]interface{}{}, testcase.opts) + Expect(err).To(BeNil()) + }) + + It("Redundant list of docs", func() { + lastDoc := testcase.documents[len(testcase.documents)-1] + testcase.documents = append(testcase.documents, lastDoc) + _, err := indexer.Index(testcase.documents, testcase.opts) + Expect(err).To(BeNil()) + }) + It("err returned docs not processed", func() { testcase.documents = append(testcase.documents, make(chan string)) _, err := indexer.Index(testcase.documents, testcase.opts)