diff --git a/server/src/main/java/io/pravega/schemaregistry/service/SchemaRegistryService.java b/server/src/main/java/io/pravega/schemaregistry/service/SchemaRegistryService.java index 66d144a7d..a590ded53 100644 --- a/server/src/main/java/io/pravega/schemaregistry/service/SchemaRegistryService.java +++ b/server/src/main/java/io/pravega/schemaregistry/service/SchemaRegistryService.java @@ -24,6 +24,7 @@ import io.pravega.common.util.Retry; import io.pravega.schemaregistry.ResultPage; import io.pravega.schemaregistry.common.FuturesUtility; +import io.pravega.schemaregistry.common.HashUtil; import io.pravega.schemaregistry.common.NameUtil; import io.pravega.schemaregistry.contract.data.BackwardAndForward; import io.pravega.schemaregistry.contract.data.CodecType; @@ -53,6 +54,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.AbstractMap; import java.util.Collections; @@ -267,22 +269,25 @@ public CompletableFuture addSchema(String namespace, String group, // 2. get checker for serialization format. // validate schema against group compatibility policy on schema // 3. conditionally update the schema - return RETRY.runAsync(() -> store.getGroupEtag(namespace, group) - .thenCompose(etag -> - store.getGroupProperties(namespace, group) - .thenCompose(prop -> { - return Futures.exceptionallyComposeExpecting(store.getSchemaVersion(namespace, group, schema), - e -> Exceptions.unwrap(e) instanceof StoreExceptions.DataNotFoundException, - () -> { // Schema doesnt exist. Validate and add it - return validateSchema(namespace, group, schema, prop.getCompatibility()) - .thenCompose(valid -> { - if (!valid) { - throw new IncompatibleSchemaException(String.format("%s is incompatible", schema.getType())); - } - return store.addSchema(namespace, group, schema, prop, etag); - }); - }); - })), executor) + return RETRY.runAsync(() -> + store.getGroupEtag(namespace, group) + .thenCompose(etag -> + store.getGroupProperties(namespace, group) + .thenCompose(prop -> { + return Futures.exceptionallyComposeExpecting(store.getSchemaVersion(namespace, group, schema, getFingerprint(schema)), + e -> Exceptions.unwrap(e) instanceof StoreExceptions.DataNotFoundException, + () -> { // Schema doesnt exist. Validate and add it + return validateSchema(namespace, group, schema, prop.getCompatibility()) + .thenCompose(valid -> { + if (!valid) { + throw new IncompatibleSchemaException(String.format("%s is incompatible", schema.getType())); + } + // we will compute the fingerprint from normalized form. + return store.addSchema(namespace, group, schemaInfo, schema, + getFingerprint(schema), prop, etag); + }); + }); + })), executor) .whenComplete((r, e) -> { if (e == null) { log.debug("Group {} {}, schema {} added successfully.", namespace, group, schema.getType()); @@ -491,7 +496,7 @@ public CompletableFuture getSchemaVersion(String namespace, String log.debug("Group {} {}, getSchemaVersion for {}.", namespace, group, schemaInfo.getType()); SchemaInfo schema = normalizeSchemaBinary(schemaInfo); - return store.getSchemaVersion(namespace, group, schema) + return store.getSchemaVersion(namespace, group, schema, getFingerprint(schema)) .whenComplete((r, e) -> { if (e == null) { log.debug("Group {} {}, version = {}.", namespace, group, r); @@ -854,7 +859,8 @@ private SchemaInfo normalizeSchemaBinary(SchemaInfo schemaInfo) { // in alphabetical order. This ensures that identical schemas with different order of fields are // treated to be equal. JsonNode jsonNode = OBJECT_MAPPER.readTree(schemaString); - schemaBinary = ByteBuffer.wrap(OBJECT_MAPPER.writeValueAsString(jsonNode).getBytes(Charsets.UTF_8)); + Object obj = OBJECT_MAPPER.treeToValue(jsonNode, Object.class); + schemaBinary = ByteBuffer.wrap(OBJECT_MAPPER.writeValueAsString(obj).getBytes(Charsets.UTF_8)); break; case Any: break; @@ -967,11 +973,15 @@ public CompletableFuture> getSchemaReferences(String na return store.getGroupsUsing(namespace, schema) .thenCompose(groups -> Futures.allOfWithResults( groups.stream().collect(Collectors.toMap(x -> x, x -> - Futures.exceptionallyExpecting(store.getSchemaVersion(namespace, x, schema), + Futures.exceptionallyExpecting(store.getSchemaVersion(namespace, x, schema, getFingerprint(schema)), e -> Exceptions.unwrap(e) instanceof StoreExceptions.DataNotFoundException, EMPTY_VERSION)))) .thenApply(result -> { return result.entrySet().stream().filter(x -> !x.getValue().equals(EMPTY_VERSION)) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); })); } + + private BigInteger getFingerprint(SchemaInfo schemaInfo) { + return HashUtil.getFingerprint(schemaInfo.getSchemaData().array()); + } } diff --git a/server/src/main/java/io/pravega/schemaregistry/storage/SchemaStore.java b/server/src/main/java/io/pravega/schemaregistry/storage/SchemaStore.java index d8c6ea617..c46bc58be 100644 --- a/server/src/main/java/io/pravega/schemaregistry/storage/SchemaStore.java +++ b/server/src/main/java/io/pravega/schemaregistry/storage/SchemaStore.java @@ -22,6 +22,7 @@ import io.pravega.schemaregistry.contract.data.VersionInfo; import javax.annotation.Nullable; +import java.math.BigInteger; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -235,11 +236,15 @@ public interface SchemaStore { * @param namespace namespace * @param group group * @param schemaInfo schema to add + * @param normalized normalized form of schema to add. + * @param fingerprint 256 bit sha hash of schema binary. + * Two schema binary representation will be considered identical if their fingerprints match. * @param prop group properties applied at the time of schema addition. * @param etag entity tag for the group. * @return Completablefuture that holds version info for the schema that is added. */ - CompletableFuture addSchema(String namespace, String group, SchemaInfo schemaInfo, GroupProperties prop, Etag etag); + CompletableFuture addSchema(String namespace, String group, SchemaInfo schemaInfo, SchemaInfo normalized, + BigInteger fingerprint, GroupProperties prop, Etag etag); /** * Get the version corresponding to the schema. @@ -247,9 +252,11 @@ public interface SchemaStore { * @param namespace namespace * @param group group * @param schemaInfo schemainfo + * @param fingerprint 256 bit sha hash of schema binary. + * Two schema binary representation will be considered identical if their fingerprints match. * @return Completablefuture that holds versioninfo for the schema. */ - CompletableFuture getSchemaVersion(String namespace, String group, SchemaInfo schemaInfo); + CompletableFuture getSchemaVersion(String namespace, String group, SchemaInfo schemaInfo, BigInteger fingerprint); /** * Get the encoding id corresponding to versioninfo and codectype. It returns Etag for the group if the encoding id diff --git a/server/src/main/java/io/pravega/schemaregistry/storage/impl/SchemaStoreImpl.java b/server/src/main/java/io/pravega/schemaregistry/storage/impl/SchemaStoreImpl.java index 9b36f3b15..2004c425b 100644 --- a/server/src/main/java/io/pravega/schemaregistry/storage/impl/SchemaStoreImpl.java +++ b/server/src/main/java/io/pravega/schemaregistry/storage/impl/SchemaStoreImpl.java @@ -29,6 +29,7 @@ import io.pravega.schemaregistry.storage.impl.schemas.Schemas; import javax.annotation.Nullable; +import java.math.BigInteger; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -130,14 +131,16 @@ public CompletableFuture getLatestSchemaVersion(String namesp } @Override - public CompletableFuture addSchema(String namespace, String groupId, SchemaInfo schemaInfo, GroupProperties prop, Etag etag) { - return schemas.addSchema(schemaInfo, namespace, groupId) - .thenCompose(v -> getGroup(namespace, groupId).thenCompose(grp -> grp.addSchema(schemaInfo, prop, etag))); + public CompletableFuture addSchema(String namespace, String groupId, SchemaInfo schemaInfo, SchemaInfo normalized, + BigInteger fingerprint, GroupProperties prop, Etag etag) { + // Store normalized form of schema with the global schemas while the original form is stored within the group. + return schemas.addSchema(normalized, namespace, groupId) + .thenCompose(v -> getGroup(namespace, groupId).thenCompose(grp -> grp.addSchema(schemaInfo, fingerprint, prop, etag))); } @Override - public CompletableFuture getSchemaVersion(String namespace, String groupId, SchemaInfo schemaInfo) { - return getGroup(namespace, groupId).thenCompose(grp -> grp.getVersion(schemaInfo)); + public CompletableFuture getSchemaVersion(String namespace, String groupId, SchemaInfo schemaInfo, BigInteger fingerprint) { + return getGroup(namespace, groupId).thenCompose(grp -> grp.getVersion(schemaInfo, fingerprint)); } @Override diff --git a/server/src/main/java/io/pravega/schemaregistry/storage/impl/group/Group.java b/server/src/main/java/io/pravega/schemaregistry/storage/impl/group/Group.java index 6e6da05b0..81143dea5 100644 --- a/server/src/main/java/io/pravega/schemaregistry/storage/impl/group/Group.java +++ b/server/src/main/java/io/pravega/schemaregistry/storage/impl/group/Group.java @@ -22,7 +22,6 @@ import io.pravega.common.util.ByteArraySegment; import io.pravega.common.util.Retry; import io.pravega.schemaregistry.common.Either; -import io.pravega.schemaregistry.common.HashUtil; import io.pravega.schemaregistry.contract.data.CodecType; import io.pravega.schemaregistry.contract.data.Compatibility; import io.pravega.schemaregistry.contract.data.EncodingId; @@ -43,7 +42,6 @@ import java.math.BigInteger; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.Iterator; @@ -362,8 +360,7 @@ private CompletableFuture getSchema(int id, boolean throwOnDeleted) }); } - public CompletableFuture getVersion(SchemaInfo schemaInfo) { - BigInteger fingerprint = getFingerprint(schemaInfo); + public CompletableFuture getVersion(SchemaInfo schemaInfo, BigInteger fingerprint) { SchemaFingerprintKey key = new SchemaFingerprintKey(fingerprint); return groupTable.getEntry(key, SchemaVersionList.class) @@ -521,10 +518,10 @@ public CompletableFuture> getHistory(String type) { .collect(Collectors.toList())); } - public CompletableFuture addSchema(SchemaInfo schemaInfo, GroupProperties prop, Etag etag) { + public CompletableFuture addSchema(SchemaInfo schemaInfo, BigInteger fingerprint, GroupProperties prop, Etag etag) { List keys = new ArrayList<>(); keys.add(LATEST_SCHEMAS_KEY); - SchemaFingerprintKey schemaFingerprintKey = new SchemaFingerprintKey(getFingerprint(schemaInfo)); + SchemaFingerprintKey schemaFingerprintKey = new SchemaFingerprintKey(fingerprint); keys.add(schemaFingerprintKey); // add or upadte following entries: @@ -653,10 +650,6 @@ public CompletableFuture getGroupProperties() { }); } - private BigInteger getFingerprint(SchemaInfo schemaInfo) { - return HashUtil.getFingerprint(schemaInfo.getSchemaData().array()); - } - private CompletableFuture generateNewEncodingId(VersionInfo versionInfo, String codecType, Etag etag) { return getSchema(versionInfo.getId(), true) .thenCompose(schema -> getCodecTypeNames() @@ -703,14 +696,21 @@ public CompletableFuture> getEncodingId(VersionInfo ver private CompletableFuture findVersion(List versions, SchemaInfo toFind) { AtomicReference found = new AtomicReference<>(); Iterator iterator = versions.iterator(); - return Futures.loop(() -> { - return iterator.hasNext() && found.get() == null; - }, () -> { + return Futures.loop(() -> iterator.hasNext() && found.get() == null, () -> { VersionInfo version = iterator.next(); return Futures.exceptionallyExpecting(getSchema(version.getId(), true) .thenAccept(schema -> { - if (Arrays.equals(schema.getSchemaData().array(), toFind.getSchemaData().array()) - && schema.getType().equals(toFind.getType()) && schema.getSerializationFormat().equals(toFind.getSerializationFormat())) { + // Do note that we store the user supplied schema in its original avatar. While the fingerprint + // is computed on the normalized form. So when we fetch the schemas that have identical fingerprints, + // we will only compare type name and format and declare two schemas equal. + // We can do this with fair confidence because we use the sha 256 hash for fingerprints. + // The probability of collision on two non identical byte arrays to produce same fingerprint + // is next to impossible. + // However, since we also deal with composite schemas (e.g. protobuf file descriptor set or avro union) + // where same schema could include definition for multiple objects and the type name distinguishes + // different entities, we will still need to compare type and format if the schema binary has identical + // fingerprint before we consider two schemas to be identical. + if (schema.getType().equals(toFind.getType()) && schema.getSerializationFormat().equals(toFind.getSerializationFormat())) { found.set(version); } }), e -> Exceptions.unwrap(e) instanceof StoreExceptions.DataNotFoundException, null); diff --git a/server/src/test/java/io/pravega/schemaregistry/service/SchemaRegistryServiceTest.java b/server/src/test/java/io/pravega/schemaregistry/service/SchemaRegistryServiceTest.java index c0aadb4b2..76a200c04 100644 --- a/server/src/test/java/io/pravega/schemaregistry/service/SchemaRegistryServiceTest.java +++ b/server/src/test/java/io/pravega/schemaregistry/service/SchemaRegistryServiceTest.java @@ -9,6 +9,7 @@ */ package io.pravega.schemaregistry.service; +import com.google.common.base.Charsets; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import io.pravega.common.Exceptions; @@ -32,6 +33,7 @@ import io.pravega.schemaregistry.storage.ContinuationToken; import io.pravega.schemaregistry.storage.Etag; import io.pravega.schemaregistry.storage.SchemaStore; +import io.pravega.schemaregistry.storage.SchemaStoreFactory; import io.pravega.schemaregistry.storage.StoreExceptions; import io.pravega.schemaregistry.storage.impl.group.InMemoryGroupTable; import io.pravega.test.common.AssertExtensions; @@ -49,6 +51,7 @@ import java.util.concurrent.ScheduledExecutorService; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; @@ -210,11 +213,11 @@ public void testAddSchema() { SchemaInfo schemaInfo = new SchemaInfo("type", SerializationFormat.custom("custom1"), ByteBuffer.wrap(schemaData), ImmutableMap.of()); - VersionInfo versionInfo = new VersionInfo("type", 5, 7); - doAnswer(x -> CompletableFuture.completedFuture(versionInfo)).when(store).addSchema(any(), anyString(), any(), - any(), any()); + VersionInfo versionInfo = new VersionInfo("objectType", 5, 7); + doAnswer(x -> CompletableFuture.completedFuture(versionInfo)).when(store).addSchema(any(), anyString(), any(), any(), + any(), any(), any()); doAnswer(x -> CompletableFuture.completedFuture(versionInfo)).when(store).getSchemaVersion(any(), anyString(), - any()); + any(), any()); VersionInfo versionInfo1 = service.addSchema(null, "mygroup", schemaInfo).join(); assertEquals(7, versionInfo1.getId()); // SerializationFormatMismatch Exception @@ -226,7 +229,7 @@ public void testAddSchema() { any(), anyString()); doAnswer(x -> Futures.failedFuture( StoreExceptions.create(StoreExceptions.Type.DATA_NOT_FOUND, "Group Not Found"))).when( - store).getSchemaVersion(any(), anyString(), any()); + store).getSchemaVersion(any(), anyString(), any(), any()); AssertExtensions.assertThrows("An exception should have been thrown", () -> service.addSchema(null, "mygroup", schemaInfo).join(), e -> e instanceof SerializationFormatMismatchException); @@ -240,7 +243,7 @@ public void testAddSchema() { any(), anyString()); doAnswer(x -> Futures.failedFuture( StoreExceptions.create(StoreExceptions.Type.DATA_NOT_FOUND, "Group Not Found"))).when( - store).getSchemaVersion(any(), anyString(), any()); + store).getSchemaVersion(any(), anyString(), any(), any()); schemaData = new byte[1]; SchemaInfo schemaInfo1 = new SchemaInfo("type1", SerializationFormat.custom("custom1"), ByteBuffer.wrap(schemaData), @@ -253,7 +256,7 @@ public void testAddSchema() { // CheckCompatibility will fail due to differing types. allowMultipleTypes is false. AssertExtensions.assertThrows("An exception should have been thrown", () -> service.addSchema(null, "mygroup", schemaInfo).join(), e -> e instanceof IncompatibleSchemaException); // Runtime Exception - doAnswer(x -> Futures.failedFuture(new RuntimeException())).when(store).getSchemaVersion(any(), anyString(), + doAnswer(x -> Futures.failedFuture(new RuntimeException())).when(store).getSchemaVersion(any(), anyString(), any(), any()); AssertExtensions.assertThrows("An exception should have been thrown", () -> service.addSchema(null, "mygroup", schemaInfo).join(), e -> e instanceof RuntimeException); @@ -407,7 +410,7 @@ public void testGetGroupHistory() { @Test public void testGetSchemaVersion() { VersionInfo versionInfo = new VersionInfo("objectTYpe", 5, 7); - doAnswer(x -> CompletableFuture.completedFuture(versionInfo)).when(store).getSchemaVersion(any(), anyString(), + doAnswer(x -> CompletableFuture.completedFuture(versionInfo)).when(store).getSchemaVersion(any(), anyString(), any(), any()); byte[] schemaData = new byte[0]; io.pravega.schemaregistry.contract.data.SchemaInfo schemaInfo = @@ -419,12 +422,12 @@ public void testGetSchemaVersion() { //GroupNotFoundException doAnswer(x -> Futures.failedFuture( StoreExceptions.create(StoreExceptions.Type.DATA_NOT_FOUND, "Group NotFound"))).when( - store).getSchemaVersion(any(), anyString(), any()); + store).getSchemaVersion(any(), anyString(), any(), any()); AssertExtensions.assertThrows("An Exception should have been thrown", () -> service.getSchemaVersion(null, "mygroup", schemaInfo).join(), e -> e instanceof StoreExceptions.DataNotFoundException); //Runtime Exception - doAnswer(x -> Futures.failedFuture(new RuntimeException())).when(store).getSchemaVersion(any(), anyString(), + doAnswer(x -> Futures.failedFuture(new RuntimeException())).when(store).getSchemaVersion(any(), anyString(), any(), any()); AssertExtensions.assertThrows("An Exception should have been thrown", () -> service.getSchemaVersion(null, "mygroup", schemaInfo).join(), e -> e instanceof RuntimeException); @@ -557,7 +560,7 @@ public void testGetSchemaReferences() { List groupNameList = new ArrayList<>(); groupNameList.add(groupName); doAnswer(x -> CompletableFuture.completedFuture(groupNameList)).when(store).getGroupsUsing(any(), any()); - doAnswer(x -> CompletableFuture.completedFuture(versionInfo)).when(store).getSchemaVersion(any(), anyString(), + doAnswer(x -> CompletableFuture.completedFuture(versionInfo)).when(store).getSchemaVersion(any(), anyString(), any(), any()); Map map = service.getSchemaReferences(null, schemaInfo).join(); assertTrue(map.get(groupName).equals(versionInfo)); @@ -650,4 +653,58 @@ public void testDeleteUsingTypeAndVersion() { () -> service.deleteSchema(null, groupName, schemaName, version).join(), e -> e instanceof RuntimeException); } + + @Test + public void testSchemaNormalization() { + SchemaStore schemaStore = SchemaStoreFactory.createInMemoryStore(executor); + SchemaRegistryService service = new SchemaRegistryService(schemaStore, executor); + String namespace = "n"; + String group = "g"; + service.createGroup(namespace, group, + GroupProperties.builder().allowMultipleTypes(false).properties(ImmutableMap.builder().build()) + .serializationFormat(SerializationFormat.Json) + .compatibility(Compatibility.allowAny()).build()).join(); + + String jsonSchemaString = "{" + + "\"title\": \"Person\", " + + "\"type\": \"object\", " + + "\"properties\": { " + + "\"name\": {" + + "\"type\": \"string\"" + + "}," + + "\"age\": {" + + "\"type\": \"integer\", \"minimum\": 0" + + "}" + + "}" + + "}"; + String jsonSchemaString2 = "{" + + "\"title\": \"Person\", " + + "\"type\": \"object\", " + + "\"properties\": { " + + "\"age\": {" + + "\"type\": \"integer\", \"minimum\": 0" + + "}," + + "\"name\": {" + + "\"type\": \"string\"" + + "}" + + "}" + + "}"; + SchemaInfo original = SchemaInfo.builder().type("person").serializationFormat(SerializationFormat.Json) + .schemaData(ByteBuffer.wrap(jsonSchemaString.getBytes(Charsets.UTF_8))) + .properties(ImmutableMap.of()).build(); + VersionInfo v = service.addSchema(namespace, group, original).join(); + SchemaInfo schema = service.getSchema(namespace, group, v.getId()).join(); + assertEquals(schema, original); + + // check with different order + SchemaInfo secondOrder = SchemaInfo.builder().type("person").serializationFormat(SerializationFormat.Json) + .schemaData(ByteBuffer.wrap(jsonSchemaString2.getBytes(Charsets.UTF_8))) + .properties(ImmutableMap.of()).build(); + VersionInfo v2 = service.addSchema(namespace, group, secondOrder).join(); + // add should have been idempotent + assertEquals(v2, v); + + schema = service.getSchema(namespace, group, v.getId()).join(); + assertNotEquals(schema, secondOrder); + } } \ No newline at end of file diff --git a/server/src/test/java/io/pravega/schemaregistry/storage/impl/group/GroupPravegaTest.java b/server/src/test/java/io/pravega/schemaregistry/storage/impl/group/GroupPravegaTest.java index 3fc4d5382..7cbeb284e 100644 --- a/server/src/test/java/io/pravega/schemaregistry/storage/impl/group/GroupPravegaTest.java +++ b/server/src/test/java/io/pravega/schemaregistry/storage/impl/group/GroupPravegaTest.java @@ -40,6 +40,7 @@ import org.junit.Before; import org.junit.Test; +import java.math.BigInteger; import java.net.URI; import java.nio.ByteBuffer; import java.util.Arrays; @@ -140,13 +141,15 @@ public void testGetLatestSchemas() { SchemaInfo schemaInfo1 = new SchemaInfo(anygroup1, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); Etag etag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, etag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, etag).join(); schemaWithVersionList = pravegaKeyValueGroups.getGroup(null, groupName).join().getLatestSchemas().join(); assertEquals(1, schemaWithVersionList.size()); assertEquals(anygroup, schemaWithVersionList.get(0).getSchemaInfo().getType()); // two schemas etag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo1, groupProperties, etag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo1, + HashUtil.getFingerprint(schemaInfo1.getSchemaData().array()), groupProperties, etag).join(); schemaWithVersionList = pravegaKeyValueGroups.getGroup(null, groupName).join().getLatestSchemas().join(); assertEquals(2, schemaWithVersionList.size()); assertEquals(anygroup, schemaWithVersionList.get(0).getSchemaInfo().getType()); @@ -167,7 +170,8 @@ public void testAddSchema() { SchemaInfo schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); Etag etag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, etag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), + groupProperties, etag).join(); GroupsValue gv = tableStore.getEntry(GROUPS, new NamespaceAndGroup(null, groupName).toBytes(), GroupsValue::fromBytes).join().getRecord(); // one schema @@ -201,16 +205,17 @@ public void testAddSchema() { SchemaInfo schemaInfo1 = new SchemaInfo(anygroup, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); etag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo1, groupProperties, etag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo1, HashUtil.getFingerprint(schemaInfo1.getSchemaData().array()), + groupProperties, etag).join(); VersionInfo versionInfo1 = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion( - schemaInfo1).join(); + schemaInfo1, HashUtil.getFingerprint(schemaInfo1.getSchemaData().array())).join(); SchemaInfo schemaInfo2 = new SchemaInfo(anygroup1, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); etag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo2, groupProperties, etag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo2, HashUtil.getFingerprint(schemaInfo2.getSchemaData().array()), groupProperties, etag).join(); VersionInfo versionInfo2 = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion( - schemaInfo2).join(); + schemaInfo2, HashUtil.getFingerprint(schemaInfo1.getSchemaData().array())).join(); //versionInfo key schemaRecord = tableStore.getEntry(String.format(TABLE_NAME_FORMAT, gv.getId()), new TableKeySerializer().toBytes(new TableRecords.SchemaIdKey(versionInfo1.getId())), @@ -259,14 +264,16 @@ public void testGetSchemas() { SchemaInfo schemaInfo1 = new SchemaInfo(anygroup1, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); Etag etag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, etag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, etag).join(); List schemaWitheVersionList = pravegaKeyValueGroups.getGroup(null, groupName).join().getSchemas().join(); assertEquals(1, schemaWitheVersionList.size()); assertEquals(anygroup, schemaWitheVersionList.get(0).getSchemaInfo().getType()); //two schemas etag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo1, groupProperties, etag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo1, + HashUtil.getFingerprint(schemaInfo1.getSchemaData().array()), groupProperties, etag).join(); schemaWitheVersionList = pravegaKeyValueGroups.getGroup(null, groupName).join().getSchemas().join(); assertEquals(2, schemaWitheVersionList.size()); assertEquals(anygroup, schemaWitheVersionList.get(0).getSchemaInfo().getType()); @@ -280,7 +287,8 @@ public void testGetSchemas() { SchemaInfo schemaInfo2 = new SchemaInfo(anygroup, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); etag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo2, groupProperties, etag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo2, + HashUtil.getFingerprint(schemaInfo2.getSchemaData().array()), groupProperties, etag).join(); schemaWitheVersionList = pravegaKeyValueGroups.getGroup(null, groupName).join().getSchemas(anygroup, 1).join(); assertEquals(1, schemaWitheVersionList.size()); assertEquals(ImmutableMap.of(), @@ -305,12 +313,15 @@ public void testGetVersion() { SchemaInfo schemaInfo1 = new SchemaInfo(anygroup1, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); Etag etag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, etag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, etag).join(); etag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo1, groupProperties, etag).join(); - VersionInfo versionInfo = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion(schemaInfo).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo1, + HashUtil.getFingerprint(schemaInfo1.getSchemaData().array()), groupProperties, etag).join(); + VersionInfo versionInfo = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion(schemaInfo, + HashUtil.getFingerprint(schemaInfo.getSchemaData().array())).join(); VersionInfo versionInfo1 = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion( - schemaInfo1).join(); + schemaInfo1, HashUtil.getFingerprint(schemaInfo1.getSchemaData().array())).join(); TableRecords.SchemaVersionList schemaVersionList = tableStore.getEntry( String.format(TABLE_NAME_FORMAT, gv.getId()), new TableKeySerializer().toBytes( new TableRecords.SchemaFingerprintKey( @@ -391,8 +402,11 @@ public void testCreateEncodingId() { SchemaInfo schemaInfo = new SchemaInfo("anygroup", SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); Etag eTag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, eTag).join(); - VersionInfo versionInfo = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion(schemaInfo).join(); + BigInteger fingerprint = HashUtil.getFingerprint(schemaInfo.getSchemaData().array()); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + fingerprint, groupProperties, eTag).join(); + VersionInfo versionInfo = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion(schemaInfo, + fingerprint).join(); pravegaKeyValueGroups.getGroup(null, groupName).join().addCodecType(new CodecType("gzip")).join(); eTag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); EncodingId encodingId = pravegaKeyValueGroups.getGroup(null, groupName).join().createEncodingId(versionInfo, @@ -423,8 +437,11 @@ public void testGetEncodingInfo() { SchemaInfo schemaInfo = new SchemaInfo("anygroup", SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); Etag eTag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, eTag).join(); - VersionInfo versionInfo = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion(schemaInfo).join(); + BigInteger fingerprint = HashUtil.getFingerprint(schemaInfo.getSchemaData().array()); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + fingerprint, groupProperties, eTag).join(); + VersionInfo versionInfo = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion(schemaInfo, + fingerprint).join(); pravegaKeyValueGroups.getGroup(null, groupName).join().addCodecType(new CodecType("gzip")).join(); eTag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); EncodingId encodingId = pravegaKeyValueGroups.getGroup(null, groupName).join().createEncodingId(versionInfo, @@ -454,8 +471,11 @@ public void testGetEncodingId() { byte[] schemaData = new byte[0]; SchemaInfo schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, eTag).join(); - VersionInfo versionInfo = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion(schemaInfo).join(); + BigInteger fingerprint = HashUtil.getFingerprint(schemaInfo.getSchemaData().array()); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + fingerprint, groupProperties, eTag).join(); + VersionInfo versionInfo = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion(schemaInfo, + fingerprint).join(); pravegaKeyValueGroups.getGroup(null, groupName).join().addCodecType(new CodecType("gzip")).join(); Either idEtagEither = pravegaKeyValueGroups.getGroup(null, groupName).join().getEncodingId( versionInfo, @@ -499,15 +519,18 @@ public void testGetLatestSchemaVersion() { byte[] schemaData = new byte[0]; SchemaInfo schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, eTag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); eTag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, eTag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); schemaInfo = new SchemaInfo(anygroup1, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); eTag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, eTag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); SchemaWithVersion schemaWithVersion = pravegaKeyValueGroups.getGroup( null, groupName).join().getLatestSchemaVersion().join(); assertEquals(anygroup1, schemaWithVersion.getSchemaInfo().getType()); @@ -540,12 +563,14 @@ public void testGetHistory() { SchemaInfo schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Avro, ByteBuffer.wrap(schemaData), ImmutableMap.of()); Etag eTag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, eTag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); eTag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); schemaData = new byte[5]; schemaInfo = new SchemaInfo(anygroup1, SerializationFormat.Avro, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, eTag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); List groupHistoryRecords = pravegaKeyValueGroups.getGroup(null, groupName).join().getHistory().join(); assertEquals(2, groupHistoryRecords.size()); @@ -556,7 +581,8 @@ public void testGetHistory() { schemaInfo = new SchemaInfo(anygroup1, SerializationFormat.Avro, ByteBuffer.wrap(schemaData1), ImmutableMap.of()); eTag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, eTag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); groupHistoryRecords = pravegaKeyValueGroups.getGroup(null, groupName).join().getHistory(anygroup1).join(); assertEquals(2, groupHistoryRecords.size()); assertTrue(Arrays.equals(ByteBuffer.wrap(schemaData).array(), @@ -638,13 +664,17 @@ public void testDeleteSchema() { SchemaInfo schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Avro, ByteBuffer.wrap(schemaData), ImmutableMap.of()); Etag eTag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, eTag).join(); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); eTag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); schemaData = new byte[5]; schemaInfo = new SchemaInfo(anygroup1, SerializationFormat.Avro, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, eTag).join(); - VersionInfo versionInfo = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion(schemaInfo).join(); + BigInteger fingerprint = HashUtil.getFingerprint(schemaInfo.getSchemaData().array()); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + fingerprint, groupProperties, eTag).join(); + VersionInfo versionInfo = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion(schemaInfo, + fingerprint).join(); Etag etag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); pravegaKeyValueGroups.getGroup(null, groupName).join().deleteSchema(versionInfo.getId(), etag).join(); TableRecords.VersionDeletedRecord versionDeletedRecordKey = new TableRecords.VersionDeletedRecord( @@ -687,8 +717,11 @@ public void testGetSchema() { SchemaInfo schemaInfo = new SchemaInfo("anygroup", SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); Etag eTag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, eTag).join(); - VersionInfo versionInfo1 = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion(schemaInfo).join(); + BigInteger fingerprint = HashUtil.getFingerprint(schemaInfo.getSchemaData().array()); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + fingerprint, groupProperties, eTag).join(); + VersionInfo versionInfo1 = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion(schemaInfo, + fingerprint).join(); SchemaInfo schemaInfo1 = pravegaKeyValueGroups.getGroup(null, groupName).join().getSchema( versionInfo1.getId()).join(); assertEquals(schemaInfo, schemaInfo1); @@ -721,8 +754,11 @@ public void testGetSchemaUsingTypeAndVersion() { SchemaInfo schemaInfo = new SchemaInfo("anygroup", SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); Etag eTag = pravegaKeyValueGroups.getGroup(null, groupName).join().getCurrentEtag().join(); - pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, groupProperties, eTag).join(); - VersionInfo versionInfo1 = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion(schemaInfo).join(); + BigInteger fingerprint = HashUtil.getFingerprint(schemaInfo.getSchemaData().array()); + pravegaKeyValueGroups.getGroup(null, groupName).join().addSchema(schemaInfo, + fingerprint, groupProperties, eTag).join(); + VersionInfo versionInfo1 = pravegaKeyValueGroups.getGroup(null, groupName).join().getVersion(schemaInfo, + fingerprint).join(); SchemaInfo schemaInfo1 = pravegaKeyValueGroups.getGroup(null, groupName).join().getSchema( versionInfo1.getType(), versionInfo1.getVersion()).join(); diff --git a/server/src/test/java/io/pravega/schemaregistry/storage/impl/group/GroupTest.java b/server/src/test/java/io/pravega/schemaregistry/storage/impl/group/GroupTest.java index 97133459b..5ce47309e 100644 --- a/server/src/test/java/io/pravega/schemaregistry/storage/impl/group/GroupTest.java +++ b/server/src/test/java/io/pravega/schemaregistry/storage/impl/group/GroupTest.java @@ -11,6 +11,7 @@ import com.google.common.collect.ImmutableMap; import io.pravega.schemaregistry.common.Either; +import io.pravega.schemaregistry.common.HashUtil; import io.pravega.schemaregistry.contract.data.CodecType; import io.pravega.schemaregistry.contract.data.Compatibility; import io.pravega.schemaregistry.contract.data.EncodingId; @@ -29,6 +30,7 @@ import org.junit.Before; import org.junit.Test; +import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; @@ -111,9 +113,9 @@ public void testGetTypes() { ImmutableMap.of()); SchemaInfo schemaInfo1 = new SchemaInfo(anygroup1, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); + inMemoryGroup.addSchema(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); eTag = inMemoryGroup.getCurrentEtag().join(); - inMemoryGroup.addSchema(schemaInfo1, groupProperties, eTag).join(); + inMemoryGroup.addSchema(schemaInfo1, HashUtil.getFingerprint(schemaInfo1.getSchemaData().array()), groupProperties, eTag).join(); schemaWithVersionList = inMemoryGroup.getLatestSchemas().join(); assertEquals(2, schemaWithVersionList.size()); assertEquals(anygroup, schemaWithVersionList.get(0).getSchemaInfo().getType()); @@ -132,7 +134,7 @@ public void testAddSchema() { byte[] schemaData = new byte[0]; SchemaInfo schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); + inMemoryGroup.addSchema(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); List tableValueListEtag = inMemoryGroupTable.getTable().entrySet().stream().filter( x -> x.getKey() instanceof TableRecords.Etag).map( x -> x.getValue().getValue()).collect( @@ -189,9 +191,9 @@ public void testGetSchemas() { ImmutableMap.of()); SchemaInfo schemaInfo1 = new SchemaInfo(anygroup1, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); + inMemoryGroup.addSchema(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); eTag = inMemoryGroup.getCurrentEtag().join(); - inMemoryGroup.addSchema(schemaInfo1, groupProperties, eTag).join(); + inMemoryGroup.addSchema(schemaInfo1, HashUtil.getFingerprint(schemaInfo1.getSchemaData().array()), groupProperties, eTag).join(); List schemaWithVersionListWithToken = inMemoryGroup.getSchemas().join(); assertEquals(2, schemaWithVersionListWithToken.size()); assertEquals(SerializationFormat.Custom, @@ -236,11 +238,11 @@ public void testGetVersion() { ImmutableMap.of()); SchemaInfo schemaInfo1 = new SchemaInfo(anygroup1, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); + inMemoryGroup.addSchema(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); eTag = inMemoryGroup.getCurrentEtag().join(); - inMemoryGroup.addSchema(schemaInfo1, groupProperties, eTag).join(); - VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo).join(); - VersionInfo versionInfo1 = inMemoryGroup.getVersion(schemaInfo1).join(); + inMemoryGroup.addSchema(schemaInfo1, HashUtil.getFingerprint(schemaInfo1.getSchemaData().array()), groupProperties, eTag).join(); + VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array())).join(); + VersionInfo versionInfo1 = inMemoryGroup.getVersion(schemaInfo1, HashUtil.getFingerprint(schemaInfo1.getSchemaData().array())).join(); List tableValueListVersionInfo = inMemoryGroupTable.getTable().entrySet().stream().filter( x -> x.getKey() instanceof TableRecords.SchemaIdKey).map( @@ -278,8 +280,9 @@ public void testCreateEncodingId() { byte[] schemaData = new byte[0]; SchemaInfo schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); - VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo).join(); + BigInteger fingerprint = HashUtil.getFingerprint(schemaInfo.getSchemaData().array()); + inMemoryGroup.addSchema(schemaInfo, fingerprint, groupProperties, eTag).join(); + VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo, fingerprint).join(); inMemoryGroup.addCodecType(new CodecType("gzip")).join(); eTag = inMemoryGroup.getCurrentEtag().join(); EncodingId encodingId = inMemoryGroup.createEncodingId(versionInfo, "gzip", eTag).join(); @@ -288,8 +291,9 @@ public void testCreateEncodingId() { schemaData = new byte[5]; schemaInfo = new SchemaInfo(anygroup1, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); - versionInfo = inMemoryGroup.getVersion(schemaInfo).join(); + fingerprint = HashUtil.getFingerprint(schemaInfo.getSchemaData().array()); + inMemoryGroup.addSchema(schemaInfo, fingerprint, groupProperties, eTag).join(); + versionInfo = inMemoryGroup.getVersion(schemaInfo, fingerprint).join(); eTag = inMemoryGroup.getCurrentEtag().join(); EncodingId encodingId1 = inMemoryGroup.createEncodingId(versionInfo, "snappy", eTag).join(); List encodingInfoRecordList = inMemoryGroupTable.getTable().entrySet().stream().filter( @@ -316,8 +320,9 @@ public void testGetEncodingInfo() { byte[] schemaData = new byte[0]; SchemaInfo schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); - VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo).join(); + BigInteger fingerprint = HashUtil.getFingerprint(schemaInfo.getSchemaData().array()); + inMemoryGroup.addSchema(schemaInfo, fingerprint, groupProperties, eTag).join(); + VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo, fingerprint).join(); inMemoryGroup.addCodecType(new CodecType("gzip")).join(); eTag = inMemoryGroup.getCurrentEtag().join(); EncodingId encodingId = inMemoryGroup.createEncodingId(versionInfo, "gzip", eTag).join(); @@ -343,8 +348,9 @@ public void testGetLatestSchemaVersion() { byte[] schemaData = new byte[0]; SchemaInfo schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); - VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo).join(); + BigInteger fingerprint = HashUtil.getFingerprint(schemaInfo.getSchemaData().array()); + inMemoryGroup.addSchema(schemaInfo, fingerprint, groupProperties, eTag).join(); + VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo, fingerprint).join(); schemaWithVersion = inMemoryGroup.getLatestSchemaVersion().join(); assertEquals(versionInfo, schemaWithVersion.getVersionInfo()); assertEquals(anygroup, schemaWithVersion.getSchemaInfo().getType()); @@ -354,8 +360,9 @@ public void testGetLatestSchemaVersion() { schemaData = new byte[5]; SchemaInfo schemaInfo1 = new SchemaInfo(anygroup1, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo1, groupProperties, eTag).join(); - VersionInfo versionInfo1 = inMemoryGroup.getVersion(schemaInfo1).join(); + BigInteger fingerprint1 = HashUtil.getFingerprint(schemaInfo1.getSchemaData().array()); + inMemoryGroup.addSchema(schemaInfo1, fingerprint1, groupProperties, eTag).join(); + VersionInfo versionInfo1 = inMemoryGroup.getVersion(schemaInfo1, fingerprint1).join(); // null schemaWithVersion = inMemoryGroup.getLatestSchemaVersion("anygroup2").join(); assertNull(schemaWithVersion); @@ -406,12 +413,12 @@ public void testGetHistory() { byte[] schemaData = new byte[3]; SchemaInfo schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Avro, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); + inMemoryGroup.addSchema(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); eTag = inMemoryGroup.getCurrentEtag().join(); schemaData = new byte[5]; schemaInfo = new SchemaInfo(anygroup1, SerializationFormat.Avro, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); + inMemoryGroup.addSchema(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); List groupHistoryRecords = inMemoryGroup.getHistory().join(); assertEquals(2, groupHistoryRecords.size()); assertEquals(anygroup, groupHistoryRecords.get(0).getSchemaInfo().getType()); @@ -421,7 +428,7 @@ public void testGetHistory() { schemaInfo = new SchemaInfo(anygroup1, SerializationFormat.Avro, ByteBuffer.wrap(schemaData1), ImmutableMap.of()); eTag = inMemoryGroup.getCurrentEtag().join(); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); + inMemoryGroup.addSchema(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); groupHistoryRecords = inMemoryGroup.getHistory(anygroup1).join(); assertEquals(2, groupHistoryRecords.size()); assertTrue(Arrays.equals(ByteBuffer.wrap(schemaData).array(), @@ -471,8 +478,8 @@ public void testGetEncodingId() { byte[] schemaData = new byte[0]; SchemaInfo schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); - VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo).join(); + inMemoryGroup.addSchema(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); + VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array())).join(); inMemoryGroup.addCodecType(new CodecType("gzip")).join(); Either idEtagEither = inMemoryGroup.getEncodingId(versionInfo, "gzip").join(); assertTrue(idEtagEither.isRight()); @@ -484,8 +491,8 @@ public void testGetEncodingId() { schemaData = new byte[5]; schemaInfo = new SchemaInfo(anygroup1, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag); - VersionInfo versionInfo1 = inMemoryGroup.getVersion(schemaInfo).join(); + inMemoryGroup.addSchema(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag); + VersionInfo versionInfo1 = inMemoryGroup.getVersion(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array())).join(); eTag = inMemoryGroup.getCurrentEtag().join(); EncodingId encodingId1 = inMemoryGroup.createEncodingId(versionInfo1, "snappy", eTag).join(); idEtagEither = inMemoryGroup.getEncodingId(versionInfo, "gzip").join(); @@ -508,13 +515,13 @@ public void testDeleteSchema() { byte[] schemaData = new byte[0]; SchemaInfo schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); + inMemoryGroup.addSchema(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag).join(); eTag = inMemoryGroup.getCurrentEtag().join(); schemaData = new byte[5]; schemaInfo = new SchemaInfo(anygroup1, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag); - VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo).join(); + inMemoryGroup.addSchema(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array()), groupProperties, eTag); + VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo, HashUtil.getFingerprint(schemaInfo.getSchemaData().array())).join(); eTag = inMemoryGroup.getCurrentEtag().join(); inMemoryGroup.deleteSchema(versionInfo.getId(), eTag).join(); List deletedOrdinalList = inMemoryGroupTable.getTable().entrySet().stream().filter( @@ -542,8 +549,9 @@ public void testGetSchema() { byte[] schemaData = new byte[0]; SchemaInfo schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); - VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo).join(); + BigInteger fingerprint = HashUtil.getFingerprint(schemaInfo.getSchemaData().array()); + inMemoryGroup.addSchema(schemaInfo, fingerprint, groupProperties, eTag).join(); + VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo, fingerprint).join(); SchemaInfo schemaInfo1 = inMemoryGroup.getSchema(versionInfo.getId()).join(); assertEquals(schemaInfo, schemaInfo1); List schemaRecordValues = inMemoryGroupTable.getTable().entrySet().stream().filter( @@ -569,8 +577,9 @@ public void testGetSchemaUsingTypeAndVersion() { byte[] schemaData = new byte[0]; SchemaInfo schemaInfo = new SchemaInfo(anygroup, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo, groupProperties, eTag).join(); - VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo).join(); + BigInteger fingerprint = HashUtil.getFingerprint(schemaInfo.getSchemaData().array()); + inMemoryGroup.addSchema(schemaInfo, fingerprint, groupProperties, eTag).join(); + VersionInfo versionInfo = inMemoryGroup.getVersion(schemaInfo, fingerprint).join(); SchemaInfo schemaInfo1 = inMemoryGroup.getSchema(versionInfo.getType(), versionInfo.getVersion()).join(); assertEquals(schemaInfo, schemaInfo1); // testing with 2 schemas @@ -578,8 +587,8 @@ public void testGetSchemaUsingTypeAndVersion() { schemaData = new byte[5]; SchemaInfo schemaInfo2 = new SchemaInfo(anygroup1, SerializationFormat.Custom, ByteBuffer.wrap(schemaData), ImmutableMap.of()); - inMemoryGroup.addSchema(schemaInfo2, groupProperties, eTag); - VersionInfo versionInfo1 = inMemoryGroup.getVersion(schemaInfo2).join(); + inMemoryGroup.addSchema(schemaInfo2, HashUtil.getFingerprint(schemaInfo2.getSchemaData().array()), groupProperties, eTag); + VersionInfo versionInfo1 = inMemoryGroup.getVersion(schemaInfo2, HashUtil.getFingerprint(schemaInfo2.getSchemaData().array())).join(); SchemaInfo schemaInfo3 = inMemoryGroup.getSchema(versionInfo1.getType(), versionInfo1.getVersion()).join(); assertEquals(schemaInfo2, schemaInfo3); // testing with incorrect input data - getVersionOrdianal will fail