
EventConsumer stops consuming after a certain period #77

Open
recao opened this issue Nov 11, 2016 · 7 comments

Comments


recao commented Nov 11, 2016

Hi, we have observed an issue where EventConsumer stops consuming after a random amount of time (e.g. 30 min, 2 hr, 5 hr, 1 day).

We tried to imitate the EventConsumer example of RdKafka, initializing our EventConsumer with the following code:

```csharp
private void InitializeEventConsumer()
{
    LogInfo("Start initialize EventConsumer...");

    // Start from the latest offset when there is no committed offset yet.
    var topicConfig = new TopicConfig();
    topicConfig["auto.offset.reset"] = "latest";

    var config = new Config()
    {
        GroupId = m_ConsumerName,
        DefaultTopicConfig = topicConfig
    };

    consumer = new EventConsumer(config, kafkaConnectionList);
    consumer.Subscribe(new List<string> { m_Topic });
    LogInfo("EventConsumer initialization finished.");
}
```

Then we use a new Thread to execute the following method, which simply starts consuming and commits every 6,000 messages:

```csharp
public void Process()
{
    LogInfo("start Process()");
    consumer.Start();
    try
    {
        kafkaBatch = new List<RdKafka.Message>();
        consumer.OnMessage += (obj, msg) =>
        {
            //System.Console.WriteLine(System.Text.Encoding.UTF8.GetString(msg.Payload));
            kafkaBatch.Add(msg);
            if (kafkaBatch.Count >= 6000)
            {
                //ProcessKafkaMessagesBatch(kafkaBatch);
                consumer.Commit();
                LogInfo("MEETS 6000: rdkafka client consumed " + kafkaBatch.Count + " messages and committed!");
                kafkaBatch = new List<RdKafka.Message>();
            }
        };
    }
    catch (Exception ex)
    {
        m_Logger.Fatal("Process() Ended with Exception:" + ex);
    }
}
```

We have 16 machines in the EventConsumer cluster, and they consume messages fine at first (with incoming QPS = 3,000 per machine). But after a certain period, all 16 machines stop consuming at the same time, with no error message.

Any ideas about this issue? Thanks.

ah- (Owner) commented Nov 11, 2016

A couple of things you could look at:

- Is there anything suspicious in the broker/ZooKeeper logs?
- Turn on `config["debug"] = "cgrp"`; does this say anything interesting? Maybe the consumer group rebalances.
- Does the cluster keep working in general? Can other clients receive messages?
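For the second point, roughly something like this when building the consumer (a sketch reusing `m_ConsumerName` from your snippet):

```csharp
var config = new Config()
{
    GroupId = m_ConsumerName
};
// Enable librdkafka's consumer-group debug context. The output goes to
// stdout by default unless a custom log handler is set on the Config.
config["debug"] = "cgrp";
```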

recao (Author) commented Nov 14, 2016

Thanks :) Could you please tell me how to enable RdKafka logging? I turned on `config["debug"] = "cgrp"`, but couldn't find where the log file is...

ah- (Owner) commented Nov 14, 2016

It goes to stdout by default; you can override that via https://github.com/ah-/rdkafka-dotnet/blob/master/src/RdKafka/Config.cs#L66
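Roughly like this, inside your InitializeEventConsumer() right after creating `config` (an untested sketch; check the exact delegate signature at that line of Config.cs for your version):

```csharp
// Route librdkafka's log output (including the "cgrp" debug lines) into
// your own logger instead of stdout. Parameter meanings assumed from the
// linked Config.cs: client handle name, syslog level, facility, message.
config.Logger = (handle, level, facility, message) =>
{
    LogInfo("rdkafka " + handle + " [" + level + "] " + facility + ": " + message);
};
```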

recao (Author) commented Nov 15, 2016

Thanks! The logs of the brokers and ZK look good. I captured the log from when this "stop consuming issue" happened, in the following file:

1.txt

in which the group seems to have gone through a series of [CGRPSTATE] transitions. Any idea why it stops consuming?

ah- (Owner) commented Nov 15, 2016

There are a couple of connection issues in that log:

```
Connect to ipv4#10.152.165.45:9092 failed
FAIL| 10.152.165.233:9092/177776105: 3 request(s) timed out: disconnect
Connect to ipv4#10.152.168.196:9092 failed
```

How many brokers do you have running? Is it possible that there are network issues?

recao (Author) commented Nov 15, 2016

We are running 18 brokers. Quick question: is it possible to enable more detailed logging than `config["debug"] = "cgrp"`? The current logging doesn't seem to tell much after consuming stops.

ah- (Owner) commented Nov 15, 2016

Wow, that's a lot of brokers!

Yes, try one of these: generic, broker, topic, metadata, queue, msg, protocol, cgrp, security, fetch, feature, all. You can combine multiple contexts, e.g. `config["debug"] = "cgrp,fetch,topic"`.

See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.

Also, @edenhill suggests trying out RdKafka 0.9.2-ci-177, which has a couple of edge-case fixes around consumer groups. It's based on the final librdkafka 0.9.2 and should be fairly stable.
