Skip to content

Commit

Permalink
[CONNECTOR] rewrite sink connector by java 17 toList (#1754)
Browse files Browse the repository at this point in the history
  • Loading branch information
Haser0305 authored May 17, 2023
1 parent 4d0c726 commit 94aa36a
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigDef;
import org.astraea.common.Configuration;
import org.astraea.common.VersionUtils;
Expand Down Expand Up @@ -53,9 +52,7 @@ public final Class<? extends org.apache.kafka.connect.sink.SinkTask> taskClass()

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
return takeConfiguration(maxTasks).stream()
.map(Configuration::raw)
.collect(Collectors.toList());
return takeConfiguration(maxTasks).stream().map(Configuration::raw).toList();
}

@Override
Expand Down
3 changes: 1 addition & 2 deletions connector/src/main/java/org/astraea/connector/SinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.header.Headers;
Expand Down Expand Up @@ -57,7 +56,7 @@ public final void start(Map<String, String> props) {
@Override
public final void put(Collection<SinkRecord> records) {
if (records != null && !records.isEmpty())
put(records.stream().map(SinkTask::toRecord).collect(Collectors.toList()));
put(records.stream().map(SinkTask::toRecord).toList());
}

private static byte[] toBytes(Schema schema, Object value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.astraea.common.Configuration;
Expand Down Expand Up @@ -128,7 +127,7 @@ protected Class<? extends SinkTask> task() {

@Override
protected List<Configuration> takeConfiguration(int maxTasks) {
return IntStream.range(0, maxTasks).mapToObj(ignored -> configs).collect(Collectors.toList());
return IntStream.range(0, maxTasks).mapToObj(ignored -> configs).toList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.astraea.common.Configuration;
import org.astraea.common.Utils;
Expand Down Expand Up @@ -50,7 +49,7 @@ protected Class<? extends SinkTask> task() {

@Override
protected List<Configuration> takeConfiguration(int maxTasks) {
return IntStream.range(0, maxTasks).mapToObj(i -> config).collect(Collectors.toList());
return IntStream.range(0, maxTasks).mapToObj(i -> config).toList();
}

@Override
Expand Down

0 comments on commit 94aa36a

Please sign in to comment.