diff --git a/docusaurus/docs/develop/packages/cache.md b/docusaurus/docs/develop/packages/cache.md new file mode 100644 index 000000000..3c919df6f --- /dev/null +++ b/docusaurus/docs/develop/packages/cache.md @@ -0,0 +1,58 @@ +--- +title: Cache Package +sidebar_position: 4 +--- + +## `pocket/pkg/cache` Package + +// TODO_DOCUMENT(@bryanchriswhite): Add more detailed documentation. + +```mermaid +--- +title: Legend +--- + +classDiagram-v2 + + class GenericInterface__T__any { + <> + GenericMethod() T + } + + class Implemenetation { + ExportedField FieldType + unexportedField FieldType + } + + Implemenetation --|> GenericInterface__T__any: implements +``` + +```mermaid +--- +title: Cache Components +--- + +classDiagram-v2 + + +class KeyValueCache__T__any { + <> + Get(key string) (value T, isCached bool) + Set(key string, value T) + Delete(key string) + Clear() +} + +class HistoricalKeyValueCache__T__any { + <> + GetLatestVersion(key string) (value T, isCached bool) + GetVersion(key string, version int64) (value T, isCached bool) + SetVersion(key string, value T, version int64) (err error) +} + +class keyValueCache__T__any:::cacheImpl +keyValueCache__T__any --|> KeyValueCache__T__any + +class historicalKeyValueCache__T__any +historicalKeyValueCache__T__any --|> HistoricalKeyValueCache__T__any +``` diff --git a/pkg/cache/errors.go b/pkg/cache/errors.go new file mode 100644 index 000000000..e38ff1ffc --- /dev/null +++ b/pkg/cache/errors.go @@ -0,0 +1,11 @@ +package cache + +import "cosmossdk.io/errors" + +const codespace = "cache" + +var ( + ErrKeyValueCacheConfigValidation = errors.Register(codespace, 1, "invalid query cache config") + ErrCacheInternal = errors.Register(codespace, 2, "cache internal error") + ErrNoOverwrite = errors.Register(codespace, 3, "refusing to overwrite existing value") +) diff --git a/pkg/cache/interface.go b/pkg/cache/interface.go new file mode 100644 index 000000000..da7e0e74e --- /dev/null +++ b/pkg/cache/interface.go @@ -0,0 +1,24 @@ +package cache + +// KeyValueCache is a key/value store style interface for a cache of a single type. +// It is intended to be used to cache arbitrary data, where each key uniquely indexes +// the most recently observed version of the data associated that key. +type KeyValueCache[T any] interface { + Get(key string) (T, bool) + Set(key string, value T) + Delete(key string) + Clear() +} + +// HistoricalKeyValueCache is a key/value store style interface for a cache of a single type. +// It is intended to be used to cache arbitrary data, where each key uniquely indexes +// a mapping of version numbers to values corresponding to the historical values of the data +// associated that key. +type HistoricalKeyValueCache[T any] interface { + // GetLatestVersion returns the value of the latest version for the given key. + GetLatestVersion(key string) (T, bool) + // GetVersion retrieves the nearest value <= the specified version number. + GetVersion(key string, version int64) (T, bool) + // SetVersion adds or updates a value at a specific version number. + SetVersion(key string, value T, version int64) error +} diff --git a/pkg/cache/memory/config.go b/pkg/cache/memory/config.go new file mode 100644 index 000000000..f4720ab51 --- /dev/null +++ b/pkg/cache/memory/config.go @@ -0,0 +1,160 @@ +package memory + +import ( + "fmt" + "time" + + "github.com/pokt-network/poktroll/pkg/cache" +) + +// EvictionPolicy determines which values are removed when number of keys in the cache reaches maxKeys. +type EvictionPolicy int64 + +const ( + FirstInFirstOut = EvictionPolicy(iota) + LeastRecentlyUsed + LeastFrequentlyUsed +) + +var ( + DefaultKeyValueCacheConfig = keyValueCacheConfig{ + evictionPolicy: FirstInFirstOut, + // TODO_MAINNET(@bryanchriswhite): Consider how we can "guarantee" good + // alignment between the TTL and the block production rate, + // by accessing onchain block times directly. + ttl: time.Minute, + } + DefaultHistoricalKeyValueCacheConfig = historicalKeyValueCacheConfig{ + keyValueCacheConfig: DefaultKeyValueCacheConfig, + maxVersionAge: 10, + } +) + +// keyValueCacheConfig is the configuration for constructing a keyValueCache. +// It is intended to be configured via KeyValueCacheOptionFn functions. +type keyValueCacheConfig struct { + // maxKeys is the maximum number of key/value pairs the cache can + // hold before it starts evicting. + maxKeys int64 + + // TODO_CONSIDERATION: + // + // maxValueSize is the maximum cumulative size of all values in the cache. + // maxValueSize int64 + // maxCacheSize is the maximum cumulative size of all keys AND values in the cache. + // maxCacheSize int64 + + // evictionPolicy determines which values are removed when number of keys in the cache reaches maxKeys. + evictionPolicy EvictionPolicy + + // ttl is how long values should remain valid in the cache. Items older than the + // ttl MAY NOT be evicted immediately, but are NEVER considered as cache hits. + ttl time.Duration +} + +// historicalKeyValueCacheConfig is the configuration for constructing a historicalKeyValueCache. +// It is intended to be configured via KeyValueCacheOptionFn functions. +type historicalKeyValueCacheConfig struct { + keyValueCacheConfig + + // maxVersionAge is the max difference between the latest known version and + // any other version, below which value versions are retained, and above which + // value versions are pruned. + // E.g.: Given a latest version of 100, and a maxVersionAge of 10, then the + // oldest version that is not pruned is 90 (100 - 10). + // If 0, no historical pruning is performed. + maxVersionAge int64 +} + +// KeyValueCacheOptionFn is a function which receives a keyValueCacheConfig for configuration. +type KeyValueCacheOptionFn func(keyValueConfigI) error + +// Validate ensures that the keyValueCacheConfig isn't configured with incompatible options. +func (cfg *keyValueCacheConfig) Validate() error { + switch cfg.evictionPolicy { + case FirstInFirstOut: + // TODO_IMPROVE: support LeastRecentlyUsed and LeastFrequentlyUsed policies. + default: + return cache.ErrKeyValueCacheConfigValidation.Wrapf("eviction policy %d not imlemented", cfg.evictionPolicy) + } + return nil +} + +// Validate ensures that the historicalKeyValueCacheConfig isn't configured with incompatible options. +func (cfg *historicalKeyValueCacheConfig) Validate() error { + if err := cfg.keyValueCacheConfig.Validate(); err != nil { + return err + } + + if cfg.maxVersionAge < 0 { + return cache.ErrKeyValueCacheConfigValidation.Wrapf("maxVersionAge MUST be >= 0, got: %d", cfg.maxVersionAge) + } + + return nil +} + +// WithMaxKeys sets the maximum number of distinct key/value pairs the cache will +// hold before evicting according to the configured eviction policy. +func WithMaxKeys(maxKeys int64) KeyValueCacheOptionFn { + return func(cfg keyValueConfigI) error { + cfg.SetMaxKeys(maxKeys) + return nil + } +} + +// WithEvictionPolicy sets the eviction policy. +func WithEvictionPolicy(policy EvictionPolicy) KeyValueCacheOptionFn { + return func(cfg keyValueConfigI) error { + cfg.SetEvictionPolicy(policy) + return nil + } +} + +// WithTTL sets the time-to-live for cached values. Values older than the TTL +// MAY NOT be evicted immediately, but are NEVER considered as cache hits. +// NOTE: TTL is ignored by the HistoricalKeyValueCache. +func WithTTL(ttl time.Duration) KeyValueCacheOptionFn { + return func(cfg keyValueConfigI) error { + cfg.SetTTL(ttl) + return nil + } +} + +// WithMaxVersionAge sets the given maxVersionAge on the configuration; if 0, no historical pruning is performed. +// It can ONLY be used in the context of a HistoricalKeyValueCache. +func WithMaxVersionAge(numRetainedVersions int64) KeyValueCacheOptionFn { + return func(cfg keyValueConfigI) error { + histCfg, ok := cfg.(*historicalKeyValueCacheConfig) + if !ok { + return fmt.Errorf("unexpected cache config type, expected %T, got: %T", histCfg, cfg) + } + + histCfg.maxVersionAge = numRetainedVersions + return nil + } +} + +// keyValueConfigI is an interface which is implemented by the keyValueCacheConfig. +// It is also embedded in historicalKeyValueCacheConfig so that the two constructors +// can reuse these common configuration fields, while allowing extension of the config +// struct in the historical key/value (and/or other) cache(s). +type keyValueConfigI interface { + SetMaxKeys(maxKeys int64) + SetEvictionPolicy(policy EvictionPolicy) + SetTTL(ttl time.Duration) +} + +func (cfg *keyValueCacheConfig) SetMaxKeys(maxKeys int64) { + cfg.maxKeys = maxKeys +} + +func (cfg *keyValueCacheConfig) SetEvictionPolicy(policy EvictionPolicy) { + cfg.evictionPolicy = policy +} + +func (cfg *keyValueCacheConfig) SetTTL(ttl time.Duration) { + cfg.ttl = ttl +} +func (cfg *keyValueCacheConfig) GetTTL() time.Duration { + return cfg.ttl +} diff --git a/pkg/cache/memory/historical_kvcache.go b/pkg/cache/memory/historical_kvcache.go new file mode 100644 index 000000000..61dd7bb79 --- /dev/null +++ b/pkg/cache/memory/historical_kvcache.go @@ -0,0 +1,246 @@ +package memory + +import ( + "fmt" + "sort" + "sync" + "time" + + "github.com/pokt-network/poktroll/pkg/cache" +) + +var _ cache.HistoricalKeyValueCache[any] = (*historicalKeyValueCache[any])(nil) + +// historicalKeyValueCache provides a concurrency-safe in-memory cache implementation +// with support for tracking multiple value versions for a given key. +type historicalKeyValueCache[T any] struct { + config historicalKeyValueCacheConfig + + // valuesMu is used to protect values AND valueHistories from concurrent access. + valuesMu sync.RWMutex + // valueHistories holds the cached historical values. + valueHistories map[string]cacheValueHistory[T] +} + +// cacheValueHistory maintains: +// - Cached values indexed by version number +// - A descending-sorted list of version numbers for existing cached values +// +// The descending sort order optimizes performance by correlating index with age. +type cacheValueHistory[T any] struct { + // sortedDescVersions is a list of the version numbers for which values are + // cached. It is sorted in descending order. + sortedDescVersions []int64 + // versionToValueMap is a map from a version number to the cached value at + // that version number, if present. + versionToValueMap map[int64]cacheValue[T] +} + +// NewHistoricalKeyValueCache creates a new historicalKeyValueCache with the +// configuration generated by the given option functions. +func NewHistoricalKeyValueCache[T any](opts ...KeyValueCacheOptionFn) (*historicalKeyValueCache[T], error) { + config := DefaultHistoricalKeyValueCacheConfig + + for _, opt := range opts { + if err := opt(&config); err != nil { + return nil, err + } + } + + if err := config.Validate(); err != nil { + return nil, err + } + + return &historicalKeyValueCache[T]{ + valueHistories: make(map[string]cacheValueHistory[T]), + config: config, + }, nil +} + +// GetVersion retrieves the value from the cache with the given key, as of the +// given version. If a value is not found for that version, the value at the nearest +// previous version is returned. +func (c *historicalKeyValueCache[T]) GetVersion(key string, version int64) (T, bool) { + c.valuesMu.RLock() + defer c.valuesMu.RUnlock() + + return c.getVersion(key, version) +} + +// getVersion retrieves the value from the cache with the given key, as of the +// given version. If a value is not found for that version, the value at the nearest +// previous version is returned. +// It is NOT safe to call concurrently; i.e., the caller MUST hold the valuesMu lock. +func (c *historicalKeyValueCache[T]) getVersion(key string, version int64) (T, bool) { + var zero T + valueHistory, exists := c.valueHistories[key] + if !exists { + return zero, false + } + + var nearestCachedVersion int64 = -1 + for _, cachedVersion := range valueHistory.sortedDescVersions { + if cachedVersion <= version { + nearestCachedVersion = cachedVersion + // DEV_NOTE: Since the list is sorted in descending order, once we + // encounter a cachedVersion that is less than or equal to version, + // all subsequent cachedVersions SHOULD also be less than or equal to + // version. + break + } + } + + if nearestCachedVersion == -1 { + return zero, false + } + + cachedValue, exists := valueHistory.versionToValueMap[nearestCachedVersion] + if !exists { + // DEV_NOTE: This SHOULD NEVER happen. If it does, it means that the cache has been corrupted. + return zero, false + } + + return cachedValue.value, true +} + +// GetLatestVersion returns the value of the latest version of the given key. +func (c *historicalKeyValueCache[T]) GetLatestVersion(key string) (T, bool) { + c.valuesMu.RLock() + defer c.valuesMu.RUnlock() + + var zero T + version := c.getLatestVersionNumber(key) + if version == -1 { + return zero, false + } + + return c.getVersion(key, version) +} + +// SetVersion adds or updates the historical value in the cache for the given key, +// and at the version number. +func (c *historicalKeyValueCache[T]) SetVersion(key string, value T, version int64) error { + c.valuesMu.Lock() + defer c.valuesMu.Unlock() + + latestVersion := max(version, c.getLatestVersionNumber(key)) + if version > latestVersion { + latestVersion = version + } + + valueHistory, exists := c.valueHistories[key] + if !exists { + versionToValueMap := make(map[int64]cacheValue[T]) + valueHistory = cacheValueHistory[T]{ + sortedDescVersions: make([]int64, 0), + versionToValueMap: versionToValueMap, + } + } + + if _, versionExists := valueHistory.versionToValueMap[version]; versionExists { + return cache.ErrNoOverwrite.Wrapf("version: %d", version) + } + + // Update sortedDescVersions and ensure the list is sorted in descending order. + valueHistory.sortedDescVersions = append(valueHistory.sortedDescVersions, version) + sort.Slice(valueHistory.sortedDescVersions, func(i, j int) bool { + return valueHistory.sortedDescVersions[i] > valueHistory.sortedDescVersions[j] + }) + + // Prune historical values for this key, where the version + // is older than the configured maxVersionAge. + if c.config.maxVersionAge > 0 { + lenCachedVersions := int64(len(valueHistory.sortedDescVersions)) + for versionIdx := lenCachedVersions - 1; versionIdx >= 0; versionIdx-- { + cachedVersion := valueHistory.sortedDescVersions[versionIdx] + + // DEV_NOTE: Since the list is sorted, and we're iterating from lowest + // (oldest) to highest (newest) version, once we encounter a cachedVersion + // that is newer than the configured maxVersionAge, ALL subsequent + // heights SHOULD also be newer than the configured maxVersionAge. + cachedVersionAge := latestVersion - cachedVersion + if cachedVersionAge <= c.config.maxVersionAge { + valueHistory.sortedDescVersions = valueHistory.sortedDescVersions[:versionIdx+1] + break + } + + delete(valueHistory.versionToValueMap, cachedVersion) + } + } + + valueHistory.versionToValueMap[version] = cacheValue[T]{ + value: value, + cachedAt: time.Now(), + } + + c.valueHistories[key] = valueHistory + + // Evict after adding the new key/value. + if err := c.evict(); err != nil { + return err + } + + return nil +} + +// evict removes one value (and all its versions) from the cache, +// to make space for a new one, according to the configured eviction policy. +func (c *historicalKeyValueCache[T]) evict() error { + isMaxKeysConfigured := c.config.maxKeys > 0 + cacheMaxKeysReached := int64(len(c.valueHistories)) > c.config.maxKeys + if !isMaxKeysConfigured || !cacheMaxKeysReached { + return nil + } + + switch c.config.evictionPolicy { + case FirstInFirstOut: + var ( + first = true + oldestKey string + oldestTime time.Time + ) + for key, valueHistory := range c.valueHistories { + latestVersion := valueHistory.sortedDescVersions[0] + value, exists := valueHistory.versionToValueMap[latestVersion] + if !exists { + return cache.ErrCacheInternal.Wrapf( + "expected value history for key %s to contain version %d but it did not 💣", + key, latestVersion, + ) + } + + if first || value.cachedAt.Before(oldestTime) { + oldestKey = key + oldestTime = value.cachedAt + } + first = false + } + delete(c.valueHistories, oldestKey) + return nil + + case LeastRecentlyUsed: + // TODO_IMPROVE: Implement LRU eviction + // This will require tracking access times + panic("LRU eviction not implemented") + + case LeastFrequentlyUsed: + // TODO_IMPROVE: Implement LFU eviction + // This will require tracking access times + panic("LFU eviction not implemented") + + default: + // DEV_NOTE: This SHOULD NEVER happen, KeyValueCacheConfig#Validate, SHOULD prevent it. + panic(fmt.Sprintf("unsupported eviction policy: %d", c.config.evictionPolicy)) + } +} + +// getLatestVersionNumber returns the latest version number (not the value) of the given key. +// It is NOT safe to call concurrently; i.e., the caller MUST hold the valuesMu lock. +func (c *historicalKeyValueCache[T]) getLatestVersionNumber(key string) int64 { + valueHistory, exists := c.valueHistories[key] + if !exists { + return -1 + } + + return valueHistory.sortedDescVersions[0] +} diff --git a/pkg/cache/memory/historical_kvcache_test.go b/pkg/cache/memory/historical_kvcache_test.go new file mode 100644 index 000000000..50171b0c6 --- /dev/null +++ b/pkg/cache/memory/historical_kvcache_test.go @@ -0,0 +1,235 @@ +package memory + +import ( + "context" + "fmt" + "strconv" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/pokt-network/poktroll/pkg/cache" +) + +// TestMemoryHistoricalKeyValueCache exercises the historical key/value cache functionality. +func TestMemoryHistoricalKeyValueCache(t *testing.T) { + t.Run("basic operations", func(t *testing.T) { + kvcache, err := NewHistoricalKeyValueCache[string]( + WithMaxVersionAge(100), + ) + require.NoError(t, err) + + // Test SetVersion and GetVersion + err = kvcache.SetVersion("key", "value1", 10) + require.NoError(t, err) + + // Test getting the latest version + latestVersion := kvcache.getLatestVersionNumber("key") + require.Equal(t, int64(10), latestVersion) + + // Test getting the latest value + latestValue, isCached := kvcache.GetLatestVersion("key") + require.True(t, isCached) + require.Equal(t, "value1", latestValue) + + // Update the latest version + err = kvcache.SetVersion("key", "value2", 20) + require.NoError(t, err) + + // Test getting the latest version + latestVersion = kvcache.getLatestVersionNumber("key") + require.Equal(t, int64(20), latestVersion) + + // Test getting the latest value + latestValue, isCached = kvcache.GetLatestVersion("key") + require.True(t, isCached) + require.Equal(t, "value2", latestValue) + + // Test getting exact versions + val, isCached := kvcache.GetVersion("key", 10) + require.True(t, isCached) + require.Equal(t, "value1", val) + + val, isCached = kvcache.GetVersion("key", 20) + require.True(t, isCached) + require.Equal(t, "value2", val) + + // Test getting intermediate version (should return nearest lower version) + val, isCached = kvcache.GetVersion("key", 15) + require.True(t, isCached) + require.Equal(t, "value1", val) + + // Test getting version before first entry + _, isCached = kvcache.GetVersion("key", 5) + require.False(t, isCached) + + // Test getting version after last entry + val, isCached = kvcache.GetVersion("key", 25) + require.True(t, isCached) + require.Equal(t, "value2", val) + + // Test getting a version for a key that isn't cached + _, isCached = kvcache.GetVersion("key2", 20) + require.False(t, isCached) + }) + + t.Run("max keys eviction", func(t *testing.T) { + kvcache, err := NewHistoricalKeyValueCache[string](WithMaxKeys(2)) + require.NoError(t, err) + + // Add values up to max keys + err = kvcache.SetVersion("key1", "value1", 10) + require.NoError(t, err) + err = kvcache.SetVersion("key2", "value2", 20) + require.NoError(t, err) + + // Add one more value, should trigger eviction + err = kvcache.SetVersion("key3", "value3", 30) + require.NoError(t, err) + + // First value should be evicted + _, isCached := kvcache.GetVersion("key1", 10) + require.False(t, isCached) + + // Other values should still be present + val, isCached := kvcache.GetVersion("key2", 20) + require.True(t, isCached) + require.Equal(t, "value2", val) + + val, isCached = kvcache.GetVersion("key3", 30) + require.True(t, isCached) + require.Equal(t, "value3", val) + }) + + t.Run("historical cache ignores TTL expiration", func(t *testing.T) { + cache, err := NewHistoricalKeyValueCache[string]( + WithMaxVersionAge(100), + WithTTL(100*time.Millisecond), + ) + require.NoError(t, err) + + err = cache.SetVersion("key", "value1", 10) + require.NoError(t, err) + + // Value should be available immediately + val, isCached := cache.GetVersion("key", 10) + require.True(t, isCached) + require.Equal(t, "value1", val) + + // Wait for ttl to expire + time.Sleep(150 * time.Millisecond) + + // Value should now be expired + _, isCached = cache.GetVersion("key", 10) + require.True(t, isCached) + }) + + t.Run("pruning old versions", func(t *testing.T) { + cache, err := NewHistoricalKeyValueCache[string]( + WithMaxVersionAge(10), // Prune entries older than 10 versions + ) + require.NoError(t, err) + + // Add entries at different versions + err = cache.SetVersion("key", "value1", 10) + require.NoError(t, err) + err = cache.SetVersion("key", "value2", 20) + require.NoError(t, err) + err = cache.SetVersion("key", "value3", 30) + require.NoError(t, err) + + // Add a new entry that should trigger pruning + err = cache.SetVersion("key", "value4", 40) + require.NoError(t, err) + + // Entries more than 10 blocks old should be pruned + _, isCached := cache.GetVersion("key", 10) + require.False(t, isCached) + _, isCached = cache.GetVersion("key", 20) + require.False(t, isCached) + + // Recent entries should still be available + val, isCached := cache.GetVersion("key", 30) + require.True(t, isCached) + require.Equal(t, "value3", val) + + val, isCached = cache.GetVersion("key", 40) + require.True(t, isCached) + require.Equal(t, "value4", val) + }) + + t.Run("cannot update existing versions", func(t *testing.T) { + kvcache, err := NewHistoricalKeyValueCache[string]( + WithMaxVersionAge(100), + ) + require.NoError(t, err) + + // Add entries at different versions + err = kvcache.SetVersion("key", "value1", 10) + require.NoError(t, err) + err = kvcache.SetVersion("key", "value2", 20) + require.NoError(t, err) + + // Attempt to update an existing version + err = kvcache.SetVersion("key", "value3", 10) + require.EqualError(t, err, cache.ErrNoOverwrite.Wrapf("version: %d", 10).Error()) + }) +} + +// TestHistoricalKeyValueCache_ConcurrentAccess exercises thread safety of the cache +func TestHistoricalKeyValueCache_ConcurrentAccess(t *testing.T) { + kvcache, err := NewHistoricalKeyValueCache[int]( + WithMaxVersionAge(10000), + WithMaxKeys(100), + ) + require.NoError(t, err) + + const numGoroutines = 10 + const numOpsPerGoRoutine = 100 + + // Create a context with timeout + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + t.Cleanup(cancel) + + var wg sync.WaitGroup + wg.Add(numGoroutines) + + for i := 0; i < numGoroutines; i++ { + go func(i int) { + defer wg.Done() + for j := 0; j < numOpsPerGoRoutine; j++ { + // DEV_NOTE: versions MUST be unique per op per goroutine. + version, _ := strconv.Atoi(fmt.Sprintf("%d%d", i*10, j)) + + select { + case <-ctx.Done(): + return + default: + key := "key" + err = kvcache.SetVersion(key, i+j, int64(version)) + require.NoError(t, err) + + val, isCached := kvcache.GetVersion(key, int64(version)) + require.True(t, isCached) + require.Equal(t, i+j, val) + } + } + }(i) + } + + // Wait for waitgroup with timeout. + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-ctx.Done(): + t.Errorf("test timed out waiting for goroutines to complete: %+v", ctx.Err()) + case <-done: + t.Log("test completed successfully") + } +} diff --git a/pkg/cache/memory/kvcache.go b/pkg/cache/memory/kvcache.go new file mode 100644 index 000000000..f2990d3f3 --- /dev/null +++ b/pkg/cache/memory/kvcache.go @@ -0,0 +1,146 @@ +package memory + +import ( + "fmt" + "sync" + "time" + + "github.com/pokt-network/poktroll/pkg/cache" +) + +var _ cache.KeyValueCache[any] = (*keyValueCache[any])(nil) + +// keyValueCache provides a concurrency-safe in-memory key/value cache implementation. +type keyValueCache[T any] struct { + config keyValueCacheConfig + + // valuesMu is used to protect values AND valueHistories from concurrent access. + valuesMu sync.RWMutex + // values holds the cached values. + values map[string]cacheValue[T] +} + +// cacheValue wraps cached values with a cachedAt for later comparison against +// the configured TTL. +type cacheValue[T any] struct { + value T + cachedAt time.Time +} + +// NewKeyValueCache creates a new keyValueCache with the configuration generated +// by the given option functions. +func NewKeyValueCache[T any](opts ...KeyValueCacheOptionFn) (*keyValueCache[T], error) { + config := DefaultKeyValueCacheConfig + + for _, opt := range opts { + if err := opt(&config); err != nil { + return nil, err + } + } + + if err := config.Validate(); err != nil { + return nil, err + } + + return &keyValueCache[T]{ + values: make(map[string]cacheValue[T]), + config: config, + }, nil +} + +// Get retrieves the value from the cache with the given key. +func (c *keyValueCache[T]) Get(key string) (T, bool) { + var zero T + c.valuesMu.RLock() + defer c.valuesMu.RUnlock() + + cachedValue, exists := c.values[key] + if !exists { + return zero, false + } + + isTTLEnabled := c.config.ttl > 0 + isCacheValueExpired := time.Since(cachedValue.cachedAt) > c.config.ttl + if isTTLEnabled && isCacheValueExpired { + // DEV_NOTE: Not pruning here to optimize concurrent speed: + // - Read lock alone would be insufficient for pruning + // - Next Set() call will overwrite the value + // - If values aren't subsequently set, maxKeys config will eventually trigger + // pruning of TTL-expired values + return zero, false + } + + return cachedValue.value, true +} + +// Set adds or updates the value in the cache for the given key. +func (c *keyValueCache[T]) Set(key string, value T) { + c.valuesMu.Lock() + defer c.valuesMu.Unlock() + + c.values[key] = cacheValue[T]{ + value: value, + cachedAt: time.Now(), + } + + // Evict after adding the new key/value. + c.evict() +} + +// Delete removes a value from the cache. +func (c *keyValueCache[T]) Delete(key string) { + c.valuesMu.Lock() + defer c.valuesMu.Unlock() + + delete(c.values, key) +} + +// Clear removes all values from the cache. +func (c *keyValueCache[T]) Clear() { + c.valuesMu.Lock() + defer c.valuesMu.Unlock() + + c.values = make(map[string]cacheValue[T]) +} + +// evict removes one item from the cache, to make space for a new one, +// according to the configured eviction policy. +func (c *keyValueCache[T]) evict() { + isMaxKeysConfigured := c.config.maxKeys > 0 + cacheMaxKeysReached := int64(len(c.values)) > c.config.maxKeys + if !isMaxKeysConfigured || !cacheMaxKeysReached { + return + } + + switch c.config.evictionPolicy { + case FirstInFirstOut: + var ( + first = true + oldestKey string + oldestTime time.Time + ) + for key, value := range c.values { + if first || value.cachedAt.Before(oldestTime) { + oldestKey = key + oldestTime = value.cachedAt + } + first = false + } + delete(c.values, oldestKey) + return + + // DEV_NOTE: The following cases SHOULD NEVER happen, KeyValueCacheConfig#Validate, SHOULD prevent it. + case LeastRecentlyUsed: + // TODO_IMPROVE: Implement LRU eviction + // This will require tracking access times + panic("LRU eviction not implemented") + + case LeastFrequentlyUsed: + // TODO_IMPROVE: Implement LFU eviction + // This will require tracking access times + panic("LFU eviction not implemented") + + default: + panic(fmt.Sprintf("unsupported eviction policy: %d", c.config.evictionPolicy)) + } +} diff --git a/pkg/cache/memory/kvcache_test.go b/pkg/cache/memory/kvcache_test.go new file mode 100644 index 000000000..fc0e20aae --- /dev/null +++ b/pkg/cache/memory/kvcache_test.go @@ -0,0 +1,154 @@ +package memory + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// TestMemoryKeyValueCache exercises the basic cache functionality. +func TestMemoryKeyValueCache(t *testing.T) { + t.Run("basic operations", func(t *testing.T) { + cache, err := NewKeyValueCache[string]() + require.NoError(t, err) + + // Test Set and Get + cache.Set("key1", "value1") + val, isCached := cache.Get("key1") + require.True(t, isCached) + require.Equal(t, "value1", val) + + // Test missing key + _, isCached = cache.Get("nonexistent") + require.False(t, isCached) + + // Test Delete + cache.Delete("key1") + _, isCached = cache.Get("key1") + require.False(t, isCached) + + // Test Clear + cache.Set("key2", "value2") + cache.Clear() + _, isCached = cache.Get("key2") + require.False(t, isCached) + }) + + t.Run("TTL expiration", func(t *testing.T) { + cache, err := NewKeyValueCache[string]( + WithTTL(100 * time.Millisecond), + ) + require.NoError(t, err) + + cache.Set("key", "value") + + // Value should be available immediately + val, isCached := cache.Get("key") + require.True(t, isCached) + require.Equal(t, "value", val) + + // Wait for TTL to expire + time.Sleep(150 * time.Millisecond) + + // Value should now be expired + _, isCached = cache.Get("key") + require.False(t, isCached) + }) + + t.Run("max keys eviction", func(t *testing.T) { + cache, err := NewKeyValueCache[string]( + WithMaxKeys(2), + WithEvictionPolicy(FirstInFirstOut), + ) + require.NoError(t, err) + + // Add values up to max keys + cache.Set("key1", "value1") + cache.Set("key2", "value2") + + // Add one more value, should trigger eviction + cache.Set("key3", "value3") + + // First value should be evicted + _, isCached := cache.Get("key1") + require.False(t, isCached) + + // Other values should still be present + val, isCached := cache.Get("key2") + require.True(t, isCached) + require.Equal(t, "value2", val) + + val, isCached = cache.Get("key3") + require.True(t, isCached) + require.Equal(t, "value3", val) + }) +} + +// TestKeyValueCache_ErrorCases exercises various error conditions +func TestKeyValueCache_ErrorCases(t *testing.T) { + t.Run("zero values", func(t *testing.T) { + cache, err := NewKeyValueCache[string]() + require.NoError(t, err) + + // Test with empty key + cache.Set("", "value") + val, isCached := cache.Get("") + require.True(t, isCached) + require.Equal(t, "value", val) + + // Test with empty value + cache.Set("key", "") + val, isCached = cache.Get("key") + require.True(t, isCached) + require.Equal(t, "", val) + }) +} + +// TestKeyValueCache_ConcurrentAccess exercises thread safety of the cache +func TestKeyValueCache_ConcurrentAccess(t *testing.T) { + cache, err := NewKeyValueCache[int]() + require.NoError(t, err) + + const numGoroutines = 10 + const numOperations = 100 + + // Create a context with timeout + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var wg sync.WaitGroup + wg.Add(numGoroutines) + + for i := 0; i < numGoroutines; i++ { + go func() { + defer wg.Done() + for j := 0; j < numOperations; j++ { + select { + case <-ctx.Done(): + return + default: + key := "key" + cache.Set(key, j) + _, _ = cache.Get(key) + } + } + }() + } + + // Wait for waitgroup with timeout. + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-ctx.Done(): + t.Errorf("test timed out waiting for workgroup to complete: %+v", ctx.Err()) + case <-done: + t.Log("test completed successfully") + } +}