Skip to content

Commit

Permalink
[CONNECTOR] rewrite connect source by java 17 record and toList (#1756)
Browse files Browse the repository at this point in the history
  • Loading branch information
Haser0305 authored May 17, 2023
1 parent 12bcb0b commit 126c5e6
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigDef;
import org.astraea.common.Configuration;
import org.astraea.common.VersionUtils;
Expand Down Expand Up @@ -51,9 +50,7 @@ public final Class<? extends org.apache.kafka.connect.source.SourceTask> taskCla

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
return takeConfiguration(maxTasks).stream()
.map(Configuration::raw)
.collect(Collectors.toList());
return takeConfiguration(maxTasks).stream().map(Configuration::raw).toList();
}

@Override
Expand Down
30 changes: 3 additions & 27 deletions connector/src/main/java/org/astraea/connector/SourceTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public final List<org.apache.kafka.connect.source.SourceRecord> poll()
r.headers().stream()
.map(h -> new HeaderImpl(h.key(), null, h.value()))
.collect(Collectors.toList())))
.collect(Collectors.toList());
.toList();
}

@Override
Expand All @@ -92,32 +92,8 @@ public final void commitRecord(
commit(Metadata.of(metadata));
}

private static class HeaderImpl implements org.apache.kafka.connect.header.Header {

private final String key;
private final Schema schema;
private final Object value;

private HeaderImpl(String key, Schema schema, Object value) {
this.key = key;
this.schema = schema;
this.value = value;
}

@Override
public String key() {
return key;
}

@Override
public Schema schema() {
return schema;
}

@Override
public Object value() {
return value;
}
private record HeaderImpl(String key, Schema schema, Object value)
implements org.apache.kafka.connect.header.Header {

@Override
public org.apache.kafka.connect.header.Header with(Schema schema, Object value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.astraea.common.Configuration;
import org.astraea.common.Utils;
Expand Down Expand Up @@ -117,7 +116,7 @@ protected List<Configuration> takeConfiguration(int maxTasks) {
taskMap.put(TASKS_COUNT_KEY, String.valueOf(maxTasks));
return Configuration.of(taskMap);
})
.collect(Collectors.toList());
.toList();
}

@Override
Expand Down

0 comments on commit 126c5e6

Please sign in to comment.