Skip to content

Commit

Permalink
[COMMON] rewrite Header by java 17 record (#1687)
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 authored May 2, 2023
1 parent 7fa1a22 commit cb428f2
Show file tree
Hide file tree
Showing 7 changed files with 14 additions and 28 deletions.
4 changes: 2 additions & 2 deletions app/src/test/java/org/astraea/app/web/RecordHandlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ void testGetResponse() {
.topic(topic)
.key("astraea".getBytes(UTF_8))
.value(ByteBuffer.allocate(Integer.BYTES).putInt(100).array())
.headers(List.of(Header.of("a", "b".getBytes(UTF_8))))
.headers(List.of(new Header("a", "b".getBytes(UTF_8))))
.timestamp(timestamp)
.build());
producer.flush();
Expand Down Expand Up @@ -641,7 +641,7 @@ void testGetJsonResponse() {
.topic(topic)
.key("astraea".getBytes())
.value(ByteBuffer.allocate(Integer.BYTES).putInt(100).array())
.headers(List.of(Header.of("a", null)))
.headers(List.of(new Header("a", null)))
.timestamp(timestamp)
.build());
producer.flush();
Expand Down
24 changes: 5 additions & 19 deletions common/src/main/java/org/astraea/common/Header.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,17 @@
import java.util.stream.StreamSupport;
import org.apache.kafka.common.header.Headers;

public interface Header {
static List<Header> of(Headers headers) {
public record Header(String key, byte[] value) {
public static List<Header> of(Headers headers) {
var iter = headers.iterator();
// a minor optimization to avoid create extra collection.
if (!iter.hasNext()) return List.of();
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iter, 0), false)
.map(h -> of(h.key(), h.value()))
.map(h -> new Header(h.key(), h.value()))
.collect(Collectors.toUnmodifiableList());
}

static Header of(String key, byte[] value) {
return new Header() {
@Override
public String key() {
return key;
}

@Override
public byte[] value() {
return value;
}
};
public static Header of(String key, byte[] value) {
return new Header(key, value);
}

String key();

byte[] value();
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public String topic() {
@Override
public List<Header> headers() {
return outerRecord.getHeadersList().stream()
.map(header -> Header.of(header.getKey(), header.getValue().toByteArray()))
.map(header -> new Header(header.getKey(), header.getValue().toByteArray()))
.collect(Collectors.toUnmodifiableList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ void multipleThreadTest(String className) throws IOException {
var key = "tainan";
var value = "shanghai";
var timestamp = System.currentTimeMillis() + 10;
var header = Header.of("a", "b".getBytes());
var header = new Header("a", "b".getBytes());
try (var producer =
Producer.builder()
.keySerializer(Serializer.STRING)
Expand Down Expand Up @@ -173,7 +173,7 @@ void interdependentTest(String className) throws IOException {
var key = "tainan";
var value = "shanghai";
var timestamp = System.currentTimeMillis() + 10;
var header = Header.of("a", "b".getBytes());
var header = new Header("a", "b".getBytes());
try (var producer =
Producer.builder()
.keySerializer(Serializer.STRING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ void testSender() {
var topicName = "testSender-" + System.currentTimeMillis();
var key = "key";
var timestamp = System.currentTimeMillis() + 10;
var header = Header.of("a", "b".getBytes());
var header = new Header("a", "b".getBytes());
try (var producer =
Producer.builder()
.bootstrapServers(SERVICE.bootstrapServers())
Expand Down Expand Up @@ -100,7 +100,7 @@ void testTransaction() {
var topicName = "testTransaction-" + System.currentTimeMillis();
var key = "key";
var timestamp = System.currentTimeMillis() + 10;
var header = Header.of("a", "b".getBytes());
var header = new Header("a", "b".getBytes());
try (var producer =
Producer.builder()
.bootstrapServers(SERVICE.bootstrapServers())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ private static List<Header> toHeaders(Headers headers) {
var hs = new ArrayList<Header>(headers.size());
headers
.iterator()
.forEachRemaining(h -> hs.add(Header.of(h.key(), toBytes(h.schema(), h.value()))));
.forEachRemaining(h -> hs.add(new Header(h.key(), toBytes(h.schema(), h.value()))));
return Collections.unmodifiableList(hs);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ protected Collection<Record<byte[], byte[]>> take() throws InterruptedException
.key(KEY)
.value(VALUE)
.topic(t)
.headers(List.of(Header.of(HEADER_KEY, HEADER_VALUE)))
.headers(List.of(new Header(HEADER_KEY, HEADER_VALUE)))
.build())
.collect(Collectors.toList());
}
Expand Down

0 comments on commit cb428f2

Please sign in to comment.