Skip to content

Commit

Permalink
feat: Sync publishes streaming update (#371)
Browse files Browse the repository at this point in the history
* Add event, emit event, rename a field

* server side update

* client side event

* Code consolidation and cleanup

* little more cleanup

* half an attempt at an integration test

* correct log text

* add overrides

* add expected times (causes timeout)

* make test go

* Make comment more accurate

Co-authored-by: Mike Zorn <[email protected]>

* chans are pointer types already

* modify test

---------

Co-authored-by: Mike Zorn <[email protected]>
  • Loading branch information
tvarney13 and mike-zorn authored Jul 9, 2024
1 parent 7c6597e commit 786c548
Show file tree
Hide file tree
Showing 11 changed files with 162 additions and 46 deletions.
8 changes: 4 additions & 4 deletions internal/dev_server/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (s Server) GetDevProjectsProjectKey(ctx context.Context, request GetDevProj
LastSyncedFromSource: project.LastSyncTime.Unix(),
Context: project.Context,
SourceEnvironmentKey: project.SourceEnvironmentKey,
FlagsState: &project.FlagState,
FlagsState: &project.AllFlagsState,
}

if request.Params.Expand != nil {
Expand Down Expand Up @@ -98,7 +98,7 @@ func (s Server) PostDevProjectsProjectKey(ctx context.Context, request PostDevPr
LastSyncedFromSource: project.LastSyncTime.Unix(),
Context: project.Context,
SourceEnvironmentKey: project.SourceEnvironmentKey,
FlagsState: &project.FlagState,
FlagsState: &project.AllFlagsState,
}

if request.Params.Expand != nil {
Expand Down Expand Up @@ -143,7 +143,7 @@ func (s Server) PatchDevProjectsProjectKey(ctx context.Context, request PatchDev
LastSyncedFromSource: project.LastSyncTime.Unix(),
Context: project.Context,
SourceEnvironmentKey: project.SourceEnvironmentKey,
FlagsState: &project.FlagState,
FlagsState: &project.AllFlagsState,
}

if request.Params.Expand != nil {
Expand Down Expand Up @@ -188,7 +188,7 @@ func (s Server) PatchDevProjectsProjectKeySync(ctx context.Context, request Patc
LastSyncedFromSource: project.LastSyncTime.Unix(),
Context: project.Context,
SourceEnvironmentKey: project.SourceEnvironmentKey,
FlagsState: &project.FlagState,
FlagsState: &project.AllFlagsState,
}

if request.Params.Expand != nil {
Expand Down
6 changes: 3 additions & 3 deletions internal/dev_server/db/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,15 @@ func (s Sqlite) GetDevProject(ctx context.Context, key string) (*model.Project,
}

// Parse the flag state JSON string
if err := json.Unmarshal([]byte(flagStateData), &project.FlagState); err != nil {
if err := json.Unmarshal([]byte(flagStateData), &project.AllFlagsState); err != nil {
return nil, errors.Wrap(err, "unable to unmarshal flag state data")
}

return &project, nil
}

func (s Sqlite) UpdateProject(ctx context.Context, project model.Project) (bool, error) {
flagsStateJson, err := json.Marshal(project.FlagState)
flagsStateJson, err := json.Marshal(project.AllFlagsState)
if err != nil {
return false, errors.Wrap(err, "unable to marshal flags state when updating project")
}
Expand Down Expand Up @@ -107,7 +107,7 @@ func (s Sqlite) DeleteDevProject(ctx context.Context, key string) (bool, error)
}

func (s Sqlite) InsertProject(ctx context.Context, project model.Project) error {
flagsStateJson, err := json.Marshal(project.FlagState)
flagsStateJson, err := json.Marshal(project.AllFlagsState)
if err != nil {
return errors.Wrap(err, "unable to marshal flags state when writing project")
}
Expand Down
14 changes: 14 additions & 0 deletions internal/dev_server/model/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package model

// Event for individual flag overrides
type UpsertOverrideEvent struct {
FlagKey string
ProjectKey string
FlagState FlagState
}

// Event for full project sync
type SyncEvent struct {
ProjectKey string
AllFlagsState FlagsState
}
13 changes: 3 additions & 10 deletions internal/dev_server/model/override.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,6 @@ type Override struct {
Version int
}

type UpsertOverrideEvent struct {
FlagKey string
ProjectKey string
FlagState FlagState
}

func UpsertOverride(ctx context.Context, projectKey, flagKey string, value ldvalue.Value) (Override, error) {
// TODO: validate if the flag type matches

Expand All @@ -31,7 +25,7 @@ func UpsertOverride(ctx context.Context, projectKey, flagKey string, value ldval
}

var flagExists bool
for flag := range project.FlagState {
for flag := range project.AllFlagsState {
if flagKey == flag {
flagExists = true
break
Expand All @@ -54,9 +48,8 @@ func UpsertOverride(ctx context.Context, projectKey, flagKey string, value ldval
return Override{}, err
}

observers := GetObserversFromContext(ctx)
flagState := override.Apply(project.FlagState[flagKey])
observers.Notify(UpsertOverrideEvent{
flagState := override.Apply(project.AllFlagsState[flagKey])
GetObserversFromContext(ctx).Notify(UpsertOverrideEvent{
FlagKey: flagKey,
ProjectKey: projectKey,
FlagState: flagState,
Expand Down
8 changes: 4 additions & 4 deletions internal/dev_server/model/override_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func TestUpsertOverride(t *testing.T) {
}

project := &model.Project{
Key: projKey,
FlagState: model.FlagsState{flagKey: model.FlagState{Value: ldvalue.Bool(false), Version: 1}},
Key: projKey,
AllFlagsState: model.FlagsState{flagKey: model.FlagState{Value: ldvalue.Bool(false), Version: 1}},
}

ctx = model.ContextWithStore(ctx, store)
Expand All @@ -50,8 +50,8 @@ func TestUpsertOverride(t *testing.T) {

t.Run("Returns error if flag does not exist in project", func(t *testing.T) {
badProj := model.Project{
Key: projKey,
FlagState: model.FlagsState{},
Key: projKey,
AllFlagsState: model.FlagsState{},
}
store.EXPECT().GetDevProject(gomock.Any(), projKey).Return(&badProj, nil)

Expand Down
24 changes: 18 additions & 6 deletions internal/dev_server/model/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type Project struct {
SourceEnvironmentKey string
Context ldcontext.Context
LastSyncTime time.Time
FlagState FlagsState
AllFlagsState FlagsState
}

// CreateProject creates a project and adds it to the database.
Expand All @@ -35,7 +35,7 @@ func CreateProject(ctx context.Context, projectKey, sourceEnvironmentKey string,
return Project{}, err
}

project.FlagState = flagsState
project.AllFlagsState = flagsState
project.LastSyncTime = time.Now()

store := StoreFromContext(ctx)
Expand Down Expand Up @@ -65,7 +65,7 @@ func UpdateProject(ctx context.Context, projectKey string, context *ldcontext.Co
if err != nil {
return Project{}, err
}
project.FlagState = flagsState
project.AllFlagsState = flagsState
project.LastSyncTime = time.Now()
}

Expand All @@ -76,6 +76,7 @@ func UpdateProject(ctx context.Context, projectKey string, context *ldcontext.Co
if !updated {
return Project{}, errors.New("Project not updated")
}

return *project, nil
}

Expand All @@ -90,15 +91,26 @@ func SyncProject(ctx context.Context, projectKey string) (Project, error) {
return Project{}, err
}

project.FlagState = flagsState
project.AllFlagsState = flagsState
project.LastSyncTime = time.Now()

updated, err := store.UpdateProject(ctx, *project)
if err != nil {
return Project{}, err
}
if !updated {
return Project{}, errors.New("Project not updated")
}

allFlagsWithOverrides, err := project.GetFlagStateWithOverridesForProject(ctx)
if err != nil {
return Project{}, errors.Wrapf(err, "unable to get overrides for project, %s", projectKey)
}

GetObserversFromContext(ctx).Notify(SyncEvent{
ProjectKey: project.Key,
AllFlagsState: allFlagsWithOverrides,
})
return *project, nil
}

Expand All @@ -108,8 +120,8 @@ func (p Project) GetFlagStateWithOverridesForProject(ctx context.Context) (Flags
if err != nil {
return FlagsState{}, errors.Wrapf(err, "unable to fetch overrides for project %s", p.Key)
}
withOverrides := make(FlagsState, len(p.FlagState))
for flagKey, flagState := range p.FlagState {
withOverrides := make(FlagsState, len(p.AllFlagsState))
for flagKey, flagState := range p.AllFlagsState {
if override, ok := overrides.GetFlag(flagKey); ok {
flagState = override.Apply(flagState)
}
Expand Down
19 changes: 15 additions & 4 deletions internal/dev_server/model/project_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ func TestCreateProject(t *testing.T) {
Key: projKey,
SourceEnvironmentKey: sourceEnvKey,
Context: ldcontext.NewBuilder("user").Key("dev-environment").Build(),
FlagState: model.FromAllFlags(allFlags),
AllFlagsState: model.FromAllFlags(allFlags),
}

assert.Equal(t, expectedProj.Key, p.Key)
assert.Equal(t, expectedProj.SourceEnvironmentKey, p.SourceEnvironmentKey)
assert.Equal(t, expectedProj.Context, p.Context)
assert.Equal(t, expectedProj.FlagState, p.FlagState)
assert.Equal(t, expectedProj.AllFlagsState, p.AllFlagsState)
})
}

Expand Down Expand Up @@ -134,6 +134,11 @@ func TestSyncProject(t *testing.T) {
ctx := model.ContextWithStore(context.Background(), store)
ctx, api, sdk := adapters_mocks.WithMockApiAndSdk(ctx, mockController)

observer := mocks.NewMockObserver(mockController)
observers := model.NewObservers()
observers.RegisterObserver(observer)
ctx = model.SetObserversOnContext(ctx, observers)

proj := model.Project{
Key: "projKey",
SourceEnvironmentKey: "srcEnvKey",
Expand Down Expand Up @@ -183,6 +188,12 @@ func TestSyncProject(t *testing.T) {
api.EXPECT().GetSdkKey(gomock.Any(), proj.Key, proj.SourceEnvironmentKey).Return("sdkKey", nil)
sdk.EXPECT().GetAllFlagsState(gomock.Any(), gomock.Any(), "sdkKey").Return(flagstate.AllFlags{}, nil)
store.EXPECT().UpdateProject(gomock.Any(), gomock.Any()).Return(true, nil)
observer.
EXPECT().
Handle(model.SyncEvent{
ProjectKey: proj.Key,
AllFlagsState: model.FlagsState{},
})

project, err := model.SyncProject(ctx, proj.Key)
assert.Nil(t, err)
Expand All @@ -196,8 +207,8 @@ func TestGetFlagStateWithOverridesForProject(t *testing.T) {
ctx := model.ContextWithStore(context.Background(), store)
flagKey := "flg"
proj := model.Project{
Key: "projKey",
FlagState: model.FlagsState{flagKey: model.FlagState{Value: ldvalue.Bool(false), Version: 1}},
Key: "projKey",
AllFlagsState: model.FlagsState{flagKey: model.FlagState{Value: ldvalue.Bool(false), Version: 1}},
}

t.Run("Returns error if store fetch fails", func(t *testing.T) {
Expand Down
37 changes: 36 additions & 1 deletion internal/dev_server/sdk/go_sdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/launchdarkly/go-sdk-common/v3/ldcontext"
"github.com/launchdarkly/go-sdk-common/v3/ldvalue"
ldclient "github.com/launchdarkly/go-server-sdk/v7"
"github.com/launchdarkly/go-server-sdk/v7/interfaces"
"github.com/launchdarkly/go-server-sdk/v7/interfaces/flagstate"
"github.com/launchdarkly/ldcli/internal/dev_server/adapters/mocks"
"github.com/launchdarkly/ldcli/internal/dev_server/db"
Expand Down Expand Up @@ -106,8 +107,42 @@ func TestSDKRoutesViaGoSDK(t *testing.T) {
assert.Equal(t, map[string]any{"cat": "hat"}, val.AsArbitraryValue())
})

// Mock scenario: we re-sync and the SDK returns new values and higher version numbers
updatedFlags := flagstate.NewAllFlagsBuilder().
AddFlag("boolFlag", flagstate.FlagState{Value: ldvalue.Bool(false), Version: 2}).
AddFlag("stringFlag", flagstate.FlagState{Value: ldvalue.String("pool"), Version: 2}).
AddFlag("intFlag", flagstate.FlagState{Value: ldvalue.Int(789), Version: 2}).
AddFlag("doubleFlag", flagstate.FlagState{Value: ldvalue.Float64(101.01), Version: 2}).
AddFlag("jsonFlag", flagstate.FlagState{Value: ldvalue.CopyArbitraryValue(map[string]any{"cat": "bababooey"}), Version: 2}).
Build()
valuesMap := updatedFlags.ToValuesMap()

sdk.EXPECT().GetAllFlagsState(gomock.Any(), gomock.Any(), testSdkKey).Return(updatedFlags, nil)

// This test is testing the "put" payload in a roundabout way by verifying each of the flags are in there.
t.Run("Sync sends full flag payload for project", func(t *testing.T) {
trackers := make(map[string]<-chan interfaces.FlagValueChangeEvent, len(valuesMap))

for flagKey := range valuesMap {
flagUpdateChan := ld.GetFlagTracker().AddFlagValueChangeListener(flagKey, ldContext, ldvalue.String("uh-oh"))
defer ld.GetFlagTracker().RemoveFlagValueChangeListener(flagUpdateChan)
trackers[flagKey] = flagUpdateChan
}

_, err := model.SyncProject(ctx, projectKey)
require.NoError(t, err)

for flagKey, value := range valuesMap {
updateTracker, ok := trackers[flagKey]
require.True(t, ok)

update := <-updateTracker
assert.Equal(t, value.AsArbitraryValue(), update.NewValue.AsArbitraryValue())
}
})

updates := map[string]ldvalue.Value{
"boolFlag": ldvalue.Bool(false),
"boolFlag": ldvalue.Bool(true),
"stringFlag": ldvalue.String("drool"),
"intFlag": ldvalue.Int(456),
"doubleFlag": ldvalue.Float64(88.88),
Expand Down
29 changes: 22 additions & 7 deletions internal/dev_server/sdk/stream_client_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ func StreamClientFlags(w http.ResponseWriter, r *http.Request) {
WriteError(ctx, w, errors.Wrap(err, "failed to marshal flag state"))
return
}
updateChan, doneChan := OpenStream(w, r.Context().Done(), Message{"put", jsonBody})
updateChan, doneChan := OpenStream(
w,
r.Context().Done(),
Message{Event: TYPE_PUT, Data: jsonBody},
)
defer close(updateChan)
projectKey := GetProjectKeyFromContext(ctx)
observer := clientFlagsObserver{updateChan, projectKey}
Expand Down Expand Up @@ -50,23 +54,34 @@ func (c clientFlagsObserver) Handle(event interface{}) {
log.Printf("clientFlagsObserver: handling flag state event: %v", event)
switch event := event.(type) {
case model.UpsertOverrideEvent:
data, err := json.Marshal(clientSidePatchData{
err := SendMessage(c.updateChan, TYPE_PATCH, clientFlag{
Key: event.FlagKey,
Version: event.FlagState.Version,
Value: event.FlagState.Value,
})
if err != nil {
panic(errors.Wrap(err, "failed to marshal flag state in observer"))
}
c.updateChan <- Message{
Event: "patch",
Data: data,
case model.SyncEvent:
clientFlags := clientFlags{}
for flagKey, flagState := range event.AllFlagsState {
clientFlags[flagKey] = clientFlag{
Version: flagState.Version,
Value: flagState.Value,
}
}

err := SendMessage(c.updateChan, TYPE_PUT, clientFlags)
if err != nil {
panic(errors.Wrap(err, "failed to marshal flag state in observer"))
}
}
}

type clientSidePatchData struct {
Key string `json:"key"`
type clientFlag struct {
Key string `json:"key,omitempty"`
Version int `json:"version"`
Value ldvalue.Value `json:"value"`
}

type clientFlags map[string]clientFlag
Loading

0 comments on commit 786c548

Please sign in to comment.