From 2373446a5eea95d5193be547dad41280a228fe2b Mon Sep 17 00:00:00 2001 From: Faqih Arifian Date: Fri, 28 Feb 2025 09:25:37 +0700 Subject: [PATCH 1/4] feat: improve locking: lock only after fetch completed --- treatment-service/models/storage.go | 30 +++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/treatment-service/models/storage.go b/treatment-service/models/storage.go index 52d0529..8e3cedd 100644 --- a/treatment-service/models/storage.go +++ b/treatment-service/models/storage.go @@ -254,9 +254,6 @@ func (i *ExperimentIndex) checkSegmentHasWeakMatch(segmentName string) bool { } func (s *LocalStorage) InsertProjectSettings(projectSettings *pubsub.ProjectSettings) error { - s.Lock() - defer s.Unlock() - // check that settings with the same Id doesn't exist for _, existingSettings := range s.ProjectSettings { if existingSettings.GetProjectId() == projectSettings.GetProjectId() { @@ -264,7 +261,8 @@ func (s *LocalStorage) InsertProjectSettings(projectSettings *pubsub.ProjectSett } } - s.ProjectSettings = append(s.ProjectSettings, projectSettings) + s.swapProjectSettings(append(s.ProjectSettings, projectSettings)) + // Update project segmenters on creation if err := s.initProjectSegmenters([]*pubsub.ProjectSettings{projectSettings}); err != nil { return err @@ -306,7 +304,7 @@ func (s *LocalStorage) FindProjectSettingsWithId(projectId ProjectId) *pubsub.Pr } project := OpenAPIProjectSettingsSpecToProtobuf(projectSettingsResponse.JSON200.Data) - s.ProjectSettings = append(s.ProjectSettings, project) + s.swapProjectSettings(append(s.ProjectSettings, project)) return project } @@ -487,9 +485,6 @@ func (s *LocalStorage) DumpExperiments(filepath string) error { } func (s *LocalStorage) Init() error { - s.Lock() - defer s.Unlock() - var subscribedProjectSettings []*pubsub.ProjectSettings var err error if len(s.subscribedProjectIds) > 0 { @@ -504,7 +499,8 @@ func (s *LocalStorage) Init() error { if len(s.subscribedProjectIds) > 0 && len(subscribedProjectSettings) != len(s.subscribedProjectIds) { return errors.New("not all subscribed project ids are found") } - s.ProjectSettings = subscribedProjectSettings + + s.swapProjectSettings(subscribedProjectSettings) err = s.initProjectSegmenters(subscribedProjectSettings) if err != nil { @@ -649,12 +645,16 @@ func (s *LocalStorage) initExperiments(subscribedProjectSettings []*pubsub.Proje } } } + + s.Lock() + defer s.Unlock() s.Experiments = index return nil } func (s *LocalStorage) initProjectSegmenters(settings []*pubsub.ProjectSettings) error { + projectSegmenters := make(map[uint32]map[string]schema.SegmenterType) for _, projectSettings := range settings { log.Printf("retrieving project segmenters for %d", projectSettings.ProjectId) segmentersResp, err := s.managementClient.ListSegmentersWithResponse( @@ -669,11 +669,21 @@ func (s *LocalStorage) initProjectSegmenters(settings []*pubsub.ProjectSettings) for _, v := range segmentersResp.JSON200.Data { segmenters[v.Name] = schema.SegmenterType(strings.ToLower(string(v.Type))) } - s.ProjectSegmenters[ProjectId(projectSettings.ProjectId)] = segmenters + projectSegmenters[ProjectId(projectSettings.ProjectId)] = segmenters } + + s.Lock() + defer s.Unlock() + s.ProjectSegmenters = projectSegmenters return nil } +func (s *LocalStorage) swapProjectSettings(projectSettings []*pubsub.ProjectSettings) { + s.Lock() + defer s.Unlock() + s.ProjectSettings = projectSettings +} + func (s *LocalStorage) UpdateProjectSegmenters(segmenter *_segmenters.SegmenterConfiguration, projectId int64) { s.Lock() defer s.Unlock() From 74bd0882138b3d76b0fe856387f61722a9255949 Mon Sep 17 00:00:00 2001 From: Faqih Arifian Date: Mon, 3 Mar 2025 14:30:28 +0700 Subject: [PATCH 2/4] feat: single lock for multiple update at once --- treatment-service/models/storage.go | 81 +++++++++++++++++------------ 1 file changed, 47 insertions(+), 34 deletions(-) diff --git a/treatment-service/models/storage.go b/treatment-service/models/storage.go index 8e3cedd..5690732 100644 --- a/treatment-service/models/storage.go +++ b/treatment-service/models/storage.go @@ -261,12 +261,16 @@ func (s *LocalStorage) InsertProjectSettings(projectSettings *pubsub.ProjectSett } } - s.swapProjectSettings(append(s.ProjectSettings, projectSettings)) - // Update project segmenters on creation - if err := s.initProjectSegmenters([]*pubsub.ProjectSettings{projectSettings}); err != nil { + newSegmenters, err := s.fetchProjectSegmenters([]*pubsub.ProjectSettings{projectSettings}) + if err != nil { return err } + + s.Lock() + defer s.Unlock() + s.ProjectSegmenters = newSegmenters + s.ProjectSettings = append(s.ProjectSettings, projectSettings) return nil } @@ -282,6 +286,21 @@ func (s *LocalStorage) UpdateProjectSettings(updatedProjectSettings *pubsub.Proj } func (s *LocalStorage) FindProjectSettingsWithId(projectId ProjectId) *pubsub.ProjectSettings { + projectSettings := s.findProjectSettingsWithId(projectId) + if projectSettings != nil { + return projectSettings + } + + // In case new project was just created and we are subscribed to its ID + // we'll try to retrieve it from management service + projectSettings, err := s.fetchProjectSettingsWithId(projectId) + if err != nil { + return nil + } + return projectSettings +} + +func (s *LocalStorage) findProjectSettingsWithId(projectId ProjectId) *pubsub.ProjectSettings { s.RLock() defer s.RUnlock() @@ -294,18 +313,21 @@ func (s *LocalStorage) FindProjectSettingsWithId(projectId ProjectId) *pubsub.Pr return settings } } + return nil +} - // In case new project was just created and we are subscribed to its ID - // we'll try to retrieve it from management service +func (s *LocalStorage) fetchProjectSettingsWithId(projectId ProjectId) (*pubsub.ProjectSettings, error) { projectSettingsResponse, err := s.managementClient.GetProjectSettingsWithResponse( context.Background(), int64(projectId)) if err != nil { - return nil + return nil, err } project := OpenAPIProjectSettingsSpecToProtobuf(projectSettingsResponse.JSON200.Data) - s.swapProjectSettings(append(s.ProjectSettings, project)) - return project + s.Lock() + defer s.Unlock() + s.ProjectSettings = append(s.ProjectSettings, project) + return project, nil } func (s *LocalStorage) GetSegmentersTypeMapping(projectId ProjectId) (map[string]schema.SegmenterType, error) { @@ -500,18 +522,22 @@ func (s *LocalStorage) Init() error { return errors.New("not all subscribed project ids are found") } - s.swapProjectSettings(subscribedProjectSettings) - - err = s.initProjectSegmenters(subscribedProjectSettings) + newSegmenters, err := s.fetchProjectSegmenters(subscribedProjectSettings) if err != nil { return err } - err = s.initExperiments(subscribedProjectSettings) + newExperiments, err := s.fetchExperiments(subscribedProjectSettings) if err != nil { return err } + s.Lock() + defer s.Unlock() + s.ProjectSegmenters = newSegmenters + s.Experiments = newExperiments + s.ProjectSettings = subscribedProjectSettings + return nil } @@ -593,7 +619,7 @@ func NewLocalStorage( return &s, err } -func (s *LocalStorage) initExperiments(subscribedProjectSettings []*pubsub.ProjectSettings) error { +func (s *LocalStorage) fetchExperiments(subscribedProjectSettings []*pubsub.ProjectSettings) (map[ProjectId][]*ExperimentIndex, error) { log.Println("retrieving project experiments...") index := make(map[ProjectId][]*ExperimentIndex) for _, projectSettings := range subscribedProjectSettings { @@ -610,7 +636,7 @@ func (s *LocalStorage) initExperiments(subscribedProjectSettings []*pubsub.Proje &managementClient.ListExperimentsParams{StartTime: &startTime, EndTime: &endTime, Status: &activeStatus}, ) if err != nil { - return err + return nil, err } if resp.StatusCode() == 200 { @@ -618,7 +644,7 @@ func (s *LocalStorage) initExperiments(subscribedProjectSettings []*pubsub.Proje index[projectId] = make([]*ExperimentIndex, 0) index, err = flattenProjectExperiments(projectId, index, projectExperiments, segmentersType) if err != nil { - return err + return nil, err } var pages int @@ -633,27 +659,23 @@ func (s *LocalStorage) initExperiments(subscribedProjectSettings []*pubsub.Proje &managementClient.ListExperimentsParams{Page: &page, StartTime: &startTime, EndTime: &endTime, Status: &activeStatus}, ) if err != nil { - return err + return nil, err } if resp.StatusCode() == 200 { projectExperiments := resp.JSON200.Data index, err = flattenProjectExperiments(projectId, index, projectExperiments, segmentersType) if err != nil { - return err + return nil, err } } } } } - s.Lock() - defer s.Unlock() - s.Experiments = index - - return nil + return index, nil } -func (s *LocalStorage) initProjectSegmenters(settings []*pubsub.ProjectSettings) error { +func (s *LocalStorage) fetchProjectSegmenters(settings []*pubsub.ProjectSettings) (map[ProjectId]map[string]schema.SegmenterType, error) { projectSegmenters := make(map[uint32]map[string]schema.SegmenterType) for _, projectSettings := range settings { log.Printf("retrieving project segmenters for %d", projectSettings.ProjectId) @@ -663,7 +685,7 @@ func (s *LocalStorage) initProjectSegmenters(settings []*pubsub.ProjectSettings) &managementClient.ListSegmentersParams{}, ) if err != nil { - return err + return nil, err } segmenters := map[string]schema.SegmenterType{} for _, v := range segmentersResp.JSON200.Data { @@ -672,16 +694,7 @@ func (s *LocalStorage) initProjectSegmenters(settings []*pubsub.ProjectSettings) projectSegmenters[ProjectId(projectSettings.ProjectId)] = segmenters } - s.Lock() - defer s.Unlock() - s.ProjectSegmenters = projectSegmenters - return nil -} - -func (s *LocalStorage) swapProjectSettings(projectSettings []*pubsub.ProjectSettings) { - s.Lock() - defer s.Unlock() - s.ProjectSettings = projectSettings + return projectSegmenters, nil } func (s *LocalStorage) UpdateProjectSegmenters(segmenter *_segmenters.SegmenterConfiguration, projectId int64) { From 66d95e9795e8c3814f5d201c89367b59d09c4fe3 Mon Sep 17 00:00:00 2001 From: Faqih Arifian Date: Tue, 4 Mar 2025 11:02:37 +0700 Subject: [PATCH 3/4] feat: use newly fetched segmenters to fetch experiments --- treatment-service/models/storage.go | 30 +++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/treatment-service/models/storage.go b/treatment-service/models/storage.go index 5690732..ed83d74 100644 --- a/treatment-service/models/storage.go +++ b/treatment-service/models/storage.go @@ -255,10 +255,9 @@ func (i *ExperimentIndex) checkSegmentHasWeakMatch(segmentName string) bool { func (s *LocalStorage) InsertProjectSettings(projectSettings *pubsub.ProjectSettings) error { // check that settings with the same Id doesn't exist - for _, existingSettings := range s.ProjectSettings { - if existingSettings.GetProjectId() == projectSettings.GetProjectId() { - return nil - } + existingProjectSettings := s.findProjectSettingsById(ProjectId(projectSettings.GetProjectId())) + if existingProjectSettings != nil { + return nil } // Update project segmenters on creation @@ -286,7 +285,7 @@ func (s *LocalStorage) UpdateProjectSettings(updatedProjectSettings *pubsub.Proj } func (s *LocalStorage) FindProjectSettingsWithId(projectId ProjectId) *pubsub.ProjectSettings { - projectSettings := s.findProjectSettingsWithId(projectId) + projectSettings := s.findSubscribedProjectSettingsById(projectId) if projectSettings != nil { return projectSettings } @@ -300,7 +299,7 @@ func (s *LocalStorage) FindProjectSettingsWithId(projectId ProjectId) *pubsub.Pr return projectSettings } -func (s *LocalStorage) findProjectSettingsWithId(projectId ProjectId) *pubsub.ProjectSettings { +func (s *LocalStorage) findSubscribedProjectSettingsById(projectId ProjectId) *pubsub.ProjectSettings { s.RLock() defer s.RUnlock() @@ -308,6 +307,13 @@ func (s *LocalStorage) findProjectSettingsWithId(projectId ProjectId) *pubsub.Pr return nil } + return s.findProjectSettingsById(projectId) +} + +func (s *LocalStorage) findProjectSettingsById(projectId ProjectId) *pubsub.ProjectSettings { + s.RLock() + defer s.RUnlock() + for _, settings := range s.ProjectSettings { if ProjectId(settings.ProjectId) == projectId { return settings @@ -342,10 +348,10 @@ func (s *LocalStorage) GetSegmentersTypeMapping(projectId ProjectId) (map[string } func (s *LocalStorage) FindExperiments(projectId ProjectId, filters []SegmentFilter) []*ExperimentMatch { - experiments := s.Experiments[projectId] s.RLock() defer s.RUnlock() + experiments := s.Experiments[projectId] var matched = make([]*ExperimentMatch, 0) for _, item := range experiments { @@ -376,10 +382,10 @@ func (s *LocalStorage) FindExperiments(projectId ProjectId, filters []SegmentFil } func (s *LocalStorage) FindExperimentWithId(projectId ProjectId, experimentId int64) *pubsub.Experiment { - currentExperiments, settingsExist := s.Experiments[projectId] - s.RLock() defer s.RUnlock() + + currentExperiments, settingsExist := s.Experiments[projectId] if !settingsExist { return nil } @@ -527,7 +533,7 @@ func (s *LocalStorage) Init() error { return err } - newExperiments, err := s.fetchExperiments(subscribedProjectSettings) + newExperiments, err := s.fetchExperiments(subscribedProjectSettings, newSegmenters) if err != nil { return err } @@ -619,7 +625,7 @@ func NewLocalStorage( return &s, err } -func (s *LocalStorage) fetchExperiments(subscribedProjectSettings []*pubsub.ProjectSettings) (map[ProjectId][]*ExperimentIndex, error) { +func (s *LocalStorage) fetchExperiments(subscribedProjectSettings []*pubsub.ProjectSettings, projectSegmenters map[ProjectId]map[string]schema.SegmenterType) (map[ProjectId][]*ExperimentIndex, error) { log.Println("retrieving project experiments...") index := make(map[ProjectId][]*ExperimentIndex) for _, projectSettings := range subscribedProjectSettings { @@ -629,7 +635,7 @@ func (s *LocalStorage) fetchExperiments(subscribedProjectSettings []*pubsub.Proj endTime := time.Now().Add(855360 * time.Hour) activeStatus := schema.ExperimentStatusActive - segmentersType := s.ProjectSegmenters[projectId] + segmentersType := projectSegmenters[projectId] resp, err := s.managementClient.ListExperimentsWithResponse( context.TODO(), projectSettings.ProjectId, From cdb488b2c0fcc64d4a3ad825bec516d69bd99f36 Mon Sep 17 00:00:00 2001 From: Faqih Arifian Date: Tue, 4 Mar 2025 11:47:49 +0700 Subject: [PATCH 4/4] fix: linter warning --- treatment-service/models/storage.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/treatment-service/models/storage.go b/treatment-service/models/storage.go index ed83d74..90ad674 100644 --- a/treatment-service/models/storage.go +++ b/treatment-service/models/storage.go @@ -625,7 +625,10 @@ func NewLocalStorage( return &s, err } -func (s *LocalStorage) fetchExperiments(subscribedProjectSettings []*pubsub.ProjectSettings, projectSegmenters map[ProjectId]map[string]schema.SegmenterType) (map[ProjectId][]*ExperimentIndex, error) { +func (s *LocalStorage) fetchExperiments( + subscribedProjectSettings []*pubsub.ProjectSettings, + projectSegmenters map[ProjectId]map[string]schema.SegmenterType, +) (map[ProjectId][]*ExperimentIndex, error) { log.Println("retrieving project experiments...") index := make(map[ProjectId][]*ExperimentIndex) for _, projectSettings := range subscribedProjectSettings {