diff --git a/README.md b/README.md index be345ab..731050e 100644 --- a/README.md +++ b/README.md @@ -23,28 +23,14 @@ Import the `ballerinax/confluent.cavroserdes` module into your Ballerina project import ballerinax/confluent.cavroserdes; ``` -### Step 2: Instantiate a new connector +### Step 2: Invoke the connector operation -```ballerina -configurable string baseUrl = ?; -configurable int identityMapCapacity = ?; -configurable map originals = ?; -configurable map headers = ?; - -cavroserdes:Client avroSerDes = check new ({ - baseUrl, - identityMapCapacity, - originals, - headers -}); -``` - -### Step 3: Invoke the connector operation - -You can now utilize the operations available within the connector. +You can now utilize the operations available within the connector. To instantiate a `cregistry:Client` instance refer to the guidelines [here](https://central.ballerina.io/ballerinax/confluent.cregistry/latest). ```ballerina public function main() returns error? { + cregistry:Client registry = ; // instantiate a schema registry client + string schema = string ` { "type": "int", @@ -53,12 +39,12 @@ public function main() returns error? { }`; int value = 1; - byte[] bytes = check avroSerDes->serialize(schema, value, "subject"); - int number = check avroSerDes->deserialize(bytes); + byte[] bytes = check cavroserdes:serialize(registry, schema, value, "subject"); + int number = check cavroserdes:deserialize(registry, bytes); } ``` -### Step 4: Run the Ballerina application +### Step 3: Run the Ballerina application Use the following command to compile and run the Ballerina program. @@ -66,6 +52,14 @@ Use the following command to compile and run the Ballerina program. bal run ``` +## Examples + +The Ballerina Avro Serializer/Deserializer connector for Confluent Schema Registry provides practical examples illustrating usage in various scenarios. Explore these [examples](https://github.com/ballerina-platform/module-ballerinax-confluent.cavroserdes/tree/main/examples). + +1. [Kafka Avro producer](https://github.com/ballerina-platform/module-ballerinax-confluent.cavroserdes/tree/main/examples/kafka-avro-producer) - This example demonstrates how to publish Avro serialized data to a Kafka topic. + +2. [Kafka Avro consumer](https://github.com/ballerina-platform/module-ballerinax-confluent.cavroserdes/tree/main/examples/kafka-avro-consumer) - This guide demonstrates how to consume data in the correct format according to the Avro schema from a Kafka topic. + ## Issues and projects The **Issues** and **Projects** tabs are disabled for this repository as this is part of the Ballerina library. To report bugs, request new features, start new discussions, view project boards, etc., visit the Ballerina library [parent repository](https://github.com/ballerina-platform/ballerina-library). diff --git a/ballerina/Module.md b/ballerina/Module.md index 300c5db..21b209a 100644 --- a/ballerina/Module.md +++ b/ballerina/Module.md @@ -16,28 +16,14 @@ Import the `ballerinax/confluent.cavroserdes` module into your Ballerina project import ballerinax/confluent.cavroserdes; ``` -### Step 2: Instantiate a new connector +### Step 2: Invoke the connector operation -```ballerina -configurable string baseUrl = ?; -configurable int identityMapCapacity = ?; -configurable map originals = ?; -configurable map headers = ?; - -cavroserdes:Client avroSerDes = check new ({ - baseUrl, - identityMapCapacity, - originals, - headers -}); -``` - -### Step 3: Invoke the connector operation - -You can now utilize the operations available within the connector. +You can now utilize the operations available within the connector. To instantiate a `cregistry:Client` instance refer to the guidelines [here](https://central.ballerina.io/ballerinax/confluent.cregistry/latest). ```ballerina public function main() returns error? { + cregistry:Client registry = ; // instantiate a schema registry client + string schema = string ` { "type": "int", @@ -46,15 +32,23 @@ public function main() returns error? { }`; int value = 1; - byte[] bytes = check avroSerDes->serialize(schema, value, "subject"); - int number = check avroSerDes->deserialize(bytes); + byte[] bytes = check cavroserdes:serialize(registry, schema, value, "subject"); + int number = check cavroserdes:deserialize(registry, bytes); } ``` -### Step 4: Run the Ballerina application +### Step 3: Run the Ballerina application Use the following command to compile and run the Ballerina program. ```bash bal run ``` + +## Examples + +The Ballerina Avro Serializer/Deserializer connector for Confluent Schema Registry provides practical examples illustrating usage in various scenarios. Explore these [examples](https://github.com/ballerina-platform/module-ballerinax-confluent.cavroserdes/tree/main/examples). + +1. [Kafka Avro producer](https://github.com/ballerina-platform/module-ballerinax-confluent.cavroserdes/tree/main/examples/kafka-avro-producer) - This example demonstrates how to publish Avro serialized data to a Kafka topic. + +2. [Kafka Avro consumer](https://github.com/ballerina-platform/module-ballerinax-confluent.cavroserdes/tree/main/examples/kafka-avro-consumer) - This guide demonstrates how to consume data in the correct format according to the Avro schema from a Kafka topic. diff --git a/ballerina/Package.md b/ballerina/Package.md index 1ca9fad..c524035 100644 --- a/ballerina/Package.md +++ b/ballerina/Package.md @@ -16,28 +16,14 @@ Import the `ballerinax/confluent.cavroserdes` module into your Ballerina project import ballerinax/confluent.cavroserdes; ``` -### Step 2: Instantiate a new connector +### Step 2: Invoke the connector operation -```ballerina -configurable string baseUrl = ?; -configurable int identityMapCapacity = ?; -configurable map originals = ?; -configurable map headers = ?; - -cavroserdes:Client avroSerDes = check new ({ - baseUrl, - identityMapCapacity, - originals, - headers -}); -``` - -### Step 3: Invoke the connector operation - -You can now utilize the operations available within the connector. +You can now utilize the operations available within the connector. To instantiate a `cregistry:Client` instance refer to the guidelines [here](https://central.ballerina.io/ballerinax/confluent.cregistry/latest). ```ballerina public function main() returns error? { + cregistry:Client registry = ; // instantiate a schema registry client + string schema = string ` { "type": "int", @@ -46,15 +32,23 @@ public function main() returns error? { }`; int value = 1; - byte[] bytes = check avroSerDes->serialize(schema, value, "subject"); - int number = check avroSerDes->deserialize(bytes); + byte[] bytes = check cavroserdes:serialize(registry, schema, value, "subject"); + int number = check cavroserdes:deserialize(registry, bytes); } ``` -### Step 4: Run the Ballerina application +### Step 3: Run the Ballerina application Use the following command to compile and run the Ballerina program. ```bash bal run ``` + +## Examples + +The Ballerina Avro Serializer/Deserializer connector for Confluent Schema Registry provides practical examples illustrating usage in various scenarios. Explore these [examples](https://github.com/ballerina-platform/module-ballerinax-confluent.cavroserdes/tree/main/examples). + +1. [Kafka Avro producer](https://github.com/ballerina-platform/module-ballerinax-confluent.cavroserdes/tree/main/examples/kafka-avro-producer) - This example demonstrates how to publish Avro serialized data to a Kafka topic. + +2. [Kafka Avro consumer](https://github.com/ballerina-platform/module-ballerinax-confluent.cavroserdes/tree/main/examples/kafka-avro-consumer) - This guide demonstrates how to consume data in the correct format according to the Avro schema from a Kafka topic. diff --git a/ballerina/client.bal b/ballerina/client.bal index 7cf51e7..4e873c6 100644 --- a/ballerina/client.bal +++ b/ballerina/client.bal @@ -18,51 +18,51 @@ import ballerina/avro; import ballerina/jballerina.java; import ballerinax/confluent.cregistry; -# Consists of APIs to integrate with Avro Serializer/Deserializer for Confluent Schema Registry. -public isolated client class Client { - private final cregistry:Client schemaClient; - - public isolated function init(*cregistry:ConnectionConfig config) returns Error? { - cregistry:Client|error schemaClient = new (config); - if schemaClient is error { - return error Error("Client invocation error", schemaClient); - } - self.schemaClient = schemaClient; +# Serializes the given data according to the Avro format and registers the schema into the schema registry. +# +# + registry - The schema registry client +# + schema - The Avro schema +# + data - The data to be serialized according to the schema +# + subject - The subject under which the schema should be registered +# + return - A `byte` array of the serialized data or else an `cavroserdes:Error` +public isolated function serialize(cregistry:Client registry, string schema, anydata data, + string subject) returns byte[]|Error { + do { + int id = check registry->register(subject, schema); + byte[] encodedId = check toBytes(id); + avro:Schema avroClient = check new (schema); + byte[] serializedData = check avroClient.toAvro(data); + return [...encodedId, ...serializedData]; + } on fail error e { + return error Error(SERIALIZATION_ERROR, e); } +} - # Serializes the given data according to the Avro format and registers the schema into the schema registry. - # - # + schema - The Avro schema - # + data - The data to be serialized according to the schema - # + subject - The subject under which the schema should be registered - # + return - A `byte` array of the serialized data or else an `cavroserdes:Error` - remote isolated function serialize(string schema, anydata data, string subject) returns byte[]|Error { - do { - int id = check self.schemaClient->register(subject, schema); - byte[] encodedId = check toBytes(id); - avro:Schema avroClient = check new (schema); - byte[] serializedData = check avroClient.toAvro(data); - return [...encodedId, ...serializedData]; - } on fail error e { - return error Error(SERIALIZATION_ERROR, e); - } - } +# Deserializes the given Avro serialized message to the given data type by retrieving the schema +# from the schema registry. +# +# + registry - The schema registry client +# + data - Avro serialized data which includes the schema id +# + targetType - Default parameter use to infer the user specified type +# + return - A deserialized data with the given type or else an `cavroserdes:Error` +public isolated function deserialize(cregistry:Client registry, byte[] data, typedesc targetType = <>) + returns targetType|Error = @java:Method { + 'class: "io.ballerina.lib.confluent.avro.serdes.AvroDeserializer" +} external; - # Deserializes the given Avro serialized message to the given data type by retrieving the schema - # from the schema registry. - # - # + data - Avro serialized data which includes the schema id - # + targetType - Default parameter use to infer the user specified type - # + return - A deserialized data with the given type or else an `cavroserdes:Error` - remote isolated function deserialize(byte[] data, typedesc targetType = <>) - returns targetType|Error = @java:Method { - 'class: "io.ballerina.lib.confluent.avro.serdes.AvroSerializer" - } external; +isolated function toBytes(int id) returns byte[]|error = @java:Method { + 'class: "io.ballerina.lib.confluent.avro.serdes.AvroDeserializer" +} external; - isolated function deserializeData(byte[] data) returns anydata|Error { +isolated function getId(byte[] bytes) returns int = @java:Method { + 'class: "io.ballerina.lib.confluent.avro.serdes.AvroDeserializer" +} external; + +class Deserializer { + isolated function deserializeData(cregistry:Client registry, byte[] data) returns anydata|Error { do { int schemaId = getId(data.slice(1, 5)); - string retrievedSchema = check self.schemaClient->getSchemaById(schemaId); + string retrievedSchema = check registry->getSchemaById(schemaId); avro:Schema avroClient = check new (retrievedSchema); anydata deserializedData = check avroClient.fromAvro(data.slice(5, data.length())); return deserializedData; @@ -71,11 +71,3 @@ public isolated client class Client { } } } - -isolated function toBytes(int id) returns byte[]|error = @java:Method { - 'class: "io.ballerina.lib.confluent.avro.serdes.AvroSerializer" -} external; - -isolated function getId(byte[] bytes) returns int = @java:Method { - 'class: "io.ballerina.lib.confluent.avro.serdes.AvroSerializer" -} external; diff --git a/ballerina/tests/tests.bal b/ballerina/tests/tests.bal index aa520a7..ecfcfc6 100644 --- a/ballerina/tests/tests.bal +++ b/ballerina/tests/tests.bal @@ -15,13 +15,14 @@ // under the License. import ballerina/test; +import ballerinax/confluent.cregistry; configurable string baseUrl = ?; configurable int identityMapCapacity = ?; configurable map originals = ?; configurable map headers = ?; -Client avroSerDes = check new ({ +cregistry:Client regsitry = check new ({ baseUrl, identityMapCapacity, originals, @@ -45,9 +46,8 @@ public function testSerDes() returns error? { name: "Red", colors: ["maroon", "dark red", "light red"] }; - - byte[] bytes = check avroSerDes->serialize(schema, colors, "subject-0"); - Color getColors = check avroSerDes->deserialize(bytes); + byte[] bytes = check serialize(regsitry, schema, colors, "subject-0"); + Color getColors = check deserialize(regsitry, bytes); test:assertEquals(getColors, colors); } @@ -69,8 +69,8 @@ public function testWithRecords() returns error? { subject: "Math" }; - byte[] bytes = check avroSerDes->serialize(schema, student, "subject-1"); - Student getStudent = check avroSerDes->deserialize(bytes); + byte[] bytes = check serialize(regsitry, schema, student, "subject-1"); + Student getStudent = check deserialize(regsitry, bytes); test:assertEquals(getStudent, student); } @@ -85,8 +85,8 @@ public function testSerDesWithInteger() returns error? { int value = 1; - byte[] bytes = check avroSerDes->serialize(schema, value, "subject-5"); - int getValue = check avroSerDes->deserialize(bytes); + byte[] bytes = check serialize(regsitry, schema, value, "subject-5"); + int getValue = check deserialize(regsitry, bytes); test:assertEquals(getValue, value); } @@ -108,8 +108,8 @@ public function testSerDesWithCourse() returns error? { credits: 3 }; - byte[] bytes = check avroSerDes->serialize(schema, course, "subject-3"); - Course getCourse = check avroSerDes->deserialize(bytes); + byte[] bytes = check serialize(regsitry, schema, course, "subject-3"); + Course getCourse = check deserialize(regsitry, bytes); test:assertEquals(getCourse, course); } diff --git a/docs/spec/spec.md b/docs/spec/spec.md index 7ebb8d3..5c8f0b0 100644 --- a/docs/spec/spec.md +++ b/docs/spec/spec.md @@ -3,7 +3,7 @@ _Authors_: @Nuvindu \ _Reviewers_: @ThisaruGuruge \ _Created_: 2024/04/10 \ -_Updated_: 2024/04/10 \ +_Updated_: 2024/04/11 \ _Edition_: Swan Lake ## Introduction @@ -19,13 +19,11 @@ The conforming implementation of the specification is released and included in t ## Contents 1. [Overview](#1-overview) -2. [Initialize the Avro Serializer/Deserializer client](#2-initialize-the-avro-serializerdeserializer-client) - * 2.1 [The `init` method](#21-the-init-method) -3. [Serialize data](#3-serialize-data) - * 3.1 [The `serialize` API](#31-the-serialize-api) -4. [Deserialize data](#4-deserialize-data) - * 4.1 [The `deserialize` API](#41-the-deserialize-api) -5. [The `cavroserdes:Error` Type](#5-the-cavroserdeserror-type) +2. [Serialize data](#3-serialize-data) + * 2.1 [The `serialize` API](#31-the-serialize-api) +3. [Deserialize data](#4-deserialize-data) + * 3.1 [The `deserialize` API](#41-the-deserialize-api) +4. [The `cavroserdes:Error` Type](#5-the-cavroserdeserror-type) ## 1. Overview @@ -34,29 +32,7 @@ This specification provides a detailed explanation of the functionalities offere 1. Serialize data 2. Deserialize data -## 2. Initialize the Avro Serializer/Deserializer client - -The Avro Serializer/Deserializer client needs to be initialized before performing the functionalities. - -### 2.1 The `init` method - -The `init` method initializes the Avro Serializer/Deserializer client. It takes a `config` parameter, which contains the necessary configuration to connect to the Schema Registry. The method returns an error if the initialization fails. - -```ballerina -configurable string baseUrl = ?; -configurable int identityMapCapacity = ?; -configurable map originals = ?; -configurable map headers = ?; - -cavroserdes:Client avroSerDes = check new ({ - baseUrl, - identityMapCapacity, - originals, - headers -}); -``` - -## 3. Serialize data +## 2. Serialize data The Avro Serializer/Deserializer module allows serializing data to Avro format using the schemas from the registry. @@ -73,7 +49,8 @@ string schema = string ` }`; int value = 1; -byte[] bytes = check avroSerDes.serialize(schema, value, "subject"); +cregistry:Client registry = ; //instantiates a schema registry client +byte[] bytes = check cavroserdes:serialize(registry, schema, value, "subject"); ``` ## 4. Deserialize data @@ -85,7 +62,7 @@ This section details the process of deserializing Avro data using the schemas fr The `deserialize` method deserializes a given Avro data to its original form using the schema from the registry. ```ballerina -int number = check avroSerDes.deserialize(bytes); +int number = check cavroserdes:deserialize(registry, bytes); ``` ## 5. The `cavroserdes:Error` Type diff --git a/examples/kafka-avro-consumer/main.bal b/examples/kafka-avro-consumer/main.bal index 18d76f1..b5c5475 100644 --- a/examples/kafka-avro-consumer/main.bal +++ b/examples/kafka-avro-consumer/main.bal @@ -16,6 +16,7 @@ import ballerina/io; import ballerinax/confluent.cavroserdes; +import ballerinax/confluent.cregistry; import ballerinax/kafka; type Order readonly & record { @@ -34,7 +35,7 @@ public function main() returns error? { topics: "test-topic" }); - cavroserdes:Client registry = check new ({ + cregistry:Client registry = check new ({ baseUrl, identityMapCapacity, originals, @@ -42,9 +43,8 @@ public function main() returns error? { }); while true { - kafka:AnydataConsumerRecord[] getValues = check orderConsumer->poll(60); - byte[] orderData = getValues[0].value; - Order getOrder = check registry->deserialize(orderData); + byte[][] getValues = check orderConsumer->pollPayload(60); + Order getOrder = check cavroserdes:deserialize(registry, getValues[0]); io:println("Order : ", getOrder); } } diff --git a/examples/kafka-avro-producer/main.bal b/examples/kafka-avro-producer/main.bal index 9e9fc9f..c7892b0 100644 --- a/examples/kafka-avro-producer/main.bal +++ b/examples/kafka-avro-producer/main.bal @@ -16,6 +16,7 @@ import ballerina/http; import ballerinax/confluent.cavroserdes; +import ballerinax/confluent.cregistry; import ballerinax/kafka; configurable string baseUrl = ?; @@ -29,7 +30,7 @@ type Order readonly & record { service / on new http:Listener(9090) { private final kafka:Producer orderProducer; - private final cavroserdes:Client registry; + private final cregistry:Client registry; function init() returns error? { self.orderProducer = check new (kafka:DEFAULT_URL); @@ -51,7 +52,8 @@ service / on new http:Listener(9090) { {"name": "productName", "type": "string"} ] }`; - byte[] byteValue = check self.registry->serialize(schema, newOrder, "new-subject"); + + byte[] byteValue = check cavroserdes:serialize(self.registry, schema, newOrder, "new-subject"); check self.orderProducer->send({ topic: "test-topic", value: byteValue diff --git a/native/src/main/java/io/ballerina/lib/confluent/avro/serdes/AvroSerializer.java b/native/src/main/java/io/ballerina/lib/confluent/avro/serdes/AvroDeserializer.java similarity index 83% rename from native/src/main/java/io/ballerina/lib/confluent/avro/serdes/AvroSerializer.java rename to native/src/main/java/io/ballerina/lib/confluent/avro/serdes/AvroDeserializer.java index 51fc20e..1b98a69 100644 --- a/native/src/main/java/io/ballerina/lib/confluent/avro/serdes/AvroSerializer.java +++ b/native/src/main/java/io/ballerina/lib/confluent/avro/serdes/AvroDeserializer.java @@ -36,8 +36,9 @@ /** * Provide APIs related to Avro Serialization/Deserialization with the Schema Registry. */ -public class AvroSerializer { +public class AvroDeserializer { private static final String DESERIALIZE_FUNCTION = "deserializeData"; + private static final String DESERIALIZER = "Deserializer"; public static final StrandMetadata EXECUTION_STRAND = new StrandMetadata( getModule().getOrg(), @@ -45,15 +46,15 @@ public class AvroSerializer { getModule().getMajorVersion(), DESERIALIZE_FUNCTION); - public static Object deserialize(Environment env, BObject kafkaSerDes, BArray data, - BTypedesc typeDesc) { + public static Object deserialize(Environment env, BObject registry, BArray data, BTypedesc typeDesc) { + BObject deserializer = ValueCreator.createObjectValue(getModule(), DESERIALIZER, null, null); Future future = env.markAsync(); ExecutionCallback executionCallback = new ExecutionCallback(future, typeDesc); - Object[] arguments = new Object[]{data, true}; + Object[] arguments = new Object[]{registry, true, data, true}; UnionType typeUnion = TypeCreator.createUnionType(PredefinedTypes.TYPE_ANYDATA_ARRAY, PredefinedTypes.TYPE_ERROR); env.getRuntime() - .invokeMethodAsyncConcurrently(kafkaSerDes, DESERIALIZE_FUNCTION, null, EXECUTION_STRAND, + .invokeMethodAsyncConcurrently(deserializer, DESERIALIZE_FUNCTION, null, EXECUTION_STRAND, executionCallback, null, typeUnion, arguments); return null; }