diff --git a/common/src/main/java/org/astraea/common/admin/Transaction.java b/common/src/main/java/org/astraea/common/admin/Transaction.java index 36d8e90798..dd7c9e1829 100644 --- a/common/src/main/java/org/astraea/common/admin/Transaction.java +++ b/common/src/main/java/org/astraea/common/admin/Transaction.java @@ -19,59 +19,24 @@ import java.util.Set; import java.util.stream.Collectors; -public interface Transaction { +public record Transaction( + String transactionId, + int coordinatorId, + TransactionState state, + long producerId, + int producerEpoch, + long transactionTimeoutMs, + Set topicPartitions) { static Transaction of( String transactionId, org.apache.kafka.clients.admin.TransactionDescription td) { - return new Transaction() { - @Override - public String transactionId() { - return transactionId; - } - - @Override - public int coordinatorId() { - return td.coordinatorId(); - } - - @Override - public TransactionState state() { - return TransactionState.of(td.state()); - } - - @Override - public long producerId() { - return td.producerId(); - } - - @Override - public int producerEpoch() { - return td.producerEpoch(); - } - - @Override - public long transactionTimeoutMs() { - return 0; - } - - @Override - public Set topicPartitions() { - return td.topicPartitions().stream().map(TopicPartition::from).collect(Collectors.toSet()); - } - }; + return new Transaction( + transactionId, + td.coordinatorId(), + TransactionState.of(td.state()), + td.producerId(), + td.producerEpoch(), + td.transactionTimeoutMs(), + td.topicPartitions().stream().map(TopicPartition::from).collect(Collectors.toSet())); } - - String transactionId(); - - int coordinatorId(); - - TransactionState state(); - - long producerId(); - - int producerEpoch(); - - long transactionTimeoutMs(); - - Set topicPartitions(); } diff --git a/common/src/test/java/org/astraea/common/admin/AdminTest.java b/common/src/test/java/org/astraea/common/admin/AdminTest.java index e6e87963d1..42465de7a5 100644 --- a/common/src/test/java/org/astraea/common/admin/AdminTest.java +++ b/common/src/test/java/org/astraea/common/admin/AdminTest.java @@ -2192,4 +2192,32 @@ void testClusterId() { .clusterId()); } } + + @Test + void testTransaction() { + var records = + IntStream.range(0, 100) + .mapToObj( + index -> Record.builder().topic(Utils.randomString()).key(new byte[100]).build()) + .toList(); + try (var admin = Admin.of(SERVICE.bootstrapServers()); + var producer = + Producer.builder().bootstrapServers(SERVICE.bootstrapServers()).buildTransactional()) { + producer.send(records); + Assertions.assertTrue( + admin + .transactionIds() + .toCompletableFuture() + .join() + .contains(producer.transactionId().get())); + var transactions = + admin.transactions(Set.of(producer.transactionId().get())).toCompletableFuture().join(); + Assertions.assertNotEquals(0, transactions.size()); + transactions.forEach( + t -> { + Assertions.assertEquals(producer.transactionId().get(), t.transactionId()); + Assertions.assertNotEquals(0, t.transactionTimeoutMs()); + }); + } + } }