-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove raise and handle reconnection #1
Conversation
abba1a7
to
f7bf4e7
Compare
f7bf4e7
to
1cee8bf
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Few comments and answer to the exceeding retry count
rebased and sorted commits, just need to confirm order and double check last "stash" commit. |
fe91475
to
d71163f
Compare
By attempting to connect within the init/1 callback, we have no room to reconnect, as the init raises, causing the KafkaEx.ConsumerGroup.init/1 to fail when attempting to supervise a worker, this then crashes the application. Save the worker_opts for a new GenServer callback to use No need to set member_id to nil within init/1 as it will already be nil Handle create_worker and other phases within callbacks. Each phase once successful sends a message to itself to move onto the next phase, allowing attempts to be retried.
This removes the connection from the init/1 callback, so it can retry. As this module is supervised by the main OTP app, if it returns stop too many times it will stop the app. eg {:stop, reason} can not be used. Could consider :ignore, but returns `{:ok, ?}` which means start_worker cannot pattern match
Returns the value of the worker Server module
The same could be said for the following :timeout function that calls consume, but as this matches first after connection once this completes it is known to be ready as its set the current_offset
d71163f
to
ec847d6
Compare
But allow happy path to use faster @startup_delay
So the socket connection can be obtained if the broker is down during init but then comes back
During reconnection the default can be attempted 14:52:19.028 [error] GenServer #PID<0.196.0> terminating ** (FunctionClauseError) no function clause matching in KafkaEx.NetworkClient.send_sync_request/3 (kafka_ex) lib/kafka_ex/network_client.ex:35: KafkaEx.NetworkClient.send_sync_request(nil, <<0, 11, 0, 0, 0, 0, 0, 8, 0, 8, 107, 97, 102, 107, 97, 95, 101, 120, 0, 18, 116, 114, 97, 99, 107, 105, 110, 103, 95, 108, 111, 99, 97, 116, 105, 111, 110, 115, 0, 0, 117, 48, 0, 0, 0, 8, 99, 111, 110, 115, ...>>, 35000) (kafka_ex) lib/kafka_ex/server_0_p_9_p_0.ex:189: KafkaEx.Server0P9P0.consumer_group_sync_request/4 (kafka_ex) lib/kafka_ex/server_0_p_9_p_0.ex:116: KafkaEx.Server0P9P0.kafka_server_join_group/3
otherwise it updates the state with invalid metadata, which then attempts to read etc, and crashes for the wrong reasons, even though we have lost contact with the broker and want to exit anyway.
This ensures that stopped genserver call does not blow this process up as well 14:57:08.753 [error] GenServer #PID<0.182.0> terminating ** (stop) "Unable to fetch metadata from any brokers. Timeout is 3000." Last message: {:metadata, ""}
This is really useful and exactly what I'm looking for! Confused why this PR is not on the main kafka_ex repo? ( https://github.com/kafkaex/kafka_ex ) Is that on purpose? |
Closing as its been too long and I do not have the time to finish now. |
Initial POC of reconnection handling, attempting to solve kafkaex#298
Main change is to remove the
raise
if the broker cannot be found, but in doing that means theinit/1
has to return something, and if the host is not found it could return{:error, reason}
, but that causes theKafkaEx.ConsumerGroup.Manager
supervisor that calls it issues, should it then error as well?Instead, I split the elixir boot sequence from the kafka connection, as the elixir startup should be successful regardless of outside influences and then handle connecting.
TODO