Skip to content

Commit

Permalink
Adds deserializer support and refactors Java handler code.
Browse files Browse the repository at this point in the history
  • Loading branch information
blacktooth committed May 24, 2022
1 parent d74a428 commit 2630502
Show file tree
Hide file tree
Showing 11 changed files with 311 additions and 120 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#ifndef GLUE_SCHEMA_REGISTRY_DESERIALIZER_H
#define GLUE_SCHEMA_REGISTRY_DESERIALIZER_H

#include "glue_schema_registry_schema.h"
#include "mutable_byte_array.h"
#include "read_only_byte_array.h"
#include <stdbool.h>

/**
 * Handle for a Glue Schema Registry deserializer.
 * Instances are created with new_glue_schema_registry_deserializer() and
 * released with delete_glue_schema_registry_deserializer().
 */
typedef struct glue_schema_registry_deserializer {
//This is used for storing the instance context. Currently, being used for managing GraalVM instance.
//It is opaque to callers and is NULL when no instance is attached.
void *instance_context;
} glue_schema_registry_deserializer;

/**
 * Creates a new deserializer and the GraalVM isolate that backs it.
 *
 * @return The new deserializer, or NULL if the GraalVM isolate could not be
 *         created. Release it with delete_glue_schema_registry_deserializer().
 */
glue_schema_registry_deserializer *new_glue_schema_registry_deserializer(void);

/**
 * Tears down the instance context (if any) and frees the deserializer.
 * Passing NULL logs an error and does nothing else.
 *
 * @param deserializer Deserializer to destroy.
 */
void delete_glue_schema_registry_deserializer(glue_schema_registry_deserializer *deserializer);

/**
 * Decodes a Glue Schema Registry encoded byte array.
 *
 * @param deserializer Initialized deserializer instance.
 * @param array        Encoded bytes; must be non-NULL and have a non-zero length.
 * @return The decoded bytes, or NULL (with an error logged) when the deserializer,
 *         its instance context or the array is NULL, or the array is empty.
 *         NOTE(review): the caller presumably owns the returned array and frees it
 *         with delete_mutable_byte_array() - confirm.
 */
mutable_byte_array *glue_schema_registry_deserializer_decode(glue_schema_registry_deserializer *deserializer,
read_only_byte_array *array);

/**
 * Extracts the schema from a Glue Schema Registry encoded byte array.
 *
 * @param deserializer Initialized deserializer instance.
 * @param array        Encoded bytes; must be non-NULL and have a non-zero length.
 * @return The decoded schema, or NULL (with an error logged) when the deserializer,
 *         its instance context or the array is NULL, or the array is empty.
 *         NOTE(review): ownership of the returned schema is not visible here - confirm
 *         who frees it.
 */
glue_schema_registry_schema *glue_schema_registry_deserializer_decode_schema(glue_schema_registry_deserializer *deserializer,
read_only_byte_array *array);

/**
 * Reports whether the given byte array can be decoded by the deserializer.
 *
 * @param deserializer Initialized deserializer instance.
 * @param array        Bytes to inspect; must be non-NULL and have a non-zero length.
 * @return The deserializer's verdict; false (with an error logged) when the
 *         deserializer, its instance context or the array is NULL, or the array is empty.
 */
bool glue_schema_registry_deserializer_can_decode(glue_schema_registry_deserializer *deserializer,
read_only_byte_array *array);

#endif //GLUE_SCHEMA_REGISTRY_DESERIALIZER_H
2 changes: 2 additions & 0 deletions native-schema-registry/c/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ add_library(
glue_schema_registry_schema.c
read_only_byte_array.c
mutable_byte_array.c
glue_schema_registry_serializer.c
glue_schema_registry_deserializer.c
)

