Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a config to limit the code-gen and class loading #83

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ public class FastDeserializerGenerator<T> extends FastDeserializerGeneratorBase<
super(useGenericTypes, writer, reader, destination, classLoader, compileClassPath);
}

FastDeserializerGenerator(boolean useGenericTypes, Schema writer, Schema reader, File destination,
ClassLoader classLoader, String compileClassPath, int loadClassLimit) {
super(useGenericTypes, writer, reader, destination, classLoader, compileClassPath, loadClassLimit);
}

public FastDeserializer<T> generateDeserializer() {
String className = getClassName(writer, reader, useGenericTypes ? "Generic" : "Specific");
JPackage classPackage = codeModel._package(generatedPackageName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ public abstract class FastDeserializerGeneratorBase<T> extends FastSerdeBase {
this.reader = reader;
}

FastDeserializerGeneratorBase(boolean useGenericTypes, Schema writer, Schema reader, File destination, ClassLoader classLoader,
String compileClassPath, int loadClassLimit) {
super("deserialization", useGenericTypes, Utf8.class, destination, classLoader, compileClassPath, false, loadClassLimit);
this.writer = writer;
this.reader = reader;
}

protected static Symbol[] reverseSymbolArray(Symbol[] symbols) {
Symbol[] reversedSymbols = new Symbol[symbols.length];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,9 @@ public final class FastGenericDeserializerGenerator<T> extends FastDeserializerG
String compileClassPath) {
super(true, writer, reader, destination, classLoader, compileClassPath);
}

FastGenericDeserializerGenerator(Schema writer, Schema reader, File destination, ClassLoader classLoader,
String compileClassPath, int loadClassLimit) {
super(true, writer, reader, destination, classLoader, compileClassPath, loadClassLimit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,9 @@ public FastGenericSerializerGenerator(Schema schema, File destination, ClassLoad
String compileClassPath) {
super(true, schema, destination, classLoader, compileClassPath);
}

public FastGenericSerializerGenerator(Schema schema, File destination, ClassLoader classLoader,
String compileClassPath, int loadClassLimit) {
super(true, schema, destination, classLoader, compileClassPath, loadClassLimit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;
import org.apache.avro.Schema;
Expand All @@ -32,6 +33,9 @@ public abstract class FastSerdeBase {
protected static final String SEP = "_";
public static final String GENERATED_PACKAGE_NAME_PREFIX = "com.linkedin.avro.fastserde.generated.";

private volatile int loadClassLimit = Integer.MAX_VALUE;
private int loadClassNum = 0;

/**
* A repository of how many times a given name was used.
* N.B.: Does not actually need to be threadsafe, but it is made so just for defensive coding reasons.
Expand All @@ -47,6 +51,12 @@ public abstract class FastSerdeBase {
protected final String compileClassPath;
protected JDefinedClass generatedClass;

public FastSerdeBase(String description, boolean useGenericTypes, Class defaultStringClass, File destination, ClassLoader classLoader,
String compileClassPath, boolean isForSerializer, int loadClassLimit) {
this(description, useGenericTypes, defaultStringClass, destination, classLoader, compileClassPath, isForSerializer);
this.loadClassLimit = loadClassLimit;
}

public FastSerdeBase(String description, boolean useGenericTypes, Class defaultStringClass, File destination, ClassLoader classLoader,
String compileClassPath, boolean isForSerializer) {
this.useGenericTypes = useGenericTypes;
Expand Down Expand Up @@ -136,6 +146,29 @@ protected Class compileClass(final String className, Set<String> knownUsedFullyQ
throw new FastSerdeGeneratorException("Unable to compile:" + className + " from source file: " + filePath);
}

return classLoader.loadClass(generatedPackageName + "." + className);
return loadClassWithLimit(() -> {
try {
return classLoader.loadClass(generatedPackageName + "." + className);
} catch (ClassNotFoundException e) {
throw new FastSerdeGeneratorException("Unable to load:" + className + " from source file: " + filePath, e);
}
});
}

/**
* A wrapper function that limits the total number of fast de/serializer classes loaded
*
* @param supplier The function to load fast de/serializer class
*/
protected synchronized <T> T loadClassWithLimit(Supplier<T> supplier) {
if (this.loadClassNum < this.loadClassLimit) {
T classObj = null;
classObj = supplier.get();
this.loadClassNum++;
return classObj;
} else {
LOGGER.warn("Loaded fast serdes classes number {}, with limit set to {}", loadClassNum, loadClassLimit);
}
return null;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it will be clearer to just throw the exception instead of returning null.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will fix it.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,20 @@ public final class FastSerdeCache {

private static volatile FastSerdeCache _INSTANCE;

/**
* Fast-avro will generate and load serializer and deserializer(SerDes) classes into metaspace during runtime.
* During serialization and deserialization, fast-avro also leverages JIT compilation to boost the SerDes speed.
* And JIT compilation code is saved in code cache.
* Too much usage of metaspace and code cache will bring GC/OOM issue.
*
* We set a hard limit of the total number of SerDes classes generated and loaded by fast-avro.
* By default, the limit is set to MAX_INT.
* Fast-avro will fall back to regular avro after the limit is hit.
* One could set the limit through {@link FastSerdeCache} constructors.
*/
private volatile int generatedFastSerDesLimit = Integer.MAX_VALUE;
private final AtomicInteger generatedSerDesNum = new AtomicInteger(0);

private final Map<String, FastDeserializer<?>> fastSpecificRecordDeserializersCache =
new FastAvroConcurrentHashMap<>();
private final Map<String, FastDeserializer<?>> fastGenericRecordDeserializersCache =
Expand Down Expand Up @@ -85,6 +99,20 @@ public FastSerdeCache(Executor executorService, Supplier<String> compileClassPat
this(executorService, compileClassPathSupplier.get());
}

/**
*
* @param executorService
* {@link Executor} used by serializer/deserializer compile threads
* @param compileClassPathSupplier
* custom classpath {@link Supplier}
* @param limit
* custom number {@link #generatedFastSerDesLimit}
*/
public FastSerdeCache(Executor executorService, Supplier<String> compileClassPathSupplier, int limit) {
this(executorService, compileClassPathSupplier);
this.generatedFastSerDesLimit = limit;
}

public FastSerdeCache(String compileClassPath) {
this();
this.compileClassPath = Optional.ofNullable(compileClassPath);
Expand Down Expand Up @@ -122,10 +150,24 @@ public FastSerdeCache(Executor executorService) {
this.compileClassPath = Optional.empty();
}

public FastSerdeCache(Executor executorService, int limit) {
this(executorService);
this.generatedFastSerDesLimit = limit;
}

private FastSerdeCache() {
this((Executor) null);
}

/**
* @param limit
* custom number {@link #generatedFastSerDesLimit}
*/
public FastSerdeCache(int limit) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to add other two constructors? Maybe this one is good enough.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No harm to add these two constructors :) For exmaple, I am using FastSerdeCache(Executor executorService, int limit) in the test.

this();
this.generatedFastSerDesLimit = limit;
}

/**
* Gets default {@link FastSerdeCache} instance. Default instance classpath can be customized via
* {@value #CLASSPATH} or {@value #CLASSPATH_SUPPLIER} system properties.
Expand Down Expand Up @@ -300,6 +342,25 @@ private String getSchemaKey(Schema writerSchema, Schema readerSchema) {
getSchemaFingerprint(readerSchema));
}

/**
* A wrapper function that limits the total number of fast de/serializer classes generated
*
* @param supplier The function to build and save fast de/serializer
*/
private <T> T buildFastClassWithLimit(Supplier<T> supplier) {
T result = null;
if (this.generatedSerDesNum.get() < this.generatedFastSerDesLimit) {
result = supplier.get();
} else if (this.generatedSerDesNum.get() == this.generatedFastSerDesLimit) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am seeing there are two counters, which are being maintained independently in FastSerdeCache and FastSerdeBase, and I think we should combine and just use one.
The idea in my mind:

  1. Maintaining the counter and limit in FastSerdeCache.
  2. Passing the limit enforcement function to FastSerdeBase via all the Generator classes.
  3. The limit enforcement function can be this way:
private int generatedSerDeNum;
  private int generatedFastSerDeLimit;
  Predicate<Boolean> limitPredicate = (whetherIncrementCounter) -> {
    synchronized (this) {
      if (generatedSerDeNum >= generatedFastSerDeLimit) {
        return false;
      }
      if (whetherIncrementCounter) {
        ++generatedSerDeNum;
      }
      return true;
    }
  };
  1. In FastSerdeCache and all kinds of generators before the fast class generation, we should call limitPredicate(false), and fail fast when this predicate returns false.
  2. In FastSerdeBase, before loading the new generated class, we should call limitPredicate(true), and fail fast if the predicate return false.

Essentially, the idea is to keep the counting logic in a single place to make it consistent.
Invoking this function in FastSerdeCache and various Generators is to try to avoid useless work.

Let me know if you want to have a sync up about this.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the detailed explaination!

This approach indeed helps to reduce two counters to one. However, it also leads to redundant code-gens and compilations which are CPU intensive tasks. IIUC, there's no limit for the code-gen and compilation before we really load generatedFastSerDeLimit number of fast classes. So the corner case is fast-avro may generate and compile a great number of fast classes, and then throw them away.

For the current implementation, we at most generate extra N - 1 fast classes, N is the threads number of FastSerdeCache Executor. So I think the current implementation is better. What do you think?

// We still want to print the warning when the limit is hit
LOGGER.warn("Generated fast serdes classes number hits limit {}", this.generatedFastSerDesLimit);
} else {
LOGGER.debug("Generated serdes number {}, with fast serdes limit set to {}", this.generatedSerDesNum.get(), this.generatedFastSerDesLimit);
}
generatedSerDesNum.incrementAndGet();
return result;
}

/**
* This function will generate a fast specific deserializer, and it will throw exception if anything wrong happens.
* This function can be used to verify whether current {@link FastSerdeCache} could generate proper fast deserializer.
Expand All @@ -311,7 +372,7 @@ private String getSchemaKey(Schema writerSchema, Schema readerSchema) {
public FastDeserializer<?> buildFastSpecificDeserializer(Schema writerSchema, Schema readerSchema) {
FastSpecificDeserializerGenerator<?> generator =
new FastSpecificDeserializerGenerator<>(writerSchema, readerSchema, classesDir, classLoader,
compileClassPath.orElseGet(() -> null));
compileClassPath.orElseGet(() -> null), generatedFastSerDesLimit);
FastDeserializer<?> fastDeserializer = generator.generateDeserializer();

if (LOGGER.isDebugEnabled()) {
Expand All @@ -337,7 +398,10 @@ public FastDeserializer<?> buildFastSpecificDeserializer(Schema writerSchema, Sc
*/
private FastDeserializer<?> buildSpecificDeserializer(Schema writerSchema, Schema readerSchema) {
try {
return buildFastSpecificDeserializer(writerSchema, readerSchema);
FastDeserializer<?> fastSpecificDeserializer = buildFastClassWithLimit(() -> buildFastSpecificDeserializer(writerSchema, readerSchema));
if (fastSpecificDeserializer != null) {
return fastSpecificDeserializer;
}
} catch (FastDeserializerGeneratorException e) {
LOGGER.warn("Deserializer generation exception when generating specific FastDeserializer for writer schema: "
+ "[\n{}\n] and reader schema: [\n{}\n]", writerSchema.toString(true), readerSchema.toString(true), e);
Expand Down Expand Up @@ -366,7 +430,7 @@ public Object deserialize(Object reuse, Decoder d) throws IOException {
public FastDeserializer<?> buildFastGenericDeserializer(Schema writerSchema, Schema readerSchema) {
FastGenericDeserializerGenerator<?> generator =
new FastGenericDeserializerGenerator<>(writerSchema, readerSchema, classesDir, classLoader,
compileClassPath.orElseGet(() -> null));
compileClassPath.orElseGet(() -> null), generatedFastSerDesLimit);

FastDeserializer<?> fastDeserializer = generator.generateDeserializer();

Expand Down Expand Up @@ -394,7 +458,10 @@ public FastDeserializer<?> buildFastGenericDeserializer(Schema writerSchema, Sch
*/
private FastDeserializer<?> buildGenericDeserializer(Schema writerSchema, Schema readerSchema) {
try {
return buildFastGenericDeserializer(writerSchema, readerSchema);
FastDeserializer<?> fastGenericDeserializer = buildFastClassWithLimit(() -> buildFastGenericDeserializer(writerSchema, readerSchema));
if (fastGenericDeserializer != null) {
return fastGenericDeserializer;
}
} catch (FastDeserializerGeneratorException e) {
LOGGER.warn("Deserializer generation exception when generating generic FastDeserializer for writer schema: [\n"
+ writerSchema.toString(true) + "\n] and reader schema:[\n" + readerSchema.toString(true) + "\n]", e);
Expand All @@ -419,7 +486,7 @@ public FastSerializer<?> buildFastSpecificSerializer(Schema schema) {
Utils.getAvroVersionsSupportedForSerializer());
}
FastSpecificSerializerGenerator<?> generator =
new FastSpecificSerializerGenerator<>(schema, classesDir, classLoader, compileClassPath.orElseGet(() -> null));
new FastSpecificSerializerGenerator<>(schema, classesDir, classLoader, compileClassPath.orElseGet(() -> null), generatedFastSerDesLimit);

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Generated classes dir: {} and generation of specific FastSerializer is done for schema of type: {}" +
Expand All @@ -437,7 +504,10 @@ private FastSerializer<?> buildSpecificSerializer(Schema schema) {
if (Utils.isSupportedAvroVersionsForSerializer()) {
// Only build fast specific serializer for supported Avro versions.
try {
return buildFastSpecificSerializer(schema);
FastSerializer<?> fastSpecificSerializer = buildFastClassWithLimit(() -> buildFastSpecificSerializer(schema));
if (fastSpecificSerializer != null) {
return fastSpecificSerializer;
}
} catch (FastDeserializerGeneratorException e) {
LOGGER.warn("Serializer generation exception when generating specific FastSerializer for schema: [\n{}\n]",
schema.toString(true), e);
Expand All @@ -463,7 +533,7 @@ public FastSerializer<?> buildFastGenericSerializer(Schema schema) {
+ Utils.getAvroVersionsSupportedForSerializer());
}
FastGenericSerializerGenerator<?> generator =
new FastGenericSerializerGenerator<>(schema, classesDir, classLoader, compileClassPath.orElseGet(() -> null));
new FastGenericSerializerGenerator<>(schema, classesDir, classLoader, compileClassPath.orElseGet(() -> null), generatedFastSerDesLimit);

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Generated classes dir: {} and generation of generic FastSerializer is done for schema of type: {}" +
Expand All @@ -481,7 +551,10 @@ private FastSerializer<?> buildGenericSerializer(Schema schema) {
if (Utils.isSupportedAvroVersionsForSerializer()) {
// Only build fast generic serializer for supported Avro versions.
try {
return buildFastGenericSerializer(schema);
FastSerializer<?> fastGenericSerializer = buildFastClassWithLimit(() -> buildFastGenericSerializer(schema));
if (fastGenericSerializer != null) {
return fastGenericSerializer;
}
} catch (FastDeserializerGeneratorException e) {
LOGGER.warn("Serializer generation exception when generating generic FastSerializer for schema: [\n{}\n]",
schema.toString(true), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ public FastSerializerGenerator(boolean useGenericTypes, Schema schema, File dest
this.schema = schema;
}

public FastSerializerGenerator(boolean useGenericTypes, Schema schema, File destination, ClassLoader classLoader,
String compileClassPath, int loadClassLimit) {
super("serialization", useGenericTypes, CharSequence.class, destination, classLoader, compileClassPath, true, loadClassLimit);
this.schema = schema;
}

public static String getClassName(Schema schema, String description) {
Long schemaId = Math.abs(Utils.getSchemaFingerprint(schema));
String typeName = SchemaAssistant.getTypeName(schema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,9 @@ public final class FastSpecificDeserializerGenerator<T> extends FastDeserializer
String compileClassPath) {
super(false, writer, reader, destination, classLoader, compileClassPath);
}

FastSpecificDeserializerGenerator(Schema writer, Schema reader, File destination, ClassLoader classLoader,
String compileClassPath, int loadClassLimit) {
super(false, writer, reader, destination, classLoader, compileClassPath, loadClassLimit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,9 @@ public FastSpecificSerializerGenerator(Schema schema, File destination, ClassLoa
String compileClassPath) {
super(false, schema, destination, classLoader, compileClassPath);
}

public FastSpecificSerializerGenerator(Schema schema, File destination, ClassLoader classLoader,
String compileClassPath, int loadClassLimit) {
super(false, schema, destination, classLoader, compileClassPath, loadClassLimit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,51 @@ public void shouldCreateGenericDatumReader() throws IOException, InterruptedExce
Assert.assertEquals(new Utf8("test"),
fastGenericDatumReader.read(null, FastSerdeTestsSupport.genericDataAsDecoder(record)).get("test"));
}

@Test(groups = {"deserializationTest"})
@SuppressWarnings("unchecked")
public void shouldNotCreateFastDeserializerDueToLimit() throws IOException, InterruptedException {
// Set generatedFastSerDesLimit to 1
// Try to generate fast deserializer for the first schema
Schema recordSchema1 = createRecord("TestSchema1", createPrimitiveUnionFieldSchema("test", Schema.Type.STRING));
GenericRecord record1 = new GenericData.Record(recordSchema1);
record1.put("test", "test");

FastSerdeCache cacheLimit1 = new FastSerdeCache(Runnable::run, 1);
FastGenericDatumReader<GenericRecord> fastGenericDatumReader = new FastGenericDatumReader<>(recordSchema1, cacheLimit1);

// when
fastGenericDatumReader.read(null, FastSerdeTestsSupport.genericDataAsDecoder(record1));

// then
FastDeserializer<GenericRecord> fastGenericDeserializer =
(FastDeserializer<GenericRecord>) cacheLimit1.getFastGenericDeserializer(recordSchema1, recordSchema1);

fastGenericDeserializer =
(FastDeserializer<GenericRecord>) cacheLimit1.getFastGenericDeserializer(recordSchema1, recordSchema1);

Assert.assertNotNull(fastGenericDeserializer);
Assert.assertNotEquals(1, fastGenericDeserializer.getClass().getDeclaredMethods().length);
Assert.assertEquals(new Utf8("test"),
fastGenericDatumReader.read(null, FastSerdeTestsSupport.genericDataAsDecoder(record1)).get("test"));

// Try to generate fast deserializer for the second schema
// Verify only return FastDeserializerWithAvroGenericImpl
Schema recordSchema2 = createRecord("TestSchema2", createPrimitiveUnionFieldSchema("test", Schema.Type.STRING));
GenericRecord record2 = new GenericData.Record(recordSchema2);
record2.put("test", "test");

// when
fastGenericDatumReader.read(null, FastSerdeTestsSupport.genericDataAsDecoder(record2));

// then
FastDeserializer<GenericRecord> fastGenericDeserializer2 =
(FastDeserializer<GenericRecord>) cacheLimit1.getFastGenericDeserializer(recordSchema2, recordSchema2);

fastGenericDeserializer2 =
(FastDeserializer<GenericRecord>) cacheLimit1.getFastGenericDeserializer(recordSchema2, recordSchema2);

Assert.assertNotNull(fastGenericDeserializer2);
Assert.assertEquals(1, fastGenericDeserializer2.getClass().getDeclaredMethods().length);
}
}
Loading