From 3a801a34c41dd0b3a9260f5d2df3af141d7c6982 Mon Sep 17 00:00:00 2001 From: Miguel Costa Date: Thu, 6 Feb 2025 11:27:00 +0000 Subject: [PATCH 1/4] fix(tests): fix ProduceNullMessageTest tests (#611) --- .../KafkaFlow.IntegrationTests/Core/Handlers/MessageStorage.cs | 2 +- tests/KafkaFlow.IntegrationTests/ProducerTest.cs | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/KafkaFlow.IntegrationTests/Core/Handlers/MessageStorage.cs b/tests/KafkaFlow.IntegrationTests/Core/Handlers/MessageStorage.cs index 40fc634e7..61ddec33b 100644 --- a/tests/KafkaFlow.IntegrationTests/Core/Handlers/MessageStorage.cs +++ b/tests/KafkaFlow.IntegrationTests/Core/Handlers/MessageStorage.cs @@ -128,7 +128,7 @@ public static async Task AssertMessageAsync(byte[] message) public static async Task AssertNullMessageAsync() { var start = DateTime.Now; - while (!s_nullMessages.IsEmpty) + while (s_nullMessages.IsEmpty) { if (DateTime.Now.Subtract(start).Seconds > TimeoutSec) { diff --git a/tests/KafkaFlow.IntegrationTests/ProducerTest.cs b/tests/KafkaFlow.IntegrationTests/ProducerTest.cs index ca44e4e56..fd8006258 100644 --- a/tests/KafkaFlow.IntegrationTests/ProducerTest.cs +++ b/tests/KafkaFlow.IntegrationTests/ProducerTest.cs @@ -44,8 +44,6 @@ public async Task ProduceNullMessageTest() var producer = _provider.GetRequiredService>(); var key = Guid.NewGuid().ToString(); - MessageStorage.Clear(); - // Act await producer.ProduceAsync(key, Array.Empty()); From 891c48238a3b9e817b60df3c6d118bc62600ab53 Mon Sep 17 00:00:00 2001 From: Kiryl Labada Date: Mon, 3 Feb 2025 14:13:35 +0100 Subject: [PATCH 2/4] feat: enrich error log with partition and offset --- src/KafkaFlow/Consumers/ConsumerWorker.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/KafkaFlow/Consumers/ConsumerWorker.cs b/src/KafkaFlow/Consumers/ConsumerWorker.cs index eba54afca..03829166f 100644 --- a/src/KafkaFlow/Consumers/ConsumerWorker.cs +++ b/src/KafkaFlow/Consumers/ConsumerWorker.cs @@ -162,6 +162,7 @@ private async Task ProcessMessageAsync(IMessageContext context, CancellationToke { context.Message, context.ConsumerContext.Topic, + context.ConsumerContext.TopicPartitionOffset, MessageKey = context.Message.Key, context.ConsumerContext.ConsumerName, }); From 1f2d89a2048425ed613ea38d895ddf421e02e68d Mon Sep 17 00:00:00 2001 From: Miguel Costa Date: Thu, 21 Nov 2024 01:46:37 +0000 Subject: [PATCH 3/4] feat: upgrade to dotnet8.0 --- .editorconfig | 46 ++++++--- .github/workflows/build.yml | 4 +- .github/workflows/deploy-website.yml | 10 +- .github/workflows/publish.yml | 4 +- .github/workflows/test-deploy-website.yml | 8 +- KafkaFlow.sln | 7 ++ docker-compose.yml | 16 ++-- .../KafkaFlow.Sample.BatchOperations.csproj | 46 ++++----- ...KafkaFlow.Sample.ConsumerThrottling.csproj | 48 +++++----- .../KafkaFlow.Sample.Dashboard.csproj | 2 +- .../KafkaFlow.Sample.FlowControl.csproj | 47 +++++----- .../KafkaFlow.Sample.OpenTelemetry.csproj | 42 ++++----- ...fkaFlow.Sample.PauseConsumerOnError.csproj | 36 +++---- .../KafkaFlow.Sample.SchemaRegistry.csproj | 52 +++++----- .../KafkaFlow.Sample.WebApi.csproj | 50 +++++----- .../KafkaFlow.Sample.WildcardConsumer.csproj | 41 ++++---- .../Program.cs | 4 +- .../KafkaFlow.Sample/KafkaFlow.Sample.csproj | 44 ++++----- src/Directory.Build.props | 2 +- .../Configuration/SaslOauthbearerMethod.cs | 17 ++-- .../KafkaFlow.Abstractions.csproj | 20 ++-- .../KafkaFlow.Admin.Dashboard.csproj | 16 ++-- .../KafkaFlow.Admin.WebApi.csproj | 28 +++--- src/KafkaFlow.Admin/KafkaFlow.Admin.csproj | 20 ++-- .../KafkaFlow.Compressor.Gzip.csproj | 19 ++-- .../KafkaFlow.Extensions.Hosting.csproj | 26 ++--- .../KafkaFlow.LogHandler.Console.csproj | 20 ++-- .../KafkaFlow.LogHandler.Microsoft.csproj | 28 +++--- ...aFlow.Microsoft.DependencyInjection.csproj | 26 ++--- .../ActivitySourceAccessor.cs | 13 ++- .../KafkaFlow.OpenTelemetry.csproj | 5 +- .../KafkaFlow.SchemaRegistry.csproj | 26 ++--- .../KafkaFlow.Serializer.JsonCore.csproj | 24 ++--- ...KafkaFlow.Serializer.NewtonsoftJson.csproj | 24 ++--- .../KafkaFlow.Serializer.ProtobufNet.csproj | 24 ++--- ...alizer.SchemaRegistry.ConfluentAvro.csproj | 32 +++---- ...alizer.SchemaRegistry.ConfluentJson.csproj | 26 ++--- ...er.SchemaRegistry.ConfluentProtobuf.csproj | 26 ++--- src/KafkaFlow.Unity/KafkaFlow.Unity.csproj | 22 ++--- src/KafkaFlow/Consumers/PartitionOffsets.cs | 4 +- src/KafkaFlow/KafkaFlow.csproj | 38 ++++---- .../KafkaFlow.IntegrationTests.csproj | 94 +++++++++---------- .../CompressorConsumerMiddlewareTests.cs | 2 +- .../ConsumerConfigurationBuilderTests.cs | 2 +- .../KafkaFlow.UnitTests.csproj | 78 +++++++-------- .../MessageContextTests.cs | 6 +- .../SerializerConsumerMiddlewareTests.cs | 2 +- .../TypedHandler/HandlerTypeMappingTests.cs | 8 +- 48 files changed, 604 insertions(+), 581 deletions(-) diff --git a/.editorconfig b/.editorconfig index eba3de15d..b7ea5f1ad 100644 --- a/.editorconfig +++ b/.editorconfig @@ -1,5 +1,13 @@ # editorconfig.org +root = true + +[*] +insert_final_newline = true +indent_style = space +indent_size = 4 +trim_trailing_whitespace = true + [*.cs] # Naming rules for fields dotnet_naming_rule.private_fields_with_underscore.symbols = private_field @@ -35,20 +43,20 @@ dotnet_naming_rule.style_dotnet_naming_rule_DotNetNamingStyle.DoNotUseThisForPri # name all constant fields using PascalCase dotnet_naming_rule.constant_fields_should_be_pascal_case.severity = warning -dotnet_naming_rule.constant_fields_should_be_pascal_case.symbols = constant_fields -dotnet_naming_rule.constant_fields_should_be_pascal_case.style = pascal_case_style +dotnet_naming_rule.constant_fields_should_be_pascal_case.symbols = constant_fields +dotnet_naming_rule.constant_fields_should_be_pascal_case.style = pascal_case_style -dotnet_naming_symbols.constant_fields.applicable_kinds = field +dotnet_naming_symbols.constant_fields.applicable_kinds = field dotnet_naming_symbols.constant_fields.required_modifiers = const dotnet_naming_style.pascal_case_style.capitalization = pascal_case # static fields should have s_ prefix dotnet_naming_rule.static_fields_should_have_prefix.severity = warning -dotnet_naming_rule.static_fields_should_have_prefix.symbols = static_fields -dotnet_naming_rule.static_fields_should_have_prefix.style = static_prefix_style +dotnet_naming_rule.static_fields_should_have_prefix.symbols = static_fields +dotnet_naming_rule.static_fields_should_have_prefix.style = static_prefix_style -dotnet_naming_symbols.static_fields.applicable_kinds = field +dotnet_naming_symbols.static_fields.applicable_kinds = field dotnet_naming_symbols.static_fields.required_modifiers = static dotnet_naming_symbols.static_fields.applicable_accessibilities = private, internal, private_protected @@ -74,11 +82,8 @@ csharp_style_expression_bodied_accessors = true:silent csharp_style_expression_bodied_lambdas = true:silent csharp_style_expression_bodied_local_functions = false:silent -[*.{cs,vb}] +[*.cs] dotnet_style_operator_placement_when_wrapping = beginning_of_line -tab_width = 4 -indent_size = 4 -end_of_line = crlf dotnet_style_coalesce_expression = true:suggestion dotnet_style_null_propagation = true:suggestion dotnet_style_prefer_is_null_check_over_reference_equality_method = true:suggestion @@ -93,4 +98,23 @@ dotnet_style_explicit_tuple_names = true:suggestion dotnet_style_prefer_inferred_tuple_names = true:suggestion dotnet_style_prefer_inferred_anonymous_type_member_names = true:suggestion dotnet_style_prefer_compound_assignment = true:suggestion -dotnet_style_prefer_simplified_interpolation = true:suggestion \ No newline at end of file +dotnet_style_prefer_simplified_interpolation = true:suggestion + +[Makefile] +indent_style = tab +indent_size = 2 + +[*.csproj] +indent_size = 2 + +[*.props] +indent_size = 2 + +[*.js] +indent_size = 2 + +[*.json] +indent_size = 2 + +[*.{yaml,yml}] +indent_size = 2 diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index e4ccca1dd..335448092 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -30,9 +30,9 @@ jobs: failOnWarnings: true - name: Setup .NET - uses: actions/setup-dotnet@v3 + uses: actions/setup-dotnet@v4 with: - dotnet-version: '6.0.x' + dotnet-version: '8.x' - name: Setup Node uses: actions/setup-node@v1 diff --git a/.github/workflows/deploy-website.yml b/.github/workflows/deploy-website.yml index 24104c401..d4d407479 100644 --- a/.github/workflows/deploy-website.yml +++ b/.github/workflows/deploy-website.yml @@ -5,7 +5,7 @@ on: branches: - master - release-* - + workflow_dispatch: jobs: @@ -17,9 +17,9 @@ jobs: - uses: actions/checkout@v4 - name: Setup .NET - uses: actions/setup-dotnet@v3 + uses: actions/setup-dotnet@v4 with: - dotnet-version: '6.0.x' + dotnet-version: '8.x' - run: dotnet tool install --global gsferreira.XmlDocMarkdown.Docusaurus --version 0.0.1-beta2 # using this version while the Pull Request isn't accepted here: https://github.com/ejball/XmlDocMarkdown/pull/126 shell: bash @@ -84,7 +84,7 @@ jobs: - name: Install dependencies working-directory: ./website run: yarn install --frozen-lockfile - + - name: Build website working-directory: ./website run: yarn build @@ -93,4 +93,4 @@ jobs: uses: peaceiris/actions-gh-pages@v3 with: github_token: ${{ secrets.GITHUB_TOKEN }} - publish_dir: ./website/build \ No newline at end of file + publish_dir: ./website/build diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index c45e828a2..b9905452d 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -12,9 +12,9 @@ jobs: - uses: actions/checkout@v4 - name: Setup .NET - uses: actions/setup-dotnet@v3 + uses: actions/setup-dotnet@v4 with: - dotnet-version: '6.0.x' + dotnet-version: '8.x' - name: Setup Node uses: actions/setup-node@v1 diff --git a/.github/workflows/test-deploy-website.yml b/.github/workflows/test-deploy-website.yml index 053b0f803..b062efd2d 100644 --- a/.github/workflows/test-deploy-website.yml +++ b/.github/workflows/test-deploy-website.yml @@ -11,9 +11,9 @@ jobs: - uses: actions/checkout@v4 - name: Setup .NET - uses: actions/setup-dotnet@v3 + uses: actions/setup-dotnet@v4 with: - dotnet-version: '6.0.x' + dotnet-version: '8.x' - run: dotnet tool install --global gsferreira.XmlDocMarkdown.Docusaurus --version 0.0.1-beta2 # using this version while the Pull Request isn't accepted here: https://github.com/ejball/XmlDocMarkdown/pull/126 shell: bash @@ -76,7 +76,7 @@ jobs: - name: Install dependencies working-directory: ./website run: yarn install --frozen-lockfile - + - name: Test build website working-directory: ./website - run: yarn build \ No newline at end of file + run: yarn build diff --git a/KafkaFlow.sln b/KafkaFlow.sln index a591122dc..b75b9aadb 100644 --- a/KafkaFlow.sln +++ b/KafkaFlow.sln @@ -96,6 +96,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.OpenTelemetry", "samples\KafkaFlow.Sample.OpenTelemetry\KafkaFlow.Sample.OpenTelemetry.csproj", "{E9E8B374-4165-45F2-8DF5-F141E141AC1D}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.WildcardConsumer", "samples\KafkaFlow.Sample.WildcardConsumer\KafkaFlow.Sample.WildcardConsumer.csproj", "{8F6CDF12-5316-4AAF-A1F1-264337585698}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -222,6 +224,10 @@ Global {E9E8B374-4165-45F2-8DF5-F141E141AC1D}.Debug|Any CPU.Build.0 = Debug|Any CPU {E9E8B374-4165-45F2-8DF5-F141E141AC1D}.Release|Any CPU.ActiveCfg = Release|Any CPU {E9E8B374-4165-45F2-8DF5-F141E141AC1D}.Release|Any CPU.Build.0 = Release|Any CPU + {8F6CDF12-5316-4AAF-A1F1-264337585698}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {8F6CDF12-5316-4AAF-A1F1-264337585698}.Debug|Any CPU.Build.0 = Debug|Any CPU + {8F6CDF12-5316-4AAF-A1F1-264337585698}.Release|Any CPU.ActiveCfg = Release|Any CPU + {8F6CDF12-5316-4AAF-A1F1-264337585698}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -265,6 +271,7 @@ Global {1755E8DB-970C-4A24-8B7C-A2BEC1410BEE} = {7A9B997B-DAAC-4004-94F3-32F6B88E0068} {80080C1D-579E-4AB2-935D-5CFFC51843D8} = {7A9B997B-DAAC-4004-94F3-32F6B88E0068} {E9E8B374-4165-45F2-8DF5-F141E141AC1D} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B} + {8F6CDF12-5316-4AAF-A1F1-264337585698} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {6AE955B5-16B0-41CF-9F12-66D15B3DD1AB} diff --git a/docker-compose.yml b/docker-compose.yml index 4aa2b10ac..84b2aecad 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,6 @@ services: zookeeper: - image: confluentinc/cp-zookeeper:7.2.1 + image: confluentinc/cp-zookeeper:7.8.0 hostname: zookeeper container_name: zookeeper ports: @@ -8,9 +8,9 @@ services: environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 - + broker: - image: confluentinc/cp-server:7.2.1 + image: confluentinc/cp-server:7.8.0 hostname: broker container_name: broker depends_on: @@ -42,9 +42,9 @@ services: interval: 10s timeout: 5s retries: 5 - + schema-registry: - image: confluentinc/cp-schema-registry:7.2.1 + image: confluentinc/cp-schema-registry:7.8.0 hostname: schema-registry container_name: schema-registry depends_on: @@ -56,10 +56,10 @@ services: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092' SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 - + kafka-tools: - image: confluentinc/cp-kafka:7.0.5 + image: confluentinc/cp-kafka:7.8.0 hostname: kafka container_name: kafka command: ["tail", "-f", "/dev/null"] - network_mode: "host" \ No newline at end of file + network_mode: "host" diff --git a/samples/KafkaFlow.Sample.BatchOperations/KafkaFlow.Sample.BatchOperations.csproj b/samples/KafkaFlow.Sample.BatchOperations/KafkaFlow.Sample.BatchOperations.csproj index 6336785ab..cd140ed0f 100644 --- a/samples/KafkaFlow.Sample.BatchOperations/KafkaFlow.Sample.BatchOperations.csproj +++ b/samples/KafkaFlow.Sample.BatchOperations/KafkaFlow.Sample.BatchOperations.csproj @@ -1,31 +1,31 @@ - - Exe - net6.0 - false - false - true - + + Exe + net8.0 + false + false + true + - - 1701;1702;CS1591;SA1600 - + + 1701;1702;CS1591;SA1600 + - - 1701;1702;CS1591;SA1600 - + + 1701;1702;CS1591;SA1600 + - - - + + + - - - - - - - + + + + + + + diff --git a/samples/KafkaFlow.Sample.ConsumerThrottling/KafkaFlow.Sample.ConsumerThrottling.csproj b/samples/KafkaFlow.Sample.ConsumerThrottling/KafkaFlow.Sample.ConsumerThrottling.csproj index 3f2204ac3..62eaf5bf8 100644 --- a/samples/KafkaFlow.Sample.ConsumerThrottling/KafkaFlow.Sample.ConsumerThrottling.csproj +++ b/samples/KafkaFlow.Sample.ConsumerThrottling/KafkaFlow.Sample.ConsumerThrottling.csproj @@ -1,32 +1,32 @@ - - Exe - net6.0 - false - false - true - + + Exe + net8.0 + false + false + true + - - 1701;1702;CS1591;SA1600 - + + 1701;1702;CS1591;SA1600 + - - 1701;1702;CS1591;SA1600 - + + 1701;1702;CS1591;SA1600 + - - - - - - - + + + + + + + - - - - + + + + diff --git a/samples/KafkaFlow.Sample.Dashboard/KafkaFlow.Sample.Dashboard.csproj b/samples/KafkaFlow.Sample.Dashboard/KafkaFlow.Sample.Dashboard.csproj index 80eed6be1..c71b81066 100644 --- a/samples/KafkaFlow.Sample.Dashboard/KafkaFlow.Sample.Dashboard.csproj +++ b/samples/KafkaFlow.Sample.Dashboard/KafkaFlow.Sample.Dashboard.csproj @@ -1,7 +1,7 @@  - net6.0 + net8.0 false true diff --git a/samples/KafkaFlow.Sample.FlowControl/KafkaFlow.Sample.FlowControl.csproj b/samples/KafkaFlow.Sample.FlowControl/KafkaFlow.Sample.FlowControl.csproj index 20946db23..40235d76f 100644 --- a/samples/KafkaFlow.Sample.FlowControl/KafkaFlow.Sample.FlowControl.csproj +++ b/samples/KafkaFlow.Sample.FlowControl/KafkaFlow.Sample.FlowControl.csproj @@ -1,32 +1,31 @@ - - Exe - net6.0 - false - false - 10 - true - + + Exe + net8.0 + false + false + true + - - 1701;1702;CS1591;SA1600 - + + 1701;1702;CS1591;SA1600 + - - 1701;1702;CS1591;SA1600 - + + 1701;1702;CS1591;SA1600 + - - - + + + - - - - - - - + + + + + + + diff --git a/samples/KafkaFlow.Sample.OpenTelemetry/KafkaFlow.Sample.OpenTelemetry.csproj b/samples/KafkaFlow.Sample.OpenTelemetry/KafkaFlow.Sample.OpenTelemetry.csproj index b2400f211..18cc4eee4 100644 --- a/samples/KafkaFlow.Sample.OpenTelemetry/KafkaFlow.Sample.OpenTelemetry.csproj +++ b/samples/KafkaFlow.Sample.OpenTelemetry/KafkaFlow.Sample.OpenTelemetry.csproj @@ -1,28 +1,26 @@ - - Exe - net6.0 - false - false - true - + + Exe + net8.0 + false + false + true + + + + + + + + - - - - - - - - - - - - - - - + + + + + + diff --git a/samples/KafkaFlow.Sample.PauseConsumerOnError/KafkaFlow.Sample.PauseConsumerOnError.csproj b/samples/KafkaFlow.Sample.PauseConsumerOnError/KafkaFlow.Sample.PauseConsumerOnError.csproj index 7a55c9fd7..74847a904 100644 --- a/samples/KafkaFlow.Sample.PauseConsumerOnError/KafkaFlow.Sample.PauseConsumerOnError.csproj +++ b/samples/KafkaFlow.Sample.PauseConsumerOnError/KafkaFlow.Sample.PauseConsumerOnError.csproj @@ -1,24 +1,24 @@ - - Exe - net6.0 - enable - enable - false - false - true - + + Exe + net8.0 + enable + enable + false + false + true + - - - - - - + + + + + + - - - + + + diff --git a/samples/KafkaFlow.Sample.SchemaRegistry/KafkaFlow.Sample.SchemaRegistry.csproj b/samples/KafkaFlow.Sample.SchemaRegistry/KafkaFlow.Sample.SchemaRegistry.csproj index ddef38718..501b6a184 100644 --- a/samples/KafkaFlow.Sample.SchemaRegistry/KafkaFlow.Sample.SchemaRegistry.csproj +++ b/samples/KafkaFlow.Sample.SchemaRegistry/KafkaFlow.Sample.SchemaRegistry.csproj @@ -1,34 +1,34 @@ - - Exe - net6.0 - false - false - KafkaFlow.Sample.SchemaRegistry - KafkaFlow.Sample.SchemaRegistry - true - + + Exe + net8.0 + false + false + KafkaFlow.Sample.SchemaRegistry + KafkaFlow.Sample.SchemaRegistry + true + - - 1701;1702;CS1591;SA1600 - + + 1701;1702;CS1591;SA1600 + - - 1701;1702;CS1591;SA1600 - + + 1701;1702;CS1591;SA1600 + - - - - - - - - + + + + + + + + - - - + + + diff --git a/samples/KafkaFlow.Sample.WebApi/KafkaFlow.Sample.WebApi.csproj b/samples/KafkaFlow.Sample.WebApi/KafkaFlow.Sample.WebApi.csproj index bf85308d2..b6105f2eb 100644 --- a/samples/KafkaFlow.Sample.WebApi/KafkaFlow.Sample.WebApi.csproj +++ b/samples/KafkaFlow.Sample.WebApi/KafkaFlow.Sample.WebApi.csproj @@ -1,34 +1,34 @@ - - net6.0 - false - true - enable - enable - + + net8.0 + false + true + enable + enable + - - 1701;1702;CS1591;SA1600 - + + 1701;1702;CS1591;SA1600 + - - 1701;1702;CS1591;SA1600 - + + 1701;1702;CS1591;SA1600 + - - - - - - - + + + + + + + - - - - - + + + + + diff --git a/samples/KafkaFlow.Sample.WildcardConsumer/KafkaFlow.Sample.WildcardConsumer.csproj b/samples/KafkaFlow.Sample.WildcardConsumer/KafkaFlow.Sample.WildcardConsumer.csproj index 05d9e4f29..50b43b9a1 100644 --- a/samples/KafkaFlow.Sample.WildcardConsumer/KafkaFlow.Sample.WildcardConsumer.csproj +++ b/samples/KafkaFlow.Sample.WildcardConsumer/KafkaFlow.Sample.WildcardConsumer.csproj @@ -1,27 +1,24 @@ - - Exe - net6.0 - enable - enable - + + Exe + net8.0 + enable + enable + - - - - - - - - - - - - + + + + + + + + + + + + + - - - - diff --git a/samples/KafkaFlow.Sample.WildcardConsumer/Program.cs b/samples/KafkaFlow.Sample.WildcardConsumer/Program.cs index 32d13df22..e37a712d0 100644 --- a/samples/KafkaFlow.Sample.WildcardConsumer/Program.cs +++ b/samples/KafkaFlow.Sample.WildcardConsumer/Program.cs @@ -20,7 +20,7 @@ producerName, _ => { }) .AddConsumer( consumer => consumer - .Topic($"^{topicPrefix}*") // Any topic starting with `random-topic-*` + .Topic($"^{topicPrefix}*") // Any topic starting with `random-topic-*` .WithGroupId("random-topic-handler") .WithBufferSize(5) .WithWorkersCount(3) @@ -67,4 +67,4 @@ await producer.ProduceAsync( Guid.NewGuid().ToString(), Encoding.UTF8.GetBytes( $"Message to {input}: {Guid.NewGuid()}")); -} \ No newline at end of file +} diff --git a/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj b/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj index 89cfb73ca..3a3f539c1 100644 --- a/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj +++ b/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj @@ -1,30 +1,30 @@  - - Exe - net6.0 - false - false - true - + + Exe + net8.0 + false + false + true + - - 1701;1702;CS1591;SA1600 - + + 1701;1702;CS1591;SA1600 + - - 1701;1702;CS1591;SA1600 - + + 1701;1702;CS1591;SA1600 + - - - - - - + + + + + + - - - + + + diff --git a/src/Directory.Build.props b/src/Directory.Build.props index 673cb04c4..182bf9809 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -19,7 +19,7 @@ - + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/src/KafkaFlow.Abstractions/Configuration/SaslOauthbearerMethod.cs b/src/KafkaFlow.Abstractions/Configuration/SaslOauthbearerMethod.cs index 3b4177c05..9b0ee6e37 100644 --- a/src/KafkaFlow.Abstractions/Configuration/SaslOauthbearerMethod.cs +++ b/src/KafkaFlow.Abstractions/Configuration/SaslOauthbearerMethod.cs @@ -1,12 +1,11 @@ -namespace KafkaFlow.Configuration +namespace KafkaFlow.Configuration; + +/// SaslOauthbearerMethod enum values +public enum SaslOauthbearerMethod { - /// SaslOauthbearerMethod enum values - public enum SaslOauthbearerMethod - { - /// Default - Default, + /// Default + Default, - /// Oidc - Oidc, - } + /// Oidc + Oidc, } diff --git a/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj b/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj index 283ef9c6b..93aa9ad9d 100644 --- a/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj +++ b/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj @@ -1,15 +1,15 @@  - - netstandard2.0 - KafkaFlow - KafkaFlow.Abstractions - Contains all KafkaFlow extendable interfaces - + + netstandard2.0 + KafkaFlow + KafkaFlow.Abstractions + Contains all KafkaFlow extendable interfaces + - - - - + + + + diff --git a/src/KafkaFlow.Admin.Dashboard/KafkaFlow.Admin.Dashboard.csproj b/src/KafkaFlow.Admin.Dashboard/KafkaFlow.Admin.Dashboard.csproj index ae071e278..03017f229 100644 --- a/src/KafkaFlow.Admin.Dashboard/KafkaFlow.Admin.Dashboard.csproj +++ b/src/KafkaFlow.Admin.Dashboard/KafkaFlow.Admin.Dashboard.csproj @@ -1,7 +1,7 @@  - net6.0 + net8.0 true Latest true @@ -21,23 +21,23 @@ - + - - + + - + - + - + - + diff --git a/src/KafkaFlow.Admin.WebApi/KafkaFlow.Admin.WebApi.csproj b/src/KafkaFlow.Admin.WebApi/KafkaFlow.Admin.WebApi.csproj index 44b6cf74e..b2849fc38 100644 --- a/src/KafkaFlow.Admin.WebApi/KafkaFlow.Admin.WebApi.csproj +++ b/src/KafkaFlow.Admin.WebApi/KafkaFlow.Admin.WebApi.csproj @@ -1,20 +1,20 @@ - - net6.0 - Library - KafkaFlow.Admin.WebApi - Allows KafkaFlow to use the Admin Web API - true - + + net8.0 + Library + KafkaFlow.Admin.WebApi + Allows KafkaFlow to use the Admin Web API + true + - - - - + + + + - - - + + + diff --git a/src/KafkaFlow.Admin/KafkaFlow.Admin.csproj b/src/KafkaFlow.Admin/KafkaFlow.Admin.csproj index cb8c11c06..b4927d83c 100644 --- a/src/KafkaFlow.Admin/KafkaFlow.Admin.csproj +++ b/src/KafkaFlow.Admin/KafkaFlow.Admin.csproj @@ -1,15 +1,15 @@ - - netstandard2.0 - KafkaFlow.Admin - Allows KafkaFlow to use the Admin commands - + + netstandard2.0 + KafkaFlow.Admin + Allows KafkaFlow to use the Admin commands + - - - - - + + + + + diff --git a/src/KafkaFlow.Compressor.Gzip/KafkaFlow.Compressor.Gzip.csproj b/src/KafkaFlow.Compressor.Gzip/KafkaFlow.Compressor.Gzip.csproj index 69586386a..2349b899d 100644 --- a/src/KafkaFlow.Compressor.Gzip/KafkaFlow.Compressor.Gzip.csproj +++ b/src/KafkaFlow.Compressor.Gzip/KafkaFlow.Compressor.Gzip.csproj @@ -1,12 +1,13 @@  - - netstandard2.0 - KafkaFlow.Compressor.Gzip - Gzip implementation for KafkaFlow compression middleware - + + netstandard2.0 + KafkaFlow.Compressor.Gzip + Gzip implementation for KafkaFlow compression middleware + - - - - \ No newline at end of file + + + + + diff --git a/src/KafkaFlow.Extensions.Hosting/KafkaFlow.Extensions.Hosting.csproj b/src/KafkaFlow.Extensions.Hosting/KafkaFlow.Extensions.Hosting.csproj index 0533b40c3..9cf6838c5 100644 --- a/src/KafkaFlow.Extensions.Hosting/KafkaFlow.Extensions.Hosting.csproj +++ b/src/KafkaFlow.Extensions.Hosting/KafkaFlow.Extensions.Hosting.csproj @@ -1,19 +1,19 @@ - - netstandard2.0 - KafkaFlow - KafkaFlow.Extensions.Hosting - Helper to run KafkaFlow as a Hosted Service - + + netstandard2.0 + KafkaFlow + KafkaFlow.Extensions.Hosting + Helper to run KafkaFlow as a Hosted Service + - - - - + + + + - - - + + + diff --git a/src/KafkaFlow.LogHandler.Console/KafkaFlow.LogHandler.Console.csproj b/src/KafkaFlow.LogHandler.Console/KafkaFlow.LogHandler.Console.csproj index 02d95c335..fb26c26aa 100644 --- a/src/KafkaFlow.LogHandler.Console/KafkaFlow.LogHandler.Console.csproj +++ b/src/KafkaFlow.LogHandler.Console/KafkaFlow.LogHandler.Console.csproj @@ -1,16 +1,16 @@ - - netstandard2.0 - KafkaFlow - + + netstandard2.0 + KafkaFlow + - - - + + + - - - + + + diff --git a/src/KafkaFlow.LogHandler.Microsoft/KafkaFlow.LogHandler.Microsoft.csproj b/src/KafkaFlow.LogHandler.Microsoft/KafkaFlow.LogHandler.Microsoft.csproj index 0bd03e3aa..97fbdd929 100644 --- a/src/KafkaFlow.LogHandler.Microsoft/KafkaFlow.LogHandler.Microsoft.csproj +++ b/src/KafkaFlow.LogHandler.Microsoft/KafkaFlow.LogHandler.Microsoft.csproj @@ -1,19 +1,19 @@ - - netstandard2.0 - KafkaFlow - KafkaFlow.LogHandler.Microsoft - LogHandler implementation leveraging Microsoft Logging framework. - + + netstandard2.0 + KafkaFlow + KafkaFlow.LogHandler.Microsoft + LogHandler implementation leveraging Microsoft Logging framework. + - - - + + + + + + + + - - - - - diff --git a/src/KafkaFlow.Microsoft.DependencyInjection/KafkaFlow.Microsoft.DependencyInjection.csproj b/src/KafkaFlow.Microsoft.DependencyInjection/KafkaFlow.Microsoft.DependencyInjection.csproj index ebd8ab6a0..e021fdeef 100644 --- a/src/KafkaFlow.Microsoft.DependencyInjection/KafkaFlow.Microsoft.DependencyInjection.csproj +++ b/src/KafkaFlow.Microsoft.DependencyInjection/KafkaFlow.Microsoft.DependencyInjection.csproj @@ -1,19 +1,19 @@ - - netstandard2.0 - KafkaFlow.Microsoft.DependencyInjection - Adapts KafkaFlow to use Microsoft Dependency Injection - KafkaFlow - + + netstandard2.0 + KafkaFlow.Microsoft.DependencyInjection + Adapts KafkaFlow to use Microsoft Dependency Injection + KafkaFlow + - - - - + + + + - - - + + + diff --git a/src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs b/src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs index 5a119dbbf..8fb371ce3 100644 --- a/src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs +++ b/src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs @@ -1,9 +1,6 @@ -extern alias SemanticConventions; - -using System.Collections.Generic; +using System.Collections.Generic; using System.Diagnostics; using System.Linq; -using Conventions = SemanticConventions::OpenTelemetry.Trace.TraceSemanticConventions; namespace KafkaFlow.OpenTelemetry; @@ -19,7 +16,9 @@ internal static class ActivitySourceAccessor internal static void SetGenericTags(Activity activity, IEnumerable bootstrapServers) { - activity?.SetTag(Conventions.AttributeMessagingSystem, MessagingSystemId); - activity?.SetTag(Conventions.AttributePeerService, string.Join(",", bootstrapServers ?? Enumerable.Empty())); + // https://opentelemetry.io/docs/languages/net/libraries/#note-on-versioning + // https://github.com/open-telemetry/opentelemetry-dotnet/blob/core-1.9.0/src/Shared/SemanticConventions.cs + activity?.SetTag("message.type", MessagingSystemId); + activity?.SetTag("peer.service", string.Join(",", bootstrapServers ?? Enumerable.Empty())); } -} \ No newline at end of file +} diff --git a/src/KafkaFlow.OpenTelemetry/KafkaFlow.OpenTelemetry.csproj b/src/KafkaFlow.OpenTelemetry/KafkaFlow.OpenTelemetry.csproj index 506126fe9..612e8d3bc 100644 --- a/src/KafkaFlow.OpenTelemetry/KafkaFlow.OpenTelemetry.csproj +++ b/src/KafkaFlow.OpenTelemetry/KafkaFlow.OpenTelemetry.csproj @@ -5,10 +5,7 @@ - - - SemanticConventions - + diff --git a/src/KafkaFlow.SchemaRegistry/KafkaFlow.SchemaRegistry.csproj b/src/KafkaFlow.SchemaRegistry/KafkaFlow.SchemaRegistry.csproj index cc514a288..ec0c9e550 100644 --- a/src/KafkaFlow.SchemaRegistry/KafkaFlow.SchemaRegistry.csproj +++ b/src/KafkaFlow.SchemaRegistry/KafkaFlow.SchemaRegistry.csproj @@ -1,19 +1,19 @@  - - netstandard2.0 - KafkaFlow.SchemaRegistry - Provides extensions methods to use Schema Registry - KafkaFlow - + + netstandard2.0 + KafkaFlow.SchemaRegistry + Provides extensions methods to use Schema Registry + KafkaFlow + - - - + + + - - - - + + + + diff --git a/src/KafkaFlow.Serializer.JsonCore/KafkaFlow.Serializer.JsonCore.csproj b/src/KafkaFlow.Serializer.JsonCore/KafkaFlow.Serializer.JsonCore.csproj index 31582eed3..24d2d5585 100644 --- a/src/KafkaFlow.Serializer.JsonCore/KafkaFlow.Serializer.JsonCore.csproj +++ b/src/KafkaFlow.Serializer.JsonCore/KafkaFlow.Serializer.JsonCore.csproj @@ -1,17 +1,17 @@  - - netstandard2.0 - KafkaFlow.Serializer.JsonCore - JSON implementation for KafkaFlow serializer middleware using System.Text.Json - KafkaFlow.Serializer - + + netstandard2.0 + KafkaFlow.Serializer.JsonCore + JSON implementation for KafkaFlow serializer middleware using System.Text.Json + KafkaFlow.Serializer + - - - + + + - - - + + + diff --git a/src/KafkaFlow.Serializer.NewtonsoftJson/KafkaFlow.Serializer.NewtonsoftJson.csproj b/src/KafkaFlow.Serializer.NewtonsoftJson/KafkaFlow.Serializer.NewtonsoftJson.csproj index 8d68ef533..b1b7911f1 100644 --- a/src/KafkaFlow.Serializer.NewtonsoftJson/KafkaFlow.Serializer.NewtonsoftJson.csproj +++ b/src/KafkaFlow.Serializer.NewtonsoftJson/KafkaFlow.Serializer.NewtonsoftJson.csproj @@ -1,18 +1,18 @@ - - netstandard2.0 - KafkaFlow.Serializer.NewtonsoftJson - JSON implementation for KafkaFlow serializer middleware using Newtonsoft Json - KafkaFlow.Serializer - + + netstandard2.0 + KafkaFlow.Serializer.NewtonsoftJson + JSON implementation for KafkaFlow serializer middleware using Newtonsoft Json + KafkaFlow.Serializer + - - - + + + - - - + + + diff --git a/src/KafkaFlow.Serializer.ProtobufNet/KafkaFlow.Serializer.ProtobufNet.csproj b/src/KafkaFlow.Serializer.ProtobufNet/KafkaFlow.Serializer.ProtobufNet.csproj index 67808b7e5..c528b551f 100644 --- a/src/KafkaFlow.Serializer.ProtobufNet/KafkaFlow.Serializer.ProtobufNet.csproj +++ b/src/KafkaFlow.Serializer.ProtobufNet/KafkaFlow.Serializer.ProtobufNet.csproj @@ -1,17 +1,17 @@  - - netstandard2.0 - KafkaFlow.Serializer.ProtobufNet - Protobuf implementation for KafkaFlow serializer middleware using protobuf-net - KafkaFlow.Serializer - + + netstandard2.0 + KafkaFlow.Serializer.ProtobufNet + Protobuf implementation for KafkaFlow serializer middleware using protobuf-net + KafkaFlow.Serializer + - - - + + + - - - + + + diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro.csproj b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro.csproj index c81fed089..732d627df 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro.csproj +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro.csproj @@ -1,22 +1,22 @@ - - netstandard2.0 - KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro - Avro implementation for KafkaFlow serializer middleware using Confluent.SchemaRegistry.Serdes.Avro package - KafkaFlow.Serializer.SchemaRegistry - + + netstandard2.0 + KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro + Avro implementation for KafkaFlow serializer middleware using Confluent.SchemaRegistry.Serdes.Avro package + KafkaFlow.Serializer.SchemaRegistry + - - - - + + + + - - - - - - + + + + + + diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson.csproj b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson.csproj index 2c3e2619a..6c4f14a60 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson.csproj +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson.csproj @@ -1,19 +1,19 @@ - - netstandard2.0 - KafkaFlow.Serializer.SchemaRegistry.ConfluentJson - Json implementation for KafkaFlow serializer middleware using Confluent.SchemaRegistry.Serdes.Json package - KafkaFlow.Serializer.SchemaRegistry - + + netstandard2.0 + KafkaFlow.Serializer.SchemaRegistry.ConfluentJson + Json implementation for KafkaFlow serializer middleware using Confluent.SchemaRegistry.Serdes.Json package + KafkaFlow.Serializer.SchemaRegistry + - - - + + + - - - - + + + + diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf.csproj b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf.csproj index a99bcc155..8b4f7dd5f 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf.csproj +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf.csproj @@ -1,18 +1,18 @@ - - netstandard2.0 - KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf - Protobuf implementation for KafkaFlow serializer middleware using Confluent.SchemaRegistry.Serdes.Protobuf package - KafkaFlow.Serializer.SchemaRegistry - + + netstandard2.0 + KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf + Protobuf implementation for KafkaFlow serializer middleware using Confluent.SchemaRegistry.Serdes.Protobuf package + KafkaFlow.Serializer.SchemaRegistry + - - - + + + - - - - + + + + diff --git a/src/KafkaFlow.Unity/KafkaFlow.Unity.csproj b/src/KafkaFlow.Unity/KafkaFlow.Unity.csproj index 63b2a5df4..935f41284 100644 --- a/src/KafkaFlow.Unity/KafkaFlow.Unity.csproj +++ b/src/KafkaFlow.Unity/KafkaFlow.Unity.csproj @@ -1,17 +1,17 @@ - - netstandard2.0 - KafkaFlow.Unity - Adapts KafkaFlow to use Unity Dependency Injection - + + netstandard2.0 + KafkaFlow.Unity + Adapts KafkaFlow to use Unity Dependency Injection + - - - + + + - - - + + + diff --git a/src/KafkaFlow/Consumers/PartitionOffsets.cs b/src/KafkaFlow/Consumers/PartitionOffsets.cs index 98575034d..108f6c02e 100644 --- a/src/KafkaFlow/Consumers/PartitionOffsets.cs +++ b/src/KafkaFlow/Consumers/PartitionOffsets.cs @@ -7,8 +7,8 @@ namespace KafkaFlow.Consumers; internal class PartitionOffsets { - private readonly SortedDictionary _processedContexts = new(); - private readonly LinkedList _receivedContexts = new(); + private readonly SortedDictionary _processedContexts = new (); + private readonly LinkedList _receivedContexts = new (); public IConsumerContext DequeuedContext { get; private set; } diff --git a/src/KafkaFlow/KafkaFlow.csproj b/src/KafkaFlow/KafkaFlow.csproj index bebc9488a..4925e4af5 100644 --- a/src/KafkaFlow/KafkaFlow.csproj +++ b/src/KafkaFlow/KafkaFlow.csproj @@ -1,25 +1,25 @@  - - netstandard2.0 - KafkaFlow - KafkaFlow main package - + + netstandard2.0 + KafkaFlow + KafkaFlow main package + - - - - - - + + + + + + - - - + + + - - - <_Parameter1>$(AssemblyName).UnitTests - - + + + <_Parameter1>$(AssemblyName).UnitTests + + \ No newline at end of file diff --git a/tests/KafkaFlow.IntegrationTests/KafkaFlow.IntegrationTests.csproj b/tests/KafkaFlow.IntegrationTests/KafkaFlow.IntegrationTests.csproj index 9db78e47b..b41f49e34 100644 --- a/tests/KafkaFlow.IntegrationTests/KafkaFlow.IntegrationTests.csproj +++ b/tests/KafkaFlow.IntegrationTests/KafkaFlow.IntegrationTests.csproj @@ -1,56 +1,56 @@  - - net6.0 - false - true - + + net8.0 + false + true + - - 1701;1702;CS1591;SA1600;CS0618 - + + 1701;1702;CS1591;SA1600;CS0618 + - - 1701;1702;CS1591;SA1600;CS0618 - + + 1701;1702;CS1591;SA1600;CS0618 + - - - - - - - - - true - - - - all - runtime; build; native; contentfiles; analyzers; buildtransitive - - - - + + + + + + + + + + + + true + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + - - - - - - - - - - - - + + + + + + + + + + + + - - - - Always - - + + + Always + + diff --git a/tests/KafkaFlow.UnitTests/Compressors/CompressorConsumerMiddlewareTests.cs b/tests/KafkaFlow.UnitTests/Compressors/CompressorConsumerMiddlewareTests.cs index d1e0d7e62..1aae88f33 100644 --- a/tests/KafkaFlow.UnitTests/Compressors/CompressorConsumerMiddlewareTests.cs +++ b/tests/KafkaFlow.UnitTests/Compressors/CompressorConsumerMiddlewareTests.cs @@ -37,7 +37,7 @@ public void Invoke_NotByteArrayMessage_ThrowsInvalidOperationException() Func act = () => _target.Invoke(_contextMock.Object, _ => this.SetNextCalled()); // Assert - act.Should().Throw(); + act.Should().ThrowAsync(); _nextCalled.Should().BeFalse(); _contextMock.Verify(x => x.SetMessage(It.IsAny(), It.IsAny()), Times.Never); _decompressorMock.Verify(x => x.Decompress(It.IsAny()), Times.Never); diff --git a/tests/KafkaFlow.UnitTests/ConfigurationBuilders/ConsumerConfigurationBuilderTests.cs b/tests/KafkaFlow.UnitTests/ConfigurationBuilders/ConsumerConfigurationBuilderTests.cs index 0164ac2e9..52f49bac2 100644 --- a/tests/KafkaFlow.UnitTests/ConfigurationBuilders/ConsumerConfigurationBuilderTests.cs +++ b/tests/KafkaFlow.UnitTests/ConfigurationBuilders/ConsumerConfigurationBuilderTests.cs @@ -119,7 +119,7 @@ public void Build_AllCalls_ReturnPassedValues() configuration.BufferSize.Should().Be(bufferSize); configuration.WorkersCountCalculator(null, null).Result.Should().Be(workers); configuration.GroupId.Should().Be(groupId); - configuration.GetKafkaConfig().AutoOffsetReset.Should().Be(offsetReset); + configuration.GetKafkaConfig().AutoOffsetReset.Should().HaveSameValueAs(offsetReset); configuration.AutoMessageCompletion.Should().Be(false); configuration.GetKafkaConfig().EnableAutoOffsetStore.Should().Be(false); configuration.GetKafkaConfig().EnableAutoCommit.Should().Be(false); diff --git a/tests/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj b/tests/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj index 479ec854f..6026c38b6 100644 --- a/tests/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj +++ b/tests/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj @@ -1,47 +1,47 @@  - - false - net6.0 - true - + + false + net8.0 + true + - - 1701;1702;CS1591;SA1600 - + + 1701;1702;CS1591;SA1600 + - - 1701;1702;CS1591;SA1600 - + + 1701;1702;CS1591;SA1600 + - - - - - - - all - runtime; build; native; contentfiles; analyzers; buildtransitive - - - - - - true - - - + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + true + + + - - - - - - - - - - - + + + + + + + + + + + diff --git a/tests/KafkaFlow.UnitTests/MessageContextTests.cs b/tests/KafkaFlow.UnitTests/MessageContextTests.cs index 442b8ad34..fcf9e855a 100644 --- a/tests/KafkaFlow.UnitTests/MessageContextTests.cs +++ b/tests/KafkaFlow.UnitTests/MessageContextTests.cs @@ -1,6 +1,4 @@ using System.Collections.Generic; -using KafkaFlow.Consumers; -using KafkaFlow.Producers; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; @@ -21,7 +19,7 @@ public void SetMessage_ShouldSetMessageCorrectly() Mock.Of(), Mock.Of>() ); - + // Act var changedMessage = messageContext.SetMessage("changed-key", "changed-value"); @@ -36,4 +34,4 @@ public void SetMessage_ShouldSetMessageCorrectly() Assert.AreSame(messageContext.Brokers, changedMessage.Brokers); Assert.AreSame(messageContext.Items, changedMessage.Items); } -} \ No newline at end of file +} diff --git a/tests/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs b/tests/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs index 4370ec540..98abeb226 100644 --- a/tests/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs +++ b/tests/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs @@ -87,7 +87,7 @@ public void Invoke_NotByteArrayMessage_ThrowsInvalidOperationException() Func act = () => _target.Invoke(_contextMock.Object, _ => this.SetNextCalled()); // Assert - act.Should().Throw(); + act.Should().ThrowAsync(); _nextCalled.Should().BeFalse(); _contextMock.Verify(x => x.SetMessage(It.IsAny(), It.IsAny()), Times.Never); _deserializerMock.Verify( diff --git a/tests/KafkaFlow.UnitTests/TypedHandler/HandlerTypeMappingTests.cs b/tests/KafkaFlow.UnitTests/TypedHandler/HandlerTypeMappingTests.cs index 4a1162d29..b06a586a7 100644 --- a/tests/KafkaFlow.UnitTests/TypedHandler/HandlerTypeMappingTests.cs +++ b/tests/KafkaFlow.UnitTests/TypedHandler/HandlerTypeMappingTests.cs @@ -1,3 +1,5 @@ +using System; +using System.Collections.Generic; using FluentAssertions; using KafkaFlow.Middlewares.TypedHandler; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -26,9 +28,11 @@ public void AddSeveralMappings_GetHandlersTypesReturnsListOfHandlers() // Assert _target.GetHandlersTypes(typeof(int)) .Should() - .BeEquivalentTo( + .BeEquivalentTo(new List + { typeof(string), typeof(double), - typeof(bool)); + typeof(bool) + }); } } From 4ab446e0321dfde05d24704d125e002a0fe0d758 Mon Sep 17 00:00:00 2001 From: Miguel Costa Date: Thu, 6 Feb 2025 02:44:24 +0000 Subject: [PATCH 4/4] feat: upgrade to Confluent.Kafka.* 2.8.0 --- .../KafkaFlow.SchemaRegistry.csproj | 2 +- src/KafkaFlow.Serializer.JsonCore/JsonCoreSerializer.cs | 8 +++++++- .../KafkaFlow.Serializer.ProtobufNet.csproj | 2 +- .../ConfluentAvroSerializer.cs | 3 ++- ...fkaFlow.Serializer.SchemaRegistry.ConfluentAvro.csproj | 6 +++--- .../ConfluentJsonSerializer.cs | 3 ++- ...fkaFlow.Serializer.SchemaRegistry.ConfluentJson.csproj | 2 +- .../ConfluentProtobufSerializer.cs | 3 ++- .../ConfluentProtobufTypeNameResolver.cs | 7 ++++--- ...low.Serializer.SchemaRegistry.ConfluentProtobuf.csproj | 8 +++++++- src/KafkaFlow/KafkaFlow.csproj | 2 +- 11 files changed, 31 insertions(+), 15 deletions(-) diff --git a/src/KafkaFlow.SchemaRegistry/KafkaFlow.SchemaRegistry.csproj b/src/KafkaFlow.SchemaRegistry/KafkaFlow.SchemaRegistry.csproj index ec0c9e550..5ffd4bbd5 100644 --- a/src/KafkaFlow.SchemaRegistry/KafkaFlow.SchemaRegistry.csproj +++ b/src/KafkaFlow.SchemaRegistry/KafkaFlow.SchemaRegistry.csproj @@ -8,7 +8,7 @@ - + diff --git a/src/KafkaFlow.Serializer.JsonCore/JsonCoreSerializer.cs b/src/KafkaFlow.Serializer.JsonCore/JsonCoreSerializer.cs index 6ef0b3f7d..c9aa9dd36 100644 --- a/src/KafkaFlow.Serializer.JsonCore/JsonCoreSerializer.cs +++ b/src/KafkaFlow.Serializer.JsonCore/JsonCoreSerializer.cs @@ -1,4 +1,5 @@ -using System.IO; +using System; +using System.IO; using System.Text.Json; using System.Threading.Tasks; @@ -52,6 +53,11 @@ public JsonCoreSerializer() /// public Task SerializeAsync(object message, Stream output, ISerializerContext context) { + if (message is null || message == Array.Empty()) + { + return Task.CompletedTask; + } + using var writer = new Utf8JsonWriter(output, _writerOptions); JsonSerializer.Serialize(writer, message, _serializerOptions); diff --git a/src/KafkaFlow.Serializer.ProtobufNet/KafkaFlow.Serializer.ProtobufNet.csproj b/src/KafkaFlow.Serializer.ProtobufNet/KafkaFlow.Serializer.ProtobufNet.csproj index c528b551f..7f6553c64 100644 --- a/src/KafkaFlow.Serializer.ProtobufNet/KafkaFlow.Serializer.ProtobufNet.csproj +++ b/src/KafkaFlow.Serializer.ProtobufNet/KafkaFlow.Serializer.ProtobufNet.csproj @@ -8,7 +8,7 @@ - + diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroSerializer.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroSerializer.cs index 29d346662..b9b868e8c 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroSerializer.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroSerializer.cs @@ -40,7 +40,8 @@ public Task SerializeAsync(object message, Stream output, ISerializerContext con () => Activator.CreateInstance( typeof(AvroSerializer<>).MakeGenericType(message.GetType()), _schemaRegistryClient, - _serializerConfig)) + _serializerConfig, + null)) .SerializeAsync(message, output, context); } } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro.csproj b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro.csproj index 732d627df..caeab3625 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro.csproj +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro.csproj @@ -14,9 +14,9 @@ - - - + + + diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConfluentJsonSerializer.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConfluentJsonSerializer.cs index 7b755b31c..0def4f74b 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConfluentJsonSerializer.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConfluentJsonSerializer.cs @@ -59,7 +59,8 @@ public Task SerializeAsync(object message, Stream output, ISerializerContext con typeof(JsonSerializer<>).MakeGenericType(message.GetType()), _schemaRegistryClient, _serializerConfig, - _schemaGeneratorSettings)) + _schemaGeneratorSettings, + null)) .SerializeAsync(message, output, context); } } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson.csproj b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson.csproj index 6c4f14a60..5af360904 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson.csproj +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson.csproj @@ -8,7 +8,7 @@ - + diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufSerializer.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufSerializer.cs index 9c0930a57..be9279fe5 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufSerializer.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufSerializer.cs @@ -38,7 +38,8 @@ public Task SerializeAsync(object message, Stream output, ISerializerContext con () => Activator.CreateInstance( typeof(ProtobufSerializer<>).MakeGenericType(message.GetType()), _schemaRegistryClient, - _serializerConfig)) + _serializerConfig, + null)) .SerializeAsync(message, output, context); } } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufTypeNameResolver.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufTypeNameResolver.cs index 42237baf2..51b9c78e3 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufTypeNameResolver.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufTypeNameResolver.cs @@ -1,8 +1,8 @@ +extern alias GoogleProto; + using System.Linq; using System.Threading.Tasks; using Confluent.SchemaRegistry; -using Google.Protobuf; -using Google.Protobuf.Reflection; namespace KafkaFlow; @@ -19,7 +19,8 @@ public async Task ResolveAsync(int id) { var schemaString = (await _client.GetSchemaAsync(id, "serialized")).SchemaString; - var protoFields = FileDescriptorProto.Parser.ParseFrom(ByteString.FromBase64(schemaString)); + // Issue: https://github.com/confluentinc/confluent-kafka-dotnet/issues/2409 + var protoFields = GoogleProto::Google.Protobuf.Reflection.FileDescriptorProto.Parser.ParseFrom(GoogleProto::Google.Protobuf.ByteString.FromBase64(schemaString)); return $"{protoFields.Package}.{protoFields.MessageType.FirstOrDefault()?.Name}"; } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf.csproj b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf.csproj index 8b4f7dd5f..0c74c0621 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf.csproj +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf.csproj @@ -8,7 +8,13 @@ - + + + GoogleProto + + + PbNet + diff --git a/src/KafkaFlow/KafkaFlow.csproj b/src/KafkaFlow/KafkaFlow.csproj index 4925e4af5..922dcedea 100644 --- a/src/KafkaFlow/KafkaFlow.csproj +++ b/src/KafkaFlow/KafkaFlow.csproj @@ -7,7 +7,7 @@ - +