diff --git a/build.gradle b/build.gradle
index 4e573bd5c..0da6629f3 100644
--- a/build.gradle
+++ b/build.gradle
@@ -215,17 +215,117 @@ def getProjectVersion() {
return ver
}
-project('serializers') {
+project('serializers:shared') {
dependencies {
compile project(':common')
compile project(':client')
+ compile group: 'io.pravega', name: 'pravega-client', version: pravegaVersion
+ compile group: 'org.xerial.snappy', name: 'snappy-java', version: snappyVersion
+ testCompile group: 'org.slf4j', name: 'log4j-over-slf4j', version: slf4jApiVersion
+ testCompile group: 'ch.qos.logback', name: 'logback-classic', version: qosLogbackVersion
+ testCompile group: 'io.pravega', name: 'pravega-test-testcommon', version: pravegaVersion
+ }
+
+ javadoc {
+ title = "Serializers shared"
+ dependsOn delombok
+ source = delombok.outputDir
+ failOnError = true
+ exclude "**/impl/**";
+ options.addBooleanOption("Xdoclint:all,-reference", true)
+ }
+}
+
+project('serializers:avro') {
+ dependencies {
+ compile project(':serializers:shared')
compile group: 'org.apache.avro', name: 'avro', version: avroVersion
+ testCompile project(path:':serializers:shared', configuration:'testRuntime')
+ testCompile group: 'org.slf4j', name: 'log4j-over-slf4j', version: slf4jApiVersion
+ testCompile group: 'ch.qos.logback', name: 'logback-classic', version: qosLogbackVersion
+ testCompile group: 'io.pravega', name: 'pravega-test-testcommon', version: pravegaVersion
+ }
+
+ javadoc {
+ title = "Serializers avro"
+ dependsOn delombok
+ source = delombok.outputDir
+ failOnError = true
+ exclude "**/impl/**";
+ options.addBooleanOption("Xdoclint:all,-reference", true)
+ }
+
+ jar {
+ from {
+ configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
+ }
+ }
+}
+
+project('serializers:protobuf') {
+ dependencies {
+ compile project(':serializers:shared')
compile group: 'com.google.protobuf', name: 'protobuf-java', version: protobufProtocVersion
compile group: 'com.google.protobuf', name: 'protobuf-java-util', version: protobufUtilVersion
- compile group: 'io.pravega', name: 'pravega-client', version: pravegaVersion
- compile group: 'org.xerial.snappy', name: 'snappy-java', version: snappyVersion
- compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-jsonSchema', version: jacksonVersion
+ testCompile project(path:':serializers:shared', configuration:'testRuntime')
+ testCompile group: 'org.slf4j', name: 'log4j-over-slf4j', version: slf4jApiVersion
+ testCompile group: 'ch.qos.logback', name: 'logback-classic', version: qosLogbackVersion
+ testCompile group: 'io.pravega', name: 'pravega-test-testcommon', version: pravegaVersion
+ }
+
+ javadoc {
+ title = "Serializers protobuf"
+ dependsOn delombok
+ source = delombok.outputDir
+ failOnError = true
+ exclude "**/impl/**";
+ options.addBooleanOption("Xdoclint:all,-reference", true)
+ }
+
+ jar {
+ from {
+ configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
+ }
+ }
+}
+
+project('serializers:json') {
+ dependencies {
+ compile project(':serializers:shared')
compile group: 'com.github.everit-org.json-schema', name: 'org.everit.json.schema', version: everitVersion
+ compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-jsonSchema', version: jacksonVersion
+ testCompile project(path:':serializers:shared', configuration:'testRuntime')
+ testCompile group: 'org.slf4j', name: 'log4j-over-slf4j', version: slf4jApiVersion
+ testCompile group: 'ch.qos.logback', name: 'logback-classic', version: qosLogbackVersion
+ testCompile group: 'io.pravega', name: 'pravega-test-testcommon', version: pravegaVersion
+ }
+
+ javadoc {
+ title = "Serializers json"
+ dependsOn delombok
+ source = delombok.outputDir
+ failOnError = true
+ exclude "**/impl/**";
+ options.addBooleanOption("Xdoclint:all,-reference", true)
+ }
+
+ jar {
+ from {
+ configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
+ }
+ }
+}
+
+project('serializers') {
+ dependencies {
+ compile project(':serializers:avro')
+ compile project(':serializers:protobuf')
+ compile project(':serializers:json')
+ compile group: 'org.xerial.snappy', name: 'snappy-java', version: snappyVersion
+ testCompile project(path:':serializers:shared', configuration:'testRuntime')
+ testCompile files(project(':serializers:avro').sourceSets.test.output)
+ testCompile files(project(':serializers:protobuf').sourceSets.test.output)
+ testCompile files(project(':serializers:json').sourceSets.test.output)
testCompile group: 'org.slf4j', name: 'log4j-over-slf4j', version: slf4jApiVersion
testCompile group: 'ch.qos.logback', name: 'logback-classic', version: qosLogbackVersion
testCompile group: 'io.pravega', name: 'pravega-test-testcommon', version: pravegaVersion
@@ -242,6 +342,8 @@ project('serializers') {
jar {
manifest {}
+ // add zip64=true so that we are able to pack more than 65k files in the jar.
+ zip64=true
from {
configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
@@ -317,6 +419,47 @@ project('server') {
}
}
+project('test') {
+ sourceSets {
+ test.resources.srcDirs += "$rootDir/resources"
+ test.java.srcDir project(':server').file("src/test/java")
+ test.java.srcDir project(':serializers').file("src/test/java")
+ test.java.srcDir project(':serializers:shared').file("src/test/java")
+ test.java.srcDir project(':serializers:protobuf').file("src/test/java")
+ test.java.srcDir project(':serializers:json').file("src/test/java")
+ test.java.srcDir project(':serializers:avro').file("src/test/java")
+ }
+ compileTestJava.dependsOn tasks.getByPath(':server:testClasses')
+ dependencies {
+ compile project(':common')
+ compile project(':contract')
+ compile project(':client')
+ compile project(':server')
+ compile project(':serializers')
+ compile project(':serializers:protobuf')
+ compile project(':serializers:avro')
+ compile project(':serializers:json')
+ compile project(':serializers:shared')
+ testCompile (group: 'io.pravega', name: 'pravega-standalone', version: pravegaVersion) {
+ exclude group: 'javax.ws.rs', module: 'jsr311-api'
+ }
+ testCompile group: 'io.pravega', name: 'pravega-test-testcommon', version: pravegaVersion
+ testCompile group: 'org.slf4j', name: 'log4j-over-slf4j', version: slf4jApiVersion
+ testCompile group: 'ch.qos.logback', name: 'logback-classic', version: qosLogbackVersion
+ testCompile files(project(':server').sourceSets.test.output.classesDir)
+ testCompile group: 'org.glassfish.jersey.test-framework.providers', name: 'jersey-test-framework-provider-grizzly2', version: jerseyVersion
+ }
+
+ javadoc {
+ title = "Test"
+ dependsOn delombok
+ source = delombok.outputDir
+ failOnError = true
+ exclude "**/impl/**";
+ options.addBooleanOption("Xdoclint:all,-reference", true)
+ }
+}
+
apply plugin: 'distribution'
distributions {
main {
@@ -393,6 +536,10 @@ task publishAllJars() {
dependsOn ':common:publish'
dependsOn ':contract:publish'
dependsOn ':server:publish'
+ dependsOn ':serializers:shared:publish'
+ dependsOn ':serializers:avro:publish'
+ dependsOn ':serializers:json:publish'
+ dependsOn ':serializers:protobuf:publish'
dependsOn ':serializers:publish'
}
diff --git a/checkstyle/spotbugs-exclude.xml b/checkstyle/spotbugs-exclude.xml
index 31efb43d4..a1ed58ff9 100644
--- a/checkstyle/spotbugs-exclude.xml
+++ b/checkstyle/spotbugs-exclude.xml
@@ -3,10 +3,16 @@
-
+
-
+
+
+
+
+
+
+
diff --git a/client/src/main/java/io/pravega/schemaregistry/client/SchemaRegistryClientConfig.java b/client/src/main/java/io/pravega/schemaregistry/client/SchemaRegistryClientConfig.java
index 2ba913a52..7f98e72f2 100644
--- a/client/src/main/java/io/pravega/schemaregistry/client/SchemaRegistryClientConfig.java
+++ b/client/src/main/java/io/pravega/schemaregistry/client/SchemaRegistryClientConfig.java
@@ -24,18 +24,102 @@ public class SchemaRegistryClientConfig {
* URI for connecting with registry client.
*/
private final URI schemaRegistryUri;
+ /**
+ * Flag to indicate if authentication is enabled.
+ */
private final boolean authEnabled;
+ /**
+ * Authentication method.
+ */
private final String authMethod;
+ /**
+ * Authentication token.
+ */
private final String authToken;
+ /*
+ * Path to trust store for TLS server authentication certificate.
+ */
+ private final String trustStore;
+ /**
+ * Type of key store used as the trust store - e.g. jks, pkcs11, pkcs12, dks etc. If not specified then either
+ * certificate (if configured) or default java TLS store as specified in system properties would be used.
+ */
+ private final String trustStoreType;
+ /**
+ * Password for the trust store. Defaults to null.
+ */
+ private final String trustStorePassword;
+ /**
+ * If the trust store is a certificate file, typically DER or PEM file.
+ */
+ private final String certificate;
+ /**
+ * Flag to indicate whether client should perform host name validation in server authentication certificate.
+ */
+ private final boolean validateHostName;
- private SchemaRegistryClientConfig(URI schemaRegistryUri, boolean authEnabled, String authMethod, String authToken) {
+ private SchemaRegistryClientConfig(URI schemaRegistryUri, boolean authEnabled, String authMethod, String authToken,
+ String trustStore, String trustStoreType, String trustStorePassword,
+ String certificate, boolean validateHostName) {
this.schemaRegistryUri = schemaRegistryUri;
this.authEnabled = authEnabled;
this.authMethod = authMethod;
this.authToken = authToken;
+ this.trustStore = trustStore;
+ this.certificate = certificate;
+ this.trustStoreType = trustStoreType;
+ this.trustStorePassword = trustStorePassword;
+ this.validateHostName = validateHostName;
}
public static final class SchemaRegistryClientConfigBuilder {
private boolean authEnabled = false;
+ private boolean validateHostName = false;
+ private String trustStore = null;
+ private String trustStoreType = null;
+ private String trustStorePassword = null;
+ private String certificate = null;
+
+ public SchemaRegistryClientConfigBuilder certificate(String certificate) {
+ this.certificate = certificate;
+ return this;
+ }
+
+ public SchemaRegistryClientConfigBuilder trustStore(String trustStore, String trustStoreType, String trustStorePassword) {
+ this.trustStore = trustStore;
+ return this.trustStoreType(trustStoreType)
+ .trustStorePassword(trustStorePassword);
+ }
+
+ private SchemaRegistryClientConfigBuilder trustStoreType(String trustStoreType) {
+ this.trustStoreType = trustStoreType;
+ return this;
+ }
+
+ private SchemaRegistryClientConfigBuilder trustStorePassword(String trustStorePassword) {
+ this.trustStorePassword = trustStorePassword;
+ return this;
+ }
+
+ public SchemaRegistryClientConfigBuilder authentication(String authMethod, String authToken) {
+ return this.authEnabled()
+ .authMethod(authMethod)
+ .authToken(authToken);
+ }
+
+ private SchemaRegistryClientConfigBuilder authEnabled() {
+ this.authEnabled = true;
+ return this;
+ }
+
+ private SchemaRegistryClientConfigBuilder authMethod(String authMethod) {
+ this.authMethod = authMethod;
+ return this;
+ }
+
+ private SchemaRegistryClientConfigBuilder authToken(String authToken) {
+ this.authToken = authToken;
+ return this;
+ }
}
}
diff --git a/client/src/main/java/io/pravega/schemaregistry/client/SchemaRegistryClientImpl.java b/client/src/main/java/io/pravega/schemaregistry/client/SchemaRegistryClientImpl.java
index fb4aecb9b..e21eaa9e0 100644
--- a/client/src/main/java/io/pravega/schemaregistry/client/SchemaRegistryClientImpl.java
+++ b/client/src/main/java/io/pravega/schemaregistry/client/SchemaRegistryClientImpl.java
@@ -10,8 +10,10 @@
package io.pravega.schemaregistry.client;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import io.pravega.common.Exceptions;
import io.pravega.common.util.Retry;
+import io.pravega.common.util.CertificateUtils;
import io.pravega.schemaregistry.common.AuthHelper;
import io.pravega.schemaregistry.common.ContinuationTokenIterator;
import io.pravega.schemaregistry.contract.data.CodecType;
@@ -34,15 +36,25 @@
import io.pravega.schemaregistry.contract.generated.rest.model.ValidateRequest;
import io.pravega.schemaregistry.contract.transform.ModelHelper;
import io.pravega.schemaregistry.contract.v1.ApiV1;
+import lombok.SneakyThrows;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.proxy.WebResourceFactory;
import javax.annotation.Nullable;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.ClientRequestFilter;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.Comparator;
@@ -70,6 +82,8 @@ public class SchemaRegistryClientImpl implements SchemaRegistryClient {
.withExpBackoff(100, 2, 10, 1000)
.retryWhen(x -> Exceptions.unwrap(x) instanceof ConnectionException);
private static final int GROUP_LIMIT = 100;
+ private static final String HTTPS = "https";
+ private static final String TLS = "TLS";
private final ApiV1.GroupsApi groupProxy;
private final ApiV1.SchemasApi schemaProxy;
@@ -77,7 +91,17 @@ public class SchemaRegistryClientImpl implements SchemaRegistryClient {
private final Client client;
SchemaRegistryClientImpl(SchemaRegistryClientConfig config, String namespace) {
- client = ClientBuilder.newClient(new ClientConfig());
+ Preconditions.checkNotNull(config);
+ Preconditions.checkNotNull(config.getSchemaRegistryUri());
+ ClientBuilder clientBuilder = ClientBuilder.newBuilder().withConfig(new ClientConfig());
+ if (HTTPS.equalsIgnoreCase(config.getSchemaRegistryUri().getScheme())) {
+ clientBuilder = clientBuilder.sslContext(getSSLContext(config));
+ if (!config.isValidateHostName()) {
+ // host name verification is done by default. To disable it we will add an always true verifier
+ clientBuilder.hostnameVerifier((a, b) -> true);
+ }
+ }
+ client = clientBuilder.build();
if (config.isAuthEnabled()) {
client.register((ClientRequestFilter) context -> {
context.getHeaders().add(HttpHeaders.AUTHORIZATION,
@@ -486,4 +510,44 @@ public void close() throws Exception {
client.close();
}
}
+
+ @SneakyThrows(IOException.class)
+ private SSLContext getSSLContext(SchemaRegistryClientConfig config) {
+ try {
+ // If trust store is specified, use it.
+ // Else check if certificate is provided.
+ // Else use default SSL context.
+ KeyStore trustStore;
+ if (config.getTrustStore() != null) {
+ trustStore = getTrustStore(config);
+ } else if (config.getCertificate() != null) {
+ trustStore = CertificateUtils.createTrustStore(config.getCertificate());
+ } else {
+ return SSLContext.getDefault();
+ }
+ TrustManagerFactory factory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ factory.init(trustStore);
+ SSLContext tlsContext = SSLContext.getInstance(TLS);
+ tlsContext.init(null, factory.getTrustManagers(), null);
+ return tlsContext;
+ } catch (KeyManagementException | KeyStoreException | NoSuchAlgorithmException |
+ CertificateException e) {
+ throw new IllegalStateException("Failure initializing trust store", e);
+ }
+ }
+
+ private KeyStore getTrustStore(SchemaRegistryClientConfig config) throws KeyStoreException,
+ IOException, NoSuchAlgorithmException, CertificateException {
+ KeyStore trustStore;
+ trustStore = KeyStore.getInstance(config.getTrustStoreType());
+ try (FileInputStream fin = new FileInputStream(config.getTrustStore())) {
+ String trustStorePassword = config.getTrustStorePassword();
+ if (trustStorePassword != null) {
+ trustStore.load(fin, trustStorePassword.toCharArray());
+ } else {
+ trustStore.load(fin, null);
+ }
+ }
+ return trustStore;
+ }
}
diff --git a/client/src/test/java/io/pravega/schemaregistry/client/SchemaRegistryClientConfigTest.java b/client/src/test/java/io/pravega/schemaregistry/client/SchemaRegistryClientConfigTest.java
new file mode 100644
index 000000000..c056193f4
--- /dev/null
+++ b/client/src/test/java/io/pravega/schemaregistry/client/SchemaRegistryClientConfigTest.java
@@ -0,0 +1,54 @@
+/**
+ * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.pravega.schemaregistry.client;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class SchemaRegistryClientConfigTest {
+ @Test
+ public void testSSLConfig() {
+ SchemaRegistryClientConfig config = SchemaRegistryClientConfig.builder().build();
+ assertNull(config.getCertificate());
+ assertNull(config.getTrustStore());
+ assertNull(config.getTrustStoreType());
+ assertNull(config.getTrustStorePassword());
+
+ config = SchemaRegistryClientConfig.builder().certificate("certPath").build();
+ assertEquals(config.getCertificate(), "certPath");
+ assertNull(config.getTrustStorePassword());
+ assertNull(config.getTrustStoreType());
+ assertNull(config.getTrustStorePassword());
+
+ config = SchemaRegistryClientConfig.builder().trustStore("trustStorePath", "JKS", null).build();
+ assertNull(config.getCertificate());
+ assertEquals(config.getTrustStore(), "trustStorePath");
+ assertEquals(config.getTrustStoreType(), "JKS");
+ assertNull(config.getTrustStorePassword());
+
+ config = SchemaRegistryClientConfig.builder().trustStore("trustStorePath", "JKS", "password").build();
+ assertNull(config.getCertificate());
+ assertEquals(config.getTrustStore(), "trustStorePath");
+ assertEquals(config.getTrustStoreType(), "JKS");
+ assertEquals(config.getTrustStorePassword(), "password");
+ }
+
+ @Test
+ public void testAuthConfig() {
+ SchemaRegistryClientConfig config = SchemaRegistryClientConfig.builder().build();
+ assertFalse(config.isAuthEnabled());
+
+ config = SchemaRegistryClientConfig.builder().authentication("method", "token").build();
+ assertTrue(config.isAuthEnabled());
+ assertEquals(config.getAuthMethod(), "method");
+ assertEquals(config.getAuthToken(), "token");
+ }
+}
diff --git a/contract/src/main/java/io/pravega/schemaregistry/contract/transform/ModelHelper.java b/contract/src/main/java/io/pravega/schemaregistry/contract/transform/ModelHelper.java
index 3d6892068..4597a1b1c 100644
--- a/contract/src/main/java/io/pravega/schemaregistry/contract/transform/ModelHelper.java
+++ b/contract/src/main/java/io/pravega/schemaregistry/contract/transform/ModelHelper.java
@@ -147,7 +147,7 @@ public static io.pravega.schemaregistry.contract.data.BackwardAndForward.Backwar
return new io.pravega.schemaregistry.contract.data.BackwardAndForward.Backward();
} else if (obj instanceof BackwardTill) {
return new io.pravega.schemaregistry.contract.data.BackwardAndForward.BackwardTill(
- decode(((io.pravega.schemaregistry.contract.generated.rest.model.BackwardTill) backward.getBackwardPolicy()).getVersionInfo()));
+ decode(((io.pravega.schemaregistry.contract.generated.rest.model.BackwardTill) obj).getVersionInfo()));
} else if (obj instanceof BackwardTransitive) {
return new io.pravega.schemaregistry.contract.data.BackwardAndForward.BackwardTransitive();
} else {
@@ -176,7 +176,7 @@ public static io.pravega.schemaregistry.contract.data.BackwardAndForward.Forward
return new io.pravega.schemaregistry.contract.data.BackwardAndForward.Forward();
} else if (obj instanceof ForwardTill) {
return new io.pravega.schemaregistry.contract.data.BackwardAndForward.ForwardTill(
- decode(((io.pravega.schemaregistry.contract.generated.rest.model.ForwardTill) forward.getForwardPolicy()).getVersionInfo()));
+ decode(((io.pravega.schemaregistry.contract.generated.rest.model.ForwardTill) obj).getVersionInfo()));
} else if (obj instanceof ForwardTransitive) {
return new io.pravega.schemaregistry.contract.data.BackwardAndForward.ForwardTransitive();
} else {
diff --git a/contract/src/test/java/io/pravega/schemaregistry/contract/transform/ModelHelperTest.java b/contract/src/test/java/io/pravega/schemaregistry/contract/transform/ModelHelperTest.java
index ab9f92366..9df6ce455 100644
--- a/contract/src/test/java/io/pravega/schemaregistry/contract/transform/ModelHelperTest.java
+++ b/contract/src/test/java/io/pravega/schemaregistry/contract/transform/ModelHelperTest.java
@@ -9,6 +9,7 @@
*/
package io.pravega.schemaregistry.contract.transform;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.pravega.schemaregistry.contract.generated.rest.model.Backward;
import io.pravega.schemaregistry.contract.generated.rest.model.BackwardAndForward;
@@ -28,6 +29,7 @@
import io.pravega.schemaregistry.contract.generated.rest.model.VersionInfo;
import org.junit.Test;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
@@ -35,6 +37,8 @@
import static org.junit.Assert.*;
public class ModelHelperTest {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
@Test
public void testDecode() {
SerializationFormat type = new SerializationFormat().serializationFormat(SerializationFormat.SerializationFormatEnum.CUSTOM).fullTypeName("a");
@@ -159,73 +163,77 @@ public void testEncode() {
}
@Test
- public void testEncodeAndDecodeCompatibility() {
+ public void testEncodeAndDecodeCompatibility() throws IOException {
io.pravega.schemaregistry.contract.data.Compatibility compatibility =
io.pravega.schemaregistry.contract.data.Compatibility.allowAny();
- Compatibility encoded = ModelHelper.encode(compatibility);
+ Compatibility encoded = convert(ModelHelper.encode(compatibility), Compatibility.class);
io.pravega.schemaregistry.contract.data.Compatibility decoded = ModelHelper.decode(encoded);
assertEquals(compatibility, decoded);
compatibility = io.pravega.schemaregistry.contract.data.Compatibility.denyAll();
- encoded = ModelHelper.encode(compatibility);
+ encoded = convert(ModelHelper.encode(compatibility), Compatibility.class);
decoded = ModelHelper.decode(encoded);
assertEquals(compatibility, decoded);
compatibility = io.pravega.schemaregistry.contract.data.Compatibility.backward();
- encoded = ModelHelper.encode(compatibility);
+ encoded = convert(ModelHelper.encode(compatibility), Compatibility.class);
decoded = ModelHelper.decode(encoded);
assertEquals(compatibility, decoded);
compatibility = io.pravega.schemaregistry.contract.data.Compatibility.forward();
- encoded = ModelHelper.encode(compatibility);
+ encoded = convert(ModelHelper.encode(compatibility), Compatibility.class);
decoded = ModelHelper.decode(encoded);
assertEquals(compatibility, decoded);
compatibility = io.pravega.schemaregistry.contract.data.Compatibility.backwardTransitive();
- encoded = ModelHelper.encode(compatibility);
+ encoded = convert(ModelHelper.encode(compatibility), Compatibility.class);
decoded = ModelHelper.decode(encoded);
assertEquals(compatibility, decoded);
compatibility = io.pravega.schemaregistry.contract.data.Compatibility.forwardTransitive();
- encoded = ModelHelper.encode(compatibility);
+ encoded = convert(ModelHelper.encode(compatibility), Compatibility.class);
decoded = ModelHelper.decode(encoded);
assertEquals(compatibility, decoded);
compatibility = io.pravega.schemaregistry.contract.data.Compatibility.full();
- encoded = ModelHelper.encode(compatibility);
+ encoded = convert(ModelHelper.encode(compatibility), Compatibility.class);
decoded = ModelHelper.decode(encoded);
assertEquals(compatibility, decoded);
compatibility = io.pravega.schemaregistry.contract.data.Compatibility.fullTransitive();
- encoded = ModelHelper.encode(compatibility);
+ encoded = convert(ModelHelper.encode(compatibility), Compatibility.class);
decoded = ModelHelper.decode(encoded);
assertEquals(compatibility, decoded);
io.pravega.schemaregistry.contract.data.VersionInfo versionInfo = new io.pravega.schemaregistry.contract.data.VersionInfo("a", 1, 1);
compatibility = io.pravega.schemaregistry.contract.data.Compatibility.backwardTill(versionInfo);
- encoded = ModelHelper.encode(compatibility);
+ encoded = convert(ModelHelper.encode(compatibility), Compatibility.class);
decoded = ModelHelper.decode(encoded);
assertEquals(compatibility, decoded);
compatibility = io.pravega.schemaregistry.contract.data.Compatibility.forwardTill(versionInfo);
- encoded = ModelHelper.encode(compatibility);
+ encoded = convert(ModelHelper.encode(compatibility), Compatibility.class);
decoded = ModelHelper.decode(encoded);
assertEquals(compatibility, decoded);
compatibility = io.pravega.schemaregistry.contract.data.Compatibility.backwardTillAndForwardTill(versionInfo, versionInfo);
- encoded = ModelHelper.encode(compatibility);
+ encoded = convert(ModelHelper.encode(compatibility), Compatibility.class);
decoded = ModelHelper.decode(encoded);
assertEquals(compatibility, decoded);
compatibility = io.pravega.schemaregistry.contract.data.Compatibility.backwardOneAndForwardTill(versionInfo);
- encoded = ModelHelper.encode(compatibility);
+ encoded = convert(ModelHelper.encode(compatibility), Compatibility.class);
decoded = ModelHelper.decode(encoded);
assertEquals(compatibility, decoded);
compatibility = io.pravega.schemaregistry.contract.data.Compatibility.backwardTillAndForwardOne(versionInfo);
- encoded = ModelHelper.encode(compatibility);
+ encoded = convert(ModelHelper.encode(compatibility), Compatibility.class);
decoded = ModelHelper.decode(encoded);
assertEquals(compatibility, decoded);
-
+ }
+
+ private T convert(T t, Class tClass) throws IOException {
+ String str = OBJECT_MAPPER.writeValueAsString(t);
+ return OBJECT_MAPPER.readValue(str, tClass);
}
}
diff --git a/gradle.properties b/gradle.properties
index 3493130ec..867349c4e 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -46,7 +46,7 @@ gradleGitPluginVersion=2.2.0
avroVersion=1.9.1
avroProtobufVersion=1.7.7
snappyVersion=1.1.7.3
-pravegaVersion=0.8.0-2591.37c5082-SNAPSHOT
+pravegaVersion=0.8.0-2623.279ac21-SNAPSHOT
pravegaKeyCloakVersion=0.7.0
# Version and base tags can be overridden at build time
diff --git a/gradle/maven.gradle b/gradle/maven.gradle
index 2bc88598c..e4fbd4aa8 100644
--- a/gradle/maven.gradle
+++ b/gradle/maven.gradle
@@ -39,7 +39,7 @@ plugins.withId('maven') {
}
}
- pom.artifactId = project.path.replace(':', '')
+ pom.artifactId = project.path.replaceFirst(':', '').replace(':', '-')
pom.project {
name "Pravega"
url "http://pravega.io"
@@ -80,7 +80,7 @@ plugins.withId('maven') {
install {
repositories {
mavenInstaller {
- pom.artifactId = project.path.replace(':', '')
+ pom.artifactId = project.path.replaceFirst(':', '').replace(':', '-')
}
}
}
diff --git a/serializers/src/main/java/io/pravega/schemaregistry/serializers/AvroDeserializer.java b/serializers/avro/src/main/java/io/pravega/schemaregistry/serializer/avro/impl/AvroDeserializer.java
similarity index 84%
rename from serializers/src/main/java/io/pravega/schemaregistry/serializers/AvroDeserializer.java
rename to serializers/avro/src/main/java/io/pravega/schemaregistry/serializer/avro/impl/AvroDeserializer.java
index a8db6971b..d73bd2832 100644
--- a/serializers/src/main/java/io/pravega/schemaregistry/serializers/AvroDeserializer.java
+++ b/serializers/avro/src/main/java/io/pravega/schemaregistry/serializer/avro/impl/AvroDeserializer.java
@@ -7,13 +7,16 @@
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
-package io.pravega.schemaregistry.serializers;
+package io.pravega.schemaregistry.serializer.avro.impl;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
+import io.pravega.schemaregistry.serializer.avro.schemas.AvroSchema;
import io.pravega.schemaregistry.client.SchemaRegistryClient;
import io.pravega.schemaregistry.contract.data.SchemaInfo;
-import io.pravega.schemaregistry.schemas.AvroSchema;
+import io.pravega.schemaregistry.serializer.shared.impl.AbstractDeserializer;
+import io.pravega.schemaregistry.serializer.shared.impl.EncodingCache;
+import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
@@ -39,7 +42,7 @@ class AvroDeserializer extends AbstractDeserializer {
}
@Override
- protected T deserialize(InputStream inputStream, SchemaInfo writerSchemaInfo, SchemaInfo readerSchemaInfo) throws IOException {
+ public final T deserialize(InputStream inputStream, SchemaInfo writerSchemaInfo, SchemaInfo readerSchemaInfo) throws IOException {
Preconditions.checkNotNull(writerSchemaInfo);
Schema writerSchema;
if (knownSchemas.containsKey(writerSchemaInfo)) {
diff --git a/serializers/src/main/java/io/pravega/schemaregistry/serializers/AvroGenericDeserializer.java b/serializers/avro/src/main/java/io/pravega/schemaregistry/serializer/avro/impl/AvroGenericDeserializer.java
similarity index 70%
rename from serializers/src/main/java/io/pravega/schemaregistry/serializers/AvroGenericDeserializer.java
rename to serializers/avro/src/main/java/io/pravega/schemaregistry/serializer/avro/impl/AvroGenericDeserializer.java
index 77af89edf..5cd0e23ca 100644
--- a/serializers/src/main/java/io/pravega/schemaregistry/serializers/AvroGenericDeserializer.java
+++ b/serializers/avro/src/main/java/io/pravega/schemaregistry/serializer/avro/impl/AvroGenericDeserializer.java
@@ -7,12 +7,15 @@
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
-package io.pravega.schemaregistry.serializers;
+package io.pravega.schemaregistry.serializer.avro.impl;
import com.google.common.base.Preconditions;
+import io.pravega.schemaregistry.serializer.avro.schemas.AvroSchema;
import io.pravega.schemaregistry.client.SchemaRegistryClient;
import io.pravega.schemaregistry.contract.data.SchemaInfo;
-import io.pravega.schemaregistry.schemas.AvroSchema;
+import io.pravega.schemaregistry.serializer.shared.impl.AbstractDeserializer;
+import io.pravega.schemaregistry.serializer.shared.impl.EncodingCache;
+import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.BinaryDecoder;
@@ -23,17 +26,17 @@
import java.io.InputStream;
import java.util.concurrent.ConcurrentHashMap;
-class AvroGenericDeserializer extends AbstractDeserializer