From a6566af299d69b4aa0563078b1c12c20db1980e4 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Mon, 1 May 2023 22:33:56 +0800 Subject: [PATCH] [COMMON] rewrite Header by java 17 record --- .../astraea/app/web/RecordHandlerTest.java | 4 ++-- .../main/java/org/astraea/common/Header.java | 24 ++++--------------- .../common/backup/RecordReaderBuilder.java | 2 +- .../common/partitioner/PartitionerTest.java | 4 ++-- .../astraea/common/producer/ProducerTest.java | 4 ++-- .../java/org/astraea/connector/SinkTask.java | 2 +- .../org/astraea/connector/SourceDataTest.java | 2 +- 7 files changed, 14 insertions(+), 28 deletions(-) diff --git a/app/src/test/java/org/astraea/app/web/RecordHandlerTest.java b/app/src/test/java/org/astraea/app/web/RecordHandlerTest.java index 09d287fe3c..ed44664d15 100644 --- a/app/src/test/java/org/astraea/app/web/RecordHandlerTest.java +++ b/app/src/test/java/org/astraea/app/web/RecordHandlerTest.java @@ -588,7 +588,7 @@ void testGetResponse() { .topic(topic) .key("astraea".getBytes(UTF_8)) .value(ByteBuffer.allocate(Integer.BYTES).putInt(100).array()) - .headers(List.of(Header.of("a", "b".getBytes(UTF_8)))) + .headers(List.of(new Header("a", "b".getBytes(UTF_8)))) .timestamp(timestamp) .build()); producer.flush(); @@ -641,7 +641,7 @@ void testGetJsonResponse() { .topic(topic) .key("astraea".getBytes()) .value(ByteBuffer.allocate(Integer.BYTES).putInt(100).array()) - .headers(List.of(Header.of("a", null))) + .headers(List.of(new Header("a", null))) .timestamp(timestamp) .build()); producer.flush(); diff --git a/common/src/main/java/org/astraea/common/Header.java b/common/src/main/java/org/astraea/common/Header.java index 2a6e9673d5..0beea8ef7d 100644 --- a/common/src/main/java/org/astraea/common/Header.java +++ b/common/src/main/java/org/astraea/common/Header.java @@ -22,31 +22,17 @@ import java.util.stream.StreamSupport; import org.apache.kafka.common.header.Headers; -public interface Header { - static List
of(Headers headers) { +public record Header(String key, byte[] value) { + public static List
of(Headers headers) { var iter = headers.iterator(); // a minor optimization to avoid create extra collection. if (!iter.hasNext()) return List.of(); return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iter, 0), false) - .map(h -> of(h.key(), h.value())) + .map(h -> new Header(h.key(), h.value())) .collect(Collectors.toUnmodifiableList()); } - static Header of(String key, byte[] value) { - return new Header() { - @Override - public String key() { - return key; - } - - @Override - public byte[] value() { - return value; - } - }; + public static Header of(String key, byte[] value) { + return new Header(key, value); } - - String key(); - - byte[] value(); } diff --git a/common/src/main/java/org/astraea/common/backup/RecordReaderBuilder.java b/common/src/main/java/org/astraea/common/backup/RecordReaderBuilder.java index 8000b705c8..5e66a99926 100644 --- a/common/src/main/java/org/astraea/common/backup/RecordReaderBuilder.java +++ b/common/src/main/java/org/astraea/common/backup/RecordReaderBuilder.java @@ -75,7 +75,7 @@ public String topic() { @Override public List
headers() { return outerRecord.getHeadersList().stream() - .map(header -> Header.of(header.getKey(), header.getValue().toByteArray())) + .map(header -> new Header(header.getKey(), header.getValue().toByteArray())) .collect(Collectors.toUnmodifiableList()); } diff --git a/common/src/test/java/org/astraea/common/partitioner/PartitionerTest.java b/common/src/test/java/org/astraea/common/partitioner/PartitionerTest.java index a2515dbe85..af5fa82063 100644 --- a/common/src/test/java/org/astraea/common/partitioner/PartitionerTest.java +++ b/common/src/test/java/org/astraea/common/partitioner/PartitionerTest.java @@ -117,7 +117,7 @@ void multipleThreadTest(String className) throws IOException { var key = "tainan"; var value = "shanghai"; var timestamp = System.currentTimeMillis() + 10; - var header = Header.of("a", "b".getBytes()); + var header = new Header("a", "b".getBytes()); try (var producer = Producer.builder() .keySerializer(Serializer.STRING) @@ -173,7 +173,7 @@ void interdependentTest(String className) throws IOException { var key = "tainan"; var value = "shanghai"; var timestamp = System.currentTimeMillis() + 10; - var header = Header.of("a", "b".getBytes()); + var header = new Header("a", "b".getBytes()); try (var producer = Producer.builder() .keySerializer(Serializer.STRING) diff --git a/common/src/test/java/org/astraea/common/producer/ProducerTest.java b/common/src/test/java/org/astraea/common/producer/ProducerTest.java index 0ff4dc2b66..8ee723a5ad 100644 --- a/common/src/test/java/org/astraea/common/producer/ProducerTest.java +++ b/common/src/test/java/org/astraea/common/producer/ProducerTest.java @@ -53,7 +53,7 @@ void testSender() { var topicName = "testSender-" + System.currentTimeMillis(); var key = "key"; var timestamp = System.currentTimeMillis() + 10; - var header = Header.of("a", "b".getBytes()); + var header = new Header("a", "b".getBytes()); try (var producer = Producer.builder() .bootstrapServers(SERVICE.bootstrapServers()) @@ -100,7 +100,7 @@ void testTransaction() { var topicName = "testTransaction-" + System.currentTimeMillis(); var key = "key"; var timestamp = System.currentTimeMillis() + 10; - var header = Header.of("a", "b".getBytes()); + var header = new Header("a", "b".getBytes()); try (var producer = Producer.builder() .bootstrapServers(SERVICE.bootstrapServers()) diff --git a/connector/src/main/java/org/astraea/connector/SinkTask.java b/connector/src/main/java/org/astraea/connector/SinkTask.java index 0fcd326797..31e5152ffa 100644 --- a/connector/src/main/java/org/astraea/connector/SinkTask.java +++ b/connector/src/main/java/org/astraea/connector/SinkTask.java @@ -136,7 +136,7 @@ private static List
toHeaders(Headers headers) { var hs = new ArrayList
(headers.size()); headers .iterator() - .forEachRemaining(h -> hs.add(Header.of(h.key(), toBytes(h.schema(), h.value())))); + .forEachRemaining(h -> hs.add(new Header(h.key(), toBytes(h.schema(), h.value())))); return Collections.unmodifiableList(hs); } diff --git a/connector/src/test/java/org/astraea/connector/SourceDataTest.java b/connector/src/test/java/org/astraea/connector/SourceDataTest.java index 5f7343c4a3..9827d0562e 100644 --- a/connector/src/test/java/org/astraea/connector/SourceDataTest.java +++ b/connector/src/test/java/org/astraea/connector/SourceDataTest.java @@ -158,7 +158,7 @@ protected Collection> take() throws InterruptedException .key(KEY) .value(VALUE) .topic(t) - .headers(List.of(Header.of(HEADER_KEY, HEADER_VALUE))) + .headers(List.of(new Header(HEADER_KEY, HEADER_VALUE))) .build()) .collect(Collectors.toList()); }