diff --git a/src/HotChocolate/Core/src/Subscriptions.Redis/RedisPubSub.cs b/src/HotChocolate/Core/src/Subscriptions.Redis/RedisPubSub.cs index fe4328589fb..ce6a15a95ab 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Redis/RedisPubSub.cs +++ b/src/HotChocolate/Core/src/Subscriptions.Redis/RedisPubSub.cs @@ -13,6 +13,8 @@ internal sealed class RedisPubSub : DefaultPubSub private readonly string _completed; private readonly int _topicBufferCapacity; private readonly TopicBufferFullMode _topicBufferFullMode; + private readonly string? _topicPrefix; + public RedisPubSub( IConnectionMultiplexer connection, IMessageSerializer serializer, @@ -25,6 +27,7 @@ public RedisPubSub( _topicBufferCapacity = options.TopicBufferCapacity; _topicBufferFullMode = options.TopicBufferFullMode; _completed = serializer.CompleteMessage; + _topicPrefix = options.TopicPrefix; } protected override async ValueTask OnSendAsync( @@ -37,7 +40,7 @@ protected override async ValueTask OnSendAsync( // The object returned from GetSubscriber is a cheap pass-thru object that does not need // to be stored. var subscriber = _connection.GetSubscriber(); - await subscriber.PublishAsync(formattedTopic, serialized).ConfigureAwait(false); + await subscriber.PublishAsync(GetPrefixedTopic(formattedTopic), serialized).ConfigureAwait(false); } protected override async ValueTask OnCompleteAsync(string formattedTopic) @@ -45,7 +48,7 @@ protected override async ValueTask OnCompleteAsync(string formattedTopic) // The object returned from GetSubscriber is a cheap pass-thru object that does not need // to be stored. var subscriber = _connection.GetSubscriber(); - await subscriber.PublishAsync(formattedTopic, _completed).ConfigureAwait(false); + await subscriber.PublishAsync(GetPrefixedTopic(formattedTopic), _completed).ConfigureAwait(false); } protected override DefaultTopic OnCreateTopic( @@ -53,10 +56,20 @@ protected override DefaultTopic OnCreateTopic( int? bufferCapacity, TopicBufferFullMode? bufferFullMode) => new RedisTopic( - formattedTopic, + GetPrefixedTopic(formattedTopic), _connection, _serializer, bufferCapacity ?? _topicBufferCapacity, bufferFullMode ?? _topicBufferFullMode, DiagnosticEvents); + + private string GetPrefixedTopic(string topic) + { + if (string.IsNullOrWhiteSpace(_topicPrefix)) + { + return topic; + } + + return $"{_topicPrefix}{topic}"; + } } diff --git a/src/HotChocolate/Core/test/Subscriptions.Redis.Tests/RedisTopicPrefixIntegrationTests.cs b/src/HotChocolate/Core/test/Subscriptions.Redis.Tests/RedisTopicPrefixIntegrationTests.cs new file mode 100644 index 00000000000..894c65b84bb --- /dev/null +++ b/src/HotChocolate/Core/test/Subscriptions.Redis.Tests/RedisTopicPrefixIntegrationTests.cs @@ -0,0 +1,69 @@ +using HotChocolate.Execution; +using HotChocolate.Execution.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Squadron; +using StackExchange.Redis; +using Xunit.Abstractions; + +namespace HotChocolate.Subscriptions.Redis; + +public class RedisTopicPrefixIntegrationTests(RedisResource redisResource, ITestOutputHelper output) + : SubscriptionIntegrationTestBase(output), IClassFixture +{ + private const string TopicPrefix = "prefix:"; + + [Fact] + public override Task Subscribe_Infer_Topic() + => base.Subscribe_Infer_Topic(); + + [Fact] + public override Task Subscribe_Static_Topic() + => base.Subscribe_Static_Topic(); + + [Fact] + public override Task Subscribe_Topic_With_Arguments() + => base.Subscribe_Topic_With_Arguments(); + + [Fact] + public override Task Subscribe_Topic_With_Arguments_2_Subscriber() + => base.Subscribe_Topic_With_Arguments_2_Subscriber(); + + [Fact] + public override Task Subscribe_Topic_With_Arguments_2_Topics() + => base.Subscribe_Topic_With_Arguments_2_Topics(); + + [Fact] + public override Task Subscribe_Topic_With_2_Arguments() + => base.Subscribe_Topic_With_2_Arguments(); + + [Fact] + public override Task Subscribe_And_Complete_Topic() + => base.Subscribe_And_Complete_Topic(); + + [Fact] + public override Task Subscribe_And_Complete_Topic_With_ValueTypeMessage() + => base.Subscribe_And_Complete_Topic_With_ValueTypeMessage(); + + [Fact] + public async Task Subscribe_Should_Create_Channel_With_Prefix() + { + using var cts = new CancellationTokenSource(Timeout); + await using var services = CreateServer(); + + await using var result = await services.ExecuteRequestAsync( + "subscription { onMessage }", + cancellationToken: cts.Token); + + var activeChannels = await GetActiveChannelsAsync(); + + Assert.Contains(activeChannels, channel => channel.ToString()!.StartsWith(TopicPrefix)); + } + + private async Task GetActiveChannelsAsync() + { + return (RedisResult[])(await redisResource.GetConnection().GetDatabase().ExecuteAsync("PUBSUB", "CHANNELS"))!; + } + + protected override void ConfigurePubSub(IRequestExecutorBuilder graphqlBuilder) + => graphqlBuilder.AddRedisSubscriptions(_ => redisResource.GetConnection(), new SubscriptionOptions { TopicPrefix = TopicPrefix }); +}