Skip to content

Commit

Permalink
Cache missing keys
Browse files Browse the repository at this point in the history
  • Loading branch information
eduwercamacaro committed Feb 7, 2024
1 parent 07204f9 commit 9a6f38b
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsMetadata;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

Expand Down Expand Up @@ -246,9 +245,6 @@ public KafkaStreamsServerImpl(LHServerConfig config) {
this.coreStreams = new KafkaStreams(
ServerTopology.initCoreTopology(config, this, metadataCache, taskQueueManager),
config.getCoreStreamsConfig());
for (StreamsMetadata metadata : this.coreStreams.metadataForAllStreamsClients()) {
metadata.stateStoreNames();
}
this.timerStreams = new KafkaStreams(ServerTopology.initTimerTopology(config), config.getTimerStreamsConfig());
this.healthService = new HealthService(config, coreStreams, timerStreams);
Executor networkThreadpool = Executors.newFixedThreadPool(config.getNumNetworkThreads());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ abstract class BaseStoreImpl extends ReadOnlyBaseStoreImpl implements BaseStore
@Override
public void put(Storeable<?> thing) {
String key = maybeAddTenantPrefix(thing.getFullStoreKey());
metadataCache.removeMissingKey(key);
nativeStore.put(key, new Bytes(thing.toBytes()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,17 @@ public <U extends Message, T extends Storeable<U>> T get(String storeKey, Class<
if (storedGetablePb != null) {
return LHSerializable.fromProto(storedGetablePb, cls, executionContext);
} else {
if (metadataCache.isMissingKey(keyToLookFor)) {
return null;
}
GeneratedMessageV3 stored = getFromNativeStore(keyToLookFor, cls);
if (stored instanceof StoredGetablePb storedGetable) {
metadataCache.maybeUpdateCache(keyToLookFor, storedGetable);
}
if (stored != null) {
return LHSerializable.fromProto(stored, cls, executionContext);
}
metadataCache.maybeStoreMissingKey(keyToLookFor);
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ public void process(final Record<String, Bytes> record) {
if (value != null) {
store.put(key, value);
metadataCache.updateCache(key, StoredGetablePb.parseFrom(value.get()));
metadataCache.removeMissingKey(key);
} else {
store.delete(key);
metadataCache.evictCache(key);
metadataCache.maybeStoreMissingKey(key);
}
} catch (InvalidProtocolBufferException e) {
log.error("unable to parse metadata object");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.littlehorse.common.proto.StoredGetablePb;
import io.littlehorse.sdk.common.exception.LHSerdeError;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -18,6 +19,8 @@ public class MetadataCache extends LHCache<String, StoredGetablePb> {
Pattern.compile(StoreableType.STORED_GETABLE_VALUE + "\\/(?<getableType>\\d+)\\/(?<key>.+)");
public static final int LATEST_VERSION = -1;

private final ConcurrentSkipListSet<String> emptyKeys = new ConcurrentSkipListSet<>();

private static Set<GetableClassEnum> allowedObjets = Set.of(
GetableClassEnum.TENANT, GetableClassEnum.WF_SPEC, GetableClassEnum.PRINCIPAL, GetableClassEnum.TASK_DEF);

Expand Down Expand Up @@ -48,6 +51,22 @@ private void evictOrUpdate(StoredGetablePb value, String cacheKey) throws LHSerd
}
}

public void maybeStoreMissingKey(String missingKey) {
if (isCacheableKey(missingKey)) {
emptyKeys.add(missingKey);
}
}

public boolean isMissingKey(String missingKey) {
return emptyKeys.contains(missingKey);
}

public void removeMissingKey(String missingKey) {
if (isCacheableKey(missingKey)) {
emptyKeys.remove(missingKey);
}
}

public boolean isCacheableKey(String key) {
Matcher keyMatcher = CACHEABLE_KEY_PATTERN.matcher(key);
Matcher withouTenantKeyMatcher = CACHEABLE_WITHOUT_TENANT_KEY_PATTERN.matcher(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,20 @@ public void shouldFindStoredGetableFromCache() {
Mockito.verify(nativeInMemoryStore, Mockito.times(1)).get(Mockito.anyString());
}

@Test
public void shouldCacheMissingValues() {
WfSpecModel wfSpec = TestUtil.wfSpec("my-wf-spec");
TenantScopedStore store = TenantScopedStore.newInstance(
nativeInMemoryStore, new TenantIdModel(tenantA), executionContext, metadataCache);
StoredGetable result = store.get(wfSpec.getObjectId().getStoreableKey(), StoredGetable.class);
Assertions.assertThat(result).isNull();
result = store.get(wfSpec.getObjectId().getStoreableKey(), StoredGetable.class);
Assertions.assertThat(result).isNull();
Mockito.verify(nativeInMemoryStore, Mockito.times(1)).get(Mockito.anyString());
}

@Test
public void shouldNotUseCacheForNonMetadataObjects() {
MetadataCache cache = new MetadataCache();
WfRunModel wfRun = TestUtil.wfRun(UUID.randomUUID().toString());
TenantScopedStore store = TenantScopedStore.newInstance(
nativeInMemoryStore, new TenantIdModel(tenantA), executionContext, metadataCache);
Expand Down

0 comments on commit 9a6f38b

Please sign in to comment.