Skip to content

Commit

Permalink
Merge pull request #970: [proxima-beam-tools] fix JsonElement registr…
Browse files Browse the repository at this point in the history
…ation
  • Loading branch information
je-ik authored Feb 27, 2025
2 parents 66a927d + 8c3bf14 commit 0f2d91b
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.InstantCoder;
Expand Down Expand Up @@ -231,7 +232,13 @@ private ObserveHandle startObserve(Partition partition, BatchLogObserver observe
@GetInitialRestriction
public PartitionList initialRestriction() {
BatchLogReader reader = readerFactory.apply(repoFactory.apply());
return PartitionList.initialRestriction(reader.getPartitions(startStamp, endStamp), limit);
return PartitionList.initialRestriction(
reader.getPartitions(startStamp, endStamp).stream()
// sort partitions decreasing by size so that in case of uneven downstream processing
// the smallest partition would be processed last
.sorted(Comparator.comparing(Partition::size).reversed())
.collect(Collectors.toList()),
limit);
}

@SplitRestriction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ public void setup() {
runningObserves =
CacheBuilder.newBuilder()
.concurrencyLevel(1)
.expireAfterAccess(5, TimeUnit.SECONDS)
.expireAfterAccess(1, TimeUnit.MINUTES)
.<Integer, ObserveHandle>removalListener(
notification -> {
if (notification.wasEvicted()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@
import cz.o2.proxima.internal.com.google.gson.JsonNull;
import cz.o2.proxima.internal.com.google.gson.JsonObject;
import cz.o2.proxima.internal.com.google.gson.JsonPrimitive;
import cz.o2.proxima.internal.com.google.gson.internal.LazilyParsedNumber;
import cz.o2.proxima.internal.com.google.gson.internal.LinkedTreeMap;
import cz.o2.proxima.internal.com.google.gson.internal.NonNullElementWrapperList;
import cz.o2.proxima.tools.groovy.RepositoryProvider;
import cz.o2.proxima.tools.groovy.Stream;
import cz.o2.proxima.tools.groovy.StreamProvider.TerminatePredicate;
Expand Down Expand Up @@ -630,25 +633,25 @@ public void persistIntoTargetReplica(

@Override
public void persistIntoTargetFamily(
RepositoryProvider repoProvider, String targetFamilyname, int parallelism) {
RepositoryProvider repoProvider, String targetFamilyName, int parallelism) {
DirectAttributeFamilyDescriptor familyDescriptor =
repoProvider
.getDirect()
.getAllFamilies()
.filter(af -> af.getDesc().getName().equals(targetFamilyname))
.filter(af -> af.getDesc().getName().equals(targetFamilyName))
.findAny()
.orElseThrow(
() ->
new IllegalArgumentException(
String.format("Family [%s] does not exist", targetFamilyname)));
String.format("Family [%s] does not exist", targetFamilyName)));
Preconditions.checkArgument(!familyDescriptor.getDesc().getAccess().isReadonly());
AttributeWriterBase rawWriter =
familyDescriptor
.getWriter()
.orElseThrow(
() ->
new IllegalArgumentException(
String.format("Family [%s] does not have writer", targetFamilyname)));
String.format("Family [%s] does not have writer", targetFamilyName)));
RepositoryFactory repositoryFactory = repoProvider.getRepo().asFactory();
AttributeWriterBase.Factory<?> writerFactory = rawWriter.asFactory();
SerializableScopedValue<Integer, AttributeWriterBase> writer =
Expand All @@ -660,7 +663,7 @@ public void persistIntoTargetFamily(
switch (rawWriter.getType()) {
case ONLINE:
writeUsingOnlineWriterFactory(
"write-to-" + targetFamilyname,
"write-to-" + targetFamilyName,
el -> {
Preconditions.checkArgument(
el == null || allowedAttributes.contains(el.getAttributeDescriptor().getName()));
Expand All @@ -669,12 +672,12 @@ public void persistIntoTargetFamily(
break;
case BULK:
writeUsingBulkWriterFactory(
"write-bulk-to-" + targetFamilyname,
"write-bulk-to-" + targetFamilyName,
parallelism,
BulkWriterFactory.wrap(writer, allowedAttributes));
break;
default:
throw new IllegalArgumentException("Unknonw type " + rawWriter.getType());
throw new IllegalArgumentException("Unknown type " + rawWriter.getType());
}
}

Expand Down Expand Up @@ -1269,6 +1272,9 @@ private static void registerCommonTypes(Kryo kryo, Repository repo) {
JsonNull.class,
JsonObject.class,
JsonPrimitive.class,
LinkedTreeMap.class,
LazilyParsedNumber.class,
NonNullElementWrapperList.class,
String[].class,
Integer[].class,
Long[].class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,9 +432,7 @@ private void submitConsumerWithObserver(
endOffsets = stopAtCurrent ? findNonEmptyEndOffsets(kafka) : null;
if (log.isDebugEnabled()) {
log.debug(
"End offsets of current assignment {}: {}",
kafka.assignment(),
endOffsets);
"End offsets of current assignment {}: {}", kafka.assignment(), endOffsets);
}
} while (poll.isEmpty()
&& accessor.isTopicRegex()
Expand Down

0 comments on commit 0f2d91b

Please sign in to comment.