Skip to content

Commit

Permalink
updated datastore init
Browse files Browse the repository at this point in the history
  • Loading branch information
awitas committed Oct 7, 2022
1 parent f536a2d commit 4e75df1
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 19 deletions.
14 changes: 14 additions & 0 deletions shared/client/config/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,17 @@ type Remote struct {
func (d *Remote) Init() {
d.MetaInput.Init()
}

func (d *Remote) Validate() error {
//if len(d.Connections) == 0 && d.Datastore.ID != "" {
// return fmt.Errorf("connection was empty")
//}
//var connections = map[string]bool{}
//for _, c := range d.Connections {
// connections[c.ID] = true
//}
//if _, ok := connections[d.Datastore.ID]; !ok {
// return fmt.Errorf("unknown datastore connection: %v", d.Datastore.ID)
//}
return nil
}
48 changes: 30 additions & 18 deletions shared/client/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,16 @@ func (s *Service) Run(ctx context.Context, input interface{}, response *Response
onDone(time.Now(), *stats...)
}()

cachable, ok := input.(Cachable)
cachable, isCachable := input.(Cachable)

batchSize := cachable.BatchSize()
if response.Data == nil {
return fmt.Errorf("response data was empty - aborting request")
}
var err error
var cachedCount int
var cached []interface{}
if ok {
if isCachable {
cached = make([]interface{}, batchSize)
dataType := response.DataItemType()
if batchSize >= 1 {
Expand Down Expand Up @@ -132,7 +133,6 @@ func (s *Service) Run(ctx context.Context, input interface{}, response *Response
return fmt.Errorf("failed to handle resp: %w", err)
}
go s.updatedCacheInBackground(ctx, response.Data, cachable, s.dict.hash)

s.assertDictHash(response)
return nil
}
Expand Down Expand Up @@ -210,11 +210,17 @@ func (s *Service) init(options []Option) error {
return err
}
}
if s.datastore == nil {
if err := s.initDatastore(); err != nil {
if ds := s.Config.Datastore; ds != nil {
ds.Init()
if err = ds.Validate(); err != nil {
return err
}
}

if s.datastore == nil {
err := s.initDatastore()
return err
}
s.messages = NewMessages(s.dictionary)
return nil
}
Expand Down Expand Up @@ -300,21 +306,22 @@ func (s *Service) getHTTPClient(host *Host) *http.Client {

func (s *Service) initDatastore() error {
ds := s.Config.Datastore
if ds.Datastore.ID == "" {
return nil
}
if ds == nil {
return nil
}
if len(ds.Connections) > 0 {
datastores := &sconfig.DatastoreList{
Connections: ds.Connections,
Datastores: []*sconfig.Datastore{&ds.Datastore},
}
aMap, err := datastore.NewStores(datastores, s.gmetrics)
if err != nil {
return err
}
s.datastore = aMap[ds.ID]
s.datastore.SetMode(datastore.ModeClient)
var stores = map[string]*datastore.Service{}
var err error
datastores := &sconfig.DatastoreList{
Datastores: []*sconfig.Datastore{&ds.Datastore},
Connections: ds.Connections,
}
if stores, err = datastore.NewStores(datastores, s.gmetrics); err != nil {
return err
}
s.datastore = stores[ds.ID]
if len(ds.Fields) > 0 {
if err := ds.FieldsDescriptor(ds.Fields); err != nil {
return err
Expand All @@ -323,7 +330,6 @@ func (s *Service) initDatastore() error {
return storable.New(ds.Fields)
}
}

return nil
}

Expand Down Expand Up @@ -351,7 +357,8 @@ func New(model string, hosts []*Host, options ...Option) (*Service, error) {
Hosts: hosts,
},
}
return aClient, aClient.init(options)
err := aClient.init(options)
return aClient, err
}

func (s *Service) discoverConfig(host *Host, URL string) (*config.Remote, error) {
Expand Down Expand Up @@ -447,6 +454,8 @@ func (s *Service) httpPost(ctx context.Context, data []byte, host *Host) ([]byte
func (s *Service) getHost() (*Host, error) {
count := len(s.Hosts)
switch count {
case 0:

case 1:
candidate := s.Hosts[0]
if !candidate.IsUp() {
Expand All @@ -469,6 +478,9 @@ func (s *Service) getHost() (*Host, error) {
}

func (s *Service) updatedCacheInBackground(ctx context.Context, target interface{}, cachable Cachable, hash int) {
if s.datastore == nil {
return
}
targetType := reflect.TypeOf(target).Elem()
switch targetType.Kind() {
case reflect.Struct:
Expand Down
2 changes: 1 addition & 1 deletion shared/datastore/client/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *service) checkConnectionError(err error) {
func (s *service) connect() error {
hosts := s.hosts()
if len(hosts) == 0 {
return fmt.Errorf("Hostname was empty")
return fmt.Errorf("hostname was empty")
}
client, err := aero.NewClientWithPolicyAndHost(s.clientPolicy, hosts...)
if err != nil {
Expand Down

0 comments on commit 4e75df1

Please sign in to comment.