Skip to content

Commit

Permalink
flaky test
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi committed May 20, 2024
1 parent 58eedc0 commit 15b49b3
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,9 @@ public TopicProducer getOrCreate(
throw new RuntimeException(ex);
}
}

@Override
public void close() {
cache.invalidateAll();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import ai.langstream.api.runner.topics.TopicProducer;
import java.util.function.Supplier;

public interface TopicProducerCache {
public interface TopicProducerCache extends AutoCloseable {
record Key(
String tenant,
String application,
Expand All @@ -27,4 +27,7 @@ record Key(
String configString) {}

TopicProducer getOrCreate(Key key, Supplier<TopicProducer> topicProducerSupplier);

@Override
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
*/
package ai.langstream.apigateway.gateways;

import ai.langstream.api.runner.topics.TopicProducer;
import ai.langstream.apigateway.MetricsNames;
import ai.langstream.apigateway.config.TopicProperties;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.binder.cache.GuavaCacheMetrics;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.function.Supplier;

@Configuration
public class TopicProducerCacheFactory {

Expand All @@ -34,7 +37,15 @@ public TopicProducerCache topicProducerCache(TopicProperties topicProperties) {
Metrics.globalRegistry, cache.getCache(), MetricsNames.TOPIC_PRODUCER_CACHE);
return cache;
} else {
return (key, topicProducerSupplier) -> topicProducerSupplier.get();
return new TopicProducerCache() {
@Override
public TopicProducer getOrCreate(Key key, Supplier<TopicProducer> topicProducerSupplier) {
return topicProducerSupplier.get();
}
@Override
public void close() {
}
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,5 +100,6 @@ public void onDestroy() {
log.info("Shutting down WebSocket");
consumeThreadPool.shutdownNow();
clusterRuntimeRegistry.close();
topicProducerCache.close();
}
}

0 comments on commit 15b49b3

Please sign in to comment.