Remove raise and handle reconnection #1

Closed
wants to merge 19 commits into from

@IanVaughan IanVaughan commented Aug 15, 2018

Initial POC of reconnection handling, attempting to solve kafkaex#298

The main change is to remove the raise when the broker cannot be found. That means init/1 has to return something instead. If the host is not found it could return {:error, reason}, but that causes problems for the KafkaEx.ConsumerGroup.Manager supervisor that calls it: should it then error as well?
Instead, I split the Elixir boot sequence from the Kafka connection: the Elixir startup should succeed regardless of outside influences, and connecting is handled afterwards.
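
To illustrate the split described above, here is a minimal sketch, not the code in this PR. It contrasts a worker that connects inside init/1 with one that defers the connection to a message handled after startup. The module names, the `connect_fun` argument and `@retry_ms` are all hypothetical stand-ins; nothing here is KafkaEx API.

```elixir
defmodule EagerWorker do
  use GenServer

  # Connects inside init/1, so a failed connection fails the whole start.
  @impl true
  def init(connect_fun) do
    case connect_fun.() do
      {:ok, conn} -> {:ok, %{conn: conn}}
      {:error, reason} -> {:stop, reason}
    end
  end
end

defmodule DeferredWorker do
  use GenServer

  # Hypothetical retry interval in milliseconds.
  @retry_ms 50

  # init/1 only records what is needed to connect and always succeeds.
  @impl true
  def init(connect_fun) do
    send(self(), :connect)
    {:ok, %{connect_fun: connect_fun, conn: nil}}
  end

  # The connection attempt happens here, after startup, and can be retried.
  @impl true
  def handle_info(:connect, state) do
    case state.connect_fun.() do
      {:ok, conn} ->
        {:noreply, %{state | conn: conn}}

      {:error, _reason} ->
        Process.send_after(self(), :connect, @retry_ms)
        {:noreply, state}
    end
  end

  @impl true
  def handle_call(:connected?, _from, state) do
    {:reply, state.conn != nil, state}
  end
end
```

With a `connect_fun` that always returns `{:error, :econnrefused}`, `GenServer.start(EagerWorker, connect_fun)` returns `{:error, :econnrefused}`, whereas the deferred worker starts and keeps retrying in the background.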

TODO

@IanVaughan IanVaughan self-assigned this Aug 15, 2018
@IanVaughan IanVaughan force-pushed the remove-raise-and-handle-reconnection branch 2 times, most recently from abba1a7 to f7bf4e7 Compare August 16, 2018 14:01
@IanVaughan IanVaughan force-pushed the remove-raise-and-handle-reconnection branch from f7bf4e7 to 1cee8bf Compare August 17, 2018 09:23
@qqdanielqq qqdanielqq left a comment

A few comments, and an answer to the question about exceeding the retry count.

@IanVaughan

Rebased and sorted the commits; just need to confirm the order and double-check the last "stash" commit.

@IanVaughan IanVaughan force-pushed the remove-raise-and-handle-reconnection branch 2 times, most recently from fe91475 to d71163f Compare August 17, 2018 10:04
Ian Vaughan added 7 commits August 17, 2018 11:06
Attempting to connect within the init/1 callback leaves no room
to reconnect: init/1 raises, which causes
KafkaEx.ConsumerGroup.init/1 to fail when it attempts
to supervise a worker, and that in turn crashes the application.

Save the worker_opts for a new GenServer callback to use

No need to set member_id to nil within init/1
as it will already be nil

Handle create_worker and other phases within callbacks.
Each phase once successful sends a message to itself to move onto the next phase,
allowing attempts to be retried.
This removes the connection from the init/1 callback,
so it can retry.
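
The phased approach in this commit message can be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's implementation: `PhasedConsumer`, the `{:run, phases}` message, `@startup_delay`, `@retry_delay` and the shape of the phase functions (each takes the state map and returns `{:ok, new_state}` or `{:error, reason}`) are all hypothetical.

```elixir
defmodule PhasedConsumer do
  use GenServer

  # Hypothetical delays in milliseconds: a short one before the first
  # attempt, a longer one between retries of a failed phase.
  @startup_delay 10
  @retry_delay 100

  # `phases` is a keyword list of {name, fun} pairs, run in order.
  def start_link(phases), do: GenServer.start_link(__MODULE__, phases)

  # init/1 does no network work; it only schedules the first phase.
  @impl true
  def init(phases) do
    Process.send_after(self(), {:run, phases}, @startup_delay)
    {:ok, %{done: []}}
  end

  # All phases finished: nothing left to schedule.
  @impl true
  def handle_info({:run, []}, state), do: {:noreply, state}

  def handle_info({:run, [{name, fun} | rest] = phases}, state) do
    case fun.(state) do
      {:ok, new_state} ->
        # Success: message ourselves to move on to the next phase.
        send(self(), {:run, rest})
        {:noreply, Map.update(new_state, :done, [name], &[name | &1])}

      {:error, _reason} ->
        # Failure: retry the same phase later instead of crashing.
        Process.send_after(self(), {:run, phases}, @retry_delay)
        {:noreply, state}
    end
  end

  # Reports the completed phases in the order they ran.
  @impl true
  def handle_call(:done, _from, state) do
    {:reply, Enum.reverse(state.done), state}
  end
end
```

A caller would pass something like `PhasedConsumer.start_link(create_worker: f, join_group: g)`, where `f` and `g` are placeholder functions. Because each phase is triggered by a message, a failing phase only delays the ones after it and the process stays up under its supervisor.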

As this module is supervised by the main OTP app,
returning stop too many times will stop the app,
so {:stop, reason} cannot be used.

Could consider :ignore, but that returns `{:ok, ?}`,
which means start_worker cannot pattern match on the result.
Returns the value of the worker Server module
The same could be said for the following :timeout function that calls consume,
but as this matches first after connection, once it completes the consumer is known to be ready,
as it has set the current_offset
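
The note above about :ignore appears to refer to standard OTP behaviour, which the following sketch demonstrates in isolation. `IgnoringWorker` and `IgnoreDemo` are hypothetical names for illustration; only `GenServer` and `Supervisor` from the Elixir standard library are used.

```elixir
defmodule IgnoringWorker do
  use GenServer

  def start_link(arg), do: GenServer.start_link(__MODULE__, arg)

  # Returning :ignore from init/1 means no process is kept running.
  @impl true
  def init(_arg), do: :ignore
end

defmodule IgnoreDemo do
  # Returns what a direct start and a supervised start each give back.
  def run do
    {:ok, sup} = Supervisor.start_link([], strategy: :one_for_one)

    # Started directly: the caller sees the bare atom :ignore.
    direct = IgnoringWorker.start_link([])

    # Started under a supervisor: the caller sees {:ok, :undefined},
    # so a match on {:ok, pid} succeeds but binds :undefined, not a pid.
    supervised = Supervisor.start_child(sup, {IgnoringWorker, []})

    {direct, supervised}
  end
end
```

Code that matches on `{:ok, pid}` and then uses `pid` as a process would therefore misbehave with :ignore, which is consistent with the concern raised in the commit message.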
@IanVaughan IanVaughan force-pushed the remove-raise-and-handle-reconnection branch from d71163f to ec847d6 Compare August 17, 2018 10:07
Ian Vaughan added 11 commits August 17, 2018 11:33
But allow happy path to use faster @startup_delay
So the socket connection can be obtained if the broker is down during init but then comes back
During reconnection the default can be attempted

14:52:19.028 [error] GenServer #PID<0.196.0> terminating
** (FunctionClauseError) no function clause matching in KafkaEx.NetworkClient.send_sync_request/3
    (kafka_ex) lib/kafka_ex/network_client.ex:35: KafkaEx.NetworkClient.send_sync_request(nil, <<0, 11, 0, 0, 0, 0, 0, 8, 0, 8, 107, 97, 102, 107, 97, 95, 101, 120, 0, 18, 116, 114, 97, 99, 107, 105, 110, 103, 95, 108, 111, 99, 97, 116, 105, 111, 110, 115, 0, 0, 117, 48, 0, 0, 0, 8, 99, 111, 110, 115, ...>>, 35000)
    (kafka_ex) lib/kafka_ex/server_0_p_9_p_0.ex:189: KafkaEx.Server0P9P0.consumer_group_sync_request/4
    (kafka_ex) lib/kafka_ex/server_0_p_9_p_0.ex:116: KafkaEx.Server0P9P0.kafka_server_join_group/3
otherwise it updates the state with invalid metadata,
which then attempts to read etc, and crashes for the wrong reasons,
even though we have lost contact with the broker and want to exit anyway.
This ensures that stopped genserver call does not blow this process up as well

14:57:08.753 [error] GenServer #PID<0.182.0> terminating
** (stop) "Unable to fetch metadata from any brokers. Timeout is 3000."
Last message: {:metadata, ""}
@harlantwood
This is really useful and exactly what I'm looking for!

Confused why this PR is not on the main kafka_ex repo? ( https://github.com/kafkaex/kafka_ex ) Is that on purpose?

@IanVaughan

Closing as it's been too long and I do not have the time to finish now.

@IanVaughan IanVaughan closed this Sep 2, 2019
4 participants