Skip to content

Commit

Permalink
Add logic to call the proper delete method based on whether the topic…
Browse files Browse the repository at this point in the history
… is partitioned or not (#110)
  • Loading branch information
david-streamlio authored Aug 2, 2023
1 parent 4a5757b commit 83a5a4c
Showing 1 changed file with 22 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.policies.data.TopicStats;

@Slf4j
Expand Down Expand Up @@ -53,16 +54,30 @@ public final void delete() throws JMSException {
throw new JMSException("Cannot delete a temporary destination with active consumers");
}

session
if (session
.getFactory()
.getPulsarAdmin()
.topics()
.delete(
fullQualifiedTopicName,
session.getFactory().isForceDeleteTemporaryDestinations(),
true);
} catch (Exception err) {
throw Utils.handleException(err);
.getPartitionedTopicList(session.getFactory().getSystemNamespace())
.stream()
.anyMatch(t -> t.equals(fullQualifiedTopicName))) {
session
.getFactory()
.getPulsarAdmin()
.topics()
.deletePartitionedTopic(
fullQualifiedTopicName, session.getFactory().isForceDeleteTemporaryDestinations());
} else {
session
.getFactory()
.getPulsarAdmin()
.topics()
.delete(
fullQualifiedTopicName, session.getFactory().isForceDeleteTemporaryDestinations());
}

} catch (final PulsarAdminException paEx) {
Utils.handleException(paEx);
} finally {
session.getConnection().removeTemporaryDestination(this);
}
Expand Down

0 comments on commit 83a5a4c

Please sign in to comment.