producer.Dispose is stuck #88

Open
madipor opened this issue Dec 1, 2016 · 8 comments

Comments

madipor commented Dec 1, 2016

Hi,
I'm trying to use the SimpleProducer example.
After the produce call, it looks like producer.Dispose enters an infinite loop.

public async Task<bool> ProduceMessage(string message)
{
    try
    {
        using (Producer producer = new Producer(_kafkaOptions.Uri))
        {
            using (Topic topic = producer.Topic(_kafkaOptions.Topic))
            {
                byte[] data = Encoding.UTF8.GetBytes(message);
                await topic.Produce(data);
                return true;
            }
        }
    }
    catch (RdKafkaException ex)
    {
        return false;
    }
}

Am I doing something wrong?
thanks!

Owner

ah- commented Dec 1, 2016

Hi, is the message delivered successfully? Producer.Dispose will block until all messages in flight have either been delivered or timed out.
Try enabling config["debug"] = "all" to see what's going on.

Also creating a Producer is expensive, so it's a good idea to keep it alive between calls to ProduceMessage.
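
As a rough sketch of that suggestion, the producer and topic could be created once and reused across calls. The `KafkaPublisher` class name is hypothetical; the sketch uses only the `Producer`, `Topic`, and `Produce` calls already shown in this thread, and assumes the RdKafka NuGet package and a reachable broker.

```csharp
using System;
using System.Text;
using System.Threading.Tasks;
using RdKafka;

// Hypothetical wrapper (not part of rdkafka-dotnet): creates the producer and
// topic once and reuses them for every message.
public sealed class KafkaPublisher : IDisposable
{
    private readonly Producer _producer;
    private readonly Topic _topic;

    public KafkaPublisher(string brokerList, string topicName)
    {
        _producer = new Producer(brokerList);
        _topic = _producer.Topic(topicName);
    }

    public async Task<bool> ProduceMessage(string message)
    {
        try
        {
            byte[] data = Encoding.UTF8.GetBytes(message);
            await _topic.Produce(data);
            return true;
        }
        catch (RdKafkaException)
        {
            return false;
        }
    }

    // Dispose once, at shutdown. Producer.Dispose may still block while
    // messages are in flight, as described above.
    public void Dispose()
    {
        _topic.Dispose();
        _producer.Dispose();
    }
}
```

This only reduces how often Dispose runs; it does not by itself explain or fix a Dispose call that never returns.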

Author

madipor commented Dec 3, 2016

Hi, the message was delivered successfully, but the problem persists.
If the message (only one) was delivered successfully, shouldn't Dispose return right afterwards?
Would changing the message timeout help?
thanks

Contributor

treziac commented Dec 5, 2016

Which version are you using?
In 0.9.1, there are some bugs related to configuration, e.g. if you have a group.id in your producer config, Dispose will hang. Be sure not to use any consumer config.
Several other bugs were also fixed in the latest librdkafka, 0.9.2. You can try the preview of rdkafka-dotnet, which binds to it.

Changing the timeout should not change anything, given your code.
(And yes, create the producer/topic only once and keep it open; recreating it every time will severely hurt your throughput, because you won't use the internal queue for batching.)


amccague commented Dec 8, 2016

Observing the same issue with 0.9.2-ci-181 (prerelease) though from an integration test, when using await.
.Wait() or .Result works fine. Could be a synchronization context issue.

Edit: Actually seeing the same with 0.9.1 as well.

Contributor

treziac commented Dec 9, 2016

@amccague What is your exact code?

@ErikSchierboom
Contributor

I have the same problem. If there are messages in the queue and the broker then becomes unavailable, the Dispose method enters an unending loop due to it waiting for the queue to become clear, which it never will be due to the broker being unavailable.

The problematic code is here:

if (disposing)
{
    // Wait until all outstanding sends have completed
    while (OutQueueLength > 0)
    {
        handle.Poll((IntPtr) 100);
    }

    handle.Dispose();
}

One solution would be to add a timeout or max retry size, something to ensure that the loop at some point terminates.
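
A minimal sketch of such a bounded wait, assuming nothing about the library beyond the loop quoted above. `BoundedDrain.WaitForDrain` is a hypothetical helper, not part of rdkafka-dotnet; the queue length and the poll call are passed in as delegates so that the loop can be shown on its own.

```csharp
using System;
using System.Diagnostics;

public static class BoundedDrain
{
    // Hypothetical helper: polls until the queue is empty or the timeout
    // elapses. Returns true if the queue drained in time, false otherwise.
    public static bool WaitForDrain(Func<long> outQueueLength, Action<int> poll, TimeSpan timeout)
    {
        var stopwatch = Stopwatch.StartNew();
        while (outQueueLength() > 0)
        {
            if (stopwatch.Elapsed >= timeout)
            {
                // Give up; the caller decides whether to log or drop the rest.
                return false;
            }

            poll(100); // same 100 ms poll interval as the loop quoted above
        }

        return true;
    }
}
```

Inside Dispose, this would replace the unconditional while loop, with handle.Dispose() called regardless of the result. Which timeout to use, and whether undelivered messages should be reported, is left open here.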

@tomasdeml
Contributor

May be related to #58.


zyzil commented Jan 5, 2017

I think @amccague is on to something. I ran into this issue as well and found that the problem only happens when I use async/await.

I've created integration tests that demonstrate this using the v0.92-ci-186 NuGet package. The asynchronous version hangs forever in producer.Dispose. The synchronous version using .Result correctly finishes the unit test with an exception due to the cluster being down and no brokers available.

using Microsoft.VisualStudio.TestTools.UnitTesting;
using RdKafka;
using System.Configuration;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;

namespace RdKafkaDotNet
{
    [TestClass]
    public class RdKafkaTests
    {
        private const string TopicName = "test";
        private const string TestMessage = "TestMessage";

        [TestMethod]
        public async Task AsyncTest()
        {
            using (var producer = CreateProducer())
            {
                using (var topic = CreateTopic(producer, TopicName))
                {
                    var result = await topic.Produce(Encoding.UTF8.GetBytes(TestMessage));
                    Assert.IsNotNull(result);
                    Assert.IsTrue(result.Offset > 0);
                }
            }
        }

        [TestMethod]
        public void SyncTest()
        {
            using (var producer = CreateProducer())
            {
                using (var topic = CreateTopic(producer, TopicName))
                {
                    var result = topic.Produce(Encoding.UTF8.GetBytes(TestMessage)).Result;
                    Assert.IsNotNull(result);
                    Assert.IsTrue(result.Offset > 0);
                }
            }
        }

        private static Producer CreateProducer()
        {
            var config = new Config
            {
                ["debug"] = "all",
                Logger = (handle, level, fac, buf) => Debug.WriteLine($"[Logger]: {handle}, {level}, {fac}, {buf}")
            };

            var producer = new Producer(config, ConfigurationManager.AppSettings["kafka.brokerurl"]);
            producer.OnStatistics += (sender, s) => Debug.WriteLine("[OnStatistics]: " + s);
            producer.OnError += (sender, args) => Debug.WriteLine("[OnError]: " + args.Reason + " (" + args.ErrorCode + ")");

            return producer;
        }

        private static Topic CreateTopic(Producer producer, string topic)
        {
            var config = new TopicConfig
            {
                ["message.timeout.ms"] = "15000"
            };

            return producer.Topic(topic, config);
        }
    }
}
