fix: no need to specify --opvault to access secrets (#2071)
Changes:
- Asynchronous providers are given a list of known entries when syncing
- The 1Password provider uses these entries to determine which vaults to sync (see the sketch below)
- Removed the logic tracking whether an asynchronous provider was active. That logic existed to make sure we didn't attempt to sync with 1Password for projects that weren't using 1Password; this is now handled by OnePasswordProvider only syncing known vaults.
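
To make the new flow concrete, here is a rough, self-contained sketch of the idea: group the known entries by vault (taken from the accessor's host, assuming accessor URLs of the form op://<vault>/...), fetch each referenced vault once, then prune cached refs that no longer appear. Ref, Entry, and fetchVault below are simplified stand-ins for illustration, not the repository's actual types.

package main

import (
    "fmt"
    "net/url"
)

// Ref and Entry are simplified stand-ins for the repository's types.
type Ref string

type Entry struct {
    Ref      Ref
    Accessor *url.URL // e.g. op://<vault>/..., so Accessor.Host names the vault
}

// syncFromEntries mirrors the shape of the new flow: derive the set of vaults
// from the known entries, fetch each vault once, then prune refs that no
// longer appear. fetchVault is a hypothetical stand-in for the 1Password lookup.
func syncFromEntries(entries []Entry, fetchVault func(vault string) map[Ref]string, values map[Ref]string) {
    // Collect the distinct vaults referenced by the known entries.
    vaults := map[string]bool{}
    for _, e := range entries {
        if e.Accessor.Host == "" {
            continue // skip malformed accessors
        }
        vaults[e.Accessor.Host] = true
    }

    // Fetch every referenced vault and record which refs were seen.
    seen := map[Ref]bool{}
    for vault := range vaults {
        for ref, value := range fetchVault(vault) {
            seen[ref] = true
            values[ref] = value
        }
    }

    // Drop cached values whose refs no longer appear in any vault.
    for ref := range values {
        if !seen[ref] {
            delete(values, ref)
        }
    }
}

func main() {
    accessor, _ := url.Parse("op://dev-vault/echo.apiKey")
    entries := []Entry{{Ref: "echo.apiKey", Accessor: accessor}}
    values := map[Ref]string{"stale.ref": "old"}
    fetch := func(vault string) map[Ref]string {
        return map[Ref]string{"echo.apiKey": "s3cr3t"} // pretend 1Password lookup
    }
    syncFromEntries(entries, fetch, values)
    fmt.Println(values) // map[echo.apiKey:s3cr3t]
}

With no known entries there are no vaults to fetch, which is why the provider no longer needs an explicit active/inactive flag.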
matt2e authored Jul 15, 2024
1 parent 5b6d812 commit d012354
Showing 5 changed files with 45 additions and 50 deletions.
common/configuration/1password_provider.go: 30 additions, 20 deletions
@@ -15,7 +15,6 @@ import (
 
     "github.com/TBD54566975/ftl/internal/exec"
     "github.com/TBD54566975/ftl/internal/log"
-    "github.com/TBD54566975/ftl/internal/slices"
 )
 
 // OnePasswordProvider is a configuration provider that reads passwords from
@@ -41,35 +40,46 @@ func (o OnePasswordProvider) SyncInterval() time.Duration {
     return time.Second * 10
 }
 
-func (o OnePasswordProvider) Sync(ctx context.Context, values *xsync.MapOf[Ref, SyncedValue]) error {
+// Sync will fetch all secrets from the 1Password vault and store them in the values map.
+// Do not just sync the o.Vault, instead find all vaults found in entries and sync them.
+// If there are no entries, we should not attempt any access of 1Password.
+func (o OnePasswordProvider) Sync(ctx context.Context, entries []Entry, values *xsync.MapOf[Ref, SyncedValue]) error {
     logger := log.FromContext(ctx)
-    if o.Vault == "" {
-        return fmt.Errorf("1Password vault not set: use --opvault flag to specify the vault")
-    }
     if err := checkOpBinary(); err != nil {
         return err
     }
-    full, err := o.getItem(ctx, o.Vault)
-    if err != nil {
-        return fmt.Errorf("get item failed: %w", err)
+    // find vaults
+    vaults := map[string]bool{}
+    for _, e := range entries {
+        vault := e.Accessor.Host
+        if vault == "" {
+            logger.Warnf("empty vault name for %s", e.Ref)
+            continue
+        }
+        vaults[e.Accessor.Host] = true
     }
 
-    for _, field := range full.Fields {
-        ref, err := ParseRef(field.Label)
+    // get all secrets from all vaults
+    refs := map[Ref]bool{}
+    for vault := range vaults {
+        full, err := o.getItem(ctx, vault)
         if err != nil {
-            logger.Warnf("invalid field label found in 1Password: %q", field.Label)
-            continue
+            return fmt.Errorf("get item failed: %w", err)
         }
+        for _, field := range full.Fields {
+            ref, err := ParseRef(field.Label)
+            if err != nil {
+                logger.Warnf("invalid field label found in 1Password: %q", field.Label)
+                continue
+            }
+            refs[ref] = true
+            values.Store(ref, SyncedValue{
+                Value: []byte(field.Value),
+            })
+        }
-        values.Store(ref, SyncedValue{
-            Value: []byte(field.Value),
-        })
     }
 
     // delete old values
     values.Range(func(ref Ref, _ SyncedValue) bool {
-        if _, ok := slices.Find(full.Fields, func(item entry) bool {
-            return item.Label == ref.String()
-        }); !ok {
+        if _, ok := refs[ref]; !ok {
             values.Delete(ref)
         }
         return true
common/configuration/api.go: 3 additions, 3 deletions
@@ -117,9 +117,9 @@ type AsynchronousProvider[R Role] interface {
     // SyncInterval() provides the expected time between syncs.
     // If Sync() returns an error, sync will be retried with an exponential backoff.
     //
-    // Sync is only called if the Router has keys referring to this provider.
-    // If the Router did have keys for this provider but removed them, one more round of sync is executed until Sync() will stop being called
-    Sync(ctx context.Context, values *xsync.MapOf[Ref, SyncedValue]) error
+    // Values should be updated by Sync().
+    // An array of known entries from the router is provided in case it is helpful, but the provider can store any values it wants.
+    Sync(ctx context.Context, entries []Entry, values *xsync.MapOf[Ref, SyncedValue]) error
 }
 
 type VersionToken any
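
As a rough illustration of this contract, the sketch below defines a toy asynchronous provider that resolves each known entry from an environment variable. The interface shape and the Ref, Entry, SyncedValue, and ValueMap types are simplified stand-ins for the repository's (which uses xsync.MapOf), and envProvider is hypothetical, not part of FTL; the point is only the shape of Sync: entries are advisory, and the provider's job is to refresh values.

package main

import (
    "context"
    "fmt"
    "os"
    "strings"
    "sync"
    "time"
)

// Simplified stand-ins for the repository's Ref, Entry and SyncedValue types,
// with a plain mutex-guarded map in place of xsync.MapOf.
type Ref string

type Entry struct{ Ref Ref }

type SyncedValue struct{ Value []byte }

type ValueMap struct {
    mu sync.Mutex
    m  map[Ref]SyncedValue
}

// Store inserts or replaces the value for a ref.
func (v *ValueMap) Store(ref Ref, val SyncedValue) {
    v.mu.Lock()
    defer v.mu.Unlock()
    if v.m == nil {
        v.m = map[Ref]SyncedValue{}
    }
    v.m[ref] = val
}

// asyncProvider mirrors the shape of the updated contract: Sync receives the
// router's known entries (advisory) and must refresh values.
type asyncProvider interface {
    SyncInterval() time.Duration
    Sync(ctx context.Context, entries []Entry, values *ValueMap) error
}

// envProvider is a hypothetical provider that resolves each known entry from
// an environment variable, e.g. ref "echo.apiKey" -> $ECHO_APIKEY.
type envProvider struct{}

func (envProvider) SyncInterval() time.Duration { return 10 * time.Second }

func (envProvider) Sync(ctx context.Context, entries []Entry, values *ValueMap) error {
    for _, e := range entries {
        key := strings.ToUpper(strings.ReplaceAll(string(e.Ref), ".", "_"))
        if v, ok := os.LookupEnv(key); ok {
            values.Store(e.Ref, SyncedValue{Value: []byte(v)})
        }
    }
    return nil
}

func main() {
    os.Setenv("ECHO_APIKEY", "s3cr3t")
    var p asyncProvider = envProvider{}
    values := &ValueMap{}
    if err := p.Sync(context.Background(), []Entry{{Ref: "echo.apiKey"}}, values); err != nil {
        fmt.Println("sync failed:", err)
        return
    }
    fmt.Println(string(values.m["echo.apiKey"].Value)) // prints: s3cr3t
}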
common/configuration/asm.go: 1 addition, 1 deletion
@@ -92,7 +92,7 @@ func (a *ASM) SyncInterval() time.Duration {
     return client.syncInterval()
 }
 
-func (a *ASM) Sync(ctx context.Context, values *xsync.MapOf[Ref, SyncedValue]) error {
+func (a *ASM) Sync(ctx context.Context, entries []Entry, values *xsync.MapOf[Ref, SyncedValue]) error {
     client, err := a.coordinator.Get()
     if err != nil {
         return fmt.Errorf("could not coordinate ASM: %w", err)
common/configuration/asm_test.go: 6 additions, 6 deletions
@@ -174,9 +174,9 @@ func TestLeaderSync(t *testing.T) {
     ctx := log.ContextWithNewDefaultLogger(context.Background())
     sm, _, _, externalClient, clock, _ := setUp(ctx, t, None[Router[Secrets]]())
     testClientSync(ctx, t, sm, externalClient, true, func(percentage float64) {
-        clock.Add(time.Duration(percentage) * asmLeaderSyncInterval)
+        clock.Add(time.Second * (time.Duration(asmLeaderSyncInterval.Seconds()*percentage) + 1.0))
         if percentage == 1.0 {
-            time.Sleep(time.Second * 2)
+            time.Sleep(time.Second * 4)
         }
     })
 }
@@ -202,15 +202,15 @@ func TestFollowerSync(t *testing.T) {
 
     testClientSync(ctx, t, sm, externalClient, false, func(percentage float64) {
         // sync leader
-        leaderClock.Add(time.Duration(percentage) * asmLeaderSyncInterval)
+        leaderClock.Add(time.Second * (time.Duration(asmLeaderSyncInterval.Seconds()*percentage) + 1.0))
         if percentage == 1.0 {
-            time.Sleep(time.Second * 2)
+            time.Sleep(time.Second * 4)
         }
 
         // then sync follower
-        followerClock.Add(time.Duration(percentage) * asmFollowerSyncInterval)
+        followerClock.Add(time.Second * (time.Duration(asmFollowerSyncInterval.Seconds()*percentage) + 1.0))
         if percentage == 1.0 {
-            time.Sleep(time.Second * 2)
+            time.Sleep(time.Second * 4)
         }
     })
 }
common/configuration/cache.go: 5 additions, 20 deletions
@@ -157,18 +157,18 @@ func (c *cache[R]) sync(ctx context.Context, clock clock.Clock) {
             if len(providersToSync) == 0 {
                 continue
             }
-            list, err := c.listProvider.List(ctx)
+            entries, err := c.listProvider.List(ctx)
             if err != nil {
                 logger.Warnf("could not sync: could not get list: %v", err)
                 continue
             }
             for _, cp := range providersToSync {
-                _, hasValues := slices.Find(list, func(e Entry) bool {
+                entriesForProvider := slices.Filter(entries, func(e Entry) bool {
                     return ProviderKeyForAccessor(e.Accessor) == cp.provider.Key()
                 })
                 wg.Add(1)
                 go func(cp *cacheProvider[R]) {
-                    cp.sync(ctx, clock, hasValues)
+                    cp.sync(ctx, entriesForProvider, clock)
                     wg.Done()
                 }(cp)
             }
@@ -191,11 +191,6 @@ type cacheProvider[R Role] struct {
     provider AsynchronousProvider[R]
     values *xsync.MapOf[Ref, SyncedValue]
 
-    // isActive is the provider is used by any value
-    // When inactive, the provider will not be synced. This helps avoid accessing resources that are not needed, like 1Password.
-    // When the provider stops being used, we do one final sync to ensure the cache is up-to-date
-    isActive bool
-
     loaded chan bool // closed when values have been synced for the first time
     loadedOnce *sync.Once // ensures we close the loaded channel only once
 
@@ -228,20 +223,11 @@ func (c *cacheProvider[R]) needsSync(clock clock.Clock) bool {
 }
 
 // sync executes sync on the provider and updates the cacheProvider sync state
-func (c *cacheProvider[R]) sync(ctx context.Context, clock clock.Clock, hasValues bool) {
+func (c *cacheProvider[R]) sync(ctx context.Context, entries []Entry, clock clock.Clock) {
     logger := log.FromContext(ctx)
 
-    if !hasValues && !c.isActive {
-        // skip
-        c.loadedOnce.Do(func() {
-            // treat this as a successful initial sync, so we don't ever block trying to access this provider
-            close(c.loaded)
-        })
-        return
-    }
-
     c.lastSyncAttempt = optional.Some(clock.Now())
-    err := c.provider.Sync(ctx, c.values)
+    err := c.provider.Sync(ctx, entries, c.values)
     if err != nil {
         logger.Errorf(err, "Error syncing %s", c.provider.Key())
         if backoff, ok := c.currentBackoff.Get(); ok {
@@ -256,7 +242,6 @@ func (c *cacheProvider[R]) sync(ctx context.Context, clock clock.Clock, hasValues bool) {
     c.loadedOnce.Do(func() {
         close(c.loaded)
     })
-    c.isActive = hasValues
 }
 
 // processEvent updates the cache after a value was set or deleted
