Skip to content

Commit

Permalink
Improve performance of avro codegen
Browse files Browse the repository at this point in the history
  • Loading branch information
li-kramgopa committed Jan 16, 2024
1 parent cdf7f90 commit db5faab
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@
import com.squareup.javapoet.JavaFile;
import java.io.File;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
Expand Down Expand Up @@ -57,85 +55,65 @@ public void createOperations(BuilderPluginContext context) {
}

private void generateCode(OperationContext opContext) {
//mkdir any output folders that don't exist
if (!config.getOutputSpecificRecordClassesRoot().exists() && !config.getOutputSpecificRecordClassesRoot()
.mkdirs()) {
throw new IllegalStateException(
"unable to create destination folder " + config.getOutputSpecificRecordClassesRoot());
// Make sure the output folder exists
File outputFolder = config.getOutputSpecificRecordClassesRoot();
if (!outputFolder.exists() && !outputFolder.mkdirs()) {
throw new IllegalStateException("unable to create output folder " + outputFolder);
}
final Path outputDirectoryPath = outputFolder.toPath();

final AtomicInteger schemaCounter = new AtomicInteger(0);
final int schemaChunkSize = 500;
Collection<List<AvroNamedSchema>> allNamedSchemaList = opContext.getAvroSchemas().stream().flatMap(schema -> {
if (schema instanceof AvroNamedSchema) {
return Stream.of((AvroNamedSchema) schema);
} else if (AvroType.UNION.equals(schema.type())) {
return ((AvroUnionSchema) schema).getTypes()
.stream()
.map(schemaOrRef -> (AvroNamedSchema) schemaOrRef.getSchema());
} else {
return Stream.empty();
}
}).collect(Collectors.groupingBy(it -> schemaCounter.getAndIncrement() / schemaChunkSize)).values();

// Collect all named schemas.
long collectStart = System.currentTimeMillis();
Collection<AvroNamedSchema> allNamedSchemas = opContext.getAvroSchemas()
.stream()
.flatMap(schema -> {
if (schema instanceof AvroNamedSchema) {
return Stream.of((AvroNamedSchema) schema);
} else if (AvroType.UNION.equals(schema.type())) {
return ((AvroUnionSchema) schema).getTypes()
.stream()
.filter(schemaOrRef -> schemaOrRef.getSchema() instanceof AvroNamedSchema)
.map(schemaOrRef -> (AvroNamedSchema) schemaOrRef.getSchema());
} else {
return Stream.empty();
}
})
.flatMap(namedSchema -> {
// Collect inner schemas.
return Stream.concat(Stream.of(namedSchema),
SpecificRecordGeneratorUtil.getNestedInternalSchemaList(namedSchema).stream());
})
.filter(namedSchema -> {
// Skip codegen if schema is on classpath and config says to skip
return !config.shouldSkipCodegenIfSchemaOnClasspath() || !doesSchemaExistOnClasspath(namedSchema,
opContext.getLookupSchemaSet());
})
.collect(Collectors.toMap(AvroNamedSchema::getFullName, Function.identity(), (s1, s2) -> s1))
.values();
long collectEnd = System.currentTimeMillis();
LOGGER.info("Collected {} named avro schemas in {} millis", allNamedSchemas.size(), collectEnd - collectStart);

// Generate avro binding java classes.
long genStart = System.currentTimeMillis();

final SpecificRecordGenerationConfig generationConfig =
SpecificRecordGenerationConfig.getBroadCompatibilitySpecificRecordGenerationConfig(
AvroJavaStringRepresentation.fromJson(config.getStringRepresentation().toString()),
AvroJavaStringRepresentation.fromJson(config.getMethodStringRepresentation().toString()),
config.getMinAvroVersion(), config.isUtf8EncodingPutByIndexEnabled());

// Make sure the output folder exists
File outputFolder = config.getOutputSpecificRecordClassesRoot();
if (!outputFolder.exists() && !outputFolder.mkdirs()) {
throw new IllegalStateException("unable to create output folder " + outputFolder);
}
final Path outputDirectoryPath = outputFolder.toPath();
final SpecificRecordClassGenerator generator = new SpecificRecordClassGenerator();

int totalGeneratedClasses = allNamedSchemaList.parallelStream().map(allNamedSchemas -> {
HashSet<String> alreadyGeneratedSchemaNames = new HashSet<>();
List<JavaFile> generatedSpecificClasses = new ArrayList<>(allNamedSchemas.size());
for (AvroNamedSchema namedSchema : allNamedSchemas) {
try {
if (!alreadyGeneratedSchemaNames.contains(namedSchema.getFullName())) {
// skip codegen if schema is on classpath and config says to skip
if (config.shouldSkipCodegenIfSchemaOnClasspath() &&
doesSchemaExistOnClasspath(namedSchema, opContext.getLookupSchemaSet())) {
continue;
}

//top level schema
alreadyGeneratedSchemaNames.add(namedSchema.getFullName());
generatedSpecificClasses.add(generator.generateSpecificClass(namedSchema, generationConfig));

// generate internal schemas if not already present
List<AvroNamedSchema> internalSchemaList =
SpecificRecordGeneratorUtil.getNestedInternalSchemaList(namedSchema);
for (AvroNamedSchema namedInternalSchema : internalSchemaList) {
if (!alreadyGeneratedSchemaNames.contains(namedInternalSchema.getFullName())) {
// skip codegen for nested schemas if schema is on classpath and config says to skip
if (config.shouldSkipCodegenIfSchemaOnClasspath() &&
doesSchemaExistOnClasspath(namedInternalSchema, opContext.getLookupSchemaSet())) {
continue;
}

generatedSpecificClasses.add(generator.generateSpecificClass(namedInternalSchema, generationConfig));
alreadyGeneratedSchemaNames.add(namedInternalSchema.getFullName());
}
}
}
} catch (Exception e) {
throw new RuntimeException("failed to generate class for " + namedSchema.getFullName(), e);
}
List<JavaFile> generatedClasses = allNamedSchemas.parallelStream().map(namedSchema -> {
try {
// Top level schema
return generator.generateSpecificClass(namedSchema, generationConfig);
} catch (Exception e) {
throw new RuntimeException("failed to generate class for " + namedSchema.getFullName(), e);
}
writeJavaFilesToDisk(generatedSpecificClasses, outputDirectoryPath);
return generatedSpecificClasses.size();
}).reduce(0, Integer::sum);

}).collect(Collectors.toList());
long genEnd = System.currentTimeMillis();
LOGGER.info("Generated {} java source files in {} millis", totalGeneratedClasses, genEnd - genStart);
LOGGER.info("Generated {} java source files in {} millis", generatedClasses.size(), genEnd - genStart);

// Write to disk.
writeJavaFilesToDisk(generatedClasses, outputDirectoryPath);
}

private boolean doesSchemaExistOnClasspath(AvroNamedSchema schema, SchemaSet schemaSet) {
Expand All @@ -162,7 +140,7 @@ private void writeJavaFilesToDisk(Collection<JavaFile> javaFiles, Path outputFol
}).reduce(0, Integer::sum);

long writeEnd = System.currentTimeMillis();
LOGGER.info("wrote out {} generated java source files under {} in {} millis", filesWritten, outputFolderPath,
LOGGER.info("Wrote out {} generated java source files under {} in {} millis", filesWritten, outputFolderPath,
writeEnd - writeStart);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.squareup.javapoet.TypeName;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -206,7 +207,7 @@ public static List<AvroNamedSchema> getNestedInternalSchemaList(AvroNamedSchema
switch (type) {
case ENUM:
case FIXED:
return new ArrayList<>();
return Collections.emptyList();
case RECORD:
return getNestedInternalSchemaListForRecord((AvroRecordSchema) topLevelSchema);
default:
Expand Down

0 comments on commit db5faab

Please sign in to comment.