Skip to content

Commit

Permalink
paralelized model bootstrap
Browse files Browse the repository at this point in the history
  • Loading branch information
awitas committed Oct 7, 2022
1 parent aad6ba1 commit 6fce30e
Showing 1 changed file with 29 additions and 10 deletions.
39 changes: 29 additions & 10 deletions service/endpoint/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
"github.com/viant/gmetric"
"github.com/viant/mly/service"
"github.com/viant/mly/service/buffer"
"github.com/viant/mly/service/config"
"github.com/viant/mly/shared/common"
"github.com/viant/mly/shared/datastore"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"sync"

"log"
"net/http"
Expand Down Expand Up @@ -56,17 +58,34 @@ func (s *Service) Shutdown(ctx context.Context) error {
}

func (s *Service) initModelHandler(datastores map[string]*datastore.Service, pool *buffer.Pool, mux *http.ServeMux) error {
for i, model := range s.config.ModelList.Models {
srv, err := service.New(context.TODO(), s.fs, s.config.ModelList.Models[i], s.metrics, datastores)
if err != nil {
return fmt.Errorf("failed to create service for modelService: %v, due to %w", model.ID, err)
}
handler := service.NewHandler(srv, pool, s.config.Endpoint.WriteTimeout-time.Millisecond)
s.handlers[model.ID] = handler
mux.Handle(fmt.Sprintf(common.ModelURI, model.ID), handler)
metaHandler := newMetaHandler(srv, &s.config.DatastoreList)
mux.Handle(fmt.Sprintf(common.MetaURI, model.ID), metaHandler)
var err error
waitGroup := sync.WaitGroup{}
waitGroup.Add(len(s.config.ModelList.Models))
var lock sync.Mutex
for i := range s.config.ModelList.Models {
go func(model *config.Model) {
waitGroup.Done()
if e := s.initModel(datastores, pool, mux, model, &lock); e != nil {
err = e
}
}(s.config.ModelList.Models[i])
}
waitGroup.Wait()
return err
}

func (s *Service) initModel(datastores map[string]*datastore.Service, pool *buffer.Pool, mux *http.ServeMux, model *config.Model, lock *sync.Mutex) error {
srv, err := service.New(context.TODO(), s.fs, model, s.metrics, datastores)
if err != nil {
return fmt.Errorf("failed to create service for modelService: %v, due to %w", model.ID, err)
}
handler := service.NewHandler(srv, pool, s.config.Endpoint.WriteTimeout-time.Millisecond)
lock.Lock()
defer lock.Unlock()
s.handlers[model.ID] = handler
mux.Handle(fmt.Sprintf(common.ModelURI, model.ID), handler)
metaHandler := newMetaHandler(srv, &s.config.DatastoreList)
mux.Handle(fmt.Sprintf(common.MetaURI, model.ID), metaHandler)
return nil
}

Expand Down

0 comments on commit 6fce30e

Please sign in to comment.