KAFKA-17747: [2/N] Add compute topic and group hash (wip) #2
base: trunk
Signed-off-by: PoAn Yang <[email protected]>
Walkthrough
The changes introduce deterministic hashing methods for topics and groups.
Sequence Diagram(s)
sequenceDiagram
participant Test as GroupTest
participant Group as Group (static methods)
participant Guava as Guava Hashing
participant Metadata as MetadataImage/ClusterImage
Test->>Group: computeTopicHash(topicImage, clusterImage)
Group->>Metadata: Retrieve topic and partition metadata
Group->>Guava: Hash topic metadata, partition info, rack assignments
Guava-->>Group: Return computed hash
Group-->>Test: Return topic hash
Test->>Group: computeGroupHash(topicHashes)
Group->>Guava: Combine topic hashes in order
Guava-->>Group: Return combined group hash
Group-->>Test: Return group hash
@coderabbitai full review
✅ Actions performed
Full review triggered.
Actionable comments posted: 0
🧹 Nitpick comments (6)
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java (6)
41-41: Consider using a fixed UUID instead of random for deterministic tests
Using Uuid.randomUuid() as a constant might introduce non-deterministic behavior in tests across different runs. For unit tests, it's generally better to use a fixed, predefined UUID to ensure consistent and reproducible test results.
- private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
+ private static final Uuid FOO_TOPIC_ID = Uuid.fromString("123e4567-e89b-12d3-a456-426614174000");
45-48: Missing documentation for MetadataImageBuilder
The MetadataImageBuilder appears to be a custom test helper class, but there's no documentation explaining its purpose or how it works. Consider adding a class-level comment or a reference to where this builder is defined to improve code maintainability.
56-56: Add documentation for the magic byte
The purpose of the magic byte used in the hash function is not explained. Consider adding a comment explaining what this value represents and why it's used in the hash computation.
- .putByte((byte) 0) // magic byte
+ .putByte((byte) 0) // magic byte - version identifier for the hash algorithm
50-65: Document hash function sensitivity in test method
The test verifies the topic hash function but doesn't clearly document what properties are being validated. Consider adding a descriptive comment explaining that the test is verifying the hash function's correctness and its sensitivity to topic metadata.
  @Test
  void testComputeTopicHash() {
+     // Verify that topic hash computation is correct and sensitive to topic ID, name,
+     // partition count, partition order, and rack assignments
      long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());
162-174: Document group hash function purpose
The test for computeGroupHash could benefit from a comment explaining the purpose of this function and why it's important for it to be order-sensitive with respect to the input mappings.
  @Test
  void testComputeGroupHash() {
+     // Verify that group hash computation correctly combines topic offsets in a deterministic order
      long result = Group.computeGroupHash(Map.of(
          BAR_TOPIC_NAME, 123L,
          FOO_TOPIC_NAME, 456L
      ));
176-188: Verify order sensitivity explicitly in test name and comments
The test confirms that computeGroupHash is order-sensitive, which is an important property. Consider making this more explicit in the test name and comments for better clarity.
- void testComputeGroupHashWithDifferentOrder() {
+ void testComputeGroupHashIsOrderSensitive() {
+     // Verify that the group hash result changes when the order of hash inputs changes,
+     // confirming that the implementation is correctly order-sensitive
      long result = Group.computeGroupHash(Map.of(
          BAR_TOPIC_NAME, 123L,
          FOO_TOPIC_NAME, 456L
      ));
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- build.gradle (1 hunks)
- checkstyle/import-control-group-coordinator.xml (1 hunks)
- gradle/dependencies.gradle (2 hunks)
- group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java (2 hunks)
- group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java (2)
- metadata/src/main/java/org/apache/kafka/image/ClusterImage.java (1): ClusterImage (36-108)
- metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java (1): BrokerRegistration (44-398)
🔇 Additional comments (9)
gradle/dependencies.gradle (2)
64-64: Guava dependency version looks appropriate.
Adding Guava 33.4.0-jre for the hashing functionality is a good choice. This is a recent stable version with good performance and reliability.
151-151: Maven coordinate for Guava properly defined.
The dependency definition follows the project's standard pattern for library dependencies.
build.gradle (1)
1423-1423: Guava dependency correctly added to group-coordinator module.
The implementation dependency is properly added to the group-coordinator module to support the new hashing functionality.
checkstyle/import-control-group-coordinator.xml (1)
80-80: Appropriate import allowance for Guava hash package.
The import control rule for allowing com.google.common.hash is properly added, which is needed for the new hashing functionality in the Group interface.
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java (3)
22-36: Required imports properly added.
The imports for Kafka metadata classes and Guava hash utilities are correctly added to support the new hashing functionality.
223-238: Well-implemented group hash computation method.
The computeGroupHash method correctly creates a deterministic hash by:
- Sorting topic entries by name
- Converting long hash values to HashCode objects
- Using Guava's Hashing.combineOrdered to ensure order-sensitivity
This approach ensures consistent results regardless of input map iteration order.
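The ordering step in this comment can be illustrated without Guava. What follows is a minimal sketch under stated assumptions, not the code under review: the class name GroupHashSketch, the method name combineSortedByTopic, and the mixing formula (31 * acc + value) are hypothetical, and the formula is only a stand-in for an order-sensitive combiner such as Guava's Hashing.combineOrdered, chosen so the example runs on the JDK alone.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of "sort by topic name, then combine in order".
final class GroupHashSketch {
    // The 31 * acc + value mixer is a stand-in for an order-sensitive
    // combiner; it is NOT the function used by the pull request.
    static long combineSortedByTopic(Map<String, Long> topicHashes) {
        List<Map.Entry<String, Long>> entries = new ArrayList<>(topicHashes.entrySet());
        entries.sort(Map.Entry.comparingByKey()); // deterministic order by topic name
        long acc = 17L;
        for (Map.Entry<String, Long> entry : entries) {
            acc = 31L * acc + entry.getValue();   // order-sensitive mixing step
        }
        return acc;
    }
}
```

Because the entries are sorted before they are combined, two maps with the same contents give the same result whatever their iteration order, while assigning the same hash values to different topic names changes the result.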
239-267: ✅ Verification successful
Comprehensive topic hash computation with appropriate handling of metadata.
The computeTopicHash method creates a deterministic Murmur3 hash that includes:
- Topic ID and name
- Number of partitions
- Partition IDs (sorted)
- Rack information (sorted)
The implementation correctly handles null values, empty optionals, and uses proper sorting to ensure deterministic results.
One recommendation: consider adding a unit test that verifies hash stability across JVM restarts, as this will be important for consistent behavior in a distributed environment.
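The field order listed in this comment can be sketched with the JDK alone. This is an illustration under stated assumptions rather than the method under review: the class TopicHashSketch and its input shape (a map from partition ID to the rack names of that partition's replicas) are hypothetical, java.util.UUID stands in for Kafka's Uuid, both halves of the ID are fed to the hash, and a hand-written 64-bit FNV-1a replaces Murmur3 so that no Guava dependency is needed.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;

// Hypothetical illustration; FNV-1a is a stand-in for Murmur3.
final class TopicHashSketch {
    private static final long FNV_OFFSET = 0xcbf29ce484222325L;
    private static final long FNV_PRIME = 0x100000001b3L;

    private long state = FNV_OFFSET;

    private void putByte(int b) {
        state ^= (b & 0xff);
        state *= FNV_PRIME;
    }

    private void putInt(int v) {
        for (int shift = 0; shift < 32; shift += 8) putByte(v >>> shift);
    }

    private void putLong(long v) {
        for (int shift = 0; shift < 64; shift += 8) putByte((int) (v >>> shift));
    }

    private void putString(String s) {
        for (byte b : s.getBytes(StandardCharsets.UTF_8)) putByte(b);
    }

    static long hash(UUID topicId, String name, Map<Integer, List<String>> racksByPartition) {
        TopicHashSketch h = new TopicHashSketch();
        h.putByte(0);                                 // magic byte
        h.putLong(topicId.getMostSignificantBits());  // topic ID, upper half
        h.putLong(topicId.getLeastSignificantBits()); // topic ID, lower half
        h.putString(name);                            // topic name
        h.putInt(racksByPartition.size());            // number of partitions
        // TreeMap iterates in ascending partition ID order.
        for (Map.Entry<Integer, List<String>> e : new TreeMap<>(racksByPartition).entrySet()) {
            h.putInt(e.getKey());                     // partition ID
            List<String> racks = new ArrayList<>(e.getValue());
            Collections.sort(racks);                  // rack order must not matter
            h.putString(String.join(";", racks));
        }
        return h.state;
    }
}
```

Sorting both the partitions and the racks before they are fed to the hash is what makes the result independent of the order in which metadata arrives.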
🏁 Script executed:
#!/bin/bash
# Check if there are comprehensive tests for the hash functions
rg -A 5 "test.*Hash" group-coordinator/src/test/java/
Length of output: 14144
Implementation and tests are comprehensive
The computeTopicHash method correctly hashes topic ID, name, partition count, sorted partition IDs, and sorted rack lists. Stability and determinism are already verified in group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java (varying magic byte, partition order, rack order, and field differences). No additional tests are needed.
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java (2)
119-131: Well-structured parameterized test
The parameterized test design effectively tests different scenarios that should result in different hash values. This is a good practice as it systematically verifies the hash function's sensitivity to changes in various fields.
1-189: ✅ Verification successful
Request verification of hash algorithm implementation
The tests verify the behavior of computeTopicHash and computeGroupHash methods, but without seeing the implementation in the Group class, it's hard to confirm if these tests align with the actual implementation. Consider checking the implementation details to ensure complete coverage.
🏁 Script executed:
#!/bin/bash
# Check the implementation of the hash methods in the Group class
fd -t f "Group.java" --exec cat {}
Length of output: 91794
🏁 Script executed:
#!/bin/bash
# Find the implementations of computeTopicHash and computeGroupHash
rg -n "computeTopicHash" -t java
rg -n "computeGroupHash" -t java
Length of output: 1847
🏁 Script executed:
#!/bin/bash
# Display computeGroupHash implementation
sed -n '220,260p' group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
# Display computeTopicHash implementation
sed -n '260,320p' group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
Length of output: 2197
computeTopicHash and computeGroupHash tests aligned with implementation
I’ve reviewed both methods in Group.java and confirmed the tests cover:
- The magic byte and all topic fields (ID, name, partition count)
- Partition ordering via sorted entries
- Rack ordering via sorted rack list
- Group hash ordering via sorted topic names
No further changes are needed.
@coderabbitai full review
✅ Actions performed
Full review triggered.
Actionable comments posted: 0
🧹 Nitpick comments (4)
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java (4)
41-41: Consider using a fixed UUID for deterministic tests.
Using Uuid.randomUuid() could potentially make tests non-deterministic across test runs. While it doesn't affect the current tests since you're comparing computations with the same UUID within each test, it's generally better practice to use fixed values in unit tests.
- private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
+ private static final Uuid FOO_TOPIC_ID = Uuid.fromString("123e4567-e89b-12d3-a456-426614174000");
56-56: Extract magic byte as a constant.
The magic byte (byte) 0 appears to be a significant value in the hash computation. Consider extracting it as a constant for better maintainability and to avoid magic numbers.
+ private static final byte TOPIC_HASH_MAGIC_BYTE = 0;
  // Then in the test method:
- .putByte((byte) 0) // magic byte
+ .putByte(TOPIC_HASH_MAGIC_BYTE) // magic byte
45-48: Add edge case tests for the hash functions.
Consider adding test cases for edge scenarios:
- Empty topics (0 partitions)
- Topics with many partitions
- Empty rack assignments
- Null or empty group names
162-188: Enhance group hash tests with more variations.
The tests for computeGroupHash are more limited compared to those for computeTopicHash. Consider adding tests for:
- Empty maps
- Maps with a single entry
- Maps with many entries
- Maps with duplicate hash values
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Lite
📒 Files selected for processing (5)
- build.gradle (1 hunks)
- checkstyle/import-control-group-coordinator.xml (1 hunks)
- gradle/dependencies.gradle (2 hunks)
- group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java (2 hunks)
- group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java (1 hunks)
🔇 Additional comments (10)
gradle/dependencies.gradle (2)
64-64: Adding Guava dependency version is appropriate.
The addition of Guava version 33.4.0-jre is correctly defined in the versions map, following the project's dependency management pattern. This is a recent stable version of Guava.
151-151: Guava dependency entry correctly defined.
The Guava library is properly added with the Maven coordinate referencing the version variable. This follows the project's standard pattern for library declarations.
checkstyle/import-control-group-coordinator.xml (1)
80-80: Well-scoped import allowance for Guava hashing.
The import control is correctly updated to allow only the specific com.google.common.hash package rather than the entire Guava library. This follows good security practices by limiting the scope of allowed external dependencies.
build.gradle (1)
1423-1423: Properly scoped Guava dependency.
Guava is correctly added as an implementation dependency to the :group-coordinator project, which is appropriate since the hashing utilities are used internally and not exposed through the API.
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java (4)
22-24: Appropriate imports for new functionality.
The imports for ClusterImage, TopicImage, BrokerRegistration, Guava hashing utilities, and standard Java utilities are correctly added to support the new hashing methods. The imports are organized following project conventions.
Also applies to: 26-32, 36-36
223-237: Well-implemented deterministic group hash computation.
The computeGroupHash method provides a deterministic way to hash multiple topics within a group. Good practices are used:
- Deterministic ordering of topic entries by sorting by name
- Using Guava's combineOrdered to ensure hash consistency
- Clean functional style with streams
This implementation will produce consistent hash values as long as the input topic hashes are consistent.
239-267: Thorough topic hash implementation with comprehensive metadata coverage.
The computeTopicHash method effectively captures all critical aspects of a topic:
- Uses Murmur3 (strong hash algorithm with good distribution)
- Includes magic byte, topic ID, name, and partition count
- Processes partitions in a deterministic order
- Incorporates rack awareness by including sorted rack IDs
The deterministic ordering of partitions and racks ensures consistent hashing regardless of order in which metadata is received.
223-267: Verify the hash computation consistency across clusters.
While the implementation is sound, it's important to verify that these hash computations remain stable across different Kafka cluster deployments and versions. The reliance on rack information could potentially cause hash changes when brokers are replaced or racks are reconfigured.
Consider how the hash values might change during operations like broker replacements, rack reconfigurations, or rolling updates. If hash stability is critical for your use case, ensure you have tests covering these scenarios.
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java (2)
1-189: Well-structured comprehensive test suite for the new hashing functionality.
The test cases effectively verify the behavior of the new computeTopicHash and computeGroupHash functions with good coverage of variations. You've included tests for different magic bytes, partition orders, rack orders, and field variations, which will ensure the hashing functions are deterministic and sensitive to all relevant inputs.
The use of parameterized tests with MethodSource is an excellent approach for testing multiple variations efficiently.
162-174: ✅ Verification successful
Ensure order consistency in topic name map.
The test assumes that Map.of(BAR_TOPIC_NAME, 123L, FOO_TOPIC_NAME, 456L) will always maintain the same iteration order. While this is true for small maps in recent Java versions, it's worth ensuring order consistency by using an ordered map implementation or sorting the keys before hashing.
🏁 Script executed:
#!/bin/bash
# Check if Group.computeGroupHash sorts the map entries or uses a specific ordered map implementation
rg -A 10 "computeGroupHash" --type java | grep -E "(TreeMap|LinkedHashMap|sort)"
Length of output: 210
ComputeGroupHash already enforces key ordering
The implementation of Group.computeGroupHash uses .entrySet().stream().sorted(Map.Entry.comparingByKey()) to ensure a consistent iteration order before hashing. The existing test using Map.of(...) is therefore deterministic and does not require switching to an ordered map or additional sorting.
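The Map.of Javadoc states that the iteration order of its mappings is unspecified, so any determinism here comes from the sort rather than from the map. A small sketch of that point, using a hypothetical helper named topicsInHashOrder that is not part of the pull request:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper showing the order in which entries would be visited.
final class SortedEntriesSketch {
    static List<String> topicsInHashOrder(Map<String, Long> topicHashes) {
        return topicHashes.entrySet().stream()
            .sorted(Map.Entry.comparingByKey()) // natural String order of topic names
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
    }
}
```

Calling topicsInHashOrder(Map.of("foo", 456L, "bar", 123L)) yields bar before foo regardless of how the immutable map happens to iterate.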
/gemini review
Code Review
This pull request introduces deterministic hashing methods for topics and groups, along with related unit tests and a new Guava dependency. The changes seem well-structured and include comprehensive unit tests. However, there are a few areas that could be improved for clarity and efficiency.
Summary of Findings
- Magic Byte in computeTopicHash: The magic byte in computeTopicHash is not explained. It would be beneficial to add a comment explaining its purpose and potential values.
- Potential NullPointerException in computeTopicHash: The computeTopicHash method could potentially throw a NullPointerException if clusterImage::broker returns null and Optional::get is called on an empty Optional. Consider adding a check to prevent this.
- Inconsistent Hashing: The computeTopicHash method uses topicImage.id().hashCode() which may not be consistent across different JVMs. Consider using a consistent hashing method for the topic ID.
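Independently of the cross-JVM question raised in the last finding, hashCode() returns an int, so it can carry at most 32 of a topic ID's 128 bits. One possible alternative is to feed both 64-bit halves of the ID to the hasher. The sketch below uses java.util.UUID as a stand-in for Kafka's Uuid, and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.UUID;

// Hypothetical illustration; java.util.UUID stands in for Kafka's Uuid.
final class TopicIdBytesSketch {
    // Serializes all 128 bits of the ID instead of its 32-bit hashCode().
    static byte[] idBytes(UUID id) {
        return ByteBuffer.allocate(16)
            .putLong(id.getMostSignificantBits())
            .putLong(id.getLeastSignificantBits())
            .array();
    }
}
```

For example, new UUID(1L, 2L) and new UUID(2L, 1L) share a hashCode() because java.util.UUID derives it from the XOR of the two halves, yet their 16-byte encodings differ. With a streaming hasher the same idea amounts to two putLong calls, one per half.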
Merge Readiness
The pull request introduces important hashing functionality and includes thorough unit tests. However, the potential for a NullPointerException in computeTopicHash and the inconsistent hashing of topic IDs should be addressed before merging. I am unable to approve this pull request, and recommend that others review and approve this code before merging. Given the high severity issue, I recommend that the pull request not be merged until it is addressed.
/codehelper review
 *
 * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
 * @return The hash of the group.
 */
Suggestion: Consider adding a brief JavaDoc comment explaining the purpose of this method and when it should be used.
 * @param topicImage The topic image.
 * @param clusterImage The cluster image.
 * @return The hash of the topic.
 */
Improvement: The computeTopicHash method is quite complex. Consider breaking it down into smaller helper methods for better readability and maintainability.
 * @param clusterImage The cluster image.
 * @return The hash of the topic.
 */
static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
Security Note: Be cautious when using Murmur3 for security-sensitive hashing. It's designed for speed and distribution quality, not cryptographic security. If this hash is used for any security purposes, consider using a cryptographic hash function instead.
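Nothing in this thread indicates that the hash is used for security purposes. If such a need ever arose, the JDK's MessageDigest provides SHA-256, as in the following sketch; the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical illustration of a cryptographic digest from the JDK.
final class CryptoHashSketch {
    static byte[] sha256(String input) {
        try {
            return MessageDigest.getInstance("SHA-256")
                .digest(input.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform implementation is required to support SHA-256.
            throw new IllegalStateException(e);
        }
    }
}
```

The digest is 32 bytes rather than a single long, so it would not be a drop-in replacement for a 64-bit topic hash.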
.putLong(topicImage.id().hashCode()) // topic Id
.putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
.putInt(topicImage.partitions().size()); // number of partitions
Performance: The stream operation here creates intermediate collections and could be optimized. Consider using a more direct approach if this is in a performance-critical path.
 */
static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
    HashFunction hf = Hashing.murmur3_128();
    Hasher topicHasher = hf.newHasher()
Maintainability: The magic byte (byte) 0 should be defined as a named constant with a comment explaining its purpose.
topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> {
    topicHasher.putInt(entry.getKey()); // partition id
    String racks = Arrays.stream(entry.getValue().replicas)
        .mapToObj(clusterImage::broker)
Suggestion: Consider extracting the rack string generation into a separate method for better readability and potential reuse.
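A possible shape for such a helper is sketched below. It is an assumption-laden illustration, not the method under review: the class RackStringSketch, the method racksOf, and the brokerRack lookup (replica ID to optional rack) are hypothetical, and the lookup stands in for the clusterImage::broker chain shown in the diff. The sketch also skips brokers that are unknown or have no rack, which is one way to avoid calling get on an empty Optional.

```java
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.function.IntFunction;
import java.util.stream.Collectors;

// Hypothetical helper; brokerRack abstracts the broker metadata lookup.
final class RackStringSketch {
    static final String RACK_SEPARATOR = ";";

    static String racksOf(int[] replicas, IntFunction<Optional<String>> brokerRack) {
        return Arrays.stream(replicas)
            .mapToObj(brokerRack)
            .filter(Objects::nonNull)    // lookup returned null for an unknown broker
            .flatMap(Optional::stream)   // drop brokers without a rack (Java 9+)
            .sorted()                    // rack order must not depend on replica order
            .collect(Collectors.joining(RACK_SEPARATOR));
    }
}
```

With a lookup that maps replica 0 to rack1 and replica 1 to rack0 and knows nothing about replica 2, racksOf(new int[] {0, 1, 2}, lookup) gives rack0;rack1. Whether silently skipping a missing broker is the right behaviour for the real hash is a design question this sketch does not settle.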
private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
private static final String FOO_TOPIC_NAME = "foo";
private static final String BAR_TOPIC_NAME = "bar";
private static final int FOO_NUM_PARTITIONS = 2;
Testing: This test class is missing MetadataImageBuilder. Make sure it's properly defined or imported.
.putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
.putInt(FOO_NUM_PARTITIONS) // number of partitions
.putInt(0) // partition 0
.putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
Improvement: Consider using a constant for the rack separator string (;) to ensure consistency across the codebase.
        )
    );
}
Testing: Consider adding a test case for an empty map of topic hashes to ensure proper handling of edge cases.
- Add guava to dependencies.
- Add computeTopicHash and computeGroupHash functions to Group class.