From 7a8457ee72a2f71f1794a6dde8a93c164b1d22cb Mon Sep 17 00:00:00 2001 From: Shivesh Ranjan Date: Wed, 29 Jul 2020 04:44:27 -0700 Subject: [PATCH 1/5] Issue 86: store non normalized schema Signed-off-by: Shivesh Ranjan --- .../service/SchemaRegistryService.java | 45 ++++---- .../schemaregistry/storage/SchemaStore.java | 11 +- .../storage/impl/SchemaStoreImpl.java | 13 ++- .../storage/impl/group/Group.java | 30 +++--- .../service/SchemaRegistryServiceTest.java | 20 ++-- .../storage/impl/group/GroupPravegaTest.java | 102 ++++++++++++------ .../storage/impl/group/GroupTest.java | 79 ++++++++------ 7 files changed, 182 insertions(+), 118 deletions(-) diff --git a/server/src/main/java/io/pravega/schemaregistry/service/SchemaRegistryService.java b/server/src/main/java/io/pravega/schemaregistry/service/SchemaRegistryService.java index 2a7442663..3d931e1f2 100644 --- a/server/src/main/java/io/pravega/schemaregistry/service/SchemaRegistryService.java +++ b/server/src/main/java/io/pravega/schemaregistry/service/SchemaRegistryService.java @@ -24,6 +24,7 @@ import io.pravega.common.util.Retry; import io.pravega.schemaregistry.ResultPage; import io.pravega.schemaregistry.common.FuturesUtility; +import io.pravega.schemaregistry.common.HashUtil; import io.pravega.schemaregistry.common.NameUtil; import io.pravega.schemaregistry.contract.data.BackwardAndForward; import io.pravega.schemaregistry.contract.data.CodecType; @@ -53,6 +54,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.AbstractMap; import java.util.Collections; @@ -267,22 +269,25 @@ public CompletableFuture addSchema(String namespace, String group, // 2. get checker for serialization format. // validate schema against group compatibility policy on schema // 3. conditionally update the schema - return RETRY.runAsync(() -> store.getGroupEtag(namespace, group) - .thenCompose(etag -> - store.getGroupProperties(namespace, group) - .thenCompose(prop -> { - return Futures.exceptionallyComposeExpecting(store.getSchemaVersion(namespace, group, schema), - e -> Exceptions.unwrap(e) instanceof StoreExceptions.DataNotFoundException, - () -> { // Schema doesnt exist. Validate and add it - return validateSchema(namespace, group, schema, prop.getCompatibility()) - .thenCompose(valid -> { - if (!valid) { - throw new IncompatibleSchemaException(String.format("%s is incompatible", schema.getType())); - } - return store.addSchema(namespace, group, schema, prop, etag); - }); - }); - })), executor) + return RETRY.runAsync(() -> + store.getGroupEtag(namespace, group) + .thenCompose(etag -> + store.getGroupProperties(namespace, group) + .thenCompose(prop -> { + return Futures.exceptionallyComposeExpecting(store.getSchemaVersion(namespace, group, schema, getFingerprint(schema)), + e -> Exceptions.unwrap(e) instanceof StoreExceptions.DataNotFoundException, + () -> { // Schema doesnt exist. Validate and add it + return validateSchema(namespace, group, schema, prop.getCompatibility()) + .thenCompose(valid -> { + if (!valid) { + throw new IncompatibleSchemaException(String.format("%s is incompatible", schema.getType())); + } + // we will compute the fingerprint from normalized form. + return store.addSchema(namespace, group, schemaInfo, schema, + getFingerprint(schema), prop, etag); + }); + }); + })), executor) .whenComplete((r, e) -> { if (e == null) { log.debug("Group {} {}, schema {} added successfully.", namespace, group, schema.getType()); @@ -491,7 +496,7 @@ public CompletableFuture getSchemaVersion(String namespace, String log.debug("Group {} {}, getSchemaVersion for {}.", namespace, group, schemaInfo.getType()); SchemaInfo schema = normalizeSchemaBinary(schemaInfo); - return store.getSchemaVersion(namespace, group, schema) + return store.getSchemaVersion(namespace, group, schema, getFingerprint(schema)) .whenComplete((r, e) -> { if (e == null) { log.debug("Group {} {}, version = {}.", namespace, group, r); @@ -968,11 +973,15 @@ public CompletableFuture> getSchemaReferences(String na return store.getGroupsUsing(namespace, schema) .thenCompose(groups -> Futures.allOfWithResults( groups.stream().collect(Collectors.toMap(x -> x, x -> - Futures.exceptionallyExpecting(store.getSchemaVersion(namespace, x, schema), + Futures.exceptionallyExpecting(store.getSchemaVersion(namespace, x, schema, getFingerprint(schema)), e -> Exceptions.unwrap(e) instanceof StoreExceptions.DataNotFoundException, EMPTY_VERSION)))) .thenApply(result -> { return result.entrySet().stream().filter(x -> !x.getValue().equals(EMPTY_VERSION)) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); })); } + + private BigInteger getFingerprint(SchemaInfo schemaInfo) { + return HashUtil.getFingerprint(schemaInfo.getSchemaData().array()); + } } diff --git a/server/src/main/java/io/pravega/schemaregistry/storage/SchemaStore.java b/server/src/main/java/io/pravega/schemaregistry/storage/SchemaStore.java index d8c6ea617..c46bc58be 100644 --- a/server/src/main/java/io/pravega/schemaregistry/storage/SchemaStore.java +++ b/server/src/main/java/io/pravega/schemaregistry/storage/SchemaStore.java @@ -22,6 +22,7 @@ import io.pravega.schemaregistry.contract.data.VersionInfo; import javax.annotation.Nullable; +import java.math.BigInteger; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -235,11 +236,15 @@ public interface SchemaStore { * @param namespace namespace * @param group group * @param schemaInfo schema to add + * @param normalized normalized form of schema to add. + * @param fingerprint 256 bit sha hash of schema binary. + * Two schema binary representation will be considered identical if their fingerprints match. * @param prop group properties applied at the time of schema addition. * @param etag entity tag for the group. * @return Completablefuture that holds version info for the schema that is added. */ - CompletableFuture addSchema(String namespace, String group, SchemaInfo schemaInfo, GroupProperties prop, Etag etag); + CompletableFuture addSchema(String namespace, String group, SchemaInfo schemaInfo, SchemaInfo normalized, + BigInteger fingerprint, GroupProperties prop, Etag etag); /** * Get the version corresponding to the schema. @@ -247,9 +252,11 @@ public interface SchemaStore { * @param namespace namespace * @param group group * @param schemaInfo schemainfo + * @param fingerprint 256 bit sha hash of schema binary. + * Two schema binary representation will be considered identical if their fingerprints match. * @return Completablefuture that holds versioninfo for the schema. */ - CompletableFuture getSchemaVersion(String namespace, String group, SchemaInfo schemaInfo); + CompletableFuture getSchemaVersion(String namespace, String group, SchemaInfo schemaInfo, BigInteger fingerprint); /** * Get the encoding id corresponding to versioninfo and codectype. It returns Etag for the group if the encoding id diff --git a/server/src/main/java/io/pravega/schemaregistry/storage/impl/SchemaStoreImpl.java b/server/src/main/java/io/pravega/schemaregistry/storage/impl/SchemaStoreImpl.java index 9b36f3b15..2004c425b 100644 --- a/server/src/main/java/io/pravega/schemaregistry/storage/impl/SchemaStoreImpl.java +++ b/server/src/main/java/io/pravega/schemaregistry/storage/impl/SchemaStoreImpl.java @@ -29,6 +29,7 @@ import io.pravega.schemaregistry.storage.impl.schemas.Schemas; import javax.annotation.Nullable; +import java.math.BigInteger; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -130,14 +131,16 @@ public CompletableFuture getLatestSchemaVersion(String namesp } @Override - public CompletableFuture addSchema(String namespace, String groupId, SchemaInfo schemaInfo, GroupProperties prop, Etag etag) { - return schemas.addSchema(schemaInfo, namespace, groupId) - .thenCompose(v -> getGroup(namespace, groupId).thenCompose(grp -> grp.addSchema(schemaInfo, prop, etag))); + public CompletableFuture addSchema(String namespace, String groupId, SchemaInfo schemaInfo, SchemaInfo normalized, + BigInteger fingerprint, GroupProperties prop, Etag etag) { + // Store normalized form of schema with the global schemas while the original form is stored within the group. + return schemas.addSchema(normalized, namespace, groupId) + .thenCompose(v -> getGroup(namespace, groupId).thenCompose(grp -> grp.addSchema(schemaInfo, fingerprint, prop, etag))); } @Override - public CompletableFuture getSchemaVersion(String namespace, String groupId, SchemaInfo schemaInfo) { - return getGroup(namespace, groupId).thenCompose(grp -> grp.getVersion(schemaInfo)); + public CompletableFuture getSchemaVersion(String namespace, String groupId, SchemaInfo schemaInfo, BigInteger fingerprint) { + return getGroup(namespace, groupId).thenCompose(grp -> grp.getVersion(schemaInfo, fingerprint)); } @Override diff --git a/server/src/main/java/io/pravega/schemaregistry/storage/impl/group/Group.java b/server/src/main/java/io/pravega/schemaregistry/storage/impl/group/Group.java index 6e6da05b0..81143dea5 100644 --- a/server/src/main/java/io/pravega/schemaregistry/storage/impl/group/Group.java +++ b/server/src/main/java/io/pravega/schemaregistry/storage/impl/group/Group.java @@ -22,7 +22,6 @@ import io.pravega.common.util.ByteArraySegment; import io.pravega.common.util.Retry; import io.pravega.schemaregistry.common.Either; -import io.pravega.schemaregistry.common.HashUtil; import io.pravega.schemaregistry.contract.data.CodecType; import io.pravega.schemaregistry.contract.data.Compatibility; import io.pravega.schemaregistry.contract.data.EncodingId; @@ -43,7 +42,6 @@ import java.math.BigInteger; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.Iterator; @@ -362,8 +360,7 @@ private CompletableFuture getSchema(int id, boolean throwOnDeleted) }); } - public CompletableFuture getVersion(SchemaInfo schemaInfo) { - BigInteger fingerprint = getFingerprint(schemaInfo); + public CompletableFuture getVersion(SchemaInfo schemaInfo, BigInteger fingerprint) { SchemaFingerprintKey key = new SchemaFingerprintKey(fingerprint); return groupTable.getEntry(key, SchemaVersionList.class) @@ -521,10 +518,10 @@ public CompletableFuture> getHistory(String type) { .collect(Collectors.toList())); } - public CompletableFuture addSchema(SchemaInfo schemaInfo, GroupProperties prop, Etag etag) { + public CompletableFuture addSchema(SchemaInfo schemaInfo, BigInteger fingerprint, GroupProperties prop, Etag etag) { List keys = new ArrayList<>(); keys.add(LATEST_SCHEMAS_KEY); - SchemaFingerprintKey schemaFingerprintKey = new SchemaFingerprintKey(getFingerprint(schemaInfo)); + SchemaFingerprintKey schemaFingerprintKey = new SchemaFingerprintKey(fingerprint); keys.add(schemaFingerprintKey); // add or upadte following entries: @@ -653,10 +650,6 @@ public CompletableFuture getGroupProperties() { }); } - private BigInteger getFingerprint(SchemaInfo schemaInfo) { - return HashUtil.getFingerprint(schemaInfo.getSchemaData().array()); - } - private CompletableFuture generateNewEncodingId(VersionInfo versionInfo, String codecType, Etag etag) { return getSchema(versionInfo.getId(), true) .thenCompose(schema -> getCodecTypeNames() @@ -703,14 +696,21 @@ public CompletableFuture> getEncodingId(VersionInfo ver private CompletableFuture findVersion(List versions, SchemaInfo toFind) { AtomicReference found = new AtomicReference<>(); Iterator iterator = versions.iterator(); - return Futures.loop(() -> { - return iterator.hasNext() && found.get() == null; - }, () -> { + return Futures.loop(() -> iterator.hasNext() && found.get() == null, () -> { VersionInfo version = iterator.next(); return Futures.exceptionallyExpecting(getSchema(version.getId(), true) .thenAccept(schema -> { - if (Arrays.equals(schema.getSchemaData().array(), toFind.getSchemaData().array()) - && schema.getType().equals(toFind.getType()) && schema.getSerializationFormat().equals(toFind.getSerializationFormat())) { + // Do note that we store the user supplied schema in its original avatar. While the fingerprint + // is computed on the normalized form. So when we fetch the schemas that have identical fingerprints, + // we will only compare type name and format and declare two schemas equal. + // We can do this with fair confidence because we use the sha 256 hash for fingerprints. + // The probability of collision on two non identical byte arrays to produce same fingerprint + // is next to impossible. + // However, since we also deal with composite schemas (e.g. protobuf file descriptor set or avro union) + // where same schema could include definition for multiple objects and the type name distinguishes + // different entities, we will still need to compare type and format if the schema binary has identical + // fingerprint before we consider two schemas to be identical. + if (schema.getType().equals(toFind.getType()) && schema.getSerializationFormat().equals(toFind.getSerializationFormat())) { found.set(version); } }), e -> Exceptions.unwrap(e) instanceof StoreExceptions.DataNotFoundException, null); diff --git a/server/src/test/java/io/pravega/schemaregistry/service/SchemaRegistryServiceTest.java b/server/src/test/java/io/pravega/schemaregistry/service/SchemaRegistryServiceTest.java index 37e9f6a1c..72aff3432 100644 --- a/server/src/test/java/io/pravega/schemaregistry/service/SchemaRegistryServiceTest.java +++ b/server/src/test/java/io/pravega/schemaregistry/service/SchemaRegistryServiceTest.java @@ -210,10 +210,10 @@ public void testAddSchema() { ByteBuffer.wrap(schemaData), ImmutableMap.of()); VersionInfo versionInfo = new VersionInfo("objectType", 5, 7); - doAnswer(x -> CompletableFuture.completedFuture(versionInfo)).when(store).addSchema(any(), anyString(), any(), - any(), any()); + doAnswer(x -> CompletableFuture.completedFuture(versionInfo)).when(store).addSchema(any(), anyString(), any(), any(), + any(), any(), any()); doAnswer(x -> CompletableFuture.completedFuture(versionInfo)).when(store).getSchemaVersion(any(), anyString(), - any()); + any(), any()); VersionInfo versionInfo1 = service.addSchema(null, "mygroup", schemaInfo).join(); assertEquals(7, versionInfo1.getId()); // SerializationFormatMismatch Exception @@ -225,7 +225,7 @@ public void testAddSchema() { any(), anyString()); doAnswer(x -> Futures.failedFuture( StoreExceptions.create(StoreExceptions.Type.DATA_NOT_FOUND, "Group Not Found"))).when( - store).getSchemaVersion(any(), anyString(), any()); + store).getSchemaVersion(any(), anyString(), any(), any()); AssertExtensions.assertThrows("An exception should have been thrown", () -> service.addSchema(null, "mygroup", schemaInfo).join(), e -> e instanceof SerializationFormatMismatchException); @@ -239,7 +239,7 @@ public void testAddSchema() { any(), anyString()); doAnswer(x -> Futures.failedFuture( StoreExceptions.create(StoreExceptions.Type.DATA_NOT_FOUND, "Group Not Found"))).when( - store).getSchemaVersion(any(), anyString(), any()); + store).getSchemaVersion(any(), anyString(), any(), any()); SchemaWithVersion schemaWithVersion = new SchemaWithVersion(schemaInfo, versionInfo); List schemaWithVersionList = new ArrayList<>(); schemaWithVersionList.add(schemaWithVersion); @@ -248,7 +248,7 @@ public void testAddSchema() { // get CheckCompatibility to fail versionInfo1 = service.addSchema(null, "mygroup", schemaInfo).join(); // Runtime Exception - doAnswer(x -> Futures.failedFuture(new RuntimeException())).when(store).getSchemaVersion(any(), anyString(), + doAnswer(x -> Futures.failedFuture(new RuntimeException())).when(store).getSchemaVersion(any(), anyString(), any(), any()); AssertExtensions.assertThrows("An exception should have been thrown", () -> service.addSchema(null, "mygroup", schemaInfo).join(), e -> e instanceof RuntimeException); @@ -402,7 +402,7 @@ public void testGetGroupHistory() { @Test public void testGetSchemaVersion() { VersionInfo versionInfo = new VersionInfo("objectTYpe", 5, 7); - doAnswer(x -> CompletableFuture.completedFuture(versionInfo)).when(store).getSchemaVersion(any(), anyString(), + doAnswer(x -> CompletableFuture.completedFuture(versionInfo)).when(store).getSchemaVersion(any(), anyString(), any(), any()); byte[] schemaData = new byte[0]; io.pravega.schemaregistry.contract.data.SchemaInfo schemaInfo = @@ -414,12 +414,12 @@ public void testGetSchemaVersion() { //GroupNotFoundException doAnswer(x -> Futures.failedFuture( StoreExceptions.create(StoreExceptions.Type.DATA_NOT_FOUND, "Group NotFound"))).when( - store).getSchemaVersion(any(), anyString(), any()); + store).getSchemaVersion(any(), anyString(), any(), any()); AssertExtensions.assertThrows("An Exception should have been thrown", () -> service.getSchemaVersion(null, "mygroup", schemaInfo).join(), e -> e instanceof StoreExceptions.DataNotFoundException); //Runtime Exception - doAnswer(x -> Futures.failedFuture(new RuntimeException())).when(store).getSchemaVersion(any(), anyString(), + doAnswer(x -> Futures.failedFuture(new RuntimeException())).when(store).getSchemaVersion(any(), anyString(), any(), any()); AssertExtensions.assertThrows("An Exception should have been thrown", () -> service.getSchemaVersion(null, "mygroup", schemaInfo).join(), e -> e instanceof RuntimeException); @@ -552,7 +552,7 @@ public void testGetSchemaReferences() { List groupNameList = new ArrayList<>(); groupNameList.add(groupName); doAnswer(x -> CompletableFuture.completedFuture(groupNameList)).when(store).getGroupsUsing(any(), any()); - doAnswer(x -> CompletableFuture.completedFuture(versionInfo)).when(store).getSchemaVersion(any(), anyString(), + doAnswer(x -> CompletableFuture.completedFuture(versionInfo)).when(store).getSchemaVersion(any(), anyString(), any(), any()); Map map = service.getSchemaReferences(null, schemaInfo).join(); assertTrue(map.get(groupName).equals(versionInfo)); diff --git a/server/src/test/java/io/pravega/schemaregistry/storage/impl/group/GroupPravegaTest.java b/server/src/test/java/io/pravega/schemaregistry/storage/impl/group/GroupPravegaTest.java index 3fc4d5382..7cbeb284e 100644 --- a/server/src/test/java/io/pravega/schemaregistry/storage/impl/group/GroupPravegaTest.java +++ b/server/src/test/java/io/pravega/schemaregistry/storage/impl/group/GroupPravegaTest.java @@ -40,6 +40,7 @@ import org.junit.Before; import org.junit.Test; +import java.math.BigInteger; import java.net.URI; import java.nio.ByteBuffer; import java.util.Arrays; @@ -140,13 +141,15 @@ public void testGetLatestSchemas() { SchemaInfo schemaInfo1 = new SchemaInfo(anygroup1, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); Etag etag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, etag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, etag).join(); schemaWithVersionList = pravegaKeyValueGroups.getGroup(null, groupName).join().getLatestSchemas().join(); assertEquals(1, schemaWithVersionList.size()); assertEquals(anygroup, schemaWithVersionList.get(0).getSchemaInfo().getType()); // two schemas etag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo1, groupProperties, etag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo1, + HashUtil.getFingerprint(schemaInfo1.getSchemaData().array()), groupProperties, etag).join(); schemaWithVersionList = pravegaKeyValueGroups.getGroup(null, groupName).join().getLatestSchemas().join(); assertEquals(2, schemaWithVersionList.size()); assertEquals(anygroup, schemaWithVersionList.get(0).getSchemaInfo().getType()); @@ -167,7 +170,8 @@ public void testAddSchema() { SchemaInfo schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); Etag etag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, etag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), + groupProperties, etag).join(); GroupsValue gv = tableStore.getEntry(GROUPS, new NamespaceAndGroup(null, groupName).toBytes(), GroupsValue::fromBytes).join().getRecord(); // one schema @@ -201,16 +205,17 @@ public void testAddSchema() { SchemaInfo schemaInfo1 = new SchemaInfo(anygroup, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); etag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo1, groupProperties, etag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo1, HashUtil.getFingerprint(schemaInfo1.getSchemaData().array()), + groupProperties, etag).join(); VersionInfo versionInfo1 = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion( - schemaInfo1).join(); + schemaInfo1, HashUtil.getFingerprint(schemaInfo1.getSchemaData().array())).join(); SchemaInfo schemaInfo2 = new SchemaInfo(anygroup1, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); etag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo2, groupProperties, etag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo2, HashUtil.getFingerprint(schemaInfo2.getSchemaData().array()), groupProperties, etag).join(); VersionInfo versionInfo2 = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion( - schemaInfo2).join(); + schemaInfo2, HashUtil.getFingerprint(schemaInfo1.getSchemaData().array())).join(); //versionInfo key schemaRecord = tableStore.getEntry(String.format(TABLE_NAME_FORMAT, gv.getId()), new TableKeySerializer().toBytes(new TableRecords.SchemaIdKey(versionInfo1.getId())), @@ -259,14 +264,16 @@ public void testGetSchemas() { SchemaInfo schemaInfo1 = new SchemaInfo(anygroup1, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); Etag etag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, etag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, etag).join(); List schemaWitheVersionList = pravegaKeyValueGroups.getGroup(null, groupName).join().getSchemas().join(); assertEquals(1, schemaWitheVersionList.size()); assertEquals(anygroup, schemaWitheVersionList.get(0).getSchemaInfo().getType()); //two schemas etag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo1, groupProperties, etag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo1, + HashUtil.getFingerprint(schemaInfo1.getSchemaData().array()), groupProperties, etag).join(); schemaWitheVersionList = pravegaKeyValueGroups.getGroup(null, groupName).join().getSchemas().join(); assertEquals(2, schemaWitheVersionList.size()); assertEquals(anygroup, schemaWitheVersionList.get(0).getSchemaInfo().getType()); @@ -280,7 +287,8 @@ public void testGetSchemas() { SchemaInfo schemaInfo2 = new SchemaInfo(anygroup, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); etag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo2, groupProperties, etag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo2, + HashUtil.getFingerprint(schemaInfo2.getSchemaData().array()), groupProperties, etag).join(); schemaWitheVersionList = pravegaKeyValueGroups.getGroup(null, groupName).join().getSchemas(anygroup, 1).join(); assertEquals(1, schemaWitheVersionList.size()); assertEquals(ImmutableMap.of(), @@ -305,12 +313,15 @@ public void testGetVersion() { SchemaInfo schemaInfo1 = new SchemaInfo(anygroup1, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); Etag etag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, etag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, etag).join(); etag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo1, groupProperties, etag).join(); - VersionInfo versionInfo = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion(schemaInfo).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo1, + HashUtil.getFingerprint(schemaInfo1.getSchemaData().array()), groupProperties, etag).join(); + VersionInfo versionInfo = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion(schemaInfo, + HashUtil.getFingerprint(schemaInfo.getSchemaData().array())).join(); VersionInfo versionInfo1 = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion( - schemaInfo1).join(); + schemaInfo1, HashUtil.getFingerprint(schemaInfo1.getSchemaData().array())).join(); TableRecords.SchemaVersionList schemaVersionList = tableStore.getEntry( String.format(TABLE_NAME_FORMAT, gv.getId()), new TableKeySerializer().toBytes( new TableRecords.SchemaFingerprintKey( @@ -391,8 +402,11 @@ public void testCreateEncodingId() { SchemaInfo schemaInfo = new SchemaInfo("anygroup", SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); Etag eTag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, eTag).join(); - VersionInfo versionInfo = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion(schemaInfo).join(); + BigInteger fingerprint = HashUtil.getFingerprint(schemaInfo.getSchemaData().array()); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + fingerprint, groupProperties, eTag).join(); + VersionInfo versionInfo = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion(schemaInfo, + fingerprint).join(); pravegaKeyValueGroups.getGroup(null, groupName).join().addCodecType(new CodecType("gzip")).join(); eTag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); EncodingId encodingId = pravegaKeyValueGroups.getGroup(null, groupName).join().createEncodingId(versionInfo, @@ -423,8 +437,11 @@ public void testGetEncodingInfo() { SchemaInfo schemaInfo = new SchemaInfo("anygroup", SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); Etag eTag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, eTag).join(); - VersionInfo versionInfo = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion(schemaInfo).join(); + BigInteger fingerprint = HashUtil.getFingerprint(schemaInfo.getSchemaData().array()); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + fingerprint, groupProperties, eTag).join(); + VersionInfo versionInfo = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion(schemaInfo, + fingerprint).join(); pravegaKeyValueGroups.getGroup(null, groupName).join().addCodecType(new CodecType("gzip")).join(); eTag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); EncodingId encodingId = pravegaKeyValueGroups.getGroup(null, groupName).join().createEncodingId(versionInfo, @@ -454,8 +471,11 @@ public void testGetEncodingId() { byte[] schemaData = new byte[0]; SchemaInfo schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, eTag).join(); - VersionInfo versionInfo = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion(schemaInfo).join(); + BigInteger fingerprint = HashUtil.getFingerprint(schemaInfo.getSchemaData().array()); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + fingerprint, groupProperties, eTag).join(); + VersionInfo versionInfo = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion(schemaInfo, + fingerprint).join(); pravegaKeyValueGroups.getGroup(null, groupName).join().addCodecType(new CodecType("gzip")).join(); Either idEtagEither = pravegaKeyValueGroups.getGroup(null, groupName).join().getEncodingId( versionInfo, @@ -499,15 +519,18 @@ public void testGetLatestSchemaVersion() { byte[] schemaData = new byte[0]; SchemaInfo schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, eTag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); eTag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, eTag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); schemaInfo = new SchemaInfo(anygroup1, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); eTag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, eTag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); SchemaWithVersion schemaWithVersion = pravegaKeyValueGroups.getGroup( null, groupName).join().getLatestSchemaVersion().join(); assertEquals(anygroup1, schemaWithVersion.getSchemaInfo().getType()); @@ -540,12 +563,14 @@ public void testGetHistory() { SchemaInfo schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Avro, ByteBuffer.wrap(schemaData), ImmutableMap.of()); Etag eTag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, eTag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); eTag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); schemaData = new byte[5]; schemaInfo = new SchemaInfo(anygroup1, SerializationFormat.Avro, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, eTag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); List groupHistoryRecords = pravegaKeyValueGroups.getGroup(null, groupName).join().getHistory().join(); assertEquals(2, groupHistoryRecords.size()); @@ -556,7 +581,8 @@ public void testGetHistory() { schemaInfo = new SchemaInfo(anygroup1, SerializationFormat.Avro, ByteBuffer.wrap(schemaData1), ImmutableMap.of()); eTag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, eTag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); groupHistoryRecords = pravegaKeyValueGroups.getGroup(null, groupName).join().getHistory(anygroup1).join(); assertEquals(2, groupHistoryRecords.size()); assertTrue(Arrays.equals(ByteBuffer.wrap(schemaData).array(), @@ -638,13 +664,17 @@ public void testDeleteSchema() { SchemaInfo schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Avro, ByteBuffer.wrap(schemaData), ImmutableMap.of()); Etag eTag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, eTag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); eTag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); schemaData = new byte[5]; schemaInfo = new SchemaInfo(anygroup1, SerializationFormat.Avro, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, eTag).join(); - VersionInfo versionInfo = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion(schemaInfo).join(); + BigInteger fingerprint = HashUtil.getFingerprint(schemaInfo.getSchemaData().array()); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + fingerprint, groupProperties, eTag).join(); + VersionInfo versionInfo = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion(schemaInfo, + fingerprint).join(); Etag etag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); pravegaKeyValueGroups.getGroup(null, groupName).join().deleteSchema(versionInfo.getId(), etag).join(); TableRecords.VersionDeletedRecord versionDeletedRecordKey = new TableRecords.VersionDeletedRecord( @@ -687,8 +717,11 @@ public void testGetSchema() { SchemaInfo schemaInfo = new SchemaInfo("anygroup", SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); Etag eTag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, eTag).join(); - VersionInfo versionInfo1 = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion(schemaInfo).join(); + BigInteger fingerprint = HashUtil.getFingerprint(schemaInfo.getSchemaData().array()); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + fingerprint, groupProperties, eTag).join(); + VersionInfo versionInfo1 = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion(schemaInfo, + fingerprint).join(); SchemaInfo schemaInfo1 = pravegaKeyValueGroups.getGroup(null, groupName).join().getSchema( versionInfo1.getId()).join(); assertEquals(schemaInfo, schemaInfo1); @@ -721,8 +754,11 @@ public void testGetSchemaUsingTypeAndVersion() { SchemaInfo schemaInfo = new SchemaInfo("anygroup", SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); Etag eTag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, eTag).join(); - VersionInfo versionInfo1 = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion(schemaInfo).join(); + BigInteger fingerprint = HashUtil.getFingerprint(schemaInfo.getSchemaData().array()); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + fingerprint, groupProperties, eTag).join(); + VersionInfo versionInfo1 = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion(schemaInfo, + fingerprint).join(); SchemaInfo schemaInfo1 = pravegaKeyValueGroups.getGroup(null, groupName).join().getSchema( versionInfo1.getType(), versionInfo1.getVersion()).join(); diff --git a/server/src/test/java/io/pravega/schemaregistry/storage/impl/group/GroupTest.java b/server/src/test/java/io/pravega/schemaregistry/storage/impl/group/GroupTest.java index 97133459b..5ce47309e 100644 --- a/server/src/test/java/io/pravega/schemaregistry/storage/impl/group/GroupTest.java +++ b/server/src/test/java/io/pravega/schemaregistry/storage/impl/group/GroupTest.java @@ -11,6 +11,7 @@ import com.google.common.collect.ImmutableMap; import io.pravega.schemaregistry.common.Either; +import io.pravega.schemaregistry.common.HashUtil; import io.pravega.schemaregistry.contract.data.CodecType; import io.pravega.schemaregistry.contract.data.Compatibility; import io.pravega.schemaregistry.contract.data.EncodingId; @@ -29,6 +30,7 @@ import org.junit.Before; import org.junit.Test; +import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; @@ -111,9 +113,9 @@ public void testGetTypes() { ImmutableMap.of()); SchemaInfo schemaInfo1 = new SchemaInfo(anygroup1, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); + inMemoryGroup.addSchema(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); eTag = inMemoryGroup.getCurrentEtag().join(); - inMemoryGroup.addSchema(schemaInfo1, groupProperties, eTag).join(); + inMemoryGroup.addSchema(schemaInfo1, HashUtil.getFingerprint(schemaInfo1.getSchemaData().array()), groupProperties, eTag).join(); schemaWithVersionList = inMemoryGroup.getLatestSchemas().join(); assertEquals(2, schemaWithVersionList.size()); assertEquals(anygroup, schemaWithVersionList.get(0).getSchemaInfo().getType()); @@ -132,7 +134,7 @@ public void testAddSchema() { byte[] schemaData = new byte[0]; SchemaInfo schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); + inMemoryGroup.addSchema(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); List tableValueListEtag = inMemoryGroupTable.getTable().entrySet().stream().filter( x -> x.getKey() instanceof TableRecords.Etag).map( x -> x.getValue().getValue()).collect( @@ -189,9 +191,9 @@ public void testGetSchemas() { ImmutableMap.of()); SchemaInfo schemaInfo1 = new SchemaInfo(anygroup1, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); + inMemoryGroup.addSchema(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); eTag = inMemoryGroup.getCurrentEtag().join(); - inMemoryGroup.addSchema(schemaInfo1, groupProperties, eTag).join(); + inMemoryGroup.addSchema(schemaInfo1, HashUtil.getFingerprint(schemaInfo1.getSchemaData().array()), groupProperties, eTag).join(); List schemaWithVersionListWithToken = inMemoryGroup.getSchemas().join(); assertEquals(2, schemaWithVersionListWithToken.size()); assertEquals(SerializationFormat.Custom, @@ -236,11 +238,11 @@ public void testGetVersion() { ImmutableMap.of()); SchemaInfo schemaInfo1 = new SchemaInfo(anygroup1, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); + inMemoryGroup.addSchema(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); eTag = inMemoryGroup.getCurrentEtag().join(); - inMemoryGroup.addSchema(schemaInfo1, groupProperties, eTag).join(); - VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo).join(); - VersionInfo versionInfo1 = inMemoryGroup.getVersion(schemaInfo1).join(); + inMemoryGroup.addSchema(schemaInfo1, HashUtil.getFingerprint(schemaInfo1.getSchemaData().array()), groupProperties, eTag).join(); + VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array())).join(); + VersionInfo versionInfo1 = inMemoryGroup.getVersion(schemaInfo1, HashUtil.getFingerprint(schemaInfo1.getSchemaData().array())).join(); List tableValueListVersionInfo = inMemoryGroupTable.getTable().entrySet().stream().filter( x -> x.getKey() instanceof TableRecords.SchemaIdKey).map( @@ -278,8 +280,9 @@ public void testCreateEncodingId() { byte[] schemaData = new byte[0]; SchemaInfo schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); - VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo).join(); + BigInteger fingerprint = HashUtil.getFingerprint(schemaInfo.getSchemaData().array()); + inMemoryGroup.addSchema(schemaInfo, fingerprint, groupProperties, eTag).join(); + VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo, fingerprint).join(); inMemoryGroup.addCodecType(new CodecType("gzip")).join(); eTag = inMemoryGroup.getCurrentEtag().join(); EncodingId encodingId = inMemoryGroup.createEncodingId(versionInfo, "gzip", eTag).join(); @@ -288,8 +291,9 @@ public void testCreateEncodingId() { schemaData = new byte[5]; schemaInfo = new SchemaInfo(anygroup1, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); - versionInfo = inMemoryGroup.getVersion(schemaInfo).join(); + fingerprint = HashUtil.getFingerprint(schemaInfo.getSchemaData().array()); + inMemoryGroup.addSchema(schemaInfo, fingerprint, groupProperties, eTag).join(); + versionInfo = inMemoryGroup.getVersion(schemaInfo, fingerprint).join(); eTag = inMemoryGroup.getCurrentEtag().join(); EncodingId encodingId1 = inMemoryGroup.createEncodingId(versionInfo, "snappy", eTag).join(); List encodingInfoRecordList = inMemoryGroupTable.getTable().entrySet().stream().filter( @@ -316,8 +320,9 @@ public void testGetEncodingInfo() { byte[] schemaData = new byte[0]; SchemaInfo schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); - VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo).join(); + BigInteger fingerprint = HashUtil.getFingerprint(schemaInfo.getSchemaData().array()); + inMemoryGroup.addSchema(schemaInfo, fingerprint, groupProperties, eTag).join(); + VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo, fingerprint).join(); inMemoryGroup.addCodecType(new CodecType("gzip")).join(); eTag = inMemoryGroup.getCurrentEtag().join(); EncodingId encodingId = inMemoryGroup.createEncodingId(versionInfo, "gzip", eTag).join(); @@ -343,8 +348,9 @@ public void testGetLatestSchemaVersion() { byte[] schemaData = new byte[0]; SchemaInfo schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); - VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo).join(); + BigInteger fingerprint = HashUtil.getFingerprint(schemaInfo.getSchemaData().array()); + inMemoryGroup.addSchema(schemaInfo, fingerprint, groupProperties, eTag).join(); + VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo, fingerprint).join(); schemaWithVersion = inMemoryGroup.getLatestSchemaVersion().join(); assertEquals(versionInfo, schemaWithVersion.getVersionInfo()); assertEquals(anygroup, schemaWithVersion.getSchemaInfo().getType()); @@ -354,8 +360,9 @@ public void testGetLatestSchemaVersion() { schemaData = new byte[5]; SchemaInfo schemaInfo1 = new SchemaInfo(anygroup1, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo1, groupProperties, eTag).join(); - VersionInfo versionInfo1 = inMemoryGroup.getVersion(schemaInfo1).join(); + BigInteger fingerprint1 = HashUtil.getFingerprint(schemaInfo1.getSchemaData().array()); + inMemoryGroup.addSchema(schemaInfo1, fingerprint1, groupProperties, eTag).join(); + VersionInfo versionInfo1 = inMemoryGroup.getVersion(schemaInfo1, fingerprint1).join(); // null schemaWithVersion = inMemoryGroup.getLatestSchemaVersion("anygroup2").join(); assertNull(schemaWithVersion); @@ -406,12 +413,12 @@ public void testGetHistory() { byte[] schemaData = new byte[3]; SchemaInfo schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Avro, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); + inMemoryGroup.addSchema(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); eTag = inMemoryGroup.getCurrentEtag().join(); schemaData = new byte[5]; schemaInfo = new SchemaInfo(anygroup1, SerializationFormat.Avro, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); + inMemoryGroup.addSchema(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); List groupHistoryRecords = inMemoryGroup.getHistory().join(); assertEquals(2, groupHistoryRecords.size()); assertEquals(anygroup, groupHistoryRecords.get(0).getSchemaInfo().getType()); @@ -421,7 +428,7 @@ public void testGetHistory() { schemaInfo = new SchemaInfo(anygroup1, SerializationFormat.Avro, ByteBuffer.wrap(schemaData1), ImmutableMap.of()); eTag = inMemoryGroup.getCurrentEtag().join(); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); + inMemoryGroup.addSchema(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); groupHistoryRecords = inMemoryGroup.getHistory(anygroup1).join(); assertEquals(2, groupHistoryRecords.size()); assertTrue(Arrays.equals(ByteBuffer.wrap(schemaData).array(), @@ -471,8 +478,8 @@ public void testGetEncodingId() { byte[] schemaData = new byte[0]; SchemaInfo schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); - VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo).join(); + inMemoryGroup.addSchema(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); + VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array())).join(); inMemoryGroup.addCodecType(new CodecType("gzip")).join(); Either idEtagEither = inMemoryGroup.getEncodingId(versionInfo, "gzip").join(); assertTrue(idEtagEither.isRight()); @@ -484,8 +491,8 @@ public void testGetEncodingId() { schemaData = new byte[5]; schemaInfo = new SchemaInfo(anygroup1, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag); - VersionInfo versionInfo1 = inMemoryGroup.getVersion(schemaInfo).join(); + inMemoryGroup.addSchema(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag); + VersionInfo versionInfo1 = inMemoryGroup.getVersion(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array())).join(); eTag = inMemoryGroup.getCurrentEtag().join(); EncodingId encodingId1 = inMemoryGroup.createEncodingId(versionInfo1, "snappy", eTag).join(); idEtagEither = inMemoryGroup.getEncodingId(versionInfo, "gzip").join(); @@ -508,13 +515,13 @@ public void testDeleteSchema() { byte[] schemaData = new byte[0]; SchemaInfo schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); + inMemoryGroup.addSchema(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); eTag = inMemoryGroup.getCurrentEtag().join(); schemaData = new byte[5]; schemaInfo = new SchemaInfo(anygroup1, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag); - VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo).join(); + inMemoryGroup.addSchema(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag); + VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array())).join(); eTag = inMemoryGroup.getCurrentEtag().join(); inMemoryGroup.deleteSchema(versionInfo.getId(), eTag).join(); List deletedOrdinalList = inMemoryGroupTable.getTable().entrySet().stream().filter( @@ -542,8 +549,9 @@ public void testGetSchema() { byte[] schemaData = new byte[0]; SchemaInfo schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); - VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo).join(); + BigInteger fingerprint = HashUtil.getFingerprint(schemaInfo.getSchemaData().array()); + inMemoryGroup.addSchema(schemaInfo, fingerprint, groupProperties, eTag).join(); + VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo, fingerprint).join(); SchemaInfo schemaInfo1 = inMemoryGroup.getSchema(versionInfo.getId()).join(); assertEquals(schemaInfo, schemaInfo1); List schemaRecordValues = inMemoryGroupTable.getTable().entrySet().stream().filter( @@ -569,8 +577,9 @@ public void testGetSchemaUsingTypeAndVersion() { byte[] schemaData = new byte[0]; SchemaInfo schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); - VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo).join(); + BigInteger fingerprint = HashUtil.getFingerprint(schemaInfo.getSchemaData().array()); + inMemoryGroup.addSchema(schemaInfo, fingerprint, groupProperties, eTag).join(); + VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo, fingerprint).join(); SchemaInfo schemaInfo1 = inMemoryGroup.getSchema(versionInfo.getType(), versionInfo.getVersion()).join(); assertEquals(schemaInfo, schemaInfo1); // testing with 2 schemas @@ -578,8 +587,8 @@ public void testGetSchemaUsingTypeAndVersion() { schemaData = new byte[5]; SchemaInfo schemaInfo2 = new SchemaInfo(anygroup1, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo2, groupProperties, eTag); - VersionInfo versionInfo1 = inMemoryGroup.getVersion(schemaInfo2).join(); + inMemoryGroup.addSchema(schemaInfo2, HashUtil.getFingerprint(schemaInfo2.getSchemaData().array()), groupProperties, eTag); + VersionInfo versionInfo1 = inMemoryGroup.getVersion(schemaInfo2, HashUtil.getFingerprint(schemaInfo2.getSchemaData().array())).join(); SchemaInfo schemaInfo3 = inMemoryGroup.getSchema(versionInfo1.getType(), versionInfo1.getVersion()).join(); assertEquals(schemaInfo2, schemaInfo3); // testing with incorrect input data - getVersionOrdianal will fail From 6c24e12dbd3160a526790389b38edeb45cb2c119 Mon Sep 17 00:00:00 2001 From: Shivesh Ranjan Date: Wed, 29 Jul 2020 05:52:38 -0700 Subject: [PATCH 2/5] Unit test for json string normalization Signed-off-by: Shivesh Ranjan --- .../service/SchemaRegistryService.java | 3 +- .../service/SchemaRegistryServiceTest.java | 57 +++++++++++++++++++ 2 files changed, 59 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/io/pravega/schemaregistry/service/SchemaRegistryService.java b/server/src/main/java/io/pravega/schemaregistry/service/SchemaRegistryService.java index 3d931e1f2..016c5fb69 100644 --- a/server/src/main/java/io/pravega/schemaregistry/service/SchemaRegistryService.java +++ b/server/src/main/java/io/pravega/schemaregistry/service/SchemaRegistryService.java @@ -860,7 +860,8 @@ private SchemaInfo normalizeSchemaBinary(SchemaInfo schemaInfo) { // in alphabetical order. This ensures that identical schemas with different order of fields are // treated to be equal. JsonNode jsonNode = OBJECT_MAPPER.readTree(schemaString); - schemaBinary = ByteBuffer.wrap(OBJECT_MAPPER.writeValueAsString(jsonNode).getBytes(Charsets.UTF_8)); + Object obj = OBJECT_MAPPER.treeToValue(jsonNode, Object.class); + schemaBinary = ByteBuffer.wrap(OBJECT_MAPPER.writeValueAsString(obj).getBytes(Charsets.UTF_8)); break; case Any: break; diff --git a/server/src/test/java/io/pravega/schemaregistry/service/SchemaRegistryServiceTest.java b/server/src/test/java/io/pravega/schemaregistry/service/SchemaRegistryServiceTest.java index 72aff3432..be4c720f9 100644 --- a/server/src/test/java/io/pravega/schemaregistry/service/SchemaRegistryServiceTest.java +++ b/server/src/test/java/io/pravega/schemaregistry/service/SchemaRegistryServiceTest.java @@ -9,6 +9,7 @@ */ package io.pravega.schemaregistry.service; +import com.google.common.base.Charsets; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import io.pravega.common.Exceptions; @@ -31,6 +32,7 @@ import io.pravega.schemaregistry.storage.ContinuationToken; import io.pravega.schemaregistry.storage.Etag; import io.pravega.schemaregistry.storage.SchemaStore; +import io.pravega.schemaregistry.storage.SchemaStoreFactory; import io.pravega.schemaregistry.storage.StoreExceptions; import io.pravega.schemaregistry.storage.impl.group.InMemoryGroupTable; import io.pravega.test.common.AssertExtensions; @@ -48,6 +50,7 @@ import java.util.concurrent.ScheduledExecutorService; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; @@ -645,4 +648,58 @@ public void testDeleteUsingTypeAndVersion() { () -> service.deleteSchema(null, groupName, schemaName, version).join(), e -> e instanceof RuntimeException); } + + @Test + public void testSchemaNormalization() { + SchemaStore schemaStore = SchemaStoreFactory.createInMemoryStore(executor); + SchemaRegistryService service = new SchemaRegistryService(schemaStore, executor); + String namespace = "n"; + String group = "g"; + service.createGroup(namespace, group, + GroupProperties.builder().allowMultipleTypes(false).properties(ImmutableMap.builder().build()) + .serializationFormat(SerializationFormat.Json) + .compatibility(Compatibility.allowAny()).build()).join(); + + String jsonSchemaString = "{" + + "\"title\": \"Person\", " + + "\"type\": \"object\", " + + "\"properties\": { " + + "\"name\": {" + + "\"type\": \"string\"" + + "}," + + "\"age\": {" + + "\"type\": \"integer\", \"minimum\": 0" + + "}" + + "}" + + "}"; + String jsonSchemaString2 = "{" + + "\"title\": \"Person\", " + + "\"type\": \"object\", " + + "\"properties\": { " + + "\"age\": {" + + "\"type\": \"integer\", \"minimum\": 0" + + "}," + + "\"name\": {" + + "\"type\": \"string\"" + + "}" + + "}" + + "}"; + SchemaInfo original = SchemaInfo.builder().type("person").serializationFormat(SerializationFormat.Json) + .schemaData(ByteBuffer.wrap(jsonSchemaString.getBytes(Charsets.UTF_8))) + .properties(ImmutableMap.of()).build(); + VersionInfo v = service.addSchema(namespace, group, original).join(); + SchemaInfo schema = service.getSchema(namespace, group, v.getId()).join(); + assertEquals(schema, original); + + // check with different order + SchemaInfo secondOrder = SchemaInfo.builder().type("person").serializationFormat(SerializationFormat.Json) + .schemaData(ByteBuffer.wrap(jsonSchemaString2.getBytes(Charsets.UTF_8))) + .properties(ImmutableMap.of()).build(); + VersionInfo v2 = service.addSchema(namespace, group, secondOrder).join(); + // add should have been idempotent + assertEquals(v2, v); + + schema = service.getSchema(namespace, group, v.getId()).join(); + assertNotEquals(schema, secondOrder); + } } \ No newline at end of file From eb61c9e36db5929a48d228f9ddad1db9bd6399b7 Mon Sep 17 00:00:00 2001 From: Shivesh Ranjan Date: Fri, 21 Aug 2020 04:57:41 -0700 Subject: [PATCH 3/5] Issue 103: Class cast Exception Signed-off-by: Shivesh Ranjan --- .../contract/transform/ModelHelper.java | 4 +- .../contract/transform/ModelHelperTest.java | 38 +++++++++++-------- .../integrationtest/TestEndToEnd.java | 2 + 3 files changed, 27 insertions(+), 17 deletions(-) diff --git a/contract/src/main/java/io/pravega/schemaregistry/contract/transform/ModelHelper.java b/contract/src/main/java/io/pravega/schemaregistry/contract/transform/ModelHelper.java index 3d6892068..4597a1b1c 100644 --- a/contract/src/main/java/io/pravega/schemaregistry/contract/transform/ModelHelper.java +++ b/contract/src/main/java/io/pravega/schemaregistry/contract/transform/ModelHelper.java @@ -147,7 +147,7 @@ public static io.pravega.schemaregistry.contract.data.BackwardAndForward.Backwar return new io.pravega.schemaregistry.contract.data.BackwardAndForward.Backward(); } else if (obj instanceof BackwardTill) { return new io.pravega.schemaregistry.contract.data.BackwardAndForward.BackwardTill( - decode(((io.pravega.schemaregistry.contract.generated.rest.model.BackwardTill) backward.getBackwardPolicy()).getVersionInfo())); + decode(((io.pravega.schemaregistry.contract.generated.rest.model.BackwardTill) obj).getVersionInfo())); } else if (obj instanceof BackwardTransitive) { return new io.pravega.schemaregistry.contract.data.BackwardAndForward.BackwardTransitive(); } else { @@ -176,7 +176,7 @@ public static io.pravega.schemaregistry.contract.data.BackwardAndForward.Forward return new io.pravega.schemaregistry.contract.data.BackwardAndForward.Forward(); } else if (obj instanceof ForwardTill) { return new io.pravega.schemaregistry.contract.data.BackwardAndForward.ForwardTill( - decode(((io.pravega.schemaregistry.contract.generated.rest.model.ForwardTill) forward.getForwardPolicy()).getVersionInfo())); + decode(((io.pravega.schemaregistry.contract.generated.rest.model.ForwardTill) obj).getVersionInfo())); } else if (obj instanceof ForwardTransitive) { return new io.pravega.schemaregistry.contract.data.BackwardAndForward.ForwardTransitive(); } else { diff --git a/contract/src/test/java/io/pravega/schemaregistry/contract/transform/ModelHelperTest.java b/contract/src/test/java/io/pravega/schemaregistry/contract/transform/ModelHelperTest.java index ab9f92366..9df6ce455 100644 --- a/contract/src/test/java/io/pravega/schemaregistry/contract/transform/ModelHelperTest.java +++ b/contract/src/test/java/io/pravega/schemaregistry/contract/transform/ModelHelperTest.java @@ -9,6 +9,7 @@ */ package io.pravega.schemaregistry.contract.transform; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import io.pravega.schemaregistry.contract.generated.rest.model.Backward; import io.pravega.schemaregistry.contract.generated.rest.model.BackwardAndForward; @@ -28,6 +29,7 @@ import io.pravega.schemaregistry.contract.generated.rest.model.VersionInfo; import org.junit.Test; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; @@ -35,6 +37,8 @@ import static org.junit.Assert.*; public class ModelHelperTest { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + @Test public void testDecode() { SerializationFormat type = new SerializationFormat().serializationFormat(SerializationFormat.SerializationFormatEnum.CUSTOM).fullTypeName("a"); @@ -159,73 +163,77 @@ public void testEncode() { } @Test - public void testEncodeAndDecodeCompatibility() { + public void testEncodeAndDecodeCompatibility() throws IOException { io.pravega.schemaregistry.contract.data.Compatibility compatibility = io.pravega.schemaregistry.contract.data.Compatibility.allowAny(); - Compatibility encoded = ModelHelper.encode(compatibility); + Compatibility encoded = convert(ModelHelper.encode(compatibility), Compatibility.class); io.pravega.schemaregistry.contract.data.Compatibility decoded = ModelHelper.decode(encoded); assertEquals(compatibility, decoded); compatibility = io.pravega.schemaregistry.contract.data.Compatibility.denyAll(); - encoded = ModelHelper.encode(compatibility); + encoded = convert(ModelHelper.encode(compatibility), Compatibility.class); decoded = ModelHelper.decode(encoded); assertEquals(compatibility, decoded); compatibility = io.pravega.schemaregistry.contract.data.Compatibility.backward(); - encoded = ModelHelper.encode(compatibility); + encoded = convert(ModelHelper.encode(compatibility), Compatibility.class); decoded = ModelHelper.decode(encoded); assertEquals(compatibility, decoded); compatibility = io.pravega.schemaregistry.contract.data.Compatibility.forward(); - encoded = ModelHelper.encode(compatibility); + encoded = convert(ModelHelper.encode(compatibility), Compatibility.class); decoded = ModelHelper.decode(encoded); assertEquals(compatibility, decoded); compatibility = io.pravega.schemaregistry.contract.data.Compatibility.backwardTransitive(); - encoded = ModelHelper.encode(compatibility); + encoded = convert(ModelHelper.encode(compatibility), Compatibility.class); decoded = ModelHelper.decode(encoded); assertEquals(compatibility, decoded); compatibility = io.pravega.schemaregistry.contract.data.Compatibility.forwardTransitive(); - encoded = ModelHelper.encode(compatibility); + encoded = convert(ModelHelper.encode(compatibility), Compatibility.class); decoded = ModelHelper.decode(encoded); assertEquals(compatibility, decoded); compatibility = io.pravega.schemaregistry.contract.data.Compatibility.full(); - encoded = ModelHelper.encode(compatibility); + encoded = convert(ModelHelper.encode(compatibility), Compatibility.class); decoded = ModelHelper.decode(encoded); assertEquals(compatibility, decoded); compatibility = io.pravega.schemaregistry.contract.data.Compatibility.fullTransitive(); - encoded = ModelHelper.encode(compatibility); + encoded = convert(ModelHelper.encode(compatibility), Compatibility.class); decoded = ModelHelper.decode(encoded); assertEquals(compatibility, decoded); io.pravega.schemaregistry.contract.data.VersionInfo versionInfo = new io.pravega.schemaregistry.contract.data.VersionInfo("a", 1, 1); compatibility = io.pravega.schemaregistry.contract.data.Compatibility.backwardTill(versionInfo); - encoded = ModelHelper.encode(compatibility); + encoded = convert(ModelHelper.encode(compatibility), Compatibility.class); decoded = ModelHelper.decode(encoded); assertEquals(compatibility, decoded); compatibility = io.pravega.schemaregistry.contract.data.Compatibility.forwardTill(versionInfo); - encoded = ModelHelper.encode(compatibility); + encoded = convert(ModelHelper.encode(compatibility), Compatibility.class); decoded = ModelHelper.decode(encoded); assertEquals(compatibility, decoded); compatibility = io.pravega.schemaregistry.contract.data.Compatibility.backwardTillAndForwardTill(versionInfo, versionInfo); - encoded = ModelHelper.encode(compatibility); + encoded = convert(ModelHelper.encode(compatibility), Compatibility.class); decoded = ModelHelper.decode(encoded); assertEquals(compatibility, decoded); compatibility = io.pravega.schemaregistry.contract.data.Compatibility.backwardOneAndForwardTill(versionInfo); - encoded = ModelHelper.encode(compatibility); + encoded = convert(ModelHelper.encode(compatibility), Compatibility.class); decoded = ModelHelper.decode(encoded); assertEquals(compatibility, decoded); compatibility = io.pravega.schemaregistry.contract.data.Compatibility.backwardTillAndForwardOne(versionInfo); - encoded = ModelHelper.encode(compatibility); + encoded = convert(ModelHelper.encode(compatibility), Compatibility.class); decoded = ModelHelper.decode(encoded); assertEquals(compatibility, decoded); - + } + + private T convert(T t, Class tClass) throws IOException { + String str = OBJECT_MAPPER.writeValueAsString(t); + return OBJECT_MAPPER.readValue(str, tClass); } } diff --git a/test/src/test/java/io/pravega/schemaregistry/integrationtest/TestEndToEnd.java b/test/src/test/java/io/pravega/schemaregistry/integrationtest/TestEndToEnd.java index 9898ed664..4824dde3d 100644 --- a/test/src/test/java/io/pravega/schemaregistry/integrationtest/TestEndToEnd.java +++ b/test/src/test/java/io/pravega/schemaregistry/integrationtest/TestEndToEnd.java @@ -163,6 +163,8 @@ public void testEndToEnd() { assertEquals(version2.getId(), 1); assertEquals(version2.getType(), myTest); + assertTrue(client.updateCompatibility(group, Compatibility.backwardTillAndForwardOne(version1), null)); + assertFalse(client.updateCompatibility(group, Compatibility.fullTransitive(), Compatibility.forward())); assertTrue(client.updateCompatibility(group, Compatibility.fullTransitive(), null)); From 72a7d2b2b18350a5029d924e8e215ff4b486b2ec Mon Sep 17 00:00:00 2001 From: shivesh ranjan <13659022+shiveshr@users.noreply.github.com> Date: Mon, 7 Sep 2020 11:32:13 +0530 Subject: [PATCH 4/5] Add README.md (#106) * Issue 103: Class cast Exception Signed-off-by: Shivesh Ranjan * Add README.md Signed-off-by: Shivesh Ranjan * Update comment Signed-off-by: Shivesh Ranjan * license Signed-off-by: Shivesh Ranjan * PR comment Signed-off-by: Shivesh Ranjan --- README.md | 76 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 000000000..78b409ee9 --- /dev/null +++ b/README.md @@ -0,0 +1,76 @@ + + +# Pravega Schema Registry Repository + +Pravega Schema Registry is the latest service offering from Pravega family. The registry service is designed to store and manage schemas for the unstructured data stored in Pravega streams. The service is designed to not be limited to the data stored in Pravega and can serve as a general purpose management solution for storing and evolving schemas in wide variety of streaming and non streaming use cases. + +It provides RESTful interface to store and manage schemas under schema groups. Users can safely evolve their schemas within the context of the schema group based on desired schema compatibility policy configured at a group level. The service has built in support for popular serialization formats in Avro, Profobuf and JSON schemas, however, users can also store and manage schemas from any serialization system. The service allows users to specify desired compatibility policies for evolution of their schemas but these are employed only for the natively supported serialization systems. + +Along with providing a storage layer for schema, the service also stores and manages additional encoding information in form of codec information. Codecs could correspond to different compression or encryption used while encoding the serialized data at rest. The service generates unique identifiers for schemas and codec information pairs that users may use to tag their data with. + +Please find relevant documentation and usage samples at following links:- +- [Pravega](https://pravega.io) +- [Schema Registry Design Proposal](https://github.com/pravega/schema-registry/wiki/PDP-1:-Schema-Registry) +- [Schema Registry REST API documentation](https://github.com/pravega/schema-registry/wiki/REST-documentation) +- [Installation Guide](https://github.com/pravega/schema-registry/wiki/Installation-Guide) +- [REST API usage examples](https://github.com/pravega/schema-registry/wiki/REST-API-Usage-Samples) +- [Pravega Application usage examples](https://github.com/pravega/schema-registry/wiki/Sample-Usage:-Pravega-Application) + +## Quick Start +---------- +Schema Registry uses Pravega to store the schemas durably. Following steps assumes you have pravega deployed and running. + +### Running Schema Registry +----------------------------- +To start schema registry locally. +``` +1. ./gradlew install +2. cd server/build/distributions/ +3. uncompress schema-registry-.tar or schema-registry-.zip +4. cd schema-registry- +5. change CONTROLLER_URL in conf/schema-registry.config.properties +6. ./bin/schema-registry +``` +The above will start the schema registry server listening on port 9092. + +### Helm Chart +----------------------------- + +Schema registry also includes Helm Charts to deploy Schema Registry service on a Kubernetes cluster. +Detailed instructions can be found [here](https://github.com/pravega/schema-registry/blob/master/charts/schema-registry/README.md) + +``` +helm install charts/schema-registry +``` +The charts can be configured to change the number of replicas, supply TLS configuration, controller uri and other schema registry configurations. + +## Development +----------- + +Fork and clone schema registry repository. + +To build: + +```bash +./gradlew build -x test +``` + +To run the unit and integration tests: + +```bash +./gradlew test +``` + +License +------- +The project is licensed under [Apache 2.0 license](LICENSE-Apache). + + From 493c8b582f0971dc01b28688aab5fe1125698c59 Mon Sep 17 00:00:00 2001 From: shivesh ranjan <13659022+shiveshr@users.noreply.github.com> Date: Mon, 7 Sep 2020 11:32:13 +0530 Subject: [PATCH 5/5] Issue 10: Add README.md (#106) Signed-off-by: Shivesh Ranjan --- README.md | 76 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 000000000..78b409ee9 --- /dev/null +++ b/README.md @@ -0,0 +1,76 @@ + + +# Pravega Schema Registry Repository + +Pravega Schema Registry is the latest service offering from Pravega family. The registry service is designed to store and manage schemas for the unstructured data stored in Pravega streams. The service is designed to not be limited to the data stored in Pravega and can serve as a general purpose management solution for storing and evolving schemas in wide variety of streaming and non streaming use cases. + +It provides RESTful interface to store and manage schemas under schema groups. Users can safely evolve their schemas within the context of the schema group based on desired schema compatibility policy configured at a group level. The service has built in support for popular serialization formats in Avro, Profobuf and JSON schemas, however, users can also store and manage schemas from any serialization system. The service allows users to specify desired compatibility policies for evolution of their schemas but these are employed only for the natively supported serialization systems. + +Along with providing a storage layer for schema, the service also stores and manages additional encoding information in form of codec information. Codecs could correspond to different compression or encryption used while encoding the serialized data at rest. The service generates unique identifiers for schemas and codec information pairs that users may use to tag their data with. + +Please find relevant documentation and usage samples at following links:- +- [Pravega](https://pravega.io) +- [Schema Registry Design Proposal](https://github.com/pravega/schema-registry/wiki/PDP-1:-Schema-Registry) +- [Schema Registry REST API documentation](https://github.com/pravega/schema-registry/wiki/REST-documentation) +- [Installation Guide](https://github.com/pravega/schema-registry/wiki/Installation-Guide) +- [REST API usage examples](https://github.com/pravega/schema-registry/wiki/REST-API-Usage-Samples) +- [Pravega Application usage examples](https://github.com/pravega/schema-registry/wiki/Sample-Usage:-Pravega-Application) + +## Quick Start +---------- +Schema Registry uses Pravega to store the schemas durably. Following steps assumes you have pravega deployed and running. + +### Running Schema Registry +----------------------------- +To start schema registry locally. +``` +1. ./gradlew install +2. cd server/build/distributions/ +3. uncompress schema-registry-.tar or schema-registry-.zip +4. cd schema-registry- +5. change CONTROLLER_URL in conf/schema-registry.config.properties +6. ./bin/schema-registry +``` +The above will start the schema registry server listening on port 9092. + +### Helm Chart +----------------------------- + +Schema registry also includes Helm Charts to deploy Schema Registry service on a Kubernetes cluster. +Detailed instructions can be found [here](https://github.com/pravega/schema-registry/blob/master/charts/schema-registry/README.md) + +``` +helm install charts/schema-registry +``` +The charts can be configured to change the number of replicas, supply TLS configuration, controller uri and other schema registry configurations. + +## Development +----------- + +Fork and clone schema registry repository. + +To build: + +```bash +./gradlew build -x test +``` + +To run the unit and integration tests: + +```bash +./gradlew test +``` + +License +------- +The project is licensed under [Apache 2.0 license](LICENSE-Apache). + +