-
Notifications
You must be signed in to change notification settings - Fork 59
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added a config to limit the code-gen and class loading #83
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,6 +48,20 @@ public final class FastSerdeCache { | |
|
||
private static volatile FastSerdeCache _INSTANCE; | ||
|
||
/** | ||
* Fast-avro will generate and load serializer and deserializer(SerDes) classes into metaspace during runtime. | ||
* During serialization and deserialization, fast-avro also leverages JIT compilation to boost the SerDes speed. | ||
* And JIT compilation code is saved in code cache. | ||
* Too much usage of metaspace and code cache will bring GC/OOM issue. | ||
* | ||
* We set a hard limit of the total number of SerDes classes generated and loaded by fast-avro. | ||
* By default, the limit is set to MAX_INT. | ||
* Fast-avro will fall back to regular avro after the limit is hit. | ||
* One could set the limit through {@link FastSerdeCache} constructors. | ||
*/ | ||
private volatile int generatedFastSerDesLimit = Integer.MAX_VALUE; | ||
private final AtomicInteger generatedSerDesNum = new AtomicInteger(0); | ||
|
||
private final Map<String, FastDeserializer<?>> fastSpecificRecordDeserializersCache = | ||
new FastAvroConcurrentHashMap<>(); | ||
private final Map<String, FastDeserializer<?>> fastGenericRecordDeserializersCache = | ||
|
@@ -85,6 +99,20 @@ public FastSerdeCache(Executor executorService, Supplier<String> compileClassPat | |
this(executorService, compileClassPathSupplier.get()); | ||
} | ||
|
||
/** | ||
* | ||
* @param executorService | ||
* {@link Executor} used by serializer/deserializer compile threads | ||
* @param compileClassPathSupplier | ||
* custom classpath {@link Supplier} | ||
* @param limit | ||
* custom number {@link #generatedFastSerDesLimit} | ||
*/ | ||
public FastSerdeCache(Executor executorService, Supplier<String> compileClassPathSupplier, int limit) { | ||
this(executorService, compileClassPathSupplier); | ||
this.generatedFastSerDesLimit = limit; | ||
} | ||
|
||
public FastSerdeCache(String compileClassPath) { | ||
this(); | ||
this.compileClassPath = Optional.ofNullable(compileClassPath); | ||
|
@@ -122,10 +150,24 @@ public FastSerdeCache(Executor executorService) { | |
this.compileClassPath = Optional.empty(); | ||
} | ||
|
||
public FastSerdeCache(Executor executorService, int limit) { | ||
this(executorService); | ||
this.generatedFastSerDesLimit = limit; | ||
} | ||
|
||
private FastSerdeCache() { | ||
this((Executor) null); | ||
} | ||
|
||
/** | ||
* @param limit | ||
* custom number {@link #generatedFastSerDesLimit} | ||
*/ | ||
public FastSerdeCache(int limit) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to add other two constructors? Maybe this one is good enough. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No harm to add these two constructors :) For exmaple, I am using |
||
this(); | ||
this.generatedFastSerDesLimit = limit; | ||
} | ||
|
||
/** | ||
* Gets default {@link FastSerdeCache} instance. Default instance classpath can be customized via | ||
* {@value #CLASSPATH} or {@value #CLASSPATH_SUPPLIER} system properties. | ||
|
@@ -300,6 +342,25 @@ private String getSchemaKey(Schema writerSchema, Schema readerSchema) { | |
getSchemaFingerprint(readerSchema)); | ||
} | ||
|
||
/** | ||
* A wrapper function that limits the total number of fast de/serializer classes generated | ||
* | ||
* @param supplier The function to build and save fast de/serializer | ||
*/ | ||
private <T> T buildFastClassWithLimit(Supplier<T> supplier) { | ||
T result = null; | ||
if (this.generatedSerDesNum.get() < this.generatedFastSerDesLimit) { | ||
result = supplier.get(); | ||
} else if (this.generatedSerDesNum.get() == this.generatedFastSerDesLimit) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am seeing there are two counters, which are being maintained independently in FastSerdeCache and FastSerdeBase, and I think we should combine and just use one.
Essentially, the idea is to keep the counting logic in a single place to make it consistent. Let me know if you want to have a sync up about this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the detailed explaination! This approach indeed helps to reduce two counters to one. However, it also leads to redundant code-gens and compilations which are CPU intensive tasks. IIUC, there's no limit for the code-gen and compilation before we really load For the current implementation, we at most generate extra N - 1 fast classes, N is the threads number of FastSerdeCache Executor. So I think the current implementation is better. What do you think? |
||
// We still want to print the warning when the limit is hit | ||
LOGGER.warn("Generated fast serdes classes number hits limit {}", this.generatedFastSerDesLimit); | ||
} else { | ||
LOGGER.debug("Generated serdes number {}, with fast serdes limit set to {}", this.generatedSerDesNum.get(), this.generatedFastSerDesLimit); | ||
} | ||
generatedSerDesNum.incrementAndGet(); | ||
return result; | ||
} | ||
|
||
/** | ||
* This function will generate a fast specific deserializer, and it will throw exception if anything wrong happens. | ||
* This function can be used to verify whether current {@link FastSerdeCache} could generate proper fast deserializer. | ||
|
@@ -311,7 +372,7 @@ private String getSchemaKey(Schema writerSchema, Schema readerSchema) { | |
public FastDeserializer<?> buildFastSpecificDeserializer(Schema writerSchema, Schema readerSchema) { | ||
FastSpecificDeserializerGenerator<?> generator = | ||
new FastSpecificDeserializerGenerator<>(writerSchema, readerSchema, classesDir, classLoader, | ||
compileClassPath.orElseGet(() -> null)); | ||
compileClassPath.orElseGet(() -> null), generatedFastSerDesLimit); | ||
FastDeserializer<?> fastDeserializer = generator.generateDeserializer(); | ||
|
||
if (LOGGER.isDebugEnabled()) { | ||
|
@@ -337,7 +398,10 @@ public FastDeserializer<?> buildFastSpecificDeserializer(Schema writerSchema, Sc | |
*/ | ||
private FastDeserializer<?> buildSpecificDeserializer(Schema writerSchema, Schema readerSchema) { | ||
try { | ||
return buildFastSpecificDeserializer(writerSchema, readerSchema); | ||
FastDeserializer<?> fastSpecificDeserializer = buildFastClassWithLimit(() -> buildFastSpecificDeserializer(writerSchema, readerSchema)); | ||
if (fastSpecificDeserializer != null) { | ||
return fastSpecificDeserializer; | ||
} | ||
} catch (FastDeserializerGeneratorException e) { | ||
LOGGER.warn("Deserializer generation exception when generating specific FastDeserializer for writer schema: " | ||
+ "[\n{}\n] and reader schema: [\n{}\n]", writerSchema.toString(true), readerSchema.toString(true), e); | ||
|
@@ -366,7 +430,7 @@ public Object deserialize(Object reuse, Decoder d) throws IOException { | |
public FastDeserializer<?> buildFastGenericDeserializer(Schema writerSchema, Schema readerSchema) { | ||
FastGenericDeserializerGenerator<?> generator = | ||
new FastGenericDeserializerGenerator<>(writerSchema, readerSchema, classesDir, classLoader, | ||
compileClassPath.orElseGet(() -> null)); | ||
compileClassPath.orElseGet(() -> null), generatedFastSerDesLimit); | ||
|
||
FastDeserializer<?> fastDeserializer = generator.generateDeserializer(); | ||
|
||
|
@@ -394,7 +458,10 @@ public FastDeserializer<?> buildFastGenericDeserializer(Schema writerSchema, Sch | |
*/ | ||
private FastDeserializer<?> buildGenericDeserializer(Schema writerSchema, Schema readerSchema) { | ||
try { | ||
return buildFastGenericDeserializer(writerSchema, readerSchema); | ||
FastDeserializer<?> fastGenericDeserializer = buildFastClassWithLimit(() -> buildFastGenericDeserializer(writerSchema, readerSchema)); | ||
if (fastGenericDeserializer != null) { | ||
return fastGenericDeserializer; | ||
} | ||
} catch (FastDeserializerGeneratorException e) { | ||
LOGGER.warn("Deserializer generation exception when generating generic FastDeserializer for writer schema: [\n" | ||
+ writerSchema.toString(true) + "\n] and reader schema:[\n" + readerSchema.toString(true) + "\n]", e); | ||
|
@@ -419,7 +486,7 @@ public FastSerializer<?> buildFastSpecificSerializer(Schema schema) { | |
Utils.getAvroVersionsSupportedForSerializer()); | ||
} | ||
FastSpecificSerializerGenerator<?> generator = | ||
new FastSpecificSerializerGenerator<>(schema, classesDir, classLoader, compileClassPath.orElseGet(() -> null)); | ||
new FastSpecificSerializerGenerator<>(schema, classesDir, classLoader, compileClassPath.orElseGet(() -> null), generatedFastSerDesLimit); | ||
|
||
if (LOGGER.isDebugEnabled()) { | ||
LOGGER.debug("Generated classes dir: {} and generation of specific FastSerializer is done for schema of type: {}" + | ||
|
@@ -437,7 +504,10 @@ private FastSerializer<?> buildSpecificSerializer(Schema schema) { | |
if (Utils.isSupportedAvroVersionsForSerializer()) { | ||
// Only build fast specific serializer for supported Avro versions. | ||
try { | ||
return buildFastSpecificSerializer(schema); | ||
FastSerializer<?> fastSpecificSerializer = buildFastClassWithLimit(() -> buildFastSpecificSerializer(schema)); | ||
if (fastSpecificSerializer != null) { | ||
return fastSpecificSerializer; | ||
} | ||
} catch (FastDeserializerGeneratorException e) { | ||
LOGGER.warn("Serializer generation exception when generating specific FastSerializer for schema: [\n{}\n]", | ||
schema.toString(true), e); | ||
|
@@ -463,7 +533,7 @@ public FastSerializer<?> buildFastGenericSerializer(Schema schema) { | |
+ Utils.getAvroVersionsSupportedForSerializer()); | ||
} | ||
FastGenericSerializerGenerator<?> generator = | ||
new FastGenericSerializerGenerator<>(schema, classesDir, classLoader, compileClassPath.orElseGet(() -> null)); | ||
new FastGenericSerializerGenerator<>(schema, classesDir, classLoader, compileClassPath.orElseGet(() -> null), generatedFastSerDesLimit); | ||
|
||
if (LOGGER.isDebugEnabled()) { | ||
LOGGER.debug("Generated classes dir: {} and generation of generic FastSerializer is done for schema of type: {}" + | ||
|
@@ -481,7 +551,10 @@ private FastSerializer<?> buildGenericSerializer(Schema schema) { | |
if (Utils.isSupportedAvroVersionsForSerializer()) { | ||
// Only build fast generic serializer for supported Avro versions. | ||
try { | ||
return buildFastGenericSerializer(schema); | ||
FastSerializer<?> fastGenericSerializer = buildFastClassWithLimit(() -> buildFastGenericSerializer(schema)); | ||
if (fastGenericSerializer != null) { | ||
return fastGenericSerializer; | ||
} | ||
} catch (FastDeserializerGeneratorException e) { | ||
LOGGER.warn("Serializer generation exception when generating generic FastSerializer for schema: [\n{}\n]", | ||
schema.toString(true), e); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it will be clearer to just throw the exception instead of returning null.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will fix it.