From 36419bdb619bc001c568ac728d09d845aa4db359 Mon Sep 17 00:00:00 2001 From: xiniwang Date: Wed, 15 Jul 2020 11:30:36 -0700 Subject: [PATCH] add hard limit for FastSerdeCache --- .../avro/fastserde/FastSerdeCache.java | 91 ++++++++++++++++--- 1 file changed, 77 insertions(+), 14 deletions(-) diff --git a/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerdeCache.java b/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerdeCache.java index a0a61c201..0c2609030 100644 --- a/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerdeCache.java +++ b/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerdeCache.java @@ -44,6 +44,20 @@ public final class FastSerdeCache { private static volatile FastSerdeCache _INSTANCE; + /** + * Fast-avro will generate and load serializer and deserializer(SerDes) classes into metaspace during runtime. + * During serialization and deserialization, fast-avro also leverages JIT compilation to boost the SerDes speed. + * And JIT compilation code is saved in code cache. + * Too much usage of metaspace and code cache will bring GC/OOM issue. + * + * We set a hard limit of the total number of SerDes classes generated and loaded by fast-avro. + * By default, the limit is set to MAX_INT. + * Fast-avro will fall back to regular avro after the limit is hit. + * One could set the limit through {@link FastSerdeCache} constructors. + */ + private volatile int generatedFastSerDesLimit = Integer.MAX_VALUE; + private final AtomicInteger generatedSerDesNum = new AtomicInteger(0); + private final Map> fastSpecificRecordDeserializersCache = new FastAvroConcurrentHashMap<>(); private final Map> fastGenericRecordDeserializersCache = @@ -81,6 +95,20 @@ public FastSerdeCache(Executor executorService, Supplier compileClassPat this(executorService, compileClassPathSupplier.get()); } + /** + * + * @param executorService + * {@link Executor} used by serializer/deserializer compile threads + * @param compileClassPathSupplier + * custom classpath {@link Supplier} + * @param limit + * custom number {@link #generatedFastSerDesLimit} + */ + public FastSerdeCache(Executor executorService, Supplier compileClassPathSupplier, int limit) { + this(executorService, compileClassPathSupplier); + this.generatedFastSerDesLimit = limit; + } + public FastSerdeCache(String compileClassPath) { this(); this.compileClassPath = Optional.ofNullable(compileClassPath); @@ -122,6 +150,15 @@ private FastSerdeCache() { this((Executor) null); } + /** + * @param limit + * custom number {@link #generatedFastSerDesLimit} + */ + public FastSerdeCache(int limit) { + this(); + this.generatedFastSerDesLimit = limit; + } + /** * Gets default {@link FastSerdeCache} instance. Default instance classpath can be customized via * {@value #CLASSPATH} or {@value #CLASSPATH_SUPPLIER} system properties. @@ -206,10 +243,11 @@ public FastDeserializer getFastSpecificDeserializer(Schema writerSchema, Sche new FastDeserializerWithAvroSpecificImpl<>(writerSchema, readerSchema)); if (deserializer == null) { deserializer = fastSpecificRecordDeserializersCache.get(schemaKey); - CompletableFuture.supplyAsync(() -> buildSpecificDeserializer(writerSchema, readerSchema), executor) - .thenAccept(d -> { - fastSpecificRecordDeserializersCache.put(schemaKey, d); - }); + buildFastClassWithNumCheck( + () -> CompletableFuture.supplyAsync(() -> buildSpecificDeserializer(writerSchema, readerSchema), executor) + .thenAccept(s -> { + fastSpecificRecordDeserializersCache.put(schemaKey, s); + })); } } @@ -234,10 +272,11 @@ public FastDeserializer getFastGenericDeserializer(Schema writerSchema, Schem new FastDeserializerWithAvroGenericImpl(writerSchema, readerSchema)); if (deserializer == null) { deserializer = fastGenericRecordDeserializersCache.get(schemaKey); - CompletableFuture.supplyAsync(() -> buildGenericDeserializer(writerSchema, readerSchema), executor) - .thenAccept(d -> { - fastGenericRecordDeserializersCache.put(schemaKey, d); - }); + buildFastClassWithNumCheck( + () -> CompletableFuture.supplyAsync(() -> buildGenericDeserializer(writerSchema, readerSchema), executor) + .thenAccept(s -> { + fastGenericRecordDeserializersCache.put(schemaKey, s); + })); } } return deserializer; @@ -258,9 +297,11 @@ public FastSerializer getFastSpecificSerializer(Schema schema) { fastSpecificRecordSerializersCache.putIfAbsent(schemaKey, new FastSerializerWithAvroSpecificImpl(schema)); if (serializer == null) { serializer = fastSpecificRecordSerializersCache.get(schemaKey); - CompletableFuture.supplyAsync(() -> buildSpecificSerializer(schema), executor).thenAccept(s -> { - fastSpecificRecordSerializersCache.put(schemaKey, s); - }); + buildFastClassWithNumCheck( + () -> CompletableFuture.supplyAsync(() -> buildSpecificSerializer(schema), executor) + .thenAccept(s -> { + fastSpecificRecordSerializersCache.put(schemaKey, s); + })); } } @@ -283,9 +324,11 @@ public FastSerializer getFastGenericSerializer(Schema schema) { fastGenericRecordSerializersCache.putIfAbsent(schemaKey, new FastSerializerWithAvroGenericImpl(schema)); if (serializer == null) { serializer = fastGenericRecordSerializersCache.get(schemaKey); - CompletableFuture.supplyAsync(() -> buildGenericSerializer(schema), executor).thenAccept(s -> { - fastGenericRecordSerializersCache.put(schemaKey, s); - }); + buildFastClassWithNumCheck( + () -> CompletableFuture.supplyAsync(() -> buildGenericSerializer(schema), executor) + .thenAccept(s -> { + fastGenericRecordSerializersCache.put(schemaKey, s); + })); } } return serializer; @@ -296,6 +339,26 @@ private String getSchemaKey(Schema writerSchema, Schema readerSchema) { Utils.getSchemaFingerprint(readerSchema)); } + /** + * A wrapper function that limits the total number of fast de/serilizer calsses generated + * + * @param runnable The function to build and save fast de/serializer + */ + private void buildFastClassWithNumCheck(Runnable runnable) { + try { + if (this.generatedSerDesNum.incrementAndGet() <= this.generatedFastSerDesLimit) { + runnable.run(); + } else if (this.generatedSerDesNum.get() == this.generatedFastSerDesLimit + 1) { + // We still want to print the warning when the limit is hit + LOGGER.warn("Generated fast serdes classes number hits limit {}", this.generatedFastSerDesLimit); + } else { + LOGGER.debug("Generated serdes number {}, with fast serdes limit set to {}", this.generatedSerDesNum.get(), this.generatedFastSerDesLimit); + } + } catch (Exception e) { + LOGGER.error("Fast serdes class generation failed"); + } + } + /** * This function will generate a fast specific deserializer, and it will throw exception if anything wrong happens. * This function can be used to verify whether current {@link FastSerdeCache} could generate proper fast deserializer.