Upgrade Clients Compatibility to v0.10 #1

Open
wants to merge 159 commits into base: trunk

Conversation

manderson202

This PR pulls in all commits from upstream Kafka. Further details are below, in sections per sub-module:

Clients

The clients sub-module was modified as needed to enable MapR Streams compatibility. All Kafka unit tests pass. When the unit tests were run against MapR (using the MapR Sandbox), there were 4 failures due to differences in the APIs; these should be investigated to ensure compatible behavior between OSS Kafka and MapR. Additionally, a few tests use Producer or Consumer test constructors and should be retrofitted to also test MapR.

To run the unit tests against MapR Streams, ensure a MapR client with the necessary privileges is running on the machine, then run:

./gradlew -DtestMapr=true :clients:clean :clients:test

The chart below shows the additional changes needed in kafka-clients (TODOs in the PR), the com.mapr.streams:mapr-streams jar, and/or the native code:

Issue 1

- kafka-clients (org/apache/kafka/clients/consumer/ConsumerRecord#98): update assignments for the added fields
- mapr-streams (com/mapr/streams/impl/listener/MarlinListener#197): use the ctor with the additional fields
- Native code: add support for the additional fields timestampType, checksum, serializedKeySize, and serializedValueSize (see the sketch after this chart)

Issue 2

- mapr-streams (com/mapr/streams/impl/listener/MarlinListener and com/mapr/streams/impl/listener/MarlinListenerImpl): update method signatures to match the new Kafka 0.10 Consumer

Issue 3

- kafka-clients (org/apache/kafka/clients/consumer/KafkaConsumer#1894): call paused() from consumerDriver instead of throwing an exception
- mapr-streams (com/mapr/streams/impl/listener/MarlinListener): add support for the new paused() method
- Native code: add support for paused(), which returns the set of partitions that were previously paused by a call to pause()

Issue 4

- Verify that the updated description of the timeout parameter to poll() is accurate: "The time, in milliseconds, spent waiting in poll if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. Must not be negative."

Issue 5

- kafka-clients (org/apache/kafka/clients/consumer/KafkaConsumer#676): remove the log.warn
- mapr-streams: include the ConsumerInterceptor functionality: onConsume should be called during poll; onCommit should be called during commitOffsetsAsync and commitOffsetsSync

Issue 6

- Verify that seekToBeginning and seekToEnd are compatible with the documentation update: "If no partition is provided, seek to the first offset for all of the currently assigned partitions."

Issue 7

- kafka-clients (org/apache/kafka/clients/producer/KafkaProducer#285): remove the UnsupportedOperationException
- mapr-streams: include the ProducerInterceptor functionality: onSend should be called during send; onAcknowledgement should be called during InterceptorCallback#onCompletion (see the interceptor sketch after this chart)

Issue 8

- New configurations were added in 0.10 and should be investigated for compatibility.
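As a reference for issues 1 and 3, here is a minimal sketch of the relevant 0.10 client API surface, assuming the OSS 0.10.0 signatures (the topic name and field values are illustrative):

```java
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;

public class V10ClientApiSketch {

    // Issue 1: the 0.10 ConsumerRecord ctor carries four new fields
    // (timestamp/timestampType, checksum, serializedKeySize,
    // serializedValueSize) that the MarlinListener ctor call must populate.
    static ConsumerRecord<String, String> recordWithNewFields() {
        return new ConsumerRecord<>(
                "example-topic",            // hypothetical topic
                0,                          // partition
                42L,                        // offset
                System.currentTimeMillis(), // timestamp (new in 0.10)
                TimestampType.CREATE_TIME,  // timestampType (new in 0.10)
                0L,                         // checksum (new in 0.10)
                3,                          // serializedKeySize (new in 0.10)
                5,                          // serializedValueSize (new in 0.10)
                "key",
                "value");
    }

    // Issue 3: paused() returns the partitions previously passed to
    // pause(); the MapR listener should return this set rather than throw.
    static Set<TopicPartition> pausedPartitions(KafkaConsumer<?, ?> consumer) {
        return consumer.paused();
    }
}
```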

Finally, the 0.10 release notes should be reviewed to ensure all behavior is compatible.
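For issues 5 and 7 above, a minimal no-op sketch of the two interceptor interfaces whose hooks need wiring, assuming the OSS 0.10 client API (the class names are illustrative):

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;

// Issue 5: poll() must route records through onConsume, and both the
// sync and async commit paths must invoke onCommit.
public class NoOpConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {
    @Override public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) { return records; }
    @Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}
    @Override public void close() {}
    @Override public void configure(Map<String, ?> configs) {}
}

// Issue 7: send() must route records through onSend, and the completion
// callback must invoke onAcknowledgement.
class NoOpProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {
    @Override public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) { return record; }
    @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) {}
    @Override public void close() {}
    @Override public void configure(Map<String, ?> configs) {}
}
```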

Streams

Kafka Streams is a new API that provides a streaming-workflow-style interface for clients, essentially replacing Samza with a light-weight library. It builds on the lower-level KafkaConsumer and KafkaProducer, so it should be compatible to the extent that MapR is compatible with the OSS clients. However, there are blockers in the core Streams code that prevent me from making progress on verifying that Streams is compatible. These are outlined below:

  • The KTable and KStream classes allow users to dynamically assign a producer partitioner (the StreamPartitioner interface), similar to the Partitioner interface in clients. MapR uses StreamsPartitioner. We should investigate whether these interrelate cleanly or diverge; if possible, it would be nice to use the same producer partitioner interface for both MapR and OSS (see the partitioner sketch at the end of this section).
  • Kafka Streams uses its own consumer partition assignor (an implementation of PartitionAssignor) and configures it via the partition.assignment.strategy property. MapR ignores this property, so Streams will not work in the current iteration; MapR Streams will need to support this interface in clients.
  • The StreamPartitionAssignor used by Kafka Streams relies on a class called InternalTopicManager, which uses ZooKeeper for admin functionality. Some MapR admin functions will need to be exposed via a programmatic API to substitute for these (see the admin facade sketch after this list). The specific features used by InternalTopicManager are:
    • Getting brokers (returns a List<Integer>)
    • Getting partitions (returns a Map<Integer, List<Integer>>)
    • Create Topic
    • Delete Topic
    • Add Partitions
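To make those requirements concrete, here is a hypothetical Java facade for the MapR admin functions; every name and signature below is an assumption, shaped only by the feature list above:

```java
import java.util.List;
import java.util.Map;

// Hypothetical facade for the MapR admin functions that
// InternalTopicManager needs; nothing here is an existing MapR API.
public interface MaprStreamsAdmin {
    List<Integer> getBrokers();                              // broker ids
    Map<Integer, List<Integer>> getPartitions(String topic); // partition -> replica ids
    void createTopic(String topic, int partitions, int replicationFactor);
    void deleteTopic(String topic);
    void addPartitions(String topic, int totalPartitionCount);
}
```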

(FYI: we are a MapR customer currently using Streams. We will follow up with our contact at MapR to discuss further, and I am happy to meet to discuss these changes in more detail.)
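Regarding the partitioner question in the first bullet above, here is a side-by-side sketch of the two interfaces, assuming the OSS 0.10 signatures (the hashing logic is illustrative):

```java
import java.util.Arrays;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.streams.processor.StreamPartitioner;

// Streams-side interface: typed key/value, no cluster metadata;
// returning null falls back to the default partitioner.
class KeyHashStreamPartitioner implements StreamPartitioner<String, String> {
    @Override
    public Integer partition(String key, String value, int numPartitions) {
        return key == null ? null : (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}

// Clients-side interface: raw serialized bytes plus cluster metadata.
class KeyHashPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        return keyBytes == null ? 0 : (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    @Override public void close() {}
    @Override public void configure(Map<String, ?> configs) {}
}
```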

gwenshap and others added 30 commits November 6, 2015 19:46
Author: Jason Gustafson <[email protected]>

Reviewers: Guozhang Wang

Closes apache#452 from hachikuji/KAFKA-2723
…back (0.9.0)

Author: Jason Gustafson <[email protected]>

Reviewers: Guozhang Wang

Closes apache#453 from hachikuji/hotfix-group-coordinator-0.9
…ink tasks.

Author: Ewen Cheslack-Postava <[email protected]>

Reviewers: Gwen Shapira

Closes apache#450 from ewencp/kafka-2480-unrecoverable-task-errors

(cherry picked from commit f4b87de)
Signed-off-by: Gwen Shapira <[email protected]>
…tatic zk/broker cluster

Author: Geoff Anderson <[email protected]>

Reviewers: Gwen Shapira

Closes apache#455 from granders/KAFKA-2773-0.9.0-vagrant-fix
Author: Ewen Cheslack-Postava <[email protected]>

Reviewers: Gwen Shapira

Closes apache#456 from ewencp/kafka-2774-rename-copycat

(cherry picked from commit f2031d4)
Signed-off-by: Gwen Shapira <[email protected]>
Close socket channel in finally block to avoid file descriptor leak when remote end closes the connection

Author: Rajini Sivaram <[email protected]>

Reviewers: Ismael Juma <[email protected]>, Jun Rao <[email protected]>

Closes apache#460 from rajinisivaram/KAFKA-2779

(cherry picked from commit efbebc6)
Signed-off-by: Jun Rao <[email protected]>
Author: Ewen Cheslack-Postava <[email protected]>

Reviewers: Jun Rao <[email protected]>

Closes apache#461 from ewencp/kafka-2781-no-signing-for-install

(cherry picked from commit a24f9a2)
Signed-off-by: Jun Rao <[email protected]>
Author: Ewen Cheslack-Postava <[email protected]>

Reviewers: Gwen Shapira

Closes apache#458 from ewencp/kafka-2776-json-converter-cache-config-fix

(cherry picked from commit e9fc7b8)
Signed-off-by: Gwen Shapira <[email protected]>
The bug causes only the first instance of group metadata in the topic to be written to the cache (because of the putIfNotExists in addGroup). Coordinator fail-over won't work properly unless the cache is loaded with the right metadata.

Author: Jason Gustafson <[email protected]>

Reviewers: Guozhang Wang

Closes apache#462 from hachikuji/hotfix-group-loading

(cherry picked from commit 2b04004)
Signed-off-by: Guozhang Wang <[email protected]>
Author: Ewen Cheslack-Postava <[email protected]>

Reviewers: Gwen Shapira

Closes apache#457 from ewencp/kafka-2775-exceptions-in-api-package

(cherry picked from commit bc76e67)
Signed-off-by: Gwen Shapira <[email protected]>
…ucers.

Author: Ewen Cheslack-Postava <[email protected]>

Reviewers: Gwen Shapira

Closes apache#459 from ewencp/kafka-2778-connect-source-zero-loss-settings

(cherry picked from commit 13ba57d)
Signed-off-by: Gwen Shapira <[email protected]>
…est thread.

Author: Ewen Cheslack-Postava <[email protected]>

Reviewers: Gwen Shapira

Closes apache#463 from ewencp/kafka-2782-fix-kafka-based-log-test-assertion

(cherry picked from commit 75f2b8c)
Signed-off-by: Gwen Shapira <[email protected]>
This PR adds failover to simple end to end mirror maker test

Marked as WIP for 2 reasons:
- We may want to add a couple more test cases where kafka is being used to store offsets
- There appears to be a test failure in the hard failover case

Author: Geoff Anderson <[email protected]>

Reviewers: Ewen Cheslack-Postava

Closes apache#427 from granders/KAFKA-2258-mirrormaker-test

(cherry picked from commit 7073fa7)
Signed-off-by: Gwen Shapira <[email protected]>
Author: Grant Henke <[email protected]>

Reviewers: Gwen Shapira

Closes apache#466 from granthenke/drop-contrib

(cherry picked from commit 69af573)
Signed-off-by: Gwen Shapira <[email protected]>
Author: Jason Gustafson <[email protected]>

Reviewers: Guozhang Wang

Closes apache#467 from hachikuji/KAFKA-2674

(cherry picked from commit 359be3a)
Signed-off-by: Guozhang Wang <[email protected]>
Author: Guozhang Wang <[email protected]>

Reviewers: Gwen Shapira

Closes apache#468 from guozhangwang/WikiUpdate

(cherry picked from commit a87b978)
Signed-off-by: Gwen Shapira <[email protected]>
Author: Ashish Singh <[email protected]>

Reviewers: Guozhang Wang

Closes apache#471 from SinghAsDev/ExceptionMessage

(cherry picked from commit 3ebb4ee)
Signed-off-by: Guozhang Wang <[email protected]>
Author: Ewen Cheslack-Postava <[email protected]>

Reviewers: Jun Rao <[email protected]>

Closes apache#473 from ewencp/kafka-2785-include-copycat-jars-in-release

(cherry picked from commit 79bdc17)
Signed-off-by: Jun Rao <[email protected]>
Author: Guozhang Wang <[email protected]>

Reviewers: Jun Rao

Closes apache#474 from guozhangwang/removeKStream
Author: Ewen Cheslack-Postava <[email protected]>

Reviewers: Gwen Shapira

Closes apache#475 from ewencp/kafka-2379-connect-docs

(cherry picked from commit 83eaf32)
Signed-off-by: Gwen Shapira <[email protected]>
…incl. session timeouts and corresponding fixes

-- Refactored multi-consumer integration group assignment validation tests for round-robin assignment
-- Added multi-consumer integration tests for session timeout expiration:
   1. When a consumer stops polling
   2. When a consumer calls close()

-- Fixes to issues found with the session timeout expiration tests, with help from Jason Gustafson: try to avoid the SendFailedException by cancelling the scheduled tasks and ensuring a metadata update before sending group leave requests, and send the leave-group request with retries.

Author: Anna Povzner <[email protected]>

Reviewers: Jason Gustafson, Guozhang Wang

Closes apache#472 from apovzner/cpkafka-81

(cherry picked from commit c9264b4)
Signed-off-by: Guozhang Wang <[email protected]>
Author: Jason Gustafson <[email protected]>

Reviewers: Guozhang Wang, Geoff Anderson

Closes apache#465 from hachikuji/KAFKA-2274
…rkerSinkTask has finished starting up.

Author: Ewen Cheslack-Postava <[email protected]>

Reviewers: Gwen Shapira

Closes apache#476 from ewencp/kafka-2786-on-partitions-assigned-only-after-start

(cherry picked from commit 590a461)
Signed-off-by: Gwen Shapira <[email protected]>
… s the .gitignore to generate the list of files to ignore

Author: Ewen Cheslack-Postava <[email protected]>

Reviewers: Gwen Shapira

Closes apache#485 from ewencp/kafka-2797-disable-rat-when-git-missing

(cherry picked from commit fe11488)
Signed-off-by: Gwen Shapira <[email protected]>
… closing the new consumer.

Author: Ewen Cheslack-Postava <[email protected]>

Reviewers: Onur Karaman, Gwen Shapira

Closes apache#480 from ewencp/kafka-2792-fix-blocking-consumer-close

(cherry picked from commit ae5a5d7)
Signed-off-by: Gwen Shapira <[email protected]>
…nd consumer settings so they do not conflict with the distributed herder's settings.

Author: Ewen Cheslack-Postava <[email protected]>

Reviewers: Gwen Shapira

Closes apache#486 from ewencp/kafka-2798-conflicting-herder-producer-consumer-configs

(cherry picked from commit 403d89e)
Signed-off-by: Gwen Shapira <[email protected]>
Author: Geoff Anderson <[email protected]>

Reviewers: Guozhang Wang

Closes apache#487 from granders/minor-update-test-readme

(cherry picked from commit 1d884d1)
Signed-off-by: Guozhang Wang <[email protected]>
Author: Jason Gustafson <[email protected]>

Reviewers: Onur Karaman, Guozhang Wang

Closes apache#488 from hachikuji/KAFKA-2795

(cherry picked from commit c455e60)
Signed-off-by: Guozhang Wang <[email protected]>
Author: Parth Brahmbhatt <[email protected]>

Reviewers: Jun Rao <[email protected]>

Closes apache#489 from Parth-Brahmbhatt/KAFKA-2788

(cherry picked from commit 60c0673)
Signed-off-by: Jun Rao <[email protected]>
@manderson202
Author

FYI: this comparison shows exactly what changed between upstream Kafka and this version. Also, I neglected to mention that to get the MapR tests to run I had to hack the mapr-streams jar to update the method signatures in the Listener classes that changed with the updated Consumer interface (re: issue 2 above).

mapr-devops pushed a commit that referenced this pull request Apr 26, 2019
…tup control (apache#6638)

This merge consists of two commits previously merged into later branches.
Author: Cyrus Vafadari <[email protected]>
Reviewers: Randall Hauch <[email protected]>

Commit #1:
MINOR: Add async and different sync startup modes in connect service test class

Allow Connect Service in system tests to start asynchronously.

Specifically, allow for three startup conditions:
1. No condition - start async and return immediately.
2. Semi-async - start immediately after plugins have been discovered successfully.
3. Sync - start returns after the worker has completed startup. This is the current mode, but its condition is improved by checking that the port of Connect's REST interface is open, rather than that a log line has appeared in the logs.

Author: Konstantine Karantasis <[email protected]>
Reviewers: Randall Hauch <[email protected]>, Ewen Cheslack-Postava <[email protected]>
Closes apache#4423 from kkonstantine/MINOR-Add-async-and-different-sync-startup-modes-in-ConnectService-test-class

Commit #2:
MINOR: Modify Connect service's startup timeout to be passed via the init (apache#5882)

Currently, the startup timeout is hardcoded to be 60 seconds in Connect's test service. Modifying it to be passable via init.

Author: Magesh Nandakumar <[email protected]>
Reviewers: Randall Hauch <[email protected]>, Jason Gustafson <[email protected]>
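As a rough illustration of the sync-mode readiness condition described above (polling the worker's REST port rather than grepping logs), here is a minimal sketch; the class name, host, and timeout values are assumptions:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Poll until the worker's REST port accepts connections, or give up
// after timeoutMs. Returns true once the port is reachable.
public final class RestPortProbe {
    public static boolean waitForPort(String host, int port, long timeoutMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), 500);
                return true; // Connect's REST interface is up
            } catch (IOException notYetListening) {
                Thread.sleep(250);
            }
        }
        return false;
    }
}
```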
mapr-devops pushed a commit that referenced this pull request Oct 16, 2019
…pache#7305)

A partition log is initialized in the following steps:

1. Fetch the log config from ZK
2. Call LogManager.getOrCreateLog, which creates the Log object, then
3. Register the Log object

Step #3 enables the configuration-update thread to deliver configuration updates to the log, but any update that arrives between steps #1 and #3 is missed. This breaks the following use case:

1. Create a topic with the default configuration, and immediately after that
2. Update the configuration of the topic

There is a race condition here, and in some cases the update made in the second step is dropped.

This change fixes the race by tracking updates that arrive between steps #1 and #3. Once a partition has finished initializing its log, it checks whether it missed any update; if so, the configuration is read from ZK again.

Added unit tests to make sure a dirty configuration is refreshed. Tested on a local cluster to make sure topic configuration and updates are handled correctly.

Reviewers: Jason Gustafson <[email protected]>
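A minimal sketch of the dirty-tracking pattern this commit describes (illustrative only, not the actual Kafka code; all names here are assumptions):

```java
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

public class LogConfigInitSketch {
    private final AtomicBoolean dirty = new AtomicBoolean(false);

    // Called by the configuration-update thread at any time, including
    // during the initialization window between steps #1 and #3.
    public void onConfigUpdate() {
        dirty.set(true);
    }

    public Properties initialize() {
        Properties config = fetchConfigFromZk(); // step #1
        createAndRegisterLog();                  // steps #2 and #3
        // If an update slipped in during initialization, re-read from ZK.
        if (dirty.getAndSet(false)) {
            config = fetchConfigFromZk();
        }
        return config;
    }

    private Properties fetchConfigFromZk() { return new Properties(); } // placeholder
    private void createAndRegisterLog() {}                              // placeholder
}
```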
mapr-devops pushed a commit that referenced this pull request Feb 9, 2021
…to get end offsets and create topics (apache#9780)

The existing `Kafka*BackingStore` classes used by Connect all use `KafkaBasedLog`, which needs to frequently get the end offsets for the internal topic to know whether they are caught up. `KafkaBasedLog` uses its consumer to get the end offsets and to consume the records from the topic.

However, the Connect internal topics are often written very infrequently. This means that when the `KafkaBasedLog` used in the `Kafka*BackingStore` classes is already caught up and its last consumer poll is waiting for new records to appear, the call to the consumer to fetch end offsets will block until the consumer returns after a new record is written (unlikely) or the consumer’s `fetch.max.wait.ms` setting (defaults to 500ms) ends and the consumer returns no more records. IOW, the call to `KafkaBasedLog.readToEnd()` may block for some period of time even though it’s already caught up to the end.

Instead, we want the `KafkaBasedLog.readToEnd()` to always return quickly when the log is already caught up. The best way to do this is to have the `KafkaBackingStore` use the admin client (rather than the consumer) to fetch end offsets for the internal topic. The consumer and the admin API both use the same `ListOffset` broker API, so the functionality is ultimately the same but we don't have to block for any ongoing consumer activity.

Each Connect distributed runtime includes three instances of the `Kafka*BackingStore` classes, which means we have three instances of `KafkaBasedLog`. We don't want three instances of the admin client, and should have all three instances of the `KafkaBasedLog` share a single admin client instance. In fact, each `Kafka*BackingStore` instance currently creates, uses and closes an admin client instance when it checks and initializes that store's internal topic. If we change `Kafka*BackingStores` to share one admin client instance, we can change that initialization logic to also reuse the supplied admin client instance.

The final challenge is that `KafkaBasedLog` has been used by projects outside of Apache Kafka. While `KafkaBasedLog` is definitely not in the public API for Connect, we can make these changes in ways that are backward compatible: create new constructors and deprecate the old constructors. Connect can be changed to only use the new constructors, and this will give time for any downstream users to make changes.

These changes are implemented as follows:
1. Add a `KafkaBasedLog` constructor to accept in its parameters a supplier from which it can get an admin instance, and deprecate the old constructor. We need a supplier rather than just passing an instance because `KafkaBasedLog` is instantiated before Connect starts up, so we need to create the admin instance only when needed. At the same time, we'll change the existing init function parameter from a no-arg function to accept an admin instance as an argument, allowing that init function to reuse the shared admin instance used by the `KafkaBasedLog`. Note: if no admin supplier is provided (in deprecated constructor that is no longer used in AK), the consumer is still used to get latest offsets.
2. Add to the `Kafka*BackingStore` classes a new constructor with the same parameters but with an admin supplier, and deprecate the old constructor. When one of these classes instantiates its `KafkaBasedLog`, it passes the admin supplier and an init function that takes an admin instance.
3. Create a new `SharedTopicAdmin` that lazily creates the `TopicAdmin` (and underlying Admin client) when required, and closes the admin objects when the `SharedTopicAdmin` is closed.
4. Modify the existing `TopicAdmin` (used only in Connect) to encapsulate the logic of fetching end offsets using the admin client, simplifying the logic in `KafkaBasedLog` mentioned in #1 above. Doing this also makes it easier to test that logic.
5. Change `ConnectDistributed` to create a `SharedTopicAdmin` instance (that is `AutoCloseable`) before creating the `Kafka*BackingStore` instances, passing the `SharedTopicAdmin` (which is an admin supplier) to all three `Kafka*BackingStore` objects, and finally always closing the `SharedTopicAdmin` upon termination. (Shutdown of the worker occurs outside of the `ConnectDistributed` code, so modify `DistributedHerder` to take in its constructor additional `AutoCloseable` objects that should be closed when the herder is closed, and then modify `ConnectDistributed` to pass the `SharedTopicAdmin` as one of those `AutoCloseable` instances.)
6. Change `MirrorMaker` similarly to `ConnectDistributed`.
7. Change existing unit tests to no longer use deprecated constructors.
8. Add unit tests for new functionality.

Author: Randall Hauch <[email protected]>
Reviewer: Konstantine Karantasis <[email protected]>
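A minimal sketch of the lazily-created shared-admin pattern described in steps 1-3 (illustrative; the real implementation is Connect's `SharedTopicAdmin` wrapping `TopicAdmin`, and the class name here is an assumption):

```java
import java.util.Map;
import java.util.function.Supplier;

import org.apache.kafka.clients.admin.Admin;

// A Supplier<Admin> that creates the client on first use and closes it
// once, so several KafkaBasedLog instances can share one Admin.
public class LazySharedAdmin implements Supplier<Admin>, AutoCloseable {
    private final Map<String, Object> adminConfig;
    private Admin admin; // created lazily on first get()

    public LazySharedAdmin(Map<String, Object> adminConfig) {
        this.adminConfig = adminConfig;
    }

    @Override
    public synchronized Admin get() {
        if (admin == null) {
            admin = Admin.create(adminConfig);
        }
        return admin;
    }

    @Override
    public synchronized void close() {
        if (admin != null) {
            admin.close();
            admin = null;
        }
    }
}
```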