-
Notifications
You must be signed in to change notification settings - Fork 73
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
producer.Dispose is stuck #88
Comments
Hi, is the message delivered successfully? Producer.Dispose will block until all messages in flight have either been delivered or timed out. Also creating a Producer is expensive, so it's a good idea to keep it alive between calls to ProduceMessage. |
Hi, the message delivered successfully, but still the problem persists. |
Which version are you using? Changing timeout should not change anything giving your code |
Observing the same issue with 0.9.2-ci-181 (prerelease) though from an integration test, when using Edit: Actually seeing the same with 0.9.1 as well. |
@amccague What is your exact code? |
I have the same problem. If there are messages in the queue and the broker then becomes unavailable, the The problematic code is here: if (disposing)
{
// Wait until all outstanding sends have completed
while (OutQueueLength > 0)
{
handle.Poll((IntPtr) 100);
}
handle.Dispose();
} One solution would be to add a timeout or max retry size, something to ensure that the loop at some point terminates. |
May be related to #58. |
I think @amccague is on to something. I ran into this issue as well and found that the problem only happens when I use async/await. I've created integration tests that demonstrate this using the v0.92-ci-186 NuGet package. The asynchronous version hangs forever in producer.Dispose. The synchronous version using .Result correctly finishes the unit test with an exception due to the cluster being down and no brokers available. using Microsoft.VisualStudio.TestTools.UnitTesting;
using RdKafka;
using System.Configuration;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
namespace RdKafkaDotNet
{
[TestClass]
public class RdKafkaTests
{
private const string TopicName = "test";
private const string TestMessage = "TestMessage";
[TestMethod]
public async Task AsyncTest()
{
using (var producer = CreateProducer())
{
using (var topic = CreateTopic(producer, TopicName))
{
var result = await topic.Produce(Encoding.UTF8.GetBytes(TestMessage));
Assert.IsNotNull(result);
Assert.IsTrue(result.Offset > 0);
}
}
}
[TestMethod]
public void SyncTest()
{
using (var producer = CreateProducer())
{
using (var topic = CreateTopic(producer, TopicName))
{
var result = topic.Produce(Encoding.UTF8.GetBytes(TestMessage)).Result;
Assert.IsNotNull(result);
Assert.IsTrue(result.Offset > 0);
}
}
}
private static Producer CreateProducer()
{
var config = new Config
{
["debug"] = "all",
Logger = (handle, level, fac, buf) => Debug.WriteLine($"[Logger]: {handle}, {level}, {fac}, {buf}")
};
var producer = new Producer(config, ConfigurationManager.AppSettings["kafka.brokerurl"]);
producer.OnStatistics += (sender, s) => Debug.WriteLine("[OnStatistics]: " + s);
producer.OnError += (sender, args) => Debug.WriteLine("[OnError]: " + args.Reason + " (" + args.ErrorCode + ")");
return producer;
}
private static Topic CreateTopic(Producer producer, string topic)
{
var config = new TopicConfig
{
["message.timeout.ms"] = "15000"
};
return producer.Topic(topic, config);
}
}
} |
Hi,
I try to use the SimpleProducer example.
After the produce action, it's look like the producer.Dispose enters an infinite loop.
Am I doing something wrong?
thanks!
The text was updated successfully, but these errors were encountered: