Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancing Schema Version Management with Tag-Based Lookup Feature #318

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,32 @@ repository for the latest support: [Avro SerializationSchema and Deserialization
properties);
DataStream<GenericRecord> stream = env.addSource(consumer);
```

## Tag-Based Schema Version Lookup
We have introduced a tag-based lookup feature to efficiently retrieve schema versions in AWS Glue. This allows for faster retrieval of schema versions by utilizing metadata tags associated with the schemas.

How it works:
When a schema is registered, it can be tagged with metadata.
During retrieval, if the schema version isn't found in the cache, the system will attempt to find the schema using the specified metadata tags provided by
tagBasedLookupKeyName and tagBasedLookupKeyValue parameter.

This reduces the need for extensive lookups, optimizing the schema retrieval process.
Configuration:
To enable tag-based lookup:

Set useTagBasedLookup to true in your configuration.

Use tagBasedLookupKeyName and tagBasedLookupKeyValue to search for schema version in the scheams with the following tags
if tagBasedLookupKeyName is tagLookUp and tagBasedLookupKeyValue is yes, then this needs to be stored at the schema level.
![img.png](images/schema_level_tag.png)

Provide the metadataTagKeyName with original schema version uuid, which will be used as the lookup key.
![img.png](images/schema_version_tag.png)

![img.png](images/schema_version_key.png)


Please note that proper and correct tagging of schemas is essential for this feature to work efficiently.


## Security issue notifications
If you discover a potential security issue in this project we ask that you notify AWS/Amazon Security via our [vulnerability reporting page](http://aws.amazon.com/security/vulnerability-reporting/). Please do **not** create a public github issue.
4 changes: 4 additions & 0 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@
<groupId>software.amazon.awssdk</groupId>
<artifactId>glue</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>resourcegroupstaggingapi</artifactId>
</dependency>
<dependency>
<groupId>${parent.groupId}</groupId>
<artifactId>schema-registry-build-tools</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@
import software.amazon.awssdk.services.glue.model.GetTagsRequest;
import software.amazon.awssdk.services.glue.model.GetTagsResponse;
import software.amazon.awssdk.services.glue.model.GlueRequest;
import software.amazon.awssdk.services.glue.model.GlueException;
import software.amazon.awssdk.services.glue.model.MetadataKeyValuePair;
import software.amazon.awssdk.services.glue.model.MetadataInfo;
import software.amazon.awssdk.services.glue.model.ListSchemaVersionsResponse;
import software.amazon.awssdk.services.glue.model.ListSchemaVersionsRequest;
import software.amazon.awssdk.services.glue.model.PutSchemaVersionMetadataRequest;
import software.amazon.awssdk.services.glue.model.PutSchemaVersionMetadataResponse;
import software.amazon.awssdk.services.glue.model.QuerySchemaVersionMetadataRequest;
Expand All @@ -56,10 +60,17 @@
import software.amazon.awssdk.services.glue.model.RegisterSchemaVersionResponse;
import software.amazon.awssdk.services.glue.model.RegistryId;
import software.amazon.awssdk.services.glue.model.SchemaId;

import software.amazon.awssdk.services.glue.model.SchemaVersionListItem;
import software.amazon.awssdk.services.resourcegroupstaggingapi.ResourceGroupsTaggingApiClient;
import software.amazon.awssdk.services.resourcegroupstaggingapi.model.GetResourcesRequest;
import software.amazon.awssdk.services.resourcegroupstaggingapi.model.GetResourcesResponse;
import software.amazon.awssdk.services.resourcegroupstaggingapi.model.TagFilter;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.StringJoiner;
import java.util.UUID;

Expand All @@ -75,6 +86,8 @@ public class AWSSchemaRegistryClient {
private final GlueClient client;
private GlueSchemaRegistryConfiguration glueSchemaRegistryConfiguration;

private ResourceGroupsTaggingApiClient taggingApiClient;

/**
* Create Amazon Schema Registry Client.
*
Expand Down Expand Up @@ -288,7 +301,7 @@ public UUID registerSchemaVersion(String schemaDefinition, String schemaName, St
* @param schemaDefinition Schema Definition
* @param schemaName Schema Name
* @param dataFormat Data Format
* @return GetSchemaVersionResponse object.
* @return GetSchemaVersionResponse object.
* @throws AWSSchemaRegistryException on any error during the registration and fetching of schema version
*/
public GetSchemaVersionResponse registerSchemaVersion(String schemaDefinition, String schemaName, String dataFormat) throws AWSSchemaRegistryException {
Expand Down Expand Up @@ -569,4 +582,97 @@ private String buildUserAgentSuffix() {
return userAgentSuffix.toString();
}
}
/**
* Retrieves an instance of the ResourceGroupsTaggingApiClient.
*
* <p>
* This method lazily initializes the taggingApiClient if it hasn't been created yet.
* If a proxy URL is provided in the glueSchemaRegistryConfiguration, the client is configured
* to use that proxy for its HTTP connections.
* </p>
*
* @return the ResourceGroupsTaggingApiClient instance
*/

private ResourceGroupsTaggingApiClient getTaggingApiClient() {
if (taggingApiClient == null) {
UrlConnectionHttpClient.Builder urlConnectionHttpClientBuilder = UrlConnectionHttpClient.builder();

if (glueSchemaRegistryConfiguration.getProxyUrl() != null) {
log.debug("Creating http client using proxy {}", glueSchemaRegistryConfiguration.getProxyUrl().toString());
ProxyConfiguration proxy = ProxyConfiguration.builder().endpoint(glueSchemaRegistryConfiguration.getProxyUrl()).build();
urlConnectionHttpClientBuilder.proxyConfiguration(proxy);
}

taggingApiClient = ResourceGroupsTaggingApiClient.builder()
.httpClient(urlConnectionHttpClientBuilder.build())
.build();
}
return taggingApiClient;
}


/**
* Retrieve all schema ARNs based on the provided tag key and tag value.
* @param tagKey The tag key to look up.
* @param tagValue The tag value to match.
* @return List of schema ARNs
*/
public List<String> getSchemasWithTag(String tagKey, String tagValue) {
ResourceGroupsTaggingApiClient taggingApiClient = getTaggingApiClient();
List<String> schemaArns = new ArrayList<>();

TagFilter tagFilter = TagFilter.builder()
.key(tagKey)
.values(tagValue)
.build();

GetResourcesRequest request = GetResourcesRequest.builder()
.tagFilters(tagFilter)
.build();

GetResourcesResponse response = taggingApiClient.getResources(request);

response.resourceTagMappingList().forEach(resourceTagMapping -> {
schemaArns.add(resourceTagMapping.resourceARN());
});

return schemaArns;
}


/**
* List all schema versions for a given schema name.
*
* @param schemaVersionIdTagKey tag key name, schemaId, schemaVersionId
* @return schemaVersionId matched with the tag key name
* @throws AWSSchemaRegistryException on any error during the listing of schema versions
*/

public UUID filterSchemaVersionByTag(String schemaVersionIdTagKey, SchemaId schemaId, UUID schemaVersionId) {
try {
ListSchemaVersionsRequest listSchemaVersionsRequest = ListSchemaVersionsRequest.builder()
.schemaId(schemaId)
.build();

ListSchemaVersionsResponse listSchemaVersionsResponse = client.listSchemaVersions(listSchemaVersionsRequest);

for (SchemaVersionListItem schemaVersion : listSchemaVersionsResponse.schemas()) {
QuerySchemaVersionMetadataRequest metadataRequest = QuerySchemaVersionMetadataRequest.builder()
.schemaVersionId(schemaVersion.schemaVersionId())
.build();

QuerySchemaVersionMetadataResponse metadataResponse = client.querySchemaVersionMetadata(metadataRequest);
Map<String, MetadataInfo> metadataMap = metadataResponse.metadataInfoMap();
MetadataInfo schemaIdMetadata = metadataMap.get(schemaVersionIdTagKey);
if (schemaIdMetadata != null && schemaIdMetadata.metadataValue().equals(schemaVersionId.toString())) {
return UUID.fromString(schemaVersion.schemaVersionId());
}
}
return null;
} catch (GlueException e) {
String errorMessage = "Error occurred while filtering schema version by tag: " + e.getMessage();
throw new RuntimeException(errorMessage, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ public class GlueSchemaRegistryConfiguration {
private Map<String, String> metadata;
private String secondaryDeserializer;
private URI proxyUrl;
// for metadata based key lookup purpose
private boolean useTagBasedLookup; // this is to be initialized first
private String metadataTagKeyName;
private String tagBasedLookupKeyName;
private String tagBasedLookupKeyValue;

/**
* Name of the application using the serializer/deserializer.
Expand Down Expand Up @@ -104,6 +109,10 @@ private void buildSchemaRegistryConfigs(Map<String, ?> configs) {
validateAndSetUserAgent(configs);
validateAndSetSecondaryDeserializer(configs);
validateAndSetProxyUrl(configs);
validateAndSetUseTagBasedLookup(configs);
validateAndSetMetadataTagKeyName(configs);
validateAndSetSchemaLookupTagKeyName(configs);
validateAndSetSchemaLookupTagKeyValue(configs);
}

private void validateAndSetSecondaryDeserializer(Map<String, ?> configs) {
Expand Down Expand Up @@ -142,7 +151,7 @@ private boolean validateCompressionType(String compressionType) {
if (!EnumUtils.isValidEnum(AWSSchemaRegistryConstants.COMPRESSION.class, compressionType.toUpperCase())) {
String errorMessage =
String.format("Invalid Compression type : %s, Accepted values are : %s", compressionType,
AWSSchemaRegistryConstants.COMPRESSION.values());
AWSSchemaRegistryConstants.COMPRESSION.values());
throw new AWSSchemaRegistryException(errorMessage);
}
return true;
Expand Down Expand Up @@ -323,6 +332,45 @@ private void validateAndSetJacksonDeserializationFeatures(Map<String, ?> configs
}
}

private void validateAndSetUseTagBasedLookup(Map<String, ?> configs) {
if (isPresent(configs, "useTagBasedLookup")) {
this.useTagBasedLookup = Boolean.parseBoolean(configs.get("useTagBasedLookup").toString());
} else {
this.useTagBasedLookup = false; // default value
}
}

private void validateAndSetMetadataTagKeyName(Map<String, ?> configs) {
if (useTagBasedLookup) {
if (isPresent(configs, "metadataTagKeyName")) {
this.metadataTagKeyName = (String) configs.get("metadataTagKeyName");
} else {
throw new AWSSchemaRegistryException("Metadata tag key name is required when tag based schemaVersionId lookup is enabled");
}
}
}

private void validateAndSetSchemaLookupTagKeyName(Map<String, ?> configs) {
if (useTagBasedLookup) {
if (isPresent(configs, "schemaLookUpTagName")) {
this.tagBasedLookupKeyName = (String) configs.get("schemaLookUpTagName");
} else {
throw new AWSSchemaRegistryException("schemaLookUpTagName is required when tag based schemaVersionId lookup is enabled. Set this tag to reduce schema lookups");
}
}
}


private void validateAndSetSchemaLookupTagKeyValue(Map<String, ?> configs) {
if (useTagBasedLookup) {
if (isPresent(configs, "schemaLookUpTagValue")) {
this.tagBasedLookupKeyValue = (String) configs.get("schemaLookUpTagValue");
} else {
throw new AWSSchemaRegistryException("Metadata tag key name and value are used to reduce schame lookups");
}
}
}

private boolean isPresent(Map<String, ?> configs,
String key) {
if (!GlueSchemaRegistryUtils.getInstance()
Expand Down
Binary file added images/schema_level_tag.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/schema_version_key.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/schema_version_tag.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import com.amazonaws.services.schemaregistry.common.AWSSchemaRegistryClient;
import com.amazonaws.services.schemaregistry.common.Schema;
import com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration;
import com.amazonaws.services.schemaregistry.exception.GlueSchemaRegistryIncompatibleDataException;
import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException;
import com.amazonaws.services.schemaregistry.exception.GlueSchemaRegistryIncompatibleDataException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
Expand All @@ -34,10 +34,12 @@
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.services.glue.model.DataFormat;
import software.amazon.awssdk.services.glue.model.GetSchemaVersionResponse;
import software.amazon.awssdk.services.glue.model.SchemaId;

import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -251,11 +253,50 @@ private static class AwsDeserializerSchema {
}

private class GlueSchemaRegistryDeserializationCacheLoader extends CacheLoader<UUID, Schema> {
Boolean isUseTagBasedLookup = glueSchemaRegistryConfiguration.isUseTagBasedLookup();
@Override
public Schema load(UUID schemaVersionId) {
GetSchemaVersionResponse response =
schemaRegistryClient.getSchemaVersionResponse(schemaVersionId.toString());
return new Schema(response.schemaDefinition(), response.dataFormat().name(), getSchemaName(response.schemaArn()));
public Schema load(UUID schemaVersionId) throws Exception {
try {
GetSchemaVersionResponse response = schemaRegistryClient.getSchemaVersionResponse(schemaVersionId.toString());
return new Schema(response.schemaDefinition(), response.dataFormat().name(), getSchemaName(response.schemaArn()));
} catch (Exception e) {
if (e instanceof AWSSchemaRegistryException && isUseTagBasedLookup) {
String schemaVersionIdTagKey = glueSchemaRegistryConfiguration.getMetadataTagKeyName();
String enableTagBasedLookupKey = glueSchemaRegistryConfiguration.getTagBasedLookupKeyName();
String enableTagBasedLookupValue = glueSchemaRegistryConfiguration.getTagBasedLookupKeyValue();
log.info("Schema information not found in cache, trying to find using metadata key "
+ schemaVersionIdTagKey + " in schema with tag " + enableTagBasedLookupKey + " with tag value " + enableTagBasedLookupValue );
List<String> schemaArns = schemaRegistryClient.getSchemasWithTag(enableTagBasedLookupKey, enableTagBasedLookupValue);

UUID tagSchemaVersionId = null;
for (String schemaArn : schemaArns) {
SchemaId schemaId = SchemaId.builder()
.schemaArn(schemaArn)
.build();
log.info("Trying to find schema UUID " + schemaVersionId + " in schema " + schemaArn);
tagSchemaVersionId = schemaRegistryClient.filterSchemaVersionByTag(schemaVersionIdTagKey, schemaId, schemaVersionId);
if (tagSchemaVersionId != null) {
break;
}
}

if (tagSchemaVersionId == null) {
throw new AWSSchemaRegistryException("Schema version id " + schemaVersionId + " not found");
}

if (!tagSchemaVersionId.equals(schemaVersionId)) {
Schema schema = cache.get(tagSchemaVersionId);
cache.put(schemaVersionId, schema);
log.info("Schema information stored in cache for " + schemaVersionIdTagKey + " for schemaVersionId " + schemaVersionId);
return schema;
} else {
GetSchemaVersionResponse tagResponse = schemaRegistryClient.getSchemaVersionResponse(schemaVersionId.toString());
return new Schema(tagResponse.schemaDefinition(), tagResponse.dataFormat().name(), getSchemaName(tagResponse.schemaArn()));
}
} else {
throw e;
}
}
}
}
}
Loading