-
Notifications
You must be signed in to change notification settings - Fork 14.3k
KAFKA-19234: broker should return UNAUTHORIZATION error for non-existing topic in produce request #19635
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
…ing topic in produce request Signed-off-by: PoAn Yang <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank @FrankYang0529 for this patch, some little comments
Co-authored-by: Ken Huang <[email protected]>
Co-authored-by: Ken Huang <[email protected]>
Co-authored-by: Jhen-Yung Hsu <[email protected]>
…oduceReqeustVersionLessThan13 Signed-off-by: PoAn Yang <[email protected]>
Signed-off-by: PoAn Yang <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
@@ -924,6 +924,33 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { | |||
sendRequests(requestKeyToRequest, false, topicNames) | |||
} | |||
|
|||
@Test | |||
def testAuthorizationWithTopicNotExistingForProduceReqeustVersionLessThan13(): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Test
def testAuthorizationWithTopicNotExistingForProduceRequestVersionLessThan13(): Unit = {
sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest))
def createTpd(version: Int) =
if (version <= 12) new ProduceRequestData.TopicProduceData()
.setName(tp.topic())
else new ProduceRequestData.TopicProduceData()
.setTopicId(getTopicIds()(tp.topic()))
for (version <- ApiKeys.PRODUCE.oldestVersion to ApiKeys.PRODUCE.latestVersion) {
val request = requests.ProduceRequest.builder(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(
util.List.of(createTpd(version)
.setPartitionData(util.List.of(
new ProduceRequestData.PartitionProduceData()
.setIndex(tp.partition)
.setRecords(MemoryRecords.withRecords(Compression.NONE, new SimpleRecord("test".getBytes))))))
.iterator))
.setAcks(1.toShort)
.setTimeoutMs(5000))
.build(version.toShort)
val data = connectAndReceive[AbstractResponse](request, listenerName = listenerName).asInstanceOf[ProduceResponse].data().responses()
assertEquals(1, data.size())
val response = if (version <= 12) data.find(tp.topic(), Uuid.ZERO_UUID)
else data.find("", getTopicIds()(tp.topic()))
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(),
response.partitionResponses().asScala.find(_.index == part).get.errorCode, s"unexpected error for produce request version $version")
}
}
we need to test version 13, but it should use topic id instead of topic name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @chia7712, the version 13 is covered by testAuthorizationWithTopicNotExisting
. This PR focuses on old versions behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please add comments for the version=13. Additionally, there is a typo: Reqeust
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FrankYang0529 : Thanks for the PR. Left a comment.
val request = requests.ProduceRequest.builder(new ProduceRequestData() | ||
.setTopicData(new ProduceRequestData.TopicProduceDataCollection( | ||
util.List.of(new ProduceRequestData.TopicProduceData() | ||
.setName(tp.topic()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This topic exists on the broker, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We expect to return TOPIC_AUTHORIZATION_FAILED
(if it failed to authorize) regardless of topic existence if the request is using topic name. Maybe we should test both cases - 1) topic exists and 2) topic does not exist (similar to testAuthorizationFetchV12WithTopicNotExisting
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could. I am just pointing out this doesn't match the test name, which says WithTopicNotExisting
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @junrao, This case follows testAuthorizationWithTopicNotExisting
. In AuthorizerIntegrationTest
, if a case needs a existing topic like case testAuthorizationWithTopicExisting
, it needs to call sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest))
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FrankYang0529 could you please add the test case for the existent topic? Additionally, it would be useful to add case of existent topic for fetch RPC too.
@@ -413,8 +413,6 @@ class KafkaApis(val requestChannel: RequestChannel, | |||
val topicPartition = new TopicPartition(topicName, partition.index()) | |||
if (topicName.isEmpty) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is another subtle existing issue. If a produce request before v13 has an empty topic name, we used to return UNKNOWN_TOPIC_OR_PARTITION. Now, we return UNKNOWN_TOPIC_ID. It's probably better to change the condition to if (topicName.isEmpty && topic.topicId().equals(Uuid.ZERO_UUID))
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's probably better to change the condition to if (topicName.isEmpty && topic.topicId().equals(Uuid.ZERO_UUID)).
The default value of topic.topicId()
is Uuid.ZERO_UUID, and hence
if (topicName.isEmpty && topic.topicId().equals(Uuid.ZERO_UUID))is always true if request is before v13. Maybe we can try another condition:
if (topicName.isEmpty && version > 12`?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I meant if (topicName.isEmpty && !topic.topicId().equals(Uuid.ZERO_UUID))
.
val request = requests.ProduceRequest.builder(new ProduceRequestData() | ||
.setTopicData(new ProduceRequestData.TopicProduceDataCollection( | ||
util.List.of(new ProduceRequestData.TopicProduceData() | ||
.setName(tp.topic()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could. I am just pointing out this doesn't match the test name, which says WithTopicNotExisting
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall, LGTM assuming the remaining comments are addressed.
Signed-off-by: PoAn Yang <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FrankYang0529 : Thanks for the updated PR. A couple of more comments.
authorizationForProduceRequestVersionLessThan13(true) | ||
} | ||
|
||
def authorizationForProduceRequestVersionLessThan13(createTopic: Boolean): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could this be private?
|
||
def authorizationForProduceRequestVersionLessThan13(createTopic: Boolean): Unit = { | ||
if (createTopic) { | ||
sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we use createTopicWithBrokerPrincipal()
? This will wait until the topic metadata is propagated to all brokers. Ditto for authorizationFetchV12
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Follow junrao, it would be good to follow testTopicIdAuthorization method?
- Use @ParameterizedTest and rename the parameter to withTopicExisting for consistent code style in this file.
- Use
createTopicWithBrokerPrincipal
.
@ParameterizedTest
@CsvSource(value = Array("false", "true"))
def testTopicIdAuthorization(withTopicExisting: Boolean): Unit = {
val topicId = if (withTopicExisting) {
createTopicWithBrokerPrincipal(topic)
getTopicIds()(topic)
} else {
Uuid.randomUuid()
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, I changed to use ParameterizedTest
, so we can remove authorizationForProduceRequestVersionLessThan13
and authorizationFetchV12
.
Signed-off-by: PoAn Yang <[email protected]>
Since topic name is sensitive information, it should return a
TOPIC_AUTHORIZATION_FAILED error for non-existing topic. The Fetch
request also follows this pattern.
Co-authored-by: John Doe [email protected]