
Issue 86: Preserve schema field order #87

Merged: 14 commits, Sep 16, 2020
@@ -24,6 +24,7 @@
import io.pravega.common.util.Retry;
import io.pravega.schemaregistry.ResultPage;
import io.pravega.schemaregistry.common.FuturesUtility;
import io.pravega.schemaregistry.common.HashUtil;
import io.pravega.schemaregistry.common.NameUtil;
import io.pravega.schemaregistry.contract.data.BackwardAndForward;
import io.pravega.schemaregistry.contract.data.CodecType;
@@ -53,6 +54,7 @@

import javax.annotation.Nullable;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.Collections;
@@ -267,22 +269,25 @@ public CompletableFuture<VersionInfo> addSchema(String namespace, String group,
// 2. get checker for serialization format.
// validate the schema against the group's compatibility policy
// 3. conditionally update the schema
return RETRY.runAsync(() -> store.getGroupEtag(namespace, group)
.thenCompose(etag ->
store.getGroupProperties(namespace, group)
.thenCompose(prop -> {
return Futures.exceptionallyComposeExpecting(store.getSchemaVersion(namespace, group, schema),
e -> Exceptions.unwrap(e) instanceof StoreExceptions.DataNotFoundException,
() -> { // Schema doesnt exist. Validate and add it
return validateSchema(namespace, group, schema, prop.getCompatibility())
.thenCompose(valid -> {
if (!valid) {
throw new IncompatibleSchemaException(String.format("%s is incompatible", schema.getType()));
}
return store.addSchema(namespace, group, schema, prop, etag);
});
});
})), executor)
return RETRY.runAsync(() ->
store.getGroupEtag(namespace, group)
.thenCompose(etag ->
store.getGroupProperties(namespace, group)
.thenCompose(prop -> {
return Futures.exceptionallyComposeExpecting(store.getSchemaVersion(namespace, group, schema, getFingerprint(schema)),
e -> Exceptions.unwrap(e) instanceof StoreExceptions.DataNotFoundException,
() -> { // Schema doesn't exist. Validate and add it
return validateSchema(namespace, group, schema, prop.getCompatibility())
.thenCompose(valid -> {
if (!valid) {
throw new IncompatibleSchemaException(String.format("%s is incompatible", schema.getType()));
}
// we will compute the fingerprint from the normalized form.
return store.addSchema(namespace, group, schemaInfo, schema,
getFingerprint(schema), prop, etag);
});
});
})), executor)
.whenComplete((r, e) -> {
if (e == null) {
log.debug("Group {} {}, schema {} added successfully.", namespace, group, schema.getType());
@@ -491,7 +496,7 @@ public CompletableFuture<VersionInfo> getSchemaVersion(String namespace, String
log.debug("Group {} {}, getSchemaVersion for {}.", namespace, group, schemaInfo.getType());
SchemaInfo schema = normalizeSchemaBinary(schemaInfo);

return store.getSchemaVersion(namespace, group, schema)
return store.getSchemaVersion(namespace, group, schema, getFingerprint(schema))
.whenComplete((r, e) -> {
if (e == null) {
log.debug("Group {} {}, version = {}.", namespace, group, r);
@@ -855,7 +860,8 @@ private SchemaInfo normalizeSchemaBinary(SchemaInfo schemaInfo) {
// in alphabetical order. This ensures that identical schemas with different field order are
// treated as equal.
JsonNode jsonNode = OBJECT_MAPPER.readTree(schemaString);
schemaBinary = ByteBuffer.wrap(OBJECT_MAPPER.writeValueAsString(jsonNode).getBytes(Charsets.UTF_8));
Object obj = OBJECT_MAPPER.treeToValue(jsonNode, Object.class);
schemaBinary = ByteBuffer.wrap(OBJECT_MAPPER.writeValueAsString(obj).getBytes(Charsets.UTF_8));
break;
case Any:
break;
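The switch above from serializing the JsonNode directly to round-tripping it through a plain Object matters because Jackson's map-key ordering applies to java.util.Map values but not to ObjectNode. Below is a minimal sketch of that behaviour, under the assumption that the service's OBJECT_MAPPER enables SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS (its configuration is not shown in this diff):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;

public class JsonNormalizationSketch {
    public static void main(String[] args) throws Exception {
        // Assumed configuration: map entries are sorted by key on write, which is what
        // produces the "alphabetical order" mentioned in the comment above.
        ObjectMapper mapper = new ObjectMapper().enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS);

        String a = "{\"name\":\"rec\",\"fields\":[],\"type\":\"record\"}";
        String b = "{\"type\":\"record\",\"name\":\"rec\",\"fields\":[]}";

        // Writing the JsonNode directly keeps each document's own field order, so these two differ.
        System.out.println(mapper.writeValueAsString(mapper.readTree(a)));
        System.out.println(mapper.writeValueAsString(mapper.readTree(b)));

        // Converting the tree to plain Java objects (LinkedHashMap/ArrayList) lets the
        // key-ordering feature take effect, so both inputs normalize to the same string.
        Object objA = mapper.treeToValue(mapper.readTree(a), Object.class);
        Object objB = mapper.treeToValue(mapper.readTree(b), Object.class);
        System.out.println(mapper.writeValueAsString(objA));
        System.out.println(mapper.writeValueAsString(objB));
    }
}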
@@ -968,11 +974,15 @@ public CompletableFuture<Map<String, VersionInfo>> getSchemaReferences(String na
return store.getGroupsUsing(namespace, schema)
.thenCompose(groups -> Futures.allOfWithResults(
groups.stream().collect(Collectors.toMap(x -> x, x ->
Futures.exceptionallyExpecting(store.getSchemaVersion(namespace, x, schema),
Futures.exceptionallyExpecting(store.getSchemaVersion(namespace, x, schema, getFingerprint(schema)),
e -> Exceptions.unwrap(e) instanceof StoreExceptions.DataNotFoundException, EMPTY_VERSION))))
.thenApply(result -> {
return result.entrySet().stream().filter(x -> !x.getValue().equals(EMPTY_VERSION))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}));
}

private BigInteger getFingerprint(SchemaInfo schemaInfo) {
return HashUtil.getFingerprint(schemaInfo.getSchemaData().array());
}
}
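The getFingerprint helper above delegates to io.pravega.schemaregistry.common.HashUtil, whose implementation is not part of this diff. The sketch below is only an illustration consistent with the Javadoc's description of the fingerprint as a 256-bit SHA hash of the schema bytes, not the actual Pravega code:

import java.math.BigInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public final class FingerprintSketch {
    private FingerprintSketch() {
    }

    // Returns a 256-bit fingerprint of the supplied bytes; two byte arrays are treated
    // as identical schemas when their fingerprints match.
    public static BigInteger getFingerprint(byte[] bytes) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            // Interpret the digest as an unsigned (positive) integer so the value does not
            // depend on the sign bit of the first digest byte.
            return new BigInteger(1, digest.digest(bytes));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }
}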
---- next file ----
@@ -22,6 +22,7 @@
import io.pravega.schemaregistry.contract.data.VersionInfo;

import javax.annotation.Nullable;
import java.math.BigInteger;
import java.util.List;
import java.util.concurrent.CompletableFuture;

@@ -235,21 +236,27 @@ public interface SchemaStore {
* @param namespace namespace
* @param group group
* @param schemaInfo schema to add
* @param normalized normalized form of the schema to add.
* @param fingerprint 256-bit SHA hash of the schema binary.
* Two schema binaries are considered identical if their fingerprints match.
* @param prop group properties applied at the time of schema addition.
* @param etag entity tag for the group.
* @return Completablefuture that holds version info for the schema that is added.
*/
CompletableFuture<VersionInfo> addSchema(String namespace, String group, SchemaInfo schemaInfo, GroupProperties prop, Etag etag);
CompletableFuture<VersionInfo> addSchema(String namespace, String group, SchemaInfo schemaInfo, SchemaInfo normalized,
BigInteger fingerprint, GroupProperties prop, Etag etag);

/**
* Get the version corresponding to the schema.
*
* @param namespace namespace
* @param group group
* @param schemaInfo schemainfo
* @param fingerprint 256-bit SHA hash of the schema binary.
* Two schema binaries are considered identical if their fingerprints match.
* @return Completablefuture that holds versioninfo for the schema.
*/
CompletableFuture<VersionInfo> getSchemaVersion(String namespace, String group, SchemaInfo schemaInfo);
CompletableFuture<VersionInfo> getSchemaVersion(String namespace, String group, SchemaInfo schemaInfo, BigInteger fingerprint);

/**
* Get the encoding id corresponding to versioninfo and codectype. It returns Etag for the group if the encoding id
---- next file ----
@@ -29,6 +29,7 @@
import io.pravega.schemaregistry.storage.impl.schemas.Schemas;

import javax.annotation.Nullable;
import java.math.BigInteger;
import java.util.List;
import java.util.concurrent.CompletableFuture;

@@ -130,14 +131,16 @@ public CompletableFuture<SchemaWithVersion> getLatestSchemaVersion(String namesp
}

@Override
public CompletableFuture<VersionInfo> addSchema(String namespace, String groupId, SchemaInfo schemaInfo, GroupProperties prop, Etag etag) {
return schemas.addSchema(schemaInfo, namespace, groupId)
.thenCompose(v -> getGroup(namespace, groupId).thenCompose(grp -> grp.addSchema(schemaInfo, prop, etag)));
public CompletableFuture<VersionInfo> addSchema(String namespace, String groupId, SchemaInfo schemaInfo, SchemaInfo normalized,
BigInteger fingerprint, GroupProperties prop, Etag etag) {
// Store the normalized form of the schema with the global schemas, while the original form is stored within the group.
return schemas.addSchema(normalized, namespace, groupId)
.thenCompose(v -> getGroup(namespace, groupId).thenCompose(grp -> grp.addSchema(schemaInfo, fingerprint, prop, etag)));
}

@Override
public CompletableFuture<VersionInfo> getSchemaVersion(String namespace, String groupId, SchemaInfo schemaInfo) {
return getGroup(namespace, groupId).thenCompose(grp -> grp.getVersion(schemaInfo));
public CompletableFuture<VersionInfo> getSchemaVersion(String namespace, String groupId, SchemaInfo schemaInfo, BigInteger fingerprint) {
return getGroup(namespace, groupId).thenCompose(grp -> grp.getVersion(schemaInfo, fingerprint));
}

@Override
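The comment in addSchema above captures the storage split this change introduces: the group keeps the schema exactly as the user supplied it, while the shared schema store holds the normalized form. A toy in-memory illustration of that split; every name here is hypothetical rather than a real Schemas or Group type:

import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

public class SplitStorageSketch {
    // Shared store: holds the normalized form, so equivalent schemas dedupe across groups.
    private final Map<String, byte[]> globalSchemas = new HashMap<>();
    // Per-group store: holds the original bytes, preserving the user's field order; lookups
    // go through the fingerprint of the normalized form, so they stay order-insensitive.
    private final Map<BigInteger, byte[]> groupSchemas = new HashMap<>();

    public void addSchema(String globalKey, byte[] original, byte[] normalized, BigInteger fingerprint) {
        globalSchemas.put(globalKey, normalized);
        groupSchemas.put(fingerprint, original);
    }

    public byte[] getByFingerprint(BigInteger fingerprint) {
        return groupSchemas.get(fingerprint);
    }
}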
---- next file ----
@@ -22,7 +22,6 @@
import io.pravega.common.util.ByteArraySegment;
import io.pravega.common.util.Retry;
import io.pravega.schemaregistry.common.Either;
import io.pravega.schemaregistry.common.HashUtil;
import io.pravega.schemaregistry.contract.data.CodecType;
import io.pravega.schemaregistry.contract.data.Compatibility;
import io.pravega.schemaregistry.contract.data.EncodingId;
@@ -43,7 +42,6 @@

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
@@ -362,8 +360,7 @@ private CompletableFuture<SchemaInfo> getSchema(int id, boolean throwOnDeleted)
});
}

public CompletableFuture<VersionInfo> getVersion(SchemaInfo schemaInfo) {
BigInteger fingerprint = getFingerprint(schemaInfo);
public CompletableFuture<VersionInfo> getVersion(SchemaInfo schemaInfo, BigInteger fingerprint) {
SchemaFingerprintKey key = new SchemaFingerprintKey(fingerprint);

return groupTable.getEntry(key, SchemaVersionList.class)
@@ -521,10 +518,10 @@ public CompletableFuture<List<GroupHistoryRecord>> getHistory(String type) {
.collect(Collectors.toList()));
}

public CompletableFuture<VersionInfo> addSchema(SchemaInfo schemaInfo, GroupProperties prop, Etag etag) {
public CompletableFuture<VersionInfo> addSchema(SchemaInfo schemaInfo, BigInteger fingerprint, GroupProperties prop, Etag etag) {
List<TableKey> keys = new ArrayList<>();
keys.add(LATEST_SCHEMAS_KEY);
SchemaFingerprintKey schemaFingerprintKey = new SchemaFingerprintKey(getFingerprint(schemaInfo));
SchemaFingerprintKey schemaFingerprintKey = new SchemaFingerprintKey(fingerprint);
keys.add(schemaFingerprintKey);

// add or update the following entries:
@@ -653,10 +650,6 @@ public CompletableFuture<GroupProperties> getGroupProperties() {
});
}

private BigInteger getFingerprint(SchemaInfo schemaInfo) {
return HashUtil.getFingerprint(schemaInfo.getSchemaData().array());
}

private CompletableFuture<EncodingId> generateNewEncodingId(VersionInfo versionInfo, String codecType, Etag etag) {
return getSchema(versionInfo.getId(), true)
.thenCompose(schema -> getCodecTypeNames()
@@ -703,14 +696,21 @@ public CompletableFuture<Either<EncodingId, Etag>> getEncodingId(VersionInfo ver
private CompletableFuture<VersionInfo> findVersion(List<VersionInfo> versions, SchemaInfo toFind) {
AtomicReference<VersionInfo> found = new AtomicReference<>();
Iterator<VersionInfo> iterator = versions.iterator();
return Futures.loop(() -> {
return iterator.hasNext() && found.get() == null;
}, () -> {
return Futures.loop(() -> iterator.hasNext() && found.get() == null, () -> {
VersionInfo version = iterator.next();
return Futures.exceptionallyExpecting(getSchema(version.getId(), true)
.thenAccept(schema -> {
if (Arrays.equals(schema.getSchemaData().array(), toFind.getSchemaData().array())
&& schema.getType().equals(toFind.getType()) && schema.getSerializationFormat().equals(toFind.getSerializationFormat())) {
// Note that we store the user-supplied schema in its original form, while the fingerprint
// is computed on the normalized form. So when we fetch the schemas that share a fingerprint,
// we only compare type name and serialization format to declare two schemas equal.
// We can do this with confidence because fingerprints are SHA-256 hashes: the probability that
// two non-identical byte arrays produce the same fingerprint is negligible.
// However, since we also deal with composite schemas (e.g. a protobuf file descriptor set or an avro union),
// where the same schema binary can define multiple objects and the type name distinguishes
// the different entities, we still need to compare type and format when the binaries have an
// identical fingerprint before we consider two schemas to be identical.
if (schema.getType().equals(toFind.getType()) && schema.getSerializationFormat().equals(toFind.getSerializationFormat())) {
found.set(version);
}
}), e -> Exceptions.unwrap(e) instanceof StoreExceptions.DataNotFoundException, null);
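The long comment above is the reason findVersion no longer compares raw schema bytes. Below is a simplified synchronous sketch of the same matching rule; VersionEntry and the sample data are hypothetical, while the real code operates on SchemaInfo/VersionInfo and CompletableFutures:

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class FindVersionSketch {
    static final class VersionEntry {
        final String type;
        final String format;
        final int version;

        VersionEntry(String type, String format, int version) {
            this.type = type;
            this.format = format;
            this.version = version;
        }
    }

    // Candidates were already selected by the fingerprint of the normalized binary, so no
    // byte-level comparison is needed; type name and serialization format disambiguate
    // composite schemas (e.g. several messages in one protobuf file descriptor set).
    static Optional<VersionEntry> findVersion(List<VersionEntry> candidates, String type, String format) {
        return candidates.stream()
                         .filter(v -> v.type.equals(type) && v.format.equals(format))
                         .findFirst();
    }

    public static void main(String[] args) {
        List<VersionEntry> sameFingerprint = Arrays.asList(
                new VersionEntry("com.example.User", "Protobuf", 1),
                new VersionEntry("com.example.Address", "Protobuf", 2));
        // Prints 2: the type name selects the right entry among versions sharing a fingerprint.
        System.out.println(findVersion(sameFingerprint, "com.example.Address", "Protobuf").get().version);
    }
}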