Skip to content

KAFKA-19234: broker should return UNAUTHORIZATION error for non-existing topic in produce request #19635

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: trunk
Choose a base branch
from

Conversation

FrankYang0529
Copy link
Member

@FrankYang0529 FrankYang0529 commented May 4, 2025

Since topic name is sensitive information, it should return a
TOPIC_AUTHORIZATION_FAILED error for non-existing topic. The Fetch
request also follows this pattern.

Co-authored-by: John Doe [email protected]

…ing topic in produce request

Signed-off-by: PoAn Yang <[email protected]>
Copy link
Collaborator

@m1a2st m1a2st left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank @FrankYang0529 for this patch, some little comments

@FrankYang0529 FrankYang0529 requested a review from chia7712 May 5, 2025 12:53
Copy link
Collaborator

@TaiJuWu TaiJuWu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

@@ -924,6 +924,33 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
sendRequests(requestKeyToRequest, false, topicNames)
}

@Test
def testAuthorizationWithTopicNotExistingForProduceReqeustVersionLessThan13(): Unit = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  @Test
  def testAuthorizationWithTopicNotExistingForProduceRequestVersionLessThan13(): Unit = {
    sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest))
    def createTpd(version: Int) =
      if (version <= 12) new ProduceRequestData.TopicProduceData()
        .setName(tp.topic())
      else new ProduceRequestData.TopicProduceData()
        .setTopicId(getTopicIds()(tp.topic()))
    for (version <- ApiKeys.PRODUCE.oldestVersion to ApiKeys.PRODUCE.latestVersion) {
      val request = requests.ProduceRequest.builder(new ProduceRequestData()
          .setTopicData(new ProduceRequestData.TopicProduceDataCollection(
            util.List.of(createTpd(version)
                .setPartitionData(util.List.of(
                  new ProduceRequestData.PartitionProduceData()
                    .setIndex(tp.partition)
                    .setRecords(MemoryRecords.withRecords(Compression.NONE, new SimpleRecord("test".getBytes))))))
              .iterator))
          .setAcks(1.toShort)
          .setTimeoutMs(5000))
        .build(version.toShort)
      val data = connectAndReceive[AbstractResponse](request, listenerName = listenerName).asInstanceOf[ProduceResponse].data().responses()
      assertEquals(1, data.size())
      val response = if (version <= 12) data.find(tp.topic(), Uuid.ZERO_UUID)
      else data.find("", getTopicIds()(tp.topic()))
      assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(),
        response.partitionResponses().asScala.find(_.index == part).get.errorCode, s"unexpected error for produce request version $version")
    }
  }

we need to test version 13, but it should use topic id instead of topic name.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @chia7712, the version 13 is covered by testAuthorizationWithTopicNotExisting. This PR focuses on old versions behavior.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add comments for the version=13. Additionally, there is a typo: Reqeust

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FrankYang0529 : Thanks for the PR. Left a comment.

val request = requests.ProduceRequest.builder(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(
util.List.of(new ProduceRequestData.TopicProduceData()
.setName(tp.topic())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This topic exists on the broker, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We expect to return TOPIC_AUTHORIZATION_FAILED (if it failed to authorize) regardless of topic existence if the request is using topic name. Maybe we should test both cases - 1) topic exists and 2) topic does not exist (similar to testAuthorizationFetchV12WithTopicNotExisting)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could. I am just pointing out this doesn't match the test name, which says WithTopicNotExisting.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @junrao, This case follows testAuthorizationWithTopicNotExisting. In AuthorizerIntegrationTest, if a case needs a existing topic like case testAuthorizationWithTopicExisting, it needs to call sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest)).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FrankYang0529 could you please add the test case for the existent topic? Additionally, it would be useful to add case of existent topic for fetch RPC too.

@@ -413,8 +413,6 @@ class KafkaApis(val requestChannel: RequestChannel,
val topicPartition = new TopicPartition(topicName, partition.index())
if (topicName.isEmpty)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is another subtle existing issue. If a produce request before v13 has an empty topic name, we used to return UNKNOWN_TOPIC_OR_PARTITION. Now, we return UNKNOWN_TOPIC_ID. It's probably better to change the condition to if (topicName.isEmpty && topic.topicId().equals(Uuid.ZERO_UUID)).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably better to change the condition to if (topicName.isEmpty && topic.topicId().equals(Uuid.ZERO_UUID)).

The default value of topic.topicId() is Uuid.ZERO_UUID, and hence if (topicName.isEmpty && topic.topicId().equals(Uuid.ZERO_UUID))is always true if request is before v13. Maybe we can try another condition:if (topicName.isEmpty && version > 12`?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I meant if (topicName.isEmpty && !topic.topicId().equals(Uuid.ZERO_UUID)).

val request = requests.ProduceRequest.builder(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(
util.List.of(new ProduceRequestData.TopicProduceData()
.setName(tp.topic())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could. I am just pointing out this doesn't match the test name, which says WithTopicNotExisting.

Copy link
Contributor

@frankvicky frankvicky left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, LGTM assuming the remaining comments are addressed.

@FrankYang0529
Copy link
Member Author

@junrao @chia7712 Thanks for review. I addressed all comments. Could you take a look when you have time? Thanks.

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FrankYang0529 : Thanks for the updated PR. A couple of more comments.

authorizationForProduceRequestVersionLessThan13(true)
}

def authorizationForProduceRequestVersionLessThan13(createTopic: Boolean): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be private?


def authorizationForProduceRequestVersionLessThan13(createTopic: Boolean): Unit = {
if (createTopic) {
sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use createTopicWithBrokerPrincipal()? This will wait until the topic metadata is propagated to all brokers. Ditto for authorizationFetchV12.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow junrao, it would be good to follow testTopicIdAuthorization method?

  • Use @ParameterizedTest and rename the parameter to withTopicExisting for consistent code style in this file.
  • Use createTopicWithBrokerPrincipal.
  @ParameterizedTest
  @CsvSource(value = Array("false", "true"))
  def testTopicIdAuthorization(withTopicExisting: Boolean): Unit = {
    val topicId = if (withTopicExisting) {
      createTopicWithBrokerPrincipal(topic)
      getTopicIds()(topic)
    } else {
      Uuid.randomUuid()
    }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I changed to use ParameterizedTest, so we can remove authorizationForProduceRequestVersionLessThan13 and authorizationFetchV12.

Signed-off-by: PoAn Yang <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
ci-approved core Kafka Broker small Small PRs
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants