-
Notifications
You must be signed in to change notification settings - Fork 73
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Create Produce overload that allows providing a callback #49
Conversation
Update fork
Interesting. Could you share the program you're using to benchmark? I see what this is about, but wonder if there's a cleaner way of achieving the same performance. Have you tried TaskContinuationOptions.ExecuteSynchronously? |
Thank you. Putting Kafka aside, you can observe the performance hit if you time these two statements: 1.Parallel.For(0,1000000,=>Task.Factory.StartNew(()=>{})). Exactly as the above statements are, at least for me, the second version (with ContinueWith) is 3x (times) slower. As far as the Topic Produce is concerned, ContinueWith adds ~10% overhead (at least the way i am testing). Here is the code i am running:
The timings after running the loops are: The setup is:
Time1: uses ContinueWith Would you consider integrating the 3 change files from my change set? On a side note, is it expected that if i do not call Dispose on the various clients (producer/consumer) then undefined behavior should be expected? I noticed that my test applications crash around exit of Main. I think this is to do with me not Disposing the producer/consumer. Obviously it is on me to Dispose IDisposables, but i wonder if i need destructors too. |
These are timings if ContinueWith above does nothing i.e: Time1: |
Ok had an initial look and something like this definitely makes sense. This avoids a massive amount of garbage being created and CPU usage is substantially lower. I want to have a closer look at how the interface should look like and how to best implement this, probably over the weekend. I'm always trying to stay close to the C++ librdkafka and Java interfaces, see https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html. Are you interested in producing as fast as possible? I'm sure you can get more performance out in general, C# RdKafka is still using MSG_F_COPY and you can probably run multiple Producers (one per core?) for more throughput. And it should never crash, even if you don't call .Dispose(). If it does that's a bug, the only thing that should happen if you don't Dispose is that some resources don't get free'd up. Edit: Looks like most of the remaining CPU is spent in a really inefficient send() in Windows: https://files.gitter.im/edenhill/librdkafka/jKJL/Bildschirmfoto-2016-05-17-um-19.15.50.png |
Interesting, from the screenshot i recognise module ntdll.dll. When my test app crashes it says "Faulting module name: ntdll.dll". Investigating this. |
I could reproduce the crash using this code:
So the Main just runs the above method. It took many tries but eventually it crashed. I have a process crash dump. Looking into it now. Something to do with "ntdll!NtWaitForSingleObject" and "A heap has been corrupted." |
Interesting that i cannot reproduce (at least not yet) this after compiling librdkafka on my own... |
What do you think about the following hunches?
If the above are risks, then i think we do need to explicitly Dispose. What do you think? |
Think we should split the crash out from this PR, let's continue in https://github.com/ah-/rdkafka-dotnet/issues/50 |
{ | ||
// msg_opaque was set by Topic.Produce | ||
var gch = GCHandle.FromIntPtr(rkmessage._private); | ||
var deliveryCompletionSource = (TaskCompletionSource<DeliveryReport>) gch.Target; | ||
var deliverHandler = (IDeliveryHandler) gch.Target; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo, deliveryHandler.
So coming back to the original changes, had a look through and they look great to me. This basically corresponds to https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html and https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/Callback.html in the Java client. Could you address the few comments on your commits and write a short bit of documentation for the new Produce (mentioning it's meant for high-performance use-cases as it reduces the number of allocations/GC pressure and that users need to be careful what they do in the callback as it's running in an RdKafka-internal thread and will block other operations)? Then a git squash into one clean commit and I'm happy to merge! |
Did you have a chance to look updates and the comments i have added? I've switched the method order and made IDeliveryHandler second. Not ideal but i do not see a way to make it last without making it optional but really it is not optional. |
Any updates? |
Sorry Maxim, I want to merge this, just need to find some time to properly look at it. This weekend. |
Thank you Andreas! Please feel free to make the parameter order in Topic.Produce as you wish (i do not see a way to make IDeliveryHandler last without making it optional but really it is not optional!). I have also just added code comments. I have also squashed the commits. |
Add files via upload Update Topic.cs Update Producer.cs Update Topic.cs Not needed Make class private Update Producer.cs Use TaskCompletionSource methods directly Change Exception type using System Fix typo in local variable Switch Produce overload order and reorder parameters Update Topic.cs Add XML code comments Update IDeliveryHandler.cs Add Xml code comments.
Hi Andreas! Any update on this? Can you please merge? |
Hi Andreas, do you think you will ever integrate this? |
Sorry, yes. Thanks a lot for this! |
Thanks Andreas! |
I have noticed that using a Continuation on the task returned by Topic.Produce can affect performance of Parallel Produce loop by as much as 10%. This is at least what i observe in my testing.
This pull request adds an overload to Topic.Produce that allows passing an object that will handle the delivery report.