Skip to content

Commit

Permalink
Return a new indexer rather than a pointer to the same object (#45)
Browse files Browse the repository at this point in the history
Signed-off-by: Raul Sevilla <[email protected]>
  • Loading branch information
rsevilla87 authored May 3, 2024
1 parent ec99247 commit 35a376f
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 64 deletions.
22 changes: 9 additions & 13 deletions indexers/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,12 @@ type Elastic struct {
// ESClient elasticsearch client instance
var ESClient *elasticsearch.Client

// Init function
func init() {
indexerMap[ElasticIndexer] = &Elastic{}
}

// Returns new indexer for elastic search
func (esIndexer *Elastic) new(indexerConfig IndexerConfig) error {
// Returns new indexer for Elastic
func NewElasticIndexer(indexerConfig IndexerConfig) (*Elastic, error) {
var err error
var esIndexer Elastic
if indexerConfig.Index == "" {
return fmt.Errorf("index name not specified")
return &esIndexer, fmt.Errorf("index name not specified")
}
esIndex := strings.ToLower(indexerConfig.Index)
cfg := elasticsearch.Config{
Expand All @@ -58,24 +54,24 @@ func (esIndexer *Elastic) new(indexerConfig IndexerConfig) error {
}
ESClient, err = elasticsearch.NewClient(cfg)
if err != nil {
return fmt.Errorf("error creating the ES client: %s", err)
return &esIndexer, fmt.Errorf("error creating the ES client: %s", err)
}
r, err := ESClient.Cluster.Health()
if err != nil {
return fmt.Errorf("ES health check failed: %s", err)
return &esIndexer, fmt.Errorf("ES health check failed: %s", err)
}
if r.StatusCode != 200 {
return fmt.Errorf("unexpected ES status code: %d", r.StatusCode)
return &esIndexer, fmt.Errorf("unexpected ES status code: %d", r.StatusCode)
}
esIndexer.index = esIndex
r, _ = ESClient.Indices.Exists([]string{esIndex})
if r.IsError() {
r, _ = ESClient.Indices.Create(esIndex)
if r.IsError() {
return fmt.Errorf("error creating index %s on ES: %s", esIndex, r.String())
return &esIndexer, fmt.Errorf("error creating index %s on ES: %s", esIndex, r.String())
}
}
return nil
return &esIndexer, nil
}

// Index uses bulkIndexer to index the documents in the given index
Expand Down
12 changes: 6 additions & 6 deletions indexers/elastic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package indexers

import (
"errors"
"log"
"net/http"
"net/http/httptest"
"os"
"log"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -40,12 +40,12 @@ var _ = Describe("Tests for elastic.go", func() {
}))
defer testcase.mockServer.Close()
testcase.indexerConfig.Servers = []string{testcase.mockServer.URL}
err := indexer.new(testcase.indexerConfig)
_, err := NewElasticIndexer(testcase.indexerConfig)
Expect(err).To(BeEquivalentTo(errors.New("unexpected ES status code: 400")))
})

It("when no url is passed", func() {
err := indexer.new(testcase.indexerConfig)
_, err := NewElasticIndexer(testcase.indexerConfig)
testcase.mockServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusGatewayTimeout)
}))
Expand All @@ -58,15 +58,15 @@ var _ = Describe("Tests for elastic.go", func() {
os.Setenv("ELASTICSEARCH_URL", "not a valid url:port")
defer os.Unsetenv("ELASTICSEARCH_URL")
defer testcase.mockServer.Close()
err := indexer.new(testcase.indexerConfig)
_, err := NewElasticIndexer(testcase.indexerConfig)
Expect(err).To(BeEquivalentTo(errors.New("error creating the ES client: cannot create client: cannot parse url: parse \"not a valid url:port\": first path segment in URL cannot contain colon")))
})

It("Returns err no index name", func() {
defer testcase.mockServer.Close()
testcase.indexerConfig.Servers = []string{testcase.mockServer.URL}
testcase.indexerConfig.Index = ""
err := indexer.new(testcase.indexerConfig)
_, err := NewElasticIndexer(testcase.indexerConfig)
Expect(err).To(BeEquivalentTo(errors.New("index name not specified")))
})

Expand Down Expand Up @@ -116,7 +116,7 @@ var _ = Describe("Tests for elastic.go", func() {
_, err := indexer.Index(testcase.documents, testcase.opts)
Expect(err).To(BeNil())
})

It("err returned docs not processed", func() {
testcase.documents = append(testcase.documents, make(chan string))
_, err := indexer.Index(testcase.documents, testcase.opts)
Expand Down
23 changes: 11 additions & 12 deletions indexers/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,19 @@ import (
"fmt"
)

var indexerMap = make(map[IndexerType]Indexer)

// NewIndexer creates a new Indexer with the specified IndexerConfig
func NewIndexer(indexerConfig IndexerConfig) (*Indexer, error) {
var indexer Indexer
var exists bool
cfg := indexerConfig
if indexer, exists = indexerMap[cfg.Type]; exists {
err := indexer.new(indexerConfig)
if err != nil {
return &indexer, err
}
} else {
return &indexer, fmt.Errorf("Indexer not found: %s", cfg.Type)
var err error
switch indexerConfig.Type {
case LocalIndexer:
indexer, err = NewLocalIndexer(indexerConfig)
case ElasticIndexer:
indexer, err = NewElasticIndexer(indexerConfig)
case OpenSearchIndexer:
indexer, err = NewOpenSearchIndexer(indexerConfig)
default:
return &indexer, fmt.Errorf("Indexer not found: %s", indexerConfig.Type)
}
return &indexer, nil
return &indexer, err
}
18 changes: 7 additions & 11 deletions indexers/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,15 @@ type Local struct {
metricsDirectory string
}

// Init function
func init() {
indexerMap[LocalIndexer] = &Local{}
}

// Prepares local indexing directory
func (l *Local) new(indexerConfig IndexerConfig) error {
// NewLocalIndexer returns a new Local Indexer
func NewLocalIndexer(indexerConfig IndexerConfig) (*Local, error) {
var localIndexer Local
if indexerConfig.MetricsDirectory == "" {
return fmt.Errorf("directory name not specified")
return &localIndexer, fmt.Errorf("directory name not specified")
}
l.metricsDirectory = indexerConfig.MetricsDirectory
err := os.MkdirAll(l.metricsDirectory, 0744)
return err
localIndexer.metricsDirectory = indexerConfig.MetricsDirectory
err := os.MkdirAll(localIndexer.metricsDirectory, 0744)
return &localIndexer, err
}

// Index uses generates a local file with the given name and metrics
Expand Down
5 changes: 2 additions & 3 deletions indexers/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ var _ = Describe("Tests for local.go", func() {
indexerconfig IndexerConfig
}
var testcase newtestcase
var localIndexer Local
BeforeEach(func() {
testcase = newtestcase{
indexerconfig: IndexerConfig{Type: "local",
Expand All @@ -30,13 +29,13 @@ var _ = Describe("Tests for local.go", func() {
})

It("returns err no metrics directory", func() {
err := localIndexer.new(testcase.indexerconfig)
_, err := NewLocalIndexer(testcase.indexerconfig)
Expect(err).To(BeEquivalentTo(errors.New("directory name not specified")))
})

It("returns nil as error", func() {
testcase.indexerconfig.MetricsDirectory = "placeholder"
err := localIndexer.new(testcase.indexerconfig)
_, err := NewLocalIndexer(testcase.indexerconfig)
Expect(err).To(BeNil())
})
})
Expand Down
22 changes: 9 additions & 13 deletions indexers/opensearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,12 @@ type OpenSearch struct {
index string
}

// Init function
func init() {
indexerMap[OpenSearchIndexer] = &OpenSearch{}
}

// Returns new indexer for OpenSearch
func (OpenSearchIndexer *OpenSearch) new(indexerConfig IndexerConfig) error {
func NewOpenSearchIndexer(indexerConfig IndexerConfig) (*OpenSearch, error) {
var err error
var osIndexer OpenSearch
if indexerConfig.Index == "" {
return fmt.Errorf("index name not specified")
return &osIndexer, fmt.Errorf("index name not specified")
}
OpenSearchIndex := strings.ToLower(indexerConfig.Index)
cfg := opensearch.Config{
Expand All @@ -58,24 +54,24 @@ func (OpenSearchIndexer *OpenSearch) new(indexerConfig IndexerConfig) error {
}
OSClient, err = opensearch.NewClient(cfg)
if err != nil {
return fmt.Errorf("error creating the OpenSearch client: %s", err)
return &osIndexer, fmt.Errorf("error creating the OpenSearch client: %s", err)
}
r, err := OSClient.Cluster.Health()
if err != nil {
return fmt.Errorf("OpenSearch health check failed: %s", err)
return &osIndexer, fmt.Errorf("OpenSearch health check failed: %s", err)
}
if r.StatusCode != 200 {
return fmt.Errorf("unexpected OpenSearch status code: %d", r.StatusCode)
return &osIndexer, fmt.Errorf("unexpected OpenSearch status code: %d", r.StatusCode)
}
OpenSearchIndexer.index = OpenSearchIndex
osIndexer.index = OpenSearchIndex
r, _ = OSClient.Indices.Exists([]string{OpenSearchIndex})
if r.IsError() {
r, _ = OSClient.Indices.Create(OpenSearchIndex)
if r.IsError() {
return fmt.Errorf("error creating index %s on OpenSearch: %s", OpenSearchIndex, r.String())
return &osIndexer, fmt.Errorf("error creating index %s on OpenSearch: %s", OpenSearchIndex, r.String())
}
}
return nil
return &osIndexer, nil
}

// Index uses bulkIndexer to index the documents in the given index
Expand Down
10 changes: 5 additions & 5 deletions indexers/opensearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package indexers

import (
"errors"
"log"
"net/http"
"net/http/httptest"
"os"
"log"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -39,12 +39,12 @@ var _ = Describe("Tests for opensearch.go", func() {
}))
defer testcase.mockServer.Close()
testcase.indexerConfig.Servers = []string{testcase.mockServer.URL}
err := indexer.new(testcase.indexerConfig)
_, err := NewOpenSearchIndexer(testcase.indexerConfig)
Expect(err).To(BeEquivalentTo(errors.New("OpenSearch health check failed: cannot retrieve information from OpenSearch")))
})

It("when no url is passed", func() {
err := indexer.new(testcase.indexerConfig)
_, err := NewOpenSearchIndexer(testcase.indexerConfig)
testcase.mockServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusGatewayTimeout)
}))
Expand All @@ -56,15 +56,15 @@ var _ = Describe("Tests for opensearch.go", func() {
os.Setenv("ELASTICSEARCH_URL", "not a valid url:port")
defer os.Unsetenv("ELASTICSEARCH_URL")
defer testcase.mockServer.Close()
err := indexer.new(testcase.indexerConfig)
_, err := NewOpenSearchIndexer(testcase.indexerConfig)
Expect(err).To(BeEquivalentTo(errors.New("error creating the OpenSearch client: cannot create client: cannot parse url: parse \"not a valid url:port\": first path segment in URL cannot contain colon")))
})

It("Returns err no index name", func() {
defer testcase.mockServer.Close()
testcase.indexerConfig.Servers = []string{testcase.mockServer.URL}
testcase.indexerConfig.Index = ""
err := indexer.new(testcase.indexerConfig)
_, err := NewOpenSearchIndexer(testcase.indexerConfig)
Expect(err).To(BeEquivalentTo(errors.New("index name not specified")))
})

Expand Down
1 change: 0 additions & 1 deletion indexers/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ const (
// Indexer interface
type Indexer interface {
Index([]interface{}, IndexingOpts) (string, error)
new(IndexerConfig) error
}

// Indexing options
Expand Down

0 comments on commit 35a376f

Please sign in to comment.