From 126c5e6f2118139857614876d340a8a423f88307 Mon Sep 17 00:00:00 2001 From: Haser Date: Thu, 18 May 2023 06:52:56 +0800 Subject: [PATCH] [CONNECTOR] rewrite connect source by java 17 record and toList (#1756) --- .../astraea/connector/SourceConnector.java | 5 +--- .../org/astraea/connector/SourceTask.java | 30 ++----------------- .../astraea/connector/backup/Importer.java | 3 +- 3 files changed, 5 insertions(+), 33 deletions(-) diff --git a/connector/src/main/java/org/astraea/connector/SourceConnector.java b/connector/src/main/java/org/astraea/connector/SourceConnector.java index 51de0da065..e2cd3cc9df 100644 --- a/connector/src/main/java/org/astraea/connector/SourceConnector.java +++ b/connector/src/main/java/org/astraea/connector/SourceConnector.java @@ -18,7 +18,6 @@ import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import org.apache.kafka.common.config.ConfigDef; import org.astraea.common.Configuration; import org.astraea.common.VersionUtils; @@ -51,9 +50,7 @@ public final Class taskCla @Override public List> taskConfigs(int maxTasks) { - return takeConfiguration(maxTasks).stream() - .map(Configuration::raw) - .collect(Collectors.toList()); + return takeConfiguration(maxTasks).stream().map(Configuration::raw).toList(); } @Override diff --git a/connector/src/main/java/org/astraea/connector/SourceTask.java b/connector/src/main/java/org/astraea/connector/SourceTask.java index a2de195483..f82316cb68 100644 --- a/connector/src/main/java/org/astraea/connector/SourceTask.java +++ b/connector/src/main/java/org/astraea/connector/SourceTask.java @@ -76,7 +76,7 @@ public final List poll() r.headers().stream() .map(h -> new HeaderImpl(h.key(), null, h.value())) .collect(Collectors.toList()))) - .collect(Collectors.toList()); + .toList(); } @Override @@ -92,32 +92,8 @@ public final void commitRecord( commit(Metadata.of(metadata)); } - private static class HeaderImpl implements org.apache.kafka.connect.header.Header { - - private final String key; - private final Schema schema; - private final Object value; - - private HeaderImpl(String key, Schema schema, Object value) { - this.key = key; - this.schema = schema; - this.value = value; - } - - @Override - public String key() { - return key; - } - - @Override - public Schema schema() { - return schema; - } - - @Override - public Object value() { - return value; - } + private record HeaderImpl(String key, Schema schema, Object value) + implements org.apache.kafka.connect.header.Header { @Override public org.apache.kafka.connect.header.Header with(Schema schema, Object value) { diff --git a/connector/src/main/java/org/astraea/connector/backup/Importer.java b/connector/src/main/java/org/astraea/connector/backup/Importer.java index 31ee09d507..f8d34cb8da 100644 --- a/connector/src/main/java/org/astraea/connector/backup/Importer.java +++ b/connector/src/main/java/org/astraea/connector/backup/Importer.java @@ -26,7 +26,6 @@ import java.util.Optional; import java.util.Set; import java.util.function.Predicate; -import java.util.stream.Collectors; import java.util.stream.IntStream; import org.astraea.common.Configuration; import org.astraea.common.Utils; @@ -117,7 +116,7 @@ protected List takeConfiguration(int maxTasks) { taskMap.put(TASKS_COUNT_KEY, String.valueOf(maxTasks)); return Configuration.of(taskMap); }) - .collect(Collectors.toList()); + .toList(); } @Override