target_link_libraries(
Expand Down
80 changes: 80 additions & 0 deletions native-schema-registry/c/src/glue_schema_registry_deserializer.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#include "../include/glue_schema_registry_deserializer.h"
#include "../include/error_handling.h"
#include "../../target/libnativeschemaregistry.h"
#include <stdlib.h>

glue_schema_registry_deserializer * new_glue_schema_registry_deserializer() {
glue_schema_registry_deserializer *deserializer = NULL;
deserializer =
(glue_schema_registry_deserializer *) malloc(sizeof(glue_schema_registry_deserializer));

int ret = graal_create_isolate(NULL, NULL, (graal_isolatethread_t **) &deserializer->instance_context);
if (ret != 0) {
log_error("Failed to initialize GraalVM isolate.", ERR_CODE_GRAALVM_INIT_EXCEPTION);
delete_glue_schema_registry_deserializer(deserializer);
return NULL;
}
//TODO: Handle errors here.
initialize_deserializer(deserializer->instance_context);
return deserializer;
}

/**
 * Releases a deserializer: tears down its GraalVM isolate, if one is attached,
 * and then frees the struct itself.
 *
 * A NULL deserializer is reported through log_error and otherwise ignored.
 * A failed isolate teardown is logged, but the struct is still freed.
 */
void delete_glue_schema_registry_deserializer(glue_schema_registry_deserializer * deserializer) {
    if (!deserializer) {
        log_error("Deserializer is NULL", ERR_CODE_NULL_PARAMETERS);
        return;
    }

    void *context = deserializer->instance_context;
    if (context) {
        if (graal_tear_down_isolate(context) != 0) {
            log_error("Error tearing down the graal isolate instance.", ERR_CODE_GRAALVM_TEARDOWN_EXCEPTION);
        }
        //Clear the handle so it cannot be reused after teardown.
        deserializer->instance_context = NULL;
    }

    free(deserializer);
}

/**
 * Decodes the given encoded bytes through the native decode entry point.
 *
 * @param deserializer Deserializer whose instance context is used for the call.
 * @param array        Encoded input; must be non-NULL with a non-zero length.
 * @return The result of decode(), or NULL (with an error logged) when the
 *         deserializer, its instance context or the array is NULL, or the
 *         array is empty.
 */
mutable_byte_array *glue_schema_registry_deserializer_decode(glue_schema_registry_deserializer * deserializer, read_only_byte_array *array) {
    if (!deserializer || !deserializer->instance_context) {
        log_error("Deserializer instance or instance context is null.", ERR_CODE_INVALID_STATE);
        return NULL;
    }

    if (!array || array->len == 0) {
        log_error("Byte array cannot be null", ERR_CODE_NULL_PARAMETERS);
        return NULL;
    }

    mutable_byte_array *decoded = decode(deserializer->instance_context, array);
    return decoded;
}

/**
 * Extracts the schema from the given encoded bytes through the native
 * decode_schema entry point.
 *
 * @param deserializer Deserializer whose instance context is used for the call.
 * @param array        Encoded input; must be non-NULL with a non-zero length.
 * @return The result of decode_schema(), or NULL (with an error logged) when
 *         the deserializer, its instance context or the array is NULL, or the
 *         array is empty.
 */
glue_schema_registry_schema *glue_schema_registry_deserializer_decode_schema(glue_schema_registry_deserializer * deserializer, read_only_byte_array *array) {
    if (!deserializer || !deserializer->instance_context) {
        log_error("Deserializer instance or instance context is null.", ERR_CODE_INVALID_STATE);
        return NULL;
    }

    if (!array || array->len == 0) {
        log_error("Byte array cannot be null", ERR_CODE_NULL_PARAMETERS);
        return NULL;
    }

    return decode_schema(deserializer->instance_context, array);
}

/**
 * Reports whether the given bytes can be decoded, using the native can_decode
 * entry point.
 *
 * @param deserializer Deserializer whose instance context is used for the call.
 * @param array        Input bytes; must be non-NULL with a non-zero length.
 * @return The result of can_decode(); false (with an error logged) when the
 *         deserializer, its instance context or the array is NULL, or the
 *         array is empty.
 */
bool glue_schema_registry_deserializer_can_decode(glue_schema_registry_deserializer * deserializer, read_only_byte_array *array) {
    if (deserializer == NULL || deserializer->instance_context == NULL) {
        log_error("Deserializer instance or instance context is null.", ERR_CODE_INVALID_STATE);
        //The function returns bool, so report failure as false rather than the pointer constant NULL.
        return false;
    }

    if (array == NULL || array->len == 0) {
        log_error("Byte array cannot be null", ERR_CODE_NULL_PARAMETERS);
        return false;
    }

    return can_decode(deserializer->instance_context, array);
}

5 changes: 2 additions & 3 deletions native-schema-registry/c/src/swig/mutable_byte_array.i
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@
#if defined(SWIGPYTHON)
//Output typemap for functions returning mutable_byte_array *: converts the array's
//unsigned char * data to a Python Bytes object and then deletes the C array.
//NOTE(review): this listing appears to interleave removed and added diff lines
//(obj is declared twice, and both Py_DECREF and Py_CLEAR appear) - confirm against the real file.
%typemap(out) mutable_byte_array * %{
mutable_byte_array *array = $1;
PyObject * obj = PyMemoryView_FromMemory((char *) array->data, array->max_len, PyBUF_READ);
//Wrap the first max_len bytes of the array's data in a read-only memoryview.
//NOTE(review): obj is not checked for NULL before it is used below.
PyObject * obj = PyMemoryView_FromMemory((char *) $1->data, $1->max_len, PyBUF_READ);
//Copy the contents to a Python byte array
$result = PyBytes_FromObject(obj);
//Release the memoryview object
Py_DECREF(obj);
Py_CLEAR(obj);
//Delete the underlying mutable_byte_array
delete_mutable_byte_array($1);
%}
Expand Down
2 changes: 0 additions & 2 deletions native-schema-registry/c/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ list(
mutable_byte_array_test
)

add_library(mock_foo SHARED mock_foo.c)

foreach (test ${tests})
add_executable(
"${test}"
Expand Down
49 changes: 36 additions & 13 deletions native-schema-registry/python/PyGsrSerDe/PyGsrSerDe.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from PyGsrSerDe import _GsrSerDe

from .GsrSerDe import *

"""
Expand All @@ -8,19 +10,12 @@

# Python-side description of a schema: a schema name, a schema definition and a data format.
# NOTE(review): this listing appears to interleave the removed implementation (a wrapper
# around a native glue_schema_registry_schema with accessor methods) with the added one
# (plain schema_name / schema_def / data_format attributes plus __str__) - confirm
# against the real file before editing.
class GlueSchemaRegistrySchema:
def __init__(self, schema_name, schema_def, data_format):
self.gsr_schema = glue_schema_registry_schema(schema_name, schema_def, data_format)

def __del__(self):
del self.gsr_schema

def schema_name(self):
return self.gsr_schema.get_schema_name()
# Plain-attribute form: the constructor arguments are stored unchanged.
self.schema_name = schema_name
self.schema_def = schema_def
self.data_format = data_format

def data_format(self):
return self.gsr_schema.get_data_format()

def schema_def(self):
return self.gsr_schema.get_schema_def()
# Human-readable summary listing the name, definition and data format.
def __str__(self):
return f'GlueSchemaRegistrySchema: [{self.schema_name}, {self.schema_def}, {self.data_format}]'


class GlueSchemaRegistrySerializer:
Expand All @@ -32,5 +27,33 @@ def __del__(self):

# Encodes byte_arr for the given transport and schema: wraps the bytes in a
# read_only_byte_array, passes it with transport_name and a native schema object to
# self.serializer.encode, and returns that call's result.
# NOTE(review): this listing appears to interleave the removed call (using
# schema.gsr_schema) with the added one (building a glue_schema_registry_schema from
# the schema's attributes) - confirm against the real file.
def encode(self, transport_name: str, schema: GlueSchemaRegistrySchema, byte_arr: bytes) -> bytes:
ro_byte_array = read_only_byte_array(byte_arr)
encoded_byte_buffer = self.serializer.encode(ro_byte_array, transport_name, schema.gsr_schema)
gsr_schema = glue_schema_registry_schema(schema.schema_name, schema.schema_def, schema.data_format)
encoded_byte_buffer = self.serializer.encode(ro_byte_array, transport_name, gsr_schema)
return encoded_byte_buffer


class GlueSchemaRegistryDeserializer:
    """Python wrapper around the native glue_schema_registry_deserializer.

    Each method wraps the incoming bytes in a read_only_byte_array and
    delegates to the matching method on the native deserializer object.
    """

    def __init__(self):
        self.deserializer = glue_schema_registry_deserializer()

    def __del__(self):
        # __del__ also runs for objects whose __init__ raised before the
        # attribute was assigned; guard so finalization never raises.
        if hasattr(self, 'deserializer'):
            del self.deserializer

    def decode(self, byte_arr: bytes) -> bytes:
        """Return the result of the native decode call for byte_arr."""
        ro_byte_array = read_only_byte_array(byte_arr)
        return self.deserializer.decode(ro_byte_array)

    def decode_schema(self, byte_arr: bytes) -> 'GlueSchemaRegistrySchema':
        """Return the schema carried by byte_arr as a GlueSchemaRegistrySchema.

        The name, definition and data format are read from the native schema
        object through its getters and copied into the Python-side schema.
        """
        ro_byte_array = read_only_byte_array(byte_arr)
        gsr_schema = self.deserializer.decode_schema(ro_byte_array)
        return GlueSchemaRegistrySchema(
            gsr_schema.get_schema_name(),
            gsr_schema.get_schema_def(),
            gsr_schema.get_data_format(),
        )

    def can_decode(self, byte_arr: bytes) -> bool:
        """Return the native deserializer's verdict on whether byte_arr can be decoded."""
        ro_byte_array = read_only_byte_array(byte_arr)
        return self.deserializer.can_decode(ro_byte_array)
2 changes: 1 addition & 1 deletion native-schema-registry/python/PyGsrSerDe/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .PyGsrSerDe import *

__all__ = ['GlueSchemaRegistrySchema', 'GlueSchemaRegistrySerializer']
__all__ = ['GlueSchemaRegistrySchema', 'GlueSchemaRegistrySerializer', 'GlueSchemaRegistryDeserializer']
17 changes: 10 additions & 7 deletions native-schema-registry/python/install.sh
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
#!/bin/zsh
# Builds the PyGsrSerDe wheel, repairs it with auditwheel and installs the result.
# Usage: install.sh <python_version>   (the argument is appended to "python", e.g. 3.9 -> python3.9)
# NOTE(review): this listing appears to interleave removed and added diff lines
# (e.g. both the python3.9 and the $python_cmd variants) - confirm against the real file.
python_version=$1
python_cmd="python$python_version"
#TODO: Simple script to test Python bindings, upgrade this to multi-platform compatible build.
# Remove artifacts left by previous builds.
rm -rf build/ dist/ wheelhouse/ PyGsrSerDe.egg-info/
ln -sf $(PWD)/../target $(PWD)/../python/deps
ln -sf $(PWD)/../c $(PWD)/../python/c
python3.9 -m pip wheel -w dist --verbose .
# Symlink the native build output and the C sources into the python directory.
ln -sf $PWD/../target $PWD/../python/deps
ln -sf $PWD/../c $PWD/../python/c
$python_cmd -m pip wheel -w dist --verbose .
# Copy the SWIG-generated module into the package and rewrite its "c.src.swig" reference to ".".
cp c/src/swig/GsrSerDe.py ./PyGsrSerDe/
sed -ie "s/c.src.swig/./" PyGsrSerDe/GsrSerDe.py
$python_cmd -m pip wheel -w dist --verbose .

# NOTE(review): the value starts with the literal text "LD_LIBRARY_PATH" (no "$"),
# so the existing search path is not included - confirm whether $LD_LIBRARY_PATH was intended.
LD_LIBRARY_PATH=LD_LIBRARY_PATH:$(PWD)/ && auditwheel repair dist/*.whl --plat 'manylinux2014_x86_64'
export LD_LIBRARY_PATH=LD_LIBRARY_PATH:$PWD/ && auditwheel repair dist/*.whl --plat 'manylinux2014_x86_64'

python3.9 -m pip install wheelhouse/*.whl --force-reinstall
cd ../
python3.9
# Install the repaired wheel(s) from wheelhouse/, replacing any installed version.
$python_cmd -m pip install wheelhouse/*.whl --force-reinstall
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.amazonaws.services.schemaregistry;

import org.graalvm.nativeimage.c.type.CTypeConversion;
import org.graalvm.word.PointerBase;

import java.nio.ByteBuffer;

import static com.amazonaws.services.schemaregistry.DataTypes.C_MutableByteArray;
import static com.amazonaws.services.schemaregistry.DataTypes.C_ReadOnlyByteArray;
import static com.amazonaws.services.schemaregistry.DataTypes.newMutableByteArray;
import static com.amazonaws.services.schemaregistry.DataTypes.writeToMutableArray;

/**
 * Copies byte data between Java byte arrays and the C byte array structures
 * declared in {@link DataTypes}.
 */
public class ByteArrayConverter {

    /**
     * Copies the contents of a C read-only byte array into a new Java byte array.
     *
     * @param c_readOnlyByteArray C array to read from.
     * @return A Java array holding a copy of the C array's bytes.
     * @throws ArithmeticException if the C array's length does not fit in an int.
     */
    public static byte[] fromCReadOnlyByteArray(C_ReadOnlyByteArray c_readOnlyByteArray) {
        PointerBase dataPointer = c_readOnlyByteArray.getData();
        //This is validated to fit in Integer limits.
        int length = Math.toIntExact(c_readOnlyByteArray.getLen());

        //View the C memory as a ByteBuffer, then copy it out into a byte [].
        //TODO: This won't be needed if Java APIs accepted ByteBuffer instead of byte[]
        ByteBuffer nativeView = CTypeConversion.asByteBuffer(dataPointer, length);
        byte[] copy = new byte[length];
        nativeView.get(copy);

        return copy;
    }

    /**
     * Creates a C mutable byte array of the same length as {@code bytes} and
     * writes every byte into it.
     *
     * @param bytes Java bytes to copy.
     * @return The newly created C mutable byte array.
     */
    public static C_MutableByteArray toCMutableByteArray(byte[] bytes) {
        C_MutableByteArray target = newMutableByteArray(bytes.length);

        //TODO: Check for performance issues with this.
        int position = 0;
        for (byte value : bytes) {
            writeToMutableArray(target, position, value);
            position++;
        }
        return target;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package com.amazonaws.services.schemaregistry;

import com.amazonaws.services.schemaregistry.common.Schema;
import com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration;
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import com.google.common.collect.ImmutableMap;
import org.graalvm.nativeimage.IsolateThread;
import org.graalvm.nativeimage.c.CContext;
import org.graalvm.nativeimage.c.function.CEntryPoint;
import org.graalvm.nativeimage.c.type.CTypeConversion;

import java.util.Map;

import static com.amazonaws.services.schemaregistry.ByteArrayConverter.fromCReadOnlyByteArray;
import static com.amazonaws.services.schemaregistry.ByteArrayConverter.toCMutableByteArray;
import static com.amazonaws.services.schemaregistry.DataTypes.C_GlueSchemaRegistrySchema;
import static com.amazonaws.services.schemaregistry.DataTypes.C_MutableByteArray;
import static com.amazonaws.services.schemaregistry.DataTypes.C_ReadOnlyByteArray;
import static com.amazonaws.services.schemaregistry.DataTypes.HandlerDirectives;
import static com.amazonaws.services.schemaregistry.DataTypes.newGlueSchemaRegistrySchema;

/**
 * Entry point class for the deserialization methods of GSR shared library.
 */
@CContext(HandlerDirectives.class)
public class GlueSchemaRegistryDeserializationHandler {

    /**
     * Creates the shared deserializer instance used by the other entry points.
     *
     * @param isolateThread GraalVM isolate thread the call runs on.
     */
    @CEntryPoint(name = "initialize_deserializer")
    public static void initializeDeserializer(IsolateThread isolateThread) {
        //TODO: Add GlueSchemaRegistryConfiguration to this method. This is hard-coded for now.
        //TODO: Error handling
        Map<String, String> configMap =
            ImmutableMap.of(
                AWSSchemaRegistryConstants.AWS_REGION,
                "us-east-1"
            );
        GlueSchemaRegistryConfiguration glueSchemaRegistryConfiguration =
            new GlueSchemaRegistryConfiguration(configMap);

        DeserializerInstance.create(glueSchemaRegistryConfiguration);
    }

    /**
     * Decodes the given encoded bytes and returns the data as a C mutable byte array.
     *
     * @param isolateThread       GraalVM isolate thread the call runs on.
     * @param c_readOnlyByteArray Encoded bytes to decode.
     * @return Newly created C mutable byte array holding the decoded data.
     */
    @CEntryPoint(name = "decode")
    public static C_MutableByteArray decode(
        IsolateThread isolateThread, C_ReadOnlyByteArray c_readOnlyByteArray) {

        byte[] bytesToDecode = fromCReadOnlyByteArray(c_readOnlyByteArray);

        //Assuming deserializer instance is already initialized
        GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer = DeserializerInstance.get();

        byte[] decodedBytes =
            glueSchemaRegistryDeserializer.getData(bytesToDecode);

        return toCMutableByteArray(decodedBytes);
    }

    /**
     * Extracts the schema from the given encoded bytes and returns it as a C schema object.
     *
     * @param isolateThread       GraalVM isolate thread the call runs on.
     * @param c_readOnlyByteArray Encoded bytes to read the schema from.
     * @return Newly created C schema built from the decoded name, definition and data format.
     */
    @CEntryPoint(name = "decode_schema")
    public static C_GlueSchemaRegistrySchema decodeSchema(
        IsolateThread isolateThread, C_ReadOnlyByteArray c_readOnlyByteArray) {
        byte[] bytesToDecode = fromCReadOnlyByteArray(c_readOnlyByteArray);

        //Assuming deserializer instance is already initialized
        GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer = DeserializerInstance.get();
        Schema decodedSchema =
            glueSchemaRegistryDeserializer.getSchema(bytesToDecode);

        //try-with-resources releases the temporary C strings even if schema creation throws.
        //They are closed in reverse order of declaration.
        try (CTypeConversion.CCharPointerHolder cSchemaNamePointer =
                 CTypeConversion.toCString(decodedSchema.getSchemaName());
             CTypeConversion.CCharPointerHolder cSchemaDefPointer =
                 CTypeConversion.toCString(decodedSchema.getSchemaDefinition());
             CTypeConversion.CCharPointerHolder cDataFormatPointer =
                 CTypeConversion.toCString(decodedSchema.getDataFormat())) {

            //TODO: We can potentially expose the C Strings to target language layer to
            //prevent copying strings repeatedly.
            //newGlueSchemaRegistrySchema has its own copy of these attributes, so the
            //holders can be closed as soon as it returns.
            return newGlueSchemaRegistrySchema(
                cSchemaNamePointer.get(),
                cSchemaDefPointer.get(),
                cDataFormatPointer.get()
            );
        }
    }

    /**
     * Reports whether the given bytes can be deserialized.
     *
     * @param isolateThread       GraalVM isolate thread the call runs on.
     * @param c_readOnlyByteArray Bytes to inspect.
     * @return C boolean form of the deserializer's canDeserialize result.
     */
    @CEntryPoint(name = "can_decode")
    public static byte canDecode(
        IsolateThread isolateThread, C_ReadOnlyByteArray c_readOnlyByteArray) {
        byte[] bytesToDecode = fromCReadOnlyByteArray(c_readOnlyByteArray);

        GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer = DeserializerInstance.get();
        boolean canDeserialize =
            glueSchemaRegistryDeserializer.canDeserialize(bytesToDecode);

        return CTypeConversion.toCBoolean(canDeserialize);
    }
}
Loading

0 comments on commit 2630502

Please sign in to comment.