Consumer subscribers are not being called. #1832

Open
8 tasks done
niemyjski opened this issue Jun 8, 2022 · 19 comments
Labels
investigate further we want to further investigate to understand properly LOW

Comments

@niemyjski

niemyjski commented Jun 8, 2022

Description

We've been writing a Foundatio message bus implementation on top of Kafka and noticed that our tests are extremely flaky in some cases (see all of the test failures at https://github.com/FoundatioFx/Foundatio.Kafka/actions). The common factor so far is that when multiple consumers listen to the same topic, the consumers are never notified of a message on that topic. 1.9.0 improved reliability a lot locally, but we still get random failures. Locally I have a 5900X, which has far more resources than the build server.

How to reproduce

  1. Clone https://github.com/FoundatioFx/Foundatio.Kafka
  2. Run docker compose up in the cloned folder.
  3. Open the solution and run dotnet test.

The test KafkaMessageBusTests.CanSendMessageToMultipleSubscribersAsync is the simplest one and seems to reproduce this error most easily (after a few runs).

[Fact]
public override async Task CanSendMessageToMultipleSubscribersAsync() {
    var messageBus = GetMessageBus();
    if (messageBus == null)
        return;

    try {
        var countdown = new AsyncCountdownEvent(3);
        await messageBus.SubscribeAsync<SimpleMessageA>(msg => {
            Assert.Equal("Hello", msg.Data);
            countdown.Signal();
        });
        await messageBus.SubscribeAsync<SimpleMessageA>(msg => {
            Assert.Equal("Hello", msg.Data);
            countdown.Signal();
        });
        await messageBus.SubscribeAsync<SimpleMessageA>(msg => {
            Assert.Equal("Hello", msg.Data);
            countdown.Signal();
        });
        await messageBus.PublishAsync(new SimpleMessageA {
            Data = "Hello"
        });

        await countdown.WaitAsync(TimeSpan.FromSeconds(2));
        Assert.Equal(0, countdown.CurrentCount);
    } finally {
        await CleanupMessageBusAsync(messageBus);
    }
}

Under the hood, each call to SubscribeAsync ensures the topic exists and then starts a consumer that listens in a loop, but only if a listener isn't already running (at most one listener per bus instance). I've included logs with varying levels of detail.
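A minimal sketch of the subscribe flow described above, in plain C# with no Kafka dependency. The "ensure topic exists" step and the consumer's receive call are passed in as delegates; these are hypothetical stand-ins for the real admin client and consume loop. A lock guarantees at most one listener loop per bus instance, and that loop fans each received message out to every registered handler. SingleListenerBus, StopAsync, and the delegate parameters are illustrative names, not Foundatio.Kafka's actual implementation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch of "ensure topic, then at most one listener per bus";
// not Foundatio.Kafka's implementation.
public sealed class SingleListenerBus
{
    private readonly Func<CancellationToken, Task> _ensureTopicAsync;      // stand-in for "ensure topic exists"
    private readonly Func<CancellationToken, Task<string>> _receiveAsync;  // stand-in for the consumer's receive call
    private readonly ConcurrentBag<Action<string>> _handlers = new();
    private readonly SemaphoreSlim _startLock = new(1, 1);
    private readonly CancellationTokenSource _cts = new();
    private Task? _listener;

    public SingleListenerBus(Func<CancellationToken, Task> ensureTopicAsync,
                             Func<CancellationToken, Task<string>> receiveAsync)
    {
        _ensureTopicAsync = ensureTopicAsync;
        _receiveAsync = receiveAsync;
    }

    public async Task SubscribeAsync(Action<string> handler)
    {
        // Every subscribe call first makes sure the topic exists.
        await _ensureTopicAsync(_cts.Token);
        _handlers.Add(handler);

        // Start the shared listener loop only if it isn't already running.
        await _startLock.WaitAsync();
        try
        {
            _listener ??= Task.Run(() => ListenAsync(_cts.Token));
        }
        finally
        {
            _startLock.Release();
        }
    }

    public async Task StopAsync()
    {
        _cts.Cancel();
        if (_listener != null)
            await _listener;
    }

    private async Task ListenAsync(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            string message;
            try
            {
                message = await _receiveAsync(ct);
            }
            catch (OperationCanceledException)
            {
                break; // StopAsync was called.
            }

            // Fan each received message out to every registered handler.
            foreach (var handler in _handlers)
                handler(message);
        }
    }
}
```

In this sketch a handler registered after a message has already been dispatched never receives it, so the ordering between the SubscribeAsync calls and PublishAsync matters.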

Checklist

Please provide the following information:

  • A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
  • Confluent.Kafka NuGet version (latest, 1.9.0 RC)
  • Apache Kafka version (latest)
  • Client configuration.
  • Operating system (all)
  • Provide logs (with "debug" : "..." as necessary in configuration).
  • Provide broker log excerpts.
  • Critical issue.

Test logs with handlers not commented out (https://github.com/FoundatioFx/Foundatio.Kafka/blob/main/src/Foundatio.Kafka/Messaging/KafkaMessageBus.cs#L167-L173)

I saw a similar issue (https://github.com/ah-/rdkafka-dotnet/issues/61) and wondered whether I should avoid using these handlers. While searching, I also came across confluentinc/confluent-kafka-python#970, but I'm not sure whether it's related.

See the following gist for all the unit test logs (both passing and failing, with varying levels of debug logging). GitHub wouldn't let me post them here because it said the comment was too long: https://gist.github.com/niemyjski/bac539002aa046738d6e029d0d1ba688

Broker logs from a recent failure

kafka                   | [2022-06-08 20:11:53,310] INFO Creating topic test_1ab960c172c84b5caf31c969d87c5a4f with configuration {} and initial partition assignment Map(0 -> ArrayBuffer(1)) (kafka.zk.AdminZkClient)
kafka                   | [2022-06-08 20:11:53,319] INFO [Controller id=1] New topics: [Set(test_1ab960c172c84b5caf31c969d87c5a4f)], deleted topics: [Set()], new partition replica assignment [Set(TopicIdReplicaAssignment(test_1ab960c172c84b5caf31c969d87c5a4f,Some(CJssgWbAThWmPO9sa96mYg),Map(test_1ab960c172c84b5caf31c969d87c5a4f-0 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=))))] (kafka.controller.KafkaController)
kafka                   | [2022-06-08 20:11:53,319] INFO [Controller id=1] New partition creation callback for test_1ab960c172c84b5caf31c969d87c5a4f-0 (kafka.controller.KafkaController)
kafka                   | [2022-06-08 20:11:53,319] INFO [Controller id=1 epoch=1] Changed partition test_1ab960c172c84b5caf31c969d87c5a4f-0 state from NonExistentPartition to NewPartition with assigned replicas 1 (state.change.logger)
kafka                   | [2022-06-08 20:11:53,319] INFO [Controller id=1 epoch=1] Sending UpdateMetadata request to brokers Set() for 0 partitions (state.change.logger)
kafka                   | [2022-06-08 20:11:53,320] INFO [Controller id=1 epoch=1] Sending UpdateMetadata request to brokers Set() for 0 partitions (state.change.logger)
kafka                   | [2022-06-08 20:11:53,328] INFO [Controller id=1 epoch=1] Changed partition test_1ab960c172c84b5caf31c969d87c5a4f-0 from NewPartition to OnlinePartition with state LeaderAndIsr(leader=1, leaderEpoch=0, isr=List(1), leaderRecoveryState=RECOVERED, zkVersion=0) (state.change.logger)
kafka                   | [2022-06-08 20:11:53,328] INFO [Controller id=1 epoch=1] Sending LeaderAndIsr request to broker 1 with 1 become-leader and 0 become-follower partitions (state.change.logger)
kafka                   | [2022-06-08 20:11:53,328] INFO [Controller id=1 epoch=1] Sending UpdateMetadata request to brokers Set(1) for 1 partitions (state.change.logger)
kafka                   | [2022-06-08 20:11:53,328] INFO [Controller id=1 epoch=1] Sending UpdateMetadata request to brokers Set() for 0 partitions (state.change.logger)
kafka                   | [2022-06-08 20:11:53,328] INFO [Broker id=1] Handling LeaderAndIsr request correlationId 1317 from controller 1 for 1 partitions (state.change.logger)
kafka                   | [2022-06-08 20:11:53,329] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(test_1ab960c172c84b5caf31c969d87c5a4f-0) (kafka.server.ReplicaFetcherManager)
kafka                   | [2022-06-08 20:11:53,329] INFO [Broker id=1] Stopped fetchers as part of LeaderAndIsr request correlationId 1317 from controller 1 epoch 1 as part of the become-leader transition for 1 partitions (state.change.logger)
kafka                   | [2022-06-08 20:11:53,330] INFO [LogLoader partition=test_1ab960c172c84b5caf31c969d87c5a4f-0, dir=/bitnami/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
kafka                   | [2022-06-08 20:11:53,331] INFO Created log for partition test_1ab960c172c84b5caf31c969d87c5a4f-0 in /bitnami/kafka/data/test_1ab960c172c84b5caf31c969d87c5a4f-0 with properties {} (kafka.log.LogManager)
kafka                   | [2022-06-08 20:11:53,331] INFO [Partition test_1ab960c172c84b5caf31c969d87c5a4f-0 broker=1] No checkpointed highwatermark is found for partition test_1ab960c172c84b5caf31c969d87c5a4f-0 (kafka.cluster.Partition)
kafka                   | [2022-06-08 20:11:53,331] INFO [Partition test_1ab960c172c84b5caf31c969d87c5a4f-0 broker=1] Log loaded for partition test_1ab960c172c84b5caf31c969d87c5a4f-0 with initial high watermark 0 (kafka.cluster.Partition)
kafka                   | [2022-06-08 20:11:53,331] INFO [Broker id=1] Leader test_1ab960c172c84b5caf31c969d87c5a4f-0 starts at leader epoch 0 from offset 0 with high watermark 0 ISR [1] addingReplicas [] removingReplicas []. Previous leader epoch was -1. (state.change.logger)
kafka                   | [2022-06-08 20:11:53,348] INFO [Broker id=1] Finished LeaderAndIsr request in 20ms correlationId 1317 from controller 1 for 1 partitions (state.change.logger)
kafka                   | [2022-06-08 20:11:53,349] INFO [Broker id=1] Add 1 partitions and deleted 0 partitions from metadata cache in response to UpdateMetadata request sent by controller 1 epoch 1 with correlation id 1318 (state.change.logger)
@niemyjski
Author

@edenhill This has me at a loss, as the logs don't give much to go on. From the logs, it seems like either group assignment isn't happening, so the subscriber never gets notified, or there's an issue with topic metadata propagation? I'm still a huge Kafka noob, so any and all advice would be greatly appreciated.

niemyjski added a commit to FoundatioFx/Foundatio.Kafka that referenced this issue Jun 8, 2022
@niemyjski
Author

One of the highly concurrent tests just failed with:

20:20.45457 E:KafkaMessageBus - Error committing message test_5b929ca603a24f2f954b91c796547eeb [[0]] @56: Broker: Unknown topic or partition
20:20.45457 E:KafkaMessageBus - Error committing message test_5b929ca603a24f2f954b91c796547eeb [[0]] @56: Broker: Unknown topic or partition
20:20.45457 E:KafkaMessageBus - Error committing message test_5b929ca603a24f2f954b91c796547eeb [[0]] @57: Broker: Unknown topic or partition
20:20.45457 E:KafkaMessageBus - Error committing message test_5b929ca603a24f2f954b91c796547eeb [[0]] @51: Broker: Unknown topic or partition
20:20.87403 E:KafkaMessageBus - Error consuming test_5b929ca603a24f2f954b91c796547eeb GroupId=44b59d379c0b4de28ee49f6d142eb630 message: Failed to query logical offset END: Broker: Unknown topic or partition
20:20.88929 E:KafkaMessageBus - Error consuming test_5b929ca603a24f2f954b91c796547eeb GroupId=4ac196fe056b4c49bedc0d24a774747f message: Failed to query logical offset END: Broker: Unknown topic or partition

@mhowlett
Contributor

mhowlett commented Jun 9, 2022

Under the hood, each call to subscribe will ensure topic exists

Metadata doesn't propagate synchronously. My first thought is that what you're seeing may be due to this in some way (given the error in the previous comment). If you set some arbitrary delay after topic creation (maybe 2s or so), does that help?
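A fixed delay is a guess that can be too short on a slow build server and wasteful on a fast machine. A minimal sketch of an alternative, assuming the Confluent.Kafka .NET admin client: create the topic (treating "already exists" as success), then poll IAdminClient.GetMetadata until the broker reports the topic with no error and a leader for every partition, or give up after a timeout. KafkaTopicWait, EnsureTopicAsync, and IsTopicReady are hypothetical names, and the 1-partition, replication-factor-1 spec is an assumption matching the single-broker test setup. This only confirms what the queried broker reports; it does not guarantee that a consumer's own metadata cache is already up to date.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

// Hypothetical helper (not part of Confluent.Kafka or Foundatio.Kafka).
public static class KafkaTopicWait
{
    // True when metadata lists the topic with no topic-level error and every
    // partition has an elected leader and no partition-level error.
    public static bool IsTopicReady(Metadata metadata, string topic)
    {
        var t = metadata.Topics.FirstOrDefault(x => x.Topic == topic);
        return t != null
            && !t.Error.IsError
            && t.Partitions.Count > 0
            && t.Partitions.All(p => !p.Error.IsError && p.Leader >= 0);
    }

    // Creates the topic (tolerating "already exists"), then polls broker
    // metadata until the topic looks usable or the timeout elapses.
    public static async Task EnsureTopicAsync(
        IAdminClient admin, string topic, TimeSpan timeout, CancellationToken ct = default)
    {
        try
        {
            await admin.CreateTopicsAsync(new[]
            {
                // Assumption: 1 partition, replication factor 1 (single-broker cluster).
                new TopicSpecification { Name = topic, NumPartitions = 1, ReplicationFactor = 1 }
            });
        }
        catch (CreateTopicsException ex) when (ex.Results.All(r =>
            r.Error.Code == ErrorCode.NoError || r.Error.Code == ErrorCode.TopicAlreadyExists))
        {
            // Another caller created it first; still wait for metadata below.
        }

        var sw = Stopwatch.StartNew();
        while (sw.Elapsed < timeout)
        {
            try
            {
                // Blocking call: asks a broker for metadata about this topic.
                var metadata = admin.GetMetadata(topic, TimeSpan.FromSeconds(1));
                if (IsTopicReady(metadata, topic))
                    return;
            }
            catch (KafkaException)
            {
                // Metadata request failed or timed out; retry until the deadline.
            }
            await Task.Delay(TimeSpan.FromMilliseconds(100), ct);
        }
        throw new TimeoutException($"Topic '{topic}' was not ready within {timeout}.");
    }
}
```

A caller would await KafkaTopicWait.EnsureTopicAsync(adminClient, topicName, TimeSpan.FromSeconds(10)) before starting the consumer loop. Because it returns as soon as the topic is visible, the wait adapts to the machine instead of relying on one fixed delay.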

@niemyjski
Author

@mhowlett I added a one-second delay; it helped, but it also caused some test failures. With a two-second delay (see the referenced commit), it took many runs before a test failed locally. On the build server, two seconds caused an immediate failure.

If this is the issue, could we get a wait option on the create-topic APIs? If there's any chance you could debug this locally, that would be a massive help. I'd be willing to meet up and work on it with you as well.

@niemyjski
Author

@mhowlett I still haven't been able to narrow this down, even on 1.9.0.

@mhowlett
Contributor

Is this a single-broker cluster?

@niemyjski
Author

Yes. A full reproduction is available in the linked project above; just clone and run it. I used a single-broker cluster because of resource constraints when running on GitHub Actions and locally.

@niemyjski
Author

niemyjski commented Jun 17, 2022

I made some changes (in the latest commit) to try this out on an ARM Mac, but it's far less stable than on Windows, and my local broker seems to become unresponsive, even to kcat, while the tests are running (might be related?).

@mhowlett
Contributor

Getting "Broker: Unknown topic or partition" after you've waited for topic creation on a single-broker cluster seems very odd.

@mhowlett
Contributor

That's consistent with a broker issue.

@niemyjski
Author

niemyjski commented Jun 17, 2022

I agree, it's all very odd. I would expect a single broker to be the most reliable setup for these tests, since there's less communication and no synchronization across brokers.

@niemyjski
Author

@mhowlett Do you think 1.9.1 will help, or what are the next steps?

@niemyjski
Author

@mhowlett do you think any of my issues could be related to the issues you are running into with .NET 6?

@mhowlett
Contributor

no

@mhowlett mhowlett added investigate further we want to further investigate to understand properly LOW and removed wait-info labels Oct 7, 2022
@mhowlett
Contributor

mhowlett commented Oct 7, 2022

I'm triaging this as investigate further/LOW. We appreciate the testing, and there's some chance this reflects an actual issue.

@niemyjski
Author

niemyjski commented Oct 28, 2022

Thanks. I just tried the latest RC; I still get failures, and the tests seem slower to run locally.

https://github.com/FoundatioFx/Foundatio.Kafka/actions/runs/3348053132

@niemyjski
Author

@mhowlett Were you able to figure out whether this is part of a larger issue?

@niemyjski
Author

niemyjski commented Feb 24, 2023

The latest 2.0.2 release seems better, but I'm still getting failures:

43:46.62225 E:KafkaMessageBus - Error consuming test_ef3b4c5799cd43a5b174f2ed3aee1d43 GroupId=5e9f8df1a6604840bc4ed9642045cb43 message: Failed to query logical offset END: Broker: Unknown topic or partition
43:46.62443 E:KafkaMessageBus - Error consuming test_ef3b4c5799cd43a5b174f2ed3aee1d43 GroupId=0843dbcf82254112bf558efef3a773f4 message: Failed to query logical offset END: Broker: Unknown topic or partition
43:46.62447 E:KafkaMessageBus - Error consuming test_ef3b4c5799cd43a5b174f2ed3aee1d43 GroupId=51da660955af4f0aabf1a3838550b0b1 message: Failed to query logical offset END: Broker: Unknown topic or partition
43:46.62448 E:KafkaMessageBus - Error consuming test_ef3b4c5799cd43a5b174f2ed3aee1d43 GroupId=9dee71745289471aadb14bf5ccc874bc message: Failed to query logical offset END: Broker: Unknown topic or partition
43:46.62449 E:KafkaMessageBus - Error consuming test_ef3b4c5799cd43a5b174f2ed3aee1d43 GroupId=468a61b9b37641b092f930e799057293 message: Failed to query logical offset END: Broker: Unknown topic or partition
43:46.62450 E:KafkaMessageBus - Error consuming test_ef3b4c5799cd43a5b174f2ed3aee1d43 GroupId=d65ac7ebe61841558e02e80385800493 message: Failed to query logical offset END: Broker: Unknown topic or partition
43:46.92477 E:KafkaMessageBus - Error consuming test_ef3b4c5799cd43a5b174f2ed3aee1d43 GroupId=d65ac7ebe61841558e02e80385800493 message: Failed to query logical offset END: Broker: Unknown topic or partition

@niemyjski
Author

niemyjski commented Nov 25, 2024

Just bumping to see whether anyone has an idea what might be causing this. The following PR tracks every new release and still has this issue: FoundatioFx/Foundatio.Kafka#8

Projects
None yet
Development

No branches or pull requests

2 participants