forked from apache/kafka
Upgrade Clients Compatibility to v0.10 #1
Open: manderson202 wants to merge 159 commits into mapr:trunk from manderson202:0100-mapr
Conversation
Author: Jason Gustafson <[email protected]> Reviewers: Guozhang Wang Closes apache#452 from hachikuji/KAFKA-2723
…back (0.9.0) Author: Jason Gustafson <[email protected]> Reviewers: Guozhang Wang Closes apache#453 from hachikuji/hotfix-group-coordinator-0.9
…ink tasks. Author: Ewen Cheslack-Postava <[email protected]> Reviewers: Gwen Shapira Closes apache#450 from ewencp/kafka-2480-unrecoverable-task-errors (cherry picked from commit f4b87de) Signed-off-by: Gwen Shapira <[email protected]>
…tatic zk/broker cluster Author: Geoff Anderson <[email protected]> Reviewers: Gwen Shapira Closes apache#455 from granders/KAFKA-2773-0.9.0-vagrant-fix
Author: Ewen Cheslack-Postava <[email protected]> Reviewers: Gwen Shapira Closes apache#456 from ewencp/kafka-2774-rename-copycat (cherry picked from commit f2031d4) Signed-off-by: Gwen Shapira <[email protected]>
Close socket channel in finally block to avoid file descriptor leak when remote end closes the connection Author: Rajini Sivaram <[email protected]> Reviewers: Ismael Juma <[email protected]>, Jun Rao <[email protected]> Closes apache#460 from rajinisivaram/KAFKA-2779 (cherry picked from commit efbebc6) Signed-off-by: Jun Rao <[email protected]>
Author: Ewen Cheslack-Postava <[email protected]> Reviewers: Jun Rao <[email protected]> Closes apache#461 from ewencp/kafka-2781-no-signing-for-install (cherry picked from commit a24f9a2) Signed-off-by: Jun Rao <[email protected]>
Author: Ewen Cheslack-Postava <[email protected]> Reviewers: Gwen Shapira Closes apache#458 from ewencp/kafka-2776-json-converter-cache-config-fix (cherry picked from commit e9fc7b8) Signed-off-by: Gwen Shapira <[email protected]>
The bug causes only the first instance of group metadata in the topic to be written to the cache (because of the putIfNotExists in addGroup). Coordinator fail-over won't work properly unless the cache is loaded with the right metadata. Author: Jason Gustafson <[email protected]> Reviewers: Guozhang Wang Closes apache#462 from hachikuji/hotfix-group-loading (cherry picked from commit 2b04004) Signed-off-by: Guozhang Wang <[email protected]>
Author: Ewen Cheslack-Postava <[email protected]> Reviewers: Gwen Shapira Closes apache#457 from ewencp/kafka-2775-exceptions-in-api-package (cherry picked from commit bc76e67) Signed-off-by: Gwen Shapira <[email protected]>
…ucers. Author: Ewen Cheslack-Postava <[email protected]> Reviewers: Gwen Shapira Closes apache#459 from ewencp/kafka-2778-connect-source-zero-loss-settings (cherry picked from commit 13ba57d) Signed-off-by: Gwen Shapira <[email protected]>
…est thread. Author: Ewen Cheslack-Postava <[email protected]> Reviewers: Gwen Shapira Closes apache#463 from ewencp/kafka-2782-fix-kafka-based-log-test-assertion (cherry picked from commit 75f2b8c) Signed-off-by: Gwen Shapira <[email protected]>
This PR adds failover to the simple end-to-end mirror maker test. Marked as WIP for 2 reasons: - We may want to add a couple more test cases where Kafka is being used to store offsets - There appears to be a test failure in the hard failover case Author: Geoff Anderson <[email protected]> Reviewers: Ewen Cheslack-Postava Closes apache#427 from granders/KAFKA-2258-mirrormaker-test (cherry picked from commit 7073fa7) Signed-off-by: Gwen Shapira <[email protected]>
Author: Grant Henke <[email protected]> Reviewers: Gwen Shapira Closes apache#466 from granthenke/drop-contrib (cherry picked from commit 69af573) Signed-off-by: Gwen Shapira <[email protected]>
Author: Jason Gustafson <[email protected]> Reviewers: Guozhang Wang Closes apache#467 from hachikuji/KAFKA-2674 (cherry picked from commit 359be3a) Signed-off-by: Guozhang Wang <[email protected]>
Author: Guozhang Wang <[email protected]> Reviewers: Gwen Shapira Closes apache#468 from guozhangwang/WikiUpdate (cherry picked from commit a87b978) Signed-off-by: Gwen Shapira <[email protected]>
Author: Ashish Singh <[email protected]> Reviewers: Guozhang Wang Closes apache#471 from SinghAsDev/ExceptionMessage (cherry picked from commit 3ebb4ee) Signed-off-by: Guozhang Wang <[email protected]>
Author: Ewen Cheslack-Postava <[email protected]> Reviewers: Jun Rao <[email protected]> Closes apache#473 from ewencp/kafka-2785-include-copycat-jars-in-release (cherry picked from commit 79bdc17) Signed-off-by: Jun Rao <[email protected]>
Author: Guozhang Wang <[email protected]> Reviewers: Jun Rao Closes apache#474 from guozhangwang/removeKStream
Author: Ewen Cheslack-Postava <[email protected]> Reviewers: Gwen Shapira Closes apache#475 from ewencp/kafka-2379-connect-docs (cherry picked from commit 83eaf32) Signed-off-by: Gwen Shapira <[email protected]>
…incl. session timeouts and corresponding fixes -- Refactored multi-consumer integration group assignment validation tests for round-robin assignment -- Added multi-consumer integration tests for session timeout expiration: 1. When a consumer stops polling 2. When a consumer calls close() -- Fixes to issues found with session timeout expiration tests with help from Jason Gustafson: Try to avoid SendFailedException by cancelling the scheduled tasks and ensuring metadata update before sending group leave requests + send leave group request with retries. Author: Anna Povzner <[email protected]> Reviewers: Jason Gustafson, Guozhang Wang Closes apache#472 from apovzner/cpkafka-81 (cherry picked from commit c9264b4) Signed-off-by: Guozhang Wang <[email protected]>
Author: Jason Gustafson <[email protected]> Reviewers: Guozhang Wang, Geoff Anderson Closes apache#465 from hachikuji/KAFKA-2274
…rkerSinkTask has finished starting up. Author: Ewen Cheslack-Postava <[email protected]> Reviewers: Gwen Shapira Closes apache#476 from ewencp/kafka-2786-on-partitions-assigned-only-after-start (cherry picked from commit 590a461) Signed-off-by: Gwen Shapira <[email protected]>
… s the .gitignore to generate the list of files to ignore Author: Ewen Cheslack-Postava <[email protected]> Reviewers: Gwen Shapira Closes apache#485 from ewencp/kafka-2797-disable-rat-when-git-missing (cherry picked from commit fe11488) Signed-off-by: Gwen Shapira <[email protected]>
… closing the new consumer. Author: Ewen Cheslack-Postava <[email protected]> Reviewers: Onur Karaman, Gwen Shapira Closes apache#480 from ewencp/kafka-2792-fix-blocking-consumer-close (cherry picked from commit ae5a5d7) Signed-off-by: Gwen Shapira <[email protected]>
…nd consumer settings so they do not conflict with the distributed herder's settings. Author: Ewen Cheslack-Postava <[email protected]> Reviewers: Gwen Shapira Closes apache#486 from ewencp/kafka-2798-conflicting-herder-producer-consumer-configs (cherry picked from commit 403d89e) Signed-off-by: Gwen Shapira <[email protected]>
Author: Geoff Anderson <[email protected]> Reviewers: Guozhang Wang Closes apache#487 from granders/minor-update-test-readme (cherry picked from commit 1d884d1) Signed-off-by: Guozhang Wang <[email protected]>
Author: Jason Gustafson <[email protected]> Reviewers: Onur Karaman, Guozhang Wang Closes apache#488 from hachikuji/KAFKA-2795 (cherry picked from commit c455e60) Signed-off-by: Guozhang Wang <[email protected]>
Author: Parth Brahmbhatt <[email protected]> Reviewers: Jun Rao <[email protected]> Closes apache#489 from Parth-Brahmbhatt/KAFKA-2788 (cherry picked from commit 60c0673) Signed-off-by: Jun Rao <[email protected]>
- added conf param streams.record.strip.streampath. If set to true, the stream-path is stripped from consumer records.
…ient - renamed streams.rpc.timeout.ms to fs.mapr.rpc.timeout - added conf variable fs.mapr.hardmount
- increase the hardcoded timeout for new consumer to 5s
…ncy should not be added to Maven-style artifact
FYI: this comparison shows exactly what was changed between upstream Kafka and this version. Also, I neglected to mention that to get the MapR tests to run I had to hack the mapr-streams jar to update the method signatures in the Listener classes that changed in the updated consumer API.
mapr-devops pushed a commit that referenced this pull request on Apr 26, 2019:
…tup control (apache#6638)

This merge consists of two commits previously merged into later branches. Author: Cyrus Vafadari <[email protected]> Reviewers: Randall Hauch <[email protected]>

Commit #1: MINOR: Add async and different sync startup modes in connect service test class. Allow Connect Service in system tests to start asynchronously. Specifically, allow for three startup conditions:
1. No condition: start async and return immediately.
2. Semi-async: start immediately after plugins have been discovered successfully.
3. Sync: start returns after the worker has completed startup. This is the current mode, but its condition is improved by checking that the port of Connect's REST interface is open, rather than that a log line has appeared in the logs.

Author: Konstantine Karantasis <[email protected]> Reviewers: Randall Hauch <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes apache#4423 from kkonstantine/MINOR-Add-async-and-different-sync-startup-modes-in-ConnectService-test-class

Commit #2: MINOR: Modify Connect service's startup timeout to be passed via the init (apache#5882). Currently, the startup timeout is hardcoded to be 60 seconds in Connect's test service. Modifying it to be passable via init. Author: Magesh Nandakumar <[email protected]> Reviewers: Randall Hauch <[email protected]>, Jason Gustafson <[email protected]>
mapr-devops pushed a commit that referenced this pull request on Oct 16, 2019:
…pache#7305)

A partition log is initialized in the following steps:
1. Fetch the log config from ZK
2. Call LogManager.getOrCreateLog, which creates the Log object, then
3. Register the Log object

Step 3 enables the configuration update thread to deliver configuration updates to the log, but any update that arrives between steps 1 and 3 is missed. This breaks the following use case:
1. Create a topic with the default configuration, and immediately after that
2. Update the configuration of the topic

There is a race condition here, and in random cases the update made in the second step will get dropped. This change fixes it by tracking updates arriving between steps 1 and 3. Once a Partition is done initializing its log, it checks whether it has missed any update; if yes, the configuration is read from ZK again. Added unit tests to make sure a dirty configuration is refreshed. Tested on a local cluster to make sure that topic configuration and updates are handled correctly. Reviewers: Jason Gustafson <[email protected]>
mapr-devops pushed a commit that referenced this pull request on Feb 9, 2021:
…to get end offsets and create topics (apache#9780)

The existing `Kafka*BackingStore` classes used by Connect all use `KafkaBasedLog`, which needs to frequently get the end offsets for the internal topic to know whether they are caught up. `KafkaBasedLog` uses its consumer to get the end offsets and to consume the records from the topic.

However, the Connect internal topics are often written very infrequently. This means that when the `KafkaBasedLog` used in the `Kafka*BackingStore` classes is already caught up and its last consumer poll is waiting for new records to appear, the call to the consumer to fetch end offsets will block until the consumer returns after a new record is written (unlikely) or the consumer's `fetch.max.wait.ms` setting (defaults to 500ms) ends and the consumer returns no more records. In other words, the call to `KafkaBasedLog.readToEnd()` may block for some period of time even though it's already caught up to the end.

Instead, we want `KafkaBasedLog.readToEnd()` to always return quickly when the log is already caught up. The best way to do this is to have the `Kafka*BackingStore` use the admin client (rather than the consumer) to fetch end offsets for the internal topic. The consumer and the admin API both use the same `ListOffset` broker API, so the functionality is ultimately the same, but we don't have to block for any ongoing consumer activity.

Each Connect distributed runtime includes three instances of the `Kafka*BackingStore` classes, which means we have three instances of `KafkaBasedLog`. We don't want three instances of the admin client, and should have all three instances of the `KafkaBasedLog` share a single admin client instance. In fact, each `Kafka*BackingStore` instance currently creates, uses and closes an admin client instance when it checks and initializes that store's internal topic. If we change the `Kafka*BackingStores` to share one admin client instance, we can change that initialization logic to also reuse the supplied admin client instance.

The final challenge is that `KafkaBasedLog` has been used by projects outside of Apache Kafka. While `KafkaBasedLog` is definitely not in the public API for Connect, we can make these changes in ways that are backward compatible: create new constructors and deprecate the old constructors. Connect can be changed to only use the new constructors, and this will give time for any downstream users to make changes.

These changes are implemented as follows:
1. Add a `KafkaBasedLog` constructor that accepts a supplier from which it can get an admin instance, and deprecate the old constructor. We need a supplier rather than just passing an instance because `KafkaBasedLog` is instantiated before Connect starts up, so we need to create the admin instance only when needed. At the same time, change the existing init function parameter from a no-arg function to one that accepts an admin instance as an argument, allowing the init function to reuse the shared admin instance used by the `KafkaBasedLog`. Note: if no admin supplier is provided (the deprecated constructor, no longer used in AK), the consumer is still used to get the latest offsets.
2. Add to the `Kafka*BackingStore` classes a new constructor with the same parameters plus an admin supplier, and deprecate the old constructor. When these classes instantiate their `KafkaBasedLog` instance, they pass the admin supplier and an init function that takes an admin instance.
3. Create a new `SharedTopicAdmin` that lazily creates the `TopicAdmin` (and underlying Admin client) when required, and closes the admin objects when the `SharedTopicAdmin` is closed.
4. Modify the existing `TopicAdmin` (used only in Connect) to encapsulate the logic of fetching end offsets using the admin client, simplifying the logic in `KafkaBasedLog` mentioned in #1 above. Doing this also makes it easier to test that logic.
5. Change `ConnectDistributed` to create a `SharedTopicAdmin` instance (which is `AutoCloseable`) before creating the `Kafka*BackingStore` instances, passing the `SharedTopicAdmin` (which is an admin supplier) to all three `Kafka*BackingStore` objects, and finally always closing the `SharedTopicAdmin` upon termination. (Shutdown of the worker occurs outside of the `ConnectDistributed` code, so modify `DistributedHerder` to take in its constructor additional `AutoCloseable` objects that should be closed when the herder is closed, and then modify `ConnectDistributed` to pass the `SharedTopicAdmin` as one of those `AutoCloseable` instances.)
6. Change `MirrorMaker` similarly to `ConnectDistributed`.
7. Change existing unit tests to no longer use deprecated constructors.
8. Add unit tests for the new functionality.

Author: Randall Hauch <[email protected]> Reviewer: Konstantine Karantasis <[email protected]>
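For context, a minimal sketch of the approach this commit describes: fetching a partition's end offset through the admin client instead of a consumer, so a caught-up reader never blocks on an in-flight poll. The topic name and bootstrap address are placeholders, not values from the patch.

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

// Sketch: resolve a partition's end offset with the admin client. Both the
// consumer and the admin client hit the same ListOffsets broker API, but the
// admin path is independent of any ongoing consumer poll.
public class EndOffsetProbe {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            TopicPartition tp = new TopicPartition("connect-offsets", 0); // placeholder topic
            ListOffsetsResult result =
                    admin.listOffsets(Collections.singletonMap(tp, OffsetSpec.latest()));
            long endOffset = result.partitionResult(tp).get().offset();
            System.out.println(tp + " end offset = " + endOffset);
        }
    }
}
```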
This PR pulled in all commits from upstream Kafka. Further details below in sections on sub-modules:
Clients
The necessary modifications to enable MapR Streams compatibility in the `clients` sub-module were made. All Kafka unit tests pass. When the unit tests were run against MapR (using the MapR Sandbox), there were 4 failures due to differences in the APIs. These should probably be investigated to ensure compatible behavior between OSS Kafka and MapR. Additionally, there are a few tests using Producer or Consumer test constructors that should be retrofitted to test MapR. To run the unit tests against Streams, ensure there is a MapR Client with the necessary privileges running on the machine, then run the tests with the standard Gradle test task for the module (e.g. `./gradlew clients:test`).
The below chart shows additional changes needed in the native code and/or the `com.mapr.streams:mapr-streams` jar:

- Update assignments for added fields
- Use ctor w/ additional fields

`com/mapr/streams/impl/listener/MarlinListenerImpl`:

- Update method signatures to match the new Kafka 0.10 Consumer
- Call `paused()` from consumerDriver instead of throwing an Exception
- Add support for the new `paused()` method: `paused()` gets the set of partitions that were previously paused by a call to `pause()`
- The `timeout` parameter to `poll()` is accurate: "The time, in milliseconds, spent waiting in poll if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. Must not be negative." Remove the log.warn
- `onConsume` should be called during `poll` (see the interceptor sketch after this list)
- `onCommit` should be called during `commitOffsetsAsync` and `commitOffsetsSync`
- `seekToBeginning` and `seekToEnd` are compatible with the documentation update: "If no partition is provided, seek to the first offset for all of the currently assigned partitions." Remove the UnsupportedOperationException
- `onSend` should be called during `send`
- `onAcknowledgement` should be called during `InterceptorCallback#onCompletion`
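For reference, a minimal sketch of the Kafka 0.10 `ConsumerInterceptor` contract whose hook points are listed above. The class name and counting logic are illustrative; the producer-side `ProducerInterceptor` is analogous, with `onSend` and `onAcknowledgement`.

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Illustrative interceptor that counts consumed records and logs commits.
public class CountingConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {
    private long consumed = 0;

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed for this sketch.
    }

    // Called from KafkaConsumer.poll() just before records are returned to the caller.
    @Override
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        consumed += records.count();
        return records; // an interceptor may also filter or transform records here
    }

    // Called when offsets are committed, on both the sync and async commit paths.
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        System.out.println("committed " + offsets.size() + " partition(s) after "
                + consumed + " record(s)");
    }

    @Override
    public void close() {
        // Nothing to release in this sketch.
    }
}
```

Interceptors are registered through the `interceptor.classes` configuration property, so a compatible MapR client needs to invoke these callbacks at the matching points: `poll`, the two commit paths, `send`, and completion of the send.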
Finally, the release notes for 0.10 should be reviewed to ensure all behavior is compatible.
Streams
Kafka Streams is a new feature of the API that provides a streaming-workflow-style interface for clients; it essentially replaces Samza with a lightweight library. The API leverages the lower-level KafkaConsumer and KafkaProducer and should be compatible insofar as MapR is compatible with the OSS version. However, there are other blockers with functionality in the core streams code that prevent me from making progress on ensuring Streams is compatible. These are outlined below:
- The `KTable` and `KStream` classes allow users to dynamically assign a producer partitioner (the `StreamPartitioner` interface), similar to the `Partitioner` interface in clients. MapR uses `StreamsPartitioner`. We should investigate whether there are discrepancies in how these interrelate or whether they are compatible. If possible, it would be nice to use the same producer `Partitioner` interface for both MapR and OSS (the two contracts are sketched after this list).
- Kafka Streams supplies its own partition assignor (`PartitionAssignor`) and configures it using the `partition.assignment.strategy` property. MapR ignores this property, so Streams will not work in the current iteration; MapR Streams will need to support this interface in clients.
- The `StreamPartitionAssignor` used by Kafka Streams leverages a class called `InternalTopicManager`, which uses ZooKeeper for admin functionality. We will need to expose some MapR admin functions via a programmatic API to substitute for these. The specific features used by `InternalTopicManager` are:
  - … (`List<Integer>`)
  - … (`Map<Integer, List<Integer>>`)
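For reference, a minimal sketch of the two partitioner contracts being compared, as they appear in the 0.10 clients and streams APIs. The implementations and class names are illustrative, and the MapR-side `StreamsPartitioner` is not shown.

```java
import java.util.Arrays;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.streams.processor.StreamPartitioner;

// clients-side contract: sees serialized bytes plus full cluster metadata.
public class KeyHashPartitioner implements Partitioner {
    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        int hash = keyBytes == null ? 0 : Arrays.hashCode(keyBytes);
        return (hash & 0x7fffffff) % numPartitions;
    }

    @Override
    public void close() { }
}

// streams-side contract: sees the typed key/value and only a partition count.
class KeyHashStreamPartitioner implements StreamPartitioner<String, String> {
    @Override
    public Integer partition(String key, String value, int numPartitions) {
        // Returning null defers to the producer's default partitioning.
        return key == null ? null : (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

The mismatch is visible in the signatures: the clients interface works on raw bytes and `Cluster` metadata, while the streams interface works on the deserialized key/value and a bare partition count, which is the gap any shared implementation would have to bridge.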
)(FYI: we are a MapR customer currently using Streams. We will follow up with our contact at MapR to discuss further and I am happy to meet to discuss these changes in more detail)