diff --git a/common/messagequeue/config.go b/common/messagequeue/config.go index fd5d22fb..27183a6c 100644 --- a/common/messagequeue/config.go +++ b/common/messagequeue/config.go @@ -12,10 +12,10 @@ const ( type MessageQueueConfig struct { // The type of Message Queue for event updates - Kind MessageQueueKind `default:""` + Kind MessageQueueKind `json:"kind" default:""` // PubSubConfig captures the config related to publishing and subscribing to a PubSub Message Queue - PubSubConfig *PubSubConfig + PubSubConfig *PubSubConfig `json:"pub_sub_config"` } type PubSubConfig struct { diff --git a/plugins/turing/config/config.go b/plugins/turing/config/config.go index c8ccffc7..d55b82d5 100644 --- a/plugins/turing/config/config.go +++ b/plugins/turing/config/config.go @@ -6,6 +6,7 @@ import ( "github.com/caraml-dev/mlp/api/pkg/instrumentation/newrelic" "github.com/caraml-dev/mlp/api/pkg/instrumentation/sentry" + common_mq_config "github.com/caraml-dev/xp/common/messagequeue" "github.com/caraml-dev/xp/treatment-service/config" "github.com/go-playground/validator/v10" ) @@ -46,14 +47,16 @@ type TreatmentServicePluginConfig struct { Port int `json:"port" default:"8080"` PubSubTimeoutSeconds int `json:"pub_sub_timeout_seconds" validate:"required"` - AssignedTreatmentLogger config.AssignedTreatmentLoggerConfig `json:"assigned_treatment_logger"` - DebugConfig config.DebugConfig `json:"debug_config"` - DeploymentConfig config.DeploymentConfig `json:"deployment_config"` - ManagementService config.ManagementServiceConfig `json:"management_service"` - MonitoringConfig config.Monitoring `json:"monitoring_config"` - SwaggerConfig config.SwaggerConfig `json:"swagger_config"` - NewRelicConfig newrelic.Config `json:"new_relic_config"` - SentryConfig sentry.Config `json:"sentry_config"` + AssignedTreatmentLogger config.AssignedTreatmentLoggerConfig `json:"assigned_treatment_logger"` + DebugConfig config.DebugConfig `json:"debug_config"` + DeploymentConfig config.DeploymentConfig `json:"deployment_config"` + MessageQueueConfig common_mq_config.MessageQueueConfig `json:"message_queue_config"` + ManagementService config.ManagementServiceConfig `json:"management_service"` + MonitoringConfig config.Monitoring `json:"monitoring_config"` + SwaggerConfig config.SwaggerConfig `json:"swagger_config"` + NewRelicConfig newrelic.Config `json:"new_relic_config"` + SentryConfig sentry.Config `json:"sentry_config"` + ManagementServicePollerConfig config.ManagementServicePollerConfig `json:"management_service_poller_config"` } type Variable struct { diff --git a/plugins/turing/manager/experiment_manager.go b/plugins/turing/manager/experiment_manager.go index b10ad2bb..9c93c2fa 100644 --- a/plugins/turing/manager/experiment_manager.go +++ b/plugins/turing/manager/experiment_manager.go @@ -14,7 +14,6 @@ import ( xpclient "github.com/caraml-dev/xp/clients/management" "github.com/caraml-dev/xp/common/api/schema" - common_mq_config "github.com/caraml-dev/xp/common/messagequeue" _config "github.com/caraml-dev/xp/plugins/turing/config" "github.com/caraml-dev/xp/treatment-service/config" "github.com/go-playground/validator/v10" @@ -142,37 +141,20 @@ func (em *experimentManager) MakeTreatmentServicePluginConfig( projectID int, ) (*config.Config, error) { pluginConfig := &config.Config{ - Port: em.TreatmentServicePluginConfig.Port, - ProjectIds: []string{strconv.Itoa(projectID)}, - AssignedTreatmentLogger: em.TreatmentServicePluginConfig.AssignedTreatmentLogger, - DebugConfig: em.TreatmentServicePluginConfig.DebugConfig, - DeploymentConfig: em.TreatmentServicePluginConfig.DeploymentConfig, - ManagementService: em.TreatmentServicePluginConfig.ManagementService, - MonitoringConfig: em.TreatmentServicePluginConfig.MonitoringConfig, - SwaggerConfig: em.TreatmentServicePluginConfig.SwaggerConfig, - NewRelicConfig: em.TreatmentServicePluginConfig.NewRelicConfig, - SentryConfig: em.TreatmentServicePluginConfig.SentryConfig, - SegmenterConfig: *treatmentServiceConfig.SegmenterConfig, - } - messageQueueKind := *treatmentServiceConfig.MessageQueueConfig.Kind - switch messageQueueKind { - case schema.MessageQueueKindPubsub: - pluginConfig.MessageQueueConfig = common_mq_config.MessageQueueConfig{ - Kind: "pubsub", - PubSubConfig: &common_mq_config.PubSubConfig{ - Project: *treatmentServiceConfig.MessageQueueConfig.PubSub.Project, - TopicName: *treatmentServiceConfig.MessageQueueConfig.PubSub.TopicName, - PubSubTimeoutSeconds: em.TreatmentServicePluginConfig.PubSubTimeoutSeconds, - }, - } - case schema.MessageQueueKindNoop: - pluginConfig.MessageQueueConfig = common_mq_config.MessageQueueConfig{ - Kind: "", - } - default: - return nil, fmt.Errorf("invalid message queue kind (%s) was provided", messageQueueKind) + Port: em.TreatmentServicePluginConfig.Port, + ProjectIds: []string{strconv.Itoa(projectID)}, + AssignedTreatmentLogger: em.TreatmentServicePluginConfig.AssignedTreatmentLogger, + DebugConfig: em.TreatmentServicePluginConfig.DebugConfig, + DeploymentConfig: em.TreatmentServicePluginConfig.DeploymentConfig, + MessageQueueConfig: em.TreatmentServicePluginConfig.MessageQueueConfig, + ManagementService: em.TreatmentServicePluginConfig.ManagementService, + MonitoringConfig: em.TreatmentServicePluginConfig.MonitoringConfig, + SwaggerConfig: em.TreatmentServicePluginConfig.SwaggerConfig, + NewRelicConfig: em.TreatmentServicePluginConfig.NewRelicConfig, + SentryConfig: em.TreatmentServicePluginConfig.SentryConfig, + SegmenterConfig: *treatmentServiceConfig.SegmenterConfig, + ManagementServicePollerConfig: em.TreatmentServicePluginConfig.ManagementServicePollerConfig, } - return pluginConfig, nil } diff --git a/plugins/turing/manager/experiment_manager_test.go b/plugins/turing/manager/experiment_manager_test.go index 045a0122..a257c91a 100644 --- a/plugins/turing/manager/experiment_manager_test.go +++ b/plugins/turing/manager/experiment_manager_test.go @@ -62,6 +62,14 @@ func TestNewExperimentManager(t *testing.T) { "environment_type": "dev", "max_go_routines": 200 }, + "message_queue_config": { + "kind": "dev", + "pub_sub_config": { + "project":"dev", + "topic_name":"xp-update", + "pub_sub_timeout_seconds": 30 + } + }, "management_service": { "authorization_enabled": true, "url": "http://xp-management.global.io/api/xp/v1" diff --git a/plugins/turing/runner/experiment_runner.go b/plugins/turing/runner/experiment_runner.go index 197b7371..12a39ba6 100644 --- a/plugins/turing/runner/experiment_runner.go +++ b/plugins/turing/runner/experiment_runner.go @@ -257,6 +257,9 @@ func (er *experimentRunner) startBackgroundServices( } }() } + if er.appContext.PollerService != nil { + er.appContext.PollerService.Start() + } } func (er *experimentRunner) getRequestParams( diff --git a/treatment-service/appcontext/appcontext.go b/treatment-service/appcontext/appcontext.go index 8e961263..e74c1fbd 100644 --- a/treatment-service/appcontext/appcontext.go +++ b/treatment-service/appcontext/appcontext.go @@ -24,6 +24,7 @@ type AppContext struct { AssignedTreatmentLogger *monitoring.AssignedTreatmentLogger LocalStorage *models.LocalStorage + PollerService *services.PollerService } func NewAppContext(cfg *config.Config) (*AppContext, error) { @@ -122,6 +123,11 @@ func NewAppContext(cfg *config.Config) (*AppContext, error) { return nil, err } + var pollerService *services.PollerService + if cfg.ManagementServicePollerConfig.Enabled { + pollerService = services.NewPollerService(cfg.ManagementServicePollerConfig, localStorage) + } + appContext := &AppContext{ ExperimentService: experimentSvc, MetricService: metricService, @@ -131,6 +137,7 @@ func NewAppContext(cfg *config.Config) (*AppContext, error) { AssignedTreatmentLogger: logger, MessageQueueService: messageQueueService, LocalStorage: localStorage, + PollerService: pollerService, } return appContext, nil diff --git a/treatment-service/config/config.go b/treatment-service/config/config.go index a738b02d..d2c1d5f6 100644 --- a/treatment-service/config/config.go +++ b/treatment-service/config/config.go @@ -3,7 +3,6 @@ package config import ( "fmt" "strconv" - "time" "github.com/caraml-dev/mlp/api/pkg/instrumentation/newrelic" "github.com/caraml-dev/mlp/api/pkg/instrumentation/sentry" @@ -96,8 +95,8 @@ type ManagementServiceConfig struct { } type ManagementServicePollerConfig struct { - Enabled bool `default:"false"` - PollInterval time.Duration `default:"30s"` + Enabled bool `json:"enabled" default:"false"` + PollIntervalSeconds int `json:"poll_interval" default:"30"` } func (c *Config) GetProjectIds() []models.ProjectId { diff --git a/treatment-service/config/config_test.go b/treatment-service/config/config_test.go index 9c04fd6c..843faa15 100644 --- a/treatment-service/config/config_test.go +++ b/treatment-service/config/config_test.go @@ -2,7 +2,6 @@ package config import ( "testing" - "time" "github.com/caraml-dev/mlp/api/pkg/instrumentation/newrelic" "github.com/caraml-dev/mlp/api/pkg/instrumentation/sentry" @@ -66,8 +65,8 @@ func TestDefaultConfigs(t *testing.T) { SentryConfig: sentry.Config{Enabled: false, Labels: emptyStringMap}, SegmenterConfig: make(map[string]interface{}), ManagementServicePollerConfig: ManagementServicePollerConfig{ - Enabled: false, - PollInterval: 30 * time.Second, + Enabled: false, + PollIntervalSeconds: 30, }, } cfg, err := Load() @@ -133,8 +132,8 @@ func TestLoadMultipleConfigs(t *testing.T) { SentryConfig: sentry.Config{Enabled: true, DSN: "my.amazing.sentry.dsn", Labels: map[string]string{"app": "xp-treatment-service"}}, SegmenterConfig: map[string]interface{}{"s2_ids": map[string]interface{}{"mins2celllevel": 9, "maxs2celllevel": 15}}, ManagementServicePollerConfig: ManagementServicePollerConfig{ - Enabled: false, - PollInterval: 30 * time.Second, + Enabled: false, + PollIntervalSeconds: 30, }, } diff --git a/treatment-service/go.mod b/treatment-service/go.mod index c4c1d473..fda4fabc 100644 --- a/treatment-service/go.mod +++ b/treatment-service/go.mod @@ -14,7 +14,6 @@ require ( github.com/deepmap/oapi-codegen v1.11.0 github.com/getkin/kin-openapi v0.94.0 github.com/go-chi/chi/v5 v5.0.7 - github.com/go-playground/validator/v10 v10.11.1 github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3 github.com/golang/geo v0.0.0-20210211234256-740aa86cb551 github.com/google/go-cmp v0.6.0 @@ -104,8 +103,6 @@ require ( github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/swag v0.22.3 // indirect - github.com/go-playground/locales v0.14.0 // indirect - github.com/go-playground/universal-translator v0.18.0 // indirect github.com/go-viper/mapstructure/v2 v2.0.0 // indirect github.com/goccy/go-json v0.9.11 // indirect github.com/gofrs/flock v0.8.1 // indirect @@ -139,7 +136,6 @@ require ( github.com/klauspost/asmfmt v1.3.2 // indirect github.com/klauspost/compress v1.17.4 // indirect github.com/klauspost/cpuid/v2 v2.0.9 // indirect - github.com/leodido/go-urn v1.2.1 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect diff --git a/treatment-service/go.sum b/treatment-service/go.sum index 788fc763..f4f0b24d 100644 --- a/treatment-service/go.sum +++ b/treatment-service/go.sum @@ -333,18 +333,13 @@ github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh github.com/go-openapi/swag v0.21.1/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ= github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g= github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14= -github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A= github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= -github.com/go-playground/locales v0.14.0 h1:u50s323jtVGugKlcYeyzC0etD1HifMjqmJqb8WugfUU= github.com/go-playground/locales v0.14.0/go.mod h1:sawfccIbzZTqEDETgFXqTho0QybSa7l++s0DH+LDiLs= github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= -github.com/go-playground/universal-translator v0.18.0 h1:82dyy6p4OuJq4/CByFNOn/jYrnRPArHwAcmLoJZxyho= github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl+lu/H90nyDXpg0fqeB/AQUGNTVA= github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4= github.com/go-playground/validator/v10 v10.11.0/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU= -github.com/go-playground/validator/v10 v10.11.1 h1:prmOlTVv+YjZjmRmNSF3VmspqJIxJWXmqUsHwfTRRkQ= -github.com/go-playground/validator/v10 v10.11.1/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU= github.com/go-sql-driver/mysql v1.3.0 h1:pgwjLi/dvffoP9aabwkT3AKpXQM93QARkjFhDDqC1UE= github.com/go-sql-driver/mysql v1.3.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -604,7 +599,6 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/labstack/echo/v4 v4.7.2/go.mod h1:xkCDAdFCIf8jsFQ5NnbK7oqaF/yU1A1X20Ltm0OvSks= github.com/labstack/gommon v0.3.1/go.mod h1:uW6kP17uPlLJsD3ijUYn3/M5bAxtlZhMI6m3MFxTMTM= github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= -github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w= github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= github.com/lestrrat-go/backoff/v2 v2.0.8/go.mod h1:rHP/q/r9aT27n24JQLa7JhSQZCKBBOiM/uP402WwN8Y= github.com/lestrrat-go/blackmagic v1.0.0/go.mod h1:TNgH//0vYSs8VXDCfkZLgIrVTTXQELZffUV0tz3MtdQ= diff --git a/treatment-service/server/server.go b/treatment-service/server/server.go index e94f17d4..8704b71e 100644 --- a/treatment-service/server/server.go +++ b/treatment-service/server/server.go @@ -33,8 +33,6 @@ type Server struct { subscribe bool // cleanup captures all the actions to be executed on server shut down cleanup []func() - // poller captures the poller instance - poller *Poller } // NewServer creates and configures an APIServer serving all application routes. @@ -108,11 +106,6 @@ func NewServer(configFiles []string) (*Server, error) { subscribe = true } - var poller *Poller - if cfg.ManagementServicePollerConfig.Enabled { - poller = NewPoller(cfg.ManagementServicePollerConfig, appCtx.LocalStorage) - } - srv := http.Server{ Addr: cfg.ListenAddress(), Handler: mux, @@ -123,7 +116,6 @@ func NewServer(configFiles []string) (*Server, error) { appContext: appCtx, subscribe: subscribe, cleanup: cleanup, - poller: poller, }, nil } @@ -141,11 +133,6 @@ func (srv *Server) Start() { }() log.Printf("Listening on %s\n", srv.Addr) - if srv.poller != nil { - log.Println("Starting poller...") - srv.poller.Start() - } - stop := make(chan os.Signal, 1) signal.Notify(stop, os.Interrupt) @@ -193,5 +180,9 @@ func (srv *Server) startBackgroundService(errChannel chan error) context.CancelF } }() + if srv.appContext.PollerService != nil { + srv.appContext.PollerService.Start() + } + return cancel } diff --git a/treatment-service/server/poller.go b/treatment-service/services/poller_service.go similarity index 53% rename from treatment-service/server/poller.go rename to treatment-service/services/poller_service.go index da636d4f..8d6928a0 100644 --- a/treatment-service/server/poller.go +++ b/treatment-service/services/poller_service.go @@ -1,4 +1,4 @@ -package server +package services import ( "log" @@ -8,31 +8,33 @@ import ( "github.com/caraml-dev/xp/treatment-service/models" ) -type Poller struct { +type PollerService struct { pollerConfig config.ManagementServicePollerConfig localStorage *models.LocalStorage stopChannel chan struct{} } -// NewPoller creates a new Poller instance with the given configuration and local storage. +// NewPollerService creates a new PollerService instance with the given configuration and local storage. // pollerConfig: configuration for the poller // localStorage: local storage to be used by the poller -func NewPoller(pollerConfig config.ManagementServicePollerConfig, localStorage *models.LocalStorage) *Poller { - return &Poller{ +func NewPollerService(pollerConfig config.ManagementServicePollerConfig, localStorage *models.LocalStorage) *PollerService { + return &PollerService{ pollerConfig: pollerConfig, localStorage: localStorage, stopChannel: make(chan struct{}), } } -func (p *Poller) Start() { - ticker := time.NewTicker(p.pollerConfig.PollInterval) +func (p *PollerService) Start() { + log.Println("Starting management service poller service...") + pollInterval := time.Duration(p.pollerConfig.PollIntervalSeconds) * time.Second + ticker := time.NewTicker(pollInterval) go func() { for { select { case <-ticker.C: err := p.Refresh() - log.Printf("Polling at %v with interval %v", time.Now(), p.pollerConfig.PollInterval) + log.Printf("Polling at %v with interval %v", time.Now(), pollInterval) if err != nil { log.Printf("Error updating local storage: %v", err) continue @@ -45,11 +47,11 @@ func (p *Poller) Start() { }() } -func (p *Poller) Stop() { +func (p *PollerService) Stop() { close(p.stopChannel) } -func (p *Poller) Refresh() error { +func (p *PollerService) Refresh() error { err := p.localStorage.Init() return err }