Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Sync publishes streaming update #371

8 changes: 4 additions & 4 deletions internal/dev_server/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (s Server) GetDevProjectsProjectKey(ctx context.Context, request GetDevProj
LastSyncedFromSource: project.LastSyncTime.Unix(),
Context: project.Context,
SourceEnvironmentKey: project.SourceEnvironmentKey,
FlagsState: &project.FlagState,
FlagsState: &project.AllFlagsState,
}

if request.Params.Expand != nil {
Expand Down Expand Up @@ -98,7 +98,7 @@ func (s Server) PostDevProjectsProjectKey(ctx context.Context, request PostDevPr
LastSyncedFromSource: project.LastSyncTime.Unix(),
Context: project.Context,
SourceEnvironmentKey: project.SourceEnvironmentKey,
FlagsState: &project.FlagState,
FlagsState: &project.AllFlagsState,
}

if request.Params.Expand != nil {
Expand Down Expand Up @@ -143,7 +143,7 @@ func (s Server) PatchDevProjectsProjectKey(ctx context.Context, request PatchDev
LastSyncedFromSource: project.LastSyncTime.Unix(),
Context: project.Context,
SourceEnvironmentKey: project.SourceEnvironmentKey,
FlagsState: &project.FlagState,
FlagsState: &project.AllFlagsState,
}

if request.Params.Expand != nil {
Expand Down Expand Up @@ -188,7 +188,7 @@ func (s Server) PatchDevProjectsProjectKeySync(ctx context.Context, request Patc
LastSyncedFromSource: project.LastSyncTime.Unix(),
Context: project.Context,
SourceEnvironmentKey: project.SourceEnvironmentKey,
FlagsState: &project.FlagState,
FlagsState: &project.AllFlagsState,
}

if request.Params.Expand != nil {
Expand Down
6 changes: 3 additions & 3 deletions internal/dev_server/db/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,15 @@ func (s Sqlite) GetDevProject(ctx context.Context, key string) (*model.Project,
}

// Parse the flag state JSON string
if err := json.Unmarshal([]byte(flagStateData), &project.FlagState); err != nil {
if err := json.Unmarshal([]byte(flagStateData), &project.AllFlagsState); err != nil {
return nil, errors.Wrap(err, "unable to unmarshal flag state data")
}

return &project, nil
}

func (s Sqlite) UpdateProject(ctx context.Context, project model.Project) (bool, error) {
flagsStateJson, err := json.Marshal(project.FlagState)
flagsStateJson, err := json.Marshal(project.AllFlagsState)
if err != nil {
return false, errors.Wrap(err, "unable to marshal flags state when updating project")
}
Expand Down Expand Up @@ -107,7 +107,7 @@ func (s Sqlite) DeleteDevProject(ctx context.Context, key string) (bool, error)
}

func (s Sqlite) InsertProject(ctx context.Context, project model.Project) error {
flagsStateJson, err := json.Marshal(project.FlagState)
flagsStateJson, err := json.Marshal(project.AllFlagsState)
if err != nil {
return errors.Wrap(err, "unable to marshal flags state when writing project")
}
Expand Down
14 changes: 14 additions & 0 deletions internal/dev_server/model/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package model

// Event for individual flag overrides
type UpsertOverrideEvent struct {
FlagKey string
ProjectKey string
FlagState FlagState
}

// Event for full project sync
type SyncEvent struct {
ProjectKey string
AllFlagsState FlagsState
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This got renamed from FlagsState to AllFlagsState just to add a smidge of clarity

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! It was very confusing.

}
Comment on lines +1 to +14
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pulled these into a catalog file of events.

13 changes: 3 additions & 10 deletions internal/dev_server/model/override.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,6 @@ type Override struct {
Version int
}

type UpsertOverrideEvent struct {
FlagKey string
ProjectKey string
FlagState FlagState
}

func UpsertOverride(ctx context.Context, projectKey, flagKey string, value ldvalue.Value) (Override, error) {
// TODO: validate if the flag type matches

Expand All @@ -31,7 +25,7 @@ func UpsertOverride(ctx context.Context, projectKey, flagKey string, value ldval
}

var flagExists bool
for flag := range project.FlagState {
for flag := range project.AllFlagsState {
if flagKey == flag {
flagExists = true
break
Expand All @@ -54,9 +48,8 @@ func UpsertOverride(ctx context.Context, projectKey, flagKey string, value ldval
return Override{}, err
}

observers := GetObserversFromContext(ctx)
flagState := override.Apply(project.FlagState[flagKey])
observers.Notify(UpsertOverrideEvent{
flagState := override.Apply(project.AllFlagsState[flagKey])
GetObserversFromContext(ctx).Notify(UpsertOverrideEvent{
FlagKey: flagKey,
ProjectKey: projectKey,
FlagState: flagState,
Expand Down
8 changes: 4 additions & 4 deletions internal/dev_server/model/override_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func TestUpsertOverride(t *testing.T) {
}

project := &model.Project{
Key: projKey,
FlagState: model.FlagsState{flagKey: model.FlagState{Value: ldvalue.Bool(false), Version: 1}},
Key: projKey,
AllFlagsState: model.FlagsState{flagKey: model.FlagState{Value: ldvalue.Bool(false), Version: 1}},
}

ctx = model.ContextWithStore(ctx, store)
Expand All @@ -50,8 +50,8 @@ func TestUpsertOverride(t *testing.T) {

t.Run("Returns error if flag does not exist in project", func(t *testing.T) {
badProj := model.Project{
Key: projKey,
FlagState: model.FlagsState{},
Key: projKey,
AllFlagsState: model.FlagsState{},
}
store.EXPECT().GetDevProject(gomock.Any(), projKey).Return(&badProj, nil)

Expand Down
24 changes: 18 additions & 6 deletions internal/dev_server/model/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type Project struct {
SourceEnvironmentKey string
Context ldcontext.Context
LastSyncTime time.Time
FlagState FlagsState
AllFlagsState FlagsState
}

// CreateProject creates a project and adds it to the database.
Expand All @@ -35,7 +35,7 @@ func CreateProject(ctx context.Context, projectKey, sourceEnvironmentKey string,
return Project{}, err
}

project.FlagState = flagsState
project.AllFlagsState = flagsState
project.LastSyncTime = time.Now()

store := StoreFromContext(ctx)
Expand Down Expand Up @@ -65,7 +65,7 @@ func UpdateProject(ctx context.Context, projectKey string, context *ldcontext.Co
if err != nil {
return Project{}, err
}
project.FlagState = flagsState
project.AllFlagsState = flagsState
project.LastSyncTime = time.Now()
}

Expand All @@ -76,6 +76,7 @@ func UpdateProject(ctx context.Context, projectKey string, context *ldcontext.Co
if !updated {
return Project{}, errors.New("Project not updated")
}

return *project, nil
}

Expand All @@ -90,15 +91,26 @@ func SyncProject(ctx context.Context, projectKey string) (Project, error) {
return Project{}, err
}

project.FlagState = flagsState
project.AllFlagsState = flagsState
project.LastSyncTime = time.Now()

updated, err := store.UpdateProject(ctx, *project)
if err != nil {
return Project{}, err
}
if !updated {
return Project{}, errors.New("Project not updated")
}

allFlagsWithOverrides, err := project.GetFlagStateWithOverridesForProject(ctx)
if err != nil {
return Project{}, errors.Wrapf(err, "unable to get overrides for project, %s", projectKey)
}

GetObserversFromContext(ctx).Notify(SyncEvent{
ProjectKey: project.Key,
AllFlagsState: allFlagsWithOverrides,
})
return *project, nil
}

Expand All @@ -108,8 +120,8 @@ func (p Project) GetFlagStateWithOverridesForProject(ctx context.Context) (Flags
if err != nil {
return FlagsState{}, errors.Wrapf(err, "unable to fetch overrides for project %s", p.Key)
}
withOverrides := make(FlagsState, len(p.FlagState))
for flagKey, flagState := range p.FlagState {
withOverrides := make(FlagsState, len(p.AllFlagsState))
for flagKey, flagState := range p.AllFlagsState {
if override, ok := overrides.GetFlag(flagKey); ok {
flagState = override.Apply(flagState)
}
Expand Down
19 changes: 15 additions & 4 deletions internal/dev_server/model/project_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ func TestCreateProject(t *testing.T) {
Key: projKey,
SourceEnvironmentKey: sourceEnvKey,
Context: ldcontext.NewBuilder("user").Key("dev-environment").Build(),
FlagState: model.FromAllFlags(allFlags),
AllFlagsState: model.FromAllFlags(allFlags),
}

assert.Equal(t, expectedProj.Key, p.Key)
assert.Equal(t, expectedProj.SourceEnvironmentKey, p.SourceEnvironmentKey)
assert.Equal(t, expectedProj.Context, p.Context)
assert.Equal(t, expectedProj.FlagState, p.FlagState)
assert.Equal(t, expectedProj.AllFlagsState, p.AllFlagsState)
})
}

Expand Down Expand Up @@ -134,6 +134,11 @@ func TestSyncProject(t *testing.T) {
ctx := model.ContextWithStore(context.Background(), store)
ctx, api, sdk := adapters_mocks.WithMockApiAndSdk(ctx, mockController)

observer := mocks.NewMockObserver(mockController)
observers := model.NewObservers()
observers.RegisterObserver(observer)
ctx = model.SetObserversOnContext(ctx, observers)

proj := model.Project{
Key: "projKey",
SourceEnvironmentKey: "srcEnvKey",
Expand Down Expand Up @@ -183,6 +188,12 @@ func TestSyncProject(t *testing.T) {
api.EXPECT().GetSdkKey(gomock.Any(), proj.Key, proj.SourceEnvironmentKey).Return("sdkKey", nil)
sdk.EXPECT().GetAllFlagsState(gomock.Any(), gomock.Any(), "sdkKey").Return(flagstate.AllFlags{}, nil)
store.EXPECT().UpdateProject(gomock.Any(), gomock.Any()).Return(true, nil)
observer.
EXPECT().
Handle(model.SyncEvent{
ProjectKey: proj.Key,
AllFlagsState: model.FlagsState{},
})

project, err := model.SyncProject(ctx, proj.Key)
assert.Nil(t, err)
Expand All @@ -196,8 +207,8 @@ func TestGetFlagStateWithOverridesForProject(t *testing.T) {
ctx := model.ContextWithStore(context.Background(), store)
flagKey := "flg"
proj := model.Project{
Key: "projKey",
FlagState: model.FlagsState{flagKey: model.FlagState{Value: ldvalue.Bool(false), Version: 1}},
Key: "projKey",
AllFlagsState: model.FlagsState{flagKey: model.FlagState{Value: ldvalue.Bool(false), Version: 1}},
}

t.Run("Returns error if store fetch fails", func(t *testing.T) {
Expand Down
37 changes: 36 additions & 1 deletion internal/dev_server/sdk/go_sdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/launchdarkly/go-sdk-common/v3/ldcontext"
"github.com/launchdarkly/go-sdk-common/v3/ldvalue"
ldclient "github.com/launchdarkly/go-server-sdk/v7"
"github.com/launchdarkly/go-server-sdk/v7/interfaces"
"github.com/launchdarkly/go-server-sdk/v7/interfaces/flagstate"
"github.com/launchdarkly/ldcli/internal/dev_server/adapters/mocks"
"github.com/launchdarkly/ldcli/internal/dev_server/db"
Expand Down Expand Up @@ -106,8 +107,42 @@ func TestSDKRoutesViaGoSDK(t *testing.T) {
assert.Equal(t, map[string]any{"cat": "hat"}, val.AsArbitraryValue())
})

// Mock scenario: we re-sync and the SDK returns new values and higher version numbers
updatedFlags := flagstate.NewAllFlagsBuilder().
AddFlag("boolFlag", flagstate.FlagState{Value: ldvalue.Bool(false), Version: 2}).
AddFlag("stringFlag", flagstate.FlagState{Value: ldvalue.String("pool"), Version: 2}).
AddFlag("intFlag", flagstate.FlagState{Value: ldvalue.Int(789), Version: 2}).
AddFlag("doubleFlag", flagstate.FlagState{Value: ldvalue.Float64(101.01), Version: 2}).
AddFlag("jsonFlag", flagstate.FlagState{Value: ldvalue.CopyArbitraryValue(map[string]any{"cat": "bababooey"}), Version: 2}).
Build()
valuesMap := updatedFlags.ToValuesMap()

sdk.EXPECT().GetAllFlagsState(gomock.Any(), gomock.Any(), testSdkKey).Return(updatedFlags, nil)

// This test is testing the "put" payload in a roundabout way by verifying each of the flags are in there.
tvarney13 marked this conversation as resolved.
Show resolved Hide resolved
t.Run("Sync sends full flag payload for project", func(t *testing.T) {
trackers := make(map[string]<-chan interfaces.FlagValueChangeEvent, len(valuesMap))

for flagKey := range valuesMap {
flagUpdateChan := ld.GetFlagTracker().AddFlagValueChangeListener(flagKey, ldContext, ldvalue.String("uh-oh"))
defer ld.GetFlagTracker().RemoveFlagValueChangeListener(flagUpdateChan)
trackers[flagKey] = flagUpdateChan
}

_, err := model.SyncProject(ctx, projectKey)
require.NoError(t, err)

for flagKey, value := range valuesMap {
updateTracker, ok := trackers[flagKey]
require.True(t, ok)

update := <-updateTracker
assert.Equal(t, value.AsArbitraryValue(), update.NewValue.AsArbitraryValue())
}
})

updates := map[string]ldvalue.Value{
"boolFlag": ldvalue.Bool(false),
"boolFlag": ldvalue.Bool(true),
"stringFlag": ldvalue.String("drool"),
"intFlag": ldvalue.Int(456),
"doubleFlag": ldvalue.Float64(88.88),
Expand Down
29 changes: 22 additions & 7 deletions internal/dev_server/sdk/stream_client_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ func StreamClientFlags(w http.ResponseWriter, r *http.Request) {
WriteError(ctx, w, errors.Wrap(err, "failed to marshal flag state"))
return
}
updateChan, doneChan := OpenStream(w, r.Context().Done(), Message{"put", jsonBody})
updateChan, doneChan := OpenStream(
w,
r.Context().Done(),
Message{Event: TYPE_PUT, Data: jsonBody},
)
defer close(updateChan)
projectKey := GetProjectKeyFromContext(ctx)
observer := clientFlagsObserver{updateChan, projectKey}
Expand Down Expand Up @@ -50,23 +54,34 @@ func (c clientFlagsObserver) Handle(event interface{}) {
log.Printf("clientFlagsObserver: handling flag state event: %v", event)
switch event := event.(type) {
case model.UpsertOverrideEvent:
data, err := json.Marshal(clientSidePatchData{
err := SendMessage(c.updateChan, TYPE_PATCH, clientFlag{
Key: event.FlagKey,
Version: event.FlagState.Version,
Value: event.FlagState.Value,
})
if err != nil {
panic(errors.Wrap(err, "failed to marshal flag state in observer"))
}
c.updateChan <- Message{
Event: "patch",
Data: data,
case model.SyncEvent:
clientFlags := clientFlags{}
for flagKey, flagState := range event.AllFlagsState {
clientFlags[flagKey] = clientFlag{
Version: flagState.Version,
Value: flagState.Value,
}
}

err := SendMessage(c.updateChan, TYPE_PUT, clientFlags)
if err != nil {
panic(errors.Wrap(err, "failed to marshal flag state in observer"))
Comment on lines +65 to +76
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
}
}

type clientSidePatchData struct {
Key string `json:"key"`
type clientFlag struct {
Key string `json:"key,omitempty"`
Version int `json:"version"`
Value ldvalue.Value `json:"value"`
}

type clientFlags map[string]clientFlag
Comment on lines +81 to +87
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Declaring a new struct didn't feel super useful at the moment vs using omitempty. If we see significant delta between different event bodies some bifurcation would become necessary but for now 🤷

Loading