Create Produce overload that allows providing a callback #49

Merged: 2 commits merged into ah-:master on Oct 12, 2016
Conversation

@MaximGurschi commented Jul 6, 2016

I have noticed that using a continuation on the task returned by Topic.Produce can slow a parallel Produce loop by as much as 10%; this is at least what I observe in my testing.

This pull request adds an overload to Topic.Produce that allows passing an object that will handle the delivery report.
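For orientation, here is a minimal sketch of what a handler-based Produce path can look like; the member names and the stand-in DeliveryReport struct below are illustrative only, not necessarily the merged API:

```csharp
using System.Threading;

// Sketch only: a minimal stand-in for the library's DeliveryReport struct.
public struct DeliveryReport
{
    public int Partition;
    public long Offset;
}

public interface IDeliveryHandler
{
    // Called from the delivery-report callback instead of completing a per-message Task.
    void HandleDeliveryReport(DeliveryReport report);
}

// Example handler: counts acknowledged messages without allocating a
// Task/TaskCompletionSource for every produced message.
public sealed class CountingDeliveryHandler : IDeliveryHandler
{
    private long _delivered;

    public long Delivered => Interlocked.Read(ref _delivered);

    public void HandleDeliveryReport(DeliveryReport report)
    {
        Interlocked.Increment(ref _delivered);
    }
}
```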

@ah- commented Jul 6, 2016

Interesting. Could you share the program you're using to benchmark?

I see what this is about, but wonder if there's a cleaner way of achieving the same performance.
I quite like having one clear way of doing things, and this introduces a second way of dealing with DeliveryReports for relatively little performance gain.

Have you tried TaskContinuationOptions.ExecuteSynchronously?
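For reference, attaching the continuation synchronously looks roughly like this (a generic, standalone sketch, not tied to RdKafka's API):

```csharp
using System;
using System.Threading.Tasks;

class ExecuteSynchronouslyDemo
{
    static void Main()
    {
        Task<int> work = Task.Run(() => 42);

        // The continuation runs inline on the thread that completes the antecedent task,
        // avoiding a ThreadPool dispatch for a cheap continuation body.
        work.ContinueWith(
                t => Console.WriteLine($"Result: {t.Result}"),
                TaskContinuationOptions.ExecuteSynchronously)
            .Wait();
    }
}
```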

@MaximGurschi commented Jul 6, 2016

Thank you. Putting Kafka aside, you can observe the performance hit if you time these two statements:

1. Parallel.For(0, 1000000, i => Task.Factory.StartNew(() => { }));
2. Parallel.For(0, 1000000, i => Task.Factory.StartNew(() => { }).ContinueWith(_ => { }));

Exactly as written above, at least for me, the second version (with ContinueWith) is roughly 3× slower.
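Spelled out as a standalone micro-benchmark, the comparison looks roughly like this (a sketch only; absolute numbers vary per machine):

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

class ContinuationOverhead
{
    const int Count = 1000000;

    static TimeSpan Time(Action body)
    {
        var stw = Stopwatch.StartNew();
        body();
        stw.Stop();
        return stw.Elapsed;
    }

    static void Main()
    {
        var plain = Time(() =>
            Parallel.For(0, Count, i => Task.Factory.StartNew(() => { })));

        var chained = Time(() =>
            Parallel.For(0, Count, i => Task.Factory.StartNew(() => { }).ContinueWith(t => { })));

        Console.WriteLine($"StartNew only:           {plain.TotalMilliseconds:F0} ms");
        Console.WriteLine($"StartNew + ContinueWith: {chained.TotalMilliseconds:F0} ms");
    }
}
```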

As far as Topic.Produce is concerned, ContinueWith adds roughly 10% overhead (at least the way I am testing).

Here is the code I am running:

```csharp
const int Count = 1000000;

static TimeSpan Time1(RdKafka.Topic topic)
{
    var stw = Stopwatch.StartNew();
    Parallel.For(0, Count, i => topic.Produce(null).ContinueWith(t => { if (i < 0) Console.WriteLine(i); }));
    stw.Stop();
    Console.WriteLine(Count / stw.Elapsed.TotalSeconds);
    return stw.Elapsed;
}

static TimeSpan Time2(RdKafka.Topic topic)
{
    var stw = Stopwatch.StartNew();
    Parallel.For(0, Count, i => topic.Produce(null));
    stw.Stop();
    Console.WriteLine(Count / stw.Elapsed.TotalSeconds);
    return stw.Elapsed;
}

static void Runner()
{
    var producer = new Producer(new Config
    {
        ["compression.codec"] = "snappy",
        ["heartbeat.interval.ms"] = "10000",
        ["session.timeout.ms"] = "30000",
        ["topic.metadata.refresh.sparse"] = "true"
    }, broker);
    var topic = producer.Topic("parallel", new TopicConfig
    {
        ["request.required.acks"] = "-1",
        ["message.timeout.ms"] = "60000"
    });
    const int tries = 10;
    Console.WriteLine("Time1:");
    var ts1 = new TimeSpan();
    for (var i = 0; i < tries; ++i) ts1 += Time1(topic);
    Console.WriteLine($"Done. {Count * tries / ts1.TotalSeconds}");
    Thread.Sleep(TimeSpan.FromSeconds(5));
    var ts2 = new TimeSpan();
    Console.WriteLine("Time2:");
    for (var i = 0; i < tries; ++i) ts2 += Time2(topic);
    Console.WriteLine($"Done. {Count * tries / ts2.TotalSeconds}");
    Console.ReadKey();
}
```

The timings after running the loops are:
Time1:
87412.0196929141
82073.6646760438
75927.120442318
67201.8216532199
100863.532034525
105072.619207067
80995.4050172798
106137.003958496
75855.9448273619
87404.5819474681
Done. 85107.0842610977
Time2:
111185.966445321
111068.967842413
108334.691579529
107016.174071396
108402.25675287
103306.993218144
109910.459026524
111403.222739269
112012.75278632
108479.686804391
Done. 109053.912174472

The setup is:

  1. Produce with and without ContinueWith are tested.
  2. Each is executed using 1000000 Parallel Produce requests.
  3. Each Parallel loop is repeated 10 times.

Time1: uses ContinueWith
Time2: does not
Done: indicates the average

Would you consider integrating the three changed files from my change set?

On a side note, is it expected that undefined behaviour follows if I do not call Dispose on the various clients (producer/consumer)? I noticed that my test applications crash around the exit of Main, and I think this is because I am not disposing the producer/consumer. Obviously it is on me to Dispose IDisposables, but I wonder if I need destructors too.

@MaximGurschi commented:
These are the timings if the ContinueWith above does nothing, i.e.:
Parallel.For(0, Count, i => topic.Produce(null).ContinueWith(_ => { }));

Time1:
97693.7008420562
90466.7504430994
72557.8904217098
83234.4771320848
108528.416961316
108230.105082557
76566.6350887998
79678.4629807766
104210.532787812
89602.6139304628
Done. 89359.699409484
Time2:
108399.693909616
102632.046031212
102919.150410235
109463.349579811
106232.517314626
110224.453361113
101380.660637343
105551.251703267
103427.362492727
121396.784114211
Done. 106894.510691409

@ah- commented Jul 6, 2016

OK, I had an initial look and something like this definitely makes sense. It avoids a massive amount of garbage being created, and CPU usage is substantially lower.

I want to have a closer look at what the interface should look like and how best to implement this, probably over the weekend. I'm always trying to stay close to the C++ librdkafka and Java interfaces, see https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html.

Are you interested in producing as fast as possible? I'm sure you can get more performance out in general, C# RdKafka is still using MSG_F_COPY and you can probably run multiple Producers (one per core?) for more throughput.
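As a rough illustration of the "one Producer per core" idea (a sketch only; the broker address, topic name, and empty configs are placeholders, and whether it helps depends on the workload):

```csharp
using System;
using System.Threading.Tasks;
using RdKafka;

class MultiProducerSketch
{
    static void Main()
    {
        const string broker = "localhost:9092";          // placeholder broker list
        const int messages = 1000000;
        int producerCount = Environment.ProcessorCount;  // "one per core"

        var producers = new Producer[producerCount];
        var topics = new Topic[producerCount];
        for (int p = 0; p < producerCount; p++)
        {
            producers[p] = new Producer(new Config(), broker);
            topics[p] = producers[p].Topic("parallel", new TopicConfig());
        }

        // Spread the messages round-robin so each librdkafka instance has its own send path.
        Parallel.For(0, messages, i => topics[i % producerCount].Produce(null));

        // Dispose the producers explicitly rather than relying on finalizers.
        foreach (var p in producers) p.Dispose();
    }
}
```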

And it should never crash, even if you don't call .Dispose(). If it does, that's a bug; the only thing that should happen if you don't Dispose is that some resources don't get freed up.

Edit: Looks like most of the remaining CPU is spent in a really inefficient send() in Windows: https://files.gitter.im/edenhill/librdkafka/jKJL/Bildschirmfoto-2016-05-17-um-19.15.50.png
Might be worth investigating that next.

@MaximGurschi commented:
Interesting: from the screenshot I recognise the module ntdll.dll. When my test app crashes it says "Faulting module name: ntdll.dll". Investigating this.

@MaximGurschi commented Jul 7, 2016

I could reproduce the crash using this code:

```csharp
private static void Runner()
{
    const int tries = 10;
    for (var i = 0; i < tries; ++i)
    {
        var producer = new Producer(new Config
        {
            ["compression.codec"] = "snappy",
            ["heartbeat.interval.ms"] = "10000",
            ["session.timeout.ms"] = "30000",
            ["topic.metadata.refresh.sparse"] = "true"
        }, broker);
        var topic = producer.Topic("Abc", new TopicConfig
        {
            ["request.required.acks"] = "-1",
            ["message.timeout.ms"] = "60000"
        });
        Parallel.For(0, Count, _ => topic.Produce(null));
        Console.WriteLine($"Done: {i}.");
    }
    Console.WriteLine("Done.");
}
```

So the Main just runs the above method. It took many tries but eventually it crashed. I have a process crash dump. Looking into it now. Something to do with "ntdll!NtWaitForSingleObject" and "A heap has been corrupted."

@MaximGurschi commented:
Interesting that I cannot reproduce this (at least not yet) after compiling librdkafka on my own...

@MaximGurschi commented:
Apparently I cannot reproduce this with my own build because I was building librdkafka in Debug. After building in Release I got the crash again. Here is the native stack:

[screenshot: native stack trace]

@MaximGurschi commented Jul 7, 2016

What do you think about the following hunches?

  1. When SafeKafkaHandle.ReleaseHandle is called via the SafeHandle finalizer, the handles can be released in any order, so a producer can be destroyed before a topic. Given that finalization order is non-deterministic, is it possible that this violates proper librdkafka termination?
  2. Finalizers are allowed to run for no more than 2 seconds. If destroying any handle takes longer, can this cause issues?
  3. Is the producer going to wait for outstanding requests to be transmitted and handled? I am not sure Handle.Dispose gets called during finalization (maybe it does not need to be).

If the above are risks, then I think we do need to Dispose explicitly. What do you think?
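If explicit disposal is indeed the answer, the minimal change to the repro is deterministic teardown, roughly as below (a sketch; whether Topic itself needs disposing depends on whether it exposes Dispose):

```csharp
using System.Threading.Tasks;
using RdKafka;

static class DeterministicShutdown
{
    // Sketch only: dispose the producer explicitly instead of leaving it to a finalizer,
    // so librdkafka is torn down at a defined point rather than in GC finalization order.
    public static void Run(string broker, int count)
    {
        using (var producer = new Producer(new Config(), broker))
        {
            var topic = producer.Topic("Abc", new TopicConfig());
            Parallel.For(0, count, i => topic.Produce(null));
            // If Topic also exposes Dispose (it wraps a native topic handle),
            // disposing it here, before the producer, keeps the teardown order defined.
        }
    }
}
```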

@ah- commented Jul 7, 2016

I think we should split the crash out from this PR; let's continue in https://github.com/ah-/rdkafka-dotnet/issues/50

@ah- commented on the diff:

```csharp
{
    // msg_opaque was set by Topic.Produce
    var gch = GCHandle.FromIntPtr(rkmessage._private);
    var deliveryCompletionSource = (TaskCompletionSource<DeliveryReport>) gch.Target;
    var deliverHandler = (IDeliveryHandler) gch.Target;
```

Typo: deliveryHandler.

@ah- commented Jul 10, 2016

Coming back to the original changes: I had a look through and they look great to me. This basically corresponds to https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html and https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/Callback.html in the Java client.

Could you address the few comments on your commits and write a short bit of documentation for the new Produce? It should mention that it is meant for high-performance use-cases, since it reduces the number of allocations and GC pressure, and that users need to be careful about what they do in the callback, as it runs on an RdKafka-internal thread and will block other operations.

Then a git squash into one clean commit and I'm happy to merge!
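To make the callback caveat concrete, one option is a handler that does nothing but enqueue the report (a sketch reusing the illustrative IDeliveryHandler/DeliveryReport shapes from earlier in this thread, not the merged API):

```csharp
using System.Collections.Concurrent;

// Illustrative only: the callback runs on an RdKafka-internal thread, so it
// only enqueues the report; callers drain the queue on their own threads.
public sealed class QueueingDeliveryHandler : IDeliveryHandler
{
    private readonly ConcurrentQueue<DeliveryReport> _reports =
        new ConcurrentQueue<DeliveryReport>();

    public void HandleDeliveryReport(DeliveryReport report)
    {
        _reports.Enqueue(report);   // cheap and non-blocking
    }

    public bool TryDequeue(out DeliveryReport report)
    {
        return _reports.TryDequeue(out report);
    }
}
```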

@MaximGurschi commented:
Did you have a chance to look at the updates and the comments I have added? I've switched the method order and made IDeliveryHandler the second parameter. Not ideal, but I do not see a way to make it last without making it optional, and really it is not optional.

@MaximGurschi commented:
Any updates?

@ah- commented Aug 4, 2016

Sorry Maxim, I want to merge this, just need to find some time to properly look at it. This weekend.

@MaximGurschi commented Aug 5, 2016

Thank you Andreas! Please feel free to arrange the parameter order in Topic.Produce as you wish (I do not see a way to make IDeliveryHandler last without making it optional, but really it is not optional!). I have also just added code comments.

I have also squashed the commits.
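For readers following the parameter-ordering discussion above, the constraint is roughly this (hypothetical signatures building on the earlier sketch types, not the repo's exact API):

```csharp
using System.Threading.Tasks;

// Hypothetical signatures, only to illustrate the ordering constraint;
// parameter names and defaults are not the repo's exact API.
public interface IProduceOverloads
{
    // Existing Task-based overload: the trailing parameters are optional.
    Task<DeliveryReport> Produce(byte[] payload, byte[] key = null, int partition = -1);

    // A handler placed before the optional parameters stays mandatory at every call site.
    void Produce(byte[] payload, IDeliveryHandler handler, byte[] key = null, int partition = -1);

    // Placing the handler after the optional parameters would force a default on it too
    // (required parameters cannot follow optional ones), making it silently optional:
    // void Produce(byte[] payload, byte[] key = null, int partition = -1, IDeliveryHandler handler = null);
}
```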

Commit and update timeline:

- Add files via upload
- Update Topic.cs
- Update Producer.cs
- Update Topic.cs
- Not needed
- Make class private
- Update Producer.cs
- Use TaskCompletionSource methods directly
- Change Exception type
- using System
- Fix typo in local variable
- Switch Produce overload order and reorder parameters
- Update Topic.cs (Add XML code comments)
- Update IDeliveryHandler.cs (Add XML code comments)
@MaximGurschi commented:
Hi Andreas! Any update on this? Can you please merge?

@MaximGurschi commented:
Hi Andreas, do you think you will ever integrate this?

@ah- commented Oct 12, 2016

Sorry, yes. Thanks a lot for this!

@ah- merged commit ef97de4 into ah-:master on Oct 12, 2016
@MaximGurschi commented:
Thanks Andreas!
